Mqtt最开始是为石油管道的监控而设计的一个协议,占用带宽小,低功耗。在iot领域有广泛应用。 下载预编译安装包: 解压,查看目录结构: 启动服务器: 查看状态: 停止服务器: mqtt服务器的ip查看: 服务器启动后,可以通过dash board来监控mqtt broker的状态 按F5,执行效果:
服务器端搭建:
root@ubuntu:/home/jack# wget https://github.com/emqx/emqx/releases/download/v4.0.4/emqx-ubuntu18.04-v4.0.4.zip --2020-04-18 18:54:51-- https://github.com/emqx/emqx/releases/download/v4.0.4/emqx-ubuntu18.04-v4.0.4.zip Resolving github.com (github.com)... 52.74.223.119 Connecting to github.com (github.com)|52.74.223.119|:443... connected. HTTP request sent, awaiting response... 302 Found Location: https://github-production-release-asset-2e65be.s3.amazonaws.com/7202769/4b404580-5fa4-11ea-9fe4-73ad70b12bdc?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20200419%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20200419T015452Z&X-Amz-Expires=300&X-Amz-Signature=6f5336e167fccc50b81ab189f5e3f8edfd1ccf4daa862af3a5d564a4cc17893b&X-Amz-SignedHeaders=host&actor_id=0&repo_id=7202769&response-content-disposition=attachment%3B%20filename%3Demqx-ubuntu18.04-v4.0.4.zip&response-content-type=application%2Foctet-stream [following] --2020-04-18 18:54:52-- https://github-production-release-asset-2e65be.s3.amazonaws.com/7202769/4b404580-5fa4-11ea-9fe4-73ad70b12bdc?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAIWNJYAX4CSVEH53A%2F20200419%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20200419T015452Z&X-Amz-Expires=300&X-Amz-Signature=6f5336e167fccc50b81ab189f5e3f8edfd1ccf4daa862af3a5d564a4cc17893b&X-Amz-SignedHeaders=host&actor_id=0&repo_id=7202769&response-content-disposition=attachment%3B%20filename%3Demqx-ubuntu18.04-v4.0.4.zip&response-content-type=application%2Foctet-stream Resolving github-production-release-asset-2e65be.s3.amazonaws.com (github-production-release-asset-2e65be.s3.amazonaws.com)... 52.217.46.76 Connecting to github-production-release-asset-2e65be.s3.amazonaws.com (github-production-release-asset-2e65be.s3.amazonaws.com)|52.217.46.76|:443... connected. HTTP request sent, awaiting response... 200 OK Length: 23267163 (22M) [application/octet-stream] Saving to: ‘emqx-ubuntu18.04-v4.0.4.zip’ emqx-ubuntu18.04-v4.0.4.zip 100%[=================================================================================>] 22.19M 1.45MB/s in 16s 2020-04-18 18:55:10 (1.36 MB/s) - ‘emqx-ubuntu18.04-v4.0.4.zip’ saved [23267163/23267163] root@ubuntu:/home/jack
root@ubuntu:/home/jack# unzip emqx-ubuntu18.04-v4.0.4.zip root@ubuntu:/home/jack/mqttd/emqx# tree -L 1 . ├── bin ├── data ├── erts-10.5.2 ├── etc ├── lib ├── log └── releases 7 directories, 0 files root@ubuntu:/home/jack/mqttd/emqx#
root@ubuntu:/home/jack/mqttd/emqx# ./bin/emqx start EMQ X Broker v4.0.4 is started successfully! root@ubuntu:/home/jack/mqttd/emqx#
root@ubuntu:/home/jack/mqttd/emqx# ./bin/emqx_ctl status Node 'emqx@127.0.0.1' is started emqx 4.0.4 is running root@ubuntu:/home/jack/mqttd/emqx#
root@ubuntu:/home/jack/mqttd/emqx# ./bin/emqx stop ok root@ubuntu:/home/jack/mqttd/emqx#
root@ubuntu:/home/jack/mqttd/emqx# ifconfig ens33: flags=4163<UP,BROADCAST,RUNNING,MULTICAST> mtu 1500 inet 192.168.1.243 netmask 255.255.255.0 broadcast 192.168.1.255 inet6 fe80::20c:29ff:fef4:a0f6 prefixlen 64 scopeid 0x20<link> ether 00:0c:29:f4:a0:f6 txqueuelen 1000 (Ethernet) RX packets 193799 bytes 60667948 (60.6 MB) RX errors 0 dropped 0 overruns 0 frame 0 TX packets 140410 bytes 12827731 (12.8 MB) TX errors 0 dropped 0 overruns 0 carrier 0 collisions 0 lo: flags=73<UP,LOOPBACK,RUNNING> mtu 65536 inet 127.0.0.1 netmask 255.0.0.0 inet6 ::1 prefixlen 128 scopeid 0x10<host> loop txqueuelen 1000 (Local Loopback) RX packets 1328 bytes 127242 (127.2 KB) RX errors 0 dropped 0 overruns 0 frame 0 TX packets 1328 bytes 127242 (127.2 KB) TX errors 0 dropped 0 overruns 0 carrier 0 collisions 0 root@ubuntu:/home/jack/mqttd/emqx#
在浏览器输入:https://192.168.1.243:18083/#/clients,用户名为:admin,密码为:public。如下图:
vscode IDE,git。
注意:依赖的net包容易下载失败:go get golang.org/x/net/proxy,用git从github下载后,将包拷贝到D:GOPATHsrcgolang.orgxnet目录即可mqtt客户端实现:
package main import ( "fmt" "log" "os" "time" mqtt "github.com/eclipse/paho.mqtt.golang" ) //创建全局mqtt publish消息处理 handler var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { fmt.Println("发布消息:") fmt.Printf("TOPIC: %sn", msg.Topic()) fmt.Printf("MSG: %sn", msg.Payload()) } //创建全局mqtt sub消息处理 handler var messageSubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { fmt.Println("收到订阅消息:") fmt.Printf("Sub Client Topic : %s n", msg.Topic()) fmt.Printf("Sub Client msg : %s n", msg.Payload()) } func main() { mqtt.DEBUG = log.New(os.Stdout, "", 0) mqtt.ERROR = log.New(os.Stdout, "", 0) //opts := mqtt.NewClientOptions().AddBroker("tcp://broker.emqx.io:1883").SetClientID("emqx_test_client") // 创建消息发布go程 go func() { opts := mqtt.NewClientOptions().AddBroker("tcp://192.168.1.243:1883").SetClientID("emqx_test_client66666") opts.SetKeepAlive(60 * time.Second) // Message callback handler,在没有任何订阅时,发布端调用此函数 opts.SetDefaultPublishHandler(messagePubHandler) opts.SetPingTimeout(1 * time.Second) client := mqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { panic(token.Error()) } for i := 0; i < 6; i++ { time.Sleep(1 * time.Second) fmt.Println("发布一条天气预报消息") token := client.Publish("testtopic/1", 0, false, "Hello World,深圳天气晴朗") token.Wait() } fmt.Println("发布客户端断开与broker的连接") client.Disconnect(250) }() // 创建消息订阅go程 go func() { opts := mqtt.NewClientOptions().AddBroker("tcp://192.168.1.243:1883").SetClientID("emqx_test_client99999") opts.SetKeepAlive(60 * time.Second) opts.SetPingTimeout(1 * time.Second) client := mqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { panic(token.Error()) } // Subscription /*if token := client.Subscribe("testtopic/#", 0, messageSubHandler); token.Wait() && token.Error() != nil { fmt.Println(token.Error()) os.Exit(1) }*/ for i := 0; i < 6; i++ { time.Sleep(time.Duration(3) * time.Second) //发布消息 if token := client.Subscribe("testtopic/#", 1, messageSubHandler); token.Wait() && token.Error() != nil { fmt.Println(token.Error()) os.Exit(1) } } // Cancel subscription if token := client.Unsubscribe("testtopic/#"); token.Wait() && token.Error() != nil { fmt.Println(token.Error()) os.Exit(1) } fmt.Println("订阅客户端断开与broker的连接") //c.Disconnect(250) }() // Disconnect time.Sleep(10 * time.Second) }
API server listening at: 127.0.0.1:34378 [client] Connect() [client] Connect() [store] memorystore initialized [store] memorystore initialized [client] about to write new connect msg [client] about to write new connect msg [client] socket connected to broker [client] Using MQTT 3.1.1 protocol [client] socket connected to broker [client] Using MQTT 3.1.1 protocol [net] connect started [net] connect started [net] received connack [client] client is connected [client] exit startClient [pinger] keepalive starting [net] logic started [net] logic waiting for msg on ibound [net] outgoing started [net] outgoing waiting for an outbound message [net] incoming started [net] received connack [pinger] keepalive starting [client] client is connected [client] exit startClient [net] logic started [net] logic waiting for msg on ibound [net] outgoing started [net] outgoing waiting for an outbound message [net] incoming started 发布一条天气预报消息 [client] enter Publish [client] sending publish message, topic: testtopic/1 [net] obound wrote msg, id: 0 [net] outgoing waiting for an outbound message 发布一条天气预报消息 [client] enter Publish [client] sending publish message, topic: testtopic/1 [net] obound wrote msg, id: 0 [net] outgoing waiting for an outbound message [client] enter Subscribe [client] SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0 MessageID: 1 topics: [testtopic/#] [client] sending subscribe message, topic: testtopic/# [client] exit Subscribe [net] obound priority msg to write, type *packets.SubscribePacket [net] outgoing waiting for an outbound message [net] Received Message [net] logic got msg on ibound [store] memorystore del: message 1 was deleted [net] received suback, id: 1 [net] granted qoss [1] [net] logic waiting for msg on ibound 发布一条天气预报消息 [client] enter Publish [client] sending publish message, topic: testtopic/1 [net] obound wrote msg, id: 0 [net] outgoing waiting for an outbound message [net] Received Message [net] logic got msg on ibound [net] received publish, msgId: 0 [net] putting msg on onPubChan [net] done putting msg on incomingPubChan [net] logic waiting for msg on ibound 收到订阅消息: Sub Client Topic : testtopic/1 Sub Client msg : Hello World,深圳天气晴朗2 发布一条天气预报消息 [client] enter Publish [client] sending publish message, topic: testtopic/1 [net] obound wrote msg, id: 0 [net] outgoing waiting for an outbound message [net] Received Message [net] logic got msg on ibound [net] received publish, msgId: 0 [net] putting msg on onPubChan [net] done putting msg on incomingPubChan [net] logic waiting for msg on ibound 收到订阅消息: Sub Client Topic : testtopic/1 Sub Client msg : Hello World,深圳天气晴朗3 [pinger] ping check 1.9993801 [pinger] ping check 0.9978095 发布一条天气预报消息 [client] enter Publish [client] sending publish message, topic: testtopic/1 [net] obound wrote msg, id: 0 [net] outgoing waiting for an outbound message [net] Received Message [net] logic got msg on ibound [net] received publish, msgId: 0 [net] putting msg on onPubChan [net] done putting msg on incomingPubChan [net] logic waiting for msg on ibound 收到订阅消息: Sub Client Topic : testtopic/1 Sub Client msg : Hello World,深圳天气晴朗4 [client] enter Subscribe [client] SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0 MessageID: 1 topics: [testtopic/#] [client] sending subscribe message, topic: testtopic/# [client] exit Subscribe [net] obound priority msg to write, type *packets.SubscribePacket [net] outgoing waiting for an outbound message [net] Received Message [net] logic got msg on ibound [store] memorystore del: message 1 was deleted [net] received suback, id: 1 [net] granted qoss [1] [net] logic waiting for msg on ibound 发布一条天气预报消息 [client] enter Publish [client] sending publish message, topic: testtopic/1 [net] obound wrote msg, id: 0 [net] outgoing waiting for an outbound message 发布客户端断开与broker的连接 [client] disconnecting [net] obound priority msg to write, type *packets.DisconnectPacket [net] outbound wrote disconnect, stopping [pinger] keepalive stopped [net] incoming stopped [msgids] cleaned up [client] disconnected [store] memorystore closed [net] Received Message [net] logic got msg on ibound [net] received publish, msgId: 0 [net] putting msg on onPubChan [net] done putting msg on incomingPubChan [net] logic waiting for msg on ibound 收到订阅消息: Sub Client Topic : testtopic/1 Sub Client msg : Hello World,深圳天气晴朗5 [client] enter Subscribe [client] SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0 MessageID: 1 topics: [testtopic/#] [client] sending subscribe message, topic: testtopic/# [client] exit Subscribe [net] obound priority msg to write, type *packets.SubscribePacket [net] outgoing waiting for an outbound message [net] Received Message [net] logic got msg on ibound [store] memorystore del: message 1 was deleted [net] received suback, id: 1 [net] granted qoss [1] [net] logic waiting for msg on ibound Process exiting with code: 0
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算