Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

recommended way of Observables creation to support rxjava2 plugin system #3867

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package retrofit2.adapter.rxjava2;

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.ObservableEmitter;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.CompositeException;
import io.reactivex.exceptions.Exceptions;
Expand All @@ -25,53 +25,49 @@
import retrofit2.Callback;
import retrofit2.Response;

final class CallEnqueueObservable<T> extends Observable<Response<T>> {
private final Call<T> originalCall;
final class CallEnqueueObservable {

CallEnqueueObservable(Call<T> originalCall) {
this.originalCall = originalCall;
}

@Override
protected void subscribeActual(Observer<? super Response<T>> observer) {
// Since Call is a one-shot type, clone it for each new observer.
Call<T> call = originalCall.clone();
CallCallback<T> callback = new CallCallback<>(call, observer);
observer.onSubscribe(callback);
if (!callback.isDisposed()) {
call.enqueue(callback);
}
public static <T> Observable<Response<T>> create(Call<T> originalCall) {
return Observable.create(emitter -> {
// Since Call is a one-shot type, clone it for each new observer.
Call<T> call = originalCall.clone();
CallCallback<T> callback = new CallCallback<>(call, emitter);
emitter.setDisposable(callback);
if (!callback.isDisposed()) {
call.enqueue(callback);
}
});
}

private static final class CallCallback<T> implements Disposable, Callback<T> {
private final Call<?> call;
private final Observer<? super Response<T>> observer;
private final ObservableEmitter<Response<T>> emitter;
private volatile boolean disposed;
boolean terminated = false;

CallCallback(Call<?> call, Observer<? super Response<T>> observer) {
CallCallback(Call<?> call, ObservableEmitter<Response<T>> emitter) {
this.call = call;
this.observer = observer;
this.emitter = emitter;
}

@Override
public void onResponse(Call<T> call, Response<T> response) {
if (disposed) return;

try {
observer.onNext(response);
emitter.onNext(response);

if (!disposed) {
terminated = true;
observer.onComplete();
emitter.onComplete();
}
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
if (terminated) {
RxJavaPlugins.onError(t);
} else if (!disposed) {
try {
observer.onError(t);
emitter.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(t, inner));
Expand All @@ -85,7 +81,7 @@ public void onFailure(Call<T> call, Throwable t) {
if (call.isCanceled()) return;

try {
observer.onError(t);
emitter.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(t, inner));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,54 +16,49 @@
package retrofit2.adapter.rxjava2;

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.CompositeException;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.plugins.RxJavaPlugins;
import retrofit2.Call;
import retrofit2.Response;

final class CallExecuteObservable<T> extends Observable<Response<T>> {
private final Call<T> originalCall;
final class CallExecuteObservable {

CallExecuteObservable(Call<T> originalCall) {
this.originalCall = originalCall;
}

@Override
protected void subscribeActual(Observer<? super Response<T>> observer) {
// Since Call is a one-shot type, clone it for each new observer.
Call<T> call = originalCall.clone();
CallDisposable disposable = new CallDisposable(call);
observer.onSubscribe(disposable);
if (disposable.isDisposed()) {
return;
}

boolean terminated = false;
try {
Response<T> response = call.execute();
if (!disposable.isDisposed()) {
observer.onNext(response);
}
if (!disposable.isDisposed()) {
terminated = true;
observer.onComplete();
static <T> Observable<Response<T>> create(Call<T> originalCall) {
return Observable.create(emitter -> {
// Since Call is a one-shot type, clone it for each new observer.
Call<T> call = originalCall.clone();
CallDisposable disposable = new CallDisposable(call);
emitter.setDisposable(disposable);
if (disposable.isDisposed()) {
return;
}
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
if (terminated) {
RxJavaPlugins.onError(t);
} else if (!disposable.isDisposed()) {
try {
observer.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(t, inner));

boolean terminated = false;
try {
Response<T> response = call.execute();
if (!disposable.isDisposed()) {
emitter.onNext(response);
}
if (!disposable.isDisposed()) {
terminated = true;
emitter.onComplete();
}
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
if (terminated) {
RxJavaPlugins.onError(t);
} else if (!disposable.isDisposed()) {
try {
emitter.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(t, inner));
}
}
}
}
});
}

private static final class CallDisposable implements Disposable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.plugins.RxJavaPlugins;

import java.lang.reflect.Type;

import javax.annotation.Nullable;

import retrofit2.Call;
import retrofit2.CallAdapter;
import retrofit2.Response;
Expand All @@ -37,15 +40,15 @@ final class RxJava2CallAdapter<R> implements CallAdapter<R, Object> {
private final boolean isCompletable;

RxJava2CallAdapter(
Type responseType,
@Nullable Scheduler scheduler,
boolean isAsync,
boolean isResult,
boolean isBody,
boolean isFlowable,
boolean isSingle,
boolean isMaybe,
boolean isCompletable) {
Type responseType,
@Nullable Scheduler scheduler,
boolean isAsync,
boolean isResult,
boolean isBody,
boolean isFlowable,
boolean isSingle,
boolean isMaybe,
boolean isCompletable) {
this.responseType = responseType;
this.scheduler = scheduler;
this.isAsync = isAsync;
Expand All @@ -65,7 +68,7 @@ public Type responseType() {
@Override
public Object adapt(Call<R> call) {
Observable<Response<R>> responseObservable =
isAsync ? new CallEnqueueObservable<>(call) : new CallExecuteObservable<>(call);
isAsync ? CallEnqueueObservable.create(call):CallExecuteObservable.create(call);

Observable<?> observable;
if (isResult) {
Expand Down