Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions webapp/frontend/generated/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
******************************************************************************/

// import Vaadin client-router to handle client-side and server-side navigation
import {Router} from '@vaadin/router';
import { Router } from '@vaadin/router';

// import Flow module to enable navigation to Vaadin server-side views
import {Flow} from '@vaadin/flow-frontend/Flow';
import { Flow } from '@vaadin/flow-frontend/Flow';

const { serverSideRoutes } = new Flow({
imports: () => import('../../target/frontend/generated-flow-imports')
Expand Down
14 changes: 6 additions & 8 deletions webapp/frontend/generated/theme-datamanager.generated.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,4 @@
import 'construct-style-sheets-polyfill';
import stylesCss from 'themes/datamanager/styles.css?inline';
import {
badge,
color,
spacing,
typography,
utility
} from '@vaadin/vaadin-lumo-styles';

const createLinkReferences = (css, target) => {
// Unresolved urls are written as '@import url(text);' to the css
Expand Down Expand Up @@ -54,6 +46,12 @@ export const injectGlobalCss = (css, target, first) => {
target.adoptedStyleSheets = [...target.adoptedStyleSheets, sheet];
}
};
import stylesCss from 'themes/datamanager/styles.css?inline';
import { typography } from '@vaadin/vaadin-lumo-styles';
import { color } from '@vaadin/vaadin-lumo-styles';
import { spacing } from '@vaadin/vaadin-lumo-styles';
import { badge } from '@vaadin/vaadin-lumo-styles';
import { utility } from '@vaadin/vaadin-lumo-styles';
Comment thread
KochTobi marked this conversation as resolved.

window.Vaadin = window.Vaadin || {};
window.Vaadin.theme = window.Vaadin.theme || {};
Expand Down
1 change: 0 additions & 1 deletion webapp/frontend/generated/theme.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
import {applyTheme as _applyTheme} from './theme-datamanager.generated.js';

export const applyTheme = _applyTheme;
3 changes: 1 addition & 2 deletions webapp/frontend/generated/vaadin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,5 @@ import './index';

import '@vaadin/flow-frontend/VaadinDevmodeGizmo.js';

import {applyTheme} from './theme';

import { applyTheme } from './theme';
applyTheme(document);
181 changes: 162 additions & 19 deletions webapp/src/main/java/life/qbic/messaging/Exchange.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import life.qbic.apps.datamanager.notifications.MessageBusInterface;
import life.qbic.apps.datamanager.notifications.MessageParameters;
import life.qbic.apps.datamanager.notifications.MessageSubscriber;
Expand All @@ -17,14 +21,21 @@
* <p>Note: use this class for development purposes only and replace it with an implementation that
* utilizes a production grade messaging middleware, such as for example RabbitMQ or Apache Kafka.
*
* @since <version tag>
* @since 1.0.0
*/
public class Exchange implements MessageBusInterface {

List<Topic> topics;
private static final int DEFAULT_CAPACITY = 100;
private final Queue<SubmissionTask> submissionTasks;

private final List<Topic> topics;
Comment thread
KochTobi marked this conversation as resolved.

private static Exchange instance;

private final ReentrantLock lock;

private Thread worker;

/**
* Queries the current instance of the Exchange class.
*
Expand All @@ -37,34 +48,89 @@ public static Exchange instance() {
return instance;
}

protected Exchange() {
super();
/**
* Queries an instance of the Exchange class. If none exists, one will be created with the
* provided capacity for the messages the exchange instance can hold in its queue.
* <p>
* Note: if the instance already exists prior to this call, then the capacity argument will be
* ignored.
*
* @param capacity the capacity of the queue size
* @return an exchange instance
* @since 1.0.0
*/
public static Exchange instance(int capacity) {
if (instance == null) {
Comment thread
KochTobi marked this conversation as resolved.
instance = new Exchange(capacity);
}
return instance;
}

protected Exchange(int capacity) {
topics = new ArrayList<>();
submissionTasks = new ArrayBlockingQueue<>(capacity);
lock = new ReentrantLock();
launchSubmissionTaskWorker();
}

protected Exchange() {
this(DEFAULT_CAPACITY);
}
Comment thread
KochTobi marked this conversation as resolved.

private void launchSubmissionTaskWorker() {
worker = new SubmissionTaskWorker(submissionTasks, topics, lock);
worker.setName("Message Submission Worker");
worker.start();
}

/**
* Submits a message to the exchange. The topic is taken from the {@link
* MessageParameters#messageType} parameter, and all subscriber to this topic are informed.
* Submits a message to the exchange. The topic is taken from the
* {@link MessageParameters#messageType} parameter, and all subscriber to this topic are
* informed.
*
* @param message the message to publish via the exchange instance
* @param messageParameters some message parameters, such as the type (aka topic), the occurredOn
* timepoint and a unique message identifier.
* @param message the message to publish via the exchange instance
* @param messageParameters some message parameters, such as the type (aka topic), the occuredOn
* timepoint and a unique message identifier.
*/
@Override
public synchronized void submit(String message, MessageParameters messageParameters) {
this.topics.forEach(it -> it.informAllSubscribers(message, messageParameters));
public void submit(String message, MessageParameters messageParameters) {
SubmissionTask newTask = new SubmissionTask(message, messageParameters);
submissionTasks.add(newTask);
}

/**
* Subscribe to a topic in order to get informed, whenever a message with this topic is published
* over this Exchange instance.
*
* @param subscriber the subscriber callback reference. A subscriber can only subscribe once to a
* topic. Multiple calls will not overwrite the subscriber.
* @param topic the topic to subscribe to
* topic. Multiple calls will not overwrite the subscriber.
* @param topic the topic to subscribe to
*/
@Override
public synchronized void subscribe(MessageSubscriber subscriber, String topic) {
public void subscribe(MessageSubscriber subscriber, String topic) {
try {
// We try to acquire the lock for 1 second. If the lock
// cannot be acquired within 1 second, we throw an exception
// so the client can try to subscribe again.
if (lock.tryLock(1000, TimeUnit.MILLISECONDS)) {
try {
tryToSubscribe(subscriber, topic);
} finally {
lock.unlock(); // release the lock afterwards
}
} else {
throw new RuntimeException("Subscription failed");
}
} catch (InterruptedException e) {
stop();
}
}

private void stop() {
this.worker.interrupt();
}

private void tryToSubscribe(MessageSubscriber messageSubscriber, String topic) {
Topic matchingTopic = null;
for (Topic availableTopic : topics) {
if (availableTopic.matchesTopic(topic)) {
Expand All @@ -76,10 +142,12 @@ public synchronized void subscribe(MessageSubscriber subscriber, String topic) {
matchingTopic = new Topic(topic);
topics.add(matchingTopic);
}
matchingTopic.addSubscriber(subscriber);
matchingTopic.addSubscriber(messageSubscriber);
}

/** Small helper class to handle topics and their subscribers. */
/**
* Small helper class to handle topics and their subscribers.
*/
static class Topic {

private final String topic;
Expand All @@ -102,19 +170,19 @@ protected Topic(String topic) {
subscribers = new HashSet<>();
}

synchronized void addSubscriber(MessageSubscriber subscriber) {
void addSubscriber(MessageSubscriber subscriber) {
subscribers.add(subscriber);
}

synchronized void removeSubscriber(MessageSubscriber subscriber) {
void removeSubscriber(MessageSubscriber subscriber) {
subscribers.remove(subscriber);
}

boolean matchesTopic(String topic) {
return this.topic.equalsIgnoreCase(topic);
}

synchronized void informAllSubscribers(String message, MessageParameters messageParameters) {
void informAllSubscribers(String message, MessageParameters messageParameters) {
if (messageParameters.messageType.equalsIgnoreCase(topic)) {
informSubscribers(message, messageParameters);
}
Expand All @@ -124,4 +192,79 @@ private void informSubscribers(String message, MessageParameters messageParamete
subscribers.forEach(it -> it.receive(message, messageParameters));
}
}

static class SubmissionTask {

String message;

MessageParameters messageParameters;

SubmissionTask(String message, MessageParameters messageParameters) {
this.message = message;
this.messageParameters = messageParameters;
}

}

static class SubmissionTaskWorker extends Thread {

Queue<SubmissionTask> tasks;

List<Topic> topics;

ReentrantLock lock;

SubmissionTaskWorker(Queue<SubmissionTask> tasks, List<Topic> topics,
ReentrantLock lock) {
this.tasks = tasks;
this.lock = lock;
this.topics = topics;
}

private void submit(String message, MessageParameters messageParameters) {
this.topics.forEach(it -> it.informAllSubscribers(message, messageParameters));
}

@Override
public void run() {
while (true) {
if (Thread.currentThread().isInterrupted()) {
cleanup();
return;
}
SubmissionTask currentTask = tasks.poll();
if (currentTask != null) {
try {
handleSubmission(currentTask);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
try {
Thread.sleep(100);
Comment thread
KochTobi marked this conversation as resolved.
} catch (InterruptedException e) {
// no need to do anything atm, we don't save the state of the working queue.
// we interrupt the current thread and wait for the cleanup
Thread.currentThread().interrupt();
}
}
}

private void cleanup() {
// do potential cleanup work when the worker receives
// an interrupted signal
}

private void handleSubmission(SubmissionTask currentTask) throws InterruptedException {
while (!lock.tryLock()) {
Thread.sleep(100);
}
Comment thread
KochTobi marked this conversation as resolved.
try {
submit(currentTask.message, currentTask.messageParameters);
} finally {
lock.unlock();
}
}

}
}