点赞
评论
收藏
分享
举报
IoT 场景-02:通过Nginx JavaScript 实现会话保持
发表于2020-12-19 15:51

浏览 923

文章标签

上一节我们介绍过了通过Nginx 实现MQTT 会话的负载均衡,那么如何解决每个MQTT Broker在消息分发的时候,只能收到一半信息的情况呢?

此时,我们就需要借助Nginx Plus的高级功能:Nginx JavaScript,来定制基于ClientID 的会话保持机制,将客户端与某个MQTT Broker进行会话保持,保障其信息的完整性,除非该MQTT Broker出现故障。


"Round Robin load balancing is an effective mechanism for distributing client connections across a group of servers. However, there are several reasons why it is not ideal for MQTT connections."


0x01 Nginx JavaScript 会话保持代码

mqtt.js 提取 MQTT ClientId 

var client_messages = 1;
2
var client_id_str = "-";
3
4
function getClientId(s) {
5
    if ( !s.fromUpstream ) {
6
        if ( s.buffer.toString().length == 0  ) { // Initial calls may
7
            s.log("No buffer yet");               // contain no data, so
8
            return s.AGAIN;                       // ask that we get called again
9
        } else if ( client_messages == 1 ) { // CONNECT is first packet from the client
            // CONNECT packet is 1, using upper 4 bits (00010000 to 00011111)
2
            var packet_type_flags_byte = s.buffer.charCodeAt(0);
3
            s.log("MQTT packet type+flags = " + packet_type_flags_byte.toString());
4
            if ( packet_type_flags_byte >= 16 && packet_type_flags_byte < 32 ) {
5
                // Calculate remaining length with variable encoding scheme
6
                var multiplier = 1;
7
                var remaining_len_val = 0;
8
                var remaining_len_byte;
9
                for (var remaining_len_pos = 1; remaining_len_pos < 5; remaining_len_pos++ ) {
10
                    remaining_len_byte = s.buffer.charCodeAt(remaining_len_pos);
11
                    if ( remaining_len_byte == 0 ) break; // Stop decoding on 0
12
                    remaining_len_val += (remaining_len_byte & 127) * multiplier;
13
                    multiplier *= 128;
14
                }
15
16
                // Extract ClientId based on length defined by 2-byte encoding
17
                var payload_offset = remaining_len_pos + 12; // Skip fixed head
1
                var client_id_len_msb = s.buffer.charCodeAt(payload_offset).toString(16);
2
                var client_id_len_lsb = s.buffer.charCodeAt(payload_offset + 1).toString(16);
3
                if ( client_id_len_lsb.length < 2 ) client_id_len_lsb = "0" + client_id_len_lsb;
4
                var client_id_len_int = parseInt(client_id_len_msb + client_id_len_lsb, 16);
5
                client_id_str = s.buffer.substr(payload_offset + 2, client_id_len_int);
6
                s.log("ClientId value  = " + client_id_str);
7
            } else {
8
                s.log("Received unexpected MQTT packet type+flags: " + packet_type_flags_byte.toString());
9
            }
10
        }
11
        client_messages++;
12
    }
13
    return s.OK;
14
}
15
16
function setClientId(s) {
17
    return client_id_str;
18
}


在配置文件nginx.conf 中进行引用 

upstream hive_mq {

server 172.17.0.3:1883; #mq1

server 172.17.0.2:1883; #mq2

zone tcp_mem 64k;

hash $mqtt_client_id consistent; # Session persistence keyed against ClientId

}

 

server {

listen 1883;

preread_buffer_size 1k; # Big enough to read CONNECT packet header

js_prereadgetClientId; # Parse CONNECT packet for ClientId

proxy_passhive_mq;

proxy_connect_timeout 1s; 


0x02 测试会话保持功能


此时,使用发布端持续发布数据,订阅客户端收到的信息会保持连续。

当停止其中任何一个MQTT Broker容器时,订阅客户端会有一定几率重新连接MQTT Broker并重新建立会话保持。



此时读者或许会问道,如果MQTT Broker容器意外故障,是否可以自动将故障点进行移除,并主动进行客户端的连接的重连? 好问题,请期待我们的下一节,使用NGINX Plus 进行主动MQTT健康检查。



参考:

https://dzone.com/articles/mqtt-load-balancing-and-session-persistence-with-nginx-plus













已修改于2023-03-09 02:07
本作品系原创
创作不易,留下一份鼓励
yuefeng

暂无个人介绍

关注



写下您的评论
发表评论
全部评论(0)

按点赞数排序

按时间排序

关于作者
yuefeng
这家伙很懒还未留下介绍~
6
文章
0
问答
1
粉丝
相关文章