What is Kukur?

Kukur makes time series data and metadata available to the Apache Arrow ecosystem. Kukur can be used as a Python library or as a standalone application that exposes an Arrow Flight interface.

Kukur is under active development. Breaking changes to the interfaces are possible.

Kukur uses semantic versioning. While < 1.0.0, changes to the minor version indicate breaking changes.

Potential usage scenarios are:

  • ad-hoc in a data project

  • as a time series data integration hub on your own system

  • as a centrally managed time series data integration hub

  • as a library in a Python application that needs time series data

Getting Started

This example shows how to:

  • run Kukur in a data project,

  • expose a CSV file through it

  • and connect to it using the Kukur client.

The only prerequisite is a working Python 3 installation. Minor changes to the shell commands, but not the Kukur configuration, are required depending on your OS.

Installation

First create a new directory and enter it:

$ mkdir data-project
$ cd data-project

Create a Python virtualenv and activate it:

$ python -m venv venv
$ source venv/bin/activate

Install Kukur and list the command line options to verify that the installation succeeded:

(venv) $ pip install kukur
(venv) $ kukur --help
usage: kukur [-h] [--config-file CONFIG_FILE] {flight,inspect,test,api-key} ...

Start Kukur.

positional arguments:
  {flight,inspect,test,api-key}
                        Select the CLI action
    flight              Enable the Arrow Flight interface (the default)
    inspect             List resources in a blob store and determine their schema
    test                Test data source connectivity
    api-key             Create an api key for the Arrow Flight interface

optional arguments:
  -h, --help            show this help message and exit
  --config-file CONFIG_FILE
                        Path to the configuration file

Kukur supports "extras" at install time. Use these to install requirements for the given source.

  • [adodb] for connections to ADODB and OLEDB data sources

  • [cratedb] for connections to CrateDB

  • [datafusion] for connections using Apache Arrow DataFusion

  • [delta] for connections to Delta Lakes

  • [influxdb] for connections to InfluxDB

  • [inspect] for inspection of storage containers

  • [kusto] for connections to Azure Data Explorer

  • [odbc] for connections to ODBC data sources

  • [piwebapi] for connections to PI Web API

  • [postgresql] for connections to PostgreSQL

  • [redshift] for connections to Redshift

For example:

(venv) $ pip install kukur[adodb,odbc]

Configuration

Kukur connects to many different local or remote time series sources. A local CSV file will be used in this example.

Create a directory data/:

(venv) $ mkdir data

Add the CSV data in data/example.csv:

data/example.csv

outside-temperature,2020-01-02T00:00:00Z,1
outside-temperature,2020-01-02T01:00:00Z,2
outside-temperature,2020-01-02T02:00:00Z,3

The next step is to configure a Kukur data source that exposes this CSV to any client.

Kukur uses TOML as its configuration language. Create Kukur.toml in the root folder of your project. Define a source called 'example' that exposes the data/example.csv CSV:

Kukur.toml

[source.example]
type = "csv"
path = "data/example.csv"

Use the Kukur CLI to test connectivity:

(venv) $ kukur test data \
    --source example \
    --name outside-temperature \
    --start 2020-01-01 \
    --end 2021-01-01
2021-03-29 11:12:37,855 INFO kukur.source.test MainThread : Requesting data for "outside-temperature (example)" from 2020-01-01 00:00:00 to 2021-01-01 00:00:00
2020-01-02T00:00:00+00:00,1
2020-01-02T01:00:00+00:00,2
2020-01-02T02:00:00+00:00,3

The Kukur CLI logs to stderr, while the data itself is printed to stdout. The test CLI command can thus be (ab-)used to extract data from any configured data source as CSV.
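For example, the data can be captured in a file by redirecting stdout, while the log messages still appear on the terminal (the file name is just an illustration):

(venv) $ kukur test data \
    --source example \
    --name outside-temperature \
    --start 2020-01-01 \
    --end 2021-01-01 > outside-temperature.csv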

Now, having this data is useful, but where Kukur sets itself apart is that it also provides an opinionated interface for metadata.

Is the outside temperature defined in Kelvin? Unless we’re probing a spacecraft at the dark side of the moon, this is unlikely, but there is no way to know.

Our thermometer has physical measurement limits as well. When values appear outside the -20 deg C to 60 deg C range that this particular thermometer supports, using them is probably not a good idea.

Similarly, the person that was writing down the measurements is not able to read the values with infinite accuracy. At best, there will be a 0.5 deg C accuracy margin for any measurement.

Many time series sources expose this kind of metadata and Kukur can read it.

Let’s create another CSV file:

data/example-metadata.csv

series name,description,unit,physical lower limit,physical upper limit,accuracy
outside-temperature,Temperature in Antwerp,deg C,-20,60,0.5

Kukur can mix-and-match metadata. For example, data can be stored in an InfluxDB database, while descriptions of the measurements are stored in a CSV file and the sensor limits in an MS SQL database.

Update the configuration:

Kukur.toml

[source.example]
type = "csv"
path = "data/example.csv"
metadata = "data/example-metadata.csv"

Request the metadata using the CLI:

(venv) $ kukur test metadata \
    --source example \
    --name outside-temperature
2021-03-29 11:41:48,936 INFO kukur.source.test MainThread : Requesting metadata for "outside-temperature (example)"
series name,description,unit,physical lower limit,physical upper limit,functional lower limit,functional upper limit,accuracy,interpolation type,data type,dictionary name,dictionary
outside-temperature,Temperature in Antwerp,deg C,-20.0,60.0,,,0.5,,,,

Many fields are blank because our CSV file did not contain them.

The interpolation type for example is a very important piece of metadata. When resampling values of multiple time series to the same timestamps, using linear interpolation most likely results in different values than using stepwise interpolation.
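As a minimal, hypothetical illustration using the first two values of the example series, resampling at 00:30 gives a different result depending on the interpolation type:

# Hypothetical resampling of outside-temperature at 2020-01-02T00:30:00Z,
# between the samples (00:00, value 1) and (01:00, value 2).
t0, v0 = 0.0, 1.0  # value 1 at 00:00
t1, v1 = 1.0, 2.0  # value 2 at 01:00
t = 0.5            # resample at 00:30

linear = v0 + (v1 - v0) * (t - t0) / (t1 - t0)  # 1.5
stepped = v0                                    # 1.0: hold the previous value
print(linear, stepped)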

Using the Kukur Client

Now, having validated the Kukur configuration, let’s start the Kukur server:

(venv) $ kukur

Open another shell, enter the virtualenv, start Python and import all Kukur objects:

$ source venv/bin/activate
(venv) $ python
Python 3.9.2 (default, Feb 20 2021, 18:40:11)
[GCC 10.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from kukur import *

Let’s try to request the metadata:

>>> client = Client()
>>> client.get_metadata(SeriesSelector('example', 'outside-temperature'))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "data-project/venv/lib/python3.9/site-packages/kukur/client.py", line 73, in get_metadata
    results = list(
  File "pyarrow/_flight.pyx", line 1239, in do_action
  File "pyarrow/_flight.pyx", line 66, in pyarrow._flight.check_flight_status
pyarrow._flight.FlightUnauthenticatedError: gRPC returned unauthenticated error, with message: invalid token. Detail: Unauthenticated

Kukur is secure by default and does not allow unauthenticated access. Since we’re running Kukur locally, it’s OK to enable anonymous access.

First stop Kukur. Then add a [flight] section to the configuration in Kukur.toml:

Kukur.toml

[flight]
authentication = false

[source.example]
type = "csv"
path = "data/example.csv"
metadata = "data/example-metadata.csv"

Restart Kukur:

(venv) $ kukur

Alternatively, use kukur api-key to define local API keys.

Now, go back to the Python session and request the metadata:

>>> client.get_metadata(SeriesSelector('example', 'outside-temperature'))
Metadata(SeriesSelector(source='example', name='outside-temperature'), {'description': 'Temperature in Antwerp', 'unit': 'deg C', 'physical lower limit': -20.0, 'physical upper limit': 60.0, 'functional lower limit': None, 'functional upper limit': None, 'accuracy': 0.5, 'interpolation type': None, 'data type': None, 'dictionary name': None, 'dictionary': None})

Finally, read the data:

>>> from datetime import datetime
>>> client.get_data(
        SeriesSelector('example', 'outside-temperature'),
        datetime.fromisoformat('2020-01-01T00:00:00+00:00'),
        datetime.fromisoformat('2021-01-01T00:00:00+00:00'),
    )
pyarrow.Table
ts: timestamp[us, tz=UTC]
value: int64

Data is always returned as an Apache Arrow table with two columns: a timestamp and a value.

>>> table = _
>>> table.to_pydict()
{'ts': [datetime.datetime(2020, 1, 2, 0, 0, tzinfo=<UTC>), datetime.datetime(2020, 1, 2, 1, 0, tzinfo=<UTC>), datetime.datetime(2020, 1, 2, 2, 0, tzinfo=<UTC>)], 'value': [1, 2, 3]}

Using Kukur, we now have metadata and data in a format that allows us to correctly analyze our outside temperature.

More importantly: we have made data access scalable, as the Kukur configuration can be used the next time data is needed. To do so we can store the Kukur configuration in a version control system, such as git.

Storing the Configuration in Version Control

This requires git to be installed.

Create a local repository by running:

$ git init .

Ignore the data, virtualenv and Kukur databases using a .gitignore file:

.gitignore

data/
venv/
*.sqlite

Now track the current revision of the Kukur configuration:

$ git add Kukur.toml
$ git commit -v

This repository can now be shared with other people, effortlessly granting them access to the same data sources.

Using Docker

In a data project that does not use Python, the Kukur Docker container can be used to export data as CSV for any supported source.

$ docker run --rm \
    -u $(id -u) \
    -v $(pwd)/Kukur.toml:/usr/src/app/Kukur.toml \
    -v $(pwd)/data:/usr/src/app/data \
    -it timeseer/kukur:latest python -m kukur.cli test data \
        --source example \
        --name outside-temperature \
        --start 2020-01-01 \
        --end 2021-01-01
-u $(id -u)

Run Kukur using your user. This ensures that permissions for volume mounts are correct, since Kukur does not run as root inside the container.

-v $(pwd)/Kukur.toml:/usr/src/app/Kukur.toml

This mounts the Kukur configuration file Kukur.toml to the expected location inside the container. By using the --config-file command line flag, a different location can be chosen.

-v $(pwd)/data:/usr/src/app/data

This mounts the CSV example data on the location that is specified in the configuration. Not required when connecting to data sources that are not file-based.

The Arrow Flight interface is made available on port 8081 when running Kukur as a service. Use port mapping to expose it on localhost (-p 8081:8081).

For API key storage, Kukur creates a sqlite database and stores it in the path given by data_dir. Define a data directory in Kukur.toml:

Kukur.toml

data_dir = "db"

[flight]
authentication = false

[source.example]
type = "csv"
path = "data/example.csv"
metadata = "data/example-metadata.csv"

Create that directory and run the docker container while mounting it:

$ mkdir db
$ docker run --rm \
    -u $(id -u) \
    -v $(pwd)/Kukur.toml:/usr/src/app/Kukur.toml \
    -v $(pwd)/data:/usr/src/app/data \
    -v $(pwd)/db:/usr/src/app/db \
    -p 8081:8081 \
    -it timeseer/kukur:latest

Then, access it using the Kukur client:

(venv) $ python
Python 3.9.2 (default, Feb 20 2021, 18:40:11)
[GCC 10.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from kukur import *
>>> client = Client()
>>> client.get_metadata(SeriesSelector('example', 'outside-temperature'))
Metadata(SeriesSelector(source='example', name='outside-temperature'), {'description': 'Temperature in Antwerp', 'unit': 'deg C', 'physical lower limit': -20.0, 'physical upper limit': 60.0, 'functional lower limit': None, 'functional upper limit': None, 'accuracy': 0.5, 'interpolation type': None, 'data type': None, 'dictionary name': None, 'dictionary': None})

Development

Kukur is developed on GitHub. Visit https://github.com/timeseer-ai/kukur to learn more.

Kukur is open source software, licensed under the Apache License, Version 2.0.

Domain Model

To understand Kukur, three concepts need to be understood:

SeriesSelector

Diagram

A time series data source contains hundreds, thousands or even millions of time series. A convenient and expressive way of selecting which time series should be queried is required. This is a SeriesSelector.

The simplest mapping is associating a unique name with each series. It follows that the minimum unique identifier includes the source and the name of the time series.
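This is what the SeriesSelector used in the client examples earlier expresses:

>>> from kukur import SeriesSelector
>>> SeriesSelector('example', 'outside-temperature')
SeriesSelector(source='example', name='outside-temperature')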

Timestamp | Series name | Value

Many time series databases add additional structure to this.

In InfluxDB, for example, the layout looks like:

          | Series                       | Values
Timestamp | Measurement | Tag A | Tag B  | Field A | Field B

A time series is identified by a 'measurement name', multiple 'tag key-value pairs' and a 'field key'.

Metadata

Kukur predefines a set of metadata fields that are all present in an ideal world. Custom fields can be added both programmatically and using configuration.

Diagram

Both physical and functional limits are configurable. Physical limits are limits of the actual sensor that collected the time series data. Functional limits are the limits of the expected range of the time series. For example, while my thermometer could indicate -20 deg C (physical lower limit), anything below -5 deg C (functional lower limit) would be cause for alarm.

The DICTIONARY DataType warrants an explanation as well.

Time series of type DICTIONARY store numerical values. Each numerical value is given a meaning.

For example: the state of a pump could be ON or OFF. OFF can be represented as 0 in the time series data, while ON could be encoded as 1.

These dictionaries that map numerical values to string labels are often named as well, hence the dictionary name field.
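For the pump example, a dictionary with the hypothetical name pump_state would map 0 to OFF and 1 to ON. In the CSV format described later, this would be a file pump_state.csv containing:

0,OFF
1,ON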

Source

Kukur data sources implement at least three methods:

Diagram
search(SeriesSelector) → Generator[Union[SeriesSelector, Metadata]]

Return all time series matching the selector, including their metadata when it is readily available.

get_metadata(SeriesSelector) → Metadata

Return metadata for the selected time series.

get_data(SeriesSelector, Datetime, Datetime) → pyarrow.Table

Return data for the selected time series in the given time period.
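A minimal sketch of what such a source could look like in Python, serving a single constant-valued series. The class and its behaviour are illustrative only and not part of the documented Kukur API; it assumes Metadata can be constructed from a selector alone.

from datetime import datetime
from typing import Generator, Union

import pyarrow as pa

from kukur import Metadata, SeriesSelector


class ConstantSource:
    """Toy source that exposes a single series with a constant value."""

    def search(
        self, selector: SeriesSelector
    ) -> Generator[Union[SeriesSelector, Metadata], None, None]:
        # Only one series exists in this source.
        yield SeriesSelector(selector.source, "constant")

    def get_metadata(self, selector: SeriesSelector) -> Metadata:
        # Assumption: Metadata can be built from a selector alone,
        # leaving all metadata fields empty.
        return Metadata(selector)

    def get_data(
        self, selector: SeriesSelector, start_date: datetime, end_date: datetime
    ) -> pa.Table:
        # Two columns, as returned by every Kukur source: ts and value.
        return pa.Table.from_pydict(
            {
                "ts": [start_date, end_date],
                "value": [1.0, 1.0],
            }
        )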

Configuration Reference

Kukur is configured by editing a configuration file in the TOML language.

The default location of this configuration file is Kukur.toml. Alternative paths can be configured using the --config-file flag.

Data Storage

Kukur needs local data storage for API keys.

The path to the local data storage of Kukur is configured by:

data_dir = "<path to data directory>"

data_dir is not required. By default, data is stored in the current working directory.

Includes

Other configuration files can be included from the main configuration file by using [[include]]. Multiple includes can be specified by using an array of tables:

[[include]]
glob = "<file glob>"

The glob key is required.

Paths in included files are resolved relative to the application working directory. They are not relative to the included configuration file.

Conflicts between configuration values are handled depending on the value type:

  • string: the last included field overrides earlier values

  • list: the items in the list are appended to the earlier list

  • mapping: the earlier mapping is updated with the new mapping, overriding existing keys

Note that the main configuration file is processed before any includes. This means it is not possible to override configuration set by an include from the main configuration file.

Example:

[[include]]
glob = "tests/test_data/*/*.toml"

This will include all TOML files in a direct subdirectory of tests/test_data/.

For example, tests/test_data/csv/csv-examples.toml could contain:

[source.row]
type = "csv"
path = "examples/csv/row.csv"
metadata = "examples/csv/row-metadata.csv"
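As an illustration of the conflict rules (file names and values below are hypothetical): when the main configuration and an included file both define the same source, the mappings are merged and string values from the include win, since includes are processed last.

# Kukur.toml
[source.example]
type = "csv"
path = "data/example.csv"

# extra/override.toml, matched by an [[include]] glob
[source.example]
path = "data/example-v2.csv"

# Result: source.example keeps type = "csv" but uses path = "data/example-v2.csv".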

Logging

A [logging] section configures diagnostic logging inside Kukur.

[logging]
level = "info"
# path = ""

Possible values for level are:

  • warning

  • info (the default)

  • debug

When path is configured, logs will be written to the specified path. The logs at that path are rotated daily and 7 rotated files are kept.

Arrow Flight

A [flight] section configures the Arrow Flight interface.

[flight]
host = "0.0.0.0"
port = 8081
authentication = true

Kukur listens on port 8081 of all IP addresses by default.

Authentication can be turned off for local instances or when provided by external services by setting authentication to false. When authentication is turned on, an API key has to be supplied by callers.

Inspect

The Kukur CLI supports inspecting file storage locations for data files and previewing them.

File storage locations include:

  • Local filesystems using inspect filesystem <path>

  • Azure Data Lake Storage Gen2 using inspect blob --uri abfss://<container>@<storage account>/<path>

  • AWS S3 buckets using inspect blob --uri s3://<bucket>/<path>

Detected files can be previewed using --preview.

For example

$ python -m kukur.cli inspect filesystem --path tests/test_data/delta
delta,tests/test_data/delta/delta-row-quality
directory,tests/test_data/delta/partitions
delta,tests/test_data/delta/delta-pivot
delta,tests/test_data/delta/delta-notz
delta,tests/test_data/delta/delta-numerical
delta,tests/test_data/delta/delta-row
delta,tests/test_data/delta/delta-unordered
delta,tests/test_data/delta/delta-row-tags
$ python -m kukur.cli inspect filesystem --path tests/test_data/delta/delta-row --preview
pyarrow.Table
name: string
ts: timestamp[us, tz=UTC]
value: double
---
name: [["test-tag-1","test-tag-1","test-tag-1","test-tag-1","test-tag-1",...,"test-tag-3","test-tag-3","test-tag-3","test-tag-3","test-tag-3"]]
ts: [[2020-01-01 00:00:00.000000Z,2020-02-01 00:00:00.000000Z,2020-03-01 00:00:00.000000Z,2020-04-01 00:00:00.000000Z,2020-05-01 00:00:00.000000Z,...,2020-01-01 00:25:00.000000Z,2020-01-01 00:26:00.000000Z,2020-01-01 00:27:00.000000Z,2020-01-01 00:28:00.000000Z,2020-01-01 00:29:00.000000Z]]
value: [[1,2,2,1,1,...,6,9,9.5,8,6]]
$ AZURE_USE_AZURE_CLI=True python -m kukur.cli inspect blob --uri abfss://[email protected]
delta,abfss://[email protected]/iot_devices
delta,abfss://[email protected]/tsai-antwerp
parquet,abfss://[email protected]/tsai-antwerp.parquet

Connections to Azure are configured using the default Azure Identity environment variables. Connections to AWS likewise use the default AWS environment variables.
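For example, when authenticating with a service principal on Azure and an access key on AWS, the standard environment variables could be set before running the CLI (values are placeholders):

$ export AZURE_TENANT_ID="<tenant id>"
$ export AZURE_CLIENT_ID="<client id>"
$ export AZURE_CLIENT_SECRET="<client secret>"
$ export AWS_ACCESS_KEY_ID="<access key>"
$ export AWS_SECRET_ACCESS_KEY="<secret key>"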

Sources

Time Series Sources

Parameters common to all data sources are documented here.

[source.<name>]
type = "<the type of the source>"
query_retry_count = 0 # number of retries
query_retry_delay = 1.0 # seconds between retries as float
metadata_type = "<the type of the metadata source>"
metadata_sources = [] # names of additional metadata sources
data_query_interval_seconds = 123456 # <number of seconds>
# ...

Each source has a name. When multiple sources with the same name are configured, the last configured one wins.

type specifies the type of the data source for this source. Valid values are:

  • adodb

  • arrows

  • azure-data-explorer

  • cratedb

  • csv

  • delta

  • feather

  • influxdb

  • kukur

  • oledb

  • parquet

  • piwebapi-da

  • plugin

  • postgresql

  • redshift

  • sqlite

type is required.

metadata_type specifies the type of the metadata source for this source.

Valid values are:

  • adodb

  • cratedb

  • csv

  • datafusion

  • json

  • kukur

  • oledb

  • piwebapi-da

  • plugin

  • postgresql

  • redshift

  • sqlite

metadata_type is optional. When it is not specified, the type of the source is used.

Additional metadata can be loaded from other sources as described in the section "Metadata Sources". When multiple sources return an entry for the same metadata field, the source that occurs first in this list gets priority. Metadata returned by the source itself or by the metadata_type configuration takes precedence over any additional sources configured here.

query_retry_count and query_retry_delay allow retrying all queries to a source in case of failure. By default no requests will be retried. query_retry_delay accepts floating point numbers, for example query_retry_delay = 0.5 will wait for 0.5 seconds after the first failure and between subsequent retries.

data_query_interval_seconds defines the maximum duration of the interval in which data is queried in one request. A longer query is split into intervals of this length and the results are reassembled afterwards. This prevents query timeouts or works around limits on the number of points that a source can return.

Example:

[source.sql]
type = "odbc"
connection_string = "..."
metadata_query = "..."
metadata_columns = [] # ...
data_query = "..."
data_query_interval_seconds = 86400

This ensures that queries will at most query for one day of data.

data_query_interval_seconds is optional.

Metadata Sources

Metadata sources provide extra metadata for series in time series sources. All time series sources that provide metadata can be used as a metadata source. They accept the same configuration parameters as the time series source.

The optional fields parameter specifies which metadata fields will be used from the given source.

Possible metadata fields are:

  • description

  • unit

  • functional lower limit

  • functional upper limit

  • physical lower limit

  • physical upper limit

  • accuracy

  • accuracy percentage

  • interpolation type (LINEAR, STEPPED)

  • data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)

  • dictionary name

Example:

[source.noaa]
type = "influxdb"
database = "NOAA_water_database"
metadata_sources = ["noaa"]

[metadata.noaa]
type = "csv"
metadata = "examples/influxdb/noaa-metadata.csv"
fields = ["lower limit", "upper limit"]

Note that sources that are fast at listing all time series because they return metadata in the same query will still need to do an additional query to each configured metadata source for every time series in the source.

Quality

Some types of sources support a quality column in the source file that represents the quality of the data points in the source, e.g. an OPC quality code.

In this case a mapping needs to be provided.

Example:

[source.<name>]
quality_mapping = "example_mapping"

[quality_mapping.example_mapping]
GOOD = [192, 194, 197]

In this example, the OPC qualities 192, 194 and 197 in the source are mapped to good quality points in Kukur.

It is also possible to include ranges in the quality mapping.

Example:

[quality_mapping.example_mapping]
GOOD = [[192], [194, 200]]

In this example, the OPC quality 192 and the range 194-200 in the source are mapped to good quality points in Kukur.

If string values are used in the source, similar configuration can be used.

Example:

[quality_mapping.example_mapping]
GOOD = ["GoodQuality", "ExcellentQuality"]

In this example, the values "GoodQuality" and "ExcellentQuality" in the source are mapped to good quality points in Kukur.

Supported Sources

Multiple types of time series sources are supported.

Source-specific parameters are documented in the sources reference below.

Sources reference

ADODB

Sources with type = "adodb" configure ADODB sources.

ADODB is supported on Windows only. Install pywin32 separately to enable. Data sources supporting OLEDB can be accessed through ADODB.

The connection string and queries can either be configured in the configuration file, or loaded from files. Inline configuration takes precedence over file-based configuration when both are provided.

[source.<name>]
type = "adodb"
connection_string = "<ADODB connection_string>"
connection_string_path = "<path to connection string>"
query_timeout_seconds = 0
query_timeout_enable = true
query_string_parameters = false
list_query = "<query to list all time series in a source>"
list_query_path = "<path to list_query>"
list_columns = ["<metadata type of column 1>", "<metadata type of column 2>"]
tag_columns = ["series name"]
field_columns = []
metadata_query = "<query for metadata of one series>"
metadata_query_path = "<path to metadata query>"
metadata_columns = ["<metadata type of column 1>", "<metadata type of column 2>"]
dictionary_query = "<query for dictionary mappings>"
dictionary_query_path = "<path to the dictionary query>"
metadata_value_mapping = "<metadata_value_mapping name>"
data_query = "<query for data of one series in time range>"
data_query_path = "<path to data query>"
data_query_datetime_format = "<strftime format>"
data_query_timezone = "<override or specify time zone of timestamps to send a naive timestamp to the adodb driver>"
data_timezone = "<override or specify time zone of timestamps returned by the adodb driver>"
enable_trace_logging = false
quality_mapping = "<name>"
type_checking_row_limit = 300  # number of rows analysed to determine the type of the value column

The examples given here operate on an Excel file with three sheets:

Metadata$: name, description, units, [interpolation type], [data type], [dictionary name]
Dictionary$: name, value, label
Data$: name, timestamp, value

Connection

The connection_string contains the provider and various options.

connection_string = "Provider=Microsoft.ACE.OLEDB.12.0;Data Source=examples/adodb/adodb.xlsx;Extended Properties=\"Excel 12.0 XML; HDR=YES\""

Alternatively, connection_string_path can point to a file that contains the connection string. Whitespace at the start and end of the connection string file is removed.

Some ADODB drivers do not support parameter binding. Set query_string_parameters to true to use string interpolation of parameters in queries.

In that case, use {} to format parameters into queries. In queries with multiple parameters, the order can be changed by using the argument position: {1} {2} {0}. Use a read-only connection with minimal privileges, as SQL injection is possible in that case and cannot be prevented by Kukur.
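For example, a data query rewritten with string interpolation could look like this (a sketch; note the quoting of the interpolated values):

data_query = "select timestamp, value from [Data$] where name = '{}' and timestamp between '{}' and '{}'"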

query_timeout_seconds defines the timeout on a query. Default is 0, no timeout.

Some drivers do not allow setting a timeout. Disable it using query_timeout_enable = false.

The list_query is optional. It returns a list of time series names found in the source. When it is provided, a series does not need to have been used in another context before it can be analyzed.

list_query = "select name from [Metadata$]"

The query can be read from a file by using list_query_path instead of list_query.

The query can either return only series names or all metadata. When it returns all metadata, include a list_columns entry that describes all columns:

list_query = "select name, description, units, [interpolation type], [data type], [dictionary name] from [Metadata$]"
list_columns = ["series name", "description", "unit", "interpolation type", "data type", "dictionary name"]

All columns defined in tag_columns should be included in list_columns. All combinations of rows returned by the list_query and values in field_columns define a series.

Built-in metadata columns are:

  • series name (required)

  • description

  • unit

  • functional lower limit

  • functional upper limit

  • physical lower limit

  • physical upper limit

  • accuracy

  • accuracy percentage

  • interpolation type (LINEAR, STEPPED)

  • data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)

  • dictionary name

Custom metadata fields can be defined by including them in the list_columns list.

Not all ADODB sources can map metadata field values to the values expected by Kukur. Use metadata_value_mapping to convert them.

Example:

[source.<name>]
metadata_value_mapping = "adodb_lowercase"

[metadata_value_mapping.adodb_lowercase."data type"]
FLOAT64 = "float64"
STRING = ["string", "text", "varchar"]
DICTIONARY = "dictionary"

This example converts lowercase data types to the uppercase strings expected by Kukur.

Metadata

The metadata_query is a query that accepts one parameter for each tag in a series, ordered by tag_columns.

metadata_query = "select description, units, [interpolation type], [data type], [dictionary name] from [Metadata$] where name = ?"

The columns in the result set should be mapped to a supported type of metadata. The metadata_columns entry contains a list with the positional mapping.

metadata_columns = ["description", "unit", "interpolation type", "data type", "dictionary name"]

Built-in types of metadata are:

  • description

  • unit

  • functional lower limit

  • functional upper limit

  • physical lower limit

  • physical upper limit

  • accuracy

  • accuracy percentage

  • interpolation type (LINEAR, STEPPED)

  • data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)

  • dictionary name

Custom metadata fields can be defined by including them in the metadata_columns list.

The metadata query can be read from a file by using metadata_query_path instead of metadata_query.

Metadata values can be converted using metadata_value_mapping.

Example:

[source.<name>]
metadata_value_mapping = "adodb_lowercase"

[metadata_value_mapping.adodb_lowercase."data type"]
FLOAT64 = "float64"
STRING = ["string", "text", "varchar"]
DICTIONARY = "dictionary"

This example converts lowercase data types to the uppercase strings expected by Kukur.

If the configuration defines tag_columns, they are provided in the same order as defined in tag_columns.

[source.<name>]
tag_columns = ["location", "plant"]
metadata_query = """
select description, units, [interpolation type], [data type], [dictionary name]
from [Metadata$]
where my_location = ? and my_plant = ?
"""

Dictionary

A dictionary maps numerical (integer) values to textual labels. The dictionary query is a query that accepts one parameter: the name of the dictionary.

The dictionary name for a series is returned by the dictionary name list or metadata column.

dictionary_query = "select value, label from [Dictionary$] where name = ?"

The query should return rows of two columns:

  • the numerical value that occurs in the data, in a type that can be converted to an integer

  • the label for the numerical value (as adBSTR)

The dictionary query can be read from a file by using dictionary_query_path instead of dictionary_query.

Data

The data_query is a query that accepts three parameters:

  • the name of the series (as adBSTR)

  • the start date of the time range to query data (as adDBTimeStamp)

  • the end date of the time range to query data (as adDBTimeStamp)

data_query = "select timestamp, value from [Data$] where name = ? and timestamp between ? and ?"

This query should return rows of two columns:

  • the timestamp of the data point

  • the value of the data point

Kukur will try to convert the columns to the expected types.

The data query can be read from a file by using data_query_path instead of data_query.

If the provider or data source does not accept adDBTimeStamp, it can be formatted as a string. The data_query_datetime_format option accepts the formatting options supported by Python.

Example:

data_query_datetime_format = "%Y-%m-%dT%H:%M:%S%z"

This converts timestamps to the ISO8601 format.

If the driver does not accept time zone aware timestamps, the time zone of the query parameters can be specified with the data_query_timezone option. Kukur converts the timestamps to that time zone and passes them to the driver as naive timestamps.

Example:

data_query_timezone = "UTC"

If the query or driver returns dates without a time zone, the time zone can be specified by the data_timezone option.

Example:

data_timezone = "UTC"

The exact available time zones are system-dependent.

Set enable_trace_logging to true to log the fetched data before conversion.

enable_trace_logging = true

If the configuration defines tag_columns, they are provided in the same order as defined in tag_columns.

[source.<name>]
tag_columns = ["location", "plant"]
data_query = """
select timestamp, value
from [Data$]
where my_location = ? and my_plant = ? and timestamp >= ? and timestamp < ?
"""

If the configuration defines field_columns, the field is available as {field} in the data_query.

[source.<name>]
field_columns = ["temperature", "pressure"]
data_query = """
select timestamp, {field}
from [Data$]
where name = ? and timestamp >= ? and timestamp < ?
"""

Quality

There is a possibility to add a quality column.

In this case the data query changes:

data_query = "select timestamp, value, quality from [Data$] where name = ? and timestamp between ? and ?"

Where quality represents the column that contains the data point quality of the ADODB source.

Check the source documentation to configure the mapping of a value in the quality column to a quality status known to Kukur.

Apache Arrow IPC Streaming

Sources with type = "arrows" configure Apache Arrow IPC Streaming format sources.

All settings that apply to Apache Feather sources also apply here.

Apache Feather

Sources with type = "feather" configure Apache Feather sources.

Feather supports the same source layouts as CSV:

[source.<name>]
type = "feather"
format = "row|dir|pivot"
path = "<path to data>"
tag_columns = ["series name"]
field_columns = ["value"]
quality_mapping = "<name>"

path is required.

format defaults to "row".

Metadata in Feather is not supported. Use a different metadata_type to connect to metadata.

For example:

metadata_type = "csv"

Column mapping

Accepted by Apache Feather sources that use the row based or directory based data models. This configuration allows mapping the columns of tables read from feather files to the columns expected by Kukur. This is done by setting the column_mapping option for the source.

[source.<name>.column_mapping]
"series name" = "name"
"ts" = "timestamp"
"value" = "value"
"quality" = "quality column"

series name is only valid for the row format, as the directory data model uses the feather file names as the series names.

quality mapping is optional.

Custom datetime format

The data_datetime_format option allows the timestamps to be parsed using a custom datetime format. It accepts the formatting options supported by Python.

data_datetime_format = "%Y/%m/%dT%H:%M"

Custom timezone

The data_timezone option allows specifying the time zone of the timestamps. This option should only be used when the timestamps in the source do not contain time zone information.

data_timezone = "America/Sao_Paulo"

Row format

format = "row"

The row based format expects path to be a Feather file with at least 3 columns:

  • The first column contains the series name as a string

  • The second column contains the timestamp as pyarrow.TimestampType.

  • The third column contains the value as a numerical type or as strings

Alternatively, if the tag_columns and field_columns options are used, each combination of values defined in tag_columns and field_columns defines a series.

Directory Based Format

[source."<name>"]
type = "feather"
format = "dir"
path = ""

[[source."<name>".partitions]]
origin = "tag"
key = "series name"
# path_encoding = "base64"

The directory based format expects path to be a directory structure containing Feather files.

The directory structure is traversed based on the configured partitions. Each partition corresponds to a tag in the SeriesSelector.

The resulting partition can optionally be base64-encoded.

The last configured partition defines the file name. The .feather extension is added to it.
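For example, with the single series name partition configured above, the files could be laid out as follows (series names are illustrative):

<path>/
    outside-temperature.feather
    inside-temperature.feather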

The Feather file contains at least 2 columns:

  • The first column contains the timestamp as pyarrow.TimestampType.

  • The second column contains the value as a numerical type or a string

Pivot Format

format = "pivot"

The pivot format expects path to be a Feather file.

The first column in the file is a timestamp. Further columns contain the values. Some columns can be numerical while other columns contain strings. The name of each column is the series name.

Quality

There is a possibility to add a quality column in the Feather file. Check the source documentation to configure the mapping of a value in the quality column to a quality status known to Kukur.

A quality column is not available for a Feather file with a pivot data format.

Azure Blob Storage

Kukur can load Feather files from Azure Blob Storage. This requires the azure-storage-blob and azure-identity Python packages.

The following configuration options apply:

[source."My Azure Source"]
...
loader = "azure-blob"
azure_connection_string = "DefaultEndpointsProtocol=https;EndpointSuffix=core.windows.net;AccountName=<storage account name>"
azure_container = "<container name>"
azure_identity = "default"

Paths provided to path will be relative to the container root.

The azure_identity field is optional. The special value default causes connections to be made using the default Azure credentials. This is the only supported value and allows connections using a managed service identity.

When the azure_identity field is omitted, the azure_connection_string needs to contain the necessary secrets (SAS token, Access key).

AWS S3

Kukur can load Feather files from AWS S3.

[source."My AWS Source"]
...
loader = "aws-s3"
aws_access_key=""
aws_secret_key=""
aws_session_token=""
aws_region=""

All fields are optional. If neither aws_access_key nor aws_secret_key is provided, Kukur attempts to establish the credentials automatically. The following methods are tried, in order:

  • AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_SESSION_TOKEN from environment variables

  • Configuration files such as ~/.aws/credentials and ~/.aws/config

  • For nodes on Amazon EC2, the EC2 Instance Metadata Service

Apache Parquet

Sources with type = "parquet" configure Apache Parquet sources.

Parquet supports the same source layouts as CSV:

[source.<name>]
type = "parquet"
format = "row|dir|pivot"
path = "<path to data>"
tag_columns = ["series name"]
field_columns = ["value"]
quality_mapping = "<name>"

path is required.

format defaults to "row".

Metadata in Parquet is not supported. Use a different metadata_type to connect to metadata.

For example:

metadata_type = "csv"

Column mapping

Accepted by Apache Parquet sources that use the row based or directory based data models. This configuration allows mapping the columns of tables read from parquet files to the columns expected by Kukur. This is done by setting the column_mapping option for the source.

[source.<name>.column_mapping]
"series name" = "name"
"ts" = "timestamp"
"value" = "value"
"quality" = "quality column"

series name is only valid for the row format, as the directory data model uses the parquet file names as the series names.

quality mapping is optional.

Custom datetime format

The data_datetime_format option allows the timestamps to be parsed using a custom datetime format. It accepts the formatting options supported by Python.

data_datetime_format = "%Y/%m/%dT%H:%M"

Custom timezone

The data_timezone option allows specifying the time zone of the timestamps. This option should only be used when the timestamps in the source do not contain time zone information.

data_timezone = "America/Sao_Paulo"

Row format

format = "row"

The row based format expects path to be a Parquet file with at least 3 columns:

  • The first column contains the series name as a string

  • The second column contains the timestamp

  • The third column contains the value as a numerical type or a string

Alternatively, if the tag_columns and field_columns options are used, each combination of values defined in tag_columns and field_columns defines a series.

Directory Based Format

[source."<name>"]
type = "parquet"
format = "dir"
path = ""

[[source."<name>".partitions]]
origin = "tag"
key = "series name"
# path_encoding = "base64"

The directory based format expects path to be a directory structure containing Parquet files.

The directory structure is traversed based on the configured partitions. Each partition corresponds to a tag in the SeriesSelector.

The resulting partition can optionally be base64-encoded.

The last configured partition defines the file name. The .parquet extension is added to it.

The Parquet file contains at least 2 columns:

  • The first column contains the timestamp

  • The second column contains the value as a numerical type or a string

Pivot Format

format = "pivot"

The pivot format expects path to be a Parquet file.

The first column in the file is a timestamp. Further columns contain the values. Some columns can be numerical while other columns contain strings. The name of each column is the series name.

Quality

There is a possibility to add a quality column in the Parquet file. Check the source documentation to configure the mapping of a value in the quality column to a quality status known to Kukur.

A quality column is not available for a Parquet file with a pivot data format.

Azure Blob Storage

Kukur can load Parquet files from Azure Blob Storage. This requires the azure-storage-blob and azure-identity Python packages.

The following configuration options apply:

[source."My Azure Source"]
...
loader = "azure-blob"
azure_connection_string = "DefaultEndpointsProtocol=https;EndpointSuffix=core.windows.net;AccountName=<storage account name>"
azure_container = "<container name>"
azure_identity = "default"

Paths provided to path will be relative to the container root.

The azure_identity field is optional. The special value default causes connections to be made using the default Azure credentials. This is the only supported value and allows connections using a managed service identity.

When the azure_identity field is omitted, the azure_connection_string needs to contain the necessary secrets (SAS token, Access key).

AWS S3

Kukur can load Parquet files from AWS S3.

[source."My AWS Source"]
...
loader = "aws-s3"
aws_access_key=""
aws_secret_key=""
aws_session_token=""
aws_region=""

All fields are optional. If neither aws_access_key nor aws_secret_key is provided, Kukur attempts to establish the credentials automatically. The following methods are tried, in order:

  • AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_SESSION_TOKEN from environment variables

  • Configuration files such as ~/.aws/credentials and ~/.aws/config

  • For nodes on Amazon EC2, the EC2 Instance Metadata Service

CrateDB

Sources with type = "cratedb" configure CrateDB sources.

CrateDB support requires the crate Python package.

The connection string and queries can either be configured in the configuration file, or loaded from files. Inline configuration takes precedence over file-based configuration when both are provided.

[source.<name>]
type = "cratedb"
connection_string = "<cratedb connection_string>"
connection_string_path = "<path to connection string>"
query_string_parameters = false
list_query = "<query to list all time series in a source>"
list_query_path = "<path to list_query>"
list_columns = ["<metadata type of column 1>", "<metadata type of column 2>"]
tag_columns = ["series name"]
field_columns = []
metadata_query = "<query for metadata of one series>"
metadata_query_path = "<path to metadata query>"
metadata_columns = ["<metadata type of column 1>", "<metadata type of column 2>"]
dictionary_query = "<query for dictionary mappings>"
dictionary_query_path = "<path to the dictionary query>"
metadata_value_mapping = "<metadata_value_mapping name>"
data_query = "<query for data of one series in time range>"
data_query_path = "<path to data query>"
data_query_datetime_format = "<strftime format>"
data_query_timezone = "<override or specify time zone of timestamps to send a naive timestamp to the crate client>"
data_timezone = "<override or specify time zone of timestamps returned by the crate client>"
enable_trace_logging = false
quality_mapping = "<name>"
type_checking_row_limit = 300  # number of rows analysed to determine the type of the value column

The examples here use the following schema:

create table Data (
    timestamp timestamp with time zone,
    name text,
    value double precision,
    unit text
)

This assumes only one unit is ever present per time series.

Connection

The connection_string contains the provider and various options.

connection_string = "localhost:4200"

Alternatively, connection_string_path can point to a file that contains the connection string. Whitespace at the start and end of the connection string file is removed.

Version 0.26 of the crate client does not support parameter binding for timestamps with timezones. Set query_string_parameters to true to work around this.

Use {} to format parameters into queries. In queries with multiple parameters, the order can be changed by using the argument position: {1} {2} {0}. Use a read-only connection with minimal privileges, as SQL injection is possible in that case and cannot be prevented by Kukur.

Search

The list_query is optional. It returns a list of time series names found in the source. When it is provided, a series does not need to have been used in another context before it can be analyzed.

list_query = "select distinct name from Data"

The query can be read from a file by using list_query_path instead of list_query.

The query can either return only series names or all metadata. When it returns all metadata, include a list_columns entry that describes all columns:

list_query = "select distinct name, unit from Data"
list_columns = ["series name", "unit"]

All columns defined in tag_columns should be included in list_columns. All combinations of rows returned by the list_query and values in field_columns define a series.

Built-in metadata columns are:

  • series name (required)

  • description

  • unit

  • functional lower limit

  • functional upper limit

  • physical lower limit

  • physical upper limit

  • accuracy

  • accuracy percentage

  • interpolation type (LINEAR, STEPPED)

  • data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)

  • dictionary name

Custom metadata fields can be defined by including them in the list_columns list.

Not all CrateDB sources can map metadata field values to the values expected by Kukur. Use metadata_value_mapping to convert them.

Example:

[source.<name>]
metadata_value_mapping = "crate_lowercase"

[metadata_value_mapping.crate_lowercase."data type"]
FLOAT64 = "float64"
STRING = ["string", "text", "varchar"]
DICTIONARY = "dictionary"

This example converts lowercase data types to the uppercase strings expected by Kukur.

Metadata

The metadata_query is a query that accepts one parameter for each tag in a series, ordered by tag_columns.

metadata_query = "select unit from Data where name = '{}'"

The columns in the result set should be mapped to a supported type of metadata. The metadata_columns entry contains a list with the positional mapping.

metadata_columns = ["unit"]

Built-in types of metadata are:

  • description

  • unit

  • functional lower limit

  • functional upper limit

  • physical lower limit

  • physical upper limit

  • accuracy

  • accuracy percentage

  • interpolation type (LINEAR, STEPPED)

  • data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)

  • dictionary name

Custom metadata fields can be defined by including them in the metadata_columns list.

The metadata query can be read from a file by using metadata_query_path instead of metadata_query.

Metadata values can be converted using metadata_value_mapping.

Example:

[source.<name>]
metadata_value_mapping = "crate_lowercase"

[metadata_value_mapping.crate_lowercase."data type"]
FLOAT64 = "float64"
STRING = ["string", "text", "varchar"]
DICTIONARY = "dictionary"

This example converts lowercase data types to the uppercase strings expected by Kukur.

If the configuration defines tag_columns, they are provided in the same order as defined in tag_columns.

[source.<name>]
tag_columns = ["location", "plant"]
metadata_query = """
select description, units, interpolationType, dataType, dictionaryName
from Metadata
where my_location = '{}' and my_plant = '{}'
"""

Dictionary

A dictionary maps numerical (integer) values to textual labels. The dictionary query is a query that accepts one parameter: the name of the dictionary.

The dictionary name for a series is returned by the dictionary name list or metadata column.
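A sketch of a dictionary query for a hypothetical Dictionary table, using the {} string interpolation described earlier:

dictionary_query = "select value, label from Dictionary where name = '{}'"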

The query should return rows of two columns:

  • the numerical value that occurs in the data, in a type that can be converted to an integer

  • the label for the numerical value (as text)

The dictionary query can be read from a file by using dictionary_query_path instead of dictionary_query.

Data

The data_query is a query that accepts three parameters:

  • the name of the series (as text)

  • the start date of the time range to query data (as text)

  • the end date of the time range to query data (as text)

data_query = "select timestamp, value from Data where name = '{}' and timestamp >= '{}' and timestamp < '{}'"

This query should return rows of two columns:

  • the timestamp of the data point

  • the value of the data point

Kukur will try to convert the columns to the expected types.

The data query can be read from a file by using data_query_path instead of data_query.

The data_query_datetime_format option allows queries using a custom datetime format. It accepts the formatting options supported by Python.

Example:

data_query_datetime_format = "%Y-%m-%dT%H:%M:%S%z"

This converts timestamps to the ISO8601 format.

If timestamps without a time zone are used in the database, convert the timestamp with the data_query_timezone option. The request will use the converted timestamps as naive timestamps for all queries.

Example:

data_query_timezone = "UTC"

If the query returns timestamps without a time zone, the time zone can be specified by the data_timezone option.

Example:

data_timezone = "UTC"

The exact available time zones are system-dependent.

Set enable_trace_logging to true to log the fetched data before conversion.

enable_trace_logging = true

If the configuration defines tag_columns, they are provided in the same order as defined in tag_columns.

[source.<name>]
tag_columns = ["location", "plant"]
data_query = """
select timestamp, value
from Data
where my_location = '{}' and my_plant = '{}' and timestamp >= '{}' and timestamp < '{}'
"""

If the configuration defines field_columns, the field is available as {field} in the data_query.

[source.<name>]
field_columns = ["temperature", "pressure"]
data_query = """
select timestamp, {field}
from Data
where name = '{}' and timestamp >= '{}' and timestamp < '{}'
"""

Quality

There is a possibility to add a quality column.

In this case the data query changes:

data_query = "select timestamp, value, quality from Data where name = '{}' and timestamp >= '{}' and timestamp < '{}'"

Where quality represents the column that contains the data point quality.

Check the source documentation to configure the mapping of a value in the quality column to a quality status known to Kukur.

CSV

Sources with type = "csv" configure CSV sources.

[source.<name>]
type = "csv"
path = "<path>"
metadata = "<path>"
metadata_fields = ["<name>", "<name>"]
metadata_mapping = "<name>"
metadata_value_mapping ="<name>"
format = "row|pivot|dir"
tag_columns = ["series name"]
field_columns = ["value"]
metadata_field_column = "<name>"
dictionary_dir = "<path>"
quality_mapping = "<name>"
file_encoding = "<codec>"
header_row = "false|true"
data_datetime_format = "<date format string>"
data_timezone = "<time zone>"
data_decimal_point = "."
data_column_separator = ","

path is required for time series sources. It is optional when used as a metadata source.

file_encoding is optional and defaults to UTF-8. A list of all supported codecs can be found here.

Three CSV data models are supported:

  • row based (series name, timestamp, value, quality (optional))

  • pivot (multiple series with values at the same timestamp)

  • directory based, with one CSV file per tag

Column mapping

Accepted by CSV sources that use the row based or directory based data models and where the files contain a header row. This configuration allows mapping the columns from CSV files to the columns expected by Kukur. This is done by setting the column_mapping option for the source.

[source.<name>.column_mapping]
"series name" = "name"
"ts" = "timestamp"
"value" = "value"
"quality" = "quality column"

series name is only valid for the row format, as the directory data model uses the CSV file names as the series names.

quality mapping is optional.

Data conversion options

The data_datetime_format option allows the timestamps to be parsed using a custom datetime format. It accepts the formatting options supported by Python.

data_datetime_format = "%Y/%m/%dT%H:%M"

The data_timezone option allows specifying the time zone of the timestamps. This option should only be used when the timestamps in the source do not contain time zone information.

data_timezone = "America/Sao_Paulo"

To accept numerical data that uses , as the decimal point, use:

data_decimal_point = ","

When columns are not separated by commas, use data_column_separator. For example, to use ;:

data_column_separator = ";"
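For example, a row based file that combines both of these options could look like:

test-tag-1;2020-01-01T00:00:00Z;1,5
test-tag-1;2020-02-01T00:00:00Z;2,5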

Row Based Format

A row based CSV data file may have a header row with column names. The header_row option indicates this; it defaults to false.

At least 3 columns are present:

  • series name

  • timestamp in RFC3339 format (up to nanosecond precision)

  • numerical value (up to double precision floating point)

Example:

test-tag-1,2020-01-01T00:00:00Z,1
test-tag-1,2020-02-01T00:00:00Z,2
test-tag-2,2020-01-01T00:00:00Z,Product A
test-tag-2,2020-02-01T00:00:00Z,Product B

Alternatively, the third column can contain string values. It is not possible to mix numerical and string values in one column: mixing them causes all numerical values to be interpreted as strings.

Dictionary data is integer numerical data. Labels are only for presenting to users.

When header_row = "true", additional structure for time series can be read from specific columns.

Consider:

location,plant,ts,product,value
Antwerp,P1,2020-01-01T00:00:00Z,A,1
Antwerp,P2,2020-01-01T00:00:00Z,A,1
Antwerp,P1,2020-01-02T00:00:00Z,A,2
Antwerp,P1,2020-01-03T00:00:00Z,B,1
Antwerp,P2,2020-01-03T00:00:00Z,A,2
Barcelona,P1,2020-01-01T00:00:00Z,A,1
Barcelona,P2,2020-01-01T00:00:00Z,A,1
Barcelona,P1,2020-01-02T00:00:00Z,A,2
Barcelona,P1,2020-01-03T00:00:00Z,B,1
Barcelona,P2,2020-01-03T00:00:00Z,A,2

Here, a time series is defined by the location and plant tags. Each series has two fields product and value.

Use the tag_columns and field_columns configuration options to achieve this:

[source."..."]
type = "csv"
format = "row"
path = "..."
header_row = true
tag_columns = ["location", "plant"]
field_columns = ["product", "value"]

Pivot Format

The header row of CSV data files in pivot format defines which time series are available. This means that the header_row option is ignored for this format as the header row is always required.

Other rows start with a timestamp in RFC3339 format and contain one value for each series.

timestamp,test-tag-1,test-tag-2
2020-01-01T00:00:00Z,1,10
2020-02-01T00:00:00Z,2,11

Directory Based Format

The directory based format expects one CSV file per tag. CSV files are formatted in the row based format, but without the series name. They are named <series name>.csv.

path refers to the directory that contains the CSV files.

Example test-tag-1.csv:

2020-01-01T00:00:00Z,1
2020-02-01T00:00:00Z,2

Metadata

Metadata is configured in a matrix format. A header row describes the metadata entry.

Supported types of metadata are:

  • description

  • unit

  • functional lower limit

  • functional upper limit

  • physical lower limit

  • physical upper limit

  • accuracy

  • accuracy percentage

  • interpolation type (LINEAR, STEPPED)

  • data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)

  • dictionary name

Not all columns need to be present.

Example:

series name,unit,functional lower limit,functional upper limit,accuracy
test-tag-1,m,0,1,0.1

Extra columns will be ignored, unless the metadata_fields parameter is present. In that case all fields defined there - and only these - will be included, including custom metadata fields.

Example:

series name,description,unit,process type,location
test-tag-1,"custom fields example",m,batch,Antwerp
[source.<name>]
metadata_fields = ["unit", "process type"]

Only the unit and the process type fields will be available in the resulting Metadata.

Metadata lookups respect the tag_columns option. An optional field can be matched by configuring a column name in metadata_field_column.

When the dictionary name field is present, the directory given in dictionary_dir is searched for a file <dictionary name>.csv. This file contains a comma separated mapping of numerical values to labels.

Example:

0,OFF
1,ON

Columns in a metadata CSV often do not match the names of metadata fields in Kukur. An optional metadata_mapping maps Kukur field names to column names.

Example:

[source.<name>]
metadata_mapping = "ip21"

[metadata_mapping.ip21]
"series name" = "NAME"
description = "IP_DESCRIPTION"
unit = "IP_ENG_UNITS"

Where the metadata CSV contains:

NAME,IP_ENG_UNITS,lower limit
test-tag-1,kg,1

Columns that are not included in the mapping, such as lower limit in the example, translate to the corresponding metadata field when their name matches or are skipped altogether.

Metadata mappings can be shared between sources.
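
For example, two CSV sources could reuse the ip21 mapping defined above; this sketch uses hypothetical source names and paths:

[source."historian-a"]
type = "csv"
metadata = "data/historian-a-metadata.csv"
metadata_mapping = "ip21"

[source."historian-b"]
type = "csv"
metadata = "data/historian-b-metadata.csv"
metadata_mapping = "ip21"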

Values in a metadata CSV can also be different. The optional metadata_value_mapping maps Kukur metadata field values to values as they appear in a source.

Example:

[source.<name>]
metadata_value_mapping = "example_value_mapping"

[metadata_value_mapping.example_value_mapping."interpolation type"]
LINEAR = "linear"
STEPPED = "stepped"

[metadata_value_mapping.example_value_mapping."data type"]
FLOAT64 = ["int16", "int32"]

In this example, when the interpolation type column contains the value linear, Kukur will interpret it as the expected uppercase LINEAR. When the data type column contains either int16 or int32, Kukur will interpret it as FLOAT64.

series name,interpolation type,data type
test-tag-1,linear,int32

metadata_mapping and metadata_value_mapping can be used together to map wildly different metadata formats to a CSV supported by Kukur.

Quality

A quality column can be added to the CSV file. Check the source documentation to configure the mapping of a value in the quality column to a quality status known to Kukur.

A quality column is not available for a CSV file with a pivot data format.
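
As a sketch, a row based CSV with a fourth quality column and a hypothetical quality_mapping could look like this; the exact mapping options are described in the source documentation:

test-tag-1,2020-01-01T00:00:00Z,1,GOOD
test-tag-1,2020-02-01T00:00:00Z,2,BAD

[source."csv-with-quality"]
type = "csv"
path = "data/quality-example.csv"
quality_mapping = "csv_quality"

[quality_mapping.csv_quality]
GOOD = ["GOOD"]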

Azure Blob Storage

Kukur can load CSV files from Azure Blob Storage. This requires the azure-storage-blob and azure-identity Python packages.

The following configuration loads CSV files from Azure Blob Storage:

[source."My Azure Source"]
...
loader = "azure-blob"
azure_connection_string = "DefaultEndpointsProtocol=https;EndpointSuffix=core.windows.net;AccountName=<storage account name>"
azure_container = "<container name>"
azure_identity = "default"

Paths provided to path, metadata or dictionary_dir will be relative to the container root.

The azure_identity field is optional. The special value default causes connections to be made using the default Azure credentials. This is the only supported value and allows connections using a managed service identity.

When the azure_identity field is omitted, the azure_connection_string needs to contain the necessary secrets (SAS token, Access key).
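
Putting this together, a CSV source that reads both data and metadata from a blob container using the default Azure credentials could be sketched as follows; the account and container names are hypothetical:

[source."My Azure Source"]
type = "csv"
path = "example.csv"
metadata = "metadata.csv"
loader = "azure-blob"
azure_connection_string = "DefaultEndpointsProtocol=https;EndpointSuffix=core.windows.net;AccountName=myaccount"
azure_container = "timeseries"
azure_identity = "default"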

AWS S3

Kukur can load CSV files from AWS S3.

[source."My AWS Source"]
...
loader = "aws-s3"
aws_access_key=""
aws_secret_key=""
aws_session_token=""
aws_region=""

All fields are optional. If neither aws_access_key nor aws_secret_key is provided, Kukur attempts to establish credentials automatically. The following methods are tried, in order:

  • AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_SESSION_TOKEN from environment variables

  • Configuration files such as ~/.aws/credentials and ~/.aws/config

  • For nodes on Amazon EC2, the EC2 Instance Metadata Service

Azure Data Explorer

Sources with type = "azure-data-explorer" configure Azure Data Explorer sources.

The azure-kusto-data Python package is required.

[source.<name>]
type = "azure-data-explorer"
connection_string = "<connection_string>"
database = "<database>"
table = "<table>"
timestamp_column = "ts"
tag_columns = []
metadata_columns = []
ignored_columns = []
metadata_mapping = ""
metadata_value_mapping = ""

database, table and connection_string are required.

Refer to the Azure Data Explorer documentation to learn more about connection strings.

The DefaultAzureCredential from the azure-identity package is used for authentication.

timestamp_column is an optional parameter used to define the name of the database column that contains timestamps for the series. Defaults to "ts".

Listing time series in an Azure Data Explorer table is supported only when the tag_columns parameter is specified. Each value in the parameter is a column in the table that will be part of the set of tags of the series. Other columns are assumed to be fields.

For example:

tag_columns = [
    "deviceId",
    "plant",
    "location",
]

Metadata columns can be defined as a list in the metadata_columns parameter.

For example:

metadata_columns = [
    "unit",
]

Columns that should not appear in either tags, fields or metadata can be ignored. They can be defined as a list in the ignored_columns parameter.

For example:

ignored_columns = [
    "unknown",
]

Columns containing metadata often do not match the names of metadata fields in Kukur. An optional metadata_mapping maps Kukur field names to column names.

Example:

[source.<name>]
metadata_mapping = "example_metadata_mapping"

[metadata_mapping.example_metadata_mapping]
description = "DESCRIPTION"
unit = "ENG_UNITS"

Fields that are not included in the mapping translate to the corresponding metadata field when the column name matches or are skipped altogether.

Metadata mappings can be shared between sources.

Values in a metadata column can also be different. The optional metadata_value_mapping maps Kukur metadata field values to values as they appear in a source.

Example:

[source.<name>]
metadata_value_mapping = "example_value_mapping"

[metadata_value_mapping.example_value_mapping."interpolation type"]
LINEAR = "linear"
STEPPED = "stepped"

[metadata_value_mapping.example_value_mapping."data type"]
FLOAT64 = ["int16", "int32"]

In this example, when the interpolation type column contains the value linear, Kukur will interpret it as the expected uppercase LINEAR. When the data type column contains either int16 or int32, Kukur will interpret it as FLOAT64.

metadata_mapping and metadata_value_mapping can be used together to map wildly different metadata formats to a format supported by Kukur.
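
As an illustration, a complete Azure Data Explorer source could be sketched as follows; the cluster URI, database, table and column names are hypothetical:

[source.adx]
type = "azure-data-explorer"
connection_string = "https://mycluster.westeurope.kusto.windows.net"
database = "telemetry"
table = "sensor_data"
timestamp_column = "ts"
tag_columns = ["deviceId", "plant", "location"]
metadata_columns = ["unit"]
ignored_columns = ["unknown"]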

DataFusion

Sources with type = "datafusion" configure Apache Arrow DataFusion sources.

DataFusion sources define tables that can be queried using SQL. DataFusion sources support the search operation only. Data queries are not supported and neither are individual time series metadata lookups.

[source.<name>]
type = "datafusion"
list_query = "" # required: SQL query
tag_columns = ["series name"]
field_columns = ["value"]
metadata_value_mapping = "" # optional: convert metadata values

[[source.datafusion.table]]
type = "csv|delta|json|parquet"
name = "" # name of the table in SQL queries
location = "" # path to a file or URI of a Delta Table

The column names returned by the list_query are used as-is to populate tags as present in tag_columns and known metadata fields. Columns with unknown names define extra metadata fields.

Kukur configures DataFusion to be case sensitive.

The following example defines a DataFusion source that connects to 3 tables:

  • A CSV file containing a list of all time series and their units.

  • An NDJSON file containing descriptions for selected time series.

  • A Delta Table containing data types for selected time series.

A metadata_value_mapping can be provided to convert metadata values received from DataFusion to values supported by Kukur.

[source.datafusion]
type = "datafusion"
list_query = """
    select
        s.name as "series name",
        s.unit as "unit",
        d.description as "description",
        t."data type" as "data type"
    from series s
        left join description d on s.name = d.name
        left join datatype t on s.name = t.name
"""
metadata_value_mapping = "datafusion_mapping"

[[source.datafusion.table]]
type = "csv"
name = "series"
location = "tests/test_data/datafusion/series.csv"

[[source.datafusion.table]]
type = "json"
name = "description"
location = "tests/test_data/datafusion/description.json"

[[source.datafusion.table]]
type = "delta"
name = "datatype"
location = "tests/test_data/datafusion/datatype"

[metadata_value_mapping.datafusion_mapping.unit]
"m" = "M"

Delta Lake

Sources with type = "delta" configure Delta Lake sources.

Delta Lake supports the row and pivot formats:

[source.<name>]
type = "delta"
format = "row|pivot"
uri = "<uri of Delta Lake>"
tag_columns = ["series name"]
field_columns = ["value"]
sort_by_timestamp = true

[[source.<name>.partitions]]
origin = "tag"
key = "series name"

[[source.<name>.partitions]]
origin = "timestamp"
key = "YEAR"
column = "year"

format defaults to "row".

uri is required.

sort_by_timestamp is an optional parameter to sort unordered Delta Lake sources. The default is true.

Metadata in Delta Lakes is not supported. Use a different metadata_type to connect to metadata.

For example:

metadata_type = "csv"
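
A Delta Lake source that reads time series data from a Delta table while looking up metadata in a CSV file could then be sketched as follows; the URI and path are hypothetical and the CSV metadata options described earlier are assumed to apply when metadata_type = "csv":

[source."Delta with CSV metadata"]
type = "delta"
uri = "data/delta-table"
metadata_type = "csv"
metadata = "data/delta-metadata.csv"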

Connecting to a Delta Lake on Azure

The delta-rs library used by Kukur supports Delta Lakes in an Azure storage account. Unfortunately it uses environment variables for configuration. This means one instance of Kukur is able to connect to one storage account only.

For example:

[source."Delta"]
type = "delta"
uri = "abfss://[email protected]/tsai-antwerp"

This configuration connects to the satsdeltalake storage account. It opens the tsai-antwerp Delta Lake in the poc container.

Multiple authentication options are available.

The recommended authentication method while running on Azure is a Managed Identity. To use it, the AZURE_STORAGE_ACCOUNT environment variable should contain the storage account given in the uri. Additionally, when not running on Azure App Service, the IDENTITY_HEADER environment variable should be set to any value, for example foo.

Column mapping

Column mapping is accepted by Delta Lake sources that use the row format. It maps the columns of the Delta Lake to the columns expected by Kukur and is configured by setting the column_mapping option for the source.

[source.<name>.column_mapping]
"series name" = "name"
"ts" = "timestamp"
"value" = "value"
"quality" = "quality column"

quality mapping is optional.

Partitioning

For the row format, it is possible to take advantage of delta lake partitioning by defining one or more partitions:

[[source.<name>.partitions]]
origin = "tag"
key = "location"

Every tag based partition key must be included in the tag_columns option.

Timestamp based partitioning is also supported:

[[source.<name>.partitions]]
origin = "timestamp"
key = "YEAR"
column = "year"

For timestamp based partitions:

  • key indicates the resolution to be used when querying for partitions. Supported values are YEAR, MONTH and DAY.

  • format allows optional formatting of the timestamp that is used for partitioning. It accepts the formatting options supported by Python. By default the year, month or day is used.

  • column is optional. Needed if the name of the column used for partitioning does not exactly match the key value.

For example, when a column that contains both the year and the month defines a partition:

[[source.<name>.partitions]]
origin = "timestamp"
key = "YEAR"
column = "year"
format = "%Y-%m"

Custom datetime format

The data_datetime_format option allows the timestamps to be parsed using a custom datetime format. It accepts the formatting options supported by Python.

data_datetime_format = "%Y/%m/%dT%H:%M"

Custom timezone

The data_timezone option allows to specify the time zone of the timestamps. This option should only be used when the timestamps in the source do not contain time zone information.

data_timezone = "America/Sao_Paulo"

Row format

format = "row"

The row based format expects to find a Delta Lake at the given uri with at least 3 columns:

  • The first column contains the series name as a string

  • The second column contains the timestamp

  • The third column contains the value as a numerical type or as strings

  • A fourth optional column contains quality data

Guidance and mapping options for the quality column can be found in the source documentation.

Alternatively, if the tag_columns and field_columns options are used, each combination of values defined in tag_columns and field_columns define a series.

Pivot Format

format = "pivot"

The pivot format expects to find a Delta Lake at the given URI.

The first column in the lake is a timestamp. Further columns contain the values. Some columns can be numerical while other columns contain strings. The name of each column is the series name.

InfluxDB

The InfluxDB data source is experimental.

Time series are represented by stringifying the 'measurement name', 'tag keys and values' and 'field'. The SeriesSelector concept needs to be refined through 'tags' and 'fields' to properly support InfluxDB.

Furthermore, storing a password in the configuration should not be required.

Sources with type = "influxdb" configure InfluxDB sources.

Currently InfluxDB v1.8 is supported.

Configuration Template

[source.<name>]
type = "influxdb"
host = "<host IP / Address of the InfluxDB server>"
port = <port of the InfluxDB server>
ssl = <boolean to indicate if SSL needs to be used>
database = "<name of the database to use>"
username = "<username used to login into the database>"
password = "<password used to login into the database>"

Required fields

  • type

  • database

Connection

If host, port or ssl are not provided, their defaults will be used:

  • host (default: localhost)

  • port (default: 8086)

  • ssl (default: false)

Authentication

If the following fields are provided, the client connects using these credentials:

  • username

  • password

Example configuration

[source.influxSource]
type = "influxdb"
host = "localhost"
port = 8086
database = "data"
username = "influxAdmin"
password = "password123"

Kukur

Sources with type = "kukur" configure connections to other Kukur or Timeseer instances.

[source.<name>]
type = "kukur"
host = "<hostname or address of the other Kukur instance>"
port = "<port of the Kukur instance>"
source = "<the remote source>"
api_key_name = "<the name of the api key>"
api_key = "<the api key>"

source is a required field. It points to the remote source that will be queried. The name of the local and the remote sources do not need to match.

source = "test"

host and port define the host and port where Kukur is running.

Defaults are:

host = "localhost"
port = 8081

The api_key_name and api_key can be created in Kukur using the CLI or in Timeseer under Configure > Global preferences > API keys.

api_key_name = "test-key"
api_key = "PcRU3xy0dsVpX8CeKCO1WPqPevECB9mZAETdWngK7druSl23JFA0Rw"

Omit api_key_name and api_key when the [flight] configuration has authentication = false.

For example, this connects to the 'test' source on a local instance:

[source.arrow]
type = "kukur"
host = "localhost"
port = 8081
source = "test"
api_key_name = "test-key"
api_key = "PcRU3xy0dsVpX8CeKCO1WPqPevECB9mZAETdWngK7druSl23JFA0Rw"

ODBC

Sources with type = "odbc" configure ODBC sources.

The connection string and queries can either be configured in the configuration file, or loaded from files. Inline configuration takes precedence over file-based configuration when both are provided.

[source.<name>]
type = "odbc"
connection_string = "<ODBC connection_string>"
connection_string_path = "<path to connection string>"
query_timeout_seconds = 0
query_timeout_enable = true
query_string_parameters = false
list_query = "<query to list all time series in a source>"
list_query_path = "<path to list_query>"
list_columns = ["<metadata type of column 1>", "<metadata type of column 2>"]
tag_columns = ["series name"]
field_columns = []
metadata_query = "<query for metadata of one series>"
metadata_query_path = "<path to metadata query>"
metadata_columns = ["<metadata type of column 1>", "<metadata type of column 2>"]
dictionary_query = "<query for a possible dictionary mapping>"
dictionary_query_path = "<path to a dictionary query>"
metadata_value_mapping = "<metadata_value_mapping name>"
data_query = "<query for data of one series in time range>"
data_query_path = "<path to data query>"
data_query_timezone = "<override or specify time zone of timestamps to send a naive timestamp to the odbc driver>"
data_timezone = "<override or specify time zone of timestamps returned by the odbc driver>"
enable_trace_logging = false
quality_mapping = "<name>"
type_checking_row_limit = 300  # number of rows analysed to determine the type of the value column

The examples given here operate on this schema:

create table Metadata (
    name nvarchar(max),
    description nvarchar(max),
    units nvarchar(max),
    dictionary_name nvarchar(max)
);

create table Dictionary (
    name nvarchar(max),
    value int,
    label nvarchar(max)
);

create table Data (
    name nvarchar(max),
    ts datetime2,
    value float(53)
);

Connection

The connection_string can point to a DSN or be a direct driver connection.

connection_string = "DSN=kukur;UID=sa;PWD=Kukur!AI"

or

connection_string = "Driver={/usr/lib/libtdsodbc.so};Server=localhost;Port=1433;Database=TestData;UID=sa;PWD=Kukur!AI"

Alternatively, connection_string_path can point to a file that contains the connection string. Whitespace at the start and end of the connection string file is removed.

Some ODBC drivers do not support parameter binding. Set query_string_parameters to true, to use string interpolation of parameters in queries.

In that case use {} to format parameters into queries. In queries with multiple parameters, the order can be changed by using the argument position: {1} {2} {0}. Use a read-only connection with a minimal amount of privileges, since SQL injection is possible in that case and cannot be prevented by Kukur.
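
For example, with string interpolation enabled, a data query against the example schema above could be sketched as:

query_string_parameters = true
data_query = "select ts, value from Data where name = '{}' and ts >= '{}' and ts < '{}'"

Depending on the driver, the data_query_datetime_format option described below may be needed to format the interpolated timestamps as strings the database understands.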

query_timeout_seconds defines the timeout on a query. Default is 0, no timeout.

Some drivers do not allow setting a timeout. Disable it using query_timeout_enable = false.

Search

The list_query is optional. It returns a list of time series names found in the source. When it is provided, a series does not need to have been used in another context before it can be analyzed.

list_query = "select name from Metadata"

The query can be read from a file by using list_query_path instead of list_query.

The query can either return only series names or all metadata. When it returns all metadata, include a list_columns entry that describes all columns:

list_query = "select name, description, units from Metadata"
list_columns = ["series name", "description", "unit"]

All columns defined in tag_columns should be included in list_columns. All combinations of rows returned by the list_query and values in field_columns define a series.

Built-in columns are:

  • series name (required)

  • description

  • unit

  • functional lower limit

  • functional upper limit

  • physical lower limit

  • physical upper limit

  • accuracy

  • accuracy percentage

  • interpolation type (LINEAR, STEPPED)

  • data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)

  • dictionary name

Custom metadata fields can be defined by including them in the list_columns list.

Not all ODBC sources can map metadata field values to the values expected by Kukur. Use metadata_value_mapping to convert them.

Example:

[source.<name>]
metadata_value_mapping = "odbc_lowercase"

[metadata_value_mapping.odbc_lowercase."data type"]
FLOAT64 = "float64"
STRING = ["string", "text", "varchar"]
DICTIONARY = "dictionary"

This example converts lowercase data types to the uppercase strings expected by Kukur.

Metadata

The metadata_query is a query that accepts one parameter for each tag in a series, ordered by tag_columns.

metadata_query = "select description, units from Metadata where name = ?"

The columns in the result set should be mapped to a supported type of metadata. The metadata_columns entry contains a list with the positional mapping.

metadata_columns = ["description", "unit"]

Supported types of metadata are:

  • description

  • unit

  • functional lower limit

  • functional upper limit

  • physical lower limit

  • physical upper limit

  • accuracy

  • accuracy percentage

  • interpolation type (LINEAR, STEPPED)

  • data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)

  • dictionary name

Custom metadata fields can be defined by including them in the metadata_columns list.

The metadata query can be read from a file by using metadata_query_path instead of metadata_query.

Metadata values can be converted using metadata_value_mapping.

Example:

[source.<name>]
metadata_value_mapping = "odbc_lowercase"

[metadata_value_mapping.odbc_lowercase."data type"]
FLOAT64 = "float64"
STRING = ["string", "text", "varchar"]
DICTIONARY = "dictionary"

This example converts lowercase data types to the uppercase strings expected by Kukur.

If the configuration defines tag_columns, they are provided in the same order as defined in tag_columns.

[source.<name>]
tag_columns = ["location", "plant"]
metadata_query = """
select description, units, interpolationType, dataType, dictionaryName
from Metadata
where my_location = ? and my_plant = ?
"""

Dictionary

A dictionary maps numerical (integer) values to textual labels. The dictionary_query is a query that accepts one parameter: the name of the dictionary.

The dictionary name for a series is returned by the dictionary name list or metadata column.

dictionary_query = "select value, label from Dictionary where name = ?"

The first column with the dictionary key can be any type that can be converted to an integer, even SQL_CHAR. The second column with the dictionary value should be a SQL_CHAR or SQL_WCHAR.

The dictionary query can be read from a file by using dictionary_query_path instead of dictionary_query.

Data

The data_query is a query that accepts three parameters:

  • the name of the series (as SQL_VARCHAR)

  • the start date of the time range to query data in (as SQL_TYPE_TIMESTAMP)

  • the end date of the time range to query data in (as SQL_TYPE_TIMESTAMP)

data_query = "select ts, value from Data where name = ? and ts between ? and ?"

This query should return rows of two columns:

  • the timestamp of the data point (preferably as SQL_TYPE_TIMESTAMP)

  • the value of the data point (preferably as SQL_REAL, SQL_FLOAT or SQL_DOUBLE)

When the return type of a column is SQL_CHAR or SQL_WCHAR, Kukur will try to convert the value to the expected type.

If the provider or data source does not accept SQL_TYPE_TIMESTAMP, it can be formatted as a string. The data_query_datetime_format option accepts the formatting options supported by Python.

Example:

data_query_datetime_format = "%Y-%m-%dT%H:%M:%S%z"

This converts timestamps to the ISO8601 format.

The data query can be read from a file by using data_query_path instead of data_query.

If the driver does not accept timestamps with a time zone, the time zone used to convert them can be specified with the data_query_timezone option. The request will pass the converted timestamps to the driver as naive timestamps.

Example:

data_query_timezone = "UTC"

If the query or driver returns dates without a time zone, the time zone can be specified by the data_timezone option.

Example:

data_timezone = "UTC"

The exact available time zones are system-dependent.

Set enable_trace_logging to true to log the fetched data before conversion.

enable_trace_logging = true

If the configuration defines tag_columns, they are provided in the same order as defined in tag_columns.

[source.<name>]
tag_columns = ["location", "plant"]
data_query = """
select ts, value
from Data
where my_location = ? and my_plant = ? and ts >= ? and ts < ?
"""

If the configuration defines field_columns, the field is available as {field} in the data_query.

[source.<name>]
field_columns = ["temperature", "pressure"]
data_query = """
select ts, {field}
from Data
where name = ? and ts >= ? and ts < ?
"""

Quality

A quality column can be added.

In this case the data query changes:

data_query = "select ts, value, quality from Data where name = ? and ts between ? and ?"

Where quality represents the column that contains the data point quality of the ODBC source.

Check the source documentation to configure the mapping of a value in the quality column to a quality status known to Kukur.

PI Data Archive using Web API

Sources with type = "piwebapi-da" configure connections to OSIsoft PI Data Archive using the PI Web API.

[source.<name>]
type = "piwebapi-da"
data_archive_uri = "The self-URI of the DataServer"
max_returned_items_per_call = 150000
verify_ssl = true
username = "" # optional: username for basic authentication
password = "" # optional: password for basic authentication

source and data_archive_uri are required fields.

data_archive_uri is the URI of the DataServer resource in PI Web API that corresponds to the Data Archive.

To get this value, open https://pi.example.org/piwebapi/dataservers and copy the Self link from the response:

{
  "Links": {},
  "Items": [
    {
      "WebId": "F1DSBd9Ab83Y90SNSjy4JtD5fQVk0tVFMtUEk",
      "Id": "6f40df05-d9cd-44f7-8d4a-3cb826d0f97d",
      "Name": "mypiserver",
      "Path": "\\\\PIServers[mypiserver]",
      "IsConnected": true,
      "ServerVersion": "3.4.440.477",
      "ServerTime": "2022-02-18T15:10:08.8757629Z",
      "Links": {
        "Self": "https://pi.example.org/piwebapi/dataservers/F1DSBd9Ab83Y90SNSjy4JtD5fQVk0tVFMtUEk",
        "Points": "https://pi.example.org/piwebapi/dataservers/F1DSBd9Ab83Y90SNSjy4JtD5fQVk0tVFMtUEk/points",
        "EnumerationSets": "https://pi.example.org/piwebapi/dataservers/F1DSBd9Ab83Y90SNSjy4JtD5fQVk0tVFMtUEk/enumerationsets"
      }
    }
  ]
}

The PI Web API configuration limits the maximum number of items returned in a response to one request. This limit is applied to the number of time series (PI Points) and to the number of values. The default setting is 150000.

Adapt the number in the Kukur configuration when it has been increased or decreased on the server. Since the number of values returned in a data request is also limited by this setting, ensure that the data_query_interval_seconds option is set to a safe value.

For example, a time series that is recorded at 1 value per second will have 86400 values per day. This means a safe value for data_query_interval_seconds is 86400 when the default limit of 150000 is in place in PI Web API. For a time series that is recorded at 1 value per minute, and thus has at most 44640 values per month, data_query_interval_seconds = 2678400 is a safe value.
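
For example, a configuration for series recorded at roughly one value per second could be sketched as follows, reusing the DataServer URI from above:

[source.pi]
type = "piwebapi-da"
data_archive_uri = "https://pi.example.org/piwebapi/dataservers/F1DSBd9Ab83Y90SNSjy4JtD5fQVk0tVFMtUEk"
max_returned_items_per_call = 150000
data_query_interval_seconds = 86400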

Set verify_ssl to false when PI Web API uses a self-signed certificate.

By default, Kerberos authentication is tried. When Basic authentication is required, set the username and password fields in the configuration.

For example, this defines a pi source, with the Web API running on pi.example.org:

[source.pi]
type = "piwebapi-da"
data_archive_uri = "https://pi.example.org/piwebapi/dataservers/F1DSBd9Ab83Y90SNSjy4JtD5fQVk0tVFMtUEk"

When the Web API is using a self-signed certificate and basic authentication, the configuration becomes:

[source.pi]
type = "piwebapi-da"
data_archive_uri = "https://pi.example.org/piwebapi/dataservers/F1DSBd9Ab83Y90SNSjy4JtD5fQVk0tVFMtUEk"
verify_ssl = false
username = "auser"
password = "apassword"

Plugin

Sources with type = "plugin" call out to a separate binary that implements all source interface operations.

[source.<name>]
type = "plugin"
cmd = ["/path/to/binary"]
extra = "foo"

cmd is a required field. All other fields are passed to the binary as provided.

cmd can be either a simple string or a list of strings.

The binary will be called with one extra argument:

  • search to list time series or metadata

  • metadata to get metadata for one time series

  • data to return Arrow data

Input is sent on standard input. Output is expected on standard output.

Any information sent by the binary to standard error is logged at the warning level.

Search

When called with the search argument, the binary will receive JSON on standard input.

{
    "config": {},
    "search": {
        "source": "<source name>"
    }
}

"config" contains the source configuration.

The response written to standard output should contain:

{
    "series": [],
    "metadata": []
}

Both "series" and "metadata" are optional.

"series" should contain a list of time series.

{
    "source": "<source name>",
    "tags": {
        "tag-1": "tag-1-value",
        "tag-2": "tag-2-value"
    },
    "field": "optional-field"
}

"metadata" should contain a list of time series metadata dictionaries. Time series included in "metadata" should not appear in "series".

{
    "series": {
        "source": "<source name>",
        "tags": {
            "tag-1": "tag-1-value",
            "tag-2": "tag-2-value"
        },
        "field": "optional-field"
    },
    "description": "<the description of the series>"
}

Kukur has built-in support for these metadata fields:

  • description

  • unit

  • functional lower limit

  • functional upper limit

  • physical lower limit

  • physical upper limit

  • accuracy

  • accuracy percentage

  • interpolation type (LINEAR, STEPPED)

  • data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)

  • dictionary name

Metadata

When called with the metadata argument, the binary will receive JSON on standard input.

{
    "config": {},
    "metadata": {
        "series": {
            "source": "<source name>",
            "tags": {
                "tag-1": "tag-1-value",
                "tag-2": "tag-2-value"
            },
            "field": "optional-field"
        }
    }
}

"config" contains the source configuration.

"metadata" contains the "series" to return metadata for.

The response written to standard output should contain metadata fields:

{
    "description": "<the description of the series>"
}

Kukur has built-in support for these metadata fields:

  • description

  • unit

  • functional lower limit

  • functional upper limit

  • physical lower limit

  • physical upper limit

  • accuracy

  • accuracy percentage

  • interpolation type (LINEAR, STEPPED)

  • data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)

  • dictionary name

Note that metadata will generally not be called when metadata is returned by search.

Data

When called with the data argument, the binary will receive JSON on standard input.

{
    "config": {},
    "data": {
        "series": {
            "source": "<source name>",
            "tags": {
                "tag-1": "tag-1-value",
                "tag-2": "tag-2-value"
            },
            "field": "optional-field"
        },
        "startDate": "YYYY-MM-DDTHH:MM:SS+00:00",
        "endDate": "YYYY-MM-DDTHH:MM:SS+00:00",
    }
}

"config" contains the source configuration.

"metadata" contains the "series" to return metadata for.

The "startDate" and "endDate" fields are formatted per RFC3339.

The response written to standard output should be in the Apache Arrow IPC Streaming Format.

The schema of the record batches should contain 2 or 3 columns.

A column with the name ts contains the timestamps of the data values. Kukur will try to convert timestamps to Timestamp[unit: us, timezone: UTC].

A column with the name value contains the data values.

An optional column quality contains quality flags for the data values. 0 is considered bad, 1 is considered good.

To support other quality values, a quality mapping can be configured for the source as provided by the source documentation.

Example

This example shows the basic structure for a type = "plugin" binary.

[source.Plugin]
type = "plugin"
cmd = ["/home/myuser/dev/timeseer/kukur/venv/bin/python3", "data/plugin/plugin.py"]
quality_mapping = "plugin_quality"

[quality_mapping.plugin_quality]
GOOD = ["GOOD"]

cmd points to a Python interpreter in a virtualenv where the PyArrow library has been installed.

Since quality flags are provided as text, a quality_mapping has been defined.

import json
import sys
from datetime import datetime

from pyarrow import Table, ipc


def _run() -> None:
    """Perform the action requested by the first argument."""
    if sys.argv[1] == "search":
        _search()
    if sys.argv[1] == "metadata":
        _metadata()
    if sys.argv[1] == "data":
        _data()
    sys.exit(0)


def _search() -> None:
    """List all time series (or metadata)."""
    query = json.load(sys.stdin)
    print(query, file=sys.stderr)
    source_name = query["search"]["source"]
    data: dict = {
        "metadata": [
            {
                "series": {"source": source_name, "tags": {"series name": "test"}},
                "description": "Test series",
            }
        ],
        "series": [{"source": source_name, "tags": {"series name": "test-2"}}],
    }
    json.dump(data, sys.stdout)


def _metadata() -> None:
    """Return metadata for the time series received on stdin."""
    query = json.load(sys.stdin)
    print(query, file=sys.stderr)
    source_name = query["metadata"]["series"]["source"]
    series_name = query["metadata"]["series"]["tags"]["series name"]
    data: dict = {"description": f"Description of {series_name} ({source_name})"}
    json.dump(data, sys.stdout)


def _data() -> None:
    """Return Arrow IPC with that for the time series received on stdin."""
    query = json.load(sys.stdin)
    print(query, file=sys.stderr)
    source_name = query["data"]["series"]["source"]
    series_name = query["data"]["series"]["tags"]["series name"]
    start_date = datetime.fromisoformat(query["data"]["startDate"])
    end_date = datetime.fromisoformat(query["data"]["endDate"])
    table = Table.from_pydict(
        {"ts": [start_date, end_date], "value": [0, 42], "quality": ["BAD", "GOOD"]}
    )
    with ipc.new_stream(sys.stdout.buffer, table.schema) as writer:
        writer.write_table(table)


if __name__ == "__main__":
    _run()

PostgreSQL

Sources with type = "postgresql" configure PostgreSQL sources.

odbc sources can also connect to PostgreSQL, provided the PostgreSQL ODBC driver is available.

Many other database systems support the PostgreSQL wire format. This source can be used to connect to them as well.

Two providers can be used:

  • pg8000

  • psycopg

pg8000 requires connection options provided in .connection. psycopg requires a connection_string or connection_string_path.

The connection string and queries can either be configured in the configuration file, or loaded from files. Inline configuration takes precedence over file-based configuration when both are provided.

[source.<name>]
type = "postgresql"
connection_string = "<postgresql connection_string>"    # for psycopg only
connection_string_path = "<path to connection string>"  # for psycopg only
list_query = "<query to list all time series in a source>"
list_query_path = "<path to list_query>"
list_columns = ["<metadata type of column 1>", "<metadata type of column 2>"]
tag_columns = ["series name"]
field_columns = []
metadata_query = "<query for metadata of one series>"
metadata_query_path = "<path to metadata query>"
metadata_columns = ["<metadata type of column 1>", "<metadata type of column 2>"]
dictionary_query = "<query for a possible dictionary mapping>"
dictionary_query_path = "<path to a dictionary query>"
metadata_value_mapping = "<metadata_value_mapping name>"
data_query = "<query for data of one series in time range>"
data_query_path = "<path to data query>"
data_query_timezone = "<override or specify time zone of timestamps to send a naive timestamp to the database>"
data_timezone = "<override or specify time zone of timestamps returned by the database>"
enable_trace_logging = false
quality_mapping = "<name>"
type_checking_row_limit = 300  # number of rows analysed to determine the type of the value column
provider = "pg8000"

# for pg8000 only
[source.<name>.connection]
user = ""
host = ""
port= 5432
password = ""

The examples given here operate on this schema:

create table Metadata (
    id serial,
    name text not null,
    description text,
    units text,
    dictionary_name text
);

create table Dictionary (
    id serial,
    name text not null,
    value integer not null,
    label text not null
);

create table Data (
    id serial,
    name text not null,
    ts  timestamp with time zone,
    value double precision
);

Connection

Two providers require different connection configurations:

pg8000

Connection options need to be provided in .connection.

[source.<name>.connection]
user = "postgres"
host = "localhost"
port= 5432
password = "Timeseer!AI"

Psycopg

The connection_string supports the keywords supported by libpq.

connection_string = "host=localhost port=5432 dbname=postgres user=postgres password=Timeseer!AI"

Alternatively, connection_string_path can point to a file that contains the connection string. Whitespace at the start and end of the connection string file is removed.

libpq also supports reading passwords from a file using the passfile parameter. Use this to securely inject passwords into a container.
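
For example, a connection string that reads the password from a mounted secrets file could be sketched as follows; the file location is hypothetical:

connection_string = "host=localhost port=5432 dbname=postgres user=postgres passfile=/run/secrets/pgpass"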

Search

The list_query is optional. It returns a list of time series found in the source.

list_query = "select name from Metadata"

The query can be read from a file by using list_query_path instead of list_query.

The query can either return only series names or all metadata. When it returns all metadata, include a list_columns entry that describes all columns:

list_query = "select name, description, units from Metadata"
list_columns = ["series name", "description", "unit"]

All columns defined in tag_columns should be included in list_columns. All combinations of rows returned by the list_query and values in field_columns define a series.

Built-in metadata fields are:

  • description

  • unit

  • functional lower limit

  • functional upper limit

  • physical lower limit

  • physical upper limit

  • accuracy

  • accuracy percentage

  • interpolation type (LINEAR, STEPPED)

  • data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)

  • dictionary name

Custom metadata fields can be defined by including them in the list_columns list.

Not all PostgreSQL sources can map metadata field values to the values expected by Kukur. Use metadata_value_mapping to convert them.

Example:

[source.<name>]
metadata_value_mapping = "pg_lowercase"

[metadata_value_mapping.pg_lowercase."data type"]
FLOAT64 = "float64"
STRING = ["string", "text", "varchar"]
DICTIONARY = "dictionary"

This example converts lowercase data types to the uppercase strings expected by Kukur.

Metadata

The metadata_query is a query that accepts one parameter for each tag in a series, ordered by tag_columns.

metadata_query = "select description, units from Metadata where name = %s"

The columns in the result set should be mapped to a supported type of metadata. The metadata_columns entry contains a list with the positional mapping.

metadata_columns = ["description", "unit"]

Supported types of metadata are:

  • description

  • unit

  • functional lower limit

  • functional upper limit

  • physical lower limit

  • physical upper limit

  • accuracy

  • accuracy percentage

  • interpolation type (LINEAR, STEPPED)

  • data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)

  • dictionary name

Custom metadata fields can be defined by including them in the metadata_columns list.

The metadata query can be read from a file by using metadata_query_path instead of metadata_query.

Metadata values can be converted using metadata_value_mapping.

Example:

[source.<name>]
metadata_value_mapping = "pg_lowercase"

[metadata_value_mapping.pg_lowercase."data type"]
FLOAT64 = "float64"
STRING = ["string", "text", "varchar"]
DICTIONARY = "dictionary"

This example converts lowercase data types to the uppercase strings expected by Kukur.

If the configuration defines tag_columns, they are provided in the same order as defined in tag_columns.

[source.<name>]
tag_columns = ["location", "plant"]
metadata_query = """
select description, units, interpolationType, dataType, dictionaryName
from Metadata
where my_location = %s and my_plant = %s
"""

Dictionary

A dictionary maps numerical (integer) values to textual labels. The dictionary_query is a query that accepts one parameter: the name of the dictionary.

The dictionary name for a series is returned by the dictionary name list or metadata column.

dictionary_query = "select value, label from Dictionary where name = %s"

The first column with the dictionary key can be any type that can be converted to an integer, even SQL_CHAR. The second column with the dictionary value should be a SQL_CHAR or SQL_WCHAR.

The dictionary query can be read from a file by using dictionary_query_path instead of dictionary_query.

Data

The data_query is a query that accepts multiple parameters:

  • each tag value for the tags defined in tag_columns

  • the start date of the time range to query data in (as a timestamp with time zone)

  • the end date of the time range to query data in (as a timestamp with time zone)

data_query = "select ts, value from Data where name = %s and ts between %s and %s"

This query should return rows of two columns:

  • the timestamp of the data point (preferably as timestamp with time zone)

  • the value of the data point (preferably as double precision, integer or text)

If the database table does not accept timestamp with time zone, it can be formatted as a string. The data_query_datetime_format option accepts the formatting options supported by Python.

Example:

data_query_datetime_format = "%Y-%m-%dT%H:%M:%S%z"

This converts timestamps to the ISO8601 format.

The data query can be read from a file by using data_query_path instead of data_query.

If the database table does not accept timestamps with a time zone, the time zone used to convert them can be specified with the data_query_timezone option. The request will pass the converted timestamps to the database as naive timestamps.

Example:

data_query_timezone = "UTC"

If the query returns dates without a time zone, the time zone can be specified by the data_timezone option.

Example:

data_timezone = "UTC"

The exact available time zones are system-dependent.

Set enable_trace_logging to true to log the fetched data before conversion.

enable_trace_logging = true

If the configuration defines tag_columns, they are provided to the data query in the same order as defined in tag_columns.

[source.<name>]
tag_columns = ["location", "plant"]
data_query = """
select ts, value
from Data
where my_location = %s and my_plant = %s and ts >= %s and ts < %s
"""

If the configuration defines field_columns, the field is available as {field} in the data_query.

[source.<name>]
field_columns = ["temperature", "pressure"]
data_query = """
select ts, {field}
from Data
where name = %s and ts >= %s and ts < %s
"""

Quality

A quality column can be returned for each data point.

In this case the data query changes:

data_query = "select ts, value, quality from Data where name = %s and ts between %s and %s"

Check the source documentation to configure the mapping of a value in the quality column to a quality status known to Kukur.

Redshift

Sources with type = "redshift" configure Redshift sources.

Redshift sources connect to Redshift using the options provided in .connection:

[source.<name>]
type = "redshift"

[source.<name>.connection]
region = ""
host = ""
database = ""
iam = true

This example configuration is sufficient to connect from an EC2 instance that has an IAM role with sufficient permissions:

[source.myns.connection]
iam = true
host = "myns-1234567890.eu-west-1.redshift-serverless.amazonaws.com"
database = "dev"

Next to the .connection properties, Redshift sources support these properties:

[source.<name>]
type = "redshift"
list_query = "<query to list all time series in a source>"
list_query_path = "<path to list_query>"
list_columns = ["<metadata type of column 1>", "<metadata type of column 2>"]
tag_columns = ["series name"]
field_columns = []
metadata_query = "<query for metadata of one series>"
metadata_query_path = "<path to metadata query>"
metadata_columns = ["<metadata type of column 1>", "<metadata type of column 2>"]
dictionary_query = "<query for a possible dictionary mapping>"
dictionary_query_path = "<path to a dictionary query>"
metadata_value_mapping = "<metadata_value_mapping name>"
data_query = "<query for data of one series in time range>"
data_query_path = "<path to data query>"
data_query_timezone = "<override or specify time zone of timestamps to send a naive timestamp to the database>"
data_timezone = "<override or specify time zone of timestamps returned by the database>"
enable_trace_logging = false
quality_mapping = "<name>"
type_checking_row_limit = 300  # number of rows analysed to determine the type of the value column

The examples given here operate on this schema:

create table Metadata (
    id integer identity,
    name text not null,
    description text,
    units text,
    dictionary_name text
);

create table Dictionary (
    id integer identity,
    name text not null,
    value integer not null,
    label text not null
);

create table Data (
    id integer identity,
    name text not null,
    ts timestamptz not null,
    value double precision not null
);

Search

The list_query is optional. It returns a list of time series found in the source.

list_query = "select name from Metadata"

The query can be read from a file by using list_query_path instead of list_query.

The query can either return only series names or all metadata. When it returns all metadata, include a list_columns entry that describes all columns:

list_query = "select name, description, units from Metadata"
list_columns = ["series name", "description", "unit"]

All columns defined in tag_columns should be included in list_columns. All combinations of rows returned by the list_query and values in field_columns define a series.

Built-in metadata fields are:

  • description

  • unit

  • functional lower limit

  • functional upper limit

  • physical lower limit

  • physical upper limit

  • accuracy

  • accuracy percentage

  • interpolation type (LINEAR, STEPPED)

  • data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)

  • dictionary name

Custom metadata fields can be defined by including them in the list_columns list.

Not all Redshift sources can map metadata field values to the values expected by Kukur. Use metadata_value_mapping to convert them.

Example:

[source.<name>]
metadata_value_mapping = "pg_lowercase"

[metadata_value_mapping.redshift_lowercase."data type"]
FLOAT64 = "float64"
STRING = ["string", "text", "varchar"]
DICTIONARY = "dictionary"

This example converts lowercase data types to the uppercase strings expected by Kukur.

Metadata

The metadata_query is a query that accepts one parameter for each tag in a series, ordered by tag_columns. This query is generally not required when the list query returns metadata.

metadata_query = "select description, units from Metadata where name = %s"

The columns in the result set should be mapped to a supported type of metadata. The metadata_columns entry contains a list with the positional mapping.

metadata_columns = ["description", "unit"]

Supported types of metadata are:

  • description

  • unit

  • functional lower limit

  • functional upper limit

  • physical lower limit

  • physical upper limit

  • accuracy

  • accuracy percentage

  • interpolation type (LINEAR, STEPPED)

  • data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)

  • dictionary name

Custom metadata fields can be defined by including them in the metadata_columns list.

The metadata query can be read from a file by using metadata_query_path instead of metadata_query.

Metadata values can be converted using metadata_value_mapping.

Example:

[source.<name>]
metadata_value_mapping = "pg_lowercase"

[metadata_value_mapping.pg_lowercase."data type"]
FLOAT64 = "float64"
STRING = ["string", "text", "varchar"]
DICTIONARY = "dictionary"

This example converts lowercase data types to the uppercase strings expected by Kukur.

If the configuration defines tag_columns, they are provided in the same order as defined in tag_columns.

[source.<name>]
tag_columns = ["location", "plant"]
metadata_query = """
select description, units, interpolationType, dataType, dictionaryName
from Metadata
where my_location = %s and my_plant = %s
"""

Dictionary

A dictionary maps numerical (integer) values to textual labels. The dictionary_query is a query that accepts one parameter: the name of the dictionary.

The dictionary name for a series is returned by the dictionary name list or metadata column.

dictionary_query = "select value, label from Dictionary where name = %s"

The dictionary query can be read from a file by using dictionary_query_path instead of dictionary_query.

Data

The data_query is a query that accepts multiple parameters:

  • each tag value for the tags defined in tag_columns

  • the start date of the time range to query data in (as a timestamp with time zone)

  • the end date of the time range to query data in (as a timestamp with time zone)

data_query = "select ts, value from Data where name = %s and ts between %s and %s"

This query should return rows of two columns:

  • the timestamp of the data point (preferably as timestamp with time zone)

  • the value of the data point (preferably as double precision, integer or text)

If the database table does not accept timestamp with time zone, it can be formatted as a string. The data_query_datetime_format option accepts the formatting options supported by Python.

Example:

data_query_datetime_format = "%Y-%m-%dT%H:%M:%S%z"

This converts timestamps to the ISO8601 format.

The data query can be read from a file by using data_query_path instead of data_query.

If the database table does not accept timestamps with a time zone, the time zone used to convert them can be specified with the data_query_timezone option. The request will pass the converted timestamps to the database as naive timestamps.

Example:

data_query_timezone = "UTC"

If the query returns dates without a time zone, the time zone can be specified by the data_timezone option.

Example:

data_timezone = "UTC"

The exact available time zones are system-dependent.

Set enable_trace_logging to true to log the fetched data before conversion.

enable_trace_logging = true

If the configuration defines tag_columns, they are provided to the data query in the same order as defined in tag_columns.

[source.<name>]
tag_columns = ["location", "plant"]
data_query = """
select ts, value
from Data
where my_location = %s and my_plant = %s and ts >= %s and ts < %s
"""

If the configuration defines field_columns, the field is available as {field} in the data_query.

[source.<name>]
field_columns = ["temperature", "pressure"]
data_query = """
select ts, {field}
from Data
where name = %s and ts >= %s and ts < %s
"""

Quality

A quality column can be returned for each data point.

In this case the data query changes:

data_query = "select ts, value, quality from Data where name = %s and ts between %s and %s"

Check the source documentation to configure the mapping of a value in the quality column to a quality status known to Kukur.

SQLite

Sources with type = "sqlite" configure SQLite sources.

The connection string and queries can either be configured in the configuration file, or loaded from files. Inline configuration takes precedence over file-based configuration when both are provided.

[source.<name>]
type = "sqlite"
connection_string = "<SQLite connection_string>"
connection_string_path = "<path to connection string>"
query_timeout_seconds = 0
list_query = "<query to list all time series in a source>"
list_query_path = "<path to list_query>"
list_columns = ["<metadata type of column 1>", "<metadata type of column 2>"]
tag_columns = ["series name"]
field_columns = []
metadata_query = "<query for metadata of one series>"
metadata_query_path = "<path to metadata query>"
metadata_columns = ["<metadata type of column 1>", "<metadata type of column 2>"]
dictionary_query = "<query for dictionary mappings>"
dictionary_query_path = "<path to the dictionary query>"
metadata_value_mapping = "<metadata_value_mapping name>"
data_query = "<query for data of one series in time range>"
data_query_path = "<path to data query>"
data_query_datetime_format = "<strftime format>"
data_query_timezone = "<override or specify time zone of timestamps>"
data_timezone = "<override or specify time zone of timestamps returned by the data query>"
enable_trace_logging = false
quality_mapping = "<name>"
type_checking_row_limit = 300  # number of rows analysed to determine the type of the value column

The examples given here operate on a SQLite database with three tables:

create table Metadata (
    id integer primary key autoincrement,
    name text not null,
    description text,
    units text,
    interpolationType text,
    dataType text,
    dictionaryName text
);

create table Dictionary (
    id integer primary key autoincrement,
    name text not null,
    value real not null,
    label text not null
);

create table Data (
    id integer primary key autoincrement,
    name text not null,
    ts datetime not null,
    value real
);

Connection

The connection_string contains either the SQLite database file name or the URI to the database.

connection_string = "data/db.sqlite"

Alternatively, connection_string_path can point to a file that contains the connection string. Whitespace at the start and end of the connection string file is removed.

query_timeout_seconds defines the timeout on a query. Default is 0, no timeout.

Search

The list_query is optional. It returns a list of time series names found in the source. When it is provided, a series does not need to have been used in another context before it can be analyzed.

list_query = "select name from Metadata"

The query can be read from a file by using list_query_path instead of list_query.

The query can either return only series names or all metadata. When it returns all metadata, include a list_columns entry that describes all columns:

list_query = "select name, description, units, interpolationType, dataType, dictionaryName from Metadata"
list_columns = ["series name", "description", "unit", "interpolation type", "data type", "dictionary name"]

All columns defined in tag_columns should be included in list_columns. All combinations of rows returned by the list_query and values in field_columns define a series.

Built-in metadata columns are:

  • series name (required)

  • description

  • unit

  • functional lower limit

  • functional upper limit

  • physical lower limit

  • physical upper limit

  • accuracy

  • accuracy percentage

  • interpolation type (LINEAR, STEPPED)

  • data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)

  • dictionary name

Custom metadata fields can be defined by including them in the list_columns list.

Not all SQLite sources can map metadata field values to the values expected by Kukur. Use metadata_value_mapping to convert them.

Example:

[source.<name>]
metadata_value_mapping = "sqlite_lowercase"

[metadata_value_mapping.sqlite_lowercase."data type"]
FLOAT64 = "float64"
STRING = ["string", "text", "varchar"]
DICTIONARY = "dictionary"

This example converts lowercase data types to the uppercase strings expected by Kukur.

Metadata

The metadata_query is a query that accepts one parameter for each tag in a series, ordered by tag_columns.

metadata_query = "select description, units, interpolationType, dataType, dictionaryName from Metadata where name = ?"

The columns in the result set should be mapped to a supported type of metadata. The metadata_columns entry contains a list with the positional mapping.

metadata_columns = ["description", "unit", "interpolation type", "data type", "dictionary name"]

Built-in types of metadata are:

  • description

  • unit

  • functional lower limit

  • functional upper limit

  • physical lower limit

  • physical upper limit

  • accuracy

  • accuracy percentage

  • interpolation type (LINEAR, STEPPED)

  • data type (FLOAT32, FLOAT64, STRING, DICTIONARY, CATEGORICAL)

  • dictionary name

Custom metadata fields can be defined by including them in the metadata_columns list.

The metadata query can be read from a file by using metadata_query_path instead of metadata_query.

Metadata values can be converted using metadata_value_mapping.

Example:

[source.<name>]
metadata_value_mapping = "sqlite_lowercase"

[metadata_value_mapping.sqlite_lowercase."data type"]
FLOAT64 = "float64"
STRING = ["string", "text", "varchar"]
DICTIONARY = "dictionary"

This example converts lowercase data types to the uppercase strings expected by Kukur.

Kukur supports the SQLite MATCH operator.

Consider the following table that contains additional metadata about a sensor vendor per location:

create table Sensors (
    id integer primary key autoincrement,
    location text not null,
    vendor text not null
);

If the location is part of the series name, the following query will return the sensor vendor:

select vendor from Sensors where location = (? match 'location=([^,]+)')
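
A sketch of how this query could be wired into the source configuration, exposing the vendor as a custom metadata field (the field name is illustrative):

[source.<name>]
# "sensor vendor" is an illustrative custom metadata field
metadata_query = "select vendor from Sensors where location = (? match 'location=([^,]+)')"
metadata_columns = ["sensor vendor"]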

If the configuration defines tag_columns, one query parameter is provided for each tag, in the same order as defined in tag_columns.

[source.<name>]
tag_columns = ["location", "plant"]
metadata_query = """
select description, units, interpolationType, dataType, dictionaryName
from Metadata
where my_location = ? and my_plant = ?
"""

Dictionary

A dictionary maps numerical (integer) values to textual labels. The dictionary query is a query that accepts one parameter: the name of the dictionary.

The dictionary name for a series is returned by the dictionary name column of the list or metadata query.

dictionary_query = "select value, label from Dictionary where name = ?"

The query should return rows of two columns:

  • the numerical value that occurs in the data, in a type that can be converted to an integer

  • the label for the numerical value

The dictionary query can be read from a file by using dictionary_query_path instead of dictionary_query.
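
As a sketch, the dictionary query ties in with the dictionary name returned by the metadata query (table and column names are illustrative):

[source.<name>]
metadata_query = "select description, dataType, dictionaryName from Metadata where name = ?"
metadata_columns = ["description", "data type", "dictionary name"]
dictionary_query = "select value, label from Dictionary where name = ?"

Kukur passes the dictionary name returned for a series as the single parameter of the dictionary_query.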

Data

The data_query is a query that accepts three parameters:

  • the name of the series

  • the start date of the time range to query data

  • the end date of the time range to query data

data_query = "select ts, value from Data where name = ? and ts >= ? and ts < ?"

This query should return rows of two columns:

  • the timestamp of the data point

  • the value of the data point

Kukur will try to convert the returned columns to the expected types.

The data query can be read from a file by using data_query_path instead of data_query.

Kukur expects a table to contain ISO8601 formatted timestamps.

Alternatively, the start and end date can be passed to the query as strings in a custom format. The data_query_datetime_format option accepts the formatting options supported by Python.

Example:

data_query_datetime_format = "%Y-%m-%dT%H:%M:%S%z"

This converts timestamps to the ISO8601 format.

If the table does not store time zone aware timestamps, the preferred time zone can be specified with the data_query_timezone option. The start and end date of a request are converted to that time zone and passed to the query as naive timestamps.

Example:

data_query_timezone = "UTC"

If the query returns timestamps without a time zone, the time zone can be specified by the data_timezone option.

Example:

data_timezone = "UTC"

The exact available time zones are system-dependent.

Set enable_trace_logging to true to log the fetched data before conversion.

enable_trace_logging = true

If the configuration defines tag_columns, one query parameter is provided for each tag, in the same order as defined in tag_columns.

[source.<name>]
tag_columns = ["location", "plant"]
data_query = """
select ts, value
from Data
where my_location = ? and my_plant = ? and ts >= ? and ts < ?
"""

If the configuration defines field_columns, the field is available as {field} in the data_query.

[source.<name>]
field_columns = ["temperature", "pressure"]
data_query = """
select ts, {field}
from Data
where name = ? and ts >= ? and ts < ?
"""

Quality

A quality column can be included in the data query.

In this case the data query becomes:

data_query = "select ts, value, quality from Data where name = ? and ts >= ? and ts < ?"

Here, quality is the column that contains the quality of the data point.

Check the source documentation to configure the mapping of a value in the quality column to a quality status known to Kukur.
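
A hedged sketch, assuming the source supports a quality_mapping table similar to metadata_value_mapping (verify the exact option name and the recognized statuses in the source documentation; the values below are illustrative):

[source.<name>]
data_query = "select ts, value, quality from Data where name = ? and ts >= ? and ts < ?"
quality_mapping = "example_quality"

# assumption: source-specific values that should be treated as good quality
[quality_mapping.example_quality]
GOOD = [192, 194]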

Connectivity guides

Kukur can connect to many different time series data sources using the options detailed in the Sources reference. Concrete examples of complex integrations are documented here.

Connecting to Aspen InfoPlus.21

Kukur connects to AspenTech’s InfoPlus.21 data historian using the Aspen SQLplus for Aspen InfoPlus.21 ODBC driver.

First, create an ODBC data source for the IP.21 server.

Consider an ODBC data source AB. The basic Kukur configuration to connect is:

[source.IP21AB]
type = "odbc"
connection_string = "DSN=AB"

The next step is to configure a query to retrieve a list of time series and their metadata.

The IP.21 database is very flexible, with possibly multiple repeat areas per record. The exact query will need some knowledge about the local configuration.

This example will expose all time series defined by IP_AnalogDef:

[source.IP21AB]
type = "odbc"
connection_string = "DSN=AB"
list_query = "SELECT NAME, IP_DESCRIPTION, IP_ENG_UNITS, IP_LOW_LOW_LIMIT, IP_LOW_LIMIT, IP_HIGH_LIMIT, IP_HIGH_HIGH_LIMIT, IP_DC_SIGNIFICANCE, IP_STEPPED,'FLOAT64' FROM IP_AnalogDef"
list_columns = ["series name", "description", "unit", "physical lower limit", "functional lower limit", "functional upper limit", "physical upper limit", "accuracy", "interpolation type", "data type"]
metadata_value_mapping = 'IP21_Mapping'

[metadata_value_mapping.IP21_Mapping.'interpolation type']
LINEAR = 'Interpolated'
STEPPED = 'Stepped'

Note that this assumes that all time series defined here have a data type of FLOAT64. A mapping table like the interpolation type mapping can map data types returned by SQLplus to data types known in Kukur.

Pay special attention to the configured limits. The example assumes that both IP_LOW_LOW_LIMIT and IP_LOW_LIMIT have been configured. The IP_LOW_LOW_LIMIT maps to the physical lower limit field in Kukur and represents the lower limit of the sensor. The IP_LOW_LIMIT maps to the functional lower limit field in Kukur and represents the expected lower limit of the process.

The final step is to define queries to read data from time series.

The SQLplus ODBC driver does not support query parameters. Setting query_string_parameters = true makes Kukur insert the parameters into the query text itself. Ensure the connection is read-only when untrusted input can be provided to Kukur.

Kukur formats timestamps as 2020-02-13T19:05:43+00:00. SQLplus does not accept all valid RFC3339/ISO8601 timestamps. This means a custom data_query_datetime_format is required, where the time zone information is provided as Z instead of +00:00.

To avoid overloading the data historian, the time span of one query has been limited to one day of data. Kukur will split and recombine longer queries.

[source.IP21AB]
type = "odbc"
connection_string = "DSN=AB"
list_query = "SELECT NAME, IP_DESCRIPTION, IP_ENG_UNITS, IP_LOW_LOW_LIMIT, IP_LOW_LIMIT, IP_HIGH_LIMIT, IP_HIGH_HIGH_LIMIT, IP_DC_SIGNIFICANCE, IP_STEPPED,'FLOAT64' FROM IP_AnalogDef"
list_columns = ["series name", "description", "unit", "physical lower limit", "functional lower limit", "functional upper limit", "physical upper limit", "accuracy", "interpolation type", "data type"]
metadata_value_mapping = 'IP21_Mapping'
query_string_parameters = true
data_query = "SELECT ISO8601(IP_TREND_TIME),IP_TREND_VALUE FROM IP_AnalogDef.1 WHERE NAME='{}' AND IP_TREND_TIME >= '{}' AND IP_TREND_TIME < '{}' ORDER BY IP_TREND_TIME ASC"
data_query_datetime_format = "%Y-%m-%dT%H:%M:%SZ"
data_query_interval_seconds = 86400

[metadata_value_mapping.IP21_Mapping.'interpolation type']
LINEAR = 'Interpolated'
STEPPED = 'Stepped'

Note that SQLplus also provides the HISTORY table that allows a more generic way to read data.

Connecting to Azure Data Lakes using Azure Synapse Analytics

There is no one-size-fits-all approach to accessing time series data stored in containers in Azure general purpose v2 Storage Accounts (Azure Data Lake). Data is typically stored in blobs, organised using the Data Lake Storage Gen2 Hierarchical Namespace feature. Both the content of the blobs and their organisation are user defined.

For example, one data lake could contain one Parquet file with all time series data points for one hour, organised as <year>/<month>/data-<day>-<hour>.parquet, while another stores one CSV file per day for each time series, as <month>-<day>-<year>/<series name>.csv.

Kukur and other time series analytics tooling expect data sources to respond to queries that ask for the data of one time series in a given time period. Mapping such a query to the file format and organisation of a data lake is the domain of specialized tooling: a data lake engine such as Azure Synapse Analytics serverless SQL pool, AWS Athena or Dremio. These engines maintain structured views on the data in the data lake, which is unstructured to anyone but the human eye. Kukur connects to and queries the data lake engine for time series data; the engine determines which blobs (files) to query and how to transform the content of these files into a single virtualized columnar representation.

Example

Consider an Azure Synapse workspace ws-timeseer, connected to an Azure Data Lake Storage Gen2 storage account sasynapsetimeseer. The storage account contains a blob storage container fs-synapse that has the 'Hierarchical Namespace' feature enabled.

(screenshot: the ws-timeseer Synapse workspace)

Inside the container, time series data is stored in Parquet files.

Each Parquet file contains three columns:

  • series name: string

  • ts: timestamp[us, tz=UTC]

  • value: double

  series name                        ts  value
0  test-tag-1 2021-01-01 00:00:00+00:00    0.5
1  test-tag-2 2021-01-02 00:00:00+00:00  100.0
2  test-tag-1 2021-01-03 00:00:00+00:00    1.5
3  test-tag-2 2021-01-04 00:00:00+00:00 -100.0

The Parquet files in this example are organized in directories per year. Inside each directory, one file contains one month of time series data.

(screenshot: the historian-data folder structure in the fs-synapse container)
Creating a view

In the Synapse Analytics workspace, all data can be virtualized using the OPENROWSET function and wildcard matching in the path that is provided to it.

SELECT
    "series name", ts, value
FROM
    OPENROWSET(
        BULK 'https://sasynapsetimeseer.dfs.core.windows.net/fs-synapse/historian-data/year=*/lake-*.parquet',
        FORMAT='PARQUET'
    ) AS [result]
ORDER BY ts, "series name"

Running this query returns the expected results:

(screenshot: the query results)

Directly using the previous query in Kukur and other tooling is not a good idea. The query would need to be updated everywhere it is used when small changes to the underlying storage are made. A SQL VIEW hides the internal complexity from any consumers.

Views cannot be created inside the master database that is available by default. A new HistorianData database will contain the Timeseries view.

CREATE DATABASE HistorianData;
go
USE HistorianData;
go
DROP VIEW IF EXISTS Timeseries;
go
CREATE VIEW Timeseries
as
SELECT
    "series name", ts, value
FROM
    OPENROWSET(
        BULK 'https://sasynapsetimeseer.dfs.core.windows.net/fs-synapse/historian-data/year=*/lake-*.parquet',
        FORMAT='PARQUET'
    ) AS [result];

This results in the straightforward query we set out to find:

select ts, value
from Timeseries
where "series name" = 'test-tag-1'
  and ts between '2021-01-01T00:00:00Z' and '2021-02-01T00:00:00Z'
order by ts
Authentication using Managed Identities

Several authentication methods are available when connecting to Azure Synapse from Kukur.

In this example, Kukur is running on a Windows VM with a Managed Identity that will be used to authenticate to Synapse. Furthermore, by using a database scoped credential SynapseIdentity, anyone allowed to access the view will have access to the underlying data.

USE HistorianData;
go
CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<STRONG PASSPHRASE>';
go
CREATE DATABASE SCOPED CREDENTIAL SynapseIdentity
WITH IDENTITY = 'Managed Identity';

A database scoped credential cannot be used directly in a query, but must be part of a data source.

USE HistorianData;
go
CREATE EXTERNAL DATA SOURCE HistorianDataSource
WITH (    LOCATION   = 'https://sasynapsetimeseer.dfs.core.windows.net/fs-synapse',
          CREDENTIAL = SynapseIdentity
)

The Timeseries view needs to be updated to use the newly created data source.

USE HistorianData;
go
DROP VIEW IF EXISTS Timeseries;
go
CREATE VIEW Timeseries
as
SELECT
    "series name", ts, value
FROM
    OPENROWSET(
        BULK 'historian-data/year=*/lake-*.parquet',
        DATA_SOURCE = 'HistorianDataSource',
        FORMAT='PARQUET'
    ) AS [result];

The Windows VM that is running Kukur has the Managed Identity ts-windows.

(screenshot: the ts-windows managed identity)

A database user needs to be created for it. Permissions for bulk operations, the SynapseIdentity credential and the Timeseries view need to be granted to that user.

USE HistorianData;
go
CREATE USER [ts-windows] FROM EXTERNAL PROVIDER
GRANT ADMINISTER DATABASE BULK OPERATIONS TO [ts-windows]
GRANT CONTROL ON DATABASE SCOPED CREDENTIAL :: SynapseIdentity to [ts-windows]
GRANT SELECT ON Object::dbo.[Timeseries] to [ts-windows]

Kukur uses the ODBC Driver for SQL Server to connect to Azure Synapse. Download and install it.

Create a new ODBC data source in Kukur that connects using the ODBC driver to the SQL endpoint of the Synapse workspace. The connection string includes Authentication=ActiveDirectoryMsi to use the Managed Identity.

./data/synapse.toml
[source.synapse]
type = "odbc"
connection_string = "Driver={ODBC Driver 17 for SQL Server};Server=tcp:ws-timeseer-ondemand.sql.azuresynapse.net,1433;Database=HistorianData;Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30;Authentication=ActiveDirectoryMsi"
data_query = "select ts, value from Timeseries where \"series name\" = ? and ts between ? and ? order by ts"
data_timezone = "UTC"

Running this gives the expected result:

(venv) PS C:\..\kukur> python -m kukur.cli test data --source synapse --name test-tag-1 --start 2021-01-01 --end 2021-02-01
2021-04-29 09:44:08,589 INFO kukur.source.test MainThread : Requesting data for "test-tag-1 (synapse)" from 2021-01-01 00:00:00 to 2021-02-01 00:00:00
2021-01-01T00:00:00+00:00,0.5
2021-01-03T00:00:00+00:00,1.5
2021-02-01T00:00:00+00:00,0.5
Optimizing data access

In order to answer queries against the Timeseries view, Synapse needs to scan all files. This is neither cost- nor time-efficient.

Azure Synapse supports using file name information in queries. It is a good idea to organise the storage in the data lake to take advantage of this.

Connecting to GE Proficy Historian

Kukur connects to GE Proficy using the Proficy OLE DB Provider. The OLE DB Provider needs to be installed as part of the Proficy client tools.

Kukur initiates OLEDB connections using the ADODB source type.

To connect to the default Proficy Historian, use the following connection_string:

[source.MyProficy]
type = "adodb"
connection_string = "Provider=ihOLEDB.iHistorian.1;User Id=;Password="

Metadata about time series is available in the ihTags table.

The values for data type and interpolation type defined in Proficy need to be mapped to those known by Kukur. Additionally, RowCount = 0 needs to be added to the where-clause, since the OLEDB provider returns only 5000 rows by default.

[source.MyProficy]
type = "adodb"
connection_string = "Provider=ihOLEDB.iHistorian.1;User Id=;Password="
list_query = "select Tagname, Description, EngUnits, DataType, StepValue, LoEngineeringUnits, HiEngineeringUnits from ihtags where RowCount = 0"
list_columns = ["series name", "description", "unit", "data type", "interpolation type", "physical lower limit", "physical upper limit"]
metadata_value_mapping = "PROFICY_mapping"

[metadata_value_mapping.PROFICY_mapping."data type"]
FLOAT64 = ["Scaled", "SingleFloat", "DoubleFloat", "SingleInteger", "DoubleInteger", "Quad Integer", "Unsigned Single Integer", "Unsigned Double Integer", "Unsigned Quad Integer", "USingleInteger", "UDoubleInteger", "UQuadInteger"]
STRING = ["Boolean", "FixedString", "VariableString", "Byte"]

[metadata_value_mapping.PROFICY_mapping."interpolation type"]
LINEAR = 0
STEPPED = 1

Time zone handling is crucial when requesting data. Kukur uses timestamps in UTC by default. Using the timezone = '0' where-clause ensures that Proficy also interprets and returns dates in UTC.

The OLEDB provider does not accept the timestamps as provided by Kukur through pywin32, so a conversion to a date time string is required.

The complete configuration is:

[source.MyProficy]
type = "adodb"
connection_string = "Provider=ihOLEDB.iHistorian.1;User Id=;Password="
list_query = "select Tagname, Description, EngUnits, DataType, StepValue, LoEngineeringUnits, HiEngineeringUnits from ihtags where RowCount = 0"
list_columns = ["series name", "description", "unit", "data type", "interpolation type", "physical lower limit", "physical upper limit"]
metadata_value_mapping = "PROFICY_mapping"
data_query = "select TimeStamp, Value from ihRawData where RowCount = 0 and SamplingMode = 'RawByTime' and timezone = '0' and Tagname = ? and TimeStamp >= ? and TimeStamp < ? order by Timestamp asc"
data_query_interval_seconds = 86400
data_query_datetime_format = "%Y-%m-%d %H:%M:%S"
data_timezone = "UTC"

[metadata_value_mapping.PROFICY_mapping."data type"]
FLOAT64 = ["Scaled", "SingleFloat", "DoubleFloat", "SingleInteger", "DoubleInteger", "Quad Integer", "Unsigned Single Integer", "Unsigned Double Integer", "Unsigned Quad Integer", "USingleInteger", "UDoubleInteger", "UQuadInteger"]
STRING = ["Boolean", "FixedString", "VariableString", "Byte"]

[metadata_value_mapping.PROFICY_mapping."interpolation type"]
LINEAR = 0
STEPPED = 1

Connecting to OSIsoft PI

Kukur connects to OSIsoft PI using the PI OLEDB Provider.

The basic configuration to connect to a PI server PIAB is:

[source.PIAB]
type = "adodb"
connection_string = "Provider = PIOLEDB;Data Source = PIAB"

To list time series and collect metadata, use:

[source.PIAB]
type = "adodb"
connection_string = "Provider = PIOLEDB;Data Source = PIAB"
list_query = "select tag, descriptor, engunits, zero, zero + span, compdev, pointtype, step, digitalset from pipoint2"
list_columns = ["series name", "description", "unit", "physical lower limit", "physical upper limit", "accuracy", "data type", "interpolation type", "dictionary name"]
metadata_value_mapping = "PI_mapping"
dictionary_query = "select code, name from pids where digitalset = ?"

[metadata_value_mapping.PI_mapping."data type"]
FLOAT64 = ["R", "I", " "]
STRING = "S"
DICTIONARY = "D"

[metadata_value_mapping.PI_mapping."interpolation type"]
LINEAR = 0
STEPPED = 1

The example above uses the OSIsoft recommended interpretation of the zero and span PI point attributes. Update the calculation of the physical lower limit and physical upper limit as needed when diverging from this.

The ISO8601/RFC3339 timestamps provided by Kukur to the OLEDB driver when requesting data are not natively accepted. They need to be converted to a format accepted by PI using data_query_datetime_format.

To avoid overloading the data historian, the time span of one query has been limited to one day of data. Kukur will split and recombine longer queries.

[source.PIAB]
type = "adodb"
connection_string = "Provider = PIOLEDB;Data Source = PIAB;Time Zone = UTC"
list_query = "select tag, descriptor, engunits, zero, zero + span, compdev, pointtype, step, digitalset from pipoint2"
list_columns = ["series name", "description", "unit", "physical lower limit", "physical upper limit", "accuracy", "data type", "interpolation type", "dictionary name"]
metadata_value_mapping = "PI_mapping"
dictionary_query = "select code, name from pids where digitalset = ?"
data_query = "select time, value from picomp2 where tag = ? and time >= ? and time < ?"
data_query_datetime_format = "%Y-%m-%d %H:%M:%S"
data_query_interval_seconds = 86400
data_query_timezone = "UTC"
data_timezone = "UTC"

[metadata_value_mapping.PI_mapping."data type"]
FLOAT64 = ["R", "I", " "]
STRING = "S"
DICTIONARY = "D"

[metadata_value_mapping.PI_mapping."interpolation type"]
LINEAR = 0
STEPPED = 1

The PI OLEDB provider expects and produces timestamps in the local time zone by default. Adding the Time Zone = UTC parameter to the connection string and configuring the data_query_timezone and data_timezone properties ensures that timestamps are always interpreted as UTC. This avoids problems around daylight savings time changes, which are unavoidable when using local times.