diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index ef3c5ed450..89468cec2b 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -8046,7 +8046,7 @@ public final Observable doOnError(Consumer onError) { /** * Calls the appropriate onXXX method (shared between all Observer) for the lifecycle events of - * the sequence (subscription, disposal, requesting). + * the sequence (subscription, disposal). *

* *

diff --git a/src/main/java/io/reactivex/internal/observers/DisposableLambdaObserver.java b/src/main/java/io/reactivex/internal/observers/DisposableLambdaObserver.java index 7e3941e260..59d7fac2bc 100644 --- a/src/main/java/io/reactivex/internal/observers/DisposableLambdaObserver.java +++ b/src/main/java/io/reactivex/internal/observers/DisposableLambdaObserver.java @@ -61,6 +61,7 @@ public void onNext(T t) { @Override public void onError(Throwable t) { if (upstream != DisposableHelper.DISPOSED) { + upstream = DisposableHelper.DISPOSED; downstream.onError(t); } else { RxJavaPlugins.onError(t); @@ -70,19 +71,24 @@ public void onError(Throwable t) { @Override public void onComplete() { if (upstream != DisposableHelper.DISPOSED) { + upstream = DisposableHelper.DISPOSED; downstream.onComplete(); } } @Override public void dispose() { - try { - onDispose.run(); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - RxJavaPlugins.onError(e); + Disposable d = upstream; + if (d != DisposableHelper.DISPOSED) { + upstream = DisposableHelper.DISPOSED; + try { + onDispose.run(); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + RxJavaPlugins.onError(e); + } + d.dispose(); } - upstream.dispose(); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycle.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycle.java index f0d233881d..0c979d2918 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycle.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycle.java @@ -108,13 +108,17 @@ public void request(long n) { @Override public void cancel() { - try { - onCancel.run(); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - RxJavaPlugins.onError(e); + Subscription s = upstream; + if (s != SubscriptionHelper.CANCELLED) { + upstream = SubscriptionHelper.CANCELLED; + try { + onCancel.run(); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + RxJavaPlugins.onError(e); + } + s.cancel(); } - upstream.cancel(); } } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycleTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycleTest.java index 3d23930aaa..34578bbfee 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycleTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoOnLifecycleTest.java @@ -87,7 +87,7 @@ public void run() throws Exception { ); assertEquals(1, calls[0]); - assertEquals(2, calls[1]); + assertEquals(1, calls[1]); } @Test diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoOnUnsubscribeTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoOnUnsubscribeTest.java index 2987102666..de1bd0d57a 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoOnUnsubscribeTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDoOnUnsubscribeTest.java @@ -24,6 +24,7 @@ import io.reactivex.Flowable; import io.reactivex.disposables.Disposable; import io.reactivex.functions.*; +import io.reactivex.processors.BehaviorProcessor; import io.reactivex.subscribers.TestSubscriber; public class FlowableDoOnUnsubscribeTest { @@ -148,4 +149,24 @@ public void run() { assertEquals("There should exactly 1 un-subscription events for upper stream", 1, upperCount.get()); assertEquals("There should exactly 1 un-subscription events for lower stream", 1, lowerCount.get()); } + + @Test + public void noReentrantDispose() { + + final AtomicInteger cancelCalled = new AtomicInteger(); + + final BehaviorProcessor p = BehaviorProcessor.create(); + p.doOnCancel(new Action() { + @Override + public void run() throws Exception { + cancelCalled.incrementAndGet(); + p.onNext(2); + } + }) + .firstOrError() + .subscribe() + .dispose(); + + assertEquals(1, cancelCalled.get()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableCacheTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableCacheTest.java index 9629946ecb..e2aeb6a3f5 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableCacheTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableCacheTest.java @@ -113,7 +113,7 @@ public void testUnsubscribeSource() throws Exception { o.subscribe(); o.subscribe(); o.subscribe(); - verify(unsubscribe, times(1)).run(); + verify(unsubscribe, never()).run(); } @Test diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableDoOnUnsubscribeTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableDoOnUnsubscribeTest.java index 9d2c84db3c..b7df811bac 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableDoOnUnsubscribeTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableDoOnUnsubscribeTest.java @@ -25,6 +25,7 @@ import io.reactivex.disposables.Disposable; import io.reactivex.functions.*; import io.reactivex.observers.TestObserver; +import io.reactivex.subjects.BehaviorSubject; public class ObservableDoOnUnsubscribeTest { @@ -152,4 +153,24 @@ public void run() { assertEquals("There should exactly 1 un-subscription events for upper stream", 1, upperCount.get()); assertEquals("There should exactly 1 un-subscription events for lower stream", 1, lowerCount.get()); } + + @Test + public void noReentrantDispose() { + + final AtomicInteger disposeCalled = new AtomicInteger(); + + final BehaviorSubject s = BehaviorSubject.create(); + s.doOnDispose(new Action() { + @Override + public void run() throws Exception { + disposeCalled.incrementAndGet(); + s.onNext(2); + } + }) + .firstOrError() + .subscribe() + .dispose(); + + assertEquals(1, disposeCalled.get()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java index 5b06f031a2..2592361cd6 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java @@ -938,11 +938,11 @@ public void accept(String v) { @Test public void testUnsubscribeSource() throws Exception { Action unsubscribe = mock(Action.class); - Observable o = Observable.just(1).doOnDispose(unsubscribe).cache(); + Observable o = Observable.just(1).doOnDispose(unsubscribe).replay().autoConnect(); o.subscribe(); o.subscribe(); o.subscribe(); - verify(unsubscribe, times(1)).run(); + verify(unsubscribe, never()).run(); } @Test