Типы операций

В данном разделе описаны операции обработки данных, доступные в системе YTsaurus. Вводятся понятия пользовательского скрипта и джоба как составной части операции. Описаны возможные состояния операций и джобов.

Обзор

В системе YTsaurus поддерживаются следующие операции обработки данных:

  • Map, Reduce, MapReduce — выполняют пользовательский код над входными данными;
  • Sort — сортирует указанную на входе таблицу;
  • Merge — выполняет слияние таблиц;
  • Erase — удаляет из таблицы указанный диапазон данных;
  • RemoteCopy — копирует данные между кластерами;
  • Vanilla — запускает набор пользовательских скриптов в нужном количестве на узлах кластера и поддерживает их жизнедеятельность.

У всех операций есть специальный параметр spec, в котором передаётся описание настроек операции, так называемая спецификация. Спецификация представляет собой иерархическую структуру, которая задана в формате YSON. У каждого типа операции своя структура спецификации, её подробное описание можно найти в подразделах, посвященных конкретным типам операций.

Пользовательский скрипт — произвольная строка, которая будет выполнена посредством вызова bash -c, и набор параметров, описывающих детали выполнения. Работа пользовательского скрипта происходит в окружении, которое называется sandbox. Пользовательский скрипт выполняется от имени служебного пользователя, которого формально нет в системе (речь идет о пользователях системы Linux), и у которого есть права только на чтение/запись в текущую директорию и в директорию tmp. Подробное описание параметров пользовательского скрипта приведено в разделе Настройки операций.

Операции запускаются в вычислительных пулах. Пулы организованы в древовидную структуру и позволяют распределять вычислительные ресурсы кластера. Подробнее можно прочитать в разделе Планировщик и пулы.

В рамках операции выполняются джобы (jobs). Джобы являются единицей параллелизма операции. Входные данные операции делятся на части. Каждая часть входных данных обрабатывается одним джобом.

Параметры запущенной операции

Примечание

Процесс и результат выполнения операций можно отслеживать через веб-интерфейс, экран Operations.

Также всю необходимую информацию можно получить с помощью вызова get_operation.
Вызов возвращает набор атрибутов операции, особенно полезны следующие атрибуты:

  • state — текущее состояние операции;
  • progress — описание прогресса операции по джобам;
  • spec — спецификация запущенной операции;
  • result — результат выполнения (описание ошибки в случае, если операция завершилась с вердиктом failed).

Также существует вызов list_jobs, с помощью которого можно получить список джобов операции — как выполняемых, так и завершившихся.
Примеры использования get_operation и list_jobs на языке Python в разделе Python wrapper.

Возможные статусы операции

В процессе выполнения операция может переходить из одного статуса в другой:

  • running — операция выполняется;
  • pending — операция находится в очереди на запуск. В каждом пуле есть ограничение на количество выполняющихся операций, и когда оно превышено, операции выстраиваются в очередь на запуск;
  • completed — операция успешно завершена;
  • failed — операция завершилась с ошибкой. Основные возможные причины: не смогла запуститься из-за некорректных параметров, либо превышен порог для джобов, завершившихся с ошибкой;
  • aborted — операция была прервана. Основные возможные причины: была прервана пользователем, либо была прервана транзакция, в которой запускалась операция;
  • reviving — операция восстанавливается после перезапуска планировщика;
  • initializing, preparing, materializing, completing, aborting, failing — переходные состояния операции, необходимы для поэтапного изменения метаинформации в мастер-сервере YTsaurus.

Возможные состояния джобов

Каждая операция состоит из множества джобов. У джобов, как и у операций, имеются различные состояния:

  • running — джоб выполняется;
  • completed — джоб успешно завершен;
  • aborted — джоб был прерван. Как правило, такое состояние является следствием вытеснения (preemption) или сетевых ошибок. Подробнее о вытеснении в разделе Планировщик и пулы;
  • failed — джоб завершился с ошибкой. Как правило, такое состояние является следствием ошибок в пользовательском скрипте. В таком случае, у данного джоба будет лог с ошибками (stderr), который можно прочитать. Также данные джобы могут появляться, если закончилось место у используемого аккаунта, и джоб не может записать результат своей работы;
  • waiting — джоб был отправлен на узел кластера, но стоит в очереди в ожидании ресурсов;
  • aborting — планировщик принял решение о прерывании джоба, но данный процесс на узле кластера еще не закончился.

