RxPY - 转换运算符

  • 简述

  • 缓冲

    该操作符将从可观察源收集所有值,并在满足给定边界条件后定期发出它们。

    句法

    
    
    buffer(boundaries)
    
    

    参数

    boundaries:输入是可观察的,它将决定何时停止,以便发出收集的值。

    返回值

    返回值是可观察的,它将具有从源可观察对象收集的所有值,即持续时间由所采用的输入可观察对象决定。

    例子

    
    
    from rx import of, interval, operators as op
    
    from datetime import date
    
    test = of(1, 2,3,4,5,6,7,8,9,10)
    
    sub1 = test.pipe(
    
       op.buffer(interval(1.0))
    
    )
    
    sub1.subscribe(lambda x: print("The element is {0}".format(x)))
    
    

    输出

    
    
    E:\pyrx>python test1.py
    
    The elements are [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    
    
  • ground_by

    该运算符将根据给定的 key_mapper 函数对来自源 observable 的值进行分组。

    句法

    
    
    group_by(key_mapper)
    
    

    参数

    key_mapper:这个函数将负责从源 observable 中提取键。

    返回值

    它返回一个 observable,其值基于 key_mapper 函数分组。

    例子

    
    
    from rx import from_, interval, operators as op
    
    test = from_(["A", "B", "C", "D"])
    
    sub1 = test.pipe(
    
       op.group_by(lambda v: v[0])
    
    )
    
    sub1.subscribe(lambda x: print("The element is {0}".format(x)))
    
    

    输出

    
    
    E:\pyrx>python testrx.py
    
    The element is <rx.core.observable.groupedobservable.GroupedObservable object
    
    at
    
     0x000000C99A2E6550>
    
    The element is <rx.core.observable.groupedobservable.GroupedObservable object at
    
     0x000000C99A2E65C0>
    
    The element is <rx.core.observable.groupedobservable.GroupedObservable object at
    
     0x000000C99A2E6588>
    
    The element is <rx.core.observable.groupedobservable.GroupedObservable object at
    
     0x000000C99A2E6550>
    
    
  • map

    该运算符将根据给定的 mapper_func 的输出将源 observable 中的每个值更改为新值。

    句法

    
    
    map(mapper_func:None)
    
    

    参数

    mapper_func:(可选)它将根据来自此函数的输出更改源 observable 的值。

    例子

    
    
    from rx import of, interval, operators as op
    
    test = of(1, 2,3,4,5,6,7,8,9,10)
    
    sub1 = test.pipe(
    
       op.map(lambda x :x*x)
    
    )
    
    sub1.subscribe(lambda x: print("The element is {0}".format(x)))
    
    

    输出

    
    
    E:\pyrx>python testrx.py
    
    The element is 1
    
    The element is 4
    
    The element is 9
    
    The element is 16
    
    The element is 25
    
    The element is 36
    
    The element is 49
    
    The element is 64
    
    The element is 81
    
    The element is 100
    
    
  • scan

    该运算符将对来自源 observable 的值应用一个累加器函数,并返回一个带有新值的 observable。

    句法

    
    
    scan(accumulator_func, seed=NotSet)
    
    

    参数

    accumulator_func:此函数应用于源 observable 中的所有值。
    seed:(可选)在 accumular_func 中使用的初始值。

    返回值

    该运算符将返回一个可观察对象,该可观察对象将基于应用于源可观察对象的每个值的累加器函数具有新值。

    例子

    
    
    from rx import of, interval, operators as op
    
    test = of(1, 2,3,4,5,6,7,8,9,10)
    
    sub1 = test.pipe(
    
       op.scan(lambda acc, a: acc + a, 0))
    
    sub1.subscribe(lambda x: print("The element is {0}".format(x)))
    
    

    输出

    
    
    E:\pyrx>python testrx.py
    
    The element is 1
    
    The element is 3
    
    The element is 6
    
    The element is 10
    
    The element is 15
    
    The element is 21
    
    The element is 28
    
    The element is 36
    
    The element is 45
    
    The element is 55