YTsaurus Airflow Provider
Apache Airflow is a platform for creating, scheduling, and monitoring workflows. YTsaurus Airflow Provider allows you to integrate Airflow with YTsaurus and efficiently manage data processing tasks.
This section provides instructions on how to connect to a YTsaurus cluster in Apache Airflow and how to use operators to interact with Cypress, work with static tables, and execute queries in Query Tracker.
Installation
Note
The current package version requires:
- Python 3.8+
- Airflow 2.9.0+
- Install the package from the PyPI repository:
pip install ytsaurus-airflow-provider
If you encounter the error 'Building wheel for xmlsec (pyproject.toml) did not run successfully.', install the packages required to build xmlsec:
sudo apt install libxmlsec1-dev pkg-config build-essential python3-dev
- Verify that the provider installation was successful:
$ python3 -c "import ytsaurus_airflow_provider; print(ytsaurus_airflow_provider.__version__)"
0.1.1
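The same check can be made without importing the package, which may be convenient in a deployment script. The following is a minimal sketch, assuming the distribution name matches the one used in the pip install command above; installed_version is a hypothetical helper name, not part of the provider.

```python
from importlib import metadata
from typing import Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Distribution name taken from the pip install command above.
    version = installed_version("ytsaurus-airflow-provider")
    if version is None:
        print("ytsaurus-airflow-provider is not installed")
    else:
        print(version)
```

Returning None instead of raising lets the caller decide whether a missing provider is an error.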
As a result, the following will be available:
- Python library for writing DAGs;
- provider for Apache Airflow;
- the ytsaurus-client and ytsaurus-yson tools.
Getting Started
In the Apache Airflow web interface, create a connection to the YTsaurus cluster:
- Admin -> Connections -> +
- Connection Id: ytsaurus_cluster_default
- Connection Type: YTsaurus Cluster
- Cluster Proxy: Cluster proxy
- Cluster Token: Cluster connection token
- Extra: Optionally, you can specify Client Config:
{ "client_config": { "create_table_attributes": { "compression_codec": "brotli_3" } } }
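Hand-written JSON in the Extra field is easy to get wrong, so it may help to generate it. The sketch below serializes the same client config as in the example above; build_extra is a hypothetical helper name, and only the "client_config" key shown above is assumed to be recognized.

```python
import json
from typing import Any, Dict


def build_extra(client_config: Dict[str, Any]) -> str:
    """Serialize a client config into JSON text for the Extra field."""
    return json.dumps({"client_config": client_config}, indent=2)


# Same values as in the Extra example above.
extra_json = build_extra(
    {"create_table_attributes": {"compression_codec": "brotli_3"}}
)
print(extra_json)
```

The printed text can be pasted into the Extra field as is.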
If necessary, you can create a connection to object storage:
- Admin -> Connections -> +
- Connection Id: aws_default
- Connection Type: Amazon Web Services
- AWS Access Key ID and AWS Secret Access Key: Object storage keys
How to specify a connection
Each operator accepts an optional ytsaurus_conn_id parameter, which defaults to ytsaurus_cluster_default. To use a different connection, pass its ID in the ytsaurus_conn_id parameter.
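As an illustration of the default and the override, the sketch below builds keyword arguments for two tasks. It stops short of constructing an operator because this section does not state the import path; operator_kwargs, effective_conn_id, the connection ID ytsaurus_cluster_other, and the path //home/example are all hypothetical.

```python
from typing import Any, Dict, Optional

# Default connection ID documented above.
DEFAULT_CONN_ID = "ytsaurus_cluster_default"


def operator_kwargs(
    task_id: str, path: str, ytsaurus_conn_id: Optional[str] = None
) -> Dict[str, Any]:
    """Build operator keyword arguments, adding ytsaurus_conn_id only when overridden."""
    kwargs: Dict[str, Any] = {"task_id": task_id, "path": path}
    if ytsaurus_conn_id is not None:
        kwargs["ytsaurus_conn_id"] = ytsaurus_conn_id
    return kwargs


def effective_conn_id(kwargs: Dict[str, Any]) -> str:
    """Connection ID the task would use, given the documented default."""
    return kwargs.get("ytsaurus_conn_id", DEFAULT_CONN_ID)


default_task = operator_kwargs("get_default", "//home/example")
other_task = operator_kwargs(
    "get_other", "//home/example", ytsaurus_conn_id="ytsaurus_cluster_other"
)
print(effective_conn_id(default_task), effective_conn_id(other_task))
```

Inside a DAG, such a dictionary would be unpacked into an operator, for example GetOperator(**other_task).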
Operators
All operators write their results to XCom, except for the read operator when an ObjectStorage path is specified. Passing large volumes of data through XCom is not recommended. The XCom keys written by each operator are listed in its description below.
Note
All operators use the Python SDK. If errors occur during operator execution, refer to the Python SDK documentation.
Working with Cypress
GetOperator
Operator for retrieving the contents of a Cypress node. Implements the functionality of get from the Python SDK.
Parameters:
path: str | YPath
max_size: None | int = None
attributes: None | dict[str, Any] = None
read_from: None | str = None
ytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
path
return_value
SetOperator
Operator for writing new content to a Cypress node. Implements the functionality of set from the Python SDK.
Parameters:
path: str | YPath
value: Any
set_format: None | str | Format = None
recursive: bool = False
force: None | bool = None
ytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
path
RemoveOperator
Operator for removing a Cypress node. Implements the functionality of remove from the Python SDK.
Parameters:
path: str | YPath
recursive: bool = False
force: bool = False
ytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
path
CreateOperator
Operator for creating an empty Cypress node of type node_type with attributes attributes. Implements the functionality of create from the Python SDK.
Parameters:
node_type: Literal["table", "file", "map_node", "document", "string_node", "int64_node", "uint64_node", "double_node", "list_node", "boolean_node", "link"]
path: str | YPath
recursive: bool = False
ignore_existing: bool = False
lock_existing: None | bool = None
force: None | bool = None
attributes: None | dict[str, Any] = None
ignore_type_mismatch: bool = False
ytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
object_id
path
ListOperator
Operator for getting a list of descendants of the path node. The absolute option enables output of absolute paths instead of relative ones. Implements the functionality of list from the Python SDK.
Parameters:
path: str | YPath
max_size: None | int = None
absolute: None | bool = None
attributes: None | dict[str, Any] = None
sort: bool = True
read_from: None | str = None
ytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
path
return_value
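The XCom keys listed for the Cypress operators above can be collected into one lookup table, which is handy when wiring downstream tasks. This is only a summary sketch of the lists above; CYPRESS_XCOM_KEYS and writes_xcom are hypothetical names and are not provided by the package.

```python
from typing import Dict, Tuple

# XCom keys written by each Cypress operator, as listed in this section.
CYPRESS_XCOM_KEYS: Dict[str, Tuple[str, ...]] = {
    "GetOperator": ("path", "return_value"),
    "SetOperator": ("path",),
    "RemoveOperator": ("path",),
    "CreateOperator": ("object_id", "path"),
    "ListOperator": ("path", "return_value"),
}


def writes_xcom(operator_name: str, key: str) -> bool:
    """Tell whether the named operator writes the given XCom key."""
    return key in CYPRESS_XCOM_KEYS.get(operator_name, ())


print(writes_xcom("CreateOperator", "object_id"))
print(writes_xcom("SetOperator", "return_value"))
```

A downstream task would then read a value with the standard Airflow call ti.xcom_pull(task_ids="<task_id>", key="return_value"), using one of the keys from the table.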
Working with tables
WriteTableOperator
Operator for writing data to a static table. Implements the functionality of write_table from the Python SDK.
Input data can be passed through the input_data parameter or through the object_storage_path parameter, which is an object of type ObjectStorage. It is not possible to pass both input_data and object_storage_path simultaneously. When writing data from object storage, you need to specify the corresponding object_storage_format (see Table data representation formats).
Parameters:
path: str | YPath
input_data: Any | None = None
object_storage_path: None | UPath = None
object_storage_format: None | str | Format = None
table_writer: dict[str, Any] | None = None
max_row_buffer_size: int | None = None
force_create: None | bool = None
ytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
path
object_storage_path (if writing through object_storage_path)
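The input rules for WriteTableOperator can be checked before a DAG is deployed. The sketch below mirrors the two rules stated above. How the operator itself reports these cases is not described in this section, so check_write_source and its error messages are hypothetical, and a plain string stands in for the ObjectStorage object.

```python
from typing import Any, Optional


def check_write_source(
    input_data: Any = None,
    object_storage_path: Optional[str] = None,
    object_storage_format: Optional[str] = None,
) -> Optional[str]:
    """Return which source would be used, or raise if the arguments conflict."""
    if input_data is not None and object_storage_path is not None:
        raise ValueError("pass either input_data or object_storage_path, not both")
    if object_storage_path is not None:
        if object_storage_format is None:
            raise ValueError(
                "object_storage_format is required when writing from object storage"
            )
        return "object_storage"
    if input_data is not None:
        return "input_data"
    # Neither source given: this section does not say what the operator does.
    return None


print(check_write_source(input_data=[{"key": 1}]))
print(
    check_write_source(
        object_storage_path="s3://bucket/rows.json", object_storage_format="json"
    )
)
```

The bucket path and the "json" format are illustrative values only.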
ReadTableOperator
Operator for reading data from a static table. Implements the functionality of read_table from the Python SDK.
It can write data to object storage if the object_storage_path parameter is provided, which is an object of type ObjectStorage. Otherwise, the table will be written to XCom. When writing data to object storage, you need to specify the desired object_storage_format (see Table data representation formats).
Parameters:
path: str | YPath
object_storage_path: None | UPath = None
object_storage_format: None | str | Format = None
table_reader: None | dict[str, Any] = None
control_attributes: None | dict[str, Any] = None
unordered: None | bool = None
response_parameters: None | dict[str, Any] = None
enable_read_parallel: None | bool = None
omit_inaccessible_columns: None | bool = None
ytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
path
object_storage_path (if writing to object_storage_path)
return_value (if writing to XCom)
Query Tracker
RunQueryOperator
Operator for executing queries in Query Tracker using any of the supported query engines. For details, see the Query Tracker documentation. Implements the functionality of run_query from the Python SDK.
It can be run in asynchronous mode, in which case the operator will not wait for the query to complete.
Query execution results can be written to XCom and object storage. If the query returns 3 results, and [None, ObjectStorage("s3://bucket/path"), None] is passed as object_storage_paths, then the first and third results will be written to XCom, and the second result will be written to object storage.
Parameters:
engine: Literal["ql", "yql", "chyt", "spyt"]
query: str
settings: None | dict[str, Any] | yt.yson.yson_types.YsonType = None
files: None | list[dict[str, Any]] | list[yt.yson.yson_types.YsonType] = None
stage: None | str = None
annotations: None | dict[str, Any] | yt.yson.yson_types.YsonType = None
access_control_objects: None | list[str] = None
sync: bool = True
object_storage_paths: list[None | UPath] | None = None
ytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
meta
query_id
result_{i}, where i is the number of the returned result, counted from 0. If object_storage_paths[i] is passed for the i-th result, the result will not be written to result_{i}.
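The routing of query results between XCom and object storage can be modelled in a few lines. The sketch below only illustrates the rule described above and is not the operator's implementation; route_results is a hypothetical name, plain strings stand in for ObjectStorage objects, and treating a list shorter than the number of results as padded with None is an assumption.

```python
from typing import Dict, Optional, Sequence


def route_results(
    result_count: int,
    object_storage_paths: Optional[Sequence[Optional[str]]] = None,
) -> Dict[int, str]:
    """Map each zero-based result index to the place its data would be written."""
    paths = list(object_storage_paths or [])
    destinations: Dict[int, str] = {}
    for i in range(result_count):
        # Assumption: missing entries behave like None.
        target = paths[i] if i < len(paths) else None
        if target is None:
            destinations[i] = f"xcom:result_{i}"
        else:
            destinations[i] = f"object_storage:{target}"
    return destinations


for index, destination in route_results(3, [None, "s3://bucket/path", None]).items():
    print(index, destination)
```

With the three-result example from this section, indices 0 and 2 map to the XCom keys result_0 and result_2, and index 1 maps to the object storage path.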
Examples
Examples of working with the operators are available as ready-made DAGs in example_dags.