На странице операции в веб-интерфейсе можно увидеть количество pending джобов. Это число не спланированных джобов, которые необходимо будет выполнить до конца операции по предположению планировщика. Количество pending джобов может отличаться от количества джобов, которое будет реально выполнено: нарезка данных зависит от скорости выполнения джобов.

Прерванные джобы

Джобы в состоянии aborted несут в себе информацию, указывающую на класс причин, по которой джоб был прерван:

  • scheduler — джоб был прерван планировщиком, но более точная причина неизвестна;
  • failed_chunks — джоб был прерван узлом кластера из-за невозможности прочитать входные или записать выходные данные;
  • resource_overdraft — джоб был прерван узлом кластера из-за превышения ограничения по памяти. Планировщик по умолчанию выделяет под джоб меньше памяти, чем заказывал пользователь, и в случае, если джоб не укладывается в выделенную память и память на узле кластера заканчивается, такой джоб будет прерван и перезапущен;
  • other — джоб был прерван узлом кластера по неклассифицированной причине;
  • preemption — джоб был прерван планировщиком для запуска другого джоба, сработало вытеснение;
  • user_request — джоб был прерван пользователем;
  • node_offline — джоб был прерван, так как планировщик потерял связь с узлом кластера;
  • waiting_timeout — джоб был прерван, так как стоял в очереди на узле кластера больше определенного времени;
  • account_limit_exceeded — джоб был прерван, так как при записи данных был переполнен аккаунт пользователя. Такие джобы считаются failed, если не включена опция suspend_operation_if_account_limit_exceeded;
  • unknown — причина прерывания джоба неизвестна. Такое поведение случается, если на кластере работают различные версии вычислительных узлов и планировщика, и узел кластера сообщает планировщику причину, которая неизвестна планировщику;
  • причины, начинающиеся с префикса scheduling_ относятся к случаю, когда планировщик спланировал джоб, но в силу определенных обстоятельств отменил джоб до того, как запланировать его на узле кластера. То есть джобы такого типа тратят впустую только ресурсы самого планировщика, но не ресурсы кластера:
    • scheduling_timeout — планировщик не успел спланировать джоб за отведенное время. Может происходить при запуске крупных операций — с большим объёмом входных данных порядка нескольких петабайт, а также когда планировщик перегружен по CPU;
    • scheduling_resource_overcommit — после планирования джоба выяснилось, что превышен resource_limits операции или одного из родительских пулов;
    • scheduling_operation_suspended — после планирования джоба контроллером выяснилось, что операция была приостановлена;
    • scheduling_job_spec_throttling — после планирования джоба для него формируется спецификация, в которой в том числе содержится метаинформация о входных чанках для джоба. В случае, когда входных чанков слишком много, активируется логика по ограничению создания джобов для конкретной операции. Какие-то джобы операции при этом могут быть прерваны;
    • scheduling_intermediate_chunk_limit_exceeded — джоб был прерван, так как превышено ограничение на количество промежуточных чанков, которые может создать операция при включенном автоматическом слиянии выходных чанков в режимах economy и manual;
    • scheduling_other — джоб был прерван планировщиком по неклассифицированной причине.

Завершившиеся джобы

Возможна ситуация когда планировщик решает прервать джоб, при этом планировщик перестает подавать новые данные на вход джоба согласованно с гарантиями операции и дает определенное время, чтобы джоб завершился. В случае, если джоб успешно завершился в рамках отведенного ему времени, он переходит в состояние completed, а не aborted, хоть и был прерван. У таких completed джобов имеется атрибут interrupt_reason не равное none и обозначающее причину, по которой планировщик решил прервать джоб.
Возможные значения причины прерывания джоба в таких случаях:

  • none — джоб успешно завершился без прерывания;
  • preemption — джоб был прерван планировщиком для запуска другого джоба, сработало вытеснение;
  • user_request — джоб был прерван пользователем;
  • job_split — джоб был прерван, чтобы разбить его на большее количество джобов;
  • unknown — причина прерывания джоба неизвестна. Такое случается, если решение о прерывании джоба было отправлено на узел кластера и после этого планировщик был перезапущен.

