PySpark - 广播和累加器

  • 简述

    对于并行处理,Apache Spark 使用共享变量。当驱动程序向集群上的执行程序发送任务时,集群的每个节点上都会有一份共享变量的副本,以便它可以用于执行任务。
    Apache Spark 支持两种类型的共享变量 -
    • 广播
    • 累加器
    让我们详细了解它们。
  • 广播

    广播变量用于保存跨所有节点的数据副本。这个变量缓存在所有机器上,而不是在有任务的机器上发送。以下代码块包含 PySpark 的广播类的详细信息。
    
    class pyspark.Broadcast (
       sc = None, 
       value = None, 
       pickle_registry = None, 
       path = None
    )
    
    以下示例显示了如何使用广播变量。广播变量有一个名为 value 的属性,它存储数据并用于返回广播值。
    ----------------------------------------broadcast.py--------------------------------------
    from pyspark import SparkContext 
    sc = SparkContext("local", "Broadcast app") 
    words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"]) 
    data = words_new.value 
    print "Stored data -> %s" % (data) 
    elem = words_new.value[2] 
    print "Printing a particular element in RDD -> %s" % (elem)
    ----------------------------------------broadcast.py--------------------------------------
    
    命令- 广播变量的命令如下 -
    
    $SPARK_HOME/bin/spark-submit broadcast.py
    
    输出- 以下命令的输出如下所示。
    
    Stored data -> [
       'scala',  
       'java', 
       'hadoop', 
       'spark', 
       'akka'
    ]
    Printing a particular element in RDD -> hadoop
    
  • 累加器

    累加器变量用于通过关联和交换操作聚合信息。例如,您可以将累加器用于求和运算或计数器(在 MapReduce 中)。以下代码块包含 PySpark 的 Accumulator 类的详细信息。
    
    class pyspark.Accumulator(aid, value, accum_param)
    
    以下示例显示了如何使用累加器变量。累加器变量有一个名为 value 的属性,类似于广播变量的属性。它存储数据并用于返回累加器的值,但只能在驱动程序中使用。
    在这个例子中,一个累加器变量被多个工作人员使用并返回一个累加值。
    
    ----------------------------------------accumulator.py------------------------------------
    from pyspark import SparkContext 
    sc = SparkContext("local", "Accumulator app") 
    num = sc.accumulator(10) 
    def f(x): 
       global num 
       num+=x 
    rdd = sc.parallelize([20,30,40,50]) 
    rdd.foreach(f) 
    final = num.value 
    print "Accumulated value is -> %i" % (final)
    ----------------------------------------accumulator.py------------------------------------
    
    命令- 累加器变量的命令如下 -
    
    $SPARK_HOME/bin/spark-submit accumulator.py
    
    输出- 上述命令的输出如下所示。
    
    Accumulated value is -> 150