To install the Apache Airflow Provider for YTsaurus, see the installation instructions in the documentation.

Apache Airflow Provider for YTsaurus
Introducing an MVP of Apache Airflow in YTsaurus
March 25, 2025
We have released the MVP of the Apache Airflow Provider for YTsaurus. Currently, it includes operators for working with Cypress, reading and writing tables, and running queries in Query Tracker.
Brief description of supported operators:
ReadTableOperator — read a YTsaurus table into XCom or S3
WriteTableOperator — write a YTsaurus table from XCom or S3
RunQueryOperator — execute a query in Query Tracker and write the result to XCom or S3
CreateOperator — create a node in Cypress
SetOperator — write a value to a node
GetOperator — get node content and write it to XCom
RemoveOperator — delete a node
ListOperator — get a list of node children and write it to XCom
More on each of the supported operators can be found in the documentation.
Below is an example of a DAG that loads data from S3 into a cluster and then processes it (see the repository for more examples).
from datetime import datetime, timezone

from airflow import DAG
from airflow.io.path import ObjectStoragePath
# The WriteTableOperator and RunQueryOperator classes come from the
# YTsaurus provider package; see its documentation for the exact import paths.

# Path to the object we need in S3.
# File contents:
# {"user_name": "Alice", "is_happy": true}
# {"user_name": "Bob", "is_happy": false}
# {"user_name": "Charlie", "is_happy": true}
# ...
users_data_s3 = ObjectStoragePath("s3://users-data/latest", conn_id="aws_default")

with DAG(
    "export_users_data_and_count_happy_users",
    start_date=datetime(2025, 3, 20, tzinfo=timezone.utc),
    schedule="@daily",
    tags=["yt"],
) as dag:
    # Load user data from S3 into the YTsaurus table
    # //home/users-data-{{ ds }}, where {{ ds }} is the task's logical date.
    # The operator also saves this path to the XCom key "path".
    upload_user_data_task = WriteTableOperator(
        task_id="upload_user_data",
        path="//home/users-data-{{ ds }}",   # All operators support Airflow template variables
        object_storage_path=users_data_s3,   # Path to the object in S3
        object_storage_format="json",        # Format of the data stored in S3
    )

    # Run a YQL query over the loaded table to count happy users.
    # The query result is saved to the XCom key "result_0".
    process_user_data_task = RunQueryOperator(
        task_id="process_user_data",
        query="PRAGMA yt.InferSchema = '1';\n"
              "SELECT COUNT_IF(is_happy) AS count FROM `{}`;".format(
                  upload_user_data_task.output["path"]
              ),
        engine="yql",
    )

    # Append the query result to the table //home/happy-users-stat as a new row.
    write_happy_users_task = WriteTableOperator(
        task_id="write_happy_users",
        path="<append=true>//home/happy-users-stat",
        input_data=[{
            "date": "{{ ds }}",
            "happy_users": process_user_data_task.output["result_0"],
        }],
    )

    upload_user_data_task >> process_user_data_task >> write_happy_users_task
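For clarity, here is a pure-Python sketch of what the YQL aggregation in the DAG computes: COUNT_IF(is_happy) counts the rows where is_happy is true. The sample rows below mirror the JSON-lines file shown earlier; the helper function is illustrative, not part of the provider.

```python
import json

# Sample JSON-lines content, mirroring the file at s3://users-data/latest.
sample_rows = [
    '{"user_name": "Alice", "is_happy": true}',
    '{"user_name": "Bob", "is_happy": false}',
    '{"user_name": "Charlie", "is_happy": true}',
]

def count_happy(rows):
    """Pure-Python equivalent of SELECT COUNT_IF(is_happy) over JSON-lines input."""
    return sum(1 for line in rows if json.loads(line)["is_happy"])

print(count_happy(sample_rows))  # 2
```

The write_happy_users task then appends this single number, together with the run date, as one new row of //home/happy-users-stat.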
If you would like to suggest additions, please contact us via the community chat or open issues and PRs in the repository.