-
Notifications
You must be signed in to change notification settings - Fork 357
Closed
Labels
Description
When one define custom mtu to be used for fragment size it significantly degrades performance. Attached example code sends 1M records with size 5 bytes. Before fix it takes 39 seconds. After fix it takes 5 seconds (same time as with no custom fragmentation). We need to enable it, because Websocket max data size is 64kB and we support both transports.
Expected Behavior
Fragmentation setup should not affect performance. Especially when data buffers are smaller than mtu.
Actual Behavior
There is huge performance degradation.
Steps to Reproduce
public class RsocketTest {
private static final Logger logger = LoggerFactory.getLogger(RsocketTest.class);
public static void main(String[] args) {
RSocket rsocket = new RSocket() {
@Override
public Flux<Payload> requestStream(Payload payload) {
ByteBuffer data = ByteBuffer.wrap(new byte[5]);
return Flux.generate(() -> new AtomicInteger(1000000), (state, s) -> {
if (state.decrementAndGet() == 0) {
s.complete();
} else {
s.next(DefaultPayload.create(data));
}
return state;
});
}
};
RSocketServer.create(SocketAcceptor.with(rsocket))
.fragment(60000)
.bind(TcpServerTransport.create(
TcpServer.create().host("localhost").port(7000)))
.subscribe();
RSocket socket = RSocketConnector.create()
.fragment(60000)
.connect(TcpClientTransport.create(
TcpClient.create().host("localhost").port(7000)))
.block();
Flux<Payload> res = socket.requestStream(DefaultPayload.create(ByteBuffer.wrap("Hello".getBytes())));
long start = System.currentTimeMillis();
res
.limitRate(1000)
.blockLast();
logger.info("Call took: {}", System.currentTimeMillis() - start);
socket.dispose();
}
}Possible Solution
FragmentationDuplexConnection change (I am creating PR now):
@Override
public Mono<Void> send(Publisher<ByteBuf> frames) {
return delegate.send(Flux.from(frames)
.concatMap(frame -> {
FrameType frameType = FrameHeaderCodec.frameType(frame);
int readableBytes = frame.readableBytes();
if (!shouldFragment(frameType, readableBytes)) {
return Flux.just(frame);
}
return logFragments(Flux.from(fragmentFrame(alloc(), mtu, frame, frameType)));
}));
}
@Override
public Mono<Void> sendOne(ByteBuf frame) {
FrameType frameType = FrameHeaderCodec.frameType(frame);
int readableBytes = frame.readableBytes();
if (!shouldFragment(frameType, readableBytes)) {
return delegate.sendOne(frame);
}
Flux<ByteBuf> fragments = Flux.from(fragmentFrame(alloc(), mtu, frame, frameType));
fragments = logFragments(fragments);
return delegate.send(fragments);
}
protected Flux<ByteBuf> logFragments(Flux<ByteBuf> fragments) {
if (logger.isDebugEnabled()) {
fragments =
fragments.doOnNext(
byteBuf -> {
logger.debug(
"{} - stream id {} - frame type {} - \n {}",
type,
FrameHeaderCodec.streamId(byteBuf),
FrameHeaderCodec.frameType(byteBuf),
ByteBufUtil.prettyHexDump(byteBuf));
});
}
return fragments;
}Your Environment
- RSocket version(s) used: 1.0.2
@OlegDokuka can you take a look? Can we please include this into 1.0.4?