Skip to content

Huge performance degradation when fragmentation is enabled #994

@koldat

Description

@koldat

When one define custom mtu to be used for fragment size it significantly degrades performance. Attached example code sends 1M records with size 5 bytes. Before fix it takes 39 seconds. After fix it takes 5 seconds (same time as with no custom fragmentation). We need to enable it, because Websocket max data size is 64kB and we support both transports.

Expected Behavior

Fragmentation setup should not affect performance. Especially when data buffers are smaller than mtu.

Actual Behavior

There is huge performance degradation.

Steps to Reproduce

public class RsocketTest {

    private static final Logger logger = LoggerFactory.getLogger(RsocketTest.class);

    public static void main(String[] args) {
        RSocket rsocket = new RSocket() {
            @Override
            public Flux<Payload> requestStream(Payload payload) {
                ByteBuffer data = ByteBuffer.wrap(new byte[5]);
                return Flux.generate(() -> new AtomicInteger(1000000), (state, s) -> {
                    if (state.decrementAndGet() == 0) {
                        s.complete();
                    } else {
                        s.next(DefaultPayload.create(data));
                    }
                    return state;
                });
            }
        };

        RSocketServer.create(SocketAcceptor.with(rsocket))
            .fragment(60000)
            .bind(TcpServerTransport.create(
                TcpServer.create().host("localhost").port(7000)))
            .subscribe();

        RSocket socket = RSocketConnector.create()
            .fragment(60000)
            .connect(TcpClientTransport.create(
                TcpClient.create().host("localhost").port(7000)))
            .block();

        Flux<Payload> res = socket.requestStream(DefaultPayload.create(ByteBuffer.wrap("Hello".getBytes())));
        long start = System.currentTimeMillis();
        res
            .limitRate(1000)
            .blockLast();
        
        logger.info("Call took: {}", System.currentTimeMillis() - start);
        
        socket.dispose();
    }
}

Possible Solution

FragmentationDuplexConnection change (I am creating PR now):

  @Override
  public Mono<Void> send(Publisher<ByteBuf> frames) {
    return delegate.send(Flux.from(frames)
        .concatMap(frame -> {
            FrameType frameType = FrameHeaderCodec.frameType(frame);
            int readableBytes = frame.readableBytes();
            if (!shouldFragment(frameType, readableBytes)) {
              return Flux.just(frame);
            }
            
            return logFragments(Flux.from(fragmentFrame(alloc(), mtu, frame, frameType)));
        }));
  }

  @Override
  public Mono<Void> sendOne(ByteBuf frame) {
    FrameType frameType = FrameHeaderCodec.frameType(frame);
    int readableBytes = frame.readableBytes();
    if (!shouldFragment(frameType, readableBytes)) {
      return delegate.sendOne(frame);
    }
    Flux<ByteBuf> fragments = Flux.from(fragmentFrame(alloc(), mtu, frame, frameType));
    fragments = logFragments(fragments);
    return delegate.send(fragments);
  }

  protected Flux<ByteBuf> logFragments(Flux<ByteBuf> fragments) {
    if (logger.isDebugEnabled()) {
      fragments =
          fragments.doOnNext(
              byteBuf -> {
                logger.debug(
                    "{} - stream id {} - frame type {} - \n {}",
                    type,
                    FrameHeaderCodec.streamId(byteBuf),
                    FrameHeaderCodec.frameType(byteBuf),
                    ByteBufUtil.prettyHexDump(byteBuf));
              });
    }
    return fragments;
  }

Your Environment

  • RSocket version(s) used: 1.0.2

@OlegDokuka can you take a look? Can we please include this into 1.0.4?

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions