Streamlined MQTT interlock when not activated & interlock MQTT when WIFI is not connected (#1556)

* streamlined mqtt interlock when disabled

* Disconnect mqtt client before reboot

* Interlock MQTT with WIFI

* Update

* loglevel to DEBUG

* Update

* mqtt msg id incremental

* new ENABLE_MQTT includes

* Loglevel to DEBUG

* Loglevel

* Update interface_mqtt.cpp
This commit is contained in:
Slider0007
2022-12-12 08:09:04 +01:00
committed by GitHub
parent 93f0f6b07d
commit b85e3b11a9
12 changed files with 188 additions and 131 deletions

View File

@@ -29,6 +29,9 @@
#include "server_tflite.h"
#include "server_file.h"
#include "server_GPIO.h"
#ifdef ENABLE_MQTT
#include "interface_mqtt.h"
#endif //ENABLE_MQTT
#include "ClassLogFile.h"
@@ -601,6 +604,9 @@ void doReboot(){
xTaskCreate(&task_reboot, "reboot", configMINIMAL_STACK_SIZE * 64, NULL, 10, NULL);
// KillTFliteTasks(); // kills itself
gpio_handler_destroy();
#ifdef ENABLE_MQTT
MQTTdestroy_client();
#endif //ENABLE_MQTT
vTaskDelay(5000 / portTICK_PERIOD_MS);
esp_restart();
hard_restart();

View File

@@ -93,7 +93,7 @@ void ClassFlowImage::RemoveOldLogs()
if (!isLogImage)
return;
ESP_LOGI(TAG, "remove old images");
ESP_LOGD(TAG, "remove old images");
if (logfileRetentionInDays == 0) {
return;
}
@@ -132,7 +132,7 @@ void ClassFlowImage::RemoveOldLogs()
}
}
}
ESP_LOGI(TAG, "Image folder deleted: %d | Image folder not deleted: %d", deleted, notDeleted);
ESP_LOGD(TAG, "Image folder deleted: %d | Image folder not deleted: %d", deleted, notDeleted);
closedir(dir);
}

View File

@@ -189,36 +189,26 @@ string ClassFlowMQTT::GetMQTTMainTopic()
}
bool ClassFlowMQTT::Start(float AutoIntervall) {
// printf("URI: %s, MAINTOPIC: %s", uri.c_str(), maintopic.c_str());
if ((uri.length() == 0) || (maintopic.length() == 0))
bool ClassFlowMQTT::Start(float AutoIntervall)
{
LogFile.WriteToFile(ESP_LOG_INFO, TAG, "MQTT not started because URI or Maintopic is not set. MQTT will be disabled.");
MQTTdisable();
return false;
}
roundInterval = AutoIntervall; // Minutes
keepAlive = roundInterval * 60 * 2.5; // Seconds, make sure it is greater thatn 2 rounds!
std::stringstream stream;
stream << std::fixed << std::setprecision(1) << "Digitizer interval is " << roundInterval <<
" minutes => setting MQTT LWT timeout to " << ((float)keepAlive/60) << " minutes.";
LogFile.WriteToFile(ESP_LOG_INFO, TAG, stream.str());
LogFile.WriteToFile(ESP_LOG_DEBUG, TAG, stream.str());
mqttServer_setParameter(flowpostprocessing->GetNumbers(), keepAlive, roundInterval);
MQTT_Configure(uri, clientname, user, password, maintopic, LWT_TOPIC, LWT_CONNECTED, LWT_DISCONNECTED,
keepAlive, SetRetainFlag, (void *)&GotConnected);
bool MQTTConfigCheck = MQTT_Configure(uri, clientname, user, password, maintopic, LWT_TOPIC, LWT_CONNECTED,
LWT_DISCONNECTED, keepAlive, SetRetainFlag, (void *)&GotConnected);
if (!MQTT_Init()) {
LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Init at startup failed! Retry with next publish call");
if (!MQTTConfigCheck) {
return false;
}
return true;
return (MQTT_Init() == 1);
}
@@ -237,7 +227,7 @@ bool ClassFlowMQTT::doFlow(string zwtime)
publishSystemData();
if (flowpostprocessing)
if (flowpostprocessing && getMQTTisConnected())
{
std::vector<NumberPost*>* NUMBERS = flowpostprocessing->GetNumbers();

View File

@@ -298,7 +298,7 @@ void ClassLogFile::RemoveOldLogFile()
return;
}
ESP_LOGI(TAG, "Remove old log files");
ESP_LOGD(TAG, "Remove old log files");
time_t rawtime;
struct tm* timeinfo;
@@ -350,7 +350,7 @@ void ClassLogFile::RemoveOldDataLog()
return;
}
ESP_LOGI(TAG, "Remove old data files");
ESP_LOGD(TAG, "Remove old data files");
time_t rawtime;
struct tm* timeinfo;

