实现操作符时的一些陷阱(二)

Posted by Advanced RxJava on June 25, 2016
本文是 Advanced RxJava http://akarnokd.blogspot.com/ 系列博客的中文翻译,已征得作者授权。该系列博客的作者是 RxJava 的核心贡献者之一。翻译的内容使用 知识共享 署名-非商业性使用-相同方式共享 4.0 国际 协议进行许可,转载请注明出处。如果发现翻译问题,或者任何改进意见,请 在 GitHub 上提交 issue
本文是 Piasy 独立翻译,发表于 https://blog.piasy.com/AdvancedRxJava/,请阅读原文支持原创 https://blog.piasy.com/AdvancedRxJava/2016/06/25/pitfalls-of-operator-implementations-2/

原文 Pitfalls of operator implementations (part 2)

介绍

本文中我将暂停对 producer 的讲解,继续回到实现操作符的陷阱这个话题,而且还会提到使用特定(序列)的 RxJava 操作符时的一些陷阱。

6,不等请求就直接发射(Emitting without request)

假设你要实现一个操作符,它会忽略上游发出的任何数据,并在上游结束时发出一个固定的值:

Operator<Integer, Integer> ignoreAllAndJust = child -> {
    Subscriber<Integer> parent = new Subscriber<Integer>() {
        @Override
        public void onNext(Integer value) {
            // ignored
        }
        @Override
        public void onError(Throwable e) {
            child.onError(e);
        }
        @Override
        public void onCompleted() {
            child.onNext(1);
            child.onCompleted();
        }
    };
    child.add(parent);
    return parent;
};

上面的这个操作符依赖于两个前提:Subscriber 的默认行为就是发出请求;下游一定会在上游结束之前至少发出一次请求。然而,即便你的测例通过了,这个操作符依然违反了 backpressure 的要求:onCompleted() 函数无条件的发出了一个数据,没有检查下游是否发出过请求。这个问题会在这样的场景下暴露出来:如果你有一个 hot Observable 或者不考虑 backpressure 的 Observable,而你又需要和 reactive-streams 兼容的下游进行交互,那么下游的 Subscriber 就会收到 onError 了,因为你的行为违反了 reactive-streams 规则的 §1.1 节。

既然我们现在已经了解了很多 producer,修复这个问题非常简单:

// ... same as before
@Override
public void onCompleted() {
    child.setProducer(new SingleProducer(child, 1));
}
// ... same as before

我们在 produer(二) 中介绍过 SingleProducer,现在它是最合适的选择。

但是我想介绍另外一种解决方案,这种方案和 RxJava 2.0 以及 reactive-streams 兼容的操作符相关:

Operator<Integer, Integer> ignoreAllAndJust = child -> {
    SingleDelayedProducer<Integer> sdp = 
        new SingleDelayedProducer<>(child);
    Subscriber<Integer> parent = new Subscriber<Integer>() {
        @Override
        public void onNext(Integer value) {
            // ignored
        }
        @Override
        public void onError(Throwable e) {
            child.onError(e);
        }
        @Override
        public void onCompleted() {
            sdp.set(1);
        }
    };
    child.add(parent);
    child.setProducer(sdp);
    return parent;
};

这种方案功能上是一样的,尽管相较于 RxJava 1.x 稍显冗长。之所以需要这样,是因为操作符的 Subscriber 无法脱离 Producer 而单独存在。而这是因为 Producer 语义上来说也是一种 Subscription,而且它为 Subscriber 提供了从上游取消订阅的唯一途径。延迟设置 producer 会延迟可能的取消订阅。

7,操作符中的共享状态(Shared state in the operator)

你可能认为 ignoreAllAndJust 很傻也没什么用处,但如果我们把它改成一个在接收到上游数据时进行计数,并在上游结束时发出这个计数,那它就变得有点用处了。假设我们的编译环境是 Java 6,不能用 lambda 表达式:

