Skip to content

Commit 9ca64fe

Browse files
committed
fixes incorrect requestN behaviour on the requester side
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 3f37e6e commit 9ca64fe

File tree

2 files changed

+9
-2
lines changed

2 files changed

+9
-2
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ private Flux<Payload> handleChannel(Flux<Payload> request) {
317317
(s, flux) -> {
318318
Payload payload = s.get();
319319
if (payload != null) {
320-
return handleChannel(payload, flux.skip(1));
320+
return handleChannel(payload, flux);
321321
} else {
322322
return flux;
323323
}
@@ -334,13 +334,20 @@ private Flux<? extends Payload> handleChannel(Payload initialPayload, Flux<Paylo
334334
final BaseSubscriber<Payload> upstreamSubscriber =
335335
new BaseSubscriber<Payload>() {
336336

337+
boolean first = true;
338+
337339
@Override
338340
protected void hookOnSubscribe(Subscription subscription) {
339341
// noops
340342
}
341343

342344
@Override
343345
protected void hookOnNext(Payload payload) {
346+
if (first) {
347+
// need to skip first since we have already sent it
348+
first = false;
349+
return;
350+
}
344351
final ByteBuf frame =
345352
PayloadFrameFlyweight.encode(allocator, streamId, false, false, true, payload);
346353

rsocket-test/src/main/java/io/rsocket/test/TransportTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ default void requestChannel3() {
229229
.verify(getTimeout());
230230

231231
Assertions.assertThat(requested.get())
232-
.isEqualTo(257L); // 257 because of eager behavior of limitRate
232+
.isEqualTo(256L); // 256 because of eager behavior of limitRate
233233
}
234234

235235
@DisplayName("makes 1 requestChannel request with 512 payloads")

0 commit comments

Comments
 (0)