diff --git a/dataflow/__init__.py b/dataflow/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dataflow/conftest.py b/dataflow/conftest.py new file mode 100644 index 00000000000..13314bf86dd --- /dev/null +++ b/dataflow/conftest.py @@ -0,0 +1,378 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + +from dataclasses import dataclass +import itertools +import json +import multiprocessing as mp +import os +import subprocess +import sys +import time +from typing import Any, Callable, Dict, Iterable, Optional +import uuid + +import pytest + +# Default options. +UUID = uuid.uuid4().hex[0:6] +PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"] +REGION = "us-west1" +ZONE = "us-west1-b" + +RETRY_MAX_TIME = 5 * 60 # 5 minutes in seconds + + +@dataclass +class Utils: + uuid: str = UUID + project: str = PROJECT + region: str = REGION + zone: str = ZONE + + @staticmethod + def storage_bucket(bucket_name: str) -> str: + from google.cloud import storage + + storage_client = storage.Client() + bucket_unique_name = f"{bucket_name}-{UUID}" + bucket = storage_client.create_bucket(bucket_unique_name) + + print(f"storage_bucket: {bucket_unique_name}") + yield bucket_unique_name + + bucket.delete(force=True) + + @staticmethod + def bigquery_dataset(dataset_name: str, project: str = PROJECT) -> str: + from google.cloud import bigquery + + bigquery_client = bigquery.Client() + dataset = bigquery_client.create_dataset( + bigquery.Dataset(f"{project}.{dataset_name.replace('-', '_')}_{UUID}") + ) + + print(f"bigquery_dataset: {dataset.full_dataset_id}") + yield dataset.full_dataset_id + + bigquery_client.delete_dataset( + dataset.full_dataset_id.replace(":", "."), delete_contents=True + ) + + @staticmethod + def bigquery_query(query: str) -> Iterable[Dict[str, Any]]: + from google.cloud import bigquery + + bigquery_client = bigquery.Client() + for row in bigquery_client.query(query): + yield dict(row) + + @staticmethod + def pubsub_topic(topic_name: str, project: str = PROJECT) -> str: + from google.cloud import pubsub + + publisher_client = pubsub.PublisherClient() + topic_path = publisher_client.topic_path(project, f"{topic_name}-{UUID}") + topic = publisher_client.create_topic(topic_path) + + print(f"pubsub_topic: {topic.name}") + yield topic.name + + # Due to the pinned library dependencies in apache-beam, client + # library throws an error upon deletion. + # We use gcloud for a workaround. 
See also: + # https://github.com/GoogleCloudPlatform/python-docs-samples/issues/4492 + cmd = ["gcloud", "pubsub", "--project", project, "topics", "delete", topic.name] + print(cmd) + subprocess.run(cmd, check=True) + + @staticmethod + def pubsub_subscription( + topic_path: str, + subscription_name: str, + project: str = PROJECT, + ) -> str: + from google.cloud import pubsub + + subscriber = pubsub.SubscriberClient() + subscription_path = subscriber.subscription_path( + project, f"{subscription_name}-{UUID}" + ) + subscription = subscriber.create_subscription(subscription_path, topic_path) + + print(f"pubsub_subscription: {subscription.name}") + yield subscription.name + + # Due to the pinned library dependencies in apache-beam, client + # library throws an error upon deletion. + # We use gcloud for a workaround. See also: + # https://github.com/GoogleCloudPlatform/python-docs-samples/issues/4492 + cmd = [ + "gcloud", + "pubsub", + "--project", + project, + "subscriptions", + "delete", + subscription.name, + ] + print(cmd) + subprocess.run(cmd, check=True) + + @staticmethod + def pubsub_publisher( + topic_path: str, + new_msg: Callable[[int], str] = lambda i: json.dumps( + {"id": i, "content": f"message {i}"} + ), + sleep_sec: int = 1, + ) -> bool: + from google.cloud import pubsub + + def _infinite_publish_job() -> None: + publisher_client = pubsub.PublisherClient() + for i in itertools.count(): + msg = new_msg(i) + publisher_client.publish(topic_path, msg.encode("utf-8")).result() + time.sleep(sleep_sec) + + # Start a subprocess in the background to do the publishing. + print(f"Starting publisher on {topic_path}") + p = mp.Process(target=_infinite_publish_job) + p.start() + + yield p.is_alive() + + # For cleanup, terminate the background process. + print("Stopping publisher") + p.join(timeout=0) + p.terminate() + + @staticmethod + def container_image( + image_path: str, + project: str = PROJECT, + tag: str = "latest", + ) -> str: + image_name = f"gcr.io/{project}/{image_path}-{UUID}:{tag}" + cmd = ["gcloud", "auth", "configure-docker"] + print(cmd) + subprocess.run(cmd, check=True) + cmd = [ + "gcloud", + "builds", + "submit", + f"--project={project}", + f"--tag={image_name}", + ".", + ] + print(cmd) + subprocess.run(cmd, check=True) + + print(f"container_image: {image_name}") + yield image_name + + cmd = [ + "gcloud", + "container", + "images", + "delete", + image_name, + f"--project={project}", + "--quiet", + ] + print(cmd) + subprocess.run(cmd, check=True) + + @staticmethod + def dataflow_job_id_from_job_name( + job_name: str, + project: str = PROJECT, + ) -> Optional[str]: + from googleapiclient.discovery import build + + dataflow = build("dataflow", "v1b3") + + # Only return the 50 most recent results - our job is likely to be in here. 
+        # If the job is not found, first try increasing this number.
+        # For more info see:
+        # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/list
+        jobs_request = (
+            dataflow.projects()
+            .jobs()
+            .list(
+                projectId=project,
+                filter="ACTIVE",
+                pageSize=50,
+            )
+        )
+        response = jobs_request.execute()
+
+        # Search for the job in the list that has our name (names are unique)
+        for job in response["jobs"]:
+            if job["name"] == job_name:
+                return job["id"]
+        return None
+
+    @staticmethod
+    def dataflow_jobs_wait(
+        job_id: str,
+        project: str = PROJECT,
+        status: str = "JOB_STATE_RUNNING",
+    ) -> bool:
+        from googleapiclient.discovery import build
+
+        dataflow = build("dataflow", "v1b3")
+
+        sleep_time_seconds = 30
+        max_sleep_time = 10 * 60
+
+        print(f"Waiting for Dataflow job ID: {job_id} (until status {status})")
+        for _ in range(0, max_sleep_time, sleep_time_seconds):
+            try:
+                # For more info see:
+                # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/get
+                jobs_request = (
+                    dataflow.projects()
+                    .jobs()
+                    .get(
+                        projectId=project,
+                        jobId=job_id,
+                        view="JOB_VIEW_SUMMARY",
+                    )
+                )
+                response = jobs_request.execute()
+                print(response)
+                if response["currentState"] == status:
+                    return True
+            except Exception:
+                pass
+            time.sleep(sleep_time_seconds)
+        return False
+
+    @staticmethod
+    def dataflow_jobs_cancel_by_job_id(
+        job_id: str, project: str = PROJECT, region: str = REGION
+    ) -> None:
+        print(f"Canceling Dataflow job ID: {job_id}")
+        # We get an error using the googleapiclient.discovery APIs, probably
+        # due to incompatible dependencies with apache-beam.
+        # We use gcloud instead to cancel the job.
+        # https://cloud.google.com/sdk/gcloud/reference/dataflow/jobs/cancel
+        cmd = [
+            "gcloud",
+            f"--project={project}",
+            "dataflow",
+            "jobs",
+            "cancel",
+            job_id,
+            f"--region={region}",
+        ]
+        subprocess.run(cmd, check=True)
+
+    @staticmethod
+    def dataflow_jobs_cancel_by_job_name(
+        job_name: str, project: str = PROJECT, region: str = REGION
+    ) -> None:
+        # To cancel a Dataflow job, we need its ID, not its name.
+        # If the job is not found, job_id is None and there is nothing to cancel.
+        job_id = Utils.dataflow_job_id_from_job_name(job_name, project)
+        if job_id is not None:
+            Utils.dataflow_jobs_cancel_by_job_id(job_id, project, region)
+
+    @staticmethod
+    def dataflow_flex_template_build(
+        bucket_name: str,
+        template_image: str,
+        metadata_file: str,
+        project: str = PROJECT,
+        template_file: str = "template.json",
+    ) -> str:
+        # https://cloud.google.com/sdk/gcloud/reference/dataflow/flex-template/build
+        template_gcs_path = f"gs://{bucket_name}/{template_file}"
+        cmd = [
+            "gcloud",
+            "dataflow",
+            "flex-template",
+            "build",
+            template_gcs_path,
+            f"--project={project}",
+            f"--image={template_image}",
+            "--sdk-language=PYTHON",
+            f"--metadata-file={metadata_file}",
+        ]
+        print(cmd)
+        subprocess.run(cmd, check=True)
+
+        print(f"dataflow_flex_template_build: {template_gcs_path}")
+        yield template_gcs_path
+        # The template file gets deleted when we delete the bucket.
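+        # Note: like the other resource helpers above, this is a generator, so
+        # tests are expected to wrap it in a pytest fixture with `yield from`.
+        # A minimal sketch (fixture names are illustrative; see e2e_test.py in
+        # this change for the real usage):
+        #
+        #   @pytest.fixture(scope="session")
+        #   def flex_template_path(utils, bucket_name, flex_template_image):
+        #       yield from utils.dataflow_flex_template_build(
+        #           bucket_name=bucket_name,
+        #           template_image=flex_template_image,
+        #           metadata_file="metadata.json",
+        #       )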
+
+    @staticmethod
+    def dataflow_flex_template_run(
+        job_name: str,
+        template_path: str,
+        bucket_name: str,
+        parameters: Dict[str, str] = {},
+        project: str = PROJECT,
+        region: str = REGION,
+    ) -> str:
+        import yaml
+
+        # https://cloud.google.com/sdk/gcloud/reference/dataflow/flex-template/run
+        unique_job_name = f"{job_name}-{UUID}"
+        print(f"dataflow_job_name: {unique_job_name}")
+        cmd = [
+            "gcloud",
+            "dataflow",
+            "flex-template",
+            "run",
+            unique_job_name,
+            f"--template-file-gcs-location={template_path}",
+            f"--project={project}",
+            f"--region={region}",
+        ] + [
+            f"--parameters={name}={value}"
+            for name, value in {
+                **parameters,
+                "temp_location": f"gs://{bucket_name}/temp",
+            }.items()
+        ]
+        print(cmd)
+        try:
+            # The `capture_output` option was added in Python 3.7, so we must
+            # pass the `stdout` and `stderr` options explicitly to support 3.6.
+            # https://docs.python.org/3/library/subprocess.html#subprocess.run
+            p = subprocess.run(
+                cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
+            )
+            stdout = p.stdout.decode("utf-8")
+            stderr = p.stderr.decode("utf-8")
+            print(f"Launched Dataflow Flex Template job: {unique_job_name}")
+        except subprocess.CalledProcessError as e:
+            print(e, file=sys.stderr)
+            # On failure, the local variables above were never assigned, so
+            # read the captured output from the raised error instead.
+            stdout = e.stdout.decode("utf-8")
+            stderr = e.stderr.decode("utf-8")
+        finally:
+            print("--- stderr ---")
+            print(stderr)
+            print("--- stdout ---")
+            print(stdout)
+            print("--- end ---")
+        return yaml.safe_load(stdout)["job"]["id"]
+
+
+@pytest.fixture(scope="session")
+def utils() -> Utils:
+    print(f"Test unique identifier: {UUID}")
+    subprocess.run(["gcloud", "version"])
+    return Utils()
diff --git a/dataflow/flex-templates/__init__.py b/dataflow/flex-templates/__init__.py
new file mode 100644
index 00000000000..8b137891791
--- /dev/null
+++ b/dataflow/flex-templates/__init__.py
@@ -0,0 +1 @@
+
diff --git a/dataflow/flex-templates/streaming_beam/.dockerignore b/dataflow/flex-templates/streaming_beam/.dockerignore
new file mode 100644
index 00000000000..bd849df68f6
--- /dev/null
+++ b/dataflow/flex-templates/streaming_beam/.dockerignore
@@ -0,0 +1,4 @@
+# Ignore everything except for Python files and the requirements file.
+*
+!*.py
+!requirements.txt
\ No newline at end of file
diff --git a/dataflow/flex-templates/streaming_beam/.gcloudignore b/dataflow/flex-templates/streaming_beam/.gcloudignore
new file mode 100644
index 00000000000..594de3d29c8
--- /dev/null
+++ b/dataflow/flex-templates/streaming_beam/.gcloudignore
@@ -0,0 +1,5 @@
+# Ignore everything except for Python files and the requirements file.
+*
+!Dockerfile
+!*.py
+!requirements.txt
\ No newline at end of file
diff --git a/dataflow/flex-templates/streaming_beam/Dockerfile b/dataflow/flex-templates/streaming_beam/Dockerfile
index 554720eee96..02f346957af 100644
--- a/dataflow/flex-templates/streaming_beam/Dockerfile
+++ b/dataflow/flex-templates/streaming_beam/Dockerfile
@@ -14,19 +14,20 @@
 FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
-ARG WORKDIR=/dataflow/template
-RUN mkdir -p ${WORKDIR}
-WORKDIR ${WORKDIR}
+ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="/template/requirements.txt"
+ENV FLEX_TEMPLATE_PYTHON_PY_FILE="/template/streaming_beam.py"
-# Due to a change in the Beam base image in version 2.24, we need to install
-# libffi-dev manually as a dependency. For more information:
-# https://github.com/GoogleCloudPlatform/python-docs-samples/issues/4891
-RUN apt-get update && apt-get install -y libffi-dev git && rm -rf /var/lib/apt/lists/*
+COPY . 
/template -COPY requirements.txt . -COPY streaming_beam.py . +# We could get rid of installing libffi-dev and git, or we could leave them. +RUN apt-get update \ + && apt-get install -y libffi-dev git \ + && rm -rf /var/lib/apt/lists/* \ + # Upgrade pip and install the requirements. + && pip install --no-cache-dir --upgrade pip \ + && pip install --no-cache-dir -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE \ + # Download the requirements to speed up launching the Dataflow job. + && pip download --no-cache-dir --dest /tmp/dataflow-requirements-cache -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE -ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt" -ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/streaming_beam.py" - -RUN pip install -U -r ./requirements.txt +# Since we already downloaded all the dependencies, there's no need to rebuild everything. +ENV PIP_NO_DEPS=True diff --git a/dataflow/flex-templates/streaming_beam/__init__.py b/dataflow/flex-templates/streaming_beam/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dataflow/flex-templates/streaming_beam/e2e_test.py b/dataflow/flex-templates/streaming_beam/e2e_test.py new file mode 100644 index 00000000000..e642306ed4b --- /dev/null +++ b/dataflow/flex-templates/streaming_beam/e2e_test.py @@ -0,0 +1,109 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + +import json +import time + +# `conftest` cannot be imported when running in `nox`, but we still +# try to import it for the autocomplete when writing the tests. 
+try: + from conftest import Utils +except ModuleNotFoundError: + from typing import Any + + Utils = Any +import pytest + +NAME = "dataflow-flex-templates-streaming-beam" + + +@pytest.fixture(scope="session") +def bucket_name(utils: Utils) -> str: + yield from utils.storage_bucket(NAME) + + +@pytest.fixture(scope="session") +def pubsub_topic(utils: Utils) -> str: + yield from utils.pubsub_topic(NAME) + + +@pytest.fixture(scope="session") +def pubsub_subscription(utils: Utils, pubsub_topic: str) -> str: + yield from utils.pubsub_subscription(pubsub_topic, NAME) + + +@pytest.fixture(scope="session") +def bigquery_dataset(utils: Utils) -> str: + yield from utils.bigquery_dataset(NAME) + + +@pytest.fixture(scope="session") +def pubsub_publisher(utils: Utils, pubsub_topic: str) -> bool: + yield from utils.pubsub_publisher( + pubsub_topic, + new_msg=lambda i: json.dumps( + { + "url": "https://beam.apache.org/", + "review": "positive" if i % 2 == 0 else "negative", + } + ), + ) + + +@pytest.fixture(scope="session") +def flex_template_image(utils: Utils) -> str: + yield from utils.container_image(NAME) + + +@pytest.fixture(scope="session") +def flex_template_path(utils: Utils, bucket_name: str, flex_template_image: str) -> str: + yield from utils.dataflow_flex_template_build( + bucket_name=bucket_name, + template_image=flex_template_image, + metadata_file="metadata.json", + ) + + +def test_flex_template_run( + utils: Utils, + bucket_name: str, + pubsub_publisher: str, + pubsub_subscription: str, + flex_template_path: str, + bigquery_dataset: str, +) -> None: + + bigquery_table = "output_table" + job_id = utils.dataflow_flex_template_run( + job_name=NAME, + template_path=flex_template_path, + bucket_name=bucket_name, + parameters={ + "input_subscription": pubsub_subscription, + "output_table": f"{bigquery_dataset}.{bigquery_table}", + }, + ) + + # Since this is a streaming job, it will never finish running. + # First, lets wait until the job is running. + utils.dataflow_jobs_wait(job_id) + + # Then, wait a minute for data to arrive, get processed, and cancel it. + time.sleep(60) + utils.dataflow_jobs_cancel_by_job_id(job_id) + + # Check for the output data in BigQuery. + query = f"SELECT * FROM {bigquery_dataset.replace(':', '.')}.{bigquery_table}" + rows = list(utils.bigquery_query(query)) + assert len(rows) > 0 + for row in rows: + assert "score" in row diff --git a/dataflow/flex-templates/streaming_beam/noxfile.py b/dataflow/flex-templates/streaming_beam/noxfile.py deleted file mode 100644 index c917ea77ced..00000000000 --- a/dataflow/flex-templates/streaming_beam/noxfile.py +++ /dev/null @@ -1,171 +0,0 @@ -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import os -from pathlib import Path - -import nox - - -# DO NOT EDIT - automatically generated. -# All versions used to tested samples. -ALL_VERSIONS = ["2.7", "3.6", "3.7", "3.8", "3.9"] - -# Any default versions that should be ignored. 
-IGNORED_VERSIONS = ["2.7", "3.8", "3.9"] - -TESTED_VERSIONS = sorted([v for v in ALL_VERSIONS if v not in IGNORED_VERSIONS]) - -# -# Style Checks -# - - -def _determine_local_import_names(start_dir): - """Determines all import names that should be considered "local". - - This is used when running the linter to insure that import order is - properly checked. - """ - file_ext_pairs = [os.path.splitext(path) for path in os.listdir(start_dir)] - return [ - basename - for basename, extension in file_ext_pairs - if extension == ".py" - or os.path.isdir(os.path.join(start_dir, basename)) - and basename not in ("__pycache__") - ] - - -# Linting with flake8. -# -# We ignore the following rules: -# E203: whitespace before ‘:’ -# E266: too many leading ‘#’ for block comment -# E501: line too long -# I202: Additional newline in a section of imports -# -# We also need to specify the rules which are ignored by default: -# ['E226', 'W504', 'E126', 'E123', 'W503', 'E24', 'E704', 'E121'] -FLAKE8_COMMON_ARGS = [ - "--show-source", - "--builtin=gettext", - "--max-complexity=20", - "--import-order-style=google", - "--exclude=.nox,.cache,env,lib,generated_pb2,*_pb2.py,*_pb2_grpc.py", - "--ignore=E121,E123,E126,E203,E226,E24,E266,E501,E704,W503,W504,I202", - "--max-line-length=88", -] - - -@nox.session -def lint(session): - session.install("flake8", "flake8-import-order") - - local_names = _determine_local_import_names(".") - args = FLAKE8_COMMON_ARGS + [ - "--application-import-names", - ",".join(local_names), - ".", - ] - session.run("flake8", *args) - - -# -# Black -# - -@nox.session -def blacken(session): - session.install("black") - python_files = [path for path in os.listdir(".") if path.endswith(".py")] - - session.run("black", *python_files) - - -# -# Sample Tests -# - - -PYTEST_COMMON_ARGS = ["--junitxml=sponge_log.xml"] - - -def _session_tests(session, post_install=None): - """Runs py.test for a particular project.""" - if os.path.exists("requirements.txt"): - session.install("-r", "requirements.txt") - - if os.path.exists("requirements-test.txt"): - session.install("-r", "requirements-test.txt") - - if post_install: - post_install(session) - - session.run( - "pytest", - *(PYTEST_COMMON_ARGS + session.posargs), - # Pytest will return 5 when no tests are collected. This can happen - # on travis where slow and flaky tests are excluded. - # See http://doc.pytest.org/en/latest/_modules/_pytest/main.html - success_codes=[0, 5] - ) - - -@nox.session(python=ALL_VERSIONS) -def py(session): - """Runs py.test for a sample using the specified version of Python.""" - if session.python in TESTED_VERSIONS: - _session_tests(session) - else: - print("SKIPPED: {} tests are disabled for this sample.".format(session.python)) - - -# -# Readmegen -# - - -def _get_repo_root(): - """ Returns the root folder of the project. """ - # Get root of this repository. Assume we don't have directories nested deeper than 10 items. 
- p = Path(os.getcwd()) - for i in range(10): - if p is None: - break - if Path(p / ".git").exists(): - return str(p) - p = p.parent - raise Exception("Unable to detect repository root.") - - -GENERATED_READMES = sorted([x for x in Path(".").rglob("*.rst.in")]) - - -@nox.session -@nox.parametrize("path", GENERATED_READMES) -def readmegen(session, path): - """(Re-)generates the readme for a sample.""" - session.install("jinja2", "pyyaml") - dir_ = os.path.dirname(path) - - if os.path.exists(os.path.join(dir_, "requirements.txt")): - session.install("-r", os.path.join(dir_, "requirements.txt")) - - in_file = os.path.join(dir_, "README.rst.in") - session.run( - "python", _get_repo_root() + "/scripts/readme-gen/readme_gen.py", in_file - ) diff --git a/dataflow/flex-templates/streaming_beam/noxfile_config.py b/dataflow/flex-templates/streaming_beam/noxfile_config.py new file mode 100644 index 00000000000..9d0a10cec91 --- /dev/null +++ b/dataflow/flex-templates/streaming_beam/noxfile_config.py @@ -0,0 +1,44 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Default TEST_CONFIG_OVERRIDE for python repos. + +# You can copy this file into your directory, then it will be inported from +# the noxfile.py. + +# The source of truth: +# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/noxfile_config.py + +TEST_CONFIG_OVERRIDE = { + # You can opt out from the test for specific Python versions. + "ignored_versions": ["2.7"], + # Old samples are opted out of enforcing Python type hints + # All new samples should feature them + "enforce_type_hints": True, + # An envvar key for determining the project id to use. Change it + # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a + # build specific Cloud project. You can also use your own string + # to use your own Cloud project. + "gcloud_project_env": "GOOGLE_CLOUD_PROJECT", + # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT', + # If you need to use a specific version of pip, + # change pip_version_override to the string representation + # of the version number, for example, "20.2.4" + "pip_version_override": None, + # A dictionary you want to inject into your test. Don't put any + # secrets here. These values will override predefined values. 
+ "envs": { + "PYTEST_ADDOPTS": "-n=8", # parallelize tests in multiple CPUs + }, +} diff --git a/dataflow/flex-templates/streaming_beam/requirements-test.txt b/dataflow/flex-templates/streaming_beam/requirements-test.txt index 95ea1e6a02b..53cd9882b4e 100644 --- a/dataflow/flex-templates/streaming_beam/requirements-test.txt +++ b/dataflow/flex-templates/streaming_beam/requirements-test.txt @@ -1 +1,5 @@ +google-api-python-client==2.1.0 +google-cloud-storage==1.38.0 +pytest-xdist==2.2.1 pytest==6.2.4 +pyyaml==5.4.1 \ No newline at end of file diff --git a/dataflow/flex-templates/streaming_beam/streaming_beam.py b/dataflow/flex-templates/streaming_beam/streaming_beam.py index af1321e8e18..ab2ecfc9087 100644 --- a/dataflow/flex-templates/streaming_beam/streaming_beam.py +++ b/dataflow/flex-templates/streaming_beam/streaming_beam.py @@ -24,82 +24,99 @@ import json import logging import time +from typing import Any, Dict, List import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions import apache_beam.transforms.window as window # Defines the BigQuery schema for the output table. -SCHEMA = ','.join([ - 'url:STRING', - 'num_reviews:INTEGER', - 'score:FLOAT64', - 'first_date:TIMESTAMP', - 'last_date:TIMESTAMP', -]) - - -def parse_json_message(message): +SCHEMA = ",".join( + [ + "url:STRING", + "num_reviews:INTEGER", + "score:FLOAT64", + "first_date:TIMESTAMP", + "last_date:TIMESTAMP", + ] +) + + +def parse_json_message(message: str) -> Dict[str, Any]: """Parse the input json message and add 'score' & 'processing_time' keys.""" row = json.loads(message) return { - 'url': row['url'], - 'score': 1.0 if row['review'] == 'positive' else 0.0, - 'processing_time': int(time.time()), - } - - -def get_statistics(url_messages): - """Get statistics from the input URL messages.""" - url, messages = url_messages - return { - 'url': url, - 'num_reviews': len(messages), - 'score': sum(msg['score'] for msg in messages) / len(messages), - 'first_date': min(msg['processing_time'] for msg in messages), - 'last_date': max(msg['processing_time'] for msg in messages), + "url": row["url"], + "score": 1.0 if row["review"] == "positive" else 0.0, + "processing_time": int(time.time()), } -def run(args, input_subscription, output_table, window_interval): +def run( + input_subscription: str, + output_table: str, + window_interval_sec: int = 60, + beam_args: List[str] = None, +) -> None: """Build and run the pipeline.""" - options = PipelineOptions(args, save_main_session=True, streaming=True) + options = PipelineOptions(beam_args, save_main_session=True, streaming=True) with beam.Pipeline(options=options) as pipeline: - - # Read the messages from PubSub and process them. 
messages = ( pipeline - | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub( - subscription=input_subscription).with_output_types(bytes) - | 'UTF-8 bytes to string' >> beam.Map(lambda msg: msg.decode('utf-8')) - | 'Parse JSON messages' >> beam.Map(parse_json_message) - | 'Fixed-size windows' >> beam.WindowInto( - window.FixedWindows(int(window_interval), 0)) - | 'Add URL keys' >> beam.Map(lambda msg: (msg['url'], msg)) - | 'Group by URLs' >> beam.GroupByKey() - | 'Get statistics' >> beam.Map(get_statistics)) + | "Read from Pub/Sub" + >> beam.io.ReadFromPubSub( + subscription=input_subscription + ).with_output_types(bytes) + | "UTF-8 bytes to string" >> beam.Map(lambda msg: msg.decode("utf-8")) + | "Parse JSON messages" >> beam.Map(parse_json_message) + | "Fixed-size windows" + >> beam.WindowInto(window.FixedWindows(window_interval_sec, 0)) + | "Add URL keys" >> beam.WithKeys(lambda msg: msg["url"]) + | "Group by URLs" >> beam.GroupByKey() + | "Get statistics" + >> beam.MapTuple( + lambda url, messages: { + "url": url, + "num_reviews": len(messages), + "score": sum(msg["score"] for msg in messages) / len(messages), + "first_date": min(msg["processing_time"] for msg in messages), + "last_date": max(msg["processing_time"] for msg in messages), + } + ) + ) # Output the results into BigQuery table. - _ = messages | 'Write to Big Query' >> beam.io.WriteToBigQuery( - output_table, schema=SCHEMA) + _ = messages | "Write to Big Query" >> beam.io.WriteToBigQuery( + output_table, schema=SCHEMA + ) -if __name__ == '__main__': +if __name__ == "__main__": logging.getLogger().setLevel(logging.INFO) + parser = argparse.ArgumentParser() parser.add_argument( - '--output_table', - help='Output BigQuery table for results specified as: ' - 'PROJECT:DATASET.TABLE or DATASET.TABLE.') + "--output_table", + help="Output BigQuery table for results specified as: " + "PROJECT:DATASET.TABLE or DATASET.TABLE.", + ) parser.add_argument( - '--input_subscription', - help='Input PubSub subscription of the form ' - '"projects//subscriptions/."') + "--input_subscription", + help="Input PubSub subscription of the form " + '"projects//subscriptions/."', + ) parser.add_argument( - '--window_interval', + "--window_interval_sec", default=60, - help='Window interval in seconds for grouping incoming messages.') - known_args, pipeline_args = parser.parse_known_args() - run(pipeline_args, known_args.input_subscription, known_args.output_table, - known_args.window_interval) + type=int, + help="Window interval in seconds for grouping incoming messages.", + ) + args, beam_args = parser.parse_known_args() + + run( + input_subscription=args.input_subscription, + output_table=args.output_table, + window_interval_sec=args.window_interval_sec, + beam_args=beam_args, + ) diff --git a/dataflow/flex-templates/streaming_beam/streaming_beam_test.py b/dataflow/flex-templates/streaming_beam/streaming_beam_test.py deleted file mode 100644 index d588adf432f..00000000000 --- a/dataflow/flex-templates/streaming_beam/streaming_beam_test.py +++ /dev/null @@ -1,165 +0,0 @@ -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the 'License'); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an 'AS IS' BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- -import multiprocessing as mp -import os -import subprocess as sp -import tempfile -import time -import uuid - -from google.cloud import bigquery -from google.cloud import pubsub -import pytest - - -PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"] -UUID = str(uuid.uuid4()).split('-')[0] -DATASET = 'beam_samples_{}'.format(UUID) -TABLE = 'streaming_beam_sql' -TOPIC = 'messages-{}'.format(UUID) -SUBSCRIPTION = TOPIC - - -@pytest.fixture -def topic_path(): - publisher_client = pubsub.PublisherClient() - topic_path = publisher_client.topic_path(PROJECT, TOPIC) - try: - publisher_client.delete_topic(topic_path) - except Exception: - pass - topic = publisher_client.create_topic(topic_path) - yield topic.name - # Due to the pinned library dependencies in apache-beam, client - # library throws an error upon deletion. - # We use gcloud for a workaround. See also: - # https://github.com/GoogleCloudPlatform/python-docs-samples/issues/4492 - sp.check_call( - ['gcloud', 'pubsub', '--project', PROJECT, 'topics', 'delete', TOPIC]) - - -@pytest.fixture -def subscription_path(topic_path): - subscriber = pubsub.SubscriberClient() - subscription_path = subscriber.subscription_path(PROJECT, SUBSCRIPTION) - try: - subscriber.delete_subscription(subscription_path) - except Exception: - pass - subscription = subscriber.create_subscription(subscription_path, topic_path) - yield subscription.name - - # Due to the pinned library dependencies in apache-beam, client - # library throws an error upon deletion. - # We use gcloud for a workaround. See also: - # https://github.com/GoogleCloudPlatform/python-docs-samples/issues/4492 - sp.check_call( - ['gcloud', 'pubsub', '--project', PROJECT, 'subscriptions', 'delete', - SUBSCRIPTION]) - - -@pytest.fixture -def dataset(): - bigquery_client = bigquery.Client(project=PROJECT) - dataset_id = '{}.{}'.format(PROJECT, DATASET) - dataset = bigquery.Dataset(dataset_id) - dataset = bigquery_client.create_dataset(dataset, exists_ok=True) - yield '{}:{}'.format(PROJECT, DATASET) - bigquery_client.delete_table('{}.{}'.format(DATASET, TABLE), not_found_ok=True) - bigquery_client.delete_dataset(DATASET, not_found_ok=True) - - -def _infinite_publish_job(topic_path): - publisher_client = pubsub.PublisherClient() - while True: - future = publisher_client.publish( - topic_path, - b'{"url": "https://beam.apache.org/", "review": "positive"}') - future.result() - time.sleep(1) - - -def test_dataflow_flex_templates_pubsub_to_bigquery(dataset, topic_path, - subscription_path): - # Use one process to publish messages to a topic. - publish_process = mp.Process(target=lambda: _infinite_publish_job(topic_path)) - - # Use another process to run the streaming pipeline that should write one - # row to BigQuery every minute (according to the default window size). - pipeline_process = mp.Process(target=lambda: sp.call([ - 'python', 'streaming_beam.py', - '--project', PROJECT, - '--runner', 'DirectRunner', - '--temp_location', tempfile.mkdtemp(), - '--input_subscription', subscription_path, - '--output_table', '{}.{}'.format(dataset, TABLE), - '--window_interval', '5', - ])) - - publish_process.start() - pipeline_process.start() - - pipeline_process.join(timeout=30) - publish_process.join(timeout=0) - - pipeline_process.terminate() - publish_process.terminate() - - # Check for output data in BigQuery. 
- bigquery_client = bigquery.Client(project=PROJECT) - query = 'SELECT * FROM {}.{}'.format(DATASET, TABLE) - query_job = bigquery_client.query(query) - rows = query_job.result() - assert rows.total_rows > 0 - for row in rows: - assert row['score'] == 1 - - -# TODO:Testcase using Teststream currently does not work as intended. -# The first write to BigQuery fails. Have filed a bug. The test case -# to be changed once the bug gets fixed. b/152446921 -''' -@mock.patch("apache_beam.Pipeline", TestPipeline) -@mock.patch( - "apache_beam.io.ReadFromPubSub", - lambda subscription: ( - TestStream() - .advance_watermark_to(0) - .advance_processing_time(60) - .add_elements([TimestampedValue( - b'{"url": "https://beam.apache.org/", "review": "positive"}', - 1575937195)]) - .advance_processing_time(60) - .add_elements([TimestampedValue( - b'{"url": "https://beam.apache.org/", "review": "positive"}', - 1575937255)]) - .advance_watermark_to_infinity() - ), -) -def test_dataflow_flex_templates_pubsub_to_bigquery(dataset): - streaming_beam.run( - args=[ - "--project", PROJECT, - "--runner", "DirectRunner" - ], - input_subscription="unused", - output_table='{}:{}.{}'.format(PROJECT, DATASET, TABLE), - ) - - # Check for output data in BigQuery. - bigquery_client = bigquery.Client(project=PROJECT) - query = 'SELECT * FROM {}.{}'.format(DATASET, TABLE) - query_job = bigquery_client.query(query) - rows = query_job.result() - assert rows.total_rows > 0 -''' diff --git a/dataflow/run_template/main_test.py b/dataflow/run_template/main_test.py index 6a5b9792692..0887f1b7817 100644 --- a/dataflow/run_template/main_test.py +++ b/dataflow/run_template/main_test.py @@ -31,15 +31,20 @@ from werkzeug.urls import url_encode -import main +# Relative imports cannot be found when running in `nox`, but we still +# try to import it for the autocomplete when writing the tests. +try: + from . import main +except ImportError: + import main RETRY_MAX_TIME = 5 * 60 # 5 minutes in seconds -PROJECT = os.environ['GOOGLE_CLOUD_PROJECT'] -BUCKET = os.environ['CLOUD_STORAGE_BUCKET'] +PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"] +BUCKET = os.environ["CLOUD_STORAGE_BUCKET"] -dataflow = build('dataflow', 'v1b3') +dataflow = build("dataflow", "v1b3") # Create a fake "app" for generating test request contexts. @@ -53,8 +58,9 @@ def app(): @pytest.fixture(scope="function") def dataflow_job_name(request): label = request.param - job_name = datetime.now().strftime('{}-%Y%m%d-%H%M%S-{}'.format( - label, uuid.uuid4().hex[:5])) + job_name = datetime.now().strftime( + "{}-%Y%m%d-%H%M%S-{}".format(label, uuid.uuid4().hex[:5]) + ) yield job_name @@ -69,17 +75,21 @@ def dataflow_job_name(request): # Takes in a Dataflow job name and returns its job ID def get_job_id_from_name(job_name): # list the 50 most recent Dataflow jobs - jobs_request = dataflow.projects().jobs().list( - projectId=PROJECT, - filter="ACTIVE", - pageSize=50 # only return the 50 most recent results - our job is likely to be in here. If the job is not found, first try increasing this number. For more info see:https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/list + jobs_request = ( + dataflow.projects() + .jobs() + .list( + projectId=PROJECT, + filter="ACTIVE", + pageSize=50, # only return the 50 most recent results - our job is likely to be in here. If the job is not found, first try increasing this number. 
For more info see:https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/list + ) ) response = jobs_request.execute() # search for the job in the list that has our name (names are unique) - for job in response['jobs']: - if job['name'] == job_name: - return job['id'] + for job in response["jobs"]: + if job["name"] == job_name: + return job["id"] # if we don't find a job, just return return @@ -92,32 +102,40 @@ def dataflow_jobs_cancel(job_name): if job_id: # Cancel the Dataflow job if it exists. If it doesn't, job_id will be equal to None. For more info, see: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/update - request = dataflow.projects().jobs().update( - projectId=PROJECT, - jobId=job_id, - body={'requestedState': 'JOB_STATE_CANCELLED'} + request = ( + dataflow.projects() + .jobs() + .update( + projectId=PROJECT, + jobId=job_id, + body={"requestedState": "JOB_STATE_CANCELLED"}, + ) ) request.execute() -@pytest.mark.parametrize('dataflow_job_name', [('test_run_template_empty')], indirect=True) +@pytest.mark.parametrize( + "dataflow_job_name", [("test_run_template_empty")], indirect=True +) def test_run_template_python_empty_args(app, dataflow_job_name): project = PROJECT - template = 'gs://dataflow-templates/latest/Word_Count' + template = "gs://dataflow-templates/latest/Word_Count" with pytest.raises(HttpError): main.run(project, dataflow_job_name, template) -@pytest.mark.parametrize('dataflow_job_name', [('test_run_template_python')], indirect=True) +@pytest.mark.parametrize( + "dataflow_job_name", [("test_run_template_python")], indirect=True +) def test_run_template_python(app, dataflow_job_name): project = PROJECT - template = 'gs://dataflow-templates/latest/Word_Count' + template = "gs://dataflow-templates/latest/Word_Count" parameters = { - 'inputFile': 'gs://apache-beam-samples/shakespeare/kinglear.txt', - 'output': 'gs://{}/dataflow/wordcount/outputs'.format(BUCKET), + "inputFile": "gs://apache-beam-samples/shakespeare/kinglear.txt", + "output": "gs://{}/dataflow/wordcount/outputs".format(BUCKET), } res = main.run(project, dataflow_job_name, template, parameters) - assert 'test_run_template_python' in res['job']['name'] + assert "test_run_template_python" in res["job"]["name"] def test_run_template_http_empty_args(app): @@ -126,46 +144,52 @@ def test_run_template_http_empty_args(app): main.run_template(flask.request) -@pytest.mark.parametrize('dataflow_job_name', [('test_run_template_url')], indirect=True) +@pytest.mark.parametrize( + "dataflow_job_name", [("test_run_template_url")], indirect=True +) def test_run_template_http_url(app, dataflow_job_name): args = { - 'project': PROJECT, - 'job': dataflow_job_name, - 'template': 'gs://dataflow-templates/latest/Word_Count', - 'inputFile': 'gs://apache-beam-samples/shakespeare/kinglear.txt', - 'output': 'gs://{}/dataflow/wordcount/outputs'.format(BUCKET), + "project": PROJECT, + "job": dataflow_job_name, + "template": "gs://dataflow-templates/latest/Word_Count", + "inputFile": "gs://apache-beam-samples/shakespeare/kinglear.txt", + "output": "gs://{}/dataflow/wordcount/outputs".format(BUCKET), } - with app.test_request_context('/?' + url_encode(args)): + with app.test_request_context("/?" 
+ url_encode(args)): res = main.run_template(flask.request) data = json.loads(res) - assert 'test_run_template_url' in data['job']['name'] + assert "test_run_template_url" in data["job"]["name"] -@pytest.mark.parametrize('dataflow_job_name', [('test_run_template_data')], indirect=True) +@pytest.mark.parametrize( + "dataflow_job_name", [("test_run_template_data")], indirect=True +) def test_run_template_http_data(app, dataflow_job_name): args = { - 'project': PROJECT, - 'job': dataflow_job_name, - 'template': 'gs://dataflow-templates/latest/Word_Count', - 'inputFile': 'gs://apache-beam-samples/shakespeare/kinglear.txt', - 'output': 'gs://{}/dataflow/wordcount/outputs'.format(BUCKET), + "project": PROJECT, + "job": dataflow_job_name, + "template": "gs://dataflow-templates/latest/Word_Count", + "inputFile": "gs://apache-beam-samples/shakespeare/kinglear.txt", + "output": "gs://{}/dataflow/wordcount/outputs".format(BUCKET), } with app.test_request_context(data=args): res = main.run_template(flask.request) data = json.loads(res) - assert 'test_run_template_data' in data['job']['name'] + assert "test_run_template_data" in data["job"]["name"] -@pytest.mark.parametrize('dataflow_job_name', [('test_run_template_json')], indirect=True) +@pytest.mark.parametrize( + "dataflow_job_name", [("test_run_template_json")], indirect=True +) def test_run_template_http_json(app, dataflow_job_name): args = { - 'project': PROJECT, - 'job': dataflow_job_name, - 'template': 'gs://dataflow-templates/latest/Word_Count', - 'inputFile': 'gs://apache-beam-samples/shakespeare/kinglear.txt', - 'output': 'gs://{}/dataflow/wordcount/outputs'.format(BUCKET), + "project": PROJECT, + "job": dataflow_job_name, + "template": "gs://dataflow-templates/latest/Word_Count", + "inputFile": "gs://apache-beam-samples/shakespeare/kinglear.txt", + "output": "gs://{}/dataflow/wordcount/outputs".format(BUCKET), } with app.test_request_context(json=args): res = main.run_template(flask.request) data = json.loads(res) - assert 'test_run_template_json' in data['job']['name'] + assert "test_run_template_json" in data["job"]["name"]
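
A quick way to iterate on streaming_beam.py locally, without building the Flex Template container, is to call its new run() function with the DirectRunner, much as the removed streaming_beam_test.py did. This is only a sketch: the project, subscription, table, and bucket names below are placeholders, and because the pipeline is unbounded it must be run in a separate process and stopped once rows appear in BigQuery.

    # local_smoke_test.py - sketch only; resource names are placeholders.
    import streaming_beam

    streaming_beam.run(
        input_subscription="projects/my-project/subscriptions/my-subscription",
        output_table="my-project:my_dataset.streaming_beam_sql",
        window_interval_sec=5,  # small window so rows show up quickly
        beam_args=[
            "--runner=DirectRunner",
            "--project=my-project",
            "--temp_location=gs://my-bucket/temp",
        ],
    )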