238 lines
6.7 KiB
Java
238 lines
6.7 KiB
Java
package io.reactivex.internal.operators.flowable;
|
|
|
|
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
|
|
import io.reactivex.internal.subscriptions.BasicIntQueueSubscription;
|
|
import io.reactivex.internal.subscriptions.EmptySubscription;
|
|
import io.reactivex.internal.subscriptions.SubscriptionHelper;
|
|
import io.reactivex.internal.util.BackpressureHelper;
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
import org.reactivestreams.Publisher;
|
|
import org.reactivestreams.Subscriber;
|
|
|
|
/* loaded from: classes2.dex */
|
|
final class FlowableGroupBy$State<T, K> extends BasicIntQueueSubscription<T> implements Publisher<T> {
|
|
final K a;
|
|
final SpscLinkedArrayQueue<T> b;
|
|
final FlowableGroupBy$GroupBySubscriber<?, K, T> c;
|
|
final boolean d;
|
|
volatile boolean f;
|
|
Throwable g;
|
|
boolean k;
|
|
int l;
|
|
final AtomicLong e = new AtomicLong();
|
|
final AtomicBoolean h = new AtomicBoolean();
|
|
final AtomicReference<Subscriber<? super T>> i = new AtomicReference<>();
|
|
final AtomicBoolean j = new AtomicBoolean();
|
|
|
|
FlowableGroupBy$State(int i, FlowableGroupBy$GroupBySubscriber<?, K, T> flowableGroupBy$GroupBySubscriber, K k, boolean z) {
|
|
this.b = new SpscLinkedArrayQueue<>(i);
|
|
this.c = flowableGroupBy$GroupBySubscriber;
|
|
this.a = k;
|
|
this.d = z;
|
|
}
|
|
|
|
@Override // org.reactivestreams.Publisher
|
|
public void a(Subscriber<? super T> subscriber) {
|
|
if (!this.j.compareAndSet(false, true)) {
|
|
EmptySubscription.error(new IllegalStateException("Only one Subscriber allowed!"), subscriber);
|
|
return;
|
|
}
|
|
subscriber.onSubscribe(this);
|
|
this.i.lazySet(subscriber);
|
|
drain();
|
|
}
|
|
|
|
@Override // org.reactivestreams.Subscription
|
|
public void cancel() {
|
|
if (this.h.compareAndSet(false, true)) {
|
|
this.c.cancel(this.a);
|
|
}
|
|
}
|
|
|
|
@Override // io.reactivex.internal.fuseable.SimpleQueue
|
|
public void clear() {
|
|
this.b.clear();
|
|
}
|
|
|
|
void drain() {
|
|
if (getAndIncrement() != 0) {
|
|
return;
|
|
}
|
|
if (this.k) {
|
|
drainFused();
|
|
} else {
|
|
drainNormal();
|
|
}
|
|
}
|
|
|
|
void drainFused() {
|
|
Throwable th;
|
|
SpscLinkedArrayQueue<T> spscLinkedArrayQueue = this.b;
|
|
Subscriber<? super T> subscriber = this.i.get();
|
|
int i = 1;
|
|
while (true) {
|
|
if (subscriber != null) {
|
|
if (this.h.get()) {
|
|
spscLinkedArrayQueue.clear();
|
|
return;
|
|
}
|
|
boolean z = this.f;
|
|
if (z && !this.d && (th = this.g) != null) {
|
|
spscLinkedArrayQueue.clear();
|
|
subscriber.onError(th);
|
|
return;
|
|
}
|
|
subscriber.onNext(null);
|
|
if (z) {
|
|
Throwable th2 = this.g;
|
|
if (th2 != null) {
|
|
subscriber.onError(th2);
|
|
return;
|
|
} else {
|
|
subscriber.onComplete();
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
i = addAndGet(-i);
|
|
if (i == 0) {
|
|
return;
|
|
}
|
|
if (subscriber == null) {
|
|
subscriber = this.i.get();
|
|
}
|
|
}
|
|
}
|
|
|
|
void drainNormal() {
|
|
SpscLinkedArrayQueue<T> spscLinkedArrayQueue = this.b;
|
|
boolean z = this.d;
|
|
Subscriber<? super T> subscriber = this.i.get();
|
|
int i = 1;
|
|
while (true) {
|
|
if (subscriber != null) {
|
|
long j = this.e.get();
|
|
long j2 = 0;
|
|
while (j2 != j) {
|
|
boolean z2 = this.f;
|
|
T poll = spscLinkedArrayQueue.poll();
|
|
boolean z3 = poll == null;
|
|
if (a(z2, z3, subscriber, z)) {
|
|
return;
|
|
}
|
|
if (z3) {
|
|
break;
|
|
}
|
|
subscriber.onNext(poll);
|
|
j2++;
|
|
}
|
|
if (j2 == j && a(this.f, spscLinkedArrayQueue.isEmpty(), subscriber, z)) {
|
|
return;
|
|
}
|
|
if (j2 != 0) {
|
|
if (j != Long.MAX_VALUE) {
|
|
this.e.addAndGet(-j2);
|
|
}
|
|
this.c.upstream.request(j2);
|
|
}
|
|
}
|
|
i = addAndGet(-i);
|
|
if (i == 0) {
|
|
return;
|
|
}
|
|
if (subscriber == null) {
|
|
subscriber = this.i.get();
|
|
}
|
|
}
|
|
}
|
|
|
|
@Override // io.reactivex.internal.fuseable.SimpleQueue
|
|
public boolean isEmpty() {
|
|
return this.b.isEmpty();
|
|
}
|
|
|
|
public void onComplete() {
|
|
this.f = true;
|
|
drain();
|
|
}
|
|
|
|
public void onError(Throwable th) {
|
|
this.g = th;
|
|
this.f = true;
|
|
drain();
|
|
}
|
|
|
|
public void onNext(T t) {
|
|
this.b.offer(t);
|
|
drain();
|
|
}
|
|
|
|
@Override // io.reactivex.internal.fuseable.SimpleQueue
|
|
public T poll() {
|
|
T poll = this.b.poll();
|
|
if (poll != null) {
|
|
this.l++;
|
|
return poll;
|
|
}
|
|
int i = this.l;
|
|
if (i == 0) {
|
|
return null;
|
|
}
|
|
this.l = 0;
|
|
this.c.upstream.request(i);
|
|
return null;
|
|
}
|
|
|
|
@Override // org.reactivestreams.Subscription
|
|
public void request(long j) {
|
|
if (SubscriptionHelper.validate(j)) {
|
|
BackpressureHelper.a(this.e, j);
|
|
drain();
|
|
}
|
|
}
|
|
|
|
@Override // io.reactivex.internal.fuseable.QueueFuseable
|
|
public int requestFusion(int i) {
|
|
if ((i & 2) == 0) {
|
|
return 0;
|
|
}
|
|
this.k = true;
|
|
return 2;
|
|
}
|
|
|
|
boolean a(boolean z, boolean z2, Subscriber<? super T> subscriber, boolean z3) {
|
|
if (this.h.get()) {
|
|
this.b.clear();
|
|
return true;
|
|
}
|
|
if (!z) {
|
|
return false;
|
|
}
|
|
if (z3) {
|
|
if (!z2) {
|
|
return false;
|
|
}
|
|
Throwable th = this.g;
|
|
if (th != null) {
|
|
subscriber.onError(th);
|
|
} else {
|
|
subscriber.onComplete();
|
|
}
|
|
return true;
|
|
}
|
|
Throwable th2 = this.g;
|
|
if (th2 != null) {
|
|
this.b.clear();
|
|
subscriber.onError(th2);
|
|
return true;
|
|
}
|
|
if (!z2) {
|
|
return false;
|
|
}
|
|
subscriber.onComplete();
|
|
return true;
|
|
}
|
|
}
|