jimu-decompiled/sources/io/reactivex/internal/operators/observable/ObservableBlockingSubscribe.java
2025-05-13 19:24:51 +02:00

59 lines
2.7 KiB
Java

package io.reactivex.internal.operators.observable;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.observers.BlockingObserver;
import io.reactivex.internal.observers.LambdaObserver;
import io.reactivex.internal.util.BlockingHelper;
import io.reactivex.internal.util.BlockingIgnoringReceiver;
import io.reactivex.internal.util.ExceptionHelper;
import io.reactivex.internal.util.NotificationLite;
import java.util.concurrent.LinkedBlockingQueue;
/* loaded from: classes2.dex */
public final class ObservableBlockingSubscribe {
public static <T> void a(ObservableSource<? extends T> observableSource, Observer<? super T> observer) {
LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
BlockingObserver blockingObserver = new BlockingObserver(linkedBlockingQueue);
observer.onSubscribe(blockingObserver);
observableSource.subscribe(blockingObserver);
while (!blockingObserver.isDisposed()) {
Object poll = linkedBlockingQueue.poll();
if (poll == null) {
try {
poll = linkedBlockingQueue.take();
} catch (InterruptedException e) {
blockingObserver.dispose();
observer.onError(e);
return;
}
}
if (blockingObserver.isDisposed() || observableSource == BlockingObserver.TERMINATED || NotificationLite.acceptFull(poll, observer)) {
return;
}
}
}
public static <T> void a(ObservableSource<? extends T> observableSource) {
BlockingIgnoringReceiver blockingIgnoringReceiver = new BlockingIgnoringReceiver();
LambdaObserver lambdaObserver = new LambdaObserver(Functions.d(), blockingIgnoringReceiver, blockingIgnoringReceiver, Functions.d());
observableSource.subscribe(lambdaObserver);
BlockingHelper.a(blockingIgnoringReceiver, lambdaObserver);
Throwable th = blockingIgnoringReceiver.a;
if (th != null) {
throw ExceptionHelper.a(th);
}
}
public static <T> void a(ObservableSource<? extends T> observableSource, Consumer<? super T> consumer, Consumer<? super Throwable> consumer2, Action action) {
ObjectHelper.a(consumer, "onNext is null");
ObjectHelper.a(consumer2, "onError is null");
ObjectHelper.a(action, "onComplete is null");
a(observableSource, new LambdaObserver(consumer, consumer2, action, Functions.d()));
}
}