Cân bằng tải khách hàng MQTT với RabbitMQ và Spring Cloud


Hoàng Lệ Thủy
8 tháng trước
Hữu ích 9 Chia sẻ Viết bình luận 0
Đã xem 6865

Giới thiệu

MQTT là giao thức kết nối giữa máy với máy (M2M), IoT. Nó được thiết kế như một ấn phẩm cực kỳ nhẹ và đăng ký vận chuyển tin nhắn. Nó rất hữu ích cho các kết nối với các vị trí từ xa nơi yêu cầu một dấu chân mã nhỏ và / hoặc băng thông mạng ở mức cao.

Mỗi ứng dụng khách MQTT đăng ký một số chủ đề nhất định và nhận tin nhắn khi nhà xuất bản bắt đầu đẩy tin nhắn về các chủ đề đó.

Làm thế nào để nhân rộng?

Mục đích của tỷ lệ ngang là phân phối tải giữa nhiều phiên bản của cùng một ứng dụng. Nếu các máy khách MQTT trong các trường hợp đó được đăng ký vào cùng một chủ đề, thì cùng một thông báo MQTT sẽ được gửi đến từng phiên bản, đây không phải là hành vi mong muốn.


Người tiêu dùng cạnh tranh

Spring Cloud Stream định nghĩa khái niệm "Nhóm người tiêu dùng" như sau:

"Mặc dù mô hình đăng ký xuất bản giúp dễ dàng kết nối các ứng dụng thông qua các chủ đề được chia sẻ, khả năng mở rộng bằng cách tạo nhiều phiên bản của một ứng dụng nhất định cũng quan trọng không kém. Khi làm như vậy, các phiên bản khác nhau của ứng dụng được đặt trong mối quan hệ người tiêu dùng cạnh tranh , trong đó chỉ có một trong các trường hợp được mong đợi sẽ xử lý một thông điệp nhất định. "

Dựa trên định nghĩa này, Spring Cloud Stream cho phép phân phối tải cho một chủ đề trên nhiều máy khách, như thể hiện trong hình tiếp theo.

Thí dụ

Trong ví dụ này, một khách hàng MQTT sẽ xuất bản tin nhắn đến một chủ đề trong RabbitMQ và nhiều người tiêu dùng sẽ chia sẻ các tin nhắn của chủ đề đó.

Cài đặt Plugin RabbitMQ và MQTT

Đầu tiên, chúng tôi sẽ chạy một phiên bản của RabbitMQ bằng hình ảnh Docker . Sau đó, chúng tôi sẽ cài đặt plugin MQTT .

> docker run  -d --hostname vs29 --name vs29 -p 8081:15672 -p 5672:5672 -p 1883:1883 rabbitmq:3-management


Bây giờ, hãy kiểm tra nhật ký khởi động của container đó:

>docker ps
CONTAINER ID        IMAGE                     COMMAND                  CREATED             STATUS              PORTS                                                                                                               NAMES
fbd443154bf6        rabbitmq:3-management     "docker-entrypoint.s…"   6 seconds ago       Up 2 seconds        4369/tcp, 0.0.0.0:1883->1883/tcp, 5671/tcp, 15671/tcp, 0.0.0.0:5672->5672/tcp, 25672/tcp, 0.0.0.0:8081->15672/tcp   vs29

>docker logs fbd443154bf6 -f
....
....
...

2019-02-03 07:34:16.709 [info] <0.198.0>
node : rabbit@vs29
home dir : /var/lib/rabbitmq
config file(s) : /etc/rabbitmq/rabbitmq.conf
cookie hash : O+z+vUDvSh3J1vK/lV08Xw==
log(s) : <stdout>
database dir : /var/lib/rabbitmq/mnesia/rabbit@vs29


Một vài dòng cuối cùng của nhật ký chỉ ra rằng máy chủ được đọc ngay bây giờ. Bây giờ, hãy cài đặt plugin MQTT :

Navigate to the container first:
> docker exec -u 0 -it fbd443154bf6 /bin/bash

Enable the plugin now:
root@vs29:/#rabbitmq-plugins enable rabbitmq_mqtt


Thêm người dùng mới trong RabbitMQ

Hãy thêm người dùng mới vào RabbitMQ bằng UI quản trị. Mở URL http: // RabbitMQhost: 8081 / sau đó điều hướng đến tab 'Quản trị viên' (thông tin đăng nhập mặc định trong RabbitMQ là khách / khách).

Để tạo người dùng mới:

  • Thêm tên người dùng trong trường 'Tên người dùng.' Sau đó, hãy thêm người dùng 'client1.' Đặt mật khẩu trong trường 'Mật khẩu', hãy đặt mật khẩu thành 'client1'
  • Nhấp vào 'Thêm người dùng'

  • Người dùng mặc định không có quyền truy cập vào bất kỳ máy chủ ảo nào. Nhấp vào 'client1' để chỉnh sửa quyền của người dùng này. Trong trang mới, nhấp vào 'Đặt quyền' để cấp cho người dùng quyền truy cập vào tất cả các máy chủ ảo.
  • Để xác minh rằng mọi thứ đều hoạt động tốt, hãy sử dụng máy khách MQTT để đẩy dữ liệu đến máy chủ mới của chúng tôi. Trong hướng dẫn này, chúng tôi sẽ sử dụng các lệnh 'mosquitto_pub' và 'mosquitto_sub' được cung cấp bởi máy chủ Mosquitto. Đầu tiên, hãy đăng ký tất cả các chủ đề trong máy chủ. Thứ hai, chúng tôi sẽ đẩy một số dữ liệu đến máy chủ.
..>mosquitto_pub -h xxx.xxx.xxx.xxx -t /my/topic -m "Hello World" -u client1 -P client1 -p 1883


..>mosquitto_sub -h xxx.xxx.xxx.xxx -t "#" -u client1 -P client1
Hello World


Nếu mọi thứ đều ổn, bạn sẽ nhận được 'Hello World'

Tạo dịch vụ nhận tin nhắn

Mục tiêu của hướng dẫn này là phân phối tải giữa nhiều phiên bản của cùng một ứng dụng. Vì vậy, hãy tạo một dịch vụ đơn giản để sử dụng tin nhắn bằng Spring Boot và Spring Cloud Stream.

  • Tạo một dự án Spring Boot mới. Bạn có thể sử dụng IDE hoặc Spring Trình khởi tạo
  • Điều chỉnh tệp mvn của bạn để bao gồm các nội dung sau:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Greenwich.RELEASE</spring-cloud.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
</repository>
</repositories>


Bây giờ, hãy thêm Stream Listener của chúng tôi:

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.SubscribableChannel;

@EnableBinding(MessageSink.InputChannel.class)
public class MessageSink {

    @StreamListener(InputChannel.SINK)
    public void handle(String message) {
System.out.println("new message:" + message + ", from worker :" + Thread.currentThread().getName());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public interface InputChannel {
        String SINK = "message-sink";

        @Input(SINK)
        SubscribableChannel sink();
    }
}


Bước tiếp theo là xác định và định cấu hình Kênh của chúng tôi (đây là phần quan trọng nhất của hướng dẫn này). Cấu hình sẽ được thêm vào trong tệp application.yml:

spring:
  cloud:
    stream:
      bindings:
        message-sink :
         destination: amq.topic
         binder: rabbit
         group: messages-consumer-group
         consumer :
           concurrency: 1
      rabbit:
        bindings:
          message-sink:
            consumer:
              durableSubscription: true
              declareExchange: true
              exchangeDurable: true
              exchangeType: topic
              queueNameGroupOnly: true 
  rabbitmq:
    host: xxx.xxx.xxx.xxx
    password: client1
    username: client1


Hãy thảo luận về cấu hình quan trọng trong application.yml:

