Apache Airflow Provider для YTsaurus

Рассказываем про MVP Apache Airflow в YTsaurus

Мы выпустили первую минимальную версию Apache Airflow Provider для YTsaurus. На данный момент в нем есть операторы для работы с Cypress, возможность прочитать или записать таблицу и запустить запрос в Query Tracker.

Чтобы установить Apache Airflow Provider, воспользуйтесь инструкцией.

Краткое описание поддерживаемых операторов:

  • ReadTableOperator — прочитать YTsaurus‑таблицу в XCom или S3
  • WriteTableOperator — записать YTsaurus‑таблицу из XCom или S3
  • RunQueryOperator — выполнить запрос в Query Tracker и записать результат в XCom или S3
  • CreateOperator — создать узел в Cypress
  • SetOperator — записать значение в узел
  • GetOperator — получить содержимое узла и записать в XCom
  • RemoveOperator — удалить узел
  • ListOperator — получить список потомков узла и записать в XCom

Подробное описание каждого из операторов можно посмотреть в документации.

Пример DAG для записи данных из S3 на кластер с последующей обработкой (другие примеры использования смотрите здесь):

# Путь до нужного нам объекта в S3.
# Содержимое файла:
# {"user_name": "Alice", "is_happy": true}
# {"user_name": "Bob", "is_happy": false}
# {"user_name": "Charlie", "is_happy": true}
# ...
users_data_s3 = ObjectStoragePath("s3://users-data/latest", conn_id="aws_default")

with DAG(
    "export_users_data_and_count_happy_users",
    start_date=datetime(2025, 3, 20, tzinfo=timezone.utc),
    schedule="@daily",
    tags=["yt"],
) as dag:
    # Загружаем данные о пользователях из S3 в таблицу YTsaurus
    # //home/users-data-{{ ds }}, где {{ ds }} - дата выполнения задачи.
    # Этот же путь оператор сохраняет в XCom "path".
    upload_user_data_task = WriteTableOperator(
        task_id="upload_user_data",
        path="//home/users-data-{{ ds }}", # Все операторы поддерживают airflow templates reference
        object_storage_path=users_data_s3, # Путь до объекта в S3
        object_storage_format="json") # Формат, в котором данные записаны в S3

    # Запускаем YQL запрос для загруженной таблицы с подсчетом счастливых пользователей.
    # Результат запроса сохраняется в XCom "result_0".
    process_user_data_task = RunQueryOperator(
        task_id="process_user_data",
        query="PRAGMA yt.InferSchema = '1';\n"
              "SELECT COUNT_IF(is_happy) AS count FROM `{}`;".format(upload_user_data_task.output["path"]),
        engine="yql")

    # Записываем результат запроса в таблицу //home/happy-users-stat в новую строчку.
    write_happy_users_task = WriteTableOperator(
        task_id="write_happy_users",
        path="<append=true>//home/happy-users-stat",
        input_data=[{"date": "{{ ds }}", "happy_users": process_user_data_task.output["result_0"]}])

    upload_user_data_task >> process_user_data_task >> write_happy_users_task

Если вам не хватает какого‑либо функционала, пожалуйста, пишите в чат сообщества или создавайте issues/PR в репозиторий.

Apache Airflow Provider для YTsaurus
Войдите, чтобы сохранить пост