package com.jujin8.rxnewslibary;

import android.accounts.NetworkErrorException;
import com.jujin8.rxlibrary.retrofit.CallBack;
import com.jujin8.rxlibrary.vo.BaseInfo;
import com.qq.e.comm.constants.ErrorCode;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.observers.DisposableObserver;
import io.reactivex.schedulers.Schedulers;
import java.net.ConnectException;
import java.net.UnknownHostException;
import java.util.concurrent.TimeoutException;
import retrofit2.adapter.rxjava2.HttpException;

/* loaded from: classes.dex */
public abstract class DataResource<T> {
    private Observable<BaseInfo<T>> local = getLocalObservable();
    private final CallBack<T> mCallBack;

    public DataResource(CallBack<T> callBack) {
        this.mCallBack = callBack;
        if (this.local == null) {
            loadNet().observeOn(AndroidSchedulers.mainThread()).subscribe(getObserver());
        } else {
            loadNetAndCache().observeOn(AndroidSchedulers.mainThread()).subscribe(getObserver());
        }
    }

    public static <V> Observable<V> changeIO(Observable<V> observable) {
        if (observable == null) {
            return null;
        }
        return observable.subscribeOn(Schedulers.io());
    }

    public static <V> Observable<V> changeMain(Observable<V> observable) {
        if (observable == null) {
            return null;
        }
        return observable.observeOn(AndroidSchedulers.mainThread());
    }

    public static <V> Observable<V> changeNetwork(Observable<V> observable) {
        if (observable == null) {
            return null;
        }
        return observable.subscribeOn(Schedulers.io());
    }

    private DisposableObserver<BaseInfo<T>> getObserver() {
        return new DisposableObserver<BaseInfo<T>>() { // from class: com.jujin8.rxnewslibary.DataResource.1
            @Override // io.reactivex.Observer
            public void onComplete() {
                System.out.println("数据、获取数据onComplete");
            }

            @Override // io.reactivex.Observer
            public void onError(Throwable th) {
                th.printStackTrace();
                try {
                    if ((th instanceof ConnectException) || (th instanceof TimeoutException) || (th instanceof NetworkErrorException) || (th instanceof UnknownHostException)) {
                        DataResource.this.mCallBack.netError();
                    } else if (th instanceof HttpException) {
                        DataResource.this.mCallBack.fail(ErrorCode.AdError.PLACEMENT_ERROR, "服务器错误");
                    } else {
                        DataResource.this.mCallBack.onFailure(th, false);
                    }
                } catch (Exception e) {
                    DataResource.this.mCallBack.onFailure(e, false);
                }
            }

            @Override // io.reactivex.Observer
            public void onNext(BaseInfo<T> baseInfo) {
                if (baseInfo.isSuccessful()) {
                    DataResource.this.mCallBack.success(baseInfo.data);
                } else {
                    DataResource.this.mCallBack.fail(baseInfo.code, baseInfo.message);
                }
            }
        };
    }

    private Observable<BaseInfo<T>> loadNet() {
        return changeIO(getRemoteObservable()).doOnNext(new Consumer<BaseInfo<T>>() { // from class: com.jujin8.rxnewslibary.DataResource.3
            @Override // io.reactivex.functions.Consumer
            public void accept(BaseInfo<T> baseInfo) throws Exception {
                DataResource.this.loadDataing();
                if (baseInfo.isCache() || !baseInfo.isSuccessful()) {
                    return;
                }
                System.out.println("数据、保存数据开始");
                DataResource.this.saveData(baseInfo.data);
                System.out.println("数据、保存数据结束");
            }
        }).observeOn(Schedulers.io());
    }

    private Observable<BaseInfo<T>> loadNetAndCache() {
        if (this.local != null) {
            this.local = changeIO(this.local);
        }
        return (Observable<BaseInfo<T>>) loadNet().observeOn(Schedulers.io()).publish(new Function<Observable<BaseInfo<T>>, ObservableSource<BaseInfo<T>>>() { // from class: com.jujin8.rxnewslibary.DataResource.2
            @Override // io.reactivex.functions.Function
            public ObservableSource<BaseInfo<T>> apply(Observable<BaseInfo<T>> observable) throws Exception {
                return Observable.merge(observable, DataResource.this.local.takeUntil(observable));
            }
        });
    }

    protected abstract Observable<BaseInfo<T>> getLocalObservable();

    protected abstract Observable<BaseInfo<T>> getRemoteObservable();

    protected abstract void loadDataing();

    protected abstract void saveData(T t);
}