  • Destination : amq.topic là Exchange mặc định được sử dụng bởi Plugin MQTT, vì vậy chúng tôi cần đăng ký nó.
  • nhóm : Theo Spring Cloud Documents , 'Tất cả các nhóm đăng ký một điểm đến đã nhận sẽ nhận được một bản sao dữ liệu được công bố, nhưng chỉ một thành viên của mỗi nhóm nhận được một tin nhắn nhất định từ đích đó'
  • Consumer.concurrency : Số lượng chủ đề tối đa có thể được sử dụng để xử lý các tin nhắn nhận được trong người tiêu dùng này. Chúng tôi sửa đổi con số này thành bất kỳ giá trị tích cực nào và khái niệm 'người tiêu dùng được nhóm' vẫn được áp dụng.
  • queueNamegroupOnly : Theo Spring Cloud Documents, 'Khi đúng, tiêu thụ từ một hàng đợi có tên bằng group. Nếu không, tên hàng đợi là destination.group. Điều này rất hữu ích, ví dụ như khi sử dụng Spring Cloud Stream để tiêu thụ từ hàng đợi RabbitMQ hiện có. ' Thật vậy, đây là một cấu hình rất quan trọng. Việc bỏ qua thuộc tính này sẽ dẫn đến nhiều lỗi khi bắt đầu dịch vụ vì Spring sẽ tạo một tên hàng đợi bắt đầu bằng 'amq,' không được RabbitMQ cho phép. Bạn có thêm chi tiết trong chủ đề này

Xác minh phân phối tải

Hãy bắt đầu hai trường hợp dịch vụ và đẩy một số dữ liệu bằng ứng dụng khách MQTT. Đầu tiên, mở cửa sổ lệnh Shell, điều hướng đến nguồn dự án của bạn và xây dựng dự án bằng lệnh

>mvn clean compile package


Thứ hai, mở hai lệnh cửa sổ Shell, điều hướng đến thư mục dự án của bạn và khởi động dịch vụ bằng lệnh

>java -jar target\balanced_mqtt_client-0.0.1-SNAPSHOT.jar


Bây giờ, chúng tôi sẽ đẩy một số dữ liệu từ máy khách MQTT:

>mosquitto_pub -h xxx.xxx.xxx.xxx -t /my/topic -m "message 1" -u client1 -P client1 -p 1883

>mosquitto_pub -h xxx.xxx.xxx.xxx -t /my/topic -m "message 2" -u client1 -P client1 -p 1883

>mosquitto_pub -h xxx.xxx.xxx.xxx -t /my/topic -m "message 3" -u client1 -P client1 -p 1883

>mosquitto_pub -h xxx.xxx.xxx.xxx -t /my/topic -m "message 4" -u client1 -P client1 -p 1883

>mosquitto_pub -h xxx.xxx.xxx.xxx -t /my/topic -m "message 5" -u client1 -P client1 -p 1883


Về phía người tiêu dùng, các thông báo sau sẽ được nhìn thấy:

người tiêu dùng 1:

2019-02-07 10:33:55.848  INFO 14284 --- [           main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.messages-consumer-group
2019-02-07 10:33:55.858  INFO 14284 --- [           main] r.n.cloud.rabbitmq.mqtt.MqttApplication  : Started MqttApplication in 8.824 seconds (JVM running for 9.318)
new message:message 1, from worker :messages-consumer-group-1
new message:message 3, from worker :messages-consumer-group-1
new message:message 5, from worker :messages-consumer-group-1


người tiêu dùng 2:

O 13832 --- [           main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.messages-consumer-group
O 13832 --- [           main] r.n.cloud.rabbitmq.mqtt.MqttApplication  : Started MqttApplication in 7.8 seconds (JVM running for 8.495)
worker :messages-consumer-group-1
worker :messages-consumer-group-1


Như chúng ta có thể thấy, các tin nhắn được phân phối giữa hai người tiêu dùng.

Phần kết luận

Hướng dẫn cho thấy cách triển khai người tiêu dùng MQTT cân bằng tải bằng máy chủ RabbitMQ và tính năng 'người tiêu dùng được nhóm'. Bạn có thể tải xuống bản demo từ GitHub .

Hữu ích 9 Chia sẻ Viết bình luận 0
Đã xem 6865