Apache Flink Connector YTsaurus

Apache Flink Connector YTsaurus — это коннектор для потоковой и пакетной обработки данных на Apache Flink. Работает с сортированными динамическими таблицами YTsaurus и поддерживает запись, чтение ограниченных потоков и Lookup-операции.

Исходный код коннектора доступен на GitHub.

Возможности

  • запись в динамические таблицы YTsaurus — поддержка потоковой записи данных; базовый сценарий приведён в разделе Быстрый старт;
  • автоматическое создание таблиц — таблицы создаются автоматически перед записью, если они не существуют;
  • предварительное решардирование таблиц — настраиваемые стратегии решардирования таблиц для оптимальной производительности. Подробнее см. в разделе Решардирование таблиц;
  • партиционирование данных — поддержка партиционирования различной гранулярности: час, день, неделя, месяц, год. Подробнее см. в разделе Партиционирование данных;
  • синхронные и асинхронные Lookup-операции — поддержка обоих режимов выполнения Lookup-операций из динамических таблиц YTsaurus. Подробнее см. в разделе Lookup-операции;
  • кеширование Lookup-запросов — поддержка стратегий кеширования FULL и PARTIAL для оптимизации производительности. Подробнее см. в разделе Lookup-операции;
  • поддержка Lookup из нескольких кластеров — возможность выполнять Lookup из нескольких кластеров YTsaurus в зависимости от доступности. Пример см. в разделе Примеры;
  • отслеживаемые поля — возможность отслеживать значения полей через метрики.

Установка

Примечание

Текущая версия коннектора требует:

  • Java 11
  • Apache Flink 1.20.X

Замените connectorVersion на актуальную версию со страницы Maven Central.

<dependency>
    <groupId>tech.ytsaurus.flyt.connectors.ytsaurus</groupId>
    <artifactId>flink-connector-ytsaurus</artifactId>
    <version>${connectorVersion}</version>
    <classifier>all</classifier>
</dependency>
implementation("tech.ytsaurus.flyt.connectors.ytsaurus:flink-connector-ytsaurus:$connectorVersion:all")

Быстрый старт

1. Установите кластер YTsaurus

Этот шаг можно пропустить, если кластер уже существует.

Для локальной разработки и тестирования рекомендуется воспользоваться официальной документацией по установке кластера YTsaurus через Kind. Для продакшн процессов следуйте руководству администратора YTsaurus.

Примечание

flink-connector-ytsaurus использует Java-клиент YTsaurus. Перед началом работы уточните, какой адрес прокси требуется указать в параметре proxy для вашего окружения.

Установите кластер Apache Flink, следуя официальной документации. Коннектор требует Apache Flink версии 1.20.X.

3. Установите Flink Connector YTsaurus в кластер Apache Flink

Соберите коннектор из исходного кода (см. Building from Source) или скачайте его из репозитория Maven. После сборки или загрузки поместите полученный jar-файл в директорию ${FLINK_ROOT}/lib.

Откройте файл conf/config.yaml и измените параметр rest.port с 8081 на 8083, чтобы избежать конфликта портов с YTsaurus.

Запустите кластер Apache Flink:

./bin/start-cluster.sh

Запустите Flink SQL Client:

./bin/sql-client.sh

