Reduce

Операция Reduce может состоять из одного или нескольких джобов. На вход каждому джобу поступает часть данных из входных таблиц, которые обрабатываются пользовательским скриптом, результат записывается в выходную таблицу. Входные данные группируются по набору колонок, указанному параметром reduce_by. Группировка гарантирует, что все данные с одним и тем же значением в колонках reduce_by попадут на вход одному джобу. Гарантию можно ослабить с помощью опции enable_key_guarantee. Для запуска операции необходимо, чтобы каждая входная таблица была отсортирована по набору колонок, начинающемуся с reduce_by.

Общие параметры для всех типов операций описаны в разделе Настройки операций.

У операции Reduce поддерживаются следующие параметры (в скобках указаны значения по умолчанию, если заданы):

  • reducer — пользовательский скрипт.

  • input_table_paths — список входных таблиц с указанием полных путей (не может быть пустым).

  • output_table_paths — список выходных таблиц.

  • reduce_by — набор колонок, по которым осуществляется группировка.

  • sort_by (по умолчанию совпадает с reduce_by) — набор колонок, по которым должны быть отсортированы первичные входные таблицы. Опция включает дополнительную проверку на сортированность входных таблиц и дает гарантии на сортированность строк по заданному набору колонок внутри пользовательского скрипта. Последовательность reduce_by полей должна быть префиксом последовательности sort_by полей.

  • job_count, data_weight_per_job (256 Mb) — опции, которые указывают, сколько джобов должно быть запущено. Имеют рекомендательный характер. Опция job_count имеет приоритет над data_weight_per_job.

  • pivot_keys — с помощью данной опции можно производить ручное партицирование данных, то есть определять, в какие джобы попадут определенные диапазоны данных. В качестве значения опции нужно указать сортированный список ключей таблицы, которые будут пограничными.
    Например, если pivot_keys = [["a", 3], ["c", -1], ["x", 42]], то запустится не более четырёх джобов:

    • на ключи строго до ["a", 3];
    • на ключи от ["a", 3] (включительно) до ["c", -1] (не включительно);
    • на ключи от ["c", -1] (включительно) до ["x", 42] (не включительно);
    • на ключи от ["x", 42] до конца.

    Планировщик не будет пытаться разбивать такие джобы на более мелкие, при этом, если какие-то из диапазонов не содержат ключей, то соответствующие джобы могут быть не запущены. Но гарантировать незапуск нельзя, так как планировщик видит не данные полностью, а только выборки из них.

  • enable_key_guarantee — опция, включающая/отключающая гарантию того, что все записи с одним ключом будут поданы на вход одному джобу (по умолчанию true). Существует специальный сценарий использования данной опции в Reduce с внешними таблицами.

  • auto_merge — словарь, содержащий настройки автоматического объединения выходных чанков небольшого размера. По умолчанию автоматическое слияние отключено.

Имеются следующие гарантии о порядке входных данных в операции Reduce:

  • Данные из каждой входной таблицы разбиваются по джобам непрерывными частями по reduce_by колонкам. То есть в один джоб не может попасть ключ A и ключ C, а в другой джоб — ключ B.
  • В рамках джоба данные из всех входных таблиц объединяются и оказываются отсортированы сперва по полям, указанным в параметре sort_by, а затем по индексу входной таблицы (table_index). Для внешних (foreign) таблиц внутри одного join_by-ключа сортировка выполняется по индексу внешней таблицы. Подробнее о внешних таблицах рассказано далее.

Операция Reduce специальным образом обрабатывает входные таблицы с атрибутом teleport=%true на пути. Чанки таких таблиц, не пересекающиеся по диапазону ключей с чанками других входных таблиц, не подаются в пользовательский скрипт. Такие чанки без обработки записываются в выходную таблицу, на пути которой выставлен атрибут teleport=%true. Подобных выходных таблиц может быть не более одной. На выходной таблице по-прежнему можно указать атрибут sorted_by, чтобы таблица была сортирована. В таком случае пользователь должен следить за тем, чтобы диапазон ключей на выходе пользовательского скрипта был не шире, чем диапазон ключей на входе операции.

Пример использования

Характерный пример использования — обновления справочника или агрегатов.
Пусть имеется таблица //some_user/stat, в которой хранится статистика посещений по дням. В таблице две колонки: time и count , данные отсортированы по колонке time. Данные в таблице хранятся в двух чанках: в первом чанке — даты с 2013-06-01 по 2013-08-15, во втором чанке — даты c 2013-08-16 по 2013-09-16. Также пусть имеется сортированная таблица //some_user/update с обновлениями статистики за сентябрь. Тогда можно запустить операцию Reduce для обновления основной таблицы, как показано ниже.

yt reduce --src <teleport=%true>//some_user/stat --src //some_user/update --dst <teleport=%true, sorted_by=[time]>//some_user/stat

В результате первый чанк таблицы //some_user/stat будет «телепортирован» в выходную таблицу в исходном виде, без обработки.

Ещё один частный способ применения: отфильтровать из маленькой сортированной таблицы строки, совпадающие ключами с большой сортированной таблицей: в этом случае можно добавить выходную таблицу с атрибутом <teleport=%true> и удалить её после выполнения. Выходная таблица будет содержать только чанки большой таблицы, которых не было в маленькой.
Пример запуска Reduce-операции с подобной фильтрацией:

yt reduce --src <teleport=%true>//fat_table --src //small/table --dst //results --dst <teleport=%true>//temp_table_to_throw_out ...; yt remove //temp_table_to_throw_out

Reduce с внешними таблицами

Reduce с внешними таблицами — это модификация операции Reduce, в которой часть входных таблиц помечены как внешние (foreign) таблицы и выполняют роль справочников, данные которых добавляются к потоку данных из «обычных» (первичных, primary) таблиц. Для внешних таблиц используется специальный набор ключевых колонок join_by, который должен быть префиксом набора колонок reduce_by. Возможна ситуация, когда записи с одним и тем же join_by ключом, но разными reduce_by ключами попали в разные джобы — в таком случае соответствующие записи из внешних таблиц будут поданы на вход всем джобам, а значит, обработаны более одного раза. Возможна обратная ситуация, если таблица справочник может содержать больше различных комбинаций значений, чем основная таблица. Если записи из внешней таблицы не соответствует ни одна запись из основных, то такая запись может не попасть ни в один джоб.

Внимание

Поскольку чанки внешних таблиц читаются много раз, категорически не рекомендуется использовать для них erasure-кодирование. Напротив, если внешняя таблица маленькая, стоит выставить для неё повышенный коэффициент репликации. Использование erasure для внешней таблицы может привести к перекосу нагрузки на чтение по узлам кластера и деградации производительности всего кластера.

Важным отличием Reduce с внешними таблицами от обычной Reduce-операции является гранулярность вызова пользовательского скрипта в python-wrapper и C++ wrapper. В отличие от обычной Reduce-операции, пользовательский скрипт (метод IReducer::Do в С++ API и функция-обработчик в python) вызывается на каждый join_by ключ, а не reduce_by ключ.
Требования к спецификации для Reduce с внешними таблицами:

  • должна быть указана хотя бы одна первичная таблица (отсутствует атрибут foreign);
  • должна быть указана хотя бы одна внешняя таблица при помощи атрибута <foreign=%true>;
  • должно быть указано значение join_by;
  • join_by должен быть префиксом или совпадать с reduce_by;
  • нельзя использовать join_by без внешних таблиц и внешние таблицы без join_by;
  • нельзя устанавливать атрибут teleport на внешнюю таблицу;
  • нельзя задавать множественные диапазоны (ranges) для внешней таблицы, но одна и та же внешняя таблица может быть указана несколько раз с разными диапазонами.

Также для операции Reduce с внешними таблицами доступна опция consider_only_primary_size (по умолчанию %false), при включении которой при разбивке джобов будет учитываться только объем данных в первичных (primary) таблицах.

Выставление опции enable_key_guarantee=%false в Reduce с внешними таблицами имеет особую семантику:

  • Отключается гарантия на то, что все записи с одинаковым ключом из первичной таблицы попадут в один джоб. При этом сохраняется гарантия того, что если в джоб попала запись из первичной таблицы с некоторым ключом, то все записи с таким же ключом из всех внешних таблиц попадут в тот же джоб.
  • Список ключей пользователь должен передавать в join_by, указывать reduce_by запрещено.

Рекомендуется использовать операцию Reduce с опцией enable_key_guarantee=%false.

Работа с большими ключами — reduce_combiner

Для работы с большими ключами в Reduce фазе существует специальная стадия обработки данных. Джобы этой стадии называются reduce_combiner джобами. Такие джобы запускаются на частях больших партиций и позволяют выполнять частичный Reduce, не дожидаясь сортировки всех частей партиции и запуска Reduce с финальным Merge. Reduce джоб для такой партиции получит на вход результат слияния выходов нескольких reduce_combiner.

Reduce_combiner запускается, если размер партиции превышает data_weight_per_sort_job. Объем данных в reduce_combiner равен data_weight_per_sort_job. Значение по умолчанию для data_weight_per_sort_job задается в конфигурации планировщика, но может быть переопределено в спецификации операции (в байтах).

Reduce_combiner также можно принудительно запустить, установив параметр force_reduce_combiners в %true.
Reduce_combiner получает на вход сортированный поток записей как обычный reducer. На выход reduce_combiner накладывается несколько ограничений:

  • Выход должен быть сортированным.
  • reduce_combiner не должен изменять ключи — колонки, указанные в поле sort_by спецификации, а если оно не указано, то в reduce_by.
  • Только одна выходная таблица, как для маппера в операции MapReduce.
    Отсюда, в частности, следует, что любой коммутативный и ассоциативный reducer можно использовать в качестве reduce_combiner в исходном виде.

Пример запуска операции с reduce_combiner:

yt map-reduce --reduce-combiner cat --reducer cat --reduce-by cookies --src //statbox/access-log/2013-05-15 --dst //tmp/asd --format dsv

Пример спецификации

{
  data_weight_per_job = 1000000;
  reduce_by = ["key"];
  sort_by = ["key"; "subkey" ];
  input_table_paths = [ "//tmp/input_table" ];
  output_table_paths = [ "//tmp/unsorted_output_table"; <sorted_by = ["key"]> "//tmp/sorted_output_table" ];
  reducer = {
    command = "python my_reducer.py"
    file_paths = [ "//tmp/my_reducer.py" ];
  };
  job_io = {
    table_writer = {
      max_row_weight = 10000000;
    }
  }
}
Предыдущая
Следующая