RxPY - Connectable Operators

  • publish

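    This method converts an observable into a connectable observable. A connectable observable does not start emitting to its subscribers until connect() is called on it.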

    Syntax

    
    
    publish(mapper=None)
    
    

    Parameters

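    mapper: optional. A function that can use the multicast source sequence as many times as needed without causing multiple subscriptions to the source.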

    Example

    
    
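    from rx import create, operators as op
    import random

    def test_observable(observer, scheduler):
        # Emit a single random value, then complete.
        observer.on_next(random.random())
        observer.on_completed()

    # publish() turns the source into a connectable observable.
    source = create(test_observable).pipe(op.publish())

    # Subscribing does not start emission yet.
    test1 = source.subscribe(on_next=lambda i: print("From subscriber 1 - {0}".format(i)))
    test2 = source.subscribe(on_next=lambda i: print("From subscriber 2 - {0}".format(i)))

    # connect() starts the source; both subscribers receive the same value.
    source.connect()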
    
    

    Output

    
    
    E:\pyrx>python testrx.py
    From subscriber 1 - 0.1475160727331849
    From subscriber 2 - 0.1475160727331849
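
    To make the role of connect() concrete, here is a minimal plain-Python sketch of the idea behind a connectable observable. The class ConnectableSketch and the helper names are hypothetical and exist only for illustration; this is not how RxPY implements publish(), just a model of the behaviour shown in the example above.

```python
import random


class ConnectableSketch:
    """Hypothetical stand-in for a connectable observable (not an RxPY class)."""

    def __init__(self, producer):
        self._producer = producer  # callable that receives an emit(value) function
        self._observers = []

    def subscribe(self, on_next):
        # Subscribing only registers the callback; nothing is emitted yet.
        self._observers.append(on_next)

    def connect(self):
        # Run the producer once and fan each value out to every observer.
        def emit(value):
            for on_next in list(self._observers):
                on_next(value)

        self._producer(emit)


producer_runs = []
received = []


def producer(emit):
    producer_runs.append(1)
    emit(random.random())


source = ConnectableSketch(producer)
source.subscribe(lambda v: received.append(("subscriber 1", v)))
source.subscribe(lambda v: received.append(("subscriber 2", v)))
received_before_connect = list(received)  # still empty: not connected yet

source.connect()
print(received)
```

    In this sketch the producer runs exactly once, so both subscribers see the same random value, which mirrors the identical numbers in the output above.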
    
    
  • ref_count

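    This operator makes a connectable observable behave like a normal observable: it connects automatically when the first subscriber subscribes and disconnects when the last subscriber unsubscribes.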

    Syntax

    
    
    ref_count()
    
    

    Example

    
    
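    from rx import create, operators as op
    import random

    def test_observable(observer, scheduler):
        # Emit a single random value; the source never completes.
        observer.on_next(random.random())

    # ref_count() connects automatically when the first subscriber arrives.
    source = create(test_observable).pipe(op.publish(), op.ref_count())

    # Subscriber 1 triggers the connection and receives the value.
    test1 = source.subscribe(on_next=lambda i: print("From subscriber 1 - {0}".format(i)))

    # Subscriber 2 subscribes after the value was emitted, so it prints nothing.
    test2 = source.subscribe(on_next=lambda i: print("From subscriber 2 - {0}".format(i)))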
    
    

    Output

    
    
    E:\pyrx>python testrx.py
    From subscriber 1 - 0.8230640432381131
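
    The reference counting described above can be sketched in plain Python. RefCountedSource and its attributes are hypothetical names used only for this illustration, and the sketch is a simplified model under that assumption, not RxPY's implementation of publish() followed by ref_count().

```python
class RefCountedSource:
    """Hypothetical sketch of publish() + ref_count(); not RxPY's implementation."""

    def __init__(self, producer):
        self._producer = producer  # callable that receives an emit(value) function
        self._observers = []
        self.connected = False

    def subscribe(self, on_next):
        self._observers.append(on_next)
        if len(self._observers) == 1:
            # First subscriber: connect automatically and run the producer.
            self.connected = True
            self._producer(self._emit)

        def dispose():
            self._observers.remove(on_next)
            if not self._observers:
                # Last subscriber gone: disconnect from the source.
                self.connected = False

        return dispose

    def _emit(self, value):
        for on_next in list(self._observers):
            on_next(value)


log = []
source = RefCountedSource(lambda emit: emit(42))
dispose1 = source.subscribe(lambda v: log.append(("subscriber 1", v)))
# Subscriber 2 arrives after the only value was emitted, so it sees nothing.
dispose2 = source.subscribe(lambda v: log.append(("subscriber 2", v)))
connected_while_subscribed = source.connected

dispose1()
dispose2()
print(log, source.connected)
```

    As in the example above, only the first subscriber receives the value, because the connection (and the emission) happens during its subscription.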
    
    
  • replay

    This method works similarly to a ReplaySubject. Subscribers that subscribe late, after the observable has already emitted, still receive the same values.

    Syntax

    
    
    replay()
    
    

    Example

    
    
    from rx import create, operators as op
    import random
    from threading import Timer

    def test_observable(observer, scheduler):
        # Emit a single random value, then complete.
        observer.on_next(random.random())
        observer.on_completed()

    # replay() returns a connectable observable that buffers emitted values.
    source = create(test_observable).pipe(op.replay())

    test1 = source.subscribe(on_next=lambda i: print("From subscriber 1 - {0}".format(i)))
    test2 = source.subscribe(on_next=lambda i: print("From subscriber 2 - {0}".format(i)))

    source.connect()

    print("subscriber called after delay ")

    def last_subscriber():
        # Subscribes 5 seconds after the value was emitted and still receives it.
        test3 = source.subscribe(on_next=lambda i: print("From subscriber 3 - {0}".format(i)))

    t = Timer(5.0, last_subscriber)
    t.start()
    
    

    Output

    
    
    E:\pyrx>python testrx.py
    From subscriber 1 - 0.8340998157725388
    From subscriber 2 - 0.8340998157725388
    subscriber called after delay
    From subscriber 3 - 0.8340998157725388
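
    The replay behaviour can be modelled with a small buffer that is played back to each new subscriber. The sketch below is plain Python under stated assumptions: ReplaySketch and its buffer_size argument are hypothetical names chosen for the illustration, and the code is not RxPY's implementation.

```python
from collections import deque


class ReplaySketch:
    """Hypothetical sketch of replay behaviour (not RxPY's implementation)."""

    def __init__(self, producer, buffer_size=None):
        self._producer = producer  # callable that receives an emit(value) function
        self._buffer = deque(maxlen=buffer_size)  # None keeps every value
        self._observers = []

    def subscribe(self, on_next):
        # A late subscriber first receives whatever is still buffered.
        for value in list(self._buffer):
            on_next(value)
        self._observers.append(on_next)

    def connect(self):
        self._producer(self._emit)

    def _emit(self, value):
        # Remember the value, then forward it to the current subscribers.
        self._buffer.append(value)
        for on_next in list(self._observers):
            on_next(value)


def producer(emit):
    for value in (1, 2, 3):
        emit(value)


log = []
source = ReplaySketch(producer, buffer_size=2)
source.subscribe(lambda v: log.append(("early", v)))
source.connect()
# The late subscriber joins after all values were emitted.
source.subscribe(lambda v: log.append(("late", v)))
print(log)
```

    With a buffer of two values, the late subscriber in this sketch receives only the last two emissions; with buffer_size=None it would receive all three.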