Telegraf Overview¶
Telegraf is part of the TICK stack. It is a plugin-based agent with many input and output plugins. In EII's basic configuration, it is used for data ingestion and for sending data to InfluxDB. The EII framework does not restrict any features of Telegraf.
Plugins¶
The EII message bus input plugin subscribes to a configured topic or topic prefixes. The plugin has a component called the subscriber, which receives data from the EII message bus. After receiving the data, depending on the configuration, the plugin processes it either synchronously or asynchronously.
In synchronous processing, the receiver thread (the thread that receives the data from the message bus) is also responsible for processing the data (JSON parsing). The receiver thread picks up the next message from the message bus only after it has finished processing the previous one.
In asynchronous processing, the receiver thread receives the data and puts it into a queue. A pool of threads dequeues the data from the queue and processes it.
Guidelines for choosing between the data processing options are as follows (a sketch contrasting the two models follows this list):
Synchronous option: the ingestion rate is consistent.
Asynchronous options: there are two options:
Topic-specific queue+threadpool: frequent spikes in the ingestion rate for a specific topic.
Global queue+threadpool: occasional spikes in the ingestion rate for a specific topic.
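To make the two models concrete, the following is a minimal, illustrative Python sketch; the actual plugin is implemented in Go inside Telegraf, so every name below is invented for illustration only.

import json
import queue
import threading

def process(raw):
    # JSON parsing stands in for the plugin's per-point processing.
    return json.loads(raw)

def receive_sync(messages):
    # Synchronous: the receiver thread only takes the next message
    # after the current one is fully processed.
    for raw in messages:
        print("sync:", process(raw))

def receive_async(messages, queue_len=10, num_worker=2):
    # Asynchronous: the receiver thread enqueues; a pool of worker
    # threads dequeues and processes.
    q = queue.Queue(maxsize=queue_len)

    def worker():
        while True:
            raw = q.get()
            if raw is None:  # sentinel: stop this worker
                break
            print("async:", process(raw))

    workers = [threading.Thread(target=worker) for _ in range(num_worker)]
    for w in workers:
        w.start()
    for raw in messages:      # receiver thread: enqueue only
        q.put(raw)
    for _ in workers:         # one sentinel per worker
        q.put(None)
    for w in workers:
        w.join()

if __name__ == "__main__":
    msgs = ['{"temperature": 21.5}', '{"temperature": 22.1}']
    receive_sync(msgs)
    receive_async(msgs)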
Steps to Independently Build and Deploy the Telegraf Service¶
Note
For running two or more microservices, we recommend the use case-driven approach for building and deploying, as described in Generate Consolidated Files for a Subset of Edge Insights for Industrial Services.
Steps to Independently Build the Telegraf Service¶
Note
When switching between independent deployment of the service with and without the Config Manager Agent service dependency, you may run into docker-compose build issues caused by the existing Certificates folder. As a workaround, run the command sudo rm -rf Certificates before rerunning docker-compose build.
To independently build the Telegraf service, complete the following steps:
The downloaded source code should have a directory named Telegraf:
cd IEdgeInsights/Telegraf
Copy the IEdgeInsights/build/.env file using the following command in the current folder
cp ../build/.env .
NOTE: Update the HOST_IP and ETCD_HOST variables in the .env file with your system IP.
Source the .env using the following command:
set -a && source .env && set +a
Independently build the service:
docker-compose build
Steps to Independently Deploy the Telegraf Service¶
You can deploy the Telegraf service in any of the following two ways:
Deploy the Telegraf Service without the Config Manager Agent Dependency¶
Run the following commands to deploy the Telegraf service without the Config Manager Agent dependency:
# Enter the Telegraf directory
cd IEdgeInsights/Telegraf
Copy the IEdgeInsights/build/.env file using the following command in the current folder, if not already present.
cp ../build/.env .

Note: Ensure that docker ps is clean and docker network ls does not have an EII bridge network.
Update the following in the .env file:
1. HOST_IP and ETCD_HOST variables with your system IP.
2. `READ_CONFIG_FROM_FILE_ENV` value to `true` and `DEV_MODE` value to `true`.
Source the .env using the following command:
set -a && source .env && set +a
# Run the service
docker-compose -f docker-compose.yml -f docker-compose-dev.override.yml up -d
Note
The Telegraf container restarts automatically when its config is modified in the config.json file.
If you update the config.json file using the vi or vim editor, append set backupcopy=yes to ~/.vimrc so that changes made to config.json on the host machine are reflected inside the container mount point.
Deploy the Telegraf Service with the Config Manager Agent Dependency¶
Run the following commands to deploy the Telegraf service with the Config Manager Agent dependency:
Note
Ensure that the Config Manager Agent image is present in the system. If it is not, build the Config Manager Agent image locally before independently deploying the service with the Config Manager Agent dependency.
# Enter the Telegraf directory
cd IEdgeInsights/Telegraf
Copy the IEdgeInsights/build/.env file using the following command in the current folder, if not already present.
cp ../build/.env .
Note
Ensure that docker ps is clean and docker network ls does not have an EII bridge network.
Update the following in the .env file:
1. HOST_IP and ETCD_HOST variables with your system IP.
2. `READ_CONFIG_FROM_FILE_ENV` value is set to `false`.
Copy the docker-compose.yml from IEdgeInsights/ConfigMgrAgent as docker-compose.override.yml in IEdgeInsights/Telegraf.
cp ../ConfigMgrAgent/docker-compose.yml docker-compose.override.yml
Copy the builder.py with standalone mode changes from the IEdgeInsights/build directory:

cp ../build/builder.py .

Run builder.py in standalone mode; this generates eii_config.json and updates docker-compose.override.yml:

python3 builder.py -s true

Build the service (optional if the service was already built in the independent build step above):

docker-compose build
For running the service in PROD mode, run the following command:

NOTE: Make sure to update DEV_MODE to false in .env while running in PROD mode and source the .env using the command set -a && source .env && set +a.

docker-compose up -d

For running the service in DEV mode, run the following command:

NOTE: Make sure to update DEV_MODE to true in .env while running in DEV mode and source the .env using the command set -a && source .env && set +a.

docker-compose -f docker-compose.yml -f docker-compose-dev.override.yml -f docker-compose.override.yml up -d
Telegraf’s Default Configuration¶
Telegraf starts with the default configuration present at config/Telegraf/Telegraf.conf([WORK_DIR]/IEdgeInsights/Telegraf/config/Telegraf/Telegraf.conf) (for dev mode, the file name is 'Telegraf_devmode.conf'). By default, the following plugins are enabled:
MQTT input plugin ([[inputs.mqtt_consumer]])
EII message bus input plugin ([[inputs.eii_msgbus]])
Influxdb output plugin ([[outputs.influxdb]])
Telegraf is started by the script 'telegraf_start.py'. This script first fetches the configuration from ETCD and then starts the Telegraf service, picking the right configuration file depending on developer/production mode. By default, only a single instance of the Telegraf container runs (named 'ia_telegraf').
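For orientation only, the selection logic described above can be sketched in Python. This is a simplified stand-in rather than the actual telegraf_start.py, and the in-container conf path is an assumption based on the volume-mount convention shown later in this document.

import os
import subprocess

# Simplified stand-in for telegraf_start.py (illustration only).
dev_mode = os.environ.get("DEV_MODE", "false").lower() == "true"
# Assumed in-container location of the default instance's conf files.
conf = ("/etc/Telegraf/Telegraf/Telegraf_devmode.conf" if dev_mode
        else "/etc/Telegraf/Telegraf/Telegraf.conf")
# The real script first fetches the service configuration from ETCD and
# then launches Telegraf with the mode-appropriate conf file.
subprocess.run(["telegraf", "-config", conf], check=True)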
Volume Mounting of Telegraf Configuration Files¶
In the DEV mode (DEV_MODE=true in [WORK_DIR]/IEdgeInsights/build/.env), the Telegraf conf files are volume mounted inside the Telegraf container, as can be seen in its docker-compose-dev.override.yml. This gives developers the flexibility to update the conf file on the host machine and see the changes reflected in Telegraf by simply restarting the Telegraf container, without rebuilding the Telegraf container image.
Note
If Telegraf is run as Multi Telegraf, make sure the same file path is volume mounted. For example, for the Telegraf1 instance, the volume mount would look like:
volumes:
- ./config/Telegraf1/:/etc/Telegraf/Telegraf1
MQTT Sample Configuration and the Testing Tool¶
To test with an MQTT publisher in a k8s Helm environment, update the 'MQTT_BROKER_HOST' environment variable in values.yaml([WORK_DIR]/IEdgeInsights/Telegraf/helm/values.yaml) with the host IP address of the system where the MQTT broker is running.

To test with a remote MQTT broker in a Docker environment, update the 'MQTT_BROKER_HOST' environment variable in docker-compose.yml([WORK_DIR]/IEdgeInsights/Telegraf/docker-compose.yml) with the host IP address of the system where the MQTT broker is running.
ia_telegraf:
environment:
...
MQTT_BROKER_HOST: '<HOST IP address of the system where MQTT Broker is running>'
A Telegraf instance can be configured for pressure point data ingestion. In the following example, the MQTT input plugin of Telegraf is configured to read pressure point data and store it into the 'point_pressure_data' measurement.
# # Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
# ## MQTT broker URLs to be used. The format should be scheme://host:port,
# ## schema can be tcp, ssl, or ws.
servers = ["tcp://localhost:1883"]
#
# ## MQTT QoS, must be 0, 1, or 2
# qos = 0
# ## Connection timeout for initial connection in seconds
# connection_timeout = "30s"
#
# ## Topics to subscribe to
topics = [
"pressure/simulated/0",
]
name_override = "point_pressure_data"
data_format = "json"
#
# # if true, messages that can't be delivered while the subscriber is offline
# # will be delivered when it comes back (such as on service restart).
# # NOTE: if true, client_id MUST be set
persistent_session = false
# # If empty, a random client ID will be generated.
client_id = ""
#
# ## username and password to connect MQTT server.
username = ""
password = ""
To start the MQTT publisher with pressure data:

cd ../tools/mqtt/publisher/

Change the command option in docker-compose.yml([WORK_DIR]/IEdgeInsights/tools/mqtt/publisher/docker-compose.yml) to:

["--pressure", "10:30"]
Build and Run MQTT Publisher:
docker-compose up --build -d
Refer to the tools/mqtt/publisher/README.md
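The bundled tool above is the supported way to generate test data. Purely as an illustration, a minimal stand-in publisher can be sketched in Python with the third-party paho-mqtt package (paho-mqtt 1.x API shown; the broker host, port, topic, and value range below mirror the sample configuration and are assumptions).

import json
import random
import time

import paho.mqtt.client as mqtt  # pip install paho-mqtt

client = mqtt.Client()             # paho-mqtt 1.x style client
client.connect("localhost", 1883)  # assumed broker host/port
while True:
    payload = json.dumps({"pressure": random.uniform(10, 30)})
    client.publish("pressure/simulated/0", payload)  # topic from the sample conf
    time.sleep(1)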
Enable Message Bus Input Plugin in Telegraf¶
The purpose of this enablement is to allow Telegraf to receive data from the message bus and store it in InfluxDB, which is scalable.
Plugin Configuration¶
Configuration of the plugin is divided as follows:
ETCD configuration
Configuration in Telegraf.conf file config/Telegraf/Telegraf.conf(
[WORK_DIR]/IEdgeInsights/Telegraf/config/Telegraf/Telegraf.conf
)
ETCD Configuration¶
As an EII message bus plugin and part of the EII framework, the message bus configuration and the plugin's topic-specific configuration are kept in ETCD.
Following is the sample configuration:
{
"config": {
"influxdb": {
"username": "admin",
"password": "admin123",
"dbname": "datain"
},
"default": {
"topics_info": [
"topic-pfx1:temperature:10:2",
"topic-pfx2:pressure::",
"topic-pfx3:humidity"
],
"queue_len": 10,
"num_worker": 2,
"profiling": "false"
}
},
"interfaces": {
"Subscribers": [
{
"Name": "default",
"Type": "zmq_tcp",
"EndPoint": "ia_zmq_broker:60515",
"Topics": [
"*"
],
"PublisherAppName": "ZmqBroker"
}
]
}
}
Brief Description of the Configuration.
EII's Telegraf has 'config' and 'interfaces' sections, where:
"interfaces" are the EII interface details, and "config" contains:
config: Contains the configuration of InfluxDB ("influxdb") and the EII message bus input plugin ("default"). In the above sample configuration, "default" is an instance name. This instance name is referenced from Telegraf's configuration file config/Telegraf/Telegraf.conf([WORK_DIR]/IEdgeInsights/Telegraf/config/Telegraf/Telegraf.conf).
topics_info: This is an array of topic prefix configurations, where the user specifies how the data from each topic prefix should be processed. The topic information in every line is interpreted as follows:
"topic-pfx1:temperature:10:2": Process data from the topic prefix 'topic-pfx1' asynchronously using a dedicated queue of length 10 and a dedicated thread pool of size 2. The processed data is stored in a measurement named 'temperature' in InfluxDB.
"topic-pfx2:pressure::": Process data from the topic prefix 'topic-pfx2' asynchronously using the global queue and global thread pool. The processed data is stored in a measurement named 'pressure' in InfluxDB.
"topic-pfx3:humidity": Process data synchronously. The processed data is stored in a measurement named 'humidity' in InfluxDB.
Note: If a topic-specific configuration is not mentioned, then by default the data is processed synchronously and the measurement name is the same as the topic name (a Python restatement of this grammar follows the notes below).
queue_len: Global queue length.
num_worker: Global thread pool size.
profiling: Enables the profiling mode of this plugin (value can be either "true" or "false"). In profiling mode, every point gets the following information, kept in the same measurement:
Total time spent in the plugin (time in ns)
Time spent in the queue (asynchronous processing only; time in ns)
Time spent in JSON processing (time in ns)
The name of the thread pool and the thread ID that processed the point
Note: The name of the global thread pool is "GLOBAL". For a topic-specific thread pool, the name is "for-$topic-name".
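The topics_info grammar above can be restated as a short, illustrative Python parser; the plugin itself is written in Go, so this sketch only mirrors the documented rules.

def parse_topic_info(entry):
    # "<prefix>:<measurement>:<queue_len>:<num_worker>" -> async, dedicated
    # "<prefix>:<measurement>::"                        -> async, global
    # "<prefix>:<measurement>"                          -> sync
    # "<prefix>"                                        -> sync, measurement = topic name
    parts = entry.split(":")
    prefix = parts[0]
    measurement = parts[1] if len(parts) > 1 and parts[1] else prefix
    if len(parts) <= 2:
        return {"prefix": prefix, "measurement": measurement, "mode": "sync"}
    if parts[2] == "":
        return {"prefix": prefix, "measurement": measurement, "mode": "async-global"}
    return {"prefix": prefix, "measurement": measurement, "mode": "async-dedicated",
            "queue_len": int(parts[2]), "num_worker": int(parts[3])}

for entry in ["topic-pfx1:temperature:10:2",
              "topic-pfx2:pressure::",
              "topic-pfx3:humidity"]:
    print(parse_topic_info(entry))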
Configuration at Telegraf.conf File¶
The plugin instance name is an additional key, kept in the plugin configuration section. This key is used to fetch the configuration from ETCD. The following is the minimum sample configuration with a single plugin instance:
[[inputs.eii_msgbus]]
instance_name = "default"
data_format = "json"
json_strict = true
Here, the value default acts as a key in the file config.json([WORK_DIR]/IEdgeInsights/Telegraf/config.json). For this key, there is configuration in the interfaces and config sections of config.json([WORK_DIR]/IEdgeInsights/Telegraf/config.json). So the value of instance_name acts as the glue between the Telegraf configuration config/Telegraf/Telegraf.conf([WORK_DIR]/IEdgeInsights/Telegraf/config/Telegraf/Telegraf.conf) and the ETCD configuration config.json([WORK_DIR]/IEdgeInsights/Telegraf/config.json).
Note: As this is a Telegraf input plugin, Telegraf's parser configuration must be in the Telegraf.conf file. More information on the Telegraf JSON parser plugin can be found at https://github.com/influxdata/telegraf/tree/master/plugins/parsers/json. If there are multiple Telegraf instances, the location of the Telegraf configuration files differs. For more details, refer to the Optional: Adding multiple Telegraf instances section.
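As an illustration of this glue, the lookup can be sketched in Python; the snippet uses a trimmed copy of the sample configuration above, while the real lookup happens inside the Go plugin against ETCD.

import json

config_json = json.loads("""
{
  "config":     {"default": {"queue_len": 10, "num_worker": 2}},
  "interfaces": {"Subscribers": [{"Name": "default",
                                  "Type": "zmq_tcp",
                                  "EndPoint": "ia_zmq_broker:60515"}]}
}
""")

instance_name = "default"  # same value as instance_name in [[inputs.eii_msgbus]]
plugin_cfg = config_json["config"][instance_name]
subscriber = next(s for s in config_json["interfaces"]["Subscribers"]
                  if s["Name"] == instance_name)
print(plugin_cfg, subscriber["EndPoint"])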
Advanced: Multiple Plugin Sections of EII Message Bus Input Plugin¶
Multiple configuration sections of the message bus input plugin are kept in the config/Telegraf/Telegraf.conf([WORK_DIR]/IEdgeInsights/Telegraf/config/Telegraf/Telegraf.conf) file.
Following is an example:
Assume there are two EII apps, one with the AppName "EII_APP1" and another with the AppName "EII_APP2", which publish data to the EII message bus.
The Telegraf’s ETCD configuration:
{
"config":{
"subscriber1":{
"topics_info":[
"topic-pfx1:temperature:10:2",
"topic-pfx2:pressure::",
"topic-pfx3:humidity"
],
"queue_len":100,
"num_worker":5,
"profiling":"true"
},
"subscriber2":{
"topics_info":[
"topic-pfx21:temperature2:10:2",
"topic-pfx22:pressure2::",
"topic-pfx23:humidity2"
],
"queue_len":100,
"num_worker":10,
"profiling":"true"
}
},
"interfaces":{
"Subscribers":[
{
"Name":"subscriber1",
"EndPoint":"EII_APP1_container_name:5569",
"Topics":[
"*"
],
"Type":"zmq_tcp",
"PublisherAppName": "EII_APP1"
},
{
"Name":"subscriber2",
"EndPoint":"EII_APP2_container_name:5570",
"Topics":[
"topic-pfx21",
"topic-pfx22",
"topic-pfx23"
],
"Type":"zmq_tcp",
"PublisherAppName": "EII_APP2"
}
]
}
}
The Telegraf.conf configuration sections:
[[inputs.eii_msgbus]]
instance_name = "subscriber1"
data_format = "json"
json_strict = true
[[inputs.eii_msgbus]]
instance_name = "subscriber2"
data_format = "json"
json_strict = true
Using Input Plugins¶
By default, the message bus input plugin is disabled. To configure the EII input plugin, uncomment the following lines in config/Telegraf/Telegraf.conf([WORK_DIR]/IEdgeInsights/Telegraf/config/Telegraf/Telegraf.conf) and config/Telegraf/Telegraf_devmode.conf([WORK_DIR]/IEdgeInsights/Telegraf/config/Telegraf/Telegraf_devmode.conf):
```
[[inputs.eii_msgbus]]
instance_name = "default"
data_format = "json"
json_strict = true
tag_keys = [
"my_tag_1",
"my_tag_2"
]
json_string_fields = [
"field1",
"field2"
]
json_name_key = ""
```
Edit config.json([WORK_DIR]/IEdgeInsights/Telegraf/config.json) to add the message bus input plugin:

{
  "config": {
    ...
    "default": {
      "topics_info": [
        "topic-pfx1:temperature:10:2",
        "topic-pfx2:pressure::",
        "topic-pfx3:humidity"
      ],
      "queue_len": 10,
      "num_worker": 2,
      "profiling": "false"
    },
    ...
  },
  "interfaces": {
    "Subscribers": [
      {
        "Name": "default",
        "Type": "zmq_tcp",
        "EndPoint": "ia_zmq_broker:60515",
        "Topics": [
          "*"
        ],
        "PublisherAppName": "ZmqBroker"
      }
    ],
    ...
  }
}
Enable Message Bus Output Plugin in Telegraf¶
Purpose
To receive data from the Telegraf input plugins and publish it to the EII message bus.
Configuration of the Plugin
Configuration of the plugin is divided into two parts:
ETCD configuration
Configuration in Telegraf.conf file config/Telegraf/Telegraf.conf(
[WORK_DIR]/IEdgeInsights/Telegraf/config/Telegraf/Telegraf.conf
)
ETCD configuration
As an EII message bus plugin and part of the EII framework, the message bus configuration and the plugin's topic-specific configuration are kept in ETCD.
Following is the sample configuration:
{
"config": {
"publisher": {
"measurements": ["*"],
"profiling": "false"
}
},
"interfaces": {
"Publishers": [
{
"Name": "publisher",
"Type": "zmq_tcp",
"EndPoint": "0.0.0.0:65077",
"Topics": [
"*"
],
"AllowedClients": [
"*"
]
}
]
}
}
Brief Description of the Configuration
EII's Telegraf has 'config' and 'interfaces' sections, where:
"interfaces" are the EII interface details, and "config" contains:
config: Contains the EII message bus output plugin ("publisher") configuration. In the above sample configuration, "publisher" is an instance name. This instance name is referenced from Telegraf's configuration file config/Telegraf/Telegraf.conf([WORK_DIR]/IEdgeInsights/Telegraf/config/Telegraf/Telegraf.conf).
measurements: This is an array of measurement configurations, where the user specifies which measurement data should be published to the message bus (see the sketch below).
profiling: Enables the profiling mode of this plugin (value can be either "true" or "false").
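Illustratively, the measurements filter can be thought of as a wildcard match over measurement names. The Python sketch below assumes glob-style patterns, which is an assumption beyond the literal "*" wildcard shown in the sample.

from fnmatch import fnmatch

measurements = ["*"]  # from the "publisher" section of the sample config

def should_publish(measurement):
    # Publish the point only if its measurement matches one of the
    # configured patterns.
    return any(fnmatch(measurement, pattern) for pattern in measurements)

print(should_publish("point_data"))  # True with ["*"]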
Configuration in Telegraf.conf File
The plugin instance name is an additional key kept in the plugin configuration section. This key is used to fetch the configuration from ETCD. Following is a minimal sample configuration with a single plugin instance:
[[outputs.eii_msgbus]]
instance_name = "publisher"
Here, the value 'publisher' acts as a key in the file config.json([WORK_DIR]/IEdgeInsights/Telegraf/config.json). The 'interfaces' and 'config' sections of config.json([WORK_DIR]/IEdgeInsights/Telegraf/config.json) hold configuration for this key. The value of 'instance_name' thus acts as the glue between the Telegraf configuration config/Telegraf/Telegraf.conf([WORK_DIR]/IEdgeInsights/Telegraf/config/Telegraf/Telegraf.conf) and the ETCD configuration config.json([WORK_DIR]/IEdgeInsights/Telegraf/config.json).
Advanced: Multiple Plugin Sections of Message Bus Output Plugin¶
Multiple configuration sections of the message bus output plugin are kept in the config/Telegraf/Telegraf.conf([WORK_DIR]/IEdgeInsights/Telegraf/config/Telegraf/Telegraf.conf) file.
The Telegraf’s ETCD configuration:
{
"config": {
"publisher1": {
"measurements": ["*"],
"profiling": "false"
},
"publisher2": {
"measurements": ["*"],
"profiling": "false"
}
},
"interfaces": {
"Publishers": [
{
"Name": "publisher1",
"Type": "zmq_tcp",
"EndPoint": "0.0.0.0:65077",
"Topics": [
"*"
],
"AllowedClients": [
"*"
]
},
{
"Name": "publisher2",
"Type": "zmq_tcp",
"EndPoint": "0.0.0.0:65078",
"Topics": [
"*"
],
"AllowedClients": [
"*"
]
}
]
}
}
The Telegraf.conf configuration:
[[outputs.eii_msgbus]]
instance_name = "publisher1"
[[outputs.eii_msgbus]]
instance_name = "publisher2"
Run Telegraf Input Output Plugin in IPC Mode¶
Modify the interfaces section of config.json([WORK_DIR]/IEdgeInsights/Telegraf/config.json) to run in IPC mode:
"interfaces": {
"Subscribers": [
{
"Name": "default",
"Type": "zmq_ipc",
"EndPoint": {
"SocketDir": "/EII/sockets",
"SocketFile": "backend-socket"
},
"Topics": [
"*"
],
"PublisherAppName": "ZmqBroker"
}
],
"Publishers": [
{
"Name": "publisher",
"Type": "zmq_ipc",
"EndPoint": {
"SocketDir": "/EII/sockets",
"SocketFile": "telegraf-out"
},
"Topics": [
"*"
],
"AllowedClients": [
"*"
]
}
]
}
To Add Multiple Telegraf Instances (Optional)¶
Users can add multiple instances of Telegraf. To do this, add an environment variable named 'ConfigInstance' in the docker-compose.yml file. For every additional Telegraf instance, there must be an additional compose section in the docker-compose.yml file.
The configuration for every instance must be in the Telegraf image. The convention is as follows (see the helper sketch below):
For an instance named $ConfigInstance, the Telegraf configuration has to be kept in the repository at config([WORK_DIR]/IEdgeInsights/Telegraf/config)/$ConfigInstance/$ConfigInstance.conf (for production mode) and config([WORK_DIR]/IEdgeInsights/Telegraf/config)/$ConfigInstance/$ConfigInstance_devmode.conf (for developer mode).
The same files will be available inside the respective container at '/etc/Telegraf/$ConfigInstance/$ConfigInstance.conf' (for production mode) and '/etc/Telegraf/$ConfigInstance/$ConfigInstance_devmode.conf' (for developer mode).
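A hypothetical helper restating this naming convention (illustration only):

def conf_path(config_instance, dev_mode=False):
    # /etc/Telegraf/$ConfigInstance/$ConfigInstance[_devmode].conf
    suffix = "_devmode.conf" if dev_mode else ".conf"
    return f"/etc/Telegraf/{config_instance}/{config_instance}{suffix}"

print(conf_path("Telegraf1"))                 # production mode
print(conf_path("Telegraf1", dev_mode=True))  # developer mode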
Following are some examples:
Example: For $ConfigInstance = 'Telegraf1'

The location of the Telegraf configuration is config([WORK_DIR]/IEdgeInsights/Telegraf/config)/Telegraf1/Telegraf1.conf (for production mode) and config([WORK_DIR]/IEdgeInsights/Telegraf/config)/Telegraf1/Telegraf1_devmode.conf (for developer mode).

The additional docker compose section, which is manually added in the file 'docker-compose.yml', is:
ia_telegraf1:
build:
context: $PWD/../Telegraf
dockerfile: $PWD/../Telegraf/Dockerfile
args:
EII_VERSION: ${EII_VERSION}
EII_UID: ${EII_UID}
EII_USER_NAME: ${EII_USER_NAME}
TELEGRAF_SOURCE_TAG: ${TELEGRAF_SOURCE_TAG}
TELEGRAF_GO_VERSION: ${TELEGRAF_GO_VERSION}
UBUNTU_IMAGE_VERSION: ${UBUNTU_IMAGE_VERSION}
CMAKE_INSTALL_PREFIX: ${EII_INSTALL_PATH}
container_name: ia_telegraf1
hostname: ia_telegraf1
image: ${DOCKER_REGISTRY}edgeinsights/ia_telegraf:${EII_VERSION}
restart: unless-stopped
ipc: "none"
security_opt:
- no-new-privileges
read_only: true
healthcheck:
test: ["CMD-SHELL", "exit", "0"]
interval: 5m
environment:
AppName: "Telegraf"
ConfigInstance: "Telegraf1"
CertType: "pem,zmq"
DEV_MODE: ${DEV_MODE}
no_proxy: "${ETCD_HOST},ia_influxdbconnector"
NO_PROXY: "${ETCD_HOST},ia_influxdbconnector"
ETCD_HOST: ${ETCD_HOST}
ETCD_CLIENT_PORT: ${ETCD_CLIENT_PORT}
MQTT_BROKER_HOST: ${HOST_IP}
INFLUX_SERVER: ${HOST_IP}
INFLUXDB_PORT: $INFLUXDB_PORT
ETCD_PREFIX: ${ETCD_PREFIX}
ports:
- 65078:65078
networks:
- eii
volumes:
- "vol_temp_telegraf:/tmp/"
- "vol_eii_socket:${SOCKET_DIR}"
- ./Certificates/Telegraf:/run/secrets/Telegraf
- ./Certificates/rootca:/run/secrets/rootca
Note: If the user wants to add the Telegraf output plugin in a Telegraf instance, modify config.json([WORK_DIR]/IEdgeInsights/Telegraf/config.json), docker-compose.yml([WORK_DIR]/IEdgeInsights/Telegraf/docker-compose.yml), and the Telegraf configuration (.conf) files.
Add publisher configuration in config.json(
[WORK_DIR]/IEdgeInsights/Telegraf/config.json
):
{
"config": {
...,
"<output plugin instance_name>": {
"measurements": ["*"],
"profiling": "true"
}
},
"interfaces": {
...,
"Publishers": [
...,
{
"Name": "<output plugin instance_name>",
"Type": "zmq_tcp",
"EndPoint": "0.0.0.0:<publisher port>",
"Topics": [
"*"
],
"AllowedClients": [
"*"
]
}
]
}
}
Example:
{
"config": {
...,
"publisher1": {
"measurements": ["*"],
"profiling": "true"
}
},
"interfaces": {
...,
"Publishers": [
...,
{
"Name": "publisher1",
"Type": "zmq_tcp",
"EndPoint": "0.0.0.0:65078",
"Topics": [
"*"
],
"AllowedClients": [
"*"
]
}
]
}
}
Expose the "publisher port" in the docker-compose.yml([WORK_DIR]/IEdgeInsights/Telegraf/docker-compose.yml) file:

ia_telegraf<ConfigInstance number>:
  ...
  ports:
    - <publisher port>:<publisher port>
Example:
ia_telegraf<ConfigInstance number>:
...
ports:
- 65078:65078
Add the eii_msgbus output plugin in the Telegraf instance configuration file config([WORK_DIR]/IEdgeInsights/Telegraf/config)/$ConfigInstance/$ConfigInstance.conf (for production mode) and config([WORK_DIR]/IEdgeInsights/Telegraf/config)/$ConfigInstance/$ConfigInstance_devmode.conf (for developer mode):

[[outputs.eii_msgbus]]
instance_name = "<output plugin instance_name>"
Example:

For $ConfigInstance = 'Telegraf1', add the following section in config([WORK_DIR]/IEdgeInsights/Telegraf/config)/Telegraf1/Telegraf1.conf (for production mode) and config([WORK_DIR]/IEdgeInsights/Telegraf/config)/Telegraf1/Telegraf1_devmode.conf (for developer mode):

[[outputs.eii_msgbus]]
instance_name = "publisher1"
To allow the compose file changes to take effect, run the builder.py script:

cd [WORK_DIR]/IEdgeInsights/build
python3 builder.py
Following are the commands to provision, build, and bring up all the containers:

cd [WORK_DIR]/IEdgeInsights/build/
docker-compose build
docker-compose up -d
Based on the previous example, the user can verify with the docker ps command that the Telegraf service has multiple containers.
Note: Additional configuration is kept in Telegraf/config/$ConfigInstance/telegraf.d in a modular way. For example, create a directory telegraf.d in Telegraf/config/$ConfigInstance:
mkdir config/$ConfigInstance/telegraf.d
cd config/$ConfigInstance/telegraf.d
Additional configuration files are kept inside the directory and the following command is used to start the Telegraf in the docker-compose.yml file:
command: ["telegraf -config=/etc/Telegraf/$ConfigInstance/$ConfigInstance.conf -config-directory=/etc/Telegraf/$ConfigInstance/telegraf.d"]
Overview of the Kapacitor¶
Introduction to the Point-Data Analytics(Time Series Data)¶
Any numeric value that is generated over time is point data. For example:
Temperature at different times of the day.
Number of oil barrels processed per minute.
By running analytics over point data, a factory can implement an anomaly detection mechanism, which is the focus of PointDataAnalytics.
IEdgeInsights uses the TICK stack for point data analytics.
It includes temperature anomaly detection as an example to demonstrate the time-series data analytics flow.
The high-level flow of the data is:
MQTT-temp-sensor–>Telegraf–>Influx–>Kapacitor–>Influx.
The MQTT-temp-sensor simulator sends data to Telegraf. Telegraf forwards the data to InfluxDB, and InfluxDB feeds it to Kapacitor. Kapacitor does anomaly detection and publishes the results back to InfluxDB.
Here, Telegraf is the TICK stack ingestion component and supports a number of input plugins for data ingestion. InfluxDB is a time-series database. Kapacitor is an analytics engine where users can write custom analytics plug-ins (TICK scripts).
Starting the Example¶
To start the mqtt-temp-sensor, refer to tools/mqtt/publisher/README.md.
If a System Integrator (SI) wants to use IEdgeInsights only for point data analytics, the video use case containers ia_video_ingestion and ia_video_analytics can be disabled (commented out) in ../build/docker-compose.yml.

Starting the EII:
To start the EII in production mode, provisioning must be done first. Following are the commands to be executed after provisioning:

cd build
docker-compose build
docker-compose up -d
To start the EII in developer mode, refer to README.
To verify the output, check the output of the following commands:
docker logs -f ia_influxdbconnector
Following is a snapshot of sample output of the ia_influxdbconnector logs:

I0822 09:03:01.705940       1 pubManager.go:111] Published message: map[data:point_classifier_results,host=ia_telegraf,topic=temperature/simulated/0 temperature=19.29358085726703,ts=1566464581.6201317 1566464581621377117]
I0822 09:03:01.927094       1 pubManager.go:111] Published message: map[data:point_classifier_results,host=ia_telegraf,topic=temperature/simulated/0 temperature=19.29358085726703,ts=1566464581.6201317 1566464581621377117]
I0822 09:03:02.704000       1 pubManager.go:111] Published message: map[data:point_data,host=ia_telegraf,topic=temperature/simulated/0 ts=1566464582.6218634,temperature=27.353740759929877 1566464582622771952]
Purpose of the Telegraf¶
Telegraf is the data entry point for IEdgeInsights. It supports many input plugins, which are used for point data ingestion. In the previous example, the MQ Telemetry Transport (MQTT) input plugin of Telegraf is used. Following is the configuration of the plugin:
# # Read metrics from MQTT topic(s)
[[inputs.mqtt_consumer]]
# ## MQTT broker URLs to be used. The format should be scheme://host:port,
# ## schema can be tcp, ssl, or ws.
servers = ["tcp://localhost:1883"]
#
# ## MQTT QoS, must be 0, 1, or 2
# qos = 0
# ## Connection timeout for initial connection in seconds
# connection_timeout = "30s"
#
# ## Topics to subscribe to
topics = [
"temperature/simulated/0",
]
name_override = "point_data"
data_format = "json"
#
# # if true, messages that can't be delivered while the subscriber is offline
# # will be delivered when it comes back (such as on service restart).
# # NOTE: If true, client_id MUST be set
persistent_session = false
# # If empty, a random client ID will be generated.
client_id = ""
#
# ## username and password to connect MQTT server.
username = ""
password = ""
In the production mode, the Telegraf configuration file is
Telegraf/config/telegraf.conf([WORK_DIR]/IEdgeInsights/Telegraf/config/Telegraf/Telegraf.conf
) and in developer mode,
the Telegraf configuration file is
Telegraf/config/telegraf_devmode.conf([WORK_DIR]/IEdgeInsights/Telegraf/config/Telegraf/Telegraf_devmode.conf
).
For more information on the supported input and output plugins refer to https://docs.influxdata.com/telegraf/v1.10/plugins/
Purpose of the Kapacitor¶
About Kapacitor and UDF
You can write custom anomaly detection algorithms in Python or Golang; such an algorithm is called a UDF (user-defined function). UDFs follow certain API standards so that Kapacitor can call them at run time.
IEdgeInsights ships sample UDFs written in Golang and Python. Kapacitor subscribes to InfluxDB and gets the temperature data. After fetching this data, Kapacitor calls the UDF, which detects anomalies in the temperature and sends the results back to InfluxDB (a skeleton of such a UDF follows the sample links below).
The sample Go UDF is at go_classifier.go([WORK_DIR]/IEdgeInsights/Kapacitor/udfs/go_classifier.go) and the TICKscript is at go_point_classifier.tick([WORK_DIR]/IEdgeInsights/Kapacitor/tick_scripts/go_point_classifier.tick).
The sample Python UDF is at py_classifier.py([WORK_DIR]/IEdgeInsights/Kapacitor/udfs/py_classifier.py) and the TICKscript is at py_point_classifier.tick([WORK_DIR]/IEdgeInsights/Kapacitor/tick_scripts/py_point_classifier.tick).
For more details on Kapacitor and UDFs, refer to the following links:
Writing a sample UDF at anomaly detection
UDF and Kapacitor interaction socket_udf
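For orientation, the skeleton of a stream UDF written against Kapacitor's Python agent API looks roughly as follows. This is a sketch modeled on the mirror example that ships with Kapacitor's UDF agent, not the EII classifier code itself; it shows the process-based (STDIN/STDOUT) agent, while socket-based UDFs use the agent's UNIX-socket server instead.

from kapacitor.udf.agent import Agent, Handler
from kapacitor.udf import udf_pb2

class PassThroughHandler(Handler):
    # Forwards every point unchanged; a real classifier would inspect
    # the point's fields and decide what (if anything) to write back.
    def __init__(self, agent):
        self._agent = agent

    def info(self):
        response = udf_pb2.Response()
        response.info.wants = udf_pb2.STREAM     # consumes a stream
        response.info.provides = udf_pb2.STREAM  # emits a stream
        return response

    def init(self, init_req):
        response = udf_pb2.Response()
        response.init.success = True
        return response

    def snapshot(self):
        response = udf_pb2.Response()
        response.snapshot.snapshot = b''
        return response

    def restore(self, restore_req):
        response = udf_pb2.Response()
        response.restore.success = False
        response.restore.error = 'not implemented'
        return response

    def begin_batch(self, begin_req):
        raise Exception('batching not supported')

    def end_batch(self, end_req):
        raise Exception('batching not supported')

    def point(self, point):
        response = udf_pb2.Response()
        response.point.CopyFrom(point)
        self._agent.write_response(response, True)

if __name__ == '__main__':
    agent = Agent()  # speaks protobuf over STDIN/STDOUT
    agent.handler = PassThroughHandler(agent)
    agent.start()
    agent.wait()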
In production mode, the Kapacitor configuration file is Kapacitor/config/kapacitor.conf(
[WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor.conf
) and in developer mode, the Kapacitor configuration file is Kapacitor/config/kapacitor_devmode.conf([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor_devmode.conf
).
Custom UDFs available in the UDF([WORK_DIR]/IEdgeInsights/Kapacitor/udfs
) Directory¶
UNIX socket based UDFs
go_classifier.go: Filters the points based on temperature (data > 20 and < 25 is filtered out).
py_classifier.py: Filters the points based on temperature (data > 20 and < 25 is filtered out).
profiling_udf.go: Adds profiling data (time taken to process the data) to the points.
temperature_classifier.go: Filters the points based on temperature (data < 25 is filtered out).
humidity_classifier.py: Filters the points based on humidity (data < 25 is filtered out).
Process based UDFs
rfc_classifier.py: Random Forest Classification algorithm sample. This UDF is also used for UDF profiling.
Steps to Configure the UDFs in the Kapacitor¶
Keep the custom UDFs in the udfs([WORK_DIR]/IEdgeInsights/Kapacitor/udfs) directory and the TICKscripts in the tick_scripts([WORK_DIR]/IEdgeInsights/Kapacitor/tick_scripts) directory.

Keep the training data set (if any) required for the custom UDFs in the training_data_sets([WORK_DIR]/IEdgeInsights/Kapacitor/training_data_sets) directory.

For Python UDFs, install any external Python package dependencies. To install a Python package using pip, add it to the requirements.txt([WORK_DIR]/IEdgeInsights/Kapacitor/requirements.txt) file; to install a Python package using conda, add it to the conda_requirements.txt([WORK_DIR]/IEdgeInsights/Kapacitor/conda_requirements.txt) file.

Modify the UDF section in kapacitor.conf([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor.conf) and in kapacitor_devmode.conf([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor_devmode.conf). Mention the custom UDF in the configuration, for example:

[udf.functions.customUDF]
  socket = "/tmp/socket_file"
  timeout = "20s"

For a Go or Python based UDF, update the values of the keys named "type", "name", "tick_script", and "task_name" in the config.json([WORK_DIR]/IEdgeInsights/Kapacitor/config.json) file, for example:

"task": [{
  "tick_script": "py_point_classifier.tick",
  "task_name": "py_point_classifier",
  "udfs": [{
    "type": "python",
    "name": "py_classifier"
  }]
}]

For a TICKscript-only UDF, update the values of the keys named "tick_script" and "task_name" in the config.json([WORK_DIR]/IEdgeInsights/Kapacitor/config.json) file, for example:

"task": [{
  "tick_script": "simple_logging.tick",
  "task_name": "simple_logging"
}]
Note:
By default, go_classifier and rfc_classifier are configured.
Mention the TICKscript UDF function the same as configured in the Kapacitor configuration file. For example, the UDF node in the TICKscript:

@py_point_classifier()

should be the same as:

[udf.functions.py_point_classifier]
  socket = "/tmp/socket_file"
  timeout = "20s"
A Go or Python based UDF should listen on the same socket file as mentioned in the UDF section in kapacitor.conf([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor.conf) and in kapacitor_devmode.conf([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor_devmode.conf). For example:

[udf.functions.customUDF]
  socket = "/tmp/socket_file"
  timeout = "20s"
For process based UDFs, provide the correct path of the code within the container in kapacitor.conf([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor.conf) and in kapacitor_devmode.conf([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor_devmode.conf). By default, the files and directories are copied into the container under the "/OpenEII" directory. It is recommended to keep the custom UDFs in the udfs([WORK_DIR]/IEdgeInsights/Kapacitor/udfs) directory; the path of the custom UDF will then be "/OpenEII/udfs/customUDF_name", as shown in the example below. If the UDF is kept in a different path, modify the path in the args accordingly.

The PYTHONPATH of the Kapacitor agent directory is "/OpenEII/go/src/github.com/influxdata/kapacitor/udf/agent/py/". The following example shows how to pass it:

[udf.functions.customUDF]
  prog = "python3.7"
  args = ["-u", "/EII/udfs/customUDF"]
  timeout = "60s"
  [udf.functions.customUDF.env]
    PYTHONPATH = "/go/src/github.com/influxdata/kapacitor/udf/agent/py/"
Perform the provisioning and run the EII stack.
Steps to Run the Samples of Multiple UDFs in a Single Task and Multiple Tasks using Single UDF¶
Refer to the samples/README
Kapacitor Input and Output Plugins¶
Purpose of Plugins¶
The plugins allow Kapacitor to interact directly with the EII message bus. They use the message bus publisher or subscriber interface. Using these plugins, Kapacitor receives data from various EII publishers and sends data to various EII subscribers. Hence, it is possible to have a time-series use case without InfluxDB, with Kapacitor as an independent analytics engine.
A simple use case flow is as follows:
MQTT-temp-sensor–>Telegraf–>Kapacitor–>TimeseriesProfiler
Using Input Plugin¶
Following are the steps to use the input plugin:
Configure the EII input plugin in config/kapacitor.conf(
[WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor.conf
) and config/kapacitor_devmode.conf([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor_devmode.conf
)
For example:
[eii]
enabled = true
Edit config.json(
[WORK_DIR]/IEdgeInsights/Kapacitor/config.json
) to add a subscriber under interfaces.
For example, to receive data published by Telegraf:
TCP mode
"Subscribers": [
{
"Name": "telegraf_sub",
"Type": "zmq_tcp",
"EndPoint": "ia_telegraf:65077",
"PublisherAppName": "Telegraf",
"Topics": [
"*"
]
}
]
IPC mode
"Subscribers": [
{
"Name": "telegraf_sub",
"Type": "zmq_ipc",
"EndPoint": {
"SocketDir": "/EII/sockets",
"SocketFile": "telegraf-out"
},
"PublisherAppName": "Telegraf",
"Topics": [
"*"
]
}
]
Note
For IPC mode, when subscribing to multiple topics ('Topics' set to ["*"], as in the previous example), specify the 'EndPoint' as a dict of 'SocketDir' and 'SocketFile'. For a single topic, the 'EndPoint' can be given as a plain socket directory string (as in the Kapacitor output plugin example):
"EndPoint": "/EII/sockets"
The received data is available in the 'EII' storage for the TICKscript.
Create or modify a TICKscript to process the data and configure it in config.json([WORK_DIR]/IEdgeInsights/Kapacitor/config.json). For example, use the stock tick_scripts/eii_input_plugin_logging.tick([WORK_DIR]/IEdgeInsights/Kapacitor/tick_scripts/eii_input_plugin_logging.tick), which logs the data received from the 'EII' storage into the Kapacitor log file (residing in the container at /tmp/log/kapacitor/kapacitor.log):

"task": [
  {
    "tick_script": "eii_input_plugin_logging.tick",
    "task_name": "eii_input_plugin_logging"
  }
]
Perform the provisioning and run the EII stack.
The subscribed data is available in the above log file, which can be viewed by executing the following command:
docker exec ia_kapacitor tail -f /tmp/log/kapacitor/kapacitor.log
Using Output Plugin¶
Following are the steps to use the output plugin:
Create or modify a TICKscript to use the 'eiiOut' node to send data using the publisher interface. Following is an example that modifies the profiling UDF:

dbrp "eii"."autogen"

var data0 = stream
    |from()
        .database('eii')
        .retentionPolicy('autogen')
        .measurement('point_data')
    @profiling_udf()
    |eiiOut()
        .pubname('sample_publisher')
        .topic('sample_topic')
Add a publisher interface to config.json([WORK_DIR]/IEdgeInsights/Kapacitor/config.json) with the same publisher name and topic, that is, 'sample_publisher' and 'sample_topic' respectively, as in the previous example.
For example:
TCP mode
"Publishers": [
{
"Name": "sample_publisher",
"Type": "zmq_tcp",
"EndPoint": "0.0.0.0:65034",
"Topics": [
"sample_topic"
],
"AllowedClients": [
"TimeSeriesProfiler"
]
}
]
IPC mode
"Publishers": [
{
"Name": "sample_publisher",
"Type": "zmq_ipc",
"EndPoint": "/EII/sockets",
"Topics": [
"sample_topic"
],
"AllowedClients": [
"TimeSeriesProfiler"
]
}
]
Perform provisioning and run the EII stack.
Using Input or Output Plugin with RFC UDF¶
Following are the steps to use the input or output plugin with the RFC UDF:
Add the RFC task to config.json([WORK_DIR]/IEdgeInsights/Kapacitor/config.json):

"task": [
  {
    "tick_script": "rfc_task.tick",
    "task_name": "random_forest_sample",
    "udfs": [{
      "type": "python",
      "name": "rfc_classifier"
    }]
  }
]
Modify the rfc_task.tick([WORK_DIR]/IEdgeInsights/Kapacitor/tick_scripts/rfc_task.tick) as seen in the following example:

dbrp "eii"."autogen"

var data0 = stream
    |from()
        .database('eii')
        .retentionPolicy('autogen')
        .measurement('ts_data')
    |window()
        .period(3s)
        .every(4s)

data0
    @rfc()
    |eiiOut()
        .pubname('sample_publisher')
        .topic('sample_topic')
Modify the UDF section of the Kapacitor conf files Kapacitor/config/kapacitor.conf([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor.conf) and Kapacitor/config/kapacitor_devmode.conf([WORK_DIR]/IEdgeInsights/Kapacitor/config/kapacitor_devmode.conf) to remove the Go classifier UDF related configuration, since it conflicts with the existing Python RFC UDF configuration:

# Configuration for UDFs (User Defined Functions)
[udf.functions]
#   [udf.functions.go_point_classifier]
#     socket = "/tmp/point_classifier"
#     timeout = "20s"
Add a publisher interface to config.json([WORK_DIR]/IEdgeInsights/Kapacitor/config.json) with the same publisher name and topic, that is, 'sample_publisher' and 'sample_topic' respectively, as in the previous example. For example:
TCP mode
"Publishers": [ { "Name": "sample_publisher", "Type": "zmq_tcp", "EndPoint": "0.0.0.0:65034", "Topics": [ "sample_topic" ], "AllowedClients": [ "TimeSeriesProfiler", "EmbSubscriber", "GoSubscriber" ] } ]
IPC mode
"Publishers": [ { "Name": "sample_publisher", "Type": "zmq_ipc", "EndPoint": "/EII/sockets", "Topics": [ "sample_topic" ], "AllowedClients": [ "TimeSeriesProfiler", "EmbSubscriber", "GoSubscriber" ] } ]
Perform provisioning, then build and run the EII stack.
Steps to Independently Build and Deploy the Kapacitor Service¶
Note
For running two or more microservices, we recommend the use case-driven approach for building and deploying, as described in Generate Consolidated Files for a Subset of Edge Insights for Industrial Services.
Steps to Independently Build the Kapacitor Service¶
Note
When switching between independent deployment of the service with and without the Config Manager Agent service dependency, you may run into docker-compose build issues caused by the existing Certificates folder. As a workaround, run the command sudo rm -rf Certificates before rerunning docker-compose build.
To independently build the Kapacitor service, complete the following steps:
The downloaded source code should have a directory named Kapacitor:
cd IEdgeInsights/Kapacitor
Copy the IEdgeInsights/build/.env file using the following command in the current folder
cp ../build/.env .
NOTE: Update the HOST_IP and ETCD_HOST variables in the .env file with your system IP.
Source the .env using the following command:
set -a && source .env && set +a
Independently build the service:
docker-compose build
Steps to Independently Deploy the Kapacitor Service¶
You can deploy the Kapacitor service in any of the following two ways:
Deploy the Kapacitor Service without the Config Manager Agent Dependency¶
Run the following commands to deploy the Kapacitor service without Configuration Manager Agent dependency:
# Enter the Kapacitor directory
cd IEdgeInsights/Kapacitor
Copy the IEdgeInsights/build/.env file using the following command in the current folder, if not already present.
cp ../build/.env .

Note: Ensure that docker ps is clean and docker network ls does not have an EII bridge network.
Update the following in the .env file:
1. HOST_IP and ETCD_HOST variables with your system IP.
2. `READ_CONFIG_FROM_FILE_ENV` value to `true` and `DEV_MODE` value to `true`.
Source the .env using the following command:
set -a && source .env && set +a
# Run the service
docker-compose -f docker-compose.yml -f docker-compose-dev.override.yml up -d
Note
The Kapacitor container restarts automatically when its config is modified in the config.json file.
If you update the config.json file using the vi or vim editor, append set backupcopy=yes to ~/.vimrc so that changes made to config.json on the host machine are reflected inside the container mount point.
Deploy the Kapacitor Service with the Config Manager Agent Dependency¶
Run the following commands to deploy the Kapacitor service with the Config Manager Agent dependency:
Note
Ensure that the Config Manager Agent image is present in the system. If it is not, build the Config Manager Agent image locally before independently deploying the service with the Config Manager Agent dependency.
# Enter the Kapacitor directory
cd IEdgeInsights/Kapacitor
Copy the IEdgeInsights/build/.env file using the following command in the current folder, if not already present.
cp ../build/.env .

Note: Ensure that docker ps is clean and docker network ls does not have an EII bridge network.
Update the following in the .env file:
1. HOST_IP and ETCD_HOST variables with your system IP.
2. `READ_CONFIG_FROM_FILE_ENV` value is set to `false`.
Copy the docker-compose.yml from IEdgeInsights/ConfigMgrAgent as docker-compose.override.yml in IEdgeInsights/Kapacitor.
cp ../ConfigMgrAgent/docker-compose.yml docker-compose.override.yml
Copy the builder.py with standalone mode changes from the IEdgeInsights/build directory:

cp ../build/builder.py .

Run builder.py in standalone mode; this generates eii_config.json and updates docker-compose.override.yml:

python3 builder.py -s true

Build the service (optional if the service was already built in the independent build step above):

docker-compose build
For running the service in PROD mode, run the following command:

NOTE: Make sure to update DEV_MODE to false in .env while running in PROD mode and source the .env using the command set -a && source .env && set +a.

docker-compose up -d

For running the service in DEV mode, run the following command:

NOTE: Make sure to update DEV_MODE to true in .env while running in DEV mode and source the .env using the command set -a && source .env && set +a.

docker-compose -f docker-compose.yml -f docker-compose-dev.override.yml -f docker-compose.override.yml up -d
Troubleshooting¶
If the Kapacitor build fails with 'broken pipe' related errors, add the following line to the conda_requirements.txt([WORK_DIR]/IEdgeInsights/Kapacitor/conda_requirements.txt) file and retry the build:

scikit-learn==1.0.0
Time Series Python UDFs Development¶
In the DEV mode (DEV_MODE=true in [WORK_DIR]/IEdgeInsights/build/.env), the Python UDFs are volume mounted inside the Kapacitor container, as can be seen in its docker-compose-dev.override.yml. This gives developers the flexibility to update their UDFs on the host machine and see the changes reflected in Kapacitor by simply restarting the Kapacitor container, without rebuilding the Kapacitor container image.
DataStore¶
For more details see the DataStore Microservice section.