REDUCE
Выполняет группировку входа по указанным ключевым столбцам, затем передает текущие ключи и ленивый итератор соответствующих им значений из остальных колонок на обработку в указанную UDF. По аналогии с PROCESS
UDF может вернуть произвольное количество строк результата на каждый вызов, а также возвращать альтернативы (Variant
) для создания множества выходов. В терминах MapReduce очень близок к Reduce.
Указываемые следом ключевые слова:
PRESORT
(опционально) — указание порядка внутри каждой группы, синтаксис аналогичен ORDER BY;ON
(обязательно) — указание ключевых столбцов;USING
илиUSING ALL
(обязательно) — вызов UDF, о правилах подробнее ниже.
Правила передачи аргументов UDF:
- Если аргументом UDF указан
TableRows()
, то UDF обязана принимать один аргумент — ленивый итератор по строкам, с точки зрения типов —Stream<Struct<...>>
. В этом случае выходным типом функции может быть толькоStream<OutputType>
илиList<OutputType>
. Гарантируется, что данные во входном итераторе сгруппированы по ключу и, при необходимости, отсортированы согласноPRESORT
секции. СTableRows()
можно использовать толькоUSING ALL
. - При использовании
USING
:- UDF обязана принимать два аргумента: в первый передается текущий ключ, а во второй — ленивый итератор с соответствующими этому ключу значениями.
- При использовании
USING ALL
:- UDF обязана принимать один аргумент — ленивый итератор кортежей (
Tuple
), где первый элемент кортежа это ключ, а второй — ленивый итератор с соответствующими этому ключу значениями.
- UDF обязана принимать один аргумент — ленивый итератор кортежей (
- Ключ для передачи в UDF формируется по следующему правилу. Если ключевая колонка одна, то её значение формирует ключ как есть, если несколько (список колонок указывается по аналогии с
GROUP BY
через запятую), то ключ будет формироваться как кортеж (Tuple
) со значениями из перечисленных колонок в указанном порядке. - При вызове
REDUCE
в запросе в круглых скобках после имени UDF указываются только выражение, значения которого будут переданы в качестве элементов итератора (второй аргумент UDF дляUSING
или второй элемент кортежа дляUSING ALL
).
Результат формируется аналогичным PROCESS образом. Также доступно ключевое слово TableRow()
для получения строки целиком в виде структуры.
В REDUCE
можно передать несколько входов (под входом здесь подразумевается таблица, диапазон таблиц, подзапрос, именованное выражение), разделенных запятой. Все входы обязаны иметь указанные в ON
ключевые колонки совпадающего типа. В функцию из USING
в этом случае можно передать только специальное именованное выражение TableRow()
. Второй аргумент (или второй элемент кортежа для USING ALL
) будет содержать ленивый итератор альтернатив с заполненным элементом, соответствущим номеру входа текущей записи.
После USING
в REDUCE
можно опционально указать ASSUME ORDER BY
со списком столбцов. Результат такого REDUCE
будет считаться сортированным, но без выполнения фактической сортировки. Проверка сортированности осуществляется на этапе исполнения запроса. Поддерживается задание порядка сортировки с помощью ключевых слов ASC
(по возрастанию) и DESC
(по убыванию). Выражения в ASSUME ORDER BY
не поддерживается.
Примеры:
REDUCE my_table
ON key, subkey
USING MyUdf::MyReducer(TableRow());
REDUCE my_table
ON key, subkey
USING ALL MyUdf::MyStreamReducer(TableRow()); -- MyUdf::MyStreamReducer принимает на вход ленивый список кортежей (ключ, список записей для ключа)
REDUCE my_table
PRESORT LENGTH(subkey) DESC
ON key
USING MyUdf::MyReducer(
AsTuple(subkey, value)
);
REDUCE my_table
ON key
USING ALL MyUdf::MyFlatStreamReducer(TableRows()); -- MyUdf::MyFlatStreamReducer принимает на вход единый ленивый список записей
-- Функция возвращает альтернативы
$udf = Python::MyReducer(Callable<(String, Stream<Struct<...>>) -> Variant<Struct<...>, Struct<...>>>,
$udfScript
);
-- На выходе из REDUCE получаем кортеж списков
$i, $j = (REDUCE my_table ON key USING $udf(TableRow()));
SELECT * FROM $i;
SELECT * FROM $j;
$script = @@
def MyReducer(key, values):
state = None, 0
for name, last_visit_time in values:
if state[1] < last_visit_time:
state = name, last_visit_time
return {
'region':key,
'last_visitor':state[0],
}
@@;
$udf = Python::MyReducer(Callable<(
Int64?,
Stream<Tuple<String?, Uint64?>>
) -> Struct<
region:Int64?,
last_visitor:String?
>>,
$script
);
REDUCE hahn.`home/yql/tutorial/users`
ON region USING $udf((name, last_visit_time));
-- Функция принимает на вход ключ и итератор альтернатив
$udf = Python::MyReducer(Callable<(String, Stream<Variant<Struct<...>,Struct<...>>>) -> Struct<...>>,
$udfScript
);
REDUCE my_table1, my_table2 ON key USING $udf(TableRow());