From 0d5d726d6977d97cdbe6b16629b65df43102190e Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Sun, 2 Aug 2020 00:42:57 +0300 Subject: [PATCH] improves ref-counting by eliminating redundant retain/release improves ref counting and performance subsequently by eliminating redundant retain on the transport level hence we do not need to release this frame on the RSocketRequester/RSocketResponder level anymore Fixes all duplex connections to follow that strategy Signed-off-by: Oleh Dokuka --- .../io/rsocket/core/RSocketRequester.java | 11 ++- .../io/rsocket/core/RSocketResponder.java | 13 ++-- .../java/io/rsocket/core/RSocketServer.java | 15 +--- .../core/RSocketRequesterSubscribersTest.java | 10 ++- .../ClientServerInputMultiplexerTest.java | 40 +++++----- .../test/util/LocalDuplexConnection.java | 32 +++++++- .../test/util/TestDuplexConnection.java | 29 ++++++- .../local/LocalDuplexConnection.java | 78 ++++++++++++++++++- .../transport/local/LocalTransportTest.java | 40 ++++------ .../LocalTransportWithFragmentationTest.java | 40 ++++++++++ .../transport/netty/TcpDuplexConnection.java | 31 +------- .../netty/WebsocketDuplexConnection.java | 2 +- 12 files changed, 242 insertions(+), 99 deletions(-) create mode 100644 rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalTransportWithFragmentationTest.java diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index 00a2cae37..dbe10a488 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -19,7 +19,6 @@ import static io.rsocket.keepalive.KeepAliveSupport.ClientKeepAliveSupport; import io.netty.buffer.ByteBuf; -import io.netty.util.ReferenceCountUtil; import io.rsocket.DuplexConnection; import io.rsocket.Payload; import io.rsocket.RSocket; @@ -205,10 +204,14 @@ private void handleIncomingFrames(ByteBuf frame) { } else { handleFrame(streamId, type, frame); } - frame.release(); } catch (Throwable t) { - ReferenceCountUtil.safeRelease(frame); - throw reactor.core.Exceptions.propagate(t); + super.getSendProcessor() + .onNext( + ErrorFrameCodec.encode( + super.getAllocator(), + 0, + new ConnectionErrorException("Unexpected error during frame handling", t))); + this.tryTerminateOnConnectionError(t); } } diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java index 8daea299a..c80ecc035 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java @@ -17,11 +17,11 @@ package io.rsocket.core; import io.netty.buffer.ByteBuf; -import io.netty.util.ReferenceCountUtil; import io.rsocket.DuplexConnection; import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.ResponderRSocket; +import io.rsocket.exceptions.ConnectionErrorException; import io.rsocket.frame.ErrorFrameCodec; import io.rsocket.frame.FrameHeaderCodec; import io.rsocket.frame.FrameType; @@ -39,7 +39,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Disposable; -import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -314,10 +313,14 @@ private void handleFrame(ByteBuf frame) { "ServerRSocket: Unexpected frame type: " + frameType))); break; } - ReferenceCountUtil.safeRelease(frame); } catch (Throwable t) { - ReferenceCountUtil.safeRelease(frame); - throw Exceptions.propagate(t); + super.getSendProcessor() + .onNext( + ErrorFrameCodec.encode( + super.getAllocator(), + 0, + new ConnectionErrorException("Unexpected error during frame handling", t))); + this.tryTerminateOnConnectionError(t); } } diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java index 3d72d9a0c..f7fbbe9bd 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java @@ -387,29 +387,22 @@ private Mono acceptSetup( multiplexer, new InvalidSetupException( "Unsupported version: " + SetupFrameCodec.humanReadableVersion(setupFrame))) - .doFinally( - signalType -> { - setupFrame.release(); - multiplexer.dispose(); - }); + .doFinally(signalType -> multiplexer.dispose()); } boolean leaseEnabled = leasesSupplier != null; if (SetupFrameCodec.honorLease(setupFrame) && !leaseEnabled) { return serverSetup .sendError(multiplexer, new InvalidSetupException("lease is not supported")) - .doFinally( - signalType -> { - setupFrame.release(); - multiplexer.dispose(); - }); + .doFinally(signalType -> multiplexer.dispose()); } return serverSetup.acceptRSocketSetup( setupFrame, multiplexer, (keepAliveHandler, wrappedMultiplexer) -> { - ConnectionSetupPayload setupPayload = new DefaultConnectionSetupPayload(setupFrame); + ConnectionSetupPayload setupPayload = + new DefaultConnectionSetupPayload(setupFrame.retain()); Leases leases = leaseEnabled ? leasesSupplier.get() : null; RequesterLeaseHandler requesterLeaseHandler = diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java index 64a8ec30c..fda6b61ee 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java @@ -82,7 +82,7 @@ void setUp() { @ParameterizedTest @MethodSource("allInteractions") - void singleSubscriber(Function> interaction) { + void singleSubscriber(Function> interaction, FrameType requestType) { Flux response = Flux.from(interaction.apply(rSocketRequester)); AssertSubscriber assertSubscriberA = AssertSubscriber.create(); @@ -91,7 +91,9 @@ void singleSubscriber(Function> interaction) { response.subscribe(assertSubscriberA); response.subscribe(assertSubscriberB); - connection.addToReceivedBuffer(PayloadFrameCodec.encodeComplete(connection.alloc(), 1)); + if (requestType != FrameType.REQUEST_FNF && requestType != FrameType.METADATA_PUSH) { + connection.addToReceivedBuffer(PayloadFrameCodec.encodeComplete(connection.alloc(), 1)); + } assertSubscriberA.assertTerminated(); assertSubscriberB.assertTerminated(); @@ -111,7 +113,9 @@ void singleSubscriberInCaseOfRacing( RaceTestUtils.race( () -> response.subscribe(assertSubscriberA), () -> response.subscribe(assertSubscriberB)); - connection.addToReceivedBuffer(PayloadFrameCodec.encodeComplete(connection.alloc(), i)); + if (requestType != FrameType.REQUEST_FNF && requestType != FrameType.METADATA_PUSH) { + connection.addToReceivedBuffer(PayloadFrameCodec.encodeComplete(connection.alloc(), i)); + } assertSubscriberA.assertTerminated(); assertSubscriberB.assertTerminated(); diff --git a/rsocket-core/src/test/java/io/rsocket/internal/ClientServerInputMultiplexerTest.java b/rsocket-core/src/test/java/io/rsocket/internal/ClientServerInputMultiplexerTest.java index 63acc40aa..a4a902f21 100644 --- a/rsocket-core/src/test/java/io/rsocket/internal/ClientServerInputMultiplexerTest.java +++ b/rsocket-core/src/test/java/io/rsocket/internal/ClientServerInputMultiplexerTest.java @@ -68,52 +68,52 @@ public void clientSplits() { .doOnNext(f -> setupFrames.incrementAndGet()) .subscribe(); - source.addToReceivedBuffer(errorFrame(1)); + source.addToReceivedBuffer(errorFrame(1).retain()); assertEquals(1, clientFrames.get()); assertEquals(0, serverFrames.get()); assertEquals(0, setupFrames.get()); - source.addToReceivedBuffer(errorFrame(1)); + source.addToReceivedBuffer(errorFrame(1).retain()); assertEquals(2, clientFrames.get()); assertEquals(0, serverFrames.get()); assertEquals(0, setupFrames.get()); - source.addToReceivedBuffer(leaseFrame()); + source.addToReceivedBuffer(leaseFrame().retain()); assertEquals(3, clientFrames.get()); assertEquals(0, serverFrames.get()); assertEquals(0, setupFrames.get()); - source.addToReceivedBuffer(keepAliveFrame()); + source.addToReceivedBuffer(keepAliveFrame().retain()); assertEquals(4, clientFrames.get()); assertEquals(0, serverFrames.get()); assertEquals(0, setupFrames.get()); - source.addToReceivedBuffer(errorFrame(2)); + source.addToReceivedBuffer(errorFrame(2).retain()); assertEquals(4, clientFrames.get()); assertEquals(1, serverFrames.get()); assertEquals(0, setupFrames.get()); - source.addToReceivedBuffer(errorFrame(0)); + source.addToReceivedBuffer(errorFrame(0).retain()); assertEquals(5, clientFrames.get()); assertEquals(1, serverFrames.get()); assertEquals(0, setupFrames.get()); - source.addToReceivedBuffer(metadataPushFrame()); + source.addToReceivedBuffer(metadataPushFrame().retain()); assertEquals(5, clientFrames.get()); assertEquals(2, serverFrames.get()); assertEquals(0, setupFrames.get()); - source.addToReceivedBuffer(setupFrame()); + source.addToReceivedBuffer(setupFrame().retain()); assertEquals(5, clientFrames.get()); assertEquals(2, serverFrames.get()); assertEquals(1, setupFrames.get()); - source.addToReceivedBuffer(resumeFrame()); + source.addToReceivedBuffer(resumeFrame().retain()); assertEquals(5, clientFrames.get()); assertEquals(2, serverFrames.get()); assertEquals(2, setupFrames.get()); - source.addToReceivedBuffer(resumeOkFrame()); + source.addToReceivedBuffer(resumeOkFrame().retain()); assertEquals(5, clientFrames.get()); assertEquals(2, serverFrames.get()); assertEquals(3, setupFrames.get()); @@ -141,52 +141,52 @@ public void serverSplits() { .doOnNext(f -> setupFrames.incrementAndGet()) .subscribe(); - source.addToReceivedBuffer(errorFrame(1)); + source.addToReceivedBuffer(errorFrame(1).retain()); assertEquals(1, clientFrames.get()); assertEquals(0, serverFrames.get()); assertEquals(0, setupFrames.get()); - source.addToReceivedBuffer(errorFrame(1)); + source.addToReceivedBuffer(errorFrame(1).retain()); assertEquals(2, clientFrames.get()); assertEquals(0, serverFrames.get()); assertEquals(0, setupFrames.get()); - source.addToReceivedBuffer(leaseFrame()); + source.addToReceivedBuffer(leaseFrame().retain()); assertEquals(2, clientFrames.get()); assertEquals(1, serverFrames.get()); assertEquals(0, setupFrames.get()); - source.addToReceivedBuffer(keepAliveFrame()); + source.addToReceivedBuffer(keepAliveFrame().retain()); assertEquals(2, clientFrames.get()); assertEquals(2, serverFrames.get()); assertEquals(0, setupFrames.get()); - source.addToReceivedBuffer(errorFrame(2)); + source.addToReceivedBuffer(errorFrame(2).retain()); assertEquals(2, clientFrames.get()); assertEquals(3, serverFrames.get()); assertEquals(0, setupFrames.get()); - source.addToReceivedBuffer(errorFrame(0)); + source.addToReceivedBuffer(errorFrame(0).retain()); assertEquals(2, clientFrames.get()); assertEquals(4, serverFrames.get()); assertEquals(0, setupFrames.get()); - source.addToReceivedBuffer(metadataPushFrame()); + source.addToReceivedBuffer(metadataPushFrame().retain()); assertEquals(3, clientFrames.get()); assertEquals(4, serverFrames.get()); assertEquals(0, setupFrames.get()); - source.addToReceivedBuffer(setupFrame()); + source.addToReceivedBuffer(setupFrame().retain()); assertEquals(3, clientFrames.get()); assertEquals(4, serverFrames.get()); assertEquals(1, setupFrames.get()); - source.addToReceivedBuffer(resumeFrame()); + source.addToReceivedBuffer(resumeFrame().retain()); assertEquals(3, clientFrames.get()); assertEquals(4, serverFrames.get()); assertEquals(2, setupFrames.get()); - source.addToReceivedBuffer(resumeOkFrame()); + source.addToReceivedBuffer(resumeOkFrame().retain()); assertEquals(3, clientFrames.get()); assertEquals(4, serverFrames.get()); assertEquals(3, setupFrames.get()); diff --git a/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java b/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java index 58323c066..a2957c5a1 100644 --- a/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java +++ b/rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java @@ -20,10 +20,13 @@ import io.netty.buffer.ByteBufAllocator; import io.rsocket.DuplexConnection; import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Operators; public class LocalDuplexConnection implements DuplexConnection { private final ByteBufAllocator allocator; @@ -55,7 +58,34 @@ public Mono send(Publisher frame) { @Override public Flux receive() { - return receive.doOnNext(f -> System.out.println(name + " - " + f.toString())); + return receive + .doOnNext(f -> System.out.println(name + " - " + f.toString())) + .transform( + Operators.lift( + (__, actual) -> + new CoreSubscriber() { + + @Override + public void onSubscribe(Subscription s) { + actual.onSubscribe(s); + } + + @Override + public void onNext(ByteBuf byteBuf) { + actual.onNext(byteBuf); + byteBuf.release(); + } + + @Override + public void onError(Throwable t) { + actual.onError(t); + } + + @Override + public void onComplete() { + actual.onComplete(); + } + })); } @Override diff --git a/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java b/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java index 17a19b8c9..bb2ce99ec 100644 --- a/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java +++ b/rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java @@ -24,13 +24,16 @@ import java.util.concurrent.LinkedBlockingQueue; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.CoreSubscriber; import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Operators; /** * An implementation of {@link DuplexConnection} that provides functionality to modify the behavior @@ -83,7 +86,31 @@ public Mono send(Publisher frames) { @Override public Flux receive() { - return received; + return received.transform( + Operators.lift( + (__, actual) -> + new CoreSubscriber() { + @Override + public void onSubscribe(Subscription s) { + actual.onSubscribe(s); + } + + @Override + public void onNext(ByteBuf byteBuf) { + actual.onNext(byteBuf); + byteBuf.release(); + } + + @Override + public void onError(Throwable t) { + actual.onError(t); + } + + @Override + public void onComplete() { + actual.onComplete(); + } + })); } @Override diff --git a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java index afaa14f95..026f30ced 100644 --- a/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java +++ b/rsocket-transport-local/src/main/java/io/rsocket/transport/local/LocalDuplexConnection.java @@ -22,9 +22,13 @@ import java.util.Objects; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; +import reactor.core.Fuseable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.Operators; /** An implementation of {@link DuplexConnection} that connects inside the same JVM. */ final class LocalDuplexConnection implements DuplexConnection { @@ -73,7 +77,8 @@ public Mono onClose() { @Override public Flux receive() { - return in; + return in.transform( + Operators.lift((__, actual) -> new ByteBufReleaserOperator(actual))); } @Override @@ -94,4 +99,75 @@ public Mono sendOne(ByteBuf frame) { public ByteBufAllocator alloc() { return allocator; } + + static class ByteBufReleaserOperator + implements CoreSubscriber, Subscription, Fuseable.QueueSubscription { + + final CoreSubscriber actual; + + Subscription s; + + public ByteBufReleaserOperator(CoreSubscriber actual) { + this.actual = actual; + } + + @Override + public void onSubscribe(Subscription s) { + if (Operators.validate(this.s, s)) { + this.s = s; + actual.onSubscribe(this); + } + } + + @Override + public void onNext(ByteBuf buf) { + actual.onNext(buf); + buf.release(); + } + + @Override + public void onError(Throwable t) { + actual.onError(t); + } + + @Override + public void onComplete() { + actual.onComplete(); + } + + @Override + public void request(long n) { + s.request(n); + } + + @Override + public void cancel() { + s.cancel(); + } + + @Override + public int requestFusion(int requestedMode) { + return Fuseable.NONE; + } + + @Override + public ByteBuf poll() { + throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE); + } + + @Override + public int size() { + throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE); + } + + @Override + public boolean isEmpty() { + throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE); + } + + @Override + public void clear() { + throw new UnsupportedOperationException(NOT_SUPPORTED_MESSAGE); + } + } } diff --git a/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalTransportTest.java b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalTransportTest.java index 7184dd645..ffc9ccb3a 100644 --- a/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalTransportTest.java +++ b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalTransportTest.java @@ -16,31 +16,25 @@ package io.rsocket.transport.local; -final class LocalTransportTest { // implements TransportTest { - /* - TODO // think this has a memory leak or something in the local connection now that needs to be checked into. the test - TODO // isn't very happy when run from commandline i the command line - private static final AtomicInteger UNIQUE_NAME_GENERATOR = new AtomicInteger(); +import io.rsocket.test.TransportTest; +import java.time.Duration; +import java.util.UUID; - private final TransportPair transportPair = - new TransportPair<>( - () -> "test" + UNIQUE_NAME_GENERATOR.incrementAndGet(), - (address, server) -> LocalClientTransport.create(address), - LocalServerTransport::create); +final class LocalTransportTest implements TransportTest { - @Override - @Test - public void requestChannel512() { + private final TransportPair transportPair = + new TransportPair<>( + () -> "test-" + UUID.randomUUID(), + (address, server) -> LocalClientTransport.create(address), + LocalServerTransport::create); - } + @Override + public Duration getTimeout() { + return Duration.ofSeconds(10); + } - @Override - public Duration getTimeout() { - return Duration.ofSeconds(10); - } - - @Override - public TransportPair getTransportPair() { - return transportPair; - }*/ + @Override + public TransportPair getTransportPair() { + return transportPair; + } } diff --git a/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalTransportWithFragmentationTest.java b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalTransportWithFragmentationTest.java new file mode 100644 index 000000000..1597dc6d3 --- /dev/null +++ b/rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalTransportWithFragmentationTest.java @@ -0,0 +1,40 @@ +/* + * Copyright 2015-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.transport.local; + +import io.rsocket.test.FragmentationTransportTest; +import java.time.Duration; +import java.util.UUID; + +final class LocalTransportWithFragmentationTest implements FragmentationTransportTest { + + private final TransportPair transportPair = + new TransportPair<>( + () -> "test-" + UUID.randomUUID(), + (address, server) -> LocalClientTransport.create(address), + LocalServerTransport::create); + + @Override + public Duration getTimeout() { + return Duration.ofSeconds(10); + } + + @Override + public TransportPair getTransportPair() { + return transportPair; + } +} diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java index 618708bf0..80c8b8256 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java @@ -31,7 +31,6 @@ public final class TcpDuplexConnection extends BaseDuplexConnection { private final Connection connection; - private final boolean encodeLength; /** * Creates a new instance @@ -39,20 +38,6 @@ public final class TcpDuplexConnection extends BaseDuplexConnection { * @param connection the {@link Connection} for managing the server */ public TcpDuplexConnection(Connection connection) { - this(connection, true); - } - - /** - * Creates a new instance - * - * @param encodeLength indicates if this connection should encode the length or not. - * @param connection the {@link Connection} to for managing the server - * @deprecated as of 1.0.1 in favor of using {@link #TcpDuplexConnection(Connection)} and hence - * {@code encodeLength} should always be true. - */ - @Deprecated - public TcpDuplexConnection(Connection connection, boolean encodeLength) { - this.encodeLength = encodeLength; this.connection = Objects.requireNonNull(connection, "connection must not be null"); connection @@ -78,7 +63,7 @@ protected void doOnClose() { @Override public Flux receive() { - return connection.inbound().receive().map(this::decode); + return connection.inbound().receive().map(FrameLengthCodec::frame); } @Override @@ -90,18 +75,6 @@ public Mono send(Publisher frames) { } private ByteBuf encode(ByteBuf frame) { - if (encodeLength) { - return FrameLengthCodec.encode(alloc(), frame.readableBytes(), frame); - } else { - return frame; - } - } - - private ByteBuf decode(ByteBuf frame) { - if (encodeLength) { - return FrameLengthCodec.frame(frame).retain(); - } else { - return frame; - } + return FrameLengthCodec.encode(alloc(), frame.readableBytes(), frame); } } diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java index 0183ef19d..a3745bd1f 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java @@ -68,7 +68,7 @@ protected void doOnClose() { @Override public Flux receive() { - return connection.inbound().receive().map(ByteBuf::retain); + return connection.inbound().receive(); } @Override