|
@@ -0,0 +1,290 @@
|
|
|
|
+/*
|
|
|
|
+ * Copyright (C) 2018, Umbrella CompanyLimited All rights reserved.
|
|
|
|
+ * Project:Net
|
|
|
|
+ * Author:Drake
|
|
|
|
+ * Date:12/7/19 1:30 PM
|
|
|
|
+ */
|
|
|
|
+package com.drake.net.observable;
|
|
|
|
+
|
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
+import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
+
|
|
|
|
+import io.reactivex.Observable;
|
|
|
|
+import io.reactivex.ObservableOnSubscribe;
|
|
|
|
+import io.reactivex.Observer;
|
|
|
|
+import io.reactivex.disposables.Disposable;
|
|
|
|
+import io.reactivex.exceptions.Exceptions;
|
|
|
|
+import io.reactivex.functions.Cancellable;
|
|
|
|
+import io.reactivex.internal.disposables.CancellableDisposable;
|
|
|
|
+import io.reactivex.internal.disposables.DisposableHelper;
|
|
|
|
+import io.reactivex.internal.fuseable.SimpleQueue;
|
|
|
|
+import io.reactivex.internal.queue.SpscLinkedArrayQueue;
|
|
|
|
+import io.reactivex.internal.util.AtomicThrowable;
|
|
|
|
+import io.reactivex.plugins.RxJavaPlugins;
|
|
|
|
+
|
|
|
|
+public final class ShootObservable<T> extends Observable<T> {
|
|
|
|
+ final ObservableOnSubscribe<T> source;
|
|
|
|
+
|
|
|
|
+ public ShootObservable(ObservableOnSubscribe<T> source) {
|
|
|
|
+ this.source = source;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ protected void subscribeActual(Observer<? super T> observer) {
|
|
|
|
+ CreateEmitter<T> parent = new CreateEmitter<T>(observer);
|
|
|
|
+ observer.onSubscribe(parent);
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ source.subscribe(parent);
|
|
|
|
+ } catch (Throwable ex) {
|
|
|
|
+ Exceptions.throwIfFatal(ex);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ static final class CreateEmitter<T>
|
|
|
|
+ extends AtomicReference<Disposable>
|
|
|
|
+ implements io.reactivex.ObservableEmitter<T>, Disposable {
|
|
|
|
+
|
|
|
|
+ private static final long serialVersionUID = -3434801548987643227L;
|
|
|
|
+
|
|
|
|
+ final Observer<? super T> observer;
|
|
|
|
+
|
|
|
|
+ CreateEmitter(Observer<? super T> observer) {
|
|
|
|
+ this.observer = observer;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void onNext(T t) {
|
|
|
|
+ if (t == null) {
|
|
|
|
+ onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ if (!isDisposed()) {
|
|
|
|
+ observer.onNext(t);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void onError(Throwable t) {
|
|
|
|
+ if (!tryOnError(t)) {
|
|
|
|
+ RxJavaPlugins.onError(t);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public boolean tryOnError(Throwable t) {
|
|
|
|
+ if (t == null) {
|
|
|
|
+ t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
|
|
|
|
+ }
|
|
|
|
+ if (!isDisposed()) {
|
|
|
|
+ try {
|
|
|
|
+ observer.onError(t);
|
|
|
|
+ } finally {
|
|
|
|
+ dispose();
|
|
|
|
+ }
|
|
|
|
+ return true;
|
|
|
|
+ }
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void onComplete() {
|
|
|
|
+ if (!isDisposed()) {
|
|
|
|
+ try {
|
|
|
|
+ observer.onComplete();
|
|
|
|
+ } finally {
|
|
|
|
+ dispose();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void setDisposable(Disposable d) {
|
|
|
|
+ DisposableHelper.set(this, d);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void setCancellable(Cancellable c) {
|
|
|
|
+ setDisposable(new CancellableDisposable(c));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public io.reactivex.ObservableEmitter<T> serialize() {
|
|
|
|
+ return new SerializedEmitter<T>(this);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void dispose() {
|
|
|
|
+ DisposableHelper.dispose(this);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public boolean isDisposed() {
|
|
|
|
+ return DisposableHelper.isDisposed(get());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public String toString() {
|
|
|
|
+ return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Serializes calls to onNext, onError and onComplete.
|
|
|
|
+ *
|
|
|
|
+ * @param <T> the value type
|
|
|
|
+ */
|
|
|
|
+ static final class SerializedEmitter<T>
|
|
|
|
+ extends AtomicInteger
|
|
|
|
+ implements io.reactivex.ObservableEmitter<T> {
|
|
|
|
+
|
|
|
|
+ private static final long serialVersionUID = 4883307006032401862L;
|
|
|
|
+
|
|
|
|
+ final io.reactivex.ObservableEmitter<T> emitter;
|
|
|
|
+
|
|
|
|
+ final AtomicThrowable error;
|
|
|
|
+
|
|
|
|
+ final SpscLinkedArrayQueue<T> queue;
|
|
|
|
+
|
|
|
|
+ volatile boolean done;
|
|
|
|
+
|
|
|
|
+ SerializedEmitter(io.reactivex.ObservableEmitter<T> emitter) {
|
|
|
|
+ this.emitter = emitter;
|
|
|
|
+ this.error = new AtomicThrowable();
|
|
|
|
+ this.queue = new SpscLinkedArrayQueue<T>(16);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void onNext(T t) {
|
|
|
|
+ if (emitter.isDisposed() || done) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ if (t == null) {
|
|
|
|
+ onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ if (get() == 0 && compareAndSet(0, 1)) {
|
|
|
|
+ emitter.onNext(t);
|
|
|
|
+ if (decrementAndGet() == 0) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ SimpleQueue<T> q = queue;
|
|
|
|
+ synchronized (q) {
|
|
|
|
+ q.offer(t);
|
|
|
|
+ }
|
|
|
|
+ if (getAndIncrement() != 0) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ drainLoop();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void onError(Throwable t) {
|
|
|
|
+ if (!tryOnError(t)) {
|
|
|
|
+ RxJavaPlugins.onError(t);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public boolean tryOnError(Throwable t) {
|
|
|
|
+ if (emitter.isDisposed() || done) {
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+ if (t == null) {
|
|
|
|
+ t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
|
|
|
|
+ }
|
|
|
|
+ if (error.addThrowable(t)) {
|
|
|
|
+ done = true;
|
|
|
|
+ drain();
|
|
|
|
+ return true;
|
|
|
|
+ }
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void onComplete() {
|
|
|
|
+ if (emitter.isDisposed() || done) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ done = true;
|
|
|
|
+ drain();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ void drain() {
|
|
|
|
+ if (getAndIncrement() == 0) {
|
|
|
|
+ drainLoop();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ void drainLoop() {
|
|
|
|
+ io.reactivex.ObservableEmitter<T> e = emitter;
|
|
|
|
+ SpscLinkedArrayQueue<T> q = queue;
|
|
|
|
+ AtomicThrowable error = this.error;
|
|
|
|
+ int missed = 1;
|
|
|
|
+ for (; ; ) {
|
|
|
|
+
|
|
|
|
+ for (; ; ) {
|
|
|
|
+ if (e.isDisposed()) {
|
|
|
|
+ q.clear();
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (error.get() != null) {
|
|
|
|
+ q.clear();
|
|
|
|
+ e.onError(error.terminate());
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ boolean d = done;
|
|
|
|
+ T v = q.poll();
|
|
|
|
+
|
|
|
|
+ boolean empty = v == null;
|
|
|
|
+
|
|
|
|
+ if (d && empty) {
|
|
|
|
+ e.onComplete();
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (empty) {
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ e.onNext(v);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ missed = addAndGet(-missed);
|
|
|
|
+ if (missed == 0) {
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void setDisposable(Disposable d) {
|
|
|
|
+ emitter.setDisposable(d);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void setCancellable(Cancellable c) {
|
|
|
|
+ emitter.setCancellable(c);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public boolean isDisposed() {
|
|
|
|
+ return emitter.isDisposed();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public io.reactivex.ObservableEmitter<T> serialize() {
|
|
|
|
+ return this;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public String toString() {
|
|
|
|
+ return emitter.toString();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+}
|