Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
8c6ccb6
changing parameters to dict
tcnichol Oct 5, 2022
21169c7
not sure if parameters are being sent properly
tcnichol Oct 5, 2022
5382616
handling parameters dict not list dict
tcnichol Oct 6, 2022
2bf9843
black formatting
tcnichol Oct 6, 2022
e52daef
new pipfile lock
tcnichol Oct 6, 2022
cef2aad
delete test
tcnichol Oct 6, 2022
ececa5a
does this fix build errors?
tcnichol Oct 6, 2022
a8a4f52
new pipfile lock
tcnichol Oct 6, 2022
c4bd7ca
new Pipfile.lock
tcnichol Oct 6, 2022
9dd23aa
adding tests for extractors
tcnichol Oct 11, 2022
edfb976
does this fix file not found error?
tcnichol Oct 11, 2022
735de2e
trying adding it as dict
tcnichol Oct 11, 2022
c5ea191
adding user - will this fix the tests?
tcnichol Oct 11, 2022
12db5ad
black formatting
tcnichol Oct 11, 2022
1a5db77
comments in methods
tcnichol Oct 12, 2022
0dcb64c
black formatting
tcnichol Oct 12, 2022
1d01fc5
Merge remote-tracking branch 'origin/main' into 127-add-parameters-to…
tcnichol Oct 12, 2022
c2fc039
new classes
tcnichol Oct 12, 2022
8d67f08
matching listener feed branch
tcnichol Oct 12, 2022
f45d0d1
more fields necessary for listener to match extractors from v1
tcnichol Oct 12, 2022
4847a4f
import pymongo
tcnichol Oct 19, 2022
2b00067
updating and adding models from codegen
tcnichol Oct 25, 2022
9f963d4
services added by codegen
tcnichol Oct 25, 2022
cec3a3c
adding to Explore
tcnichol Oct 26, 2022
c4e8ad0
reverting changes
tcnichol Oct 27, 2022
2bbfdd5
reverting
tcnichol Oct 27, 2022
77ade89
Merge remote-tracking branch 'origin/main' into 127-add-parameters-to…
tcnichol Oct 27, 2022
5ef1ac4
Merge remote-tracking branch 'origin/main' into 127-add-parameters-to…
tcnichol Oct 27, 2022
bdc8dac
merging and fixing to work with existing code
tcnichol Oct 27, 2022
985cb5e
pipenv run black app formatting
tcnichol Oct 27, 2022
f447ba5
use listeners not extractors, routers have changed names
tcnichol Oct 27, 2022
8258af8
str for author for extractor
tcnichol Oct 27, 2022
a1e16cc
removing author for extractor info
tcnichol Oct 27, 2022
c8cf5d3
remove author from the extractor info
tcnichol Oct 27, 2022
95d3608
using Creator instead of Author to avoid conflict with old extractor …
tcnichol Oct 28, 2022
e12b64a
Change extractors to listeners consistently
max-zilla Oct 31, 2022
13f5dbb
remove renamed models file
max-zilla Oct 31, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions backend/app/models/listeners.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class EventListenerBase(BaseModel):
"""An Event Listener is the expanded version of v1 Extractors."""

name: str
version: int = 1
version: str = "1.0"
description: str = ""


Expand All @@ -55,7 +55,7 @@ class LegacyEventListenerIn(ExtractorInfo):
class EventListenerDB(EventListenerBase, MongoModel):
"""EventListeners have a name, version, author, description, and optionally properties where extractor_info will be saved."""

author: UserOut
creator: Optional[UserOut] = None
created: datetime = Field(default_factory=datetime.utcnow)
modified: datetime = Field(default_factory=datetime.utcnow)
properties: Optional[ExtractorInfo] = None
Expand Down
27 changes: 10 additions & 17 deletions backend/app/rabbitmq/heartbeat_listener_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,7 @@
from app.config import settings
from aio_pika.abc import AbstractIncomingMessage
from pymongo import MongoClient
from app.models.extractors import (
ExtractorBase,
ExtractorIn,
ExtractorDB,
ExtractorOut,
)
from app.models.listeners import LegacyEventListenerIn, EventListenerOut


