Custom aggregate functions in YQL
Introduction
If the built-in aggregate functions do not meet your needs for whatever reason, you can define your own function in YQL. This mechanism is referred to as UDAF
(User Defined Aggregation Functions) and works the same as standard aggregate functions.
Here are the peculiarities/benefits of this approach compared to Map/Reduce (PROCESS + REDUCE or working with YTsaurus directly without YQL):
- You can easily apply the same logic to several fields or expressions within a single computing run (that is, we calculate different non-related aggregates, reading the data only once). Moreover, you can automatically skip Null values.
- Support for
DISTINCT
, that is, calling a function only by unique values. - Automatic creation of a map-side combiner. Often this allows you to substantially save on the query execution time without having to implement a map-side combiner yourself.
Note
If you have built or want to build an aggregate function which, in your opinion, could be useful to many users, write to us in the Telegram chat. We'll consider adding it to standard functions.
How it works
UDAFs operate with the following abstractions:
Item
: Aggregated value.State
: Aggregation state.Result
: Aggregation result.Serialized
(optional): A serialized representation of State.Parent
(optional): A node that should be used as a parent node if the handler is written in lambda. This is needed when the state includes mutable resources.
In each specific case, there should be an arbitrary data type behind each of these abstractions.
However, the type corresponding to each abstraction should be the same for different functions. If one function uses String
as Item
and another function uses Int64
as Item, you will get a type mismatch error.
UDAF calling procedure in high-level terms:
- To perform each aggregation step, the user provides from three to seven simple functions (Callable values, to be precise) that operate with the above abstractions. The main methods to get them is by using lambda or Python UDF. Main options:
- 3: Minimal.
- 4: If State isn't a result.
- 6: If State is non-serializable.
- 7: If you need to override the default
NULL
value.
- The user calls a special aggregate
UDAF
function. To this function, as the first argument, the user passes a column or an aggregated expression. Other arguments include callable values. The order of callable values is described in the table below. - Benefits.
List of callable values. Required values are highlighted in bold.
No. | Name | Description | Signature |
---|---|---|---|
1 | Create | Initializing State by the first non-Null Item. | (Item)->State . Input arguments for lambda (Item, Parent) |
2 | Add | Adding the next Item to State. | (State,Item)->State . Input arguments for lambda (State, Item, Parent) |
3 | Merge | Merging of two States. | (State,State)->State |
4 | Get Result | Getting the aggregation result from the final State. If omitted, the final State is used as a result. |
(State)->Result |
5 | Serialize | Serialization of State, if it isn't serializable by itself. Any combination of basic types and containers is serializable. That's why this case applies only when you use a Resource as a State. |
(State)->Serialized |
6 | Deserialize | Recovering a State after serialization. | (Serialized)->State |
7 | Default Value | The result in the situation when the aggregate function hasn't been called (zero rows were input) The default value is NULL . |
()->Result or (TypeOf(Result))->Result |
They are passed one-by-one as individual arguments after the aggregated value. The aggregate UDAF
function can have between 4 and 8 arguments.
If several values in the query are aggregated in the same manner using UDAF
, then, to avoid enumerating the callable values every time, use the aggregate function factory. In this case, "UDAF" is passed instead of a data column, and the call is executed using AGGREGATE_BY.
Examples
-- Emulate the aggregate COUNT function
$create = ($item, $parent) -> { return 1ul };
$add = ($state, $item, $parent) -> { return 1ul + $state };
$merge = ($state1, $state2) -> { return $state1 + $state2 };
$get_result = ($state) -> { return $state };
$serialize = ($state) -> { return $state };
$deserialize = ($state) -> { return $state };
$default = 0ul;
$udaf_factory = AggregationFactory(
"UDAF",
$create,
$add,
$merge,
$get_result,
$serialize,
$deserialize,
$default
);
SELECT
AGGREGATE_BY(value1, $udaf_factory) AS cnt1,
AGGREGATE_BY(value2, $udaf_factory) AS cnt2 -- Count non-NULL values
-- in the value1 and value2 columns.
FROM my_table;