Vanilla
Vanilla операция — это операция, в которой исполняется произвольный код, у которой отсутствуют входные таблицы, но могут быть указаны выходные таблицы. Так же у неё могут быть указаны таблица для сохранения данных из стандартного потока ошибок (stderr таблицы) и таблицы с coredump.
Операцию Vanilla можно использовать в ситуации, когда требуется запустить распределённый процесс, описав несколько видов джобов и указав желаемое количество джобов каждого вида. Планировщик YTsaurus берёт на себя функцию поддержания жизнедеятельности данного распределённого процесса, добиваясь того, чтобы нужное количество джобов каждого вида было запланировано и успешно завершилось, как и в любой другой операции YTsaurus.
Получение входных данных джобом Vanilla операции выполняется с помощью опции file_paths в спецификации пользовательского скрипта или путём чтения данных джобом напрямую по сети.
При загрузке данных в джоб напрямую по сети предварительно обсудите предполагаемый способ использования с администратором системы YTsaurus и с сервисом, откуда вы планируете забирать данные, чтобы не создать слишком большую нагрузку на сеть и сервис-источник данных.
Примечание
Вся координация и распределение работы между запущенными джобами, а также поддержание консистентности в случае перезапусков джобов ложится на пользователя.
Выходные данные джоб может как писать по сети (в таком случае следует также оценить нагрузку на сеть), так и выводить в таблицы. Для каждого вида джобов при этом будут свои выходные таблицы.
Основной частью описания операции Vanilla является перечисление всевозможных тасков. В экосистеме YTsaurus таск — это группа джобов одного вида, запущенных в рамках операции. Описание таска представляет из себя описание пользовательского скрипта, указание количества джобов, которые должны запуститься в рамках этого таска и, возможно, описание выходных таблиц из данного таска.
Описание таска в операции Vanilla очень похоже на описание пользовательского скрипта, но стоит разделять эти понятия, потому что они отвечают разным сущностям в Планировщике.
Описание пользовательского скрипта — это исключительно указание того, как нужно запускать пользовательский код на машине. В нём нет информации о том, какой объём данных необходимо передать каждому джобу. Например: опция data_weight_per_map_job в операции MapReduce не занесена в спецификацию пользовательского скрипта, потому что она не имеет отношения к тому, как нужно запускать пользовательский скрипт map стадии, который может отсутствовать.
Описание таска — это спецификация того, как планировщик должен управлять джобами данного вида. Данное понятие встречается только в операции Vanilla.
Общие параметры для всех типов операций описаны в разделе Настройки операций.
У операции Vanilla имеются следующие дополнительные параметры:
tasks— словарь, описывающий таски операции. Каждому таску должна соответствовать пара ключ-значение, где ключом является название таска, а значение — спецификация таска. В YTsaurus предпочтителен lowercase_with_underscores стиль именования тасков, напримерpartition_map. Название таска будет отображено в веб-интерфейсе на странице операции, а также будет доступно в виде ключей в yson-представлении прогресса операции, поэтому не стоит делать его слишком длинным. Спецификация таска состоит из тех же параметров, что и спецификация пользовательского скрипта, а также содержит дополнительные параметры:job_count— количество джобов данного вида, которые должны запуститься в рамках конкретного таска. В случае когда опцияfail_on_job_restartв спецификации операции не установлена, таск будет считаться успешно завершённым, если ровноjob_countджобов данного вида успешно завершилось (то есть, завершилось с кодом 0 и не было прервано по каким-то причинам);output_table_paths— список выходных таблиц (возможно пустой). В случае, когда одна и та же таблица является выходной для разных тасков, во всех вхождениях на ней должны быть указаны одинаковые атрибуты на путях;gang_options— опции для Gang-операций (см. раздел Gang-операции).
Примечание
С помощью опций max_failed_job_count и fail_on_job_restart, общих для всех операций, удобно контролировать перезапуски джобов.
Также для Vanilla операций существует опция restart_completed_jobs, которая позволяет включать режим, при котором completed джобы будут перезапускаться.
Пример спецификации
{
tasks = {
master = {
job_count = 1;
command = "python run_master.py < duties.dat";
file_paths = ["//tmp/run_master.py"; "<file_name=duties.dat>//tmp/duties_180124.dat"];
};
slave = {
job_count = 100;
command = "python run_slave.py";
file_paths = ["//tmp/run_slave.py"];
};
};
max_failed_job_count = 1;
stderr_table_path = "//tmp/stderrs";
}
Gang-операции
Gang-операции — это специальный режим Vanilla операций, предназначенный для запуска скоординированных распределённых процессов. В отличие от обычных Vanilla операций, где каждый джоб работает и завершается независимо с точки зрения контроллер-агента YTsaurus, для Gang-операций планировщик обеспечивает перезапуск всех джобов этой операции при неуспешном завершении (abort, fail) или прерывании одного из джобов гангового таска. В том числе будут перезапущены и completed джобы (такой перезапуск называется перезапуском gang-а). Ганговым называется таск, в описании которого указана опция gang_options.
Операция является gang-операцией, если она содержит хотя бы один ганговый таск.
Инкарнации операций и ранги джобов
Одно из наиболее важных понятий в модели gang-операций - инкарнация. Инкарнация это некоторый строковый идентификатор, который однозначно идентифицирует период времени жизни операции, между 2 последовательными событиями старта, перезапуска gang-а или завершения операции.
Джобы также характеризуются инкарнацией операции, но, в отличие от операции, инкарнация джоба никогда не меняется. Если меняется инкарнация операции, джобы предыдущей инкарнации будут поаборчены.
Также каждый джоб (за исключением резервных джобов) характеризуется рангом, уникальным в пределах таска gang-операции.
Типичный сценарий использования gang операций - запуск распределённого ML-обучения. В этом сценарии джобы взаимодействуют между собой по сети. Понятие инкарнации нужно в первую очередь для того, чтобы пользовательский код мог использовать инкарнации при коммуникации, чтобы избежать ситации, когда джобы из новой инкарнации общаются с джобами из старой инкарнации.
Рестарт gang-операции
Когда контроллер-агент принимает решение перезапустить gang-операцию, он абортит все джобы, пытаясь при этом переиспользовать аллокации бегущих джобов под запуск джобов новой инкарнации, а также перезапускает джобы, которые уже считаются успешно завершёнными (completed).
Если при переходе операции в новую инкарнацию контроллер-агент смог переиспользовать аллокацию джоба предыдущей инкарнации, то существуют следующие гарантии:
job_cookieджобов старой и новой инкарнации совпадают;- Если джобу предыдущей инкарнации был присвоен
gang_rank, то джобу новой инкарнации присваивается тот жеgang_rank; - Если джобу предыдущей инкарнации был присвоен
monitoring_descriptor, то джобу новой инкарнации присваивается тот жеmonitoring_descriptor.
Резервные джобы
При перезапуске gang-а, как минимум одна аллокация точно не будет переиспользована. Это аллокация джоба, из-за завершения которого контроллер-агент принял решение перезапустить операцию.
Типично запуск джобов в переиспользованных аллокациях происходит существенно быстрее, чем шедулинг новой аллокации и запуск в ней нового джоба,
Чтобы избежать простоя джобов, под которые получилось переиспользовать аллокацию, на время запуска джоба в новосозданной аллокации, можно использовать механизм резервных джобов.
Резервные джобы это джобы, которые не являются частью ганга в текущей инкарнации (их завершение не приводит к перезапуску gang-а, а также такие джобы не имеют gang_rank), но их аллокации могут быть использованы под запуск джобов новой инкарнации.
Несмотря на то, что резервные джобы не являются частью ганга, в них всё равно будет запущен пользовательский код. О том, как пользовательский код может понять, запущен ли он в резервном джобе или ганговом, написано в разделе переменные окружения
Когда gang_options/size меньше job_count, контроллер-агент запускает job_count-gang_options/size резервных джобов.
Поведение при ревайве операции
При ревайве gang-операции система проверяет, что все нерезервные джобы успешно восстановлены. Если это не так, операция будет перезапущена без переиспользования аллокаций.
Специальный режим мониторинга для Gang-операций
Gang-операции поддерживают специальный режим формирования monitoring descriptors:
Опция use_operation_id_based_descriptors_for_gangs_jobs: Включает специальный режим формирования дескрипторов для Gang-операций: дескриптор формируется на основе operation_id, gang_rank и индекса таска, что гарантирует уникальность. Все такие дескрипторы имеют в качестве префикса operation_id.
Этот режим несёт потенциальную опасность переполнения квоты в мониторинге, поэтому стоит использовать его аккуратно, после согласования с командой YTsaurus.
Переменные окружения
В Gang-операциях джобы получают следующие дополнительные переменные окружения:
YT_OPERATION_INCARNATION— уникальный идентификатор инкарнации операции;YT_TASK_GANG_SIZE— общий размер группы джобов во всех тасках операции;YT_GANG_RANK— ранг текущего джоба в группе (от 0 доYT_GANG_SIZE - 1);YT_TASK_JOB_COUNT— общее количество джобов в таске;
Ограничения и особенности
- Gang-операции несовместимы с
fail_on_job_restart - Gang-операции несовместимы с
restart_completed_jobs = true - При сбое любого джоба вся группа перезапускается с новой инкарнацией
- Изменение
job_countво время выполнения Gang-операции запрещено - При ревайве операции: Если не все джобы с рангами могут быть восстановлены, вся группа перезапускается с новой инкарнацией
- Не поддерживаются выходные таблицы
Настройка Gang-операций
Для включения Gang-режима необходимо добавить параметр gang_options в спецификацию таска:
{
tasks = {
worker = {
job_count = 3;
command = "python distributed_worker.py";
gang_options = {};
};
};
}
Параметры gang_options
size— размер группы джобов. По умолчанию равенjob_countтаска. Не может быть большеjob_count. Заданиеsizeвключит механизм резервных джобов.