YTsaurus Airflow Provider
Apache Airflow is a platform for creating, scheduling, and monitoring workflows. YTsaurus Airflow Provider enables you to integrate Airflow with YTsaurus and efficiently manage data processing tasks.
This section provides instructions on how to connect to a YTsaurus cluster in Apache Airflow and how to use operators to interact with Cypress, work with static tables, and execute queries in Query Tracker.
Installation
Note
The current package version requires:
- Python 3.8+
- Airflow 2.9.0+
-
Install the Python client from the PyPi repository:
pip install ytsaurus-airflow-provider
If you encounter the error 'Building wheel for xmlsec (pyproject.toml) did not run successfully.'
Install the packages required to build
xmlsec
:sudo apt install libxmlsec1-dev pkg-config build-essential python3-dev
-
Verify that the provider was installed successfully:
$ python3 -c "import ytsaurus_airflow_provider; print(ytsaurus_airflow_provider.__version__)" 0.1.1
As a result, the following will be available:
- Python library for writing DAGs.
- Provider for Apache Airflow.
ytsaurus-client
andytsaurus-yson
tools.
Getting started
In the Apache Airflow web interface, create a connection to your YTsaurus cluster:
- Admin -> Connections -> +
- Connection Id:
ytsaurus_cluster_default
- Connection Type: YTsaurus Cluster
- Cluster Proxy: Cluster proxy
- Cluster Token: Cluster connection token
- Extra: Optionally, you can specify Client Config:
{ "client_config": { "create_table_attributes": { "compression_codec": "brotli_3" } } }
If necessary, you can create a connection to the object storage:
- Admin -> Connections -> +
- Connection Id:
aws_default
- Connection Type: Amazon Web Service
- AWS Access Key ID and AWS Secret Access Key: Access keys for the object storage
How to specify a connection
Each operator accepts the optional ytsaurus_conn_id
parameter, which by default is set to ytsaurus_cluster_default
. If you need to use a different connection, you should specify it in the ytsaurus_conn_id
parameter.
Operators
All operators, except for the read operator with a specified ObjectStorage, write data to XCom. It is not recommended to pass large volumes of data through XCom. You can find out below which operators write data to which XComs.
Note
All operators use the Python SDK. If an error occurs during operator execution, see the Python SDK documentation.
Working with Cypress
GetOperator
Operator for retrieving the contents of a Cypress node. It implements the get functionality from the Python SDK.
Parameters:
path: str | YPath
max_size: None | int = None
attributes: None | dict[str, Any] = None
read_from: None | str = None
ytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
path
return_value
SetOperator
Operator for writing new contents to a Cypress node. It implements the set functionality from the Python SDK.
Parameters:
path: str | YPath
value: Any
set_format: None | str | Format = None
recursive: bool = False
force: None | bool = None
ytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
path
RemoveOperator
Operator for removing a Cypress node. It implements the remove functionality from the Python SDK.
Parameters:
path: str | YPath
recursive: bool = False
force: bool = False
ytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
path
CreateOperator
Operator for creating an empty Cypress node of the node_type
with attributes
. It implements the create functionality from the Python SDK.
Parameters:
node_type: Literal["table", "file", "map_node", "document", "string_node", "int64_node", "uint64_node", "double_node", "list_node", "boolean_node", "link"]
path: str | YPath
recursive: bool = False
ignore_existing: bool = False
lock_existing: None | bool = None
force: None | bool = None
attributes: None | dict[str, Any] = None
ignore_type_mismatch: bool = False
ytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
object_id
path
ListOperator
Operator for getting a list of descendants of the path
node. The absolute
option enables the output of absolute paths instead of relative paths. It implements the list functionality from the Python SDK.
Parameters:
path: str | YPath
max_size: None | int = None
absolute: None | bool = None
attributes: None | dict[str, Any] = None
sort: bool = True
read_from: None | str = None
ytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
path
return_value
Working with tables
WriteTableOperator
Operator for writing data to a static table. It implements the write_table functionality from the Python SDK.
Input data can be passed through the input_data
parameter or the object_storage_path
parameter, which is an object of the ObjectStorage type. You cannot pass both input_data
and object_storage_path
simultaneously. When writing data from the object storage, you need to specify the corresponding object_storage_format
(see Table data representation formats).
Parameters:
path: str | YPath
input_data: Any | None = None
object_storage_path: None | UPath = None
object_storage_format: None | str | Format = None
table_writer: dict[str, Any] | None = None
max_row_buffer_size: int | None = None
force_create: None | bool = None
ytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
path
object_storage_path
(if writing throughobject_storage_path
)
ReadTableOperator
Operator for reading data from a static table. It implements the read_table functionality from the Python SDK.
This operator can write data to the object storage if you pass the object_storage_path
parameter, which is an object of the ObjectStorage type. Otherwise, the table will be written to XCom. When writing data to the object storage, you need to specify the desired object_storage_format
(see Table data representation formats).
Parameters:
path: str | YPath
object_storage_path: None | UPath = None
object_storage_format: None | str | Format = None
table_reader: None | dict[str, Any] = None
control_attributes: None | dict[str, Any] = None
unordered: None | bool = None
response_parameters: None | dict[str, Any] = None
enable_read_parallel: None | bool = None
omit_inaccessible_columns: None | bool = None
ytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
path
object_storage_path
(if writing toobject_storage_path
)return_value
(if writing to XCom)
Query Tracker
RunQueryOperator
Operator for executing queries in Query Tracker with different syntax. You can learn more about Query Tracker here. It implements the run_query functionality from the Python SDK.
This operator can be run in asynchronous mode, then it will not wait for the query to complete.
Query execution results can be written to XCom and the object storage. If the query returns three results, and object_storage_paths
are passed as [None, ObjectStorage("s3://bucket/path"), None]
, then results No. 1 and 3 will be written to XCom, and result No. 2 will be written to the object storage.
Parameters:
engine: Literal["ql", "yql", "chyt", "spyt"]
query: str
settings: None | dict[str, Any] | yt.yson.yson_types.YsonType = None
files: None | list[dict[str, Any]] | list[yt.yson.yson_types.YsonType] = None
stage: None | str = None
annotations: None | dict[str, Any] | yt.yson.yson_types.YsonType = None
access_control_objects: None | list[str] = None
sync: bool = True
object_storage_paths: list[None | UPath] | None = None
ytsaurus_conn_id: str = "ytsaurus_cluster_default"
XComs:
meta
query_id
result_{i}
, where i is the number of the returned result, counted from 0. Ifobject_storage_paths[i]
is passed for the i-th result, the result will not be written toresult_{i}
.
Examples
Examples of working with operators are presented in ready-made DAGs example_dags.