EMQ - 百万级开源MQTT消息服务器¶
EMQ 2.0 (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 语言平台开发,支持大规模连接和分布式集群,发布订阅模式的开源 MQTT 消息服务器。
注解
2.0 版本开始 emqttd 消息服务器自正式简称为 EMQ
EMQ 2.0 完整支持 MQTT V3.1/V3.1.1 版本协议规范,并扩展支持 WebSocket、Stomp、CoAP、MQTT-SN 或私有 TCP 协议。EMQ 2.0 消息服务器支持单节点100万连接与多节点分布式集群:
TODO: 2.0-rc.1 图片更新.

EMQ 2.0 为大规模客户端连接 (C1000K+) 的移动推送、移动消息、物联网、车联网、智能硬件等应用,提供一个完全开放源码、安装部署简便、企业级稳定可靠、可弹性扩展、易于定制开发的 MQTT 消息服务器。
注解
MQTT-SN、CoAP 协议已在2.0-rc.1版本发布,LWM2M、LoRaWan 协议在2.3-beta.1版本发布。
EMQ 2.0 项目文档目录:
开始使用 (Get Started)¶
EMQ 2.0 消息服务器简介¶
EMQ (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。Erlang/OTP 是出色的软实时(Soft-Realtime)、低延时(Low-Latency)、分布式(Distributed) 的语言平台。MQTT 是轻量的(Lightweight)、发布订阅模式(PubSub) 的物联网消息协议。
EMQ 项目设计目标是承载移动终端或物联网终端海量 MQTT 连接,并实现在海量物联网设备间快速低延时消息路由:
- 稳定承载大规模的 MQTT 客户端连接,单服务器节点支持50万到100万连接。
- 分布式节点集群,快速低延时的消息路由,单集群支持1000万规模的路由。
- 消息服务器内扩展,支持定制多种认证方式、高效存储消息到后端数据库。
- 完整物联网协议支持,MQTT、MQTT-SN、CoAP、WebSocket 或私有协议支持。
MQTT 发布订阅模式简述¶
MQTT 是发布订阅(Publish/Subscribe) 模式的消息协议,与 HTTP 协议请求响应(Request/Response) 模式不同。
MQTT 发布者与订阅者之间通过”主题”(Topic) 进行消息路由,主题(Topic) 格式类似 Unix 文件路径,例如:
sensor/1/temperature
chat/room/subject
presence/user/feng
sensor/1/#
sensor/+/temperature
uber/drivers/joe/inbox
MQTT 主题(Topic) 支持’+’, ‘#’的通配符,’+’通配一个层级,’#’通配多个层级(必须在末尾)。
MQTT 消息发布者(Publisher) 只能向特定’名称主题’(不支持通配符)发布消息,订阅者(Subscriber)通过订阅’过滤主题’(支持通配符)来匹配消息。
注解
初接触MQTT协议的用户,通常会向通配符的’过滤主题’发布广播消息,MQTT 协议不支持这种模式,需从订阅侧设计广播主题(Topic)。 例如 Android 推送,向所有广州用户,推送某类本地消息,客户端获得 GIS 位置后,可订阅 ‘news/city/guangzhou’ 主题。
五分钟下载启动 EMQ¶
EMQ 2.0 消息服务器每个版本,会发布 Ubuntu、CentOS、FreeBSD、Mac OS X、Windows 平台程序包与 Docker 镜像。
下载地址: http://emqtt.com/downloads
程序包下载后,可直接解压启动运行,例如 Mac 平台:
unzip emqttd-macosx-v2.0.zip && cd emqttd
# 启动emqttd
./bin/emqttd start
# 检查运行状态
./bin/emqttd_ctl status
# 停止emqttd
./bin/emqttd stop
EMQ 消息服务默认允许匿名认证,启动后 MQTT 客户端可连接1883端口,启动运行日志输出在 log/ 目录。
源码编译EMQ 2.0¶
git clone https://github.com/emqtt/emq-relx.git
cd emq-relx && make
cd _rel/emqttd && ./bin/emqttd console
Web 管理控制台(Dashboard)¶
EMQ 消息服务器启动后,会默认加载 Dashboard 插件,启动 Web 管理控制台。用户可通过 Web 控制台,查看服务器运行状态、统计数据、客户端(Client)、会话(Session)、主题(Topic)、订阅(Subscription)、插件(Plugin)。
控制台地址: http://127.0.0.1:18083,默认用户: admin,密码:public

EMQ 2.0 消息服务器功能列表¶
- 完整的 MQTT V3.1/V3.1.1 协议规范支持
- QoS0, QoS1, QoS2 消息支持
- 持久会话与离线消息支持
- Retained 消息支持
- Last Will 消息支持
- TCP/SSL 连接支持
- MQTT/WebSocket/SSL 支持
- HTTP消息发布接口支持
- $SYS/# 系统主题支持
- 客户端在线状态查询与订阅支持
- 客户端 ID 或 IP 地址认证支持
- 用户名密码认证支持
- LDAP 认证
- Redis、MySQL、PostgreSQL、MongoDB、HTTP 认证集成
- 浏览器 Cookie 认证
- 基于客户端 ID、IP 地址、用户名的访问控制(ACL)
- 多服务器节点集群(Cluster)
- 多服务器节点桥接(Bridge)
- mosquitto 桥接支持
- Stomp 协议支持
- MQTT-SN 协议支持
- CoAP 协议支持
- Stomp/SockJS 支持
- 通过 Paho 兼容性测试
- 2.0新功能: 本地订阅($local/topic)
- 2.0新功能: 共享订阅($share/<group>/topic)
- 2.0新功能: sysctl 类似 k = v 格式配置文件
EMQ 2.0 扩展插件列表¶
EMQ 2.0 支持丰富的扩展插件,包括控制台、扩展模块、多种认证方式、多种接入协议等:
emq_plugin_template | 插件模版与演示代码 |
emq_retainer | Retain 消息存储插件 |
emq_modules | Presence, Subscription 扩展模块插件 |
emq_dashboard | Web 管理控制台,默认加载 |
emq_auth_clientid | ClientId、密码认证插件 |
emq_auth_username | 用户名、密码认证插件 |
emq_auth_ldap | LDAP 认证插件 |
emq_auth_http | HTTP 认证插件 |
emq_auth_mysql | MySQL 认证插件 |
emq_auth_pgsql | PostgreSQL 认证插件 |
emq_auth_redis | Redis 认证插件 |
emq_auth_mongo | MongoDB 认证插件 |
emq_sn | MQTT-SN 协议插件 |
emq_coap | CoAP 协议插件 |
emq_stomp | Stomp 协议插件 |
emq_recon | Recon 优化调测插件 |
emq_reloader | 热升级插件(开发调试) |
emq_sockjs | SockJS 插件(废弃) |
扩展插件通过 ‘bin/emqttd_ctl’ 管理命令行,或 Dashboard 控制台加载启用。例如启用 PostgreSQL 认证插件:
./bin/emqttd_ctl plugins load emq_auth_pgsql
100万线连接测试说明¶
注解
EMQ 2.0 消息服务器默认设置,允许最大客户端连接是512,因为大部分操作系统 ‘ulimit -n’ 限制为1024。
EMQ 消息服务器1.1.3版本,连接压力测试到130万线,8核心/32G内存的 CentOS 云服务器。
操作系统内核参数、TCP 协议栈参数、Erlang 虚拟机参数、EMQ 最大允许连接数设置简述如下:
Linux 操作系统参数¶
# 2M - 系统所有进程可打开的文件数量:
sysctl -w fs.file-max=2097152
sysctl -w fs.nr_open=2097152
# 1M - 系统允许当前进程打开的文件数量:
ulimit -n 1048576
Erlang 虚拟机参数¶
emqttd/etc/emq.conf:
## Erlang Process Limit
node.process_limit = 2097152
## Sets the maximum number of simultaneously existing ports for this system
node.max_ports = 1048576
EMQ 最大允许连接数¶
emqttd/etc/emq.conf ‘listeners’段落:
## Size of acceptor pool
listener.tcp.external.acceptors = 64
## Maximum number of concurrent clients
listener.tcp.external.max_clients = 1000000
测试客户端设置¶
测试客户端在一个接口上,最多只能创建65000连接:
sysctl -w net.ipv4.ip_local_port_range="500 65535"
echo 1000000 > /proc/sys/fs/nr_open
按应用场景测试¶
MQTT 是一个设计得非常出色的传输层协议,在移动消息、物联网、车联网、智能硬件甚至能源勘探等领域有着广泛的应用。1个字节报头、2个字节心跳、消息 QoS 支持等设计,非常适合在低带宽、不可靠网络、嵌入式设备上应用。
不同的应用有不同的系统要求,用户使用emqttd消息服务器前,可以按自己的应用场景进行测试,而不是简单的连接压力测试:
- Android 消息推送: 推送消息广播测试。
- 移动即时消息应用: 消息收发确认测试。
- 智能硬件应用: 消息的往返时延测试。
- 物联网数据采集: 并发连接与吞吐测试。
开源 MQTT 客户端项目¶
GitHub: https://github.com/emqtt
emqttc | Erlang MQTT客户端库 |
emqtt_benchmark | MQTT连接测试工具 |
CocoaMQTT | Swift语言MQTT客户端库 |
QMQTT | QT框架MQTT客户端库 |
Eclipse Paho: https://www.eclipse.org/paho/
MQTT.org: https://github.com/mqtt/mqtt.github.io/wiki/libraries
微信小程序开发¶
关于微信小程序¶
概览¶
小程序是一种新的开放能力,开发者可以快速地开发一个小程序。小程序可以在微信内被便捷地获取和传播,同时具有出色的使用体验。
微信小程序官网: https://mp.weixin.qq.com/cgi-bin/wx
微信小程序登录: https://mp.weixin.qq.com/
简易教程:https://developers.weixin.qq.com/miniprogram/dev/index.html
设计指南:https://developers.weixin.qq.com/miniprogram/design/index.html?t=2018712
应用¶
- 小程序开发
- 小游戏开发
动手开发微信小程序¶
开发之前需要先注册相关的帐号信息以及准备相关工具,笔者本人的帐号信息如下:
开发帐号:hytgxljh@163.com/ljh19*5
管理员微信号:m18617218008
TGIT帐号:hytgxljh/ljh19*5
注解
TGIT:一款可绑定的git管理工具。
微信小程序参考样例¶
MQTT协议¶
MQTT轻量发布订阅消息协议¶
概览¶
MQTT是一个轻量的发布订阅模式消息传输协议,专门针对低带宽和不稳定网络环境的物联网应用设计。
MQTT官网: http://mqtt.org
MQTT V3.1.1协议规范: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
特点¶
- 开放消息协议,简单易实现
- 发布订阅模式,一对多消息发布
- 基于TCP/IP网络连接
- 1字节固定报头,2字节心跳报文,报文结构紧凑
- 消息QoS支持,可靠传输保证
应用¶
MQTT协议广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等领域。
- 物联网M2M通信,物联网大数据采集
- Android消息推送,WEB消息推送
- 移动即时消息,例如Facebook Messenger
- 智能硬件、智能家具、智能电器
- 车联网通信,电动车站桩采集
- 智慧城市、远程医疗、远程教育
- 电力、石油与能源等行业市场
MQTT基于主题(Topic)消息路由¶
MQTT协议基于主题(Topic)进行消息路由,主题(Topic)类似URL路径,例如:
chat/room/1
sensor/10/temperature
sensor/+/temperature
$SYS/broker/metrics/packets/received
$SYS/broker/metrics/#
主题(Topic)通过’/’分割层级,支持’+’, ‘#’通配符:
'+': 表示通配一个层级,例如a/+,匹配a/x, a/y
'#': 表示通配多个层级,例如a/#,匹配a/x, a/b/c/d
订阅者与发布者之间通过主题路由消息进行通信,例如采用mosquitto命令行发布订阅消息:
mosquitto_sub -t a/b/+ -q 1
mosquitto_pub -t a/b/c -m hello -q 1
注解
订阅者可以订阅含通配符主题,但发布者不允许向含通配符主题发布消息。
MQTT V3.1.1协议报文¶
报文结构¶
固定报头(Fixed header) |
可变报头(Variable header) |
报文有效载荷(Payload) |
固定报头¶
Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
byte1 | MQTT Packet type | Flags | ||||||
byte2… | Remaining Length |
报文类型¶
类型名称 | 类型值 | 报文说明 |
CONNECT | 1 | 发起连接 |
CONNACK | 2 | 连接回执 |
PUBLISH | 3 | 发布消息 |
PUBACK | 4 | 发布回执 |
PUBREC | 5 | QoS2消息回执 |
PUBREL | 6 | QoS2消息释放 |
PUBCOMP | 7 | QoS2消息完成 |
SUBSCRIBE | 8 | 订阅主题 |
SUBACK | 9 | 订阅回执 |
UNSUBSCRIBE | 10 | 取消订阅 |
UNSUBACK | 11 | 取消订阅回执 |
PINGREQ | 12 | PING请求 |
PINGRESP | 13 | PING响应 |
DISCONNECT | 14 | 断开连接 |
PUBLISH发布消息¶
PUBLISH报文承载客户端与服务器间双向的发布消息。 PUBACK报文用于接收端确认QoS1报文,PUBREC/PUBREL/PUBCOMP报文用于QoS2消息流程。
PINGREQ/PINGRESP心跳¶
客户端在无报文发送时,按保活周期(KeepAlive)定时向服务端发送PINGREQ心跳报文,服务端响应PINGRESP报文。PINGREQ/PINGRESP报文均2个字节。
MQTT消息QoS¶
MQTT发布消息QoS保证不是端到端的,是客户端与服务器之间的。订阅者收到MQTT消息的QoS级别,最终取决于发布消息的QoS和主题订阅的QoS。
发布消息的QoS | 主题订阅的QoS | 接收消息的QoS |
0 | 0 | 0 |
0 | 1 | 0 |
0 | 2 | 0 |
1 | 0 | 0 |
1 | 1 | 1 |
1 | 2 | 1 |
2 | 0 | 0 |
2 | 1 | 1 |
2 | 2 | 2 |
Qos0消息发布订阅¶

Qos1消息发布订阅¶

Qos2消息发布订阅¶

MQTT会话(Clean Session)¶
MQTT客户端向服务器发起CONNECT请求时,可以通过’Clean Session’标志设置会话。
‘Clean Session’设置为0,表示创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销。
‘Clean Session’设置为1,表示创建一个新的临时会话,在客户端断开时,会话自动销毁。
MQTT连接保活心跳¶
MQTT客户端向服务器发起CONNECT请求时,通过KeepAlive参数设置保活周期。
客户端在无报文发送时,按KeepAlive周期定时发送2字节的PINGREQ心跳报文,服务端收到PINGREQ报文后,回复2字节的PINGRESP报文。
服务端在1.5个心跳周期内,既没有收到客户端发布订阅报文,也没有收到PINGREQ心跳报文时,主动心跳超时断开客户端TCP连接。
注解
emqttd消息服务器默认按最长2.5心跳周期超时设计。
MQTT遗愿消息(Last Will)¶
MQTT客户端向服务器端CONNECT请求时,可以设置是否发送遗愿消息(Will Message)标志,和遗愿消息主题(Topic)与内容(Payload)。
MQTT客户端异常下线时(客户端断开前未向服务器发送DISCONNECT消息),MQTT消息服务器会发布遗愿消息。
MQTT保留消息(Retained Message)¶
MQTT客户端向服务器发布(PUBLISH)消息时,可以设置保留消息(Retained Message)标志。保留消息(Retained Message)会驻留在消息服务器,后来的订阅者订阅主题时仍可以接收该消息。
例如mosquitto命令行发布一条保留消息到主题’a/b/c’:
mosquitto_pub -r -q 1 -t a/b/c -m 'hello'
之后连接上来的MQTT客户端订阅主题’a/b/c’时候,仍可收到该消息:
$ mosquitto_sub -t a/b/c -q 1
hello
保留消息(Retained Message)有两种清除方式:
客户端向有保留消息的主题发布一个空消息:
mosquitto_pub -r -q 1 -t a/b/c -m ''
消息服务器设置保留消息的超期时间。
MQTT WebSocket连接¶
MQTT协议除支持TCP传输层外,还支持WebSocket作为传输层。通过WebSocket浏览器可以直连MQTT消息服务器,发布订阅模式与其他MQTT客户端通信。
MQTT协议的WebSocket连接,必须采用binary模式,并携带子协议Header:
Sec-WebSocket-Protocol: mqttv3.1 或 mqttv3.1.1
MQTT协议客户端库¶
emqtt客户端库¶
emqtt项目组: https://github.com/emqtt
emqttc | Erlang MQTT客户端库 |
CocoaMQTT | Swift语言MQTT客户端库 |
QMQTT | QT框架MQTT客户端库 |
Eclipse Paho客户端库¶
Paho官网: http://www.eclipse.org/paho/
mqtt.org官网客户端库¶
mqtt.org: https://github.com/mqtt/mqtt.github.io/wiki/libraries
MQTT与XMPP协议对比¶
MQTT协议设计简单轻量、路由灵活,将在移动互联网物联网消息领域,全面取代PC时代的XMPP协议:
- MQTT协议一个字节固定报头,两个字节心跳报文,报文体积小编解码容易。XMPP协议基于繁重的XML,报文体积大且交互繁琐。
- MQTT协议基于主题(Topic)发布订阅模式消息路由,相比XMPP基于JID的点对点消息路由更为灵活。
- MQTT协议未定义报文内容格式,可以承载JSON、二进制等不同类型报文。XMPP协议采用XML承载报文,二进制必须Base64编码等处理。
- MQTT协议支持消息收发确认和QoS保证,XMPP主协议并未定义类似机制。MQTT协议有更好的消息可靠性保证。
反向代理与内网穿透¶
natApp¶
官方网站¶
website: https://natapp.cn/
安装¶
A、ubuntu:
sudo wget http://download.natapp.cn/assets/downloads/clients/2_3_8/natapp_linux_amd64_2_3_8.zip
unzip natapp_linux_amd64_2_3_8.zip
sudo mv natapp /etc/init.d/
sudo chmod 755 /etc/init.d/natapp
sudo vim /etc/rc.local
添加: /etc/init.d/natapp
sudo wget http://download.natapp.cn/assets/downloads/config.ini
vi config.ini
添加:你的authtoken
sudo mv config.ini /etc/init.d/
B、raspberrypi:
sudo wget http://download.natapp.cn/assets/downloads/clients/2_3_8/natapp_linux_arm_2_3_8.zip
unzip natapp_linux_arm_2_3_8.zip
sudo mv natapp /etc/init.d/
sudo chmod 755 /etc/init.d/natapp
sudo vim /etc/rc.local
添加: /etc/init.d/natapp
sudo wget http://download.natapp.cn/assets/downloads/config.ini
vi config.ini
添加:你的authtoken
sudo mv config.ini /etc/init.d/
我的natApp隧道¶
http://8bta7a.natappfree.cc root/a*l
TCP:s1.natapp.cc:2205 pi/raspberry1*5
TCP:s1.natapp.cc:2206 liujinghuan/1*56
teamviewer¶
安装¶
1、下载安装包:
x64# wget https://download.teamviewer.com/download/linux/teamviewer_amd64.deb
pi# wget https://download.teamviewer.com/download/linux/teamviewer-host_armhf.deb
2、解决包的依赖:
sudo apt-get install -f
3、包文件安装:
sudo dpkg -i teamviewer_amd64.deb
sudo dpkg -i teamviewer-host_armhf.deb
4、设置登录密码:
sudo teamviewer --passwd qsh12345
5、重新启动服务:
sudo teamviewer --daemon start
6、获取id并设置密码:
teamviewer --info print id
android
CMake¶
Dueros¶
单元测试目录 testing 使用¶
编译支持 freertos 平台¶
编译支持 mips 平台¶
1,修改build/device/linux/mips/sdkconfig.mk 编译工具路径 MIPS_SYSROOT_PATH,MIPS_LIB_PATH为本机的目录

2,看README.md
3,lightduerconfig,选第7个
4,编译完后复制out/linux/mips/sdkconfig/modules/duer-device/libduer-device.a到openwrt工程目录package/lele/duer/src/lib/下面
5,编译工程
增加新的设备支持¶
- pwd
- libduer-device
cp build/device/freertos/mw300 build/device/freertos/rtl8711 -r cp platform/source-freertos/include-mw300 platform/source-freertos/include-rtl8711 -r //make时需要用到此目录 vim /home/liujinghuan/work/libduer-device/platform/source-freertos/duer.mak //修改新定义device引用头文件的路径 vim build/device/freertos/rtl8711/configsetup.sh //修改新定义device名称 source build/envsetup.sh //将刚才新加device加入到环境中 lightduerconfig
选择新增加的设备make //成功后会在out目录下生成.a文件
安装过程中遇到的坑¶
文件格式不对导致的错误:
-bash: build/envsetup.sh: line 1: syntax error near unexpected token `$'{\r''
'bash: build/envsetup.sh: line 1: `function hlightduer() {
注解
导致这样的错误出现的原因是,windos环境中的换行符是\r\n,而linux系统中的换行符是\n
解决办法
sudo apt-get install tofrodos
fromdos build/envsetup.sh
国内方案商参考¶
EMQ 项目支持与联系:
官网: | http://emqtt.com |
项目: | https://github.com/emqtt |
微信: | emqttd |
微博: | http://weibo.com/emqtt |
Twitter: | @emqtt |
作者: | 李枫 <feng@emqtt.io> |