async def on_message(message: AbstractIncomingMessage) -> None:
Expand All @@ -21,20 +16,18 @@ async def on_message(message: AbstractIncomingMessage) -> None:
extractor_queue = statusBody["queue"]
extractor_info = statusBody["extractor_info"]
extractor_name = extractor_info["name"]
extractor_db = ExtractorDB(**extractor_info)
extractor_db = LegacyEventListenerIn(**extractor_info)
client = MongoClient(settings.MONGODB_URL)
db = client["clowder2"]
existing_extractor = db["extractors"].find_one({"name": extractor_queue})
existing_extractor = db["listeners"].find_one({"name": extractor_queue})
if existing_extractor is not None:
existing_version = existing_extractor["version"]
new_version = extractor_db.version
if version.parse(new_version) > version.parse(existing_version):
new_extractor = db["extractors"].insert_one(extractor_db.to_mongo())
found = db["extractors"].find_one({"_id": new_extractor.inserted_id})
removed = db["extractors"].delete_one(
{"_id": existing_extractor["_id"]}
)
extractor_out = ExtractorOut.from_mongo(found)
new_extractor = db["listeners"].insert_one(extractor_db.to_mongo())
found = db["listeners"].find_one({"_id": new_extractor.inserted_id})
removed = db["listeners"].delete_one({"_id": existing_extractor["_id"]})
extractor_out = EventListenerOut.from_mongo(found)
print(
"extractor updated: "
+ extractor_name
Expand All @@ -45,9 +38,9 @@ async def on_message(message: AbstractIncomingMessage) -> None:
)
return extractor_out
else:
new_extractor = db["extractors"].insert_one(extractor_db.to_mongo())
found = db["extractors"].find_one({"_id": new_extractor.inserted_id})
extractor_out = ExtractorOut.from_mongo(found)
new_extractor = db["listeners"].insert_one(extractor_db.to_mongo())
found = db["listeners"].find_one({"_id": new_extractor.inserted_id})
extractor_out = EventListenerOut.from_mongo(found)
print("new extractor registered: " + extractor_name)
return extractor_out

Expand Down
12 changes: 6 additions & 6 deletions backend/app/rabbitmq/heartbeat_listener_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ def callback(ch, method, properties, body):
extractor_db = EventListenerDB(**extractor_info)
client = MongoClient(settings.MONGODB_URL)
db = client["clowder2"]
existing_extractor = db["extractors"].find_one({"name": extractor_queue})
existing_extractor = db["listeners"].find_one({"name": extractor_queue})
if existing_extractor is not None:
existing_version = existing_extractor["version"]
new_version = extractor_db.version
if version.parse(new_version) > version.parse(existing_version):
new_extractor = db["extractors"].insert_one(extractor_db.to_mongo())
found = db["extractors"].find_one({"_id": new_extractor.inserted_id})
removed = db["extractors"].delete_one({"_id": existing_extractor["_id"]})
new_extractor = db["listeners"].insert_one(extractor_db.to_mongo())
found = db["listeners"].find_one({"_id": new_extractor.inserted_id})
removed = db["listeners"].delete_one({"_id": existing_extractor["_id"]})
extractor_out = EventListenerOut.from_mongo(found)
print(
"extractor updated: "
Expand All @@ -38,8 +38,8 @@ def callback(ch, method, properties, body):
)
return extractor_out
else:
new_extractor = db["extractors"].insert_one(extractor_db.to_mongo())
found = db["extractors"].find_one({"_id": new_extractor.inserted_id})
new_extractor = db["listeners"].insert_one(extractor_db.to_mongo())
found = db["listeners"].find_one({"_id": new_extractor.inserted_id})
extractor_out = EventListenerOut.from_mongo(found)
print("new extractor registered: " + extractor_name)
return extractor_out
Expand Down
35 changes: 35 additions & 0 deletions backend/app/rabbitmq/listeners.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from app.keycloak_auth import get_token
from app import dependencies
from app.models.files import FileOut
from app.models.datasets import DatasetOut
from app.models.listeners import EventListenerMessage


Expand Down Expand Up @@ -43,3 +44,37 @@ def submit_file_message(
),
)
return {"message": "testing", "file_id": file_out.id}


