Closed
Description
Actual Behavior
The code below throws IllegalReferenceCountException when I use ByteBufPayload and publish the result on a different thread.
There is no exception when I use DefaultPayload.create("World".getBytes()) instead of ByteBufPayload.
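For readers less familiar with reference-counted buffers, here is a minimal toy model of why the order of reads and `release()` calls matters when a view shares its parent's reference count, as Netty's `slice()` does. The classes `RefCountSketch`, `RefCountedBuf` and `Slice` are hypothetical and are not Netty or RSocket types, and `IllegalStateException` stands in for Netty's `IllegalReferenceCountException`. The sketch assumes that `payload.sliceData()` returns such a shared view. It does not establish the cause of the exception reported here.

```java
import java.nio.charset.StandardCharsets;

// Toy model only: these classes are hypothetical and are NOT Netty or RSocket types.
final class RefCountSketch {

    /** A buffer whose contents become unreadable once its reference count reaches zero. */
    static final class RefCountedBuf {
        private final byte[] bytes;
        private int refCnt = 1; // a new buffer starts with one reference

        RefCountedBuf(String text) {
            this.bytes = text.getBytes(StandardCharsets.UTF_8);
        }

        void release() {
            if (refCnt == 0) {
                // Stand-in for Netty's IllegalReferenceCountException
                throw new IllegalStateException("refCnt: 0 (already released)");
            }
            refCnt--;
        }

        /** Returns a view that shares this buffer's reference count (no retain). */
        Slice slice() {
            return new Slice(this);
        }

        String read() {
            if (refCnt == 0) {
                throw new IllegalStateException("refCnt: 0 (read after release)");
            }
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    /** A view over a parent buffer; it has no reference count of its own. */
    static final class Slice {
        private final RefCountedBuf parent;

        Slice(RefCountedBuf parent) {
            this.parent = parent;
        }

        String read() {
            return parent.read();
        }
    }

    /** Reads the slice first, then releases: the read succeeds. */
    static String readThenRelease() {
        RefCountedBuf buf = new RefCountedBuf("Hello, World");
        Slice view = buf.slice();
        String text = view.read();
        buf.release();
        return text;
    }

    /** Releases first, then reads the slice: the read fails in this model. */
    static boolean releaseThenReadFails() {
        RefCountedBuf buf = new RefCountedBuf("Hello, World");
        Slice view = buf.slice();
        buf.release();
        try {
            view.read();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(readThenRelease());
        System.out.println(releaseThenReadFails());
    }
}
```

In this model the safe ordering is to copy the data out before releasing the buffer. The `map` step in the reproduction reads `dataBuf` after `payload.release()`. Whether that ordering, the `publishOn` hop, or something inside the library is responsible for the reported exception is not something this sketch settles.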
Steps to Reproduce
@Test
public void refErrorTest() {
    CloseableChannel server = RSocketServer.create((setup, sendingRSocket) -> Mono.just(new RSocket() {
                @Override
                public Mono<Payload> requestResponse(Payload payload) {
                    return Mono.just(DefaultPayload.create("Hello, " + payload.getDataUtf8()));
                }
            }))
            .bind(TcpServerTransport.create(TcpServer.create().host("localhost").port(7890)))
            .block();
    RSocket client = RSocketConnector.create()
            .payloadDecoder(PayloadDecoder.ZERO_COPY)
            .connect(() -> TcpClientTransport.create(TcpClient.create().host("localhost").port(7890)))
            .block();
    var mrrMono = client.requestResponse(ByteBufPayload.create(ByteBuffer.wrap("World".getBytes())))
            .publishOn(Schedulers.parallel()) // TODO: if this line is removed, the code works as expected
            .map(payload -> {
                var dataBuf = payload.sliceData();
                payload.release();
                return dataBuf.toString(StandardCharsets.UTF_8);
            })
            .flatMap(d -> Mono.error(new IllegalArgumentException("oops")));
    StepVerifier.create(mrrMono)
            .verifyError();
    client.dispose();
    server.dispose();
}
Your Environment
- RSocket version(s) used: 1.0.1
- Other relevant libraries versions (e.g. netty, ...): 4.1.51.Final
- Platform (e.g. JVM version (java -version)): java 11.0.3 2019-04-16 LTS
- OS and version (e.g. uname -a): Windows 10