6. Запустите демо-джобу

  1. Создайте источник Datagen:

    CREATE TABLE simple_datagen_source (
        id BIGINT,
        name STRING,
        age INT,
        salary DOUBLE,
        is_active BOOLEAN,
        created_at TIMESTAMP(3)
    ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '50',
        'number-of-rows' = '1000',
        'fields.id.kind' = 'sequence',
        'fields.id.start' = '1',
        'fields.id.end' = '100000',
        'fields.name.length' = '20',
        'fields.age.min' = '22',
        'fields.age.max' = '65',
        'fields.salary.min' = '30000.0',
        'fields.salary.max' = '150000.0'
    );
    
  2. Создайте приёмник YTsaurus:

    CREATE TABLE ytsaurus_simple_sink (
        id BIGINT,
        name STRING,
        age INT,
        salary DOUBLE,
        is_active BOOLEAN,
        created_at TIMESTAMP(3),
        updated_at TIMESTAMP(3),
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'ytsaurus',
        'proxy' = 'localhost:8081',
        'path' = '//tmp/flink_simple_test_table',
        'credentials-source' = 'options',
        'username' = 'admin',
        'token' = 'password',
        'schema' = '[
            {"name"="id";"type"="int64";"required"=%false;"sort_order"="ascending"};
            {"name"="name";"type"="string";"required"=%false};
            {"name"="age";"type"="int64";"required"=%false};
            {"name"="salary";"type"="double";"required"=%false};
            {"name"="is_active";"type"="boolean";"required"=%false};
            {"name"="created_at";"type"="string";"required"=%false};
            {"name"="updated_at";"type"="string";"required"=%false}
        ]'
    );
    
  3. Запустите джобу, которая будет записывать сгенерированные данные в динамическую таблицу YTsaurus:

    INSERT INTO ytsaurus_simple_sink
    SELECT
        id,
        name,
        age,
        salary,
        is_active,
        created_at,
        CURRENT_TIMESTAMP AS updated_at
    FROM simple_datagen_source;
    
  4. Следите за работой джобы по адресу localhost:8083

  5. Таблица flink_simple_test_table будет создана в директории /tmp/flink_simple_test_table и будет содержать результаты выполнения Flink джобы.

Поздравляем! Вы запустили свою первую джобу с YTsaurus и Apache Flink.

Параметры конфигурации

Обязательные параметры

Параметр Тип Описание
proxy String Адрес прокси YTsaurus
schema String Определение схемы в формате YSON для таблицы YTsaurus. См. Определение схемы
credentials-source String Метод аутентификации (options, env, your-custom-provider)

Конфигурация пути

Параметр Тип По умолчанию Описание
path String - Путь к таблице YTsaurus
path-map Map<String, String> - Map соответствия кластер–путь к таблице для Lookup из нескольких кластеров

Укажите один из параметров:

  • path — для работы с одной таблицей в одном кластере;
  • path-map — для Lookup-сценариев с несколькими кластерами.

Параметры аутентификации

Параметр Тип По умолчанию Описание
username String - Имя пользователя (при использовании источника учётных данных options)
token String - Токен (при использовании источника учётных данных options)

Конфигурация таблицы

Параметр Тип По умолчанию Описание
optimize-for Enum - Режим оптимизации таблицы (LOOKUP, SCAN)
primary-medium Enum - primary-medium таблицы (DEFAULT, SSD_BLOBS)
tablet-cell-bundle String - Название бандла
enable-dynamic-store-read Boolean true Атрибут чтения из динамического хранилища
custom-attributes String - Пользовательские атрибуты таблицы в формате YSON

Параметры партиционирования

Параметр Тип По умолчанию Описание
partition-key String - Имя столбца, по которому будет выполняться партиционирование
partition-scale Enum - Гранулярность партиционирования (HOUR, HOUR_T, DAY, WEEK, MONTH, SHORT_MONTH, YEAR, SHORT_YEAR)
partition-ttl-day-cnt Integer - Количество дней хранения партиции
partition-ttl-in-days-from-creation Integer - TTL в днях с момента создания партиции
min-partition-ttl Integer 20 Минимальный TTL партиции в днях

Параметры решардирования

Параметр Тип По умолчанию Описание
reshard.strategy Enum NONE Стратегия решардирования (NONE, FIXED, LAST_PARTITIONS)
reshard.tablet-count Integer - Количество таблетов при решардировании
reshard.uniform Boolean false Использовать ли uniform при партиционировании
reshard.last-partitions-count Integer 7 Количество партиций для анализа в стратегии LAST_PARTITIONS

Параметры транзакций и производительности

Параметр Тип По умолчанию Описание
commit-transaction-period Duration - Период фиксации транзакций
transaction-timeout Duration - Таймаут транзакции
transaction-atomicity Enum - Уровень атомарности транзакции
rows-in-transaction-limit Integer - Максимальное количество строк в транзакции
rows-in-modification-limit Integer - Максимальное количество строк в одной модификации
retry-strategy Enum EXPONENTIAL Стратегия попыток записи (EXPONENTIAL, NO_RETRY)

Параметры Lookup-операции

