YTsaurus Airflow Provider
Apache Airflow — это платформа для создания, планирования и мониторинга рабочих процессов. YTsaurus Airflow Provider позволяет интегрировать Airflow с YTsaurus и эффективно управлять задачами, связанными с обработкой данных.
В данном разделе приведены инструкции — как подключиться к YTsaurus кластеру в Apache Airflow и как использовать операторы для взаимодействия с Кипарисом, для работы со статическими таблицами и для выполнения запросов в Query Tracker.
Установка
Примечание
Текущая версия пакета требует:
- Python 3.8+
- Airflow 2.9.0+
-
Установите Python-клиент из PyPi-репозитория:
pip install ytsaurus-airflow-providerЕсли возникает ошибка 'Building wheel for xmlsec (pyproject.toml) did not run successfully.'
Установите пакеты, необходимые для сборки
xmlsec:sudo apt install libxmlsec1-dev pkg-config build-essential python3-dev -
Проверьте, что установка провайдера прошла успешно:
$ python3 -c "import ytsaurus_airflow_provider; print(ytsaurus_airflow_provider.__version__)" 0.1.1
В результате будут доступны:
- Python-библиотека для написания DAG-ов;
- провайдер для Apache Airflow;
- инструменты
ytsaurus-clientиytsaurus-yson.
Начало работы
В веб-интерфейсе Apache Airflow создайте подключение к кластеру YTsaurus:
- Admin -> Connections -> +
- Connection Id:
ytsaurus_cluster_default - Connection Type: YTsaurus Cluster
- Cluster Proxy: Прокси кластера
- Cluster Token: Токен подключения к кластеру
- Extra: По желанию, можно указать Client Config:
{ "client_config": { "create_table_attributes": { "compression_codec": "brotli_3" } } }
При необходимости можно создать подключение к объектному хранилищу:
- Admin -> Connections -> +
- Connection Id:
aws_default - Connection Type: Amazon Web Service
- AWS Access Key ID и AWS Secret Access Key: Ключи доступа к объектному хранилищу
Как задать подключение
Каждый оператор принимает на вход опциональный параметр ytsaurus_conn_id, который по стандарту указан как ytsaurus_cluster_default. Если необходимо использовать другое подключение, то его следует указать в параметре ytsaurus_conn_id.
Операторы
Все операторы, кроме оператора чтения при указанном ObjectStorage, пишут данные в XCom. Не рекомендуется передавать большие объёмы данных через XCom. Ниже описано, какие операторы в какие XCom пишут данные.
Примечание
Все операторы используют Python SDK. При возникновении ошибок во время выполнения операторов обратитесь к документации Python SDK.
Работа с Кипарисом
GetOperator
Оператор для получения содержимого узла Кипариса. Реализует функционал get из Python SDK.
Параметры:
path: str | YPathmax_size: None | int = Noneattributes: None | dict[str, Any] = Noneread_from: None | str = Noneytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
pathreturn_value
SetOperator
Оператор для записи нового содержимого в узел Кипариса. Реализует функционал set из Python SDK.
Параметры:
path: str | YPathvalue: Anyset_format: None | str | Format = Nonerecursive: bool = Falseforce: None | bool = Noneytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
path
RemoveOperator
Оператор для удаления узла Кипариса. Реализует функционал remove из Python SDK.
Параметры:
path: str | YPathrecursive: bool = Falseforce: bool = Falseytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
path
CreateOperator
Оператор для создания пустого узла Кипариса типа node_type с атрибутами attributes. Реализует функционал create из Python SDK.
Параметры:
node_type: Literal["table", "file", "map_node", "document", "string_node", "int64_node", "uint64_node", "double_node", "boolean_node", "link"]path: str | YPathrecursive: bool = Falseignore_existing: bool = Falselock_existing: None | bool = Noneforce: None | bool = Noneattributes: None | dict[str, Any] = Noneignore_type_mismatch: bool = Falseytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
object_idpath
ListOperator
Оператор для получения списка потомков узла path. Опция absolute включает вывод абсолютных путей вместо относительных. Реализует функционал list из Python SDK.
Параметры:
path: str | YPathmax_size: None | int = Noneabsolute: None | bool = Noneattributes: None | dict[str, Any] = Nonesort: bool = Trueread_from: None | str = Noneytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
pathreturn_value
Работа с таблицами
WriteTableOperator
Оператор для записи данных в статическую таблицу. Реализует функционал write_table из Python SDK.
Входные данные могут передаваться через параметр input_data или через параметр object_storage_path, представляющий из себя объект типа ObjectStorage. Одновременно передавать input_data и object_storage_path нельзя. При записи данных из объектного хранилища требуется указать соответствующий object_storage_format (см. Форматы представления табличных данных).
Параметры:
path: str | YPathinput_data: Any | None = Noneobject_storage_path: None | UPath = Noneobject_storage_format: None | str | Format = Nonetable_writer: dict[str, Any] | None = Nonemax_row_buffer_size: int | None = Noneforce_create: None | bool = Noneytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
pathobject_storage_path(если запись черезobject_storage_path)
ReadTableOperator
Оператор для чтения данных из статической таблицы. Реализует функционал read_table из Python SDK.
Может писать данные в объектное хранилище, если на вход передаётся параметр object_storage_path, представляющий из себя объект типа ObjectStorage. В ином случае таблица будет записываться в XCom. При записи данных в объектное хранилище требуется указать желаемый object_storage_format (см. Форматы представления табличных данных).
Параметры:
path: str | YPathobject_storage_path: None | UPath = Noneobject_storage_format: None | str | Format = Nonetable_reader: None | dict[str, Any] = Nonecontrol_attributes: None | dict[str, Any] = Noneunordered: None | bool = Noneresponse_parameters: None | dict[str, Any] = Noneenable_read_parallel: None | bool = Noneomit_inaccessible_columns: None | bool = Noneytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
pathobject_storage_path(если запись вobject_storage_path)return_value(если запись в XCom)
Query Tracker
RunQueryOperator
Оператор для выполнения запросов в Query Tracker с разным синтаксисом. Подробнее про Query Tracker можно почитать тут. Реализует функционал run_query из Python SDK.
Может запускаться в асинхронном режиме, тогда оператор не будет дожидаться выполнения запроса.
Результаты выполнения запроса могут записываться в XCom и объектное хранилище. Если в результате запроса возвращается 3 результата, а в качестве object_storage_paths переданы [None, ObjectStorage("s3://bucket/path"), None], то в XCom будут записаны результаты №1 и №3, а результат №2 будет записан в объектное хранилище.
Параметры:
engine: Literal["ql", "yql", "chyt", "spyt"]query: strsettings: None | dict[str, Any] | yt.yson.yson_types.YsonType = Nonefiles: None | list[dict[str, Any]] | list[yt.yson.yson_types.YsonType] = Nonestage: None | str = Noneannotations: None | dict[str, Any] | yt.yson.yson_types.YsonType = Noneaccess_control_objects: None | list[str] = Nonesync: bool = Trueobject_storage_paths: list[None | UPath] | None = Noneytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
metaquery_idresult_{i}, где i — номер возвращаемого результата, отсчитываемого от 0. Если для i-го результата переданobject_storage_paths[i], то результат не будет записан вresult_{i}.
Примеры работы
Примеры работы с операторами представлены в готовых DAG'ах example_dags.