Apache Flink Connector YTsaurus

Apache Flink Connector YTsaurus is a connector for streaming and batch data processing on Apache Flink. It works with sorted dynamic tables in YTsaurus and supports writes, bounded-stream reads, and Lookup operations.

The connector source code is available on GitHub.

Features

  • writing to YTsaurus dynamic tables — supports streaming writes; the basic scenario is covered in Quick Start;
  • automatic table creation — creates tables automatically before writing if they do not exist;
  • advanced table resharding — provides configurable resharding strategies for optimal performance. For details, see Table Resharding;
  • data partitioning — supports multiple partitioning granularities: hour, day, week, month, year. For details, see Data Partitioning;
  • synchronous and asynchronous Lookup operations — supports both execution modes for Lookup operations on YTsaurus dynamic tables. For details, see Lookup Operations;
  • Lookup caching — supports FULL and PARTIAL cache strategies to optimize performance. For details, see Lookup Operations;
  • multi-cluster Lookup support — lets you run Lookup operations against multiple YTsaurus clusters depending on availability. See an example in Examples;
  • trackable fields — lets you track field values through metrics.

Installation

Note

The current version of the connector requires:

  • Java 11
  • Apache Flink 1.20.X

Replace connectorVersion with the latest version from Maven Central.

Maven:

<dependency>
    <groupId>tech.ytsaurus.flyt.connectors.ytsaurus</groupId>
    <artifactId>flink-connector-ytsaurus</artifactId>
    <version>${connectorVersion}</version>
    <classifier>all</classifier>
</dependency>

Gradle:

implementation("tech.ytsaurus.flyt.connectors.ytsaurus:flink-connector-ytsaurus:$connectorVersion:all")

Quick Start

1. Install a YTsaurus cluster

You can skip this step if you already have a cluster configured.

For local development and testing, follow the official documentation to install a YTsaurus cluster with Kind. For production deployments, follow the YTsaurus Admin Guide.

Note

flink-connector-ytsaurus uses the Java YTsaurus client. Before you start, verify which proxy address you need to specify in the proxy parameter for your environment.

2. Install an Apache Flink cluster

Install an Apache Flink cluster using the official documentation. The connector requires Apache Flink version 1.20.X.

3. Install Flink Connector YTsaurus in the Apache Flink cluster

Build the connector from source (see Building from Source) or download it from the Maven repository. After you build or download the connector, place the resulting JAR file in ${FLINK_ROOT}/lib.

4. Configure and start the Apache Flink cluster

Open the conf/config.yaml file and change the rest.port parameter from 8081 to 8083 to avoid port conflicts with YTsaurus.

Start the Apache Flink cluster:

./bin/start-cluster.sh

5. Start Flink SQL Client

./bin/sql-client.sh

6. Run the demo job

  1. Create a Datagen source:

    CREATE TABLE simple_datagen_source (
        id BIGINT,
        name STRING,
        age INT,
        salary DOUBLE,
        is_active BOOLEAN,
        created_at TIMESTAMP(3)
    ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '50',
        'number-of-rows' = '1000',
        'fields.id.kind' = 'sequence',
        'fields.id.start' = '1',
        'fields.id.end' = '100000',
        'fields.name.length' = '20',
        'fields.age.min' = '22',
        'fields.age.max' = '65',
        'fields.salary.min' = '30000.0',
        'fields.salary.max' = '150000.0'
    );
    
  2. Create a YTsaurus sink:

    CREATE TABLE ytsaurus_simple_sink (
        id BIGINT,
        name STRING,
        age INT,
        salary DOUBLE,
        is_active BOOLEAN,
        created_at TIMESTAMP(3),
        updated_at TIMESTAMP(3),
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'ytsaurus',
        'proxy' = 'localhost:8081',
        'path' = '//tmp/flink_simple_test_table',
        'credentials-source' = 'options',
        'username' = 'admin',
        'token' = 'password',
        'schema' = '[
            {"name"="id";"type"="int64";"required"=%false;"sort_order"="ascending"};
            {"name"="name";"type"="string";"required"=%false};
            {"name"="age";"type"="int64";"required"=%false};
            {"name"="salary";"type"="double";"required"=%false};
            {"name"="is_active";"type"="boolean";"required"=%false};
            {"name"="created_at";"type"="string";"required"=%false};
            {"name"="updated_at";"type"="string";"required"=%false}
        ]'
    );
    
  3. Run a job that writes the generated data to a YTsaurus dynamic table:

    INSERT INTO ytsaurus_simple_sink
    SELECT
        id,
        name,
        age,
        salary,
        is_active,
        created_at,
        CURRENT_TIMESTAMP AS updated_at
    FROM simple_datagen_source;
    
  4. Monitor job progress at localhost:8083.

  5. The job creates the table flink_simple_test_table in the //tmp directory (full path: //tmp/flink_simple_test_table). The table contains the results of the Flink job.

Congratulations! You've launched your first job with YTsaurus and Apache Flink.

Configuration Options

The YTsaurus connector supports a wide range of configuration options.

The options below are grouped by purpose. For most write and Lookup scenarios, specify the table path by using path. In multi-cluster configurations, use path-map instead of path.

Required Options

Option | Type | Description
proxy | String | YTsaurus proxy address
schema | String | YSON schema definition for the YTsaurus table. See Schema Definition
credentials-source | String | Authentication method (options, env, your-custom-provider)

Path Configuration

Option | Type | Default | Description
path | String | - | Path to the YTsaurus table
path-map | Map<String, String> | - | Mapping from cluster to table path for multi-cluster lookups

Specify one of the following options:

  • path - for a single table in a single cluster;
  • path-map - for multi-cluster Lookup scenarios.

Authentication Options

Option | Type | Default | Description
username | String | - | YTsaurus username (when using options credentials source)
token | String | - | YTsaurus token (when using options credentials source)

Table Configuration

Option | Type | Default | Description
optimize-for | Enum | - | Table optimization mode (LOOKUP, SCAN)
primary-medium | Enum | - | Primary storage medium (DEFAULT, SSD_BLOBS)
tablet-cell-bundle | String | - | Tablet cell bundle name
enable-dynamic-store-read | Boolean | true | Enable dynamic store read attribute
custom-attributes | String | - | Custom table attributes in YSON format
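
As an illustration, the fragment below combines several of these options in the WITH clause of a table definition. The values are placeholders chosen for this sketch: the bundle name default and the availability of the SSD_BLOBS medium are assumptions about your cluster, not requirements of the connector.

```sql
-- Fragment of a WITH clause; not a complete statement.
'optimize-for' = 'LOOKUP',             -- one of LOOKUP, SCAN
'primary-medium' = 'SSD_BLOBS',        -- assumes your cluster has this medium configured
'tablet-cell-bundle' = 'default',      -- placeholder bundle name
'enable-dynamic-store-read' = 'true'   -- the documented default, shown explicitly
```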

Partitioning Options

Option | Type | Default | Description
partition-key | String | - | Column name to use for partitioning
partition-scale | Enum | - | Partitioning granularity (HOUR, HOUR_T, DAY, WEEK, MONTH, SHORT_MONTH, YEAR, SHORT_YEAR)
partition-ttl-day-cnt | Integer | - | Number of days to keep partitions
partition-ttl-in-days-from-creation | Integer | - | TTL in days from partition creation
min-partition-ttl | Integer | 20 | Minimum partition TTL in days

Resharding Options

Option | Type | Default | Description
reshard.strategy | Enum | NONE | Resharding strategy (NONE, FIXED, LAST_PARTITIONS)
reshard.tablet-count | Integer | - | Number of tablets for resharding
reshard.uniform | Boolean | false | Use uniform partitioning
reshard.last-partitions-count | Integer | 7 | Number of partitions to consider in LAST_PARTITIONS strategy

Transaction and Performance Options

Option | Type | Default | Description
commit-transaction-period | Duration | - | Period for committing transactions
transaction-timeout | Duration | - | Transaction timeout
transaction-atomicity | Enum | - | Transaction atomicity level
rows-in-transaction-limit | Integer | - | Maximum rows per transaction
rows-in-modification-limit | Integer | - | Maximum rows per modification
retry-strategy | Enum | EXPONENTIAL | Retry strategy (EXPONENTIAL, NO_RETRY)
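
A minimal sketch of how these options might appear in a sink definition follows. All values are illustrative assumptions rather than recommendations. The durations use Flink's usual duration syntax, as in the '1h' values elsewhere in this document.

```sql
-- Fragment of a WITH clause; all values are illustrative assumptions.
'commit-transaction-period' = '5s',      -- Duration
'transaction-timeout' = '1min',          -- Duration
'rows-in-transaction-limit' = '10000',   -- Integer
'rows-in-modification-limit' = '1000',   -- Integer
'retry-strategy' = 'EXPONENTIAL'         -- the documented default, shown explicitly
```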

Lookup Options

Option | Type | Default | Description
lookup.async | Boolean | false | Enable asynchronous lookup
lookup-method | Enum | LOOKUP | Lookup method (LOOKUP, SELECT). The LOOKUP method works only with key columns but performs better. The SELECT method works with any columns but is slower.
cluster-pick-strategy | String | FirstAvailableClusterPickStrategy | Strategy for picking a cluster in a multi-cluster setup. You can supply your own implementation of the strategy.

Cache Options for Lookup Operations

Option | Type | Default | Description
lookup.cache | Enum | NONE | Cache type (NONE, PARTIAL, FULL)
lookup.partial-cache.max-rows | Long | - | Maximum rows in partial cache
lookup.partial-cache.expire-after-write | Duration | - | Cache expiration after write
lookup.partial-cache.expire-after-access | Duration | - | Cache expiration after access
lookup.partial-cache.cache-missing-key | Boolean | - | Cache missing keys
lookup.full-cache.reload-strategy | Enum | - | Full cache reload strategy (PERIODIC, TIMED)
lookup.full-cache.periodic-reload-interval | Duration | - | Periodic reload interval
lookup.full-cache.timed-reload-iso-time | String | - | Timed reload ISO time

Other Options

Option | Type | Default | Description
trackable-field | String | - | Field name to track
proxy-role | String | - | Proxy role

The trackable-field parameter is useful when you need to observe the values of a specific field through connector metrics.
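
For instance, to track the updated_at column of the ytsaurus_simple_sink table from Quick Start, the sink definition could include the option below. This is a sketch only. The choice of column is arbitrary, and the names of the resulting metrics are not covered here.

```sql
-- Fragment of a WITH clause.
-- updated_at is a column of the ytsaurus_simple_sink table from Quick Start.
'trackable-field' = 'updated_at'
```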

Schema Definition

YTsaurus tables require a YSON schema definition. Provide the schema as a YSON list of column definitions.

Schema format:

[
    {"name"="column_name";"type"="data_type";"required"=%false;"sort_order"="ascending"};
    {"name"="another_column";"type"="string";"required"=%false}
]

For more information about YTsaurus schemas, see the official documentation.
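
For a table with a composite key, a plausible schema marks every key column with sort_order and lists the key columns first, because YTsaurus expects the key columns of a sorted table to form a prefix of the schema. The column names below are hypothetical. The matching Flink definition would presumably declare PRIMARY KEY (user_id, event_id) NOT ENFORCED.

```sql
-- Fragment of a WITH clause; column names are hypothetical.
'schema' = '[
    {"name"="user_id";"type"="int64";"required"=%false;"sort_order"="ascending"};
    {"name"="event_id";"type"="int64";"required"=%false;"sort_order"="ascending"};
    {"name"="payload";"type"="string";"required"=%false}
]'
```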

Authentication

The connector supports three authentication methods, selected with the credentials-source option: options, env, and a custom provider.

Options-based Authentication

Provide credentials directly in the table configuration:

'credentials-source' = 'options',
'username' = 'your-username',
'token' = 'your-token'

Environment-based Authentication

Read credentials from environment variables:

'credentials-source' = 'env'

Set the following environment variables:

  • YT_USER - YTsaurus username
  • YT_TOKEN - YTsaurus token
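
A complete table definition that relies on environment-based authentication might look like the sketch below. The table name, columns, and path are hypothetical. The sketch also assumes that YT_USER and YT_TOKEN are visible to the Flink processes that run the connector.

```sql
-- Credentials are read from YT_USER and YT_TOKEN,
-- so no username or token appears in the table definition.
CREATE TABLE env_auth_sink (
    id BIGINT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'ytsaurus',
    'proxy' = 'localhost:8081',
    'path' = '//tmp/env_auth_table',
    'credentials-source' = 'env',
    'schema' = '[
        {"name"="id";"type"="int64";"required"=%false;"sort_order"="ascending"};
        {"name"="name";"type"="string";"required"=%false}
    ]'
);
```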

Custom Authentication

To create your own authentication method, implement the CredentialsProvider interface.

Data Partitioning

The connector supports automatic data partitioning based on timestamp fields.

To configure partitioning, specify the date or time column in partition-key and choose the required granularity in partition-scale. The connector automatically calculates the partition value from the selected field. For a working configuration, see Partitioning Example.

Supported Partition Scales

  • HOUR - Hourly partitions (format: YYYY-MM-DD HH:00:00)
  • HOUR_T - Hourly partitions with T separator (format: YYYY-MM-DDTHH:00:00)
  • DAY - Daily partitions (format: YYYY-MM-DD)
  • WEEK - Weekly partitions (format: YYYY-MM-DD, where the date is the Monday of the corresponding week)
  • MONTH - Monthly partitions (format: YYYY-MM-01)
  • SHORT_MONTH - Short monthly partitions (format: YYYY-MM)
  • YEAR - Yearly partitions (format: YYYY-01-01, that is, the first day of the year)
  • SHORT_YEAR - Short yearly partitions (format: YYYY)
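
As a worked example, take a row whose partition-key value is 2024-03-14 15:42:07 (a Thursday). Applying the formats listed above gives the partition values shown in the comments below. The timestamp is an arbitrary illustration, the fragment itself only selects the HOUR scale, and any time zone conversion the connector may apply is ignored here.

```sql
-- Fragment of a WITH clause.
'partition-key' = 'event_time',
'partition-scale' = 'HOUR'
-- Partition value for event_time = 2024-03-14 15:42:07, by scale:
--   HOUR        2024-03-14 15:00:00
--   HOUR_T      2024-03-14T15:00:00
--   DAY         2024-03-14
--   WEEK        2024-03-11   (Monday of that week)
--   MONTH       2024-03-01
--   SHORT_MONTH 2024-03
--   YEAR        2024-01-01
--   SHORT_YEAR  2024
```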

Partitioning Example

CREATE TABLE partitioned_data_source (
    id BIGINT,
    data STRING,
    event_time TIMESTAMP(3)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '50',
    'number-of-rows' = '1000',
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end' = '100000',
    'fields.data.length' = '20',
    'fields.event_time.max-past' = '7d'
);
CREATE TABLE partitioned_table (
    id BIGINT,
    data STRING,
    event_time TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'ytsaurus',
    'proxy' = 'localhost:8081',
    'credentials-source' = 'options',
    'username' = 'admin',
    'token' = 'password',
    'path' = '//tmp/partitioned_table',
    'partition-key' = 'event_time',
    'partition-scale' = 'DAY',
    'partition-ttl-day-cnt' = '30',
    'schema' = '[
        {"name"="id";"type"="int64";"required"=%false;"sort_order"="ascending"};
        {"name"="data";"type"="string";"required"=%false};
        {"name"="event_time";"type"="string";"required"=%false}
    ]'
);
INSERT INTO partitioned_table
SELECT *
FROM partitioned_data_source;

Table Resharding

The connector supports automatic table resharding at table creation time. Resharding lets you set the number of tablets upfront — either as a fixed value or based on statistics from existing partitions. This helps distribute write load evenly from the start.

Resharding Strategies

  • NONE - Disable resharding
  • FIXED - Resharding with a fixed number of tablets
  • LAST_PARTITIONS - Resharding based on the average number of tablets in the last N partitions

Resharding Example

'reshard.strategy' = 'FIXED',
'reshard.tablet-count' = '10',
'reshard.uniform' = 'true'
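
The LAST_PARTITIONS strategy can be configured in the same way. In this sketch the connector would consider the last three partitions. If they had, say, 8, 10, and 12 tablets, the average is (8 + 10 + 12) / 3 = 10 tablets. The tablet counts are made up for illustration, and how the connector rounds a non-integer average is not covered here.

```sql
-- Fragment of a WITH clause.
'reshard.strategy' = 'LAST_PARTITIONS',
'reshard.last-partitions-count' = '3'   -- the documented default is 7
```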

Lookup Operations

The connector supports both synchronous and asynchronous Lookup operations with caching.

Lookup operations enrich a stream with data from an external table by key. In SQL terms, this corresponds to a Lookup Join. For more information about lookup joins, see the Apache Flink documentation.

This section covers stream enrichment with data from YTsaurus. If you only need the basic write scenario, the steps in Quick Start are sufficient.

Lookup Methods

  • LOOKUP - standard Lookup operation
  • SELECT - Lookup operation based on a SELECT query
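
Because the LOOKUP method works only with key columns, a join on a non-key column needs the SELECT method. The fragment below sketches the relevant options for a hypothetical lookup table joined on a non-key column. The remaining options would be the same as in Lookup Example, and the cache settings are optional assumptions.

```sql
-- Fragment of a WITH clause for a lookup table joined on a non-key column.
'format' = 'yson',
'lookup-method' = 'SELECT',                  -- slower than LOOKUP, but not limited to key columns
'lookup.cache' = 'PARTIAL',                  -- optional; caching may offset part of the extra cost
'lookup.partial-cache.max-rows' = '10000'
```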

Cache Types

  • NONE - No caching
  • PARTIAL - Partial caching with configurable size and TTL
  • FULL - Full table caching with periodic or timed reload
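
A FULL cache with a timed reload could be configured roughly as follows. The value '03:00' is an assumption: it presumes that the option accepts an ISO-8601 time of day, as the option name suggests, so verify the accepted format before relying on it.

```sql
-- Fragment of a WITH clause.
'lookup.cache' = 'FULL',
'lookup.full-cache.reload-strategy' = 'TIMED',
'lookup.full-cache.timed-reload-iso-time' = '03:00'   -- assumed ISO-8601 time of day
```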

Lookup Example

The Lookup connector requires a YSON formatter, which the lookup table below selects with 'format' = 'yson'. Build or download the YSON formatter according to the documentation and place it in ${FLINK_ROOT}/lib.

Prepare data for the Lookup operation.

CREATE TABLE users_datagen (
    id BIGINT,
    name STRING,
    age INT,
    created_at TIMESTAMP(3)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '50',
    'number-of-rows' = '1000',
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end' = '1000'
);
CREATE TABLE lookup_table_sink (
    id BIGINT,
    name STRING,
    age INT,
    created_at TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'ytsaurus',
    'proxy' = 'localhost:8081',
    'path' = '//tmp/lookup_table',
    'credentials-source' = 'options',
    'username' = 'admin',
    'token' = 'password',
    'schema' = '[
        {"name"="id";"type"="int64";"required"=%false;"sort_order"="ascending"};
        {"name"="name";"type"="string";"required"=%false};
        {"name"="age";"type"="int64";"required"=%false};
        {"name"="created_at";"type"="string";"required"=%false}
    ]'
);
INSERT INTO lookup_table_sink
SELECT
    id,
    name,
    age,
    created_at
FROM users_datagen;

Join order data with user data.

CREATE TABLE orders_datagen (
    order_id BIGINT,
    user_id BIGINT,
    total_price INT,
    created_at TIMESTAMP(3),
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'number-of-rows' = '100',
    'fields.user_id.min' = '1',
    'fields.user_id.max' = '1000',
    'fields.total_price.min' = '1',
    'fields.total_price.max' = '10000'
);
CREATE TABLE lookup_table (
    id BIGINT,
    name STRING,
    age INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'ytsaurus',
    'format' = 'yson',
    'proxy' = 'localhost:8081',
    'credentials-source' = 'options',
    'username' = 'admin',
    'token' = 'password',
    'path' = '//tmp/lookup_table',
    'lookup.async' = 'true',
    'lookup.cache' = 'PARTIAL',
    'lookup.partial-cache.max-rows' = '10000',
    'lookup.partial-cache.expire-after-access' = '1h',
    'lookup-method' = 'LOOKUP',
    'schema' = '[
        {"name"="id";"type"="int64";"required"=%false;"sort_order"="ascending"};
        {"name"="name";"type"="string";"required"=%false};
        {"name"="age";"type"="int64";"required"=%false}
    ]'
);
SELECT
    o.order_id,
    o.total_price,
    o.created_at,
    l.id as user_id,
    l.name,
    l.age
FROM orders_datagen o
LEFT JOIN lookup_table FOR SYSTEM_TIME AS OF o.proc_time AS l
ON o.user_id = l.id;

Open the Apache Flink UI at localhost:8083.

Flink SQL Client displays the results of the Lookup Join operation in real time.

Examples

Basic Sink Example

CREATE TABLE ytsaurus_sink (
    user_id BIGINT,
    username STRING,
    email STRING,
    created_at TIMESTAMP(3),
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'ytsaurus',
    'proxy' = 'localhost:8081',
    'path' = '//home/your-user/users_table',
    'credentials-source' = 'options',
    'username' = 'your-username',
    'token' = 'your-token',
    'schema' = '[
        {"name"="user_id";"type"="int64";"required"=%false;"sort_order"="ascending"};
        {"name"="username";"type"="string";"required"=%false};
        {"name"="email";"type"="string";"required"=%false};
        {"name"="created_at";"type"="string";"required"=%false}
    ]'
);

Partitioned Table with Resharding

CREATE TABLE events_table (
    event_id BIGINT,
    user_id BIGINT,
    event_type STRING,
    event_data STRING,
    event_time TIMESTAMP(3),
    PRIMARY KEY (event_id) NOT ENFORCED
) WITH (
    'connector' = 'ytsaurus',
    'proxy' = 'localhost:8081',
    'path' = '//home/your-user/events',
    'credentials-source' = 'options',
    'username' = 'your-username',
    'token' = 'your-token',
    'partition-key' = 'event_time',
    'partition-scale' = 'DAY',
    'partition-ttl-day-cnt' = '90',
    'reshard.strategy' = 'LAST_PARTITIONS',
    'reshard.tablet-count' = '20',
    'reshard.last-partitions-count' = '7',
    'reshard.uniform' = 'true',
    'optimize-for' = 'SCAN',
    'schema' = '[
        {"name"="event_id";"type"="int64";"required"=%false;"sort_order"="ascending"};
        {"name"="user_id";"type"="int64";"required"=%false};
        {"name"="event_type";"type"="string";"required"=%false};
        {"name"="event_data";"type"="string";"required"=%false};
        {"name"="event_time";"type"="string";"required"=%false}
    ]'
);

Lookup Table with Full Cache

CREATE TABLE user_lookup (
    user_id BIGINT,
    username STRING,
    email STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'ytsaurus',
    'proxy' = 'localhost:8081',
    'path' = '//home/your-user/users',
    'credentials-source' = 'options',
    'username' = 'your-username',
    'token' = 'your-token',
    'lookup.cache' = 'FULL',
    'lookup.full-cache.reload-strategy' = 'PERIODIC',
    'lookup.full-cache.periodic-reload-interval' = '1h',
    'optimize-for' = 'LOOKUP',
    'schema' = '[
        {"name"="user_id";"type"="int64";"required"=%false;"sort_order"="ascending"};
        {"name"="username";"type"="string";"required"=%false};
        {"name"="email";"type"="string";"required"=%false}
    ]'
);

Multi-cluster Configuration

CREATE TABLE multi_cluster_table (
    id BIGINT,
    data STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'ytsaurus',
    'proxy' = 'localhost:8081',
    'path-map' = 'cluster1://tmp/table1,cluster2://tmp/table2',
    'cluster-pick-strategy' = 'FirstAvailableClusterPickStrategy',
    'credentials-source' = 'options',
    'username' = 'your-username',
    'token' = 'your-token',
    'schema' = '[
        {"name"="id";"type"="int64";"required"=%false;"sort_order"="ascending"};
        {"name"="data";"type"="string";"required"=%false}
    ]'
);

What's Next