View File

@@ -1,8 +1,8 @@
#ifdef ENABLE_MQTT
#include "interface_mqtt.h"
//#define LOG_LOCAL_LEVEL ESP_LOG_DEBUG
#include "esp_log.h"
#include "connect_wlan.h"
#include "mqtt_client.h"
#include "ClassLogFile.h"
#include "server_tflite.h"
@@ -19,52 +19,54 @@ std::map<std::string, std::function<bool(std::string, char*, int)>>* subscribeFu
int failedOnRound = -1;
bool MQTT_Enabled = true;
esp_mqtt_event_id_t esp_mmqtt_ID = MQTT_EVENT_ANY;
esp_mqtt_event_id_t esp_mqtt_ID = MQTT_EVENT_ANY;
// ESP_EVENT_ANY_ID
bool mqtt_enabled = false;
bool mqtt_configOK = false;
bool mqtt_initialized = false;
bool mqtt_connected = false;
esp_mqtt_client_handle_t client = NULL;
std::string uri, client_id, lwt_topic, lwt_connected, lwt_disconnected, user, password, maintopic;
int keepalive, SetRetainFlag;
void (*callbackOnConnected)(std::string, int) = NULL;
void MQTTdisable()
bool MQTTPublish(std::string _key, std::string _content, int retained_flag)
{
MQTT_Enabled = false;
if (!mqtt_enabled) { // MQTT sevice not started / configured (MQTT_Init not called before)
return false;
}
bool MQTTPublish(std::string _key, std::string _content, int retained_flag) {
int msg_id;
std::string zw;
if (failedOnRound == getCountFlowRounds()) { // we already failed in this round, do not retry until the next round
return true; // Fail quietly
}
#ifdef DEBUG_DETAIL_ON
LogFile.WriteHeapInfo("MQTT Publish");
#endif
if (!mqtt_initialized) {
if (!MQTT_Init()) {
LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Init failed, skipping all MQTT publishings in this round!");
failedOnRound = getCountFlowRounds();
return false;
}
}
MQTT_Init(); // Re-Init client if not initialized yet/anymore
msg_id = esp_mqtt_client_publish(client, _key.c_str(), _content.c_str(), 0, 1, retained_flag);
if (msg_id < 0) {
if (mqtt_initialized && mqtt_connected) {
#ifdef DEBUG_DETAIL_ON
long long int starttime = esp_timer_get_time();
#endif
int msg_id = esp_mqtt_client_publish(client, _key.c_str(), _content.c_str(), 0, 1, retained_flag);
#ifdef DEBUG_DETAIL_ON
ESP_LOGD(TAG, "Publish msg_id %d in %lld ms", msg_id, (esp_timer_get_time() - starttime)/1000);
#endif
if (msg_id == -1) {
LogFile.WriteToFile(ESP_LOG_WARN, TAG, "Failed to publish topic '" + _key + "', re-trying...");
esp_mqtt_client_reconnect(client);
#ifdef DEBUG_DETAIL_ON
starttime = esp_timer_get_time();
#endif
msg_id = esp_mqtt_client_publish(client, _key.c_str(), _content.c_str(), 0, 1, retained_flag);
if (msg_id < 0) {
#ifdef DEBUG_DETAIL_ON
ESP_LOGD(TAG, "Publish msg_id %d in %lld ms", msg_id, (esp_timer_get_time() - starttime)/1000);
#endif
if (msg_id == -1) {
LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Failed to publish topic '" + _key + "', skipping all MQTT publishings in this round!");
failedOnRound = getCountFlowRounds();
return false;
@@ -76,28 +78,33 @@ bool MQTTPublish(std::string _key, std::string _content, int retained_flag) {
_content.append("..");
}
zw = "Published topic: " + _key + ", content: " + _content + " (msg_id=" + std::to_string(msg_id) + ")";
LogFile.WriteToFile(ESP_LOG_DEBUG, TAG, zw);
LogFile.WriteToFile(ESP_LOG_DEBUG, TAG, "Published topic: " + _key + ", content: " + _content + " (msg_id=" + std::to_string(msg_id) + ")");
return true;
}
else {
LogFile.WriteToFile(ESP_LOG_DEBUG, TAG, "Publish skipped. Client not initalized or not connected. (topic: " + _key + ")");
return false;
}
}
static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
{
static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event) {
int msg_id;
std::string topic = "";
switch (event->event_id) {
case MQTT_EVENT_BEFORE_CONNECT:
ESP_LOGD(TAG, "MQTT_EVENT_BEFORE_CONNECT");
mqtt_initialized = true;
break;
case MQTT_EVENT_CONNECTED:
ESP_LOGD(TAG, "MQTT_EVENT_CONNECTED");
mqtt_initialized = true;
mqtt_connected = true;
MQTTconnected();
break;
case MQTT_EVENT_DISCONNECTED:
ESP_LOGD(TAG, "MQTT_EVENT_DISCONNECTED");
LogFile.WriteToFile(ESP_LOG_DEBUG, TAG, "Disconnected from broker");
mqtt_connected = false;
break;
case MQTT_EVENT_SUBSCRIBED:
@@ -126,8 +133,15 @@ static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
}
break;
case MQTT_EVENT_ERROR:
ESP_LOGD(TAG, "MQTT_EVENT_ERROR");
mqtt_initialized = false; // Force re-init on next publish call
#ifdef DEBUG_DETAIL_ON
ESP_LOGD(TAG, "MQTT_EVENT_ERROR - esp_mqtt_error_codes:");
ESP_LOGD(TAG, "error_type:%d", event->error_handle->error_type);
ESP_LOGD(TAG, "connect_return_code:%d", event->error_handle->connect_return_code);
ESP_LOGD(TAG, "esp_transport_sock_errno:%d", event->error_handle->esp_transport_sock_errno);
ESP_LOGD(TAG, "esp_tls_last_esp_err:%d", event->error_handle->esp_tls_last_esp_err);
ESP_LOGD(TAG, "esp_tls_stack_err:%d", event->error_handle->esp_tls_stack_err);
ESP_LOGD(TAG, "esp_tls_cert_verify_flags:%d", event->error_handle->esp_tls_cert_verify_flags);
#endif
mqtt_connected = false;
break;
default:
@@ -143,16 +157,14 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_
}
void MQTT_Configure(std::string _mqttURI, std::string _clientid, std::string _user, std::string _password,
bool MQTT_Configure(std::string _mqttURI, std::string _clientid, std::string _user, std::string _password,
std::string _maintopic, std::string _lwt, std::string _lwt_connected, std::string _lwt_disconnected,
int _keepalive, int _SetRetainFlag, void *_callbackOnConnected) {
#ifdef __HIDE_PASSWORD
LogFile.WriteToFile(ESP_LOG_INFO, TAG, "URI: " + _mqttURI + ", clientname: " + _clientid +
", user: " + _user + ", password: XXXXXXXX, maintopic: " + _maintopic + ", last-will-topic: " + _maintopic + "/" + _lwt + ", keepAlive: " + std::to_string(_keepalive));
#else
LogFile.WriteToFile(ESP_LOG_INFO, TAG, "URI: " + _mqttURI + ", clientname: " + _clientid +
", user: " + _user + ", password: " + _password + ", maintopic: " + _maintopic + ", last-will-topic: " + _maintopic + "/" + _lwt + ", keepAlive: " + std::to_string(_keepalive));
#endif
if ((_mqttURI.length() == 0) || (_maintopic.length() == 0) || (_clientid.length() == 0))
{
LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Init aborted! Config error (URI, MainTopic or ClientID missing)");
return false;
}
uri = _mqttURI;
client_id = _clientid;
@@ -168,42 +180,60 @@ void MQTT_Configure(std::string _mqttURI, std::string _clientid, std::string _us
user = _user;
password = _password;
}
#ifdef __HIDE_PASSWORD
LogFile.WriteToFile(ESP_LOG_DEBUG, TAG, "URI: " + uri + ", clientname: " + client_id + ", user: " + user + ", password: XXXXXXXX, maintopic: "
+ maintopic + ", last-will-topic: " + lwt_topic + ", keepAlive: " + std::to_string(keepalive) + ", RetainFlag: " + std::to_string(SetRetainFlag));
#else
LogFile.WriteToFile(ESP_LOG_DEBUG, TAG, "URI: " + uri + ", clientname: " + client_id + ", user: " + user + ", password: " + password + ", maintopic: "
+ maintopic + ", last-will-topic: " + lwt_topic + ", keepAlive: " + std::to_string(keepalive) + ", RetainFlag: " + std::to_string(SetRetainFlag));
#endif
mqtt_configOK = true;
return true;
}
bool MQTT_Init() {
if (MQTT_Enabled == false)
return false;
if ((client_id.length() == 0) || (lwt_topic.length() == 0))
{
LogFile.WriteToFile(ESP_LOG_ERROR, TAG, std::string("Init with no Client_ID (" + client_id + ") or Last Will Topic (" + lwt_topic + "). Abort Init!"));
return false;
int MQTT_Init() {
if (mqtt_initialized) {
return 0;
}
esp_err_t ret;
LogFile.WriteToFile(ESP_LOG_INFO, TAG, std::string("Init"));
if (mqtt_configOK) {
mqtt_enabled = true;
} else {
LogFile.WriteToFile(ESP_LOG_DEBUG, TAG, "Init called, but client is not yet configured.");
return 0;
}
if (!getWIFIisConnected()) {
LogFile.WriteToFile(ESP_LOG_DEBUG, TAG, "Init called, but WIFI is not yet connected.");
return 0;
}
LogFile.WriteToFile(ESP_LOG_INFO, TAG, "Init");
MQTTdestroy_client();
std::string lw = lwt_disconnected;
esp_mqtt_client_config_t mqtt_cfg = {
.uri = uri.c_str(),
.client_id = client_id.c_str(),
.lwt_topic = lwt_topic.c_str(),
.lwt_msg = lw.c_str(),
.lwt_msg = lwt_disconnected.c_str(),
.lwt_retain = 1,
.lwt_msg_len = (int)(lw.length()),
.lwt_msg_len = (int)(lwt_disconnected.length()),
.keepalive = keepalive,
.disable_auto_reconnect = false, // Reconnection routine active
.reconnect_timeout_ms = 10000 // Try to reconnect to broker every 10s
.disable_auto_reconnect = false, // Reconnection routine active (Default: false)
.buffer_size = 1536, // size of MQTT send/receive buffer (Default: 1024)
.reconnect_timeout_ms = 15000, // Try to reconnect to broker (Default: 10000ms)
.network_timeout_ms = 20000, // Network Timeout (Default: 10000ms)
.message_retransmit_timeout = 3000 // Tiem after message resent when broker not acknowledged (QoS1, QoS2)
};
if (user.length() && password.length()){
mqtt_cfg.username = user.c_str();
mqtt_cfg.password = password.c_str();
};
}
#ifdef DEBUG_DETAIL_ON
LogFile.WriteHeapInfo("MQTT Client Init");
@@ -212,12 +242,12 @@ bool MQTT_Init() {
client = esp_mqtt_client_init(&mqtt_cfg);
if (client)
{
ret = esp_mqtt_client_register_event(client, esp_mmqtt_ID, mqtt_event_handler, client);
esp_err_t ret = esp_mqtt_client_register_event(client, esp_mqtt_ID, mqtt_event_handler, client);
if (ret != ESP_OK)
{
LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Could not register event (ret=" + std::to_string(ret) + ")!");
mqtt_initialized = false;
return false;
return -1;
}
#ifdef DEBUG_DETAIL_ON
@@ -226,40 +256,44 @@ bool MQTT_Init() {
ret = esp_mqtt_client_start(client);
if (ret != ESP_OK)
{
LogFile.WriteToFile(ESP_LOG_WARN, TAG, "Could not start client (ret=" + std::to_string(ret) + "), retrying...");
ret = esp_mqtt_client_start(client);
if (ret != ESP_OK)
{
LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Could not start client (ret=" + std::to_string(ret) + ")!");
LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Client start failed (retval=" + std::to_string(ret) + ")!");
mqtt_initialized = false;
return false;
return -1;
}
else {
LogFile.WriteToFile(ESP_LOG_INFO, TAG, "Client started, waiting for established connection...");
mqtt_initialized = true;
return 1;
}
}
else
{
LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Init failed, no handle created!");
mqtt_initialized = false;
return false;
return -1;
}
LogFile.WriteToFile(ESP_LOG_INFO, TAG, "Client started, waiting for established connection...");
mqtt_initialized = true;
return true;
}
void MQTTdestroy_client() {
if (client) {
if (mqtt_connected) {
esp_mqtt_client_disconnect(client);
mqtt_connected = false;
}
esp_mqtt_client_stop(client);
esp_mqtt_client_destroy(client);
client = NULL;
mqtt_initialized = false;
mqtt_connected = false;
}
}
bool MQTTisConnected() {
bool getMQTTisEnabled() {
return mqtt_enabled;
}
bool getMQTTisConnected() {
return mqtt_connected;
}

View File

@@ -6,15 +6,16 @@
#include <map>
#include <functional>
void MQTT_Configure(std::string _mqttURI, std::string _clientid, std::string _user, std::string _password,
bool MQTT_Configure(std::string _mqttURI, std::string _clientid, std::string _user, std::string _password,
std::string _maintopic, std::string _lwt, std::string _lwt_connected, std::string _lwt_disconnected,
int _keepalive, int SetRetainFlag, void *callbackOnConnected);
bool MQTT_Init();
int MQTT_Init();
void MQTTdestroy_client();
bool MQTTPublish(std::string _key, std::string _content, int retained_flag = 1); // retained Flag as Standart
bool MQTTisConnected();
bool getMQTTisEnabled();
bool getMQTTisConnected();
void MQTTregisterConnectFunction(std::string name, std::function<void()> func);
void MQTTunregisterConnectFunction(std::string name);
@@ -22,6 +23,5 @@ void MQTTregisterSubscribeFunction(std::string topic, std::function<bool(std::st
void MQTTdestroySubscribeFunction();
void MQTTconnected();
void MQTTdisable();
#endif //INTERFACE_MQTT_H
#endif //#ENABLE_MQTT

View File

@@ -129,6 +129,9 @@ void sendHomeAssistantDiscoveryTopic(std::string group, std::string field,
}
void MQTThomeassistantDiscovery() {
if (!getMQTTisConnected())
return;
LogFile.WriteToFile(ESP_LOG_INFO, TAG, "MQTT - Sending Homeassistant Discovery Topics (Meter Type: " + meterType + ", Value Unit: " + valueUnit + " , Rate Unit: " + rateUnit + ")...");
// Group | Field | User Friendly Name | Icon | Unit | Device Class | State Class | Entity Category
@@ -161,6 +164,9 @@ void MQTThomeassistantDiscovery() {
}
void publishSystemData() {
if (!getMQTTisConnected())
return;
char tmp_char[50];
LogFile.WriteToFile(ESP_LOG_DEBUG, TAG, "Publishing system MQTT topics...");
@@ -180,6 +186,9 @@ void publishSystemData() {
void publishStaticData() {
if (!getMQTTisConnected())
return;
LogFile.WriteToFile(ESP_LOG_DEBUG, TAG, "Publishing static MQTT topics...");
MQTTPublish(maintopic + "/" + "MAC", getMac(), retainFlag);
MQTTPublish(maintopic + "/" + "IP", *getIPAddress(), retainFlag);
@@ -204,6 +213,7 @@ esp_err_t sendDiscovery_and_static_Topics(httpd_req_t *req) {
}
void GotConnected(std::string maintopic, int retainFlag) {
vTaskDelay(10000 / portTICK_PERIOD_MS); // Delay execution by 10s after connection got established
if (HomeassistantDiscovery) {
MQTThomeassistantDiscovery();
}

View File

@@ -2,6 +2,6 @@ FILE(GLOB_RECURSE app_sources ${CMAKE_CURRENT_SOURCE_DIR}/*.*)
idf_component_register(SRCS ${app_sources}
INCLUDE_DIRS "."
REQUIRES nvs_flash jomjol_helper)
REQUIRES nvs_flash jomjol_helper jomjol_mqtt)

View File

@@ -13,6 +13,9 @@
#include "lwip/err.h"
#include "lwip/sys.h"
#ifdef ENABLE_MQTT
#include "interface_mqtt.h"
#endif //ENABLE_MQTT
#include <fstream>
#include <string>
@@ -38,6 +41,7 @@ static EventGroupHandle_t s_wifi_event_group;
static const char *TAG = "WIFI";
static int s_retry_num = 0;
bool WIFIConnected = false;
///////////////////////////////////////////////////////////
#define BLINK_GPIO GPIO_NUM_33
@@ -113,9 +117,11 @@ static void event_handler(void* arg, esp_event_base_t event_base,
int32_t event_id, void* event_data)
{
if (event_base == WIFI_EVENT && event_id == WIFI_EVENT_STA_START) {
WIFIConnected = false;
LEDBlinkTask(200, 1, true);
esp_wifi_connect();
} else if (event_base == WIFI_EVENT && event_id == WIFI_EVENT_STA_DISCONNECTED) {
WIFIConnected = false;
// if (s_retry_num < EXAMPLE_ESP_MAXIMUM_RETRY) {
esp_wifi_connect();
s_retry_num++;
@@ -131,6 +137,14 @@ static void event_handler(void* arg, esp_event_base_t event_base,
s_retry_num = 0;
xEventGroupSetBits(s_wifi_event_group, WIFI_CONNECTED_BIT);
LEDBlinkTask(1000, 5, true);
WIFIConnected = true;
#ifdef ENABLE_MQTT
if (getMQTTisEnabled()) {
vTaskDelay(5000 / portTICK_PERIOD_MS);
MQTT_Init(); // Init when WIFI is getting connected
}
#endif //ENABLE_MQTT
}
}
@@ -289,3 +303,7 @@ void wifi_init_sta(const char *_ssid, const char *_password)
wifi_init_sta(_ssid, _password, NULL, NULL, NULL, NULL, NULL);
}
bool getWIFIisConnected() {
return WIFIConnected;
}

View File

@@ -10,6 +10,7 @@ void wifi_init_sta(const char *_ssid, const char *_password);
std::string* getIPAddress();
std::string* getSSID();
int get_WIFI_RSSI();
bool getWIFIisConnected();
extern std::string hostname;
extern std::string std_hostname;

View File

@@ -40,20 +40,18 @@ CONFIG_SPIRAM_CACHE_WORKAROUND=y
CONFIG_ESP_INT_WDT_TIMEOUT_MS=300
CONFIG_HTTPD_MAX_REQ_HDR_LEN=1024
CONFIG_HTTPD_PURGE_BUF_LEN=16
CONFIG_ESP32_WIFI_DYNAMIC_RX_BUFFER_NUM=16
CONFIG_ESP32_WIFI_CACHE_TX_BUFFER_NUM=16
CONFIG_FATFS_LFN_HEAP=y
CONFIG_FATFS_MAX_LFN=255
CONFIG_FATFS_API_ENCODING_ANSI_OEM=y
CONFIG_FMB_TIMER_PORT_ENABLED=y
CONFIG_MQTT_MSG_ID_INCREMENTAL=y
CONFIG_MQTT_SKIP_PUBLISH_IF_DISCONNECTED=y
CONFIG_MQTT_TASK_CORE_SELECTION_ENABLED=y
CONFIG_MQTT_USE_CORE_0=y