-
Notifications
You must be signed in to change notification settings - Fork 357
Implements dedicated Publisher/Subscriber for each request type #761
#761
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
d71b8c0 to
7aa340b
Compare
7aa340b to
406daee
Compare
BenchesBase Line RC7-SNAPSHOTCurrent Branch |
231cae8 to
a78a14d
Compare
Testing Matrix ChecklistGeneric
Racing Cases |
a78a14d to
e7efe6d
Compare
5bd5fcb to
2d6c698
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StreamManager provides a way for requester Mono's and responder Subscriber's to access something back from RSocketRequester and RSocketResponder. This can be taken further to avoid passing other fields as well.
For example RSocketRequester and RSocketResponder could extend this (replacing StreamManager and AbstractStreamManager):
class RequesterResponderSupport {
private final int mtu;
private final int maxFrameLength;
private final int maxInboundPayloadSize;
private final PayloadDecoder payloadDecoder;
private final ByteBufAllocator allocator;
@Nullable
final StreamIdSupplier streamIdSupplier;
final IntObjectMap<FrameHandler> activeStreams;
private final UnboundedProcessor<ByteBuf> sendProcessor;
public RequesterResponderSupport(
int mtu,
int maxFrameLength,
int maxInboundPayloadSize,
PayloadDecoder payloadDecoder,
ByteBufAllocator allocator,
@Nullable StreamIdSupplier streamIdSupplier,
IntObjectMap<FrameHandler> activeStreams) {
this.activeStreams = activeStreams;
this.mtu = mtu;
this.maxFrameLength = maxFrameLength;
this.maxInboundPayloadSize = maxInboundPayloadSize;
this.payloadDecoder = payloadDecoder;
this.allocator = allocator;
this.streamIdSupplier = streamIdSupplier;
this.sendProcessor = new UnboundedProcessor<>();
}
public int getMtu() {
return mtu;
}
public int getMaxFrameLength() {
return maxFrameLength;
}
public int getMaxInboundPayloadSize() {
return maxInboundPayloadSize;
}
public PayloadDecoder getPayloadDecoder() {
return payloadDecoder;
}
public ByteBufAllocator getAllocator() {
return allocator;
}
public UnboundedProcessor<ByteBuf> getSendProcessor() {
return sendProcessor;
}
public synchronized int getNextId() {
if (this.streamIdSupplier != null) {
return this.streamIdSupplier.nextStreamId(this.activeStreams);
}
else {
throw new UnsupportedOperationException("Responder can not issue id");
}
}
public synchronized int addAndGetNextId(FrameHandler frameHandler) {
if (this.streamIdSupplier != null) {
final IntObjectMap<FrameHandler> activeStreams = this.activeStreams;
final int streamId = this.streamIdSupplier.nextStreamId(activeStreams);
activeStreams.put(streamId, frameHandler);
return streamId;
}
else {
throw new UnsupportedOperationException("Responder can not issue id");
}
}
public FrameHandler get(int streamId) {
return this.activeStreams.get(streamId);
}
public boolean remove(int streamId, FrameHandler frameHandler) {
return this.activeStreams.remove(streamId, frameHandler);
}
}Now the requester Mono's and Flux's can be created more easily:
@Override
public Flux<Payload> requestStream(Payload payload) {
return new RequestStreamFlux(payload, this);
}Likewise for responder Subscriber's:
RequestResponseSubscriber subscriber = new RequestResponseSubscriber(streamId, frame, this);There might be other opportunities for re-use as well through such a common base class.
f86226d to
eaeb099
Compare
rsocket-core/src/test/java/io/rsocket/core/StateMachineAssert.java
Outdated
Show resolved
Hide resolved
| AtomicLongFieldUpdater<T> updater, | ||
| T instance, | ||
| ReassembledFramesHolder reassembledFramesHolder, | ||
| Subscription subscription, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The subscription is the RequesterFrameHandler instance. Why don't we add cancel() to RequesterFrameHandler or have it extend Subscription so that only RequesterFrameHandler is passed in, making it easier to understand where cancel() is handled?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea is to avoid mixing RSocket spec frame handling with the reactive streams. First of all, it makes it impossible to understand from where method was invoked, especially when it comes to requestChannel case. Initially, it was reactive streams interfaces, but after lots of issues with understanding where and how should I handle every call (depends on the inbound vs outbound) I decided to get rid of that idea and make things fully separate
rsocket-core/src/main/java/io/rsocket/core/ReassemblyUtils.java
Outdated
Show resolved
Hide resolved
rsocket-core/src/main/java/io/rsocket/core/RequestChannelRequesterFlux.java
Outdated
Show resolved
Hide resolved
rsocket-core/src/main/java/io/rsocket/core/RequestChannelRequesterFlux.java
Outdated
Show resolved
Hide resolved
| final Payload p = this.payload; | ||
| try { | ||
| if (!isValid(this.mtu, this.maxFrameLength, p, false)) { | ||
| lazyTerminate(STATE, this); | ||
| Operators.error( | ||
| actual, | ||
| new IllegalArgumentException( | ||
| String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength))); | ||
| p.release(); | ||
| return; | ||
| } | ||
| } catch (IllegalReferenceCountException e) { | ||
| lazyTerminate(STATE, this); | ||
| Operators.error(actual, e); | ||
| return; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couldn't the payload be validated before subscription, basically as soon as it is provided? Maybe RSocketRequester could hold this logic and make the check before creating a requester Mono/Flux, thus also re-using the logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In any case, it is going to be code duplication. The only different - that code would be in a different place. Thus, once something needed to be adjusted - we would need to look at more places and not only to Requester / Responder operators
simonbasle
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't dig far deeper after yesterday's live review. it seems @rstoyanchev has already spotted quite a few elements to improve. overall I'd try to add more comments and documentations, especially on utils classes (like what's been done in StateUtils 👍)
rsocket-core/src/test/java/io/rsocket/core/AbstractSocketRule.java
Outdated
Show resolved
Hide resolved
eaeb099 to
e3b7998
Compare
93f9d5c to
2854e04
Compare
Done. Ready for another round of review |
c531667 to
0b63bbf
Compare
|
Applied most of requested changes. Will be merging this one. More polishing can be done in the followups to this PR |
Signed-off-by: Oleh Dokuka <[email protected]>
0b63bbf to
8be980e
Compare
Publisher/Subscriber for each request type
Publisher/Subscriber for each request typePublisher/Subscriber for each request type #761
This PR provides fully reworked internals for all possible interactions for Rsocket requester and responder
fixes #742 #641 #613 #641 #760