Vanilla

Vanilla-операция исполняет произвольный код. У неё отсутствуют входные таблицы, но могут быть указаны выходные таблицы.

Также у неё могут быть указаны: таблица для сохранения данных из стандартного потока ошибок (stderr-таблицы) и таблицы с coredump.

Что важно знать

  • Операция Vanilla подходит для запуска распределённого процесса: возможно описать несколько видов джобов и указать нужное количество джобов каждого вида.

    Планировщик YTsaurus берёт на себя функцию поддержания жизнедеятельности данного распределённого процесса, добиваясь того, чтобы нужное количество джобов каждого вида было запланировано и успешно завершилось, как и в любой другой операции YTsaurus.

  • Джоб Vanilla получает входные данные с помощью опции file_paths в спецификации пользовательского скрипта или читает данные напрямую по сети.

    Внимание

    При загрузке данных в джоб напрямую по сети предварительно обсудите предполагаемый способ использования с администратором системы YTsaurus и с представителем сервиса, из которого вы планируете забирать данные.

    Это важно, чтобы нагрузка на сеть и сервис (источник данных) была не слишком большая.

    Примечание

    Вся координация и распределение работы между запущенными джобами, а также поддержание консистентности в случае перезапусков джобов ложится на пользователя.

  • Выходные данные джоб может как писать по сети (в таком случае следует также оценить нагрузку на сеть), так и выводить в таблицы. Для каждого вида джобов при этом будут свои выходные таблицы.

Таски и скрипт

Основной частью описания операции Vanilla является перечисление всевозможных тасков.
В экосистеме YTsaurus таск — это группа джобов одного вида, запущенных в рамках операции.

  • Описание таска представляет из себя описание пользовательского скрипта, указание количества джобов, которые должны запуститься в рамках этого таска и, возможно, описание выходных таблиц из данного таска.

  • Описание таска в операции Vanilla очень похоже на описание пользовательского скрипта, но стоит разделять эти понятия, потому что они отвечают разным сущностям в Планировщике.

  • Описание пользовательского скрипта — это исключительно указание того, как нужно запускать пользовательский код на машине. В нём нет информации о том, какой объём данных необходимо передать каждому джобу.

    Например: опция data_weight_per_map_job в операции MapReduce не занесена в спецификацию пользовательского скрипта, потому что она не имеет отношения к тому, как нужно запускать пользовательский скрипт map стадии, который может отсутствовать.

  • Описание таска — это спецификация того, как планировщик должен управлять джобами данного вида. Данное понятие встречается только в операции Vanilla.

Общие параметры для всех типов операций описаны в разделе Настройки операций.

Параметры операции

У операции Vanilla имеются следующие дополнительные параметры:

  • tasks — словарь, описывающий таски операции. Каждому таску должна соответствовать пара ключ-значение, где ключом является название таска, а значение — спецификация таска. В YTsaurus предпочтителен lowercase_with_underscores стиль именования тасков, например partition_map. Название таска будет отображено в веб-интерфейсе на странице операции, а также будет доступно в виде ключей в yson-представлении прогресса операции, поэтому не стоит делать его слишком длинным.

    Спецификация таска состоит из тех же параметров, что и спецификация пользовательского скрипта, а также содержит дополнительные параметры:

    • job_count — количество джобов данного вида, которые должны запуститься в рамках конкретного таска. В случае когда опция fail_on_job_restart в спецификации операции не установлена, таск будет считаться успешно завершённым, если ровно job_count джобов данного вида успешно завершилось (то есть, завершилось с кодом 0 и не было прервано по каким-то причинам);
    • output_table_paths — список выходных таблиц (возможно пустой). В случае, когда одна и та же таблица является выходной для разных тасков, во всех вхождениях на ней должны быть указаны одинаковые атрибуты на путях;
    • gang_options — опции для Gang-операций (см. раздел Gang-операции).

Примечание

Удобно контролировать перезапуски джобов с помощью опций (общих для всех операций) max_failed_job_count и fail_on_job_restart.

Ещё для Vanilla есть опция restart_completed_jobs. Она включает режим перезапуска завершённых джобов.

Пример спецификации

{
  tasks = {
    master = {
      job_count = 1;
      command = "python run_master.py < duties.dat";
      file_paths = ["//tmp/run_master.py"; "<file_name=duties.dat>//tmp/duties_180124.dat"];
    };
    slave = {
      job_count = 100;
      command = "python run_slave.py";
      file_paths = ["//tmp/run_slave.py"];
    };
  };
  max_failed_job_count = 1;
  stderr_table_path = "//tmp/stderrs";
}

Gang-операции

Gang-операции — это специальный режим Vanilla-операций, предназначенный для запуска скоординированных распределённых процессов.

В отличие от обычных Vanilla операций, где каждый джоб работает и завершается независимо с точки зрения контроллер-агента YTsaurus, для Gang-операций планировщик обеспечивает перезапуск всех джобов этой операции при неуспешном завершении (abort, fail) или прерывании одного из джобов гангового таска. В том числе будут перезапущены и completed джобы (такой перезапуск называется перезапуском gang-а).

