如何编写自定义响应式基础类型

Posted by Advanced RxJava on May 5, 2017
本文是 Advanced RxJava http://akarnokd.blogspot.com/ 系列博客的中文翻译,已征得作者授权。该系列博客的作者是 RxJava 的核心贡献者之一。翻译的内容使用 知识共享 署名-非商业性使用-相同方式共享 4.0 国际 协议进行许可,转载请注明出处。如果发现翻译问题,或者任何改进意见,请 在 GitHub 上提交 issue
本文是 Piasy 独立翻译,发表于 https://blog.piasy.com/AdvancedRxJava/,请阅读原文支持原创 https://blog.piasy.com/AdvancedRxJava/2017/05/05/writing-custom-reactive-base-type/

原文 Writing a custom reactive base type

介绍

一直以来,大家都在问如何实现自己的响应式类型。尽管 RxJava 的 Observable 有大量方法,也允许通过 lift()extend() 以及 compose() 进行扩展,大家仍会希望 Observable 拥有某个 xyz() 操作符,或者在某个调用链中不允许调用 uvw()

第一个情况,其实是希望在吃透整个 RxJava 项目之前就能增加自定义的操作符函数,而这个需求其实和 JVM 环境中的响应式编程同样古老。当我最初把 Rx.NET 移植到 Java 中时,我也面临了同样的问题,因为 .NET 早在 2010 年就支持了方法扩展。而 Java 并不支持方法扩展,而且这一特性的提议也在 Java 8 开发时期被否决,他们选择了默认方法这一特性,理由是扩展方法无法被重载。的确,扩展方法无法被重载,但它可以被另一个类的另一个方法替代。

第二个情况,我们希望隐藏或者移除某些操作符,因为在我们自定义的 Observable 类型中,有些操作符没有意义。例如我们有一个 ParallelObservable,它把输入序列在内部分为并行处理的流水线,那么 map()filter() 就是有意义的,而 take()skip() 则没有意义。

包装(Wrapping)

上面的两种情况都可以通过包装 Observable 类来实现:

public final class MyObservable<T> {
    private Observable<T> actual;
    public MyObservable<T>(Observable<T> actual) {
        this.actual = actual;
    }
}

然后我们就可以通过转发来增加我们的操作符了:

    // ...
    public static <T> MyObservable<T> create(Observable<T> o) {
        return new MyObservable<T>(o);
    }

    public static <T> MyObservable<T> just(T value) {
        return create(Observable.just(value));
    }

    public final MyObservable<T> goAsync() {
        return create(actual.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread()));
    }

    public final <R> MyObservable<R> map(Func1<T, R> mapper) {
        return create(actual.map(mapper));
    }

    public final void subscribe(Subscriber<? super T> subscriber) {
        actual.subscribe(subscriber);
    }
    // ...

这样我们就实现了上述两个目标:去掉了不必要的操作符,也引入了新的操作符。

如果我们看 RxJava 的源码,就会发现同样的模式,实际的对象都是 OnSubscribe 或者 Publisher 类型,而 Observable 则通过各种操作符极大地丰富了这些类型的功能。

互操作(Interoperation)

MyObservable 看起来已经足够了,但最终我们会需要和普通的 Observable 或者是其他人实现的 YourObservable 互相操作。由于它们是不同的类型,我们需要一个公共的父类来使它们可以交互。最自然的想法当然是都实现一个 toObservable() 函数,返回内部的 Observable,但这导致我们必须调用这个额外的方法。相反,我们可以让每个自定义类型都继承自同一个基类,或者实现同一个接口,它们包含互相操作所需的最小方法集合。

在 RxJava 1.x 中,显而易见的选项是 Observable,但它并不够好,因为它的方法都是 final 的,无法被被重写,而且这些方法都返回 Observable 而不是 MyObservable。不幸的是,RxJava 1.x 在这一点上无能为力,因为它要保证二进制兼容性。

幸运的是,在 RxJava 2.x 中,最基本的类型并不是 ObservableFlowable),而是 Publisher。所有的 Observable 都是 Publisher,而很多操作符都是接受 Publisher 参数。这样的好处就是我们可以兼容 RxJava 2.x 之外的基于 Publisher 的类型。之所以可以做到这样,是因为操作符只需要它的上游有一个 subscribe(Subscriber<? super T> s) 函数即可。

因此,如果我们的目标是 2.x,那么 MyObservable 就应该实现 Publisher 接口,这样它就立即可以和其他正经的响应式编程库兼容了:

public class MyObservable<T> implements Publisher<T> {
    // ...
    @Override
    public void subscribe(Subscriber<? super T> subscriber) {
        actual.subscribe(subscriber);
    }
    // ...
}

扩展(Extension)

有了这个 MyObservable 之后,我们可能需要不同的响应式类型以应对不同的使用场景,但复制所有的操作符就太繁琐了。第一想法当然是把 MyObservable 作为其他响应式类型的基类,但这同样会遇到 Observable -> MyObservable 的类型:操作符返回的类型不一样。

