YTsaurus Airflow Provider
Apache Airflow is a platform for creating, scheduling, and monitoring workflows. YTsaurus Airflow Provider enables you to integrate Airflow with YTsaurus and efficiently manage data processing tasks.
This section provides instructions on how to connect to a YTsaurus cluster in Apache Airflow and how to use operators to interact with Cypress, work with static tables, and execute queries in Query Tracker.
Installation
Note
The current package version requires:
- Python 3.8+
- Airflow 2.9.0+
Install the Python client from the PyPI repository:

pip install ytsaurus-airflow-provider

If you encounter the error 'Building wheel for xmlsec (pyproject.toml) did not run successfully.', install the packages required to build xmlsec:

sudo apt install libxmlsec1-dev pkg-config build-essential python3-dev
Verify that the provider was installed successfully:

$ python3 -c "import ytsaurus_airflow_provider; print(ytsaurus_airflow_provider.__version__)"
0.1.1
As a result, the following will be available:
- Python library for writing DAGs.
- Provider for Apache Airflow.
- ytsaurus-client and ytsaurus-yson tools.
Getting started
In the Apache Airflow web interface, create a connection to your YTsaurus cluster:
- Admin -> Connections -> +
- Connection Id: ytsaurus_cluster_default
- Connection Type: YTsaurus Cluster
- Cluster Proxy: Cluster proxy
- Cluster Token: Cluster connection token
- Extra: Optionally, you can specify Client Config:
{ "client_config": { "create_table_attributes": { "compression_codec": "brotli_3" } } }
If necessary, you can create a connection to the object storage:
- Admin -> Connections -> +
- Connection Id: aws_default
- Connection Type: Amazon Web Services
- AWS Access Key ID and AWS Secret Access Key: Access keys for the object storage
How to specify a connection
Each operator accepts the optional ytsaurus_conn_id parameter, which defaults to ytsaurus_cluster_default. To use a different connection, pass its Connection Id in the ytsaurus_conn_id parameter.
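Below is a minimal sketch of a DAG that points an operator at a non-default connection. The import path and the ytsaurus_cluster_backup Connection Id are assumptions for illustration, not part of the documented API; substitute your own values.

```python
import datetime

from airflow import DAG

from ytsaurus_airflow_provider.operators import GetOperator  # assumed import path

with DAG(
    dag_id="ytsaurus_connection_example",
    start_date=datetime.datetime(2024, 1, 1),
    schedule=None,
) as dag:
    # Read a Cypress node through a connection other than the default one.
    get_node = GetOperator(
        task_id="get_node",
        path="//home/project/config",                # illustrative Cypress path
        ytsaurus_conn_id="ytsaurus_cluster_backup",  # hypothetical Connection Id
    )
```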
Operators
All operators write data to XCom, except for the read operator when an ObjectStorage path is specified. Passing large volumes of data through XCom is not recommended. The descriptions below indicate which XCom keys each operator writes.
Note
All operators use the Python SDK. If an error occurs during operator execution, see the Python SDK documentation.
Working with Cypress
GetOperator
Operator for retrieving the contents of a Cypress node. It implements the get functionality from the Python SDK.
Parameters:
- path: str | YPath
- max_size: None | int = None
- attributes: None | dict[str, Any] = None
- read_from: None | str = None
- ytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
- path
- return_value
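A usage sketch inside a DAG definition; the path is illustrative and the import path is assumed as in the example above:

```python
# Fetch the contents of a Cypress node; the contents go to the
# "return_value" XCom and the requested path to the "path" XCom.
get_config = GetOperator(
    task_id="get_config",
    path="//home/project/config",  # illustrative path
)
```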
SetOperator
Operator for writing new contents to a Cypress node. It implements the set functionality from the Python SDK.
Parameters:
- path: str | YPath
- value: Any
- set_format: None | str | Format = None
- recursive: bool = False
- force: None | bool = None
- ytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
- path
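A sketch of writing a value, assuming the same import path; the path and value are illustrative:

```python
# Write a value to a Cypress node; recursive=True creates missing
# intermediate nodes along the path.
set_threshold = SetOperator(
    task_id="set_threshold",
    path="//home/project/config/threshold",  # illustrative path
    value={"rows": 1000},
    recursive=True,
)
```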
RemoveOperator
Operator for removing a Cypress node. It implements the remove functionality from the Python SDK.
Parameters:
- path: str | YPath
- recursive: bool = False
- force: bool = False
- ytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
- path
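A sketch of removing a temporary subtree (illustrative path, assumed import path):

```python
# Remove a subtree; force=True keeps the task from failing if the node
# does not exist.
remove_scratch = RemoveOperator(
    task_id="remove_scratch",
    path="//tmp/project/scratch",  # illustrative path
    recursive=True,
    force=True,
)
```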
CreateOperator
Operator for creating an empty Cypress node of the specified node_type with the given attributes. It implements the create functionality from the Python SDK.
Parameters:
- node_type: Literal["table", "file", "map_node", "document", "string_node", "int64_node", "uint64_node", "double_node", "boolean_node", "link"]
- path: str | YPath
- recursive: bool = False
- ignore_existing: bool = False
- lock_existing: None | bool = None
- force: None | bool = None
- attributes: None | dict[str, Any] = None
- ignore_type_mismatch: bool = False
- ytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
- object_id
- path
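A sketch of creating a table with attributes (illustrative path and attributes, assumed import path):

```python
# Create an empty table with attributes; ignore_existing=True turns the
# task into a no-op if the table already exists.
create_events_table = CreateOperator(
    task_id="create_events_table",
    node_type="table",
    path="//home/project/events",  # illustrative path
    recursive=True,
    ignore_existing=True,
    attributes={"compression_codec": "brotli_3"},
)
```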
ListOperator
Operator for getting a list of descendants of the path node. The absolute option enables the output of absolute paths instead of relative paths. It implements the list functionality from the Python SDK.
Parameters:
- path: str | YPath
- max_size: None | int = None
- absolute: None | bool = None
- attributes: None | dict[str, Any] = None
- sort: bool = True
- read_from: None | str = None
- ytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
- path
- return_value
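A sketch of listing a directory (illustrative path, assumed import path):

```python
# List the contents of a directory as absolute paths; the resulting list
# goes to the "return_value" XCom.
list_project = ListOperator(
    task_id="list_project",
    path="//home/project",  # illustrative path
    absolute=True,
)
```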
Working with tables
WriteTableOperator
Operator for writing data to a static table. It implements the write_table functionality from the Python SDK.
Input data can be passed through the input_data parameter or the object_storage_path parameter, which is an object of the ObjectStorage type. You cannot pass both input_data and object_storage_path simultaneously. When writing data from the object storage, you need to specify the corresponding object_storage_format (see Table data representation formats).
Parameters:
- path: str | YPath
- input_data: Any | None = None
- object_storage_path: None | UPath = None
- object_storage_format: None | str | Format = None
- table_writer: dict[str, Any] | None = None
- max_row_buffer_size: int | None = None
- force_create: None | bool = None
- ytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
- path
- object_storage_path (if writing through object_storage_path)
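A sketch of writing a small inline dataset through input_data (illustrative path and rows, assumed import path):

```python
# Write a small inline dataset to a static table; for larger volumes,
# pass object_storage_path instead of input_data.
write_events = WriteTableOperator(
    task_id="write_events",
    path="//home/project/events",  # illustrative path
    input_data=[
        {"id": 1, "value": "a"},
        {"id": 2, "value": "b"},
    ],
)
```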
ReadTableOperator
Operator for reading data from a static table. It implements the read_table functionality from the Python SDK.
This operator can write data to the object storage if you pass the object_storage_path parameter, which is an object of the ObjectStorage type. Otherwise, the table will be written to XCom. When writing data to the object storage, you need to specify the desired object_storage_format (see Table data representation formats).
Parameters:
- path: str | YPath
- object_storage_path: None | UPath = None
- object_storage_format: None | str | Format = None
- table_reader: None | dict[str, Any] = None
- control_attributes: None | dict[str, Any] = None
- unordered: None | bool = None
- response_parameters: None | dict[str, Any] = None
- enable_read_parallel: None | bool = None
- omit_inaccessible_columns: None | bool = None
- ytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
- path
- object_storage_path (if writing to object_storage_path)
- return_value (if writing to XCom)
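A sketch of reading a table into XCom (illustrative path, assumed import path):

```python
# Read a static table; without object_storage_path the rows are pushed to
# the "return_value" XCom, so this suits only small tables.
read_events = ReadTableOperator(
    task_id="read_events",
    path="//home/project/events",  # illustrative path
)
```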
Query Tracker
RunQueryOperator
Operator for executing queries in Query Tracker using any of the supported query engines. You can learn more about Query Tracker here. It implements the run_query functionality from the Python SDK.
This operator can be run in asynchronous mode (sync=False); in that case, it does not wait for the query to complete.
Query execution results can be written to XCom and/or the object storage. For example, if the query returns three results and object_storage_paths is passed as [None, ObjectStorage("s3://bucket/path"), None], then results 1 and 3 are written to XCom and result 2 is written to the object storage.
Parameters:
- engine: Literal["ql", "yql", "chyt", "spyt"]
- query: str
- settings: None | dict[str, Any] | yt.yson.yson_types.YsonType = None
- files: None | list[dict[str, Any]] | list[yt.yson.yson_types.YsonType] = None
- stage: None | str = None
- annotations: None | dict[str, Any] | yt.yson.yson_types.YsonType = None
- access_control_objects: None | list[str] = None
- sync: bool = True
- object_storage_paths: list[None | UPath] | None = None
- ytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
- meta
- query_id
- result_{i}, where i is the number of the returned result, counted from 0. If object_storage_paths[i] is passed for the i-th result, the result will not be written to result_{i}.
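A sketch of running a synchronous YQL query (illustrative query and table path, assumed import path):

```python
# Run a YQL query synchronously; the query id goes to the "query_id" XCom
# and the first result to the "result_0" XCom.
run_report_query = RunQueryOperator(
    task_id="run_report_query",
    engine="yql",
    query="SELECT id, value FROM `//home/project/events` LIMIT 10;",  # illustrative query
)
```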
Examples
Ready-made DAGs demonstrating how to work with the operators are available in example_dags.