PROCESS

Преобразовать входную таблицу с помощью UDF на C++, Python или JavaScript или лямбда функции, которая применяется последовательно к каждой строке входа и имеет возможность для каждой строки входа создать ноль, одну или несколько строк результата (аналог Map в терминах MapReduce).

Таблица по имени ищется в базе данных, заданной оператором USE.

В параметрах вызова функции после ключевого слова USING явно указывается, значения из каких колонок и в каком порядке передавать для каждой строки входа.

Допустимы функции, которые возвращают результат одного из трех составных типов от OutputType (возможные варианты OutputType описаны ниже):

  • OutputType — на каждую строку входа всегда должна быть строка выхода, схема которой определяется типом структуры.
  • OutputType? — функции оставляет за собой право пропускать строки, возвращая пустые значения (TUnboxedValue() в C++, None в Python или null в JavaScript).
  • Stream<OutputType> или List<OutputType> — возможность вернуть несколько строк.

Вне зависимости от того, какой вариант из перечисленных выше трех выбран, результат преобразовывается к плоской таблице с колонками, определяемыми типом OutputType.

В качестве OutputType может выступать один из типов:

  • Struct<...> — у PROCESS будет ровно один выход с записями заданной структуры, представляющий собой плоскую таблицу с колонками соответствующими полям Struct<...>
  • Variant<Struct<...>,...> — у PROCESS число выходов будет равно числу альтернатив в Variant. Записи каждого выхода представлены плоской таблицей с колонками по полям из соответствующей альтернативы. Ко множеству выходов PROCESS в этом случае можно обратиться как к кортежу (Tuple) списков, который можно распаковать в отдельные именованные выражения и использовать независимо.

В списке аргументов функции после ключевого слова USING можно передать одно из двух специальных именованных выражений:

  • TableRow() — текущая строка целиком в виде структуры;
  • TableRows() — ленивый итератор по строкам, с точки зрения типов — Stream<Struct<...>>. В этом случае выходным типом функции может быть только Stream<OutputType> или List<OutputType>.

Примечание

После выполнения PROCESS в рамках того же запроса по результирующей таблице (или таблицам) можно выполнить SELECT, REDUCE, INSERT INTO, ещё один PROCESS и так далее в зависимости от необходимого результата.

Ключевое слово USING и указание функции необязательны: если они не указаны, то возвращается исходная таблица. Это может быть удобно для применения шаблона подзапроса.

В PROCESS можно передать несколько входов (под входом здесь подразумевается таблица, диапазон таблиц, подзапрос, именованное выражение), разделенных запятой. В функцию из USING в этом случае можно передать только специальные именованные выражения TableRow() или TableRows(), которые будут иметь следующий тип:

  • TableRow() — альтернатива (Variant), где каждый элемент имеет тип структуры записи из соответствущего входа. Для каждой входной строки в альтернативе заполнен элемент, соответствущий номеру входа этой строки
  • TableRows() — ленивый итератор по альтернативам, с точки зрения типов — Stream<Variant<...>>. Альтернатива имеет такую же семантику, что и для TableRow()

После USING в PROCESS можно опционально указать ASSUME ORDER BY со списком столбцов. Результат такого PROCESS будет считаться сортированным, но без выполнения фактической сортировки. Проверка сортированности осуществляется на этапе исполнения запроса. Поддерживается задание порядка сортировки с помощью ключевых слов ASC (по возрастанию) и DESC (по убыванию). Выражения в ASSUME ORDER BY не поддерживается.

Примеры:

PROCESS my_table
USING MyUdf::MyProcessor(value)
$udfScript = @@
def MyFunc(my_list):
    return [(int(x.key) % 2, x) for x in my_list]
@@;

-- Функция возвращает итератор альтернатив
$udf = Python3::MyFunc(Callable<(Stream<Struct<...>>) -> Stream<Variant<Struct<...>, Struct<...>>>>,
    $udfScript
);

-- На выходе из PROCESS получаем кортеж списков
$i, $j = (PROCESS my_table USING $udf(TableRows()));

SELECT * FROM $i;
SELECT * FROM $j;
$udfScript = @@
def MyFunc(stream):
    for r in stream:
        yield {"alt": r[0], "key": r[1].key}
@@;

-- Функция принимает на вход итератор альтернатив
$udf = Python::MyFunc(Callable<(Stream<Variant<Struct<...>, Struct<...>>>) -> Stream<Struct<...>>>,
    $udfScript
);

PROCESS my_table1, my_table2 USING $udf(TableRows());
Предыдущая
Следующая