我相信 Java 8 的 Streams API 也遇到了同样的问题,如果我们看看签名:Stream extends BaseStream<T, Stream<T>>BaseStream<T, S extends BaseStream<T, S>>,就会发现非常别扭,父类的类型参数居然需要子类型。这样做是为了能在方法签名中捕获子类型,这样如果我们创建了一个 MyStream 类型,那所有的方法的返回类型都将是 MyStream

我们也可以用类似的方式定义 MyObservable 类型:

public class MyObservable<T, S extends MyObservable<T, S>> implements Publisher<T> {
    
    final Publisher<? extends T> actual;
    
    public MyObservable(Publisher<? extends T> actual) {
        this.actual = actual;
    }
    
    @SuppressWarnings("unchecked")
    public <R, U extends MyObservable<R, U>> U wrap(Publisher<? extends R> my) {
        return (U)new MyObservable<R, S>(my);
    }
    
    public final <R, U extends MyObservable<R, U>> U map(Function<? super T, ? extends R> mapper) {
        return wrap(Flowable.fromPublisher(actual).map(mapper));
    }
    
    @Override
    public void subscribe(Subscriber<? super T> s) {
        actual.subscribe(s);
    }
}

一大堆泛型的东西,我们编写了一个 wrap() 函数,用于把某种 Publisher 类型包装为 MyObservable 类型,并且在 map() 函数中调用了 wrap() 以确保正确的结果类型。MyObservable 的子类则要重写 wrap 函数以提供它们自己的类型:

public class TheirObservable<T> extends MyObservable<T, TheirObservable<T>> {
    public TheirObservable(Publisher<? extends T> actual) {
        super(actual);
    }

    @SuppressWarnings("unchecked")
    @Override
    public <R, U extends MyObservable<R, U>> U wrap(Publisher<? extends R> my) {
        return (U) new TheirObservable<R>(my);
    }
}

让我们试一下:

public static void main(String[] args) {
    TheirObservable<Integer> their = new TheirObservable<>(Flowable.just(1));
    
    TheirObservable<String> out = their.map(v -> v.toString());

    Flowable.fromPublisher(out).subscribe(System.out::println);
}

结果符合预期,没有编译错误,也能打印出 1。

现在让我们为 TheirObservable 增加一个 take() 操作符:

    @SuppressWarnings({ "rawtypes", "unchecked" })
    public <U extends TheirObservable<T>> U take(int n) {
        Flowable<T> p = Flowable.fromPublisher(actual);
        Flowable<T> u = p.take(n);
        return (U)(TheirObservable)wrap(u);
    }

译者注:原作者这里贴了 filter 的代码,我按照自己的理解写了 take 的代码。

函数签名变得越来越复杂了,类型系统正在反击!我们需要使用裸类型和强转来使得结果看起来像是目标类型。此外,如果我们编写 their.map(v -> v.toString()).take(1); 这样的代码,编译器会提示找不到 take() 函数。因为 map() 只有在我们把返回值赋值给 MyObservable 类型时,它才会返回 MyObservable 类型。为了让类型推导正确工作,我们不得不把链式调用拆分为单独的调用:

    TheirObservable<Integer> their2 = new TheirObservable<>(Flowable.just(1));
    TheirObservable<String> step1 = their2.map(v -> v.toString());
    TheirObservable<String> step2 = step1.take(1);
    Flowable.fromPublisher(step2).subscribe(System.out::println);

接下来让我们把 TheirObservable 扩展为 AllObservable,然后增加 filter() 方法:

public static class AllObservable<T> extends TheirObservable<T> {
    public AllObservable(Publisher<? extends T> actual) {
        super(actual);
    }

    @Override
    <R, U extends MyObservable<R, U>> U wrap(Publisher<? extends R> my) {
        return (U)new AllObservable<R>(my);
    }
    
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public <U extends AllObservable<T>> U filter(Predicate<? super T> predicate) {
        Flowable<T> p = Flowable.fromPublisher(actual);
        Flowable<T> u = p.filter(predicate);
        return (U)(AllObservable)wrap(u);
    }
}

然后使用它:

    AllObservable<Integer> all = new AllObservable<>(Flowable.just(1));
    
    AllObservable<String> step1 = all.map(v -> v.toString());

    AllObservable<String> step2 = step1.take(1);
    
    AllObservable<String> step3 = step2.filter(v -> true);

    Flowable.fromPublisher(step3).subscribe(System.out::println);

不幸的是,上面的代码无法编译,因为 map() 不是返回的 AllObservable,也就是说 AllObservable 不是 MyObservable<String, U extends MyObservable<String, U>>。把 step1 改成 TheirObservable<String> 可以解决编译的问题。然而,如果我们想要交换 filter()take() 的顺序,step1 就不再是 AllObservable 了,而 filter() 也就无法调用了。

总结

我们能解决 AllObservable 的问题吗?我不知道,就我对 Java 类型系统和类型接口的理解,现在只能做到这个程度了。

RxJava 2.x 会有这样的结构吗?如果由我来定,那肯定就是不会了。为了支持这样的结构,我们每次都需要包装,而我一直想摆脱所有的 lift()create(),而且这样做也会让类型和函数的签名更加复杂。

因此,如果有人希望这样做,上面的例子就展示了如何在不修改 RxJava 的情况下进行包装,并且可以自己控制暴露哪些 API。这也算是“组合优于继承”的一个很好的例子。