From 6d5de10e8fe940ec65ffc018bd21bc01e164a813 Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Mon, 30 Jan 2023 16:01:12 -0600 Subject: [PATCH 1/6] heartbeat listener should create MIME feed --- backend/app/models/search.py | 11 -------- .../app/rabbitmq/heartbeat_listener_sync.py | 27 +++++++++++++++++++ backend/app/routers/feeds.py | 1 - backend/app/routers/files.py | 2 -- 4 files changed, 27 insertions(+), 14 deletions(-) diff --git a/backend/app/models/search.py b/backend/app/models/search.py index f6a439b97..72f05c3ad 100644 --- a/backend/app/models/search.py +++ b/backend/app/models/search.py @@ -3,17 +3,6 @@ from typing import Optional, List -# TODO: may eventually be split by index (resource type) -class SearchIndexContents(BaseModel): - """This describes what is indexed in Elasticsearch for a given resource.""" - - id: str - name: str - creator: str # currently just email - created: datetime - download: int - - class SearchCriteria(BaseModel): field: str operator: str = "==" diff --git a/backend/app/rabbitmq/heartbeat_listener_sync.py b/backend/app/rabbitmq/heartbeat_listener_sync.py index 8ece9f09c..e860b4907 100644 --- a/backend/app/rabbitmq/heartbeat_listener_sync.py +++ b/backend/app/rabbitmq/heartbeat_listener_sync.py @@ -5,6 +5,8 @@ from pymongo import MongoClient from app.config import settings +from app.models.search import SearchCriteria +from app.routers.feeds import FeedIn, FeedListener, save_feed, associate_listener from app.models.listeners import EventListenerDB, EventListenerOut, ExtractorInfo logging.basicConfig(level=logging.INFO) @@ -51,6 +53,31 @@ def callback(ch, method, properties, body): found = db["listeners"].find_one({"_id": new_extractor.inserted_id}) extractor_out = EventListenerOut.from_mongo(found) logger.info("New extractor registered: " + extractor_name) + + # Assign MIME-based listener if needed + criteria_list = [] + if extractor_out.properties and extractor_out.properties.process: + # { "file" : [ "text/*", "application/json" ] } + process = extractor_out.properties.process + if "file" in process: + # Create a MIME-based feed for this v1 extractor + for mime in process["file"]: + criteria_list.append( + SearchCriteria(field="content_type", value=mime) + ) + feed_data = { + "name": extractor_name, + "search": { + "index_name": "file", + "criteria": criteria_list, + }, + } + new_feed = await save_feed(FeedIn(feed_data)) + + # Assign the extractor to the new feed + feed_listener = FeedListener(listener_id=extractor_out.id, automatic=True) + await associate_listener(new_feed.id, feed_listener) + return extractor_out diff --git a/backend/app/routers/feeds.py b/backend/app/routers/feeds.py index e493b9749..57a4fe19b 100644 --- a/backend/app/routers/feeds.py +++ b/backend/app/routers/feeds.py @@ -17,7 +17,6 @@ FeedDB, FeedOut, ) -from app.models.search import SearchIndexContents from app.search.connect import check_search_result from app.rabbitmq.listeners import submit_file_message diff --git a/backend/app/routers/files.py b/backend/app/routers/files.py index 36b6d7650..5adb4393a 100644 --- a/backend/app/routers/files.py +++ b/backend/app/routers/files.py @@ -34,7 +34,6 @@ from app.models.files import FileIn, FileOut, FileVersion, FileDB from app.models.listeners import EventListenerMessage, ExtractorInfo from app.models.users import UserOut -from app.models.search import SearchIndexContents from app.routers.feeds import check_feed_listeners from app.keycloak_auth import get_user, get_current_user, get_token from app.rabbitmq.listeners import submit_file_message @@ -213,7 +212,6 @@ async def update_file( {"resource.resource_id": ObjectId(updated_file.id)} ) ) is not None: - print("updating metadata") doc = { "doc": { "name": updated_file.name, From b46f71248ae58ad15a5deca5a2a03af5ec84d37d Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Mon, 30 Jan 2023 16:06:56 -0600 Subject: [PATCH 2/6] formatting --- .../app/rabbitmq/heartbeat_listener_sync.py | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/backend/app/rabbitmq/heartbeat_listener_sync.py b/backend/app/rabbitmq/heartbeat_listener_sync.py index e860b4907..768217ea4 100644 --- a/backend/app/rabbitmq/heartbeat_listener_sync.py +++ b/backend/app/rabbitmq/heartbeat_listener_sync.py @@ -55,27 +55,30 @@ def callback(ch, method, properties, body): logger.info("New extractor registered: " + extractor_name) # Assign MIME-based listener if needed - criteria_list = [] if extractor_out.properties and extractor_out.properties.process: - # { "file" : [ "text/*", "application/json" ] } process = extractor_out.properties.process if "file" in process: # Create a MIME-based feed for this v1 extractor + criteria_list = [] for mime in process["file"]: criteria_list.append( SearchCriteria(field="content_type", value=mime) ) - feed_data = { - "name": extractor_name, - "search": { - "index_name": "file", - "criteria": criteria_list, - }, - } - new_feed = await save_feed(FeedIn(feed_data)) + feed_data = FeedIn( + { + "name": extractor_name, + "search": { + "index_name": "file", + "criteria": criteria_list, + }, + } + ) + new_feed = await save_feed(feed_data) # Assign the extractor to the new feed - feed_listener = FeedListener(listener_id=extractor_out.id, automatic=True) + feed_listener = FeedListener( + listener_id=extractor_out.id, automatic=True + ) await associate_listener(new_feed.id, feed_listener) return extractor_out From 19dbb64804ff807eb672d40e48ca913d2c5e8cdf Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Tue, 31 Jan 2023 08:25:40 -0600 Subject: [PATCH 3/6] improve auto feed registration --- backend/app/heartbeat_listener_sync.py | 74 ------------------- backend/app/models/feeds.py | 2 +- .../app/rabbitmq/heartbeat_listener_sync.py | 20 ++--- 3 files changed, 8 insertions(+), 88 deletions(-) delete mode 100644 backend/app/heartbeat_listener_sync.py diff --git a/backend/app/heartbeat_listener_sync.py b/backend/app/heartbeat_listener_sync.py deleted file mode 100644 index 15f6a9823..000000000 --- a/backend/app/heartbeat_listener_sync.py +++ /dev/null @@ -1,74 +0,0 @@ -import logging -import pika -import json -from packaging import version -from pymongo import MongoClient - -from app.config import settings -from app.models.listeners import EventListenerDB, EventListenerOut, ExtractorInfo - -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - - -def callback(ch, method, properties, body): - """This method receives messages from RabbitMQ and processes them.""" - msg = json.loads(body.decode("utf-8")) - - extractor_info = msg["extractor_info"] - extractor_name = extractor_info["name"] - extractor_db = EventListenerDB( - **extractor_info, properties=ExtractorInfo(**extractor_info) - ) - - mongo_client = MongoClient(settings.MONGODB_URL) - db = mongo_client[settings.MONGO_DATABASE] - - # TODO: This block could go in app.rabbitmq.listeners register_listener and update_listener methods? - existing_extractor = db["listeners"].find_one({"name": msg["queue"]}) - if existing_extractor is not None: - # Update existing listener - existing_version = existing_extractor["version"] - new_version = extractor_db.version - if version.parse(new_version) > version.parse(existing_version): - # TODO: Should this delete old version, or just add new entry? If 1st one, why not update? - new_extractor = db["listeners"].insert_one(extractor_db.to_mongo()) - found = db["listeners"].find_one({"_id": new_extractor.inserted_id}) - removed = db["listeners"].delete_one({"_id": existing_extractor["_id"]}) - extractor_out = EventListenerOut.from_mongo(found) - logger.info( - "%s updated from %s to %s" - % (extractor_name, existing_version, new_version) - ) - return extractor_out - else: - # Register new listener - new_extractor = db["listeners"].insert_one(extractor_db.to_mongo()) - found = db["listeners"].find_one({"_id": new_extractor.inserted_id}) - extractor_out = EventListenerOut.from_mongo(found) - logger.info("New extractor registered: " + extractor_name) - return extractor_out - - -def listen_for_heartbeats(): - credentials = pika.PlainCredentials(settings.RABBITMQ_USER, settings.RABBITMQ_PASS) - parameters = pika.ConnectionParameters( - settings.RABBITMQ_HOST, 5672, "/", credentials - ) - connection = pika.BlockingConnection(parameters) - channel = connection.channel() - - channel.exchange_declare( - exchange=settings.HEARTBEAT_EXCHANGE, exchange_type="fanout", durable=True - ) - result = channel.queue_declare(queue="", exclusive=True) - queue_name = result.method.queue - channel.queue_bind(exchange=settings.HEARTBEAT_EXCHANGE, queue=queue_name) - - logger.info(" [*] Waiting for heartbeats. To exit press CTRL+C") - channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) - channel.start_consuming() - - -if __name__ == "__main__": - listen_for_heartbeats() diff --git a/backend/app/models/feeds.py b/backend/app/models/feeds.py index 46952092f..75732e65f 100644 --- a/backend/app/models/feeds.py +++ b/backend/app/models/feeds.py @@ -25,7 +25,7 @@ class FeedIn(JobFeed): class FeedDB(JobFeed, MongoModel): - author: UserOut + author: Optional[UserOut] = None updated: datetime = Field(default_factory=datetime.utcnow) diff --git a/backend/app/rabbitmq/heartbeat_listener_sync.py b/backend/app/rabbitmq/heartbeat_listener_sync.py index 768217ea4..8d3cc3df3 100644 --- a/backend/app/rabbitmq/heartbeat_listener_sync.py +++ b/backend/app/rabbitmq/heartbeat_listener_sync.py @@ -6,7 +6,7 @@ from app.config import settings from app.models.search import SearchCriteria -from app.routers.feeds import FeedIn, FeedListener, save_feed, associate_listener +from app.routers.feeds import FeedIn, FeedListener, FeedOut, FeedDB, associate_listener from app.models.listeners import EventListenerDB, EventListenerOut, ExtractorInfo logging.basicConfig(level=logging.INFO) @@ -64,22 +64,16 @@ def callback(ch, method, properties, body): criteria_list.append( SearchCriteria(field="content_type", value=mime) ) - feed_data = FeedIn( - { - "name": extractor_name, - "search": { + # TODO: Who should the author be for an auto-generated feed? Currently None. + new_feed = FeedDB( + name=extractor_name, + search={ "index_name": "file", "criteria": criteria_list, }, - } + listeners=[FeedListener(listener_id=extractor_out.id, automatic=True)] ) - new_feed = await save_feed(feed_data) - - # Assign the extractor to the new feed - feed_listener = FeedListener( - listener_id=extractor_out.id, automatic=True - ) - await associate_listener(new_feed.id, feed_listener) + db["feeds"].insert_one(new_feed.to_mongo()) return extractor_out From 559c34309f8066bbe7df74cb91299f394917e37e Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Tue, 31 Jan 2023 08:27:27 -0600 Subject: [PATCH 4/6] default to OR search --- backend/app/rabbitmq/heartbeat_listener_sync.py | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/app/rabbitmq/heartbeat_listener_sync.py b/backend/app/rabbitmq/heartbeat_listener_sync.py index 8d3cc3df3..82c1c8c18 100644 --- a/backend/app/rabbitmq/heartbeat_listener_sync.py +++ b/backend/app/rabbitmq/heartbeat_listener_sync.py @@ -70,6 +70,7 @@ def callback(ch, method, properties, body): search={ "index_name": "file", "criteria": criteria_list, + "mode": "or", }, listeners=[FeedListener(listener_id=extractor_out.id, automatic=True)] ) From 2c78b9d04d3d66b2305f2620ac566addc529a4d9 Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Thu, 2 Feb 2023 08:54:20 -0600 Subject: [PATCH 5/6] split content type into main type for v1 compatibility --- backend/app/models/files.py | 15 ++++++++++--- .../app/rabbitmq/heartbeat_listener_sync.py | 21 ++++++++++++++++--- backend/app/routers/files.py | 11 ++++++---- 3 files changed, 37 insertions(+), 10 deletions(-) diff --git a/backend/app/models/files.py b/backend/app/models/files.py index 926fac105..725b65874 100644 --- a/backend/app/models/files.py +++ b/backend/app/models/files.py @@ -1,20 +1,29 @@ from datetime import datetime from typing import Optional -from pydantic import Field +from pydantic import Field, BaseModel from app.models.mongomodel import MongoModel from app.models.pyobjectid import PyObjectId from app.models.users import UserOut +class FileContentType(BaseModel): + """This model describes the content type of a file uploaded to Clowder. A typical example is "text/plain" for .txt. + In Clowder v1 extractors, "text/*" syntax is acceptable for wildcard matches. To support this, the content type is + split into main ("text") and secondary ("plain") parts so the dynamic matching with * can still be done. + + """ + content_type: str = "N/A" + main_type: str = "N/A" + class FileVersion(MongoModel): version_id: str version_num: int = 1 file_id: PyObjectId creator: UserOut bytes: int = 0 - content_type: str = "N/A" + content_type: FileContentType = FileContentType() created: datetime = Field(default_factory=datetime.utcnow) @@ -36,7 +45,7 @@ class FileDB(FileBase): views: int = 0 downloads: int = 0 bytes: int = 0 - content_type: str = "N/A" + content_type: FileContentType = FileContentType() class FileOut(FileDB): diff --git a/backend/app/rabbitmq/heartbeat_listener_sync.py b/backend/app/rabbitmq/heartbeat_listener_sync.py index 82c1c8c18..fe844fc37 100644 --- a/backend/app/rabbitmq/heartbeat_listener_sync.py +++ b/backend/app/rabbitmq/heartbeat_listener_sync.py @@ -61,9 +61,24 @@ def callback(ch, method, properties, body): # Create a MIME-based feed for this v1 extractor criteria_list = [] for mime in process["file"]: - criteria_list.append( - SearchCriteria(field="content_type", value=mime) - ) + main_type = mime.split("/")[0] if mime.find("/") > -1 else mime + sub_type = mime.split("/")[1] if mime.find("/") > -1 else None + if sub_type: + if sub_type == "*": + # If a wildcard, just match on main type + criteria_list.append( + SearchCriteria(field="content_type_main", value=main_type) + ) + else: + # Otherwise match the whole string + criteria_list.append( + SearchCriteria(field="content_type", value=mime) + ) + else: + criteria_list.append( + SearchCriteria(field="content_type", value=mime) + ) + # TODO: Who should the author be for an auto-generated feed? Currently None. new_feed = FeedDB( name=extractor_name, diff --git a/backend/app/routers/files.py b/backend/app/routers/files.py index 5adb4393a..d72fb4eae 100644 --- a/backend/app/routers/files.py +++ b/backend/app/routers/files.py @@ -31,7 +31,7 @@ update_record, delete_document_by_query, ) -from app.models.files import FileIn, FileOut, FileVersion, FileDB +from app.models.files import FileIn, FileOut, FileVersion, FileContentType, FileDB from app.models.listeners import EventListenerMessage, ExtractorInfo from app.models.users import UserOut from app.routers.feeds import check_feed_listeners @@ -70,6 +70,8 @@ async def add_file_entry( if content_type is None: content_type = mimetypes.guess_type(file_db.name) content_type = content_type[0] if len(content_type) > 1 else content_type + type_main = content_type.split("/")[0] if type(content_type) is str else "N/A" + content_type_obj = FileContentType(content_type=content_type, main_type=type_main) # Use unique ID as key for Minio and get initial version ID response = fs.put_object( @@ -87,7 +89,7 @@ async def add_file_entry( file_db.version_id = version_id file_db.version_num = 1 file_db.bytes = bytes - file_db.content_type = content_type if type(content_type) is str else "N/A" + file_db.content_type = content_type_obj await db["files"].replace_one({"_id": ObjectId(new_file_id)}, file_db.to_mongo()) file_out = FileOut(**file_db.dict()) @@ -97,7 +99,7 @@ async def add_file_entry( file_id=new_file_id, creator=user, bytes=bytes, - content_type=file_db.content_type, + content_type=content_type_obj, ) await db["file_versions"].insert_one(new_version.to_mongo()) @@ -110,7 +112,8 @@ async def add_file_entry( "dataset_id": str(file_db.dataset_id), "folder_id": str(file_db.folder_id), "bytes": file_db.bytes, - "content_type": file_db.content_type, + "content_type": content_type_obj.content_type, + "content_type_main": content_type_obj.main_type, } insert_record(es, "file", doc, file_db.id) From 26da88533d75f95281c39e98601442f8555b7f76 Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Thu, 2 Feb 2023 09:00:03 -0600 Subject: [PATCH 6/6] formatting --- backend/app/models/files.py | 2 ++ backend/app/rabbitmq/heartbeat_listener_sync.py | 16 ++++++++++------ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/backend/app/models/files.py b/backend/app/models/files.py index 725b65874..646bb1033 100644 --- a/backend/app/models/files.py +++ b/backend/app/models/files.py @@ -14,9 +14,11 @@ class FileContentType(BaseModel): split into main ("text") and secondary ("plain") parts so the dynamic matching with * can still be done. """ + content_type: str = "N/A" main_type: str = "N/A" + class FileVersion(MongoModel): version_id: str version_num: int = 1 diff --git a/backend/app/rabbitmq/heartbeat_listener_sync.py b/backend/app/rabbitmq/heartbeat_listener_sync.py index fe844fc37..8814e2cf9 100644 --- a/backend/app/rabbitmq/heartbeat_listener_sync.py +++ b/backend/app/rabbitmq/heartbeat_listener_sync.py @@ -67,7 +67,9 @@ def callback(ch, method, properties, body): if sub_type == "*": # If a wildcard, just match on main type criteria_list.append( - SearchCriteria(field="content_type_main", value=main_type) + SearchCriteria( + field="content_type_main", value=main_type + ) ) else: # Otherwise match the whole string @@ -83,11 +85,13 @@ def callback(ch, method, properties, body): new_feed = FeedDB( name=extractor_name, search={ - "index_name": "file", - "criteria": criteria_list, - "mode": "or", - }, - listeners=[FeedListener(listener_id=extractor_out.id, automatic=True)] + "index_name": "file", + "criteria": criteria_list, + "mode": "or", + }, + listeners=[ + FeedListener(listener_id=extractor_out.id, automatic=True) + ], ) db["feeds"].insert_one(new_feed.to_mongo())