Helpex - Trao đổi & giúp đỡ Đăng nhập

Các ví dụ về nhà sản xuất và người tiêu dùng Kafka sử dụng Java

Trong bài viết cuối cùng của tôi , chúng tôi đã thảo luận về cách thiết lập Kafka bằng Zookeeper. Trong bài viết này, chúng ta sẽ xem cách sản xuất và tiêu thụ hồ sơ / tin nhắn với các nhà môi giới Kafka.

Trước khi bắt đầu với một ví dụ, trước tiên hãy làm quen với các thuật ngữ phổ biến và một số lệnh được sử dụng trong Kafka.

Bản ghi:  Nhà sản xuất gửi tin nhắn cho Kafka dưới dạng hồ sơ. Một bản ghi là một cặp khóa-giá trị. Nó chứa tên chủ đề và số phân vùng sẽ được gửi. Nhà môi giới Kafka giữ hồ sơ bên trong phân vùng chủ đề. Trình tự hồ sơ được duy trì ở cấp phân vùng. Bạn có thể xác định logic trên phân vùng cơ sở nào sẽ được xác định.

Chủ đề: Nhà sản xuất viết một bản ghi về một chủ đề và người tiêu dùng lắng nghe nó. Một chủ đề có thể có nhiều phân vùng nhưng phải có ít nhất một phân vùng.

Phân vùng: Phân vùng  chủ đề là một đơn vị song song trong Kafka, tức là hai người tiêu dùng không thể sử dụng tin nhắn từ cùng một phân vùng cùng một lúc. Một người tiêu dùng có thể tiêu thụ từ nhiều phân vùng cùng một lúc.

Offset:  Một bản ghi trong phân vùng có phần bù được liên kết với nó. Hãy nghĩ về nó như thế này: phân vùng giống như một mảng; bù đắp giống như chỉ số.

Nhà sản xuất:  Tạo một bản ghi và xuất bản nó cho nhà môi giới.

Người tiêu dùng:  Tiêu thụ hồ sơ từ các nhà môi giới.

Các lệnh:  Trong Kafka, một thư mục thiết lập bên trong thư mục bin là một tập lệnh (kafka-topics.sh), bằng cách sử dụng, chúng ta có thể tạo và xóa các chủ đề và kiểm tra danh sách các chủ đề. Chuyển đến thư mục nhà Kafka.

  • Thực hiện lệnh này để xem danh sách tất cả các chủ đề.

    • ./bin/kafka-topics.sh --list --zookeeper localhost: 2181  .

    • localhost: 2181 là địa chỉ Zookeeper mà chúng tôi đã xác định trong tệp server.properIES trong bài viết trước.

  • Thực hiện lệnh này để tạo một chủ đề.

    • ./bin/kafka-topics.sh --create --zookeeper localhost: 2181 - ứng dụng-yếu tố 1 - phần 100 - bản demo.

    • replication-factor: nếu Kafka đang chạy trong một cụm, điều này xác định số lượng môi giới mà một phân vùng sẽ được sao chép. Đối  partitions số xác định có bao nhiêu phân vùng trong một chủ đề.

    • Sau khi một chủ đề được tạo, bạn có thể tăng số lượng phân vùng nhưng không thể giảm được. demo, đây, là tên chủ đề.

  • Thực hiện lệnh này để xóa một chủ đề.

    • ./bin/kafka-topics.sh --zookeeper localhost: 2181 --delete - demo demo.

    • Lệnh này sẽ không có hiệu lực nếu trong tệp server.properations của Kafka, nếu  delete.topic.enable không được đặt thành đúng.

  • Thực hiện lệnh này để xem thông tin về một chủ đề.

    • ./bin/kafka-topics.sh --describe - demo demo --zookeeper localhost: 2181. 

Bây giờ chúng ta đã biết các thuật ngữ phổ biến được sử dụng trong Kafka và các lệnh cơ bản để xem thông tin về một chủ đề, hãy bắt đầu với một ví dụ hoạt động.

public interface IKafkaConstants {
    public static String KAFKA_BROKERS = "localhost:9092";

    public static Integer MESSAGE_COUNT=1000;

    public static String CLIENT_ID="client1";

    public static String TOPIC_NAME="demo";

    public static String GROUP_ID_CONFIG="consumerGroup1";

    public static Integer MAX_NO_MESSAGE_FOUND_COUNT=100;

    public static String OFFSET_RESET_LATEST="latest";

    public static String OFFSET_RESET_EARLIER="earliest";

    public static Integer MAX_POLL_RECORDS=1;
}

Đoạn mã trên có chứa một số hằng số mà chúng ta sẽ sử dụng thêm.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import com.gaurav.kafka.constants.IKafkaConstants;

public class ProducerCreator {

    public static Producer<long, string=""> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IKafkaConstants.KAFKA_BROKERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, IKafkaConstants.CLIENT_ID);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
        return new KafkaProducer<>(props);
    }
}</long,>

