diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index efe89f825..574529e80 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -225,7 +225,7 @@ def eventually_consistent_test() -> None: eventually_consistent_test() -def test_create( +def test_create_subscription( subscriber_client: pubsub_v1.SubscriberClient, subscription_admin: str, capsys: CaptureFixture, @@ -357,39 +357,50 @@ def test_create_subscription_with_ordering( assert "enable_message_ordering: true" in out -def test_create_push( +def test_create_push_subscription( subscriber_client: pubsub_v1.SubscriberClient, subscription_admin: str, capsys: CaptureFixture, ) -> None: # The scope of `subscription_path` is limited to this function. - subscription_path = subscriber_client.subscription_path( - PROJECT_ID, SUBSCRIPTION_ADMIN - ) - try: - subscriber_client.delete_subscription( - request={"subscription": subscription_path} + @backoff.on_exception(backoff.expo, AssertionError, max_time=60) + def eventually_consistent_test() -> None: + subscription_path = subscriber_client.subscription_path( + PROJECT_ID, SUBSCRIPTION_ADMIN ) - except NotFound: - pass + try: + subscriber_client.delete_subscription( + request={"subscription": subscription_path} + ) + except NotFound: + pass - subscriber.create_push_subscription(PROJECT_ID, TOPIC, SUBSCRIPTION_ADMIN, ENDPOINT) + subscriber.create_push_subscription(PROJECT_ID, TOPIC, SUBSCRIPTION_ADMIN, ENDPOINT) - out, _ = capsys.readouterr() - assert f"{subscription_admin}" in out + out, _ = capsys.readouterr() + assert f"{subscription_admin}" in out + eventually_consistent_test() -def test_update(subscription_admin: str, capsys: CaptureFixture) -> None: - subscriber.update_push_subscription( - PROJECT_ID, TOPIC, SUBSCRIPTION_ADMIN, NEW_ENDPOINT - ) - out, _ = capsys.readouterr() - assert "Subscription updated" in out - assert f"{subscription_admin}" in out +def test_update_push_suscription( + subscription_admin: str, + capsys: CaptureFixture, +) -> None: + @backoff.on_exception(backoff.expo, AssertionError, max_time=60) + def eventually_consistent_test() -> None: + subscriber.update_push_subscription( + PROJECT_ID, TOPIC, SUBSCRIPTION_ADMIN, NEW_ENDPOINT + ) + + out, _ = capsys.readouterr() + assert "Subscription updated" in out + assert f"{subscription_admin}" in out + + eventually_consistent_test() -def test_delete( +def test_delete_subscription( subscriber_client: pubsub_v1.SubscriberClient, subscription_admin: str ) -> None: subscriber.delete_subscription(PROJECT_ID, SUBSCRIPTION_ADMIN) @@ -410,14 +421,19 @@ def test_receive( subscription_async: str, capsys: CaptureFixture, ) -> None: - _publish_messages(publisher_client, topic) - subscriber.receive_messages(PROJECT_ID, SUBSCRIPTION_ASYNC, 5) + @backoff.on_exception(backoff.expo, Unknown, max_time=60) + def eventually_consistent_test() -> None: + _publish_messages(publisher_client, topic) - out, _ = capsys.readouterr() - assert "Listening" in out - assert subscription_async in out - assert "message" in out + subscriber.receive_messages(PROJECT_ID, SUBSCRIPTION_ASYNC, 5) + + out, _ = capsys.readouterr() + assert "Listening" in out + assert subscription_async in out + assert "message" in out + + eventually_consistent_test() def test_receive_with_custom_attributes( @@ -427,17 +443,21 @@ def test_receive_with_custom_attributes( capsys: CaptureFixture, ) -> None: - _publish_messages(publisher_client, topic, origin="python-sample") + @backoff.on_exception(backoff.expo, Unknown, max_time=60) + def eventually_consistent_test() -> None: + _publish_messages(publisher_client, topic, origin="python-sample") - subscriber.receive_messages_with_custom_attributes( - PROJECT_ID, SUBSCRIPTION_ASYNC, 5 - ) + subscriber.receive_messages_with_custom_attributes( + PROJECT_ID, SUBSCRIPTION_ASYNC, 5 + ) - out, _ = capsys.readouterr() - assert subscription_async in out - assert "message" in out - assert "origin" in out - assert "python-sample" in out + out, _ = capsys.readouterr() + assert subscription_async in out + assert "message" in out + assert "origin" in out + assert "python-sample" in out + + eventually_consistent_test() def test_receive_with_flow_control( @@ -447,14 +467,18 @@ def test_receive_with_flow_control( capsys: CaptureFixture, ) -> None: - _publish_messages(publisher_client, topic) + @backoff.on_exception(backoff.expo, Unknown, max_time=300) + def eventually_consistent_test() -> None: + _publish_messages(publisher_client, topic) - subscriber.receive_messages_with_flow_control(PROJECT_ID, SUBSCRIPTION_ASYNC, 5) + subscriber.receive_messages_with_flow_control(PROJECT_ID, SUBSCRIPTION_ASYNC, 5) - out, _ = capsys.readouterr() - assert "Listening" in out - assert subscription_async in out - assert "message" in out + out, _ = capsys.readouterr() + assert "Listening" in out + assert subscription_async in out + assert "message" in out + + eventually_consistent_test() def test_receive_with_blocking_shutdown( @@ -463,57 +487,67 @@ def test_receive_with_blocking_shutdown( subscription_async: str, capsys: CaptureFixture, ) -> None: - _publish_messages(publisher_client, topic, message_num=3) - subscriber.receive_messages_with_blocking_shutdown( - PROJECT_ID, SUBSCRIPTION_ASYNC, timeout=5.0 - ) + _received = re.compile(r".*received.*message.*", flags=re.IGNORECASE) + _done = re.compile(r".*done processing.*message.*", flags=re.IGNORECASE) + _canceled = re.compile(r".*streaming pull future canceled.*", flags=re.IGNORECASE) + _shut_down = re.compile(r".*done waiting.*stream shutdown.*", flags=re.IGNORECASE) - out, _ = capsys.readouterr() - out_lines = out.splitlines() - - msg_received_lines = [ - i - for i, line in enumerate(out_lines) - if re.search(r".*received.*message.*", line, flags=re.IGNORECASE) - ] - msg_done_lines = [ - i - for i, line in enumerate(out_lines) - if re.search(r".*done processing.*message.*", line, flags=re.IGNORECASE) - ] - stream_canceled_lines = [ - i - for i, line in enumerate(out_lines) - if re.search(r".*streaming pull future canceled.*", line, flags=re.IGNORECASE) - ] - shutdown_done_waiting_lines = [ - i - for i, line in enumerate(out_lines) - if re.search(r".*done waiting.*stream shutdown.*", line, flags=re.IGNORECASE) - ] + @backoff.on_exception(backoff.expo, Unknown, max_time=300) + def eventually_consistent_test() -> None: + _publish_messages(publisher_client, topic, message_num=3) - try: - assert "Listening" in out - assert subscription_async in out + subscriber.receive_messages_with_blocking_shutdown( + PROJECT_ID, SUBSCRIPTION_ASYNC, timeout=5.0 + ) + + out, _ = capsys.readouterr() + out_lines = out.splitlines() + + msg_received_lines = [ + i + for i, line in enumerate(out_lines) + if _received.search(line) + ] + msg_done_lines = [ + i + for i, line in enumerate(out_lines) + if _done.search(line) + ] + stream_canceled_lines = [ + i + for i, line in enumerate(out_lines) + if _canceled.search(line) + ] + shutdown_done_waiting_lines = [ + i + for i, line in enumerate(out_lines) + if _shut_down.search(line) + ] + + try: + assert "Listening" in out + assert subscription_async in out - assert len(stream_canceled_lines) == 1 - assert len(shutdown_done_waiting_lines) == 1 - assert len(msg_received_lines) == 3 - assert len(msg_done_lines) == 3 + assert len(stream_canceled_lines) == 1 + assert len(shutdown_done_waiting_lines) == 1 + assert len(msg_received_lines) == 3 + assert len(msg_done_lines) == 3 - # The stream should have been canceled *after* receiving messages, but before - # message processing was done. - assert msg_received_lines[-1] < stream_canceled_lines[0] < msg_done_lines[0] + # The stream should have been canceled *after* receiving messages, but before + # message processing was done. + assert msg_received_lines[-1] < stream_canceled_lines[0] < msg_done_lines[0] - # Yet, waiting on the stream shutdown should have completed *after* - # the processing of received messages has ended. - assert msg_done_lines[-1] < shutdown_done_waiting_lines[0] - except AssertionError: # pragma: NO COVER - from pprint import pprint + # Yet, waiting on the stream shutdown should have completed *after* + # the processing of received messages has ended. + assert msg_done_lines[-1] < shutdown_done_waiting_lines[0] + except AssertionError: # pragma: NO COVER + from pprint import pprint - pprint(out_lines) # To make possible flakiness debugging easier. - raise + pprint(out_lines) # To make possible flakiness debugging easier. + raise + + eventually_consistent_test() def test_listen_for_errors( @@ -523,13 +557,17 @@ def test_listen_for_errors( capsys: CaptureFixture, ) -> None: - _publish_messages(publisher_client, topic) + @backoff.on_exception(backoff.expo, Unknown, max_time=60) + def eventually_consistent_test() -> None: + _publish_messages(publisher_client, topic) - subscriber.listen_for_errors(PROJECT_ID, SUBSCRIPTION_ASYNC, 5) + subscriber.listen_for_errors(PROJECT_ID, SUBSCRIPTION_ASYNC, 5) - out, _ = capsys.readouterr() - assert subscription_async in out - assert "threw an exception" in out + out, _ = capsys.readouterr() + assert subscription_async in out + assert "threw an exception" in out + + eventually_consistent_test() def test_receive_synchronously(