3737import io .rsocket .frame .RequestResponseFrameFlyweight ;
3838import io .rsocket .frame .RequestStreamFrameFlyweight ;
3939import io .rsocket .frame .decoder .PayloadDecoder ;
40- import io .rsocket .internal .FluxSwitchOnFirst ;
41- import io .rsocket .internal .RateLimitableRequestSubscriber ;
4240import io .rsocket .internal .SynchronizedIntObjectHashMap ;
4341import io .rsocket .internal .UnboundedProcessor ;
4442import io .rsocket .internal .UnicastMonoEmpty ;
6058import org .reactivestreams .Publisher ;
6159import org .reactivestreams .Subscriber ;
6260import org .reactivestreams .Subscription ;
61+ import reactor .core .publisher .BaseSubscriber ;
6362import reactor .core .publisher .Flux ;
6463import reactor .core .publisher .Mono ;
6564import reactor .core .publisher .SignalType ;
@@ -83,7 +82,7 @@ class RSocketRequester implements RSocket {
8382 private final PayloadDecoder payloadDecoder ;
8483 private final Consumer <Throwable > errorConsumer ;
8584 private final StreamIdSupplier streamIdSupplier ;
86- private final IntObjectMap <RateLimitableRequestSubscriber > senders ;
85+ private final IntObjectMap <Subscription > senders ;
8786 private final IntObjectMap <Processor <Payload , Payload >> receivers ;
8887 private final UnboundedProcessor <ByteBuf > sendProcessor ;
8988 private final RequesterLeaseHandler leaseHandler ;
@@ -262,7 +261,6 @@ private Flux<Payload> handleRequestStream(final Payload payload) {
262261 receivers .put (streamId , receiver );
263262
264263 return receiver
265- .log ()
266264 .doOnRequest (
267265 new LongConsumer () {
268266
@@ -312,29 +310,31 @@ private Flux<Payload> handleChannel(Flux<Payload> request) {
312310 return Flux .error (err );
313311 }
314312
315- return request .transform (
316- f ->
317- new FluxSwitchOnFirst <>(
318- f ,
319- (s , flux ) -> {
320- Payload payload = s .get ();
321- if (payload != null ) {
322- return handleChannel (flux , payload );
323- } else {
324- return flux ;
325- }
326- },
327- false ));
313+ return request .switchOnFirst (
314+ (s , flux ) -> {
315+ Payload payload = s .get ();
316+ if (payload != null ) {
317+ return handleChannel (payload , flux .skip (1 ));
318+ } else {
319+ return flux ;
320+ }
321+ },
322+ false );
328323 }
329324
330- private Flux <? extends Payload > handleChannel (Flux < Payload > inboundFlux , Payload initialPayload ) {
325+ private Flux <? extends Payload > handleChannel (Payload initialPayload , Flux < Payload > inboundFlux ) {
331326 final UnboundedProcessor <ByteBuf > sendProcessor = this .sendProcessor ;
332327 final AtomicBoolean payloadReleasedFlag = new AtomicBoolean (false );
333328 final int streamId = streamIdSupplier .nextStreamId (receivers );
334329
335330 final UnicastProcessor <Payload > receiver = UnicastProcessor .create ();
336- final RateLimitableRequestSubscriber <Payload > upstreamSubscriber =
337- new RateLimitableRequestSubscriber <Payload >(Queues .SMALL_BUFFER_SIZE ) {
331+ final BaseSubscriber <Payload > upstreamSubscriber =
332+ new BaseSubscriber <Payload >() {
333+
334+ @ Override
335+ protected void hookOnSubscribe (Subscription subscription ) {
336+ // noops
337+ }
338338
339339 @ Override
340340 protected void hookOnNext (Payload payload ) {
@@ -355,7 +355,7 @@ protected void hookOnComplete() {
355355 protected void hookOnError (Throwable t ) {
356356 ByteBuf frame = ErrorFrameFlyweight .encode (allocator , streamId , t );
357357 sendProcessor .onNext (frame );
358- receiver .dispose ( );
358+ receiver .onError ( t );
359359 }
360360
361361 @ Override
@@ -377,7 +377,7 @@ public void accept(long n) {
377377 senders .put (streamId , upstreamSubscriber );
378378 receivers .put (streamId , receiver );
379379
380- inboundFlux .subscribe (upstreamSubscriber );
380+ inboundFlux .limitRate ( Queues . SMALL_BUFFER_SIZE ). subscribe (upstreamSubscriber );
381381
382382 ByteBuf frame =
383383 RequestChannelFrameFlyweight .encode (
@@ -405,16 +405,15 @@ public void accept(long n) {
405405 upstreamSubscriber .cancel ();
406406 }
407407 })
408+ .doOnComplete (() -> receivers .remove (streamId , receiver ))
408409 .doOnCancel (
409410 () -> {
410411 if (!payloadReleasedFlag .getAndSet (true )) {
411412 initialPayload .release ();
412413 }
413- if (contains (streamId )) {
414+ if (receivers . remove (streamId , receiver )) {
414415 sendProcessor .onNext (CancelFrameFlyweight .encode (allocator , streamId ));
415- if (receivers .remove (streamId , receiver )) {
416- upstreamSubscriber .cancel ();
417- }
416+ upstreamSubscriber .cancel ();
418417 }
419418 });
420419 }
0 commit comments