PROCESS
Преобразовать входную таблицу с помощью UDF на C++, Python или JavaScript или лямбда функции, которая применяется последовательно к каждой строке входа и имеет возможность для каждой строки входа создать ноль, одну или несколько строк результата (аналог Map в терминах MapReduce).
Таблица по имени ищется в базе данных, заданной оператором USE.
В параметрах вызова функции после ключевого слова USING
явно указывается, значения из каких колонок и в каком порядке передавать для каждой строки входа.
Допустимы функции, которые возвращают результат одного из трех составных типов от OutputType
(возможные варианты OutputType
описаны ниже):
OutputType
— на каждую строку входа всегда должна быть строка выхода, схема которой определяется типом структуры.OutputType?
— функции оставляет за собой право пропускать строки, возвращая пустые значения (TUnboxedValue()
в C++,None
в Python илиnull
в JavaScript).Stream<OutputType>
илиList<OutputType>
— возможность вернуть несколько строк.
Вне зависимости от того, какой вариант из перечисленных выше трех выбран, результат преобразовывается к плоской таблице с колонками, определяемыми типом OutputType
.
В качестве OutputType
может выступать один из типов:
Struct<...>
— уPROCESS
будет ровно один выход с записями заданной структуры, представляющий собой плоскую таблицу с колонками соответствующими полямStruct<...>
Variant<Struct<...>,...>
— уPROCESS
число выходов будет равно числу альтернатив вVariant
. Записи каждого выхода представлены плоской таблицей с колонками по полям из соответствующей альтернативы. Ко множеству выходовPROCESS
в этом случае можно обратиться как к кортежу (Tuple
) списков, который можно распаковать в отдельные именованные выражения и использовать независимо.
В списке аргументов функции после ключевого слова USING
можно передать одно из двух специальных именованных выражений:
TableRow()
— текущая строка целиком в виде структуры;TableRows()
— ленивый итератор по строкам, с точки зрения типов —Stream<Struct<...>>
. В этом случае выходным типом функции может быть толькоStream<OutputType>
илиList<OutputType>
.
Примечание
После выполнения PROCESS
в рамках того же запроса по результирующей таблице (или таблицам) можно выполнить SELECT, REDUCE, INSERT INTO, ещё один PROCESS
и так далее в зависимости от необходимого результата.
Ключевое слово USING
и указание функции необязательны: если они не указаны, то возвращается исходная таблица. Это может быть удобно для применения шаблона подзапроса.
В PROCESS
можно передать несколько входов (под входом здесь подразумевается таблица, диапазон таблиц, подзапрос, именованное выражение), разделенных запятой. В функцию из USING
в этом случае можно передать только специальные именованные выражения TableRow()
или TableRows()
, которые будут иметь следующий тип:
TableRow()
— альтернатива (Variant
), где каждый элемент имеет тип структуры записи из соответствущего входа. Для каждой входной строки в альтернативе заполнен элемент, соответствущий номеру входа этой строкиTableRows()
— ленивый итератор по альтернативам, с точки зрения типов —Stream<Variant<...>>
. Альтернатива имеет такую же семантику, что и дляTableRow()
После USING
в PROCESS
можно опционально указать ASSUME ORDER BY
со списком столбцов. Результат такого PROCESS
будет считаться сортированным, но без выполнения фактической сортировки. Проверка сортированности осуществляется на этапе исполнения запроса. Поддерживается задание порядка сортировки с помощью ключевых слов ASC
(по возрастанию) и DESC
(по убыванию). Выражения в ASSUME ORDER BY
не поддерживается.
Примеры:
PROCESS my_table
USING MyUdf::MyProcessor(value)
$udfScript = @@
def MyFunc(my_list):
return [(int(x.key) % 2, x) for x in my_list]
@@;
-- Функция возвращает итератор альтернатив
$udf = Python3::MyFunc(Callable<(Stream<Struct<...>>) -> Stream<Variant<Struct<...>, Struct<...>>>>,
$udfScript
);
-- На выходе из PROCESS получаем кортеж списков
$i, $j = (PROCESS my_table USING $udf(TableRows()));
SELECT * FROM $i;
SELECT * FROM $j;
$udfScript = @@
def MyFunc(stream):
for r in stream:
yield {"alt": r[0], "key": r[1].key}
@@;
-- Функция принимает на вход итератор альтернатив
$udf = Python::MyFunc(Callable<(Stream<Variant<Struct<...>, Struct<...>>>) -> Stream<Struct<...>>>,
$udfScript
);
PROCESS my_table1, my_table2 USING $udf(TableRows());