Skip to content

Commit 7da7a22

Browse files
authored
ensures new requests with existing stream id are handled properly (#912)
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 603abac commit 7da7a22

File tree

7 files changed

+159
-27
lines changed

7 files changed

+159
-27
lines changed

rsocket-core/src/main/java/io/rsocket/core/FireAndForgetResponderSubscriber.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -89,17 +89,15 @@ public void onComplete() {}
8989

9090
@Override
9191
public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLastPayload) {
92-
final CompositeByteBuf frames;
92+
final CompositeByteBuf frames = this.frames;
93+
9394
try {
94-
frames =
95-
ReassemblyUtils.addFollowingFrame(
96-
this.frames, followingFrame, this.maxInboundPayloadSize);
95+
ReassemblyUtils.addFollowingFrame(frames, followingFrame, this.maxInboundPayloadSize);
9796
} catch (IllegalStateException t) {
9897
this.requesterResponderSupport.remove(this.streamId, this);
9998

100-
CompositeByteBuf framesToRelease = this.frames;
10199
this.frames = null;
102-
framesToRelease.release();
100+
frames.release();
103101

104102
logger.debug("Reassembly has failed", t);
105103
return;
@@ -128,8 +126,8 @@ public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLas
128126
public final void handleCancel() {
129127
final CompositeByteBuf frames = this.frames;
130128
if (frames != null) {
131-
this.frames = null;
132129
this.requesterResponderSupport.remove(this.streamId, this);
130+
this.frames = null;
133131
frames.release();
134132
}
135133
}

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ private void handleIncomingFrames(ByteBuf frame) {
205205
handleFrame(streamId, type, frame);
206206
}
207207
} catch (Throwable t) {
208+
LOGGER.error("Unexpected error during frame handling", t);
208209
super.getSendProcessor()
209210
.onNext(
210211
ErrorFrameCodec.encode(

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,7 @@ private void handleFrame(ByteBuf frame) {
301301
break;
302302
}
303303
} catch (Throwable t) {
304+
LOGGER.error("Unexpected error during frame handling", t);
304305
super.getSendProcessor()
305306
.onNext(
306307
ErrorFrameCodec.encode(

rsocket-core/src/main/java/io/rsocket/core/RequestChannelResponderSubscriber.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -309,8 +309,14 @@ public final void handleCancel() {
309309
this.requesterResponderSupport.remove(this.streamId, this);
310310

311311
final CompositeByteBuf frames = this.frames;
312-
this.frames = null;
313-
frames.release();
312+
if (frames != null) {
313+
this.frames = null;
314+
frames.release();
315+
} else {
316+
final Payload firstPayload = this.firstPayload;
317+
this.firstPayload = null;
318+
firstPayload.release();
319+
}
314320
return;
315321
}
316322

rsocket-core/src/main/java/io/rsocket/core/RequestResponseResponderSubscriber.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -193,8 +193,10 @@ public void handleCancel() {
193193
this.requesterResponderSupport.remove(this.streamId, this);
194194

195195
final CompositeByteBuf frames = this.frames;
196-
this.frames = null;
197-
frames.release();
196+
if (frames != null) {
197+
this.frames = null;
198+
frames.release();
199+
}
198200

199201
return;
200202
}
@@ -210,17 +212,20 @@ public void handleCancel() {
210212

211213
@Override
212214
public void handleNext(ByteBuf frame, boolean hasFollows, boolean isLastPayload) {
213-
final CompositeByteBuf frames;
215+
final CompositeByteBuf frames = this.frames;
216+
if (frames == null) {
217+
return;
218+
}
219+
214220
try {
215-
frames = ReassemblyUtils.addFollowingFrame(this.frames, frame, this.maxInboundPayloadSize);
221+
ReassemblyUtils.addFollowingFrame(frames, frame, this.maxInboundPayloadSize);
216222
} catch (IllegalStateException t) {
217223
S.lazySet(this, Operators.cancelledSubscription());
218224

219225
this.requesterResponderSupport.remove(this.streamId, this);
220226

221-
CompositeByteBuf framesToRelease = this.frames;
222227
this.frames = null;
223-
framesToRelease.release();
228+
frames.release();
224229

225230
logger.debug("Reassembly has failed", t);
226231

rsocket-core/src/main/java/io/rsocket/core/RequestStreamResponderSubscriber.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -233,8 +233,10 @@ public final void handleCancel() {
233233
this.requesterResponderSupport.remove(this.streamId, this);
234234

235235
final CompositeByteBuf frames = this.frames;
236-
this.frames = null;
237-
frames.release();
236+
if (frames != null) {
237+
this.frames = null;
238+
frames.release();
239+
}
238240

239241
return;
240242
}
@@ -250,21 +252,22 @@ public final void handleCancel() {
250252

251253
@Override
252254
public void handleNext(ByteBuf followingFrame, boolean hasFollows, boolean isLastPayload) {
253-
final CompositeByteBuf frames;
255+
final CompositeByteBuf frames = this.frames;
256+
if (frames == null) {
257+
return;
258+
}
259+
254260
try {
255-
frames =
256-
ReassemblyUtils.addFollowingFrame(
257-
this.frames, followingFrame, this.maxInboundPayloadSize);
261+
ReassemblyUtils.addFollowingFrame(frames, followingFrame, this.maxInboundPayloadSize);
258262
} catch (IllegalStateException t) {
259263
// if subscription is null, it means that streams has not yet reassembled all the fragments
260264
// and fragmentation of the first frame was cancelled before
261265
S.lazySet(this, Operators.cancelledSubscription());
262266

263267
this.requesterResponderSupport.remove(this.streamId, this);
264268

265-
CompositeByteBuf framesToRelease = this.frames;
266269
this.frames = null;
267-
framesToRelease.release();
270+
frames.release();
268271

269272
logger.debug("Reassembly has failed", t);
270273

rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java

Lines changed: 122 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import java.util.concurrent.atomic.AtomicReference;
7676
import java.util.stream.Stream;
7777
import org.assertj.core.api.Assertions;
78+
import org.assertj.core.api.Assumptions;
7879
import org.junit.jupiter.api.AfterEach;
7980
import org.junit.jupiter.api.BeforeEach;
8081
import org.junit.jupiter.api.Disabled;
@@ -94,6 +95,7 @@
9495
import reactor.core.publisher.FluxSink;
9596
import reactor.core.publisher.Hooks;
9697
import reactor.core.publisher.Mono;
98+
import reactor.core.publisher.MonoProcessor;
9799
import reactor.core.publisher.Operators;
98100
import reactor.core.scheduler.Scheduler;
99101
import reactor.core.scheduler.Schedulers;
@@ -830,13 +832,13 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
830832
rule.assertHasNoLeaks();
831833
}
832834

833-
static Stream<FrameType> fragmentationCases() {
835+
static Stream<FrameType> requestCases() {
834836
return Stream.of(REQUEST_FNF, REQUEST_RESPONSE, REQUEST_STREAM, REQUEST_CHANNEL);
835837
}
836838

837839
@DisplayName("reassembles payload")
838840
@ParameterizedTest
839-
@MethodSource("fragmentationCases")
841+
@MethodSource("requestCases")
840842
void reassemblePayload(FrameType frameType) {
841843
AtomicReference<Payload> receivedPayload = new AtomicReference<>();
842844
rule.setAcceptingSocket(
@@ -891,7 +893,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
891893

892894
@DisplayName("reassembles metadata")
893895
@ParameterizedTest
894-
@MethodSource("fragmentationCases")
896+
@MethodSource("requestCases")
895897
void reassembleMetadataOnly(FrameType frameType) {
896898
AtomicReference<Payload> receivedPayload = new AtomicReference<>();
897899
rule.setAcceptingSocket(
@@ -948,7 +950,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
948950
}
949951

950952
@ParameterizedTest(name = "throws error if reassembling payload size exceeds {0}")
951-
@MethodSource("fragmentationCases")
953+
@MethodSource("requestCases")
952954
public void errorTooBigPayload(FrameType frameType) {
953955
final int mtu = ThreadLocalRandom.current().nextInt(64, 256);
954956
final int maxInboundPayloadSize = ThreadLocalRandom.current().nextInt(mtu + 1, 4096);
@@ -1000,6 +1002,122 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
10001002
rule.assertHasNoLeaks();
10011003
}
10021004

1005+
@ParameterizedTest
1006+
@MethodSource("requestCases")
1007+
void receivingRequestOnStreamIdThaIsAlreadyInUseMUSTBeIgnored_ReassemblyCase(
1008+
FrameType requestType) {
1009+
AtomicReference<Payload> receivedPayload = new AtomicReference<>();
1010+
final MonoProcessor<Void> delayer = MonoProcessor.create();
1011+
rule.setAcceptingSocket(
1012+
new RSocket() {
1013+
1014+
@Override
1015+
public Mono<Void> fireAndForget(Payload payload) {
1016+
receivedPayload.set(payload);
1017+
return delayer;
1018+
}
1019+
1020+
@Override
1021+
public Mono<Payload> requestResponse(Payload payload) {
1022+
receivedPayload.set(payload);
1023+
return Mono.just(genericPayload(rule.allocator)).delaySubscription(delayer);
1024+
}
1025+
1026+
@Override
1027+
public Flux<Payload> requestStream(Payload payload) {
1028+
receivedPayload.set(payload);
1029+
return Flux.just(genericPayload(rule.allocator)).delaySubscription(delayer);
1030+
}
1031+
1032+
@Override
1033+
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
1034+
Flux.from(payloads).subscribe(receivedPayload::set, null, null, s -> s.request(1));
1035+
return Flux.just(genericPayload(rule.allocator)).delaySubscription(delayer);
1036+
}
1037+
});
1038+
final Payload randomPayload1 = fixedSizePayload(rule.allocator, 128);
1039+
final List<ByteBuf> fragments1 =
1040+
prepareFragments(rule.allocator, 64, randomPayload1, requestType);
1041+
final Payload randomPayload2 = fixedSizePayload(rule.allocator, 128);
1042+
final List<ByteBuf> fragments2 =
1043+
prepareFragments(rule.allocator, 64, randomPayload2, requestType);
1044+
randomPayload2.release();
1045+
rule.connection.addToReceivedBuffer(fragments1.remove(0));
1046+
rule.connection.addToReceivedBuffer(fragments2.remove(0));
1047+
1048+
rule.connection.addToReceivedBuffer(fragments1.toArray(new ByteBuf[0]));
1049+
if (requestType != REQUEST_CHANNEL) {
1050+
rule.connection.addToReceivedBuffer(fragments2.toArray(new ByteBuf[0]));
1051+
delayer.onComplete();
1052+
} else {
1053+
delayer.onComplete();
1054+
rule.connection.addToReceivedBuffer(PayloadFrameCodec.encodeComplete(rule.allocator, 1));
1055+
rule.connection.addToReceivedBuffer(fragments2.toArray(new ByteBuf[0]));
1056+
}
1057+
1058+
PayloadAssert.assertThat(receivedPayload.get()).isEqualTo(randomPayload1).hasNoLeaks();
1059+
randomPayload1.release();
1060+
1061+
if (requestType != REQUEST_FNF) {
1062+
FrameAssert.assertThat(rule.connection.getSent().poll())
1063+
.typeOf(requestType == REQUEST_RESPONSE ? NEXT_COMPLETE : NEXT)
1064+
.hasNoLeaks();
1065+
1066+
if (requestType != REQUEST_RESPONSE) {
1067+
FrameAssert.assertThat(rule.connection.getSent().poll()).typeOf(COMPLETE).hasNoLeaks();
1068+
}
1069+
}
1070+
1071+
rule.assertHasNoLeaks();
1072+
}
1073+
1074+
@ParameterizedTest
1075+
@MethodSource("requestCases")
1076+
void receivingRequestOnStreamIdThaIsAlreadyInUseMUSTBeIgnored(FrameType requestType) {
1077+
Assumptions.assumeThat(requestType).isNotEqualTo(REQUEST_FNF);
1078+
AtomicReference<Payload> receivedPayload = new AtomicReference<>();
1079+
final MonoProcessor<Object> delayer = MonoProcessor.create();
1080+
rule.setAcceptingSocket(
1081+
new RSocket() {
1082+
@Override
1083+
public Mono<Payload> requestResponse(Payload payload) {
1084+
receivedPayload.set(payload);
1085+
return Mono.just(genericPayload(rule.allocator)).delaySubscription(delayer);
1086+
}
1087+
1088+
@Override
1089+
public Flux<Payload> requestStream(Payload payload) {
1090+
receivedPayload.set(payload);
1091+
return Flux.just(genericPayload(rule.allocator)).delaySubscription(delayer);
1092+
}
1093+
1094+
@Override
1095+
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
1096+
Flux.from(payloads).subscribe(receivedPayload::set, null, null, s -> s.request(1));
1097+
return Flux.just(genericPayload(rule.allocator)).delaySubscription(delayer);
1098+
}
1099+
});
1100+
final Payload randomPayload1 = fixedSizePayload(rule.allocator, 64);
1101+
final Payload randomPayload2 = fixedSizePayload(rule.allocator, 64);
1102+
rule.sendRequest(1, requestType, randomPayload1.retain());
1103+
rule.sendRequest(1, requestType, randomPayload2);
1104+
1105+
delayer.onComplete();
1106+
1107+
PayloadAssert.assertThat(receivedPayload.get()).isEqualTo(randomPayload1).hasNoLeaks();
1108+
randomPayload1.release();
1109+
1110+
FrameAssert.assertThat(rule.connection.getSent().poll())
1111+
.typeOf(requestType == REQUEST_RESPONSE ? NEXT_COMPLETE : NEXT)
1112+
.hasNoLeaks();
1113+
1114+
if (requestType != REQUEST_RESPONSE) {
1115+
FrameAssert.assertThat(rule.connection.getSent().poll()).typeOf(COMPLETE).hasNoLeaks();
1116+
}
1117+
1118+
rule.assertHasNoLeaks();
1119+
}
1120+
10031121
public static class ServerSocketRule extends AbstractSocketRule<RSocketResponder> {
10041122

10051123
private RSocket acceptingSocket;

0 commit comments

Comments
 (0)