What is Kukur?
Kukur makes time series data and metadata available to the Apache Arrow ecosystem. Kukur can be used as a Python library or as a standalone application that exposes an Arrow Flight interface.
Kukur is under active development. Breaking changes to the interfaces are possible. Kukur uses semantic versioning.
Potential usage scenarios are:
- ad-hoc in a data project
- as a time series data integration hub on your own system
- as a centrally managed time series data integration hub
- as a library in a Python application that needs time series data
Getting Started
This example shows how to:
- run Kukur in a data project,
- expose a CSV file through it,
- and connect to it using the Kukur client.
The only prerequisite is a working Python 3 installation. Minor changes to the shell commands, but not the Kukur configuration, are required depending on your OS.
Installation
First create a new directory and enter it:
$ mkdir data-project
$ cd data-project
Create a Python virtualenv and activate it:
$ python -m venv venv
$ source venv/bin/activate
Install Kukur and list the command line options to verify that the installation was OK:
(venv) $ pip install kukur
(venv) $ kukur --help
usage: kukur [-h] [--config-file CONFIG_FILE] {flight,test,api-key} ...
Start Kukur.
positional arguments:
  {flight,inspect,test,api-key}
                        Select the CLI action
    flight              Enable the Arrow Flight interface (the default)
    inspect             List resources in a blob store and determine their schema
    test                Test data source connectivity
    api-key             Create an api key for the Arrow Flight interface

optional arguments:
  -h, --help            show this help message and exit
  --config-file CONFIG_FILE
                        Path to the configuration file
Kukur supports "extras" at install time. Use these to install requirements for the given source.
- [adodb] for connections to ADODB and OLEDB data sources
- [cratedb] for connections to CrateDB
- [datafusion] for connections using Apache Arrow DataFusion
- [delta] for connections to Delta Lakes
- [influxdb] for connections to InfluxDB
- [inspect] for inspection of storage containers
- [kusto] for connections to Azure Data Explorer
- [numpy] for creating simulator sources
- [odbc] for connections to ODBC data sources
- [piwebapi] for connections to PI Web API
- [postgresql] for connections to PostgreSQL
- [redshift] for connections to Redshift
For example:
(venv) $ pip install kukur[adodb,odbc]
Configuration
Kukur connects to many different local or remote time series sources. A local CSV file will be used in this example.
Create a directory data/:
(venv) $ mkdir data
Add the CSV data in data/example.csv:
data/example.csv
outside-temperature,2020-01-02T00:00:00Z,1
outside-temperature,2020-01-02T01:00:00Z,2
outside-temperature,2020-01-02T02:00:00Z,3
The next step is to configure a Kukur data source that exposes this CSV to any client.
Kukur uses TOML as its configuration language.
Create Kukur.toml in the root folder of your project.
Define a source called 'example' that exposes the data/example.csv CSV:
Kukur.toml
[source.example]
type = "csv"
path = "data/example.csv"
Use the Kukur CLI to test connectivity:
(venv) $ kukur test data \
--source example \
--name outside-temperature \
--start 2020-01-01 \
--end 2021-01-01
2021-03-29 11:12:37,855 INFO kukur.source.test MainThread : Requesting data for "outside-temperature (example)" from 2020-01-01 00:00:00 to 2021-01-01 00:00:00
2020-01-02T00:00:00+00:00,1
2020-01-02T01:00:00+00:00,2
2020-01-02T02:00:00+00:00,3
The Kukur CLI logs to stderr, while the data itself is printed to stdout.
Now, having this data is useful, but where Kukur sets itself apart is that it also provides an opinionated interface for metadata.
Is the outside temperature defined in Kelvin? Unless we’re probing a spacecraft at the dark side of the moon, this is unlikely, but there is no way to know.
Our thermometer probably has physical measurement limits as well. When values outside the -20 deg C to 60 deg C scale that this particular thermometer supports appear, using them is probably not a good idea.
Similarly, the person that was writing down the measurements is not able to read the values with infinite accuracy. At best, there will be a 0.5 deg C accuracy margin for any measurement.
Many time series sources expose this kind of metadata and Kukur can read it.
Let’s create another CSV file:
data/example-metadata.csv
series name,description,unit,physical lower limit,physical upper limit,accuracy outside-temperature,Temperature in Antwerp,deg C,-20,60,0.5
Kukur can mix and match metadata. For example, data can be stored in an InfluxDB database, while descriptions of the measurements are stored in a CSV file and the sensor limits are stored in an MS SQL database.
Update the configuration:
Kukur.toml
[source.example]
type = "csv"
path = "data/example.csv"
metadata = "data/example-metadata.csv"
Request the metadata using the CLI:
(venv) $ kukur test metadata \
--source example \
--name outside-temperature
2021-03-29 11:41:48,936 INFO kukur.source.test MainThread : Requesting metadata for "outside-temperature (example)"
series name,description,unit,physical lower limit,physical upper limit,functional lower limit,functional upper limit,accuracy,interpolation type,data type,dictionary name,dictionary
outside-temperature,Temperature in Antwerp,deg C,-20.0,60.0,,,0.5,,,,
Many fields are blank because our CSV file did not contain them.
The interpolation type, for example, is a very important piece of metadata. When resampling values of multiple time series to the same timestamps, using linear interpolation most likely results in different values than using stepwise interpolation.
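To make the difference concrete, here is a small Python illustration that is independent of Kukur; the timestamps and values are made up:
# Resample a series with value 1 at 00:00 and value 2 at 01:00 to 00:30.
t0, t1, t_resample = 0.0, 3600.0, 1800.0  # seconds
v0, v1 = 1.0, 2.0
linear = v0 + (v1 - v0) * (t_resample - t0) / (t1 - t0)  # 1.5
stepped = v0  # 1.0: hold the previous value until the next data point
print(linear, stepped)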
Using the Kukur Client
Now, having validated the Kukur configuration, let’s start the Kukur server:
(venv) $ kukur
Open another shell, enter the virtualenv, start Python and import all Kukur objects:
$ source venv/bin/activate
(venv) $ python
Python 3.9.2 (default, Feb 20 2021, 18:40:11)
[GCC 10.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from kukur import *
Let’s try to request the metadata:
>>> client = Client()
>>> client.get_metadata(SeriesSelector('example', 'outside-temperature'))
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "data-project/venv/lib/python3.9/site-packages/kukur/client.py", line 73, in get_metadata
results = list(
File "pyarrow/_flight.pyx", line 1239, in do_action
File "pyarrow/_flight.pyx", line 66, in pyarrow._flight.check_flight_status
pyarrow._flight.FlightUnauthenticatedError: gRPC returned unauthenticated error, with message: invalid token. Detail: Unauthenticated
Kukur is secure by default and does not allow unauthenticated access. Since we’re running Kukur locally, it’s OK to enable anonymous access.
First stop Kukur.
Then add a [flight] section to the configuration in Kukur.toml:
Kukur.toml
[flight]
authentication = false
[source.example]
type = "csv"
path = "data/example.csv"
metadata = "data/example-metadata.csv"
Restart Kukur:
(venv) $ kukur
Alternatively, use kukur api-key to define local API keys.
Now, go back to the Python session and request the metadata:
>>> client.get_metadata(SeriesSelector('example', 'outside-temperature'))
Metadata(SeriesSelector(source='example', name='outside-temperature'), {'description': 'Temperature in Antwerp', 'unit': 'deg C', 'physical lower limit': -20.0, 'physical upper limit': 60.0, 'functional lower limit': None, 'functional upper limit': None, 'accuracy': 0.5, 'interpolation type': None, 'data type': None, 'dictionary name': None, 'dictionary': None})
Finally, read the data:
>>> from datetime import datetime
>>> client.get_data(
SeriesSelector('example', 'outside-temperature'),
datetime.fromisoformat('2020-01-01T00:00:00+00:00'),
datetime.fromisoformat('2021-01-01T00:00:00+00:00'),
)
pyarrow.Table
ts: timestamp[us, tz=UTC]
value: int64
Data is always returned as an Apache Arrow table with two columns: a timestamp and a value.
>>> table = _
>>> table.to_pydict()
{'ts': [datetime.datetime(2020, 1, 2, 0, 0, tzinfo=<UTC>), datetime.datetime(2020, 1, 2, 1, 0, tzinfo=<UTC>), datetime.datetime(2020, 1, 2, 2, 0, tzinfo=<UTC>)], 'value': [1, 2, 3]}
Using Kukur, we now have metadata and data in a format that allows us to correctly analyze our outside temperature.
More importantly, we have made data access scalable: the Kukur configuration can be reused the next time data is needed.
To do so, we can store the Kukur configuration in a version control system, such as git.
Storing the Configuration in Version Control
This requires git to be installed.
Create a local repository by running:
$ git init .
Ignore the data, virtualenv and Kukur databases using a .gitignore file:
.gitignore
data/
venv/
*.sqlite
Now track the current revision of the Kukur configuration:
$ git add Kukur.toml
$ git commit -v
This repository can now be shared with other people, effortlessly granting them access to the same data sources.
Using Docker
In a data project that does not use Python, the Kukur Docker container can be used to export data as CSV for any supported source.
$ docker run --rm \
-u $(id -u) \
-v $(pwd)/Kukur.toml:/usr/src/app/Kukur.toml \
-v $(pwd)/data:/usr/src/app/data \
-it timeseer/kukur:latest python -m kukur.cli test data \
--source example \
--name outside-temperature \
--start 2020-01-01 \
--end 2021-01-01
-u $(id -u)
    Run Kukur using your user. This ensures that permissions for volume mounts are correct, since Kukur does not run as root inside the container.
-v $(pwd)/Kukur.toml:/usr/src/app/Kukur.toml
    This mounts the Kukur configuration file Kukur.toml to the expected location inside the container. By using the --config-file command line flag, a different location can be chosen.
-v $(pwd)/data:/usr/src/app/data
    This mounts the CSV example data on the location that is specified in the configuration. Not required when connecting to data sources that are not file-based.
The Arrow Flight interface is made available on port 8081 when running Kukur as a service.
Use port mapping to expose it on localhost (-p 8081:8081).
For API key storage, Kukur creates a sqlite database and stores it in the path given by data_dir.
Define a data directory in Kukur.toml:
Kukur.toml
data_dir = "db"
[flight]
authentication = false
[source.example]
type = "csv"
path = "data/example.csv"
metadata = "data/example-metadata.csv"
Create that directory and run the docker container while mounting it:
$ mkdir db
$ docker run --rm \
-u $(id -u) \
-v $(pwd)/Kukur.toml:/usr/src/app/Kukur.toml \
-v $(pwd)/data:/usr/src/app/data \
-v $(pwd)/db:/usr/src/app/db \
-p 8081:8081 \
-it timeseer/kukur:latest
Then, access it using the Kukur client:
(venv) $ python
Python 3.9.2 (default, Feb 20 2021, 18:40:11)
[GCC 10.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from kukur import *
>>> client = Client()
>>> client.get_metadata(SeriesSelector('example', 'outside-temperature'))
Metadata(SeriesSelector(source='example', name='outside-temperature'), {'description': 'Temperature in Antwerp', 'unit': 'deg C', 'physical lower limit': -20.0, 'physical upper limit': 60.0, 'functional lower limit': None, 'functional upper limit': None, 'accuracy': 0.5, 'interpolation type': None, 'data type': None, 'dictionary name': None, 'dictionary': None})
Development
Kukur is developed on GitHub. Visit https://github.com/timeseer-ai/kukur to learn more.
Kukur is open source software, licensed under the Apache License, Version 2.0.
Domain Model
Three concepts are needed to understand Kukur:
SeriesSelector
A time series data source contains hundreds, thousands or even millions of time series.
A convenient and expressive way of selecting which time series should be queried is required.
This is a SeriesSelector.
The simplest mapping is associating a unique name with each series. It follows that the minimum unique identifier includes the source and the name of the time series.
Timestamp | Series name | Value
Many time series databases add additional structure to this.
In InfluxDB, for example, the layout looks like:
Timestamp | Series                      | Values
          | Measurement | Tag A | Tag B | Field A | Field B
A time series is identified by a 'measurement name', multiple 'tag key-value pairs' and a 'field key'.
Metadata
Kukur predefines a set of metadata fields that are all present in an ideal world. Custom fields can be added both programmatically and using configuration.
Both physical and functional limits are configurable. Physical limits are limits of the actual sensor that collected the time series data. Functional limits are the limits of the expected range of the time series. For example, while my thermometer could indicate -20 deg C (physical lower limit), anything below -5 deg C (functional lower limit) would be cause for alarm.
The DICTIONARY data type warrants an explanation as well.
Time series of type DICTIONARY store numerical values.
Each numerical value is given a meaning.
For example: the state of a pump could be ON or OFF.
OFF can be represented as 0 in the time series data, while ON could be encoded as 1.
These dictionaries that map numerical values to string labels are often named as well, hence the dictionary name field.
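As a minimal Python illustration of such a dictionary (the mapping below is made up; real mappings come from the source):
pump_state = {0: "OFF", 1: "ON"}  # dictionary: numerical value -> label
raw_values = [0, 1, 1, 0]         # values as stored in the time series
print([pump_state[value] for value in raw_values])  # ['OFF', 'ON', 'ON', 'OFF']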
Source
Kukur data sources implement at least three methods:
search(SeriesSelector) → Generator[Union[SeriesSelector, Metadata]]
    Return all time series matching the selector, or their metadata when it is readily available.
get_metadata(SeriesSelector) → Metadata
    Return metadata for the selected time series.
get_data(SeriesSelector, Datetime, Datetime) → pyarrow.Table
    Return data for the selected time series in the given time period.
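The sketch below shows what a source implementing these three methods could look like. It is an illustration only: the class name, the hard-coded data and the way Metadata is constructed are assumptions, and the actual Kukur plugin and registration mechanisms are not shown.
from datetime import datetime, timezone
from typing import Generator, Union

import pyarrow as pa

from kukur import Metadata, SeriesSelector


class InMemorySource:
    """A hypothetical source that serves a single hard-coded time series."""

    def search(self, selector: SeriesSelector) -> Generator[Union[SeriesSelector, Metadata], None, None]:
        # List every series known to this source.
        yield SeriesSelector(selector.source, "outside-temperature")

    def get_metadata(self, selector: SeriesSelector) -> Metadata:
        # No metadata is known here; return an empty Metadata object (assumed constructor).
        return Metadata(selector)

    def get_data(self, selector: SeriesSelector, start_date: datetime, end_date: datetime) -> pa.Table:
        # Return an Arrow table with a timestamp column and a value column.
        return pa.Table.from_pydict({
            "ts": [datetime(2020, 1, 2, tzinfo=timezone.utc)],
            "value": [1],
        })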
Configuration Reference
Kukur is configured by editing a configuration file in the TOML language.
The default location of this configuration file is Kukur.toml.
Alternative paths can be configured using the --config-file flag.
Data Storage
Kukur needs local data storage for API keys.
The path to the local data storage of Kukur is configured by:
data_dir = "<path to data directory>"
data_dir is not required.
By default, data is stored in the current working directory.
Includes
Other configuration files can be included from the main configuration file by using [[include]] sections.
Multiple includes can be specified by using an array of tables:
[[include]]
glob = "<file glob>"
The glob key is required.
Paths in included configuration files are resolved relative to the application working directory. They are not relative to the included configuration file.
Conflicts between configuration values are handled depending on the value type:
- string: the last included field overrides earlier values
- list: the items in the list are appended to the earlier list
- mapping: the earlier mapping is updated with the new mapping, overriding existing keys
Note that the main configuration file is processed before any includes. This means it is not possible to override configuration set by an include from the main configuration file.
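The conflict handling described above can be illustrated with a short Python sketch. This only demonstrates the described rules; it is not the actual Kukur implementation, and the include_globs key is made up.
def merge(base: dict, new: dict) -> dict:
    """Merge configuration values following the rules above."""
    for key, value in new.items():
        if key in base and isinstance(value, list) and isinstance(base[key], list):
            base[key] = base[key] + value  # lists: items are appended
        elif key in base and isinstance(value, dict) and isinstance(base[key], dict):
            base[key].update(value)  # mappings: updated, overriding existing keys
        else:
            base[key] = value  # strings: the last value wins
    return base

main_config = {"data_dir": "db", "include_globs": ["a/*.toml"]}
included = {"data_dir": "other", "include_globs": ["b/*.toml"]}
print(merge(main_config, included))
# {'data_dir': 'other', 'include_globs': ['a/*.toml', 'b/*.toml']}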
Example:
[[include]]
glob = "tests/test_data/*/*.toml"
This will include all TOML files in a direct subdirectory of tests/test_data/.
For example, tests/test_data/csv/csv-examples.toml could contain:
[source.row]
type = "csv"
path = "examples/csv/row.csv"
metadata = "examples/csv/row-metadata.csv"
Logging
A [logging] section configures diagnostic logging inside Kukur.
[logging]
level = "info"
# path = ""
Possible values for level are:
- warning
- info (the default)
- debug
When path is configured, logs will be written to the specified path. The logs at that path are rotated daily and 7 rotated files are kept.
Arrow Flight
A [flight] section configures the Arrow Flight interface.
[flight]
host = "0.0.0.0"
port = 8081
authentication = true
Kukur listens on port 8081 of all IP addresses by default.
Authentication can be turned off for local instances or when provided by external services by setting authentication to false.
When authentication is turned on, an API key has to be supplied by callers.
Inspect
The Kukur CLI supports inspecting file storage locations for data files and previewing them.
File storage locations include:
- Local filesystems using inspect filesystem <path>
- Azure Data Lake Storage Gen2 using inspect blob --uri abfss://<container>@<storage account>/<path>
- AWS S3 buckets using inspect blob --uri s3://<bucket>/<path>
Detected files can be previewed using --preview.
For example:
$ python -m kukur.cli inspect filesystem --path tests/test_data/delta
delta,tests/test_data/delta/delta-row-quality
directory,tests/test_data/delta/partitions
delta,tests/test_data/delta/delta-pivot
delta,tests/test_data/delta/delta-notz
delta,tests/test_data/delta/delta-numerical
delta,tests/test_data/delta/delta-row
delta,tests/test_data/delta/delta-unordered
delta,tests/test_data/delta/delta-row-tags
$ python -m kukur.cli inspect filesystem --path tests/test_data/delta/delta-row --preview
pyarrow.Table
name: string
ts: timestamp[us, tz=UTC]
value: double
---
name: [["test-tag-1","test-tag-1","test-tag-1","test-tag-1","test-tag-1",...,"test-tag-3","test-tag-3","test-tag-3","test-tag-3","test-tag-3"]]
ts: [[2020-01-01 00:00:00.000000Z,2020-02-01 00:00:00.000000Z,2020-03-01 00:00:00.000000Z,2020-04-01 00:00:00.000000Z,2020-05-01 00:00:00.000000Z,...,2020-01-01 00:25:00.000000Z,2020-01-01 00:26:00.000000Z,2020-01-01 00:27:00.000000Z,2020-01-01 00:28:00.000000Z,2020-01-01 00:29:00.000000Z]]
value: [[1,2,2,1,1,...,6,9,9.5,8,6]]
$ AZURE_USE_AZURE_CLI=True python -m kukur.cli inspect blob --uri abfss://[email protected]
delta,abfss://[email protected]/iot_devices
delta,abfss://[email protected]/tsai-antwerp
parquet,abfss://[email protected]/tsai-antwerp.parquet
Connections to Azure are configured using the default Azure Identity environment variables. The same applies to connections to AWS.
Sources
Time Series Sources
Parameters common to all data sources are documented here.
[source.<name>]
type = "<the type of the source>"
query_retry_count = 0 # number of retries
query_retry_delay = 1.0 # seconds between retries as float
metadata_type = "<the type of the metadata source>"
metadata_sources = [] # names of additional metadata sources
data_query_interval_seconds = 123456 # <number of seconds>
# ...
Each source has a name. When multiple sources with the same name are configured, the last configured one wins.
type specifies the type of the data source for this source.
Valid values are:
- adodb
- arrows
- azure-data-explorer
- cratedb
- csv
- delta
- feather
- influxdb
- kukur
- oledb
- parquet
- piwebapi-da
- plugin
- postgresql
- redshift
- sqlite
type is required.
metadata_type specifies the type of the metadata source for this source.
Valid values are:
- adodb
- cratedb
- csv
- datafusion
- json
- kukur
- oledb
- piwebapi-da
- plugin
- postgresql
- redshift
- sqlite
metadata_type is optional. The type is used when it is not specified.
Additional metadata can be loaded from other sources as described in the section "Metadata Sources". When multiple sources return an entry for the same metadata field, the source that occurs first in the metadata_sources list gets priority. Metadata returned by the source itself or by the metadata_type configuration takes precedence over any additional sources configured here.
query_retry_count and query_retry_delay allow retrying all queries to a source in case of failure.
By default no requests are retried.
query_retry_delay accepts floating point numbers; for example, query_retry_delay = 0.5 will wait for 0.5 seconds after the first failure and between subsequent retries.
data_query_interval_seconds defines the maximum length of the time interval that is queried in one request.
A longer query is split into intervals of this length and the results are reassembled afterwards.
This prevents query timeouts or works around limits on the number of data points that a source can return.
Example:
[source.sql]
type = "odbc"
connection_string = "..."
metadata_query = "..."
metadata_columns = [] # ...
data_query = "..."
data_query_interval_seconds = 86400
This ensures that a single query will request at most one day of data.
data_query_interval_seconds is optional.
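The following sketch shows how such splitting could work in principle. It only illustrates the concept of splitting a long time range into intervals of data_query_interval_seconds; it is not the Kukur implementation:
from datetime import datetime, timedelta, timezone

def split_interval(start: datetime, end: datetime, interval_seconds: int):
    """Yield (start, end) pairs of at most interval_seconds that cover [start, end)."""
    step = timedelta(seconds=interval_seconds)
    current = start
    while current < end:
        yield current, min(current + step, end)
        current = current + step

start = datetime(2020, 1, 1, tzinfo=timezone.utc)
end = datetime(2020, 1, 8, tzinfo=timezone.utc)
# With data_query_interval_seconds = 86400, one week becomes 7 daily queries.
print(len(list(split_interval(start, end, 86400))))  # 7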
Metadata Sources
Metadata sources provide extra metadata for series in time series sources. All time series sources that provide metadata can be used as a metadata source. They accept the same configuration parameters as the time series source.
The optional fields parameter specifies which metadata fields will be used from the given source.
Possible metadata fields are:
- description
- unit
- functional lower limit
- functional upper limit
- physical lower limit
- physical upper limit
- accuracy
- accuracy percentage
- interpolation type (LINEAR, STEPPED)
- data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)
- dictionary name
Example:
[source.noaa]
type = "influxdb"
database = "NOAA_water_database"
metadata_sources = ["noaa"]
[metadata.noaa]
type = "csv"
metadata = "examples/influxdb/noaa-metadata.csv"
fields = ["lower limit", "upper limit"]
Note that a source that can quickly list all time series by returning their metadata in the same query will still need to perform an additional query to each configured metadata source for every time series it returns.
Quality
For some types of sources, a quality column can be added to the source file that represents the quality of each data point, e.g. an OPC quality code.
In that case a mapping needs to be provided.
Example:
[source.<name>]
quality_mapping = "example_mapping"
[quality_mapping.example_mapping]
GOOD = [192, 194, 197]
In this example we map the OPC quality of 192, 194 and 197 defined in the source as a good quality point in Kukur.
It is also possible to include ranges in the quality mapping.
Example:
[quality_mapping.example_mapping]
GOOD = [[192], [194, 200]]
In this example we map the OPC quality of 192, and the range of 194-200 defined in the source as a good quality point in Kukur.
If string values are used in the source, similar configuration can be used.
Example:
[quality_mapping.example_mapping]
GOOD = ["GoodQuality", "ExcellentQuality"]
In this example we map the "GoodQuality" and "ExcellentQuality" defined in the source as a good quality point in Kukur.
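The following Python sketch shows how the GOOD entries of the three examples above could be matched against a raw quality value. It illustrates the point, range and string forms; it is not the actual Kukur implementation:
def is_good(raw_quality, good_entries) -> bool:
    for entry in good_entries:
        if isinstance(entry, list) and len(entry) == 1 and raw_quality == entry[0]:
            return True  # [192]: exact match
        if isinstance(entry, list) and len(entry) == 2 and entry[0] <= raw_quality <= entry[1]:
            return True  # [194, 200]: inclusive range
        if raw_quality == entry:
            return True  # plain number or string: exact match
    return False

print(is_good(197, [192, 194, 197]))                                # True
print(is_good(195, [[192], [194, 200]]))                            # True
print(is_good("GoodQuality", ["GoodQuality", "ExcellentQuality"]))  # True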
Supported Sources
Multiple types of time series sources are supported:
- ADODB connections (including OLEDB)
- Apache Feather files
- Apache Parquet files
- Azure Data Explorer tables
- Binary Plugins
- CrateDB databases
- CSV files
- Delta Lake tables
- InfluxDB databases
- Kukur [JSON] metadata files
- ODBC data sources
- PostgreSQL databases
- Redshift data warehouses
- SQLite databases
Source-specific parameters are documented in the linked sources reference.
Sources reference
ADODB
Sources with type = "adodb"
configure ADODB sources.
ADODB is supported on Windows only. Install pywin32 separately to enable. Data sources supporting OLEDB can be accessed through ADODB.
The connection string and queries can either be configured in the configuration file, or loaded from files. Inline configuration takes precedence over file-based configuration when both are provided.
[source.<name>]
type = "adodb"
connection_string = "<ADODB connection_string>"
connection_string_path = "<path to connection string>"
query_timeout_seconds = 0
query_timeout_enable = true
query_string_parameters = false
list_query = "<query to list all time series in a source>"
list_query_path = "<path to list_query>"
list_columns = ["<metadata type of column 1>", "<metadata type of column 2>"]
tag_columns = ["series name"]
field_columns = []
metadata_query = "<query for metadata of one series>"
metadata_query_path = "<path to metadata query>"
metadata_columns = ["<metadata type of column 1>", "<metadata type of column 2>"]
dictionary_query = "<query for dictionary mappings>"
dictionary_query_path = "<path to the dictionary query>"
metadata_value_mapping = "<metadata_value_mapping name>"
data_query = "<query for data of one series in time range>"
data_query_path = "<path to data query>"
data_query_datetime_format = "<strftime format>"
data_query_timezone = "<override or specify time zone of timestamps to send a naive timestamp to the adodb driver>"
data_timezone = "<override or specify time zone of timestamps returned by the adodb driver>"
enable_trace_logging = false
quality_mapping = "<name>"
type_checking_row_limit = 300 # number of rows analysed to determine the type of the value column
The examples given here operate on an Excel file with three sheets:
Metadata$: name, description, units, [interpolation type], [data type], [dictionary name]
Dictionary$: name, value, label
Data$: name, timestamp, value
Connection
The connection_string contains the provider and various options.
connection_string = "Provider=Microsoft.ACE.OLEDB.12.0;Data Source=examples/adodb/adodb.xlsx;Extended Properties=\"Excel 12.0 XML; HDR=YES\""
Alternatively, connection_string_path can point to a file that contains the connection string.
Whitespace at the start and end of the connection string file is removed.
Some ADODB drivers do not support parameter binding.
Set query_string_parameters to true to use string interpolation of parameters in queries. In that case use {} to format parameters into queries.
In queries with multiple parameters, the order can be changed by using the argument position: {1} {2} {0}.
Use a read-only connection with a minimal amount of privileges, as SQL injection is possible in that case and cannot be prevented by Kukur.
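As an illustration of how such positional placeholders behave, Python's str.format applies them like this (the query below is made up):
template = "select ts, value from data where ts >= '{1}' and ts < '{2}' and name = '{0}'"
# Arguments are passed as series name, start date, end date; the indices reorder them.
print(template.format("outside-temperature", "2020-01-01", "2021-01-01"))
# select ts, value from data where ts >= '2020-01-01' and ts < '2021-01-01' and name = 'outside-temperature'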
query_timeout_seconds defines the timeout on a query. The default is 0, meaning no timeout.
Some drivers do not allow setting a timeout. Disable the timeout using query_timeout_enable = false.
Search
The list_query is optional. It returns a list of time series names found in the source.
When provided, a series does not need to have been used in another context before it can be analyzed.
list_query = "select name from [Metadata$]"
The query can be read from a file by using list_query_path instead of list_query.
The query can either return only series names or all metadata.
When it returns all metadata, include a list_columns entry that describes all columns:
list_query = "select name, description, units, [interpolation type], [data type], [dictionary name] from [Metadata$]"
list_columns = ["series name", "description", "unit", "interpolation type", "data type", "dictionary name"]
All columns defined in tag_columns should be included in list_columns.
All combinations of rows returned by the list_query and values in field_columns define a series.
Built-in metadata columns are:
- series name (required)
- description
- unit
- functional lower limit
- functional upper limit
- physical lower limit
- physical upper limit
- accuracy
- accuracy percentage
- interpolation type (LINEAR, STEPPED)
- data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)
- dictionary name
Custom metadata fields can be defined by including them in the list_columns list.
Not all ADODB sources can map metadata field values to the values expected by Kukur. Use metadata_value_mapping to convert them.
Example:
[source.<name>]
metadata_value_mapping = "adodb_lowercase"
[metadata_value_mapping.adodb_lowercase."data type"]
FLOAT64 = "float64"
STRING = ["string", "text", "varchar"]
DICTIONARY = "dictionary"
This example converts lowercase data types to the uppercase strings expected by Kukur.
Metadata
The metadata_query is a query that accepts one parameter for each tag in a series, ordered by tag_columns.
metadata_query = "select description, units, [interpolation type], [data type], [dictionary name] from [Metadata$] where name = ?"
The columns in the result set should be mapped to a supported type of metadata.
The metadata_columns entry contains a list with the positional mapping.
metadata_columns = ["description", "unit", "interpolation type", "data type", "dictionary name"]
Built-in types of metadata are:
- description
- unit
- functional lower limit
- functional upper limit
- physical lower limit
- physical upper limit
- accuracy
- accuracy percentage
- interpolation type (LINEAR, STEPPED)
- data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)
- dictionary name
Custom metadata fields can be defined by including them in the metadata_columns list.
The metadata query can be read from a file by using metadata_query_path instead of metadata_query.
Metadata values can be converted using metadata_value_mapping.
Example:
[source.<name>]
metadata_value_mapping = "adodb_lowercase"
[metadata_value_mapping.adodb_lowercase."data type"]
FLOAT64 = "float64"
STRING = ["string", "text", "varchar"]
DICTIONARY = "dictionary"
This example converts lowercase data types to the uppercase strings expected by Kukur.
If the configuration defines tag_columns, they are provided in the same order as defined in tag_columns.
[source.<name>]
tag_columns = ["location", "plant"]
metadata_query = """
select description, units, [interpolation type], [data type], [dictionary name]
from [Metadata$]
where my_location = ? and my_plant = ?
"""
Dictionary
A dictionary maps numerical (integer) values to textual labels.
The dictionary_query is a query that accepts one parameter: the name of the dictionary.
The dictionary name for a series is returned by the dictionary name list or metadata column.
dictionary_query = "select value, label from [Dictionary$] where name = ?"
The query should return rows of two columns:
- the numerical value that occurs in the data, in a type that can be converted to an integer
- the label for the numerical value (as adBSTR)
The dictionary query can be read from a file by using dictionary_query_path instead of dictionary_query.
Data
The data_query is a query that accepts three parameters:
- the name of the series (as adBSTR)
- the start date of the time range to query data (as adDBTimeStamp)
- the end date of the time range to query data (as adDBTimeStamp)
data_query = "select timestamp, value from [Data$] where name = ? and timestamp between ? and ?"
This query should return rows of two columns:
- the timestamp of the data point
- the value of the data point
Kukur will try to convert the columns to the expected types.
The data query can be read from a file by using data_query_path instead of data_query.
If the provider or data source does not accept adDBTimeStamp, timestamps can be formatted as strings.
The data_query_datetime_format option accepts the formatting options supported by Python.
Example:
data_query_datetime_format = "%Y-%m-%dT%H:%M:%S%z"
This converts timestamps to the ISO8601 format.
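As an illustration of that format string using the Python standard library:
from datetime import datetime, timezone

timestamp = datetime(2020, 1, 2, tzinfo=timezone.utc)
print(timestamp.strftime("%Y-%m-%dT%H:%M:%S%z"))  # 2020-01-02T00:00:00+0000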
If the driver does not accept timestamps with a time zone, specify the preferred time zone with the data_query_timezone option.
The timestamps of the request are converted to that time zone and passed to the driver as naive timestamps.
Example:
data_query_timezone = "UTC"
If the query or the driver returns timestamps without a time zone, the time zone can be specified by the data_timezone option.
Example:
data_timezone = "UTC"
The exact available time zones are system-dependent.
Set enable_trace_logging to true to log the fetched data before conversion.
enable_trace_logging = true
If the configuration defines tag_columns, they are provided in the same order as defined in tag_columns.
[source.<name>]
tag_columns = ["location", "plant"]
data_query = """
select timestamp, value
from [Data$]
where my_location = ? and my_plant = ? and timestamp >= ? and timestamp < ?
"""
If the configuration defines field_columns, the field is available as {field} in the data_query.
[source.<name>]
field_columns = ["temperature", "pressure"]
data_query = """
select timestamp, {field}
from [Data$]
where name = ? and timestamp >= ? and timestamp < ?
"""
Quality
There is a possibility to add a quality column.
In this case the data query changes:
data_query = "select timestamp, value, quality from [Data$] where name = ? and timestamp between ? and ?"
Where quality represents the column that contains the data point quality of the ADODB source.
Check the source documentation to configure the mapping of a value in the quality column to a quality status known to Kukur.
Apache Arrow IPC Streaming
Sources with type = "arrows"
configure Apache Arrow IPC Streaming format sources.
All settings that apply to Apache Feather sources apply.
Apache Feather
Sources with type = "feather"
configure Apache Feather sources.
Feather supports the same source layouts as CSV:
[source.<name>]
type = "feather"
format = "row|dir|pivot"
path = "<path to data>"
tag_columns = ["series name"]
field_columns = ["value"]
quality_mapping = "<name>"
path is required.
format defaults to "row".
Metadata in Feather is not supported. Use a different metadata_type to connect to metadata. For example:
metadata_type = "csv"
Column mapping
Accepted by Apache Feather sources that use the row based or directory based data models.
This configuration allows mapping the columns of tables read from Feather files to the columns expected by Kukur.
This is done by setting the column_mapping option for the source.
[source.<name>.column_mapping]
"series name" = "name"
"ts" = "timestamp"
"value" = "value"
"quality" = "quality column"
series name is only valid for the row format, as the directory data model uses the Feather file names as the series names.
The quality mapping is optional.
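The effect of such a mapping can be illustrated with pyarrow directly. The file name below is made up and Kukur applies the mapping internally based on the configuration; this sketch only shows the renaming itself:
import pyarrow.feather as feather

# Read a Feather file whose columns are "name", "timestamp" and "value" ...
table = feather.read_table("data/example.feather")
# ... and rename them to the column names Kukur expects.
table = table.rename_columns(["series name", "ts", "value"])
print(table.schema)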
Custom datetime format
The data_datetime_format option allows the timestamps to be parsed using a custom datetime format.
It accepts the formatting options supported by Python.
data_datetime_format = "%Y/%m/%dT%H:%M"
Custom timezone
The data_timezone option allows specifying the time zone of the timestamps.
This option should only be used when the timestamps in the source do not contain time zone information.
data_timezone = "America/Sao_Paulo"
Row format
format = "row"
The row based format expects path to be a Feather file with at least 3 columns:
- The first column contains the series name as a string
- The second column contains the timestamp as pyarrow.TimestampType
- The third column contains the value as a numerical type or as strings
Alternatively, if the tag_columns and field_columns options are used, each combination of values defined in tag_columns and field_columns defines a series.
Directory Based Format
[source."<name>"]
type = "feather"
format = "dir"
path = ""
[[source."<name>".partitions]]
origin = "tag"
key = "series name"
# path_encoding = "base64"
The directory based format expects path to be a directory structure containing Feather files.
The directory structure is traversed based on the configured partitions.
Each partition corresponds to a tag in the SeriesSelector.
The resulting partition can optionally be base64-encoded.
The last configured partition defines the file name. The .feather extension is added to it.
The Feather file contains at least 2 columns:
- The first column contains the timestamp as pyarrow.TimestampType
- The second column contains the value as a numerical type or a string
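When path_encoding = "base64" is set, the tag value is encoded before it is used as a path component. The Python standard library shows what that encoding looks like for a hypothetical series name:
import base64

print(base64.b64encode("outside-temperature".encode()).decode())
# b3V0c2lkZS10ZW1wZXJhdHVyZQ==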
Pivot Format
format = "pivot"
The pivot format expects path to be a Feather file.
The first column in the file is a timestamp. Further columns contain the values. Some columns can be numerical while other columns contain strings. The name of each column is the series name.
Quality
There is a possibility to add a quality column in the Feather file. Check the source documentation to configure the mapping of a value in the quality column to a quality status known to Kukur.
A quality column is not available for a Feather file with a pivot data format.
Azure Blob Storage
Kukur can load Feather files from Azure Blob Storage. This requires the azure-storage-blob and azure-identity Python packages.
The following configuration options apply:
[source."My Azure Source"]
...
loader = "azure-blob"
azure_connection_string = "DefaultEndpointsProtocol=https;EndpointSuffix=core.windows.net;AccountName=<storage account name>"
azure_container = "<container name>"
azure_identity = "default"
Paths provided to path will be relative to the container root.
The azure_identity field is optional. The special value default causes connections to be made using the default Azure credentials. This is the only supported value and allows connections using a managed service identity.
When the azure_identity field is omitted, the azure_connection_string needs to contain the necessary secrets (SAS token, Access key).
AWS S3
Kukur can load Feather files from AWS S3.
[source."My AWS Source"]
...
loader = "aws-s3"
aws_access_key=""
aws_secret_key=""
aws_session_token=""
aws_region=""
All fields are optional.
If neither aws_access_key nor aws_secret_key is provided, Kukur attempts to establish the credentials automatically.
The following methods are tried, in order:
- AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_SESSION_TOKEN from environment variables
- Configuration files such as ~/.aws/credentials and ~/.aws/config
- For nodes on Amazon EC2, the EC2 Instance Metadata Service
Apache Parquet
Sources with type = "parquet"
configure Apache Parquet sources.
Parquet supports the same source layouts as CSV:
[source.<name>]
type = "parquet"
format = "row|dir|pivot"
path = "<path to data>"
tag_columns = ["series name"]
field_columns = ["value"]
quality_mapping = "<name>"
path is required.
format defaults to "row".
Metadata in Parquet is not supported. Use a different metadata_type to connect to metadata. For example:
metadata_type = "csv"
Column mapping
Accepted by Apache Parquet sources that use the row based or directory based data models.
This configuration allows mapping the columns of tables read from Parquet files to the columns expected by Kukur.
This is done by setting the column_mapping option for the source.
[source.<name>.column_mapping]
"series name" = "name"
"ts" = "timestamp"
"value" = "value"
"quality" = "quality column"
series name is only valid for the row format, as the directory data model uses the Parquet file names as the series names.
The quality mapping is optional.
Custom datetime format
The data_datetime_format option allows the timestamps to be parsed using a custom datetime format.
It accepts the formatting options supported by Python.
data_datetime_format = "%Y/%m/%dT%H:%M"
Custom timezone
The data_timezone option allows specifying the time zone of the timestamps.
This option should only be used when the timestamps in the source do not contain time zone information.
data_timezone = "America/Sao_Paulo"
Row format
format = "row"
The row based format expects path to be a Parquet file with at least 3 columns:
- The first column contains the series name as a string
- The second column contains the timestamp
- The third column contains the value as a numerical type or a string
Alternatively, if the tag_columns and field_columns options are used, each combination of values defined in tag_columns and field_columns defines a series.
Directory Based Format
[source."<name>"]
type = "parquet"
format = "dir"
path = ""
[[source."<name>".partitions]]
origin = "tag"
key = "series name"
# path_encoding = "base64"
The directory based format expects path to be a directory structure containing Parquet files.
The directory structure is traversed based on the configured partitions.
Each partition corresponds to a tag in the SeriesSelector.
The resulting partition can optionally be base64-encoded.
The last configured partition defines the file name. The .parquet extension is added to it.
The Parquet file contains at least 2 columns:
- The first column contains the timestamp
- The second column contains the value as a numerical type or a string
Pivot Format
format = "pivot"
The pivot format expects path to be a Parquet file.
The first column in the file is a timestamp. Further columns contain the values. Some columns can be numerical while other columns contain strings. The name of each column is the series name.
Quality
There is a possibility to add a quality column in the Parquet file. Check the source documentation to configure the mapping of a value in the quality column to a quality status known to Kukur.
A quality column is not available for a Parquet file with a pivot data format.
Azure Blob Storage
Kukur can load Parquet files from Azure Blob Storage. This requires the azure-storage-blob and azure-identity Python packages.
The following configuration options apply:
[source."My Azure Source"]
...
loader = "azure-blob"
azure_connection_string = "DefaultEndpointsProtocol=https;EndpointSuffix=core.windows.net;AccountName=<storage account name>"
azure_container = "<container name>"
azure_identity = "default"
Paths provided to path will be relative to the container root.
The azure_identity field is optional. The special value default causes connections to be made using the default Azure credentials. This is the only supported value and allows connections using a managed service identity.
When the azure_identity field is omitted, the azure_connection_string needs to contain the necessary secrets (SAS token, Access key).
AWS S3
Kukur can load Parquet files from AWS S3.
[source."My AWS Source"]
...
loader = "aws-s3"
aws_access_key=""
aws_secret_key=""
aws_session_token=""
aws_region=""
All fields are optional.
If neither aws_access_key nor aws_secret_key is provided, Kukur attempts to establish the credentials automatically.
The following methods are tried, in order:
- AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_SESSION_TOKEN from environment variables
- Configuration files such as ~/.aws/credentials and ~/.aws/config
- For nodes on Amazon EC2, the EC2 Instance Metadata Service
CrateDB
Sources with type = "cratedb"
configure CrateDB sources.
CrateDB support requires the crate Python package.
The connection string and queries can either be configured in the configuration file, or loaded from files. Inline configuration takes precedence over file-based configuration when both are provided.
[source.<name>]
type = "cratedb"
connection_string = "<cratedb connection_string>"
connection_string_path = "<path to connection string>"
query_string_parameters = false
list_query = "<query to list all time series in a source>"
list_query_path = "<path to list_query>"
list_columns = ["<metadata type of column 1>", "<metadata type of column 2>"]
tag_columns = ["series name"]
field_columns = []
metadata_query = "<query for metadata of one series>"
metadata_query_path = "<path to metadata query>"
metadata_columns = ["<metadata type of column 1>", "<metadata type of column 2>"]
dictionary_query = "<query for dictionary mappings>"
dictionary_query_path = "<path to the dictionary query>"
metadata_value_mapping = "<metadata_value_mapping name>"
data_query = "<query for data of one series in time range>"
data_query_path = "<path to data query>"
data_query_datetime_format = "<strftime format>"
data_query_timezone = "<override or specify time zone of timestamps to send a naive timestamp to the crate client>"
data_timezone = "<override or specify time zone of timestamps returned by the crate client>"
enable_trace_logging = false
quality_mapping = "<name>"
type_checking_row_limit = 300 # number of rows analysed to determine the type of the value column
The examples here use the following schema:
create table Data (
timestamp timestamp with time zone,
name text,
value double precision,
unit text
)
This assumes only one unit is ever present per time series.
Connection
The connection_string contains the provider and various options.
connection_string = "localhost:4200"
Alternatively, connection_string_path can point to a file that contains the connection string.
Whitespace at the start and end of the connection string file is removed.
Version 0.26 of the crate client does not support parameter binding for timestamps with timezones.
Set query_string_parameters to true to work around this. Use {} to format parameters into queries.
In queries with multiple parameters, the order can be changed by using the argument position: {1} {2} {0}.
Use a read-only connection with a minimal amount of privileges, as SQL injection is possible in that case and cannot be prevented by Kukur.
Search
The list_query is optional. It returns a list of time series names found in the source.
When provided, a series does not need to have been used in another context before it can be analyzed.
list_query = "select distinct name from Data"
The query can be read from a file by using list_query_path instead of list_query.
The query can either return only series names or all metadata.
When it returns all metadata, include a list_columns entry that describes all columns:
list_query = "select distinct name, unit from Data"
list_columns = ["series name", "unit"]
All columns defined in tag_columns should be included in list_columns.
All combinations of rows returned by the list_query and values in field_columns define a series.
Built-in metadata columns are:
- series name (required)
- description
- unit
- functional lower limit
- functional upper limit
- physical lower limit
- physical upper limit
- accuracy
- accuracy percentage
- interpolation type (LINEAR, STEPPED)
- data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)
- dictionary name
Custom metadata fields can be defined by including them in the list_columns list.
Not all CrateDB sources can map metadata field values to the values expected by Kukur. Use metadata_value_mapping to convert them.
Example:
[source.<name>]
metadata_value_mapping = "crate_lowercase"
[metadata_value_mapping.crate_lowercase."data type"]
FLOAT64 = "float64"
STRING = ["string", "text", "varchar"]
DICTIONARY = "dictionary"
This example converts lowercase data types to the uppercase strings expected by Kukur.
Metadata
The metadata_query is a query that accepts one parameter for each tag in a series, ordered by tag_columns.
metadata_query = "select unit from Data where name = '{}'"
The columns in the result set should be mapped to a supported type of metadata.
The metadata_columns entry contains a list with the positional mapping.
metadata_columns = ["unit"]
Built-in types of metadata are:
- description
- unit
- functional lower limit
- functional upper limit
- physical lower limit
- physical upper limit
- accuracy
- accuracy percentage
- interpolation type (LINEAR, STEPPED)
- data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)
- dictionary name
Custom metadata fields can be defined by including them in the metadata_columns list.
The metadata query can be read from a file by using metadata_query_path instead of metadata_query.
Metadata values can be converted using metadata_value_mapping.
Example:
[source.<name>]
metadata_value_mapping = "crate_lowercase"
[metadata_value_mapping.crate_lowercase."data type"]
FLOAT64 = "float64"
STRING = ["string", "text", "varchar"]
DICTIONARY = "dictionary"
This example converts lowercase data types to the uppercase strings expected by Kukur.
If the configuration defines tag_columns, they are provided in the same order as defined in tag_columns.
[source.<name>]
tag_columns = ["location", "plant"]
metadata_query = """
select description, units, interpolationType, dataType, dictionaryName
from Metadata
where my_location = '{}' and my_plant = '{}'
"""
Dictionary
A dictionary maps numerical (integer) values to textual labels.
The dictionary_query is a query that accepts one parameter: the name of the dictionary.
The dictionary name for a series is returned by the dictionary name list or metadata column.
The query should return rows of two columns:
- the numerical value that occurs in the data, in a type that can be converted to an integer
- the label for the numerical value (as adBSTR)
The dictionary query can be read from a file by using dictionary_query_path instead of dictionary_query.
Data
The data_query is a query that accepts three parameters:
- the name of the series (as text)
- the start date of the time range to query data (as text)
- the end date of the time range to query data (as text)
data_query = "select timestamp, value from Data where name = '{}' and timestamp >= '{}' and timestamp < '{}'"
This query should return rows of two columns:
- the timestamp of the data point
- the value of the data point
Kukur will try to convert the columns to the expected types.
The data query can be read from a file by using data_query_path instead of data_query.
The data_query_datetime_format option allows queries using a custom datetime format.
It accepts the formatting options supported by Python.
Example:
data_query_datetime_format = "%Y-%m-%dT%H:%M:%S%z"
This converts timestamps to the ISO8601 format.
If timestamps without a time zone are used in the database, convert the timestamps with the data_query_timezone option.
The request will use the converted timestamps as naive timestamps for all queries.
Example:
data_query_timezone = "UTC"
If the query returns timestamps without a time zone, the time zone can be specified by the data_timezone option.
Example:
data_timezone = "UTC"
The exact available time zones are system-dependent.
Set enable_trace_logging to true to log the fetched data before conversion.
enable_trace_logging = true
If the configuration defines tag_columns, they are provided in the same order as defined in tag_columns.
[source.<name>]
tag_columns = ["location", "plant"]
data_query = """
select timestamp, value
from Data
where my_location = '{}' and my_plant = '{}' and timestamp >= '{}' and timestamp < '{}'
"""
If the configuration defines field_columns, the field is available as {field} in the data_query.
[source.<name>]
field_columns = ["temperature", "pressure"]
data_query = """
select timestamp, {field}
from Data
where name = '{}' and timestamp >= '{}' and timestamp < '{}'
"""
Quality
There is a possibility to add a quality column.
In this case the data query changes:
data_query = "select timestamp, value, quality from Data where name = '{}' and timestamp >= '{}' and timestamp < '{}'"
Where quality represents the column that contains the data point quality.
Check the source documentation to configure the mapping of a value in the quality column to a quality status known to Kukur.
CSV
Sources with type = "csv"
configure CSV sources.
[source.<name>]
type = "csv"
path = "<path>"
metadata = "<path>"
metadata_fields = ["<name>", "<name>"]
metadata_mapping = "<name>"
metadata_value_mapping = "<name>"
format = "row|pivot|dir"
tag_columns = ["series name"]
field_columns = ["value"]
metadata_field_column = "<name>"
dictionary_dir = "<path>"
quality_mapping = "<name>"
file_encoding = "<codec>"
header_row = "false|true"
data_datetime_format = "<date format string>"
data_timezone = "<time zone>"
data_decimal_point = "."
data_column_separator = ","
path is required for time series sources. It is optional when used as a metadata source.
file_encoding is optional and defaults to UTF-8. A list of all supported codecs can be found here.
Three CSV data models are supported:
- row based (series name, timestamp, value, quality (optional))
- pivot (multiple series with values at the same timestamp)
- directory based, one CSV file per tag
Column mapping
Accepted by CSV sources that use the row based or directory based data models and where the files contain a header row.
This configuration allows mapping the columns from CSV files to the columns expected by Kukur.
This is done by setting the column_mapping option for the source.
[source.<name>.column_mapping]
"series name" = "name"
"ts" = "timestamp"
"value" = "value"
"quality" = "quality column"
series name is only valid for the row format, as the directory data model uses the CSV file names as the series names.
The quality mapping is optional.
Data conversion options
The data_datetime_format option allows the timestamps to be parsed using a custom datetime format.
It accepts the formatting options supported by Python.
data_datetime_format = "%Y/%m/%dT%H:%M"
The data_timezone option allows specifying the time zone of the timestamps.
This option should only be used when the timestamps in the source do not contain time zone information.
data_timezone = "America/Sao_Paulo"
To accept numerical data that uses , as the decimal point, use:
data_decimal_point = ","
When columns are not separated by commas, use data_column_separator.
For example, to use ;:
data_column_separator = ";"
Row Based Format
A row based CSV data file may have a header row with column names.
The header_row configuration is used to indicate this. The header_row configuration defaults to false.
At least 3 columns are present:
- series name
- timestamp in RFC3339 format (up to nanosecond precision)
- numerical value (up to double precision floating point)
Example:
test-tag-1,2020-01-01T00:00:00Z,1
test-tag-1,2020-02-01T00:00:00Z,2
test-tag-2,2020-01-01T00:00:00Z,Product A
test-tag-2,2020-02-01T00:00:00Z,Product B
Alternatively, the third column can contain string values. It is not possible to mix numerical and string values in one column. This will cause all numerical values to be interpreted as strings.
Dictionary data is integer numerical data. Labels are only for presenting to users.
When header_row = "true"
,
additional structure for time series can be read from specific columns.
Consider:
location,plant,ts,product,value
Antwerp,P1,2020-01-01T00:00:00Z,A,1
Antwerp,P2,2020-01-01T00:00:00Z,A,1
Antwerp,P1,2020-01-02T00:00:00Z,A,2
Antwerp,P1,2020-01-03T00:00:00Z,B,1
Antwerp,P2,2020-01-03T00:00:00Z,A,2
Barcelona,P1,2020-01-01T00:00:00Z,A,1
Barcelona,P2,2020-01-01T00:00:00Z,A,1
Barcelona,P1,2020-01-02T00:00:00Z,A,2
Barcelona,P1,2020-01-03T00:00:00Z,B,1
Barcelona,P2,2020-01-03T00:00:00Z,A,2
Here, a time series is defined by the location and plant tags. Each series has two fields: product and value.
Use the tag_columns and field_columns configuration options to achieve this:
[source."..."]
type = "csv"
format = "row"
path = "..."
header_row = true
tag_columns = ["location", "plant"]
field_columns = ["product", "value"]
Pivot Format
The header row of CSV data files in pivot format defines which time series are available.
This means that the header_row option is ignored for this format, as the header row is always required.
Other rows start with a timestamp in RFC3339 format and contain one value for each series.
timestamp,test-tag-1,test-tag-2
2020-01-01T00:00:00Z,1,10
2020-02-01T00:00:00Z,2,11
Directory Based Format
The directory based format expects one CSV file per tag.
CSV files are formatted in the row based format, but without the series name.
They are named <series name>.csv.
path refers to the directory that contains the CSV files.
Example test-tag-1.csv:
2020-01-01T00:00:00Z,1
2020-02-01T00:00:00Z,2
Metadata
Metadata is configured in a matrix format. A header row describes the metadata entry.
Supported types of metadata are:
- description
- unit
- functional lower limit
- functional upper limit
- physical lower limit
- physical upper limit
- accuracy
- accuracy percentage
- interpolation type (LINEAR, STEPPED)
- data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)
- dictionary name
Not all columns need to be present.
Example:
series name,unit,functional lower limit,functional upper limit,accuracy
test-tag-1,m,0,1,0.1
Extra columns will be ignored, unless the metadata_fields parameter is present.
In that case all fields defined there, and only these, will be included, including custom metadata fields.
Example:
series name,description,unit,process type,location
test-tag-1,"custom fields example",m,batch,Antwerp
[source.<name>]
metadata_fields = ["unit", "process type"]
Only the unit and the process type fields will be available in the resulting Metadata.
Metadata lookups respect the tag_columns option.
An optional field can be matched by configuring a column name in metadata_field_column.
When the dictionary name field is present, the directory given in dictionary_dir is searched for a file <dictionary name>.csv.
This file contains a comma separated mapping of numerical values to labels.
Example:
0,OFF
1,ON
Columns in a metadata CSV often do not match the names of metadata fields in Kukur.
An optional metadata_mapping maps Kukur field names to column names.
Example:
[source.<name>]
metadata_mapping = "ip21"
[metadata_mapping.ip21]
"series name" = "NAME"
description = "IP_DESCRIPTION"
unit = "IP_ENG_UNITS"
Where the metadata CSV contains:
NAME,IP_ENG_UNITS,lower limit
test-tag-1,kg,1
Fields that are not included in the mapping, such as functional lower limit in the example, translate to the corresponding metadata field or are skipped altogether.
Metadata mappings can be shared between sources.
Values in a metadata CSV can also differ from the values expected by Kukur.
The optional metadata_value_mapping maps Kukur metadata field values to values as they appear in a source.
Example:
[source.<name>]
metadata_value_mapping = "example_value_mapping"
[metadata_value_mapping.example_value_mapping."interpolation type"]
LINEAR = "linear"
STEPPED = "stepped"
[metadata_value_mapping.example_value_mapping."data type"]
FLOAT64 = ["int16", "int32"]
In this example,
when the interpolation type
column contains the value linear
,
Kukur will interpret it as the expected uppercase LINEAR
.
When the data type
column contains either int16
or int32
,
Kukur will interpret it as FLOAT64
.
series name,interpolation type,data type
test-tag-1,linear,int32
metadata_mapping
and metadata_value_mapping
can be used together
to map wildly different metadata formats to a CSV supported by Kukur.
Quality
A quality column can be added to the CSV file. Check the source documentation to configure the mapping of a value in the quality column to a quality status known to Kukur.
A quality column is not available for a CSV file with a pivot data format.
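As a sketch, a row based CSV whose fourth column contains a textual quality flag could be configured like this (the mapping name and quality labels are illustrative; the quality_mapping table follows the same syntax as the plugin example later in this document):
outside-temperature,2020-01-02T00:00:00Z,1,GOOD
[source.example]
type = "csv"
path = "data/example.csv"
quality_mapping = "csv_quality"
[quality_mapping.csv_quality]
GOOD = ["GOOD"]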
Azure Blob Storage
Kukur can load CSV files from Azure Blob Storage. This requires the azure-storage-blob and azure-identity Python packages.
The following configuration template can be used:
[source."My Azure Source"]
...
loader = "azure-blob"
azure_connection_string = "DefaultEndpointsProtocol=https;EndpointSuffix=core.windows.net;AccountName=<storage account name>"
azure_container = "<container name>"
azure_identity = "default"
Paths provided to path
, metadata
or dictionary_dir
will be relative to the container root.
The azure_identity
field is optional.
The special value default
causes connections to be made using the default Azure credentials.
This is the only supported value and allows connections using a managed service identity.
When the azure_identity
field is omitted,
the azure_connection_string
needs to contain the necessary secrets (SAS token, Access key).
AWS S3
Kukur can load CSV files from AWS S3.
[source."My AWS Source"]
...
loader = "aws-s3"
aws_access_key=""
aws_secret_key=""
aws_session_token=""
aws_region=""
All fields are optional.
If neither aws_access_key
nor aws_secret_key
is provided,
Kukur attempts to establish credentials automatically.
The following methods are tried, in order:
-
AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_SESSION_TOKEN from environment variables
-
Configuration files such as ~/.aws/credentials and ~/.aws/config
-
For nodes on Amazon EC2, the EC2 Instance Metadata Service
Azure Data Explorer
Sources with type = "azure-data-explorer"
configure Azure Data Explorer sources.
The azure-kusto-data Python package is required.
[source.<name>]
type = "azure-data-explorer"
connection_string = "<connection_string>"
database = "<database>"
table = "<table>"
timestamp_column = "ts"
tag_columns = []
metadata_columns = []
ignored_columns = []
metadata_mapping = ""
metadata_value_mapping = ""
database
, table
and connection_string
are required.
See here to learn more about connection strings.
The DefaultAzureCredential from the azure-identity package is used for authentication.
timestamp_column
is an optional parameter used to define the name
of the database column that contains timestamps for the series.
Defaults to "ts"
.
Listing time series in an Azure Data Explorer table is supported only when the tag_columns
parameter is specified.
Each value in the parameter is a column in the table that will be part of the set of tags of the series.
Other columns are assumed to be fields.
For example:
tag_columns = [
"deviceId",
"plant",
"location",
]
Metadata columns can be defined as a list in the metadata_columns
parameter.
For example:
metadata_columns = [
"unit",
]
Columns that should not appear in either tags, fields or metadata can be ignored.
They can be defined as a list in the ignored_columns
parameter.
For example:
ignored_columns = [
"unknown",
]
Columns in a metadata table often do not match the names of metadata fields in Kukur.
An optional metadata_mapping
maps Kukur field names to column names.
Example:
[source.<name>]
metadata_mapping = "example_metadata_mapping"
[metadata_mapping.example_metadata_mapping]
description = "DESCRIPTION"
unit = "ENG_UNITS"
Fields that are not included in the mapping,
such as functional lower limit
in the example,
translate to the corresponding metadata field or are skipped altogether.
Metadata mappings can be shared between sources.
Values in a metadata column can also be different.
The optional metadata_value_mapping
maps Kukur metadata field values to values as they appear in a source.
Example:
[source.<name>]
metadata_value_mapping = "example_value_mapping"
[metadata_value_mapping.example_value_mapping."interpolation type"]
LINEAR = "linear"
STEPPED = "stepped"
[metadata_value_mapping.example_value_mapping."data type"]
FLOAT64 = ["int16", "int32"]
In this example,
when the interpolation type
column contains the value linear
,
Kukur will interpret it as the expected uppercase LINEAR
.
When the data type
column contains either int16
or int32
,
Kukur will interpret it as FLOAT64
.
metadata_mapping
and metadata_value_mapping
can be used together
to map wildly different metadata formats to a format supported by Kukur.
DataFusion
Sources with type = "datafusion"
configure Apache Arrow DataFusion sources.
DataFusion sources define tables that can be queried using SQL.
DataFusion sources support the search
operation only.
Data queries are not supported and neither are individual time series metadata lookups.
[source.<name>]
type = "datafusion"
list_query = "" # required: SQL query
tag_columns = ["series name"]
field_columns = ["value"]
metadata_value_mapping = "" # optional: convert metadata values
[[source.<name>.table]]
type = "csv|delta|json|parquet"
name = "" # name of the table in SQL queries
location = "" # path to a file or URI of a Delta Table
The column names returned by the list_query
are used as-is to populate tags as present in tag_columns
and known metadata fields.
Columns with unknown names define extra metadata fields.
Kukur configures DataFusion to be case sensitive.
The following example defines a DataFusion source that connects to 3 tables:
-
A CSV file containing a list of all time series and their units.
-
An NDJSON file containing descriptions for selected time series.
-
A Delta Table containing data types for selected time series.
A metadata_value_mapping
can be provided to convert metadata values received from DataFusion to values supported by Kukur.
[source.datafusion]
type = "datafusion"
list_query = """
select
s.name as "series name",
s.unit as "unit",
d.description as "description",
t."data type" as "data type"
from series s
left join description d on s.name = d.name
left join datatype t on s.name = t.name
"""
metadata_value_mapping = "datafusion_mapping"
[[source.datafusion.table]]
type = "csv"
name = "series"
location = "tests/test_data/datafusion/series.csv"
[[source.datafusion.table]]
type = "json"
name = "description"
location = "tests/test_data/datafusion/description.json"
[[source.datafusion.table]]
type = "delta"
name = "datatype"
location = "tests/test_data/datafusion/datatype"
[metadata_value_mapping.datafusion_mapping.unit]
"m" = "M"
Delta Lake
Sources with type = "delta"
configure Delta Lake sources.
Delta Lake supports the row
and pivot
formats:
[source.<name>]
type = "delta"
format = "row|pivot"
uri = "<uri of Delta Lake>"
tag_columns = ["series name"]
field_columns = ["value"]
sort_by_timestamp = true
[[source.<name>.partitions]]
origin = "tag"
key = "series name"
[[source.<name>.partitions]]
origin = "timestamp"
key = "YEAR"
column = "year"
format
defaults to "row"
.
uri
is required.
sort_by_timestamp
is an optional parameter to sort unordered Delta Lake sources.
Default is true
.
Metadata in Delta Lakes is not supported.
Use a different metadata_type
to connect to metadata.
For example:
metadata_type = "csv"
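As a fuller sketch, a Delta Lake source reading its metadata from a CSV file could look like this (the metadata path is illustrative):
[source."Delta"]
type = "delta"
uri = "<uri of Delta Lake>"
metadata_type = "csv"
metadata = "data/delta-metadata.csv"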
Connecting to a Delta Lake on Azure
The delta-rs library used by Kukur supports Delta Lakes in an Azure storage account. Unfortunately it uses environment variables for configuration. This means one instance of Kukur is able to connect to one storage account only.
For example:
[source."Delta"]
type = "delta"
uri = "abfss://[email protected]/tsai-antwerp"
This configuration connects to the satsdeltalake
storage account.
It opens the tsai-antwerp
Delta Lake in the poc
container.
Multiple authentication options are available.
The recommended authentication method while running on Azure is to use a Managed Identity.
To do so,
the AZURE_STORAGE_ACCOUNT
environment variable should duplicate the storage account given in the uri
.
Next to this,
when not running on Azure App Service,
the IDENTITY_HEADER
environment variable should be set to any value,
for example foo
.
Column mapping
Accepted by Delta Lake sources that use the row format.
This configuration allows mapping the columns of the Delta Lake to the
columns expected by Kukur.
This is done by setting the column_mapping
option for the source.
[source.<name>.column_mapping]
"series name" = "name"
"ts" = "timestamp"
"value" = "value"
"quality" = "quality column"
quality
mapping is optional.
Partitioning
For the row
format,
it is possible to take advantage of delta lake partitioning by defining one or more partitions:
[[source.<name>.partitions]]
origin = "tag"
key = "location"
Every tag based partition key must be included in the tag_columns
option.
Timestamp based partitioning is also supported:
[[source.<name>.partitions]]
origin = "timestamp"
key = "YEAR"
column = "year"
For timestamp based partitions:
-
key indicates the resolution to be used when querying for partitions. Supported values are YEAR, MONTH and DAY.
-
format allows optional formatting of the timestamp that is used for partitioning. It accepts the formatting options supported by Python. By default the year, month or day is used.
-
column is optional. It is needed if the name of the column used for partitioning does not exactly match the key value.
For example, when a column that contains both the year and the month defines a partition:
[[source.<name>.partitions]]
origin = "timestamp"
key = "YEAR"
column = "year"
format = "%Y-%m"
Custom datetime format
The data_datetime_format
option allows the timestamps to be parsed using a custom datetime format.
It accepts the formatting options supported by Python.
data_datetime_format = "%Y/%m/%dT%H:%M"
Custom timezone
The data_timezone
option allows specifying the time zone of the timestamps.
This option should only be used when the timestamps in the source do not contain time zone information.
data_timezone = "America/Sao_Paulo"
Row format
format = "row"
The row based format expects to find a Delta Lake at the given uri
with at least 3 columns:
-
The first column contains the series name as a string
-
The second column contains the timestamp
-
The third column contains the value as a numerical type or as strings
-
A fourth optional column contains quality data
Guidance and mapping options for the quality column can be found in the source documentation .
Alternatively,
if the tag_columns
and field_columns
options are used,
each combination of values defined in tag_columns
and field_columns
defines a series.
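As a sketch, a row based Delta Lake holding several fields per plant and location could be configured like this (column names are illustrative):
[source."Delta with tags"]
type = "delta"
format = "row"
uri = "<uri of Delta Lake>"
tag_columns = ["location", "plant"]
field_columns = ["temperature", "pressure"]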
Pivot Format
format = "pivot"
The pivot format expects to find a Delta Lake at the given URI
.
The first column in the lake is a timestamp. Further columns contain the values. Some columns can be numerical while other columns contain strings. The name of each column is the series name.
Elasticsearch
Sources with type = "elasticsearch"
configure Elasticsearch sources.
[source.<name>]
type = "elasticsearch"
scheme = "http | https"
host = "localhost"
port = 9200
index = ""
metadata_index = ""
list_query = ""
metadata_query = ""
tag_columns = ["series name"]
field_columns = ["value"]
metadata_columns = []
metadata_field_column = ""
timestamp_column = "ts"
metadata_mapping = ""
metadata_value_mapping = ""
query_timeout_seconds = 60
At least one of index
, list_query
or metadata_query
is required.
index
refers to the Elasticsearch index.
When index
is provided, the Query DSL is used to search time series and get data.
list_query
refers to a SQL query that can be used to search time series.
When list_query
is provided the Elasticsearch SQL query is used for search requests.
metadata_query
refers to a SQL query that can be used to get metadata of a time series.
When metadata_query
is provided the Elasticsearch SQL query is used for metadata requests.
metadata_index
is optional and defaults to index
if not provided.
It can be used to refer to a different Elasticsearch index for listing time series and their metadata.
timestamp_column
is an optional parameter used to define the name
of the database column that contains timestamps for the series.
Defaults to "ts"
.
Listing time series in Elasticsearch is supported only when the tag_columns
and field_columns
or metadata_field_column
parameters are specified.
For example:
tag_columns = [
"deviceId",
"plant",
"location",
]
Metadata columns can be defined as a list in the metadata_columns
parameter.
For example:
metadata_columns = [
"unit",
]
Field columns can be defined as a list in the field_columns
parameter.
For example:
field_columns = [
"temperature",
]
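Putting these options together, a minimal index based source could look like the following sketch (the index, tag and field names are illustrative):
[source.es_sensors]
type = "elasticsearch"
host = "localhost"
port = 9200
index = "sensor-data"
timestamp_column = "ts"
tag_columns = ["plant", "location"]
field_columns = ["temperature"]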
Columns in a metadata index often do not match the names of metadata fields in Kukur.
An optional metadata_mapping
maps Kukur field names to column names.
Example:
[source.<name>]
metadata_mapping = "example_metadata_mapping"
[metadata_mapping.example_metadata_mapping]
description = "DESCRIPTION"
unit = "ENG_UNITS"
Fields that are not included in the mapping,
such as functional lower limit
in the example,
translate to the corresponding metadata field or are skipped altogether.
Metadata mappings can be shared between sources.
Values in a metadata column can also be different.
The optional metadata_value_mapping
maps Kukur metadata field values to values as they appear in a source.
Example:
[source.<name>]
metadata_value_mapping = "example_value_mapping"
[metadata_value_mapping.example_value_mapping."interpolation type"]
LINEAR = "linear"
STEPPED = "stepped"
[metadata_value_mapping.example_value_mapping."data type"]
FLOAT64 = ["int16", "int32"]
In this example,
when the interpolation type
column contains the value linear
,
Kukur will interpret it as the expected uppercase LINEAR
.
When the data type
column contains either int16
or int32
,
Kukur will interpret it as FLOAT64
.
metadata_mapping
and metadata_value_mapping
can be used together
to map wildly different metadata formats to a format supported by Kukur.
Credentials can be added in a separate credential config.
For example:
[source.<name>.credentials]
username = ""
password = ""
api_key = ""
Either a username and password must be provided, or the encoded value of an API key generated by Elasticsearch.
query_timeout_seconds
configures the timeout of the API request to Elasticsearch.
InfluxDB
The InfluxDB data source is experimental. Time series are represented by stringifying the 'measurement name', 'tag keys and values' and 'field'. The SeriesSelector concept needs to be refined through 'tags' and 'fields' to properly support InfluxDB. Furthermore, storing a password in the configuration should not be required.
Sources with type = "influxdb"
configure InfluxDB sources.
Currently InfluxDB v1.8 is supported.
Configuration Template
[source.<name>]
type = "influxdb"
host = "<host IP / Address of the InfluxDB server>"
port = <port of the InfluxDB server>
ssl = <boolean to indicate if SSL needs to be used>
database = "<name of the database to use>"
username = "<username used to login into the database>"
password = "<password used to login into the database>"
Required fields
-
type
-
database
Connection
If host
, port
or ssl
are not provided, the defaults are used:
-
host (default: localhost)
-
port (default: 8086)
-
ssl (default: false)
Authentication
If the following fields are provided, the client connects using these credentials:
-
username
-
password
Example configuration
[source.influxSource]
type = "influxdb"
host = "localhost"
port = 8086
database = "data"
username = "influxAdmin"
password = "password123"
Kukur
Sources with type = "kukur"
configure connections to other Kukur or Timeseer instances.
[source.<name>]
type = "kukur"
host = "<hostname or address of the other Kukur instance>"
port = "<port of the Kukur instance>"
source = "<the remote source>"
api_key_name= "<the name of the api key>"
api_key= "<the api key>"
source
is a required field.
It points to the remote source that will be queried.
The name of the local and the remote sources do not need to match.
source = "test"
host
and port
define the host and port where Kukur is running.
Defaults are:
host = "localhost"
port = 8081
The api_key_name
and api_key
can be created in Kukur using the CLI or in Timeseer under Configure > Global preferences > API keys
.
api_key_name= "test-key"
api_key= "PcRU3xy0dsVpX8CeKCO1WPqPevECB9mZAETdWngK7druSl23JFA0Rw"
Omit api_key_name
and api_key
when the [flight]
configuration has authentication = false
.
For example, this connects to the 'test' source on a local instance:
[source.arrow]
type = "kukur"
host = "localhost"
port = 8081
source = "test"
api_key_name= "test-key"
api_key= "PcRU3xy0dsVpX8CeKCO1WPqPevECB9mZAETdWngK7druSl23JFA0Rw"
ODBC
Sources with type = "odbc"
configure ODBC sources.
The connection string and queries can either be configured in the configuration file, or loaded from files. Inline configuration takes precedence over file-based configuration when both are provided.
[source.<name>]
type = "odbc"
connection_string = "<ODBC connection_string>"
connection_string_path = "<path to connection string>"
autocommit = false
query_timeout_seconds = 0
query_timeout_enable = true
query_string_parameters = false
list_query = "<query to list all time series in a source>"
list_query_path = "<path to list_query>"
list_columns = ["<metadata type of column 1>", "<metadata type of column 2"]
tag_columns = ["series name"]
field_columns = []
metadata_query = "<query for metadata of one series>"
metadata_query_path = "<path to metadata query>"
metadata_columns = ["<metadata type of column 1>", "<metadata type of column 2"]
dictionary_query = "<query for a possible dictionary mapping>"
dictionary_query_path = "<path to a dictionary query>"
metadata_value_mapping = "<metadata_value_mapping name>"
data_query = "<query for data of one series in time range>"
data_query_path = "<path to data query>"
data_query_timezone = "<override or specify time zone of timestamps to send a naive timestamp to the odbc driver>"
data_query_tags = [] # provide only a subset of tags to the data query
data_timezone = "<override or specify time zone of timestamps returned by the odbc driver>"
enable_trace_logging = false
quality_mapping = "<name>"
type_checking_row_limit = 300 # number of rows analysed to determine the type of the value column
The examples given here operate on this schema:
create table Metadata (
name nvarchar(max),
description nvarchar(max),
units nvarchar(max),
dictionary_name nvarchar(max)
);
create table Dictionary (
name nvarchar(max),
value int,
label nvarchar(max)
);
create table Data (
name nvarchar(max),
ts datetime2,
value float(53)
);
Connection
The connection_string
can point to a DSN or be a direct driver connection.
connection_string = "DSN=kukur;UID=sa;PWD=Kukur!AI"
or
connection_string = "Driver={/usr/lib/libtdsodbc.so};Server=localhost;Port=1433;Database=TestData;UID=sa;PWD=Kukur!AI"
Alternatively, connection_string_path
can point to a file that contains the connection string.
Whitespace at the start and end of the connection string file is removed.
Some ODBC drivers do not support parameter binding.
Set query_string_parameters
to true
,
to use string interpolation of parameters in queries.
In that case use {}
to format parameters into queries.
In queries with multiple parameters, the order can be changed by using the argument position: {1} {2} {0}
.
Use a read-only connection with minimal privileges, since SQL injection is possible in that case and cannot be prevented by Kukur.
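As a sketch, a data query using string interpolation could look like this (quoting requirements depend on the driver and database, so treat this as an illustration only):
query_string_parameters = true
data_query = "select ts, value from Data where name = '{0}' and ts >= '{1}' and ts < '{2}'"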
query_timeout_seconds
defines the timeout on a query.
Default is 0,
no timeout.
Some drivers do not allow setting a timeout.
Disable it using query_timeout_enable = false
.
By default,
a transaction is created in which all queries run.
Some drivers do not support this.
A transaction will not be created when autocommit = true
.
Search
The list_query
is optional.
It returns a list of time series names found in the source.
When provided, a series does not need to have been used in another context before it can be analyzed.
list_query = "select name from Metadata"
The query can be read from a file by using list_query_path
instead of list_query
.
The query can either return only series names or all metadata.
When it returns all metadata, include a list_columns
entry that describes all columns:
list_query = "select name, description, units from Metadata"
list_columns = ["series name", "description", "unit"]
All columns defined in tag_columns
should be included in list_columns
.
All combinations of rows returned by the list_query
and values in field_columns
define a series.
Built-in columns are:
-
series name (required)
-
description
-
unit
-
functional lower limit
-
functional upper limit
-
physical lower limit
-
physical upper limit
-
accuracy
-
accuracy percentage
-
interpolation type (LINEAR, STEPPED)
-
data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)
-
dictionary name
Custom metadata fields can be defined by including them in the list_columns
list.
Not all ODBC sources can map metadata field values to the values expected by Kukur.
Use metadata_value_mapping
to convert them.
Example:
[source.<name>]
metadata_value_mapping = "odbc_lowercase"
[metadata_value_mapping.odbc_lowercase."data type"]
FLOAT64 = "float64"
STRING = ["string", "text", "varchar"]
DICTIONARY = "dictionary"
This example converts lowercase data types to the uppercase strings expected by Kukur.
Metadata
The metadata_query
is a query that accepts one parameter for each tag in a series,
ordered by tag_columns
.
metadata_query = "select description, units from Metadata where name = ?"
The columns in the result set should be mapped to a supported type of metadata.
The metadata_columns
entry contains a list with the positional mapping.
metadata_columns = ["description", "unit"]
Supported types of metadata are:
-
description
-
unit
-
functional lower limit
-
functional upper limit
-
physical lower limit
-
physical upper limit
-
accuracy
-
accuracy percentage
-
interpolation type (LINEAR, STEPPED)
-
data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)
-
dictionary name
Custom metadata fields can be defined by including them in the metadata_columns
list.
The metadata query can be read from a file by using metadata_query_path
instead of metadata_query
.
Metadata values can be converted using metadata_value_mapping
.
Example:
[source.<name>]
metadata_value_mapping = "odbc_lowercase"
[metadata_value_mapping.odbc_lowercase."data type"]
FLOAT64 = "float64"
STRING = ["string", "text", "varchar"]
DICTIONARY = "dictionary"
This example converts lowercase data types to the uppercase strings expected by Kukur.
If the configuration defines tag_columns
,
they are provided in the same order as defined in tag_columns
.
[source.<name>]
tag_columns = ["location", "plant"]
metadata_query = """
select description, units, interpolationType, dataType, dictionaryName
from Metadata
where my_location = ? and my_plant = ?
"""
Dictionary
A dictionary maps numerical (integer) values to textual labels.
The dictionary_query
is a query that accepts one parameter: the name of the dictionary.
The dictionary name for a series is returned by the dictionary name
list or metadata column.
dictionary_query = "select value, label from Dictionary where name = ?"
The first column with the dictionary key can be any type that can be converted to an integer, even SQL_CHAR
.
The second column with the dictionary value should be a SQL_CHAR
or SQL_WCHAR
.
The dictionary query can be read from a file by using dictionary_query_path
instead of dictionary_query
.
Data
The data_query
is a query that accepts three parameters:
-
the name of the series (as SQL_VARCHAR)
-
the start date of the time range to query data in (as SQL_TYPE_TIMESTAMP)
-
the end date of the time range to query data in (as SQL_TYPE_TIMESTAMP)
data_query = "select ts, value from Data where name = ? and ts between ? and ?"
This query should return rows of two columns:
-
the timestamp of the data point (preferably as SQL_TYPE_TIMESTAMP)
-
the value of the data point (preferably as SQL_REAL, SQL_FLOAT or SQL_DOUBLE)
When the return type of a column is SQL_CHAR
or SQL_WCHAR
,
Kukur will try to convert the value to the expected type.
If the provider or data source does not accept SQL_TYPE_TIMESTAMP
, it can be formatted as a string.
The data_query_datetime_format
option accepts the formatting options supported by Python.
Example:
data_query_datetime_format = "%Y-%m-%dT%H:%M:%S%z"
This converts timestamps to the ISO8601 format.
The data query can be read from a file by using data_query_path
instead of data_query
.
If the driver does not accept timestamps with a time zone,
the data_query_timezone
option specifies the time zone to which timestamps are converted before they are passed to the driver as naive timestamps.
Example:
data_query_timezone = "UTC"
If the query or driver returns dates without a time zone,
the time zone can be specified by the data_timezone
option.
Example:
data_timezone = "UTC"
The exact available time zones are system-dependent.
Set enable_trace_logging
to true
to log the fetched data before conversion.
enable_trace_logging = true
If the configuration defines tag_columns
,
they are provided in the same order as defined in tag_columns
.
[source.<name>]
tag_columns = ["location", "plant"]
data_query = """
select ts, value
from Data
where my_location = ? and my_plant = ? and ts >= ? and ts < ?
"""
It is possible to pass only a subset of tags to the data query,
by using data_query_tags
.
[source.<name>]
data_query_tags = ["location"]
If the configuration defines field_columns
,
the field is available as {field}
in the data_query
.
[source.<name>]
field_columns = ["temperature", "pressure"]
data_query = """
select ts, {field}
from Data
where name = ? and ts >= ? and ts < ?
"""
Quality
A quality column can be returned for each data point.
In this case the data query changes:
data_query = "select ts, value, quality from Data where name = ? and ts between ? and ?"
Where quality
represents the column that contains the data point quality of the ODBC source.
Check the source documentation to configure the mapping of a value in the quality column to a quality status known to Kukur.
PI Data Archive using Web API
Sources with type = "piwebapi-da"
configure connections to OSIsoft PI Data Archive using the PI Web API.
[source.<name>]
type = "piwebapi-da"
data_archive_uri = "The self-URI of the DataServer"
max_returned_items_per_call = 150000
verify_ssl = true
timeout_seconds = 60 # The maximum time to wait for a response
username = "" # optional: username for basic authentication
password = "" # optional: password for basic authentication
source
and data_archive_uri
are required fields.
data_archive_uri
is the URI of the DataServer
resource in PI Web API that corresponds to the Data Archive.
To get this value,
open https://pi.example.org/piwebapi/dataservers
and copy the Self
link from the response:
{
"Links": {},
"Items": [
{
"WebId": "F1DSBd9Ab83Y90SNSjy4JtD5fQVk0tVFMtUEk",
"Id": "6f40df05-d9cd-44f7-8d4a-3cb826d0f97d",
"Name": "mypiserver",
"Path": "\\\\PIServers[mypiserver]",
"IsConnected": true,
"ServerVersion": "3.4.440.477",
"ServerTime": "2022-02-18T15:10:08.8757629Z",
"Links": {
"Self": "https://pi.example.org/piwebapi/dataservers/F1DSBd9Ab83Y90SNSjy4JtD5fQVk0tVFMtUEk",
"Points": "https://pi.example.org/piwebapi/dataservers/F1DSBd9Ab83Y90SNSjy4JtD5fQVk0tVFMtUEk/points",
"EnumerationSets": "https://pi.example.org/piwebapi/dataservers/F1DSBd9Ab83Y90SNSjy4JtD5fQVk0tVFMtUEk/enumerationsets"
}
}
]
}
The PI Web API configuration limits the maximum number of items returned in a response to one request.
This limit is applied to the number of time series (PI Points) and to the number of values.
The default setting is 150000
.
Adapt the number in the Kukur configuration when it has been increased or decreased on the server.
Since the number of values returned in a data request is also limited by this setting,
ensure that the data_query_interval_seconds
option is set to a safe value.
For example,
a time series that is being recorded at 1 value per second,
will have 86400 values per day.
This means a safe value of data_query_interval_seconds
is 86400
when the default limit of 150000
is in place in PI Web API.
For a time series that is being recorded at 1 value per minute and can have 44640 values per month,
data_query_interval_seconds = 2678400
is a safe value.
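For example, for the 1 value per second series above, the interval could be limited explicitly (a sketch reusing the DataServer URI from the examples below):
[source.pi]
type = "piwebapi-da"
data_archive_uri = "https://pi.example.org/piwebapi/dataservers/F1DSBd9Ab83Y90SNSjy4JtD5fQVk0tVFMtUEk"
max_returned_items_per_call = 150000
data_query_interval_seconds = 86400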
Increase timeout_seconds
when responses could take longer than the already long 2 minutes.
Use with query_retry_count
and query_retry_delay
to work around flaky connections.
Set verify_ssl
to false
when PI Web API uses a self-signed certificate.
By default,
Kerberos authentication is tried.
When Basic authentication is required,
set the username
and password
fields in the configuration.
For example,
this defines a pi
source,
with the Web API running on pi.example.org
:
[source.pi]
type = "piwebapi-da"
data_archive_uri = "https://pi.example.org/piwebapi/dataservers/F1DSBd9Ab83Y90SNSjy4JtD5fQVk0tVFMtUEk"
When the Web API is using a self-signed certificate and basic authentication, the configuration becomes:
[source.pi]
type = "piwebapi-da"
data_archive_uri = "https://pi.example.org/piwebapi/dataservers/F1DSBd9Ab83Y90SNSjy4JtD5fQVk0tVFMtUEk"
verify_ssl = false
username = "auser"
password = "apassword"
Plugin
Sources with type = "plugin"
call out to a separate binary that implements all source interface operations.
[source.<name>]
type = "plugin"
cmd = ["/path/to/binary"]
extra = "foo"
cmd
is a required field.
All other fields are passed to the binary as provided.
cmd
can be either a simple string or a list of strings.
The binary will be called with one extra argument:
-
search
to list time series or metadata -
metadata
to get metadata for one time series -
data
to return Arrow data
Input is sent on standard input. Output is expected on standard output.
Any information sent by the binary to standard error is logged at the warning level.
Search
When called with the search
argument,
the binary will receive JSON on standard input.
{
"config": {},
"search": {
"source": "<source name>"
}
}
"config"
contains the source configuration.
The response written to standard output should contain:
{
"series": [],
"metadata": []
}
Both "series"
and "metadata"
are optional.
"series"
should contain a list of time series.
{
"source": "<source name>",
"tags": {
"tag-1": "tag-1-value",
"tag-2": "tag-2-value"
},
"field": "optional-field"
}
"metadata"
should contain a list of time series metadata dictionaries.
Time series included in "metadata"
should not appear in "series"
.
{
"series": {
"source": "<source name>",
"tags": {
"tag-1": "tag-1-value",
"tag-2": "tag-2-value"
},
"field": "optional-field"
},
"description": "<the description of the series>"
}
Kukur has built-in support for these metadata fields:
-
description
-
unit
-
functional lower limit
-
functional upper limit
-
physical lower limit
-
physical upper limit
-
accuracy
-
accuracy percentage
-
interpolation type (LINEAR, STEPPED)
-
data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)
-
dictionary name
Metadata
When called with the metadata
argument,
the binary will receive JSON on standard input.
{
"config": {},
"metadata": {
"series": {
"source": "<source name>",
"tags": {
"tag-1": "tag-1-value",
"tag-2": "tag-2-value"
},
"field": "optional-field"
}
}
}
"config"
contains the source configuration.
"metadata"
contains the "series"
to return metadata for.
The response written to standard output should contain metadata fields:
{
"description": "<the description of the series>"
}
Kukur has built-in support for these metadata fields:
-
description
-
unit
-
functional lower limit
-
functional upper limit
-
physical lower limit
-
physical upper limit
-
accuracy
-
accuracy percentage
-
interpolation type (LINEAR, STEPPED)
-
data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)
-
dictionary name
Note that metadata
will generally not be called when metadata is returned by search
.
Data
When called with the data
argument,
the binary will receive JSON on standard input.
{
"config": {},
"data": {
"series": {
"source": "<source name>",
"tags": {
"tag-1": "tag-1-value",
"tag-2": "tag-2-value"
},
"field": "optional-field"
},
"startDate": "YYYY-MM-DDTHH:MM:SS+00:00",
"endDate": "YYYY-MM-DDTHH:MM:SS+00:00",
}
}
"config"
contains the source configuration.
"metadata"
contains the "series"
to return metadata for.
The "startDate"
and "endDate"
fields are formatted per RFC3339.
The response written to standard output should be in the Apache Arrow IPC Streaming Format.
The schema of the record batches should contain 2 or 3 columns.
A column with the name ts
contains the timestamps of the data values.
Kukur will try to convert timestamps to Timestamp[unit: us, timezone: UTC]
.
A column with the name value
contains the data values.
An optional column quality
contains quality flags for the data values.
0
is considered bad,
1
is considered good.
To support other quality values, a quality mapping can be configured for the source as provided by the source documentation .
Example
This example shows the basic structure for a type = "plugin"
binary.
[source.Plugin]
type = "plugin"
cmd = ["/home/myuser/dev/timeseer/kukur/venv/bin/python3", "data/plugin/plugin.py"]
quality_mapping = "plugin_quality"
[quality_mapping.plugin_quality]
GOOD = ["GOOD"]
cmd
points to a Python interpreter in a virtualenv where the PyArrow library has been installed.
Since quality flags are provided as text,
a quality_mapping
has been defined.
import json
import sys
from datetime import datetime

from pyarrow import Table, ipc


def _run() -> None:
    """Perform the action requested by the first argument."""
    if sys.argv[1] == "search":
        _search()
    if sys.argv[1] == "metadata":
        _metadata()
    if sys.argv[1] == "data":
        _data()
    sys.exit(0)


def _search() -> None:
    """List all time series (or metadata)."""
    query = json.load(sys.stdin)
    print(query, file=sys.stderr)
    source_name = query["search"]["source"]
    data: dict = {
        "metadata": [
            {
                "series": {"source": source_name, "tags": {"series name": "test"}},
                "description": "Test series",
            }
        ],
        "series": [{"source": source_name, "tags": {"series name": "test-2"}}],
    }
    json.dump(data, sys.stdout)


def _metadata() -> None:
    """Return metadata for the time series received on stdin."""
    query = json.load(sys.stdin)
    print(query, file=sys.stderr)
    source_name = query["metadata"]["series"]["source"]
    series_name = query["metadata"]["series"]["tags"]["series name"]
    data: dict = {"description": f"Description of {series_name} ({source_name})"}
    json.dump(data, sys.stdout)


def _data() -> None:
    """Return Arrow IPC data for the time series received on stdin."""
    query = json.load(sys.stdin)
    print(query, file=sys.stderr)
    source_name = query["data"]["series"]["source"]
    series_name = query["data"]["series"]["tags"]["series name"]
    start_date = datetime.fromisoformat(query["data"]["startDate"])
    end_date = datetime.fromisoformat(query["data"]["endDate"])
    table = Table.from_pydict(
        {"ts": [start_date, end_date], "value": [0, 42], "quality": ["BAD", "GOOD"]}
    )
    with ipc.new_stream(sys.stdout.buffer, table.schema) as writer:
        writer.write_table(table)


if __name__ == "__main__":
    _run()
PostgreSQL
Sources with type = "postgresql"
configure PostgreSQL sources.
odbc
sources can also connect to PostgreSQL,
provided the PostgreSQL ODBC driver is available.
Many other database systems support the PostgreSQL wire format. This source can be used to connect to them as well.
The provider
option selects the client library:
pg8000
requires connection options provided in .connection
,
while psycopg
requires a connection_string
or connection_string_path
.
The connection string and queries can either be configured in the configuration file, or loaded from files. Inline configuration takes precedence over file-based configuration when both are provided.
[source.<name>]
type = "postgresql"
connection_string = "<postgresql connection_string>" # for psycopg only
connection_string_path = "<path to connection string>" # for psycopg only
list_query = "<query to list all time series in a source>"
list_query_path = "<path to list_query>"
list_columns = ["<metadata type of column 1>", "<metadata type of column 2"]
tag_columns = ["series name"]
field_columns = []
metadata_query = "<query for metadata of one series>"
metadata_query_path = "<path to metadata query>"
metadata_columns = ["<metadata type of column 1>", "<metadata type of column 2"]
dictionary_query = "<query for a possible dictionary mapping>"
dictionary_query_path = "<path to a dictionary query>"
metadata_value_mapping = "<metadata_value_mapping name>"
data_query = "<query for data of one series in time range>"
data_query_path = "<path to data query>"
data_query_timezone = "<override or specify time zone of timestamps to send a naive timestamp to the database>"
data_query_tags = [] # provide only a subset of tags to the data query
data_timezone = "<override or specify time zone of timestamps returned by the database>"
enable_trace_logging = false
quality_mapping = "<name>"
type_checking_row_limit = 300 # number of rows analysed to determine the type of the value column
provider = "pg8000"
# for pg8000 only
[source.<name>.connection]
user = ""
host = ""
port= 5432
password = ""
The examples given here operate on this schema:
create table Metadata (
id serial,
name text not null,
description text,
units text,
dictionary_name text
);
create table Dictionary (
id serial,
name text not null,
value integer not null,
label text not null
);
create table Data (
id serial,
name text not null,
ts timestamp with time zone,
value double precision
);
Connection
Two providers require different connection configurations:
pg8000
Connection options need to be provided in .connection
.
[source.<name>.connection]
user = "postgres"
host = "localhost"
port= 5432
password = "Timeseer!AI"
Psycopg
The connection_string
supports the keywords supported by libpq
.
connection_string = "host=localhost port=5432 dbname=postgres user=postgres password=Timeseer!AI"
Alternatively, connection_string_path
can point to a file that contains the connection string.
Whitespace at the start and end of the connection string file is removed.
libpq
also supports reading passwords from a file using the passfile
parameter.
Use this to securely inject passwords into a container.
Search
The list_query
is optional.
It returns a list of time series found in the source.
list_query = "select name from Metadata"
The query can be read from a file by using list_query_path
instead of list_query
.
The query can either return only series names or all metadata.
When it returns all metadata, include a list_columns
entry that describes all columns:
list_query = "select name, description, units from Metadata"
list_columns = ["series name", "description", "unit"]
All columns defined in tag_columns
should be included in list_columns
.
All combinations of rows returned by the list_query
and values in field_columns
define a series.
Built-in metadata fields are:
-
description
-
unit
-
functional lower limit
-
functional upper limit
-
physical lower limit
-
physical upper limit
-
accuracy
-
accuracy percentage
-
interpolation type (LINEAR, STEPPED)
-
data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)
-
dictionary name
Custom metadata fields can be defined by including them in the list_columns
list.
Not all PostgreSQL sources can map metadata field values to the values expected by Kukur.
Use metadata_value_mapping
to convert them.
Example:
[source.<name>]
metadata_value_mapping = "pg_lowercase"
[metadata_value_mapping.pg_lowercase."data type"]
FLOAT64 = "float64"
STRING = ["string", "text", "varchar"]
DICTIONARY = "dictionary"
This example converts lowercase data types to the uppercase strings expected by Kukur.
Metadata
The metadata_query
is a query that accepts one parameter for each tag in a series,
ordered by tag_columns
.
metadata_query = "select description, units from Metadata where name = %s"
The columns in the result set should be mapped to a supported type of metadata.
The metadata_columns
entry contains a list with the positional mapping.
metadata_columns = ["description", "unit"]
Supported types of metadata are:
-
description
-
unit
-
functional lower limit
-
functional upper limit
-
physical lower limit
-
physical upper limit
-
accuracy
-
accuracy percentage
-
interpolation type (LINEAR, STEPPED)
-
data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)
-
dictionary name
Custom metadata fields can be defined by including them in the metadata_columns
list.
The metadata query can be read from a file by using metadata_query_path
instead of metadata_query
.
Metadata values can be converted using metadata_value_mapping
.
Example:
[source.<name>]
metadata_value_mapping = "pg_lowercase"
[metadata_value_mapping.pg_lowercase."data type"]
FLOAT64 = "float64"
STRING = ["string", "text", "varchar"]
DICTIONARY = "dictionary"
This example converts lowercase data types to the uppercase strings expected by Kukur.
If the configuration defines tag_columns
,
they are provided in the same order as defined in tag_columns
.
[source.<name>]
tag_columns = ["location", "plant"]
metadata_query = """
select description, units, interpolationType, dataType, dictionaryName
from Metadata
where my_location = %s and my_plant = %s
"""
Dictionary
A dictionary maps numerical (integer) values to textual labels.
The dictionary_query
is a query that accepts one parameter: the name of the dictionary.
The dictionary name for a series is returned by the dictionary name
list or metadata column.
dictionary_query = "select value, label from Dictionary where name = %s"
The first column with the dictionary key can be any type that can be converted to an integer, even SQL_CHAR
.
The second column with the dictionary value should be a SQL_CHAR
or SQL_WCHAR
.
The dictionary query can be read from a file by using dictionary_query_path
instead of dictionary_query
.
Data
The data_query
is a query that accepts multiple parameters:
-
each tag value for the tags defined in tag_columns
-
the start date of the time range to query data in (as a timestamp with time zone)
-
the end date of the time range to query data in (as a timestamp with time zone)
data_query = "select ts, value from Data where name = %s and ts between %s and %s"
This query should return rows of two columns:
-
the timestamp of the data point (preferably as timestamp with time zone)
-
the value of the data point (preferably as double precision, integer or text)
If the database table does not accept timestamp with time zone
,
it can be formatted as a string.
The data_query_datetime_format
option accepts the formatting options supported by Python.
Example:
data_query_datetime_format = "%Y-%m-%dT%H:%M:%S%z"
This converts timestamps to the ISO8601 format.
The data query can be read from a file by using data_query_path
instead of data_query
.
If the database table does not accept timestamps with a time zone,
the data_query_timezone
option specifies the time zone to which timestamps are converted before they are passed to the database as naive timestamps.
Example:
data_query_timezone = "UTC"
If the query returns dates without a time zone,
the time zone can be specified by the data_timezone
option.
Example:
data_timezone = "UTC"
The exact available time zones are system-dependent.
Set enable_trace_logging
to true
to log the fetched data before conversion.
enable_trace_logging = true
If the configuration defines tag_columns
,
they are provided to the data query in the same order as defined in tag_columns
.
[source.<name>]
tag_columns = ["location", "plant"]
data_query = """
select ts, value
from Data
where my_location = %s and my_plant = %s and ts >= %s and ts < %s
"""
It is possible to pass only a subset of tags to the data query,
by using data_query_tags
.
[source.<name>]
data_query_tags = ["location"]
If the configuration defines field_columns
,
the field is available as {field}
in the data_query
.
[source.<name>]
field_columns = ["temperature", "pressure"]
data_query = """
select ts, {field}
from Data
where name = %s and ts >= %s and ts < %s
"""
Quality
A quality column can be returned for each data point.
In this case the data query changes:
data_query = "select ts, value, quality from Data where name = %s and ts between %s and %s"
Check the source documentation to configure the mapping of a value in the quality column to a quality status known to Kukur.
Redshift
Sources with type = "redshift"
configure Redshift sources.
Redshift sources connect to Redshift using the options provided in .connection
:
[source.<name>]
type = "redshift"
[source.<name>.connection]
region = ""
host = ""
database = ""
iam = true
This example configuration is sufficient to connect from an EC2 instance that has an IAM role with sufficient permissions:
[source.myns.connection]
iam = true
host = "myns-1234567890.eu-west-1.redshift-serverless.amazonaws.com"
database = "dev"
All connection parameters for the Python Redshift driver are accepted.
Next to the .connection
properties,
Redshift sources support these properties:
[source.<name>]
type = "redshift"
list_query = "<query to list all time series in a source>"
list_query_path = "<path to list_query>"
list_columns = ["<metadata type of column 1>", "<metadata type of column 2"]
tag_columns = ["series name"]
field_columns = []
metadata_query = "<query for metadata of one series>"
metadata_query_path = "<path to metadata query>"
metadata_columns = ["<metadata type of column 1>", "<metadata type of column 2"]
dictionary_query = "<query for a possible dictionary mapping>"
dictionary_query_path = "<path to a dictionary query>"
metadata_value_mapping = "<metadata_value_mapping name>"
data_query = "<query for data of one series in time range>"
data_query_path = "<path to data query>"
data_query_timezone = "<override or specify time zone of timestamps to send a naive timestamp to the database>"
data_query_tags = [] # provide only a subset of tags to the data query
data_timezone = "<override or specify time zone of timestamps returned by the database>"
enable_trace_logging = false
quality_mapping = "<name>"
type_checking_row_limit = 300 # number of rows analysed to determine the type of the value column
The examples given here operate on this schema:
create table Metadata (
id integer identity,
name text not null,
description text,
units text,
dictionary_name text
);
create table Dictionary (
id integer identity,
name text not null,
value integer not null,
label text not null
);
create table Data (
id integer identity,
name text not null,
ts timestamptz not null,
value double precision not null
);
Search
The list_query
is optional.
It returns a list of time series found in the source.
list_query = "select name from Metadata"
The query can be read from a file by using list_query_path
instead of list_query
.
The query can either return only series names or all metadata.
When it returns all metadata, include a list_columns
entry that describes all columns:
list_query = "select name, description, units from Metadata"
list_columns = ["series name", "description", "unit"]
All columns defined in tag_columns
should be included in list_columns
.
All combinations of rows returned by the list_query
and values in field_columns
define a series.
Built-in metadata fields are:
-
description
-
unit
-
functional lower limit
-
functional upper limit
-
physical lower limit
-
physical upper limit
-
accuracy
-
accuracy percentage
-
interpolation type (LINEAR, STEPPED)
-
data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)
-
dictionary name
Custom metadata fields can be defined by including them in the list_columns
list.
Not all Redshift sources can map metadata field values to the values expected by Kukur.
Use metadata_value_mapping
to convert them.
Example:
[source.<name>]
metadata_value_mapping = "pg_lowercase"
[metadata_value_mapping.redshift_lowercase."data type"]
FLOAT64 = "float64"
STRING = ["string", "text", "varchar"]
DICTIONARY = "dictionary"
This example converts lowercase data types to the uppercase strings expected by Kukur.
Metadata
The metadata_query
is a query that accepts one parameter for each tag in a series,
ordered by tag_columns
.
This query is generally not required when the list_query
returns metadata.
metadata_query = "select description, units from Metadata where name = %s"
The columns in the result set should be mapped to a supported type of metadata.
The metadata_columns
entry contains a list with the positional mapping.
metadata_columns = ["description", "unit"]
Supported types of metadata are:
-
description
-
unit
-
functional lower limit
-
functional upper limit
-
physical lower limit
-
physical upper limit
-
accuracy
-
accuracy percentage
-
interpolation type (LINEAR, STEPPED)
-
data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)
-
dictionary name
Custom metadata fields can be defined by including them in the metadata_columns
list.
The metadata query can be read from a file by using metadata_query_path
instead of metadata_query
.
Metadata values can be converted using metadata_value_mapping
.
Example:
[source.<name>]
metadata_value_mapping = "pg_lowercase"
[metadata_value_mapping.pg_lowercase."data type"]
FLOAT64 = "float64"
STRING = ["string", "text", "varchar"]
DICTIONARY = "dictionary"
This example converts lowercase data types to the uppercase strings expected by Kukur.
If the configuration defines tag_columns
,
they are provided in the same order as defined in tag_columns
.
[source.<name>]
tag_columns = ["location", "plant"]
metadata_query = """
select description, units, interpolationType, dataType, dictionaryName
from Metadata
where my_location = %s and my_plant = %s
"""
Dictionary
A dictionary maps numerical (integer) values to textual labels.
The dictionary_query
is a query that accepts one parameter: the name of the dictionary.
The dictionary name for a series is returned by the dictionary name
list or metadata column.
dictionary_query = "select value, label from Dictionary where name = %s"
The dictionary query can be read from a file by using dictionary_query_path
instead of dictionary_query
.
Data
The data_query
is a query that accepts multiple parameters:
-
each tag value for the tags defined in tag_columns
-
the start date of the time range to query data in (as a timestamp with time zone)
-
the end date of the time range to query data in (as a timestamp with time zone)
data_query = "select ts, value from Data where name = %s and ts between %s and %s"
This query should return rows of two columns:
-
the timestamp of the data point (preferably as timestamp with time zone)
-
the value of the data point (preferably as double precision, integer or text)
If the database table does not accept timestamp with time zone
,
it can be formatted as a string.
The data_query_datetime_format
option accepts the formatting options supported by Python.
Example:
data_query_datetime_format = "%Y-%m-%dT%H:%M:%S%z"
This converts timestamps to the ISO8601 format.
The data query can be read from a file by using data_query_path
instead of data_query
.
If the database table does not accept timestamps with a time zone,
the data_query_timezone
option specifies the time zone to which timestamps are converted before they are passed to the database as naive timestamps.
Example:
data_query_timezone = "UTC"
If the query returns dates without a time zone,
the time zone can be specified by the data_timezone
option.
Example:
data_timezone = "UTC"
The exact available time zones are system-dependent.
Set enable_trace_logging
to true
to log the fetched data before conversion.
enable_trace_logging = true
If the configuration defines tag_columns
,
they are provided to the data query in the same order as defined in tag_columns
.
[source.<name>]
tag_columns = ["location", "plant"]
data_query = """
select ts, value
from Data
where my_location = %s and my_plant = %s and ts >= %s and ts < %s
"""
It is possible to pass only a subset of tags to the data query,
by using data_query_tags
.
[source.<name>]
data_query_tags = ["location"]
If the configuration defines field_columns
,
the field is available as {field}
in the data_query
.
[source.<name>]
field_columns = ["temperature", "pressure"]
data_query = """
select ts, {field}
from Data
where name = %s and ts >= %s and ts < %s
"""
Quality
A quality column can be returned for each data point.
In this case the data query changes:
data_query = "select ts, value, quality from Data where name = %s and ts between %s and %s"
Check the source documentation to configure the mapping of a value in the quality column to a quality status known to Kukur.
SQLite
Sources with type = "sqlite"
configure SQLite sources.
The connection string and queries can either be configured in the configuration file, or loaded from files. Inline configuration takes precedence over file-based configuration when both are provided.
[source.<name>]
type = "sqlite"
connection_string = "<SQLite connection_string>"
connection_string_path = "<path to connection string>"
query_timeout_seconds = 0
list_query = "<query to list all time series in a source>"
list_query_path = "<path to list_query>"
list_columns = ["<metadata type of column 1>", "<metadata type of column 2"]
tag_columns = ["series name"]
field_columns = []
metadata_query = "<query for metadata of one series>"
metadata_query_path = "<path to metadata query>"
metadata_columns = ["<metadata type of column 1>", "<metadata type of column 2"]
dictionary_query = "<query for dictionary mappings>"
dictionary_query_path = "<path to the dictionary query>"
metadata_value_mapping = "<metadata_value_mapping name>"
data_query = "<query for data of one series in time range>"
data_query_path = "<path to data query>"
data_query_datetime_format = "<strftime format>"
data_query_timezone = "<override or specify time zone of timestamps>"
data_query_tags = [] # provide only a subset of tags to the data query
data_timezone = "<override or specify time zone of timestamps returned by the data query>"
enable_trace_logging = false
quality_mapping = "<name>"
type_checking_row_limit = 300 # number of rows analysed to determine the type of the value column
The examples given here operate on a SQLite database with three tables:
create table Metadata (
id integer primary key autoincrement,
name text not null,
description text,
units text,
interpolationType text,
dataType text,
dictionaryName text
);
create table Dictionary (
id integer primary key autoincrement,
name text not null,
value real not null,
label text not null
);
create table Data (
id integer primary key autoincrement,
name text not null,
ts datetime not null,
value real
);
Connection
The connection_string
contains either the SQLite database file name or the URI to the database.
connection_string = "data/db.sqlite"
Alternatively, connection_string_path
can point to a file that contains the connection string.
Whitespace at the start and end of the connection string file is removed.
query_timeout_seconds
defines the timeout on a query.
Default is 0,
no timeout.
Search
The list_query
is optional.
It returns a list of time series names found in the source.
When provided, a series does not need to have been used in another context before it can be analyzed.
list_query = "select name from Metadata"
The query can be read from a file by using list_query_path
instead of list_query
.
The query can either return only series names or all metadata.
When it returns all metadata, include a list_columns
entry that describes all columns:
list_query = "select name, description, units, interpolationType, dataType, dictionaryName from Metadata"
list_columns = ["series name", "description", "unit", "interpolation type", "data type", "dictionary name"]
All columns defined in tag_columns
should be included in list_columns
.
All combinations of rows returned by the list_query
and values in field_columns
define a series.
Built-in metadata columns are:
-
series name (required)
-
description
-
unit
-
functional lower limit
-
functional upper limit
-
physical lower limit
-
physical upper limit
-
accuracy
-
accuracy percentage
-
interpolation type (LINEAR, STEPPED)
-
data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)
-
dictionary name
Custom metadata fields can be defined by including them in the list_columns
list.
Not all SQLite sources can map metadata field values to the values expected by Kukur.
Use metadata_value_mapping
to convert them.
Example:
[source.<name>]
metadata_value_mapping = "sqlite_lowercase"
[metadata_value_mapping.sqlite_lowercase."data type"]
FLOAT64 = "float64"
STRING = ["string", "text", "varchar"]
DICTIONARY = "dictionary"
This example converts lowercase data types to the uppercase strings expected by Kukur.
Metadata
The metadata_query is a query that accepts one parameter for each tag in a series, ordered by tag_columns.
metadata_query = "select description, units, interpolationType, dataType, dictionaryName from Metadata where name = ?"
The columns in the result set should be mapped to a supported type of metadata.
The metadata_columns
entry contains a list with the positional mapping.
metadata_columns = ["description", "unit", "interpolation type", "data type", "dictionary name"]
Built-in types of metadata are:
- description
- unit
- functional lower limit
- functional upper limit
- physical lower limit
- physical upper limit
- accuracy
- accuracy percentage
- interpolation type (LINEAR, STEPPED)
- data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)
- dictionary name
Custom metadata fields can be defined by including them in the metadata_columns
list.
The metadata query can be read from a file by using metadata_query_path instead of metadata_query. Metadata values can be converted using metadata_value_mapping.
Example:
[source.<name>]
metadata_value_mapping = "sqlite_lowercase"
[metadata_value_mapping.sqlite_lowercase."data type"]
FLOAT64 = "float64"
STRING = ["string", "text", "varchar"]
DICTIONARY = "dictionary"
This example converts lowercase data types to the uppercase strings expected by Kukur.
Kukur supports the SQLite MATCH
operator.
Consider the following table that contains additional metadata about a sensor vendor per location:
create table Sensors (
id integer primary key autoincrement,
location text not null,
vendor text not null
);
If the location is part of the series name, the following query will return the sensor vendor:
select vendor from Sensors where location = (? match 'location=([^,]+)')
If the configuration defines tag_columns, the tag values are provided to the query in the same order as defined in tag_columns.
[source.<name>]
tag_columns = ["location", "plant"]
metadata_query = """
select description, units, interpolationType, dataType, dictionaryName
from Metadata
where my_location = ? and my_plant = ?
"""
Dictionary
A dictionary maps numerical (integer) values to textual labels.
The dictionary_query is a query that accepts one parameter: the name of the dictionary. The dictionary name for a series is returned by the dictionary name list or metadata column.
dictionary_query = "select value, label from Dictionary where name = ?"
The query should return rows of two columns:
- the numerical value that occurs in the data, in a type that can be converted to an integer
- the label for the numerical value

The dictionary query can be read from a file by using dictionary_query_path instead of dictionary_query.
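For example, with the following rows in the Dictionary table (the dictionary name and labels are made up):
-- Illustrative dictionary entries for a hypothetical 'PumpState' dictionary.
insert into Dictionary (name, value, label)
values
    ('PumpState', 0, 'STOPPED'),
    ('PumpState', 1, 'RUNNING');
Running the dictionary query with 'PumpState' as its parameter returns the rows (0, 'STOPPED') and (1, 'RUNNING').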
Data
The data_query is a query that accepts three parameters:
- the name of the series
- the start date of the time range to query data
- the end date of the time range to query data
data_query = "select ts, value from Data where name = ? and ts >= ? and ts < ?"
This query should return rows of two columns:
- the timestamp of the data point
- the value of the data point

Kukur will try to convert the columns to the expected types.
The data query can be read from a file by using data_query_path instead of data_query.
Kukur expects a table to contain ISO8601 formatted timestamps. Alternatively, the start and end dates can be passed to the query as formatted strings. The data_query_datetime_format option accepts the strftime formatting options supported by Python.
Example:
data_query_datetime_format = "%Y-%m-%dT%H:%M:%S%z"
This formats the start and end dates as ISO8601 timestamps.
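As a quick sketch of what this format produces, the start and end dates would be rendered roughly like this in Python (note that %z renders the offset without a colon):
from datetime import datetime, timezone

# Format a query start date using the configured strftime format.
start_date = datetime(2020, 1, 1, tzinfo=timezone.utc)
print(start_date.strftime("%Y-%m-%dT%H:%M:%S%z"))  # 2020-01-01T00:00:00+0000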
If the table does not store time zone aware timestamps, the data_query_timezone option specifies the time zone to which the start and end dates are converted before being passed to the query. The converted timestamps are passed to the query as naive timestamps.
Example:
data_query_timezone = "UTC"
If the query returns dates without a time zone,
the time zone can be specified by the data_timezone
option.
Example:
data_timezone = "UTC"
The exact available time zones are system-dependent.
Set enable_trace_logging
to true
to log the fetched data before conversion.
enable_trace_logging = true
If the configuration defines tag_columns, the tag values are provided to the query in the same order as defined in tag_columns.
[source.<name>]
tag_columns = ["location", "plant"]
data_query = """
select ts, value
from Data
where my_location = ? and my_plant = ? and ts >= ? and ts < ?
"""
It is possible to pass only a subset of tags to the data query by using data_query_tags.
[source.<name>]
data_query_tags = ["location"]
If the configuration defines field_columns, the field is available as {field} in the data_query.
[source.<name>]
field_columns = ["temperature", "pressure"]
data_query = """
select ts, {field}
from Data
where name = ? and ts >= ? and ts < ?
"""
Quality
A quality column can be included in the data. In this case the data query changes:
data_query = "select ts, value, quality from Data where name = ? and ts >= ? and ts < ?"
Here, quality represents the column that contains the data point quality. Check the source documentation to configure the mapping of values in the quality column to a quality status known to Kukur.
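As a sketch only, and assuming a quality mapping table with a GOOD key (check the quality mapping documentation for the exact syntax), a mapping that treats the value 192 in the quality column as good data could look like this; the name sqlite_quality is arbitrary:
[source.<name>]
quality_mapping = "sqlite_quality"

# Assumed syntax: values listed under GOOD are considered good quality.
[quality_mapping.sqlite_quality]
GOOD = [192]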
Connectivity guides
Kukur can connect to many different time series data sources using the options detailed in the Sources reference. Concrete examples of complex integrations are documented here.
Connecting to Aspen InfoPlus.21
Kukur connects to AspenTech’s InfoPlus.21 data historian using the Aspen SQLplus for Aspen InfoPlus.21 ODBC driver.
First, create an ODBC data source for the IP.21 server.
Consider an ODBC data source AB.
The basic Kukur configuration to connect is:
[source.IP21AB]
type = "odbc"
connection_string = "DSN=AB"
The next step is to configure a query to retrieve a list of time series and their metadata.
The IP.21 database is very flexible, with possibly multiple repeat areas per record. Writing the exact query requires some knowledge of the local configuration. This example exposes all time series defined by IP_AnalogDef:
[source.IP21AB]
type = "odbc"
connection_string = "DSN=AB"
list_query = "SELECT NAME, IP_DESCRIPTION, IP_ENG_UNITS, IP_LOW_LOW_LIMIT, IP_LOW_LIMIT, IP_HIGH_LIMIT, IP_HIGH_HIGH_LIMIT, IP_DC_SIGNIFICANCE, IP_STEPPED,'FLOAT64' FROM IP_AnalogDef"
list_columns = ["series name", "description", "unit", "physical lower limit", "functional lower limit", "functional upper limit", "physical upper limit", "accuracy", "interpolation type", "data type"]
metadata_value_mapping = 'IP21_Mapping'
[metadata_value_mapping.IP21_Mapping.'interpolation type']
LINEAR = 'Interpolated'
STEPPED = 'Stepped'
Note that this assumes that all time series defined here have a data type of FLOAT64.
A mapping table like the interpolation type mapping can map data types returned by SQLplus to data types known in Kukur.
Pay special attention to the configured limits.
The example assumes that both IP_LOW_LOW_LIMIT
and IP_LOW_LIMIT
have been configured.
The IP_LOW_LOW_LIMIT
maps to the physical lower limit
field in Kukur and represents the lower limit of the sensor.
The IP_LOW_LIMIT
maps to the functional lower limit
field in Kukur and represents the expected lower limit of the process.
The final step is to define queries to read data from time series.
The SQLplus ODBC driver does not support query parameters. Ensure the connection is read-only when untrusted input can be provided to Kukur.
Kukur formats timestamps as 2020-02-13T19:05:43+00:00. SQLplus does not accept all valid RFC3339/ISO8601 timestamps. This means a custom data_query_datetime_format is required, where the time zone information is provided as Z instead of +00:00.
To avoid overloading the data historian, the time span of one query has been limited to one day of data. Kukur will split and recombine longer queries.
[source.IP21AB]
type = "odbc"
connection_string = "DSN=AB"
list_query = "SELECT NAME, IP_DESCRIPTION, IP_ENG_UNITS, IP_LOW_LOW_LIMIT, IP_LOW_LIMIT, IP_HIGH_LIMIT, IP_HIGH_HIGH_LIMIT, IP_DC_SIGNIFICANCE, IP_STEPPED,'FLOAT64' FROM IP_AnalogDef"
list_columns = ["series name", "description", "unit", "physical lower limit", "functional lower limit", "functional upper limit", "physical upper limit", "accuracy", "interpolation type", "data type"]
metadata_value_mapping = 'IP21_Mapping'
query_string_parameters = true
data_query = "SELECT ISO8601(IP_TREND_TIME),IP_TREND_VALUE FROM IP_AnalogDef.1 WHERE NAME='{}' AND IP_TREND_TIME >= '{}' AND IP_TREND_TIME < '{}' ORDER BY IP_TREND_TIME ASC"
data_query_datetime_format = "%Y-%m-%dT%H:%M:%SZ"
data_query_interval_seconds = 86400
[metadata_value_mapping.IP21_Mapping.'interpolation type']
LINEAR = 'Interpolated'
STEPPED = 'Stepped'
Note that SQLplus also provides the HISTORY
table that allows a more generic way to read data.
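With this configuration in place, connectivity can be verified using the Kukur CLI; the time series name below is a placeholder for an actual IP.21 tag:
(venv) $ kukur test data \
    --source IP21AB \
    --name <IP.21 tag name> \
    --start 2020-01-01 \
    --end 2020-01-02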
Connecting to Azure Data Lakes using Azure Synapse Analytics
There is no one-size-fits-all approach to accessing time series data stored in containers in Azure general purpose v2 Storage Accounts (Azure Data Lake). Data is typically stored in blobs, organised using the Data Lake Storage Gen2 Hierarchical Namespace feature. Both the content of the blobs and their organisation are user defined.
For example, one data lake could contain one Parquet file with all time series data points for one hour, organised as <year>/<month>/data-<day>-<hour>.parquet, while another stores one CSV file per day for each time series, as <month>-<day>-<year>/<series name>.csv.
Kukur and other time series analytics tooling expect data sources to respond to queries that ask for data of one time series in a given time period. Mapping such a query to the file format and organisation of a data lake is the domain of specialized tooling, a data lake engine, such as Azure Synapse Analytics serverless SQL pool, AWS Athena or Dremio. These engines maintain structured views on the data in the data lake, which is unstructured to anyone but the human eye. Kukur connects to and queries the data lake engine for time series data; the engine determines which blobs (files) to query and how to transform their content into a single virtualized columnar representation.
Example
Consider an Azure Synapse workspace ws-timeseer, connected to an Azure Data Lake Storage Gen2 storage account sasynapsetimeseer.
The storage account contains a blob storage container fs-synapse that has the 'Hierarchical Namespace' feature enabled.
Inside the container, time series data is stored in Parquet files.
Each Parquet file contains three columns:
- series name: string
- ts: timestamp[us, tz=UTC]
- value: double

   series name                         ts   value
0  test-tag-1   2021-01-01 00:00:00+00:00     0.5
1  test-tag-2   2021-01-02 00:00:00+00:00   100.0
2  test-tag-1   2021-01-03 00:00:00+00:00     1.5
3  test-tag-2   2021-01-04 00:00:00+00:00  -100.0
The Parquet files in this example are organized in directories per year. Inside each directory, one file contains one month of time series data.
Creating a view
In the Synapse Analytics workspace,
all data can be virtualized using the OPENROWSET
function and wildcard matching in the path that is provided to it.
SELECT
"series name", ts, value
FROM
OPENROWSET(
BULK 'https://sasynapsetimeseer.dfs.core.windows.net/fs-synapse/historian-data/year=*/lake-*.parquet',
FORMAT='PARQUET'
) AS [result]
ORDER BY ts, "series name"
Running this query returns the expected results.
Directly using the previous query in Kukur and other tooling is not a good idea.
The query would need to be updated everywhere it is used when small changes to the underlying storage are made.
A SQL VIEW
hides the internal complexity from any consumers.
VIEWs cannot be created inside the master database that is available by default.
A new HistorianData
database will contain the Timeseries
view.
CREATE DATABASE HistorianData;
go
USE HistorianData;
go
DROP VIEW IF EXISTS Timeseries;
go
CREATE VIEW Timeseries
as
SELECT
"series name", ts, value
FROM
OPENROWSET(
BULK 'https://sasynapsetimeseer.dfs.core.windows.net/fs-synapse/historian-data/year=*/lake-*.parquet',
FORMAT='PARQUET'
) AS [result];
This results in the straightforward query we set out to find:
select ts, value
from Timeseries
where "series name" = 'test-tag-1'
and ts between '2021-01-01T00:00:00Z' and '2021-02-01T00:00:00Z'
order by ts
Authentication using Managed Identities
Several authentication methods are available when connecting to Azure Synapse from Kukur.
In this example,
Kukur is running on a Windows VM with a Managed Identity that will be used to authenticate to Synapse.
Furthermore, by using a database scoped credential SynapseIdentity, anyone allowed to access the view will have access to the underlying data.
USE HistorianData;
go
CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<STRONG PASSPHRASE>';
go
CREATE DATABASE SCOPED CREDENTIAL SynapseIdentity
WITH IDENTITY = 'Managed Identity';
A database scoped credential cannot be used directly in a query, but must be part of a data source.
USE HistorianData;
go
CREATE EXTERNAL DATA SOURCE HistorianDataSource
WITH ( LOCATION = 'https://sasynapsetimeseer.dfs.core.windows.net/fs-synapse',
CREDENTIAL = SynapseIdentity
)
The Timeseries
view needs to be updated to use the newly created data source.
USE HistorianData;
go
DROP VIEW IF EXISTS Timeseries;
go
CREATE VIEW Timeseries
as
SELECT
"series name", ts, value
FROM
OPENROWSET(
BULK 'historian-data/year=*/lake-*.parquet',
DATA_SOURCE = 'HistorianDataSource',
FORMAT='PARQUET'
) AS [result];
The Windows VM that is running Kukur has the Managed Identity ts-windows. A database user needs to be created for it, and permissions for bulk operations, the SynapseIdentity credential and the Timeseries view need to be granted.
USE HistorianData;
go
CREATE USER [ts-windows] FROM EXTERNAL PROVIDER
GRANT ADMINISTER DATABASE BULK OPERATIONS TO [ts-windows]
GRANT CONTROL ON DATABASE SCOPED CREDENTIAL :: SynapseIdentity to [ts-windows]
GRANT SELECT ON Object::dbo.[Timeseries] to [ts-windows]
Kukur uses the ODBC Driver for SQL Server to connect to Azure Synapse. Download and install it.
Create a new ODBC data source in Kukur that connects using the ODBC driver to the SQL endpoint of the Synapse workspace.
The connection string includes Authentication=ActiveDirectoryMsi
to use the Managed Identity.
[source.synapse]
type = "odbc"
connection_string = "Driver={ODBC Driver 17 for SQL Server};Server=tcp:ws-timeseer-ondemand.sql.azuresynapse.net,1433;Database=HistorianData;Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;Authentication=ActiveDirectoryMsi"
data_query = "select ts, value from Timeseries where \"series name\" = ? and ts between ? and ? order by ts"
data_timezone = "UTC"
Running this gives the expected result:
(venv) PS C:\..\kukur> python -m kukur.cli test data --source synapse --name test-tag-1 --start 2021-01-01 --end 2021-02-01
2021-04-29 09:44:08,589 INFO kukur.source.test MainThread : Requesting data for "test-tag-1 (synapse)" from 2021-01-01 00:00:00 to 2021-02-01 00:00:00
2021-01-01T00:00:00+00:00,0.5
2021-01-03T00:00:00+00:00,1.5
2021-02-01T00:00:00+00:00,0.5
Optimizing data access
In order to answer queries for the Timeseries view, Synapse needs to scan all files. This is neither cost nor time efficient.
Azure Synapse supports using file name information in queries. It is a good idea to organise the storage in the data lake to take advantage of this.
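For example, since the year is encoded in the directory name, the filepath() function of OPENROWSET can restrict a query to the files of a single year. This is a sketch of such a query, not part of the Kukur configuration:
SELECT
    "series name", ts, value
FROM
    OPENROWSET(
        BULK 'historian-data/year=*/lake-*.parquet',
        DATA_SOURCE = 'HistorianDataSource',
        FORMAT='PARQUET'
    ) AS [result]
WHERE [result].filepath(1) = '2021';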
Connecting to GE Proficy Historian
Kukur connects to GE Proficy using the Proficy OLE DB Provider. The OLE DB Provider needs to be installed as part of the Proficy client tools.
Kukur initiates OLEDB connections using the ADODB source type.
To connect to the default Proficy Historian, use the following connection_string
:
[source.MyProficy]
type = "adodb"
connection_string = "Provider=ihOLEDB.iHistorian.1;User Id=;Password="
Metadata about time series is available in the ihTags
table.
The values for data type and interpolation type defined in Proficy must be mapped to those known in Kukur. Additionally, RowCount = 0 needs to be added to the where-clause, since by default only 5000 rows are returned by the OLEDB provider.
[source.MyProficy]
type = "adodb"
connection_string = "Provider=ihOLEDB.iHistorian.1;User Id=;Password="
list_query = "select Tagname, Description, EngUnits, DataType, StepValue, LoEngineeringUnits, HiEngineeringUnits from ihtags where RowCount = 0"
list_columns = ["series name", "description", "unit", "data type", "interpolation type", "physical lower limit", "physical upper limit"]
metadata_value_mapping = "PROFICY_mapping"
[metadata_value_mapping.PROFICY_mapping."data type"]
FLOAT64 = ["Scaled", "SingleFloat", "DoubleFloat", "SingleInteger", "DoubleInteger", "Quad Integer", "Unsigned Single Integer", "Unsigned Double Integer", "Unsigned Quad Integer", "USingleInteger", "UDoubleInteger", "UQuadInteger"]
STRING = ["Boolean", "FixedString", "VariableString", "Byte"]
[metadata_value_mapping.PROFICY_mapping."interpolation type"]
LINEAR = 0
STEPPED = 1
Time zone handling is crucial when requesting data.
Kukur uses timestamps in UTC by default.
Using the timezone = '0' condition in the where-clause ensures that Proficy also interprets and returns dates in UTC. The OLEDB provider does not accept the timestamps as provided by Kukur through pywin32, so a conversion to a date time string is required.
The complete configuration is:
[source.MyProficy]
type = "adodb"
connection_string = "Provider=ihOLEDB.iHistorian.1;User Id=;Password="
list_query = "select Tagname, Description, EngUnits, DataType, StepValue, LoEngineeringUnits, HiEngineeringUnits from ihtags where RowCount = 0"
list_columns = ["series name", "description", "unit", "data type", "interpolation type", "physical lower limit", "physical upper limit"]
metadata_value_mapping = "PROFICY_mapping"
data_query = "select TimeStamp, Value from ihRawData where RowCount = 0 and SamplingMode = 'RawByTime' and timezone = '0' and Tagname = ? and TimeStamp >= ? and TimeStamp < ? order by Timestamp asc"
data_query_interval_seconds = 86400
data_query_datetime_format = "%Y-%m-%d %H:%M:%S"
data_timezone = "UTC"
[metadata_value_mapping.PROFICY_mapping."data type"]
FLOAT64 = ["Scaled", "SingleFloat", "DoubleFloat", "SingleInteger", "DoubleInteger", "Quad Integer", "Unsigned Single Integer", "Unsigned Double Integer", "Unsigned Quad Integer", "USingleInteger", "UDoubleInteger", "UQuadInteger"]
STRING = ["Boolean", "FixedString", "VariableString", "Byte"]
[metadata_value_mapping.PROFICY_mapping."interpolation type"]
LINEAR = 0
STEPPED = 1
Connecting to OSIsoft PI
Kukur connects to OSIsoft PI using the PI OLEDB Provider.
The basic configuration to connect to a PI server PIAB is:
[source.PIAB]
type = "adodb"
connection_string = "Provider = PIOLEDB;Data Source = PIAB"
To list time series and collect metadata, use:
[source.PIAB]
type = "adodb"
connection_string = "Provider = PIOLEDB;Data Source = PIAB"
list_query = "select tag, descriptor, engunits, zero, zero + span, compdev, pointtype, step, digitalset from pipoint2"
list_columns = ["series name", "description", "unit", "physical lower limit", "physical upper limit", "accuracy", "data type", "interpolation type", "dictionary name"]
metadata_value_mapping = "PI_mapping"
dictionary_query = "select code, name from pids where digitalset = ?"
[metadata_value_mapping.PI_mapping."data type"]
FLOAT64 = ["R", "I", " "]
STRING = "S"
DICTIONARY = "D"
[metadata_value_mapping.PI_mapping."interpolation type"]
LINEAR = 0
STEPPED = 1
The example above uses the OSIsoft recommended interpretation of the zero
and span
PI point attributes.
Update the calculation of the physical lower limit
and physical upper limit
as needed when diverging from this.
The ISO8601/RFC3339 timestamps provided by Kukur to the OLEDB driver when requesting data are not natively accepted.
They need to be converted to a format accepted by PI using data_query_datetime_format
.
To avoid overloading the data historian, the time span of one query has been limited to one day of data. Kukur will split and recombine longer queries.
[source.PIAB]
type = "adodb"
connection_string = "Provider = PIOLEDB;Data Source = PIAB;Time Zone = UTC"
list_query = "select tag, descriptor, engunits, zero, zero + span, compdev, pointtype, step, digitalset from pipoint2"
list_columns = ["series name", "description", "unit", "physical lower limit", "physical upper limit", "accuracy", "data type", "interpolation type", "dictionary name"]
metadata_value_mapping = "PI_mapping"
dictionary_query = "select code, name from pids where digitalset = ?"
data_query = "select time, value from picomp2 where tag = ? and time >= ? and time < ?"
data_query_datetime_format = "%Y-%m-%d %H:%M:%S"
data_query_interval_seconds = 86400
data_query_timezone = "UTC"
data_timezone = "UTC"
[metadata_value_mapping.PI_mapping."data type"]
FLOAT64 = ["R", "I", " "]
STRING = "S"
DICTIONARY = "D"
[metadata_value_mapping.PI_mapping."interpolation type"]
LINEAR = 0
STEPPED = 1
The PI OLEDB provider expects and produces timestamps in the local time zone by default.
Adding the Time Zone = UTC
parameter to the connection string and configuring the data_query_timezone
and data_timezone
properties ensures that timestamps are always interpreted as UTC.
This avoids problems around daylight saving time changes, which are unavoidable when using local times.
Connecting to Databricks SQL Warehouses on Azure using Managed Identities
Connections to Databricks SQL Warehouses from Kukur are made using the Databricks ODBC Driver.
This guide assumes Kukur is running as a container on an Azure VM with a System Assigned Managed Identity. The Managed Identity will be used to connect securely to the SQL Warehouse.
The first step is to build a container image that includes the ODBC driver.
An example Dockerfile
is available.
Creating a Service Principal in Databricks
The Managed Identity of the VM needs to be defined as a Service Principal in Databricks.
The full documentation to achieve this is available at Set up and use Azure managed identities authentication for Azure Databricks automation.
The required steps are:
- Go to the VM in the Azure portal. Copy the Object ID of the VM Identity.
- Go to Entra ID. Search for the Object ID copied in the previous step. Copy the Application ID.
- Go to the Databricks Account console.
- In User management - Service principals, choose Add service principal.
- Choose Microsoft Entra ID managed. Paste the Application ID. Give the service principal a name.
- Go to SQL Warehouses in the Databricks Workspace. Add Can use permissions to the service principal.
- In the Catalog, grant Data Reader permissions to the service principal.
Configuring Kukur
First, open the Azure Portal. Navigate to the Properties of the Azure Databricks Service and copy the Id.
Open the SQL Warehouse in the Databricks Workspace. Go to Connection details. Copy the Server hostname and the HTTP path.
The example below connects to adb-5136731089164599.19.azuredatabricks.net at /sql/1.0/warehouses/2cf2d4cd375bb81a.
The Id of the Workspace is: /subscriptions/4eaf5c02-76be-4f45-b92a-d5882e686c95/resourceGroups/rg-kukur/providers/Microsoft.Databricks/workspaces/kukur-demo.
A connection is made to the table tsdeltalake.poc.tsai_antwerp.
[source.Databricks]
type = "odbc"
connection_string = """
Driver=/opt/simba/spark/lib/64/libsparkodbc_sb64.so;
Host=adb-5136731089164599.19.azuredatabricks.net;
Port=443;
HTTPPath=/sql/1.0/warehouses/2cf2d4cd375bb81a;
SSL=1;
ThriftTransport=2;
AuthMech=11;
Auth_Flow=3;
Azure_workspace_resource_id=/subscriptions/4eaf5c02-76be-4f45-b92a-d5882e686c95/resourceGroups/rg-kukur/providers/Microsoft.Databricks/workspaces/kukur-demo;
"""
autocommit = true
list_query = """
select distinct `series name` from tsdeltalake.poc.tsai_antwerp
"""
list_columns = ["series name"]
data_query = """
select ts, value
from tsdeltalake.poc.tsai_antwerp
where `series name` = ?
and ts >= ?
and ts < ?
"""
It is important to set autocommit = true
since the Spark ODBC driver does not support transactions.
Refer to Authentication settings for the Databricks ODBC Driver for alternative authentication options.
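As an example of such an alternative, a Databricks personal access token can be used instead of the Managed Identity by changing the authentication-related keys in the connection string. The sketch below assumes token-based authentication (AuthMech=3) as described in the Databricks ODBC documentation; the token itself is a placeholder:
[source.Databricks]
type = "odbc"
connection_string = """
Driver=/opt/simba/spark/lib/64/libsparkodbc_sb64.so;
Host=adb-5136731089164599.19.azuredatabricks.net;
Port=443;
HTTPPath=/sql/1.0/warehouses/2cf2d4cd375bb81a;
SSL=1;
ThriftTransport=2;
AuthMech=3;
UID=token;
PWD=<personal access token>;
"""
autocommit = true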