Параметр Тип По умолчанию Описание
lookup.async Boolean false Включить асинхронный Lookup
lookup-method Enum LOOKUP Метод Lookup (LOOKUP, SELECT). Метод LOOKUP работает только с ключевыми столбцами, но обеспечивает лучшую производительность. Метод SELECT работает с любыми столбцами, но работает медленнее.
cluster-pick-strategy String FirstAvailableClusterPickStrategy Стратегия выбора кластера в конфигурации с несколькими кластерами. Можно указать собственную реализацию стратегии.

Параметры кеша для Lookup-операции

Параметр Тип По умолчанию Описание
lookup.cache Enum NONE Тип кеша (NONE, PARTIAL, FULL)
lookup.partial-cache.max-rows Long - Максимальное количество строк в partial кеше
lookup.partial-cache.expire-after-write Duration - Время жизни кеша после записи
lookup.partial-cache.expire-after-access Duration - Время жизни кеша после обращения
lookup.partial-cache.cache-missing-key Boolean - Кешировать отсутствующие ключи
lookup.full-cache.reload-strategy Enum - Стратегия перезагрузки full кеша (PERIODIC, TIMED)
lookup.full-cache.periodic-reload-interval Duration - Интервал периодического обновления full кеша
lookup.full-cache.timed-reload-iso-time String - Время перезагрузки по расписанию в формате ISO

Прочие параметры

Параметр Тип По умолчанию Описание
trackable-field String - Имя поля для отслеживания его значений в метриках
proxy-role String - Прокси роль

Параметр trackable-field полезен в сценариях, где важно наблюдать за значениями одного из полей через метрики коннектора.

Определение схемы

Таблицы YTsaurus требуют определения схемы в формате YSON. Схема должна быть предоставлена в виде YSON-списка с описанием столбцов.

Формат схемы:

[
    {"name"="column_name";"type"="data_type";"required"=%false;"sort_order"="ascending"};
    {"name"="another_column";"type"="string";"required"=%false}
]

Подробнее о схемах YTsaurus см. в документации.

Аутентификация

Коннектор поддерживает несколько методов аутентификации:

Аутентификация через параметры

Учётные данные передаются непосредственно в конфигурации таблицы:

'credentials-source' = 'options',
'username' = 'your-username',
'token' = 'your-token'

Аутентификация через переменные окружения

Учётные данные считываются из переменных окружения:

'credentials-source' = 'env'

Задайте следующие переменные окружения:

  • YT_USER — имя пользователя
  • YT_TOKEN — токен

Пользовательская аутентификация

Чтобы создать собственный метод аутентификации, реализуйте интерфейс CredentialsProvider.

Партиционирование данных

Коннектор поддерживает автоматическое партиционирование данных на основе полей с временными метками.

Чтобы настроить партиционирование, укажите столбец с датой или временем в параметре partition-key и выберите нужную гранулярность в параметре partition-scale. Коннектор будет автоматически вычислять значение партиции по значению указанного поля. Рабочую конфигурацию см. в разделе Пример использования партиционирования.

Поддерживаемые масштабы партиций

  • HOUR — почасовые партиции (формат: YYYY-MM-DD HH:00:00)
  • HOUR_T — почасовые партиции с разделителем T (формат: YYYY-MM-DDTHH:00:00)
  • DAY — ежедневные партиции (формат: YYYY-MM-DD)
  • WEEK — еженедельные партиции (формат: YYYY-MM-DD с датой понедельника)
  • MONTH — ежемесячные партиции (формат: YYYY-MM-01)
  • SHORT_MONTH — короткие ежемесячные партиции (формат: YYYY-MM)
  • YEAR — ежегодные партиции (формат: YYYY-01-01, то есть первый день года)
  • SHORT_YEAR — короткие ежегодные партиции (формат: YYYY)

Пример использования партиционирования

