浏览 923
上一节我们介绍过了通过Nginx 实现MQTT 会话的负载均衡,那么如何解决每个MQTT Broker在消息分发的时候,只能收到一半信息的情况呢?
此时,我们就需要借助Nginx Plus的高级功能:Nginx JavaScript,来定制基于ClientID 的会话保持机制,将客户端与某个MQTT Broker进行会话保持,保障其信息的完整性,除非该MQTT Broker出现故障。
"Round Robin load balancing is an effective mechanism for distributing client connections across a group of servers. However, there are several reasons why it is not ideal for MQTT connections."
0x01 Nginx JavaScript 会话保持代码
mqtt.js 提取 MQTT ClientId
var client_messages = 1;
var client_id_str = "-";
function getClientId(s) {
if ( !s.fromUpstream ) {
if ( s.buffer.toString().length == 0 ) { // Initial calls may
s.log("No buffer yet"); // contain no data, so
return s.AGAIN; // ask that we get called again
} else if ( client_messages == 1 ) { // CONNECT is first packet from the client
// CONNECT packet is 1, using upper 4 bits (00010000 to 00011111)
var packet_type_flags_byte = s.buffer.charCodeAt(0);
s.log("MQTT packet type+flags = " + packet_type_flags_byte.toString());
if ( packet_type_flags_byte >= 16 && packet_type_flags_byte < 32 ) {
// Calculate remaining length with variable encoding scheme
var multiplier = 1;
var remaining_len_val = 0;
var remaining_len_byte;
for (var remaining_len_pos = 1; remaining_len_pos < 5; remaining_len_pos++ ) {
remaining_len_byte = s.buffer.charCodeAt(remaining_len_pos);
if ( remaining_len_byte == 0 ) break; // Stop decoding on 0
remaining_len_val += (remaining_len_byte & 127) * multiplier;
multiplier *= 128;
}
// Extract ClientId based on length defined by 2-byte encoding
var payload_offset = remaining_len_pos + 12; // Skip fixed head
var client_id_len_msb = s.buffer.charCodeAt(payload_offset).toString(16);
var client_id_len_lsb = s.buffer.charCodeAt(payload_offset + 1).toString(16);
if ( client_id_len_lsb.length < 2 ) client_id_len_lsb = "0" + client_id_len_lsb;
var client_id_len_int = parseInt(client_id_len_msb + client_id_len_lsb, 16);
client_id_str = s.buffer.substr(payload_offset + 2, client_id_len_int);
s.log("ClientId value = " + client_id_str);
} else {
s.log("Received unexpected MQTT packet type+flags: " + packet_type_flags_byte.toString());
}
}
client_messages++;
}
return s.OK;
}
function setClientId(s) {
return client_id_str;
}
在配置文件nginx.conf 中进行引用
upstream hive_mq {
server 172.17.0.3:1883; #mq1
server 172.17.0.2:1883; #mq2
zone tcp_mem 64k;
hash $mqtt_client_id consistent; # Session persistence keyed against ClientId
}
server {
listen 1883;
preread_buffer_size 1k; # Big enough to read CONNECT packet header
js_prereadgetClientId; # Parse CONNECT packet for ClientId
proxy_passhive_mq;
proxy_connect_timeout 1s;
0x02 测试会话保持功能
此时,使用发布端持续发布数据,订阅客户端收到的信息会保持连续。
当停止其中任何一个MQTT Broker容器时,订阅客户端会有一定几率重新连接MQTT Broker并重新建立会话保持。
此时读者或许会问道,如果MQTT Broker容器意外故障,是否可以自动将故障点进行移除,并主动进行客户端的连接的重连? 好问题,请期待我们的下一节,使用NGINX Plus 进行主动MQTT健康检查。
参考:
https://dzone.com/articles/mqtt-load-balancing-and-session-persistence-with-nginx-plus
按点赞数排序
按时间排序