diff --git a/backend/app/heartbeat_listener_sync.py b/backend/app/heartbeat_listener_sync.py
deleted file mode 100644
index 15f6a9823..000000000
--- a/backend/app/heartbeat_listener_sync.py
+++ /dev/null
@@ -1,74 +0,0 @@
-import logging
-import pika
-import json
-from packaging import version
-from pymongo import MongoClient
-
-from app.config import settings
-from app.models.listeners import EventListenerDB, EventListenerOut, ExtractorInfo
-
-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger(__name__)
-
-
-def callback(ch, method, properties, body):
-    """This method receives messages from RabbitMQ and processes them."""
-    msg = json.loads(body.decode("utf-8"))
-
-    extractor_info = msg["extractor_info"]
-    extractor_name = extractor_info["name"]
-    extractor_db = EventListenerDB(
-        **extractor_info, properties=ExtractorInfo(**extractor_info)
-    )
-
-    mongo_client = MongoClient(settings.MONGODB_URL)
-    db = mongo_client[settings.MONGO_DATABASE]
-
-    # TODO: This block could go in app.rabbitmq.listeners register_listener and update_listener methods?
-    existing_extractor = db["listeners"].find_one({"name": msg["queue"]})
-    if existing_extractor is not None:
-        # Update existing listener
-        existing_version = existing_extractor["version"]
-        new_version = extractor_db.version
-        if version.parse(new_version) > version.parse(existing_version):
-            # TODO: Should this delete old version, or just add new entry? If 1st one, why not update?
-            new_extractor = db["listeners"].insert_one(extractor_db.to_mongo())
-            found = db["listeners"].find_one({"_id": new_extractor.inserted_id})
-            removed = db["listeners"].delete_one({"_id": existing_extractor["_id"]})
-            extractor_out = EventListenerOut.from_mongo(found)
-            logger.info(
-                "%s updated from %s to %s"
-                % (extractor_name, existing_version, new_version)
-            )
-            return extractor_out
-    else:
-        # Register new listener
-        new_extractor = db["listeners"].insert_one(extractor_db.to_mongo())
-        found = db["listeners"].find_one({"_id": new_extractor.inserted_id})
-        extractor_out = EventListenerOut.from_mongo(found)
-        logger.info("New extractor registered: " + extractor_name)
-        return extractor_out
-
-
-def listen_for_heartbeats():
-    credentials = pika.PlainCredentials(settings.RABBITMQ_USER, settings.RABBITMQ_PASS)
-    parameters = pika.ConnectionParameters(
-        settings.RABBITMQ_HOST, 5672, "/", credentials
-    )
-    connection = pika.BlockingConnection(parameters)
-    channel = connection.channel()
-
-    channel.exchange_declare(
-        exchange=settings.HEARTBEAT_EXCHANGE, exchange_type="fanout", durable=True
-    )
-    result = channel.queue_declare(queue="", exclusive=True)
-    queue_name = result.method.queue
-    channel.queue_bind(exchange=settings.HEARTBEAT_EXCHANGE, queue=queue_name)
-
-    logger.info(" [*] Waiting for heartbeats. To exit press CTRL+C")
-    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
-    channel.start_consuming()
-
-
-if __name__ == "__main__":
-    listen_for_heartbeats()
diff --git a/backend/app/models/feeds.py b/backend/app/models/feeds.py
index 46952092f..75732e65f 100644
--- a/backend/app/models/feeds.py
+++ b/backend/app/models/feeds.py
@@ -25,7 +25,7 @@ class FeedIn(JobFeed):
 
 
 class FeedDB(JobFeed, MongoModel):
-    author: UserOut
+    author: Optional[UserOut] = None
     updated: datetime = Field(default_factory=datetime.utcnow)
 
 
diff --git a/backend/app/models/files.py b/backend/app/models/files.py
index 926fac105..646bb1033 100644
--- a/backend/app/models/files.py
+++ b/backend/app/models/files.py
@@ -1,20 +1,31 @@
 from datetime import datetime
 from typing import Optional
 
-from pydantic import Field
+from pydantic import Field, BaseModel
 
 from app.models.mongomodel import MongoModel
 from app.models.pyobjectid import PyObjectId
 from app.models.users import UserOut
 
 
+class FileContentType(BaseModel):
+    """This model describes the content type of a file uploaded to Clowder. A typical example is "text/plain" for .txt.
+    In Clowder v1 extractors, "text/*" syntax is acceptable for wildcard matches. To support this, the content type is
+    split into main ("text") and secondary ("plain") parts so the dynamic matching with * can still be done.
+
+    """
+
+    content_type: str = "N/A"
+    main_type: str = "N/A"
+
+
 class FileVersion(MongoModel):
     version_id: str
     version_num: int = 1
     file_id: PyObjectId
     creator: UserOut
     bytes: int = 0
-    content_type: str = "N/A"
+    content_type: FileContentType = FileContentType()
     created: datetime = Field(default_factory=datetime.utcnow)
 
 
@@ -36,7 +47,7 @@ class FileDB(FileBase):
     views: int = 0
     downloads: int = 0
     bytes: int = 0
-    content_type: str = "N/A"
+    content_type: FileContentType = FileContentType()
 
 
 class FileOut(FileDB):
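The new FileContentType model keeps both the full MIME string and its main part, so v1-style "text/*" wildcard subscriptions can still be matched against uploaded files. A minimal sketch of how a raw MIME string maps onto the model; the parse_content_type helper is hypothetical and not part of this diff:

from app.models.files import FileContentType


def parse_content_type(raw: str) -> FileContentType:
    # Keep the full string, and split out the main part before the "/".
    main_type = raw.split("/")[0] if "/" in raw else raw
    return FileContentType(content_type=raw, main_type=main_type)


parse_content_type("text/plain")  # content_type="text/plain", main_type="text"
parse_content_type("text")  # content_type="text", main_type="text"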
To exit press CTRL+C") - channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) - channel.start_consuming() - - -if __name__ == "__main__": - listen_for_heartbeats() diff --git a/backend/app/models/feeds.py b/backend/app/models/feeds.py index 46952092f..75732e65f 100644 --- a/backend/app/models/feeds.py +++ b/backend/app/models/feeds.py @@ -25,7 +25,7 @@ class FeedIn(JobFeed): class FeedDB(JobFeed, MongoModel): - author: UserOut + author: Optional[UserOut] = None updated: datetime = Field(default_factory=datetime.utcnow) diff --git a/backend/app/models/files.py b/backend/app/models/files.py index 926fac105..646bb1033 100644 --- a/backend/app/models/files.py +++ b/backend/app/models/files.py @@ -1,20 +1,31 @@ from datetime import datetime from typing import Optional -from pydantic import Field +from pydantic import Field, BaseModel from app.models.mongomodel import MongoModel from app.models.pyobjectid import PyObjectId from app.models.users import UserOut +class FileContentType(BaseModel): + """This model describes the content type of a file uploaded to Clowder. A typical example is "text/plain" for .txt. + In Clowder v1 extractors, "text/*" syntax is acceptable for wildcard matches. To support this, the content type is + split into main ("text") and secondary ("plain") parts so the dynamic matching with * can still be done. + + """ + + content_type: str = "N/A" + main_type: str = "N/A" + + class FileVersion(MongoModel): version_id: str version_num: int = 1 file_id: PyObjectId creator: UserOut bytes: int = 0 - content_type: str = "N/A" + content_type: FileContentType = FileContentType() created: datetime = Field(default_factory=datetime.utcnow) @@ -36,7 +47,7 @@ class FileDB(FileBase): views: int = 0 downloads: int = 0 bytes: int = 0 - content_type: str = "N/A" + content_type: FileContentType = FileContentType() class FileOut(FileDB): diff --git a/backend/app/models/search.py b/backend/app/models/search.py index f6a439b97..72f05c3ad 100644 --- a/backend/app/models/search.py +++ b/backend/app/models/search.py @@ -3,17 +3,6 @@ from typing import Optional, List -# TODO: may eventually be split by index (resource type) -class SearchIndexContents(BaseModel): - """This describes what is indexed in Elasticsearch for a given resource.""" - - id: str - name: str - creator: str # currently just email - created: datetime - download: int - - class SearchCriteria(BaseModel): field: str operator: str = "==" diff --git a/backend/app/rabbitmq/heartbeat_listener_sync.py b/backend/app/rabbitmq/heartbeat_listener_sync.py index 8ece9f09c..8814e2cf9 100644 --- a/backend/app/rabbitmq/heartbeat_listener_sync.py +++ b/backend/app/rabbitmq/heartbeat_listener_sync.py @@ -5,6 +5,8 @@ from pymongo import MongoClient from app.config import settings +from app.models.search import SearchCriteria +from app.routers.feeds import FeedIn, FeedListener, FeedOut, FeedDB, associate_listener from app.models.listeners import EventListenerDB, EventListenerOut, ExtractorInfo logging.basicConfig(level=logging.INFO) @@ -51,6 +53,48 @@ def callback(ch, method, properties, body): found = db["listeners"].find_one({"_id": new_extractor.inserted_id}) extractor_out = EventListenerOut.from_mongo(found) logger.info("New extractor registered: " + extractor_name) + + # Assign MIME-based listener if needed + if extractor_out.properties and extractor_out.properties.process: + process = extractor_out.properties.process + if "file" in process: + # Create a MIME-based feed for this v1 extractor + criteria_list = [] + 
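The callback above maps each MIME pattern a v1 extractor advertises in its process block to one search criterion: a wildcard subtype such as "text/*" matches on content_type_main alone, while anything else matches the full content_type string, and the criteria are combined with mode "or". A standalone sketch of that mapping, using the SearchCriteria model from this diff; the mime_to_criteria helper is hypothetical:

from app.models.search import SearchCriteria


def mime_to_criteria(mime: str) -> SearchCriteria:
    # Wildcard subtypes ("text/*") match on the main type only.
    if "/" in mime and mime.split("/")[1] == "*":
        return SearchCriteria(field="content_type_main", value=mime.split("/")[0])
    # Exact MIME strings (and bare main types) match the whole string.
    return SearchCriteria(field="content_type", value=mime)


criteria = [mime_to_criteria(m) for m in ["text/*", "application/json"]]
# -> content_type_main == "text" OR content_type == "application/json"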
diff --git a/backend/app/routers/feeds.py b/backend/app/routers/feeds.py
index a0999f32d..329e9583c 100644
--- a/backend/app/routers/feeds.py
+++ b/backend/app/routers/feeds.py
@@ -17,7 +17,6 @@
     FeedDB,
     FeedOut,
 )
-from app.models.search import SearchIndexContents
 from app.search.connect import check_search_result
 from app.rabbitmq.listeners import submit_file_job
 
diff --git a/backend/app/routers/files.py b/backend/app/routers/files.py
index 59ebf07ac..4623b6b7e 100644
--- a/backend/app/routers/files.py
+++ b/backend/app/routers/files.py
@@ -31,7 +31,7 @@
     update_record,
     delete_document_by_query,
 )
-from app.models.files import FileIn, FileOut, FileVersion, FileDB
+from app.models.files import FileIn, FileOut, FileVersion, FileContentType, FileDB
 from app.models.users import UserOut
 from app.routers.feeds import check_feed_listeners
 from app.keycloak_auth import get_user, get_current_user, get_token
@@ -70,6 +70,8 @@ async def add_file_entry(
     if content_type is None:
         content_type = mimetypes.guess_type(file_db.name)
         content_type = content_type[0] if len(content_type) > 1 else content_type
+    type_main = content_type.split("/")[0] if type(content_type) is str else "N/A"
+    content_type_obj = FileContentType(content_type=content_type or "N/A", main_type=type_main)
 
     # Use unique ID as key for Minio and get initial version ID
     response = fs.put_object(
@@ -87,7 +89,7 @@ async def add_file_entry(
     file_db.version_id = version_id
     file_db.version_num = 1
     file_db.bytes = bytes
-    file_db.content_type = content_type if type(content_type) is str else "N/A"
+    file_db.content_type = content_type_obj
     await db["files"].replace_one({"_id": ObjectId(new_file_id)}, file_db.to_mongo())
 
     file_out = FileOut(**file_db.dict())
@@ -97,7 +99,7 @@ async def add_file_entry(
         file_id=new_file_id,
         creator=user,
         bytes=bytes,
-        content_type=file_db.content_type,
+        content_type=content_type_obj,
     )
     await db["file_versions"].insert_one(new_version.to_mongo())
 
@@ -110,7 +112,8 @@ async def add_file_entry(
         "dataset_id": str(file_db.dataset_id),
         "folder_id": str(file_db.folder_id),
         "bytes": file_db.bytes,
-        "content_type": file_db.content_type,
+        "content_type": content_type_obj.content_type,
+        "content_type_main": content_type_obj.main_type,
     }
     insert_record(es, "file", doc, file_db.id)
 
@@ -210,7 +213,6 @@ async def update_file(
             {"resource.resource_id": ObjectId(updated_file.id)}
         )
     ) is not None:
-        print("updating metadata")
         doc = {
             "doc": {
                 "name": updated_file.name,
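For context on the guarded construction in add_file_entry: mimetypes.guess_type returns a (type, encoding) tuple whose type element is None for unrecognized extensions, so the fallback to "N/A" keeps such uploads from failing FileContentType validation. For example:

import mimetypes

print(mimetypes.guess_type("notes.txt")[0])  # "text/plain"
print(mimetypes.guess_type("archive.unknown")[0])  # None -> stored as "N/A"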