RxPY - 使用调度程序的并发

  • 简述

    RxPy 的一个重要特性是并发,即允许任务并行执行。为了实现这一点,我们有两个运算符 subscribe_on() 和 observe_on() 将与调度程序一起工作,这将决定订阅任务的执行。
    这是一个工作示例,显示了对 subscibe_on()、observe_on() 和调度程序的需求。

    例子

    
    
    import random
    
    import time
    
    import rx
    
    from rx import operators as ops
    
    def adding_delay(value):
    
       time.sleep(random.randint(5, 20) * 0.1)
    
       return value
    
    # Task 1
    
    rx.of(1,2,3,4,5).pipe(
    
       ops.map(lambda a: adding_delay(a))
    
    ).subscribe(
    
       lambda s: print("From Task 1: {0}".format(s)),
    
       lambda e: print(e),
    
       lambda: print("Task 1 complete")
    
    )
    
    # Task 2
    
    rx.range(1, 5).pipe(
    
       ops.map(lambda a: adding_delay(a))
    
    ).subscribe(
    
       lambda s: print("From Task 2: {0}".format(s)),
    
       lambda e: print(e),
    
       lambda: print("Task 2 complete")
    
    ) 
    
    input("Press any key to exit\n")
    
    
    在上面的例子中,我有 2 个任务:任务 1 和任务 2。任务的执行是按顺序执行的。第二个任务只有在第一个任务完成后才开始。

    输出

    
    
    E:\pyrx>python testrx.py
    
    From Task 1: 1
    
    From Task 1: 2
    
    From Task 1: 3
    
    From Task 1: 4
    
    From Task 1: 5
    
    Task 1 complete
    
    From Task 2: 1
    
    From Task 2: 2
    
    From Task 2: 3
    
    From Task 2: 4
    
    Task 2 complete
    
    
    RxPy 支持很多 Scheduler,这里我们将使用 ThreadPoolScheduler。ThreadPoolScheduler 主要会尝试管理可用的 CPU 线程。
    在我们之前看到的示例中,我们将使用一个多处理模块,该模块将为我们提供 cpu_count。计数将提供给 ThreadPoolScheduler,它将根据可用线程设法使任务并行工作。
    这是一个工作示例 -
    
    
    import multiprocessing
    
    import random
    
    import time
    
    from threading import current_thread
    
    import rx
    
    from rx.scheduler import ThreadPoolScheduler
    
    from rx import operators as ops
    
    # calculate cpu count, using which will create a ThreadPoolScheduler
    
    thread_count = multiprocessing.cpu_count()
    
    thread_pool_scheduler = ThreadPoolScheduler(thread_count)
    
    print("Cpu count is : {0}".format(thread_count))
    
    def adding_delay(value):
    
       time.sleep(random.randint(5, 20) * 0.1)
    
       return value
    
    # Task 1
    
    rx.of(1,2,3,4,5).pipe(
    
       ops.map(lambda a: adding_delay(a)),
    
       ops.subscribe_on(thread_pool_scheduler)
    
    ).subscribe(
    
       lambda s: print("From Task 1: {0}".format(s)),
    
       lambda e: print(e),
    
       lambda: print("Task 1 complete")
    
    )
    
    # Task 2
    
    rx.range(1, 5).pipe(
    
       ops.map(lambda a: adding_delay(a)),
    
       ops.subscribe_on(thread_pool_scheduler)
    
    ).subscribe(
    
       lambda s: print("From Task 2: {0}".format(s)),
    
       lambda e: print(e),
    
       lambda: print("Task 2 complete")
    
    )
    
    input("Press any key to exit\n")
    
    
    在上面的例子中,我有 2 个任务,cpu_count 是 4。因为任务是 2,我们可用的线程是 4,所以这两个任务可以并行启动。

    输出

    
    
    E:\pyrx>python testrx.py
    
    Cpu count is : 4
    
    Press any key to exit
    
    From Task 1: 1
    
    From Task 2: 1
    
    From Task 1: 2
    
    From Task 2: 2
    
    From Task 2: 3
    
    From Task 1: 3
    
    From Task 2: 4
    
    Task 2 complete
    
    From Task 1: 4
    
    From Task 1: 5
    
    Task 1 complete
    
    
    如果您看到输出,则这两个任务已并行启动。
    现在,考虑一个场景,其中任务超过 CPU 计数,即 CPU 计数为 4,任务为 5。在这种情况下,我们需要检查任务完成后是否有任何线程空闲,因此,它可以是分配给队列中可用的新任务。
    为此,我们可以使用 observe_on() 操作符,它会观察调度器是否有空闲线程。这是一个使用 observe_on() 的工作示例

    例子

    
    
    import multiprocessing
    
    import random
    
    import time
    
    from threading import current_thread
    
    import rx
    
    from rx.scheduler import ThreadPoolScheduler
    
    from rx import operators as ops
    
    # calculate cpu count, using which will create a ThreadPoolScheduler
    
    thread_count = multiprocessing.cpu_count()
    
    thread_pool_scheduler = ThreadPoolScheduler(thread_count)
    
    print("Cpu count is : {0}".format(thread_count))
    
    def adding_delay(value):
    
       time.sleep(random.randint(5, 20) * 0.1)
    
       return value
    
    # Task 1
    
    rx.of(1,2,3,4,5).pipe(
    
       ops.map(lambda a: adding_delay(a)),
    
       ops.subscribe_on(thread_pool_scheduler)
    
    ).subscribe(
    
       lambda s: print("From Task 1: {0}".format(s)),
    
       lambda e: print(e),
    
       lambda: print("Task 1 complete")
    
    )
    
    # Task 2
    
    rx.range(1, 5).pipe(
    
       ops.map(lambda a: adding_delay(a)),
    
       ops.subscribe_on(thread_pool_scheduler)
    
    ).subscribe(
    
       lambda s: print("From Task 2: {0}".format(s)),
    
       lambda e: print(e),
    
       lambda: print("Task 2 complete")
    
    )
    
    #Task 3
    
    rx.range(1, 5).pipe(
    
       ops.map(lambda a: adding_delay(a)),
    
       ops.subscribe_on(thread_pool_scheduler)
    
    ).subscribe(
    
       lambda s: print("From Task 3: {0}".format(s)),
    
       lambda e: print(e),
    
       lambda: print("Task 3 complete")
    
    )
    
    #Task 4
    
    rx.range(1, 5).pipe(
    
       ops.map(lambda a: adding_delay(a)),
    
       ops.subscribe_on(thread_pool_scheduler)
    
    ).subscribe(
    
       lambda s: print("From Task 4: {0}".format(s)),
    
       lambda e: print(e),
    
       lambda: print("Task 4 complete")
    
    )
    
    #Task 5
    
    rx.range(1, 5).pipe(
    
       ops.map(lambda a: adding_delay(a)),
    
       ops.observe_on(thread_pool_scheduler)
    
    ).subscribe(
    
       lambda s: print("From Task 5: {0}".format(s)),
    
       lambda e: print(e),
    
       lambda: print("Task 5 complete")
    
    )
    
    input("Press any key to exit\n")
    
    

    输出

    
    
    E:\pyrx>python testrx.py
    
    Cpu count is : 4
    
    From Task 4: 1
    
    From Task 4: 2
    
    From Task 1: 1
    
    From Task 2: 1
    
    From Task 3: 1
    
    From Task 1: 2
    
    From Task 3: 2
    
    From Task 4: 3
    
    From Task 3: 3
    
    From Task 2: 2
    
    From Task 1: 3
    
    From Task 4: 4
    
    Task 4 complete
    
    From Task 5: 1
    
    From Task 5: 2
    
    From Task 5: 3
    
    From Task 3: 4
    
    Task 3 complete
    
    From Task 2: 3
    
    Press any key to exit
    
    From Task 5: 4
    
    Task 5 complete
    
    From Task 1: 4
    
    From Task 2: 4
    
    Task 2 complete
    
    From Task 1: 5
    
    Task 1 complete
    
    
    如果您看到输出,则在任务 4 完成的那一刻,线程被分配给下一个任务,即任务 5,并且同样开始执行。