CREATE TABLE partitioned_data_source (
id BIGINT,
data STRING,
event_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '50',
'number-of-rows' = '1000',
'fields.id.kind' = 'sequence',
'fields.id.start' = '1',
'fields.id.end' = '100000',
'fields.data.length' = '20',
'fields.event_time.max-past' = '7d'
);
CREATE TABLE partitioned_table (
    id BIGINT,
    data STRING,
    event_time TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'ytsaurus',
    'proxy' = 'localhost:8081',
    'credentials-source' = 'options',
    'username' = 'admin',
    'token' = 'password',
    'path' = '//tmp/partitioned_table',
    'partition-key' = 'event_time',
    'partition-scale' = 'DAY',
    'partition-ttl-day-cnt' = '30',
    'schema' = '[
        {"name"="id";"type"="int64";"required"=%false;"sort_order"="ascending"};
        {"name"="data";"type"="string";"required"=%false};
        {"name"="event_time";"type"="string";"required"=%false}
    ]'
);
INSERT INTO partitioned_table
SELECT *
FROM partitioned_data_source;

Результат:

Решардирование таблиц

Коннектор поддерживает автоматическое решардирование таблиц при их создании. Решардирование позволяет задать количество таблетов заранее — фиксированно или на основе статистики уже существующих партиций. Это помогает равномерно распределить нагрузку на запись с самого начала.

Стратегии решардирования

  • NONE — отключить решардирование
  • FIXED — решардирование с фиксированным количеством таблетов
  • LAST_PARTITIONS — решардирование на основе среднего количества таблетов в последних N партициях

Пример настроек решардирования

'reshard.strategy' = 'FIXED',
'reshard.tablet-count' = '10',
'reshard.uniform' = 'true'

Lookup-операции

Коннектор поддерживает как синхронные, так и асинхронные Lookup-операции с возможностью кеширования. Lookup-операции используются для обогащения потока данными из внешней таблицы по ключу. В терминах SQL это соответствует сценарию Lookup Join. Подробнее о lookup join см. в разделе {#T}.

Этот раздел относится к сценарию обогащения потока данными из YTsaurus. Если вам нужен базовый сценарий записи данных, достаточно выполнить шаги из раздела Быстрый старт.

В разделе описаны:

Методы Lookup

  • LOOKUP — стандартная Lookup-операция
  • SELECT — Lookup-операция на основе SELECT запроса

Типы кеша

  • NONE — без кеширования
  • PARTIAL — частичное кеширование с настраиваемым размером и TTL
  • FULL — полное кеширование таблицы с периодической или плановой перезагрузкой

Пример Lookup

Для Lookup-коннектора требуется YSON-форматтер. Соберите или загрузите форматтер YSON согласно документации и поместите его в $FLINK_ROOT/lib.

Подготовьте данные для Lookup-операции.

CREATE TABLE users_datagen (
    id BIGINT,
    name STRING,
    age INT,
    created_at TIMESTAMP(3)
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '50',
    'number-of-rows' = '1000',
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end' = '1000'
);
CREATE TABLE lookup_table_sink (
    id BIGINT,
    name STRING,
    age INT,
    created_at TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'ytsaurus',
    'proxy' = 'localhost:8081',
    'path' = '//tmp/lookup_table',
    'credentials-source' = 'options',
    'username' = 'admin',
    'token' = 'password',
    'schema' = '[
        {"name"="id";"type"="int64";"required"=%false;"sort_order"="ascending"};
        {"name"="name";"type"="string";"required"=%false};
        {"name"="age";"type"="int64";"required"=%false};
        {"name"="created_at";"type"="string";"required"=%false}
    ]'
);
INSERT INTO lookup_table_sink
SELECT
    id,
    name,
    age,
    created_at
FROM users_datagen;

Объедините данные заказов с данными пользователей.

CREATE TABLE orders_datagen (
    order_id BIGINT,
    user_id BIGINT,
    total_price INT,
    created_at TIMESTAMP(3),
    proc_time AS PROCTIME()
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'number-of-rows' = '100',
    'fields.user_id.min' = '1',
    'fields.user_id.max' = '1000',
    'fields.total_price.min' = '1',
    'fields.total_price.max' = '10000'
);
CREATE TABLE lookup_table (
    id BIGINT,
    name STRING,
    age INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'ytsaurus',
    'format' = 'yson',
    'proxy' = 'localhost:8081',
    'credentials-source' = 'options',
    'username' = 'admin',
    'token' = 'password',
    'path' = '//tmp/lookup_table',
    'lookup.async' = 'true',
    'lookup.cache' = 'PARTIAL',
    'lookup.partial-cache.max-rows' = '10000',
    'lookup.partial-cache.expire-after-access' = '1h',
    'lookup-method' = 'LOOKUP',
    'schema' = '[
        {"name"="id";"type"="int64";"required"=%false;"sort_order"="ascending"};
        {"name"="name";"type"="string";"required"=%false}
    ]'
);
SELECT
    o.order_id,
    o.total_price,
    o.created_at,
    l.id as user_id,
    l.name,
    l.age
