Spring Cloud - Apache Kafka 流

  • 介绍

    在分布式环境中,服务之间需要相互通信。通信可以同步或异步发生。在本节中,我们将看看服务如何通过异步使用message brokers.
    执行异步通信的两个主要好处 -
    • 生产者和消费者的速度可以不同− 无论数据的消费者是慢还是快,都不影响生产者处理,反之亦然。两者都可以以各自的速度工作而不会相互影响。
    • 生产者不需要处理来自不同消费者的请求− 可能有多个消费者想要从生产者那里读取同一组数据。通过中间的消息代理,生产者不需要处理这些消费者产生的负载。此外,生产者级别的任何中断都不会阻止消费者读取较旧的生产者数据,因为这些数据将在消息代理中可用。
    Apache KafkaRabbitMQ是两个众所周知的用于进行异步通信的消息代理。在本教程中,我们将使用 Apache Kafka。
  • Kafka – 依赖设置

    让我们使用我们之前使用过的 Restaurant 案例。因此,假设我们的客户服务和餐厅服务通过异步通信进行通信。为此,我们将使用 Apache Kafka。我们将需要在两种服务中使用它,即客户服务和餐厅服务。
    要使用 Apache Kafka,我们将更新两个服务的 POM 并添加以下依赖项。
    
    
    <dependency>
    
          <groupId>org.springframework.cloud</groupId>
    
          <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    
    </dependency>
    
    
    我们还需要运行 Kafka 实例。有多种方法可以完成,但我们更喜欢使用 Docker 容器启动 Kafka。以下是我们可以考虑使用的一些图像 -
    无论我们使用哪个镜像,这里需要注意的重要一点是,一旦镜像启动并运行,请确保 Kafka 集群可访问 localhost:9092
    现在我们已经在我们的镜像上运行了 Kafka 集群,让我们转到核心示例。
  • Binding & Binders

    关于 Spring Cloud 流,有三个重要的概念 -
    • 外部消息传递系统- 这是外部管理的组件,负责存储应用程序生成的事件/消息,这些事件/消息可由其订阅者/消费者读取。请注意,这不在 app/Spring 中管理。几个例子是 Apache Kafka、RabbitMQ
    • Binders − 这是提供与消息系统集成的组件,例如,由消息系统的 IP 地址、身份验证等组成。
    • Bindings - 该组件使用绑定器向消息传递系统生成消息或使用来自特定主题/队列的消息。
    以上所有属性都定义在 application properties file.
    例如
    让我们使用我们之前使用过的 Restaurant 案例。因此,让我们假设每当向客户服务添加新服务时,我们都希望将客户信息通知给附近的餐馆关于他/她的信息。
    为此,让我们首先更新我们的客户服务以包含和使用 Kafka。请注意,我们将使用客户服务作为数据的生产者。也就是说,每当我们通过 API 添加 Customer 时,它也会被添加到 Kafka。
    
    
    spring:
    
       application:
    
          name: customer-service
    
       cloud:
    
          stream:
    
             source: customerBinding-out-0
    
             kafka:
    
                binder:
    
                brokers: localhost:9092
    
                replicationFactor: 1
    
          bindings:
    
             customerBinding-out-0:
    
                destination: customer
    
                producer:
    
                   partitionCount: 3
    
    server:
    
       port: ${app_port}
    
    eureka:
    
       client:
    
          serviceURL:
    
             defaultZone: http://localhost:8900/eureka
    
    
    注意点
    • 我们已经定义了一个带有本地 Kafka 实例地址的绑定器。
    • 我们还定义了绑定“customerBinding-out-0”,它使用“customer”主题来输出消息。
    • 我们还在 stream.source 这样我们就可以在我们的代码中强制使用它。
    完成后,让我们现在通过添加一个新方法“addCustomer”来更新我们的控制器,该方法负责为 POST 请求提供服务。然后,从post 请求,我们将数据发送到 Kafka Broker。
    
    
    package com.jc2182;
    
    import java.util.HashMap;
    
    import org.springframework.beans.factory.annotation.Autowired;
    
    import org.springframework.cloud.stream.function.StreamBridge;
    
    import org.springframework.web.bind.annotation.PathVariable;
    
    import org.springframework.web.bind.annotation.RequestMapping;
    
    import org.springframework.web.bind.annotation.RequestMethod;
    
    import org.springframework.web.bind.annotation.RestController;
    
    
    
    @RestController
    
    class RestaurantCustomerInstancesController {
    
       @Autowired
    
       private StreamBridge streamBridge;
    
       static HashMap<Long, Customer> mockCustomerData = new HashMap();
    
       static{
    
          mockCustomerData.put(1L, new Customer(1, "Jane", "DC"));
    
          mockCustomerData.put(2L, new Customer(2, "John", "SFO"));
    
          mockCustomerData.put(3L, new Customer(3, "Kate", "NY"));
    
       }
    
       @RequestMapping("/customer/{id}")
    
       public Customer getCustomerInfo(@PathVariable("id") Long id) {
    
          System.out.println("Querying customer for id with: " + id);
    
          return mockCustomerData.get(id);
    
       }
    
       @RequestMapping(path = "/customer/{id}", method = RequestMethod.POST)
    
       public Customer addCustomer(@PathVariable("id") Long id) {
    
          // add default name
    
          Customer defaultCustomer = new Customer(id, "Dwayne", "NY");
    
          streamBridge.send("customerBinding-out-0", defaultCustomer);
    
          return defaultCustomer;
    
       }
    
    }
    
    
    注意点
    • 我们是 Autowiring StreamBridge,我们将使用它来发送消息。
    • 我们在 'send' 方法中使用的参数也指定了我们想要用来发送数据的绑定。
    现在让我们更新我们的餐厅服务以包含和订阅“客户”主题。请注意,我们将使用 Restaurant Service 作为数据的消费者。也就是说,每当我们通过 API 添加客户时,餐厅服务就会通过 Kafka 知道它。
    首先,让我们更新 application properties 文件。
    
    
    spring:
    
       application:
    
          name: restaurant-service
    
       cloud:
    
          function:
    
             definition: customerBinding
    
          stream:
    
             kafka:
    
                binder:
    
                   brokers: localhost:9092
    
                   replicationFactor: 1
    
                bindings:
    
                   customerBinding-in-0:
    
                   destination: customer
    
    server:
    
       port: ${app_port}
    
    eureka:
    
       client:
    
          serviceURL:
    
             defaultZone: http://localhost:8900/eureka
    
    
    完成后,让我们现在通过添加一个新方法“customerBinding”来更新我们的控制器,该方法负责获取请求并提供一个函数,该函数将打印请求及其元数据详细信息。
    
    
    package com.jc2182;
    
    import java.util.HashMap;
    
    import java.util.List;
    
    import java.util.function.Consumer;
    
    import java.util.function.Supplier;
    
    import java.util.stream.Collectors;
    
    import org.springframework.beans.factory.annotation.Autowired;
    
    import org.springframework.cloud.stream.annotation.StreamListener;
    
    import org.springframework.cloud.stream.function.StreamBridge;
    
    import org.springframework.context.annotation.Bean;
    
    import org.springframework.kafka.support.Acknowledgment;
    
    import org.springframework.kafka.support.KafkaHeaders;
    
    import org.springframework.messaging.Message;
    
    import org.springframework.messaging.support.MessageBuilder;
    
    import org.springframework.web.bind.annotation.PathVariable;
    
    import org.springframework.web.bind.annotation.RequestMapping;
    
    import org.springframework.web.bind.annotation.RestController;
    
    
    
    @RestController
    
    class RestaurantController {
    
       @Autowired
    
       CustomerService customerService;
    
       @Autowired
    
       private StreamBridge streamBridge;
    
       static HashMap<Long, Restaurant> mockRestaurantData = new HashMap();
    
       static{
    
          mockRestaurantData.put(1L, new Restaurant(1, "Pandas", "DC"));
    
          mockRestaurantData.put(2L, new Restaurant(2, "Indies", "SFO"));
    
          mockRestaurantData.put(3L, new Restaurant(3, "Little Italy", "DC"));
    
          mockRestaurantData.put(4L, new Restaurant(4, "Pizeeria", "NY"));
    
       }
    
       @RequestMapping("/restaurant/customer/{id}")
    
       public List<Restaurant> getRestaurantForCustomer(@PathVariable("id") Long id) {
    
          System.out.println("Got request for customer with id: " + id);
    
          String customerCity = customerService.getCustomerById(id).getCity();
    
          return mockRestaurantData.entrySet().stream().filter(
    
    entry -> entry.getValue().getCity().equals(customerCity))
    
    .map(entry -> entry.getValue())
    
    .collect(Collectors.toList());
    
       }
    
       @RequestMapping("/restaurant/cust/{id}")
    
       public void getRestaurantForCust(@PathVariable("id") Long id) {
    
          streamBridge.send("ordersBinding-out-0", id);
    
       }
    
       @Bean
    
       public Consumer<Message<Customer>> customerBinding() {
    
          return msg -> {
    
             System.out.println(msg);
    
          };
    
       }
    
    }
    
    
    Points to note
    • 我们正在使用'customerBinding',它应该传递当消息到达此绑定时将调用的函数。
    • 在创建捆绑和指定主题时,我们用于此函数/bean 的名称也需要在 YAML 文件中使用。
    现在,让我们像往常一样执行上面的代码,启动 Eureka Server。请注意,这不是硬性要求,为了完整起见,在此列出。
    然后,让我们使用以下命令编译并开始更新客户服务 -
    
    
    mvn clean install ; java -Dapp_port=8083 -jar .\target\spring-cloud-eurekaclient-
    
    1.0.jar --spring.config.location=classpath:application-kafka.yml
    
    
    然后,让我们使用以下命令编译并开始更新餐厅服务 -
    
    
    mvn clean install; java -Dapp_port=8082 -jar .\target\spring-cloud-feign-client-
    
    1.0.jar --spring.config.location=classpath:application-kafka.yml
    
    
    我们准备好了,现在让我们通过点击 API 来测试我们的代码片段 -
    
    
    curl -X POST http://localhost:8083/customer/1
    
    
    这是我们将为此 API 获得的输出 -
    
    
    {
    
       "id": 1,
    
       "name": "Dwayne",
    
       "city": "NY"
    
    }
    
    
    现在,让我们检查餐厅服务的日志 -
    
    
    GenericMessage [payload=Customer [id=1, name=Dwayne, city=NY],
    
    headers={kafka_offset=1,...
    
    
    因此,实际上,您会看到使用 Kafka Broker,Restaurant Service 收到了有关新添加的 Customer 的通知。
  • 分区和消费者组

    分区和消费者组是使用 Spring Cloud 流时应该注意的两个重要概念。
    Partitions − 它们用于对数据进行分区,以便我们可以在多个消费者之间分配工作。
    让我们看看如何在 Spring Cloud 中对数据进行分区。比如说,我们想根据客户 ID 对数据进行分区。因此,让我们更新我们的客户服务。为此,我们需要告诉
    让我们更新我们的 Customer Service 应用程序属性以指定数据的键。
    
    
    spring:
    
       application:
    
          name: customer-service
    
       cloud:
    
          function:
    
             definition: ordersBinding
    
          stream:
    
             source: customerBinding-out-0
    
             kafka:
    
                binder:
    
                   brokers: localhost:9092
    
                   replicationFactor: 1
    
             bindings:
    
                customerBinding-out-0:
    
                   destination: customer
    
                   producer:
    
                      partitionKeyExpression: 'getPayload().getId()'
    
                      partitionCount: 3
    
    server:
    
       port: ${app_port}
    
    eureka:
    
       client:
    
          serviceURL:
    
             defaultZone: http://localhost:8900/eureka
    
    
    为了指定键,即“partitionKeyExpression”,我们提供了 Spring 表达式语言。该表达式假定类型为 GenericMessage<Customer>,因为我们在消息中发送 Customer 数据。请注意,GenericMessage 是 Spring Framework 类,用于将有效负载和标头包装在单个对象中。因此,我们从该消息中获得了类型为 Customer 的有效负载,然后我们调用了getId() 客户的方法。
    现在,让我们也更新我们的消费者,即餐厅服务,以在消费请求时记录更多信息。
    现在,让我们像往常一样执行上面的代码,启动 Eureka Server。请注意,这不是硬性要求,为了完整起见,在此列出。
    然后,让我们使用以下命令编译并开始更新客户服务 -
    
    
    mvn clean install ; java -Dapp_port=8083 -jar .\target\spring-cloud-eurekaclient-
    
    1.0.jar --spring.config.location=classpath:application-kafka.yml
    
    
    然后,让我们使用以下命令编译并开始更新餐厅服务 -
    
    
    mvn clean install; java -Dapp_port=8082 -jar .\target\spring-cloud-feign-client-
    
    1.0.jar --spring.config.location=classpath:application-kafka.yml
    
    
    我们准备好了,现在让我们测试我们的代码片段。作为测试的一部分,这是我们要做的 -
    • 插入一个 ID 为 1 的客户: curl -X POST http://localhost:8083/customer/1
    • 插入一个 ID 为 1 的客户: curl -X POST http://localhost:8083/customer/1
    • 插入一个 ID 为 1 的客户: curl -X POST http://localhost:8083/customer/5
    • 插入一个 ID 为 1 的客户: curl -X POST http://localhost:8083/customer/3
    • 插入一个 ID 为 1 的客户: curl -X POST http://localhost:8083/customer/1
    我们不太关心 API 的输出。相反,我们更关心数​​据发送到的分区。由于我们使用客户 ID 作为键,我们希望具有相同 ID 的客户最终会出现在同一个分区中。
    现在,让我们检查餐厅服务的日志 -
    
    
    Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400
    
    Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323
    
    Partition Id: 1
    
    Customer: Customer [id=1, name=Dwayne, city=NY]
    
    Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400
    
    Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323
    
    Partition Id: 1
    
    Customer: Customer [id=1, name=Dwayne, city=NY]
    
    Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400
    
    Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323
    
    Partition Id: 2
    
    Customer: Customer [id=5, name=Dwayne, city=NY]
    
    Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400
    
    Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323
    
    Partition Id: 0
    
    Customer: Customer [id=3, name=Dwayne, city=NY]
    
    Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323
    
    Partition Id: 1
    
    Customer: Customer [id=1, name=Dwayne, city=NY]
    
    
    因此,如我们所见,Id 为 1 的 Customer 每次都在同一个分区中,即分区 1。
    Consumer Group− 消费者组是出于相同目的阅读相同主题的消费者的逻辑分组。主题中的数据在消费者组中的消费者之间进行分区,以便给定消费者组中的只有一个消费者可以读取主题的分区。
    要定义一个消费者组,我们需要做的就是在我们使用 Kafka 主题名称的绑定中定义一个组。例如,让我们在应用程序文件中为控制器定义消费者组名称。
    
    
    spring:
    
       application:
    
          name: restaurant-service
    
       cloud:
    
          function:
    
             definition: customerBinding
    
          stream:
    
             kafka:
    
                binder:
    
                   brokers: localhost:9092
    
                   replicationFactor: 1
    
                bindings:
    
                   customerBinding-in-0:
    
                   destination: customer
    
                   group: restController
    
    server:
    
       port: ${app_port}
    
    eureka:
    
       client:
    
          serviceURL:
    
             defaultZone: http://localhost:8900/eureka
    
    
    让我们重新编译并启动餐厅服务。现在,让我们通过点击客户服务上的 POST API 来生成事件 -
    插入一个 ID 为 1 的客户: curl -X POST http://localhost:8083/customer/1
    现在,如果我们检查餐厅服务的日志,我们将看到以下内容 -
    
    
    Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400
    
    Consumer Group: restContoller
    
    Partition Id: 1
    
    Customer: Customer [id=1, name=Dwayne, city=NY]
    
    
    因此,正如我们从输出中看到的,我们创建了一个名为“rest-contoller”的消费者组,其消费者负责阅读主题。在上面的例子中,我们只运行了一个服务实例,所以“客户”主题的所有分区都分配给了同一个实例。但是,如果我们有多个分区,我们将在工作人员之间分配分区。