Skip to content
12 changes: 8 additions & 4 deletions backend/app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

class Settings(BaseSettings):
APP_NAME: str = "Clowder"
API_HOST: str = "http://127.0.0.1:8000"
API_V2_STR: str = "/api/v2"
admin_email: str = "[email protected]"
frontend_url: str = "http://localhost:3000"
Expand All @@ -26,9 +27,11 @@ class Settings(BaseSettings):
"http://localhost:3002",
]

# Mongo database connection
MONGODB_URL: str = "mongodb://localhost:27017"
MONGO_DATABASE: str = "clowder2"

# Minio (file storage) information
MINIO_SERVER_URL: str = "localhost:9000"
MINIO_BUCKET_NAME: str = "clowder"
MINIO_ACCESS_KEY: str = "minioadmin"
Expand Down Expand Up @@ -73,12 +76,13 @@ class Settings(BaseSettings):
}

# RabbitMQ message bus
RABBITMQ_USER = "guest"
RABBITMQ_PASS = "guest"
RABBITMQ_HOST = "localhost"
RABBITMQ_URL = (
RABBITMQ_USER: str = "guest"
RABBITMQ_PASS: str = "guest"
RABBITMQ_HOST: str = "localhost"
RABBITMQ_URL: str = (
"amqp://" + RABBITMQ_USER + ":" + RABBITMQ_PASS + "@" + RABBITMQ_HOST + "/"
)
HEARTBEAT_EXCHANGE: str = "extractors"


settings = Settings()
2 changes: 2 additions & 0 deletions backend/app/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def get_rabbitmq() -> BlockingChannel:
parameters = pika.ConnectionParameters("localhost", credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
"""
channel.exchange_declare(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this block be deleted?

exchange="test_exchange",
exchange_type=ExchangeType.direct,
Expand All @@ -60,6 +61,7 @@ def get_rabbitmq() -> BlockingChannel:
auto_delete=False,
)
channel.queue_declare(queue="standard_key")
"""
return channel


Expand Down
7 changes: 0 additions & 7 deletions backend/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
metadata_files,
datasets,
metadata_datasets,
collections,
authentication,
keycloak,
elasticsearch,
Expand Down Expand Up @@ -102,12 +101,6 @@
tags=["metadata"],
dependencies=[Depends(get_current_username)],
)
api_router.include_router(
collections.router,
prefix="/collections",
tags=["collections"],
dependencies=[Depends(get_current_username)],
)
api_router.include_router(
folders.router,
prefix="/folders",
Expand Down
19 changes: 0 additions & 19 deletions backend/app/models/collections.py

This file was deleted.

9 changes: 6 additions & 3 deletions backend/app/models/listeners.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from datetime import datetime
from pydantic import Field, BaseModel
from typing import Optional, List, Union

from app.config import settings
from app.models.pyobjectid import PyObjectId
from app.models.mongomodel import MongoModel
from app.models.users import UserOut
Expand Down Expand Up @@ -28,7 +30,7 @@ class ExtractorInfo(BaseModel):
bibtex: List[str] = []
default_labels: List[str] = []
categories: List[str] = []
parameters: List[dict] = []
parameters: Optional[dict] = None
version: str = "1.0"


Expand Down Expand Up @@ -83,7 +85,8 @@ class FeedListener(BaseModel):
class EventListenerMessage(BaseModel):
"""This describes contents of JSON object that is submitted to RabbitMQ for the Event Listeners/Extractors to consume."""

host: str = "http://127.0.0.1:8000"
# TODO better solution for host
host: str = settings.API_HOST
secretKey: str = "secretKey"
retry_count: int = 0
resource_type: str = "file"
Expand All @@ -98,7 +101,7 @@ class EventListenerMessage(BaseModel):
class EventListenerDatasetMessage(BaseModel):
"""This describes contents of JSON object that is submitted to RabbitMQ for the Event Listeners/Extractors to consume."""

host: str = "http://127.0.0.1:8000"
host: str = settings.API_HOST
secretKey: str = "secretKey"
retry_count: int = 0
resource_type: str = "dataset"
Expand Down
7 changes: 6 additions & 1 deletion backend/app/models/users.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Optional

from passlib.context import CryptContext
from pydantic import Field, EmailStr
from pydantic import Field, EmailStr, BaseModel
from pymongo import MongoClient

from app.models.mongomodel import MongoModel
Expand All @@ -19,6 +19,11 @@ class UserIn(UserBase):
password: str


class UserLogin(BaseModel):
email: EmailStr
password: str


class UserDB(UserBase):
hashed_password: str = Field()
keycloak_id: Optional[str] = None
Expand Down
63 changes: 30 additions & 33 deletions backend/app/rabbitmq/heartbeat_listener_sync.py
Original file line number Diff line number Diff line change
@@ -1,77 +1,74 @@
import logging
import pika
import json
from packaging import version
from app.config import settings
from pymongo import MongoClient
from app.models.listeners import (
EventListenerDB,
EventListenerOut,
)

from app.config import settings
from app.models.listeners import EventListenerDB, EventListenerOut, ExtractorInfo

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def callback(ch, method, properties, body):
statusBody = json.loads(body.decode("utf-8"))
print("received extractor heartbeat: " + str(statusBody))
extractor_id = statusBody["id"]
extractor_queue = statusBody["queue"]
extractor_info = statusBody["extractor_info"]
"""This method receives messages from RabbitMQ and processes them."""
msg = json.loads(body.decode("utf-8"))

extractor_info = msg["extractor_info"]
extractor_name = extractor_info["name"]
extractor_db = EventListenerDB(**extractor_info)
client = MongoClient(settings.MONGODB_URL)
db = client["clowder2"]
existing_extractor = db["listeners"].find_one({"name": extractor_queue})
extractor_db = EventListenerDB(
**extractor_info, properties=ExtractorInfo(**extractor_info)
)

mongo_client = MongoClient(settings.MONGODB_URL)
db = mongo_client[settings.MONGO_DATABASE]

# TODO: This block could go in app.rabbitmq.listeners register_listener and update_listener methods?
existing_extractor = db["listeners"].find_one({"name": msg["queue"]})
if existing_extractor is not None:
# Update existing listener
existing_version = existing_extractor["version"]
new_version = extractor_db.version
if version.parse(new_version) > version.parse(existing_version):
# TODO: Should this delete old version, or just add new entry? If 1st one, why not update?
new_extractor = db["listeners"].insert_one(extractor_db.to_mongo())
found = db["listeners"].find_one({"_id": new_extractor.inserted_id})
removed = db["listeners"].delete_one({"_id": existing_extractor["_id"]})
extractor_out = EventListenerOut.from_mongo(found)
print(
"extractor updated: "
+ extractor_name
+ ", old version: "
+ existing_version
+ ", new version: "
+ new_version
logger.info(
"%s updated from %s to %s"
% (extractor_name, existing_version, new_version)
)
return extractor_out
else:
# Register new listener
new_extractor = db["listeners"].insert_one(extractor_db.to_mongo())
found = db["listeners"].find_one({"_id": new_extractor.inserted_id})
extractor_out = EventListenerOut.from_mongo(found)
print("new extractor registered: " + extractor_name)
logger.info("New extractor registered: " + extractor_name)
return extractor_out


def listen_for_heartbeats():
credentials = pika.PlainCredentials(settings.RABBITMQ_USER, settings.RABBITMQ_PASS)

parameters = pika.ConnectionParameters(
settings.RABBITMQ_HOST, 5672, "/", credentials
)

connection = pika.BlockingConnection(parameters)

channel = connection.channel()

channel.exchange_declare(
exchange="extractors", exchange_type="fanout", durable=True
exchange=settings.HEARTBEAT_EXCHANGE, exchange_type="fanout", durable=True
)

result = channel.queue_declare(queue="", exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange=settings.HEARTBEAT_EXCHANGE, queue=queue_name)

channel.queue_bind(exchange="extractors", queue=queue_name)

print(" [*] Waiting for heartbeats. To exit press CTRL+C")

logger.info(" [*] Waiting for heartbeats. To exit press CTRL+C")
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()


if __name__ == "__main__":
print("starting heartbeat listener")
listen_for_heartbeats()
18 changes: 3 additions & 15 deletions backend/app/rabbitmq/listeners.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
import pika
from fastapi import Request, HTTPException, Depends
from pymongo import MongoClient
from bson import ObjectId
from pika.adapters.blocking_connection import BlockingChannel

from app.config import settings
from app.keycloak_auth import get_token
from app import dependencies
from app.models.files import FileOut
Expand All @@ -27,8 +27,6 @@ def submit_file_message(
current_id = file_out.id
current_datasetId = file_out.dataset_id
current_secretKey = token
print(current_secretKey)
print(type(current_secretKey))
try:
msg_body = EventListenerMessage(
filename=file_out.name,
Expand All @@ -40,13 +38,8 @@ def submit_file_message(
except Exception as e:
print(e)

rabbitmq_client.queue_bind(
exchange="extractors",
queue=queue,
routing_key=routing_key,
)
rabbitmq_client.basic_publish(
exchange="extractors",
exchange="",
routing_key=routing_key,
body=json.dumps(msg_body.dict(), ensure_ascii=False),
properties=pika.BasicProperties(
Expand All @@ -73,13 +66,8 @@ def submit_dataset_message(
secretKey=token,
)

rabbitmq_client.queue_bind(
exchange="extractors",
queue=queue,
routing_key=routing_key,
)
rabbitmq_client.basic_publish(
exchange="extractors",
exchange="",
routing_key=routing_key,
body=json.dumps(msg_body.dict(), ensure_ascii=False),
properties=pika.BasicProperties(
Expand Down
Loading