Apache Flink Connector YTsaurus
Apache Flink Connector YTsaurus — это коннектор для потоковой и пакетной обработки данных на Apache Flink. Работает с сортированными динамическими таблицами YTsaurus и поддерживает запись, чтение ограниченных потоков и Lookup-операции.
Исходный код коннектора доступен на GitHub.
Возможности
- запись в динамические таблицы YTsaurus — поддержка потоковой записи данных; базовый сценарий приведён в разделе Быстрый старт;
- автоматическое создание таблиц — таблицы создаются автоматически перед записью, если они не существуют;
- предварительное решардирование таблиц — настраиваемые стратегии решардирования таблиц для оптимальной производительности. Подробнее см. в разделе Решардирование таблиц;
- партиционирование данных — поддержка партиционирования различной гранулярности: час, день, неделя, месяц, год. Подробнее см. в разделе Партиционирование данных;
- синхронные и асинхронные Lookup-операции — поддержка обоих режимов выполнения Lookup-операций из динамических таблиц YTsaurus. Подробнее см. в разделе Lookup-операции;
- кеширование Lookup-запросов — поддержка стратегий кеширования
FULLиPARTIALдля оптимизации производительности. Подробнее см. в разделе Lookup-операции; - поддержка Lookup из нескольких кластеров — возможность выполнять Lookup из нескольких кластеров YTsaurus в зависимости от доступности. Пример см. в разделе Примеры;
- отслеживаемые поля — возможность отслеживать значения полей через метрики.
Установка
Примечание
Текущая версия коннектора требует:
- Java 11
- Apache Flink 1.20.X
Замените connectorVersion на актуальную версию со страницы Maven Central.
<dependency>
<groupId>tech.ytsaurus.flyt.connectors.ytsaurus</groupId>
<artifactId>flink-connector-ytsaurus</artifactId>
<version>${connectorVersion}</version>
<classifier>all</classifier>
</dependency>
implementation("tech.ytsaurus.flyt.connectors.ytsaurus:flink-connector-ytsaurus:$connectorVersion:all")
Быстрый старт
1. Установите кластер YTsaurus
Этот шаг можно пропустить, если кластер уже существует.
Для локальной разработки и тестирования рекомендуется воспользоваться официальной документацией по установке кластера YTsaurus через Kind. Для продакшн процессов следуйте руководству администратора YTsaurus.
Примечание
flink-connector-ytsaurus использует Java-клиент YTsaurus. Перед началом работы уточните, какой адрес прокси требуется указать в параметре proxy для вашего окружения.
2. Установите кластер Apache Flink
Установите кластер Apache Flink, следуя официальной документации. Коннектор требует Apache Flink версии 1.20.X.
3. Установите Flink Connector YTsaurus в кластер Apache Flink
Соберите коннектор из исходного кода (см. Building from Source) или скачайте его из репозитория Maven. После сборки или загрузки поместите полученный jar-файл в директорию ${FLINK_ROOT}/lib.
4. Измените порт Flink Web UI
Откройте файл conf/config.yaml и измените параметр rest.port с 8081 на 8083, чтобы избежать конфликта портов с YTsaurus.
5. Запустите кластер Apache Flink с Flink SQL Client
Запустите кластер Apache Flink:
./bin/start-cluster.sh
Запустите Flink SQL Client:
./bin/sql-client.sh
6. Запустите демо-джобу
-
Создайте источник Datagen:
CREATE TABLE simple_datagen_source ( id BIGINT, name STRING, age INT, salary DOUBLE, is_active BOOLEAN, created_at TIMESTAMP(3) ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '50', 'number-of-rows' = '1000', 'fields.id.kind' = 'sequence', 'fields.id.start' = '1', 'fields.id.end' = '100000', 'fields.name.length' = '20', 'fields.age.min' = '22', 'fields.age.max' = '65', 'fields.salary.min' = '30000.0', 'fields.salary.max' = '150000.0' ); -
Создайте приёмник YTsaurus:
CREATE TABLE ytsaurus_simple_sink ( id BIGINT, name STRING, age INT, salary DOUBLE, is_active BOOLEAN, created_at TIMESTAMP(3), updated_at TIMESTAMP(3), PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'ytsaurus', 'proxy' = 'localhost:8081', 'path' = '//tmp/flink_simple_test_table', 'credentials-source' = 'options', 'username' = 'admin', 'token' = 'password', 'schema' = '[ {"name"="id";"type"="int64";"required"=%false;"sort_order"="ascending"}; {"name"="name";"type"="string";"required"=%false}; {"name"="age";"type"="int64";"required"=%false}; {"name"="salary";"type"="double";"required"=%false}; {"name"="is_active";"type"="boolean";"required"=%false}; {"name"="created_at";"type"="string";"required"=%false}; {"name"="updated_at";"type"="string";"required"=%false} ]' ); -
Запустите джобу, которая будет записывать сгенерированные данные в динамическую таблицу YTsaurus:
INSERT INTO ytsaurus_simple_sink SELECT id, name, age, salary, is_active, created_at, CURRENT_TIMESTAMP AS updated_at FROM simple_datagen_source; -
Следите за работой джобы по адресу localhost:8083