FROM orders_datagen o
LEFT JOIN lookup_table FOR SYSTEM_TIME AS OF o.proc_time AS l
ON o.user_id = l.id;

Проверьте интерфейс Apache Flink по адресу localhost:8083.

Flink SQL Client отобразит результаты операции Lookup Join в режиме реального времени.

Примеры

Минимальная конфигурация

CREATE TABLE ytsaurus_sink (
    user_id BIGINT,
    username STRING,
    email STRING,
    created_at TIMESTAMP(3),
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'ytsaurus',
    'proxy' = 'localhost:8081',
    'path' = '//home/your-user/users_table',
    'credentials-source' = 'options',
    'username' = 'your-username',
    'token' = 'your-token',
    'schema' = '[
        {"name"="user_id";"type"="int64";"required"=%false;"sort_order"="ascending"};
        {"name"="username";"type"="string";"required"=%false};
        {"name"="email";"type"="string";"required"=%false};
        {"name"="created_at";"type"="string";"required"=%false}
    ]'
);

Партиционированная таблица с решардированием

CREATE TABLE events_table (
    event_id BIGINT,
    user_id BIGINT,
    event_type STRING,
    event_data STRING,
    event_time TIMESTAMP(3),
    PRIMARY KEY (event_id) NOT ENFORCED
) WITH (
    'connector' = 'ytsaurus',
    'proxy' = 'localhost:8081',
    'path' = '//home/your-user/events',
    'credentials-source' = 'options',
    'username' = 'your-username',
    'token' = 'your-token',
    'partition-key' = 'event_time',
    'partition-scale' = 'DAY',
    'partition-ttl-day-cnt' = '90',
    'reshard.strategy' = 'LAST_PARTITIONS',
    'reshard.tablet-count' = '20',
    'reshard.last-partitions-count' = '7',
    'reshard.uniform' = 'true',
    'optimize-for' = 'SCAN',
    'schema' = '[
        {"name"="event_id";"type"="int64";"required"=%false;"sort_order"="ascending"};
        {"name"="user_id";"type"="int64";"required"=%false};
        {"name"="event_type";"type"="string";"required"=%false};
        {"name"="event_data";"type"="string";"required"=%false};
        {"name"="event_time";"type"="string";"required"=%false}
    ]'
);

Lookup-таблица с FULL кешем

CREATE TABLE user_lookup (
    user_id BIGINT,
    username STRING,
    email STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'ytsaurus',
    'proxy' = 'localhost:8081',
    'path' = '//home/your-user/users',
    'credentials-source' = 'options',
    'username' = 'your-username',
    'token' = 'your-token',
    'lookup.cache' = 'FULL',
    'lookup.full-cache.reload-strategy' = 'PERIODIC',
    'lookup.full-cache.periodic-reload-interval' = '1h',
    'optimize-for' = 'LOOKUP',
    'schema' = '[
        {"name"="user_id";"type"="int64";"required"=%false;"sort_order"="ascending"};
        {"name"="username";"type"="string";"required"=%false};
        {"name"="email";"type"="string";"required"=%false}
    ]'
);

Конфигурация с Lookup из нескольких кластеров

CREATE TABLE multi_cluster_table (
    id BIGINT,
    data STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'ytsaurus',
    'proxy' = 'localhost:8081',
    'path-map' = 'cluster1://tmp/table1,cluster2://tmp/table2',
    'cluster-pick-strategy' = 'FirstAvailableClusterPickStrategy',
    'credentials-source' = 'options',
    'username' = 'your-username',
    'token' = 'your-token',
    'schema' = '[
        {"name"="id";"type"="int64";"required"=%false;"sort_order"="ascending"};
        {"name"="data";"type"="string";"required"=%false}
    ]'
);

Что дальше

Предыдущая
Следующая