147 lines
5.1 KiB
Java
147 lines
5.1 KiB
Java
package io.reactivex.internal.operators.observable;
|
|
|
|
import io.reactivex.Observable;
|
|
import io.reactivex.ObservableSource;
|
|
import io.reactivex.Observer;
|
|
import io.reactivex.disposables.Disposable;
|
|
import io.reactivex.exceptions.Exceptions;
|
|
import io.reactivex.functions.Function;
|
|
import io.reactivex.internal.disposables.DisposableHelper;
|
|
import io.reactivex.internal.disposables.EmptyDisposable;
|
|
import io.reactivex.internal.functions.ObjectHelper;
|
|
import io.reactivex.internal.util.AtomicThrowable;
|
|
import io.reactivex.internal.util.HalfSerializer;
|
|
import io.reactivex.subjects.PublishSubject;
|
|
import io.reactivex.subjects.Subject;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
/* loaded from: classes2.dex */
|
|
public final class ObservableRepeatWhen<T> extends AbstractObservableWithUpstream<T, T> {
|
|
final Function<? super Observable<Object>, ? extends ObservableSource<?>> b;
|
|
|
|
public ObservableRepeatWhen(ObservableSource<T> observableSource, Function<? super Observable<Object>, ? extends ObservableSource<?>> function) {
|
|
super(observableSource);
|
|
this.b = function;
|
|
}
|
|
|
|
@Override // io.reactivex.Observable
|
|
protected void subscribeActual(Observer<? super T> observer) {
|
|
Subject<T> a = PublishSubject.b().a();
|
|
try {
|
|
ObservableSource<?> apply = this.b.apply(a);
|
|
ObjectHelper.a(apply, "The handler returned a null ObservableSource");
|
|
ObservableSource<?> observableSource = apply;
|
|
RepeatWhenObserver repeatWhenObserver = new RepeatWhenObserver(observer, a, this.a);
|
|
observer.onSubscribe(repeatWhenObserver);
|
|
observableSource.subscribe(repeatWhenObserver.e);
|
|
repeatWhenObserver.d();
|
|
} catch (Throwable th) {
|
|
Exceptions.b(th);
|
|
EmptyDisposable.error(th, observer);
|
|
}
|
|
}
|
|
|
|
static final class RepeatWhenObserver<T> extends AtomicInteger implements Observer<T>, Disposable {
|
|
final Observer<? super T> a;
|
|
final Subject<Object> d;
|
|
final ObservableSource<T> g;
|
|
volatile boolean h;
|
|
final AtomicInteger b = new AtomicInteger();
|
|
final AtomicThrowable c = new AtomicThrowable();
|
|
final RepeatWhenObserver<T>.InnerRepeatObserver e = new InnerRepeatObserver();
|
|
final AtomicReference<Disposable> f = new AtomicReference<>();
|
|
|
|
final class InnerRepeatObserver extends AtomicReference<Disposable> implements Observer<Object> {
|
|
InnerRepeatObserver() {
|
|
}
|
|
|
|
@Override // io.reactivex.Observer
|
|
public void onComplete() {
|
|
RepeatWhenObserver.this.a();
|
|
}
|
|
|
|
@Override // io.reactivex.Observer
|
|
public void onError(Throwable th) {
|
|
RepeatWhenObserver.this.a(th);
|
|
}
|
|
|
|
@Override // io.reactivex.Observer
|
|
public void onNext(Object obj) {
|
|
RepeatWhenObserver.this.b();
|
|
}
|
|
|
|
@Override // io.reactivex.Observer
|
|
public void onSubscribe(Disposable disposable) {
|
|
DisposableHelper.setOnce(this, disposable);
|
|
}
|
|
}
|
|
|
|
RepeatWhenObserver(Observer<? super T> observer, Subject<Object> subject, ObservableSource<T> observableSource) {
|
|
this.a = observer;
|
|
this.d = subject;
|
|
this.g = observableSource;
|
|
}
|
|
|
|
void a(Throwable th) {
|
|
DisposableHelper.dispose(this.f);
|
|
HalfSerializer.a((Observer<?>) this.a, th, (AtomicInteger) this, this.c);
|
|
}
|
|
|
|
void b() {
|
|
d();
|
|
}
|
|
|
|
public boolean c() {
|
|
return DisposableHelper.isDisposed(this.f.get());
|
|
}
|
|
|
|
void d() {
|
|
if (this.b.getAndIncrement() == 0) {
|
|
while (!c()) {
|
|
if (!this.h) {
|
|
this.h = true;
|
|
this.g.subscribe(this);
|
|
}
|
|
if (this.b.decrementAndGet() == 0) {
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
@Override // io.reactivex.disposables.Disposable
|
|
public void dispose() {
|
|
DisposableHelper.dispose(this.f);
|
|
DisposableHelper.dispose(this.e);
|
|
}
|
|
|
|
@Override // io.reactivex.Observer
|
|
public void onComplete() {
|
|
this.h = false;
|
|
this.d.onNext(0);
|
|
}
|
|
|
|
@Override // io.reactivex.Observer
|
|
public void onError(Throwable th) {
|
|
DisposableHelper.dispose(this.e);
|
|
HalfSerializer.a((Observer<?>) this.a, th, (AtomicInteger) this, this.c);
|
|
}
|
|
|
|
@Override // io.reactivex.Observer
|
|
public void onNext(T t) {
|
|
HalfSerializer.a(this.a, t, this, this.c);
|
|
}
|
|
|
|
@Override // io.reactivex.Observer
|
|
public void onSubscribe(Disposable disposable) {
|
|
DisposableHelper.replace(this.f, disposable);
|
|
}
|
|
|
|
void a() {
|
|
DisposableHelper.dispose(this.f);
|
|
HalfSerializer.a(this.a, this, this.c);
|
|
}
|
|
}
|
|
}
|