package rx.operators;

import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.operators.ChunkedOperation;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;

/* loaded from: classes2.dex */
public final class OperationWindow extends ChunkedOperation {

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: classes2.dex */
    /**
     * Chunk specialisation whose contents are handed out as an {@link Observable}.
     *
     * @param <T> element type stored in the window
     */
    public static class Window<T> extends ChunkedOperation.Chunk<T, Observable<T>> {
        protected Window() {
        }

        /**
         * Exposes the inherited {@code contents} field as an Observable.
         *
         * @return the result of {@code Observable.from} applied to {@code contents}
         */
        @Override // rx.operators.ChunkedOperation.Chunk
        public Observable<T> getContents() {
            // The raw cast mirrors the decompiled original; the declared type of
            // `contents` lives in ChunkedOperation.Chunk and is not visible here.
            final Iterable collected = (Iterable) this.contents;
            return Observable.from(collected);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: classes2.dex */
    /**
     * Subscription function that emits windows of a source sequence, opening a new window
     * every time a separate boundary sequence emits a value.
     *
     * <p>On subscribe the downstream observer immediately receives a first window. Each boundary
     * emission completes the current window and emits a fresh one. Completion or error of either
     * the source or the boundary terminates the current window and the downstream observer.
     *
     * @param <T> element type of the source and of the emitted windows
     * @param <U> element type of the boundary sequence; its values are ignored
     */
    public static final class WindowViaObservable<T, U> implements Observable.OnSubscribeFunc<Observable<T>> {
        final Observable<U> boundary;
        final Observable<? extends T> source;

        /**
         * Observes the boundary sequence and forwards its events to the {@link SourceObserver}:
         * every value triggers a window replacement, terminal events are passed through.
         */
        private static final class BoundaryObserver<T, U> extends Subscriber<U> {
            final SourceObserver<T> so;

            public BoundaryObserver(SourceObserver<T> sourceObserver) {
                this.so = sourceObserver;
            }

            @Override // rx.Observer
            public void onCompleted() {
                this.so.onCompleted();
            }

            @Override // rx.Observer
            public void onError(Throwable th) {
                this.so.onError(th);
            }

            @Override // rx.Observer
            public void onNext(U u) {
                // The boundary value itself is irrelevant; it only marks a window change.
                this.so.replace();
            }
        }

        /**
         * Observes the source sequence and relays its values into the current window subject.
         *
         * <p>All state changes happen while holding {@code guard}. A {@code null} subject marks
         * the terminated state, after which every further event is ignored. Note that the
         * downstream observer and the window subject are invoked while the lock is held.
         */
        private static final class SourceObserver<T> extends Subscriber<T> {
            final Subscription cancel;
            final Observer<? super Observable<T>> observer;
            final Object guard = new Object();
            Subject<T, T> subject = create();

            /**
             * @param observer     downstream observer receiving the windows
             * @param subscription subscription that is unsubscribed once this observer terminates
             */
            public SourceObserver(Observer<? super Observable<T>> observer, Subscription subscription) {
                this.observer = observer;
                this.cancel = subscription;
            }

            /** Creates the subject backing a single window. */
            Subject<T, T> create() {
                return PublishSubject.create();
            }

            @Override // rx.Observer
            public void onCompleted() {
                synchronized (this.guard) {
                    if (this.subject == null) {
                        return;
                    }
                    Subject<T, T> current = this.subject;
                    // Mark as terminated before notifying so later events are dropped.
                    this.subject = null;
                    current.onCompleted();
                    this.observer.onCompleted();
                    this.cancel.unsubscribe();
                }
            }

            @Override // rx.Observer
            public void onError(Throwable th) {
                synchronized (this.guard) {
                    if (this.subject == null) {
                        return;
                    }
                    Subject<T, T> current = this.subject;
                    // Mark as terminated before notifying so later events are dropped.
                    this.subject = null;
                    current.onError(th);
                    this.observer.onError(th);
                    this.cancel.unsubscribe();
                }
            }

            @Override // rx.Observer
            public void onNext(T t) {
                synchronized (this.guard) {
                    if (this.subject == null) {
                        return;
                    }
                    this.subject.onNext(t);
                }
            }

            /**
             * Completes the current window and emits a new one downstream. Any exception raised
             * while doing so is routed to {@link #onError(Throwable)} after the lock is released.
             */
            public void replace() {
                try {
                    synchronized (this.guard) {
                        if (this.subject == null) {
                            return;
                        }
                        this.subject.onCompleted();
                        this.subject = create();
                        this.observer.onNext(this.subject);
                    }
                } catch (Throwable th) {
                    onError(th);
                }
            }
        }

        /**
         * @param observable  the source sequence whose values are windowed
         * @param observable2 the boundary sequence whose emissions delimit windows
         */
        public WindowViaObservable(Observable<? extends T> observable, Observable<U> observable2) {
            this.source = observable;
            this.boundary = observable2;
        }

        /**
         * Emits the first window, then subscribes to the source and, unless the subscription
         * has already been cancelled, to the boundary.
         *
         * @param observer downstream observer receiving the windows
         * @return a composite of the source and boundary subscriptions, or an empty subscription
         *         if setting up failed (the failure is reported through {@code observer.onError})
         */
        @Override // rx.Observable.OnSubscribeFunc
        public Subscription onSubscribe(Observer<? super Observable<T>> observer) {
            CompositeSubscription compositeSubscription = new CompositeSubscription();
            SourceObserver<T> sourceObserver = new SourceObserver<T>(observer, compositeSubscription);
            try {
                observer.onNext(sourceObserver.subject);
                compositeSubscription.add(this.source.unsafeSubscribe(sourceObserver));
                // The source may have terminated synchronously, in which case the
                // composite is already unsubscribed and the boundary is not needed.
                if (!compositeSubscription.isUnsubscribed()) {
                    compositeSubscription.add(this.boundary.unsafeSubscribe(new BoundaryObserver<T, U>(sourceObserver)));
                }
                return compositeSubscription;
            } catch (Throwable th) {
                // Release whatever was subscribed before the failure; otherwise an already
                // attached source subscription would stay active with no handle to cancel it.
                compositeSubscription.unsubscribe();
                observer.onError(th);
                return Subscriptions.empty();
            }
        }
    }

    /**
     * Convenience overload that passes {@code i} as both the second and third argument of
     * {@link #window(Observable, int, int)}.
     *
     * @param observable the source sequence
     * @param i          value used for both integer arguments of the three-argument overload
     * @return the subscription function produced by the three-argument overload
     */
    public static <T> Observable.OnSubscribeFunc<Observable<T>> window(Observable<? extends T> observable, int i) {
        final Observable.OnSubscribeFunc<Observable<T>> delegate = window(observable, i, i);
        return delegate;
    }

    /**
     * Builds a subscription function that, for each subscriber, subscribes a
     * {@code ChunkObserver} to the source. The observer is assembled from a
     * {@code SizeBasedChunks} (constructed with {@code i}) and a {@code SkippingChunkCreator}
     * (constructed with {@code i2}).
     *
     * @param observable the source sequence
     * @param i          size argument handed to {@code SizeBasedChunks}
     *                   (presumably the maximum number of items per window; confirm in ChunkedOperation)
     * @param i2         argument handed to {@code SkippingChunkCreator}
     *                   (presumably the number of items between window starts; confirm in ChunkedOperation)
     * @return the subscription function
     */
    public static <T> Observable.OnSubscribeFunc<Observable<T>> window(final Observable<? extends T> observable, final int i, final int i2) {
        return new Observable.OnSubscribeFunc<Observable<T>>() {
            @Override
            public Subscription onSubscribe(Observer<? super Observable<T>> observer) {
                ChunkedOperation.SizeBasedChunks chunks = new ChunkedOperation.SizeBasedChunks(observer, OperationWindow.windowMaker(), i);
                ChunkedOperation.SkippingChunkCreator creator = new ChunkedOperation.SkippingChunkCreator(chunks, i2);
                ChunkedOperation.ChunkObserver chunkObserver = new ChunkedOperation.ChunkObserver(chunks, observer, creator);
                return observable.unsafeSubscribe(chunkObserver);
            }
        };
    }

    /**
     * Convenience overload of {@link #window(Observable, long, long, TimeUnit, Scheduler)} that
     * supplies {@code Schedulers.threadPoolForComputation()} as the scheduler.
     *
     * @param observable the source sequence
     * @param j          first time value, forwarded unchanged
     * @param j2         second time value, forwarded unchanged
     * @param timeUnit   unit of both time values
     * @return the subscription function produced by the scheduler-taking overload
     */
    public static <T> Observable.OnSubscribeFunc<Observable<T>> window(Observable<? extends T> observable, long j, long j2, TimeUnit timeUnit) {
        final Scheduler defaultScheduler = Schedulers.threadPoolForComputation();
        return window(observable, j, j2, timeUnit, defaultScheduler);
    }

    /**
     * Builds a subscription function that, for each subscriber, subscribes a
     * {@code ChunkObserver} to the source. The observer is assembled from a
     * {@code TimeBasedChunks} (constructed with {@code j}) and a {@code TimeBasedChunkCreator}
     * (constructed with {@code j2}); both share the given unit and scheduler.
     *
     * @param observable the source sequence
     * @param j          time value handed to {@code TimeBasedChunks}
     *                   (presumably the lifetime of each window; confirm in ChunkedOperation)
     * @param j2         time value handed to {@code TimeBasedChunkCreator}
     *                   (presumably the period between window starts; confirm in ChunkedOperation)
     * @param timeUnit   unit of {@code j} and {@code j2}
     * @param scheduler  scheduler passed to both collaborators
     * @return the subscription function
     */
    public static <T> Observable.OnSubscribeFunc<Observable<T>> window(final Observable<? extends T> observable, final long j, final long j2, final TimeUnit timeUnit, final Scheduler scheduler) {
        return new Observable.OnSubscribeFunc<Observable<T>>() {
            @Override
            public Subscription onSubscribe(Observer<? super Observable<T>> observer) {
                ChunkedOperation.TimeBasedChunks chunks = new ChunkedOperation.TimeBasedChunks(observer, OperationWindow.windowMaker(), j, timeUnit, scheduler);
                ChunkedOperation.TimeBasedChunkCreator creator = new ChunkedOperation.TimeBasedChunkCreator(chunks, j2, timeUnit, scheduler);
                ChunkedOperation.ChunkObserver chunkObserver = new ChunkedOperation.ChunkObserver(chunks, observer, creator);
                return observable.unsafeSubscribe(chunkObserver);
            }
        };
    }

    /**
     * Convenience overload of {@link #window(Observable, long, TimeUnit, Scheduler)} that
     * supplies {@code Schedulers.threadPoolForComputation()} as the scheduler.
     *
     * @param observable the source sequence
     * @param j          time value, forwarded unchanged
     * @param timeUnit   unit of {@code j}
     * @return the subscription function produced by the scheduler-taking overload
     */
    public static <T> Observable.OnSubscribeFunc<Observable<T>> window(Observable<? extends T> observable, long j, TimeUnit timeUnit) {
        final Scheduler defaultScheduler = Schedulers.threadPoolForComputation();
        return window(observable, j, timeUnit, defaultScheduler);
    }

    /**
     * Convenience overload of {@link #window(Observable, long, TimeUnit, int, Scheduler)} that
     * supplies {@code Schedulers.threadPoolForComputation()} as the scheduler.
     *
     * @param observable the source sequence
     * @param j          time value, forwarded unchanged
     * @param timeUnit   unit of {@code j}
     * @param i          integer argument, forwarded unchanged
     * @return the subscription function produced by the scheduler-taking overload
     */
    public static <T> Observable.OnSubscribeFunc<Observable<T>> window(Observable<? extends T> observable, long j, TimeUnit timeUnit, int i) {
        final Scheduler defaultScheduler = Schedulers.threadPoolForComputation();
        return window(observable, j, timeUnit, i, defaultScheduler);
    }

    /**
     * Builds a subscription function that, for each subscriber, subscribes a
     * {@code ChunkObserver} to the source. The observer is assembled from a
     * {@code TimeAndSizeBasedChunks} (constructed with {@code i}, {@code j}, the unit and the
     * scheduler) and a {@code SingleChunkCreator}.
     *
     * @param observable the source sequence
     * @param j          time value handed to {@code TimeAndSizeBasedChunks}
     * @param timeUnit   unit of {@code j}
     * @param i          size value handed to {@code TimeAndSizeBasedChunks}
     * @param scheduler  scheduler handed to {@code TimeAndSizeBasedChunks}
     * @return the subscription function
     */
    public static <T> Observable.OnSubscribeFunc<Observable<T>> window(final Observable<? extends T> observable, final long j, final TimeUnit timeUnit, final int i, final Scheduler scheduler) {
        return new Observable.OnSubscribeFunc<Observable<T>>() {
            @Override
            public Subscription onSubscribe(Observer<? super Observable<T>> observer) {
                ChunkedOperation.TimeAndSizeBasedChunks chunks = new ChunkedOperation.TimeAndSizeBasedChunks(observer, OperationWindow.windowMaker(), i, j, timeUnit, scheduler);
                ChunkedOperation.SingleChunkCreator creator = new ChunkedOperation.SingleChunkCreator(chunks);
                ChunkedOperation.ChunkObserver chunkObserver = new ChunkedOperation.ChunkObserver(chunks, observer, creator);
                return observable.unsafeSubscribe(chunkObserver);
            }
        };
    }

    /**
     * Builds a subscription function that, for each subscriber, subscribes a
     * {@code ChunkObserver} to the source. The observer is assembled from a
     * {@code NonOverlappingChunks} and a {@code TimeBasedChunkCreator} (constructed with
     * {@code j}, the unit and the scheduler).
     *
     * @param observable the source sequence
     * @param j          time value handed to {@code TimeBasedChunkCreator}
     * @param timeUnit   unit of {@code j}
     * @param scheduler  scheduler handed to {@code TimeBasedChunkCreator}
     * @return the subscription function
     */
    public static <T> Observable.OnSubscribeFunc<Observable<T>> window(final Observable<? extends T> observable, final long j, final TimeUnit timeUnit, final Scheduler scheduler) {
        return new Observable.OnSubscribeFunc<Observable<T>>() {
            @Override
            public Subscription onSubscribe(Observer<? super Observable<T>> observer) {
                ChunkedOperation.NonOverlappingChunks chunks = new ChunkedOperation.NonOverlappingChunks(observer, OperationWindow.windowMaker());
                ChunkedOperation.TimeBasedChunkCreator creator = new ChunkedOperation.TimeBasedChunkCreator(chunks, j, timeUnit, scheduler);
                ChunkedOperation.ChunkObserver chunkObserver = new ChunkedOperation.ChunkObserver(chunks, observer, creator);
                return observable.unsafeSubscribe(chunkObserver);
            }
        };
    }

    /**
     * Creates a subscription function that windows {@code observable} using the emissions of
     * {@code observable2} as window boundaries.
     *
     * @param observable  the source sequence
     * @param observable2 the boundary sequence
     * @return a new {@link WindowViaObservable} wrapping both sequences
     */
    public static <T, U> Observable.OnSubscribeFunc<Observable<T>> window(Observable<? extends T> observable, Observable<U> observable2) {
        // Explicit type arguments avoid the raw-type construction and unchecked conversion.
        return new WindowViaObservable<T, U>(observable, observable2);
    }

    /**
     * Builds a subscription function that, for each subscriber, subscribes a
     * {@code ChunkObserver} to the source. The observer is assembled from an
     * {@code OverlappingChunks} and an {@code ObservableBasedMultiChunkCreator} driven by
     * {@code observable2} and {@code func1}.
     *
     * @param observable  the source sequence
     * @param observable2 sequence handed to {@code ObservableBasedMultiChunkCreator}
     *                    (presumably each emission opens a window; confirm in ChunkedOperation)
     * @param func1       function handed to {@code ObservableBasedMultiChunkCreator}
     *                    (presumably maps an opening value to the sequence that closes that
     *                    window; confirm in ChunkedOperation)
     * @return the subscription function
     */
    public static <T, TOpening, TClosing> Observable.OnSubscribeFunc<Observable<T>> window(final Observable<? extends T> observable, final Observable<? extends TOpening> observable2, final Func1<? super TOpening, ? extends Observable<? extends TClosing>> func1) {
        return new Observable.OnSubscribeFunc<Observable<T>>() { // from class: rx.operators.OperationWindow.3
            @Override // rx.Observable.OnSubscribeFunc
            public Subscription onSubscribe(Observer<? super Observable<T>> observer) {
                ChunkedOperation.OverlappingChunks overlappingChunks = new ChunkedOperation.OverlappingChunks(observer, OperationWindow.windowMaker());
                // The captured `observable2` parameter is passed here; the decompiler had
                // rendered it as `Observable.this`, which is not valid in this scope.
                return observable.unsafeSubscribe(new ChunkedOperation.ChunkObserver(overlappingChunks, observer, new ChunkedOperation.ObservableBasedMultiChunkCreator(overlappingChunks, observable2, func1)));
            }
        };
    }

    /**
     * Builds a subscription function that, for each subscriber, subscribes a
     * {@code ChunkObserver} to the source. The observer is assembled from a
     * {@code NonOverlappingChunks} and an {@code ObservableBasedSingleChunkCreator} driven by
     * {@code func0}.
     *
     * @param observable the source sequence
     * @param func0      factory handed to {@code ObservableBasedSingleChunkCreator}
     *                   (presumably supplies the sequence whose emission closes the current
     *                   window; confirm in ChunkedOperation)
     * @return the subscription function
     */
    public static <T, TClosing> Observable.OnSubscribeFunc<Observable<T>> window(final Observable<? extends T> observable, final Func0<? extends Observable<? extends TClosing>> func0) {
        return new Observable.OnSubscribeFunc<Observable<T>>() { // from class: rx.operators.OperationWindow.2
            @Override // rx.Observable.OnSubscribeFunc
            public Subscription onSubscribe(Observer<? super Observable<T>> observer) {
                ChunkedOperation.NonOverlappingChunks nonOverlappingChunks = new ChunkedOperation.NonOverlappingChunks(observer, OperationWindow.windowMaker());
                // The captured `func0` parameter is passed here; the decompiler had rendered
                // it as `Func0.this`, which is not valid in this scope.
                return observable.unsafeSubscribe(new ChunkedOperation.ChunkObserver(nonOverlappingChunks, observer, new ChunkedOperation.ObservableBasedSingleChunkCreator(nonOverlappingChunks, func0)));
            }
        };
    }

    /**
     * Returns a factory that creates a new, empty {@link Window} on every call.
     *
     * @return a {@code Func0} whose {@code call()} yields a fresh {@code Window<T>}
     */
    public static <T> Func0<Window<T>> windowMaker() {
        final Func0<Window<T>> factory = new Func0<Window<T>>() {
            @Override
            public Window<T> call() {
                final Window<T> fresh = new Window<T>();
                return fresh;
            }
        };
        return factory;
    }
}