Đoạn mã trên tạo ra một nhà sản xuất Kafka với một số thuộc tính.

  • BOOTSTRAP_SERVERS_CONFIG: Địa chỉ của nhà môi giới Kafka. Nếu Kafka đang chạy trong một cụm thì bạn có thể cung cấp các địa chỉ được phân tách bằng dấu phẩy (,). Ví dụ:localhost:9091,localhost:9092 

  • CLIENT_ID_CONFIG: Id của nhà sản xuất để nhà môi giới có thể xác định nguồn của yêu cầu.

  • KEY_SERIALIZER_CLASS_CONFIG: Lớp sẽ được sử dụng để tuần tự hóa đối tượng chính. Trong ví dụ của chúng tôi, khóa của chúng tôi là  Long, vì vậy chúng tôi có thể sử dụng  LongSerializer lớp để tuần tự hóa khóa. Nếu trong trường hợp sử dụng của bạn, bạn đang sử dụng một số đối tượng khác làm khóa thì bạn có thể tạo lớp serializer tùy chỉnh của mình bằng cách triển khai   giao diện serializer của Kafka và ghi đè  serialize phương thức.

  • VALUE_SERIALIZER_CLASS_CONFIG: Lớp sẽ được sử dụng để tuần tự hóa đối tượng giá trị. Trong ví dụ của chúng tôi, giá trị của chúng tôi là  String, vì vậy chúng tôi có thể sử dụng  StringSerializer lớp để tuần tự hóa khóa. Nếu giá trị của bạn là một số đối tượng khác thì bạn tạo lớp serializer tùy chỉnh của mình. Ví dụ:

import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.gaurav.kafka.pojo.CustomObject;

public class CustomSerializer implements Serializer<customobject> {

    @Override
    public void configure(Map<string,></string,> configs, boolean isKey) {

    }

    @Override
    public byte[] serialize(String topic, CustomObject data) {
        byte[] retVal = null;
        ObjectMapper objectMapper = new ObjectMapper();
        try {
        retVal = objectMapper.writeValueAsString(data).getBytes();
        } catch (Exception exception) {
        System.out.println("Error in serializing object"+ data);
        }
        return retVal;
    }

    @Override
    public void close() {

    }

}</customobject>
  • PARTATIONER_CLASS_CONFIG: Lớp sẽ được sử dụng để xác định phân vùng mà bản ghi sẽ đi. Trong chủ đề demo, chỉ có một phân vùng, vì vậy tôi đã nhận xét thuộc tính này. Bạn có thể tạo phân vùng tùy chỉnh của mình bằng cách triển khai  giao diện CustomPartitioner . Ví dụ:

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class CustomPartitioner implements Partitioner{

  private static final int PARTITION_COUNT=50;

  @Override
  public void configure(Map<string,></string,> configs) {

  }

  @Override
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    Integer keyInt=Integer.parseInt(key.toString());
    return keyInt % PARTITION_COUNT;
  }

  @Override
  public void close() {

  }

}

Ở trên  CustomPartitioner lớp, tôi đã ghi đè phân vùng phương thức trả về số phân vùng mà bản ghi sẽ đi.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import com.gaurav.kafka.constants.IKafkaConstants;

public class ConsumerCreator {

    public static Consumer<long, string=""> createConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, IKafkaConstants.KAFKA_BROKERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, IKafkaConstants.GROUP_ID_CONFIG);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, IKafkaConstants.MAX_POLL_RECORDS);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, IKafkaConstants.OFFSET_RESET_EARLIER);

        Consumer<long, string=""> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(IKafkaConstants.TOPIC_NAME));
        return consumer;
    }

}</long,></long,>

Đoạn mã trên tạo ra một người tiêu dùng Kafka với một số thuộc tính.

  • BOOTSTRAP_SERVERS_CONFIG: Địa chỉ của nhà môi giới Kafka. Nếu Kafka đang chạy trong một cụm thì bạn có thể cung cấp các địa chỉ được phân tách bằng dấu phẩy (,). Ví dụ :  localhost:9091,localhost:9092.

  • GROUP_ID_CONFIG: Id nhóm người tiêu dùng được sử dụng để xác định nhóm người tiêu dùng này thuộc nhóm nào.

  • KEY_DESERIALIZER_CLASS_CONFIG: Tên lớp để giải tuần tự hóa đối tượng chính. Chúng tôi đã sử dụng  Long làm khóa, vì vậy chúng tôi sẽ sử dụng  LongDeserializer  làm lớp giải nén . Bạn có thể tạo trình giải nén tùy chỉnh bằng cách triển khai   giao diện Deserializer do Kafka cung cấp.

  • VALUE_DESERIALIZER_CLASS_CONFIG: Tên lớp để giải tuần tự hóa đối tượng giá trị. Chúng tôi đã sử dụng  String làm giá trị vì vậy chúng tôi sẽ sử dụng StringDeserializer  làm lớp giải nén . Bạn có thể tạo trình giải nén tùy chỉnh của bạn. Ví dụ:

import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.gaurav.kafka.pojo.CustomObject;

public class CustomObjectDeserializer implements Deserializer<customobject> {

