diff --git a/backend/app/models/listeners.py b/backend/app/models/listeners.py
index 9cfc24a16..e9453b8c6 100644
--- a/backend/app/models/listeners.py
+++ b/backend/app/models/listeners.py
@@ -34,7 +34,7 @@ class EventListenerBase(BaseModel):
     """An Event Listener is the expanded version of v1 Extractors."""

     name: str
-    version: int = 1
+    version: str = "1.0"
     description: str = ""


@@ -55,7 +55,7 @@ class LegacyEventListenerIn(ExtractorInfo):
 class EventListenerDB(EventListenerBase, MongoModel):
     """EventListeners have a name, version, author, description, and optionally properties where extractor_info will be saved."""

-    author: UserOut
+    creator: Optional[UserOut] = None
     created: datetime = Field(default_factory=datetime.utcnow)
     modified: datetime = Field(default_factory=datetime.utcnow)
     properties: Optional[ExtractorInfo] = None
diff --git a/backend/app/rabbitmq/heartbeat_listener_async.py b/backend/app/rabbitmq/heartbeat_listener_async.py
index 45f628270..df11c6d46 100644
--- a/backend/app/rabbitmq/heartbeat_listener_async.py
+++ b/backend/app/rabbitmq/heartbeat_listener_async.py
@@ -5,12 +5,7 @@ from app.config import settings
 from aio_pika.abc import AbstractIncomingMessage
 from pymongo import MongoClient

-from app.models.extractors import (
-    ExtractorBase,
-    ExtractorIn,
-    ExtractorDB,
-    ExtractorOut,
-)
+from app.models.listeners import LegacyEventListenerIn, EventListenerOut


 async def on_message(message: AbstractIncomingMessage) -> None:
@@ -21,20 +16,18 @@ async def on_message(message: AbstractIncomingMessage) -> None:
     extractor_queue = statusBody["queue"]
     extractor_info = statusBody["extractor_info"]
     extractor_name = extractor_info["name"]
-    extractor_db = ExtractorDB(**extractor_info)
+    extractor_db = LegacyEventListenerIn(**extractor_info)
     client = MongoClient(settings.MONGODB_URL)
     db = client["clowder2"]
-    existing_extractor = db["extractors"].find_one({"name": extractor_queue})
+    existing_extractor = db["listeners"].find_one({"name": extractor_queue})
     if existing_extractor is not None:
         existing_version = existing_extractor["version"]
         new_version = extractor_db.version
         if version.parse(new_version) > version.parse(existing_version):
-            new_extractor = db["extractors"].insert_one(extractor_db.to_mongo())
-            found = db["extractors"].find_one({"_id": new_extractor.inserted_id})
-            removed = db["extractors"].delete_one(
-                {"_id": existing_extractor["_id"]}
-            )
-            extractor_out = ExtractorOut.from_mongo(found)
+            new_extractor = db["listeners"].insert_one(extractor_db.to_mongo())
+            found = db["listeners"].find_one({"_id": new_extractor.inserted_id})
+            removed = db["listeners"].delete_one({"_id": existing_extractor["_id"]})
+            extractor_out = EventListenerOut.from_mongo(found)
             print(
                 "extractor updated: "
                 + extractor_name
@@ -45,9 +38,9 @@ async def on_message(message: AbstractIncomingMessage) -> None:
             )
             return extractor_out
     else:
-        new_extractor = db["extractors"].insert_one(extractor_db.to_mongo())
-        found = db["extractors"].find_one({"_id": new_extractor.inserted_id})
-        extractor_out = ExtractorOut.from_mongo(found)
+        new_extractor = db["listeners"].insert_one(extractor_db.to_mongo())
+        found = db["listeners"].find_one({"_id": new_extractor.inserted_id})
+        extractor_out = EventListenerOut.from_mongo(found)
         print("new extractor registered: " + extractor_name)
         return extractor_out
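Note on the version change above: EventListener versions are now strings because the heartbeat listeners compare them with version.parse(), which orders dotted release numbers numerically. A minimal sketch of that comparison behaviour, assuming the `version` module used here is packaging.version (the example values are illustrative, not taken from this change):

    # Illustrative only: how string versions compare once parsed.
    from packaging import version  # assumed to be the `version` used by the heartbeat listeners

    assert version.parse("2.0") > version.parse("1.0")
    assert version.parse("1.10") > version.parse("1.9")  # numeric ordering per component
    assert not ("1.10" > "1.9")  # naive string comparison would get this wrong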
diff --git a/backend/app/rabbitmq/heartbeat_listener_sync.py b/backend/app/rabbitmq/heartbeat_listener_sync.py
index 76eadb19a..f8835238f 100644
--- a/backend/app/rabbitmq/heartbeat_listener_sync.py
+++ b/backend/app/rabbitmq/heartbeat_listener_sync.py
@@ -19,14 +19,14 @@ def callback(ch, method, properties, body):
     extractor_db = EventListenerDB(**extractor_info)
     client = MongoClient(settings.MONGODB_URL)
     db = client["clowder2"]
-    existing_extractor = db["extractors"].find_one({"name": extractor_queue})
+    existing_extractor = db["listeners"].find_one({"name": extractor_queue})
     if existing_extractor is not None:
         existing_version = existing_extractor["version"]
         new_version = extractor_db.version
         if version.parse(new_version) > version.parse(existing_version):
-            new_extractor = db["extractors"].insert_one(extractor_db.to_mongo())
-            found = db["extractors"].find_one({"_id": new_extractor.inserted_id})
-            removed = db["extractors"].delete_one({"_id": existing_extractor["_id"]})
+            new_extractor = db["listeners"].insert_one(extractor_db.to_mongo())
+            found = db["listeners"].find_one({"_id": new_extractor.inserted_id})
+            removed = db["listeners"].delete_one({"_id": existing_extractor["_id"]})
             extractor_out = EventListenerOut.from_mongo(found)
             print(
                 "extractor updated: "
@@ -38,8 +38,8 @@ def callback(ch, method, properties, body):
             )
             return extractor_out
     else:
-        new_extractor = db["extractors"].insert_one(extractor_db.to_mongo())
-        found = db["extractors"].find_one({"_id": new_extractor.inserted_id})
+        new_extractor = db["listeners"].insert_one(extractor_db.to_mongo())
+        found = db["listeners"].find_one({"_id": new_extractor.inserted_id})
         extractor_out = EventListenerOut.from_mongo(found)
         print("new extractor registered: " + extractor_name)
         return extractor_out
diff --git a/backend/app/rabbitmq/listeners.py b/backend/app/rabbitmq/listeners.py
index 0ba7db533..ba05d5665 100644
--- a/backend/app/rabbitmq/listeners.py
+++ b/backend/app/rabbitmq/listeners.py
@@ -8,6 +8,7 @@ from app.keycloak_auth import get_token
 from app import dependencies

 from app.models.files import FileOut
+from app.models.datasets import DatasetOut
 from app.models.listeners import EventListenerMessage

@@ -43,3 +44,37 @@ def submit_file_message(
         ),
     )
     return {"message": "testing", "file_id": file_out.id}
+
+
+def submit_dataset_message(
+    dataset_out: DatasetOut,
+    queue: str,
+    routing_key: str,
+    parameters: dict,
+    token: str = Depends(get_token),
+    db: MongoClient = Depends(dependencies.get_db),
+    rabbitmq_client: BlockingChannel = Depends(dependencies.get_rabbitmq),
+):
+    # TODO check if extractor is registered
+    msg_body = EventListenerMessage(
+        filename=dataset_out.name,
+        fileSize=dataset_out.bytes,
+        id=dataset_out.id,
+        datasetId=dataset_out.dataset_id,
+        secretKey=token,
+    )
+
+    rabbitmq_client.queue_bind(
+        exchange="extractors",
+        queue=queue,
+        routing_key=routing_key,
+    )
+    rabbitmq_client.basic_publish(
+        exchange="extractors",
+        routing_key=routing_key,
+        body=json.dumps(msg_body.dict(), ensure_ascii=False),
+        properties=pika.BasicProperties(
+            content_type="application/json", delivery_mode=1
+        ),
+    )
+    return {"message": "testing", "dataset_id": dataset_out.id}
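For context, a hypothetical caller of the new submit_dataset_message helper might look like the sketch below; the wrapper function, queue name, and parameters are examples rather than part of this change, and the dependencies are passed explicitly instead of relying on the Depends defaults:

    # Hypothetical usage sketch; queue and routing-key values are examples only.
    from app.models.datasets import DatasetOut
    from app.rabbitmq.listeners import submit_dataset_message

    def request_dataset_extraction(dataset_out: DatasetOut, token, db, rabbitmq_client):
        # "extractors." + queue mirrors the routing-key convention used in routers/datasets.py
        return submit_dataset_message(
            dataset_out,
            queue="ncsa.wordcount",
            routing_key="extractors.ncsa.wordcount",
            parameters={},
            token=token,
            db=db,
            rabbitmq_client=rabbitmq_client,
        )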
diff --git a/backend/app/routers/datasets.py b/backend/app/routers/datasets.py
index c6a113d0e..d42f8a69b 100644
--- a/backend/app/routers/datasets.py
+++ b/backend/app/routers/datasets.py
@@ -7,9 +7,8 @@ import zipfile
 from collections.abc import Mapping, Iterable
 from typing import List, Optional, Union
+import json

-import pymongo
-from pymongo import MongoClient
 import pika
 from bson import ObjectId
 from bson import json_util
@@ -24,6 +23,7 @@
 )
 from minio import Minio
 from pika.adapters.blocking_connection import BlockingChannel
+import pymongo
 from pymongo import MongoClient
 from rocrate.model.person import Person
 from rocrate.rocrate import ROCrate
@@ -761,6 +761,8 @@ async def download_dataset(
     raise HTTPException(status_code=404, detail=f"Dataset {dataset_id} not found")


+# submits a dataset to an extractor
+# can handle parameters passed in as key/values in info
 @router.post("/{dataset_id}/extract")
 async def get_dataset_extract(
     dataset_id: str,
@@ -780,10 +782,11 @@ async def get_dataset_extract(
     token = token.lstrip("Bearer")
     token = token.lstrip(" ")
     # TODO check of extractor exists
-    msg = {"message": "testing", "dataseet_id": dataset_id}
+    msg = {"message": "testing", "dataset_id": dataset_id}
     body = {}
     body["secretKey"] = token
     body["token"] = token
+    # TODO better solution for host
     body["host"] = "http://127.0.0.1:8000"
     body["retry_count"] = 0
     body["filename"] = dataset["name"]
@@ -794,6 +797,7 @@ async def get_dataset_extract(
     current_queue = req_info["extractor"]
     if "parameters" in req_info:
         current_parameters = req_info["parameters"]
+        body["parameters"] = current_parameters
     current_routing_key = "extractors." + current_queue
     rabbitmq_client.queue_bind(
         exchange="extractors",
diff --git a/backend/app/routers/files.py b/backend/app/routers/files.py
index 76798cc90..7d5d22370 100644
--- a/backend/app/routers/files.py
+++ b/backend/app/routers/files.py
@@ -294,6 +294,8 @@ async def get_file_versions(
     raise HTTPException(status_code=404, detail=f"File {file_id} not found")


+# submits a file to an extractor
+# can handle parameters passed in as key/values in info
 @router.post("/{file_id}/extract")
 async def get_file_extract(
     file_id: str,
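The two /extract endpoints above read the extractor queue and optional key/value parameters from the request body. A rough client-side sketch, assuming the API is mounted under /api/v2 (as settings.API_V2_STR suggests in the tests); the host, ids, extractor name, and parameter keys are placeholders:

    # Illustrative request only; values shown are not fixed by this change.
    import requests

    resp = requests.post(
        "http://127.0.0.1:8000/api/v2/datasets/<dataset_id>/extract",
        headers={"Authorization": "Bearer <token>"},
        json={
            "extractor": "ncsa.wordcount",     # becomes the queue; routing key is "extractors." + this
            "parameters": {"max_lines": 100},  # forwarded to the extractor as body["parameters"]
        },
    )
    print(resp.json())  # e.g. {"message": "testing", "dataset_id": "..."}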
diff --git a/backend/app/routers/listeners.py b/backend/app/routers/listeners.py
index f1655c111..83bfd3c4a 100644
--- a/backend/app/routers/listeners.py
+++ b/backend/app/routers/listeners.py
@@ -29,7 +29,7 @@ async def save_listener(
     db: MongoClient = Depends(get_db),
 ):
     """Register a new Event Listener with the system."""
-    listener = EventListenerDB(**listener_in.dict(), author=user)
+    listener = EventListenerDB(**listener_in.dict(), creator=user)
     # TODO: Check for duplicates somehow?
     new_listener = await db["listeners"].insert_one(listener.to_mongo())
     found = await db["listeners"].find_one({"_id": new_listener.inserted_id})
@@ -49,7 +49,7 @@ async def save_legacy_listener(
         name=legacy_in.name,
         version=int(legacy_in.version),
         description=legacy_in.description,
-        author=user,
+        creator=user,
         properties=listener_properties,
     )
     new_listener = await db["listeners"].insert_one(listener.to_mongo())
diff --git a/backend/app/routers/metadata_files.py b/backend/app/routers/metadata_files.py
index 12263cc81..943a79d6d 100644
--- a/backend/app/routers/metadata_files.py
+++ b/backend/app/routers/metadata_files.py
@@ -76,7 +76,7 @@ async def _build_metadata_db_obj(
     extractor_info = metadata_in.extractor_info
     if extractor_info is not None:
         if (
-            extractor := await db["extractors"].find_one(
+            extractor := await db["listeners"].find_one(
                 {"name": extractor_info.name, "version": extractor_info.version}
             )
         ) is not None:
@@ -176,7 +176,7 @@ async def replace_file_metadata(
     extractor_info = metadata_in.extractor_info
     if extractor_info is not None:
         if (
-            extractor := await db["extractors"].find_one(
+            extractor := await db["listeners"].find_one(
                 {"name": extractor_info.name, "version": extractor_info.version}
             )
         ) is not None:
@@ -267,7 +267,7 @@ async def update_file_metadata(
     extractor_info = metadata_in.extractor_info
     if extractor_info is not None:
         if (
-            extractor := await db["extractors"].find_one(
+            extractor := await db["listeners"].find_one(
                 {"name": extractor_info.name, "version": extractor_info.version}
             )
         ) is not None:
@@ -402,7 +402,7 @@ async def delete_file_metadata(
     extractor_info = metadata_in.extractor_info
     if extractor_info is not None:
         if (
-            extractor := await db["extractors"].find_one(
+            extractor := await db["listeners"].find_one(
                 {"name": extractor_info.name, "version": extractor_info.version}
             )
         ) is not None:
diff --git a/backend/app/tests/extractor_info.json b/backend/app/tests/extractor_info.json
new file mode 100644
index 000000000..6009f5c2b
--- /dev/null
+++ b/backend/app/tests/extractor_info.json
@@ -0,0 +1,30 @@
+{
+  "@context": "http://clowder.ncsa.illinois.edu/contexts/extractors.jsonld",
+  "name": "ncsa.wordcount",
+  "version": "2.0",
+  "description": "WordCount extractor. Counts the number of characters, words and lines in the text file that was uploaded.",
+  "author": "Rob Kooper ",
+  "contributors": [],
+  "contexts": [
+    {
+      "lines": "http://clowder.ncsa.illinois.edu/metadata/ncsa.wordcount#lines",
+      "words": "http://clowder.ncsa.illinois.edu/metadata/ncsa.wordcount#words",
+      "characters": "http://clowder.ncsa.illinois.edu/metadata/ncsa.wordcount#characters"
+    }
+  ],
+  "repository": [
+    {
+      "repType": "git",
+      "repUrl": "https://opensource.ncsa.illinois.edu/stash/scm/cats/pyclowder.git"
+    }
+  ],
+  "process": {
+    "file": [
+      "text/*",
+      "application/json"
+    ]
+  },
+  "external_services": [],
+  "dependencies": [],
+  "bibtex": []
+}
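The commented-out extractor_info_file line in the tests below hints that this fixture could be loaded from disk rather than duplicated inline; one possible helper, kept as a sketch and not part of this change:

    # Possible test helper for reusing the extractor_info.json fixture.
    import json
    import os

    def load_extractor_info() -> dict:
        fixture = os.path.join(os.path.dirname(__file__), "extractor_info.json")
        with open(fixture) as f:
            return json.load(f)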
diff --git a/backend/app/tests/test_extractors.py b/backend/app/tests/test_extractors.py
new file mode 100644
index 000000000..74529f2fe
--- /dev/null
+++ b/backend/app/tests/test_extractors.py
@@ -0,0 +1,73 @@
+import os
+from fastapi.testclient import TestClient
+from app.config import settings
+from app.models.pyobjectid import PyObjectId
+
+user = {
+    "email": "test@test.org",
+    "password": "not_a_password",
+    "first_name": "Foo",
+    "last_name": "Bar",
+}
+
+extractor_info = {
+    "@context": "http://clowder.ncsa.illinois.edu/contexts/extractors.jsonld",
+    "name": "ncsa.wordcount",
+    "version": "2.0",
+    "description": "WordCount extractor. Counts the number of characters, words and lines in the text file that was uploaded.",
+    "contributors": [],
+    "contexts": [
+        {
+            "lines": "http://clowder.ncsa.illinois.edu/metadata/ncsa.wordcount#lines",
+            "words": "http://clowder.ncsa.illinois.edu/metadata/ncsa.wordcount#words",
+            "characters": "http://clowder.ncsa.illinois.edu/metadata/ncsa.wordcount#characters",
+        }
+    ],
+    "repository": [
+        {
+            "repType": "git",
+            "repUrl": "https://opensource.ncsa.illinois.edu/stash/scm/cats/pyclowder.git",
+        }
+    ],
+    "process": {"file": ["text/*", "application/json"]},
+    "external_services": [],
+    "dependencies": [],
+    "bibtex": [],
+}
+
+# extractor_info_file = os.path.join(os.getcwd(), 'extractor_info.json')
+
+
+def test_register(client: TestClient, headers: dict):
+    response = client.post(
+        f"{settings.API_V2_STR}/listeners", json=extractor_info, headers=headers
+    )
+    assert response.json().get("id") is not None
+    assert response.status_code == 200
+
+
+def test_get_one(client: TestClient, headers: dict):
+    response = client.post(
+        f"{settings.API_V2_STR}/listeners", json=extractor_info, headers=headers
+    )
+    assert response.status_code == 200
+    assert response.json().get("id") is not None
+    extractor_id = response.json().get("id")
+    response = client.get(
+        f"{settings.API_V2_STR}/listeners/{extractor_id}", headers=headers
+    )
+    assert response.status_code == 200
+    assert response.json().get("id") is not None
+
+
+def test_delete(client: TestClient, headers: dict):
+    response = client.post(
+        f"{settings.API_V2_STR}/listeners", json=extractor_info, headers=headers
+    )
+    assert response.status_code == 200
+    assert response.json().get("id") is not None
+    extractor_id = response.json().get("id")
+    response = client.delete(
+        f"{settings.API_V2_STR}/listeners/{extractor_id}", headers=headers
+    )
+    assert response.status_code == 200
diff --git a/frontend/src/openapi/v2/services/DatasetsService.ts b/frontend/src/openapi/v2/services/DatasetsService.ts
index 711fcab0e..f16f5a35b 100644
--- a/frontend/src/openapi/v2/services/DatasetsService.ts
+++ b/frontend/src/openapi/v2/services/DatasetsService.ts
@@ -301,4 +301,4 @@ export class DatasetsService {
         });
     }

-}
\ No newline at end of file
+}