Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions rsocket-core/src/main/java/io/rsocket/RSocketFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public static class ClientRSocketFactory implements ClientTransportAcceptor {
private Resume resume;

public ClientRSocketFactory() {
this(RSocketConnector.create().errorConsumer(Throwable::printStackTrace));
this(RSocketConnector.create());
}

public ClientRSocketFactory(RSocketConnector connector) {
Expand Down Expand Up @@ -393,9 +393,13 @@ public ClientRSocketFactory fragment(int mtu) {
return this;
}

/** @deprecated this is deprecated with no replacement. */
/**
* @deprecated this handler is deliberately no-ops and is deprecated with no replacement. In
* order to observe errors, it is recommended to add error handler using {@code doOnError}
* on the specific logical stream. In order to observe connection, or RSocket terminal
* errors, it is recommended to hook on {@link Closeable#onClose()} handler.
*/
public ClientRSocketFactory errorConsumer(Consumer<Throwable> errorConsumer) {
connector.errorConsumer(errorConsumer);
return this;
}

Expand All @@ -417,7 +421,7 @@ public static class ServerRSocketFactory implements ServerTransportAcceptor {
private Resume resume;

public ServerRSocketFactory() {
this(RSocketServer.create().errorConsumer(Throwable::printStackTrace));
this(RSocketServer.create());
}

public ServerRSocketFactory(RSocketServer server) {
Expand Down Expand Up @@ -497,9 +501,13 @@ public ServerRSocketFactory fragment(int mtu) {
return this;
}

/** @deprecated this is deprecated with no replacement. */
/**
* @deprecated this handler is deliberately no-ops and is deprecated with no replacement. In
* order to observe errors, it is recommended to add error handler using {@code doOnError}
* on the specific logical stream. In order to observe connection, or RSocket terminal
* errors, it is recommended to hook on {@link Closeable#onClose()} handler.
*/
public ServerRSocketFactory errorConsumer(Consumer<Throwable> errorConsumer) {
server.errorConsumer(errorConsumer);
return this;
}

Expand Down
16 changes: 0 additions & 16 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ public class RSocketConnector {
private int mtu = 0;
private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;

private Consumer<Throwable> errorConsumer = ex -> {};

private RSocketConnector() {}

/**
Expand Down Expand Up @@ -436,17 +434,6 @@ public RSocketConnector payloadDecoder(PayloadDecoder decoder) {
return this;
}

/**
* @deprecated this is deprecated with no replacement and will be removed after {@link
* io.rsocket.RSocketFactory} is removed.
*/
@Deprecated
public RSocketConnector errorConsumer(Consumer<Throwable> errorConsumer) {
Objects.requireNonNull(errorConsumer);
this.errorConsumer = errorConsumer;
return this;
}

/**
* The final step to connect with the transport to use as input and the resulting {@code
* Mono<RSocket>} as output. Each subscriber to the returned {@code Mono} starts a new connection
Expand Down Expand Up @@ -524,7 +511,6 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
new RSocketRequester(
multiplexer.asClientConnection(),
payloadDecoder,
errorConsumer,
StreamIdSupplier.clientSupplier(),
mtu,
(int) keepAliveInterval.toMillis(),
Expand Down Expand Up @@ -564,7 +550,6 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
CLIENT_TAG,
wrappedConnection.alloc(),
leases.sender(),
errorConsumer,
leases.stats())
: ResponderLeaseHandler.None;

Expand All @@ -573,7 +558,6 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
multiplexer.asServerConnection(),
wrappedRSocketHandler,
payloadDecoder,
errorConsumer,
responderLeaseHandler,
mtu);

Expand Down
38 changes: 21 additions & 17 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -74,9 +76,8 @@
* Requester Side of a RSocket socket. Sends {@link ByteBuf}s to a {@link RSocketResponder} of peer
*/
class RSocketRequester implements RSocket {
private static final AtomicReferenceFieldUpdater<RSocketRequester, Throwable> TERMINATION_ERROR =
AtomicReferenceFieldUpdater.newUpdater(
RSocketRequester.class, Throwable.class, "terminationError");
private static final Logger LOGGER = LoggerFactory.getLogger(RSocketRequester.class);

private static final Exception CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
private static final Consumer<ReferenceCounted> DROPPED_ELEMENTS_CONSUMER =
referenceCounted -> {
Expand All @@ -93,9 +94,14 @@ class RSocketRequester implements RSocket {
CLOSED_CHANNEL_EXCEPTION.setStackTrace(new StackTraceElement[0]);
}

private volatile Throwable terminationError;

private static final AtomicReferenceFieldUpdater<RSocketRequester, Throwable> TERMINATION_ERROR =
AtomicReferenceFieldUpdater.newUpdater(
RSocketRequester.class, Throwable.class, "terminationError");

private final DuplexConnection connection;
private final PayloadDecoder payloadDecoder;
private final Consumer<Throwable> errorConsumer;
private final StreamIdSupplier streamIdSupplier;
private final IntObjectMap<Subscription> senders;
private final IntObjectMap<Processor<Payload, Payload>> receivers;
Expand All @@ -104,14 +110,12 @@ class RSocketRequester implements RSocket {
private final RequesterLeaseHandler leaseHandler;
private final ByteBufAllocator allocator;
private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;
private volatile Throwable terminationError;
private final MonoProcessor<Void> onClose;
private final Scheduler serialScheduler;

RSocketRequester(
DuplexConnection connection,
PayloadDecoder payloadDecoder,
Consumer<Throwable> errorConsumer,
StreamIdSupplier streamIdSupplier,
int mtu,
int keepAliveTickPeriod,
Expand All @@ -122,7 +126,6 @@ class RSocketRequester implements RSocket {
this.connection = connection;
this.allocator = connection.alloc();
this.payloadDecoder = payloadDecoder;
this.errorConsumer = errorConsumer;
this.streamIdSupplier = streamIdSupplier;
this.mtu = mtu;
this.leaseHandler = leaseHandler;
Expand All @@ -140,7 +143,7 @@ class RSocketRequester implements RSocket {
.subscribe(null, this::tryTerminateOnConnectionError, this::tryTerminateOnConnectionClose);
connection.send(sendProcessor).subscribe(null, this::handleSendProcessorError);

connection.receive().subscribe(this::handleIncomingFrames, errorConsumer);
connection.receive().subscribe(this::handleIncomingFrames, e -> {});

if (keepAliveTickPeriod != 0 && keepAliveHandler != null) {
KeepAliveSupport keepAliveSupport =
Expand Down Expand Up @@ -396,7 +399,6 @@ private Flux<Payload> handleChannel(Flux<Payload> request) {
payload.release();
final IllegalArgumentException t =
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
errorConsumer.accept(t);
return Mono.error(t);
}
return handleChannel(payload, flux);
Expand Down Expand Up @@ -446,7 +448,6 @@ protected void hookOnNext(Payload payload) {
cancel();
final IllegalArgumentException t =
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
errorConsumer.accept(t);
// no need to send any errors.
sendProcessor.onNext(CancelFrameCodec.encode(allocator, streamId));
receiver.onError(t);
Expand Down Expand Up @@ -609,9 +610,9 @@ private void handleStreamZero(FrameType type, ByteBuf frame) {
break;
default:
// Ignore unknown frames. Throwing an error will close the socket.
errorConsumer.accept(
new IllegalStateException(
"Client received supported frame on stream 0: " + frame.toString()));
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Requester received unsupported frame on stream 0: " + frame.toString());
}
}
}

Expand Down Expand Up @@ -668,7 +669,7 @@ private void handleFrame(int streamId, FrameType type, ByteBuf frame) {
}
default:
throw new IllegalStateException(
"Client received supported frame on stream " + streamId + ": " + frame.toString());
"Requester received unsupported frame on stream " + streamId + ": " + frame.toString());
}
}

Expand Down Expand Up @@ -736,7 +737,9 @@ private void terminate(Throwable e) {
try {
receiver.onError(e);
} catch (Throwable t) {
errorConsumer.accept(t);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Dropped exception", t);
}
}
});
}
Expand All @@ -748,14 +751,15 @@ private void terminate(Throwable e) {
try {
sender.cancel();
} catch (Throwable t) {
errorConsumer.accept(t);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Dropped exception", t);
}
}
});
}
senders.clear();
receivers.clear();
sendProcessor.dispose();
errorConsumer.accept(e);
onClose.onError(e);
}

Expand Down
27 changes: 13 additions & 14 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,17 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.*;
import reactor.util.annotation.Nullable;

/** Responder side of RSocket. Receives {@link ByteBuf}s from a peer's {@link RSocketRequester} */
class RSocketResponder implements RSocket {
private static final Logger LOGGER = LoggerFactory.getLogger(RSocketResponder.class);

private static final Consumer<ReferenceCounted> DROPPED_ELEMENTS_CONSUMER =
referenceCounted -> {
if (referenceCounted.refCnt() > 0) {
Expand All @@ -69,7 +73,6 @@ class RSocketResponder implements RSocket {
private final io.rsocket.ResponderRSocket responderRSocket;

private final PayloadDecoder payloadDecoder;
private final Consumer<Throwable> errorConsumer;
private final ResponderLeaseHandler leaseHandler;
private final Disposable leaseHandlerDisposable;
private final MonoProcessor<Void> onClose;
Expand All @@ -87,12 +90,10 @@ class RSocketResponder implements RSocket {
private final UnboundedProcessor<ByteBuf> sendProcessor;
private final ByteBufAllocator allocator;

@SuppressWarnings("deprecation")
RSocketResponder(
DuplexConnection connection,
RSocket requestHandler,
PayloadDecoder payloadDecoder,
Consumer<Throwable> errorConsumer,
ResponderLeaseHandler leaseHandler,
int mtu) {
this.connection = connection;
Expand All @@ -106,7 +107,6 @@ class RSocketResponder implements RSocket {
: null;

this.payloadDecoder = payloadDecoder;
this.errorConsumer = errorConsumer;
this.leaseHandler = leaseHandler;
this.sendingSubscriptions = new SynchronizedIntObjectHashMap<>();
this.channelProcessors = new SynchronizedIntObjectHashMap<>();
Expand All @@ -118,7 +118,7 @@ class RSocketResponder implements RSocket {

connection.send(sendProcessor).subscribe(null, this::handleSendProcessorError);

connection.receive().subscribe(this::handleFrame, errorConsumer);
connection.receive().subscribe(this::handleFrame, e -> {});
leaseHandlerDisposable = leaseHandler.send(sendProcessor::onNextPrioritized);

this.connection
Expand All @@ -135,7 +135,9 @@ private void handleSendProcessorError(Throwable t) {
try {
subscription.cancel();
} catch (Throwable e) {
errorConsumer.accept(e);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Dropped exception", t);
}
}
});

Expand All @@ -146,7 +148,9 @@ private void handleSendProcessorError(Throwable t) {
try {
subscription.onError(t);
} catch (Throwable e) {
errorConsumer.accept(e);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Dropped exception", t);
}
}
});
}
Expand Down Expand Up @@ -375,9 +379,7 @@ protected void hookOnSubscribe(Subscription subscription) {
}

@Override
protected void hookOnError(Throwable throwable) {
errorConsumer.accept(throwable);
}
protected void hookOnError(Throwable throwable) {}

@Override
protected void hookFinally(SignalType type) {
Expand Down Expand Up @@ -583,9 +585,7 @@ protected void hookOnSubscribe(Subscription subscription) {
}

@Override
protected void hookOnError(Throwable throwable) {
errorConsumer.accept(throwable);
}
protected void hookOnError(Throwable throwable) {}
});
}

Expand All @@ -599,7 +599,6 @@ private void handleCancelFrame(int streamId) {
}

private void handleError(int streamId, Throwable t) {
errorConsumer.accept(t);
sendProcessor.onNext(ErrorFrameCodec.encode(allocator, streamId, t));
}

Expand Down
20 changes: 1 addition & 19 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ public final class RSocketServer {
private int mtu = 0;
private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;

private Consumer<Throwable> errorConsumer = ex -> {};

private RSocketServer() {}

/** Static factory method to create an {@code RSocketServer}. */
Expand Down Expand Up @@ -247,16 +245,6 @@ public RSocketServer payloadDecoder(PayloadDecoder decoder) {
return this;
}

/**
* @deprecated this is deprecated with no replacement and will be removed after {@link
* io.rsocket.RSocketFactory} is removed.
*/
@Deprecated
public RSocketServer errorConsumer(Consumer<Throwable> errorConsumer) {
this.errorConsumer = errorConsumer;
return this;
}

/**
* Start the server on the given transport.
*
Expand Down Expand Up @@ -395,7 +383,6 @@ private Mono<Void> acceptSetup(
new RSocketRequester(
wrappedMultiplexer.asServerConnection(),
payloadDecoder,
errorConsumer,
StreamIdSupplier.serverSupplier(),
mtu,
setupPayload.keepAliveInterval(),
Expand All @@ -422,19 +409,14 @@ private Mono<Void> acceptSetup(
ResponderLeaseHandler responderLeaseHandler =
leaseEnabled
? new ResponderLeaseHandler.Impl<>(
SERVER_TAG,
connection.alloc(),
leases.sender(),
errorConsumer,
leases.stats())
SERVER_TAG, connection.alloc(), leases.sender(), leases.stats())
: ResponderLeaseHandler.None;

RSocket rSocketResponder =
new RSocketResponder(
connection,
wrappedRSocketHandler,
payloadDecoder,
errorConsumer,
responderLeaseHandler,
mtu);
})
Expand Down
Loading