    @Override
    public void configure(Map<string,></string,> configs, boolean isKey) {
    }

    @Override
    public CustomObject deserialize(String topic, byte[] data) {
        ObjectMapper mapper = new ObjectMapper();
        CustomObject object = null;
        try {
object = mapper.readValue(data, CustomObject.class);
        } catch (Exception exception) {
System.out.println("Error in deserializing bytes "+ exception);
        }
        return object;
    }

    @Override
    public void close() {
    }

}</customobject>
  • MAX_POLL_RECORDS_CONFIG: Số lượng bản ghi tối đa mà người tiêu dùng sẽ tìm nạp trong một lần lặp.

  • ENABLE_AUTO_COMMIT_CONFIG: Khi người tiêu dùng từ một nhóm nhận được tin nhắn, họ phải cam kết bù đắp cho hồ sơ đó. Nếu cấu hình này được đặt thành đúng thì theo định kỳ, các lần bù sẽ được cam kết, nhưng, đối với mức sản xuất, điều này phải là sai và phải bù một cách thủ công.

  • AUTO_OFFSET_RESET_CONFIG: Đối với mỗi nhóm người tiêu dùng, giá trị bù được cam kết cuối cùng được lưu trữ. Cấu hình này có ích nếu không có bù nào được cam kết cho nhóm đó, tức là đó là nhóm mới được tạo.

    • Đặt giá trị này sớm nhất  sẽ khiến người tiêu dùng tìm nạp các bản ghi từ đầu bù, tức là từ 0.

    • Đặt giá trị này thành  mới nhất  sẽ khiến người tiêu dùng tìm nạp các bản ghi từ các bản ghi mới. Bởi các bản ghi mới có nghĩa là những bản ghi được tạo sau khi nhóm người tiêu dùng bắt đầu hoạt động.

import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import com.gaurav.kafka.constants.IKafkaConstants;
import com.gaurav.kafka.consumer.ConsumerCreator;
import com.gaurav.kafka.producer.ProducerCreator;

public class App {
    public static void main(String[] args) {
      runProducer();
      //runConsumer();
    }

    static void runConsumer() {
        Consumer<long, string=""> consumer = ConsumerCreator.createConsumer();

        int noMessageFound = 0;

        while (true) {
          ConsumerRecords<long, string=""> consumerRecords = consumer.poll(1000);
          // 1000 is the time in milliseconds consumer will wait if no record is found at broker.
          if (consumerRecords.count() == 0) {
              noMessageFound++;
              if (noMessageFound > IKafkaConstants.MAX_NO_MESSAGE_FOUND_COUNT)
                // If no message found count is reached to threshold exit loop.  
                break;
              else
                  continue;
          }

          //print each record. 
          consumerRecords.forEach(record -> {
              System.out.println("Record Key " + record.key());
              System.out.println("Record value " + record.value());
              System.out.println("Record partition " + record.partition());
              System.out.println("Record offset " + record.offset());
           });

          // commits the offset of record to broker. 
           consumer.commitAsync();
        }
    consumer.close();
    }

    static void runProducer() {
Producer<long, string=""> producer = ProducerCreator.createProducer();

        for (int index = 0; index < IKafkaConstants.MESSAGE_COUNT; index++) {
            ProducerRecord<long, string=""> record = new ProducerRecord<long, string="">(IKafkaConstants.TOPIC_NAME,
            "This is record " + index);
            try {
            RecordMetadata metadata = producer.send(record).get();
                        System.out.println("Record sent with key " + index + " to partition " + metadata.partition()
                        + " with offset " + metadata.offset());
                 } 
            catch (ExecutionException e) {
                     System.out.println("Error in sending record");
                     System.out.println(e);
                  } 
             catch (InterruptedException e) {
                      System.out.println("Error in sending record");
                      System.out.println(e);
                  }
         }
    }
}
</long,></long,></long,></long,></long,>

Đoạn trích trên giải thích cách tạo và tiêu thụ tin nhắn từ nhà môi giới Kafka. Nếu bạn muốn chạy một nhà sản xuất thì hãy gọi  hàm run Producter từ chức năng chính. Nếu bạn muốn chạy một bộ tiêu thụ, hãy gọi  hàm runConsumer từ hàm chính.

  • Việc bù các bản ghi có thể được cam kết với người môi giới theo cả hai cách không đồng bộ và đồng bộ. Sử dụng cách đồng bộ, luồng sẽ bị chặn cho đến khi phần bù không được ghi cho người môi giới.

Phần kết luận

Chúng tôi đã thấy cách các nhà sản xuất và người tiêu dùng Kafka làm việc. Bạn có thể kiểm tra toàn bộ dự án trên trang GitHub của tôi . Nếu bạn đang phải đối mặt với bất kỳ vấn đề nào với Kafka, vui lòng hỏi trong các bình luận. Trong bài viết tiếp theo, tôi sẽ thảo luận về cách thiết lập các công cụ giám sát cho Kafka bằng Burrow.

4 hữu ích 0 bình luận 8.6k xem chia sẻ

Có thể bạn quan tâm

loading