Apache Airflow Provider for YTsaurus

Introducing an MVP of the Apache Airflow Provider for YTsaurus

We have released an MVP of the Apache Airflow Provider for YTsaurus. It currently includes operators for working with Cypress, reading and writing tables, and running queries in Query Tracker.

For installation instructions for the Apache Airflow Provider, see the documentation.
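The provider is distributed as a Python package, so installation is typically a single pip command. The package name below is an assumption — check the repository for the exact distribution name:

```shell
# Package name is an assumption; see the YTsaurus provider repository
# for the exact distribution name.
pip install ytsaurus-airflow-provider
```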

Brief description of supported operators:

  • ReadTableOperator — read a YTsaurus table into XCom or S3
  • WriteTableOperator — write a YTsaurus table from XCom or S3
  • RunQueryOperator — execute a query in Query Tracker and write the result to XCom or S3
  • CreateOperator — create a node in Cypress
  • SetOperator — write a value to a node
  • GetOperator — get node content and write it to XCom
  • RemoveOperator — delete a node
  • ListOperator — get a list of node children and write it to XCom

More on each of the supported operators can be found in the documentation.

Here is an example of a DAG that loads data from S3 into a cluster and then processes it (see the repo for more examples).

```python
from datetime import datetime, timezone

from airflow import DAG
from airflow.io.path import ObjectStoragePath
# Provider import path may differ; check the provider documentation.
from airflow.providers.ytsaurus.operators.ytsaurus import (
    RunQueryOperator,
    WriteTableOperator,
)

# Path to the object we need in S3.
# File contents:
# {"user_name": "Alice", "is_happy": true}
# {"user_name": "Bob", "is_happy": false}
# {"user_name": "Charlie", "is_happy": true}
# ...
users_data_s3 = ObjectStoragePath("s3://users-data/latest", conn_id="aws_default")

with DAG(
    "export_users_data_and_count_happy_users",
    start_date=datetime(2025, 3, 20, tzinfo=timezone.utc),
    schedule="@daily",
    tags=["yt"],
) as dag:
    # Load user data from S3 into the YTsaurus table
    # //home/users-data-{{ ds }}, where {{ ds }} is the task execution date.
    # The operator also saves this path to the "path" XCom.
    upload_user_data_task = WriteTableOperator(
        task_id="upload_user_data",
        path="//home/users-data-{{ ds }}",  # All operators support Airflow templates
        object_storage_path=users_data_s3,  # Path to the object in S3
        object_storage_format="json",  # Format of the data stored in S3
    )

    # Run a YQL query over the loaded table to count happy users.
    # The query result is saved to the "result_0" XCom.
    process_user_data_task = RunQueryOperator(
        task_id="process_user_data",
        query="PRAGMA yt.InferSchema = '1';\n"
              "SELECT COUNT_IF(is_happy) AS count FROM `{}`;".format(
                  upload_user_data_task.output["path"]),
        engine="yql",
    )

    # Append the query result to the table //home/happy-users-stat as a new row.
    write_happy_users_task = WriteTableOperator(
        task_id="write_happy_users",
        path="<append=true>//home/happy-users-stat",
        input_data=[{"date": "{{ ds }}",
                     "happy_users": process_user_data_task.output["result_0"]}],
    )

    upload_user_data_task >> process_user_data_task >> write_happy_users_task
```

If you would like to contribute or request a feature, contact us via the community chat or open issues/PRs in the repository.
