diff --git a/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java b/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java index 58f282110..d59b9fe97 100644 --- a/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java +++ b/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java @@ -99,13 +99,18 @@ public static Payload create(ByteBuf data) { } public static Payload create(ByteBuf data, @Nullable ByteBuf metadata) { - return create(data.nioBuffer(), metadata == null ? null : metadata.nioBuffer()); + try { + return create(data.nioBuffer(), metadata == null ? null : metadata.nioBuffer()); + } finally { + data.release(); + if (metadata != null) { + metadata.release(); + } + } } public static Payload create(Payload payload) { - return create( - Unpooled.copiedBuffer(payload.sliceData()), - payload.hasMetadata() ? Unpooled.copiedBuffer(payload.sliceMetadata()) : null); + return create(payload.getData(), payload.hasMetadata() ? payload.getMetadata() : null); } @Override diff --git a/rsocket-core/src/test/java/io/rsocket/frame/SetupFrameCodecTest.java b/rsocket-core/src/test/java/io/rsocket/frame/SetupFrameCodecTest.java index 9607ad327..3317b4618 100644 --- a/rsocket-core/src/test/java/io/rsocket/frame/SetupFrameCodecTest.java +++ b/rsocket-core/src/test/java/io/rsocket/frame/SetupFrameCodecTest.java @@ -25,8 +25,8 @@ void testEncodingNoResume() { assertEquals(0, SetupFrameCodec.resumeToken(frame).readableBytes()); assertEquals("metadata_type", SetupFrameCodec.metadataMimeType(frame)); assertEquals("data_type", SetupFrameCodec.dataMimeType(frame)); - assertEquals(metadata, SetupFrameCodec.metadata(frame)); - assertEquals(data, SetupFrameCodec.data(frame)); + assertEquals(payload.metadata(), SetupFrameCodec.metadata(frame)); + assertEquals(payload.data(), SetupFrameCodec.data(frame)); assertEquals(SetupFrameCodec.CURRENT_VERSION, SetupFrameCodec.version(frame)); frame.release(); } @@ -49,8 +49,8 @@ void testEncodingResume() { assertEquals(token, SetupFrameCodec.resumeToken(frame)); assertEquals("metadata_type", SetupFrameCodec.metadataMimeType(frame)); assertEquals("data_type", SetupFrameCodec.dataMimeType(frame)); - assertEquals(metadata, SetupFrameCodec.metadata(frame)); - assertEquals(data, SetupFrameCodec.data(frame)); + assertEquals(payload.metadata(), SetupFrameCodec.metadata(frame)); + assertEquals(payload.data(), SetupFrameCodec.data(frame)); assertEquals(SetupFrameCodec.CURRENT_VERSION, SetupFrameCodec.version(frame)); frame.release(); } diff --git a/rsocket-core/src/test/java/io/rsocket/util/DefaultPayloadTest.java b/rsocket-core/src/test/java/io/rsocket/util/DefaultPayloadTest.java index 6bae0886b..3f97ab9dc 100644 --- a/rsocket-core/src/test/java/io/rsocket/util/DefaultPayloadTest.java +++ b/rsocket-core/src/test/java/io/rsocket/util/DefaultPayloadTest.java @@ -19,9 +19,13 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.Unpooled; import io.rsocket.Payload; +import io.rsocket.buffer.LeaksTrackingByteBufAllocator; import java.nio.ByteBuffer; +import java.util.concurrent.ThreadLocalRandom; import org.assertj.core.api.Assertions; import org.junit.Test; @@ -74,4 +78,32 @@ public void shouldIndicateThatItHasMetadata2() { Assertions.assertThat(payload.hasMetadata()).isTrue(); } + + @Test + public void shouldReleaseGivenByteBufDataAndMetadataUpOnPayloadCreation() { + LeaksTrackingByteBufAllocator allocator = + LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); + for (byte i = 0; i < 126; i++) { + ByteBuf data = allocator.buffer(); + data.writeByte(i); + + boolean metadataPresent = ThreadLocalRandom.current().nextBoolean(); + ByteBuf metadata = null; + if (metadataPresent) { + metadata = allocator.buffer(); + metadata.writeByte(i + 1); + } + + Payload payload = DefaultPayload.create(data, metadata); + + Assertions.assertThat(payload.getData()).isEqualTo(ByteBuffer.wrap(new byte[] {i})); + + Assertions.assertThat(payload.getMetadata()) + .isEqualTo( + metadataPresent + ? ByteBuffer.wrap(new byte[] {(byte) (i + 1)}) + : DefaultPayload.EMPTY_BUFFER); + allocator.assertHasNoLeaks(); + } + } }