diff --git a/webapp/frontend/generated/index.ts b/webapp/frontend/generated/index.ts index ad9ff5b5a6..4d3647d066 100644 --- a/webapp/frontend/generated/index.ts +++ b/webapp/frontend/generated/index.ts @@ -11,10 +11,10 @@ ******************************************************************************/ // import Vaadin client-router to handle client-side and server-side navigation -import {Router} from '@vaadin/router'; +import { Router } from '@vaadin/router'; // import Flow module to enable navigation to Vaadin server-side views -import {Flow} from '@vaadin/flow-frontend/Flow'; +import { Flow } from '@vaadin/flow-frontend/Flow'; const { serverSideRoutes } = new Flow({ imports: () => import('../../target/frontend/generated-flow-imports') diff --git a/webapp/frontend/generated/theme-datamanager.generated.js b/webapp/frontend/generated/theme-datamanager.generated.js index 86b8d3adfe..17183076af 100644 --- a/webapp/frontend/generated/theme-datamanager.generated.js +++ b/webapp/frontend/generated/theme-datamanager.generated.js @@ -1,12 +1,4 @@ import 'construct-style-sheets-polyfill'; -import stylesCss from 'themes/datamanager/styles.css?inline'; -import { - badge, - color, - spacing, - typography, - utility -} from '@vaadin/vaadin-lumo-styles'; const createLinkReferences = (css, target) => { // Unresolved urls are written as '@import url(text);' to the css @@ -54,6 +46,12 @@ export const injectGlobalCss = (css, target, first) => { target.adoptedStyleSheets = [...target.adoptedStyleSheets, sheet]; } }; +import stylesCss from 'themes/datamanager/styles.css?inline'; +import { typography } from '@vaadin/vaadin-lumo-styles'; +import { color } from '@vaadin/vaadin-lumo-styles'; +import { spacing } from '@vaadin/vaadin-lumo-styles'; +import { badge } from '@vaadin/vaadin-lumo-styles'; +import { utility } from '@vaadin/vaadin-lumo-styles'; window.Vaadin = window.Vaadin || {}; window.Vaadin.theme = window.Vaadin.theme || {}; diff --git a/webapp/frontend/generated/theme.js b/webapp/frontend/generated/theme.js index 1c82e206bc..b4ce217073 100644 --- a/webapp/frontend/generated/theme.js +++ b/webapp/frontend/generated/theme.js @@ -1,3 +1,2 @@ import {applyTheme as _applyTheme} from './theme-datamanager.generated.js'; - export const applyTheme = _applyTheme; diff --git a/webapp/frontend/generated/vaadin.ts b/webapp/frontend/generated/vaadin.ts index d668e08718..5b3b9a283c 100644 --- a/webapp/frontend/generated/vaadin.ts +++ b/webapp/frontend/generated/vaadin.ts @@ -4,6 +4,5 @@ import './index'; import '@vaadin/flow-frontend/VaadinDevmodeGizmo.js'; -import {applyTheme} from './theme'; - +import { applyTheme } from './theme'; applyTheme(document); diff --git a/webapp/src/main/java/life/qbic/messaging/Exchange.java b/webapp/src/main/java/life/qbic/messaging/Exchange.java index a867550f46..d9ef7220c2 100644 --- a/webapp/src/main/java/life/qbic/messaging/Exchange.java +++ b/webapp/src/main/java/life/qbic/messaging/Exchange.java @@ -3,7 +3,11 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import life.qbic.apps.datamanager.notifications.MessageBusInterface; import life.qbic.apps.datamanager.notifications.MessageParameters; import life.qbic.apps.datamanager.notifications.MessageSubscriber; @@ -17,14 +21,21 @@ *