public final class CounterOp<T 
implements Operator<Integer, T> {
    int count;                                              // (1)
    @Override
    public Subscriber call(Subscriber child) {
        Subscriber<T> parent = new Subscriber<T>() {
            @Override
            public void onNext(T t) {
                count++;
            }
            @Override
            public void onError(Throwable e) {
                child.onError(e);
            }
            @Override
            public void onCompleted() {
                child.setProducer(
                    new SingleProducer<Integer>(child, count));
            }
        };
        child.add(parent);
        return parent;
    }
}
 
Observable<Integer> count = Observable.just(1)
    .lift(new CounterOp<Integer>());                        // (2)
         
count.subscribe(System.out::println);
count.subscribe(System.out::println);
count.subscribe(System.out::println);

我们已经吸取了上一条的教训,正确实现了 onCompleted() 方法,然而如果运行上面的代码,我们会发现打印的结果是 123!显然 just(1) 的计数应该始终是 1,无论我们对它计数多少次。

问题就出在(1)处,我们在所有的订阅者中共享了 count 变量。第一个订阅者会把它增加到 1,第二个订阅者会把它增加到 2,以此类推,由于(2),我们始终只有一个 CounterOp 实例,因此也就只有一个 count 实例。

解决办法就是把 count 移到 parent 中:

public final class CounterOp<T 
implements Operator<Integer, T> {
    @Override
    public Subscriber call(Subscriber child) {
        Subscriber<T> parent = new Subscriber<T>() {
           int count;
    // ... the rest is the same

当然我们也有一些场景需要在订阅者之间共享变量,但这些场景少之又少,所以第一原则就是:Operator 的所有成员都声明为 final。一旦声明为 final,你很快就会发现你的代码在尝试修改它们(你也很快就会发现代码写得有 bug)。

8,Observable 链条中的共享状态(Shared state in an Observable chain)

假设你对 toList() 的性能不满意,或者它返回的 List 类型不满足需求,你打算实现一个自己的聚合器。你希望通过已有的操作符解决这个问题,你找到了 reduce()

Observable<Vector<Integer>> list = Observable
    .range(1, 3)
    .reduce(new Vector<Integer>(), (vector, value) -> {
        vector.add(value);
        return vector;
    });
 
list.subscribe(System.out::println);
list.subscribe(System.out::println);
list.subscribe(System.out::println);

如果运行上面的代码,你会发现第一次打印符合预期,但第二次打印了两遍,第三次则打印了三遍!

问题不是出在 reduce() 本身,而是对它的使用方式。当链条建立起来之后,传入 reduce()Vector 实例就相当于一个“全局”的了,后续对这个链条的调用都会共用同一个实例。

修复我们遇到的这个具体问题很简单,无需重新实现一个操作符:

Observable<Vector<Integer>> list2 = Observable
    .range(1, 3)
    .reduce((Vector<Integer>)null, (vector, value) -> {
        if (vector == null) {
            vector = new Vector<>();
        }
        vector.add(value);
        return vector;
    });
 
list2.subscribe(System.out::println);
list2.subscribe(System.out::println);
list2.subscribe(System.out::println);

你需要传入 null,并在聚合函数内创建新的 Vector 实例,这样就不会在订阅者之间共享了。

首要原则是,对于任何需要传入一个初始值的聚合操作符,都需要小心,很可能这个初始值是被不同订阅者所共享的,而如果你想要把链式调用的结果用多个订阅者去消费,它们就会发生冲突了,可能会导致不可预测的行为,甚至崩溃。

总结

在本文中,我分析了三种关于操作符更常见的陷阱,并且展示了如何测试并修复背后的 bug。

在使用 RxJava 各种操作符的过程中,可能很容易遇到各种奇怪(甚至滑稽)的问题(至少对我是这样),所以我的追求远未停止。而新的问题也变得越来越微妙,为了解决这些问题,我们需要了解更多关于操作符的原理。