Contents
========
* `Contents <#contents>`__
* `Point-data (Time series data) analytics introduction <#point-data-time-series-data-analytics-introduction>`__
* `Starting the example <#starting-the-example>`__
* `Purpose of Telegraf <#purpose-of-telegraf>`__
* `Purpose of Kapacitor <#purpose-of-kapacitor>`__
* `Custom UDFs available in the udfs directory <#custom-udfs-available-in-the-udfs-directory>`__
* `Steps to configure the UDFs in Kapacitor <#steps-to-configure-the-udfs-in-kapacitor>`__
* `Steps to run the samples of multiple UDFs in a single task and multiple tasks using single UDF <#steps-to-run-the-samples-of-multiple-udfs-in-a-single-task-and-multiple-tasks-using-single-udf>`__
* `Kapacitor input and output plugins <#kapacitor-input-and-output-plugins>`__
* `Purpose of plugins <#purpose-of-plugins>`__
* `Using input plugin <#using-input-plugin>`__
* `Using output plugin <#using-output-plugin>`__
* `Using input/output plugin with RFC UDF <#using-inputoutput-plugin-with-rfc-udf>`__
Kapacitor
---------
Point Data (Time Series Data) Analytics Overview
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. note:: In this document, you will find labels of 'Edge Insights for Industrial (EII)' for file names, paths, code snippets, and so on. Read these references to EII as Open Edge Insights for Industrial (Open EII); the product has been renamed from EII to Open EII.
Any numeric value that is generated over time can be considered point data.
Examples include:
* The temperature at different times of the day.
* The number of oil barrels processed per minute.
By doing analytics over point data, a factory can implement an anomaly detection mechanism.
That is where PointDataAnalytics comes into the picture.
IEdgeInsights uses the TICK stack to do point-data analytics.
IEdgeInsights includes a temperature anomaly detection example that demonstrates the time-series data analytics flow.
The high-level data flow in the example is MQTT-temp-sensor-->Telegraf-->InfluxDB-->Kapacitor-->InfluxDB.
The MQTT-temp-sensor simulator sends data to Telegraf, Telegraf forwards the same data to
InfluxDB, and InfluxDB sends it to Kapacitor. Kapacitor performs the anomaly detection and publishes the results back to
InfluxDB.
Here:
* Telegraf is the TICK stack data-collection component; it supports a number of input plugins for data ingestion.
* InfluxDB is a time-series database.
* Kapacitor is an analytics engine where users can write custom analytics plugins (TICK scripts).
Starting the example
^^^^^^^^^^^^^^^^^^^^
#.
To start the mqtt-temp-sensor, refer to `tools/mqtt-publisher/README.md 3.0.1/IEdgeInsights/tools/mqtt/README.html>`_.
#.
If the system integrator (SI) wants to use IEdgeInsights only for point-data analytics,
then comment out the video use case containers ia_video_ingestion and ia_video_analytics in ``../build/docker-compose.yml``.
#.
Start Open EII.
To start Open EII in production mode, provisioning is required. After provisioning, run the following commands:
.. code-block:: sh
cd build
docker-compose -f docker-compose-build.yml build
eii_start.sh
To start Open EII in developer mode, refer to the `README 3.0.1/IEdgeInsights/README.html#provision>`_.
#.
To verify the output, check the output of the below command:
.. code-block:: sh
docker logs -f ia_influxdbconnector
Below is a snapshot of sample output from the above command:
.. code-block:: sh
I0822 09:03:01.705940 1 pubManager.go:111] Published message: map[data:point_classifier_results,host=ia_telegraf,topic=temperature/simulated/0 temperature=19.29358085726703,ts=1566464581.6201317 1566464581621377117]
I0822 09:03:01.927094 1 pubManager.go:111] Published message: map[data:point_classifier_results,host=ia_telegraf,topic=temperature/simulated/0 temperature=19.29358085726703,ts=1566464581.6201317 1566464581621377117]
I0822 09:03:02.704000 1 pubManager.go:111] Published message: map[data:point_data,host=ia_telegraf,topic=temperature/simulated/0 ts=1566464582.6218634,temperature=27.353740759929877 1566464582622771952]
The data can be visualized using the Grafana dashboard. To know more, refer to `Grafana/README.md 3.0.1/IEdgeInsights/Grafana/README.html>`_.
Purpose of Telegraf
^^^^^^^^^^^^^^^^^^^
Telegraf is one of the data entry points for IEdgeInsights. It supports many input plugins, which can be used for
point-data ingestion. The above example uses the MQTT input plugin of Telegraf; the configuration
of the plugin is shown below:
.. code-block:: sh
# # Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
# ## MQTT broker URLs to be used. The format should be scheme://host:port,
# ## schema can be tcp, ssl, or ws.
servers = ["tcp://localhost:1883"]
#
# ## MQTT QoS, must be 0, 1, or 2
# qos = 0
# ## Connection timeout for initial connection in seconds
# connection_timeout = "30s"
#
# ## Topics to subscribe to
topics = [
"temperature/simulated/0",
]
name_override = "point_data"
data_format = "json"
#
# # if true, messages that can't be delivered while the subscriber is offline
# # will be delivered when it comes back (such as on service restart).
# # NOTE: if true, client_id MUST be set
persistent_session = false
# # If empty, a random client ID will be generated.
client_id = ""
#
# ## username and password to connect MQTT server.
username = ""
password = ""
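To exercise this plugin without the bundled simulator, a minimal publisher sketch is shown below. It is an illustration, not part of the repository: it assumes a Mosquitto broker reachable on localhost:1883 and the ``paho-mqtt`` Python package (1.x API), and it emits the same JSON shape (``ts`` and ``temperature`` fields) seen in the sample output above.

.. code-block:: python

    # minimal_temp_publisher.py -- hypothetical stand-in for the mqtt-temp-sensor
    # simulator; assumes `pip install paho-mqtt` (1.x) and a broker on localhost:1883.
    import json
    import random
    import time

    import paho.mqtt.client as mqtt

    client = mqtt.Client()
    client.connect("localhost", 1883)
    client.loop_start()  # background thread flushes outgoing messages

    while True:
        # JSON payload matching `data_format = "json"` in the Telegraf config;
        # numeric fields such as `temperature` become InfluxDB fields.
        payload = {"ts": time.time(), "temperature": random.uniform(10.0, 30.0)}
        client.publish("temperature/simulated/0", json.dumps(payload))
        time.sleep(1)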
The production mode Telegraf configuration file is
Telegraf/config/telegraf.conf(\ ``[WORK_DIR]/IEdgeInsights/Telegraf/config/Telegraf/Telegraf.conf``\ ) and in developer mode,
the configuration file is
Telegraf/config/telegraf_devmode.conf(\ ``[WORK_DIR]/IEdgeInsights/Telegraf/config/Telegraf/Telegraf_devmode.conf``\ ).
For more information on the supported input and output plugins, refer to
https://docs.influxdata.com/telegraf/v1.10/plugins/.
Purpose of Kapacitor
^^^^^^^^^^^^^^^^^^^^
About Kapacitor and UDFs
*
Users can write custom anomaly detection algorithms in Python or Golang. These algorithms are called
UDFs (user-defined functions) and must follow certain API standards so that Kapacitor is able to
call them at run time; a minimal handler sketch follows this list.
*
IEdgeInsights provides a sample UDF written in Golang. Kapacitor subscribes to InfluxDB and
receives the temperature data. On receiving this data, Kapacitor calls the UDF, which detects anomalies in the temperature
and sends the results back to InfluxDB.
*
The sample Go UDF is at go_classifier.go(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/udfs/go_classifier.go``\ ) and
the tick script is at go_point_classifier.tick(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/tick_scripts/go_point_classifier.tick``\ )
*
The sample Python UDF is at py_classifier.py(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/udfs/py_classifier.py``\ ) and
the tick script is at py_point_classifier.tick(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/tick_scripts/py_point_classifier.tick``\ )
For more details on Kapacitor and UDFs, refer to the below links:
i) Writing a sample UDF: the Kapacitor anomaly detection walkthrough.
ii) UDF and Kapacitor interaction: the Kapacitor UDF documentation.
*
In production mode, the Kapacitor config file is
Kapacitor/config/kapacitor.conf(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor.conf``\ ),
and in developer mode, the config file is
Kapacitor/config/kapacitor_devmode.conf(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor_devmode.conf``\ ).
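To make the UDF API standard concrete, below is a minimal stream-mode handler sketch in the spirit of py_classifier.py. It is modeled on the mirror example that ships with Kapacitor's Python agent library; the class name ``FilterHandler`` is illustrative, and the snippet is a sketch rather than the shipped source.

.. code-block:: python

    # filter_udf.py -- illustrative stream UDF in the spirit of py_classifier.py;
    # the agent library ships with Kapacitor (udf/agent/py).
    from kapacitor.udf.agent import Agent, Handler
    from kapacitor.udf import udf_pb2

    class FilterHandler(Handler):
        """Forwards only points whose temperature is outside (20, 25)."""
        def __init__(self, agent):
            self._agent = agent

        def info(self):
            response = udf_pb2.Response()
            response.info.wants = udf_pb2.STREAM     # consume a stream of points
            response.info.provides = udf_pb2.STREAM  # emit a stream of points
            return response

        def init(self, init_req):
            response = udf_pb2.Response()
            response.init.success = True
            return response

        def snapshot(self):
            response = udf_pb2.Response()
            response.snapshot.snapshot = b''
            return response

        def restore(self, restore_req):
            response = udf_pb2.Response()
            response.restore.success = False
            response.restore.error = 'not implemented'
            return response

        def begin_batch(self, begin_req):
            raise Exception('batching not supported')

        def end_batch(self, end_req):
            raise Exception('batching not supported')

        def point(self, point):
            temp = point.fieldsDouble['temperature']
            if not (20 < temp < 25):  # drop points inside the (20, 25) band
                response = udf_pb2.Response()
                response.point.CopyFrom(point)
                self._agent.write_response(response, True)

    if __name__ == '__main__':
        # Process mode: the agent speaks protobuf over STDIN/STDOUT. The
        # UNIX-socket wiring is sketched in the configuration section below.
        agent = Agent()
        agent.handler = FilterHandler(agent)
        agent.start()
        agent.wait()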
Custom UDFs available in the udfs(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/udfs``\ ) directory
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
*
UNIX-socket-based UDFs
#.
go_classifier.go: Filters points based on temperature (values > 20 and < 25 are filtered out).
#.
py_classifier.py: Filters points based on temperature (values > 20 and < 25 are filtered out).
#.
profiling_udf.go: Adds profiling data (the time taken to process the data) to the points.
#.
temperature_classifier.go: Filters points based on temperature (values < 25 are filtered out).
#.
humidity_classifier.py: Filters points based on humidity (values < 25 are filtered out).
*
Process-based UDFs
#. rfc_classifier.py: A random forest classification algorithm sample. This UDF can also be used as a profiling UDF.
Steps to configure the UDFs in Kapacitor
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
*
Keep the custom UDFs in the udfs(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/udfs``\ ) directory and the TICK script in the tick_scripts(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/tick_scripts``\ ) directory.
*
Keep the training data set (if any) required for the custom UDFs in the training_data_sets(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/training_data_sets``\ ) directory.
*
For Python UDFs, external Python package dependencies may need to be installed: to install a package using pip, add it to the requirements.txt(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/requirements.txt``\ ) file; to install a package using conda, add it to the conda_requirements.txt(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/conda_requirements.txt``\ ) file.
*
Modify the udf section in the kapacitor.conf(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor.conf``\ ) and in the kapacitor_devmode.conf(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor_devmode.conf``\ ).
Mention the custom UDF in the conf, for example:
.. code-block:: sh
[udf.functions.customUDF]
socket = "/tmp/socket_file"
timeout = "20s"
*
For a Go or Python based UDF, update the values of the keys "type", "name", "tick_script", and "task_name" in the
config.json(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/config.json``\ ) file, for example:
.. code-block:: sh
"task": [{
"tick_script": "py_point_classifier.tick",
"task_name": "py_point_classifier",
"udfs": [{
"type": "python",
"name": "py_classifier"
}]
}]
*
For a TICK-script-only task (no UDF), update the values of the keys "tick_script" and "task_name" in the config.json(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/config.json``\ ) file, for example:
.. code-block:: sh
"task": [{
"tick_script": "simple_logging.tick",
"task_name": "simple_logging"
}]
..
**Note:**
#.
By default, go_classifier and rfc_classifier are configured.
#.
The UDF function name in the TICK script must be the same as the one configured in the Kapacitor config file.
For example, the UDF node in the TICK script:
.. code-block:: sh
@py_point_classifier()
must match the function name in the config file:
.. code-block:: sh
[udf.functions.py_point_classifier]
socket = "/tmp/socket_file"
timeout = "20s"
#.
A Go or Python based UDF should listen on the same socket file as mentioned in the udf section in
kapacitor.conf(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor.conf``\ ) and in kapacitor_devmode.conf(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor_devmode.conf``\ ); a sketch of this socket wiring appears after these steps.
For example:
.. code-block:: sh
[udf.functions.customUDF]
socket = "/tmp/socket_file"
timeout = "20s"
#.
For process-based UDFs, provide the correct path of the code within the container
in kapacitor.conf(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor.conf``\ ) and in kapacitor_devmode.conf(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor_devmode.conf``\ ).
By default, the files and directories are copied into the container under the "/EII" directory. It is recommended to keep the custom UDFs
in the udfs(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/udfs``\ ) directory; the path of the custom UDF will then be "/EII/udfs/customUDF_name", as shown in the example below.
If the UDF is kept in a different path, modify the path in the args accordingly.
The PYTHONPATH of the Kapacitor agent directory is "/EII/go/src/github.com/influxdata/kapacitor/udf/agent/py/"; how to pass
it is shown in the example below.
For example
.. code-block:: sh
[udf.functions.customUDF]
prog = "python3.7"
args = ["-u", "/EII/udfs/customUDF"]
timeout = "60s"
[udf.functions.customUDF.env]
PYTHONPATH = "/go/src/github.com/influxdata/kapacitor/udf/agent/py/"
*
Do the `provisioning 3.0.1/IEdgeInsights/README.html#provision>`_ and run the Open EII stack.
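For the UNIX-socket configuration above, the UDF process itself creates and serves the socket. Below is a minimal sketch of that wiring, assuming the agent library shipped with Kapacitor and a handler class such as the ``FilterHandler`` outlined earlier (the module name ``filter_udf`` is hypothetical):

.. code-block:: python

    # socket_udf.py -- illustrative UNIX-socket wiring for a custom UDF;
    # /tmp/socket_file must match the `socket` value in [udf.functions.customUDF].
    from kapacitor.udf.agent import Agent, Server

    from filter_udf import FilterHandler  # hypothetical module from the earlier sketch

    class Accepter(object):
        """Spawns one Agent per connection that Kapacitor opens on the socket."""
        def accept(self, conn, addr):
            agent = Agent(conn, conn)
            agent.handler = FilterHandler(agent)
            agent.start()
            agent.wait()

    if __name__ == '__main__':
        server = Server('/tmp/socket_file', Accepter())
        server.serve()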
Steps to run the samples of multiple UDFs in a single task and multiple tasks using single UDF
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Refer to the `samples/README 3.0.1/IEdgeInsights/Samples/README.html>`_
Kapacitor input and output plugins
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Purpose of plugins
~~~~~~~~~~~~~~~~~~
The plugins allow Kapacitor to interact directly with the Open EII Message Bus. They use the message bus publisher/subscriber interface. Using these plugins, Kapacitor can receive data from various Open EII publishers and send data to various Open EII subscribers. Hence, it is possible to have a time-series use case without InfluxDB, with Kapacitor acting as an independent analytics engine.
A simple use case flow can be as follows:
MQTT-temp-sensor-->Telegraf-->Kapacitor-->TimeseriesProfiler
Using input plugin
~~~~~~~~~~~~~~~~~~
*
Configure the Open EII input plugin in config/kapacitor.conf(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor.conf``\ ) and config/kapacitor_devmode.conf(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor_devmode.conf``\ )
For example:
.. code-block:: sh
[eii]
enabled = true
*
Edit config.json(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/config.json``\ ) to add a subscriber under interfaces.
For example, to receive data published by Telegraf:
**TCP mode**
.. code-block:: sh
"Subscribers": [
{
"Name": "telegraf_sub",
"Type": "zmq_tcp",
"EndPoint": "ia_telegraf:65077",
"PublisherAppName": "Telegraf",
"Topics": [
"*"
]
}
]
**IPC mode**
.. code-block:: sh
"Subscribers": [
{
"Name": "telegraf_sub",
"Type": "zmq_ipc",
"EndPoint": {
"SocketDir": "/EII/sockets",
"SocketFile": "telegraf-out"
},
"PublisherAppName": "Telegraf",
"Topics": [
"*"
]
}
]
..
**Note:** For IPC mode, specify the 'EndPoint' as a dict of 'SocketDir' and 'SocketFile' when
'Topics' is [*] (as in the above example).
For a single topic, the 'EndPoint' can be defined as below (as in the example of the Kapacitor output plugin):
.. code-block:: sh
"EndPoint": "/EII/sockets"
The received data will be available in the 'eii' storage for the tick scripts to use.
*
Create/modify a tick script to process the data and configure the same in config.json(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/config.json``\ ).
For example, use the stock tick_scripts/eii_input_plugin_logging.tick(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/tick_scripts/eii_input_plugin_logging.tick``\ ) which logs the data received from 'eii'
storage onto the kapacitor log file (residing in the container at /tmp/log/kapacitor/kapacitor.log).
.. code-block:: sh
"task": [
{
"tick_script": "eii_input_plugin_logging.tick",
"task_name": "eii_input_plugin_logging"
}
]
*
Do the `provisioning 3.0.1/IEdgeInsights/README.html#provision>`_ and run the Open EII stack.
The subscribed data will now be available in the above log file, which can be viewed with the
command below:
.. code-block:: sh
docker exec ia_kapacitor tail -f /tmp/log/kapacitor/kapacitor.log
Using output plugin
~~~~~~~~~~~~~~~~~~~
*
Create/modify a TICK script to use the 'eiiOut' node to send data using the publisher interface.
For example, you may modify the profiling UDF task as below:
.. code-block:: sh
dbrp "eii"."autogen"
var data0 = stream
|from()
.database('eii')
.retentionPolicy('autogen')
.measurement('point_data')
@profiling_udf()
|eiiOut()
.pubname('sample_publisher')
.topic('sample_topic')
*
Add a publisher interface to config.json(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/config.json``\ ) with the same publisher name and topic,
that is, 'sample_publisher' and 'sample_topic' respectively, as in the above example.
For example:
**TCP mode**
.. code-block:: sh
"Publishers": [
{
"Name": "sample_publisher",
"Type": "zmq_tcp",
"EndPoint": "0.0.0.0:65034",
"Topics": [
"sample_topic"
],
"AllowedClients": [
"TimeSeriesProfiler"
]
}
]
**IPC mode**
.. code-block:: sh
"Publishers": [
{
"Name": "sample_publisher",
"Type": "zmq_ipc",
"EndPoint": "/EII/sockets",
"Topics": [
"sample_topic"
],
"AllowedClients": [
"TimeSeriesProfiler"
]
}
]
*
Do the `provisioning 3.0.1/IEdgeInsights/README.html#provision>`_ and run the Open EII stack.
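To consume what 'sample_publisher' emits without the stock TimeSeriesProfiler, a subscriber can be written against the EII Message Bus Python API. The sketch below is only an outline under stated assumptions: the module name ``eii.msgbus``, the ``MsgbusContext``/``new_subscriber``/``recv`` calls, and the IPC config keys are taken from memory of the EII Message Bus samples; verify them against the EIIMessageBus README for your release.

.. code-block:: python

    # sample_subscriber.py -- hypothetical IPC-mode subscriber for 'sample_topic';
    # the config keys below are assumptions based on the EII Message Bus samples.
    import eii.msgbus as mb

    config = {
        'type': 'zmq_ipc',
        'socket_dir': '/EII/sockets',
    }

    msgbus = mb.MsgbusContext(config)
    subscriber = msgbus.new_subscriber('sample_topic')

    while True:
        msg = subscriber.recv()  # blocks until the Kapacitor task publishes
        print(msg)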
Using input/output plugin with RFC UDF
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*
Add the RFC task to config.json(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/config.json``\ ):
.. code-block:: sh
"task": [
{
"tick_script": "rfc_task.tick",
"task_name": "random_forest_sample",
"udfs": [{
"type": "python",
"name": "rfc_classifier"
}]
}
]
*
Modify the rfc_task.tick(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/tick_scripts/rfc_task.tick``\ ) as below, for example:
.. code-block:: sh
dbrp "eii"."autogen"
var data0 = stream
|from()
.database('eii')
.retentionPolicy('autogen')
.measurement('ts_data')
|window()
.period(3s)
.every(4s)
data0
@rfc()
|eiiOut()
.pubname('sample_publisher')
.topic('sample_topic')
*
Add a publisher interface to config.json(\ ``[WORK_DIR]/IEdgeInsights/Kapacitor/config.json``\ ) with the same publisher name and topic,
that is, 'sample_publisher' and 'sample_topic' respectively, as in the above example.
For example:
**TCP mode**
.. code-block:: sh
"Publishers": [
{
"Name": "sample_publisher",
"Type": "zmq_tcp",
"EndPoint": "0.0.0.0:65034",
"Topics": [
"sample_topic"
],
"AllowedClients": [
"TimeSeriesProfiler",
"EmbSubscriber",
"GoSubscriber"
]
}
]
**IPC mode**
.. code-block:: sh
"Publishers": [
{
"Name": "sample_publisher",
"Type": "zmq_ipc",
"EndPoint": "/EII/sockets",
"Topics": [
"sample_topic"
],
"AllowedClients": [
"TimeSeriesProfiler",
"EmbSubscriber",
"GoSubscriber"
]
}
]
*
Do the `provisioning 3.0.1/IEdgeInsights/README.html#provision>`_ and run the Open EII stack.
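Because rfc_task.tick sends windowed data to the UDF, the handler on the other side consumes batches: it declares BATCH in info() and buffers points between begin_batch and end_batch. Below is a skeletal sketch of that shape under those assumptions; it is not the shipped rfc_classifier.py, the handler name is illustrative, and the classification step is elided.

.. code-block:: python

    # batch_udf_skeleton.py -- illustrative batch-mode shape for a process UDF
    # such as rfc_classifier.py; the model inference itself is elided.
    from kapacitor.udf.agent import Agent, Handler
    from kapacitor.udf import udf_pb2

    class BatchHandler(Handler):
        def __init__(self, agent):
            self._agent = agent
            self._batch = []

        def info(self):
            response = udf_pb2.Response()
            response.info.wants = udf_pb2.BATCH     # matches the |window() node
            response.info.provides = udf_pb2.STREAM
            return response

        def init(self, init_req):
            response = udf_pb2.Response()
            response.init.success = True
            return response

        def snapshot(self):
            response = udf_pb2.Response()
            response.snapshot.snapshot = b''
            return response

        def restore(self, restore_req):
            response = udf_pb2.Response()
            response.restore.success = False
            response.restore.error = 'not implemented'
            return response

        def begin_batch(self, begin_req):
            self._batch = []  # start buffering a new window

        def point(self, point):
            self._batch.append(point)

        def end_batch(self, end_req):
            # Run the (elided) model over the buffered window and emit
            # one point per input back to the stream.
            for point in self._batch:
                response = udf_pb2.Response()
                response.point.CopyFrom(point)
                self._agent.write_response(response, True)

    if __name__ == '__main__':
        agent = Agent()
        agent.handler = BatchHandler(agent)
        agent.start()
        agent.wait()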