Vanilla

Vanilla операция — это операция, в которой исполняется произвольный код, у которой отсутствуют входные таблицы, но могут быть указаны выходные таблицы. Так же у неё могут быть указаны таблица для сохранения данных из стандартного потока ошибок (stderr таблицы) и таблицы с coredump.

Операцию Vanilla можно использовать в ситуации, когда требуется запустить распределённый процесс, описав несколько видов джобов и указав желаемое количество джобов каждого вида. Планировщик YTsaurus берёт на себя функцию поддержания жизнедеятельности данного распределённого процесса, добиваясь того, чтобы нужное количество джобов каждого вида было запланировано и успешно завершилось, как и в любой другой операции YTsaurus.

Получение входных данных джобом Vanilla операции выполняется с помощью опции file_paths в спецификации пользовательского скрипта или путём чтения данных джобом напрямую по сети.

При загрузке данных в джоб напрямую по сети предварительно обсудите предполагаемый способ использования с администратором системы YTsaurus и с сервисом, откуда вы планируете забирать данные, чтобы не создать слишком большую нагрузку на сеть и сервис-источник данных.

Примечание

Вся координация и распределение работы между запущенными джобами, а также поддержание консистентности в случае перезапусков джобов ложится на пользователя.

Выходные данные джоб может как писать по сети (в таком случае следует также оценить нагрузку на сеть), так и выводить в таблицы. Для каждого вида джобов при этом будут свои выходные таблицы.

Основной частью описания операции Vanilla является перечисление всевозможных тасков. В экосистеме YTsaurus таск — это группа джобов одного вида, запущенных в рамках операции. Описание таска представляет из себя описание пользовательского скрипта, указание количества джобов, которые должны запуститься в рамках этого таска и, возможно, описание выходных таблиц из данного таска.

Описание таска в операции Vanilla очень похоже на описание пользовательского скрипта, но стоит разделять эти понятия, потому что они отвечают разным сущностям в Планировщике.

Описание пользовательского скрипта — это исключительно указание того, как нужно запускать пользовательский код на машине. В нём нет информации о том, какой объём данных необходимо передать каждому джобу. Например: опция data_weight_per_map_job в операции MapReduce не занесена в спецификацию пользовательского скрипта, потому что она не имеет отношения к тому, как нужно запускать пользовательский скрипт map стадии, который может отсутствовать.

Описание таска — это спецификация того, как планировщик должен управлять джобами данного вида. Данное понятие встречается только в операции Vanilla.

Общие параметры для всех типов операций описаны в разделе Настройки операций.

У операции Vanilla имеются следующие дополнительные параметры:

  • tasks — словарь, описывающий таски операции. Каждому таску должна соответствовать пара ключ-значение, где ключом является название таска, а значение — спецификация таска. В YTsaurus предпочтителен lowercase_with_underscores стиль именования тасков, например partition_map. Название таска будет отображено в веб-интерфейсе на странице операции, а также будет доступно в виде ключей в yson-представлении прогресса операции, поэтому не стоит делать его слишком длинным. Спецификация таска состоит из тех же параметров, что и спецификация пользовательского скрипта, а также содержит дополнительные параметры:
    • job_count — количество джобов данного вида, которые должны запуститься в рамках конкретного таска. В случае когда опция fail_on_job_restart в спецификации операции не установлена, таск будет считаться успешно завершённым, если ровно job_count джобов данного вида успешно завершилось (то есть, завершилось с кодом 0 и не было прервано по каким-то причинам);
    • output_table_paths — список выходных таблиц (возможно пустой). В случае, когда одна и та же таблица является выходной для разных тасков, во всех вхождениях на ней должны быть указаны одинаковые атрибуты на путях;
    • gang_options — опции для Gang-операций (см. раздел Gang-операции).

Примечание

С помощью опций max_failed_job_count и fail_on_job_restart, общих для всех операций, удобно контролировать перезапуски джобов.
Также для Vanilla операций существует опция restart_completed_jobs, которая позволяет включать режим, при котором completed джобы будут перезапускаться.

Пример спецификации

{
  tasks = {
    master = {
      job_count = 1;
      command = "python run_master.py < duties.dat";
      file_paths = ["//tmp/run_master.py"; "<file_name=duties.dat>//tmp/duties_180124.dat"];
    };
    slave = {
      job_count = 100;
      command = "python run_slave.py";
      file_paths = ["//tmp/run_slave.py"];
    };
  };
  max_failed_job_count = 1;
  stderr_table_path = "//tmp/stderrs";
}

Gang-операции

Gang-операции — это специальный режим Vanilla операций, предназначенный для запуска скоординированных распределённых процессов. В отличие от обычных Vanilla операций, где каждый джоб работает и завершается независимо с точки зрения контроллер-агента YTsaurus, для Gang-операций планировщик обеспечивает перезапуск всех джобов этой операции при неуспешном завершении (abort, fail) или прерывании одного из джобов гангового таска. В том числе будут перезапущены и completed джобы (такой перезапуск называется перезапуском gang-а). Ганговым называется таск, в описании которого указана опция gang_options.
Операция является gang-операцией, если она содержит хотя бы один ганговый таск.

