Python User Defined Functions (UDFs) в YQL
Введение
Не все задачи можно выразить в чисто декларативном стиле, как в SQL. Иногда удобнее описать обработку данных в императивном стиле. Для этого YQL поддерживает создание и использование пользовательских функций (UDF).
Python UDF
Python UDF позволяет описывать функции на языке Python в самом запросе и использовать их.
Использование
Для использования Python UDF в YQL запросе нужно выполнить три шага:
- Включить в запрос строку с Python-скриптом, в котором определена функция для вызова в качестве UDF.
- Объявить в запросе имя функции и её сигнатуру (типы входных и выходных данных).
- Вызвать функцию в теле запроса, передав ей необходимые аргументы.
Каждый из этих шагов подробно описан в следующих разделах.
Написание Python скрипта
Скрипт можно задать тремя способами:
-
Обычный строковый литерал. Например:
$script = "def foo(): return 'bar'"
. -
Многострочный строковый литерал (расширение YQL). Такой литерал ограничивается символами
@@
и работает аналогично тройным кавычкам в Python. Комментарий#py
включает подсветку синтаксиса Python в редакторе YQL:$script = @@#py def foo(): return b'bar' @@
-
Именованный файл, приложенный к запросу. Содержимое файла встраивается в запрос с помощью функции FileContent:
$script = FileContent("foo.py")
.
Объявление имени и сигнатуры функции
Поскольку Python является динамически типизированным языком, а YQL — статически типизированным, необходимо заранее зафиксировать типы на их стыке. Имя и сигнатура вызываемой из Python-скрипта функции должны быть известны до выполнения YQL запроса. Если типы, объявленные в сигнатуре, не будут соответствовать фактическим данным, возвращаемым функцией, это приведет к ошибке во время исполнения.
Синтаксис объявления имени и сигнатуры функции рассмотрим на примере:
$f = SystemPython3_8::foo(Callable<()->String>, $script);
:
$f
и$script
— именованные выражения YQL$f
— готовая для использования функция.$script
— скрипт на Python из предыдущего раздела.
SystemPython3_8::
— фиксированный префикс для объявления функции на Python.System
означает, что Python будет взят из окружения во время исполнения.3_8
указывает на версию Python, которая будет использована.- На момент написания доступны версии
3_8
-3_12
.
- На момент написания доступны версии
foo
— имя функции внутри скрипта$script
, которая будет вызываться.Callable<()->String>
— описание сигнатуры функции. Пустые скобки означают отсутствие аргументов, аString
после стрелки — строковый результат. Пример сигнатуры функции с аргументами:Callable<(String, Uint32)->Double>
.
Использование UDF в запросе
Полученную после объявления сигнатуры функцию можно вызывать так же, как и любую встроенную функцию YQL. Например: $f()
или $udf("test", 123)
.
Типы данных
Доступные типы данных для указания в сигнатуре функций можно помотреть в разделе Типы данных YQL.
Контейнеры
Контейнеры преобразуются в Python объекты по следующим правилам:
Название | Объявление сигнатуры | Пример сигнатуры | Представление в Python |
---|---|---|---|
Список | List<Type> |
List<Int32> |
list-like object (подробнее) |
Словарь | Dict<KeyType,ValueType> |
Dict<String,Int32> |
dict-like object (подробнее) |
Кортеж | Tuple<Type1,...,TypeN> |
Tuple<Int32,Int32> |
tuple |
Структура | Struct<Name1:Type1,...,NameN:TypeN> |
Struct<Name:String,Age:Int32> |
StructSequence |
Поток | Stream<Type> |
Stream<Int32> |
generator |
Вариант над кортежем | Variant<Type1,Type2> |
Variant<Int32,String> |
tuple с индексом и объектом |
Вариант над структурой | Variant<Name1:Type1,Name2:Type2> |
Variant<value:Int32,error:String> |
tuple с именем поля и объектом |
Контейнеры можно вкладывать друг в друга. Пример: List<Tuple<Int32,Int32>>
.
Особенности передаваемых в функции объектов для списков и словарей
При использовании List
и Dict
, в аргументы функции передаются специальные read-only объекты yql.TList
и yql.TDict
.
Их особенности:
- Read-only: в них нельзя что-то поменять не скопировав, так как тем же объектом может пользоваться какая-то другая часть текущего запроса, например соседняя функция.
- Ценой потенциально медленного копирования можно получить из них настоящий объект типа list, проитерировавшись для частичного копирования. Для словарей нужно дополнительно вызвать
iteritems()
. Установив значение атрибута_yql_lazy_input
в False на самой функции можно включить автоматическое копирование списков и словарей в list и dict. Данный механизм работает рекурсивно для вложенных контейнеров. - Для получения количества элементов можно вызвать
len(my_arg)
. - Актуальный набор доступных методов можно узнать с помощью вызова
dir(my_arg)
. Пример.
Методы yql.TList
:
has_fast_len()
— можно ли быстро получить длину. Если вернулось False, то список "ленивый".has_items()
— проверка на пустоту.reversed()
— получить перевернутую копию списка.skip(n)
иtake(n)
— аналоги срезов[n:]
и[:n]
, соответственно.to_index_dict()
- получить словарь с номерами элементов в качестве ключей, что позволяет осуществлять доступ по индексу.
Пример с _yql_lazy_input
:
$u = SystemPython3_8::list_func(Callable<(List<Int32>)->Int32>, @@#py
def list_func(lst):
return lst.count(1)
list_func._yql_lazy_input = False
@@);
SELECT $u(AsList(1,2,3));
Изменение окружения
Взаимодействие с Python из окружения
SystemPython
UDF будет вызываться в отдельной операции в рамках пользовательского скрипта.
Взаимодействие с Python происходит через динамическую библиотеку libpython3.N.so
, линковка с которой происходит во время исполнения. Подмена этой библиотеки в окружении приведёт к изменению используемого Python и доступных библиотек.
Изменение окружения из YQL операции возможно при помощи прагм yt.DockerImage и yt.LayerPaths.
Они повлияют на соответствующие параметры пользовательского скрипта, описание которых можно посмотреть в разделе Настройки операций - Параметры пользовательского скрипта.
Окружение по умолчанию
По умолчанию используется дефолтное окружение для джобов, настраиваемое администратором кластера. Оно может не содержать Python требуемой версии.
Пример использования библиотеки tensorflow с помощью pragma yt.DockerImage
:
pragma yt.DockerImage = "docker.io/tensorflow/tensorflow:2.16.1";
$script = @@#py
import sys
import tensorflow
def foo():
return "Python version: " + sys.version + ".\nTensorflow version: " + tensorflow.__version__
@@;
$udf = SystemPython3_11::foo(Callable<()->String>, $script);
SELECT $udf();
-- Python version: 3.11.0rc1 (main, Aug 12 2022, 10:02:14) [GCC 11.2.0].
-- Tensorflow version: 2.16.1
Особенности
При использовании UDF стандартный вывод (stdout) используется для служебных целей, поэтому пользоваться им нельзя.
Примеры
Hello World
$script = @@#py
def hello(name):
return b'Hello, %s!' % name
@@;
$udf = SystemPython3_8::hello(Callable<(String)->String>, $script);
SELECT $udf("world"); -- "Hello, world!"
Конкатенация строки и числа
$script = @@#py
def concat(a, b):
return a.decode("utf-8") + str(b)
@@;
$concat = SystemPython3_8::concat(Callable<(String?, Int64?)->String>, $script);
SELECT $concat(name, age) FROM `//tmp/sample`;
yql.TList
Получение актуального списка методов $u = SystemPython3_8::get_dir(Callable<(List<Int32>)->List<String>>, @@#py
def get_dir(lst):
return dir(lst)
@@);
SELECT $u(AsList());
Вызов Llama
Необходимо правильное окружение, которое может быть получено с помощью следующего Dockerfile:
FROM ollama/ollama:0.3.9
RUN apt install -y python3.11 libpython3.11 python3-pip
RUN python3.11 -m pip install ollama
Используя указанный образ, можно вызывать Llama из Python:
pragma yt.DockerImage = "my.docker.registry/ollama/ollama:0.3.9-with-python3.11-2";
pragma yt.DefaultMemoryLimit = "30G";
pragma yt.OperationSpec = "{max_failed_job_count=1;mapper={cpu_limit=64}}";
$script = @@#py
from ollama import Client
import subprocess
def infer_llama(prompt):
proc = subprocess.Popen(["/bin/ollama", "serve"], stdout=subprocess.DEVNULL)
client = Client(host='http://localhost:11434')
client.pull('llama3.1')
response = client.chat(model='llama3.1', messages=[
{
'role': 'user',
'content': str(prompt),
},
])
proc.kill()
return response['message']['content']
@@;
$infer_llama = SystemPython3_11::infer_llama(Callable<(String?)->String>, $script);
select $infer_llama("Why is the sky blue?");
FAQ
Module not loaded for script type: SystemPython3_8
Я получаю ошибку Это означает, что функциональность не поддержана на кластере. На кластере должны присутствовать QueryTracker и YqlAgents с минимальной версией образа YTsaurus QueryTracker 0.0.8. Попросите администраторов кластера обновить данные компоненты.
libpython3.8.so.1.0: cannot open shared object file: No such file or directory
Я получаю ошибку Это означает, что в окружении джоба, исполняющего функцию, нет динамической библиотеки libpython3.8.so.1.0
. Необходимо использовать ту версию Python, которая присутствует в окружении джоба. Подробнее.
Если используется pragma DockerImage
, то необходимо убедиться, что кластер поддерживает такой способ изменения окружения. Подробнее.