Стандартный поток ошибок

В джобе операции можно писать данные в стандартный поток ошибок (stderr), записанные данные будут сохранены и доступны для чтения. Записывать данные в stderr бывает полезно для отладки.
Получить stderr джоба можно командой get_job_stderr, получить список джобов можно командой list_jobs. В параметрах list_jobs для удобства можно указать, что следует вывести только джобы с записанным stderr. list_jobs и get_job_stderr доступны, например, из Python, их описание в разделе Python wrapper.
Чтобы stderr операции не занимали слишком много места, применяются следующие политики при их записи:

  1. Сохраняется не больше max_stderr_size байтов, записанных в stderr. Все, что записано после данного лимита, игнорируется.
  2. Никогда не сохраняется более 150 stderr-ов джобов — это настройка планировщика.
  3. По умолчанию сохраняется не более 20 stderr-ов джобов — это настройка операции. Может принимать значение до 150.

Борьба с недоступностью данных

Стратегии и тактики

Для каждой операции в спецификации имеется две настройки: unavailable_chunk_strategy и unavailable_chunk_tactics. Их возможные значения: wait, fail и skip. Данные значения определяют поведение планировщика в случае, когда необходимые входные данные для операции недоступны:

  • wait — ждать появления данных до бесконечности;
  • fail — немедленно прерывать операцию;
  • skip — пропускать недоступные данные и выполнять расчет на неполном наборе.

Стратегия и тактика влияют на поведение на разных стадиях операции:

  • стратегия используется при запуске операции и первоначальном планировании объема работ;
  • тактика используется при появлении недоступных данных для уже выполняемой операции.

Теоретически возможно 9 комбинаций, однако не все из них имеют смысл. Полезны следующие сочетания:

  • стратегия wait, тактика wait — используется по умолчанию, расчет обязательно выполнить по полным данным, возможно ценой ожидания;
  • стратегия fail, тактика wait — при запуске нужно убедиться в целостности данных, в случае возникновения потерь — ожидать, пока данные станут доступны;
  • стратегия skip, тактика skip — расчет нужно выполнить по любым имеющимся данным как можно скорее;
  • стратегия skip, тактика wait — расчет необходимо запустить прямо сейчас, фиксировав масштаб потерь, никакие дополнительные потери не допустимы, следует ждать доступности данных;
  • стратегия skip, тактика fail — расчет необходимо запустить прямо сейчас, фиксировав масштаб потерь, никакие дополнительные потери не допустимы, при их возникновении расчет нужно прервать;
  • стратегия fail, тактика fail — расчет нужно выполнить на полных данных, в случае недоступности сообщить об этом как можно скорее.

В процессе работы планировщик периодически запрашивает у мастера статус входных чанков у операции. В случае обнаружения планировщиком недоступных данных релевантная часть вычисления останавливается. Джобы, использующие эти данные, не запускаются, но запрос чанков у мастера продолжается. Каждый запрос erasure чанка вызывает его перемещение наверх в очереди чанков, ожидающих восстановления (т.н. move to front). Соответственно, те чанки, которые нужны прямо сейчас для работы операций, будут восстановлены в первоочередном порядке.

Данные действия полностью автоматические и не требуют никаких настроек.

Запуск через C++ и Python Wrapper

При запуске операций через С~++ и Python Wrapper помимо вызова соответствующей команды драйвера происходит ряд подготовительных работ. К ним относятся:

  • загрузка файлов в Файловый кеш;

    Примечание: Автоматическая загрузка файлов в кеш всегда происходит вне пользовательской транзакции, что может приводить к конфликтам на узлах Кипариса, если каталог с файловым кешем не был создан заранее.

  • создание выходных таблиц или их очистка с помощью команды erase. С точки зрения системы, при запуске операции все выходные таблицы уже должны существовать;

  • удаление файлов после завершения операции;

  • установка параметров jobcount и memorylimit, если они указаны пользователем;

  • часть параметров может иметь значения по умолчанию, отличные от системных.

Ограничения на число таблиц и джобов

