Skip to content

Commit 8ed0e28

Browse files
committed
more fixes
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 09ea0f1 commit 8ed0e28

File tree

1 file changed

+73
-1
lines changed

1 file changed

+73
-1
lines changed

rsocket-core/src/test/java/io/rsocket/RequestStreamFluxTest.java

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public static void setUp() {
4040
}
4141

4242
@Test
43-
public void requestNframeShouldBeSentOnSubscription() {
43+
public void requestNFrameShouldBeSentOnSubscriptionAndThenSeparately() {
4444
final UnboundedProcessor<ByteBuf> sender = new UnboundedProcessor<>();
4545
final Payload payload = ByteBufPayload.create("testData", "testMetadata");
4646
final IntObjectMap<Reassemble<?>> activeStreams = new SynchronizedIntObjectHashMap<>();
@@ -110,6 +110,78 @@ public void requestNframeShouldBeSentOnSubscription() {
110110
Assertions.assertThat(sender.isEmpty()).isTrue();
111111
}
112112

113+
114+
@Test
115+
public void requestNFrameShouldBeSentExectlyOnceIfItIsMaxAllowed() {
116+
final UnboundedProcessor<ByteBuf> sender = new UnboundedProcessor<>();
117+
final Payload payload = ByteBufPayload.create("testData", "testMetadata");
118+
final IntObjectMap<Reassemble<?>> activeStreams = new SynchronizedIntObjectHashMap<>();
119+
120+
final RequestStreamFlux requestStreamFlux =
121+
new RequestStreamFlux(
122+
ByteBufAllocator.DEFAULT,
123+
payload,
124+
0,
125+
TestStateAware.empty(),
126+
StreamIdSupplier.clientSupplier(),
127+
activeStreams,
128+
sender,
129+
PayloadDecoder.ZERO_COPY);
130+
131+
Assertions.assertThat(activeStreams).isEmpty();
132+
133+
final AssertSubscriber<Payload> assertSubscriber =
134+
requestStreamFlux.subscribeWith(AssertSubscriber.create(0));
135+
Assertions.assertThat(payload.refCnt()).isOne();
136+
Assertions.assertThat(activeStreams).isEmpty();
137+
138+
assertSubscriber.request(1);
139+
140+
Assertions.assertThat(payload.refCnt()).isZero();
141+
Assertions.assertThat(activeStreams).containsEntry(1, requestStreamFlux);
142+
143+
final ByteBuf frame = sender.poll();
144+
FrameAssert.assertThat(frame)
145+
.isNotNull()
146+
.hasPayloadSize(
147+
"testData".getBytes(CharsetUtil.UTF_8).length
148+
+ "testMetadata".getBytes(CharsetUtil.UTF_8).length)
149+
.hasMetadata("testMetadata")
150+
.hasData("testData")
151+
.hasNoFragmentsFollow()
152+
.hasRequestN(1)
153+
.typeOf(FrameType.REQUEST_STREAM)
154+
.hasClientSideStreamId()
155+
.hasStreamId(1);
156+
157+
Assertions.assertThat(sender.isEmpty()).isTrue();
158+
159+
assertSubscriber.request(1);
160+
final ByteBuf requestNFrame = sender.poll();
161+
FrameAssert.assertThat(requestNFrame)
162+
.isNotNull()
163+
.hasRequestN(1)
164+
.typeOf(FrameType.REQUEST_N)
165+
.hasClientSideStreamId()
166+
.hasStreamId(1);
167+
168+
Assertions.assertThat(sender.isEmpty()).isTrue();
169+
170+
requestStreamFlux.onNext(EmptyPayload.INSTANCE);
171+
172+
requestStreamFlux.onComplete();
173+
174+
assertSubscriber.assertValues(EmptyPayload.INSTANCE).assertComplete();
175+
176+
Assertions.assertThat(frame.release()).isTrue();
177+
Assertions.assertThat(frame.refCnt()).isZero();
178+
179+
Assertions.assertThat(payload.refCnt()).isZero();
180+
Assertions.assertThat(activeStreams).isEmpty();
181+
182+
Assertions.assertThat(sender.isEmpty()).isTrue();
183+
}
184+
113185
@ParameterizedTest
114186
@MethodSource("frameShouldBeSentOnSubscriptionResponses")
115187
public void frameShouldBeSentOnSubscription(

0 commit comments

Comments
 (0)