1 change: 1 addition & 0 deletions backend/Pipfile
@@ -27,6 +27,7 @@ rocrate = "*"
[dev-packages]
requests = "*"
pytest = "*"
pytest-asyncio = "*"
black = "*"
faker = "*"

347 changes: 181 additions & 166 deletions backend/Pipfile.lock

Large diffs are not rendered by default.

42 changes: 42 additions & 0 deletions backend/app/database/errors.py
@@ -0,0 +1,42 @@
import logging
import traceback
import motor.motor_asyncio
from typing import Optional, AsyncGenerator
from fastapi import Depends
from pymongo import MongoClient

from app.config import settings
from app.mongo import crete_mongo_indexes
from app.models.errors import Error
from app.models.metadata import MongoDBRef

logger = logging.getLogger(__name__)


async def _get_db() -> AsyncGenerator:
    """Duplicate of app.dependencies.get_db(); importing that would cause a circular import."""
    mongo_client = motor.motor_asyncio.AsyncIOMotorClient(settings.MONGODB_URL)
    db = mongo_client[settings.MONGO_DATABASE]
    await crete_mongo_indexes(db)
    yield db


async def log_error(
    exception: Exception,
    resource: Optional[MongoDBRef] = None,
    user: Optional[str] = None,
):
    """Insert a new Error into the database.

    Arguments:
        exception -- instance of an Exception or subclass
        resource -- if the error relates to a specific resource, you can include it
        user -- if the error relates to actions performed by a user, you can include them
    """
    # _get_db() is an async generator, so pull the single db handle it yields
    db = await _get_db().__anext__()
    message = str(exception)
    # traceback.format_exc() does not accept an exception argument;
    # format the trace from the exception object itself instead
    trace = "".join(
        traceback.format_exception(
            type(exception), exception, exception.__traceback__, limit=4
        )
    )

    logger.error(message)
    # the Error model's field is user_id, not user
    error_log = Error(message=message, trace=trace, resource=resource, user_id=user)
    await db["errors"].insert_one(error_log.to_mongo())
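
For context, a minimal sketch of how a route handler might call log_error. The handler and the caught exception are hypothetical, and the MongoDBRef field names (collection, resource_id) are an assumption about the existing metadata model; only log_error itself comes from this file:

```python
from app.database.errors import log_error
from app.models.errors import ServiceUnreachable
from app.models.metadata import MongoDBRef


async def get_dataset(dataset_id: str, user_id: str):
    """Hypothetical route handler that records a failure via log_error."""
    try:
        # ... work that can fail, e.g. talking to Elasticsearch ...
        raise ServiceUnreachable("Elasticsearch")
    except ServiceUnreachable as e:
        # Attach the affected resource and the acting user so the Error
        # document stored in the "errors" collection carries that context.
        await log_error(
            e,
            resource=MongoDBRef(collection="datasets", resource_id=dataset_id),
            user=user_id,
        )
```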
2 changes: 1 addition & 1 deletion backend/app/main.py
@@ -145,7 +145,7 @@
@app.on_event("startup")
async def startup_elasticsearch():
    # create elasticsearch indices
    es = connect_elasticsearch()
    es = await connect_elasticsearch()
    create_index(
        es, "file", settings.elasticsearch_setting, indexSettings.file_mappings
    )
25 changes: 25 additions & 0 deletions backend/app/models/errors.py
@@ -0,0 +1,25 @@
from datetime import datetime
from typing import Optional
from pydantic import Field

from app.models.mongomodel import MongoModel
from app.models.metadata import MongoDBRef


class ServiceUnreachable(Exception):
    """Raised when Clowder can't connect to an outside service, e.g. MongoDB or Elasticsearch."""

    def __init__(self, service, *args):
        # unpack extra args rather than passing them as a single tuple
        super().__init__(*args)
        self.service = service

    def __str__(self):
        return f"{self.service} could not be reached."


class Error(MongoModel):
    message: str  # Shorthand message of the error
    trace: str  # Full stack trace of the error
    resource: Optional[MongoDBRef] = None
    user_id: Optional[str] = None
    timestamp: datetime = Field(default_factory=datetime.utcnow)
5 changes: 5 additions & 0 deletions backend/app/routers/datasets.py
@@ -31,6 +31,7 @@
from app import dependencies
from app import keycloak_auth
from app.search.connect import (
    connect_elasticsearch,
    insert_record,
    delete_document_by_id,
    update_record,
@@ -191,6 +192,7 @@ async def save_dataset(
    db: MongoClient = Depends(dependencies.get_db),
    es=Depends(dependencies.get_elasticsearchclient),
):
    es = await connect_elasticsearch()

    # Check all connections and abort if any one of them is not available
    if db is None or es is None:
@@ -304,6 +306,7 @@ async def edit_dataset(
    user_id=Depends(get_user),
    es=Depends(dependencies.get_elasticsearchclient),
):
    es = await connect_elasticsearch()

    # Check all connections and abort if any one of them is not available
    if db is None or es is None:
@@ -347,6 +350,7 @@ async def patch_dataset(
    db: MongoClient = Depends(dependencies.get_db),
    es=Depends(dependencies.get_elasticsearchclient),
):
    es = await connect_elasticsearch()

    # Check all connections and abort if any one of them is not available
    if db is None or es is None:
@@ -388,6 +392,7 @@ async def delete_dataset(
    fs: Minio = Depends(dependencies.get_fs),
    es=Depends(dependencies.get_elasticsearchclient),
):
    es = await connect_elasticsearch()

    # Check all connections and abort if any one of them is not available
    if db is None or fs is None or es is None:
8 changes: 4 additions & 4 deletions backend/app/routers/elasticsearch.py
@@ -7,26 +7,26 @@

@router.put("/search", response_model=str)
async def search(index_name: str, query: str):
    es = connect_elasticsearch()
    es = await connect_elasticsearch()
    return search_index(es, index_name, query)


@router.post("/file/_msearch")
async def search_file(request: Request):
    es = connect_elasticsearch()
    es = await connect_elasticsearch()
    query = await request.body()
    return search_index(es, "file", query)


@router.post("/dataset/_msearch")
async def search_dataset(request: Request):
    es = connect_elasticsearch()
    es = await connect_elasticsearch()
    query = await request.body()
    return search_index(es, "dataset", query)


@router.post("/file,dataset/_msearch")
async def search_file_and_dataset(request: Request):
    es = connect_elasticsearch()
    es = await connect_elasticsearch()
    query = await request.body()
    return search_index(es, ["file", "dataset"], query)
5 changes: 5 additions & 0 deletions backend/app/routers/files.py
@@ -22,6 +22,7 @@
from app import dependencies
from app.config import settings
from app.search.connect import (
    connect_elasticsearch,
    insert_record,
    delete_document_by_id,
    update_record,
@@ -55,6 +56,7 @@ async def add_file_entry(
        file_db: FileDB object controlling dataset and folder destination
        file: bytes to upload
    """
    es = await connect_elasticsearch()

    # Check all connections and abort if any one of them is not available
    if db is None or fs is None or es is None:
@@ -124,6 +126,8 @@ async def remove_file_entry(
"""Remove FileDB object into MongoDB, Minio, and associated metadata and version information."""
# TODO: Deleting individual versions will require updating version_id in mongo, or deleting entire document

es = await connect_elasticsearch()

# Check all connection and abort if any one of them is not available
if db is None or fs is None or es is None:
raise HTTPException(status_code=503, detail="Service not available")
@@ -146,6 +150,7 @@ async def update_file(
    file: UploadFile = File(...),
    es=Depends(dependencies.get_elasticsearchclient),
):
    es = await connect_elasticsearch()

    # Check all connections and abort if any one of them is not available
    if db is None or fs is None or es is None:
15 changes: 10 additions & 5 deletions backend/app/search/connect.py
@@ -7,21 +7,26 @@
from app.models.files import FileOut
from app.models.search import SearchCriteria
from app.models.feeds import SearchObject
from app.models.errors import ServiceUnreachable
from app.database.errors import log_error

logger = logging.getLogger(__name__)
no_of_shards = settings.elasticsearch_no_of_shards
no_of_replicas = settings.elasticsearch_no_of_replicas


def connect_elasticsearch():
async def connect_elasticsearch():
"""To connect to elasticsearch server and return the elasticsearch client"""
_es = None
logger.info(settings.elasticsearch_url)
_es = Elasticsearch(settings.elasticsearch_url)
if _es.ping():
logger.info("Successfully connected to Elasticsearch")
else:
logger.info("Can not connect to Elasticsearch")
try:
Member:
Code looks good, but I am not sure how to test it in this case. If I turn off elasticsearch, I get an error before we get to ping.

WARNING:elastic_transport.node_pool:Node <Urllib3HttpNode(http://localhost:9200)> has failed for 11 times in a row, putting on 30 second timeout
WARNING:elastic_transport.transport:Retrying request after failure (attempt 2 of 3)
Traceback (most recent call last):
  File "/Users/lmarini/.local/share/virtualenvs/clowder2-pkRZ1537/lib/python3.9/site-packages/elastic_transport/_transport.py", line 329, in perform_request
    meta, raw_data = node.perform_request(
  File "/Users/lmarini/.local/share/virtualenvs/clowder2-pkRZ1537/lib/python3.9/site-packages/elastic_transport/_node/_http_urllib3.py", line 199, in perform_request
    raise err from None
elastic_transport.ConnectionError: Connection error caused by: NewConnectionError(<urllib3.connection.HTTPConnection object at 0x10dce7910>: Failed to establish a new connection: [Errno 61] Connection refused)
WARNING:elastic_transport.node_pool:Node <Urllib3HttpNode(http://localhost:9200)> has failed for 12 times in a row, putting on 30 second timeout

Contributor Author:
so this was 1) start up docker ES, 2) start up backend, 3) kill docker ES?

otherwise, we could come up with a contrived test case like "if es.ping() == True, pretend it failed" just to check the actual logging of errors, even if the error is nonsense.

        if _es.ping():
            logger.info("Successfully connected to Elasticsearch")
        else:
            raise ServiceUnreachable("Elasticsearch")
    except ServiceUnreachable as e:
        await log_error(e)
    return _es
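
Following up on the review thread above, a contrived sketch of how the unreachable case could still reach log_error when the transport raises before ping() can return False. The elastic_transport.ConnectionError import is an assumption about the installed client version and is not part of this PR:

```python
from elastic_transport import ConnectionError as ESConnectionError


async def connect_elasticsearch():
    """Variant that also catches the transport-level error from the review log."""
    _es = Elasticsearch(settings.elasticsearch_url)
    try:
        if not _es.ping():
            raise ServiceUnreachable("Elasticsearch")
        logger.info("Successfully connected to Elasticsearch")
    except (ServiceUnreachable, ESConnectionError):
        # Either failure mode is wrapped in ServiceUnreachable so the
        # Error document records a consistent message.
        await log_error(ServiceUnreachable("Elasticsearch"))
    return _es
```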


11 changes: 7 additions & 4 deletions backend/app/tests/test_elastic_search.py
@@ -1,5 +1,6 @@
import json
import time
import pytest
from datetime import datetime

from bson import ObjectId
@@ -57,8 +58,9 @@
}


def test_files():
    es = connect_elasticsearch()
@pytest.mark.asyncio
async def test_files():
    es = await connect_elasticsearch()
    if es is not None:
        create_index(
            es,
@@ -95,8 +97,9 @@ def test_files():
        delete_index(es, dummy_file_index_name)


def test_datasets():
    es = connect_elasticsearch()
@pytest.mark.asyncio
async def test_datasets():
    es = await connect_elasticsearch()
    if es is not None:
        create_index(
            es,