EMQ - 百万级开源MQTT消息服务器

EMQ 2.0 (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 语言平台开发,支持大规模连接和分布式集群,发布订阅模式的开源 MQTT 消息服务器。

注解

2.0 版本开始 emqttd 消息服务器自正式简称为 EMQ

EMQ 2.0 完整支持 MQTT V3.1/V3.1.1 版本协议规范,并扩展支持 WebSocket、Stomp、CoAP、MQTT-SN 或私有 TCP 协议。EMQ 2.0 消息服务器支持单节点100万连接与多节点分布式集群:

TODO: 2.0-rc.1 图片更新.

https://assets.gitee.com/assets/gitee_stars/11_float_left_people-e931cf6cd746cc916f408fe4411cb900.png

EMQ 2.0 为大规模客户端连接 (C1000K+) 的移动推送、移动消息、物联网、车联网、智能硬件等应用,提供一个完全开放源码、安装部署简便、企业级稳定可靠、可弹性扩展、易于定制开发的 MQTT 消息服务器。

注解

MQTT-SN、CoAP 协议已在2.0-rc.1版本发布,LWM2M、LoRaWan 协议在2.3-beta.1版本发布。

EMQ 2.0 项目文档目录:

开始使用 (Get Started)

EMQ 2.0 消息服务器简介

EMQ (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。Erlang/OTP 是出色的软实时(Soft-Realtime)、低延时(Low-Latency)、分布式(Distributed) 的语言平台。MQTT 是轻量的(Lightweight)、发布订阅模式(PubSub) 的物联网消息协议。

EMQ 项目设计目标是承载移动终端或物联网终端海量 MQTT 连接,并实现在海量物联网设备间快速低延时消息路由:

  1. 稳定承载大规模的 MQTT 客户端连接,单服务器节点支持50万到100万连接。
  2. 分布式节点集群,快速低延时的消息路由,单集群支持1000万规模的路由。
  3. 消息服务器内扩展,支持定制多种认证方式、高效存储消息到后端数据库。
  4. 完整物联网协议支持,MQTT、MQTT-SN、CoAP、WebSocket 或私有协议支持。

MQTT 发布订阅模式简述

MQTT 是发布订阅(Publish/Subscribe) 模式的消息协议,与 HTTP 协议请求响应(Request/Response) 模式不同。

MQTT 发布者与订阅者之间通过”主题”(Topic) 进行消息路由,主题(Topic) 格式类似 Unix 文件路径,例如:

sensor/1/temperature

chat/room/subject

presence/user/feng

sensor/1/#

sensor/+/temperature

uber/drivers/joe/inbox

MQTT 主题(Topic) 支持’+’, ‘#’的通配符,’+’通配一个层级,’#’通配多个层级(必须在末尾)。

MQTT 消息发布者(Publisher) 只能向特定’名称主题’(不支持通配符)发布消息,订阅者(Subscriber)通过订阅’过滤主题’(支持通配符)来匹配消息。

注解

初接触MQTT协议的用户,通常会向通配符的’过滤主题’发布广播消息,MQTT 协议不支持这种模式,需从订阅侧设计广播主题(Topic)。 例如 Android 推送,向所有广州用户,推送某类本地消息,客户端获得 GIS 位置后,可订阅 ‘news/city/guangzhou’ 主题。

五分钟下载启动 EMQ

EMQ 2.0 消息服务器每个版本,会发布 Ubuntu、CentOS、FreeBSD、Mac OS X、Windows 平台程序包与 Docker 镜像。

下载地址: http://emqtt.com/downloads

程序包下载后,可直接解压启动运行,例如 Mac 平台:

unzip emqttd-macosx-v2.0.zip && cd emqttd

# 启动emqttd
./bin/emqttd start

# 检查运行状态
./bin/emqttd_ctl status

# 停止emqttd
./bin/emqttd stop

EMQ 消息服务默认允许匿名认证,启动后 MQTT 客户端可连接1883端口,启动运行日志输出在 log/ 目录。

源码编译EMQ 2.0

git clone https://github.com/emqtt/emq-relx.git

cd emq-relx && make

cd _rel/emqttd && ./bin/emqttd console

Web 管理控制台(Dashboard)

EMQ 消息服务器启动后,会默认加载 Dashboard 插件,启动 Web 管理控制台。用户可通过 Web 控制台,查看服务器运行状态、统计数据、客户端(Client)、会话(Session)、主题(Topic)、订阅(Subscription)、插件(Plugin)。

控制台地址: http://127.0.0.1:18083,默认用户: admin,密码:public

./_static/images/dashboard.png

EMQ 2.0 消息服务器功能列表

  • 完整的 MQTT V3.1/V3.1.1 协议规范支持
  • QoS0, QoS1, QoS2 消息支持
  • 持久会话与离线消息支持
  • Retained 消息支持
  • Last Will 消息支持
  • TCP/SSL 连接支持
  • MQTT/WebSocket/SSL 支持
  • HTTP消息发布接口支持
  • $SYS/# 系统主题支持
  • 客户端在线状态查询与订阅支持
  • 客户端 ID 或 IP 地址认证支持
  • 用户名密码认证支持
  • LDAP 认证
  • Redis、MySQL、PostgreSQL、MongoDB、HTTP 认证集成
  • 浏览器 Cookie 认证
  • 基于客户端 ID、IP 地址、用户名的访问控制(ACL)
  • 多服务器节点集群(Cluster)
  • 多服务器节点桥接(Bridge)
  • mosquitto 桥接支持
  • Stomp 协议支持
  • MQTT-SN 协议支持
  • CoAP 协议支持
  • Stomp/SockJS 支持
  • 通过 Paho 兼容性测试
  • 2.0新功能: 本地订阅($local/topic)
  • 2.0新功能: 共享订阅($share/<group>/topic)
  • 2.0新功能: sysctl 类似 k = v 格式配置文件

EMQ 2.0 扩展插件列表

EMQ 2.0 支持丰富的扩展插件,包括控制台、扩展模块、多种认证方式、多种接入协议等:

emq_plugin_template 插件模版与演示代码
emq_retainer Retain 消息存储插件
emq_modules Presence, Subscription 扩展模块插件
emq_dashboard Web 管理控制台,默认加载
emq_auth_clientid ClientId、密码认证插件
emq_auth_username 用户名、密码认证插件
emq_auth_ldap LDAP 认证插件
emq_auth_http HTTP 认证插件
emq_auth_mysql MySQL 认证插件
emq_auth_pgsql PostgreSQL 认证插件
emq_auth_redis Redis 认证插件
emq_auth_mongo MongoDB 认证插件
emq_sn MQTT-SN 协议插件
emq_coap CoAP 协议插件
emq_stomp Stomp 协议插件
emq_recon Recon 优化调测插件
emq_reloader 热升级插件(开发调试)
emq_sockjs SockJS 插件(废弃)

扩展插件通过 ‘bin/emqttd_ctl’ 管理命令行,或 Dashboard 控制台加载启用。例如启用 PostgreSQL 认证插件:

./bin/emqttd_ctl plugins load emq_auth_pgsql

100万线连接测试说明

注解

EMQ 2.0 消息服务器默认设置,允许最大客户端连接是512,因为大部分操作系统 ‘ulimit -n’ 限制为1024。

EMQ 消息服务器1.1.3版本,连接压力测试到130万线,8核心/32G内存的 CentOS 云服务器。

操作系统内核参数、TCP 协议栈参数、Erlang 虚拟机参数、EMQ 最大允许连接数设置简述如下:

Linux 操作系统参数

# 2M - 系统所有进程可打开的文件数量:

sysctl -w fs.file-max=2097152
sysctl -w fs.nr_open=2097152

# 1M - 系统允许当前进程打开的文件数量:

ulimit -n 1048576

TCP 协议栈参数

# backlog - Socket 监听队列长度:

sysctl -w net.core.somaxconn=65536

Erlang 虚拟机参数

emqttd/etc/emq.conf:

## Erlang Process Limit
node.process_limit = 2097152

## Sets the maximum number of simultaneously existing ports for this system
node.max_ports = 1048576

EMQ 最大允许连接数

emqttd/etc/emq.conf ‘listeners’段落:

## Size of acceptor pool
listener.tcp.external.acceptors = 64

## Maximum number of concurrent clients
listener.tcp.external.max_clients = 1000000

测试客户端设置

测试客户端在一个接口上,最多只能创建65000连接:

sysctl -w net.ipv4.ip_local_port_range="500 65535"

echo 1000000 > /proc/sys/fs/nr_open

按应用场景测试

MQTT 是一个设计得非常出色的传输层协议,在移动消息、物联网、车联网、智能硬件甚至能源勘探等领域有着广泛的应用。1个字节报头、2个字节心跳、消息 QoS 支持等设计,非常适合在低带宽、不可靠网络、嵌入式设备上应用。

不同的应用有不同的系统要求,用户使用emqttd消息服务器前,可以按自己的应用场景进行测试,而不是简单的连接压力测试:

  1. Android 消息推送: 推送消息广播测试。
  2. 移动即时消息应用: 消息收发确认测试。
  3. 智能硬件应用: 消息的往返时延测试。
  4. 物联网数据采集: 并发连接与吞吐测试。

开源 MQTT 客户端项目

GitHub: https://github.com/emqtt

emqttc Erlang MQTT客户端库
emqtt_benchmark MQTT连接测试工具
CocoaMQTT Swift语言MQTT客户端库
QMQTT QT框架MQTT客户端库

Eclipse Paho: https://www.eclipse.org/paho/

MQTT.org: https://github.com/mqtt/mqtt.github.io/wiki/libraries

微信小程序开发

关于微信小程序

概览

小程序是一种新的开放能力,开发者可以快速地开发一个小程序。小程序可以在微信内被便捷地获取和传播,同时具有出色的使用体验。

微信小程序官网: https://mp.weixin.qq.com/cgi-bin/wx

微信小程序登录: https://mp.weixin.qq.com/

简易教程:https://developers.weixin.qq.com/miniprogram/dev/index.html

设计指南:https://developers.weixin.qq.com/miniprogram/design/index.html?t=2018712

应用

  1. 小程序开发
  2. 小游戏开发

动手开发微信小程序

开发之前需要先注册相关的帐号信息以及准备相关工具,笔者本人的帐号信息如下:

开发帐号:hytgxljh@163.com/ljh19*5

管理员微信号:m18617218008

TGIT帐号:hytgxljh/ljh19*5

注解

TGIT:一款可绑定的git管理工具。

微信小程序参考样例

MQTT协议

MQTT轻量发布订阅消息协议

概览

MQTT是一个轻量的发布订阅模式消息传输协议,专门针对低带宽和不稳定网络环境的物联网应用设计。

MQTT官网: http://mqtt.org

MQTT V3.1.1协议规范: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html

特点

  1. 开放消息协议,简单易实现
  2. 发布订阅模式,一对多消息发布
  3. 基于TCP/IP网络连接
  4. 1字节固定报头,2字节心跳报文,报文结构紧凑
  5. 消息QoS支持,可靠传输保证

应用

MQTT协议广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等领域。

  1. 物联网M2M通信,物联网大数据采集
  2. Android消息推送,WEB消息推送
  3. 移动即时消息,例如Facebook Messenger
  4. 智能硬件、智能家具、智能电器
  5. 车联网通信,电动车站桩采集
  6. 智慧城市、远程医疗、远程教育
  7. 电力、石油与能源等行业市场

MQTT基于主题(Topic)消息路由

MQTT协议基于主题(Topic)进行消息路由,主题(Topic)类似URL路径,例如:

chat/room/1

sensor/10/temperature

sensor/+/temperature

$SYS/broker/metrics/packets/received

$SYS/broker/metrics/#

主题(Topic)通过’/’分割层级,支持’+’, ‘#’通配符:

'+': 表示通配一个层级,例如a/+,匹配a/x, a/y

'#': 表示通配多个层级,例如a/#,匹配a/x, a/b/c/d

订阅者与发布者之间通过主题路由消息进行通信,例如采用mosquitto命令行发布订阅消息:

mosquitto_sub -t a/b/+ -q 1

mosquitto_pub -t a/b/c -m hello -q 1

注解

订阅者可以订阅含通配符主题,但发布者不允许向含通配符主题发布消息。

MQTT V3.1.1协议报文

报文结构

固定报头(Fixed header)
可变报头(Variable header)
报文有效载荷(Payload)

固定报头

Bit 7 6 5 4 3 2 1 0
byte1 MQTT Packet type Flags
byte2… Remaining Length

报文类型

类型名称 类型值 报文说明
CONNECT 1 发起连接
CONNACK 2 连接回执
PUBLISH 3 发布消息
PUBACK 4 发布回执
PUBREC 5 QoS2消息回执
PUBREL 6 QoS2消息释放
PUBCOMP 7 QoS2消息完成
SUBSCRIBE 8 订阅主题
SUBACK 9 订阅回执
UNSUBSCRIBE 10 取消订阅
UNSUBACK 11 取消订阅回执
PINGREQ 12 PING请求
PINGRESP 13 PING响应
DISCONNECT 14 断开连接

PUBLISH发布消息

PUBLISH报文承载客户端与服务器间双向的发布消息。 PUBACK报文用于接收端确认QoS1报文,PUBREC/PUBREL/PUBCOMP报文用于QoS2消息流程。

PINGREQ/PINGRESP心跳

客户端在无报文发送时,按保活周期(KeepAlive)定时向服务端发送PINGREQ心跳报文,服务端响应PINGRESP报文。PINGREQ/PINGRESP报文均2个字节。

MQTT消息QoS

MQTT发布消息QoS保证不是端到端的,是客户端与服务器之间的。订阅者收到MQTT消息的QoS级别,最终取决于发布消息的QoS和主题订阅的QoS。

发布消息的QoS 主题订阅的QoS 接收消息的QoS
0 0 0
0 1 0
0 2 0
1 0 0
1 1 1
1 2 1
2 0 0
2 1 1
2 2 2

Qos0消息发布订阅

./_static/images/qos0_seq.png

Qos1消息发布订阅

./_static/images/qos1_seq.png

Qos2消息发布订阅

./_static/images/qos2_seq.png

MQTT会话(Clean Session)

MQTT客户端向服务器发起CONNECT请求时,可以通过’Clean Session’标志设置会话。

‘Clean Session’设置为0,表示创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销。

‘Clean Session’设置为1,表示创建一个新的临时会话,在客户端断开时,会话自动销毁。

MQTT连接保活心跳

MQTT客户端向服务器发起CONNECT请求时,通过KeepAlive参数设置保活周期。

客户端在无报文发送时,按KeepAlive周期定时发送2字节的PINGREQ心跳报文,服务端收到PINGREQ报文后,回复2字节的PINGRESP报文。

服务端在1.5个心跳周期内,既没有收到客户端发布订阅报文,也没有收到PINGREQ心跳报文时,主动心跳超时断开客户端TCP连接。

注解

emqttd消息服务器默认按最长2.5心跳周期超时设计。

MQTT遗愿消息(Last Will)

MQTT客户端向服务器端CONNECT请求时,可以设置是否发送遗愿消息(Will Message)标志,和遗愿消息主题(Topic)与内容(Payload)。

MQTT客户端异常下线时(客户端断开前未向服务器发送DISCONNECT消息),MQTT消息服务器会发布遗愿消息。

MQTT保留消息(Retained Message)

MQTT客户端向服务器发布(PUBLISH)消息时,可以设置保留消息(Retained Message)标志。保留消息(Retained Message)会驻留在消息服务器,后来的订阅者订阅主题时仍可以接收该消息。

例如mosquitto命令行发布一条保留消息到主题’a/b/c’:

mosquitto_pub -r -q 1 -t a/b/c -m 'hello'

之后连接上来的MQTT客户端订阅主题’a/b/c’时候,仍可收到该消息:

$ mosquitto_sub -t a/b/c -q 1
hello

保留消息(Retained Message)有两种清除方式:

  1. 客户端向有保留消息的主题发布一个空消息:

    mosquitto_pub -r -q 1 -t a/b/c -m ''
    
  2. 消息服务器设置保留消息的超期时间。

MQTT WebSocket连接

MQTT协议除支持TCP传输层外,还支持WebSocket作为传输层。通过WebSocket浏览器可以直连MQTT消息服务器,发布订阅模式与其他MQTT客户端通信。

MQTT协议的WebSocket连接,必须采用binary模式,并携带子协议Header:

Sec-WebSocket-Protocol: mqttv3.1  mqttv3.1.1

MQTT协议客户端库

emqtt客户端库

emqtt项目组: https://github.com/emqtt

emqttc Erlang MQTT客户端库
CocoaMQTT Swift语言MQTT客户端库
QMQTT QT框架MQTT客户端库

Eclipse Paho客户端库

Paho官网: http://www.eclipse.org/paho/

mqtt.org官网客户端库

mqtt.org: https://github.com/mqtt/mqtt.github.io/wiki/libraries

MQTT与XMPP协议对比

MQTT协议设计简单轻量、路由灵活,将在移动互联网物联网消息领域,全面取代PC时代的XMPP协议:

  1. MQTT协议一个字节固定报头,两个字节心跳报文,报文体积小编解码容易。XMPP协议基于繁重的XML,报文体积大且交互繁琐。
  2. MQTT协议基于主题(Topic)发布订阅模式消息路由,相比XMPP基于JID的点对点消息路由更为灵活。
  3. MQTT协议未定义报文内容格式,可以承载JSON、二进制等不同类型报文。XMPP协议采用XML承载报文,二进制必须Base64编码等处理。
  4. MQTT协议支持消息收发确认和QoS保证,XMPP主协议并未定义类似机制。MQTT协议有更好的消息可靠性保证。

反向代理与内网穿透

natApp

官方网站

安装

A、ubuntu:

sudo wget http://download.natapp.cn/assets/downloads/clients/2_3_8/natapp_linux_amd64_2_3_8.zip

unzip natapp_linux_amd64_2_3_8.zip

sudo mv natapp /etc/init.d/

sudo chmod 755 /etc/init.d/natapp

sudo vim /etc/rc.local
   添加: /etc/init.d/natapp

sudo wget http://download.natapp.cn/assets/downloads/config.ini

vi config.ini
    添加:你的authtoken

sudo mv config.ini /etc/init.d/

B、raspberrypi:

sudo wget http://download.natapp.cn/assets/downloads/clients/2_3_8/natapp_linux_arm_2_3_8.zip

unzip natapp_linux_arm_2_3_8.zip

sudo mv natapp /etc/init.d/

sudo chmod 755 /etc/init.d/natapp

sudo vim /etc/rc.local
   添加: /etc/init.d/natapp

sudo wget http://download.natapp.cn/assets/downloads/config.ini

vi config.ini
    添加:你的authtoken

sudo mv config.ini /etc/init.d/

Natapp For 树莓派 开机启动脚本

我的natApp隧道

teamviewer

安装

1、下载安装包:

x64# wget https://download.teamviewer.com/download/linux/teamviewer_amd64.deb
pi#  wget https://download.teamviewer.com/download/linux/teamviewer-host_armhf.deb

2、解决包的依赖:

sudo apt-get install -f

3、包文件安装:

sudo dpkg -i teamviewer_amd64.deb
sudo dpkg -i teamviewer-host_armhf.deb

4、设置登录密码:

sudo teamviewer --passwd qsh12345

5、重新启动服务:

sudo teamviewer --daemon start

6、获取id并设置密码:

teamviewer --info print id

WOL 远程开机

被控主机BIOS设置

需要将WAKE ON LAN打开

安装

sudo apt-get install wakeonlan

运行

sudo wakeonlan 00:E0:4C:45:ED:6A      //此为onda的台式主机

android

Dueros

单元测试目录 testing 使用

编译支持 freertos 平台

编译支持 x86 平台

搭建环境

安装交叉编译器:

sudo apt-get install gcc-arm-none-eabi

查看编译器版本:

arm-none-eabi-gcc -v

编译支持 mips 平台

1,修改build/device/linux/mips/sdkconfig.mk 编译工具路径 MIPS_SYSROOT_PATH,MIPS_LIB_PATH为本机的目录

_images/duer_001.png

2,看README.md

3,lightduerconfig,选第7个

4,编译完后复制out/linux/mips/sdkconfig/modules/duer-device/libduer-device.a到openwrt工程目录package/lele/duer/src/lib/下面

5,编译工程

查看.a .so库文件所支持的芯片架构

windows环境::

1.通过360解压工具,将libxxx.a文件解压

2.通过linux下file命令查看.o文件支持的格式

linux环境::

增加新的设备支持

pwd
libduer-device

cp build/device/freertos/mw300 build/device/freertos/rtl8711 -r cp platform/source-freertos/include-mw300 platform/source-freertos/include-rtl8711 -r //make时需要用到此目录 vim /home/liujinghuan/work/libduer-device/platform/source-freertos/duer.mak //修改新定义device引用头文件的路径 vim build/device/freertos/rtl8711/configsetup.sh //修改新定义device名称 source build/envsetup.sh //将刚才新加device加入到环境中 lightduerconfig

选择新增加的设备

make //成功后会在out目录下生成.a文件

安装过程中遇到的坑

文件格式不对导致的错误:

-bash: build/envsetup.sh: line 1: syntax error near unexpected token `$'{\r''
'bash: build/envsetup.sh: line 1: `function hlightduer() {

注解

导致这样的错误出现的原因是,windos环境中的换行符是\r\n,而linux系统中的换行符是\n

解决办法

sudo apt-get install tofrodos
fromdos build/envsetup.sh

EMQ 项目支持与联系:

官网: http://emqtt.com
项目: https://github.com/emqtt
微信: emqttd
微博: http://weibo.com/emqtt
Twitter: @emqtt
作者: 李枫 <feng@emqtt.io>