Пользовательские агрегатные функции в YQL

Введение

Если по каким-то причинам готовых агрегатных функций оказалось недостаточно, в YQL есть возможность описать свою. Механизм называется UDAF (User Defined Aggregation Functions) и работает полностью аналогично стандартным агрегатным функциям.

Из особенностей/преимуществ по сравнению с работой на уровне Map/Reduce (PROCESS + REDUCE, либо работа с YTsaurus напрямую без YQL):

  1. Возможность легко применить одну и ту же логику к нескольким полям/выражениям за один проход вычислений по данным (т. е. считаем разные не связанные друг с другом агрегаты, читая данные только один раз), а также автоматический пропуск пустых значений.
  2. Поддержка DISTINCT, то есть вызова функции только по уникальным значениям.
  3. Автоматическое создание map-side combiner, что во многих случаях позволяет сильно сэкономить на времени выполнения запроса, при этом не занимаясь реализацией map-side combiner самостоятельно.

Примечание

Если вы сделали или собираетесь сделать агрегатную функцию, которая, на ваш взгляд, могла бы быть полезна широкому кругу пользователей — напишите нам об этом в телеграм-чате. Возможно, мы добавим её в список стандартных.

Схема работы

UDAF оперируют следующими абстракциями:

  • Item — агрегируемое значение;
  • State — состояние агрегации;
  • Result — результат агрегации;
  • Serialized (опционально) — сериализованное представление State.
  • Parent (опционально) — узел, который необходимо использовать как родительский, если обработчик написан на lambda. Это необходимо только в тех случаях, когда состояние содержит мутабельные ресурсы.

В каждом конкретном случае за каждой из этих абстракций должен стоять произвольный тип данных.
Однако тип, соответствующий каждой абстракции, должен совпадать у разных функций. Если у одной из функций в роли Item используется String, а у другой — Int64, то возникнет ошибка несоответствия типов.

Порядок вызова UDAF в высокоуровневых терминах:

  1. Для выполнения на каждой из стадий агрегации, пользователь предоставляет от 3 до 7 простых функций (точнее, Callable, вызываемых значений), оперирующих перечисленными выше абстракциями. Основные способы их получить — с помощью lambda или Python UDF. Основные варианты:
  • 3 — минимальный;
  • 4 — если State не является результатом;
  • 6 — если State несериализуем;
  • 7 — если нужно переопределить значение по умолчанию с NULL.
  1. Пользователь вызывает специальную агрегатную функцию UDAF, куда первым аргументом передает колонку или выражение для агрегации, а в остальных — вызываемые значения. Порядок указания вызываемых значений описан в таблице ниже.
  2. Профит.

Список вызываемых значений, обязательны только выделенные жирным:

Порядковый номер Название Описание Сигнатура
1 Create Инициализация State по первому непустому Item. (Item)->State. Для lambda входные аргументы (Item, Parent)
2 Add Добавление очередного Item к State. (State,Item)->State. Для lambda входные аргументы (State, Item, Parent)
3 Merge Объединение двух State. (State,State)->State
4 Get Result Получение результата агрегации из итогового State.
Если не указано — итоговый State используется в роли результата.
(State)->Result
5 Serialize Сериализация State, если он сам по себе не является сериализуемым.
Любая комбинация простых типов и контейнеров сериализуема, так что актуально только при использовании Resource в качестве State.
(State)->Serialized
6 Deserialize Восстановление State после сериализации. (Serialized)->State
7 Default Value Результат в ситуации, когда агрегатная функция совсем не вызывалась (оказалось ноль строк на входе). По умолчанию NULL. ()->Result или (TypeOf(Result))->Result

Передаются последовательно как отдельные аргументы вслед за агрегируемым значением. Всего у агрегатной функции UDAF может быть от 4 до 8 аргументов.

Если одинаковым образом с помощью UDAF агрегируется несколько значений в запросе, то, чтобы не перечислять каждый раз вызываемые значения, имеет смысл использовать фабрику агрегационной функции. При этом вместо колонки с данными передается "UDAF", а сам вызов выполняется с помощью функции AGGREGATE_BY.

Примеры

-- Эмуляция агрегационной функции COUNT
$create = ($item, $parent) -> { return 1ul };
$add = ($state, $item, $parent) -> { return 1ul + $state };
$merge = ($state1, $state2) -> { return $state1 + $state2 };
$get_result = ($state) -> { return $state };
$serialize = ($state) -> { return $state };
$deserialize = ($state) -> { return $state };
$default = 0ul;

$udaf_factory = AggregationFactory(
    "UDAF",
    $create,
    $add,
    $merge,
    $get_result,
    $serialize,
    $deserialize,
    $default
);

SELECT
    AGGREGATE_BY(value1, $udaf_factory) AS cnt1,
    AGGREGATE_BY(value2, $udaf_factory) AS cnt2 -- посчитать количество значений
                                                -- в колонках value1 и value2, не равных NULL.
FROM my_table;