Streaming UDF
Функции для запуска произвольного скрипта или исполняемого файла в потоковом режиме:
Streaming::Process(List<Struct<Data:String>>, String, List<String>) -> List<Struct<Data:String>>
Streaming::ProcessInline(List<Struct<Data:String>>, String, List<String>) -> List<Struct<Data:String>>
Описание аргументов:
- Входные данные (
List<Struct<Data:String>>
):- Дочерний процесс запускается один раз в начале обработки списка (или его части, см. следующий пункт) и каждый элемент подается ему на stdin на отдельной строке.
- Если входной список физически является большой таблицей, то будет запущено много одинаковых процессов для параллельной обработки её частей. Обработка всех данных таблицы в одном процессе и порядок обработки строк не гарантируются.
- Функции
Process
иProcessInline
ожидают, что каждая строка в списке входных данных обернута в структуру. Это сделано, чтобы упростить связку сSELECT
— именно такой тип получается у результатаSELECT ... AS Data FROM ...
, а также чтобы не провоцировать пользователя на крайне неэффективный сценарий использования, предполагающий запуск отдельного процесса на каждой строке исходной MapReduce-таблицы. - Имя
Data
для структуры элемента списка на момент написания фиксировано.
- Второй аргумент (
String
):Streaming::Process
— имя программы из$PATH
(нужно убедиться, что она установлена на целевом кластере), либо путь к исполняемому файлу, который можно приложить к запросу, а путь получить через встроенную функциюFilePath("file alias")
.Streaming::ProcessInline
— строка со скриптом, который будет запущен через интерпретатор, указанный в shebang (то есть в первой строке скрипта должно быть написано, например,#!/usr/bin/env bash
). Базовые интерпретаторы вроде sh, bash, awk, perl или python, как правило, уже установлены, а что-то экзотическое можно также приложить к запросу. Скрипт можно задать как inline с использованием многострочного строкового литерала (@@
), либо в отдельном файле (вкладке в веб-интерфейсе), во втором случае получить содержимое файла в виде строки можно с помощью встроенной функцииFileContent("file alias")
.
- Список аргументов, которые будут переданы скрипту или исполняемому файлу при запуске (
List<String>
, необязательный аргумент):- Чтобы сформировать список аргументов из литералов, удобно воспользоваться встроенной функцией
AsList
(принимает 1 или более аргументов и формирует из них список). - Если передавать аргументы не нужно — можно не указывать этот аргумент.
- Чтобы сформировать список аргументов из литералов, удобно воспользоваться встроенной функцией
Если вас интересует какой-либо формат взаимодействия с дочерним процессом, отличный от описанного выше построчного, расскажите нам о своём сценарии использования в коммьюнити-чате.
Особенности реализации:
- При использовании
Streaming::ProcessInline
, как правило, задействуется интерпретатор, установленный на целевом кластере, что в случае различия версий может приводить к неожиданным спецэффектам. Для избежания этого можно или прикладывать свой интерпретатор к запросу (что не всегда тривиально), или договариваться с эксплуатацией кластера об установке нужного интерпретатора или его версии (лучше напрямую, но можно и через нас). - Поток стандартного вывода ошибок (stderr) можно использовать для диагностики проблемных ситуаций: его ограниченная по объему часть сохраняется и возвращается на клиент в случае ошибки времени исполнения (runtime error) при выполнении запроса. Размер сохраняемого фрагмента stderr настраивается на стороне кластера и обычно составляет несколько килобайт.
Примеры
$input = [
<| "Data":"a" |>,
<| "Data":"b" |>,
<| "Data":"c" |>,
];
PROCESS AS_TABLE($input) USING Streaming::Process(TableRows(), "cat", ["-n"]);
/*
[(Data: ' 1 a'),
(Data: ' 2 b'),
(Data: ' 3 c')]
*/
$ips = (SELECT COALESCE(ip, "") AS Data FROM <cluster-name>.`home/yql/tutorial/users`);
$script = @@#!/usr/bin/awk -f
BEGIN { FS="."; }
$2 ~ /170/ { print $0" has 170 in second octet!"; }
@@;
PROCESS $ips
USING Streaming::ProcessInline(
TableRows(),
$script
);
/*
"93.170.111.29 has 170 in second octet!"
"93.170.111.28 has 170 in second octet!"
*/