Working with Time Series Data¶
Kapacitor¶
Point Data (Time Series Data) Analytics Overview¶
Note
In this document, you will find references to ‘Edge Insights for Industrial (EII)’ in file names, paths, code snippets, and so on. Treat these references to EII as references to Open Edge Insights for Industrial (Open EII); they remain because of the product name change from EII to Open EII.
Any value that is generated over time can be considered point data (time-series data). Examples include:
Temperature at different times of the day.
Number of oil barrels processed per minute.
By running analytics over point data, a factory can implement an anomaly detection mechanism. This is where point data analytics (PointDataAnalytics) comes into the picture.
IEdgeInsights uses the TICK stack to do point data analytics.
IEdgeInsights has a temperature anomaly detection example for demonstrating the time-series data analytics flow.
The high-level data flow in this example is MQTT-temp-sensor -> Telegraf -> InfluxDB -> Kapacitor -> InfluxDB.
The MQTT-temp-sensor simulator sends data to Telegraf. Telegraf forwards the same data to InfluxDB, and InfluxDB passes it on to Kapacitor. Kapacitor performs anomaly detection and publishes the results back to InfluxDB.
Here, Telegraf is the TICK stack data-collection component and supports a number of input plug-ins for data ingestion. InfluxDB is a time-series database. Kapacitor is an analytics engine in which users can write custom analytics plug-ins (TICK scripts).
Starting the example¶
To start the mqtt-temp-sensor, refer to tools/mqtt-publisher/README.md.
If a system integrator (SI) wants to use IEdgeInsights only for point data analytics, comment out the video use case containers ia_video_ingestion and ia_video_analytics in
../build/docker-compose.yml
Start the Open EII stack. To start Open EII in production mode, provisioning is required. After provisioning, run the following commands:
cd build
docker-compose -f docker-compose-build.yml build
eii_start.sh
To start the Open EII in developer mode, please refer to the README.
To verify the output, check the output of the following command:
docker logs -f ia_influxdbconnector
Below is a snapshot of sample output of the above command.
I0822 09:03:01.705940 1 pubManager.go:111] Published message: map[data:point_classifier_results,host=ia_telegraf,topic=temperature/simulated/0 temperature=19.29358085726703,ts=1566464581.6201317 1566464581621377117]
I0822 09:03:01.927094 1 pubManager.go:111] Published message: map[data:point_classifier_results,host=ia_telegraf,topic=temperature/simulated/0 temperature=19.29358085726703,ts=1566464581.6201317 1566464581621377117]
I0822 09:03:02.704000 1 pubManager.go:111] Published message: map[data:point_data,host=ia_telegraf,topic=temperature/simulated/0 ts=1566464582.6218634,temperature=27.353740759929877 1566464582622771952]
The data can be visualized using the Grafana dashboard. To know more, refer to Grafana/README.md.
Purpose of Telegraf¶
Telegraf is one of the data entry points for IEdgeInsights. It supports many input plugins, which can be used for point data ingestion. In the above example, the MQTT input plugin of Telegraf is used. The plugin configuration is shown below, followed by an illustration of how a sample payload maps to an InfluxDB measurement.
# # Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
# ## MQTT broker URLs to be used. The format should be scheme://host:port,
# ## scheme can be tcp, ssl, or ws.
servers = ["tcp://localhost:1883"]
#
# ## MQTT QoS, must be 0, 1, or 2
# qos = 0
# ## Connection timeout for initial connection in seconds
# connection_timeout = "30s"
#
# ## Topics to subscribe to
topics = [
"temperature/simulated/0",
]
name_override = "point_data"
data_format = "json"
#
# # if true, messages that can't be delivered while the subscriber is offline
# # will be delivered when it comes back (such as on service restart).
# # NOTE: if true, client_id MUST be set
persistent_session = false
# # If empty, a random client ID will be generated.
client_id = ""
#
# ## username and password to connect to the MQTT server.
username = ""
password = ""
The production mode Telegraf configuration file is Telegraf/config/telegraf.conf ([WORK_DIR]/IEdgeInsights/Telegraf/config/Telegraf/Telegraf.conf) and, in developer mode, the configuration file is Telegraf/config/telegraf_devmode.conf ([WORK_DIR]/IEdgeInsights/Telegraf/config/Telegraf/Telegraf_devmode.conf).
For more information on the supported input and output plugins, refer to https://docs.influxdata.com/telegraf/v1.10/plugins/
Purpose of Kapacitor¶
About Kapacitor and UDF
- Users can write custom anomaly detection algorithms in Python or Golang. These algorithms are called UDFs (user-defined functions). They have to follow certain API standards so that Kapacitor can call them at run time (a minimal sketch of this handler interface is shown after this list).
- IEdgeInsights comes with a sample UDF written in Golang. Kapacitor subscribes to InfluxDB and gets the temperature data. After getting this data, Kapacitor calls the UDF, which detects anomalies in the temperature and sends the results back to InfluxDB.
- The sample Go UDF is at go_classifier.go ([WORK_DIR]/IEdgeInsights/Kapacitor/udfs/go_classifier.go) and the TICK script is at go_point_classifier.tick ([WORK_DIR]/IEdgeInsights/Kapacitor/tick_scripts/go_point_classifier.tick).
- The sample Python UDF is at py_classifier.py ([WORK_DIR]/IEdgeInsights/Kapacitor/udfs/py_classifier.py) and the TICK script is at py_point_classifier.tick ([WORK_DIR]/IEdgeInsights/Kapacitor/tick_scripts/py_point_classifier.tick). For more details on Kapacitor and UDFs, refer to the following links: i) Writing a sample UDF at anomaly detection ii) UDF and Kapacitor interaction here
- In production mode, the Kapacitor config file is Kapacitor/config/kapacitor.conf ([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor.conf) and, in developer mode, the config file is Kapacitor/config/kapacitor_devmode.conf ([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor_devmode.conf).
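For orientation, the following is a minimal, hypothetical sketch of the handler interface that a Python UDF implements, loosely modeled on the Kapacitor Python UDF agent (kapacitor.udf.agent). It mirrors the sample classifier's idea of forwarding only temperature points outside the 20-25 band; the class name and field access are illustrative assumptions, and the bundled py_classifier.py remains the reference implementation.

# Minimal, illustrative stream UDF sketch (not the bundled py_classifier.py).
from kapacitor.udf.agent import Agent, Handler
from kapacitor.udf import udf_pb2

class TempAnomalyHandler(Handler):
    """Forwards only points whose temperature falls outside the 20-25 band."""
    def __init__(self, agent):
        self._agent = agent

    def info(self):
        resp = udf_pb2.Response()
        resp.info.wants = udf_pb2.STREAM      # consumes a stream of points
        resp.info.provides = udf_pb2.STREAM   # emits a stream of points
        return resp

    def init(self, init_req):
        resp = udf_pb2.Response()
        resp.init.success = True
        return resp

    def snapshot(self):
        resp = udf_pb2.Response()
        resp.snapshot.snapshot = b''
        return resp

    def restore(self, restore_req):
        resp = udf_pb2.Response()
        resp.restore.success = True
        return resp

    def begin_batch(self, begin_req):
        raise Exception('batching is not supported in this sketch')

    def end_batch(self, end_req):
        raise Exception('batching is not supported in this sketch')

    def point(self, point):
        temp = point.fieldsDouble['temperature']
        if temp <= 20.0 or temp >= 25.0:       # outside the normal band: forward it
            resp = udf_pb2.Response()
            resp.point.CopyFrom(point)
            self._agent.write_response(resp, True)

if __name__ == '__main__':
    agent = Agent()                            # process-style agent over stdin/stdout
    agent.handler = TempAnomalyHandler(agent)
    agent.start()
    agent.wait()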
Custom UDFs available in the udfs ([WORK_DIR]/IEdgeInsights/Kapacitor/udfs) directory¶
UNIX Socket based UDFs
go_classifier.go: Filters the points based on temperature (points with temperature > 20 and < 25 are filtered out).
py_classifier.py: Filters the points based on temperature (points with temperature > 20 and < 25 are filtered out).
profiling_udf.go: Adds profiling data (time taken to process the data) to the points.
temperature_classifier.go: Filters the points based on temperature (points with temperature < 25 are filtered out).
humidity_classifier.py: Filters the points based on humidity (points with humidity < 25 are filtered out).
Process based UDFs
rfc_classifier.py: Sample Random Forest Classification algorithm. This UDF can also be used as a profiling UDF.
Steps to configure the UDFs in Kapacitor¶
Keep the custom UDFs in the udfs ([WORK_DIR]/IEdgeInsights/Kapacitor/udfs) directory and the TICK scripts in the tick_scripts ([WORK_DIR]/IEdgeInsights/Kapacitor/tick_scripts) directory.

Keep the training data set (if any) required by the custom UDFs in the training_data_sets ([WORK_DIR]/IEdgeInsights/Kapacitor/training_data_sets) directory.

For Python UDFs, install any external Python package dependency that is needed. To install a Python package using pip, add it to the requirements.txt ([WORK_DIR]/IEdgeInsights/Kapacitor/requirements.txt) file; to install a Python package using conda, add it to the conda_requirements.txt ([WORK_DIR]/IEdgeInsights/Kapacitor/conda_requirements.txt) file.

Modify the udf section in kapacitor.conf ([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor.conf) and in kapacitor_devmode.conf ([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor_devmode.conf). Mention the custom UDF in the conf, for example:

[udf.functions.customUDF]
  socket = "/tmp/socket_file"
  timeout = "20s"

In case of a Go/Python based UDF, update the values of the keys named "type", "name", "tick_script", and "task_name" in the config.json ([WORK_DIR]/IEdgeInsights/Kapacitor/config.json) file, for example:

"task": [{
  "tick_script": "py_point_classifier.tick",
  "task_name": "py_point_classifier",
  "udfs": [{
    "type": "python",
    "name": "py_classifier"
  }]
}]

In case of a TICK-script-only task (no UDF), update the values of the keys named "tick_script" and "task_name" in the config.json ([WORK_DIR]/IEdgeInsights/Kapacitor/config.json) file, for example:

"task": [{
  "tick_script": "simple_logging.tick",
  "task_name": "simple_logging"
}]
Note:
By default, go_classifier and rfc_classifier are configured.
The UDF node in the TICK script must use the same UDF function name as configured in the Kapacitor config file. For example, the UDF node in the TICK script

@py_point_classifier()

should be the same as

[udf.functions.py_point_classifier]
  socket = "/tmp/socket_file"
  timeout = "20s"

A hypothetical end-to-end TICK script using this UDF node is sketched below.
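The following is a minimal, hypothetical TICK script around such a UDF node; the database and measurement names here are assumptions (taken from the temperature example earlier in this document), and the bundled go_point_classifier.tick / py_point_classifier.tick remain the reference scripts.

dbrp "datain"."autogen"

var data0 = stream
    |from()
        .database('datain')
        .retentionPolicy('autogen')
        .measurement('point_data')
    @py_point_classifier()
    |influxDBOut()
        .buffer(0)
        .database('datain')
        .measurement('point_classifier_results')
        .retentionPolicy('autogen')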
A Go/Python based UDF should listen on the same socket file as mentioned in the udf section in kapacitor.conf ([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor.conf) and in kapacitor_devmode.conf ([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor_devmode.conf). For example:

[udf.functions.customUDF]
  socket = "/tmp/socket_file"
  timeout = "20s"
In case of process based UDFs, provide the correct path of the code within the container in kapacitor.conf ([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor.conf) and in kapacitor_devmode.conf ([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor_devmode.conf). By default, the files and directories are copied into the container under the "/EII" directory. It is recommended to keep the custom UDFs in the udfs ([WORK_DIR]/IEdgeInsights/Kapacitor/udfs) directory; the path of the custom UDF will then be "/EII/udfs/customUDF_name" as shown in the example below. If the UDF is kept in a different path, modify the path in the args accordingly. The PYTHONPATH of the Kapacitor agent directory is "/EII/go/src/github.com/influxdata/kapacitor/udf/agent/py/"; how to pass it is shown in the example below.

For example:

[udf.functions.customUDF]
  prog = "python3.7"
  args = ["-u", "/EII/udfs/customUDF"]
  timeout = "60s"
  [udf.functions.customUDF.env]
    PYTHONPATH = "/go/src/github.com/influxdata/kapacitor/udf/agent/py/"
Do the provisioning and run the Open EII stack.
Steps to run the samples of multiple UDFs in a single task and multiple tasks using single UDF¶
Refer to the samples/README
Kapacitor input and output plugins¶
Purpose of plugins¶
The plugins allow Kapacitor to interact directly with the Open EII message bus. They use the message bus publisher/subscriber interface. Using these plugins, Kapacitor can receive data from various Open EII publishers and send data to various Open EII subscribers. Hence, it is possible to have a time-series use case without InfluxDB, with Kapacitor acting as an independent analytics engine.
A simple use case flow can be as follows:
MQTT-temp-sensor–>Telegraf–>Kapacitor–>TimeseriesProfiler
Using input plugin¶
Configure the Open EII input plugin in config/kapacitor.conf ([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor.conf) and config/kapacitor_devmode.conf ([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor_devmode.conf). For example:

[eii]
  enabled = true
Edit config.json ([WORK_DIR]/IEdgeInsights/Kapacitor/config.json) to add a subscriber under interfaces. For example, to receive data published by Telegraf:
TCP mode
"Subscribers": [ { "Name": "telegraf_sub", "Type": "zmq_tcp", "EndPoint": "ia_telegraf:65077", "PublisherAppName": "Telegraf", "Topics": [ "*" ] } ]
IPC mode
"Subscribers": [ { "Name": "telegraf_sub", "Type": "zmq_ipc", "EndPoint": { "SocketDir": "/EII/sockets", "SocketFile": "telegraf-out" }, "PublisherAppName": "Telegraf", "Topics": [ "*" ] } ]
Note: For IPC mode, specify ‘EndPoint’ as a dict of ‘SocketDir’ and ‘SocketFile’ when ‘Topics’ is [*] (as in the above example). In case of a single topic, ‘EndPoint’ can be defined as below (as in the example of the Kapacitor output plugin):
"EndPoint": "/EII/sockets"
The received data will be available in the ‘eii’ storage for the tick scripts to use.
Create/modify a TICK script to process the data and configure the same in config.json ([WORK_DIR]/IEdgeInsights/Kapacitor/config.json). For example, use the stock tick_scripts/eii_input_plugin_logging.tick ([WORK_DIR]/IEdgeInsights/Kapacitor/tick_scripts/eii_input_plugin_logging.tick), which logs the data received from the ‘eii’ storage into the Kapacitor log file (residing in the container at /tmp/log/kapacitor/kapacitor.log).

"task": [
  {
    "tick_script": "eii_input_plugin_logging.tick",
    "task_name": "eii_input_plugin_logging"
  }
]
Do the provisioning and run the Open EII stack.
The subscribed data will now be available in the above log file, which can be viewed with the following command:
docker exec ia_kapacitor tail -f /tmp/log/kapacitor/kapacitor.log
Using output plugin¶
Create/modify a TICK script to use the ‘eiiOut’ node to send the data using the publisher interface. For example, you may modify the profiling UDF as below:

dbrp "eii"."autogen"

var data0 = stream
    |from()
        .database('eii')
        .retentionPolicy('autogen')
        .measurement('point_data')
    @profiling_udf()
    |eiiOut()
        .pubname('sample_publisher')
        .topic('sample_topic')
Add a publisher interface to config.json ([WORK_DIR]/IEdgeInsights/Kapacitor/config.json) with the same publisher name and topic, that is, ‘sample_publisher’ and ‘sample_topic’ respectively, as in the above example. For example:

TCP mode
"Publishers": [ { "Name": "sample_publisher", "Type": "zmq_tcp", "EndPoint": "0.0.0.0:65034", "Topics": [ "sample_topic" ], "AllowedClients": [ "TimeSeriesProfiler" ] } ]
IPC mode
"Publishers": [ { "Name": "sample_publisher", "Type": "zmq_ipc", "EndPoint": "/EII/sockets", "Topics": [ "sample_topic" ], "AllowedClients": [ "TimeSeriesProfiler" ] } ]
Do the provisioning and run the Open EII stack.
Using input/output plugin with RFC UDF¶
Add the RFC task to config.json ([WORK_DIR]/IEdgeInsights/Kapacitor/config.json):

"task": [
  {
    "tick_script": "rfc_task.tick",
    "task_name": "random_forest_sample",
    "udfs": [{
      "type": "python",
      "name": "rfc_classifier"
    }]
  }
]
Modify the rfc_task.tick ([WORK_DIR]/IEdgeInsights/Kapacitor/tick_scripts/rfc_task.tick) as below, for example:

dbrp "eii"."autogen"

var data0 = stream
    |from()
        .database('eii')
        .retentionPolicy('autogen')
        .measurement('ts_data')
    |window()
        .period(3s)
        .every(4s)

data0
    @rfc()
    |eiiOut()
        .pubname('sample_publisher')
        .topic('sample_topic')
Add a publisher interface to config.json ([WORK_DIR]/IEdgeInsights/Kapacitor/config.json) with the same publisher name and topic, that is, ‘sample_publisher’ and ‘sample_topic’ respectively, as in the above example. For example:

TCP mode
"Publishers": [ { "Name": "sample_publisher", "Type": "zmq_tcp", "EndPoint": "0.0.0.0:65034", "Topics": [ "sample_topic" ], "AllowedClients": [ "TimeSeriesProfiler", "EmbSubscriber", "GoSubscriber" ] } ]
IPC mode
"Publishers": [ { "Name": "sample_publisher", "Type": "zmq_ipc", "EndPoint": "/EII/sockets", "Topics": [ "sample_topic" ], "AllowedClients": [ "TimeSeriesProfiler", "EmbSubscriber", "GoSubscriber" ] } ]
Do the provisioning and run the Open EII stack.
Grafana Overview¶
Grafana is an open-source metric analytics and visualization suite. Its uses include:
Visualizing time series data for infrastructure and application analytics
Industrial sensors, home automation, weather, and process control
Grafana supports various storage backends for the time-series data (data source). Open Edge Insights for Industrial (Open EII) uses InfluxDB as the data source. Grafana connects to the InfluxDB data source which has been preconfigured as a part of the Grafana setup. The ia_influxdbconnector
service must be running for Grafana to be able to collect the time-series data. After the data source starts working, you can use the preconfigured dashboard to visualize the incoming data. You can also edit the dashboard as required.
Configuration¶
The following are the configuration details for Grafana:
dashboard.json ([WORK_DIR]/IEdgeInsights/Grafana/dashboard.json): This is the dashboard json file that is loaded when Grafana starts. It is preconfigured to display the time-series data.

dashboard_sample.yml ([WORK_DIR]/IEdgeInsights/Grafana/dashboard_sample.yml): This is the config file for all the dashboards. It specifies the path to locate all the dashboard json files.

datasource_sample.yml ([WORK_DIR]/IEdgeInsights/Grafana/datasource_sample.yml): This is the config file for setting up the data source. It has various fields for data source configuration.

grafana_template.ini ([WORK_DIR]/IEdgeInsights/Grafana/grafana_template.ini): This is the config file for Grafana. It specifies how Grafana should start after it is configured.
Note
You can edit the contents of these files based on your requirement.
Run Grafana¶
Based on your requirement, you can run Grafana in the PROD mode or the DEV mode.
Complete the following steps to run Grafana:
Open the docker-compose.yml ([WORK_DIR]/IEdgeInsights/docker-compose.yml) file.

In the docker-compose.yml file, uncomment ia_grafana.

Check if the ia_influxdbconnector, ia_kapacitor, and ia_telegraf services are running for the time-series data.

Check if the publisher ([WORK_DIR]/IEdgeInsights/tools/mqtt-publisher/publisher_temp.sh) service is running.

Run the docker-compose build command to build the image.

Run the docker-compose up command to start the service. (The build and run commands are consolidated in the snippet after this list.)
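As a minimal sketch, the last two steps amount to the following commands, run from the directory that contains the edited docker-compose.yml (exact paths depend on your deployment):

# From the directory containing the edited docker-compose.yml
docker-compose build    # builds the images, including ia_grafana
docker-compose up       # starts the services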
After completing the previous steps, refer to one of the following sections based on the mode in which you want to run Grafana:
Run Grafana in the PROD mode¶
Note
Skip this section, if you are running Grafana in the DEV mode.
To run Grafana in the PROD mode, import cacert.pem
from the build/Certificates/rootca/
directory to the browser certificates. Complete the following steps to import certificates:
In Chrome browser, go to Settings.
In Search settings, enter Manage certificates.
Click Security.
In the Advanced section, click Manage certificates.
On the Certificates window, click the Trusted Root Certification Authorities tab.
Click Import.
On the Certificate Import Wizard, click Next.
Click Browse.
Go to the IEdgeInsights/build/Certificates/rootca/ directory.

Select the cacert.pem file.
Select all checkboxes and then, click Import.
Run Grafana in the DEV Mode¶
To run Grafana in the DEV mode, complete the following steps:
After starting the ia_grafana service, go to http://< host ip >:3000.

Enter the default credentials: username “admin” and password “admin”.
On the Home Dashboard page, on the left corner, click the Dashboards icon.
Click the Manage Dashboards tab.
From the list of preconfigured dashboards, click Point_Data_Dashboard.
Click Panel Title and then, select Edit.
On the Point_Data_Dashboard page, modify the query if required.
Execute Queries¶
On the Point_Data_Dashboard, the green spikes visible in the graph are the results of the default query. To run queries, perform the following steps:
In the FROM section of the query, click point_classifier_results. A list is displayed with the names of the measurements present in InfluxDB.
Note: If any other measurement is selected, the graph switches to that measurement’s query results. By default, the FROM section will have default point_classifier_results WHERE +.
In the SELECT section, click temperature. A list displays the fields and tags present in the schema of the measurement set in the FROM section.

Note: By default, the SELECT section will have field(temperature) mean() +. The graph changes according to the values you select. A roughly equivalent InfluxQL query for this default panel is sketched below.
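For reference, the default panel query corresponds roughly to the following InfluxQL (a sketch; the exact query generated by the Grafana query builder may differ, and $timeFilter / $__interval are Grafana macros):

SELECT mean("temperature") FROM "point_classifier_results" WHERE $timeFilter GROUP BY time($__interval) fill(null)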
Run Grafana for Video Use Cases¶
Perform the following steps to run Grafana for a video use case:
Ensure that the endpoint of the publisher that you want to subscribe to is mentioned in the Subscribers section of the config ([WORK_DIR]/IEdgeInsights/Grafana/config.json) file.

On the Home Dashboard page, on the left corner, click the Dashboards icon.

Click the Manage Dashboards tab to view the list of all the preconfigured dashboards.

Select Open EII Video and Time Series Dashboard to view multiple panels with the topic names of the subscriber as the panel names, along with a time-series panel named Time Series.

Hover over the topic name. The panel title will display multiple options.

Click View to view the subscribed frames for each topic.
Note
The only supported browser for the Grafana video use case is Google Chrome.

Changing gridPos for the video frame panels is prohibited because these values are altered internally to support multiple instances.

Grafana does not support visualization for GVA and CustomUDF streams.

Grafana currently supports a maximum of 12 streams.

The Helm chart for the Grafana video use case is not supported.
InfluxDBConnector Module¶
InfluxDBConnector subscribes to InfluxDB and starts the zmq publisher, zmq subscriber, and zmq request-reply threads based on the PubTopics, SubTopics, and QueryTopics configuration.

The zmq subscriber thread connects to the PUB socket of the zmq bus on which the data is published by VideoAnalytics and pushes it to InfluxDB.

The zmq publisher thread publishes the point data ingested by Telegraf and the classifier results coming out of the point data analytics.

The zmq request-reply service receives InfluxDB select queries and responds with the historical data.
Configuration¶
All the InfluxDBConnector module configurations are added to etcd (distributed key-value data store) under AppName, as mentioned in the environment section of this app’s service definition in docker-compose.

If AppName is InfluxDBConnector, then the app’s config for the /InfluxDBConnector/config key in etcd will look like the following:
"influxdb": {
"retention": "1h30m5s",
"dbname": "datain",
"ssl": "True",
"verifySsl": "False",
"port": "8086"
}
For nested JSON data, by default, InfluxDBConnector flattens the nested JSON and pushes the flat data to InfluxDB. To avoid the flattening of any particular nested key, mention that key in the config.json ([WORK_DIR]/IEdgeInsights/InfluxDBConnector/config.json) file. Currently, the defects key is excluded from flattening. Every key to be ignored has to be on a new line. An illustrative sketch of what flattening means follows the example below.
For example,
ignore_keys = [ "Key1", "Key2", "Key3" ]
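To make the behavior concrete, the following is a minimal, illustrative sketch of what flattening nested JSON means and how an ignored key is left untouched. It is not the connector's actual implementation; the function name and the underscore-joined flattened key format are assumptions.

# Illustrative only: not the InfluxDBConnector implementation.
def flatten(data, ignore_keys=(), parent=""):
    """Flatten nested dicts into a single-level dict, skipping ignored keys."""
    flat = {}
    for key, value in data.items():
        name = f"{parent}_{key}" if parent else key
        if key in ignore_keys:
            flat[name] = value          # kept as-is, not flattened
        elif isinstance(value, dict):
            flat.update(flatten(value, ignore_keys, name))
        else:
            flat[name] = value
    return flat

point = {"temperature": 27.3, "meta": {"unit": "C"}, "defects": {"count": 2}}
print(flatten(point, ignore_keys=("defects",)))
# {'temperature': 27.3, 'meta_unit': 'C', 'defects': {'count': 2}}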
By default, all the keys in the data schema are pushed to InfluxDB as fields. If tags are present in the data schema, they can be mentioned in the config.json ([WORK_DIR]/IEdgeInsights/InfluxDBConnector/config.json) file; the data pushed to InfluxDB will then have both fields and tags. Currently, no tags are present in the data schema and tag_keys is kept blank in the config file.
For example,
tag_keys = [ "Tag1", "Tag2" ]
For more details on Etcd secrets and messagebus endpoint configuration, refer to Etcd_Secrets_Configuration.md and MessageBus Configuration respectively.