Hadoop MapReduce

  • MapReduce

    MapReduce 是一个框架,通过它我们可以编写应用程序以可靠的方式在大型商用硬件集群上并行处理大量数据。
  • 什么是MapReduce?

    MapReduce是基于Java的分布式计算的处理技术和程序模型。MapReduce算法包含两个重要任务,即Map和Reduce。Map获取一组数据,并将其转换为另一组数据,其中各个元素分解为元组(键/值对)。其次,reduce任务,它将地图的输出作为输入并将这些数据元组合并为较小的元组集。就像名称MapReduce的顺序所暗示的那样,reduce任务总是在映射作业之后执行。
    MapReduce的主要优点在于,它易于在多个计算节点上扩展数据处理规模。在MapReduce模型下,数据处理原语称为映射器和简化器。分解的数据处理应用到映射器和减速器有时平凡的。但是,一旦我们以MapReduce形式编写了应用程序,就可以将应用程序扩展为在集群中的数百台,数千台甚至数万台计算机上运行,​​这仅仅是配置更改。这种简单的可伸缩性吸引了许多程序员使用MapReduce模型。
  • 算法

    • 通常,MapReduce范例基于将计算机发送到数据所在的位置!
    • MapReduce程序分三个阶段执行,即地图阶段,随机播放阶段和缩小阶段。
      • Map阶段-Map或Mapper的工作是处理输入数据。通常,输入数据采用文件或目录的形式,并存储在Hadoop文件系统(HDFS)中。输入文件逐行传递到映射器功能。映射器处理数据并创建几个小数据块。
      • Reduce阶段-此阶段是Shuffle阶段和Reduce阶段的组合。Reducer的工作是处理来自映射器的数据。处理后,它将产生一组新的输出,这些输出将存储在HDFS中。
    • 在MapReduce作业期间,Hadoop将Map和Reduce任务发送到集群中的相应服务器。
    • 该框架管理数据传递的所有细节,例如发布任务,验证任务完成以及在节点之间的集群周围复制数据。
    • 大多数计算都在本地磁盘上有数据的节点上进行,从而减少了网络流量。
    • 完成给定任务后,集群将收集并减少数据以形成适当的结果,然后将其发送回Hadoop服务器。
    mapreduce
  • 输入和输出(Java透视图)

    MapReduce框架对<key,value>对操作,即,该框架将作业的输入视为一组<key,value>对,并生成一组<key,value>对作为作业的输出,可能是不同类型的。键和值类应由框架以串行方式进行,因此需要实现Writable接口。此外,关键类必须实现Writable-Comparable接口,以利于框架排序。MapReduce作业的输入和输出类型-(输入)<k1,v1>→Map→<k2,v2>→reduce→<k3,v3>(输出)。
      输入 输出
    Map <k1, v1> list (<k2, v2>)
    Reduce <k2, list(v2)> list (<k3, v3>)
    术语
      PayLoad−应用程序实现Map和Reduce函数,并构成工作的核心。 Mapper− 映射器将输入键/值对映射到一组中间键/值对。 NamedNode− 管理Hadoop分布式文件系统(HDFS)的节点。 DataNode− 在进行任何处理之前预先呈现数据的节点。 MasterNode− JobTracker运行所在的节点,它接受来自客户端的作业请求。 SlaveNode− 运行Map and Reduce程序的节点。 JobTracker− 计划作业,并将分配的作业跟踪到任务跟踪器。 Task Tracker−跟踪任务并将状态报告给JobTracker。 Job− 程序是跨数据集的Mapper和Reducer的执行。 Task− 在数据切片上执行Mapper或Reducer。 TaskAttempt − 尝试在SlaveNode上执行任务的特定实例。
  • 示例场景

    以下是有关组织的电力消耗的数据。它包含每月的用电量和各年的年平均值。
      Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec Avg
    1979 23 23 2 43 24 25 26 26 26 26 25 26 25
    1980 26 27 28 28 28 30 31 31 31 30 30 30 29
    1981 31 32 32 32 33 34 35 36 36 34 34 34 34
    1984 39 38 39 39 39 41 42 43 40 39 38 38 40
    1985 38 39 39 39 39 41 41 41 00 40 39 39 45
    如果将上述数据作为输入,则我们必须编写应用程序对其进行处理并产生结果,例如查找最大使用年份,最小使用年份等。对于记录数量有限的程序员来说,这是一个过渡。他们将简单地编写逻辑以产生所需的输出,并将数据传递给编写的应用程序。
    但是,请考虑代表该州成立以来所有大型行业的用电量的数据。
    当我们编写应用程序来处理此类批量数据时,
    • 他们将花费很多时间来执行。
    • 当我们将数据从源移动到网络服务器等时,网络流量将会很大。
    为了解决这些问题,我们有MapReduce框架。
    输入数据
    以上数据另存为sample.txt,并作为输入提供。输入文件如下所示。
    
    1979   23   23   2   43   24   25   26   26   26   26   25   26  25 
    1980   26   27   28  28   28   30   31   31   31   30   30   30  29 
    1981   31   32   32  32   33   34   35   36   36   34   34   34  34 
    1984   39   38   39  39   39   41   42   43   40   39   38   38  40 
    1985   38   39   39  39   39   41   41   41   00   40   39   39  45 
    
    范例程序
    下面给出的是使用MapReduce框架的示例数据程序。
    
    package hadoop; 
    
    import java.util.*; 
    
    import java.io.IOException; 
    import java.io.IOException; 
    
    import org.apache.hadoop.fs.Path; 
    import org.apache.hadoop.conf.*; 
    import org.apache.hadoop.io.*; 
    import org.apache.hadoop.mapred.*; 
    import org.apache.hadoop.util.*; 
    
    public class ProcessUnits {
       //Mapper class 
       public static class E_EMapper extends MapReduceBase implements 
       Mapper<LongWritable ,/*Input key Type */ 
       Text,                /*Input value Type*/ 
       Text,                /*Output key Type*/ 
       IntWritable>        /*Output value Type*/ 
       {
          //Map function 
          public void map(LongWritable key, Text value, 
          OutputCollector<Text, IntWritable> output,   
          
          Reporter reporter) throws IOException { 
             String line = value.toString(); 
             String lasttoken = null; 
             StringTokenizer s = new StringTokenizer(line,"\t"); 
             String year = s.nextToken(); 
             
             while(s.hasMoreTokens()) {
                lasttoken = s.nextToken();
             }
             int avgprice = Integer.parseInt(lasttoken); 
             output.collect(new Text(year), new IntWritable(avgprice)); 
          } 
       }
       
       //Reducer class 
       public static class E_EReduce extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable > {
       
          //Reduce function 
          public void reduce( Text key, Iterator <IntWritable> values, 
          OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { 
             int maxavg = 30; 
             int val = Integer.MIN_VALUE; 
                
             while (values.hasNext()) { 
                if((val = values.next().get())>maxavg) { 
                   output.collect(key, new IntWritable(val)); 
                } 
             }
          } 
       }
    
       //Main function 
       public static void main(String args[])throws Exception { 
          JobConf conf = new JobConf(ProcessUnits.class); 
          
          conf.setJobName("max_eletricityunits"); 
          conf.setOutputKeyClass(Text.class);
          conf.setOutputValueClass(IntWritable.class); 
          conf.setMapperClass(E_EMapper.class); 
          conf.setCombinerClass(E_EReduce.class); 
          conf.setReducerClass(E_EReduce.class); 
          conf.setInputFormat(TextInputFormat.class); 
          conf.setOutputFormat(TextOutputFormat.class); 
          
          FileInputFormat.setInputPaths(conf, new Path(args[0])); 
          FileOutputFormat.setOutputPath(conf, new Path(args[1])); 
          
          JobClient.runJob(conf); 
       } 
    } 
    
    将上述程序另存为ProcessUnits.java。程序的编译和执行如下所述。
  • 加工单元程序的编译和执行

    让我们假设我们位于Hadoop用户的主目录中(例如/home/hadoop)。
    请按照下面给出的步骤来编译和执行上述程序。
    步骤1
    以下命令将创建一个目录来存储已编译的Java类。
    
    $ mkdir units 
    
    步骤2
    下载Hadoop-core-1.2.1.jar,它用于编译和执行MapReduce程序。访问以下链接mvnrepository.com以下载jar。让我们假设下载的文件夹是/home/hadoop/。
    步骤3
    以下命令用于编译ProcessUnits.java程序并为该程序创建jar。
    
    $ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java 
    $ jar -cvf units.jar -C units/ .
    
    步骤4
    以下命令用于在HDFS中创建输入目录。
    
    $HADOOP_HOME/bin/hadoop fs -mkdir input_dir 
    
    步骤5
    以下命令用于在HDFS的输入目录中复制名为sample.txt的输入文件。
    
    $HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir 
    
    步骤6
    以下命令用于验证输入目录中的文件。
    
    $HADOOP_HOME/bin/hadoop fs -ls input_dir/ 
    
    步骤7
    以下命令用于通过从输入目录获取输入文件来运行Eleunit_max应用程序。
    
    $HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir 
    
    等待一段时间,直到文件执行完毕。执行后,如下所示,输出将包含输入分割数,Map任务数,reduce任务数等。
    
    INFO mapreduce.Job: Job job_1414748220717_0002 
    completed successfully 
    14/10/31 06:02:52 
    INFO mapreduce.Job: Counters: 49 
       File System Counters 
     
    FILE: Number of bytes read = 61 
    FILE: Number of bytes written = 279400 
    FILE: Number of read operations = 0 
    FILE: Number of large read operations = 0   
    FILE: Number of write operations = 0 
    HDFS: Number of bytes read = 546 
    HDFS: Number of bytes written = 40 
    HDFS: Number of read operations = 9 
    HDFS: Number of large read operations = 0 
    HDFS: Number of write operations = 2 Job Counters 
    
    
       Launched map tasks = 2  
       Launched reduce tasks = 1 
       Data-local map tasks = 2  
       Total time spent by all maps in occupied slots (ms) = 146137 
       Total time spent by all reduces in occupied slots (ms) = 441   
       Total time spent by all map tasks (ms) = 14613 
       Total time spent by all reduce tasks (ms) = 44120 
       Total vcore-seconds taken by all map tasks = 146137 
       Total vcore-seconds taken by all reduce tasks = 44120 
       Total megabyte-seconds taken by all map tasks = 149644288 
       Total megabyte-seconds taken by all reduce tasks = 45178880 
       
    Map-Reduce Framework 
     
       Map input records = 5  
       Map output records = 5   
       Map output bytes = 45  
       Map output materialized bytes = 67  
       Input split bytes = 208 
       Combine input records = 5  
       Combine output records = 5 
       Reduce input groups = 5  
       Reduce shuffle bytes = 6  
       Reduce input records = 5  
       Reduce output records = 5  
       Spilled Records = 10  
       Shuffled Maps  = 2  
       Failed Shuffles = 0  
       Merged Map outputs = 2  
       GC time elapsed (ms) = 948  
       CPU time spent (ms) = 5160  
       Physical memory (bytes) snapshot = 47749120  
       Virtual memory (bytes) snapshot = 2899349504  
       Total committed heap usage (bytes) = 277684224
         
    File Output Format Counters 
     
       Bytes Written = 40 
    
    步骤8
    以下命令用于验证输出文件夹中的结果文件。
    
    $HADOOP_HOME/bin/hadoop fs -ls output_dir/ 
    
    步骤9
    以下命令用于查看Part-00000文件中的输出。该文件由HDFS生成。
    
    $HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000 
    
    以下是MapReduce程序生成的输出。
    
    1981    34 
    1984    40 
    1985    45 
    
    步骤10
    以下命令用于将输出文件夹从HDFS复制到本地文件系统以进行分析。
    
    $HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop 
    
  • 重要命令

    所有Hadoop命令均由$HADOOP_HOME/bin/hadoop命令调用。在不带任何参数的情况下运行Hadoop脚本会打印所有命令的描述。
    用法-hadoop [--config confdir]命令
    下表列出了可用的选项及其说明。
    选项 描述
    namenode -format 格式化DFS文件系统。
    secondarynamenode 运行DFS辅助名称节点。
    namenode 运行DFS名称节点。
    datanode 运行一个DFS数据节点。
    dfsadmin 运行DFS管理客户端。
    mradmin 运行Map-Reduce管理客户端。
    fsck 运行DFS文件系统检查实用程序。
    fs 运行通用文件系统用户客户端。
    balancer 运行群集平衡实用程序。
    oiv 将脱机fsimage查看器应用于fsimage。
    fetchdt 从NameNode获取委托令牌。
    jobtracker 运行MapReduce作业跟踪程序节点。
    pipes 运行管道作业。
    tasktracker 运行一个MapReduce任务跟踪器节点。
    historyserver 将作业历史记录服务器作为独立的守护程序运行。
    job 处理MapReduce作业。
    queue 获取有关JobQueues的信息。
    version 打印版本。
    jar <jar> 运行一个jar文件。
    distcp <srcurl> <desturl> 递归复制文件或目录。
    distcp2 <srcurl> <desturl> DistCp版本2。
    archive -archiveName NAME -p <parent path> <src>* <dest> 创建一个hadoop存档。
    classpath 打印获取Hadoop jar和所需库所需的类路径。
    daemonlog 获取/设置每个守护程序的日志级别
  • 如何与MapReduce作业进行交互

    用法 -Hadoop job [GENERIC_OPTIONS]
    以下是Hadoop作业中可用的通用选项。
    选项 描述
    -submit <job-file> 提交工作。
    -status <job-id> 打印地图并减少完成百分比和所有作业计数器。
    -counter <job-id> <group-name> <countername> 打印计数器值。
    -kill <job-id> 杀死工作。
    -events <job-id> <fromevent-#> <#-of-events> 打印给定范围的作业跟踪程序收到的事件的详细信息。
    -history [all] <jobOutputDir> - history < jobOutputDir> 打印作业详细信息,失败和终止的提示详细信息。 通过指定[all]选项,可以查看有关作业的更多详细信息,例如成功的任务和为每个任务进行的任务尝试。
    -list[all] 显示所有作业。 -list仅显示尚未完成的作业
    -kill-task <任务ID> 终止任务。 杀死的任务不计入失败的尝试。
    -fail-task <task-id> 任务失败。 失败的任务计入失败的尝试。
    -set-priority <job-id> <priority> 更改作业的优先级。 允许的优先级值为VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW
    查看工作状态
    
    $ $HADOOP_HOME/bin/hadoop job -status <JOB-ID> 
    e.g. 
    $ $HADOOP_HOME/bin/hadoop job -status job_201310191043_0004 
    
    查看作业输出目录的历史记录
    
    $ $HADOOP_HOME/bin/hadoop job -history <DIR-NAME> 
    e.g. 
    $ $HADOOP_HOME/bin/hadoop job -history /user/expert/output 
    
    杀死工作
    
    $ $HADOOP_HOME/bin/hadoop job -kill <JOB-ID> 
    e.g. 
    $ $HADOOP_HOME/bin/hadoop job -kill job_201310191043_0004