From 85905a7045fad094fe8e3414da33e2dc69f5bb3e Mon Sep 17 00:00:00 2001 From: Slider0007 <115730895+Slider0007@users.noreply.github.com> Date: Sat, 3 Dec 2022 19:10:44 +0100 Subject: [PATCH] Improve MQTT connection handling (#1462) * modify mqtt init at startup + after disconnection * mqtt_init only when not initialized * Minor udapte * Apply suggestions from code review Co-authored-by: CaCO3 * Correct typo Co-authored-by: CaCO3 --- .../jomjol_flowcontroll/ClassFlowControll.cpp | 19 +++++--- .../jomjol_flowcontroll/ClassFlowControll.h | 1 + .../jomjol_flowcontroll/ClassFlowMQTT.cpp | 5 +- .../components/jomjol_mqtt/interface_mqtt.cpp | 48 +++++++++++-------- .../jomjol_tfliteclass/server_tflite.cpp | 2 + 5 files changed, 45 insertions(+), 30 deletions(-) diff --git a/code/components/jomjol_flowcontroll/ClassFlowControll.cpp b/code/components/jomjol_flowcontroll/ClassFlowControll.cpp index 7af4c119..b078cada 100644 --- a/code/components/jomjol_flowcontroll/ClassFlowControll.cpp +++ b/code/components/jomjol_flowcontroll/ClassFlowControll.cpp @@ -140,6 +140,15 @@ string ClassFlowControll::GetMQTTMainTopic() return ""; } +bool ClassFlowControll::StartMQTTService() { + /* Start the MQTT service */ + for (int i = 0; i < FlowControll.size(); ++i) { + if (FlowControll[i]->name().compare("ClassFlowMQTT") == 0) { + return ((ClassFlowMQTT*) (FlowControll[i]))->Start(AutoIntervall); + } + } + return false; +} void ClassFlowControll::SetInitialParameter(void) @@ -311,7 +320,9 @@ bool ClassFlowControll::doFlow(string time) MQTTPublish(mqttServer_getMainTopic() + "/" + "status", flowStatus, false); string zw = "FlowControll.doFlow - " + FlowControll[i]->name(); - LogFile.WriteHeapInfo(zw); + #ifdef DEBUG_DETAIL_ON + LogFile.WriteHeapInfo(zw); + #endif if (!FlowControll[i]->doFlow(time)){ repeat++; @@ -551,12 +562,6 @@ bool ClassFlowControll::ReadParameter(FILE* pfile, string& aktparamgraph) } } } - - /* Start the MQTT service */ - for (int i = 0; i < FlowControll.size(); ++i) - if (FlowControll[i]->name().compare("ClassFlowMQTT") == 0) - return ((ClassFlowMQTT*) (FlowControll[i]))->Start(AutoIntervall); - return true; } diff --git a/code/components/jomjol_flowcontroll/ClassFlowControll.h b/code/components/jomjol_flowcontroll/ClassFlowControll.h index 5fd3b08a..026e30b1 100644 --- a/code/components/jomjol_flowcontroll/ClassFlowControll.h +++ b/code/components/jomjol_flowcontroll/ClassFlowControll.h @@ -71,6 +71,7 @@ public: t_CNNType GetTypeDigital(); t_CNNType GetTypeAnalog(); + bool StartMQTTService(); int CleanTempFolder(); diff --git a/code/components/jomjol_flowcontroll/ClassFlowMQTT.cpp b/code/components/jomjol_flowcontroll/ClassFlowMQTT.cpp index 39470c65..d350b18e 100644 --- a/code/components/jomjol_flowcontroll/ClassFlowMQTT.cpp +++ b/code/components/jomjol_flowcontroll/ClassFlowMQTT.cpp @@ -203,9 +203,8 @@ bool ClassFlowMQTT::Start(float AutoIntervall) { keepAlive, SetRetainFlag, (void *)&GotConnected); if (!MQTT_Init()) { - if (!MQTT_Init()) { // Retry - return false; - } + LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Init at startup failed! Retry with next publish call"); + return false; } return true; diff --git a/code/components/jomjol_mqtt/interface_mqtt.cpp b/code/components/jomjol_mqtt/interface_mqtt.cpp index 8b411f74..d742b1d3 100644 --- a/code/components/jomjol_mqtt/interface_mqtt.cpp +++ b/code/components/jomjol_mqtt/interface_mqtt.cpp @@ -17,10 +17,11 @@ std::map>* subscribeFu int failedOnRound = -1; - + esp_mqtt_event_id_t esp_mmqtt_ID = MQTT_EVENT_ANY; // ESP_EVENT_ANY_ID +bool mqtt_initialized = false; bool mqtt_connected = false; esp_mqtt_client_handle_t client = NULL; std::string uri, client_id, lwt_topic, lwt_connected, lwt_disconnected, user, password, maintopic; @@ -36,29 +37,27 @@ bool MQTTPublish(std::string _key, std::string _content, int retained_flag) { return true; // Fail quietly } + #ifdef DEBUG_DETAIL_ON LogFile.WriteHeapInfo("MQTT Publish"); #endif - if (!mqtt_connected) { - LogFile.WriteToFile(ESP_LOG_WARN, TAG, "Not connected, trying to re-connect..."); + if (!mqtt_initialized) { if (!MQTT_Init()) { - if (!MQTT_Init()) { // Retry - LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Failed to init, skipping all MQTT publishings in this round!"); - failedOnRound = getCountFlowRounds(); - return false; - } + LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Init failed, skipping all MQTT publishings in this round!"); + failedOnRound = getCountFlowRounds(); + return false; } - } + } msg_id = esp_mqtt_client_publish(client, _key.c_str(), _content.c_str(), 0, 1, retained_flag); if (msg_id < 0) { LogFile.WriteToFile(ESP_LOG_WARN, TAG, "Failed to publish topic '" + _key + "', re-trying..."); - + esp_mqtt_client_reconnect(client); + msg_id = esp_mqtt_client_publish(client, _key.c_str(), _content.c_str(), 0, 1, retained_flag); if (msg_id < 0) { LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Failed to publish topic '" + _key + "', skipping all MQTT publishings in this round!"); - mqtt_connected = false; // Force re-init on next call failedOnRound = getCountFlowRounds(); return false; } @@ -91,9 +90,7 @@ static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event) break; case MQTT_EVENT_DISCONNECTED: ESP_LOGD(TAG, "MQTT_EVENT_DISCONNECTED"); - LogFile.WriteToFile(ESP_LOG_WARN, TAG, "Disconnected! Going to re-connect..."); - mqtt_connected = false; // Force re-init on next call - esp_mqtt_client_reconnect(client); + mqtt_connected = false; break; case MQTT_EVENT_SUBSCRIBED: ESP_LOGD(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); @@ -122,7 +119,8 @@ static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event) break; case MQTT_EVENT_ERROR: ESP_LOGD(TAG, "MQTT_EVENT_ERROR"); - mqtt_connected = false; // Force re-init on next call + mqtt_initialized = false; // Force re-init on next publish call + mqtt_connected = false; break; default: ESP_LOGD(TAG, "Other event id:%d", event->event_id); @@ -179,7 +177,9 @@ bool MQTT_Init() { .lwt_msg = lw.c_str(), .lwt_retain = 1, .lwt_msg_len = (int)(lw.length()), - .keepalive = keepalive + .keepalive = keepalive, + .disable_auto_reconnect = false, // Reconnection routine active + .reconnect_timeout_ms = 10000 // Try to reconnect to broker every 10s }; if (user.length() && password.length()){ @@ -190,6 +190,7 @@ bool MQTT_Init() { #ifdef DEBUG_DETAIL_ON LogFile.WriteHeapInfo("MQTT Client Init"); #endif + client = esp_mqtt_client_init(&mqtt_cfg); if (client) { @@ -197,6 +198,7 @@ bool MQTT_Init() { if (ret != ESP_OK) { LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Could not register event (ret=" + std::to_string(ret) + ")!"); + mqtt_initialized = false; return false; } @@ -211,25 +213,31 @@ bool MQTT_Init() { if (ret != ESP_OK) { LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Could not start client (ret=" + std::to_string(ret) + ")!"); + mqtt_initialized = false; return false; } } } else { - LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Could not init client!"); + LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Init failed, no handle created!"); + mqtt_initialized = false; return false; } - LogFile.WriteToFile(ESP_LOG_INFO, TAG, "Init successful"); + LogFile.WriteToFile(ESP_LOG_INFO, TAG, "Client started, waiting for established connection..."); + mqtt_initialized = true; return true; } void MQTTdestroy_client() { - if (client != NULL) { + if (client) { esp_mqtt_client_stop(client); esp_mqtt_client_destroy(client); + client = NULL; + mqtt_initialized = false; + mqtt_connected = false; } } @@ -283,7 +291,7 @@ void MQTTregisterSubscribeFunction(std::string topic, std::function