From eb53db00d07df0eff36dc4f9fa8791eb286ac42a Mon Sep 17 00:00:00 2001 From: Antonin Delpeuch Date: Fri, 15 Apr 2022 10:19:14 +0200 Subject: [PATCH] First draft of InfluxDB integration, for #534 --- .../jomjol_flowcontroll/CMakeLists.txt | 2 +- .../jomjol_flowcontroll/ClassFlowControll.cpp | 7 + .../jomjol_flowcontroll/ClassFlowControll.h | 1 + .../jomjol_flowcontroll/ClassFlowInfluxDB.cpp | 161 ++++++++++++++++++ .../jomjol_flowcontroll/ClassFlowInfluxDB.h | 31 ++++ .../components/jomjol_influxdb/CMakeLists.txt | 7 + .../jomjol_influxdb/interface_influxdb.cpp | 114 +++++++++++++ .../jomjol_influxdb/interface_influxdb.h | 13 ++ code/platformio.ini | 1 + sd-card/html/edit_config_param.html | 89 +++++++++- sd-card/html/readconfigparam.js | 13 +- 11 files changed, 436 insertions(+), 3 deletions(-) create mode 100644 code/components/jomjol_flowcontroll/ClassFlowInfluxDB.cpp create mode 100644 code/components/jomjol_flowcontroll/ClassFlowInfluxDB.h create mode 100644 code/components/jomjol_influxdb/CMakeLists.txt create mode 100644 code/components/jomjol_influxdb/interface_influxdb.cpp create mode 100644 code/components/jomjol_influxdb/interface_influxdb.h diff --git a/code/components/jomjol_flowcontroll/CMakeLists.txt b/code/components/jomjol_flowcontroll/CMakeLists.txt index 8a066910..6ee66829 100644 --- a/code/components/jomjol_flowcontroll/CMakeLists.txt +++ b/code/components/jomjol_flowcontroll/CMakeLists.txt @@ -2,6 +2,6 @@ FILE(GLOB_RECURSE app_sources ${CMAKE_CURRENT_SOURCE_DIR}/*.*) idf_component_register(SRCS ${app_sources} INCLUDE_DIRS "." - REQUIRES jomjol_tfliteclass jomjol_helper jomjol_controlcamera jomjol_mqtt jomjol_fileserver_ota jomjol_image_proc jomjol_wlan) + REQUIRES jomjol_tfliteclass jomjol_helper jomjol_controlcamera jomjol_mqtt jomjol_influxdb jomjol_fileserver_ota jomjol_image_proc jomjol_wlan) diff --git a/code/components/jomjol_flowcontroll/ClassFlowControll.cpp b/code/components/jomjol_flowcontroll/ClassFlowControll.cpp index 9a0758ca..e6316e15 100644 --- a/code/components/jomjol_flowcontroll/ClassFlowControll.cpp +++ b/code/components/jomjol_flowcontroll/ClassFlowControll.cpp @@ -49,6 +49,9 @@ std::string ClassFlowControll::doSingleStep(std::string _stepname, std::string _ if ((_stepname.compare("[MQTT]") == 0) || (_stepname.compare(";[MQTT]") == 0)){ _classname = "ClassFlowMQTT"; } + if ((_stepname.compare("[InfluxDB]") == 0) || (_stepname.compare(";[InfluxDB]") == 0)){ + _classname = "ClassFlowInfluxDB"; + } for (int i = 0; i < FlowControll.size(); ++i) if (FlowControll[i]->name().compare(_classname) == 0){ @@ -73,6 +76,8 @@ std::string ClassFlowControll::TranslateAktstatus(std::string _input) return ("Digitalization of ROIs"); if (_input.compare("ClassFlowMQTT") == 0) return ("Sending MQTT"); + if (_input.compare("ClassFlowInfluxDB") == 0) + return ("Sending InfluxDB"); if (_input.compare("ClassFlowPostProcessing") == 0) return ("Processing"); @@ -180,6 +185,8 @@ ClassFlow* ClassFlowControll::CreateClassFlow(std::string _type) } if (toUpper(_type).compare("[MQTT]") == 0) cfc = new ClassFlowMQTT(&FlowControll); + if (toUpper(_type).compare("[INFLUXDB]") == 0) + cfc = new ClassFlowInfluxDB(&FlowControll); if (toUpper(_type).compare("[POSTPROCESSING]") == 0) { diff --git a/code/components/jomjol_flowcontroll/ClassFlowControll.h b/code/components/jomjol_flowcontroll/ClassFlowControll.h index 3f568b26..cc4e30dc 100644 --- a/code/components/jomjol_flowcontroll/ClassFlowControll.h +++ b/code/components/jomjol_flowcontroll/ClassFlowControll.h @@ -9,6 +9,7 @@ #include "ClassFlowCNNGeneral.h" #include "ClassFlowPostProcessing.h" #include "ClassFlowMQTT.h" +#include "ClassFlowInfluxDB.h" #include "ClassFlowCNNGeneral.h" diff --git a/code/components/jomjol_flowcontroll/ClassFlowInfluxDB.cpp b/code/components/jomjol_flowcontroll/ClassFlowInfluxDB.cpp new file mode 100644 index 00000000..55b1f9ff --- /dev/null +++ b/code/components/jomjol_flowcontroll/ClassFlowInfluxDB.cpp @@ -0,0 +1,161 @@ +#include +#include "ClassFlowInfluxDB.h" +#include "Helper.h" +#include "connect_wlan.h" + +#include "time_sntp.h" +#include "interface_influxdb.h" +#include "ClassFlowPostProcessing.h" + +#include + +void ClassFlowInfluxDB::SetInitialParameter(void) +{ + uri = ""; + database = ""; + measurement = ""; + + OldValue = ""; + flowpostprocessing = NULL; + user = ""; + password = ""; + previousElement = NULL; + ListFlowControll = NULL; + disabled = false; + InfluxDBenable = false; +} + +ClassFlowInfluxDB::ClassFlowInfluxDB() +{ + SetInitialParameter(); +} + +ClassFlowInfluxDB::ClassFlowInfluxDB(std::vector* lfc) +{ + SetInitialParameter(); + + ListFlowControll = lfc; + for (int i = 0; i < ListFlowControll->size(); ++i) + { + if (((*ListFlowControll)[i])->name().compare("ClassFlowPostProcessing") == 0) + { + flowpostprocessing = (ClassFlowPostProcessing*) (*ListFlowControll)[i]; + } + } +} + +ClassFlowInfluxDB::ClassFlowInfluxDB(std::vector* lfc, ClassFlow *_prev) +{ + SetInitialParameter(); + + previousElement = _prev; + ListFlowControll = lfc; + + for (int i = 0; i < ListFlowControll->size(); ++i) + { + if (((*ListFlowControll)[i])->name().compare("ClassFlowPostProcessing") == 0) + { + flowpostprocessing = (ClassFlowPostProcessing*) (*ListFlowControll)[i]; + } + } +} + + +bool ClassFlowInfluxDB::ReadParameter(FILE* pfile, string& aktparamgraph) +{ + std::vector zerlegt; + + aktparamgraph = trim(aktparamgraph); + + if (aktparamgraph.size() == 0) + if (!this->GetNextParagraph(pfile, aktparamgraph)) + return false; + + if (toUpper(aktparamgraph).compare("[INFLUXDB]") != 0) + return false; + + while (this->getNextLine(pfile, &aktparamgraph) && !this->isNewParagraph(aktparamgraph)) + { + printf("while loop reading line: %s\n", aktparamgraph.c_str()); + zerlegt = this->ZerlegeZeile(aktparamgraph); + if ((toUpper(zerlegt[0]) == "USER") && (zerlegt.size() > 1)) + { + this->user = zerlegt[1]; + } + if ((toUpper(zerlegt[0]) == "PASSWORD") && (zerlegt.size() > 1)) + { + this->password = zerlegt[1]; + } + if ((toUpper(zerlegt[0]) == "URI") && (zerlegt.size() > 1)) + { + this->uri = zerlegt[1]; + } + if (((toUpper(zerlegt[0]) == "MEASUREMENT")) && (zerlegt.size() > 1)) + { + this->measurement = zerlegt[1]; + } + if (((toUpper(zerlegt[0]) == "DATABASE")) && (zerlegt.size() > 1)) + { + this->database = zerlegt[1]; + } + } + + if ((uri.length() > 0) && (database.length() > 0) && (measurement.length() > 0)) + { + printf("Init InfluxDB with uri: %s, measurement: %s, user: %s, password: %s\n", uri.c_str(), measurement.c_str(), user.c_str(), password.c_str()); + InfluxDBInit(uri, database, measurement, user, password); + InfluxDBenable = true; + } else { + printf("InfluxDB init skipped as we are missing some parameters"); + } + + return true; +} + + +string ClassFlowInfluxDB::GetInfluxDBMeasurement() +{ + return measurement; +} + + +bool ClassFlowInfluxDB::doFlow(string zwtime) +{ + if (!InfluxDBenable) + return true; + + std::string result; + std::string resulterror = ""; + std::string resultraw = ""; + std::string resultrate = ""; + std::string resulttimestamp = ""; + string zw = ""; + string namenumber = ""; + + if (flowpostprocessing) + { + std::vector* NUMBERS = flowpostprocessing->GetNumbers(); + + for (int i = 0; i < (*NUMBERS).size(); ++i) + { + result = (*NUMBERS)[i]->ReturnValue; + resultraw = (*NUMBERS)[i]->ReturnRawValue; + resulterror = (*NUMBERS)[i]->ErrorMessageText; + resultrate = (*NUMBERS)[i]->ReturnRateValue; + resulttimestamp = (*NUMBERS)[i]->timeStamp; + + namenumber = (*NUMBERS)[i]->name; + if (namenumber == "default") + namenumber = "value"; + else + namenumber = namenumber + "/value"; + + if (result.length() > 0 && resulttimestamp.length() > 0) + InfluxDBPublish(namenumber, result, resulttimestamp); + } + } + + OldValue = result; + + return true; +} diff --git a/code/components/jomjol_flowcontroll/ClassFlowInfluxDB.h b/code/components/jomjol_flowcontroll/ClassFlowInfluxDB.h new file mode 100644 index 00000000..b7e25362 --- /dev/null +++ b/code/components/jomjol_flowcontroll/ClassFlowInfluxDB.h @@ -0,0 +1,31 @@ +#pragma once +#include "ClassFlow.h" + +#include "ClassFlowPostProcessing.h" + +#include + +class ClassFlowInfluxDB : + public ClassFlow +{ +protected: + std::string uri, database, measurement; + std::string OldValue; + ClassFlowPostProcessing* flowpostprocessing; + std::string user, password; + bool InfluxDBenable; + + void SetInitialParameter(void); + +public: + ClassFlowInfluxDB(); + ClassFlowInfluxDB(std::vector* lfc); + ClassFlowInfluxDB(std::vector* lfc, ClassFlow *_prev); + + string GetInfluxDBMeasurement(); + + bool ReadParameter(FILE* pfile, string& aktparamgraph); + bool doFlow(string time); + string name(){return "ClassFlowInfluxDB";}; +}; + diff --git a/code/components/jomjol_influxdb/CMakeLists.txt b/code/components/jomjol_influxdb/CMakeLists.txt new file mode 100644 index 00000000..47330bd5 --- /dev/null +++ b/code/components/jomjol_influxdb/CMakeLists.txt @@ -0,0 +1,7 @@ +FILE(GLOB_RECURSE app_sources ${CMAKE_CURRENT_SOURCE_DIR}/*.*) + +idf_component_register(SRCS ${app_sources} + INCLUDE_DIRS "." + REQUIRES tflite-lib esp_http_client jomjol_logfile) + + diff --git a/code/components/jomjol_influxdb/interface_influxdb.cpp b/code/components/jomjol_influxdb/interface_influxdb.cpp new file mode 100644 index 00000000..2089d98e --- /dev/null +++ b/code/components/jomjol_influxdb/interface_influxdb.cpp @@ -0,0 +1,114 @@ +#include "interface_influxdb.h" + +//#define LOG_LOCAL_LEVEL ESP_LOG_DEBUG +#include "esp_log.h" +#include +#include "ClassLogFile.h" +#include "esp_http_client.h" + +#define MAX_HTTP_OUTPUT_BUFFER 2048 + +static const char *TAG_INTERFACEINFLUXDB = "interface_influxdb"; + +std::string _influxDBURI; +std::string _influxDBDatabase; +std::string _influxDBMeasurement; +std::string _influxDBUser; +std::string _influxDBPassword; + +static esp_err_t http_event_handler(esp_http_client_event_t *evt) +{ + switch(evt->event_id) + { + case HTTP_EVENT_ERROR: + ESP_LOGE(TAG_INTERFACEINFLUXDB, "HTTP Client Error encountered"); + break; + case HTTP_EVENT_ON_CONNECTED: + ESP_LOGI(TAG_INTERFACEINFLUXDB, "HTTP Client Connected"); + break; + case HTTP_EVENT_HEADERS_SENT: + ESP_LOGV(TAG_INTERFACEINFLUXDB, "HTTP Client sent all request headers"); + break; + case HTTP_EVENT_ON_HEADER: + ESP_LOGV(TAG_INTERFACEINFLUXDB, "Header: key=%s, value=%s", evt->header_key, evt->header_value); + break; + case HTTP_EVENT_ON_DATA: + ESP_LOGV(TAG_INTERFACEINFLUXDB, "HTTP Client data recevied: len=%d", evt->data_len); + break; + case HTTP_EVENT_ON_FINISH: + ESP_LOGI(TAG_INTERFACEINFLUXDB, "HTTP Client finished"); + break; + case HTTP_EVENT_DISCONNECTED: + ESP_LOGI(TAG_INTERFACEINFLUXDB, "HTTP Client Disconnected"); + break; + } + return ESP_OK; +} + +void InfluxDBPublish(std::string _key, std::string _content, std::string _timestamp) { + char response_buffer[MAX_HTTP_OUTPUT_BUFFER] = {0}; + esp_http_client_config_t http_config = { + .user_agent = "ESP32 Meter reader", + .method = HTTP_METHOD_POST, + .event_handler = http_event_handler, + .buffer_size = MAX_HTTP_OUTPUT_BUFFER, + .user_data = response_buffer + }; + + if (_influxDBUser.length() && _influxDBPassword.length()){ + http_config.username = _influxDBUser.c_str(); + http_config.password = _influxDBPassword.c_str(); + http_config.auth_type = HTTP_AUTH_TYPE_BASIC; + } + + // generate timestamp (TODO: parse result timestamp passed as string and convert it to POSIX timestamp?) + time_t now = time(NULL); + char nowTimestamp[21]; + // pad with zeroes to get nanoseconds + sprintf(nowTimestamp,"%jd000000000", (intmax_t)now); + + std::string payload = _influxDBMeasurement + " " + _key + "=" + _content + " " + nowTimestamp; + payload.shrink_to_fit(); + ESP_LOGI(TAG_INTERFACEINFLUXDB, "sending line to influxdb: %s\n", payload.c_str()); + + // use the default retention policy of the database + std::string apiURI = _influxDBURI + "/api/v2/write?bucket=" + _influxDBDatabase + "/"; + apiURI.shrink_to_fit(); + http_config.url = apiURI.c_str(); + ESP_LOGI(TAG_INTERFACEINFLUXDB, "API URI: %s", apiURI.c_str()); + + esp_http_client_handle_t http_client = esp_http_client_init(&http_config); + ESP_LOGI(TAG_INTERFACEINFLUXDB, "client is initialized%s\n", ""); + + esp_http_client_set_header(http_client, "Content-Type", "text/plain"); + ESP_LOGI(TAG_INTERFACEINFLUXDB, "header is set%s\n", ""); + + ESP_ERROR_CHECK(esp_http_client_set_post_field(http_client, payload.c_str(), payload.length())); + ESP_LOGI(TAG_INTERFACEINFLUXDB, "post payload is set%s\n", ""); + + esp_err_t err = ESP_ERROR_CHECK_WITHOUT_ABORT(esp_http_client_perform(http_client)); + + if( err == ESP_OK ) { + ESP_LOGI(TAG_INTERFACEINFLUXDB, "HTTP request was performed%s\n", ""); + int status_code = esp_http_client_get_status_code(http_client); + ESP_LOGI(TAG_INTERFACEINFLUXDB, "HTTP status code %d\n", status_code); + } else { + ESP_LOGW(TAG_INTERFACEINFLUXDB, "HTTP request failed%s\n", ""); + } + esp_http_client_cleanup(http_client); +} + + +void InfluxDBInit(std::string _uri, std::string _database, std::string _measurement, std::string _user, std::string _password){ + _influxDBURI = _uri; + _influxDBDatabase = _database; + _influxDBMeasurement = _measurement; + _influxDBUser = _user; + _influxDBPassword = _password; + +} + +void InfluxDBdestroy() { +} + + diff --git a/code/components/jomjol_influxdb/interface_influxdb.h b/code/components/jomjol_influxdb/interface_influxdb.h new file mode 100644 index 00000000..33ae0564 --- /dev/null +++ b/code/components/jomjol_influxdb/interface_influxdb.h @@ -0,0 +1,13 @@ +#ifndef INTERFACE_INFLUXDB_H +#define INTERFACE_INFLUXDB_H + +#include +#include +#include + +void InfluxDBInit(std::string _influxDBURI, std::string _database, std::string _measurement, std::string _user, std::string _password); +void InfluxDBdestroy(); + +void InfluxDBPublish(std::string _key, std::string _content, std::string _timestamp); + +#endif //INTERFACE_INFLUXDB_H diff --git a/code/platformio.ini b/code/platformio.ini index b082bd42..84ee163b 100644 --- a/code/platformio.ini +++ b/code/platformio.ini @@ -35,6 +35,7 @@ lib_deps = jomjol_time_sntp jomjol_logfile jomjol_mqtt + jomjol_influxdb jomjol_controlGPIO diff --git a/sd-card/html/edit_config_param.html b/sd-card/html/edit_config_param.html index 1d4c485d..587bbb3c 100644 --- a/sd-card/html/edit_config_param.html +++ b/sd-card/html/edit_config_param.html @@ -636,6 +636,81 @@ textarea { Password for MQTT authentication + + +