Ганговым называется таск, в описании которого указана опция gang_options. Gang-операция должна содержать хотя бы один ганговый таск.

Инкарнации операций и ранги джобов

Одно из наиболее важных понятий в модели gang-операций — инкарнация.

  • Инкарнация — это некоторый строковый идентификатор, который однозначно идентифицирует период времени жизни операции между 2-мя последовательными событиями старта, перезапуска gang-а или завершения операции.
  • Джобы также характеризуются инкарнацией операции, но, в отличие от операции, инкарнация джоба никогда не меняется. Если меняется инкарнация операции, джобы предыдущей инкарнации будут поаборчены.
  • Также каждый джоб (за исключением резервных джобов) характеризуется рангом, уникальным в пределах таска gang-операции.

Например: типичный сценарий использования gang операций — запуск распределённого ML-обучения.

В этом сценарии джобы взаимодействуют между собой по сети. Понятие инкарнации нужно в первую очередь для того, чтобы пользовательский код мог использовать инкарнации при коммуникации, чтобы избежать ситации, когда джобы из новой инкарнации общаются с джобами из старой инкарнации.

Рестарт gang-операции

Когда контроллер-агент принимает решение перезапустить gang-операцию, он абортит все джобы, пытаясь при этом переиспользовать аллокации бегущих джобов под запуск джобов новой инкарнации, а также перезапускает джобы, которые уже считаются успешно завершёнными (completed).

Если при переходе операции в новую инкарнацию контроллер-агент смог переиспользовать аллокацию джоба предыдущей инкарнации, то существуют следующие гарантии:

  • job_cookie джобов старой и новой инкарнации совпадают;
  • если джобу предыдущей инкарнации был присвоен gang_rank, то джобу новой инкарнации присваивается тот же gang_rank;
  • если джобу предыдущей инкарнации был присвоен monitoring_descriptor, то джобу новой инкарнации присваивается тот же monitoring_descriptor.

Резервные джобы

При перезапуске gang-а минимум одна аллокация точно не будет переиспользована. Это аллокация джоба, из-за завершения которого контроллер-агент принял решение перезапустить операцию.

Типично запуск джобов в переиспользованных аллокациях происходит существенно быстрее, чем шедулинг новой аллокации и запуск в ней нового джоба.

Совет

Чтобы избежать простоя джобов, под которые получилось переиспользовать аллокацию, на время запуска джоба в новосозданной аллокации, можно использовать механизм резервных джобов.

Резервные джобы — это джобы, которые не являются частью ганга в текущей инкарнации (их завершение не приводит к перезапуску gang-а, а также такие джобы не имеют gang_rank), но их аллокации могут быть использованы под запуск джобов новой инкарнации.

  • Несмотря на то, что резервные джобы не являются частью ганга, в них всё равно будет запущен пользовательский код.

    О том, как пользовательский код может понять, запущен ли он в резервном джобе или ганговом, написано в разделе переменные окружения

  • Когда gang_options/size меньше job_count, контроллер-агент запускает job_count-gang_options/size резервных джобов.

Поведение при ревайве операции

При ревайве gang-операции система проверяет, что все нерезервные джобы успешно восстановлены. Если это не так, операция будет перезапущена без переиспользования аллокаций.

Специальный режим мониторинга для Gang-операций

Gang-операции поддерживают специальный режим формирования monitoring descriptors:
Опция use_operation_id_based_descriptors_for_gangs_jobs: включает специальный режим формирования дескрипторов для Gang-операций: дескриптор формируется на основе operation_id, gang_rank и индекса таска, что гарантирует уникальность.

Все такие дескрипторы имеют в качестве префикса operation_id.

Совет

Этот режим несёт потенциальную опасность переполнения квоты в мониторинге, поэтому стоит использовать его аккуратно, после согласования с командой YTsaurus.

Переменные окружения

В Gang-операциях джобы получают следующие дополнительные переменные окружения:

  • YT_OPERATION_INCARNATION — уникальный идентификатор инкарнации операции;
  • YT_TASK_GANG_SIZE — общий размер группы джобов во всех тасках операции;
  • YT_GANG_RANK — ранг текущего джоба в группе (от 0 до YT_GANG_SIZE - 1);
  • YT_TASK_JOB_COUNT — общее количество джобов в таске;

Ограничения и особенности

  • Gang-операции несовместимы с fail_on_job_restart.
  • Gang-операции несовместимы с restart_completed_jobs = true.
  • При сбое любого джоба вся группа перезапускается с новой инкарнацией.
  • Изменение job_count во время выполнения Gang-операции запрещено.
  • При ревайве операции: Если не все джобы с рангами могут быть восстановлены, вся группа перезапускается с новой инкарнацией.
  • Не поддерживаются выходные таблицы.

Настройка Gang-операций

Для включения Gang-режима необходимо добавить параметр gang_options в спецификацию таска:

{
  tasks = {
    worker = {
      job_count = 3;
      command = "python distributed_worker.py";
      gang_options = {};
    };
  };
}

Параметры gang_options

  • size — размер группы джобов. По умолчанию равен job_count таска. Не может быть больше job_count. Задание size включит механизм резервных джобов.
Предыдущая