Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions rsocket-core/src/main/java/io/rsocket/ResponderRSocket.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package io.rsocket;

import java.util.function.BiFunction;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

/**
* Extends the {@link RSocket} that allows an implementer to peek at the first request payload of a
* channel.
*
* @deprecated as of 1.0 RC7 in favor of using {@link RSocket#requestChannel(Publisher)} with {@link
* Flux#switchOnFirst(BiFunction)}
*/
@Deprecated
public interface ResponderRSocket extends RSocket {
/**
* Implement this method to peak at the first payload of the incoming request stream without
Expand Down
16 changes: 10 additions & 6 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.ResponderRSocket;
import io.rsocket.exceptions.ApplicationErrorException;
import io.rsocket.frame.*;
import io.rsocket.frame.decoder.PayloadDecoder;
Expand All @@ -51,7 +50,7 @@
import reactor.util.concurrent.Queues;

/** Responder side of RSocket. Receives {@link ByteBuf}s from a peer's {@link RSocketRequester} */
class RSocketResponder implements ResponderRSocket {
class RSocketResponder implements RSocket {
private static final Consumer<ReferenceCounted> DROPPED_ELEMENTS_CONSUMER =
referenceCounted -> {
if (referenceCounted.refCnt() > 0) {
Expand All @@ -66,7 +65,10 @@ class RSocketResponder implements ResponderRSocket {

private final DuplexConnection connection;
private final RSocket requestHandler;
private final ResponderRSocket responderRSocket;

@SuppressWarnings("deprecation")
private final io.rsocket.ResponderRSocket responderRSocket;

private final PayloadDecoder payloadDecoder;
private final Consumer<Throwable> errorConsumer;
private final ResponderLeaseHandler leaseHandler;
Expand All @@ -86,6 +88,7 @@ class RSocketResponder implements ResponderRSocket {
private final UnboundedProcessor<ByteBuf> sendProcessor;
private final ByteBufAllocator allocator;

@SuppressWarnings("deprecation")
RSocketResponder(
DuplexConnection connection,
RSocket requestHandler,
Expand All @@ -99,7 +102,9 @@ class RSocketResponder implements ResponderRSocket {

this.requestHandler = requestHandler;
this.responderRSocket =
(requestHandler instanceof ResponderRSocket) ? (ResponderRSocket) requestHandler : null;
(requestHandler instanceof io.rsocket.ResponderRSocket)
? (io.rsocket.ResponderRSocket) requestHandler
: null;

this.payloadDecoder = payloadDecoder;
this.errorConsumer = errorConsumer;
Expand Down Expand Up @@ -219,8 +224,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
}
}

@Override
public Flux<Payload> requestChannel(Payload payload, Publisher<Payload> payloads) {
private Flux<Payload> requestChannel(Payload payload, Publisher<Payload> payloads) {
try {
if (leaseHandler.useLease()) {
return responderRSocket.requestChannel(payload, payloads);
Expand Down