Reduce
Операция Reduce может состоять из одного или нескольких джобов. На вход каждому джобу поступает часть данных из входных таблиц, которые обрабатываются пользовательским скриптом, результат записывается в выходную таблицу. Входные данные группируются по набору колонок, указанному параметром reduce_by
. Группировка гарантирует, что все данные с одним и тем же значением в колонках reduce_by
попадут на вход одному джобу. Гарантию можно ослабить с помощью опции enable_key_guarantee
. Для запуска операции необходимо, чтобы каждая входная таблица была отсортирована по набору колонок, начинающемуся с reduce_by
.
Общие параметры для всех типов операций описаны в разделе Настройки операций.
У операции Reduce поддерживаются следующие параметры (в скобках указаны значения по умолчанию, если заданы):
-
reducer
— пользовательский скрипт. -
input_table_paths
— список входных таблиц с указанием полных путей (не может быть пустым). -
output_table_paths
— список выходных таблиц. -
reduce_by
— набор колонок, по которым осуществляется группировка. -
sort_by
(по умолчанию совпадает сreduce_by
) — набор колонок, по которым должны быть отсортированы первичные входные таблицы. Опция включает дополнительную проверку на сортированность входных таблиц и дает гарантии на сортированность строк по заданному набору колонок внутри пользовательского скрипта. Последовательностьreduce_by
полей должна быть префиксом последовательностиsort_by
полей. -
job_count
,data_weight_per_job
(256 Mb) — опции, которые указывают, сколько джобов должно быть запущено. Имеют рекомендательный характер. Опцияjob_count
имеет приоритет надdata_weight_per_job
. -
pivot_keys
— с помощью данной опции можно производить ручное партицирование данных, то есть определять, в какие джобы попадут определенные диапазоны данных. В качестве значения опции нужно указать сортированный список ключей таблицы, которые будут пограничными.
Например, еслиpivot_keys = [["a", 3], ["c", -1], ["x", 42]]
, то запустится не более четырёх джобов:- на ключи строго до
["a", 3]
; - на ключи от
["a", 3]
(включительно) до["c", -1]
(не включительно); - на ключи от
["c", -1]
(включительно) до["x", 42]
(не включительно); - на ключи от
["x", 42]
до конца.
Планировщик не будет пытаться разбивать такие джобы на более мелкие, при этом, если какие-то из диапазонов не содержат ключей, то соответствующие джобы могут быть не запущены. Но гарантировать незапуск нельзя, так как планировщик видит не данные полностью, а только выборки из них.
- на ключи строго до
-
enable_key_guarantee
— опция, включающая/отключающая гарантию того, что все записи с одним ключом будут поданы на вход одному джобу (по умолчаниюtrue
). Существует специальный сценарий использования данной опции в Reduce с внешними таблицами. -
auto_merge
— словарь, содержащий настройки автоматического объединения выходных чанков небольшого размера. По умолчанию автоматическое слияние отключено.
Имеются следующие гарантии о порядке входных данных в операции Reduce:
- Данные из каждой входной таблицы разбиваются по джобам непрерывными частями по
reduce_by
колонкам. То есть в один джоб не может попасть ключA
и ключC
, а в другой джоб — ключB
. - В рамках джоба данные из всех входных таблиц объединяются и оказываются отсортированы сперва по полям, указанным в параметре
sort_by
, а затем по индексу входной таблицы (table_index
). Для внешних (foreign
) таблиц внутри одногоjoin_by
-ключа сортировка выполняется по индексу внешней таблицы. Подробнее о внешних таблицах рассказано далее.
Операция Reduce специальным образом обрабатывает входные таблицы с атрибутом teleport=%true
на пути. Чанки таких таблиц, не пересекающиеся по диапазону ключей с чанками других входных таблиц, не подаются в пользовательский скрипт. Такие чанки без обработки записываются в выходную таблицу, на пути которой выставлен атрибут teleport=%true
. Подобных выходных таблиц может быть не более одной. На выходной таблице по-прежнему можно указать атрибут sorted_by
, чтобы таблица была сортирована. В таком случае пользователь должен следить за тем, чтобы диапазон ключей на выходе пользовательского скрипта был не шире, чем диапазон ключей на входе операции.
Пример использования
Характерный пример использования — обновления справочника или агрегатов.
Пусть имеется таблица //some_user/stat
, в которой хранится статистика посещений по дням. В таблице две колонки: time
и count
, данные отсортированы по колонке time
. Данные в таблице хранятся в двух чанках: в первом чанке — даты с 2013-06-01 по 2013-08-15, во втором чанке — даты c 2013-08-16 по 2013-09-16. Также пусть имеется сортированная таблица //some_user/update
с обновлениями статистики за сентябрь. Тогда можно запустить операцию Reduce для обновления основной таблицы, как показано ниже.
yt reduce --src <teleport=%true>//some_user/stat --src //some_user/update --dst <teleport=%true, sorted_by=[time]>//some_user/stat
В результате первый чанк таблицы //some_user/stat
будет «телепортирован» в выходную таблицу в исходном виде, без обработки.
Ещё один частный способ применения: отфильтровать из маленькой сортированной таблицы строки, совпадающие ключами с большой сортированной таблицей: в этом случае можно добавить выходную таблицу с атрибутом <teleport=%true>
и удалить её после выполнения. Выходная таблица будет содержать только чанки большой таблицы, которых не было в маленькой.
Пример запуска Reduce-операции с подобной фильтрацией:
yt reduce --src <teleport=%true>//fat_table --src //small/table --dst //results --dst <teleport=%true>//temp_table_to_throw_out ...; yt remove //temp_table_to_throw_out
Reduce с внешними таблицами
Reduce с внешними таблицами — это модификация операции Reduce, в которой часть входных таблиц помечены как внешние (foreign
) таблицы и выполняют роль справочников, данные которых добавляются к потоку данных из «обычных» (первичных, primary
) таблиц. Для внешних таблиц используется специальный набор ключевых колонок join_by
, который должен быть префиксом набора колонок reduce_by
. Возможна ситуация, когда записи с одним и тем же join_by
ключом, но разными reduce_by
ключами попали в разные джобы — в таком случае соответствующие записи из внешних таблиц будут поданы на вход всем джобам, а значит, обработаны более одного раза. Возможна обратная ситуация, если таблица справочник может содержать больше различных комбинаций значений, чем основная таблица. Если записи из внешней таблицы не соответствует ни одна запись из основных, то такая запись может не попасть ни в один джоб.
Внимание
Поскольку чанки внешних таблиц читаются много раз, категорически не рекомендуется использовать для них erasure-кодирование. Напротив, если внешняя таблица маленькая, стоит выставить для неё повышенный коэффициент репликации. Использование erasure
для внешней таблицы может привести к перекосу нагрузки на чтение по узлам кластера и деградации производительности всего кластера.
Важным отличием Reduce с внешними таблицами от обычной Reduce-операции является гранулярность вызова пользовательского скрипта в python-wrapper и C++ wrapper. В отличие от обычной Reduce-операции, пользовательский скрипт (метод IReducer::Do
в С++ API и функция-обработчик в python) вызывается на каждый join_by
ключ, а не reduce_by
ключ.
Требования к спецификации для Reduce с внешними таблицами:
- должна быть указана хотя бы одна первичная таблица (отсутствует атрибут
foreign
); - должна быть указана хотя бы одна внешняя таблица при помощи атрибута
<foreign=%true>
; - должно быть указано значение
join_by
; join_by
должен быть префиксом или совпадать сreduce_by
;- нельзя использовать
join_by
без внешних таблиц и внешние таблицы безjoin_by
; - нельзя устанавливать атрибут
teleport
на внешнюю таблицу; - нельзя задавать множественные диапазоны (ranges) для внешней таблицы, но одна и та же внешняя таблица может быть указана несколько раз с разными диапазонами.
Также для операции Reduce с внешними таблицами доступна опция consider_only_primary_size
(по умолчанию %false
), при включении которой при разбивке джобов будет учитываться только объем данных в первичных (primary
) таблицах.
Выставление опции enable_key_guarantee=%false
в Reduce с внешними таблицами имеет особую семантику:
- Отключается гарантия на то, что все записи с одинаковым ключом из первичной таблицы попадут в один джоб. При этом сохраняется гарантия того, что если в джоб попала запись из первичной таблицы с некоторым ключом, то все записи с таким же ключом из всех внешних таблиц попадут в тот же джоб.
- Список ключей пользователь должен передавать в
join_by
, указыватьreduce_by
запрещено.
Рекомендуется использовать операцию Reduce с опцией enable_key_guarantee=%false
.
Работа с большими ключами — reduce_combiner
Для работы с большими ключами в Reduce фазе существует специальная стадия обработки данных. Джобы этой стадии называются reduce_combiner джобами. Такие джобы запускаются на частях больших партиций и позволяют выполнять частичный Reduce, не дожидаясь сортировки всех частей партиции и запуска Reduce с финальным Merge. Reduce джоб для такой партиции получит на вход результат слияния выходов нескольких reduce_combiner
.
Reduce_combiner
запускается, если размер партиции превышает data_weight_per_sort_job
. Объем данных в reduce_combiner
равен data_weight_per_sort_job
. Значение по умолчанию для data_weight_per_sort_job
задается в конфигурации планировщика, но может быть переопределено в спецификации операции (в байтах).
Reduce_combiner
также можно принудительно запустить, установив параметр force_reduce_combiners
в %true
.
Reduce_combiner
получает на вход сортированный поток записей как обычный reducer. На выход reduce_combiner
накладывается несколько ограничений:
- Выход должен быть сортированным.
reduce_combiner
не должен изменять ключи — колонки, указанные в полеsort_by
спецификации, а если оно не указано, то вreduce_by
.- Только одна выходная таблица, как для маппера в операции
MapReduce
.
Отсюда, в частности, следует, что любой коммутативный и ассоциативный reducer можно использовать в качествеreduce_combiner
в исходном виде.
Пример запуска операции с reduce_combiner
:
yt map-reduce --reduce-combiner cat --reducer cat --reduce-by cookies --src //statbox/access-log/2013-05-15 --dst //tmp/asd --format dsv
Пример спецификации
{
data_weight_per_job = 1000000;
reduce_by = ["key"];
sort_by = ["key"; "subkey" ];
input_table_paths = [ "//tmp/input_table" ];
output_table_paths = [ "//tmp/unsorted_output_table"; <sorted_by = ["key"]> "//tmp/sorted_output_table" ];
reducer = {
command = "python my_reducer.py"
file_paths = [ "//tmp/my_reducer.py" ];
};
job_io = {
table_writer = {
max_row_weight = 10000000;
}
}
}