Коллективы джобов

Коллективы джобов — это механизм, позволяющий запускать несколько джобов для обработки одной порции данных. Это полезно, когда для выполнения вычислений требуется несколько процессов, работающих совместно — например, при распределённых вычислениях на GPU или при параллельной обработке данных несколькими джобами, когда они не могут быть обработаны одним джобом.

О том, как включить коллективы джобов в операции, написано в разделе Настройка.

Основная идея

Операции с коллективами джобов работают так же, как и обычные операции, за одним исключением: каждая порция данных обрабатывается не одним джобом, а коллективом из нескольких джобов.

Это означает, что:

  • Нарезка на джобы происходит ровно так же, как и в обычных операциях. (В частности, для пользователя это означает, что конфигурировать нарезку в спеке операции с коллективами джобов можно точно так же, как и в обычных операциях.)
  • С точки зрения потока данных операции, коллектив выглядит как один джоб: он получает одну порцию входных данных и производит одну порцию выходных данных.

Примечание

Коллективы джобов — это не то же самое, что Gang-операции. Gang-операции перезапускают все джобы операции при перезапуске любого нерезервного джоба, тогда как коллективы джобов перезапускают только джобы внутри одного коллектива (который обрабатывает отдельную порцию данных).

Сценарии использования

Распределённые вычисления на GPU

Типичный сценарий — обработка данных с использованием нескольких GPU. Master-джоб читает данные и распределяет их между slave-джобами, каждый из которых выполняет вычисления на своём GPU.

Параллельная обработка с агрегацией

Slave-джобы выполняют параллельные вычисления, результаты которых master-джоб агрегирует и записывает в выходную таблицу.

Когда стоит использовать Gang-операции, а когда — операции с коллективами джобов?

Gang-операции — это разновидность Vanilla-операций, поэтому в случае, когда требуется запустить операцию другого типа, подойдут только операции с коллективом джобов.
В случае, когда нужна Vanilla-операция, можно использовать как Gang-операции, так и Vanilla-операции с коллективом джобов. Разница будет заключаться в некоторых деталях:

Структура коллектива

Коллектив состоит из нескольких джобов, совместно обрабатывающих одну порцию данных. Каждому джобу в коллективе присваивается ранг — целое число от 0 до size - 1, где size — размер коллектива.

  • Master-джоб (ранг 0) — читает входные данные и пишет результаты в выходные таблицы. С точки зрения системы, именно master-джоб представляет коллектив во взаимодействии с потоком данных операции.
  • Slave-джобы (ранги 1, 2, ...) — вспомогательные джобы, которые не имеют доступа к входным/выходным потокам данных операции. Они выполняют вычисления и обмениваются данными с master-джобом по сети.

Жизненный цикл коллектива

  1. Когда контроллер-агент решает обработать очередную порцию данных, вместо одного джоба он создаёт коллектив из нескольких джобов.
  2. Master-джоб получает входные данные через stdin (для операций Map/Reduce/MapReduce) точно так же, как это делал бы обычный джоб.
  3. Все джобы коллектива могут взаимодействовать между собой по сети, используя YT_COLLECTIVE_ID для идентификации.
  4. Только master-джоб может писать выходные данные.
  5. Коллектив считается успешно завершённым, когда master-джоб завершился успешно. Это означает, что если slave-джобы завершатся неуспешно после комплита master-джоба (с точки зрения контроллер-агента), данные всё равно считаются обработанными.

Обработка сбоев

При неуспешном завершении (fail или abort) любого джоба в коллективе (в случае, если до этого master-джоб ещё не покомплитился):

  • Все остальные джобы этого коллектива абортятся.
  • Коллектив перезапускается целиком — так же, как перезапускался бы обычный джоб.
  • Другие коллективы операции продолжают работать независимо.

Поддерживаемые операции

Коллективы джобов поддерживаются во всех операциях с пользовательским кодом:

Операция Где указывать collective_options
Map В спецификации mapper
Reduce В спецификации reducer
MapReduce В спецификации mapper, reducer или reduce_combiner
Vanilla В спецификации таска

Переменные окружения

В джобах коллектива доступны следующие переменные окружения:

Переменная Описание
YT_COLLECTIVE_MEMBER_RANK Ранг джоба в коллективе (0 для master-джоба, 1, 2, ... для slave-джобов)
YT_COLLECTIVE_ID Уникальный идентификатор коллектива (сейчас совпадает с job_id master-джоба, но это свойство не гарантируется и может в дальнейшем поменяться)

Эти переменные позволяют джобам:

  • Определить свою роль в коллективе.
  • Найти друг друга для сетевого взаимодействия.
  • Координировать работу между собой.

Настройка

Для включения коллективов джобов нужно добавить параметр collective_options в спецификацию джоба:

{
  collective_options = {
    size = 2;  # Количество джобов в коллективе
  };
}

Параметры collective_options

Параметр Тип Описание
size int Количество джобов в коллективе. Должен быть больше 1.

Примеры спецификации

Vanilla-операция с коллективами джобов

В этом примере запускается Vanilla-операция с коллективом из 2 джобов. Master-джоб (ранг 0) пишет результат в выходную таблицу, а slave-джоб (ранг 1) ничего не делает.

{
  tasks = {
    worker = {
      job_count = 1;
      output_table_paths = ["//tmp/output"];
      format = "yson";
      command = "if [ \"$YT_COLLECTIVE_MEMBER_RANK\" == 0 ]; then echo '{result=42}'; fi";
      collective_options = {
        size = 2;
      };
      close_stdout_if_unused = %true;
    };
  };
}

Map-операция с коллективами джобов

{
  mapper = {
    command = "if [ \"$YT_COLLECTIVE_MEMBER_RANK\" == 0 ]; then cat; fi";
    collective_options = {
      size = 2;
    };
    close_stdout_if_unused = %true;
  };
}

MapReduce-операция с коллективами джобов

В этом примере коллективы джобов используются на всех стадиях MapReduce-операции: mapper, reduce_combiner и reducer.

{
  mapper = {
    command = "if [ \"$YT_COLLECTIVE_MEMBER_RANK\" == 0 ]; then cat; fi";
    collective_options = {
      size = 2;
    };
    close_stdout_if_unused = %true;
  };
  reduce_combiner = {
    command = "if [ \"$YT_COLLECTIVE_MEMBER_RANK\" == 0 ]; then cat; fi";
    collective_options = {
      size = 2;
    };
    close_stdout_if_unused = %true;
  };
  reducer = {
    command = "if [ \"$YT_COLLECTIVE_MEMBER_RANK\" == 0 ]; then cat; fi";
    collective_options = {
      size = 2;
    };
    close_stdout_if_unused = %true;
  };
  force_reduce_combiners = %true;
}

Ограничения

  • Несовместимость с gang_options: Коллективы джобов нельзя использовать вместе с Gang-операциями. При попытке указать оба параметра операция завершится с ошибкой.

  • Вывод данных только из master-джоба: Только master-джоб (ранг 0) может писать в выходные таблицы.

    Совет

    Используйте опцию close_stdout_if_unused = %true в спецификации джоба, чтобы явно закрыть stdout для slave-джобов и избежать случайной записи.

  • Прерывание джобов: Прервать (interrupt) можно только master-джоб. Попытка прервать slave-джоб приводит к его аборту с последующим перезапуском коллектива.

См. также