局域网内使用搭建一个本地MQTT服务器。
环境:
Ubuntu18, python3.6
流程:
- 1、使用mosquitto服务器软件搭建MQTT服务器
-
- 2、测试局域网内设备使用MQTT服务器
-
- 3、使用python作为MQTT客户端
(一)使用mosquitto服务器软件搭建MQTT服务器
转自:https://www.cnblogs.com/lulipro/p/10914482.html
1、安装下载 mosquitto
# 安装mosquitto
sudo apt-get install mosquitto
# 安装客户端
sudo apt-get install mosquitto-clients
# 安装设备端
sudo apt-get install mosquitto-dev
2、添加和修改配置
#用户的局部配置文件放在: /etc/mosquitto/conf.d/目录下,并且这个目录下的所有以.conf后缀的文件都将被mosquitto作为配置文件,在启动时加载。
#在/etc/mosquitto/conf.d目录下,新建myconfig.conf配置文件
#在其中输入如下内容
#-------------------------------------------
# 关闭匿名访问,客户端必须使用用户名
allow_anonymous false
#指定 用户名-密码 文件
password_file /etc/mosquitto/pwfile.txt
#--------------------------------------------
3、创建一个MQTT服务器账户
假设用户名为:user1
在命令行运行:mosquitto_passwd -c /etc/mosquitto/pwfile.txt user1
回车后连续输入2次用户密码即可
4、启动
sudo service mosquitto status #查看运行状态
sudo service mosquitto start #启动服务
sudo service mosquitto stop #停止服务
(二)测试局域网内设备使用MQTT服务器
可以使用手机做个简单的测试,使用刚刚创建的用户和密码建立连接
在App Store下载MQTTool工具,配置服务器IP,端口号默认1883,填写账户密码,然后点击connct:

然后订阅,订阅主题 test

然后时发布消息,主题也是test,发布内容是 have a test

此时再回到订阅栏看到消息 have a test,说明MQTT服务端搭好了。
(三)使用python作为MQTT客户端
真正使用的时候肯定不会只是手机发个简短的消息。可以使用python实现订阅和发布
安装包:
`pip3 install paho-mqtt`
代码:
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
print("Connected with result code: " + str(rc))
def on_message(client, userdata, msg):
print(msg.topic + " " + str(msg.payload))
def subscribe():
client = mqtt.Client()
#设置用户名和密码
client.username_pw_set("user1", "password")
client.on_connect = on_connect
client.on_message = on_message
#client.on_disconnect = on_disconnect
#连接 IP port keepalive
client.connect('192.168.0.116', 1883, 600)
#订阅的 topic
client.subscribe('test', qos=0)
client.loop_forever()
def publish():
client = mqtt.Client()
#设置用户名和密码
client.username_pw_set("user1", "password")
client.on_connect = on_connect
client.on_message = on_message
#连接 IP port keepalive
client.connect('192.168.0.116', 1883, 600)
#发布 topic 内容
client.publish('test', payload='have a seventh test', qos=0)
client.publish('test', payload='have a eighth test', qos=0)
publish()
# subscribe()