refactored MQTT

This commit is contained in:
CaCO3
2022-10-27 01:09:09 +02:00
parent 06ab14a6c9
commit 30549ac5af
6 changed files with 130 additions and 133 deletions

View File

@@ -131,7 +131,6 @@ string ClassFlowControll::GetMQTTMainTopic()
if (FlowControll[i]->name().compare("ClassFlowMQTT") == 0) if (FlowControll[i]->name().compare("ClassFlowMQTT") == 0)
return ((ClassFlowMQTT*) (FlowControll[i]))->GetMQTTMainTopic(); return ((ClassFlowMQTT*) (FlowControll[i]))->GetMQTTMainTopic();
return ""; return "";
} }
@@ -148,7 +147,6 @@ void ClassFlowControll::SetInitialParameter(void)
disabled = false; disabled = false;
aktRunNr = 0; aktRunNr = 0;
aktstatus = "Booting ..."; aktstatus = "Booting ...";
} }
bool ClassFlowControll::isAutoStart(long &_intervall) bool ClassFlowControll::isAutoStart(long &_intervall)
@@ -157,6 +155,11 @@ bool ClassFlowControll::isAutoStart(long &_intervall)
return AutoStart; return AutoStart;
} }
int ClassFlowControll::getAutoInterval()
{
return AutoIntervall * 60; // AutoIntervall: Minuten -> seconds
}
ClassFlow* ClassFlowControll::CreateClassFlow(std::string _type) ClassFlow* ClassFlowControll::CreateClassFlow(std::string _type)
{ {
ClassFlow* cfc = NULL; ClassFlow* cfc = NULL;

View File

@@ -63,6 +63,7 @@ public:
std::string doSingleStep(std::string _stepname, std::string _host); std::string doSingleStep(std::string _stepname, std::string _host);
bool isAutoStart(long &_intervall); bool isAutoStart(long &_intervall);
int getAutoInterval();
std::string* getActStatus(); std::string* getActStatus();

View File

@@ -6,7 +6,8 @@
#include "time_sntp.h" #include "time_sntp.h"
#include "interface_mqtt.h" #include "interface_mqtt.h"
#include "ClassFlowPostProcessing.h" #include "ClassFlowPostProcessing.h"
#include "ClassLogFile.h" #include "ClassFlowPostProcessing.h"
#include "ClassFlowControll.h"
#include <time.h> #include <time.h>
@@ -22,7 +23,6 @@ void ClassFlowMQTT::SetInitialParameter(void)
topicRate = ""; topicRate = "";
topicTimeStamp = ""; topicTimeStamp = "";
maintopic = ""; maintopic = "";
lwt = "";
topicUptime = ""; topicUptime = "";
topicFreeMem = ""; topicFreeMem = "";
@@ -37,8 +37,6 @@ void ClassFlowMQTT::SetInitialParameter(void)
previousElement = NULL; previousElement = NULL;
ListFlowControll = NULL; ListFlowControll = NULL;
disabled = false; disabled = false;
MQTTenable = false;
keepAlive = 600; // TODO This must be greater than the Flow Interval!
} }
ClassFlowMQTT::ClassFlowMQTT() ClassFlowMQTT::ClassFlowMQTT()
@@ -57,6 +55,12 @@ ClassFlowMQTT::ClassFlowMQTT(std::vector<ClassFlow*>* lfc)
{ {
flowpostprocessing = (ClassFlowPostProcessing*) (*ListFlowControll)[i]; flowpostprocessing = (ClassFlowPostProcessing*) (*ListFlowControll)[i];
} }
if (((*ListFlowControll)[i])->name().compare("ClassFlowControll") == 0)
{
ClassFlowControll *cfc = (ClassFlowControll*) (*ListFlowControll)[i];
keepAlive = cfc->getAutoInterval()* 2.5; // Allow at least than 2 failed rounds before we are threated as disconnected
}
} }
} }
@@ -126,47 +130,22 @@ bool ClassFlowMQTT::ReadParameter(FILE* pfile, string& aktparamgraph)
} }
} }
#ifdef __HIDE_PASSWORD MQTT_Configure(uri, clientname, user, password, maintopic + "/connection", keepAlive);
ESP_LOGD(TAG, "Init Read with uri: %s, clientname: %s, user: %s, password: XXXXXX, maintopic: %s", uri.c_str(), clientname.c_str(), user.c_str(), maintopic.c_str());
#else
ESP_LOGD(TAG, "Init Read with uri: %s, clientname: %s, user: %s, password: %s, maintopic: %s", uri.c_str(), clientname.c_str(), user.c_str(), password.c_str(), maintopic.c_str());
#endif
if (!MQTTisConnected() && (uri.length() > 0) && (maintopic.length() > 0)) ESP_LOGD(TAG, "MQTT maintopic: %s", maintopic.c_str());
{
ESP_LOGD(TAG, "InitMQTTInit");
lwt = maintopic + "/connection";
#ifdef __HIDE_PASSWORD
ESP_LOGD(TAG, "Init MQTT with uri: %s, clientname: %s, user: %s, password: XXXXXXXX, maintopic: %s", uri.c_str(), clientname.c_str(), user.c_str(), maintopic.c_str());
#else
ESP_LOGD(TAG, "Init MQTT with uri: %s, clientname: %s, user: %s, password: %s, maintopic: %s", uri.c_str(), clientname.c_str(), user.c_str(), password.c_str(), maintopic.c_str());
#endif
if (!MQTTInit(uri, clientname, user, password, lwt, keepAlive))
{ // Failed
MQTTenable = false;
return true; // We need to return true despite we failed, else it will retry 5x and then reboot!
}
}
// Try sending LWT. If it fails, re-run init if (!MQTT_Init()) {
if (!MQTTPublish(lwt, "connected", SetRetainFlag)) if (!MQTT_Init()) { // Retry
{ // Failed
LogFile.WriteToFile(ESP_LOG_WARN, "MQTT - Re-running init...!");
if (!MQTTInit(this->uri, this->clientname, this->user, this->password, this->lwt, keepAlive))
{ // Failed
MQTTenable = false;
return false;
}
// Try again sending LWT and quit if it fails
if (!MQTTPublish(lwt, "connected", SetRetainFlag))
{ // Failed
MQTTenable = false;
return false; return false;
} }
} }
MQTTenable = true; MQTTPublish(maintopic + "/" + "mac", getMac(), SetRetainFlag);
MQTTPublish(maintopic + "/" + "ip", *getIPAddress(), SetRetainFlag);
MQTTPublish(maintopic + "/" + "hostname", hostname, SetRetainFlag);
publishRuntimeData();
return true; return true;
} }
@@ -176,6 +155,22 @@ string ClassFlowMQTT::GetMQTTMainTopic()
return maintopic; return maintopic;
} }
void ClassFlowMQTT::publishRuntimeData() {
char tmp_char[50];
sprintf(tmp_char, "%ld", (long)getUpTime());
MQTTPublish(maintopic + "/" + "uptime", std::string(tmp_char), SetRetainFlag);
sprintf(tmp_char, "%zu", esp_get_free_heap_size());
MQTTPublish(maintopic + "/" + "freeMem", std::string(tmp_char), SetRetainFlag);
sprintf(tmp_char, "%d", get_WIFI_RSSI());
MQTTPublish(maintopic + "/" + "wifiRSSI", std::string(tmp_char), SetRetainFlag);
sprintf(tmp_char, "%d", (int)temperatureRead());
MQTTPublish(maintopic + "/" + "CPUtemp", std::string(tmp_char), SetRetainFlag);
}
bool ClassFlowMQTT::doFlow(string zwtime) bool ClassFlowMQTT::doFlow(string zwtime)
{ {
@@ -188,44 +183,7 @@ bool ClassFlowMQTT::doFlow(string zwtime)
string zw = ""; string zw = "";
string namenumber = ""; string namenumber = "";
zw = maintopic + "/" + "uptime"; publishRuntimeData();
char uptimeStr[11];
sprintf(uptimeStr, "%ld", (long)getUpTime());
// Try sending uptime. If it fails, re-run init
if (!MQTTPublish(zw, uptimeStr, SetRetainFlag))
{ // Failed
LogFile.WriteToFile(ESP_LOG_WARN, "MQTT - Re-running init...!");
if (!MQTTInit(this->uri, this->clientname, this->user, this->password, this->lwt, keepAlive))
{ // Failed
MQTTenable = false;
return true; // We need to return true despite we failed, else it will retry 5x and then reboot!
}
// Try again and quit if it fails
if (!MQTTPublish(zw, uptimeStr, SetRetainFlag))
{ // Failed
MQTTenable = false;
return true; // We need to return true despite we failed, else it will retry 5x and then reboot!
}
}
zw = maintopic + "/" + "freeMem";
char freeheapmem[11];
sprintf(freeheapmem, "%zu", esp_get_free_heap_size());
if (!MQTTPublish(zw, freeheapmem, SetRetainFlag))
{ // Failed, skip other topics
return true; // We need to return true despite we failed, else it will retry 5x and then reboot!
}
zw = maintopic + "/" + "wifiRSSI";
char rssi[11];
sprintf(rssi, "%d", get_WIFI_RSSI());
MQTTPublish(zw, rssi, SetRetainFlag);
zw = maintopic + "/" + "CPUtemp";
std::string cputemp = std::to_string(temperatureRead());
MQTTPublish(zw, cputemp, SetRetainFlag);
if (flowpostprocessing) if (flowpostprocessing)
{ {
@@ -246,29 +204,23 @@ bool ClassFlowMQTT::doFlow(string zwtime)
else else
namenumber = maintopic + "/" + namenumber + "/"; namenumber = maintopic + "/" + namenumber + "/";
zw = namenumber + "value";
if (result.length() > 0) if (result.length() > 0)
MQTTPublish(zw, result, SetRetainFlag); MQTTPublish(namenumber + "value", result, SetRetainFlag);
zw = namenumber + "error";
if (resulterror.length() > 0) if (resulterror.length() > 0)
MQTTPublish(zw, resulterror, SetRetainFlag); MQTTPublish(namenumber + "error", resulterror, SetRetainFlag);
zw = namenumber + "rate";
if (resultrate.length() > 0) if (resultrate.length() > 0)
MQTTPublish(zw, resultrate, SetRetainFlag); MQTTPublish(namenumber + "rate", resultrate, SetRetainFlag);
zw = namenumber + "changeabsolut";
if (resultchangabs.length() > 0) if (resultchangabs.length() > 0)
MQTTPublish(zw, resultchangabs, SetRetainFlag); MQTTPublish(namenumber + "changeabsolut", resultchangabs, SetRetainFlag);
zw = namenumber + "raw";
if (resultraw.length() > 0) if (resultraw.length() > 0)
MQTTPublish(zw, resultraw, SetRetainFlag); MQTTPublish(namenumber + "raw", resultraw, SetRetainFlag);
zw = namenumber + "timestamp";
if (resulttimestamp.length() > 0) if (resulttimestamp.length() > 0)
MQTTPublish(zw, resulttimestamp, SetRetainFlag); MQTTPublish(namenumber + "timestamp", resulttimestamp, SetRetainFlag);
std::string json = ""; std::string json = "";
@@ -286,8 +238,7 @@ bool ClassFlowMQTT::doFlow(string zwtime)
json += "\",\"rate\":\"\""; json += "\",\"rate\":\"\"";
json += ",\"timestamp\":\""+resulttimestamp+"\"}"; json += ",\"timestamp\":\""+resulttimestamp+"\"}";
zw = namenumber + "json"; MQTTPublish(namenumber + "json", json, SetRetainFlag);
MQTTPublish(zw, json, SetRetainFlag);
} }
} }
else else

