From 30549ac5af28ba2a9882b403ee3793b36d4f8830 Mon Sep 17 00:00:00 2001 From: CaCO3 Date: Thu, 27 Oct 2022 01:09:09 +0200 Subject: [PATCH] refactored MQTT --- .../jomjol_flowcontroll/ClassFlowControll.cpp | 7 +- .../jomjol_flowcontroll/ClassFlowControll.h | 1 + .../jomjol_flowcontroll/ClassFlowMQTT.cpp | 135 ++++++------------ .../jomjol_flowcontroll/ClassFlowMQTT.h | 4 +- .../components/jomjol_mqtt/interface_mqtt.cpp | 111 +++++++++----- code/components/jomjol_mqtt/interface_mqtt.h | 5 +- 6 files changed, 130 insertions(+), 133 deletions(-) diff --git a/code/components/jomjol_flowcontroll/ClassFlowControll.cpp b/code/components/jomjol_flowcontroll/ClassFlowControll.cpp index e3e2e62e..bc8bfa4a 100644 --- a/code/components/jomjol_flowcontroll/ClassFlowControll.cpp +++ b/code/components/jomjol_flowcontroll/ClassFlowControll.cpp @@ -131,7 +131,6 @@ string ClassFlowControll::GetMQTTMainTopic() if (FlowControll[i]->name().compare("ClassFlowMQTT") == 0) return ((ClassFlowMQTT*) (FlowControll[i]))->GetMQTTMainTopic(); - return ""; } @@ -148,7 +147,6 @@ void ClassFlowControll::SetInitialParameter(void) disabled = false; aktRunNr = 0; aktstatus = "Booting ..."; - } bool ClassFlowControll::isAutoStart(long &_intervall) @@ -157,6 +155,11 @@ bool ClassFlowControll::isAutoStart(long &_intervall) return AutoStart; } +int ClassFlowControll::getAutoInterval() +{ + return AutoIntervall * 60; // AutoIntervall: Minuten -> seconds +} + ClassFlow* ClassFlowControll::CreateClassFlow(std::string _type) { ClassFlow* cfc = NULL; diff --git a/code/components/jomjol_flowcontroll/ClassFlowControll.h b/code/components/jomjol_flowcontroll/ClassFlowControll.h index 5fd3b08a..0a2753c7 100644 --- a/code/components/jomjol_flowcontroll/ClassFlowControll.h +++ b/code/components/jomjol_flowcontroll/ClassFlowControll.h @@ -63,6 +63,7 @@ public: std::string doSingleStep(std::string _stepname, std::string _host); bool isAutoStart(long &_intervall); + int getAutoInterval(); std::string* getActStatus(); diff --git a/code/components/jomjol_flowcontroll/ClassFlowMQTT.cpp b/code/components/jomjol_flowcontroll/ClassFlowMQTT.cpp index e34d26f2..868096f0 100644 --- a/code/components/jomjol_flowcontroll/ClassFlowMQTT.cpp +++ b/code/components/jomjol_flowcontroll/ClassFlowMQTT.cpp @@ -6,7 +6,8 @@ #include "time_sntp.h" #include "interface_mqtt.h" #include "ClassFlowPostProcessing.h" -#include "ClassLogFile.h" +#include "ClassFlowPostProcessing.h" +#include "ClassFlowControll.h" #include @@ -22,7 +23,6 @@ void ClassFlowMQTT::SetInitialParameter(void) topicRate = ""; topicTimeStamp = ""; maintopic = ""; - lwt = ""; topicUptime = ""; topicFreeMem = ""; @@ -37,8 +37,6 @@ void ClassFlowMQTT::SetInitialParameter(void) previousElement = NULL; ListFlowControll = NULL; disabled = false; - MQTTenable = false; - keepAlive = 600; // TODO This must be greater than the Flow Interval! } ClassFlowMQTT::ClassFlowMQTT() @@ -57,6 +55,12 @@ ClassFlowMQTT::ClassFlowMQTT(std::vector* lfc) { flowpostprocessing = (ClassFlowPostProcessing*) (*ListFlowControll)[i]; } + + if (((*ListFlowControll)[i])->name().compare("ClassFlowControll") == 0) + { + ClassFlowControll *cfc = (ClassFlowControll*) (*ListFlowControll)[i]; + keepAlive = cfc->getAutoInterval()* 2.5; // Allow at least than 2 failed rounds before we are threated as disconnected + } } } @@ -125,48 +129,23 @@ bool ClassFlowMQTT::ReadParameter(FILE* pfile, string& aktparamgraph) maintopic = hostname; } } + + MQTT_Configure(uri, clientname, user, password, maintopic + "/connection", keepAlive); -#ifdef __HIDE_PASSWORD - ESP_LOGD(TAG, "Init Read with uri: %s, clientname: %s, user: %s, password: XXXXXX, maintopic: %s", uri.c_str(), clientname.c_str(), user.c_str(), maintopic.c_str()); -#else - ESP_LOGD(TAG, "Init Read with uri: %s, clientname: %s, user: %s, password: %s, maintopic: %s", uri.c_str(), clientname.c_str(), user.c_str(), password.c_str(), maintopic.c_str()); -#endif + ESP_LOGD(TAG, "MQTT maintopic: %s", maintopic.c_str()); - if (!MQTTisConnected() && (uri.length() > 0) && (maintopic.length() > 0)) - { - ESP_LOGD(TAG, "InitMQTTInit"); - lwt = maintopic + "/connection"; -#ifdef __HIDE_PASSWORD - ESP_LOGD(TAG, "Init MQTT with uri: %s, clientname: %s, user: %s, password: XXXXXXXX, maintopic: %s", uri.c_str(), clientname.c_str(), user.c_str(), maintopic.c_str()); -#else - ESP_LOGD(TAG, "Init MQTT with uri: %s, clientname: %s, user: %s, password: %s, maintopic: %s", uri.c_str(), clientname.c_str(), user.c_str(), password.c_str(), maintopic.c_str()); -#endif - if (!MQTTInit(uri, clientname, user, password, lwt, keepAlive)) - { // Failed - MQTTenable = false; - return true; // We need to return true despite we failed, else it will retry 5x and then reboot! - } - } - - // Try sending LWT. If it fails, re-run init - if (!MQTTPublish(lwt, "connected", SetRetainFlag)) - { // Failed - LogFile.WriteToFile(ESP_LOG_WARN, "MQTT - Re-running init...!"); - if (!MQTTInit(this->uri, this->clientname, this->user, this->password, this->lwt, keepAlive)) - { // Failed - MQTTenable = false; - return false; - } - - // Try again sending LWT and quit if it fails - if (!MQTTPublish(lwt, "connected", SetRetainFlag)) - { // Failed - MQTTenable = false; + if (!MQTT_Init()) { + if (!MQTT_Init()) { // Retry return false; } } - MQTTenable = true; + MQTTPublish(maintopic + "/" + "mac", getMac(), SetRetainFlag); + MQTTPublish(maintopic + "/" + "ip", *getIPAddress(), SetRetainFlag); + MQTTPublish(maintopic + "/" + "hostname", hostname, SetRetainFlag); + + publishRuntimeData(); + return true; } @@ -176,6 +155,22 @@ string ClassFlowMQTT::GetMQTTMainTopic() return maintopic; } +void ClassFlowMQTT::publishRuntimeData() { + char tmp_char[50]; + + sprintf(tmp_char, "%ld", (long)getUpTime()); + MQTTPublish(maintopic + "/" + "uptime", std::string(tmp_char), SetRetainFlag); + + sprintf(tmp_char, "%zu", esp_get_free_heap_size()); + MQTTPublish(maintopic + "/" + "freeMem", std::string(tmp_char), SetRetainFlag); + + sprintf(tmp_char, "%d", get_WIFI_RSSI()); + MQTTPublish(maintopic + "/" + "wifiRSSI", std::string(tmp_char), SetRetainFlag); + + sprintf(tmp_char, "%d", (int)temperatureRead()); + MQTTPublish(maintopic + "/" + "CPUtemp", std::string(tmp_char), SetRetainFlag); +} + bool ClassFlowMQTT::doFlow(string zwtime) { @@ -188,44 +183,7 @@ bool ClassFlowMQTT::doFlow(string zwtime) string zw = ""; string namenumber = ""; - zw = maintopic + "/" + "uptime"; - char uptimeStr[11]; - sprintf(uptimeStr, "%ld", (long)getUpTime()); - - // Try sending uptime. If it fails, re-run init - if (!MQTTPublish(zw, uptimeStr, SetRetainFlag)) - { // Failed - LogFile.WriteToFile(ESP_LOG_WARN, "MQTT - Re-running init...!"); - if (!MQTTInit(this->uri, this->clientname, this->user, this->password, this->lwt, keepAlive)) - { // Failed - MQTTenable = false; - return true; // We need to return true despite we failed, else it will retry 5x and then reboot! - } - - // Try again and quit if it fails - if (!MQTTPublish(zw, uptimeStr, SetRetainFlag)) - { // Failed - MQTTenable = false; - return true; // We need to return true despite we failed, else it will retry 5x and then reboot! - } - } - - zw = maintopic + "/" + "freeMem"; - char freeheapmem[11]; - sprintf(freeheapmem, "%zu", esp_get_free_heap_size()); - if (!MQTTPublish(zw, freeheapmem, SetRetainFlag)) - { // Failed, skip other topics - return true; // We need to return true despite we failed, else it will retry 5x and then reboot! - } - - zw = maintopic + "/" + "wifiRSSI"; - char rssi[11]; - sprintf(rssi, "%d", get_WIFI_RSSI()); - MQTTPublish(zw, rssi, SetRetainFlag); - - zw = maintopic + "/" + "CPUtemp"; - std::string cputemp = std::to_string(temperatureRead()); - MQTTPublish(zw, cputemp, SetRetainFlag); + publishRuntimeData(); if (flowpostprocessing) { @@ -246,29 +204,23 @@ bool ClassFlowMQTT::doFlow(string zwtime) else namenumber = maintopic + "/" + namenumber + "/"; - zw = namenumber + "value"; if (result.length() > 0) - MQTTPublish(zw, result, SetRetainFlag); + MQTTPublish(namenumber + "value", result, SetRetainFlag); - zw = namenumber + "error"; if (resulterror.length() > 0) - MQTTPublish(zw, resulterror, SetRetainFlag); + MQTTPublish(namenumber + "error", resulterror, SetRetainFlag); - zw = namenumber + "rate"; if (resultrate.length() > 0) - MQTTPublish(zw, resultrate, SetRetainFlag); + MQTTPublish(namenumber + "rate", resultrate, SetRetainFlag); - zw = namenumber + "changeabsolut"; if (resultchangabs.length() > 0) - MQTTPublish(zw, resultchangabs, SetRetainFlag); + MQTTPublish(namenumber + "changeabsolut", resultchangabs, SetRetainFlag); - zw = namenumber + "raw"; if (resultraw.length() > 0) - MQTTPublish(zw, resultraw, SetRetainFlag); + MQTTPublish(namenumber + "raw", resultraw, SetRetainFlag); - zw = namenumber + "timestamp"; if (resulttimestamp.length() > 0) - MQTTPublish(zw, resulttimestamp, SetRetainFlag); + MQTTPublish(namenumber + "timestamp", resulttimestamp, SetRetainFlag); std::string json = ""; @@ -286,8 +238,7 @@ bool ClassFlowMQTT::doFlow(string zwtime) json += "\",\"rate\":\"\""; json += ",\"timestamp\":\""+resulttimestamp+"\"}"; - zw = namenumber + "json"; - MQTTPublish(zw, json, SetRetainFlag); + MQTTPublish(namenumber + "json", json, SetRetainFlag); } } else diff --git a/code/components/jomjol_flowcontroll/ClassFlowMQTT.h b/code/components/jomjol_flowcontroll/ClassFlowMQTT.h index a2805a4b..78213a50 100644 --- a/code/components/jomjol_flowcontroll/ClassFlowMQTT.h +++ b/code/components/jomjol_flowcontroll/ClassFlowMQTT.h @@ -14,10 +14,9 @@ protected: ClassFlowPostProcessing* flowpostprocessing; std::string user, password; int SetRetainFlag; - bool MQTTenable; int keepAlive; - std::string maintopic, lwt; + std::string maintopic; void SetInitialParameter(void); public: @@ -28,6 +27,7 @@ public: string GetMQTTMainTopic(); bool ReadParameter(FILE* pfile, string& aktparamgraph); + void publishRuntimeData(); bool doFlow(string time); string name(){return "ClassFlowMQTT";}; }; diff --git a/code/components/jomjol_mqtt/interface_mqtt.cpp b/code/components/jomjol_mqtt/interface_mqtt.cpp index dc92269e..d6505cdc 100644 --- a/code/components/jomjol_mqtt/interface_mqtt.cpp +++ b/code/components/jomjol_mqtt/interface_mqtt.cpp @@ -19,16 +19,36 @@ esp_mqtt_event_id_t esp_mmqtt_ID = MQTT_EVENT_ANY; bool mqtt_connected = false; esp_mqtt_client_handle_t client = NULL; +std::string uri, client_id, lwt_topic, user, password; +int keepalive; -bool MQTTPublish(std::string _key, std::string _content, int retained_flag){ - + +bool MQTTPublish(std::string _key, std::string _content, int retained_flag) { int msg_id; std::string zw; + + if (!mqtt_connected) { + LogFile.WriteToFile(ESP_LOG_WARN, "MQTT - Not connected, trying to re-connect..."); + if (!MQTT_Init()) { + if (!MQTT_Init()) { // Retry + LogFile.WriteToFile(ESP_LOG_ERROR, "MQTT - Failed to init!"); + return false; + } + } + } + msg_id = esp_mqtt_client_publish(client, _key.c_str(), _content.c_str(), 0, 1, retained_flag); if (msg_id < 0) { - LogFile.WriteToFile(ESP_LOG_ERROR, "MQTT - Failed to publish topic '" + _key + "'!"); - return false; + LogFile.WriteToFile(ESP_LOG_WARN, "MQTT - Failed to publish topic '" + _key + "', re-trying..."); + + msg_id = esp_mqtt_client_publish(client, _key.c_str(), _content.c_str(), 0, 1, retained_flag); + if (msg_id < 0) { + LogFile.WriteToFile(ESP_LOG_ERROR, "MQTT - Failed to publish topic '" + _key + "'!"); + mqtt_connected = false; // Force re-init on next call + return false; + } } + zw = "MQTT - Published topic: " + _key + ", content: " + _content + " (msg_id=" + std::to_string(msg_id) + ")"; LogFile.WriteToFile(ESP_LOG_DEBUG, zw); return true; @@ -50,6 +70,7 @@ static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event) break; case MQTT_EVENT_DISCONNECTED: ESP_LOGI(TAG_INTERFACEMQTT, "MQTT_EVENT_DISCONNECTED"); + mqtt_connected = false; // Force re-init on next call esp_mqtt_client_reconnect(client); break; case MQTT_EVENT_SUBSCRIBED: @@ -79,6 +100,7 @@ static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event) break; case MQTT_EVENT_ERROR: ESP_LOGI(TAG_INTERFACEMQTT, "MQTT_EVENT_ERROR"); + mqtt_connected = false; // Force re-init on next call break; default: ESP_LOGI(TAG_INTERFACEMQTT, "Other event id:%d", event->event_id); @@ -93,35 +115,57 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_ } -bool MQTTInit(std::string _mqttURI, std::string _clientid, std::string _user, std::string _password, std::string _LWTContext, int _keepalive){ - std::string _zwmessage = "connection lost"; +void MQTT_Configure(std::string _mqttURI, std::string _clientid, std::string _user, std::string _password, std::string _lwt, int _keepalive){ +#ifdef __HIDE_PASSWORD + LogFile.WriteToFile(ESP_LOG_INFO, "MQTT Configuration: uri: " + _mqttURI + ", clientname: " + _clientid + + ", user: " + _user + ", password: XXXXXXXX, last-will-topic: " + _lwt + ", timeout: " + std::to_string(_keepalive)); +#else + LogFile.WriteToFile(ESP_LOG_INFO, "MQTT Configuration: uri: " + _mqttURI + ", clientname: " + _clientid + + ", user: " + _user + ", password: " + _password + ", last-will-topic: " + _lwt + ", timeout: " + std::to_string(_keepalive)); +#endif - int _lzw = _zwmessage.length(); - - esp_mqtt_client_config_t mqtt_cfg = { - .uri = _mqttURI.c_str(), - .client_id = _clientid.c_str(), - .lwt_topic = _LWTContext.c_str(), - .lwt_msg = _zwmessage.c_str(), - .lwt_retain = 1, - .lwt_msg_len = _lzw, - .keepalive = _keepalive - }; - - LogFile.WriteToFile(ESP_LOG_INFO, "MQTT - Init (client ID: " + _clientid + ")"); + uri = _mqttURI; + client_id = _clientid; + lwt_topic = _lwt; + keepalive = _keepalive; if (_user.length() && _password.length()){ - mqtt_cfg.username = _user.c_str(); - mqtt_cfg.password = _password.c_str(); + user = _user; + password = _password; + } +} -#ifdef __HIDE_PASSWORD - ESP_LOGI(TAG_INTERFACEMQTT, "Connect to MQTT: %s, XXXXXXXX", mqtt_cfg.username); -#else - ESP_LOGI(TAG_INTERFACEMQTT, "Connect to MQTT: %s, %s", mqtt_cfg.username, mqtt_cfg.password); -#endif +bool MQTT_Init() { + LogFile.WriteToFile(ESP_LOG_INFO, "MQTT - Init"); + + MQTTdestroy_client(); +/* + mqtt_cfg.uri = uri.c_str(); + mqtt_cfg.client_id = client_id.c_str(); + mqtt_cfg.lwt_topic = lwt_topic.c_str(); + mqtt_cfg.lwt_msg = lwt_msg.c_str(); + mqtt_cfg.lwt_retain = 1; + mqtt_cfg.lwt_msg_len = lwt_msg.length(); + mqtt_cfg.keepalive = keepalive; +*/ + + std::string lw = "connection lost"; + + esp_mqtt_client_config_t mqtt_cfg = { + .uri = uri.c_str(), + .client_id = client_id.c_str(), + .lwt_topic = lwt_topic.c_str(), + .lwt_msg = lw.c_str(), + .lwt_retain = 1, + .lwt_msg_len = (int)(lw.length()), + .keepalive = keepalive + }; + + if (user.length() && password.length()){ + mqtt_cfg.username = user.c_str(); + mqtt_cfg.password = password.c_str(); }; - MQTTdestroy(); client = esp_mqtt_client_init(&mqtt_cfg); if (client) { @@ -135,16 +179,10 @@ bool MQTTInit(std::string _mqttURI, std::string _clientid, std::string _user, st LogFile.WriteToFile(ESP_LOG_ERROR, "MQTT - Could not start client!"); return false; } - - /* if(!MQTTPublish(_LWTContext, "", 1)) - { - LogFile.WriteToFile("MQTT - Could not publish LWT!"); - return false; - }*/ } else { - LogFile.WriteToFile(ESP_LOG_ERROR, "MQTT - Could not Init client!"); + LogFile.WriteToFile(ESP_LOG_ERROR, "MQTT - Could not init client!"); return false; } @@ -153,7 +191,7 @@ bool MQTTInit(std::string _mqttURI, std::string _clientid, std::string _user, st } -void MQTTdestroy() { +void MQTTdestroy_client() { if (client != NULL) { esp_mqtt_client_stop(client); esp_mqtt_client_destroy(client); @@ -211,6 +249,9 @@ void MQTTregisterSubscribeFunction(std::string topic, std::function>::iterator it = connectFunktionMap->begin(); it != connectFunktionMap->end(); ++it) { it->second(); diff --git a/code/components/jomjol_mqtt/interface_mqtt.h b/code/components/jomjol_mqtt/interface_mqtt.h index 6e9ae85e..db0e6971 100644 --- a/code/components/jomjol_mqtt/interface_mqtt.h +++ b/code/components/jomjol_mqtt/interface_mqtt.h @@ -5,8 +5,9 @@ #include #include -bool MQTTInit(std::string _mqttURI, std::string _clientid, std::string _user, std::string _password, std::string _LWTContext, int _keepalive); -void MQTTdestroy(); +void MQTT_Configure(std::string _mqttURI, std::string _clientid, std::string _user, std::string _password, std::string _lwt, int _keepalive); +bool MQTT_Init(); +void MQTTdestroy_client(); bool MQTTPublish(std::string _key, std::string _content, int retained_flag = 1); // retained Flag as Standart