Инкарнации операций и ранги джобов

Одно из наиболее важных понятий в модели gang-операций - инкарнация. Инкарнация это некоторый строковый идентификатор, который однозначно идентифицирует период времени жизни операции, между 2 последовательными событиями старта, перезапуска gang-а или завершения операции.

Джобы также характеризуются инкарнацией операции, но, в отличие от операции, инкарнация джоба никогда не меняется. Если меняется инкарнация операции, джобы предыдущей инкарнации будут поаборчены.

Также каждый джоб (за исключением резервных джобов) характеризуется рангом, уникальным в пределах таска gang-операции.

Типичный сценарий использования gang операций - запуск распределённого ML-обучения. В этом сценарии джобы взаимодействуют между собой по сети. Понятие инкарнации нужно в первую очередь для того, чтобы пользовательский код мог использовать инкарнации при коммуникации, чтобы избежать ситации, когда джобы из новой инкарнации общаются с джобами из старой инкарнации.

Рестарт gang-операции

Когда контроллер-агент принимает решение перезапустить gang-операцию, он абортит все джобы, пытаясь при этом переиспользовать аллокации бегущих джобов под запуск джобов новой инкарнации, а также перезапускает джобы, которые уже считаются успешно завершёнными (completed).

Если при переходе операции в новую инкарнацию контроллер-агент смог переиспользовать аллокацию джоба предыдущей инкарнации, то существуют следующие гарантии:

  • job_cookie джобов старой и новой инкарнации совпадают;
  • Если джобу предыдущей инкарнации был присвоен gang_rank, то джобу новой инкарнации присваивается тот же gang_rank;
  • Если джобу предыдущей инкарнации был присвоен monitoring_descriptor, то джобу новой инкарнации присваивается тот же monitoring_descriptor.

Резервные джобы

При перезапуске gang-а, как минимум одна аллокация точно не будет переиспользована. Это аллокация джоба, из-за завершения которого контроллер-агент принял решение перезапустить операцию.
Типично запуск джобов в переиспользованных аллокациях происходит существенно быстрее, чем шедулинг новой аллокации и запуск в ней нового джоба,
Чтобы избежать простоя джобов, под которые получилось переиспользовать аллокацию, на время запуска джоба в новосозданной аллокации, можно использовать механизм резервных джобов.

Резервные джобы это джобы, которые не являются частью ганга в текущей инкарнации (их завершение не приводит к перезапуску gang-а, а также такие джобы не имеют gang_rank), но их аллокации могут быть использованы под запуск джобов новой инкарнации.
Несмотря на то, что резервные джобы не являются частью ганга, в них всё равно будет запущен пользовательский код. О том, как пользовательский код может понять, запущен ли он в резервном джобе или ганговом, написано в разделе переменные окружения

Когда gang_options/size меньше job_count, контроллер-агент запускает job_count-gang_options/size резервных джобов.

Поведение при ревайве операции

При ревайве gang-операции система проверяет, что все нерезервные джобы успешно восстановлены. Если это не так, операция будет перезапущена без переиспользования аллокаций.

Специальный режим мониторинга для Gang-операций

Gang-операции поддерживают специальный режим формирования monitoring descriptors:
Опция use_operation_id_based_descriptors_for_gangs_jobs: Включает специальный режим формирования дескрипторов для Gang-операций: дескриптор формируется на основе operation_id, gang_rank и индекса таска, что гарантирует уникальность. Все такие дескрипторы имеют в качестве префикса operation_id.
Этот режим несёт потенциальную опасность переполнения квоты в мониторинге, поэтому стоит использовать его аккуратно, после согласования с командой YTsaurus.

Переменные окружения

В Gang-операциях джобы получают следующие дополнительные переменные окружения:

  • YT_OPERATION_INCARNATION — уникальный идентификатор инкарнации операции;
  • YT_TASK_GANG_SIZE — общий размер группы джобов во всех тасках операции;
  • YT_GANG_RANK — ранг текущего джоба в группе (от 0 до YT_GANG_SIZE - 1);
  • YT_TASK_JOB_COUNT — общее количество джобов в таске;

Ограничения и особенности

  • Gang-операции несовместимы с fail_on_job_restart
  • Gang-операции несовместимы с restart_completed_jobs = true
  • При сбое любого джоба вся группа перезапускается с новой инкарнацией
  • Изменение job_count во время выполнения Gang-операции запрещено
  • При ревайве операции: Если не все джобы с рангами могут быть восстановлены, вся группа перезапускается с новой инкарнацией
  • Не поддерживаются выходные таблицы

Настройка Gang-операций

Для включения Gang-режима необходимо добавить параметр gang_options в спецификацию таска:

{
  tasks = {
    worker = {
      job_count = 3;
      command = "python distributed_worker.py";
      gang_options = {};
    };
  };
}

Параметры gang_options

  • size — размер группы джобов. По умолчанию равен job_count таска. Не может быть больше job_count. Задание size включит механизм резервных джобов.
Предыдущая