305 lines
12 KiB
Java
305 lines
12 KiB
Java
package io.reactivex.internal.operators.observable;
|
|
|
|
import io.reactivex.Observable;
|
|
import io.reactivex.ObservableSource;
|
|
import io.reactivex.Observer;
|
|
import io.reactivex.disposables.Disposable;
|
|
import io.reactivex.exceptions.Exceptions;
|
|
import io.reactivex.functions.Function;
|
|
import io.reactivex.internal.disposables.DisposableHelper;
|
|
import io.reactivex.internal.disposables.SequentialDisposable;
|
|
import io.reactivex.internal.functions.ObjectHelper;
|
|
import io.reactivex.internal.operators.observable.ObservableTimeoutTimed;
|
|
import io.reactivex.plugins.RxJavaPlugins;
|
|
import java.util.concurrent.TimeoutException;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
/* loaded from: classes2.dex */
|
|
public final class ObservableTimeout<T, U, V> extends AbstractObservableWithUpstream<T, T> {
|
|
final ObservableSource<U> b;
|
|
final Function<? super T, ? extends ObservableSource<V>> c;
|
|
final ObservableSource<? extends T> d;
|
|
|
|
static final class TimeoutConsumer extends AtomicReference<Disposable> implements Observer<Object>, Disposable {
|
|
final TimeoutSelectorSupport a;
|
|
final long b;
|
|
|
|
TimeoutConsumer(long j, TimeoutSelectorSupport timeoutSelectorSupport) {
|
|
this.b = j;
|
|
this.a = timeoutSelectorSupport;
|
|
}
|
|
|
|
@Override // io.reactivex.disposables.Disposable
|
|
public void dispose() {
|
|
DisposableHelper.dispose(this);
|
|
}
|
|
|
|
@Override // io.reactivex.Observer
|
|
public void onComplete() {
|
|
Object obj = get();
|
|
DisposableHelper disposableHelper = DisposableHelper.DISPOSED;
|
|
if (obj != disposableHelper) {
|
|
lazySet(disposableHelper);
|
|
this.a.a(this.b);
|
|
}
|
|
}
|
|
|
|
@Override // io.reactivex.Observer
|
|
public void onError(Throwable th) {
|
|
Object obj = get();
|
|
DisposableHelper disposableHelper = DisposableHelper.DISPOSED;
|
|
if (obj == disposableHelper) {
|
|
RxJavaPlugins.b(th);
|
|
} else {
|
|
lazySet(disposableHelper);
|
|
this.a.a(this.b, th);
|
|
}
|
|
}
|
|
|
|
@Override // io.reactivex.Observer
|
|
public void onNext(Object obj) {
|
|
Disposable disposable = (Disposable) get();
|
|
if (disposable != DisposableHelper.DISPOSED) {
|
|
disposable.dispose();
|
|
lazySet(DisposableHelper.DISPOSED);
|
|
this.a.a(this.b);
|
|
}
|
|
}
|
|
|
|
@Override // io.reactivex.Observer
|
|
public void onSubscribe(Disposable disposable) {
|
|
DisposableHelper.setOnce(this, disposable);
|
|
}
|
|
}
|
|
|
|
interface TimeoutSelectorSupport extends ObservableTimeoutTimed.TimeoutSupport {
|
|
void a(long j, Throwable th);
|
|
}
|
|
|
|
public ObservableTimeout(Observable<T> observable, ObservableSource<U> observableSource, Function<? super T, ? extends ObservableSource<V>> function, ObservableSource<? extends T> observableSource2) {
|
|
super(observable);
|
|
this.b = observableSource;
|
|
this.c = function;
|
|
this.d = observableSource2;
|
|
}
|
|
|
|
@Override // io.reactivex.Observable
|
|
protected void subscribeActual(Observer<? super T> observer) {
|
|
ObservableSource<? extends T> observableSource = this.d;
|
|
if (observableSource == null) {
|
|
TimeoutObserver timeoutObserver = new TimeoutObserver(observer, this.c);
|
|
observer.onSubscribe(timeoutObserver);
|
|
timeoutObserver.a((ObservableSource<?>) this.b);
|
|
this.a.subscribe(timeoutObserver);
|
|
return;
|
|
}
|
|
TimeoutFallbackObserver timeoutFallbackObserver = new TimeoutFallbackObserver(observer, this.c, observableSource);
|
|
observer.onSubscribe(timeoutFallbackObserver);
|
|
timeoutFallbackObserver.a((ObservableSource<?>) this.b);
|
|
this.a.subscribe(timeoutFallbackObserver);
|
|
}
|
|
|
|
static final class TimeoutFallbackObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable, TimeoutSelectorSupport {
|
|
final Observer<? super T> a;
|
|
final Function<? super T, ? extends ObservableSource<?>> b;
|
|
final SequentialDisposable c = new SequentialDisposable();
|
|
final AtomicLong d = new AtomicLong();
|
|
final AtomicReference<Disposable> e = new AtomicReference<>();
|
|
ObservableSource<? extends T> f;
|
|
|
|
TimeoutFallbackObserver(Observer<? super T> observer, Function<? super T, ? extends ObservableSource<?>> function, ObservableSource<? extends T> observableSource) {
|
|
this.a = observer;
|
|
this.b = function;
|
|
this.f = observableSource;
|
|
}
|
|
|
|
void a(ObservableSource<?> observableSource) {
|
|
if (observableSource != null) {
|
|
TimeoutConsumer timeoutConsumer = new TimeoutConsumer(0L, this);
|
|
if (this.c.replace(timeoutConsumer)) {
|
|
observableSource.subscribe(timeoutConsumer);
|
|
}
|
|
}
|
|
}
|
|
|
|
@Override // io.reactivex.disposables.Disposable
|
|
public void dispose() {
|
|
DisposableHelper.dispose(this.e);
|
|
DisposableHelper.dispose(this);
|
|
this.c.dispose();
|
|
}
|
|
|
|
@Override // io.reactivex.Observer
|
|
public void onComplete() {
|
|
if (this.d.getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) {
|
|
this.c.dispose();
|
|
this.a.onComplete();
|
|
this.c.dispose();
|
|
}
|
|
}
|
|
|
|
@Override // io.reactivex.Observer
|
|
public void onError(Throwable th) {
|
|
if (this.d.getAndSet(Long.MAX_VALUE) == Long.MAX_VALUE) {
|
|
RxJavaPlugins.b(th);
|
|
return;
|
|
}
|
|
this.c.dispose();
|
|
this.a.onError(th);
|
|
this.c.dispose();
|
|
}
|
|
|
|
@Override // io.reactivex.Observer
|
|
public void onNext(T t) {
|
|
long j = this.d.get();
|
|
if (j != Long.MAX_VALUE) {
|
|
long j2 = 1 + j;
|
|
if (this.d.compareAndSet(j, j2)) {
|
|
Disposable disposable = this.c.get();
|
|
if (disposable != null) {
|
|
disposable.dispose();
|
|
}
|
|
this.a.onNext(t);
|
|
try {
|
|
ObservableSource<?> apply = this.b.apply(t);
|
|
ObjectHelper.a(apply, "The itemTimeoutIndicator returned a null ObservableSource.");
|
|
ObservableSource<?> observableSource = apply;
|
|
TimeoutConsumer timeoutConsumer = new TimeoutConsumer(j2, this);
|
|
if (this.c.replace(timeoutConsumer)) {
|
|
observableSource.subscribe(timeoutConsumer);
|
|
}
|
|
} catch (Throwable th) {
|
|
Exceptions.b(th);
|
|
this.e.get().dispose();
|
|
this.d.getAndSet(Long.MAX_VALUE);
|
|
this.a.onError(th);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
@Override // io.reactivex.Observer
|
|
public void onSubscribe(Disposable disposable) {
|
|
DisposableHelper.setOnce(this.e, disposable);
|
|
}
|
|
|
|
@Override // io.reactivex.internal.operators.observable.ObservableTimeoutTimed.TimeoutSupport
|
|
public void a(long j) {
|
|
if (this.d.compareAndSet(j, Long.MAX_VALUE)) {
|
|
DisposableHelper.dispose(this.e);
|
|
ObservableSource<? extends T> observableSource = this.f;
|
|
this.f = null;
|
|
observableSource.subscribe(new ObservableTimeoutTimed.FallbackObserver(this.a, this));
|
|
}
|
|
}
|
|
|
|
@Override // io.reactivex.internal.operators.observable.ObservableTimeout.TimeoutSelectorSupport
|
|
public void a(long j, Throwable th) {
|
|
if (this.d.compareAndSet(j, Long.MAX_VALUE)) {
|
|
DisposableHelper.dispose(this);
|
|
this.a.onError(th);
|
|
} else {
|
|
RxJavaPlugins.b(th);
|
|
}
|
|
}
|
|
}
|
|
|
|
static final class TimeoutObserver<T> extends AtomicLong implements Observer<T>, Disposable, TimeoutSelectorSupport {
|
|
final Observer<? super T> a;
|
|
final Function<? super T, ? extends ObservableSource<?>> b;
|
|
final SequentialDisposable c = new SequentialDisposable();
|
|
final AtomicReference<Disposable> d = new AtomicReference<>();
|
|
|
|
TimeoutObserver(Observer<? super T> observer, Function<? super T, ? extends ObservableSource<?>> function) {
|
|
this.a = observer;
|
|
this.b = function;
|
|
}
|
|
|
|
void a(ObservableSource<?> observableSource) {
|
|
if (observableSource != null) {
|
|
TimeoutConsumer timeoutConsumer = new TimeoutConsumer(0L, this);
|
|
if (this.c.replace(timeoutConsumer)) {
|
|
observableSource.subscribe(timeoutConsumer);
|
|
}
|
|
}
|
|
}
|
|
|
|
@Override // io.reactivex.disposables.Disposable
|
|
public void dispose() {
|
|
DisposableHelper.dispose(this.d);
|
|
this.c.dispose();
|
|
}
|
|
|
|
@Override // io.reactivex.Observer
|
|
public void onComplete() {
|
|
if (getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) {
|
|
this.c.dispose();
|
|
this.a.onComplete();
|
|
}
|
|
}
|
|
|
|
@Override // io.reactivex.Observer
|
|
public void onError(Throwable th) {
|
|
if (getAndSet(Long.MAX_VALUE) == Long.MAX_VALUE) {
|
|
RxJavaPlugins.b(th);
|
|
} else {
|
|
this.c.dispose();
|
|
this.a.onError(th);
|
|
}
|
|
}
|
|
|
|
@Override // io.reactivex.Observer
|
|
public void onNext(T t) {
|
|
long j = get();
|
|
if (j != Long.MAX_VALUE) {
|
|
long j2 = 1 + j;
|
|
if (compareAndSet(j, j2)) {
|
|
Disposable disposable = this.c.get();
|
|
if (disposable != null) {
|
|
disposable.dispose();
|
|
}
|
|
this.a.onNext(t);
|
|
try {
|
|
ObservableSource<?> apply = this.b.apply(t);
|
|
ObjectHelper.a(apply, "The itemTimeoutIndicator returned a null ObservableSource.");
|
|
ObservableSource<?> observableSource = apply;
|
|
TimeoutConsumer timeoutConsumer = new TimeoutConsumer(j2, this);
|
|
if (this.c.replace(timeoutConsumer)) {
|
|
observableSource.subscribe(timeoutConsumer);
|
|
}
|
|
} catch (Throwable th) {
|
|
Exceptions.b(th);
|
|
this.d.get().dispose();
|
|
getAndSet(Long.MAX_VALUE);
|
|
this.a.onError(th);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
@Override // io.reactivex.Observer
|
|
public void onSubscribe(Disposable disposable) {
|
|
DisposableHelper.setOnce(this.d, disposable);
|
|
}
|
|
|
|
@Override // io.reactivex.internal.operators.observable.ObservableTimeoutTimed.TimeoutSupport
|
|
public void a(long j) {
|
|
if (compareAndSet(j, Long.MAX_VALUE)) {
|
|
DisposableHelper.dispose(this.d);
|
|
this.a.onError(new TimeoutException());
|
|
}
|
|
}
|
|
|
|
@Override // io.reactivex.internal.operators.observable.ObservableTimeout.TimeoutSelectorSupport
|
|
public void a(long j, Throwable th) {
|
|
if (compareAndSet(j, Long.MAX_VALUE)) {
|
|
DisposableHelper.dispose(this.d);
|
|
this.a.onError(th);
|
|
} else {
|
|
RxJavaPlugins.b(th);
|
|
}
|
|
}
|
|
}
|
|
}
|