Чтобы установить Apache Airflow Provider, воспользуйтесь инструкцией.

Apache Airflow Provider для YTsaurus
Рассказываем про MVP Apache Airflow в YTsaurus
25 марта 2025 г.
Мы выпустили первую минимальную версию Apache Airflow Provider для YTsaurus. На данный момент в нем есть операторы для работы с Cypress, возможность прочитать или записать таблицу и запустить запрос в Query Tracker.
Краткое описание поддерживаемых операторов:
ReadTableOperator
— прочитать YTsaurus‑таблицу в XCom или S3WriteTableOperator
— записать YTsaurus‑таблицу из XCom или S3RunQueryOperator
— выполнить запрос в Query Tracker и записать результат в XCom или S3CreateOperator
— создать узел в CypressSetOperator
— записать значение в узелGetOperator
— получить содержимое узла и записать в XComRemoveOperator
— удалить узелListOperator
— получить список потомков узла и записать в XCom
Подробное описание каждого из операторов можно посмотреть в документации.
Пример DAG для записи данных из S3 на кластер с последующей обработкой (другие примеры использования смотрите здесь):
# Путь до нужного нам объекта в S3.
# Содержимое файла:
# {"user_name": "Alice", "is_happy": true}
# {"user_name": "Bob", "is_happy": false}
# {"user_name": "Charlie", "is_happy": true}
# ...
users_data_s3 = ObjectStoragePath("s3://users-data/latest", conn_id="aws_default")
with DAG(
"export_users_data_and_count_happy_users",
start_date=datetime(2025, 3, 20, tzinfo=timezone.utc),
schedule="@daily",
tags=["yt"],
) as dag:
# Загружаем данные о пользователях из S3 в таблицу YTsaurus
# //home/users-data-{{ ds }}, где {{ ds }} - дата выполнения задачи.
# Этот же путь оператор сохраняет в XCom "path".
upload_user_data_task = WriteTableOperator(
task_id="upload_user_data",
path="//home/users-data-{{ ds }}", # Все операторы поддерживают airflow templates reference
object_storage_path=users_data_s3, # Путь до объекта в S3
object_storage_format="json") # Формат, в котором данные записаны в S3
# Запускаем YQL запрос для загруженной таблицы с подсчетом счастливых пользователей.
# Результат запроса сохраняется в XCom "result_0".
process_user_data_task = RunQueryOperator(
task_id="process_user_data",
query="PRAGMA yt.InferSchema = '1';\n"
"SELECT COUNT_IF(is_happy) AS count FROM `{}`;".format(upload_user_data_task.output["path"]),
engine="yql")
# Записываем результат запроса в таблицу //home/happy-users-stat в новую строчку.
write_happy_users_task = WriteTableOperator(
task_id="write_happy_users",
path="<append=true>//home/happy-users-stat",
input_data=[{"date": "{{ ds }}", "happy_users": process_user_data_task.output["result_0"]}])
upload_user_data_task >> process_user_data_task >> write_happy_users_task
Если вам не хватает какого‑либо функционала, пожалуйста, пишите в чат сообщества или создавайте issues/PR в репозиторий.
Войдите, чтобы сохранить пост