-
Таблица
flink_simple_test_tableбудет создана в директории/tmp/flink_simple_test_tableи будет содержать результаты выполнения Flink джобы.
Поздравляем! Вы запустили свою первую джобу с YTsaurus и Apache Flink.
Параметры конфигурации
- Обязательные параметры;
- Конфигурация пути;
- Параметры аутентификации;
- Конфигурация таблицы;
- Параметры партиционирования;
- Параметры решардирования;
- Параметры транзакций и производительности;
- Параметры Lookup-операции;
- Параметры кеша для Lookup-операции;
- Прочие параметры.
Обязательные параметры
| Параметр | Тип | Описание |
|---|---|---|
proxy |
String | Адрес прокси YTsaurus |
schema |
String | Определение схемы в формате YSON для таблицы YTsaurus. См. Определение схемы |
credentials-source |
String | Метод аутентификации (options, env, your-custom-provider) |
Конфигурация пути
| Параметр | Тип | По умолчанию | Описание |
|---|---|---|---|
path |
String | - | Путь к таблице YTsaurus |
path-map |
Map<String, String> | - | Map соответствия кластер–путь к таблице для Lookup из нескольких кластеров |
Укажите один из параметров:
path— для работы с одной таблицей в одном кластере;path-map— для Lookup-сценариев с несколькими кластерами.
Параметры аутентификации
| Параметр | Тип | По умолчанию | Описание |
|---|---|---|---|
username |
String | - | Имя пользователя (при использовании источника учётных данных options) |
token |
String | - | Токен (при использовании источника учётных данных options) |
Конфигурация таблицы
| Параметр | Тип | По умолчанию | Описание |
|---|---|---|---|
optimize-for |
Enum | - | Режим оптимизации таблицы (LOOKUP, SCAN) |
primary-medium |
Enum | - | primary-medium таблицы (DEFAULT, SSD_BLOBS) |
tablet-cell-bundle |
String | - | Название бандла |
enable-dynamic-store-read |
Boolean | true |
Атрибут чтения из динамического хранилища |
custom-attributes |
String | - | Пользовательские атрибуты таблицы в формате YSON |
Параметры партиционирования
| Параметр | Тип | По умолчанию | Описание |
|---|---|---|---|
partition-key |
String | - | Имя столбца, по которому будет выполняться партиционирование |
partition-scale |
Enum | - | Гранулярность партиционирования (HOUR, HOUR_T, DAY, WEEK, MONTH, SHORT_MONTH, YEAR, SHORT_YEAR) |
partition-ttl-day-cnt |
Integer | - | Количество дней хранения партиции |
partition-ttl-in-days-from-creation |
Integer | - | TTL в днях с момента создания партиции |
min-partition-ttl |
Integer | 20 |
Минимальный TTL партиции в днях |
Параметры решардирования
| Параметр | Тип | По умолчанию | Описание |
|---|---|---|---|
reshard.strategy |
Enum | NONE |
Стратегия решардирования (NONE, FIXED, LAST_PARTITIONS) |
reshard.tablet-count |
Integer | - | Количество таблетов при решардировании |
reshard.uniform |
Boolean | false |
Использовать ли uniform при партиционировании |
reshard.last-partitions-count |
Integer | 7 |
Количество партиций для анализа в стратегии LAST_PARTITIONS |
Параметры транзакций и производительности
| Параметр | Тип | По умолчанию | Описание |
|---|---|---|---|
commit-transaction-period |
Duration | - | Период фиксации транзакций |
transaction-timeout |
Duration | - | Таймаут транзакции |
transaction-atomicity |
Enum | - | Уровень атомарности транзакции |
rows-in-transaction-limit |
Integer | - | Максимальное количество строк в транзакции |
rows-in-modification-limit |
Integer | - | Максимальное количество строк в одной модификации |
retry-strategy |
Enum | EXPONENTIAL |
Стратегия попыток записи (EXPONENTIAL, NO_RETRY) |
Параметры Lookup-операции
| Параметр | Тип | По умолчанию | Описание |
|---|---|---|---|
lookup.async |
Boolean | false |
Включить асинхронный Lookup |
lookup-method |
Enum | LOOKUP |
Метод Lookup (LOOKUP, SELECT). Метод LOOKUP работает только с ключевыми столбцами, но обеспечивает лучшую производительность. Метод SELECT работает с любыми столбцами, но работает медленнее. |
cluster-pick-strategy |
String | FirstAvailableClusterPickStrategy |
Стратегия выбора кластера в конфигурации с несколькими кластерами. Можно указать собственную реализацию стратегии. |
Параметры кеша для Lookup-операции
| Параметр | Тип | По умолчанию | Описание |
|---|---|---|---|
lookup.cache |
Enum | NONE |
Тип кеша (NONE, PARTIAL, FULL) |
lookup.partial-cache.max-rows |
Long | - | Максимальное количество строк в partial кеше |
lookup.partial-cache.expire-after-write |
Duration | - | Время жизни кеша после записи |
lookup.partial-cache.expire-after-access |
Duration | - | Время жизни кеша после обращения |
lookup.partial-cache.cache-missing-key |
Boolean | - | Кешировать отсутствующие ключи |
lookup.full-cache.reload-strategy |
Enum | - | Стратегия перезагрузки full кеша (PERIODIC, TIMED) |
lookup.full-cache.periodic-reload-interval |
Duration | - | Интервал периодического обновления full кеша |
lookup.full-cache.timed-reload-iso-time |
String | - | Время перезагрузки по расписанию в формате ISO |
Прочие параметры
| Параметр | Тип | По умолчанию | Описание |
|---|---|---|---|
trackable-field |
String | - | Имя поля для отслеживания его значений в метриках |
proxy-role |
String | - | Прокси роль |
Параметр trackable-field полезен в сценариях, где важно наблюдать за значениями одного из полей через метрики коннектора.
Определение схемы
Таблицы YTsaurus требуют определения схемы в формате YSON. Схема должна быть предоставлена в виде YSON-списка с описанием столбцов.
Формат схемы:
[
{"name"="column_name";"type"="data_type";"required"=%false;"sort_order"="ascending"};
{"name"="another_column";"type"="string";"required"=%false}
]
Подробнее о схемах YTsaurus см. в документации.
Аутентификация
Коннектор поддерживает несколько методов аутентификации:
- Аутентификация через параметры;
- Аутентификация через переменные окружения;
- Пользовательская аутентификация.
Аутентификация через параметры
Учётные данные передаются непосредственно в конфигурации таблицы:
'credentials-source' = 'options',
'username' = 'your-username',
'token' = 'your-token'
Аутентификация через переменные окружения
Учётные данные считываются из переменных окружения:
'credentials-source' = 'env'
Задайте следующие переменные окружения:
YT_USER— имя пользователяYT_TOKEN— токен
Пользовательская аутентификация
Чтобы создать собственный метод аутентификации, реализуйте интерфейс CredentialsProvider.
Партиционирование данных
Коннектор поддерживает автоматическое партиционирование данных на основе полей с временными метками.
Чтобы настроить партиционирование, укажите столбец с датой или временем в параметре partition-key и выберите нужную гранулярность в параметре partition-scale. Коннектор будет автоматически вычислять значение партиции по значению указанного поля. Рабочую конфигурацию см. в разделе Пример использования партиционирования.
Поддерживаемые масштабы партиций
HOUR— почасовые партиции (формат:YYYY-MM-DD HH:00:00)HOUR_T— почасовые партиции с разделителем T (формат:YYYY-MM-DDTHH:00:00)DAY— ежедневные партиции (формат:YYYY-MM-DD)WEEK— еженедельные партиции (формат:YYYY-MM-DDс датой понедельника)MONTH— ежемесячные партиции (формат:YYYY-MM-01)SHORT_MONTH— короткие ежемесячные партиции (формат:YYYY-MM)YEAR— ежегодные партиции (формат:YYYY-01-01, то есть первый день года)SHORT_YEAR— короткие ежегодные партиции (формат:YYYY)
Пример использования партиционирования
CREATE TABLE partitioned_data_source (
id BIGINT,
data STRING,
event_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '50',
'number-of-rows' = '1000',
'fields.id.kind' = 'sequence',
'fields.id.start' = '1',
'fields.id.end' = '100000',
'fields.data.length' = '20',
'fields.event_time.max-past' = '7d'
);
CREATE TABLE partitioned_table (
id BIGINT,
data STRING,
event_time TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'ytsaurus',
'proxy' = 'localhost:8081',
'credentials-source' = 'options',
'username' = 'admin',
'token' = 'password',
'path' = '//tmp/partitioned_table',
'partition-key' = 'event_time',
'partition-scale' = 'DAY',
'partition-ttl-day-cnt' = '30',
'schema' = '[
{"name"="id";"type"="int64";"required"=%false;"sort_order"="ascending"};
{"name"="data";"type"="string";"required"=%false};
{"name"="event_time";"type"="string";"required"=%false}
]'
);
INSERT INTO partitioned_table
SELECT *
FROM partitioned_data_source;
Результат:

