Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion backend/heartbeat_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import os
from datetime import datetime
import time

from aio_pika import connect_robust
from aio_pika.abc import AbstractIncomingMessage
Expand All @@ -17,6 +18,9 @@
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

timeout = 5 * 60 # five minute timeout
time_ran = 0


async def callback(message: AbstractIncomingMessage):
"""This method receives messages from RabbitMQ and processes them.
Expand Down Expand Up @@ -118,4 +122,14 @@ async def listen_for_heartbeats():


if __name__ == "__main__":
asyncio.run(listen_for_heartbeats())
start = datetime.now()
while time_ran < timeout:
try:
asyncio.run(listen_for_heartbeats())
except Exception as e:
logger.info(f" Heartbeat listner failed, retry in 10 seconds...")
time.sleep(10)
current_time = datetime.now()
current_seconds = (current_time - start).total_seconds()
time_ran += current_seconds
logger.info(f" Heartbeat listener could not connect to rabbitmq.")
16 changes: 15 additions & 1 deletion backend/message_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import random
import string
from datetime import datetime
import time

from aio_pika import connect_robust
from aio_pika.abc import AbstractIncomingMessage
Expand All @@ -22,6 +23,9 @@
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

timeout = 5 * 60 # five minute timeout
time_ran = 0


def parse_message_status(msg):
"""Determine if the message corresponds to start/middle/end of job if possible. See pyclowder.utils.StatusMessage."""
Expand Down Expand Up @@ -219,4 +223,14 @@ async def listen_for_messages():


if __name__ == "__main__":
asyncio.run(listen_for_messages())
start = datetime.now()
while time_ran < timeout:
try:
asyncio.run(listen_for_messages())
except Exception as e:
logger.info(f" Message listener failed, retry in 10 seconds...")
time.sleep(10)
current_time = datetime.now()
current_seconds = (current_time - start).total_seconds()
time_ran += current_seconds
logger.info(f"Message listener could not connect to rabbitmq. Timeout.")