Overview of Kapacitor
Introduction to Point-Data Analytics (Time Series Data)
Any value that is generated over time is point data. For example:
Temperature at different times of the day.
Number of oil barrels processed per minute.
By running analytics over point data, a factory can implement an anomaly detection mechanism; this is what PointDataAnalytics refers to.
IEdgeInsights uses the TICK stack to do point data analytics.
It includes a temperature anomaly detection example that demonstrates the time-series data analytics flow.
The high-level flow of the data is:
MQTT-temp-sensor->Telegraf->Influx->Kapacitor->Influx.
The MQTT-temp-sensor simulator sends data to Telegraf. Telegraf sends the same data to Influx, and Influx sends it to Kapacitor. Kapacitor does anomaly detection and publishes the results back to Influx.
Here, Telegraf is the TICK stack data-collection component and supports a number of input plug-ins for data ingestion. Influx is a time-series database. Kapacitor is an analytics engine where users can write custom analytics plug-ins (TICK scripts).
Starting the Example
To start the mqtt-temp-sensor, refer to tools/mqtt-publisher/README.md.
If a System Integrator (SI) wants to use IEdgeInsights only for point data analytics, then comment out the video use case containers ia_video_ingestion and ia_video_analytics in ../build/docker-compose.yml.
Starting the EII:
To start the EII in production mode, provisioning must be done. Following are the commands to be executed after provisioning:
cd build
docker-compose build
docker-compose up -d
To start the EII in developer mode, refer to README.
To verify the output, check the output of the following command:
docker logs -f ia_influxdbconnector
Following is a snapshot of sample output of the above command.
I0822 09:03:01.705940 1 pubManager.go:111] Published message: map[data:point_classifier_results,host=ia_telegraf,topic=temperature/simulated/0 temperature=19.29358085726703,ts=1566464581.6201317 1566464581621377117]
I0822 09:03:01.927094 1 pubManager.go:111] Published message: map[data:point_classifier_results,host=ia_telegraf,topic=temperature/simulated/0 temperature=19.29358085726703,ts=1566464581.6201317 1566464581621377117]
I0822 09:03:02.704000 1 pubManager.go:111] Published message: map[data:point_data,host=ia_telegraf,topic=temperature/simulated/0 ts=1566464582.6218634,temperature=27.353740759929877 1566464582622771952]
Purpose of Telegraf
Telegraf is the data entry point for IEdgeInsights. It supports many input plugins, which are used for point data ingestion. In the previous example, the MQ Telemetry Transport (MQTT) input plugin of Telegraf is used. Following is the configuration of the plugin:
# # Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
# ## MQTT broker URLs to be used. The format should be scheme://host:port,
# ## schema can be tcp, ssl, or ws.
servers = ["tcp://localhost:1883"]
#
# ## MQTT QoS, must be 0, 1, or 2
# qos = 0
# ## Connection timeout for initial connection in seconds
# connection_timeout = "30s"
#
# ## Topics to subscribe to
topics = [
"temperature/simulated/0",
]
name_override = "point_data"
data_format = "json"
#
# # if true, messages that can't be delivered while the subscriber is offline
# # will be delivered when it comes back (such as on service restart).
# # NOTE: If true, client_id MUST be set
persistent_session = false
# # If empty, a random client ID will be generated.
client_id = ""
#
# ## username and password to connect MQTT server.
username = ""
password = ""
In production mode, the Telegraf configuration file is Telegraf/config/telegraf.conf ([WORK_DIR]/IEdgeInsights/Telegraf/config/Telegraf/Telegraf.conf), and in developer mode, the Telegraf configuration file is Telegraf/config/telegraf_devmode.conf ([WORK_DIR]/IEdgeInsights/Telegraf/config/Telegraf/Telegraf_devmode.conf).
For more information on the supported input and output plugins, refer to https://docs.influxdata.com/telegraf/v1.10/plugins/
Purpose of Kapacitor
About Kapacitor and UDF
You can write custom anomaly detection algorithms in Python or Go. These algorithms are called UDFs (user-defined functions). They follow certain API standards so that Kapacitor can call them at run time.
IEdgeInsights has a sample UDF written in Go. Kapacitor subscribes to InfluxDB and gets the temperature data. After fetching this data, Kapacitor calls the UDF, which detects anomalies in the temperature and sends the results back to Influx.
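For illustration, the following is a minimal sketch of such a socket-based UDF in Python, written against Kapacitor's UDF agent API (kapacitor.udf.agent). It assumes the agent package is available on the container's PYTHONPATH and mimics the temperature-band rule used by the sample classifiers (points between 20 and 25 filtered out); the actual py_classifier.py in the repository may differ:

import logging

from kapacitor.udf import udf_pb2
from kapacitor.udf.agent import Agent, Handler, Server

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class TempClassifierHandler(Handler):
    # Stream handler: forwards only points outside the normal 20-25 band.
    def __init__(self, agent):
        self._agent = agent

    def info(self):
        response = udf_pb2.Response()
        response.info.wants = udf_pb2.STREAM
        response.info.provides = udf_pb2.STREAM
        return response

    def init(self, init_req):
        response = udf_pb2.Response()
        response.init.success = True
        return response

    def snapshot(self):
        response = udf_pb2.Response()
        response.snapshot.snapshot = b''
        return response

    def restore(self, restore_req):
        response = udf_pb2.Response()
        response.restore.success = False
        response.restore.error = 'not implemented'
        return response

    def begin_batch(self, begin_req):
        raise Exception('batching not supported')

    def end_batch(self, end_req):
        raise Exception('batching not supported')

    def point(self, point):
        temp = point.fieldsDouble.get('temperature')
        if temp is not None and 20 < temp < 25:
            return  # normal reading, drop it
        response = udf_pb2.Response()
        response.point.CopyFrom(point)  # anomaly, forward it to Kapacitor
        self._agent.write_response(response, True)

class Accepter(object):
    def accept(self, conn, addr):
        # One agent per Kapacitor connection over the UNIX socket.
        agent = Agent(conn, conn)
        agent.handler = TempClassifierHandler(agent)
        agent.start()
        agent.wait()

if __name__ == '__main__':
    # The socket path must match [udf.functions.<name>] in kapacitor.conf.
    server = Server('/tmp/socket_file', Accepter())
    logger.info('Starting UDF server')
    server.serve()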
The sample Go UDF is at go_classifier.go ([WORK_DIR]/IEdgeInsights/Kapacitor/udfs/go_classifier.go), and its TICKscript is at go_point_classifier.tick ([WORK_DIR]/IEdgeInsights/Kapacitor/tick_scripts/go_point_classifier.tick).
The sample Python UDF is at py_classifier.py ([WORK_DIR]/IEdgeInsights/Kapacitor/udfs/py_classifier.py), and its TICKscript is at py_point_classifier.tick ([WORK_DIR]/IEdgeInsights/Kapacitor/tick_scripts/py_point_classifier.tick).
For more details on Kapacitor and UDFs, refer to the following links:
Writing a sample UDF: anomaly detection
UDF and Kapacitor interaction: socket_udf
In production mode, the Kapacitor configuration file is Kapacitor/config/kapacitor.conf ([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor.conf), and in developer mode, the Kapacitor configuration file is Kapacitor/config/kapacitor_devmode.conf ([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor_devmode.conf).
Custom UDFs Available in the udfs ([WORK_DIR]/IEdgeInsights/Kapacitor/udfs) Directory
UNIX Socket based UDFs
go_classifier.go: Filters the points based on temperature (data >20 and <25 is filtered out).
py_classifier.py: Filters the points based on temperature (data >20 and <25 is filtered out).
profiling_udf.go: Adds profiling data (time taken to process the data) to the points.
temperature_classifier.go: Filters the points based on temperature (data <25 is filtered out).
humidity_classifier.py: Filters the points based on humidity (data <25 is filtered out).
Process based UDFs
rfc_classifier.py: Random forest classification algorithm sample. This UDF is also used for profiling.
Steps to Configure the UDFs in Kapacitor
Keep the custom UDFs in the udfs ([WORK_DIR]/IEdgeInsights/Kapacitor/udfs) directory and the TICKscripts in the tick_scripts ([WORK_DIR]/IEdgeInsights/Kapacitor/tick_scripts) directory.
Keep the training data sets (if any) required by the custom UDFs in the training_data_sets ([WORK_DIR]/IEdgeInsights/Kapacitor/training_data_sets) directory.
For Python UDFs, any external Python package dependency needs to be installed. To install a package using pip, add it to the requirements.txt ([WORK_DIR]/IEdgeInsights/Kapacitor/requirements.txt) file; to install a package using conda, add it to the conda_requirements.txt ([WORK_DIR]/IEdgeInsights/Kapacitor/conda_requirements.txt) file.
Modify the UDF section in kapacitor.conf ([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor.conf) and in kapacitor_devmode.conf ([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor_devmode.conf). Mention the custom UDF in the configuration, for example:
[udf.functions.customUDF]
  socket = "/tmp/socket_file"
  timeout = "20s"
For a Go or Python based UDF, update the values of the keys "type", "name", "tick_script", and "task_name" in the config.json ([WORK_DIR]/IEdgeInsights/Kapacitor/config.json) file, for example:
"task": [{
  "tick_script": "py_point_classifier.tick",
  "task_name": "py_point_classifier",
  "udfs": [{
    "type": "python",
    "name": "py_classifier"
  }]
}]
For a TICKscript-only task (no UDF), update the values of the keys "tick_script" and "task_name" in the config.json ([WORK_DIR]/IEdgeInsights/Kapacitor/config.json) file, for example:
"task": [{
  "tick_script": "simple_logging.tick",
  "task_name": "simple_logging"
}]
Note:
By default, the go_classifier and rfc_classifier are configured.
The UDF function name in the TICKscript must be the same as the one configured in the Kapacitor configuration file. For example, the UDF node in the TICKscript:
@py_point_classifier()
must match the function name in the configuration:
[udf.functions.py_point_classifier]
  socket = "/tmp/socket_file"
  timeout = "20s"
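For context, a complete stream task wiring this UDF between an InfluxDB read and write might look like the following sketch (the stock py_point_classifier.tick in the repository may differ; the measurement names follow the sample output shown earlier):

dbrp "eii"."autogen"

var data0 = stream
    |from()
        .database('eii')
        .retentionPolicy('autogen')
        .measurement('point_data')
    @py_point_classifier()
    |influxDBOut()
        .database('eii')
        .measurement('point_classifier_results')
        .retentionPolicy('autogen')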
A Go or Python based UDF should listen on the same socket file as mentioned in the UDF section of kapacitor.conf ([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor.conf) and kapacitor_devmode.conf ([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor_devmode.conf). For example:
[udf.functions.customUDF]
  socket = "/tmp/socket_file"
  timeout = "20s"
For process-based UDFs, provide the correct path of the code within the container in kapacitor.conf ([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor.conf) and kapacitor_devmode.conf ([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor_devmode.conf). By default, the files and directories are copied into the container under the "/OpenEII" directory. It is recommended to keep the custom UDFs in the udfs ([WORK_DIR]/IEdgeInsights/Kapacitor/udfs) directory; the path of the custom UDF will then be "/OpenEII/udfs/customUDF_name", as shown in the example below. If the UDF is kept in a different path, modify the path in the args accordingly. The PYTHONPATH of the Kapacitor agent directory is "/OpenEII/go/src/github.com/influxdata/kapacitor/udf/agent/py/". The following example shows how to pass it:
[udf.functions.customUDF]
  prog = "python3.7"
  args = ["-u", "/EII/udfs/customUDF"]
  timeout = "60s"
  [udf.functions.customUDF.env]
    PYTHONPATH = "/go/src/github.com/influxdata/kapacitor/udf/agent/py/"
Perform the provisioning and run the EII stack.
Steps to Run the Samples of Multiple UDFs in a Single Task and Multiple Tasks Using a Single UDF
Refer to the samples/README.
Kapacitor Input and Output Plugins
Purpose of Plugins
The plugins allow Kapacitor to interact directly with the EII Message Bus. They use the message bus publisher and subscriber interfaces. Using these plugins, Kapacitor receives data from various EII publishers and sends data to various EII subscribers. Hence, it is possible to run a time-series use case without InfluxDB, using Kapacitor as an independent analytics engine.
A simple use case flow is as follows:
MQTT-temp-sensor->Telegraf->Kapacitor->TimeseriesProfiler
Using Input Plugin
Following are the steps to use the input plugin:
Configure the EII input plugin in config/kapacitor.conf ([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor.conf) and config/kapacitor_devmode.conf ([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor_devmode.conf).
For example:
[eii]
enabled = true
Edit config.json ([WORK_DIR]/IEdgeInsights/Kapacitor/config.json) to add a subscriber under interfaces.
For example, to receive data published by Telegraf:
TCP mode
"Subscribers": [
{
"Name": "telegraf_sub",
"Type": "zmq_tcp",
"EndPoint": "ia_telegraf:65077",
"PublisherAppName": "Telegraf",
"Topics": [
"*"
]
}
]
IPC mode
"Subscribers": [
{
"Name": "telegraf_sub",
"Type": "zmq_ipc",
"EndPoint": {
"SocketDir": "/EII/sockets",
"SocketFile": "telegraf-out"
},
"PublisherAppName": "Telegraf",
"Topics": [
"*"
]
}
]
Note
For IPC mode, when 'Topics' is ["*"] (as in the previous example), specify the 'EndPoint' as a dict of 'SocketDir' and 'SocketFile'. For a single topic, the 'EndPoint' can be given as the socket directory string (as in the Kapacitor output plugin example):
"EndPoint": "/EII/sockets"
The received data is available in the 'EII' storage for the TICKscript.
Create or modify a TICKscript to process the data, and configure it in config.json ([WORK_DIR]/IEdgeInsights/Kapacitor/config.json). For example, use the stock tick_scripts/eii_input_plugin_logging.tick ([WORK_DIR]/IEdgeInsights/Kapacitor/tick_scripts/eii_input_plugin_logging.tick), which logs the data received from the 'EII' storage into the Kapacitor log file (residing in the container at /tmp/log/kapacitor/kapacitor.log):
"task": [
  {
    "tick_script": "eii_input_plugin_logging.tick",
    "task_name": "eii_input_plugin_logging"
  }
]
Perform the provisioning and run the EII stack.
The subscribed data is available in the above log file, which can be seen by executing the following command:
docker exec ia_kapacitor tail -f /tmp/log/kapacitor/kapacitor.log
Using Output Plugin
Following are the steps to use the output plugin:
Create or modify a TICKscript to use the 'eiiOut' node to send data using the publisher interface. Following is an example that modifies the profiling UDF:
dbrp "eii"."autogen" var data0 = stream |from() .database('eii') .retentionPolicy('autogen') .measurement('point_data') @profiling_udf() |eiiOut() .pubname('sample_publisher') .topic('sample_topic')
Add a publisher interface to config.json ([WORK_DIR]/IEdgeInsights/Kapacitor/config.json) with the same publisher name and topic, that is, 'sample_publisher' and 'sample_topic' respectively, as seen in the previous example.
For example:
TCP mode
"Publishers": [
{
"Name": "sample_publisher",
"Type": "zmq_tcp",
"EndPoint": "0.0.0.0:65034",
"Topics": [
"sample_topic"
],
"AllowedClients": [
"TimeSeriesProfiler"
]
}
]
IPC mode
"Publishers": [
{
"Name": "sample_publisher",
"Type": "zmq_ipc",
"EndPoint": "/EII/sockets",
"Topics": [
"sample_topic"
],
"AllowedClients": [
"TimeSeriesProfiler"
]
}
]
Perform provisioning and run the EII stack.
Using Input or Output Plugin with RFC UDF
Following are the steps to use the input or output plugin with the RFC UDF:
Add the RFC task to config.json ([WORK_DIR]/IEdgeInsights/Kapacitor/config.json):
"task": [
  {
    "tick_script": "rfc_task.tick",
    "task_name": "random_forest_sample",
    "udfs": [{
      "type": "python",
      "name": "rfc_classifier"
    }]
  }
]
Modify rfc_task.tick ([WORK_DIR]/IEdgeInsights/Kapacitor/tick_scripts/rfc_task.tick) as seen in the following example:
dbrp "eii"."autogen"

var data0 = stream
    |from()
        .database('eii')
        .retentionPolicy('autogen')
        .measurement('ts_data')
    |window()
        .period(3s)
        .every(4s)

data0
    @rfc()
    |eiiOut()
        .pubname('sample_publisher')
        .topic('sample_topic')
Modify the UDF section in the Kapacitor configuration files Kapacitor/config/kapacitor.conf ([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor.conf) and Kapacitor/config/kapacitor_devmode.conf ([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor_devmode.conf) to remove the Go classifier UDF related configuration, since it conflicts with the existing Python RFC UDF configuration:
# Configuration for UDFs (User Defined Functions)
[udf.functions]
#    [udf.functions.go_point_classifier]
#       socket = "/tmp/point_classifier"
#       timeout = "20s"
Add a publisher interface to config.json ([WORK_DIR]/IEdgeInsights/Kapacitor/config.json) with the same publisher name and topic, that is, 'sample_publisher' and 'sample_topic' respectively, as in the previous example. For example:
TCP mode
"Publishers": [ { "Name": "sample_publisher", "Type": "zmq_tcp", "EndPoint": "0.0.0.0:65034", "Topics": [ "sample_topic" ], "AllowedClients": [ "TimeSeriesProfiler", "EmbSubscriber", "GoSubscriber" ] } ]
IPC mode
"Publishers": [ { "Name": "sample_publisher", "Type": "zmq_ipc", "EndPoint": "/EII/sockets", "Topics": [ "sample_topic" ], "AllowedClients": [ "TimeSeriesProfiler", "EmbSubscriber", "GoSubscriber" ] } ]
Perform provisioning, then build and run the EII stack.
Steps to Independently Build and Deploy the Kapacitor Service
Note
For running two or more microservices, we recommend that users try the use case-driven approach for building and deploying, as mentioned in Generate Consolidated Files for a Subset of Edge Insights for Industrial Services.
Steps to Independently Build the Kapacitor Service
Note
When switching between independent deployment of the service with and without the Config Manager Agent service dependency, you might run into issues with the docker-compose build with respect to the existence of the Certificates folder. As a workaround, run the command sudo rm -rf Certificates to proceed with the docker-compose build.
To independently build the Kapacitor service, complete the following steps:
The downloaded source code should have a directory named Kapacitor:
cd IEdgeInsights/Kapacitor
Copy the IEdgeInsights/build/.env file to the current folder using the following command:
cp ../build/.env .
NOTE: Update the HOST_IP and ETCD_HOST variables in the .env file with your system IP.
# Source the .env using the following command:
set -a && source .env && set +a
Independently build
docker-compose build
Steps to Independently Deploy the Kapacitor Service
You can deploy the Kapacitor service in one of the following two ways:
Deploy the Kapacitor Service without the Config Manager Agent Dependency
Run the following commands to deploy the Kapacitor service without Configuration Manager Agent dependency:
# Enter the Kapacitor directory
cd IEdgeInsights/Kapacitor
Copy the IEdgeInsights/build/.env file to the current folder using the following command, if not already present:
cp ../build/.env .
Note: Ensure that docker ps is clean and docker network ls does not have any EII bridge networks.
Update the .env file for the following:
1. HOST_IP and ETCD_HOST variables with your system IP.
2. `READ_CONFIG_FROM_FILE_ENV` value to `true` and `DEV_MODE` value to `true`.
Source the .env using the following command:
set -a && source .env && set +a
# Run the service
docker-compose -f docker-compose.yml -f docker-compose-dev.override.yml up -d
Note
The Kapacitor container restarts automatically when its config is modified in the config.json file.
If you are updating the config.json file using the vi or vim editor, it is required to append set backupcopy=yes to ~/.vimrc so that the changes made to config.json on the host machine are reflected inside the container mount point.
Deploy the Kapacitor Service with the Config Manager Agent Dependency
Run the following commands to deploy the Kapacitor service with the Config Manager Agent dependency:
Note
Ensure that the Config Manager Agent image is present in the system. If not, build the Config Manager Agent image locally before independently deploying the service with the Config Manager Agent dependency.
# Enter the Kapacitor directory
cd IEdgeInsights/Kapacitor
Copy the IEdgeInsights/build/.env file to the current folder using the following command, if not already present:
cp ../build/.env .
Note: Ensure that docker ps is clean and docker network ls does not have any EII bridge networks.
Update the .env file for the following:
1. HOST_IP and ETCD_HOST variables with your system IP.
2. `READ_CONFIG_FROM_FILE_ENV` value is set to `false`.
Copy the docker-compose.yml from IEdgeInsights/ConfigMgrAgent as docker-compose.override.yml in IEdgeInsights/Kapacitor.
cp ../ConfigMgrAgent/docker-compose.yml docker-compose.override.yml
Copy the builder.py with standalone mode changes from the IEdgeInsights/build directory:
cp ../build/builder.py .
Run builder.py in standalone mode; this will generate eii_config.json and update docker-compose.override.yml:
python3 builder.py -s true
Build the service (this step is optional if the service is already built in the independent build step above):
docker-compose build
For running the service in PROD mode, run the following command:
NOTE: Make sure to update DEV_MODE to false in .env while running in PROD mode and source the .env using the command set -a && source .env && set +a.
docker-compose up -d
For running the service in DEV mode, run the following command:
NOTE: Make sure to update DEV_MODE to true in .env while running in DEV mode and source the .env using the command set -a && source .env && set +a.
docker-compose -f docker-compose.yml -f docker-compose-dev.override.yml -f docker-compose.override.yml up -d
Troubleshooting
If the Kapacitor build fails with 'broken pipe' related errors, then add the following line to the conda_requirements.txt ([WORK_DIR]/IEdgeInsights/Kapacitor/conda_requirements.txt) file and retry the build:
scikit-learn==1.0.0
Time Series Python UDFs Development
In DEV mode (DEV_MODE=true in [WORK_DIR]/IEdgeInsights/build/.env), the Python UDFs are volume mounted inside the Kapacitor container, as seen in its docker-compose-dev.override.yml. This gives developers the flexibility to update their UDFs on the host machine and see the changes reflected in Kapacitor by simply restarting the Kapacitor container, without rebuilding the Kapacitor container image.
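As a hypothetical sketch, such a dev-mode volume mount in docker-compose-dev.override.yml could look like the following (the service name and paths here are assumptions based on this document; the actual file in the repository may differ):

services:
  ia_kapacitor:
    volumes:
      # Host-side UDFs and TICKscripts mapped over the copies baked into the image
      - ./udfs:/OpenEII/udfs
      - ./tick_scripts:/OpenEII/tick_scripts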