First draft of InfluxDB integration, for #534

This commit is contained in:
Antonin Delpeuch
2022-04-15 10:19:14 +02:00
parent 4b825efffb
commit eb53db00d0
11 changed files with 436 additions and 3 deletions

View File

@@ -2,6 +2,6 @@ FILE(GLOB_RECURSE app_sources ${CMAKE_CURRENT_SOURCE_DIR}/*.*)
idf_component_register(SRCS ${app_sources}
INCLUDE_DIRS "."
REQUIRES jomjol_tfliteclass jomjol_helper jomjol_controlcamera jomjol_mqtt jomjol_fileserver_ota jomjol_image_proc jomjol_wlan)
REQUIRES jomjol_tfliteclass jomjol_helper jomjol_controlcamera jomjol_mqtt jomjol_influxdb jomjol_fileserver_ota jomjol_image_proc jomjol_wlan)

View File

@@ -49,6 +49,9 @@ std::string ClassFlowControll::doSingleStep(std::string _stepname, std::string _
if ((_stepname.compare("[MQTT]") == 0) || (_stepname.compare(";[MQTT]") == 0)){
_classname = "ClassFlowMQTT";
}
if ((_stepname.compare("[InfluxDB]") == 0) || (_stepname.compare(";[InfluxDB]") == 0)){
_classname = "ClassFlowInfluxDB";
}
for (int i = 0; i < FlowControll.size(); ++i)
if (FlowControll[i]->name().compare(_classname) == 0){
@@ -73,6 +76,8 @@ std::string ClassFlowControll::TranslateAktstatus(std::string _input)
return ("Digitalization of ROIs");
if (_input.compare("ClassFlowMQTT") == 0)
return ("Sending MQTT");
if (_input.compare("ClassFlowInfluxDB") == 0)
return ("Sending InfluxDB");
if (_input.compare("ClassFlowPostProcessing") == 0)
return ("Processing");
@@ -180,6 +185,8 @@ ClassFlow* ClassFlowControll::CreateClassFlow(std::string _type)
}
if (toUpper(_type).compare("[MQTT]") == 0)
cfc = new ClassFlowMQTT(&FlowControll);
if (toUpper(_type).compare("[INFLUXDB]") == 0)
cfc = new ClassFlowInfluxDB(&FlowControll);
if (toUpper(_type).compare("[POSTPROCESSING]") == 0)
{

View File

@@ -9,6 +9,7 @@
#include "ClassFlowCNNGeneral.h"
#include "ClassFlowPostProcessing.h"
#include "ClassFlowMQTT.h"
#include "ClassFlowInfluxDB.h"
#include "ClassFlowCNNGeneral.h"

View File

@@ -0,0 +1,161 @@
#include <sstream>
#include "ClassFlowInfluxDB.h"
#include "Helper.h"
#include "connect_wlan.h"
#include "time_sntp.h"
#include "interface_influxdb.h"
#include "ClassFlowPostProcessing.h"
#include <time.h>
void ClassFlowInfluxDB::SetInitialParameter(void)
{
uri = "";
database = "";
measurement = "";
OldValue = "";
flowpostprocessing = NULL;
user = "";
password = "";
previousElement = NULL;
ListFlowControll = NULL;
disabled = false;
InfluxDBenable = false;
}
ClassFlowInfluxDB::ClassFlowInfluxDB()
{
SetInitialParameter();
}
ClassFlowInfluxDB::ClassFlowInfluxDB(std::vector<ClassFlow*>* lfc)
{
SetInitialParameter();
ListFlowControll = lfc;
for (int i = 0; i < ListFlowControll->size(); ++i)
{
if (((*ListFlowControll)[i])->name().compare("ClassFlowPostProcessing") == 0)
{
flowpostprocessing = (ClassFlowPostProcessing*) (*ListFlowControll)[i];
}
}
}
ClassFlowInfluxDB::ClassFlowInfluxDB(std::vector<ClassFlow*>* lfc, ClassFlow *_prev)
{
SetInitialParameter();
previousElement = _prev;
ListFlowControll = lfc;
for (int i = 0; i < ListFlowControll->size(); ++i)
{
if (((*ListFlowControll)[i])->name().compare("ClassFlowPostProcessing") == 0)
{
flowpostprocessing = (ClassFlowPostProcessing*) (*ListFlowControll)[i];
}
}
}
bool ClassFlowInfluxDB::ReadParameter(FILE* pfile, string& aktparamgraph)
{
std::vector<string> zerlegt;
aktparamgraph = trim(aktparamgraph);
if (aktparamgraph.size() == 0)
if (!this->GetNextParagraph(pfile, aktparamgraph))
return false;
if (toUpper(aktparamgraph).compare("[INFLUXDB]") != 0)
return false;
while (this->getNextLine(pfile, &aktparamgraph) && !this->isNewParagraph(aktparamgraph))
{
printf("while loop reading line: %s\n", aktparamgraph.c_str());
zerlegt = this->ZerlegeZeile(aktparamgraph);
if ((toUpper(zerlegt[0]) == "USER") && (zerlegt.size() > 1))
{
this->user = zerlegt[1];
}
if ((toUpper(zerlegt[0]) == "PASSWORD") && (zerlegt.size() > 1))
{
this->password = zerlegt[1];
}
if ((toUpper(zerlegt[0]) == "URI") && (zerlegt.size() > 1))
{
this->uri = zerlegt[1];
}
if (((toUpper(zerlegt[0]) == "MEASUREMENT")) && (zerlegt.size() > 1))
{
this->measurement = zerlegt[1];
}
if (((toUpper(zerlegt[0]) == "DATABASE")) && (zerlegt.size() > 1))
{
this->database = zerlegt[1];
}
}
if ((uri.length() > 0) && (database.length() > 0) && (measurement.length() > 0))
{
printf("Init InfluxDB with uri: %s, measurement: %s, user: %s, password: %s\n", uri.c_str(), measurement.c_str(), user.c_str(), password.c_str());
InfluxDBInit(uri, database, measurement, user, password);
InfluxDBenable = true;
} else {
printf("InfluxDB init skipped as we are missing some parameters");
}
return true;
}
string ClassFlowInfluxDB::GetInfluxDBMeasurement()
{
return measurement;
}
bool ClassFlowInfluxDB::doFlow(string zwtime)
{
if (!InfluxDBenable)
return true;
std::string result;
std::string resulterror = "";
std::string resultraw = "";
std::string resultrate = "";
std::string resulttimestamp = "";
string zw = "";
string namenumber = "";
if (flowpostprocessing)
{
std::vector<NumberPost*>* NUMBERS = flowpostprocessing->GetNumbers();
for (int i = 0; i < (*NUMBERS).size(); ++i)
{
result = (*NUMBERS)[i]->ReturnValue;
resultraw = (*NUMBERS)[i]->ReturnRawValue;
resulterror = (*NUMBERS)[i]->ErrorMessageText;
resultrate = (*NUMBERS)[i]->ReturnRateValue;
resulttimestamp = (*NUMBERS)[i]->timeStamp;
namenumber = (*NUMBERS)[i]->name;
if (namenumber == "default")
namenumber = "value";
else
namenumber = namenumber + "/value";
if (result.length() > 0 && resulttimestamp.length() > 0)
InfluxDBPublish(namenumber, result, resulttimestamp);
}
}
OldValue = result;
return true;
}

View File

@@ -0,0 +1,31 @@
#pragma once
#include "ClassFlow.h"
#include "ClassFlowPostProcessing.h"
#include <string>
class ClassFlowInfluxDB :
public ClassFlow
{
protected:
std::string uri, database, measurement;
std::string OldValue;
ClassFlowPostProcessing* flowpostprocessing;
std::string user, password;
bool InfluxDBenable;
void SetInitialParameter(void);
public:
ClassFlowInfluxDB();
ClassFlowInfluxDB(std::vector<ClassFlow*>* lfc);
ClassFlowInfluxDB(std::vector<ClassFlow*>* lfc, ClassFlow *_prev);
string GetInfluxDBMeasurement();
bool ReadParameter(FILE* pfile, string& aktparamgraph);
bool doFlow(string time);
string name(){return "ClassFlowInfluxDB";};
};

View File

@@ -0,0 +1,7 @@
FILE(GLOB_RECURSE app_sources ${CMAKE_CURRENT_SOURCE_DIR}/*.*)
idf_component_register(SRCS ${app_sources}
INCLUDE_DIRS "."
REQUIRES tflite-lib esp_http_client jomjol_logfile)

View File

@@ -0,0 +1,114 @@
#include "interface_influxdb.h"
//#define LOG_LOCAL_LEVEL ESP_LOG_DEBUG
#include "esp_log.h"
#include <time.h>
#include "ClassLogFile.h"
#include "esp_http_client.h"
#define MAX_HTTP_OUTPUT_BUFFER 2048
static const char *TAG_INTERFACEINFLUXDB = "interface_influxdb";
std::string _influxDBURI;
std::string _influxDBDatabase;
std::string _influxDBMeasurement;
std::string _influxDBUser;
std::string _influxDBPassword;
static esp_err_t http_event_handler(esp_http_client_event_t *evt)
{
switch(evt->event_id)
{
case HTTP_EVENT_ERROR:
ESP_LOGE(TAG_INTERFACEINFLUXDB, "HTTP Client Error encountered");
break;
case HTTP_EVENT_ON_CONNECTED:
ESP_LOGI(TAG_INTERFACEINFLUXDB, "HTTP Client Connected");
break;
case HTTP_EVENT_HEADERS_SENT:
ESP_LOGV(TAG_INTERFACEINFLUXDB, "HTTP Client sent all request headers");
break;
case HTTP_EVENT_ON_HEADER:
ESP_LOGV(TAG_INTERFACEINFLUXDB, "Header: key=%s, value=%s", evt->header_key, evt->header_value);
break;
case HTTP_EVENT_ON_DATA:
ESP_LOGV(TAG_INTERFACEINFLUXDB, "HTTP Client data recevied: len=%d", evt->data_len);
break;
case HTTP_EVENT_ON_FINISH:
ESP_LOGI(TAG_INTERFACEINFLUXDB, "HTTP Client finished");
break;
case HTTP_EVENT_DISCONNECTED:
ESP_LOGI(TAG_INTERFACEINFLUXDB, "HTTP Client Disconnected");
break;
}
return ESP_OK;
}
void InfluxDBPublish(std::string _key, std::string _content, std::string _timestamp) {
char response_buffer[MAX_HTTP_OUTPUT_BUFFER] = {0};
esp_http_client_config_t http_config = {
.user_agent = "ESP32 Meter reader",
.method = HTTP_METHOD_POST,
.event_handler = http_event_handler,
.buffer_size = MAX_HTTP_OUTPUT_BUFFER,
.user_data = response_buffer
};
if (_influxDBUser.length() && _influxDBPassword.length()){
http_config.username = _influxDBUser.c_str();
http_config.password = _influxDBPassword.c_str();
http_config.auth_type = HTTP_AUTH_TYPE_BASIC;
}
// generate timestamp (TODO: parse result timestamp passed as string and convert it to POSIX timestamp?)
time_t now = time(NULL);
char nowTimestamp[21];
// pad with zeroes to get nanoseconds
sprintf(nowTimestamp,"%jd000000000", (intmax_t)now);
std::string payload = _influxDBMeasurement + " " + _key + "=" + _content + " " + nowTimestamp;
payload.shrink_to_fit();
ESP_LOGI(TAG_INTERFACEINFLUXDB, "sending line to influxdb: %s\n", payload.c_str());
// use the default retention policy of the database
std::string apiURI = _influxDBURI + "/api/v2/write?bucket=" + _influxDBDatabase + "/";
apiURI.shrink_to_fit();
http_config.url = apiURI.c_str();
ESP_LOGI(TAG_INTERFACEINFLUXDB, "API URI: %s", apiURI.c_str());
esp_http_client_handle_t http_client = esp_http_client_init(&http_config);
ESP_LOGI(TAG_INTERFACEINFLUXDB, "client is initialized%s\n", "");
esp_http_client_set_header(http_client, "Content-Type", "text/plain");
ESP_LOGI(TAG_INTERFACEINFLUXDB, "header is set%s\n", "");
ESP_ERROR_CHECK(esp_http_client_set_post_field(http_client, payload.c_str(), payload.length()));
ESP_LOGI(TAG_INTERFACEINFLUXDB, "post payload is set%s\n", "");
esp_err_t err = ESP_ERROR_CHECK_WITHOUT_ABORT(esp_http_client_perform(http_client));
if( err == ESP_OK ) {
ESP_LOGI(TAG_INTERFACEINFLUXDB, "HTTP request was performed%s\n", "");
int status_code = esp_http_client_get_status_code(http_client);
ESP_LOGI(TAG_INTERFACEINFLUXDB, "HTTP status code %d\n", status_code);
} else {
ESP_LOGW(TAG_INTERFACEINFLUXDB, "HTTP request failed%s\n", "");
}
esp_http_client_cleanup(http_client);
}
void InfluxDBInit(std::string _uri, std::string _database, std::string _measurement, std::string _user, std::string _password){
_influxDBURI = _uri;
_influxDBDatabase = _database;
_influxDBMeasurement = _measurement;
_influxDBUser = _user;
_influxDBPassword = _password;
}
void InfluxDBdestroy() {
}

View File

@@ -0,0 +1,13 @@
#ifndef INTERFACE_INFLUXDB_H
#define INTERFACE_INFLUXDB_H
#include <string>
#include <map>
#include <functional>
void InfluxDBInit(std::string _influxDBURI, std::string _database, std::string _measurement, std::string _user, std::string _password);
void InfluxDBdestroy();
void InfluxDBPublish(std::string _key, std::string _content, std::string _timestamp);
#endif //INTERFACE_INFLUXDB_H

View File

@@ -35,6 +35,7 @@ lib_deps =
jomjol_time_sntp
jomjol_logfile
jomjol_mqtt
jomjol_influxdb
jomjol_controlGPIO