Практическое применение метапрограммирования на YQL
Введение
Из-за отсутствия в YTsaurus нативной поддержки партиционированных таблиц, типовой ситуацией является хранение логически единой таблицы в виде множества физических таблиц, разбитых по датам, часам или остатку от деления хеша какого-либо из полей. Таким образом, в YQL over YTsaurus запросы часто работают с огромным количеством таблиц (десятки, сотни и тысячи).
По меркам стандарта SQL это исключительная ситуация и общепринятых механизмов для описания таких запросов им не предоставляется. YQL обладает многими расширениями стандарта, спроектированных специально для подобных специфичных задач. Также для регулярно или многократно запускаемых запросов в большинстве случаев они позволяют обойтись совсем без генерации текстов запросов на клиенте. Каждый механизм в отдельности подробно описан в основном руководстве по синтаксису YQL, а цель данной статьи — продемонстрировать, как их можно комбинировать для достижения практически полезных результатов на приближенных к реальности примерах.
Сценарии
Выбор таблиц по датам
Архив подавляющего большинства логов хранится на YTsaurus в плоской директории, где каждой дате соответствует отдельная таблица. Чтобы выполнить расчёт по интересующим датам и при этом не подставлять их в текст запроса вручную, можно воспользоваться табличной функцией FILTER
в сочетании с выбирающей нужные даты lambda функцией.
USE <cluster-name>;
$log_type = "yql-log";
$base_folder = "logs/" || $log_type || "/1d";
$today = CurrentUtcDate();
$yesterday = $today - Interval("P1D");
$a_week_ago = $today - Interval("P1W");
$is_during_last_week = ($table_name) -> {
$date = CAST($table_name AS Date);
RETURN ($date BETWEEN $a_week_ago AND $yesterday) ?? false;
};
SELECT COUNT(*)
FROM FILTER($base_folder, $is_during_last_week);
Ссылки на документацию:
Обработка таблиц скользящим окном с кешированием
Допустим, вам необходимо ежедневно считать какой-то агрегат по данным за последнюю неделю примерно как в предыдущем примере. Так как по своей природе большинство таблиц с логами перестают меняться уже начиная с ночи следующего дня, то условно 6/7-х вычислительных ресурсов каждый день тратилось на повторное вычисление того, что уже считалось в предыдущий раз. В YQL есть механизм автоматического кеширования промежуточных результатов вычислений, но так как он оперирует на уровне MapReduce операций целиком, то в предыдущем запросе cache hit был бы нулевой: каждый день получался бы свой набор входных таблиц в первую Map операцию, а он участвует в вычислении ключа в кеше.
Чтобы дать механизму автоматического кеширования шанс на успех (или даже гарантировать его, если переопределить PRAGMA yt.TmpFolder в свою директорию/квоту вместо //tmp
), необходимо считать агрегат по каждому дню по отдельности, а затем объединять их в одно значения для получения требуемого результата, благо для большинства агрегатных функций это возможно.
USE <cluster-name>;
$log_type = "visit-log";
$base_folder = "logs/" || $log_type || "/1d/";
$today = CurrentUtcDate();
$range = ListFromRange(1, 9);
$shift_back_by_days = ($days_count) -> {
RETURN Unwrap(
$today - $days_count * Interval("P1D"),
"Failed to shift today back by " || CAST($days_count AS String) || " days"
);
};
$dates = ListMap($range, $shift_back_by_days);
DEFINE ACTION $max_income($date) AS
$table_path = $base_folder || CAST($date AS String);
INSERT INTO @daily_results
SELECT MAX(CAST(Income AS Double)) AS MaxIncome
FROM $table_path;
END DEFINE;
EVALUATE FOR $date IN $dates DO $max_income($date);
COMMIT;
SELECT MAX(MaxIncome) AS MaxIncome
FROM @daily_results;
DISCARD SELECT
Ensure(
NULL,
COUNT(*) == 7,
"Unexpected number of aggregates to be merged: " || CAST(COUNT(*) AS String) || " instead of 7"
)
FROM @daily_results;
В данном примере, в отличие от предыдущего пути к таблицам не выбираются из содержимого директории, а генерируются «из воздуха», основываясь на принятой для данного лога схеме именования таблиц. Объявляется «действие» для обработки каждой таблицы в отдельности (DEFINE ACTION
), которое затем в цикле применяется к каждой дате, которая должна попасть в расчет (EVALUATE FOR
).
Также в данном запросе демонстрируется несколько способов «уронить» запрос с содержательной ошибкой вместо выдачи некорректного результата, если вдруг начало получаться что-то странное:
- Арифметика над датами может вернуть NULL в практически невозможных для данного сценария ситуациях выхода за диапазон поддерживаемых значений, что делает достаточно безопасным использование Unwrap — превращения Optional значения в не-Optional (точно не NULL), с ошибкой времени выполнения если в Optional таки лежал NULL.
- Ensure позволяет проверить выполнение произвольных условий во время выполнения запроса и вернуть ошибку, если условие не выполнено. Его можно вставлять и прямо по ходу вычисления, но в сочетании с
DISCARD
(посчитать что-то и выкинуть результат) можно и сделать отдельную проверку сбоку, например по агрегату от временной или итоговой таблицы как здесь. В варианте сDISCARD
первый аргумент Ensure, который обычно становится результатом его вызова, особого смысла не имеет и туда можно писать любую константу, например NULL.
Ссылки на документацию:
- Lambda функции
- CurrentUtcDate
- ListFromRange / ListMap / ListLength
- Unwrap
- DEFINE ACTION
- EVALUATE FOR
- @foo
- COMMIT
- DISCARD
- Ensure
Сценарий с обработкой заданий
Допустим у вас есть некий внешний процесс, записывающий в определённую директорию таблицы, которые по сути являются заданиями для обработки: необходимо запускать на основе содержимого какой такой таблицы вычисление, записывать результат в другую директорию, а обработанные таблицы удалять — и всё это транзакционно.
Как можно сэмулировать такой внешний процесс на YQL для теста
USE <cluster-name>;
$root_folder = "//tmp/tasks/";
$values = ListFromRange(1, 10);
DEFINE ACTION $create_task($i) AS
$path = $root_folder || CAST($i AS String);
INSERT INTO $path SELECT $i AS Task;
END DEFINE;
EVALUATE FOR $value IN $values DO $create_task($value);
Собственно запуск обработки:
USE <cluster-name>;
$tasks_folder = "tmp/tasks";
$results_folder = "tmp/results";
DEFINE ACTION $process_task($input_path) AS
$output_path = String::ReplaceAll($input_path, $tasks_folder, $results_folder);
INSERT INTO $output_path WITH TRUNCATE
SELECT Task * Task AS Result
FROM $input_path;
COMMIT;
DROP TABLE $input_path;
END DEFINE;
$tasks = (
SELECT AGGREGATE_LIST(Path)
FROM FOLDER($tasks_folder, "row_count")
WHERE Type == "table" AND
Yson::LookupInt64(Attributes, "row_count") == 1
);
EVALUATE FOR $task IN $tasks ?? ListCreate(TypeOf($tasks)) DO $process_task($task);
Здесь также используется механизм «действий» с циклом по списку, но при этом список входных таблиц берётся не «из воздуха», а из табличной функции FOLDER
, которая предоставляет доступ к содержимому произвольной директории в YTsaurus. Так как FOLDER
выдаёт не только таблицы, но и другие виды узлов, то если нет уверенности, что в директории лежат только таблицы, то лучше добавлять фильтр по Type или по произвольным мета атрибутам таблицы (здесь row_count для примера). Для этого надо заказать их во втором аргументе FOLDER
(если несколько, то через точку с запятой) и затем обратиться к колонке Attributes типа Yson.
ListExtract делает из списка структур с одним элементом список сразу строк с путями, а можно было бы доставать Path и уже внутри действия.
Ссылки на документацию: