Как правильно и эффективно использовать систему YTsaurus
В данном разделе собраны рекомендации по использованию системы YTsaurus для сценариев массивно-параллельной обработки данных.
Слишком мелкие чанки
Примечание
Ситуация с мелкими чанками релевантна для статических таблиц. Сценарии работы с динамическими таблицами предполагают мелкогранулярные чтения для снижения времени ответа на запрос, поэтому небольшие чанки в случае динамических таблиц допустимы.
Маленьким можно считать чанк размером менее 100 Мб. Чтобы чанков в системе не становилось слишком много, стоит стремиться к тому, чтобы средний размер чанка был не менее 512 Мб.
Мелкие чанки затрудняют работу кластера по нескольким причинам:
- Мелкие чанки создают дополнительную нагрузку на мастер-серверы. Чем больше чанков, тем больше памяти мастер-серверов требуется для хранения мета-информации о чанках, и тем медленней они работают: медленнее пишут snapshot, дольше восстанавливаются из него в случае проблем.
- Большое число мелких чанков приводит к медленному чтению данных. Если, например, 100 Мб данных расположены в миллионе чанков по одной записи в каждом, чтобы прочитать все данные, необходимо сделать миллион операций
seek
по диску, что очень медленно, даже если выполнять их параллельно.
В результате выполнения операций или мелкогранулярной записи данных могут возникать таблицы с большим количеством мелких чанков. Чтобы бороться с подобной проблемой чанки таких таблиц необходимо укрупнять. Укрупнение чанков можно выполнить вызвав команду:
yt merge --src //your/table --dst //your/table --spec '{combine_chunks=true;mode=<mode>}'
Следует установить значение атрибута <mode>
в sorted
, для сортированной таблицы и ordered
, для несортированной. Подробнее о типах таблиц в разделе Статические таблицы.
В обоих случаях merge сохранит порядок данных. Но в случае sorted
он также сохранит сортированность таблицы с точки зрения системы: таблица останется sorted
и сохранит все соответствующие атрибуты).
Если вы используете Python-библиотеку, можно указать опцию конфигурации auto_merge_output={action=merge}
, и тогда библиотека сама будет укрупнять получающиеся таблицы, если в них слишком мелкие чанки.
MapReduce vs Map+Sort+Reduce
Подробнее про устройство операции MapReduce, а также про то, почему она обычно быстрее цепочки Map+Sort+Reduce, в разделе MapReduce.
Далее будут описаны случаи, когда не рекомендуется использовать слитную операцию MapReduce.
Тяжёлый и сильно фильтрующий mapper
В качестве условного ориентира можно принять то, что тяжёлый mapper тратит более 100мс CPU на обработку одной строки. Для сильно фильтрующего mapper-а характерно, если объем входных данных в пять и более раз превышает объём выходных данных по строкам или байтам.
В таком случае необходимо, чтобы в map-фазе было как можно больше джобов, чтобы каждый из них выполнялся быстрее. У операции MapReduce существует ограничение на количество map-джобов, вызванное тем, что если их слишком много, то после из-за большого количества мелких случайных чтений с диска будет дорого выполнять фазу сортировки.
Правильное решение в данном случае — сначала запустить операцию Map, указав как можно больше джобов. После этого на полученных данных запустить слитную операцию MapReduce с пустым mapper-ом.
Частое использование MapReduce по одному и тому же набору ключевых полей
Если данные необходимо обработать в Reduce несколько раз, то скорее всего будет выгоднее предварительно их отсортировать, и после этого выполнять обычные Reduce-операции. Типичный случай таких данных — логи.
Данная схема будет хорошо работать только в двух случаях:
- Данные не меняются.
- Данные дописываются в конец (append). Например, таблица отсортирована по времени. Приходит дополнительная порция данных, для которой все значения ключа (времени) больше, чем все значения ключа (времени) в таблице. Тогда порцию данных можно отсортировать, после запустить Reduce с параметром
teleport=%true
Подробнее об опции в разделе Reduce.
Тяжёлый reducer
Аналогично случаю с тяжёлым mapper-ом, требуется запустить как можно больше reduce-джобов. Но у операции существует ограничение на количество партиций, а значит и на количество reduce-джобов.
Правильное решение в данном случае — это отсортировать таблицу, а затем запускать Reduce с как можно большим количеством джобов.
Большое количество записей с одинаковым ключом в reduce-фазе
Для борьбы с данной проблемой в YTsaurus существуют Reduce-комбайнеры, которые позволяют обрабатывать большие ключи в нескольких джобах в reduce-фазе.
Map-комбайнеры
Идея map-комбайнера заключается в том, чтобы агрегировать данные в map-фазе.
Классический пример использования map-комбайнеров — это задача WordCount. В данной задаче на map-стадии необходимо выписывать не пары (word, 1)
, а пары (word, count)
, где count
— количество вхождений данного слова в рамках данного джоба. То есть агрегацию данных полезно выполнять не только в reduce-фазе, но и сразу в map-фазе.
Специальной поддержки map-комбайнеров в системе YTsaurus нет. Причиной больших ключей зачастую является отсутствие агрегации в map-стадии. Поэтому если агрегация возможна, то рекомендуется её выполнять.
Большое число выходных таблиц операции
Для каждой выходной таблицы операции резервируется буфер в оперативной памяти. На этапе запуска операции размер буферов под все выходные таблицы суммируется и прибавляется к тому объёму памяти, который пользователь указал в спецификации операции.
Технически размер буфера под выходные таблицы неявно учитывается в памяти процесса JobProxy
и зависит от настроек кластера, а также значения атрибута erasure_codec
выходной таблицы.
Если выходных таблиц много, суммарный объём памяти может оказаться довольно большим, так что планировщик не сможет быстро найти узел кластера с подходящим объёмом свободной памяти, а может не найти его вовсе. В таком случае операция будет прервана, пользователь получит ошибку No online node can satisfy the resource demand
. В сообщении об ошибке будет указано, какой объём памяти был запрошен.
Примечание
Рекомендуется указывать не более нескольких десятков выходных таблиц для операции.