Kafka 简单生产者/消费者示例

  • 简单生产者示例

    让我们创建一个使用Java客户端发布和使用消息的应用程序。Kafka生产者客户端包含以下API。
  • KafkaProducer API

    让我们了解本节中最重要的KafkaProducer API集。KafkaProducer API的核心部分是KafkaProducer类。KafkaProducer类提供了使用以下方法连接其构造函数中的Kafka代理的选项。
    • KafkaProducer类提供了send方法,以将消息异步发送到主题(Topic)。send()的用法如下
    • 
      KafkaProducer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1) , callback);
      
    • ProducerRecord - 生产者管理等待发送记录的缓冲区。
    • callback - 用户提供的回调,在服务器确认记录后执行(空表示没有回调)。
    • KafkaProducer类提供了flush方法,以确保所有先前发送的消息均已实际完成。flush方法的语法如下-
    • 
      public void flush()
      
    • KafkaProducer类提供了partitionFor方法,该方法有助于获取给定主题的分区元数据。这可以用于自定义分区。该方法的如下-
    • 
      public Map metrics()  // 它返回生产者维护的内部指标图。
      
    • public void close() - KafkaProducer类提供close方法块,直到所有先前发送的请求完成为止。
  • Producer API

    Producer API的核心部分是Producer类。Producer类提供了一种通过以下方法在其构造函数中连接Kafka 代理(Broker)的选项。
    Producer 类
    Producer类提供了使用以下方法签名将消息发送到单个或多个主题的send方法。
    
    public void send(KeyedMessaget<k,v> message) // 将数据发送到单个主题,并使用同步或异步生成器按key进行分配。
    public void send(List<KeyedMessage<k,v>>messages)  //将数据发送到多个主题。
    Properties prop = new Properties();
    prop.put(producer.type,”async”)
    ProducerConfig config = new ProducerConfig(prop);
    
    Producer有两种类型:Sync(同步)和Async(异步)。
    相同的API配置也适用于Sync生产者。它们之间的区别是同步生产者直接发送消息,但在后台发送消息。当您想要更高的吞吐量时,首选异步生成器。在像0.8这样的早期版本中,异步生产者没有用于send()的回调来注册错误处理程序。
    public void close()
    Producer类提供了close方法,以关闭与所有Kafka 代理的生产者池连接。
  • 配置设定

    下表列出了Producer API的主要配置设置,以便于更好地理解-
    配置 说明
    client.id 识别生产者申请
    producer.type 同步sync或异步async
    acks Acks配置控制生产者请求下的条件被认为是完整的。
    retries 如果生产者请求失败,则自动使用特定值重试。
    bootstrap.servers 代理自举列表。
    linger.ms 如果要减少请求数量,可以将linger.ms设置为大于某个值的值。
    key.serializer key序列化器
    value.serializer value序列化器
    batch.size 缓存大小
    buffer.memory 控制可用于生产者的内存总量。
  • ProducerRecord API

    ProducerRecord是一个键/值对,它被发送到Kafka集群。ProducerRecord类的构造函数用于使用以下签名创建具有分区,键值对的记录。
    
    public ProducerRecord (string topic, int partition, k key, v value)
    
    • topic -用户定义的主题名称,将附加到记录中。
    • partition -分区数
    • key - 将包含在记录中的key。
    • value -记录内容(值)
    
    public ProducerRecord (string topic, k key, v value)
    
    ProducerRecord类构造函数用于创建具有键,值对且无分区的记录。
    • topic -用户定义的主题名称,将附加到记录中。
    • key - 将包含在记录中的key。
    • value -记录内容(值)
    
    public ProducerRecord (string topic, v value)
    
    ProducerRecord类创建一个没有分区和键的记录
    • topic -用户定义的主题名称,将附加到记录中。
    • value -记录内容(值)
    下表中列出了ProducerRecord类方法-
    方法 说明
    public string topic() 主题将追加到记录中。
    public K key() 将包含在记录中的key。如果没有这样的键,则将在此处返回null。
    public V value() 记录内容。
    partition() 记录的分区数
  • SimpleProducer应用程序

    在创建应用程序之前,首先启动ZooKeeper和Kafka代理,然后使用create topic命令在Kafka代理中创建自己的主题。之后,创建一个名为SimpleProducer.java的Java类,并输入以下代码。
    
    //import util.properties packages
    import java.util.Properties;
    
    //import simple producer packages
    import org.apache.kafka.clients.producer.Producer;
    
    //import KafkaProducer packages
    import org.apache.kafka.clients.producer.KafkaProducer;
    
    //import ProducerRecord packages
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    //Create java class named “SimpleProducer”
    public class SimpleProducer {
       
       public static void main(String[] args) throws Exception{
          
          // Check arguments length value
          if(args.length == 0){
             System.out.println("Enter topic name”);
             return;
          }
          
          //Assign topicName to string variable
          String topicName = args[0].toString();
          
          // create instance for properties to access producer configs   
          Properties props = new Properties();
          
          //Assign localhost id
          props.put("bootstrap.servers", “localhost:9092");
          
          //Set acknowledgements for producer requests.      
          props.put("acks", "all");
          
          //If the request fails, the producer can automatically retry,
          props.put("retries", 0);
          
          //Specify buffer size in config
          props.put("batch.size", 16384);
          
          //Reduce the no of requests less than 0   
          props.put("linger.ms", 1);
          
          //The buffer.memory controls the total amount of memory available to the producer for buffering.   
          props.put("buffer.memory", 33554432);
          
          props.put("key.serializer", 
             "org.apache.kafka.common.serializa-tion.StringSerializer");
             
          props.put("value.serializer", 
             "org.apache.kafka.common.serializa-tion.StringSerializer");
          
          Producer<String, String> producer = new KafkaProducer <String, String>(props);
                
          for(int i = 0; i < 10; i++)
             producer.send(new ProducerRecord<String, String>(topicName, 
                Integer.toString(i), Integer.toString(i)));
                   System.out.println(“Message sent successfully”);
                   producer.close();
       }
    }
    
    编译 -可以使用以下命令来编译应用程序。
    
    javac -cp "/path/to/kafka/kafka_2.12-1.0.0/lib/*"  *.java
    
    编译 -可以使用以下命令执行应用程序。
    
    java -cp "/path/to/kafka/kafka_2.12-1.0.0/lib/*":.  SimpleProducer <topic-name>
    
    输出
    
    Message sent successfully
    
    要检查上面的输出,请打开新终端并输入Consumer CLI命令来接收消息。
    
    >> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    
  • 简单的消费者示例

    截至目前,我们已经创建了一个生产者,用于将消息发送到Kafka集群。现在让我们创建一个消费者以使用来自Kafka集群的消息。KafkaConsumer API用于消费来自Kafka集群的消息。KafkaConsumer类的构造函数在下面定义。
    语法
    
    public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
    
    configs - 返回使用者配置图。
    KafkaConsumer类具有以下重要方法,下表中列出了这些方法。
    方法 说明
    public java.util.Set<TopicPar-tition> assignment() 获取消费者当前分配的一组分区。
    public string subscription() 订阅给定的主题列表,以获取动态分配的分区。
    public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener) 订阅给定的主题列表,以获取动态分配的分区。
    public void unsubscribe() 从给定的分区列表中退订主题。
    public void sub-scribe(java.util.List<java.lang.String> topics) 订阅给定的主题列表,以获取动态分配的分区。 如果给定的主题列表为空,则将其与unsubscribe()相同。
    public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener) 参数模式是指正则表达式格式的订阅模式,并且listener参数从订阅模式获取通知。
    public void as-sign(java.util.List<TopicParti-tion> partitions) 手动为客户分配分区列表。
    poll() 使用订阅/分配API之一获取指定主题或分区的数据。 如果在轮询数据之前未预订主题,则将返回错误。
    public void commitSync() 提交最后一次poll()返回的所有主题和分区订阅列表的偏移量。 相同的操作应用于commitAsyn()。
    public void seek(TopicPartition partition, long offset) 获取使用者将在下一个poll()方法上使用的当前偏移值。
    public void resume() 恢复暂停的分区。
    public void wakeup() 唤醒消费者。
  • ConsumerRecord API

    ConsumerRecord API用于接收来自Kafka集群的记录。该API由主题名称,分区号(从中接收记录)以及指向Kafka分区中的记录的偏移量组成。ConsumerRecord类用于创建具有特定主题名称,分区计数和<键,值>对的消费者记录。它具有以下签名。
    
    public ConsumerRecord(string topic,int partition, long offset,K key, V value)
    
    • topic - 从Kafka集群收到的消费者记录的主题名称。
    • partition - 主题分区。
    • offset - 记录的键,如果不存在键,则返回null。
    • value - 记录内容。
  • ConsumerRecords API

    ConsumerRecords API充当ConsumerRecord的容器。该API用于保留特定主题的每个分区的ConsumerRecord列表。其构造函数定义如下。
    
    public ConsumerRecords(java.util.Map<TopicPartition,java.util.List<Consumer-Record>K,V>>> records)
    
    • TopicPartition - 返回特定主题的分区图。
    • records - ConsumerRecord的返回列表。
    ConsumerRecords类具有以下定义的方法。
    方法 说明
    public int count() 所有主题的记录数。
    public Set partitions() 此记录集中具有数据的分区集(如果未返回任何数据,则该集为空)。
    public Iterator iterator() 迭代器使您可以循环浏览集合,获取或删除元素。
    public List records() 获取给定分区的记录列表。
  • 配置设定

    消费者客户端API主要配置设置的配置设置在下面列出-
    方法 说明
    bootstrap.servers 自举代理列表。
    group.id 将单个消费者分配给一个组。
    enable.auto.commit 如果值为true,则启用自动提交偏移量,否则不提交。
    auto.commit.interval.ms 返回将更新的消耗偏移量写入ZooKeeper的频率。
    session.timeout.ms 指示在放弃并继续使用消息之前,Kafka将等待ZooKeeper响应请求(读取或写入)的毫秒数。
  • SimpleConsumer应用程序

    生产者应用程序步骤在此保持不变。首先,启动您的ZooKeeper和Kafka经纪人。然后,使用名为SimpleConsumer.java的Java类创建SimpleConsumer应用程序,并键入以下代码。
    输出
    
    import java.util.Properties;
    import java.util.Arrays;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    
    public class SimpleConsumer {
       public static void main(String[] args) throws Exception {
          if(args.length == 0){
             System.out.println("Enter topic name");
             return;
          }
          //Kafka consumer configuration settings
          String topicName = args[0].toString();
          Properties props = new Properties();
          
          props.put("bootstrap.servers", "localhost:9092");
          props.put("group.id", "test");
          props.put("enable.auto.commit", "true");
          props.put("auto.commit.interval.ms", "1000");
          props.put("session.timeout.ms", "30000");
          props.put("key.deserializer", 
             "org.apache.kafka.common.serializa-tion.StringDeserializer");
          props.put("value.deserializer", 
             "org.apache.kafka.common.serializa-tion.StringDeserializer");
          KafkaConsumer<String, String> consumer = new KafkaConsumer
             <String, String>(props);
          
          //Kafka Consumer subscribes list of topics here.
          consumer.subscribe(Arrays.asList(topicName))
          
          //print the topic name
          System.out.println("Subscribed to topic " + topicName);
          int i = 0;
          
          while (true) {
             ConsumerRecords<String, String> records = con-sumer.poll(100);
             for (ConsumerRecord<String, String> record : records)
             
             // print the offset,key and value for the consumer records.
             System.out.printf("offset = %d, key = %s, value = %s\n", 
                record.offset(), record.key(), record.value());
          }
       }
    }
    
    编译 -可以使用以下命令来编译应用程序。
    
    javac -cp "/path/to/kafka/kafka_2.12-1.0.0/lib/*" *.java
    
    执行-可以使用以下命令执行应用程序
    
    java -cp "/path/to/kafka/kafka_2.11-0.9.0.0/lib/*":. SimpleConsumer <topic-name>
    
    输入 - 打开生产者CLI,并向该主题发送一些消息。例如输入“Hello Consumer”发送。
    
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name
    
    输出 -以下将是输出。
    
    Subscribed to topic Hello-Kafka
    offset = 3, key = null, value = Hello Consumer