diff --git a/webapp/frontend/generated/index.ts b/webapp/frontend/generated/index.ts index ad9ff5b5a6..4d3647d066 100644 --- a/webapp/frontend/generated/index.ts +++ b/webapp/frontend/generated/index.ts @@ -11,10 +11,10 @@ ******************************************************************************/ // import Vaadin client-router to handle client-side and server-side navigation -import {Router} from '@vaadin/router'; +import { Router } from '@vaadin/router'; // import Flow module to enable navigation to Vaadin server-side views -import {Flow} from '@vaadin/flow-frontend/Flow'; +import { Flow } from '@vaadin/flow-frontend/Flow'; const { serverSideRoutes } = new Flow({ imports: () => import('../../target/frontend/generated-flow-imports') diff --git a/webapp/frontend/generated/theme-datamanager.generated.js b/webapp/frontend/generated/theme-datamanager.generated.js index 86b8d3adfe..17183076af 100644 --- a/webapp/frontend/generated/theme-datamanager.generated.js +++ b/webapp/frontend/generated/theme-datamanager.generated.js @@ -1,12 +1,4 @@ import 'construct-style-sheets-polyfill'; -import stylesCss from 'themes/datamanager/styles.css?inline'; -import { - badge, - color, - spacing, - typography, - utility -} from '@vaadin/vaadin-lumo-styles'; const createLinkReferences = (css, target) => { // Unresolved urls are written as '@import url(text);' to the css @@ -54,6 +46,12 @@ export const injectGlobalCss = (css, target, first) => { target.adoptedStyleSheets = [...target.adoptedStyleSheets, sheet]; } }; +import stylesCss from 'themes/datamanager/styles.css?inline'; +import { typography } from '@vaadin/vaadin-lumo-styles'; +import { color } from '@vaadin/vaadin-lumo-styles'; +import { spacing } from '@vaadin/vaadin-lumo-styles'; +import { badge } from '@vaadin/vaadin-lumo-styles'; +import { utility } from '@vaadin/vaadin-lumo-styles'; window.Vaadin = window.Vaadin || {}; window.Vaadin.theme = window.Vaadin.theme || {}; diff --git a/webapp/frontend/generated/theme.js b/webapp/frontend/generated/theme.js index 1c82e206bc..b4ce217073 100644 --- a/webapp/frontend/generated/theme.js +++ b/webapp/frontend/generated/theme.js @@ -1,3 +1,2 @@ import {applyTheme as _applyTheme} from './theme-datamanager.generated.js'; - export const applyTheme = _applyTheme; diff --git a/webapp/frontend/generated/vaadin.ts b/webapp/frontend/generated/vaadin.ts index d668e08718..5b3b9a283c 100644 --- a/webapp/frontend/generated/vaadin.ts +++ b/webapp/frontend/generated/vaadin.ts @@ -4,6 +4,5 @@ import './index'; import '@vaadin/flow-frontend/VaadinDevmodeGizmo.js'; -import {applyTheme} from './theme'; - +import { applyTheme } from './theme'; applyTheme(document); diff --git a/webapp/src/main/java/life/qbic/messaging/Exchange.java b/webapp/src/main/java/life/qbic/messaging/Exchange.java index a867550f46..d9ef7220c2 100644 --- a/webapp/src/main/java/life/qbic/messaging/Exchange.java +++ b/webapp/src/main/java/life/qbic/messaging/Exchange.java @@ -3,7 +3,11 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Queue; import java.util.Set; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import life.qbic.apps.datamanager.notifications.MessageBusInterface; import life.qbic.apps.datamanager.notifications.MessageParameters; import life.qbic.apps.datamanager.notifications.MessageSubscriber; @@ -17,14 +21,21 @@ *
Note: use this class for development purposes only and replace it with an implementation that
* utilizes a production grade messaging middleware, such as for example RabbitMQ or Apache Kafka.
*
- * @since
+ * Note: if the instance already exists prior to this call, then the capacity argument will be
+ * ignored.
+ *
+ * @param capacity the capacity of the queue size
+ * @return an exchange instance
+ * @since 1.0.0
+ */
+ public static Exchange instance(int capacity) {
+ if (instance == null) {
+ instance = new Exchange(capacity);
+ }
+ return instance;
+ }
+
+ protected Exchange(int capacity) {
topics = new ArrayList<>();
+ submissionTasks = new ArrayBlockingQueue<>(capacity);
+ lock = new ReentrantLock();
+ launchSubmissionTaskWorker();
+ }
+
+ protected Exchange() {
+ this(DEFAULT_CAPACITY);
+ }
+
+ private void launchSubmissionTaskWorker() {
+ worker = new SubmissionTaskWorker(submissionTasks, topics, lock);
+ worker.setName("Message Submission Worker");
+ worker.start();
}
/**
- * Submits a message to the exchange. The topic is taken from the {@link
- * MessageParameters#messageType} parameter, and all subscriber to this topic are informed.
+ * Submits a message to the exchange. The topic is taken from the
+ * {@link MessageParameters#messageType} parameter, and all subscriber to this topic are
+ * informed.
*
- * @param message the message to publish via the exchange instance
- * @param messageParameters some message parameters, such as the type (aka topic), the occurredOn
- * timepoint and a unique message identifier.
+ * @param message the message to publish via the exchange instance
+ * @param messageParameters some message parameters, such as the type (aka topic), the occuredOn
+ * timepoint and a unique message identifier.
*/
@Override
- public synchronized void submit(String message, MessageParameters messageParameters) {
- this.topics.forEach(it -> it.informAllSubscribers(message, messageParameters));
+ public void submit(String message, MessageParameters messageParameters) {
+ SubmissionTask newTask = new SubmissionTask(message, messageParameters);
+ submissionTasks.add(newTask);
}
/**
@@ -60,11 +103,34 @@ public synchronized void submit(String message, MessageParameters messageParamet
* over this Exchange instance.
*
* @param subscriber the subscriber callback reference. A subscriber can only subscribe once to a
- * topic. Multiple calls will not overwrite the subscriber.
- * @param topic the topic to subscribe to
+ * topic. Multiple calls will not overwrite the subscriber.
+ * @param topic the topic to subscribe to
*/
@Override
- public synchronized void subscribe(MessageSubscriber subscriber, String topic) {
+ public void subscribe(MessageSubscriber subscriber, String topic) {
+ try {
+ // We try to acquire the lock for 1 second. If the lock
+ // cannot be acquired within 1 second, we throw an exception
+ // so the client can try to subscribe again.
+ if (lock.tryLock(1000, TimeUnit.MILLISECONDS)) {
+ try {
+ tryToSubscribe(subscriber, topic);
+ } finally {
+ lock.unlock(); // release the lock afterwards
+ }
+ } else {
+ throw new RuntimeException("Subscription failed");
+ }
+ } catch (InterruptedException e) {
+ stop();
+ }
+ }
+
+ private void stop() {
+ this.worker.interrupt();
+ }
+
+ private void tryToSubscribe(MessageSubscriber messageSubscriber, String topic) {
Topic matchingTopic = null;
for (Topic availableTopic : topics) {
if (availableTopic.matchesTopic(topic)) {
@@ -76,10 +142,12 @@ public synchronized void subscribe(MessageSubscriber subscriber, String topic) {
matchingTopic = new Topic(topic);
topics.add(matchingTopic);
}
- matchingTopic.addSubscriber(subscriber);
+ matchingTopic.addSubscriber(messageSubscriber);
}
- /** Small helper class to handle topics and their subscribers. */
+ /**
+ * Small helper class to handle topics and their subscribers.
+ */
static class Topic {
private final String topic;
@@ -102,11 +170,11 @@ protected Topic(String topic) {
subscribers = new HashSet<>();
}
- synchronized void addSubscriber(MessageSubscriber subscriber) {
+ void addSubscriber(MessageSubscriber subscriber) {
subscribers.add(subscriber);
}
- synchronized void removeSubscriber(MessageSubscriber subscriber) {
+ void removeSubscriber(MessageSubscriber subscriber) {
subscribers.remove(subscriber);
}
@@ -114,7 +182,7 @@ boolean matchesTopic(String topic) {
return this.topic.equalsIgnoreCase(topic);
}
- synchronized void informAllSubscribers(String message, MessageParameters messageParameters) {
+ void informAllSubscribers(String message, MessageParameters messageParameters) {
if (messageParameters.messageType.equalsIgnoreCase(topic)) {
informSubscribers(message, messageParameters);
}
@@ -124,4 +192,79 @@ private void informSubscribers(String message, MessageParameters messageParamete
subscribers.forEach(it -> it.receive(message, messageParameters));
}
}
+
+ static class SubmissionTask {
+
+ String message;
+
+ MessageParameters messageParameters;
+
+ SubmissionTask(String message, MessageParameters messageParameters) {
+ this.message = message;
+ this.messageParameters = messageParameters;
+ }
+
+ }
+
+ static class SubmissionTaskWorker extends Thread {
+
+ Queue