Note: use this class for development purposes only and replace it with an implementation that * utilizes a production grade messaging middleware, such as for example RabbitMQ or Apache Kafka. * - * @since + * @since 1.0.0 */ public class Exchange implements MessageBusInterface { - List topics; + private static final int DEFAULT_CAPACITY = 100; + private final Queue submissionTasks; + + private final List topics; private static Exchange instance; + private final ReentrantLock lock; + + private Thread worker; + /** * Queries the current instance of the Exchange class. * @@ -37,22 +48,54 @@ public static Exchange instance() { return instance; } - protected Exchange() { - super(); + /** + * Queries an instance of the Exchange class. If none exists, one will be created with the + * provided capacity for the messages the exchange instance can hold in its queue. + *

+ * Note: if the instance already exists prior to this call, then the capacity argument will be + * ignored. + * + * @param capacity the capacity of the queue size + * @return an exchange instance + * @since 1.0.0 + */ + public static Exchange instance(int capacity) { + if (instance == null) { + instance = new Exchange(capacity); + } + return instance; + } + + protected Exchange(int capacity) { topics = new ArrayList<>(); + submissionTasks = new ArrayBlockingQueue<>(capacity); + lock = new ReentrantLock(); + launchSubmissionTaskWorker(); + } + + protected Exchange() { + this(DEFAULT_CAPACITY); + } + + private void launchSubmissionTaskWorker() { + worker = new SubmissionTaskWorker(submissionTasks, topics, lock); + worker.setName("Message Submission Worker"); + worker.start(); } /** - * Submits a message to the exchange. The topic is taken from the {@link - * MessageParameters#messageType} parameter, and all subscriber to this topic are informed. + * Submits a message to the exchange. The topic is taken from the + * {@link MessageParameters#messageType} parameter, and all subscriber to this topic are + * informed. * - * @param message the message to publish via the exchange instance - * @param messageParameters some message parameters, such as the type (aka topic), the occurredOn - * timepoint and a unique message identifier. + * @param message the message to publish via the exchange instance + * @param messageParameters some message parameters, such as the type (aka topic), the occuredOn + * timepoint and a unique message identifier. */ @Override - public synchronized void submit(String message, MessageParameters messageParameters) { - this.topics.forEach(it -> it.informAllSubscribers(message, messageParameters)); + public void submit(String message, MessageParameters messageParameters) { + SubmissionTask newTask = new SubmissionTask(message, messageParameters); + submissionTasks.add(newTask); } /** @@ -60,11 +103,34 @@ public synchronized void submit(String message, MessageParameters messageParamet * over this Exchange instance. * * @param subscriber the subscriber callback reference. A subscriber can only subscribe once to a - * topic. Multiple calls will not overwrite the subscriber. - * @param topic the topic to subscribe to + * topic. Multiple calls will not overwrite the subscriber. + * @param topic the topic to subscribe to */ @Override - public synchronized void subscribe(MessageSubscriber subscriber, String topic) { + public void subscribe(MessageSubscriber subscriber, String topic) { + try { + // We try to acquire the lock for 1 second. If the lock + // cannot be acquired within 1 second, we throw an exception + // so the client can try to subscribe again. + if (lock.tryLock(1000, TimeUnit.MILLISECONDS)) { + try { + tryToSubscribe(subscriber, topic); + } finally { + lock.unlock(); // release the lock afterwards + } + } else { + throw new RuntimeException("Subscription failed"); + } + } catch (InterruptedException e) { + stop(); + } + } + + private void stop() { + this.worker.interrupt(); + } + + private void tryToSubscribe(MessageSubscriber messageSubscriber, String topic) { Topic matchingTopic = null; for (Topic availableTopic : topics) { if (availableTopic.matchesTopic(topic)) { @@ -76,10 +142,12 @@ public synchronized void subscribe(MessageSubscriber subscriber, String topic) { matchingTopic = new Topic(topic); topics.add(matchingTopic); } - matchingTopic.addSubscriber(subscriber); + matchingTopic.addSubscriber(messageSubscriber); } - /** Small helper class to handle topics and their subscribers. */ + /** + * Small helper class to handle topics and their subscribers. + */ static class Topic { private final String topic; @@ -102,11 +170,11 @@ protected Topic(String topic) { subscribers = new HashSet<>(); } - synchronized void addSubscriber(MessageSubscriber subscriber) { + void addSubscriber(MessageSubscriber subscriber) { subscribers.add(subscriber); } - synchronized void removeSubscriber(MessageSubscriber subscriber) { + void removeSubscriber(MessageSubscriber subscriber) { subscribers.remove(subscriber); } @@ -114,7 +182,7 @@ boolean matchesTopic(String topic) { return this.topic.equalsIgnoreCase(topic); } - synchronized void informAllSubscribers(String message, MessageParameters messageParameters) { + void informAllSubscribers(String message, MessageParameters messageParameters) { if (messageParameters.messageType.equalsIgnoreCase(topic)) { informSubscribers(message, messageParameters); } @@ -124,4 +192,79 @@ private void informSubscribers(String message, MessageParameters messageParamete subscribers.forEach(it -> it.receive(message, messageParameters)); } } + + static class SubmissionTask { + + String message; + + MessageParameters messageParameters; + + SubmissionTask(String message, MessageParameters messageParameters) { + this.message = message; + this.messageParameters = messageParameters; + } + + } + + static class SubmissionTaskWorker extends Thread { + + Queue tasks; + + List topics; + + ReentrantLock lock; + + SubmissionTaskWorker(Queue tasks, List topics, + ReentrantLock lock) { + this.tasks = tasks; + this.lock = lock; + this.topics = topics; + } + + private void submit(String message, MessageParameters messageParameters) { + this.topics.forEach(it -> it.informAllSubscribers(message, messageParameters)); + } + + @Override + public void run() { + while (true) { + if (Thread.currentThread().isInterrupted()) { + cleanup(); + return; + } + SubmissionTask currentTask = tasks.poll(); + if (currentTask != null) { + try { + handleSubmission(currentTask); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // no need to do anything atm, we don't save the state of the working queue. + // we interrupt the current thread and wait for the cleanup + Thread.currentThread().interrupt(); + } + } + } + + private void cleanup() { + // do potential cleanup work when the worker receives + // an interrupted signal + } + + private void handleSubmission(SubmissionTask currentTask) throws InterruptedException { + while (!lock.tryLock()) { + Thread.sleep(100); + } + try { + submit(currentTask.message, currentTask.messageParameters); + } finally { + lock.unlock(); + } + } + + } }