个性化阅读
专注于IT技术分析

在java中创建kafka消费者(consumer)

本文概述

在上一节中,我们学习了用Java创建生产者。在本节中,我们将学习在Java中实现Kafka使用者。

要创建使用者,需要执行以下步骤:

  1. 创建记录器
  2. 创建消费者属性。
  3. 创建消费者。
  4. 为消费者订阅特定主题。
  5. 轮询一些新数据

让我们讨论学习Java使用者实现的每个步骤。

创建记录器

记录器被实现为在程序执行期间写入日志消息。用户需要创建一个Logger对象,该对象将需要导入“ org.slf4j类”。下面的快照显示了Logger的实现:

创建消费者属性

与生产者属性类似,Apache Kafka还提供了用于创建消费者的各种不同属性。要了解每个消费者的财产,请访问Apache Kafa的官方网站>文档>配置>消费者配置。在这里,我们将列出使用者的必需属性,例如:

key.deserializer:这是密钥的反序列化器类,用于实现“ org.apache.kafka.common.serialization.Deserializer”接口。

value.deserializer:实现值的反序列化器类,它实现“ org.apache.kafka.common.serialization.Desrializer”接口。

bootstrap.servers:这是主机/端口对的列表,用于建立与Kafka集群的初始连接。它不包含客户端所需的全套服务器。仅需要引导所需的服务器。

group.id:这是一个唯一字符串,用于标识使用者组的使用者。当消费者通过订阅主题使用基于Kafka的偏移管理策略或组管理功能时,需要此属性。

auto.offset.reset:当不存在初始偏移量或服务器上不再存在当前偏移量时,此属性是必需的。有以下值可用于重置偏移值:

最早:此偏移量变量会自动将值重置为其最早的偏移量。

最新:此偏移量变量将偏移值重置为其最新偏移量。

none:如果未找到上一组的先前偏移量,则会向使用方抛出异常。

其他:向消费者抛出异常。

注意:在我们的代码中,我们使用了“最早的”变量将值重置为最早的值。

这些是实现使用者所必需的一些基本属性。让我们使用IntelliJ IDEA实施。

步骤1)定义一个新的java类为“ consumer1.java”。

步骤2)在类中描述使用者属性,如以下快照所示:

在快照中,描述了所有必要的属性。

创造消费者

创建一个KafkaConsumer对象以创建使用者,如下所示:

在创建使用者时会传递上述属性。

订阅消费者

要从主题中读取消息,我们需要将使用者连接到指定的主题。消费者可以通过各种订阅API进行订阅。在这里,我们使用Arrays.asList()是因为用户可能想订阅一个或多个主题。因此,Arrays.asList()允许订阅方订阅多个主题。

下面的代码显示了使用者订阅的实现:

用户需要直接或通过字符串变量指定主题名称以阅读消息。可以有多个主题,也用逗号分隔。

轮询新数据

消费者通过轮询方法从Kafka读取数据。

poll方法返回从当前分区的偏移量获取的数据。指定持续时间,直到它等待数据,否则将空的ConsumerRecord返回给使用者。同样,记录器将获取记录键,分区,记录偏移量及其值。

下面列出了影响Java使用者的完整代码:

package com.firstgroupapp.aktutorial;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

public class consumer1 {
    public static void main(String[] args) {
        Logger logger= LoggerFactory.getLogger(consumer1.class.getName());
        String bootstrapServers="127.0.0.1:9092";
        String grp_id="third_app";
        String topic="my_first";
        //Creating consumer properties
        Properties properties=new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, grp_id);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //creating consumer
        KafkaConsumer<String, String> consumer= new KafkaConsumer<String, String>(properties);
        //Subscribing
                consumer.subscribe(Arrays.asList(topic));
        //polling
        while(true){
            ConsumerRecords<String, String> records=consumer.poll(Duration.ofMillis(100));
            for(ConsumerRecord<String, String> record: records){
                logger.info("Key: "+ record.key() + ", Value:" +record.value());
                logger.info("Partition:" + record.partition()+", Offset:"+record.offset());
            }


        }
    }
}

这样,消费者可以通过依次执行每个步骤来阅读消息。

使用者实现的输出可以在下面的快照中看到:

键值为空。这是因为我们之前没有指定任何密钥。由于“最早”,从头开始显示所有消息。

读取消费者组中的数据

用户可以让一个以上的消费者总共读取数据。这可以通过消费者组来完成。在消费者组中,一个或多个消费者将能够从Kafka中读取数据。如果用户想从头开始阅读消息,请重设group_id或更改group_id。这将重置用户的应用程序,并从头开始显示消息。


赞(0)
未经允许不得转载:srcmini » 在java中创建kafka消费者(consumer)

评论 抢沙发

评论前必须登录!