diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java b/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java index 178cc4fa9..e23bcceb2 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketFactory.java @@ -111,7 +111,7 @@ public static class ClientRSocketFactory implements ClientTransportAcceptor { private Resume resume; public ClientRSocketFactory() { - this(RSocketConnector.create().errorConsumer(Throwable::printStackTrace)); + this(RSocketConnector.create()); } public ClientRSocketFactory(RSocketConnector connector) { @@ -393,9 +393,13 @@ public ClientRSocketFactory fragment(int mtu) { return this; } - /** @deprecated this is deprecated with no replacement. */ + /** + * @deprecated this handler is deliberately no-ops and is deprecated with no replacement. In + * order to observe errors, it is recommended to add error handler using {@code doOnError} + * on the specific logical stream. In order to observe connection, or RSocket terminal + * errors, it is recommended to hook on {@link Closeable#onClose()} handler. + */ public ClientRSocketFactory errorConsumer(Consumer errorConsumer) { - connector.errorConsumer(errorConsumer); return this; } @@ -417,7 +421,7 @@ public static class ServerRSocketFactory implements ServerTransportAcceptor { private Resume resume; public ServerRSocketFactory() { - this(RSocketServer.create().errorConsumer(Throwable::printStackTrace)); + this(RSocketServer.create()); } public ServerRSocketFactory(RSocketServer server) { @@ -497,9 +501,13 @@ public ServerRSocketFactory fragment(int mtu) { return this; } - /** @deprecated this is deprecated with no replacement. */ + /** + * @deprecated this handler is deliberately no-ops and is deprecated with no replacement. In + * order to observe errors, it is recommended to add error handler using {@code doOnError} + * on the specific logical stream. In order to observe connection, or RSocket terminal + * errors, it is recommended to hook on {@link Closeable#onClose()} handler. + */ public ServerRSocketFactory errorConsumer(Consumer errorConsumer) { - server.errorConsumer(errorConsumer); return this; } diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java index 38393c27d..d9622e0f0 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java @@ -91,8 +91,6 @@ public class RSocketConnector { private int mtu = 0; private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT; - private Consumer errorConsumer = ex -> {}; - private RSocketConnector() {} /** @@ -436,17 +434,6 @@ public RSocketConnector payloadDecoder(PayloadDecoder decoder) { return this; } - /** - * @deprecated this is deprecated with no replacement and will be removed after {@link - * io.rsocket.RSocketFactory} is removed. - */ - @Deprecated - public RSocketConnector errorConsumer(Consumer errorConsumer) { - Objects.requireNonNull(errorConsumer); - this.errorConsumer = errorConsumer; - return this; - } - /** * The final step to connect with the transport to use as input and the resulting {@code * Mono} as output. Each subscriber to the returned {@code Mono} starts a new connection @@ -524,7 +511,6 @@ public Mono connect(Supplier transportSupplier) { new RSocketRequester( multiplexer.asClientConnection(), payloadDecoder, - errorConsumer, StreamIdSupplier.clientSupplier(), mtu, (int) keepAliveInterval.toMillis(), @@ -564,7 +550,6 @@ public Mono connect(Supplier transportSupplier) { CLIENT_TAG, wrappedConnection.alloc(), leases.sender(), - errorConsumer, leases.stats()) : ResponderLeaseHandler.None; @@ -573,7 +558,6 @@ public Mono connect(Supplier transportSupplier) { multiplexer.asServerConnection(), wrappedRSocketHandler, payloadDecoder, - errorConsumer, responderLeaseHandler, mtu); diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index 3de074953..cdfda0755 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -59,6 +59,8 @@ import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -74,9 +76,8 @@ * Requester Side of a RSocket socket. Sends {@link ByteBuf}s to a {@link RSocketResponder} of peer */ class RSocketRequester implements RSocket { - private static final AtomicReferenceFieldUpdater TERMINATION_ERROR = - AtomicReferenceFieldUpdater.newUpdater( - RSocketRequester.class, Throwable.class, "terminationError"); + private static final Logger LOGGER = LoggerFactory.getLogger(RSocketRequester.class); + private static final Exception CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException(); private static final Consumer DROPPED_ELEMENTS_CONSUMER = referenceCounted -> { @@ -93,9 +94,14 @@ class RSocketRequester implements RSocket { CLOSED_CHANNEL_EXCEPTION.setStackTrace(new StackTraceElement[0]); } + private volatile Throwable terminationError; + + private static final AtomicReferenceFieldUpdater TERMINATION_ERROR = + AtomicReferenceFieldUpdater.newUpdater( + RSocketRequester.class, Throwable.class, "terminationError"); + private final DuplexConnection connection; private final PayloadDecoder payloadDecoder; - private final Consumer errorConsumer; private final StreamIdSupplier streamIdSupplier; private final IntObjectMap senders; private final IntObjectMap> receivers; @@ -104,14 +110,12 @@ class RSocketRequester implements RSocket { private final RequesterLeaseHandler leaseHandler; private final ByteBufAllocator allocator; private final KeepAliveFramesAcceptor keepAliveFramesAcceptor; - private volatile Throwable terminationError; private final MonoProcessor onClose; private final Scheduler serialScheduler; RSocketRequester( DuplexConnection connection, PayloadDecoder payloadDecoder, - Consumer errorConsumer, StreamIdSupplier streamIdSupplier, int mtu, int keepAliveTickPeriod, @@ -122,7 +126,6 @@ class RSocketRequester implements RSocket { this.connection = connection; this.allocator = connection.alloc(); this.payloadDecoder = payloadDecoder; - this.errorConsumer = errorConsumer; this.streamIdSupplier = streamIdSupplier; this.mtu = mtu; this.leaseHandler = leaseHandler; @@ -140,7 +143,7 @@ class RSocketRequester implements RSocket { .subscribe(null, this::tryTerminateOnConnectionError, this::tryTerminateOnConnectionClose); connection.send(sendProcessor).subscribe(null, this::handleSendProcessorError); - connection.receive().subscribe(this::handleIncomingFrames, errorConsumer); + connection.receive().subscribe(this::handleIncomingFrames, e -> {}); if (keepAliveTickPeriod != 0 && keepAliveHandler != null) { KeepAliveSupport keepAliveSupport = @@ -396,7 +399,6 @@ private Flux handleChannel(Flux request) { payload.release(); final IllegalArgumentException t = new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE); - errorConsumer.accept(t); return Mono.error(t); } return handleChannel(payload, flux); @@ -446,7 +448,6 @@ protected void hookOnNext(Payload payload) { cancel(); final IllegalArgumentException t = new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE); - errorConsumer.accept(t); // no need to send any errors. sendProcessor.onNext(CancelFrameCodec.encode(allocator, streamId)); receiver.onError(t); @@ -609,9 +610,9 @@ private void handleStreamZero(FrameType type, ByteBuf frame) { break; default: // Ignore unknown frames. Throwing an error will close the socket. - errorConsumer.accept( - new IllegalStateException( - "Client received supported frame on stream 0: " + frame.toString())); + if (LOGGER.isInfoEnabled()) { + LOGGER.info("Requester received unsupported frame on stream 0: " + frame.toString()); + } } } @@ -668,7 +669,7 @@ private void handleFrame(int streamId, FrameType type, ByteBuf frame) { } default: throw new IllegalStateException( - "Client received supported frame on stream " + streamId + ": " + frame.toString()); + "Requester received unsupported frame on stream " + streamId + ": " + frame.toString()); } } @@ -736,7 +737,9 @@ private void terminate(Throwable e) { try { receiver.onError(e); } catch (Throwable t) { - errorConsumer.accept(t); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Dropped exception", t); + } } }); } @@ -748,14 +751,15 @@ private void terminate(Throwable e) { try { sender.cancel(); } catch (Throwable t) { - errorConsumer.accept(t); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Dropped exception", t); + } } }); } senders.clear(); receivers.clear(); sendProcessor.dispose(); - errorConsumer.accept(e); onClose.onError(e); } diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java index d5f9206d8..ca5e605c7 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java @@ -43,6 +43,8 @@ import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import reactor.core.Disposable; import reactor.core.Exceptions; import reactor.core.publisher.*; @@ -50,6 +52,8 @@ /** Responder side of RSocket. Receives {@link ByteBuf}s from a peer's {@link RSocketRequester} */ class RSocketResponder implements RSocket { + private static final Logger LOGGER = LoggerFactory.getLogger(RSocketResponder.class); + private static final Consumer DROPPED_ELEMENTS_CONSUMER = referenceCounted -> { if (referenceCounted.refCnt() > 0) { @@ -69,7 +73,6 @@ class RSocketResponder implements RSocket { private final io.rsocket.ResponderRSocket responderRSocket; private final PayloadDecoder payloadDecoder; - private final Consumer errorConsumer; private final ResponderLeaseHandler leaseHandler; private final Disposable leaseHandlerDisposable; private final MonoProcessor onClose; @@ -87,12 +90,10 @@ class RSocketResponder implements RSocket { private final UnboundedProcessor sendProcessor; private final ByteBufAllocator allocator; - @SuppressWarnings("deprecation") RSocketResponder( DuplexConnection connection, RSocket requestHandler, PayloadDecoder payloadDecoder, - Consumer errorConsumer, ResponderLeaseHandler leaseHandler, int mtu) { this.connection = connection; @@ -106,7 +107,6 @@ class RSocketResponder implements RSocket { : null; this.payloadDecoder = payloadDecoder; - this.errorConsumer = errorConsumer; this.leaseHandler = leaseHandler; this.sendingSubscriptions = new SynchronizedIntObjectHashMap<>(); this.channelProcessors = new SynchronizedIntObjectHashMap<>(); @@ -118,7 +118,7 @@ class RSocketResponder implements RSocket { connection.send(sendProcessor).subscribe(null, this::handleSendProcessorError); - connection.receive().subscribe(this::handleFrame, errorConsumer); + connection.receive().subscribe(this::handleFrame, e -> {}); leaseHandlerDisposable = leaseHandler.send(sendProcessor::onNextPrioritized); this.connection @@ -135,7 +135,9 @@ private void handleSendProcessorError(Throwable t) { try { subscription.cancel(); } catch (Throwable e) { - errorConsumer.accept(e); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Dropped exception", t); + } } }); @@ -146,7 +148,9 @@ private void handleSendProcessorError(Throwable t) { try { subscription.onError(t); } catch (Throwable e) { - errorConsumer.accept(e); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Dropped exception", t); + } } }); } @@ -375,9 +379,7 @@ protected void hookOnSubscribe(Subscription subscription) { } @Override - protected void hookOnError(Throwable throwable) { - errorConsumer.accept(throwable); - } + protected void hookOnError(Throwable throwable) {} @Override protected void hookFinally(SignalType type) { @@ -583,9 +585,7 @@ protected void hookOnSubscribe(Subscription subscription) { } @Override - protected void hookOnError(Throwable throwable) { - errorConsumer.accept(throwable); - } + protected void hookOnError(Throwable throwable) {} }); } @@ -599,7 +599,6 @@ private void handleCancelFrame(int streamId) { } private void handleError(int streamId, Throwable t) { - errorConsumer.accept(t); sendProcessor.onNext(ErrorFrameCodec.encode(allocator, streamId, t)); } diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java index a0f7c810b..6390d44dd 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java @@ -67,8 +67,6 @@ public final class RSocketServer { private int mtu = 0; private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT; - private Consumer errorConsumer = ex -> {}; - private RSocketServer() {} /** Static factory method to create an {@code RSocketServer}. */ @@ -247,16 +245,6 @@ public RSocketServer payloadDecoder(PayloadDecoder decoder) { return this; } - /** - * @deprecated this is deprecated with no replacement and will be removed after {@link - * io.rsocket.RSocketFactory} is removed. - */ - @Deprecated - public RSocketServer errorConsumer(Consumer errorConsumer) { - this.errorConsumer = errorConsumer; - return this; - } - /** * Start the server on the given transport. * @@ -395,7 +383,6 @@ private Mono acceptSetup( new RSocketRequester( wrappedMultiplexer.asServerConnection(), payloadDecoder, - errorConsumer, StreamIdSupplier.serverSupplier(), mtu, setupPayload.keepAliveInterval(), @@ -422,11 +409,7 @@ private Mono acceptSetup( ResponderLeaseHandler responderLeaseHandler = leaseEnabled ? new ResponderLeaseHandler.Impl<>( - SERVER_TAG, - connection.alloc(), - leases.sender(), - errorConsumer, - leases.stats()) + SERVER_TAG, connection.alloc(), leases.sender(), leases.stats()) : ResponderLeaseHandler.None; RSocket rSocketResponder = @@ -434,7 +417,6 @@ private Mono acceptSetup( connection, wrappedRSocketHandler, payloadDecoder, - errorConsumer, responderLeaseHandler, mtu); }) diff --git a/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java b/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java index 0d24c51d8..038120efc 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java @@ -119,10 +119,7 @@ public ClientServerInputMultiplexer( break; } }, - t -> { - LOGGER.error("Error receiving frame:", t); - dispose(); - }); + t -> {}); } public DuplexConnection asClientServerConnection() { diff --git a/rsocket-core/src/main/java/io/rsocket/lease/ResponderLeaseHandler.java b/rsocket-core/src/main/java/io/rsocket/lease/ResponderLeaseHandler.java index 2035ade87..df8787cb7 100644 --- a/rsocket-core/src/main/java/io/rsocket/lease/ResponderLeaseHandler.java +++ b/rsocket-core/src/main/java/io/rsocket/lease/ResponderLeaseHandler.java @@ -41,7 +41,6 @@ final class Impl implements ResponderLeaseHandler { private final String tag; private final ByteBufAllocator allocator; private final Function, Flux> leaseSender; - private final Consumer errorConsumer; private final Optional leaseStatsOption; private final T leaseStats; @@ -49,12 +48,10 @@ public Impl( String tag, ByteBufAllocator allocator, Function, Flux> leaseSender, - Consumer errorConsumer, Optional leaseStatsOption) { this.tag = tag; this.allocator = allocator; this.leaseSender = leaseSender; - this.errorConsumer = errorConsumer; this.leaseStatsOption = leaseStatsOption; this.leaseStats = leaseStatsOption.orElse(null); } @@ -86,8 +83,7 @@ public Disposable send(Consumer leaseFrameSender) { lease -> { currentLease = create(lease); leaseFrameSender.accept(createLeaseFrame(lease)); - }, - errorConsumer); + }); } @Override diff --git a/rsocket-core/src/test/java/io/rsocket/core/AbstractSocketRule.java b/rsocket-core/src/test/java/io/rsocket/core/AbstractSocketRule.java index 20972a0d3..ac5832aaa 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/AbstractSocketRule.java +++ b/rsocket-core/src/test/java/io/rsocket/core/AbstractSocketRule.java @@ -21,8 +21,6 @@ import io.rsocket.buffer.LeaksTrackingByteBufAllocator; import io.rsocket.test.util.TestDuplexConnection; import io.rsocket.test.util.TestSubscriber; -import java.util.concurrent.ConcurrentLinkedQueue; -import org.junit.Assert; import org.junit.rules.ExternalResource; import org.junit.runner.Description; import org.junit.runners.model.Statement; @@ -33,7 +31,6 @@ public abstract class AbstractSocketRule extends ExternalReso protected TestDuplexConnection connection; protected Subscriber connectSub; protected T socket; - protected ConcurrentLinkedQueue errors; protected LeaksTrackingByteBufAllocator allocator; @Override @@ -44,7 +41,6 @@ public void evaluate() throws Throwable { allocator = LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); connection = new TestDuplexConnection(allocator); connectSub = TestSubscriber.create(); - errors = new ConcurrentLinkedQueue<>(); init(); base.evaluate(); } @@ -57,12 +53,6 @@ protected void init() { protected abstract T newRSocket(); - public void assertNoConnectionErrors() { - if (errors.size() > 1) { - Assert.fail("No connection errors expected: " + errors.peek().toString()); - } - } - public ByteBufAllocator alloc() { return allocator; } diff --git a/rsocket-core/src/test/java/io/rsocket/core/KeepAliveTest.java b/rsocket-core/src/test/java/io/rsocket/core/KeepAliveTest.java index b3ded08ec..d98f86113 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/KeepAliveTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/KeepAliveTest.java @@ -35,9 +35,6 @@ import io.rsocket.test.util.TestDuplexConnection; import io.rsocket.util.DefaultPayload; import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.function.Consumer; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -57,12 +54,10 @@ static RSocketState requester(int tickPeriod, int timeout) { LeaksTrackingByteBufAllocator allocator = LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); TestDuplexConnection connection = new TestDuplexConnection(allocator); - Errors errors = new Errors(); RSocketRequester rSocket = new RSocketRequester( connection, DefaultPayload::create, - errors, StreamIdSupplier.clientSupplier(), 0, tickPeriod, @@ -70,7 +65,7 @@ static RSocketState requester(int tickPeriod, int timeout) { new DefaultKeepAliveHandler(connection), RequesterLeaseHandler.None, TestScheduler.INSTANCE); - return new RSocketState(rSocket, errors, allocator, connection); + return new RSocketState(rSocket, allocator, connection); } static ResumableRSocketState resumableRequester(int tickPeriod, int timeout) { @@ -85,12 +80,10 @@ static ResumableRSocketState resumableRequester(int tickPeriod, int timeout) { Duration.ofSeconds(10), false); - Errors errors = new Errors(); RSocketRequester rSocket = new RSocketRequester( resumableConnection, DefaultPayload::create, - errors, StreamIdSupplier.clientSupplier(), 0, tickPeriod, @@ -98,7 +91,7 @@ static ResumableRSocketState resumableRequester(int tickPeriod, int timeout) { new ResumableKeepAliveHandler(resumableConnection), RequesterLeaseHandler.None, TestScheduler.INSTANCE); - return new ResumableRSocketState(rSocket, errors, connection, resumableConnection, allocator); + return new ResumableRSocketState(rSocket, connection, resumableConnection, allocator); } @BeforeEach @@ -121,10 +114,8 @@ void rSocketNotDisposedOnPresentKeepAlives() { Mono.delay(Duration.ofMillis(2000)).block(); RSocket rSocket = requesterState.rSocket(); - List errors = requesterState.errors().errors(); Assertions.assertThat(rSocket.isDisposed()).isFalse(); - Assertions.assertThat(errors).isEmpty(); } @Test @@ -143,11 +134,12 @@ void rSocketDisposedOnMissingKeepAlives() { Mono.delay(Duration.ofMillis(2000)).block(); - List errors = requesterState.errors().errors(); Assertions.assertThat(rSocket.isDisposed()).isTrue(); - Assertions.assertThat(errors).hasSize(1); - Throwable throwable = errors.get(0); - Assertions.assertThat(throwable).isInstanceOf(ConnectionErrorException.class); + rSocket + .onClose() + .as(StepVerifier::create) + .expectError(ConnectionErrorException.class) + .verify(Duration.ofMillis(100)); } @Test @@ -224,13 +216,11 @@ void resumableRequesterNoKeepAlivesAfterDispose() { @Test void resumableRSocketsNotDisposedOnMissingKeepAlives() { RSocket rSocket = resumableRequesterState.rSocket(); - List errors = resumableRequesterState.errors().errors(); TestDuplexConnection connection = resumableRequesterState.connection(); Mono.delay(Duration.ofMillis(500)).block(); Assertions.assertThat(rSocket.isDisposed()).isFalse(); - Assertions.assertThat(errors).hasSize(0); Assertions.assertThat(connection.isDisposed()).isTrue(); } @@ -248,17 +238,12 @@ private boolean keepAliveFrameWithoutRespondFlag(ByteBuf frame) { static class RSocketState { private final RSocket rSocket; - private final Errors errors; private final TestDuplexConnection connection; private final LeaksTrackingByteBufAllocator allocator; public RSocketState( - RSocket rSocket, - Errors errors, - LeaksTrackingByteBufAllocator allocator, - TestDuplexConnection connection) { + RSocket rSocket, LeaksTrackingByteBufAllocator allocator, TestDuplexConnection connection) { this.rSocket = rSocket; - this.errors = errors; this.connection = connection; this.allocator = allocator; } @@ -271,10 +256,6 @@ public RSocket rSocket() { return rSocket; } - public Errors errors() { - return errors; - } - public LeaksTrackingByteBufAllocator alloc() { return allocator; } @@ -282,19 +263,16 @@ public LeaksTrackingByteBufAllocator alloc() { static class ResumableRSocketState { private final RSocket rSocket; - private final Errors errors; private final TestDuplexConnection connection; private final ResumableDuplexConnection resumableDuplexConnection; private final LeaksTrackingByteBufAllocator allocator; public ResumableRSocketState( RSocket rSocket, - Errors errors, TestDuplexConnection connection, ResumableDuplexConnection resumableDuplexConnection, LeaksTrackingByteBufAllocator allocator) { this.rSocket = rSocket; - this.errors = errors; this.connection = connection; this.resumableDuplexConnection = resumableDuplexConnection; this.allocator = allocator; @@ -312,25 +290,8 @@ public RSocket rSocket() { return rSocket; } - public Errors errors() { - return errors; - } - public LeaksTrackingByteBufAllocator alloc() { return allocator; } } - - static class Errors implements Consumer { - private final List errors = new ArrayList<>(); - - @Override - public void accept(Throwable throwable) { - errors.add(throwable); - } - - public List errors() { - return new ArrayList<>(errors); - } - } } diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketLeaseTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketLeaseTest.java index ddfbe4234..ab336b8cd 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketLeaseTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketLeaseTest.java @@ -85,7 +85,7 @@ void setUp() { requesterLeaseHandler = new RequesterLeaseHandler.Impl(TAG, leases -> leaseReceiver = leases); responderLeaseHandler = new ResponderLeaseHandler.Impl<>( - TAG, byteBufAllocator, stats -> leaseSender, err -> {}, Optional.empty()); + TAG, byteBufAllocator, stats -> leaseSender, Optional.empty()); ClientServerInputMultiplexer multiplexer = new ClientServerInputMultiplexer(connection, new InitializingInterceptorRegistry(), true); @@ -93,7 +93,6 @@ void setUp() { new RSocketRequester( multiplexer.asClientConnection(), payloadDecoder, - err -> {}, StreamIdSupplier.clientSupplier(), 0, 0, @@ -114,7 +113,6 @@ void setUp() { multiplexer.asServerConnection(), mockRSocketHandler, payloadDecoder, - err -> {}, responderLeaseHandler, 0); } diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java index 00f74152a..4cd3a3a26 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterSubscribersTest.java @@ -66,7 +66,6 @@ void setUp() { new RSocketRequester( connection, PayloadDecoder.DEFAULT, - err -> {}, StreamIdSupplier.clientSupplier(), 0, 0, diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java index e4943e9b0..1ba75f75a 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java @@ -24,10 +24,8 @@ import static io.rsocket.frame.FrameType.REQUEST_RESPONSE; import static io.rsocket.frame.FrameType.REQUEST_STREAM; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.verify; @@ -122,13 +120,9 @@ public void tearDown() { @Test @Timeout(2_000) - public void testInvalidFrameOnStream0() { + public void testInvalidFrameOnStream0ShouldNotTerminateRSocket() { rule.connection.addToReceivedBuffer(RequestNFrameCodec.encode(rule.alloc(), 0, 10)); - assertThat("Unexpected errors.", rule.errors, hasSize(1)); - assertThat( - "Unexpected error received.", - rule.errors, - contains(instanceOf(IllegalStateException.class))); + Assertions.assertThat(rule.socket.isDisposed()).isFalse(); rule.assertHasNoLeaks(); } @@ -167,11 +161,8 @@ protected void hookOnSubscribe(Subscription subscription) { public void testHandleSetupException() { rule.connection.addToReceivedBuffer( ErrorFrameCodec.encode(rule.alloc(), 0, new RejectedSetupException("boom"))); - assertThat("Unexpected errors.", rule.errors, hasSize(1)); - assertThat( - "Unexpected error received.", - rule.errors, - contains(instanceOf(RejectedSetupException.class))); + Assertions.assertThatThrownBy(() -> rule.socket.onClose().block()) + .isInstanceOf(RejectedSetupException.class); rule.assertHasNoLeaks(); } @@ -242,7 +233,12 @@ public void testRequestReplyErrorOnSend() { Subscriber responseSub = TestSubscriber.create(10); response.subscribe(responseSub); - this.rule.assertNoConnectionErrors(); + this.rule + .socket + .onClose() + .as(StepVerifier::create) + .expectComplete() + .verify(Duration.ofMillis(100)); verify(responseSub).onSubscribe(any(Subscription.class)); @@ -1006,7 +1002,6 @@ protected RSocketRequester newRSocket() { return new RSocketRequester( connection, PayloadDecoder.ZERO_COPY, - throwable -> errors.add(throwable), StreamIdSupplier.clientSupplier(), 0, 0, diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java index e34973848..036dc2eef 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java @@ -59,7 +59,6 @@ import io.rsocket.util.DefaultPayload; import io.rsocket.util.EmptyPayload; import java.util.Collection; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; @@ -138,7 +137,6 @@ public void testHandleResponseFrameNoError() throws Exception { Collection> sendSubscribers = rule.connection.getSendSubscribers(); assertThat("Request not sent.", sendSubscribers, hasSize(1)); - assertThat("Unexpected error.", rule.errors, is(empty())); Subscriber sendSub = sendSubscribers.iterator().next(); assertThat( "Unexpected frame sent.", @@ -152,7 +150,6 @@ public void testHandleResponseFrameNoError() throws Exception { public void testHandlerEmitsError() throws Exception { final int streamId = 4; rule.sendRequest(streamId, FrameType.REQUEST_STREAM); - assertThat("Unexpected error.", rule.errors, is(empty())); assertThat( "Unexpected frame sent.", frameType(rule.connection.awaitSend()), is(FrameType.ERROR)); } @@ -173,7 +170,6 @@ public Mono requestResponse(Payload payload) { }); rule.sendRequest(streamId, FrameType.REQUEST_RESPONSE); - assertThat("Unexpected error.", rule.errors, is(empty())); assertThat("Unexpected frame sent.", rule.connection.getSent(), is(empty())); rule.connection.addToReceivedBuffer(CancelFrameCodec.encode(allocator, streamId)); @@ -232,10 +228,6 @@ protected void hookOnSubscribe(Subscription subscription) { for (Runnable runnable : runnables) { rule.connection.clearSendReceiveBuffers(); runnable.run(); - Assertions.assertThat(rule.errors) - .first() - .isInstanceOf(IllegalArgumentException.class) - .hasToString("java.lang.IllegalArgumentException: " + INVALID_PAYLOAD_ERROR_MESSAGE); Assertions.assertThat(rule.connection.getSent()) .hasSize(1) .first() @@ -778,7 +770,6 @@ public void setAcceptingSocket(RSocket acceptingSocket) { this.acceptingSocket = acceptingSocket; connection = new TestDuplexConnection(alloc()); connectSub = TestSubscriber.create(); - errors = new ConcurrentLinkedQueue<>(); this.prefetch = Integer.MAX_VALUE; super.init(); } @@ -787,7 +778,6 @@ public void setAcceptingSocket(RSocket acceptingSocket, int prefetch) { this.acceptingSocket = acceptingSocket; connection = new TestDuplexConnection(alloc()); connectSub = TestSubscriber.create(); - errors = new ConcurrentLinkedQueue<>(); this.prefetch = prefetch; super.init(); } @@ -795,12 +785,7 @@ public void setAcceptingSocket(RSocket acceptingSocket, int prefetch) { @Override protected RSocketResponder newRSocket() { return new RSocketResponder( - connection, - acceptingSocket, - PayloadDecoder.ZERO_COPY, - throwable -> errors.add(throwable), - ResponderLeaseHandler.None, - 0); + connection, acceptingSocket, PayloadDecoder.ZERO_COPY, ResponderLeaseHandler.None, 0); } private void sendRequest(int streamId, FrameType frameType) { diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java index 48ce150d6..f032942db 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketTest.java @@ -16,11 +16,6 @@ package io.rsocket.core; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.is; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.verify; - import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.rsocket.Payload; @@ -34,24 +29,18 @@ import io.rsocket.lease.RequesterLeaseHandler; import io.rsocket.lease.ResponderLeaseHandler; import io.rsocket.test.util.LocalDuplexConnection; -import io.rsocket.test.util.TestSubscriber; import io.rsocket.util.DefaultPayload; import io.rsocket.util.EmptyPayload; import java.time.Duration; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicReference; import org.assertj.core.api.Assertions; -import org.hamcrest.MatcherAssert; -import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExternalResource; import org.junit.runner.Description; import org.junit.runners.model.Statement; -import org.mockito.ArgumentCaptor; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -62,16 +51,6 @@ public class RSocketTest { @Rule public final SocketRule rule = new SocketRule(); - public static void assertError(String s, String mode, ArrayList errors) { - for (Throwable t : errors) { - if (t.toString().equals(s)) { - return; - } - } - - Assert.fail("Expected " + mode + " connection error: " + s + " other errors " + errors.size()); - } - @Test(timeout = 2_000) public void testRequestReplyNoError() { StepVerifier.create(rule.crs.requestResponse(DefaultPayload.create("hello"))) @@ -89,14 +68,15 @@ public Mono requestResponse(Payload payload) { return Mono.error(new NullPointerException("Deliberate exception.")); } }); - Subscriber subscriber = TestSubscriber.create(); - rule.crs.requestResponse(EmptyPayload.INSTANCE).subscribe(subscriber); - verify(subscriber).onError(any(ApplicationErrorException.class)); - - // Client sees error through normal API - rule.assertNoClientErrors(); - - rule.assertServerError("java.lang.NullPointerException: Deliberate exception."); + rule.crs + .requestResponse(EmptyPayload.INSTANCE) + .as(StepVerifier::create) + .expectErrorSatisfies( + t -> + Assertions.assertThat(t) + .isInstanceOf(ApplicationErrorException.class) + .hasMessage("Deliberate exception.")) + .verify(Duration.ofMillis(100)); } @Test(timeout = 2000) @@ -109,21 +89,16 @@ public Mono requestResponse(Payload payload) { new CustomRSocketException(0x00000501, "Deliberate Custom exception.")); } }); - Subscriber subscriber = TestSubscriber.create(); - rule.crs.requestResponse(EmptyPayload.INSTANCE).subscribe(subscriber); - ArgumentCaptor customRSocketExceptionArgumentCaptor = - ArgumentCaptor.forClass(CustomRSocketException.class); - verify(subscriber).onError(customRSocketExceptionArgumentCaptor.capture()); - - Assert.assertEquals( - "Deliberate Custom exception.", - customRSocketExceptionArgumentCaptor.getValue().getMessage()); - Assert.assertEquals(0x00000501, customRSocketExceptionArgumentCaptor.getValue().errorCode()); - - // Client sees error through normal API - rule.assertNoClientErrors(); - - rule.assertServerError("CustomRSocketException (0x501): Deliberate Custom exception."); + rule.crs + .requestResponse(EmptyPayload.INSTANCE) + .as(StepVerifier::create) + .expectErrorSatisfies( + t -> + Assertions.assertThat(t) + .isInstanceOf(CustomRSocketException.class) + .hasMessage("Deliberate Custom exception.") + .hasFieldOrPropertyWithValue("errorCode", 0x00000501)) + .verify(); } @Test(timeout = 2000) @@ -147,9 +122,6 @@ public Flux requestChannel(Publisher payloads) { .expectNextCount(3) .expectComplete() .verify(Duration.ofMillis(5000)); - - rule.assertNoClientErrors(); - rule.assertNoServerErrors(); } @Test(timeout = 2000) @@ -413,8 +385,6 @@ public static class SocketRule extends ExternalResource { private RSocketResponder srs; private RSocket requestAcceptor; - private ArrayList clientErrors = new ArrayList<>(); - private ArrayList serverErrors = new ArrayList<>(); private LeaksTrackingByteBufAllocator allocator; @@ -479,7 +449,6 @@ public Flux requestChannel(Publisher payloads) { serverConnection, requestAcceptor, PayloadDecoder.DEFAULT, - throwable -> serverErrors.add(throwable), ResponderLeaseHandler.None, 0); @@ -487,7 +456,6 @@ public Flux requestChannel(Publisher payloads) { new RSocketRequester( clientConnection, PayloadDecoder.DEFAULT, - throwable -> clientErrors.add(throwable), StreamIdSupplier.clientSupplier(), 0, 0, @@ -501,28 +469,5 @@ public void setRequestAcceptor(RSocket requestAcceptor) { this.requestAcceptor = requestAcceptor; init(); } - - public void assertNoErrors() { - assertNoClientErrors(); - assertNoServerErrors(); - } - - public void assertNoClientErrors() { - MatcherAssert.assertThat( - "Unexpected error on the client connection.", clientErrors, is(empty())); - } - - public void assertNoServerErrors() { - MatcherAssert.assertThat( - "Unexpected error on the server connection.", serverErrors, is(empty())); - } - - public void assertClientError(String s) { - assertError(s, "client", this.clientErrors); - } - - public void assertServerError(String s) { - assertError(s, "server", this.serverErrors); - } } } diff --git a/rsocket-core/src/test/java/io/rsocket/core/SetupRejectionTest.java b/rsocket-core/src/test/java/io/rsocket/core/SetupRejectionTest.java index 75a5e070e..2957a051e 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/SetupRejectionTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/SetupRejectionTest.java @@ -18,8 +18,6 @@ import io.rsocket.transport.ServerTransport; import io.rsocket.util.DefaultPayload; import java.time.Duration; -import java.util.ArrayList; -import java.util.List; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; @@ -53,12 +51,10 @@ void requesterStreamsTerminatedOnZeroErrorFrame() { LeaksTrackingByteBufAllocator allocator = LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); TestDuplexConnection conn = new TestDuplexConnection(allocator); - List errors = new ArrayList<>(); RSocketRequester rSocket = new RSocketRequester( conn, DefaultPayload::create, - errors::add, StreamIdSupplier.clientSupplier(), 0, 0, @@ -83,7 +79,6 @@ void requesterStreamsTerminatedOnZeroErrorFrame() { err -> err instanceof RejectedSetupException && errorMsg.equals(err.getMessage())) .verify(Duration.ofSeconds(5)); - assertThat(errors).hasSize(1); assertThat(rSocket.isDisposed()).isTrue(); } @@ -96,7 +91,6 @@ void requesterNewStreamsTerminatedAfterZeroErrorFrame() { new RSocketRequester( conn, DefaultPayload::create, - err -> {}, StreamIdSupplier.clientSupplier(), 0, 0,