В системе действуют ограничения на параметры операций, которые нужно учитывать при работе.

  1. Существует ограничение на число входных таблиц любой операции. По умолчанию на «больших» продуктовых кластерах данное ограничение равно 3 000. На тестовых кластерах, а также кластерах, где не предполагается крупных MapReduce операций, ограничение может быть меньше, но при необходимости также может быть увеличено до 3 000;
  2. Отдельного ограничения на количество выходных таблиц не существует, но следует понимать, что каждая выходная таблица требует поддержания буферов в памяти. Поэтому, указав 1 000 выходных таблиц, вы получите операцию, у джобов которой будут невыполнимые требования по памяти, поэтому они не будут запускаться. Практически верхнее ограничение составляет около 100 выходных таблиц. Если требуется больше, необходимо разбить операцию на несколько и выполнить обработку поэтапно;
  3. Существует ограничение на количество джобов операций. Для Map, Merge, Reduce, RemoteCopy оно составляет 100 000 джобов. В случае Reduce и сортированного Merge сформировать столько джобов может не получиться, так как у планировщика не будет достаточного количества семплов. Для операций с shuffle-стадией (Sort и MapReduce) существует ограничение в 5 000 партиций. Подробнее о партициях можно узнать в соответствующих разделах, посвящённых конкретным типам операций;
  4. Существует ограничение на произведение количества джобов операции на количество выходных таблиц. Данное ограничение необходимо для того, чтобы операция не порождала большого количества чанков, которые могут сильно нагрузить мастер-серверы. На «больших» продуктовых кластерах данное ограничение равно 2 000 000.

Транзакционность обработки данных

На любую операцию можно смотреть как на черный ящик, у которого есть набор входов и набор выходов, а также набор используемых в ходе работы артефактов, то есть файлов и порто-слоев. Общая логика работы операции является достаточно простой: он берет exclusive-локи на выходные таблицы под специальной output-транзакцией и snapshot-локи на входные таблицы (и артефакты) под специальной input-транзакцией, дальше выполняются все вычисления и в случае успеха коммитится output-транзакций. В противном случае, если операция была прервана или завершилась ошибкой – output-транзакция прерывается.

Также опишем разные нюансы и особые случаи:

  • Если таблица есть как среди входных, так и среди выходных таблиц, то для таблицы берется только exclusive-лок в output-транзакции и данные как читаются, так и записываются из данной версии таблицы.
  • В случае, если выходная таблица помечена флагом append, то для неё берется shared-лок. Логика из предыдущего пункта работать не будет, если таблицы есть и во входе и на выходе.
  • Сама операция может быть запущена в рамках пользовательской транзакции, в таком случае input- и output-транзакции создаются как подтранзакции пользовательской транзакции.
  • Транзакции планировщика создаются с timeout, достаточным, чтобы переживать даунтаймы кластера, которые могут занимать часы. Далее они пингуются планировщиком. При стандартном обновлении планировщика, он переиспользует уже имеющиеся транзакции у операций. В случае длинных учений или мажорного обновления мастер-сервера, транзакции прерываются, операции перезапускаются с нуля и в частности заново берутся локи на таблицы.

Также у операции может быть debug-транзакция, в случае, если у операции есть core- или stderr-таблицы. Под данной транзакцией происходит запись core-файлов и stderr-ов, данная транзакция коммитится даже в случае прерывания или завершения операции ошибкой.

Указанные транзакции можно посмотреть в атрибутах операции:

  1. input_transaction_id, nested_input_transaction_ids – транзакция, под которой берутся локи на входные таблицы и артефакты. Это может быть список транзакций, если используются transaction_id на входных таблицах;
  2. output_transaction_id – транзакция, под которой записываются данные в выходные таблицы;
  3. debug_transaction_id – транзакция, под которой записываются core- и stderr-таблицы;
  4. async_scheduler_transaction_id – транзакция, под которой записываются временные данные в операции, которая всегда прерывается по завершении операции. Например, под ней живут intermediate-данные.
  5. output_completion_transaction_id, debug_completion_transaction_id – технические транзакции, которые используются при успешном завершении операции, чтобы атомарно закоммитить результат работы операции.
Следующая