Streaming UDF

Функции для запуска произвольного скрипта или исполняемого файла в потоковом режиме:

Streaming::Process(List<Struct<Data:String>>, String, List<String>) -> List<Struct<Data:String>>
Streaming::ProcessInline(List<Struct<Data:String>>, String, List<String>) -> List<Struct<Data:String>>

Описание аргументов:

  1. Входные данные (List<Struct<Data:String>>):
    • Дочерний процесс запускается один раз в начале обработки списка (или его части, см. следующий пункт) и каждый элемент подается ему на stdin на отдельной строке.
    • Если входной список физически является большой таблицей, то будет запущено много одинаковых процессов для параллельной обработки её частей. Обработка всех данных таблицы в одном процессе и порядок обработки строк не гарантируются.
    • Функции Process и ProcessInline ожидают, что каждая строка в списке входных данных обернута в структуру. Это сделано, чтобы упростить связку с SELECT — именно такой тип получается у результата SELECT ... AS Data FROM ..., а также чтобы не провоцировать пользователя на крайне неэффективный сценарий использования, предполагающий запуск отдельного процесса на каждой строке исходной MapReduce-таблицы.
    • Имя Data для структуры элемента списка на момент написания фиксировано.
  2. Второй аргумент (String):
    • Streaming::Process — имя программы из $PATH (нужно убедиться, что она установлена на целевом кластере), либо путь к исполняемому файлу, который можно приложить к запросу, а путь получить через встроенную функцию FilePath("file alias").
    • Streaming::ProcessInline — строка со скриптом, который будет запущен через интерпретатор, указанный в shebang (то есть в первой строке скрипта должно быть написано, например, #!/usr/bin/env bash). Базовые интерпретаторы вроде sh, bash, awk, perl или python, как правило, уже установлены, а что-то экзотическое можно также приложить к запросу. Скрипт можно задать как inline с использованием многострочного строкового литерала (@@), либо в отдельном файле (вкладке в веб-интерфейсе), во втором случае получить содержимое файла в виде строки можно с помощью встроенной функции FileContent("file alias").
  3. Список аргументов, которые будут переданы скрипту или исполняемому файлу при запуске (List<String>, необязательный аргумент):
    • Чтобы сформировать список аргументов из литералов, удобно воспользоваться встроенной функцией AsList (принимает 1 или более аргументов и формирует из них список).
    • Если передавать аргументы не нужно — можно не указывать этот аргумент.

Если вас интересует какой-либо формат взаимодействия с дочерним процессом, отличный от описанного выше построчного, расскажите нам о своём сценарии использования в коммьюнити-чате.

Особенности реализации:

  • При использовании Streaming::ProcessInline, как правило, задействуется интерпретатор, установленный на целевом кластере, что в случае различия версий может приводить к неожиданным спецэффектам. Для избежания этого можно или прикладывать свой интерпретатор к запросу (что не всегда тривиально), или договариваться с эксплуатацией кластера об установке нужного интерпретатора или его версии (лучше напрямую, но можно и через нас).
  • Поток стандартного вывода ошибок (stderr) можно использовать для диагностики проблемных ситуаций: его ограниченная по объему часть сохраняется и возвращается на клиент в случае ошибки времени исполнения (runtime error) при выполнении запроса. Размер сохраняемого фрагмента stderr настраивается на стороне кластера и обычно составляет несколько килобайт.

Примеры

$input = [
  <| "Data":"a" |>,
  <| "Data":"b" |>,
  <| "Data":"c" |>,
];

PROCESS AS_TABLE($input) USING Streaming::Process(TableRows(), "cat", ["-n"]);

/*
[(Data: ' 1 a'),
(Data: ' 2 b'),
(Data: ' 3 c')]
*/
$ips = (SELECT COALESCE(ip, "") AS Data FROM <cluster-name>.`home/yql/tutorial/users`);

$script = @@#!/usr/bin/awk -f
BEGIN { FS="."; }
$2 ~ /170/ { print $0" has 170 in second octet!"; }
@@;

PROCESS $ips
USING Streaming::ProcessInline(
  TableRows(),
  $script
);
/*
"93.170.111.29 has 170 in second octet!"
"93.170.111.28 has 170 in second octet!"
*/
Предыдущая
Следующая