You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Cigarette/Cigarette/Thread/threadSendMqtt.cpp

98 lines
3.1 KiB
C++

#include "threadSendMqtt.h"
#include <WinSock2.h>
#include <WS2tcpip.h>
#include <stdio.h>
#include <qtcpsocket.h>
// 相关头文件
#include <QFile> // 文件操作
#include <QJsonObject> // JSON对象
#include <QJsonArray> // JSON数组
#include <QJsonDocument> // JSON文档
#include <QJsonParseError> // JSON异常捕捉
void threadSendMqtt::init(SyncQueue<_MqttSendInfo>* p_MQTT_Info_queue, std::string ip_, int port_) {
ip = QString::fromStdString(ip_);
port = port_;
qDebug() << "Mqtt ip:" << ip << "| Mqtt port:" << port;
Local_MQTT_Info_queue = p_MQTT_Info_queue;
}
void threadSendMqtt::start_work()
{
//start(HighestPriority);
start();
}
void threadSendMqtt::stop()
{
isLoop = false;
wait();
delete m_client;
}
bool threadSendMqtt::connectTCP() {
m_client = new QMqttClient(this);
m_client->setHostname(ip);
m_client->setPort(port);
//m_client->setUsername(userName);
//m_client->setPassword(userPassword);
m_client->connectToHost();
//connect(m_client, &QMqttClient::stateChanged, this, &MainWindow::updateLogStateChange);
connect(m_client, &QMqttClient::connected, this, [this](void) {qDebug() << "Mqtt connected";});
connect(m_client, &QMqttClient::disconnected, this, [this](void) {qDebug() << "Mqtt disconnected"; });
connect(m_client, SIGNAL(messageSent(qint32)), this, SLOT(MQTT_DATASEND_SUCCESS(qint32)));//消息发送成功提示的槽函数绑定
connect(m_client, &QMqttClient::messageReceived, this, [this](const QByteArray &message, const QMqttTopicName &topic) {
const QString content = QDateTime::currentDateTime().toString()
+ QLatin1String(" Received Topic: ")
+ topic.name()
+ QLatin1String(" Message: ")
+ message
+ QLatin1Char('\n');
//ui->editLog->insertPlainText(content);
});
connect(m_client, &QMqttClient::pingResponseReceived, this, [this]() {
const QString content = QDateTime::currentDateTime().toString()
+ QLatin1String(" PingResponse")
+ QLatin1Char('\n');
//ui->editLog->insertPlainText(content);
});
//connect(ui->lineEditHost, &QLineEdit::textChanged, m_client, &QMqttClient::setHostname);
//connect(ui->spinBoxPort, QOverload<int>::of(&QSpinBox::valueChanged), this, &MainWindow::setClientPort);
return true;
}
void threadSendMqtt::run()
{
if (!connectTCP())
qDebug() << "Mqtt connect error!";
while (isLoop) {
_MqttSendInfo TCPSendInfo;
Local_MQTT_Info_queue->take(TCPSendInfo);
num++;
sendData(&TCPSendInfo, num);
}
}
void threadSendMqtt::sendData(_MqttSendInfo* TCPSendInfo, int Num) {
QJsonObject addressObject;
// 方式二:赋值
addressObject["street"] = "123 Main St.";
addressObject["city"] = "Anytown";
addressObject["country"] = "USA";
if (m_client->state() == QMqttClient::Connected) {
QJsonDocument jsonDoc(addressObject);
QByteArray jsonBytes = jsonDoc.toJson();
auto result = m_client->publish(QMqttTopicName("topic"), jsonBytes, 0, true);
}
}