def submit_dataset_message(
dataset_out: DatasetOut,
queue: str,
routing_key: str,
parameters: dict,
token: str = Depends(get_token),
db: MongoClient = Depends(dependencies.get_db),
rabbitmq_client: BlockingChannel = Depends(dependencies.get_rabbitmq),
):
# TODO check if extractor is registered
msg_body = EventListenerMessage(
filename=dataset_out.name,
fileSize=dataset_out.bytes,
id=dataset_out.id,
datasetId=dataset_out.dataset_id,
secretKey=token,
)

rabbitmq_client.queue_bind(
exchange="extractors",
queue=queue,
routing_key=routing_key,
)
rabbitmq_client.basic_publish(
exchange="extractors",
routing_key=routing_key,
body=json.dumps(msg_body.dict(), ensure_ascii=False),
properties=pika.BasicProperties(
content_type="application/json", delivery_mode=1
),
)
return {"message": "testing", "dataset_id": dataset_out.id}
10 changes: 7 additions & 3 deletions backend/app/routers/datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@
import zipfile
from collections.abc import Mapping, Iterable
from typing import List, Optional, Union
import json

import pymongo
from pymongo import MongoClient
import pika
from bson import ObjectId
from bson import json_util
Expand All @@ -24,6 +23,7 @@
)
from minio import Minio
from pika.adapters.blocking_connection import BlockingChannel
import pymongo
from pymongo import MongoClient
from rocrate.model.person import Person
from rocrate.rocrate import ROCrate
Expand Down Expand Up @@ -761,6 +761,8 @@ async def download_dataset(
raise HTTPException(status_code=404, detail=f"Dataset {dataset_id} not found")


# submits file to extractor
# can handle parameeters pass in as key/values in info
@router.post("/{dataset_id}/extract")
async def get_dataset_extract(
dataset_id: str,
Expand All @@ -780,10 +782,11 @@ async def get_dataset_extract(
token = token.lstrip("Bearer")
token = token.lstrip(" ")
# TODO check of extractor exists
msg = {"message": "testing", "dataseet_id": dataset_id}
msg = {"message": "testing", "dataset_id": dataset_id}
body = {}
body["secretKey"] = token
body["token"] = token
# TODO better solution for host
body["host"] = "http://127.0.0.1:8000"
body["retry_count"] = 0
body["filename"] = dataset["name"]
Expand All @@ -794,6 +797,7 @@ async def get_dataset_extract(
current_queue = req_info["extractor"]
if "parameters" in req_info:
current_parameters = req_info["parameters"]
body["parameters"] = current_parameters
current_routing_key = "extractors." + current_queue
rabbitmq_client.queue_bind(
exchange="extractors",
Expand Down
2 changes: 2 additions & 0 deletions backend/app/routers/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,8 @@ async def get_file_versions(
raise HTTPException(status_code=404, detail=f"File {file_id} not found")


# submits file to extractor
# can handle parameeters pass in as key/values in info
@router.post("/{file_id}/extract")
async def get_file_extract(
file_id: str,
Expand Down
4 changes: 2 additions & 2 deletions backend/app/routers/listeners.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async def save_listener(
db: MongoClient = Depends(get_db),
):
"""Register a new Event Listener with the system."""
listener = EventListenerDB(**listener_in.dict(), author=user)
listener = EventListenerDB(**listener_in.dict(), creator=user)
# TODO: Check for duplicates somehow?
new_listener = await db["listeners"].insert_one(listener.to_mongo())
found = await db["listeners"].find_one({"_id": new_listener.inserted_id})
Expand All @@ -49,7 +49,7 @@ async def save_legacy_listener(
name=legacy_in.name,
version=int(legacy_in.version),
description=legacy_in.description,
author=user,
creator=user,
properties=listener_properties,
)
new_listener = await db["listeners"].insert_one(listener.to_mongo())
Expand Down
8 changes: 4 additions & 4 deletions backend/app/routers/metadata_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ async def _build_metadata_db_obj(
extractor_info = metadata_in.extractor_info
if extractor_info is not None:
if (
extractor := await db["extractors"].find_one(
extractor := await db["listeners"].find_one(
{"name": extractor_info.name, "version": extractor_info.version}
)
) is not None:
Expand Down Expand Up @@ -176,7 +176,7 @@ async def replace_file_metadata(
extractor_info = metadata_in.extractor_info
if extractor_info is not None:
if (
extractor := await db["extractors"].find_one(
extractor := await db["listeners"].find_one(
{"name": extractor_info.name, "version": extractor_info.version}
)
) is not None:
Expand Down Expand Up @@ -267,7 +267,7 @@ async def update_file_metadata(
extractor_info = metadata_in.extractor_info
if extractor_info is not None:
if (
extractor := await db["extractors"].find_one(
extractor := await db["listeners"].find_one(
{"name": extractor_info.name, "version": extractor_info.version}
)
) is not None:
Expand Down Expand Up @@ -402,7 +402,7 @@ async def delete_file_metadata(
extractor_info = metadata_in.extractor_info
if extractor_info is not None:
if (
extractor := await db["extractors"].find_one(
extractor := await db["listeners"].find_one(
{"name": extractor_info.name, "version": extractor_info.version}
)
) is not None:
Expand Down
30 changes: 30 additions & 0 deletions backend/app/tests/extractor_info.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"@context": "http://clowder.ncsa.illinois.edu/contexts/extractors.jsonld",
"name": "ncsa.wordcount",
"version": "2.0",
"description": "WordCount extractor. Counts the number of characters, words and lines in the text file that was uploaded.",
"author": "Rob Kooper <[email protected]>",
"contributors": [],
"contexts": [
{
"lines": "http://clowder.ncsa.illinois.edu/metadata/ncsa.wordcount#lines",
"words": "http://clowder.ncsa.illinois.edu/metadata/ncsa.wordcount#words",
"characters": "http://clowder.ncsa.illinois.edu/metadata/ncsa.wordcount#characters"
}
],
"repository": [
{
"repType": "git",
"repUrl": "https://opensource.ncsa.illinois.edu/stash/scm/cats/pyclowder.git"
}
],
"process": {
"file": [
"text/*",
"application/json"
]
},
"external_services": [],
"dependencies": [],
"bibtex": []
}
73 changes: 73 additions & 0 deletions backend/app/tests/test_extractors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import os
from fastapi.testclient import TestClient
from app.config import settings
from app.models.pyobjectid import PyObjectId

user = {
"email": "[email protected]",
"password": "not_a_password",
"first_name": "Foo",
"last_name": "Bar",
}

extractor_info = {
"@context": "http://clowder.ncsa.illinois.edu/contexts/extractors.jsonld",
"name": "ncsa.wordcount",
"version": "2.0",
"description": "WordCount extractor. Counts the number of characters, words and lines in the text file that was uploaded.",
"contributors": [],
"contexts": [
{
"lines": "http://clowder.ncsa.illinois.edu/metadata/ncsa.wordcount#lines",
"words": "http://clowder.ncsa.illinois.edu/metadata/ncsa.wordcount#words",
"characters": "http://clowder.ncsa.illinois.edu/metadata/ncsa.wordcount#characters",
}
],
"repository": [
{
"repType": "git",
"repUrl": "https://opensource.ncsa.illinois.edu/stash/scm/cats/pyclowder.git",
}
],
"process": {"file": ["text/*", "application/json"]},
"external_services": [],
"dependencies": [],
"bibtex": [],
}

# extractor_info_file = os.path.join(os.getcwd(), 'extractor_info.json')


def test_register(client: TestClient, headers: dict):
response = client.post(
f"{settings.API_V2_STR}/listeners", json=extractor_info, headers=headers
)
assert response.json().get("id") is not None
assert response.status_code == 200


def test_get_one(client: TestClient, headers: dict):
response = client.post(
f"{settings.API_V2_STR}/listeners", json=extractor_info, headers=headers
)
assert response.status_code == 200
assert response.json().get("id") is not None
extractor_id = response.json().get("id")
response = client.get(
f"{settings.API_V2_STR}/listeners/{extractor_id}", headers=headers
)
assert response.status_code == 200
assert response.json().get("id") is not None


def test_delete(client: TestClient, headers: dict):
response = client.post(
f"{settings.API_V2_STR}/listeners", json=extractor_info, headers=headers
)
assert response.status_code == 200
assert response.json().get("id") is not None
extractor_id = response.json().get("id")
response = client.delete(
f"{settings.API_V2_STR}/listeners/{extractor_id}", headers=headers
)
assert response.status_code == 200
2 changes: 1 addition & 1 deletion frontend/src/openapi/v2/services/DatasetsService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -301,4 +301,4 @@ export class DatasetsService {
});
}

}
}