Apache Flink Connector YTsaurus
Apache Flink Connector YTsaurus is a connector for streaming and batch data processing on Apache Flink. It works with sorted dynamic tables in YTsaurus and supports writes, reading bounded streams, and Lookup operations.
The connector source code is available on GitHub.
Features
- writing to YTsaurus dynamic tables: supports streaming writes; the basic scenario is covered in Quick Start;
- automatic table creation: creates tables automatically before writing if they do not exist;
- advanced table resharding: provides configurable resharding strategies for optimal performance. For details, see Table Resharding;
- data partitioning: supports hour, day, week, month, and year partitioning granularities. For details, see Data Partitioning;
- synchronous and asynchronous Lookup operations: supports both execution modes for Lookup operations on YTsaurus dynamic tables. For details, see Lookup Operations;
- Lookup caching: supports FULL and PARTIAL cache strategies to optimize performance. For details, see Lookup Operations;
- multi-cluster Lookup support: lets you run Lookup operations against multiple YTsaurus clusters depending on availability. See an example in Examples;
- trackable fields: lets you track field values through metrics.
Installation
Note
The current version of the connector requires:
- Java 11
- Apache Flink 1.20.X
Replace connectorVersion with the latest version from Maven Central.
Maven:
<dependency>
<groupId>tech.ytsaurus.flyt.connectors.ytsaurus</groupId>
<artifactId>flink-connector-ytsaurus</artifactId>
<version>${connectorVersion}</version>
<classifier>all</classifier>
</dependency>
Gradle:
implementation("tech.ytsaurus.flyt.connectors.ytsaurus:flink-connector-ytsaurus:$connectorVersion:all")
Quick Start
1. Install a YTsaurus cluster
You can skip this step if you already have a cluster configured.
For local development and testing, follow the official documentation to install the YTsaurus cluster via Kind. For production deployments, follow the YTsaurus Admin Guide.
Note
flink-connector-ytsaurus uses the Java YTsaurus client. Before you start, verify which proxy address you need to specify in the proxy parameter for your environment.
2. Install an Apache Flink cluster
Install an Apache Flink cluster using the official documentation. The connector requires Apache Flink version 1.20.X.
3. Install Flink Connector YTsaurus in the Apache Flink cluster
Build the connector from source (see Building from Source) or download it from the Maven repository. After you build or download the connector, place the resulting JAR file in ${FLINK_ROOT}/lib.
4. Change the Flink Web UI port
Open the conf/config.yaml file and change the rest.port parameter from 8081 to 8083 to avoid port conflicts with YTsaurus.
5. Start the Apache Flink cluster with Flink SQL Client
Start the Apache Flink cluster:
./bin/start-cluster.sh
Start Flink SQL Client:
./bin/sql-client.sh
6. Run the demo job
- Create a Datagen source:
CREATE TABLE simple_datagen_source (
id BIGINT,
name STRING,
age INT,
salary DOUBLE,
is_active BOOLEAN,
created_at TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '50',
'number-of-rows' = '1000',
'fields.id.kind' = 'sequence',
'fields.id.start' = '1',
'fields.id.end' = '100000',
'fields.name.length' = '20',
'fields.age.min' = '22',
'fields.age.max' = '65',
'fields.salary.min' = '30000.0',
'fields.salary.max' = '150000.0'
);
- Create a YTsaurus sink:
CREATE TABLE ytsaurus_simple_sink (
id BIGINT,
name STRING,
age INT,
salary DOUBLE,
is_active BOOLEAN,
created_at TIMESTAMP(3),
updated_at TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'ytsaurus',
'proxy' = 'localhost:8081',
'path' = '//tmp/flink_simple_test_table',
'credentials-source' = 'options',
'username' = 'admin',
'token' = 'password',
'schema' = '[
{"name"="id";"type"="int64";"required"=%false;"sort_order"="ascending"};
{"name"="name";"type"="string";"required"=%false};
{"name"="age";"type"="int64";"required"=%false};
{"name"="salary";"type"="double";"required"=%false};
{"name"="is_active";"type"="boolean";"required"=%false};
{"name"="created_at";"type"="string";"required"=%false};
{"name"="updated_at";"type"="string";"required"=%false}
]'
);
- Run a job that writes the generated data to a YTsaurus dynamic table:
INSERT INTO ytsaurus_simple_sink
SELECT
id,
name,
age,
salary,
is_active,
created_at,
CURRENT_TIMESTAMP AS updated_at
FROM simple_datagen_source;
- Monitor job progress at localhost:8083.
- The table flink_simple_test_table will be created in the /tmp/flink_simple_test_table directory and will contain the results of the Flink job.
Congratulations! You've launched your first job with YTsaurus and Apache Flink.
Configuration Options
The YTsaurus connector supports a wide range of configuration options.
The options below are grouped by purpose. For most write and Lookup scenarios, specify the table path by using path. In multi-cluster configurations, use path-map instead of path.
This section describes:
- Required Options;
- Path Configuration;
- Authentication Options;
- Table Configuration;
- Partitioning Options;
- Resharding Options;
- Transaction and Performance Options;
- Lookup Options;
- Cache Options for Lookup Operations;
- Other Options.
Required Options
| Option | Type | Description |
|---|---|---|
| proxy | String | YTsaurus proxy address |
| schema | String | YSON schema definition for the YTsaurus table. See Schema Definition |
| credentials-source | String | Authentication method (options, env, your-custom-provider) |
Path Configuration
| Option | Type | Default | Description |
|---|---|---|---|
| path | String | - | Path to the YTsaurus table |
| path-map | Map<String, String> | - | Mapping from cluster to table path for multi-cluster lookups |
Specify one of the following options:
- path - for a single table in a single cluster;
- path-map - for multi-cluster Lookup scenarios.
Authentication Options
| Option | Type | Default | Description |
|---|---|---|---|
| username | String | - | YTsaurus username (when using options credentials source) |
| token | String | - | YTsaurus token (when using options credentials source) |
Table Configuration
| Option | Type | Default | Description |
|---|---|---|---|
| optimize-for | Enum | - | Table optimization mode (LOOKUP, SCAN) |
| primary-medium | Enum | - | Primary storage medium (DEFAULT, SSD_BLOBS) |
| tablet-cell-bundle | String | - | Tablet cell bundle name |
| enable-dynamic-store-read | Boolean | true | Enable dynamic store read attribute |
| custom-attributes | String | - | Custom table attributes in YSON format |
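As an illustration of how the table options above combine, the following sketch declares a sink tuned for point reads. The table name, path, and the bundle name default are assumptions chosen for the example, not values prescribed by the connector; only the option names and enum values come from the table above.

```sql
-- Hypothetical sink: table name, path, and bundle name are illustrative.
CREATE TABLE profile_sink (
  id BIGINT,
  profile STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'ytsaurus',
  'proxy' = 'localhost:8081',
  'path' = '//tmp/profile_table',
  'credentials-source' = 'options',
  'username' = 'admin',
  'token' = 'password',
  -- Table attributes from the Table Configuration options.
  'optimize-for' = 'LOOKUP',
  'primary-medium' = 'DEFAULT',
  'tablet-cell-bundle' = 'default',
  'enable-dynamic-store-read' = 'true',
  'schema' = '[
    {"name"="id";"type"="int64";"required"=%false;"sort_order"="ascending"};
    {"name"="profile";"type"="string";"required"=%false}
  ]'
);
```

Because the connector creates missing tables before writing, these options most likely matter at creation time. Whether they are applied to a table that already exists is not stated here, so verify that on your cluster.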
Partitioning Options
| Option | Type | Default | Description |
|---|---|---|---|
| partition-key | String | - | Column name to use for partitioning |
| partition-scale | Enum | - | Partitioning granularity (HOUR, HOUR_T, DAY, WEEK, MONTH, SHORT_MONTH, YEAR, SHORT_YEAR) |
| partition-ttl-day-cnt | Integer | - | Number of days to keep partitions |
| partition-ttl-in-days-from-creation | Integer | - | TTL in days from partition creation |
| min-partition-ttl | Integer | 20 | Minimum partition TTL in days |
Resharding Options
| Option | Type | Default | Description |
|---|---|---|---|
| reshard.strategy | Enum | NONE | Resharding strategy (NONE, FIXED, LAST_PARTITIONS) |
| reshard.tablet-count | Integer | - | Number of tablets for resharding |
| reshard.uniform | Boolean | false | Use uniform partitioning |
| reshard.last-partitions-count | Integer | 7 | Number of partitions to consider in LAST_PARTITIONS strategy |
Transaction and Performance Options
| Option | Type | Default | Description |
|---|---|---|---|
| commit-transaction-period | Duration | - | Period for committing transactions |
| transaction-timeout | Duration | - | Transaction timeout |
| transaction-atomicity | Enum | - | Transaction atomicity level |
| rows-in-transaction-limit | Integer | - | Maximum rows per transaction |
| rows-in-modification-limit | Integer | - | Maximum rows per modification |
| retry-strategy | Enum | EXPONENTIAL | Retry strategy (EXPONENTIAL, NO_RETRY) |
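The sketch below shows one way the transaction options might be set on a sink. All values are illustrative rather than recommended, the table name and path are hypothetical, and the duration syntax ('5s', '60s') is an assumption based on the '1h' style used elsewhere in this document.

```sql
-- Hypothetical sink with explicit transaction settings; values are examples only.
CREATE TABLE tuned_sink (
  id BIGINT,
  payload STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'ytsaurus',
  'proxy' = 'localhost:8081',
  'path' = '//tmp/tuned_table',
  'credentials-source' = 'options',
  'username' = 'admin',
  'token' = 'password',
  -- Commit every 5 seconds and cap the number of rows per transaction.
  'commit-transaction-period' = '5s',
  'transaction-timeout' = '60s',
  'rows-in-transaction-limit' = '10000',
  'retry-strategy' = 'EXPONENTIAL',
  'schema' = '[
    {"name"="id";"type"="int64";"required"=%false;"sort_order"="ascending"};
    {"name"="payload";"type"="string";"required"=%false}
  ]'
);
```

A shorter commit period generally trades throughput for lower write latency, so treat these numbers as a starting point for your own measurements.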
Lookup Options
| Option | Type | Default | Description |
|---|---|---|---|
| lookup.async | Boolean | false | Enable asynchronous lookup |
| lookup-method | Enum | LOOKUP | Lookup method (LOOKUP, SELECT). LOOKUP works only with key columns but is faster. SELECT works with any columns but is slower. |
| cluster-pick-strategy | String | FirstAvailableClusterPickStrategy | Strategy for picking a cluster in a multi-cluster setup. You can supply your own implementation of the strategy. |
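To make the difference between the two lookup methods concrete, here is a sketch of a lookup join on a non-key column, which is the case the SELECT method is described as covering. The table names, the path, and the email column are hypothetical, and 'format' = 'yson' is carried over from the Lookup Example in this document.

```sql
-- Hypothetical stream of sign-in events generated by Datagen.
CREATE TABLE signins_datagen (
  email STRING,
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.email.length' = '10'
);

-- Lookup table keyed by id. The join below filters on email, a non-key
-- column, so SELECT is requested instead of the default LOOKUP method.
CREATE TABLE users_by_email (
  id BIGINT,
  email STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'ytsaurus',
  'format' = 'yson',
  'proxy' = 'localhost:8081',
  'path' = '//tmp/users',
  'credentials-source' = 'options',
  'username' = 'admin',
  'token' = 'password',
  'lookup-method' = 'SELECT',
  'schema' = '[
    {"name"="id";"type"="int64";"required"=%false;"sort_order"="ascending"};
    {"name"="email";"type"="string";"required"=%false}
  ]'
);

SELECT s.email, u.id AS user_id
FROM signins_datagen AS s
LEFT JOIN users_by_email FOR SYSTEM_TIME AS OF s.proc_time AS u
  ON s.email = u.email;
```

If the join condition uses only key columns, keeping the default LOOKUP method is the better choice, since it is the faster of the two.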
Cache Options for Lookup Operations
| Option | Type | Default | Description |
|---|---|---|---|
| lookup.cache | Enum | NONE | Cache type (NONE, PARTIAL, FULL) |
| lookup.partial-cache.max-rows | Long | - | Maximum rows in partial cache |
| lookup.partial-cache.expire-after-write | Duration | - | Cache expiration after write |
| lookup.partial-cache.expire-after-access | Duration | - | Cache expiration after access |
| lookup.partial-cache.cache-missing-key | Boolean | - | Cache missing keys |
| lookup.full-cache.reload-strategy | Enum | - | Full cache reload strategy (PERIODIC, TIMED) |
| lookup.full-cache.periodic-reload-interval | Duration | - | Periodic reload interval |
| lookup.full-cache.timed-reload-iso-time | String | - | Timed reload ISO time |
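For the TIMED reload strategy, a configuration could look like the sketch below. The time value format is an assumption: these option names match Apache Flink's standard lookup cache options, where the value is an ISO-8601 time such as '03:00:00', but this document does not confirm that the connector parses it the same way. The table name and path are hypothetical, and 'format' = 'yson' is carried over from the Lookup Example in this document.

```sql
-- Hypothetical dictionary table fully cached and reloaded once a day.
CREATE TABLE currency_rates_lookup (
  currency STRING,
  rate DOUBLE,
  PRIMARY KEY (currency) NOT ENFORCED
) WITH (
  'connector' = 'ytsaurus',
  'format' = 'yson',
  'proxy' = 'localhost:8081',
  'path' = '//tmp/currency_rates',
  'credentials-source' = 'options',
  'username' = 'admin',
  'token' = 'password',
  'lookup.cache' = 'FULL',
  'lookup.full-cache.reload-strategy' = 'TIMED',
  -- Assumed ISO-8601 time of day; verify the accepted format before use.
  'lookup.full-cache.timed-reload-iso-time' = '03:00:00',
  'schema' = '[
    {"name"="currency";"type"="string";"required"=%false;"sort_order"="ascending"};
    {"name"="rate";"type"="double";"required"=%false}
  ]'
);
```

A timed reload suits reference data that changes on a known schedule, while PERIODIC fits data that drifts continuously.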
Other Options
| Option | Type | Default | Description |
|---|---|---|---|
| trackable-field | String | - | Field name to track |
| proxy-role | String | - | Proxy role |
The trackable-field parameter is useful when you need to observe the values of a specific field through connector metrics.
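As a small illustration, the sketch below marks created_at as the tracked field of a sink. The table name and path are hypothetical, and the name and shape of the resulting metric are not described in this document, so check the job metrics in the Flink Web UI to see what is actually exposed.

```sql
-- Hypothetical sink that tracks the created_at column through connector metrics.
CREATE TABLE tracked_sink (
  id BIGINT,
  created_at TIMESTAMP(3),
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'ytsaurus',
  'proxy' = 'localhost:8081',
  'path' = '//tmp/tracked_table',
  'credentials-source' = 'options',
  'username' = 'admin',
  'token' = 'password',
  'trackable-field' = 'created_at',
  'schema' = '[
    {"name"="id";"type"="int64";"required"=%false;"sort_order"="ascending"};
    {"name"="created_at";"type"="string";"required"=%false}
  ]'
);
```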
Schema Definition
YTsaurus tables require a YSON schema definition. Provide the schema as a YSON list of column definitions.
Schema format:
[
{"name"="column_name";"type"="data_type";"required"=%false;"sort_order"="ascending"};
{"name"="another_column";"type"="string";"required"=%false}
]
For more information about YTsaurus schemas, see the official documentation.
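The following sketch pairs a Flink table definition with a matching YSON schema. The type pairs (BIGINT with int64, DOUBLE with double, BOOLEAN with boolean, STRING with string) mirror the ones used in the examples in this document. The two-column key is an assumption: YTsaurus sorted tables allow several key columns, but connector support for composite keys is not confirmed here. The table and column names are hypothetical.

```sql
-- Hypothetical table with a two-column key (user_id, event_id).
CREATE TABLE user_events_sink (
  user_id BIGINT,
  event_id BIGINT,
  score DOUBLE,
  is_valid BOOLEAN,
  note STRING,
  PRIMARY KEY (user_id, event_id) NOT ENFORCED
) WITH (
  'connector' = 'ytsaurus',
  'proxy' = 'localhost:8081',
  'path' = '//tmp/user_events',
  'credentials-source' = 'options',
  'username' = 'admin',
  'token' = 'password',
  -- Key columns carry sort_order and are listed first, before value columns.
  'schema' = '[
    {"name"="user_id";"type"="int64";"required"=%false;"sort_order"="ascending"};
    {"name"="event_id";"type"="int64";"required"=%false;"sort_order"="ascending"};
    {"name"="score";"type"="double";"required"=%false};
    {"name"="is_valid";"type"="boolean";"required"=%false};
    {"name"="note";"type"="string";"required"=%false}
  ]'
);
```

Keeping the column order and names identical in the Flink definition and the YSON schema makes mismatches easier to spot.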
Authentication
The connector supports multiple authentication methods.
This section describes:
- Options-based Authentication;
- Environment-based Authentication;
- Custom Authentication.
Options-based Authentication
Provide credentials directly in the table configuration:
'credentials-source' = 'options',
'username' = 'your-username',
'token' = 'your-token'
Environment-based Authentication
Read credentials from environment variables:
'credentials-source' = 'env'
Set the following environment variables:
- YT_USER - YTsaurus username
- YT_TOKEN - YTsaurus token
Custom Authentication
To create your own authentication method, implement the CredentialsProvider interface.
Data Partitioning
The connector supports automatic data partitioning based on timestamp fields.
To configure partitioning, specify the date or time column in partition-key and choose the required granularity in partition-scale. The connector automatically calculates the partition value from the selected field. For a working configuration, see Partitioning Example.
Supported Partition Scales
- HOUR - Hourly partitions (format: YYYY-MM-DD HH:00:00)
- HOUR_T - Hourly partitions with T separator (format: YYYY-MM-DDTHH:00:00)
- DAY - Daily partitions (format: YYYY-MM-DD)
- WEEK - Weekly partitions (format: YYYY-MM-DD, where the date is the Monday of the corresponding week)
- MONTH - Monthly partitions (format: YYYY-MM-01)
- SHORT_MONTH - Short monthly partitions (format: YYYY-MM)
- YEAR - Yearly partitions (format: YYYY-01-01, that is, the first day of the year)
- SHORT_YEAR - Short yearly partitions (format: YYYY)
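To preview what these formats look like for a concrete timestamp, the query below approximates several scales with Flink's built-in DATE_FORMAT function. It only illustrates the formats listed above and is not the connector's own code. HOUR_T and WEEK are left out to keep the query simple; by the definition above, WEEK for this timestamp would be 2024-03-11, the Monday of that week.

```sql
-- Preview of partition values for 2024-03-15 13:45:10 (a Friday).
SELECT
  DATE_FORMAT(ts, 'yyyy-MM-dd HH:00:00') AS hour_value,        -- 2024-03-15 13:00:00
  DATE_FORMAT(ts, 'yyyy-MM-dd')          AS day_value,         -- 2024-03-15
  DATE_FORMAT(ts, 'yyyy-MM-01')          AS month_value,       -- 2024-03-01
  DATE_FORMAT(ts, 'yyyy-MM')             AS short_month_value, -- 2024-03
  DATE_FORMAT(ts, 'yyyy-01-01')          AS year_value,        -- 2024-01-01
  DATE_FORMAT(ts, 'yyyy')                AS short_year_value   -- 2024
FROM (VALUES (TIMESTAMP '2024-03-15 13:45:10')) AS t(ts);
```

Running this in Flink SQL Client needs no YTsaurus cluster, which makes it a quick check before choosing a value for partition-scale.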
Partitioning Example
CREATE TABLE partitioned_data_source (
id BIGINT,
data STRING,
event_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '50',
'number-of-rows' = '1000',
'fields.id.kind' = 'sequence',
'fields.id.start' = '1',
'fields.id.end' = '100000',
'fields.data.length' = '20',
'fields.event_time.max-past' = '7d'
);
CREATE TABLE partitioned_table (
id BIGINT,
data STRING,
event_time TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'ytsaurus',
'proxy' = 'localhost:8081',
'credentials-source' = 'options',
'username' = 'admin',
'token' = 'password',
'path' = '//tmp/partitioned_table',
'partition-key' = 'event_time',
'partition-scale' = 'DAY',
'partition-ttl-day-cnt' = '30',
'schema' = '[
{"name"="id";"type"="int64";"required"=%false;"sort_order"="ascending"};
{"name"="data";"type"="string";"required"=%false};
{"name"="event_time";"type"="string";"required"=%false}
]'
);
INSERT INTO partitioned_table
SELECT *
FROM partitioned_data_source;
Table Resharding
The connector supports automatic table resharding at table creation time. Resharding lets you set the number of tablets upfront — either as a fixed value or based on statistics from existing partitions. This helps distribute write load evenly from the start.
Resharding Strategies
- NONE - Disable resharding
- FIXED - Resharding with a fixed number of tablets
- LAST_PARTITIONS - Resharding based on the average number of tablets in the last N partitions
Resharding Example
'reshard.strategy' = 'FIXED',
'reshard.tablet-count' = '10',
'reshard.uniform' = 'true'
Lookup Operations
The connector supports both synchronous and asynchronous Lookup operations with caching.
Lookup operations enrich a stream with data from an external table by key. In SQL terms, this corresponds to the Lookup Join scenario. For more information about lookup joins, see the Apache Flink documentation.
This section covers stream enrichment with data from YTsaurus. If you only need the basic write scenario, the steps in Quick Start are sufficient.
This section describes:
- Lookup Methods;
- Cache Types;
- Lookup Example.
Lookup Methods
- LOOKUP - standard Lookup operation
- SELECT - Lookup operation based on a SELECT query
Cache Types
- NONE - No caching
- PARTIAL - Partial caching with configurable size and TTL
- FULL - Full table caching with periodic or timed reload
Lookup Example
The Lookup connector requires a YSON formatter. Build or download the YSON formatter according to the documentation and place it in $FLINK_ROOT/lib.
Prepare data for the Lookup operation.
CREATE TABLE users_datagen (
id BIGINT,
name STRING,
age INT,
created_at TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '50',
'number-of-rows' = '1000',
'fields.id.kind' = 'sequence',
'fields.id.start' = '1',
'fields.id.end' = '1000'
);
CREATE TABLE lookup_table_sink (
id BIGINT,
name STRING,
age INT,
created_at TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'ytsaurus',
'proxy' = 'localhost:8081',
'path' = '//tmp/lookup_table',
'credentials-source' = 'options',
'username' = 'admin',
'token' = 'password',
'schema' = '[
{"name"="id";"type"="int64";"required"=%false;"sort_order"="ascending"};
{"name"="name";"type"="string";"required"=%false};
{"name"="age";"type"="int64";"required"=%false};
{"name"="created_at";"type"="string";"required"=%false}
]'
);
INSERT INTO lookup_table_sink
SELECT
id,
name,
age,
created_at
FROM users_datagen;
Join order data with user data.
CREATE TABLE orders_datagen (
order_id BIGINT,
user_id BIGINT,
total_price INT,
created_at TIMESTAMP(3),
proc_time AS PROCTIME()
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'number-of-rows' = '100',
'fields.user_id.min' = '1',
'fields.user_id.max' = '1000',
'fields.total_price.min' = '1',
'fields.total_price.max' = '10000'
);
CREATE TABLE lookup_table (
id BIGINT,
name STRING,
age INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'ytsaurus',
'format' = 'yson',
'proxy' = 'localhost:8081',
'credentials-source' = 'options',
'username' = 'admin',
'token' = 'password',
'path' = '//tmp/lookup_table',
'lookup.async' = 'true',
'lookup.cache' = 'PARTIAL',
'lookup.partial-cache.max-rows' = '10000',
'lookup.partial-cache.expire-after-access' = '1h',
'lookup-method' = 'LOOKUP',
'schema' = '[
{"name"="id";"type"="int64";"required"=%false;"sort_order"="ascending"};
{"name"="name";"type"="string";"required"=%false};
{"name"="age";"type"="int64";"required"=%false}
]'
);
SELECT
o.order_id,
o.total_price,
o.created_at,
l.id as user_id,
l.name,
l.age
FROM orders_datagen o
LEFT JOIN lookup_table FOR SYSTEM_TIME AS OF o.proc_time AS l
ON o.user_id = l.id;
Open the Apache Flink UI at localhost:8083.

Flink SQL Client displays the results of the Lookup Join operation in real time.

Examples
This section provides:
- Basic Sink Example;
- Partitioned Table with Resharding;
- Lookup Table with Full Cache;
- Multi-cluster Configuration.
Basic Sink Example
CREATE TABLE ytsaurus_sink (
user_id BIGINT,
username STRING,
email STRING,
created_at TIMESTAMP(3),
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'ytsaurus',
'proxy' = 'localhost:8081',
'path' = '//home/your-user/users_table',
'credentials-source' = 'options',
'username' = 'your-username',
'token' = 'your-token',
'schema' = '[
{"name"="user_id";"type"="int64";"required"=%false;"sort_order"="ascending"};
{"name"="username";"type"="string";"required"=%false};
{"name"="email";"type"="string";"required"=%false};
{"name"="created_at";"type"="string";"required"=%false}
]'
);
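To check that the sink above accepts rows, you could write a couple of literal records from Flink SQL Client. This is a minimal sketch: the values are made up, and it assumes the ytsaurus_sink table from this example has already been declared in the same session.

```sql
-- Two hand-written rows; the values are illustrative only.
INSERT INTO ytsaurus_sink
VALUES
  (1, 'alice', 'alice@example.com', TIMESTAMP '2024-01-01 00:00:00.000'),
  (2, 'bob', 'bob@example.com', TIMESTAMP '2024-01-02 00:00:00.000');
```

Because user_id is the key column, writing a row with an existing user_id is expected to overwrite the earlier row rather than add a duplicate.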
Partitioned Table with Resharding
CREATE TABLE events_table (
event_id BIGINT,
user_id BIGINT,
event_type STRING,
event_data STRING,
event_time TIMESTAMP(3),
PRIMARY KEY (event_id) NOT ENFORCED
) WITH (
'connector' = 'ytsaurus',
'proxy' = 'localhost:8081',
'path' = '//home/your-user/events',
'credentials-source' = 'options',
'username' = 'your-username',
'token' = 'your-token',
'partition-key' = 'event_time',
'partition-scale' = 'DAY',
'partition-ttl-day-cnt' = '90',
'reshard.strategy' = 'LAST_PARTITIONS',
'reshard.tablet-count' = '20',
'reshard.last-partitions-count' = '7',
'reshard.uniform' = 'true',
'optimize-for' = 'SCAN',
'schema' = '[
{"name"="event_id";"type"="int64";"required"=%false;"sort_order"="ascending"};
{"name"="user_id";"type"="int64";"required"=%false};
{"name"="event_type";"type"="string";"required"=%false};
{"name"="event_data";"type"="string";"required"=%false};
{"name"="event_time";"type"="string";"required"=%false}
]'
);
Lookup Table with Full Cache
CREATE TABLE user_lookup (
user_id BIGINT,
username STRING,
email STRING,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'ytsaurus',
'proxy' = 'localhost:8081',
'path' = '//home/your-user/users',
'credentials-source' = 'options',
'username' = 'your-username',
'token' = 'your-token',
'lookup.cache' = 'FULL',
'lookup.full-cache.reload-strategy' = 'PERIODIC',
'lookup.full-cache.periodic-reload-interval' = '1h',
'optimize-for' = 'LOOKUP',
'schema' = '[
{"name"="user_id";"type"="int64";"required"=%false;"sort_order"="ascending"};
{"name"="username";"type"="string";"required"=%false};
{"name"="email";"type"="string";"required"=%false}
]'
);
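A table declared this way is typically used on the right side of a lookup join. The sketch below assumes a hypothetical page_views_datagen stream and follows the join syntax from the Lookup Example in this document. That example also sets 'format' = 'yson' on the lookup table, so add that option here if your setup requires it.

```sql
-- Hypothetical stream of page views with a processing-time attribute.
CREATE TABLE page_views_datagen (
  view_id BIGINT,
  user_id BIGINT,
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '1000'
);

-- Enrich each page view with user attributes served from the full cache.
SELECT
  v.view_id,
  u.username,
  u.email
FROM page_views_datagen AS v
LEFT JOIN user_lookup FOR SYSTEM_TIME AS OF v.proc_time AS u
  ON v.user_id = u.user_id;
```

With the FULL cache and a one-hour reload interval, changes in the YTsaurus table may take up to an hour to show up in the join results.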
Multi-cluster Configuration
CREATE TABLE multi_cluster_table (
id BIGINT,
data STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'ytsaurus',
'proxy' = 'localhost:8081',
'path-map' = 'cluster1://tmp/table1,cluster2://tmp/table2',
'cluster-pick-strategy' = 'FirstAvailableClusterPickStrategy',
'credentials-source' = 'options',
'username' = 'your-username',
'token' = 'your-token',
'schema' = '[
{"name"="id";"type"="int64";"required"=%false;"sort_order"="ascending"};
{"name"="data";"type"="string";"required"=%false}
]'
);
What's Next
- Sorted dynamic tables — learn more about the tables the connector works with;
- YSON formatter for Flink — if you need to work with YSON directly in Flink jobs;
- YTsaurus Java client — the low-level API the connector is built on.