Решардирование таблиц
Коннектор поддерживает автоматическое решардирование таблиц при их создании. Решардирование позволяет задать количество таблетов заранее — фиксированно или на основе статистики уже существующих партиций. Это помогает равномерно распределить нагрузку на запись с самого начала.
Стратегии решардирования
NONE— отключить решардированиеFIXED— решардирование с фиксированным количеством таблетовLAST_PARTITIONS— решардирование на основе среднего количества таблетов в последних N партициях
Пример настроек решардирования
'reshard.strategy' = 'FIXED',
'reshard.tablet-count' = '10',
'reshard.uniform' = 'true'
Lookup-операции
Коннектор поддерживает как синхронные, так и асинхронные Lookup-операции с возможностью кеширования. Lookup-операции используются для обогащения потока данными из внешней таблицы по ключу. В терминах SQL это соответствует сценарию Lookup Join. Подробнее о lookup join см. в разделе {#T}.
Этот раздел относится к сценарию обогащения потока данными из YTsaurus. Если вам нужен базовый сценарий записи данных, достаточно выполнить шаги из раздела Быстрый старт.
В разделе описаны:
Методы Lookup
LOOKUP— стандартная Lookup-операцияSELECT— Lookup-операция на основе SELECT запроса
Типы кеша
NONE— без кешированияPARTIAL— частичное кеширование с настраиваемым размером и TTLFULL— полное кеширование таблицы с периодической или плановой перезагрузкой
Пример Lookup
Для Lookup-коннектора требуется YSON-форматтер. Соберите или загрузите форматтер YSON согласно документации и поместите его в $FLINK_ROOT/lib.
Подготовьте данные для Lookup-операции.
CREATE TABLE users_datagen (
id BIGINT,
name STRING,
age INT,
created_at TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '50',
'number-of-rows' = '1000',
'fields.id.kind' = 'sequence',
'fields.id.start' = '1',
'fields.id.end' = '1000'
);
CREATE TABLE lookup_table_sink (
id BIGINT,
name STRING,
age INT,
created_at TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'ytsaurus',
'proxy' = 'localhost:8081',
'path' = '//tmp/lookup_table',
'credentials-source' = 'options',
'username' = 'admin',
'token' = 'password',
'schema' = '[
{"name"="id";"type"="int64";"required"=%false;"sort_order"="ascending"};
{"name"="name";"type"="string";"required"=%false};
{"name"="age";"type"="int64";"required"=%false};
{"name"="created_at";"type"="string";"required"=%false}
]'
);
INSERT INTO lookup_table_sink
SELECT
id,
name,
age,
created_at
FROM users_datagen;
Объедините данные заказов с данными пользователей.
CREATE TABLE orders_datagen (
order_id BIGINT,
user_id BIGINT,
total_price INT,
created_at TIMESTAMP(3),
proc_time AS PROCTIME()
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'number-of-rows' = '100',
'fields.user_id.min' = '1',
'fields.user_id.max' = '1000',
'fields.total_price.min' = '1',
'fields.total_price.max' = '10000'
);
CREATE TABLE lookup_table (
id BIGINT,
name STRING,
age INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'ytsaurus',
'format' = 'yson',
'proxy' = 'localhost:8081',
'credentials-source' = 'options',
'username' = 'admin',
'token' = 'password',
'path' = '//tmp/lookup_table',
'lookup.async' = 'true',
'lookup.cache' = 'PARTIAL',
'lookup.partial-cache.max-rows' = '10000',
'lookup.partial-cache.expire-after-access' = '1h',
'lookup-method' = 'LOOKUP',
'schema' = '[
{"name"="id";"type"="int64";"required"=%false;"sort_order"="ascending"};
{"name"="name";"type"="string";"required"=%false}
]'
);
SELECT
o.order_id,
o.total_price,
o.created_at,
l.id as user_id,
l.name,
l.age
FROM orders_datagen o
LEFT JOIN lookup_table FOR SYSTEM_TIME AS OF o.proc_time AS l
ON o.user_id = l.id;
Проверьте интерфейс Apache Flink по адресу localhost:8083.

