YTsaurus Queue API
В данном разделе описана экосистема очередей YTsaurus, специальные методы работы с ними, способы конфигурации и использования.
Модель данных
Очередью (queue) в YTsaurus называется любая упорядоченная динамическая таблица. Партиция очереди — таблет динамической таблицы, её индекс соответствует индексу таблета.
Консьюмером (queue_consumer) в YTsaurus называется сортированная таблица с некоторой фиксированной схемой. Консьюмер находится в соотношении many-to-many с очередями и представляет из себя потребителя одной или нескольких очередей. Задача консьюмера — хранить оффсеты по партициям читаемых очередей.
Связь между консьюмерами и очередями обеспечивается объектами регистраций.
Таблица 1 — Схема таблицы консьюмера
Имя | Тип | Описание |
---|---|---|
queue_cluster | string | Имя кластера очереди |
queue_path | string | Путь к динамической таблице-очереди |
partition_index | uint64 | Номер партиции, он же tablet_index |
offset | uint64 | Номер первой необработанной консьмером строки указанной партиции указанной очереди |
meta | any | Системная мета-информация |
Продьюсером (queue_producer) в YTsaurus так же называется сортированная таблица с некоторой фиксированной схемой. Продьюсер хранит порядковые номера для последних записанных строчек в рамках сессий записи в очереди, что позволяет не писать дублирующиеся строчки в очередь.
Таблица 2 — Схема таблицы продьюсера
Имя | Тип | Описание |
---|---|---|
queue_cluster | string | Имя кластера очереди |
queue_path | string | Путь к динамической таблице-очереди |
session_id | string | Идентификатор сессии |
sequence_number | int64 | Порядковый номер последней записанной строчки |
epoch | int64 | Номер текущей эпохи |
user_meta | any | Пользовательская мета-информация |
system_meta | any | Системная мета-информация |
API
Создание очереди
Создание очереди ничем не отличается от создания обычной упорядоченной динамической таблицы.
Для наибольшей полноты графиков/статистик рекомендуется добавлять в схему таблицы колонки $timestamp
и $cumulative_data_weight
.
Проверить, что автоматика подцепила очередь, можно по появлению вкладки Queue на странице объекта динамической таблицы.
Создание консьюмера
Создать консьюмер можно через алиас create queue_consumer
, или явно создав сортированную динамическую таблицу со схемой, указанной выше, выставив на неё атрибут @treat_as_queue_consumer = %true
.
Проверить, что автоматика подцепила консьюмер, можно по появлению вкладки Consumers на странице объекта динамической таблицы.
Регистрация консьюмера к очереди
Регистрация консьюмера к очереди производится с помощью метода register_queue_consumer
. Обязательно указывать булевый параметр vital
, который играет роль для настроек автоматического тримминга очередей (см. секцию про автоматический тримминг).
Для того чтобы выполнять команду выше, нужно иметь право register_queue_consumer
с указанием vital=True/False
на директорию, содержащую очередь. Запросить это право можно аналогично остальным правам в UI YTsaurus на странице директории. Право register_queue_consumer
с vital=True
даёт возможность регистрировать как vital-консьюмеры, так и non-vital.
Удаление регистрации производится с помощью метода unregister_queue_consumer
. Для выполнения команды нужно иметь write-доступ к очереди или консьюмеру.
Оба Cypress аргумента обеих команд имеют тип rich YPath и поддерживают указание имени кластера. В ином случае используется кластер, на котором выполняется команда.
Чтение данных
Для чтения данных из очередей доступно два схожих метода.
Метод pull_queue
позволяет вычитать порцию строк указанной партиции указанной очереди, ограничив её по числу строк (max_row_count
) или по объёму данных в байтах (max_data_weight
). Для выполнения этого запроса нужно иметь право на чтение очереди.
Метод pull_queue_consumer
аналогичен предыдущему, но принимает первым аргументом путь до таблицы консьюмера. Для выполнения этого запроса должны быть выполнено два свойства: у пользователя есть право на чтение консьюмера и присутствует регистрация указанного консьюмера к указанной очереди.
Параметр очереди в данном методе представляет из себя rich YPath и поддерживает указание кластера, на котором она расположена. В ином случае используется кластер, на котором выполняется команда.
Чтение данных из очереди также можно делать обычным способом, через метод select_rows
.
Внимание
Методы pull_queue
и pull_queue_consumer
могут вернуть меньше строк, чем указано в max_row_count
, даже если столько строк есть в партиции очереди и их объем меньше max_data_weight
. Лишь возврат пустого множества строк означает, что в партиции нет строк по указанному оффсету.
Работа с консьюмером
Для работы с консьюмером доступен метод advance_queue_consumer
, продвигающий оффсет консьюмера по указанной партиции указанной очереди. В переданной транзакции делается изменение соответствующей строки консьюмера с оффсетом. В той же транзакции могут быть произведены иные действия в рамках динамических таблиц.
При указании non-null параметра old_offset
, в той же транзакции сначала производится чтение текущего оффсета и его сравнение с переданным значением — при их неравенстве выбрасывается исключение.
Оффсеты очередей YTsaurus интерпретируются как индекс первой непрочитанной строки.
Создание продьюсера
Создать продьюсер можно через алиас create queue_producer
, или явно создав сортированную динамическую таблицу со схемой, указанной выше, выставив на неё атрибут @treat_as_queue_producer = %true
.
Запись данных
Для записи данных в очередь можно либо использовать API динамических таблиц, а именно метод insert_rows
, либо воспользоваться API продьюсеров для записи без дубликатов.
Чтобы писать через продьюсер, нужно иметь право write
как для очереди, так и для продьюсера.
Перед началом записи нужно позвать метод create_queue_producer_session
, который принимает путь до очереди, путь до продьюсера, а также идентификатор сессии записи (session_id
). В качестве session_id
можно передать произвольную строку, например, имя хоста, откуда производится запись. В результате, если такой сессии ранее не было, то в таблице-продьюсере создастся сессия с epoch
равным 0
и sequence_number
равным -1
. Если же сессия с таким идентификатором уже ранее создавалась, то для нее будет увеличено значение epoch
.
Метод create_queue_producer_session
вернет текущее (обновленное) значение эпохи, а также порядковый номер последнего записанного сообщение, то есть актуальное состояние сессии записи, хранящееся в продьюсере.
Затем, с помощью метода push_queue_producer
можно писать данные в очередь. Метод принимает путь до очереди, путь до продьюсера, идентификатор сессии, эпоху, а также сами записываемые данные. В каждой строчке необходимо передать значение $sequence_number
, соответствующее порядковому номеру этой строчки. Либо, можно не передавать порядковый номер в каждой строчке в самих данных, а указать лишь порядковый номер, соответствующий первой строчке, в опциях метода - в таком случае мы будем считать, что для остальных строк он инкрементально увеличивается на единицу.
Эпоха (epoch
) сессии может использоваться для того, чтобы бороться с зомби-процессами.
Queue Agent
Queue Agent — выделенный микросервис, следящий за очередями, консьюмерами и регистрациями.
Автоматические политики очистки очередей
Для очереди можно настроить автоматическую политику очистки (тримминга), выставив на ней атрибут @auto_trim_config
с конфигурацией соответствующего формата.
Доступные опции:
enable: True
— включает тримминг по vital консьюмерам. Если есть хотя бы один vital-консьюмер, то Queue Agent будет с периодичностью в единицы секунд звать Trim по партиции вплоть до минимума по оффсетам vital-консьюмеров.
NB: Если vital-консьюмеров нет, то тримминг не производится.retained_rows: x
— гарантирует, что в каждой партиции безусловно будут держаться последние x строк. Предназначен для использования в совокупности с предыдущей опцией.retained_lifetime_duration: x
— гарантирует, что в каждой партиции безусловно будут держаться строки, которые были записаны в очередь не более x миллисекунд назад, при этом указанное количество миллисекунд должно быть кратно одной секунде. Предназначен для использования в совокупности с включённой опцией тримминга (enable: True
).
Такая настройка не конфликтует с существующими настройками TTL динамических таблиц: можно настроить тримминг по vital consumers и max_data_ttl=36h, min_data_versions=0
, чтобы, помимо удаления данных по оффсетам, всегда хранить не более трёх дней данных.
Графики и статусы
С помощью атрибутов @queue_status
, @queue_partitions
на таблицах очередей и @queue_consumer_status
, @queue_consumer_partitions
на таблицах консьюмеров можно узнать текущее состояние и мета-информацию очередей/консьюмеров с точки зрения Queue Agent, как в целом, так и по отдельным партициям.
Эти атрибуты не предназначены для высоконагруженных сервисов и их стоит использовать исключительно для интроспекции.
Пример использования
Представленные примеры используют гипотетическую конфигурацию из нескольких кластеров, с адресам hume
и pythia
.
Листинг 1 — Пример использования API очередей
# Create queue on pythia.
$ yt --proxy pythia create table //tmp/$USER-test-queue --attributes '{dynamic=true;schema=[{name=data;type=string};{name="$timestamp";type=uint64};{name="$cumulative_data_weight";type=int64}]}'
2826e-2b1e4-3f30191-dcd2013e
# Create queue_consumer on hume.
$ yt --proxy hume create queue_consumer //tmp/$USER-test-consumer
# OR: Create queue_consumer on hume as table with explicit schema specification.
$ yt --proxy hume create table //tmp/$USER-test-consumer --attributes '{dynamic=true;treat_as_queue_consumer=true;schema=[{name=queue_cluster;type=string;sort_order=ascending;required=true};{name=queue_path;type=string;sort_order=ascending;required=true};{name=partition_index;type=uint64;sort_order=ascending;required=true};{name=offset;type=uint64;required=true};{name=meta;type=any;required=false}]}'
18a5b-28931-3ff0191-35282540
# Register consumer for queue.
$ yt --proxy pythia register-queue-consumer //tmp/$USER-test-queue "<cluster=hume>//tmp/$USER-test-consumer" --vital
# Check registrations for queue.
$ yt --proxy pythia list-queue-consumer-registrations --queue-path //tmp/$USER-test-queue
[
{
"queue_path" = <
"cluster" = "pythia";
> "//tmp/bob-test-queue";
"consumer_path" = <
"cluster" = "hume";
> "//tmp/bob-test-consumer";
"vital" = %true;
"partitions" = #;
};
]
# Check queue status provided by Queue Agent.
$ yt --proxy pythia get //tmp/$USER-test-queue/@queue_status
{
"partition_count" = 1;
"has_cumulative_data_weight_column" = %true;
"family" = "ordered_dynamic_table";
"exports" = {
"progress" = {};
};
"alerts" = {};
"queue_agent_host" = "yt-queue-agent-1.ytsaurus.tech";
"has_timestamp_column" = %true;
"write_row_count_rate" = {
"1m_raw" = 0.;
"1h" = 0.;
"current" = 0.;
"1d" = 0.;
"1m" = 0.;
};
"registrations" = [
{
"consumer" = "hume://tmp/bob-test-consumer";
"vital" = %true;
"queue" = "pythia://tmp/bob-test-queue";
};
];
"write_data_weight_rate" = {
"1m_raw" = 0.;
"1h" = 0.;
"current" = 0.;
"1d" = 0.;
"1m" = 0.;
};
}
$ yt --proxy pythia get //tmp/$USER-test-queue/@queue_partitions
[
{
"error" = {
"attributes" = {
"trace_id" = "21503942-85d82a3a-2c9ea16d-e2149d9c";
"span_id" = 17862985281506116291u;
"thread" = "Controller:4";
"datetime" = "2025-01-23T13:42:21.839124Z";
"tid" = 9166196934387883291u;
"pid" = 481;
"host" = "yt-queue-agent-1.ytsaurus.tech";
"state" = "unmounted";
"fid" = 18445202819181375616u;
};
"code" = 1;
"message" = "Tablet 3d3c-50e7d-7db02be-7e178361 is not mounted or frozen";
};
};
]
# Check queue consumer status provided by Queue Agent.
$ yt --proxy hume get //tmp/$USER-test-consumer/@queue_consumer_status
{
"queues" = {
"pythia://tmp/bob-test-queue" = {
"error" = {
"attributes" = {
"trace_id" = "623ba99c-b2dce5fe-50174949-5f508824";
"span_id" = 14498308957160432715u;
"thread" = "Controller:1";
"datetime" = "2025-01-23T13:42:55.747430Z";
"tid" = 627435960759374310u;
"pid" = 481;
"host" = "yt-queue-agent-1.ytsaurus.tech";
"fid" = 18442320640360156096u;
};
"code" = 1;
"message" = "Queue \"pythia://tmp/bob-test-queue\" snapshot is missing";
};
};
};
"registrations" = [
{
"consumer" = "hume://tmp/bob-test-consumer";
"vital" = %true;
"queue" = "pythia://tmp/bob-test-queue";
};
];
"queue_agent_host" = "yt-queue-agent-1.ytsaurus.tech";
}
# We can see some errors in the responses above, since both tables are unmounted.
# Mount queue and consumer tables.
$ yt --proxy pythia mount-table //tmp/$USER-test-queue
$ yt --proxy hume mount-table //tmp/$USER-test-consumer
# Check statuses again:
$ yt --proxy pythia get //tmp/$USER-test-queue/@queue_partitions
[
{
"meta" = {
"cell_id" = "2dd9a-d4f7-3f302bc-f21fc0c";
"host" = "node-1.ytsaurus.tech:9022";
};
"lower_row_index" = 0;
"cumulative_data_weight" = #;
"upper_row_index" = 0;
"available_row_count" = 0;
"write_row_count_rate" = {
"1m_raw" = 0.;
"1h" = 0.;
"current" = 0.;
"1d" = 0.;
"1m" = 0.;
};
"available_data_weight" = #;
"trimmed_data_weight" = #;
"last_row_commit_time" = "1970-01-01T00:00:00.000000Z";
"write_data_weight_rate" = {
"1m_raw" = 0.;
"1h" = 0.;
"current" = 0.;
"1d" = 0.;
"1m" = 0.;
};
"commit_idle_time" = 1737639851870;
};
]
$ yt --proxy hume get //tmp/$USER-test-consumer/@queue_consumer_status
{
"queues" = {
"pythia://tmp/bob-test-queue" = {
"read_data_weight_rate" = {
"1m_raw" = 0.;
"1h" = 0.;
"current" = 0.;
"1d" = 0.;
"1m" = 0.;
};
"read_row_count_rate" = {
"1m_raw" = 0.;
"1h" = 0.;
"current" = 0.;
"1d" = 0.;
"1m" = 0.;
};
"partition_count" = 1;
};
};
"registrations" = [
{
"consumer" = "hume://tmp/bob-test-consumer";
"vital" = %true;
"queue" = "pythia://tmp/bob-test-queue";
};
];
}
# Enable automatic trimming based on vital consumers for queue.
$ yt --proxy pythia set //tmp/$USER-test-queue/@auto_trim_config '{enable=true}'
# Write rows without exactly once semantics.
$ for i in {1..20}; do echo '{data=foo}; {data=bar}; {data=foobar}; {data=megafoo}; {data=megabar}' | yt insert-rows --proxy pythia //tmp/$USER-test-queue --format yson; done;
# Check that queue status reflects writes.
$ yt --proxy pythia get //tmp/$USER-test-queue/@queue_status/write_row_count_rate
{
"1m_raw" = 2.6419539762457456;
"current" = 5.995956327053036;
"1h" = 2.6419539762457456;
"1d" = 2.6419539762457456;
"1m" = 2.6419539762457456;
}
# Read data via consumer.
$ yt --proxy hume pull-queue-consumer //tmp/$USER-test-consumer "<cluster=pythia>//tmp/$USER-test-queue" --partition-index 0 --offset 0 --max-row-count 5 --format "<format=text>yson"
{"$tablet_index"=0;"$row_index"=0;"data"="foo";"$timestamp"=1865777451725072279u;"$cumulative_data_weight"=20;};
{"$tablet_index"=0;"$row_index"=1;"data"="bar";"$timestamp"=1865777451725072279u;"$cumulative_data_weight"=40;};
{"$tablet_index"=0;"$row_index"=2;"data"="foobar";"$timestamp"=1865777451725072279u;"$cumulative_data_weight"=63;};
{"$tablet_index"=0;"$row_index"=3;"data"="megafoo";"$timestamp"=1865777451725072279u;"$cumulative_data_weight"=87;};
{"$tablet_index"=0;"$row_index"=4;"data"="megabar";"$timestamp"=1865777451725072279u;"$cumulative_data_weight"=111;};
# Advance queue consumer.
$ yt --proxy hume advance-queue-consumer //tmp/$USER-test-consumer "<cluster=pythia>//tmp/$USER-test-queue" --partition-index 0 --old-offset 0 --new-offset 42
# Since trimming is enabled and the consumer is the only vital consumer for the queue, soon the rows up to index 42 will be trimmed.
# Calling pull-queue-consumer now returns the next available rows.
$ yt --proxy hume pull-queue-consumer //tmp/$USER-test-consumer "<cluster=pythia>//tmp/$USER-test-queue" --partition-index 0 --offset 0 --max-row-count 5 --format "<format=text>yson"
{"$tablet_index"=0;"$row_index"=42;"data"="foobar";"$timestamp"=1865777485011069884u;"$cumulative_data_weight"=951;};
{"$tablet_index"=0;"$row_index"=43;"data"="megafoo";"$timestamp"=1865777485011069884u;"$cumulative_data_weight"=975;};
{"$tablet_index"=0;"$row_index"=44;"data"="megabar";"$timestamp"=1865777485011069884u;"$cumulative_data_weight"=999;};
{"$tablet_index"=0;"$row_index"=45;"data"="foo";"$timestamp"=1865777486084785133u;"$cumulative_data_weight"=1019;};
{"$tablet_index"=0;"$row_index"=46;"data"="bar";"$timestamp"=1865777486084785133u;"$cumulative_data_weight"=1039;};
# Create queue producer on pythia.
$ yt --proxy pythia create queue_producer //tmp/$USER-test-producer
309db-eb36-3f30191-f83f27c0
# Create queue producer session.
$ yt --proxy pythia create-queue-producer-session --queue-path //tmp/$USER-test-queue --producer-path //tmp/$USER-test-producer --session-id session_123
{
"epoch" = 0;
"sequence_number" = -1;
"user_meta" = #;
}
# Write rows via queue producer.
$ echo '{data=value1;"$sequence_number"=1};{data=value2;"$sequence_number"=2}' | yt --proxy pythia push-queue-producer //tmp/$USER-test-producer //tmp/$USER-test-queue --session-id session_123 --epoch 0 --input-format yson
{
"last_sequence_number" = 2;
"skipped_row_count" = 0;
}
# Check written rows.
$ yt --proxy pythia pull-queue //tmp/$USER-test-queue --offset 100 --partition-index 0 --format "<format=pretty>yson"
{
"$tablet_index" = 0;
"$row_index" = 100;
"data" = "value1";
"$timestamp" = 1865777698685609732u;
"$cumulative_data_weight" = 2243;
};
{
"$tablet_index" = 0;
"$row_index" = 101;
"data" = "value2";
"$timestamp" = 1865777698685609732u;
"$cumulative_data_weight" = 2266;
};
# Write one more row batch with row duplicates.
$ echo '{data=value2;"$sequence_number"=2};{data=value3;"$sequence_number"=10}' | yt --proxy pythia push-queue-producer //tmp/$USER-test-producer //tmp/$USER-test-queue --session-id session_123 --epoch 0 --input-format yson
{
"last_sequence_number" = 10;
"skipped_row_count" = 1;
}
# Check that there is no row dublicates.
$ yt --proxy pythia pull-queue //tmp/$USER-test-queue --offset 100 --partition-index 0 --format "<format=pretty>yson"
{
"$tablet_index" = 0;
"$row_index" = 100;
"data" = "value1";
"$timestamp" = 1865777698685609732u;
"$cumulative_data_weight" = 2243;
};
{
"$tablet_index" = 0;
"$row_index" = 101;
"data" = "value2";
"$timestamp" = 1865777698685609732u;
"$cumulative_data_weight" = 2266;
};
{
"$tablet_index" = 0;
"$row_index" = 102;
"data" = "value3";
"$timestamp" = 1865777742709000317u;
"$cumulative_data_weight" = 2289;
};