View File

@@ -14,10 +14,9 @@ protected:
ClassFlowPostProcessing* flowpostprocessing; ClassFlowPostProcessing* flowpostprocessing;
std::string user, password; std::string user, password;
int SetRetainFlag; int SetRetainFlag;
bool MQTTenable;
int keepAlive; int keepAlive;
std::string maintopic, lwt; std::string maintopic;
void SetInitialParameter(void); void SetInitialParameter(void);
public: public:
@@ -28,6 +27,7 @@ public:
string GetMQTTMainTopic(); string GetMQTTMainTopic();
bool ReadParameter(FILE* pfile, string& aktparamgraph); bool ReadParameter(FILE* pfile, string& aktparamgraph);
void publishRuntimeData();
bool doFlow(string time); bool doFlow(string time);
string name(){return "ClassFlowMQTT";}; string name(){return "ClassFlowMQTT";};
}; };

View File

@@ -19,16 +19,36 @@ esp_mqtt_event_id_t esp_mmqtt_ID = MQTT_EVENT_ANY;
bool mqtt_connected = false; bool mqtt_connected = false;
esp_mqtt_client_handle_t client = NULL; esp_mqtt_client_handle_t client = NULL;
std::string uri, client_id, lwt_topic, user, password;
int keepalive;
bool MQTTPublish(std::string _key, std::string _content, int retained_flag){
bool MQTTPublish(std::string _key, std::string _content, int retained_flag) {
int msg_id; int msg_id;
std::string zw; std::string zw;
if (!mqtt_connected) {
LogFile.WriteToFile(ESP_LOG_WARN, "MQTT - Not connected, trying to re-connect...");
if (!MQTT_Init()) {
if (!MQTT_Init()) { // Retry
LogFile.WriteToFile(ESP_LOG_ERROR, "MQTT - Failed to init!");
return false;
}
}
}
msg_id = esp_mqtt_client_publish(client, _key.c_str(), _content.c_str(), 0, 1, retained_flag);
if (msg_id < 0) {
LogFile.WriteToFile(ESP_LOG_WARN, "MQTT - Failed to publish topic '" + _key + "', re-trying...");
msg_id = esp_mqtt_client_publish(client, _key.c_str(), _content.c_str(), 0, 1, retained_flag); msg_id = esp_mqtt_client_publish(client, _key.c_str(), _content.c_str(), 0, 1, retained_flag);
if (msg_id < 0) { if (msg_id < 0) {
LogFile.WriteToFile(ESP_LOG_ERROR, "MQTT - Failed to publish topic '" + _key + "'!"); LogFile.WriteToFile(ESP_LOG_ERROR, "MQTT - Failed to publish topic '" + _key + "'!");
mqtt_connected = false; // Force re-init on next call
return false; return false;
} }
}
zw = "MQTT - Published topic: " + _key + ", content: " + _content + " (msg_id=" + std::to_string(msg_id) + ")"; zw = "MQTT - Published topic: " + _key + ", content: " + _content + " (msg_id=" + std::to_string(msg_id) + ")";
LogFile.WriteToFile(ESP_LOG_DEBUG, zw); LogFile.WriteToFile(ESP_LOG_DEBUG, zw);
return true; return true;
@@ -50,6 +70,7 @@ static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
break; break;
case MQTT_EVENT_DISCONNECTED: case MQTT_EVENT_DISCONNECTED:
ESP_LOGI(TAG_INTERFACEMQTT, "MQTT_EVENT_DISCONNECTED"); ESP_LOGI(TAG_INTERFACEMQTT, "MQTT_EVENT_DISCONNECTED");
mqtt_connected = false; // Force re-init on next call
esp_mqtt_client_reconnect(client); esp_mqtt_client_reconnect(client);
break; break;
case MQTT_EVENT_SUBSCRIBED: case MQTT_EVENT_SUBSCRIBED:
@@ -79,6 +100,7 @@ static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
break; break;
case MQTT_EVENT_ERROR: case MQTT_EVENT_ERROR:
ESP_LOGI(TAG_INTERFACEMQTT, "MQTT_EVENT_ERROR"); ESP_LOGI(TAG_INTERFACEMQTT, "MQTT_EVENT_ERROR");
mqtt_connected = false; // Force re-init on next call
break; break;
default: default:
ESP_LOGI(TAG_INTERFACEMQTT, "Other event id:%d", event->event_id); ESP_LOGI(TAG_INTERFACEMQTT, "Other event id:%d", event->event_id);
@@ -93,35 +115,57 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_
} }
bool MQTTInit(std::string _mqttURI, std::string _clientid, std::string _user, std::string _password, std::string _LWTContext, int _keepalive){ void MQTT_Configure(std::string _mqttURI, std::string _clientid, std::string _user, std::string _password, std::string _lwt, int _keepalive){
std::string _zwmessage = "connection lost"; #ifdef __HIDE_PASSWORD
LogFile.WriteToFile(ESP_LOG_INFO, "MQTT Configuration: uri: " + _mqttURI + ", clientname: " + _clientid +
", user: " + _user + ", password: XXXXXXXX, last-will-topic: " + _lwt + ", timeout: " + std::to_string(_keepalive));
#else
LogFile.WriteToFile(ESP_LOG_INFO, "MQTT Configuration: uri: " + _mqttURI + ", clientname: " + _clientid +
", user: " + _user + ", password: " + _password + ", last-will-topic: " + _lwt + ", timeout: " + std::to_string(_keepalive));
#endif
int _lzw = _zwmessage.length(); uri = _mqttURI;
client_id = _clientid;
esp_mqtt_client_config_t mqtt_cfg = { lwt_topic = _lwt;
.uri = _mqttURI.c_str(), keepalive = _keepalive;
.client_id = _clientid.c_str(),
.lwt_topic = _LWTContext.c_str(),
.lwt_msg = _zwmessage.c_str(),
.lwt_retain = 1,
.lwt_msg_len = _lzw,
.keepalive = _keepalive
};
LogFile.WriteToFile(ESP_LOG_INFO, "MQTT - Init (client ID: " + _clientid + ")");
if (_user.length() && _password.length()){ if (_user.length() && _password.length()){
mqtt_cfg.username = _user.c_str(); user = _user;
mqtt_cfg.password = _password.c_str(); password = _password;
}
}
#ifdef __HIDE_PASSWORD bool MQTT_Init() {
ESP_LOGI(TAG_INTERFACEMQTT, "Connect to MQTT: %s, XXXXXXXX", mqtt_cfg.username); LogFile.WriteToFile(ESP_LOG_INFO, "MQTT - Init");
#else
ESP_LOGI(TAG_INTERFACEMQTT, "Connect to MQTT: %s, %s", mqtt_cfg.username, mqtt_cfg.password); MQTTdestroy_client();
#endif /*
mqtt_cfg.uri = uri.c_str();
mqtt_cfg.client_id = client_id.c_str();
mqtt_cfg.lwt_topic = lwt_topic.c_str();
mqtt_cfg.lwt_msg = lwt_msg.c_str();
mqtt_cfg.lwt_retain = 1;
mqtt_cfg.lwt_msg_len = lwt_msg.length();
mqtt_cfg.keepalive = keepalive;
*/
std::string lw = "connection lost";
esp_mqtt_client_config_t mqtt_cfg = {
.uri = uri.c_str(),
.client_id = client_id.c_str(),
.lwt_topic = lwt_topic.c_str(),
.lwt_msg = lw.c_str(),
.lwt_retain = 1,
.lwt_msg_len = (int)(lw.length()),
.keepalive = keepalive
};
if (user.length() && password.length()){
mqtt_cfg.username = user.c_str();
mqtt_cfg.password = password.c_str();
}; };
MQTTdestroy();
client = esp_mqtt_client_init(&mqtt_cfg); client = esp_mqtt_client_init(&mqtt_cfg);
if (client) if (client)
{ {
@@ -135,16 +179,10 @@ bool MQTTInit(std::string _mqttURI, std::string _clientid, std::string _user, st
LogFile.WriteToFile(ESP_LOG_ERROR, "MQTT - Could not start client!"); LogFile.WriteToFile(ESP_LOG_ERROR, "MQTT - Could not start client!");
return false; return false;
} }
/* if(!MQTTPublish(_LWTContext, "", 1))
{
LogFile.WriteToFile("MQTT - Could not publish LWT!");
return false;
}*/
} }
else else
{ {
LogFile.WriteToFile(ESP_LOG_ERROR, "MQTT - Could not Init client!"); LogFile.WriteToFile(ESP_LOG_ERROR, "MQTT - Could not init client!");
return false; return false;
} }
@@ -153,7 +191,7 @@ bool MQTTInit(std::string _mqttURI, std::string _clientid, std::string _user, st
} }
void MQTTdestroy() { void MQTTdestroy_client() {
if (client != NULL) { if (client != NULL) {
esp_mqtt_client_stop(client); esp_mqtt_client_stop(client);
esp_mqtt_client_destroy(client); esp_mqtt_client_destroy(client);
@@ -211,6 +249,9 @@ void MQTTregisterSubscribeFunction(std::string topic, std::function<bool(std::st
void MQTTconnected(){ void MQTTconnected(){
if (mqtt_connected) { if (mqtt_connected) {
LogFile.WriteToFile(ESP_LOG_INFO, "MQTT - Connected"); LogFile.WriteToFile(ESP_LOG_INFO, "MQTT - Connected");
MQTTPublish(lwt_topic, "connected", true);
if (connectFunktionMap != NULL) { if (connectFunktionMap != NULL) {
for(std::map<std::string, std::function<void()>>::iterator it = connectFunktionMap->begin(); it != connectFunktionMap->end(); ++it) { for(std::map<std::string, std::function<void()>>::iterator it = connectFunktionMap->begin(); it != connectFunktionMap->end(); ++it) {
it->second(); it->second();

View File

@@ -5,8 +5,9 @@
#include <map> #include <map>
#include <functional> #include <functional>
bool MQTTInit(std::string _mqttURI, std::string _clientid, std::string _user, std::string _password, std::string _LWTContext, int _keepalive); void MQTT_Configure(std::string _mqttURI, std::string _clientid, std::string _user, std::string _password, std::string _lwt, int _keepalive);
void MQTTdestroy(); bool MQTT_Init();
void MQTTdestroy_client();
bool MQTTPublish(std::string _key, std::string _content, int retained_flag = 1); // retained Flag as Standart bool MQTTPublish(std::string _key, std::string _content, int retained_flag = 1); // retained Flag as Standart