Flink SQL Client отобразит результаты операции Lookup Join в режиме реального времени.

Примеры
- Минимальная конфигурация;
- Партиционированная таблица с решардированием;
- Lookup-таблица с FULL кешем;
- Конфигурация с Lookup из нескольких кластеров.
Минимальная конфигурация
CREATE TABLE ytsaurus_sink (
user_id BIGINT,
username STRING,
email STRING,
created_at TIMESTAMP(3),
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'ytsaurus',
'proxy' = 'localhost:8081',
'path' = '//home/your-user/users_table',
'credentials-source' = 'options',
'username' = 'your-username',
'token' = 'your-token',
'schema' = '[
{"name"="user_id";"type"="int64";"required"=%false;"sort_order"="ascending"};
{"name"="username";"type"="string";"required"=%false};
{"name"="email";"type"="string";"required"=%false};
{"name"="created_at";"type"="string";"required"=%false}
]'
);
Партиционированная таблица с решардированием
CREATE TABLE events_table (
event_id BIGINT,
user_id BIGINT,
event_type STRING,
event_data STRING,
event_time TIMESTAMP(3),
PRIMARY KEY (event_id) NOT ENFORCED
) WITH (
'connector' = 'ytsaurus',
'proxy' = 'localhost:8081',
'path' = '//home/your-user/events',
'credentials-source' = 'options',
'username' = 'your-username',
'token' = 'your-token',
'partition-key' = 'event_time',
'partition-scale' = 'DAY',
'partition-ttl-day-cnt' = '90',
'reshard.strategy' = 'LAST_PARTITIONS',
'reshard.tablet-count' = '20',
'reshard.last-partitions-count' = '7',
'reshard.uniform' = 'true',
'optimize-for' = 'SCAN',
'schema' = '[
{"name"="event_id";"type"="int64";"required"=%false;"sort_order"="ascending"};
{"name"="user_id";"type"="int64";"required"=%false};
{"name"="event_type";"type"="string";"required"=%false};
{"name"="event_data";"type"="string";"required"=%false};
{"name"="event_time";"type"="string";"required"=%false}
]'
);
Lookup-таблица с FULL кешем
CREATE TABLE user_lookup (
user_id BIGINT,
username STRING,
email STRING,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'ytsaurus',
'proxy' = 'localhost:8081',
'path' = '//home/your-user/users',
'credentials-source' = 'options',
'username' = 'your-username',
'token' = 'your-token',
'lookup.cache' = 'FULL',
'lookup.full-cache.reload-strategy' = 'PERIODIC',
'lookup.full-cache.periodic-reload-interval' = '1h',
'optimize-for' = 'LOOKUP',
'schema' = '[
{"name"="user_id";"type"="int64";"required"=%false;"sort_order"="ascending"};
{"name"="username";"type"="string";"required"=%false};
{"name"="email";"type"="string";"required"=%false}
]'
);
Конфигурация с Lookup из нескольких кластеров
CREATE TABLE multi_cluster_table (
id BIGINT,
data STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'ytsaurus',
'proxy' = 'localhost:8081',
'path-map' = 'cluster1://tmp/table1,cluster2://tmp/table2',
'cluster-pick-strategy' = 'FirstAvailableClusterPickStrategy',
'credentials-source' = 'options',
'username' = 'your-username',
'token' = 'your-token',
'schema' = '[
{"name"="id";"type"="int64";"required"=%false;"sort_order"="ascending"};
{"name"="data";"type"="string";"required"=%false}
]'
);
Что дальше
- Сортированные динамические таблицы — подробнее об устройстве таблиц, с которыми работает коннектор;
- YSON-форматтер для Flink — если нужна работа с YSON напрямую в задачах Flink;
- Java-клиент YTsaurus — низкоуровневый API, на котором построен коннектор.