Flume 获取 Twitter 数据



  • 配置

    使用Flume,我们可以从各种服务中获取数据并将其传输到集中存储(HDFS和HBase)。本章介绍如何使用Apache Flume从Twitter服务中获取数据并将其存储在HDFS中。
    如Flume Architecture中所述,Web服务器生成日志数据,并且该数据由Flume中的代理收集。通道将该数据缓冲到接收器,接收器最后将其推送到集中存储。
    在本章提供的示例中,我们将创建一个应用程序,并使用Apache Flume提供的实验性Twitter源从其获取推文。我们将使用内存通道来缓冲这些推文,并使用HDFS接收器将这些推文推入HDFS。
    flume
    要获取Twitter数据,我们将必须遵循以下步骤-
    • 创建一个Twitter应用程序
    • 安装/启动HDFS
    • 配置Flume
  • 创建一个Twitter应用程序

    为了从Twitter获得推文,需要创建一个Twitter应用程序。请按照下面给出的步骤创建一个Twitter应用程序。
    第1步 要创建Twitter应用程序,请单击以下链接https://apps.twitter.com/。登录到您的Twitter帐户。您将拥有一个Twitter应用程序管理窗口,您可以在其中创建,删除和管理Twitter应用程序。
    flume
    第2步 单击创建新应用程序按钮。您将被重定向到一个窗口,您将在其中获得一个申请表,您必须在其中填写详细信息才能创建该应用程序。填写网站地址时,请提供完整的URL模式,例如http://example.com。
    flume
    第3步 填写详细信息,完成后接受开发者协议,单击页面底部的“创建您的Twitter应用程序”按钮。如果一切顺利,将使用给定的详细信息创建一个应用,如下所示。
    flume
    第4步 在页面底部的“key”和“token”标签下,您会看到一个名为创建我的访问令牌的按钮。单击它以生成访问令牌。
    flume
    第5步 最后,点击页面右侧顶部的Test OAuth按钮。这将导致一个页面,显示您的使用者密钥,使用者密钥,访问令牌和访问令牌密钥。复制这些详细信息。这些对于在Flume中配置代理很有用。
    flume
  • 启动 HDFS

    由于我们将数据存储在HDFS中,因此我们需要安装/验证Hadoop。启动Hadoop并在其中创建一个文件夹来存储Flume数据。在配置Flume之前,请执行以下步骤。
    步骤1:安装/验证Hadoop
    安装Hadoop。如果您的系统中已经安装了Hadoop,请使用Hadoop version命令验证安装,如下所示。
    
    $ hadoop version 
    
    如果您的系统包含Hadoop,并且已经设置了PATH变量,那么将获得以下输出-
    
    Hadoop 3.0.0 
    Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r 
    e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1 
    Compiled by jenkins on 2014-11-13T21:10Z 
    Compiled with protoc 2.5.0 
    From source with checksum 18e43357c8f927c0695f1e9522859d6a 
    This command was run using /home/Hadoop/hadoop/share/hadoop/common/hadoop-common-3.0.0.jar
    
    步骤2:启动Hadoop
    浏览Hadoop的sbin目录,然后启动yarn和Hadoop dfs(分布式文件系统),如下所示。
    
    cd /$Hadoop_Home/sbin/ 
    $ start-dfs.sh 
    localhost: starting namenode, logging to
       /home/Hadoop/hadoop/logs/hadoop-Hadoop-namenode-localhost.localdomain.out 
    localhost: starting datanode, logging to 
       /home/Hadoop/hadoop/logs/hadoop-Hadoop-datanode-localhost.localdomain.out 
    Starting secondary namenodes [0.0.0.0] 
    starting secondarynamenode, logging to 
       /home/Hadoop/hadoop/logs/hadoop-Hadoop-secondarynamenode-localhost.localdomain.out
    
      
    $ start-yarn.sh 
    starting yarn daemons 
    starting resourcemanager, logging to 
       /home/Hadoop/hadoop/logs/yarn-Hadoop-resourcemanager-localhost.localdomain.out 
    localhost: starting nodemanager, logging to 
       /home/Hadoop/hadoop/logs/yarn-Hadoop-nodemanager-localhost.localdomain.out 
    
    /
    步骤3:在HDFS中创建目录
    在Hadoop DFS中,您可以使用命令mkdir创建目录。浏览它,并在所需路径中创建一个名为twitter_data的目录,如下所示。
    
    $cd /$Hadoop_Home/bin/ 
    $ hdfs dfs -mkdir hdfs://localhost:9000/user/Hadoop/twitter_data 
    
  • 配置Flume

    我们必须使用conf文件夹中的配置文件配置源,通道和接收器。本章中给出的示例使用由Apache Flume提供的名为Twitter 1%Firehose Memory通道和HDFS接收器的实验源。
    Twitter 1%Firehose来源
    该来源是高度实验性的。它使用流API连接到1%的示例Twitter Firehose,并连续下载tweet,将其转换为Avro格式,并将Avro事件发送到下游Flume Sink。
    默认情况下,我们将在安装Flume时获得此源。与此源相对应的jar文件可以位于lib文件夹中,如下所示。
    flume
    设置CLASSPATH
    将classpath变量设置为Flume-env.sh文件中Flume的lib文件夹,如下所示。
    
    export CLASSPATH=$CLASSPATH:/FLUME_HOME/lib/* 
    
    此源需要诸如Twitter应用程序的使用者密钥,使用者密钥,访问令牌和访问令牌密钥之类的详细信息。配置此源时,必须为以下属性提供值-
    • Channels
    • Source type:org.apache.flume.source.twitter.TwitterSource
    • ConsumerKey - OAuth使用者密钥
    • ConsumerSecret - OAuth消费者机密
    • accessToken - OAuth访问令牌
    • accessTokenSecret - OAuth令牌密钥
    • maxBatchSize - 应该在一个Twitter批处理中的最大Twitter消息数。默认值为1000(可选)。
    • maxBatchDurationMillis - 关闭批次之前要等待的最大毫秒数。默认值为1000(可选)。
    渠道(channel)
    我们正在使用内存(memory)通道。要配置内存通道,必须为通道类型提供值。
    • type- 它保存通道的类型。在我们的示例中,类型为MemChannel。
    • Capacity - 这是通道中存储的最大事件数。其默认值为100(可选)。
    • TransactionCapacity - 它是通道接受或发送的最大事件数。其默认值为100(可选)。
    HDFS接收器
    该接收器将数据写入HDFS。要配置此接收器,必须提供以下详细信息。
    • Channel
    • type - HDFS
    • hdfs.path - HDFS中要存储数据的目录的路径。
    我们可以根据情况提供一些可选值。下面给出的是我们在应用程序中配置的HDFS接收器的可选属性。
    • fileType-这是我们的HDFS文件的必需文件格式。SequenceFile,DataStream和CompressedStream是此流可用的三种类型。在我们的示例中,我们正在使用DataStream。
    • writeFormat-可以是文本或可写的。
    • batchSize-它是刷新到HDFS之前写入文件的事件数。默认值为100。
    • rollsize-这是触发滚动的文件大小。默认值为100。
    • rollCount-它是滚动之前写入文件的事件数。默认值为10。
  • 示例–配置文件

    下面给出的是配置文件的示例。复制此内容并将其另存为Flume的conf文件夹中的twitter.conf。
    
    # Naming the components on the current agent. 
    TwitterAgent.sources = Twitter 
    TwitterAgent.channels = MemChannel 
    TwitterAgent.sinks = HDFS
      
    # Describing/Configuring the source 
    TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
    TwitterAgent.sources.Twitter.consumerKey = Your OAuth consumer key
    TwitterAgent.sources.Twitter.consumerSecret = Your OAuth consumer secret 
    TwitterAgent.sources.Twitter.accessToken = Your OAuth consumer key access token 
    TwitterAgent.sources.Twitter.accessTokenSecret = Your OAuth consumer key access token secret 
    TwitterAgent.sources.Twitter.keywords = tutorials point,java, bigdata, mapreduce, mahout, hbase, nosql
      
    # Describing/Configuring the sink 
    
    TwitterAgent.sinks.HDFS.type = hdfs 
    TwitterAgent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/user/Hadoop/twitter_data/
    TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream 
    TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text 
    TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
    TwitterAgent.sinks.HDFS.hdfs.rollSize = 0 
    TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000 
     
    # Describing/Configuring the channel 
    TwitterAgent.channels.MemChannel.type = memory 
    TwitterAgent.channels.MemChannel.capacity = 10000 
    TwitterAgent.channels.MemChannel.transactionCapacity = 100
      
    # Binding the source and sink to the channel 
    TwitterAgent.sources.Twitter.channels = MemChannel
    TwitterAgent.sinks.HDFS.channel = MemChannel 
    
  • 执行

    浏览Flume主目录并执行应用程序,如下所示。
    
    $ cd $FLUME_HOME 
    $ bin/flume-ng agent --conf ./conf/ -f conf/twitter.conf  Dflume.root.logger=DEBUG,console -n TwitterAgent
    
    如果一切顺利,则将开始将推文流式传输到HDFS中。下面给出的是获取推文时命令提示符窗口的快照。
    flume
  • 验证HDFS

    您可以使用下面提供的URL访问Hadoop管理Web UI。
    
    http://localhost:9870/ 
    
    单击页面右侧名为Utilities的下拉菜单。您可以看到两个选项,如下面的快照所示。
    flume
    单击浏览文件系统,然后输入存储推文的HDFS目录的路径。在我们的示例中,路径为/user/Hadoop/twitter_data/。然后,您可以看到存储在HDFS中的Twitter日志文件列表,如下所示。
    flume