InfluxDB

+ + + + + + + Uri + + + + + + URI of the HTTP interface to InfluxDB, without traililing slash, e.g. http://IP-Address:Port + + + + + + + + Database + + + + + + Database name in which to publish the read value. + + + + + + + + Measurement + + + + + + Measurement name to use to publish the read value. + + + + + + + + user + + + + + + User for InfluxDB authentication + + + + + + + + password + + + + + + Password for InfluxDB authentication + + +

AutoTimer

@@ -678,7 +753,7 @@ textarea { - + @@ -1793,6 +1868,12 @@ function UpdateInput() { WriteParameter(param, category, "MQTT", "user", true); WriteParameter(param, category, "MQTT", "password", true); + WriteParameter(param, category, "InfluxDB", "Uri", true); + WriteParameter(param, category, "InfluxDB", "Database", true); + WriteParameter(param, category, "InfluxDB", "Measurement", true); + WriteParameter(param, category, "InfluxDB", "user", true); + WriteParameter(param, category, "InfluxDB", "password", true); + WriteParameter(param, category, "GPIO", "IO0", true); WriteParameter(param, category, "GPIO", "IO1", true); WriteParameter(param, category, "GPIO", "IO3", true); @@ -1847,6 +1928,7 @@ function ReadParameterAll() category["Analog"]["enabled"] = document.getElementById("Category_Analog_enabled").checked; category["Digits"]["enabled"] = document.getElementById("Category_Digits_enabled").checked; category["MQTT"]["enabled"] = document.getElementById("Category_MQTT_enabled").checked; + category["InfluxDB"]["enabled"] = document.getElementById("Category_InfluxDB_enabled").checked; category["GPIO"]["enabled"] = document.getElementById("Category_GPIO_enabled").checked; ReadParameter(param, "MakeImage", "LogImageLocation", true); @@ -1886,6 +1968,11 @@ function ReadParameterAll() ReadParameter(param, "MQTT", "user", true); ReadParameter(param, "MQTT", "password", true); + ReadParameter(param, "InfluxDB", "Uri", true); + ReadParameter(param, "InfluxDB", "Measurement", true); + ReadParameter(param, "InfluxDB", "user", true); + ReadParameter(param, "InfluxDB", "password", true); + ReadParameter(param, "GPIO", "IO0", true); ReadParameter(param, "GPIO", "IO1", true); ReadParameter(param, "GPIO", "IO3", true); diff --git a/sd-card/html/readconfigparam.js b/sd-card/html/readconfigparam.js index adaf490c..09d9bff4 100644 --- a/sd-card/html/readconfigparam.js +++ b/sd-card/html/readconfigparam.js @@ -124,7 +124,18 @@ function ParseConfig() { ParamAddValue(param, catname, "ClientID"); ParamAddValue(param, catname, "user"); ParamAddValue(param, catname, "password"); - + + var catname = "InfluxDB"; + category[catname] = new Object(); + category[catname]["enabled"] = false; + category[catname]["found"] = false; + param[catname] = new Object(); + ParamAddValue(param, catname, "Uri"); + ParamAddValue(param, catname, "Database"); + ParamAddValue(param, catname, "Measurement"); + ParamAddValue(param, catname, "user"); + ParamAddValue(param, catname, "password"); + var catname = "GPIO"; category[catname] = new Object(); category[catname]["enabled"] = false;