From 5d052597c212f3e7f60b297359ba82c27ea1c8ea Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Thu, 15 Dec 2022 08:27:35 -0600 Subject: [PATCH 01/11] Move heartbeat & some updates --- backend/app/rabbitmq/listeners.py | 2 + backend/heartbeat.py | 76 +++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+) create mode 100644 backend/heartbeat.py diff --git a/backend/app/rabbitmq/listeners.py b/backend/app/rabbitmq/listeners.py index 87cc80b4a..cf8f7b4f1 100644 --- a/backend/app/rabbitmq/listeners.py +++ b/backend/app/rabbitmq/listeners.py @@ -40,6 +40,7 @@ def submit_file_message( except Exception as e: print(e) + # TODO: Change default name to listeners rabbitmq_client.queue_bind( exchange="extractors", queue=queue, @@ -73,6 +74,7 @@ def submit_dataset_message( secretKey=token, ) + # TODO: Change default name to listeners rabbitmq_client.queue_bind( exchange="extractors", queue=queue, diff --git a/backend/heartbeat.py b/backend/heartbeat.py new file mode 100644 index 000000000..2087b7530 --- /dev/null +++ b/backend/heartbeat.py @@ -0,0 +1,76 @@ +import logging +import pika +import json +from packaging import version +from pymongo import MongoClient + +from app.config import settings +from app.models.listeners import ( + EventListenerDB, + EventListenerOut, + ExtractorInfo +) + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +def callback(ch, method, properties, body): + """This method receives messages from RabbitMQ and processes them.""" + statusBody = json.loads(body.decode("utf-8")) + logger.info("Received extractor heartbeat: " + str(statusBody)) + + extractor_id = statusBody["id"] + extractor_queue = statusBody["queue"] + extractor_info = statusBody["extractor_info"] + current_info = ExtractorInfo(**extractor_info) + extractor_name = extractor_info["name"] + extractor_db = EventListenerDB(**extractor_info) + extractor_db.properties = current_info + mongo_client = MongoClient(settings.MONGODB_URL) + db = mongo_client[settings.MONGO_DATABASE] + + # TODO: This block could go in app.rabbitmq.listeners register_listener and update_listener methods + existing_extractor = db["listeners"].find_one({"name": extractor_queue}) + if existing_extractor is not None: + existing_version = existing_extractor["version"] + new_version = extractor_db.version + if version.parse(new_version) > version.parse(existing_version): + # TODO: Should this delete old version, or just add new entry? If 1st one, why not update? + new_extractor = db["listeners"].insert_one(extractor_db.to_mongo()) + found = db["listeners"].find_one({"_id": new_extractor.inserted_id}) + removed = db["listeners"].delete_one({"_id": existing_extractor["_id"]}) + extractor_out = EventListenerOut.from_mongo(found) + logger.info("%s updated from %s to %s" % (extractor_name, existing_version, new_version)) + return extractor_out + else: + new_extractor = db["listeners"].insert_one(extractor_db.to_mongo()) + found = db["listeners"].find_one({"_id": new_extractor.inserted_id}) + extractor_out = EventListenerOut.from_mongo(found) + logger.info("New extractor registered: " + extractor_name) + return extractor_out + + +def listen_for_heartbeats(): + credentials = pika.PlainCredentials(settings.RABBITMQ_USER, settings.RABBITMQ_PASS) + parameters = pika.ConnectionParameters( + settings.RABBITMQ_HOST, 5672, "/", credentials + ) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + + # TODO: Change default name to listeners + channel.exchange_declare( + exchange="extractors", exchange_type="fanout", durable=True + ) + result = channel.queue_declare(queue="", exclusive=True) + queue_name = result.method.queue + channel.queue_bind(exchange="extractors", queue=queue_name) + + logger.info(" [*] Waiting for heartbeats. To exit press CTRL+C") + channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) + channel.start_consuming() + +if __name__ == "__main__": + print() + listen_for_heartbeats() From 247b4c666c6cf9fe305d4d1b21dafb922353dfd1 Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Fri, 16 Dec 2022 10:08:17 -0600 Subject: [PATCH 02/11] various cleanup and bugfix removed collections, fixed exchange declaration, made host a config, a few other small things --- backend/app/config.py | 12 +++++--- backend/app/dependencies.py | 2 ++ backend/app/main.py | 7 ----- backend/app/models/collections.py | 19 ------------ backend/app/models/listeners.py | 7 +++-- backend/app/models/users.py | 6 ++-- backend/app/rabbitmq/listeners.py | 14 ++------- backend/app/routers/authentication.py | 4 +-- backend/app/routers/collections.py | 43 --------------------------- backend/app/routers/datasets.py | 5 ++-- backend/app/routers/files.py | 12 +++++--- backend/heartbeat.py | 24 +++++++-------- 12 files changed, 43 insertions(+), 112 deletions(-) delete mode 100644 backend/app/models/collections.py delete mode 100644 backend/app/routers/collections.py diff --git a/backend/app/config.py b/backend/app/config.py index 4ce4367da..b6fafac2f 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -5,6 +5,7 @@ class Settings(BaseSettings): APP_NAME: str = "Clowder" + API_HOST: str = "http://127.0.0.1:3002" API_V2_STR: str = "/api/v2" admin_email: str = "devnull@ncsa.illinois.edu" frontend_url: str = "http://localhost:3000" @@ -26,9 +27,11 @@ class Settings(BaseSettings): "http://localhost:3002", ] + # Mongo database connection MONGODB_URL: str = "mongodb://localhost:27017" MONGO_DATABASE: str = "clowder2" + # Minio (file storage) information MINIO_SERVER_URL: str = "localhost:9000" MINIO_BUCKET_NAME: str = "clowder" MINIO_ACCESS_KEY: str = "minioadmin" @@ -73,12 +76,13 @@ class Settings(BaseSettings): } # RabbitMQ message bus - RABBITMQ_USER = "guest" - RABBITMQ_PASS = "guest" - RABBITMQ_HOST = "localhost" - RABBITMQ_URL = ( + RABBITMQ_USER: str = "guest" + RABBITMQ_PASS: str = "guest" + RABBITMQ_HOST: str = "localhost" + RABBITMQ_URL: str = ( "amqp://" + RABBITMQ_USER + ":" + RABBITMQ_PASS + "@" + RABBITMQ_HOST + "/" ) + HEARTBEAT_EXCHANGE: str = "extractors" settings = Settings() diff --git a/backend/app/dependencies.py b/backend/app/dependencies.py index 8247999fb..345b081e1 100644 --- a/backend/app/dependencies.py +++ b/backend/app/dependencies.py @@ -52,6 +52,7 @@ def get_rabbitmq() -> BlockingChannel: parameters = pika.ConnectionParameters("localhost", credentials=credentials) connection = pika.BlockingConnection(parameters) channel = connection.channel() + """ channel.exchange_declare( exchange="test_exchange", exchange_type=ExchangeType.direct, @@ -60,6 +61,7 @@ def get_rabbitmq() -> BlockingChannel: auto_delete=False, ) channel.queue_declare(queue="standard_key") + """ return channel diff --git a/backend/app/main.py b/backend/app/main.py index 7a63fcced..d6198cf69 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -22,7 +22,6 @@ metadata_files, datasets, metadata_datasets, - collections, authentication, keycloak, elasticsearch, @@ -102,12 +101,6 @@ tags=["metadata"], dependencies=[Depends(get_current_username)], ) -api_router.include_router( - collections.router, - prefix="/collections", - tags=["collections"], - dependencies=[Depends(get_current_username)], -) api_router.include_router( folders.router, prefix="/folders", diff --git a/backend/app/models/collections.py b/backend/app/models/collections.py deleted file mode 100644 index 520b93f28..000000000 --- a/backend/app/models/collections.py +++ /dev/null @@ -1,19 +0,0 @@ -from mongoengine import DynamicDocument - -from app.models.mongomodel import MongoModel - - -class Collection(MongoModel): - name: str - description: str = "" - - -# class MongoDataset(Document): -# name = StringField() -# description = StringField() -# price = IntField() -# tax = IntField() - - -class MongoCollection(DynamicDocument): - pass diff --git a/backend/app/models/listeners.py b/backend/app/models/listeners.py index ec877af13..808fb2b72 100644 --- a/backend/app/models/listeners.py +++ b/backend/app/models/listeners.py @@ -1,6 +1,8 @@ from datetime import datetime from pydantic import Field, BaseModel from typing import Optional, List, Union + +from app.config import settings from app.models.pyobjectid import PyObjectId from app.models.mongomodel import MongoModel from app.models.users import UserOut @@ -83,7 +85,8 @@ class FeedListener(BaseModel): class EventListenerMessage(BaseModel): """This describes contents of JSON object that is submitted to RabbitMQ for the Event Listeners/Extractors to consume.""" - host: str = "http://127.0.0.1:8000" + # TODO better solution for host + host: str = settings.API_HOST secretKey: str = "secretKey" retry_count: int = 0 resource_type: str = "file" @@ -98,7 +101,7 @@ class EventListenerMessage(BaseModel): class EventListenerDatasetMessage(BaseModel): """This describes contents of JSON object that is submitted to RabbitMQ for the Event Listeners/Extractors to consume.""" - host: str = "http://127.0.0.1:8000" + host: str = settings.API_HOST secretKey: str = "secretKey" retry_count: int = 0 resource_type: str = "dataset" diff --git a/backend/app/models/users.py b/backend/app/models/users.py index 457a37963..39bd64ea1 100644 --- a/backend/app/models/users.py +++ b/backend/app/models/users.py @@ -1,7 +1,7 @@ from typing import Optional from passlib.context import CryptContext -from pydantic import Field, EmailStr +from pydantic import Field, EmailStr, BaseModel from pymongo import MongoClient from app.models.mongomodel import MongoModel @@ -14,10 +14,12 @@ class UserBase(MongoModel): first_name: str last_name: str - class UserIn(UserBase): password: str +class UserLogin(BaseModel): + email: EmailStr + password: str class UserDB(UserBase): hashed_password: str = Field() diff --git a/backend/app/rabbitmq/listeners.py b/backend/app/rabbitmq/listeners.py index cf8f7b4f1..6bd6b9fe5 100644 --- a/backend/app/rabbitmq/listeners.py +++ b/backend/app/rabbitmq/listeners.py @@ -2,9 +2,9 @@ import pika from fastapi import Request, HTTPException, Depends from pymongo import MongoClient -from bson import ObjectId from pika.adapters.blocking_connection import BlockingChannel +from app.config import settings from app.keycloak_auth import get_token from app import dependencies from app.models.files import FileOut @@ -27,8 +27,6 @@ def submit_file_message( current_id = file_out.id current_datasetId = file_out.dataset_id current_secretKey = token - print(current_secretKey) - print(type(current_secretKey)) try: msg_body = EventListenerMessage( filename=file_out.name, @@ -40,14 +38,8 @@ def submit_file_message( except Exception as e: print(e) - # TODO: Change default name to listeners - rabbitmq_client.queue_bind( - exchange="extractors", - queue=queue, - routing_key=routing_key, - ) rabbitmq_client.basic_publish( - exchange="extractors", + exchange='', routing_key=routing_key, body=json.dumps(msg_body.dict(), ensure_ascii=False), properties=pika.BasicProperties( @@ -76,12 +68,10 @@ def submit_dataset_message( # TODO: Change default name to listeners rabbitmq_client.queue_bind( - exchange="extractors", queue=queue, routing_key=routing_key, ) rabbitmq_client.basic_publish( - exchange="extractors", routing_key=routing_key, body=json.dumps(msg_body.dict(), ensure_ascii=False), properties=pika.BasicProperties( diff --git a/backend/app/routers/authentication.py b/backend/app/routers/authentication.py index 3d4c7df60..254608eaf 100644 --- a/backend/app/routers/authentication.py +++ b/backend/app/routers/authentication.py @@ -12,7 +12,7 @@ from app import dependencies from app.keycloak_auth import create_user from app.keycloak_auth import keycloak_openid -from app.models.users import UserDB, UserIn, UserOut +from app.models.users import UserDB, UserIn, UserOut, UserLogin router = APIRouter() @@ -51,7 +51,7 @@ async def save_user(userIn: UserIn, db: MongoClient = Depends(dependencies.get_d @router.post("/login") -async def login(userIn: UserIn, db: MongoClient = Depends(dependencies.get_db)): +async def login(userIn: UserLogin, db: MongoClient = Depends(dependencies.get_db)): try: token = keycloak_openid.token(userIn.email, userIn.password) return {"token": token["access_token"]} diff --git a/backend/app/routers/collections.py b/backend/app/routers/collections.py deleted file mode 100644 index d9aa5ba11..000000000 --- a/backend/app/routers/collections.py +++ /dev/null @@ -1,43 +0,0 @@ -from typing import List - -from bson import ObjectId -from fastapi import APIRouter, HTTPException, Depends -from pymongo import MongoClient - -from app import dependencies -from app.models.collections import Collection - -router = APIRouter() - - -@router.post("/", response_model=Collection) -async def save_collection( - body: Collection, db: MongoClient = Depends(dependencies.get_db) -): - body = body - res = await db["collections"].insert_one(body.to_mongo()) - found = await db["collections"].find_one({"_id": res.inserted_id}) - return Collection.from_mongo(found) - - -@router.get("/", response_model=List[Collection]) -async def get_collections( - db: MongoClient = Depends(dependencies.get_db), skip: int = 0, limit: int = 2 -): - collections = [] - for doc in ( - await db["collections"].find().skip(skip).limit(limit).to_list(length=limit) - ): - collections.append(doc) - return collections - - -@router.get("/{collection_id}") -async def get_collection( - collection_id: str, db: MongoClient = Depends(dependencies.get_db) -): - if ( - collection := await db["collections"].find_one({"_id": ObjectId(collection_id)}) - ) is not None: - return Collection.from_mongo(collection) - raise HTTPException(status_code=404, detail=f"Collection {collection_id} not found") diff --git a/backend/app/routers/datasets.py b/backend/app/routers/datasets.py index 4ad2b7adc..823e2667c 100644 --- a/backend/app/routers/datasets.py +++ b/backend/app/routers/datasets.py @@ -817,8 +817,7 @@ async def get_dataset_extract( body = {} body["secretKey"] = access_token body["token"] = access_token - # TODO better solution for host - body["host"] = "http://127.0.0.1:8000" + body["host"] = settings.API_HOST body["retry_count"] = 0 body["filename"] = dataset["name"] body["id"] = dataset_id @@ -830,7 +829,7 @@ async def get_dataset_extract( if "parameters" in req_info: current_parameters = req_info["parameters"] body["parameters"] = current_parameters - current_routing_key = "extractors." + current_queue + current_routing_key = current_queue submit_dataset_message( dataset_out, diff --git a/backend/app/routers/files.py b/backend/app/routers/files.py index 2f5ed3605..1c3813351 100644 --- a/backend/app/routers/files.py +++ b/backend/app/routers/files.py @@ -303,17 +303,21 @@ async def get_file_versions( raise HTTPException(status_code=404, detail=f"File {file_id} not found") -# submits file to extractor -# can handle parameeters pass in as key/values in info @router.post("/{file_id}/extract") async def get_file_extract( file_id: str, - info: Request, + info: Request, # TODO: Pydantic class? token: str = Depends(get_token), credentials: HTTPAuthorizationCredentials = Security(security), db: MongoClient = Depends(dependencies.get_db), rabbitmq_client: BlockingChannel = Depends(dependencies.get_rabbitmq), ): + """ + Submit file to an extractor. + + :param file_id: UUID of file + :param info: must include "extractor" field with name, can also include key/value pairs in "parameters" + """ req_info = await info.json() access_token = credentials.credentials if "extractor" not in req_info: @@ -332,7 +336,7 @@ async def get_file_extract( parameters = {} if "parameters" in req_info: parameters = req_info["parameters"] - routing_key = "extractors." + queue + routing_key = queue submit_file_message( file_out, queue, routing_key, parameters, access_token, db, rabbitmq_client diff --git a/backend/heartbeat.py b/backend/heartbeat.py index 2087b7530..a79bbc68e 100644 --- a/backend/heartbeat.py +++ b/backend/heartbeat.py @@ -17,22 +17,19 @@ def callback(ch, method, properties, body): """This method receives messages from RabbitMQ and processes them.""" - statusBody = json.loads(body.decode("utf-8")) - logger.info("Received extractor heartbeat: " + str(statusBody)) + msg = json.loads(body.decode("utf-8")) - extractor_id = statusBody["id"] - extractor_queue = statusBody["queue"] - extractor_info = statusBody["extractor_info"] - current_info = ExtractorInfo(**extractor_info) + extractor_info = msg["extractor_info"] extractor_name = extractor_info["name"] - extractor_db = EventListenerDB(**extractor_info) - extractor_db.properties = current_info + extractor_db = EventListenerDB(**extractor_info, properties=ExtractorInfo(**extractor_info)) + mongo_client = MongoClient(settings.MONGODB_URL) db = mongo_client[settings.MONGO_DATABASE] - # TODO: This block could go in app.rabbitmq.listeners register_listener and update_listener methods - existing_extractor = db["listeners"].find_one({"name": extractor_queue}) + # TODO: This block could go in app.rabbitmq.listeners register_listener and update_listener methods? + existing_extractor = db["listeners"].find_one({"name": msg["queue"]}) if existing_extractor is not None: + # Update existing listener existing_version = existing_extractor["version"] new_version = extractor_db.version if version.parse(new_version) > version.parse(existing_version): @@ -44,6 +41,7 @@ def callback(ch, method, properties, body): logger.info("%s updated from %s to %s" % (extractor_name, existing_version, new_version)) return extractor_out else: + # Register new listener new_extractor = db["listeners"].insert_one(extractor_db.to_mongo()) found = db["listeners"].find_one({"_id": new_extractor.inserted_id}) extractor_out = EventListenerOut.from_mongo(found) @@ -59,18 +57,16 @@ def listen_for_heartbeats(): connection = pika.BlockingConnection(parameters) channel = connection.channel() - # TODO: Change default name to listeners channel.exchange_declare( - exchange="extractors", exchange_type="fanout", durable=True + exchange=settings.HEARTBEAT_EXCHANGE, exchange_type="fanout", durable=True ) result = channel.queue_declare(queue="", exclusive=True) queue_name = result.method.queue - channel.queue_bind(exchange="extractors", queue=queue_name) + channel.queue_bind(exchange=settings.HEARTBEAT_EXCHANGE, queue=queue_name) logger.info(" [*] Waiting for heartbeats. To exit press CTRL+C") channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming() if __name__ == "__main__": - print() listen_for_heartbeats() From cc4aba0bacdf822c257df34f5cba7612bd123ebb Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Fri, 16 Dec 2022 10:09:51 -0600 Subject: [PATCH 03/11] add blank exchange --- backend/app/rabbitmq/listeners.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/app/rabbitmq/listeners.py b/backend/app/rabbitmq/listeners.py index 6bd6b9fe5..e0c93ed77 100644 --- a/backend/app/rabbitmq/listeners.py +++ b/backend/app/rabbitmq/listeners.py @@ -66,12 +66,12 @@ def submit_dataset_message( secretKey=token, ) - # TODO: Change default name to listeners rabbitmq_client.queue_bind( queue=queue, routing_key=routing_key, ) rabbitmq_client.basic_publish( + exchange='', routing_key=routing_key, body=json.dumps(msg_body.dict(), ensure_ascii=False), properties=pika.BasicProperties( From 8dca37370ac85c4376ef4e1ea54d305ad1ede938 Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Fri, 16 Dec 2022 10:11:27 -0600 Subject: [PATCH 04/11] remove dataset queue bind --- backend/app/rabbitmq/listeners.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/backend/app/rabbitmq/listeners.py b/backend/app/rabbitmq/listeners.py index e0c93ed77..037d13baf 100644 --- a/backend/app/rabbitmq/listeners.py +++ b/backend/app/rabbitmq/listeners.py @@ -66,10 +66,6 @@ def submit_dataset_message( secretKey=token, ) - rabbitmq_client.queue_bind( - queue=queue, - routing_key=routing_key, - ) rabbitmq_client.basic_publish( exchange='', routing_key=routing_key, From 3680083f01c2e22aeed062862505fa07a3369d2c Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Fri, 16 Dec 2022 10:14:49 -0600 Subject: [PATCH 05/11] rename heartbeat from root --- .../app/rabbitmq/heartbeat_listener_sync.py | 57 ++- backend/app/rabbitmq/rabbitmq_async_client.py | 387 ------------------ backend/heartbeat.py | 72 ---- 3 files changed, 26 insertions(+), 490 deletions(-) delete mode 100644 backend/app/rabbitmq/rabbitmq_async_client.py delete mode 100644 backend/heartbeat.py diff --git a/backend/app/rabbitmq/heartbeat_listener_sync.py b/backend/app/rabbitmq/heartbeat_listener_sync.py index f8835238f..a79bbc68e 100644 --- a/backend/app/rabbitmq/heartbeat_listener_sync.py +++ b/backend/app/rabbitmq/heartbeat_listener_sync.py @@ -1,77 +1,72 @@ +import logging import pika import json from packaging import version -from app.config import settings from pymongo import MongoClient + +from app.config import settings from app.models.listeners import ( EventListenerDB, EventListenerOut, + ExtractorInfo ) +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + def callback(ch, method, properties, body): - statusBody = json.loads(body.decode("utf-8")) - print("received extractor heartbeat: " + str(statusBody)) - extractor_id = statusBody["id"] - extractor_queue = statusBody["queue"] - extractor_info = statusBody["extractor_info"] + """This method receives messages from RabbitMQ and processes them.""" + msg = json.loads(body.decode("utf-8")) + + extractor_info = msg["extractor_info"] extractor_name = extractor_info["name"] - extractor_db = EventListenerDB(**extractor_info) - client = MongoClient(settings.MONGODB_URL) - db = client["clowder2"] - existing_extractor = db["listeners"].find_one({"name": extractor_queue}) + extractor_db = EventListenerDB(**extractor_info, properties=ExtractorInfo(**extractor_info)) + + mongo_client = MongoClient(settings.MONGODB_URL) + db = mongo_client[settings.MONGO_DATABASE] + + # TODO: This block could go in app.rabbitmq.listeners register_listener and update_listener methods? + existing_extractor = db["listeners"].find_one({"name": msg["queue"]}) if existing_extractor is not None: + # Update existing listener existing_version = existing_extractor["version"] new_version = extractor_db.version if version.parse(new_version) > version.parse(existing_version): + # TODO: Should this delete old version, or just add new entry? If 1st one, why not update? new_extractor = db["listeners"].insert_one(extractor_db.to_mongo()) found = db["listeners"].find_one({"_id": new_extractor.inserted_id}) removed = db["listeners"].delete_one({"_id": existing_extractor["_id"]}) extractor_out = EventListenerOut.from_mongo(found) - print( - "extractor updated: " - + extractor_name - + ", old version: " - + existing_version - + ", new version: " - + new_version - ) + logger.info("%s updated from %s to %s" % (extractor_name, existing_version, new_version)) return extractor_out else: + # Register new listener new_extractor = db["listeners"].insert_one(extractor_db.to_mongo()) found = db["listeners"].find_one({"_id": new_extractor.inserted_id}) extractor_out = EventListenerOut.from_mongo(found) - print("new extractor registered: " + extractor_name) + logger.info("New extractor registered: " + extractor_name) return extractor_out def listen_for_heartbeats(): credentials = pika.PlainCredentials(settings.RABBITMQ_USER, settings.RABBITMQ_PASS) - parameters = pika.ConnectionParameters( settings.RABBITMQ_HOST, 5672, "/", credentials ) - connection = pika.BlockingConnection(parameters) - channel = connection.channel() channel.exchange_declare( - exchange="extractors", exchange_type="fanout", durable=True + exchange=settings.HEARTBEAT_EXCHANGE, exchange_type="fanout", durable=True ) - result = channel.queue_declare(queue="", exclusive=True) queue_name = result.method.queue + channel.queue_bind(exchange=settings.HEARTBEAT_EXCHANGE, queue=queue_name) - channel.queue_bind(exchange="extractors", queue=queue_name) - - print(" [*] Waiting for heartbeats. To exit press CTRL+C") - + logger.info(" [*] Waiting for heartbeats. To exit press CTRL+C") channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) - channel.start_consuming() - if __name__ == "__main__": - print("starting heartbeat listener") listen_for_heartbeats() diff --git a/backend/app/rabbitmq/rabbitmq_async_client.py b/backend/app/rabbitmq/rabbitmq_async_client.py deleted file mode 100644 index 15418509c..000000000 --- a/backend/app/rabbitmq/rabbitmq_async_client.py +++ /dev/null @@ -1,387 +0,0 @@ -# -*- coding: utf-8 -*- -# pylint: disable=C0111,C0103,R0205 -""" -This is not being used right now. - -An example of using ioloop to listen for messages from rabbitmq. - -From https://github.com/pika/pika/blob/master/examples/asynchronous_publisher_example.py -""" -import functools -import logging -import json -import pika -from pika.exchange_type import ExchangeType - -LOG_FORMAT = ( - "%(levelname) -10s %(asctime)s %(name) -30s %(funcName) " - "-35s %(lineno) -5d: %(message)s" -) -LOGGER = logging.getLogger(__name__) - - -class ExamplePublisher(object): - """This is an example publisher that will handle unexpected interactions - with RabbitMQ such as channel and connection closures. - - If RabbitMQ closes the connection, it will reopen it. You should - look at the output, as there are limited reasons why the connection may - be closed, which usually are tied to permission related issues or - socket timeouts. - - It uses delivery confirmations and illustrates one way to keep track of - messages that have been sent and if they've been confirmed by RabbitMQ. - - """ - - EXCHANGE = "message" - EXCHANGE_TYPE = ExchangeType.topic - PUBLISH_INTERVAL = 1 - QUEUE = "text" - ROUTING_KEY = "example.text" - - def __init__(self, amqp_url): - """Setup the example publisher object, passing in the URL we will use - to connect to RabbitMQ. - - :param str amqp_url: The URL for connecting to RabbitMQ - - """ - self._connection = None - self._channel = None - - self._deliveries = None - self._acked = None - self._nacked = None - self._message_number = None - - self._stopping = False - self._url = amqp_url - - def connect(self): - """This method connects to RabbitMQ, returning the connection handle. - When the connection is established, the on_connection_open method - will be invoked by pika. - - :rtype: pika.SelectConnection - - """ - LOGGER.info("Connecting to %s", self._url) - return pika.SelectConnection( - pika.URLParameters(self._url), - on_open_callback=self.on_connection_open, - on_open_error_callback=self.on_connection_open_error, - on_close_callback=self.on_connection_closed, - ) - - def on_connection_open(self, _unused_connection): - """This method is called by pika once the connection to RabbitMQ has - been established. It passes the handle to the connection object in - case we need it, but in this case, we'll just mark it unused. - - :param pika.SelectConnection _unused_connection: The connection - - """ - LOGGER.info("Connection opened") - self.open_channel() - - def on_connection_open_error(self, _unused_connection, err): - """This method is called by pika if the connection to RabbitMQ - can't be established. - - :param pika.SelectConnection _unused_connection: The connection - :param Exception err: The error - - """ - LOGGER.error("Connection open failed, reopening in 5 seconds: %s", err) - self._connection.ioloop.call_later(5, self._connection.ioloop.stop) - - def on_connection_closed(self, _unused_connection, reason): - """This method is invoked by pika when the connection to RabbitMQ is - closed unexpectedly. Since it is unexpected, we will reconnect to - RabbitMQ if it disconnects. - - :param pika.connection.Connection connection: The closed connection obj - :param Exception reason: exception representing reason for loss of - connection. - - """ - self._channel = None - if self._stopping: - self._connection.ioloop.stop() - else: - LOGGER.warning("Connection closed, reopening in 5 seconds: %s", reason) - self._connection.ioloop.call_later(5, self._connection.ioloop.stop) - - def open_channel(self): - """This method will open a new channel with RabbitMQ by issuing the - Channel.Open RPC command. When RabbitMQ confirms the channel is open - by sending the Channel.OpenOK RPC reply, the on_channel_open method - will be invoked. - - """ - LOGGER.info("Creating a new channel") - self._connection.channel(on_open_callback=self.on_channel_open) - - def on_channel_open(self, channel): - """This method is invoked by pika when the channel has been opened. - The channel object is passed in so we can make use of it. - - Since the channel is now open, we'll declare the exchange to use. - - :param pika.channel.Channel channel: The channel object - - """ - LOGGER.info("Channel opened") - self._channel = channel - self.add_on_channel_close_callback() - self.setup_exchange(self.EXCHANGE) - - def add_on_channel_close_callback(self): - """This method tells pika to call the on_channel_closed method if - RabbitMQ unexpectedly closes the channel. - - """ - LOGGER.info("Adding channel close callback") - self._channel.add_on_close_callback(self.on_channel_closed) - - def on_channel_closed(self, channel, reason): - """Invoked by pika when RabbitMQ unexpectedly closes the channel. - Channels are usually closed if you attempt to do something that - violates the protocol, such as re-declare an exchange or queue with - different parameters. In this case, we'll close the connection - to shutdown the object. - - :param pika.channel.Channel channel: The closed channel - :param Exception reason: why the channel was closed - - """ - LOGGER.warning("Channel %i was closed: %s", channel, reason) - self._channel = None - if not self._stopping: - self._connection.close() - - def setup_exchange(self, exchange_name): - """Setup the exchange on RabbitMQ by invoking the Exchange.Declare RPC - command. When it is complete, the on_exchange_declareok method will - be invoked by pika. - - :param str|unicode exchange_name: The name of the exchange to declare - - """ - LOGGER.info("Declaring exchange %s", exchange_name) - # Note: using functools.partial is not required, it is demonstrating - # how arbitrary data can be passed to the callback when it is called - cb = functools.partial(self.on_exchange_declareok, userdata=exchange_name) - self._channel.exchange_declare( - exchange=exchange_name, exchange_type=self.EXCHANGE_TYPE, callback=cb - ) - - def on_exchange_declareok(self, _unused_frame, userdata): - """Invoked by pika when RabbitMQ has finished the Exchange.Declare RPC - command. - - :param pika.Frame.Method unused_frame: Exchange.DeclareOk response frame - :param str|unicode userdata: Extra user data (exchange name) - - """ - LOGGER.info("Exchange declared: %s", userdata) - self.setup_queue(self.QUEUE) - - def setup_queue(self, queue_name): - """Setup the queue on RabbitMQ by invoking the Queue.Declare RPC - command. When it is complete, the on_queue_declareok method will - be invoked by pika. - - :param str|unicode queue_name: The name of the queue to declare. - - """ - LOGGER.info("Declaring queue %s", queue_name) - self._channel.queue_declare(queue=queue_name, callback=self.on_queue_declareok) - - def on_queue_declareok(self, _unused_frame): - """Method invoked by pika when the Queue.Declare RPC call made in - setup_queue has completed. In this method we will bind the queue - and exchange together with the routing key by issuing the Queue.Bind - RPC command. When this command is complete, the on_bindok method will - be invoked by pika. - - :param pika.frame.Method method_frame: The Queue.DeclareOk frame - - """ - LOGGER.info( - "Binding %s to %s with %s", self.EXCHANGE, self.QUEUE, self.ROUTING_KEY - ) - self._channel.queue_bind( - self.QUEUE, - self.EXCHANGE, - routing_key=self.ROUTING_KEY, - callback=self.on_bindok, - ) - - def on_bindok(self, _unused_frame): - """This method is invoked by pika when it receives the Queue.BindOk - response from RabbitMQ. Since we know we're now setup and bound, it's - time to start publishing.""" - LOGGER.info("Queue bound") - self.start_publishing() - - def start_publishing(self): - """This method will enable delivery confirmations and schedule the - first message to be sent to RabbitMQ - - """ - LOGGER.info("Issuing consumer related RPC commands") - self.enable_delivery_confirmations() - self.schedule_next_message() - - def enable_delivery_confirmations(self): - """Send the Confirm.Select RPC method to RabbitMQ to enable delivery - confirmations on the channel. The only way to turn this off is to close - the channel and create a new one. - - When the message is confirmed from RabbitMQ, the - on_delivery_confirmation method will be invoked passing in a Basic.Ack - or Basic.Nack method from RabbitMQ that will indicate which messages it - is confirming or rejecting. - - """ - LOGGER.info("Issuing Confirm.Select RPC command") - self._channel.confirm_delivery(self.on_delivery_confirmation) - - def on_delivery_confirmation(self, method_frame): - """Invoked by pika when RabbitMQ responds to a Basic.Publish RPC - command, passing in either a Basic.Ack or Basic.Nack frame with - the delivery tag of the message that was published. The delivery tag - is an integer counter indicating the message number that was sent - on the channel via Basic.Publish. Here we're just doing house keeping - to keep track of stats and remove message numbers that we expect - a delivery confirmation of from the list used to keep track of messages - that are pending confirmation. - - :param pika.frame.Method method_frame: Basic.Ack or Basic.Nack frame - - """ - confirmation_type = method_frame.method.NAME.split(".")[1].lower() - LOGGER.info( - "Received %s for delivery tag: %i", - confirmation_type, - method_frame.method.delivery_tag, - ) - if confirmation_type == "ack": - self._acked += 1 - elif confirmation_type == "nack": - self._nacked += 1 - self._deliveries.remove(method_frame.method.delivery_tag) - LOGGER.info( - "Published %i messages, %i have yet to be confirmed, " - "%i were acked and %i were nacked", - self._message_number, - len(self._deliveries), - self._acked, - self._nacked, - ) - - def schedule_next_message(self): - """If we are not closing our connection to RabbitMQ, schedule another - message to be delivered in PUBLISH_INTERVAL seconds. - - """ - LOGGER.info("Scheduling next message for %0.1f seconds", self.PUBLISH_INTERVAL) - self._connection.ioloop.call_later(self.PUBLISH_INTERVAL, self.publish_message) - - def publish_message(self): - """If the class is not stopping, publish a message to RabbitMQ, - appending a list of deliveries with the message number that was sent. - This list will be used to check for delivery confirmations in the - on_delivery_confirmations method. - - Once the message has been sent, schedule another message to be sent. - The main reason I put scheduling in was just so you can get a good idea - of how the process is flowing by slowing down and speeding up the - delivery intervals by changing the PUBLISH_INTERVAL constant in the - class. - - """ - if self._channel is None or not self._channel.is_open: - return - - hdrs = {"مفتاح": " قيمة", "键": "值", "キー": "値"} - properties = pika.BasicProperties( - app_id="example-publisher", content_type="application/json", headers=hdrs - ) - - message = "مفتاح قيمة 键 值 キー 値" - self._channel.basic_publish( - self.EXCHANGE, - self.ROUTING_KEY, - json.dumps(message, ensure_ascii=False), - properties, - ) - self._message_number += 1 - self._deliveries.append(self._message_number) - LOGGER.info("Published message # %i", self._message_number) - self.schedule_next_message() - - def run(self): - """Run the example code by connecting and then starting the IOLoop.""" - while not self._stopping: - self._connection = None - self._deliveries = [] - self._acked = 0 - self._nacked = 0 - self._message_number = 0 - - try: - self._connection = self.connect() - self._connection.ioloop.start() - except KeyboardInterrupt: - self.stop() - if self._connection is not None and not self._connection.is_closed: - # Finish closing - self._connection.ioloop.start() - - LOGGER.info("Stopped") - - def stop(self): - """Stop the example by closing the channel and connection. We - set a flag here so that we stop scheduling new messages to be - published. The IOLoop is started because this method is - invoked by the Try/Catch below when KeyboardInterrupt is caught. - Starting the IOLoop again will allow the publisher to cleanly - disconnect from RabbitMQ. - - """ - LOGGER.info("Stopping") - self._stopping = True - self.close_channel() - self.close_connection() - - def close_channel(self): - """Invoke this command to close the channel with RabbitMQ by sending - the Channel.Close RPC command. - - """ - if self._channel is not None: - LOGGER.info("Closing the channel") - self._channel.close() - - def close_connection(self): - """This method closes the connection to RabbitMQ.""" - if self._connection is not None: - LOGGER.info("Closing connection") - self._connection.close() - - -def main(): - logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT) - - # Connect to localhost:5672 as guest with the password guest and virtual host "/" (%2F) - example = ExamplePublisher( - "amqp://guest:guest@localhost:5672/%2F?connection_attempts=3&heartbeat=3600" - ) - example.run() - - -if __name__ == "__main__": - main() diff --git a/backend/heartbeat.py b/backend/heartbeat.py deleted file mode 100644 index a79bbc68e..000000000 --- a/backend/heartbeat.py +++ /dev/null @@ -1,72 +0,0 @@ -import logging -import pika -import json -from packaging import version -from pymongo import MongoClient - -from app.config import settings -from app.models.listeners import ( - EventListenerDB, - EventListenerOut, - ExtractorInfo -) - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - - -def callback(ch, method, properties, body): - """This method receives messages from RabbitMQ and processes them.""" - msg = json.loads(body.decode("utf-8")) - - extractor_info = msg["extractor_info"] - extractor_name = extractor_info["name"] - extractor_db = EventListenerDB(**extractor_info, properties=ExtractorInfo(**extractor_info)) - - mongo_client = MongoClient(settings.MONGODB_URL) - db = mongo_client[settings.MONGO_DATABASE] - - # TODO: This block could go in app.rabbitmq.listeners register_listener and update_listener methods? - existing_extractor = db["listeners"].find_one({"name": msg["queue"]}) - if existing_extractor is not None: - # Update existing listener - existing_version = existing_extractor["version"] - new_version = extractor_db.version - if version.parse(new_version) > version.parse(existing_version): - # TODO: Should this delete old version, or just add new entry? If 1st one, why not update? - new_extractor = db["listeners"].insert_one(extractor_db.to_mongo()) - found = db["listeners"].find_one({"_id": new_extractor.inserted_id}) - removed = db["listeners"].delete_one({"_id": existing_extractor["_id"]}) - extractor_out = EventListenerOut.from_mongo(found) - logger.info("%s updated from %s to %s" % (extractor_name, existing_version, new_version)) - return extractor_out - else: - # Register new listener - new_extractor = db["listeners"].insert_one(extractor_db.to_mongo()) - found = db["listeners"].find_one({"_id": new_extractor.inserted_id}) - extractor_out = EventListenerOut.from_mongo(found) - logger.info("New extractor registered: " + extractor_name) - return extractor_out - - -def listen_for_heartbeats(): - credentials = pika.PlainCredentials(settings.RABBITMQ_USER, settings.RABBITMQ_PASS) - parameters = pika.ConnectionParameters( - settings.RABBITMQ_HOST, 5672, "/", credentials - ) - connection = pika.BlockingConnection(parameters) - channel = connection.channel() - - channel.exchange_declare( - exchange=settings.HEARTBEAT_EXCHANGE, exchange_type="fanout", durable=True - ) - result = channel.queue_declare(queue="", exclusive=True) - queue_name = result.method.queue - channel.queue_bind(exchange=settings.HEARTBEAT_EXCHANGE, queue=queue_name) - - logger.info(" [*] Waiting for heartbeats. To exit press CTRL+C") - channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) - channel.start_consuming() - -if __name__ == "__main__": - listen_for_heartbeats() From dd12472603486ce3cb1d1b82d134214fb5b63345 Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Fri, 16 Dec 2022 10:43:30 -0600 Subject: [PATCH 06/11] default API port to 8000 --- backend/app/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/app/config.py b/backend/app/config.py index b6fafac2f..4f1c4f736 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -5,7 +5,7 @@ class Settings(BaseSettings): APP_NAME: str = "Clowder" - API_HOST: str = "http://127.0.0.1:3002" + API_HOST: str = "http://127.0.0.1:8000" API_V2_STR: str = "/api/v2" admin_email: str = "devnull@ncsa.illinois.edu" frontend_url: str = "http://localhost:3000" From efd9e622c42c6950d520ac1b06062daef1343926 Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Fri, 16 Dec 2022 11:03:25 -0600 Subject: [PATCH 07/11] Black formatting --- backend/app/models/users.py | 3 +++ backend/app/rabbitmq/heartbeat_listener_sync.py | 16 +++++++++------- backend/app/rabbitmq/listeners.py | 4 ++-- backend/app/routers/files.py | 2 +- 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/backend/app/models/users.py b/backend/app/models/users.py index 39bd64ea1..2751912a6 100644 --- a/backend/app/models/users.py +++ b/backend/app/models/users.py @@ -14,13 +14,16 @@ class UserBase(MongoModel): first_name: str last_name: str + class UserIn(UserBase): password: str + class UserLogin(BaseModel): email: EmailStr password: str + class UserDB(UserBase): hashed_password: str = Field() keycloak_id: Optional[str] = None diff --git a/backend/app/rabbitmq/heartbeat_listener_sync.py b/backend/app/rabbitmq/heartbeat_listener_sync.py index a79bbc68e..15f6a9823 100644 --- a/backend/app/rabbitmq/heartbeat_listener_sync.py +++ b/backend/app/rabbitmq/heartbeat_listener_sync.py @@ -5,11 +5,7 @@ from pymongo import MongoClient from app.config import settings -from app.models.listeners import ( - EventListenerDB, - EventListenerOut, - ExtractorInfo -) +from app.models.listeners import EventListenerDB, EventListenerOut, ExtractorInfo logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -21,7 +17,9 @@ def callback(ch, method, properties, body): extractor_info = msg["extractor_info"] extractor_name = extractor_info["name"] - extractor_db = EventListenerDB(**extractor_info, properties=ExtractorInfo(**extractor_info)) + extractor_db = EventListenerDB( + **extractor_info, properties=ExtractorInfo(**extractor_info) + ) mongo_client = MongoClient(settings.MONGODB_URL) db = mongo_client[settings.MONGO_DATABASE] @@ -38,7 +36,10 @@ def callback(ch, method, properties, body): found = db["listeners"].find_one({"_id": new_extractor.inserted_id}) removed = db["listeners"].delete_one({"_id": existing_extractor["_id"]}) extractor_out = EventListenerOut.from_mongo(found) - logger.info("%s updated from %s to %s" % (extractor_name, existing_version, new_version)) + logger.info( + "%s updated from %s to %s" + % (extractor_name, existing_version, new_version) + ) return extractor_out else: # Register new listener @@ -68,5 +69,6 @@ def listen_for_heartbeats(): channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming() + if __name__ == "__main__": listen_for_heartbeats() diff --git a/backend/app/rabbitmq/listeners.py b/backend/app/rabbitmq/listeners.py index 037d13baf..f7471ce52 100644 --- a/backend/app/rabbitmq/listeners.py +++ b/backend/app/rabbitmq/listeners.py @@ -39,7 +39,7 @@ def submit_file_message( print(e) rabbitmq_client.basic_publish( - exchange='', + exchange="", routing_key=routing_key, body=json.dumps(msg_body.dict(), ensure_ascii=False), properties=pika.BasicProperties( @@ -67,7 +67,7 @@ def submit_dataset_message( ) rabbitmq_client.basic_publish( - exchange='', + exchange="", routing_key=routing_key, body=json.dumps(msg_body.dict(), ensure_ascii=False), properties=pika.BasicProperties( diff --git a/backend/app/routers/files.py b/backend/app/routers/files.py index 1c3813351..b836dbd47 100644 --- a/backend/app/routers/files.py +++ b/backend/app/routers/files.py @@ -306,7 +306,7 @@ async def get_file_versions( @router.post("/{file_id}/extract") async def get_file_extract( file_id: str, - info: Request, # TODO: Pydantic class? + info: Request, # TODO: Pydantic class? token: str = Depends(get_token), credentials: HTTPAuthorizationCredentials = Security(security), db: MongoClient = Depends(dependencies.get_db), From 024d1c90910282e085d49aa53dc589fbc02ecf58 Mon Sep 17 00:00:00 2001 From: toddn Date: Mon, 19 Dec 2022 14:31:05 -0600 Subject: [PATCH 08/11] parameters fixed, is now saved is optional[dict] instead of list[dict] --- backend/app/models/listeners.py | 2 +- backend/app/rabbitmq/heartbeat_listener_sync.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/backend/app/models/listeners.py b/backend/app/models/listeners.py index 808fb2b72..7310dedbe 100644 --- a/backend/app/models/listeners.py +++ b/backend/app/models/listeners.py @@ -30,7 +30,7 @@ class ExtractorInfo(BaseModel): bibtex: List[str] = [] default_labels: List[str] = [] categories: List[str] = [] - parameters: List[dict] = [] + parameters: Optional[dict] = None version: str = "1.0" diff --git a/backend/app/rabbitmq/heartbeat_listener_sync.py b/backend/app/rabbitmq/heartbeat_listener_sync.py index a79bbc68e..2f77fa753 100644 --- a/backend/app/rabbitmq/heartbeat_listener_sync.py +++ b/backend/app/rabbitmq/heartbeat_listener_sync.py @@ -21,6 +21,8 @@ def callback(ch, method, properties, body): extractor_info = msg["extractor_info"] extractor_name = extractor_info["name"] + parameters = extractor_info["parameters"]["schema"] + extractor_info["parameters"] = parameters extractor_db = EventListenerDB(**extractor_info, properties=ExtractorInfo(**extractor_info)) mongo_client = MongoClient(settings.MONGODB_URL) From b3cd5fafd2f9f54f368ccf03bdf4460fb9d50396 Mon Sep 17 00:00:00 2001 From: toddn Date: Mon, 19 Dec 2022 14:33:37 -0600 Subject: [PATCH 09/11] parameters fixed, is now saved is optional[dict] instead of list[dict] --- backend/app/rabbitmq/heartbeat_listener_sync.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/backend/app/rabbitmq/heartbeat_listener_sync.py b/backend/app/rabbitmq/heartbeat_listener_sync.py index 15f6a9823..f81964ae6 100644 --- a/backend/app/rabbitmq/heartbeat_listener_sync.py +++ b/backend/app/rabbitmq/heartbeat_listener_sync.py @@ -17,6 +17,8 @@ def callback(ch, method, properties, body): extractor_info = msg["extractor_info"] extractor_name = extractor_info["name"] + extractor_parameters = extractor_info["parameters"]["schema"] + extractor_info["parameters"] = extractor_parameters extractor_db = EventListenerDB( **extractor_info, properties=ExtractorInfo(**extractor_info) ) From 7e70c9e29a72d96edf49705a08f511f908fe6910 Mon Sep 17 00:00:00 2001 From: toddn Date: Mon, 19 Dec 2022 16:01:19 -0600 Subject: [PATCH 10/11] properly registers parameters under schema --- backend/app/rabbitmq/heartbeat_listener_sync.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/backend/app/rabbitmq/heartbeat_listener_sync.py b/backend/app/rabbitmq/heartbeat_listener_sync.py index f81964ae6..15f6a9823 100644 --- a/backend/app/rabbitmq/heartbeat_listener_sync.py +++ b/backend/app/rabbitmq/heartbeat_listener_sync.py @@ -17,8 +17,6 @@ def callback(ch, method, properties, body): extractor_info = msg["extractor_info"] extractor_name = extractor_info["name"] - extractor_parameters = extractor_info["parameters"]["schema"] - extractor_info["parameters"] = extractor_parameters extractor_db = EventListenerDB( **extractor_info, properties=ExtractorInfo(**extractor_info) ) From 1c39efa97ae2107c4db12aee70998f122814be3d Mon Sep 17 00:00:00 2001 From: toddn Date: Tue, 20 Dec 2022 12:01:20 -0600 Subject: [PATCH 11/11] fixed docker file --- docker-compose.dev.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index 47fbb1f1c..9b67717df 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -68,7 +68,7 @@ services: volumes: - postgres_data:/var/lib/postgresql/data environment: - POSTGRES_DB: keycloak + POSTGRES_DB: keycloak_dev POSTGRES_USER: keycloak POSTGRES_PASSWORD: password @@ -86,7 +86,7 @@ services: KEYCLOAK_ADMIN_PASSWORD: admin KC_DB: postgres KC_DB_URL_HOST: postgres - KC_DB_URL_DATABASE: keycloak + KC_DB_URL_DATABASE: keycloak_dev KC_DB_USERNAME: keycloak KC_DB_PASSWORD: password ports: