Пользовательские агрегатные функции в YQL
Введение
Если по каким-то причинам готовых агрегатных функций оказалось недостаточно, в YQL есть возможность описать свою. Механизм называется UDAF
(User Defined Aggregation Functions) и работает полностью аналогично стандартным агрегатным функциям.
Из особенностей/преимуществ по сравнению с работой на уровне Map/Reduce (PROCESS + REDUCE, либо работа с YTsaurus напрямую без YQL):
- Возможность легко применить одну и ту же логику к нескольким полям/выражениям за один проход вычислений по данным (т. е. считаем разные не связанные друг с другом агрегаты, читая данные только один раз), а также автоматический пропуск пустых значений.
- Поддержка
DISTINCT
, то есть вызова функции только по уникальным значениям. - Автоматическое создание map-side combiner, что во многих случаях позволяет сильно сэкономить на времени выполнения запроса, при этом не занимаясь реализацией map-side combiner самостоятельно.
Примечание
Если вы сделали или собираетесь сделать агрегатную функцию, которая, на ваш взгляд, могла бы быть полезна широкому кругу пользователей — напишите нам об этом в телеграм-чате. Возможно, мы добавим её в список стандартных.
Схема работы
UDAF оперируют следующими абстракциями:
Item
— агрегируемое значение;State
— состояние агрегации;Result
— результат агрегации;Serialized
(опционально) — сериализованное представление State.Parent
(опционально) — узел, который необходимо использовать как родительский, если обработчик написан на lambda. Это необходимо только в тех случаях, когда состояние содержит мутабельные ресурсы.
В каждом конкретном случае за каждой из этих абстракций должен стоять произвольный тип данных.
Однако тип, соответствующий каждой абстракции, должен совпадать у разных функций. Если у одной из функций в роли Item
используется String
, а у другой — Int64
, то возникнет ошибка несоответствия типов.
Порядок вызова UDAF в высокоуровневых терминах:
- Для выполнения на каждой из стадий агрегации, пользователь предоставляет от 3 до 7 простых функций (точнее, Callable, вызываемых значений), оперирующих перечисленными выше абстракциями. Основные способы их получить — с помощью lambda или Python UDF. Основные варианты:
- 3 — минимальный;
- 4 — если State не является результатом;
- 6 — если State несериализуем;
- 7 — если нужно переопределить значение по умолчанию с
NULL
.
- Пользователь вызывает специальную агрегатную функцию
UDAF
, куда первым аргументом передает колонку или выражение для агрегации, а в остальных — вызываемые значения. Порядок указания вызываемых значений описан в таблице ниже. - Профит.
Список вызываемых значений, обязательны только выделенные жирным:
Порядковый номер | Название | Описание | Сигнатура |
---|---|---|---|
1 | Create | Инициализация State по первому непустому Item. | (Item)->State . Для lambda входные аргументы (Item, Parent) |
2 | Add | Добавление очередного Item к State. | (State,Item)->State . Для lambda входные аргументы (State, Item, Parent) |
3 | Merge | Объединение двух State. | (State,State)->State |
4 | Get Result | Получение результата агрегации из итогового State. Если не указано — итоговый State используется в роли результата. |
(State)->Result |
5 | Serialize | Сериализация State, если он сам по себе не является сериализуемым. Любая комбинация простых типов и контейнеров сериализуема, так что актуально только при использовании Resource в качестве State. |
(State)->Serialized |
6 | Deserialize | Восстановление State после сериализации. | (Serialized)->State |
7 | Default Value | Результат в ситуации, когда агрегатная функция совсем не вызывалась (оказалось ноль строк на входе). По умолчанию NULL . |
()->Result или (TypeOf(Result))->Result |
Передаются последовательно как отдельные аргументы вслед за агрегируемым значением. Всего у агрегатной функции UDAF
может быть от 4 до 8 аргументов.
Если одинаковым образом с помощью UDAF
агрегируется несколько значений в запросе, то, чтобы не перечислять каждый раз вызываемые значения, имеет смысл использовать фабрику агрегационной функции. При этом вместо колонки с данными передается "UDAF", а сам вызов выполняется с помощью функции AGGREGATE_BY.
Примеры
-- Эмуляция агрегационной функции COUNT
$create = ($item, $parent) -> { return 1ul };
$add = ($state, $item, $parent) -> { return 1ul + $state };
$merge = ($state1, $state2) -> { return $state1 + $state2 };
$get_result = ($state) -> { return $state };
$serialize = ($state) -> { return $state };
$deserialize = ($state) -> { return $state };
$default = 0ul;
$udaf_factory = AggregationFactory(
"UDAF",
$create,
$add,
$merge,
$get_result,
$serialize,
$deserialize,
$default
);
SELECT
AGGREGATE_BY(value1, $udaf_factory) AS cnt1,
AGGREGATE_BY(value2, $udaf_factory) AS cnt2 -- посчитать количество значений
-- в колонках value1 и value2, не равных NULL.
FROM my_table;