General information

Conventions

Options of system commands have the default value of None. This means that the option value won't be transmitted at the command execution, and the system will use the default value.

For the format option, the default value of None usually has a different meaning: the command returns a parsed Python structure or expects a Python structure as input. If a format is specified, the command returns an unparsed result or expects unparsed input.

All timeouts or time periods are accepted in milliseconds by default. In some cases, datetime.timedelta can be used as time interchangeably with milliseconds.

All the functions have the client object as the last option. This behavior is implementation-specific. We do not recommend using this feature on your own.

For brevity, the examples below assume that import yt.wrapper as yt has already been performed.

Client and global client

The functions and classes are available from the global environment of the yt.wrapper module and can change its global state: for example, they store the current transaction there. By changing yt.config, you also change the global configuration. If you need several independent (that is, differently configured) clients, use the YtClient class. This class provides almost all the functions of the yt.wrapper module: you can call client.run_map, client.read_table_structured, with client.Transaction(), and so on. Note that the YT_PROXY, YT_TOKEN, and other environment variables only set the configuration of the global client. That is, they affect only yt.wrapper.config, but not the configuration of explicitly created YtClient instances.

from yt.wrapper import YtClient
client = YtClient(proxy="<cluster-name>", config={"backend": "rpc"})
print(client.list("/"))

Attention

Note that the library is not thread-safe: in order to work from different threads, you need to create a client instance on each thread.

Command parameters

You can attach a fixed set of parameters (for example, trace_id) to all requests sent through a client. To do this, use the special create_client_with_command_params method, which lets you specify an arbitrary set of options to be passed with every API call.
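
A minimal sketch of this mechanism (the cluster name and trace_id are placeholders; the import location of the helper is an assumption, so check yt/wrapper/client.py for your version):

import yt.wrapper as yt
from yt.wrapper.client import create_client_with_command_params

# Wrap a client so that the extra command parameters are sent with every API call.
patched_client = create_client_with_command_params(
    yt.YtClient(proxy="<cluster-name>"),
    trace_id="abcdef-123456",
)
patched_client.list("/")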

Thread safety

The thread-safety note above remains valid if you are using multiprocessing, even if you aren't using threading. One reason is that if you have already made a request to the cluster in your main process, a connection pool to this cluster has already been created within the global client. So when processes fork, they use the same sockets to communicate with the cluster, which can lead to various issues.

Attention

When using a native driver, use fork + exec (via Popen) and avoid even multiprocessing, because the driver's service threads aren't preserved across fork, and the driver becomes incapable of executing requests. You should also avoid reusing clients after a fork, because they might carry non-trivial state that doesn't survive the fork well.

Asynchronous client based on gevent

If your code uses gevent, you can use yt.wrapper in asynchronous mode. This functionality isn't covered by tests yet, but should work in simple scenarios. Key points:

  • Before getting started, run monkey patching to replace blocking network calls.
  • Create an independent YtClient inside each greenlet. This is important. If you neglect this point, you will get errors.

You can find an example in this section.
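
A minimal sketch of this pattern (the cluster name is a placeholder):

import gevent.monkey
gevent.monkey.patch_all()  # replace blocking network calls before yt.wrapper makes any requests

import gevent
import yt.wrapper as yt

def check(path):
    # Each greenlet creates its own YtClient; sharing one client across greenlets leads to errors.
    client = yt.YtClient(proxy="<cluster-name>")
    return client.exists(path)

greenlets = [gevent.spawn(check, p) for p in ("//home", "//tmp")]
gevent.joinall(greenlets)
print([g.value for g in greenlets])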

Configuring

The library supports rich configuration options determining its behavior in different scenarios. For example, you can change the path in Cypress where your temporary tables will be created by default: yt.config["remote_temp_tables_directory"] = "//home/my_home_dir/tmp".
To learn about options and their detailed descriptions, see the code.

The library is configured via one of the following methods:

  • Updating the yt.config object: yt.config["proxy"]["url"] = "<cluster_name>".
  • Calling the function yt.update_config: yt.update_config({"proxy": {"url": "<cluster_name>"}}).
  • Setting an environment variable: export YT_CONFIG_PATCHES='{proxy={url="<cluster_name>"}}'.
  • Using the file whose path is stored in the YT_CONFIG_PATH environment variable, the default value is ~/.yt/config. The file should be in the YSON format (using the YT_CONFIG_FORMAT environment variable, you can change this behavior; the YSON and JSON formats are supported). Sample file contents: {proxy={url="<cluster_name>"}}.

You can change some configuration options using environment variables: YT_TOKEN, YT_PROXY, and YT_PREFIX, as well as the logging settings (which are separate from the config): YT_LOG_LEVEL and YT_LOG_PATTERN.

When using CLI, you can pass the configuration patch using the --config option.

Note that the library configuration doesn't affect the client configuration: by default, a config with the default values will be used when you're creating a client. To pass a config based on environment variables to the client: client = yt.YtClient(..., config=yt.default_config.get_config_from_env()). You can also update an existing config with values from the environment variables using the update_config_from_env(config) function.

Note the order of precedence. When the library is imported, the patches passed through YT_CONFIG_PATCHES are applied. This environment variable expects a list_fragment: you can pass multiple patches separated by semicolons; they are applied from last to first. Then the values of options specified by dedicated environment variables (for example, YT_PROXY) are applied. Only after that are the configurations explicitly specified in code (or passed via the --config option) applied.

When the config is applied, all the dict nodes are merged rather than overwritten.

Setting up the global configuration to work with a cluster under your home directory:

yt.config["prefix"] = "//home/username/"
yt.config["proxy"]["url"] = "cluster-name"

Creating a client that will create new tables with the brotli_3 compression codec:

client = yt.YtClient(proxy="<cluster-name>", config={"create_table_attributes": {"compression_codec": "brotli_3"}})

Creating a client that stores the archives of dependencies for Python operations in a local directory other than /tmp, and additionally sets max_row_weight to 128 MB:

my_config = yt.default_config.get_config_from_env()
my_config["local_temp_directory"] = "/home/ignat/tmp"
my_config["table_writer"] = {"max_row_weight": 128 * 1024 * 1024}
client = yt.YtClient(config=my_config)

Shared config

Profiles

In the configuration file, you can specify multiple profiles for working with YTsaurus. A profile is a named set of settings. Profiles can be helpful:

  1. In different usage scenarios of a single cluster (e.g., you can have a profile for lengthy processes and a profile for quick one-off operations).
  2. To specify default settings for working with different clusters.
  3. To give a short and fitting name to a cluster.

To use profiles, enable the second format version: config_version=2.

A configuration file in the second version has the following structure:

  • profiles: A dict where a key is a profile name and a value is a profile configuration in standard format.
  • default_profile: A string that contains the name of the default profile.
  • config_version: A number specifying the version of the configuration file format. Profiles are supported only in the second version; the first version is used by default.

Example of a configuration with profiles in YSON format:

{
    "default_profile"="dev";
    "profiles"={
        "main"={
            "token_path"="/etc/token";
            "proxy"={
                "url"="cluster-url";
            };
            "prefix"="//main";
        };
        "dev"={
            "token_path"="/etc/token-dev";
            "proxy"={
                "url"="cluster-url";
            };
            "prefix"="//dev";
            "pool"="dev";
        };
        "external"={
            "token_path"="/etc/token-external";
            "proxy"={
                "url"="external-url";
            };
        };
    };
    "config_version"=2;
}

You can specify a profile in several ways:

  1. Via the YT_CONFIG_PROFILE environment variable.
  2. In the Python SDK: client = yt.YtClient(..., config=yt.default_config.get_config_from_env(profile="my_profile")).
  3. Otherwise, the profile named in the default_profile key is used.

Logging setup

Logging in ytsaurus-client and in all tools based on this library is handled by a dedicated logger implemented in the yt.logger module, available both as the global LOGGER variable and through module-level aliases.

To change the logging settings, update the LOGGER.

The initial logger setup (when loading the module) is regulated by the YT_LOG_LEVEL, YT_LOG_PATTERN, and YT_LOG_PATH environment variables.
The YT_LOG_LEVEL variable regulates the logging level and accepts one of the following values: DEBUG, INFO, WARNING, ERROR. The YT_LOG_PATTERN variable regulates the formatting of log messages and accepts the logger's format string. For more information, see the Python documentation. You can use the YT_LOG_PATH environment variable to specify the path to the log file.

By default, the logging level is INFO, and log messages are written to stderr.
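
A minimal sketch of adjusting the logger from code (the log file name is a placeholder):

import logging
import yt.logger

yt.logger.LOGGER.setLevel(logging.WARNING)                          # silence INFO messages
yt.logger.LOGGER.addHandler(logging.FileHandler("yt_client.log"))   # additionally write to a file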

Token setup

The token is looked up in the following places (listed in priority order):

  1. The config["token"] option, which is populated in the global configuration from the YT_TOKEN environment variable.
  2. The file specified in config["token_path"]; the default value of this option is ~/.yt/token. This option can also be overridden using the YT_TOKEN_PATH environment variable (the environment variable only applies to the global client configuration; for details, see the section).

Setting up retries

Commands in YTsaurus are classified into light and heavy (to get the system's view on the list of commands, see Commands).

Light commands are commands like create, remove, set, list, get, ping, and other similar ones. Before each request, the client also accesses a special proxy to get a list of commands that it supports. We recommend retrying such commands when errors or timeouts occur. You can set up the retry parameters in the proxy/retries configuration section.

There are two categories of heavy requests.

  • Reading from and writing to tables or files (that is, read_table, write_table, read_blob_table, and other requests). The retries for such commands are set up in the read_retries and write_retries sections (for read requests and write requests, respectively).
  • Reading from and writing to dynamic tables (that is, heavy queries such as select_rows, lookup_rows, insert_rows, and others). For such commands, the retries are set up separately in the dynamic_table_retries section (the regular read_retries and write_retries options won't work, because 60-second intervals between retries are normal when reading static tables but inappropriate for dynamic tables, where low latency is critical).

The retries above cover uploading data to and downloading data from the cluster in the case of network faults, chunk unavailability, and similar issues. The following settings cover actions that run inside the cluster (starting a MapReduce operation or executing a batch request).

  1. The start_operation_retries section regulates the retries of the operation start command. Here, instead of network issues, the client handles errors like "concurrent operation limit exceeded", which occur when many operations are running and the scheduler won't start new ones. On such errors, the client retries with long sleep intervals to give running operations time to complete.
  2. The concatenate_retries section regulates the retries of the concatenate command (see API). This isn't a light command, because it may access different master cells and spend a long time there, so the retry settings for light commands can't be used here.
  3. The batch_requests_retries section regulates the retries made from inside a batch request (see the description of the execute_batch command). The client retries requests that failed with errors such as "request rate limit exceeded": the client sends a batch of requests, some complete and some fail with "request queue size per user exceeded" errors, and the failed requests are resent in a new batch. This section regulates the policy of such retries (see the configuration sketch below).
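
A sketch of tuning these retry sections through the global config (the nested option name count is an assumption; check yt/wrapper/default_config.py for the exact structure in your version):

import yt.wrapper as yt

yt.config["proxy"]["retries"]["count"] = 10          # light commands
yt.config["read_retries"]["count"] = 5               # heavy reads of static tables
yt.config["write_retries"]["count"] = 5              # heavy writes
yt.config["dynamic_table_retries"]["count"] = 5      # heavy requests to dynamic tables
yt.config["batch_requests_retries"]["count"] = 5     # retries inside batch requests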

Errors

YtError: The parent class of all the errors in the library. It has the following fields:

  • code (int type): Error code (see the section). If omitted, it's equal to 1.
  • inner_errors (list type): Errors that preceded this error while executing the request (for example, the proxy accessed the driver and an error occurred inside the driver; that error is added to inner_errors).
  • attributes (dict type): Additional attributes of the error (for example, the request date).

The following errors describe more specific issues.

YtOperationFailedError: The operation has failed. This error stores the following information in the attributes field.

  • id: Operation ID.
  • url: Operation URL.
  • stderrs: List of dicts with details about failed jobs or jobs that wrote to stderr. Each dict has the host field and, depending on whether the job had stderr output or failed, may also have the stderr and error fields.
  • state: Operation status (for example, failed).

To print the stderr output when an operation fails, you need to catch the exception and print the error messages explicitly; otherwise, they will appear truncated in the backtrace. You can use the non-public decorator add_failed_operation_stderrs_to_error_message (it might be renamed in the future), which intercepts the YtOperationFailedError exception and enriches its message with the job stderr output.
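
A sketch of printing job stderrs manually (the operation and table paths are placeholders; the stderrs structure follows the description above):

import yt.wrapper as yt

try:
    yt.run_map("exit 1", "//tmp/input", "//tmp/output", format=yt.YsonFormat())
except yt.YtOperationFailedError as err:
    # Each entry has "host" and, depending on the job, may have "stderr" and "error".
    for job_info in err.attributes.get("stderrs", []):
        if "stderr" in job_info:
            print(job_info["stderr"])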

YtResponseError: The command (that is, the request to YTsaurus) has failed. This class has the error field that stores the structured response describing the error cause.
It provides a number of useful helper methods, such as is_resolve_error() (used in the batch request example below).

Other subclasses of YtError:

You can look up a full list of subclasses in the code.

Formats

To learn more about the formats, see the section.

There is a separate class for each format (for example, YsonFormat, JsonFormat, DsvFormat, SchemafulDsvFormat).

In the constructor, these classes accept parameters specific to the given format. If an option isn't explicitly available in the constructor, you can pass it in the attributes option, which accepts a dict.

Moreover, each class has a set of methods for serialization or deserialization of records from the stream:

  • load_row: Reads a single row from the stream. Some formats (for example, YSON) do not support loading a single record, so calling load_row on them throws an exception.
  • load_rows: Reads all the records from the stream, processes the table switches, and returns a record iterator. If raw==True, the method returns rows without parsing them. This method is a generator.
  • dump_row: Writes a single record to the stream.
  • dump_rows: Writes a set of records to the stream. It adds table switches to the output stream.

There is also the create_format function that creates a format based on the specified name and a given set of attributes. The format name can also include attributes in YSON format: you can create a format as follows: create_format("<columns=[columnA;columnB]>schemaful_dsv").

You can use the create_table_switch function to create a record that functions as a switch to the table with the specified index.

The functions read_table, write_table, select_rows, lookup_rows, insert_rows, delete_rows have the raw parameter that enables you to get/send records in an unparsed format (as strings). If you set raw to False, this means that the records will be deserialized to dict at reading and serialized from dict at writing.

Example:

yt.write_table("//home/table", [b"x=value\n"], format=yt.DsvFormat(), raw=True)
assert list(yt.read_table("//home/table", format=yt.DsvFormat(), raw=False)) == [{"x": "value"}]  # Ok

A similar option exists for operations. By default, an operation deserializes records from the format specified at operation startup. If you want the operation to receive unparsed strings as input, use the raw decorator and specify the format when running the operation:

@yt.raw
def mapper(rec):
    ...

yt.run_map(..., format=yt.JsonFormat())

Attention

To use YSON format to work with tables (operations, table read/write), additionally install the package with YSON C++ bindings (see yson bindings). Otherwise, you will see the "Yson bindings required" exception.

Specifics of using the JSON format: the standard json module is written in Python and is therefore quite slow. The library tries to use modules with bindings written in faster languages, such as ujson. To enable it, use the enable_ujson parameter: JsonFormat(..., enable_ujson=True). The ujson package is disabled by default, because in certain cases its default behavior is faulty:

import ujson
s = '{"a": 48538100000000000000.0}'
ujson.loads(s)
{u'a': 1.1644611852580897e+19}

If you have read the ujson documentation and are sure that your input data will be deserialized correctly, you can parse using this module.

YPath

Paths in YTsaurus are represented by YPath. In code, this is supported by the YPath class and its subclasses, TablePath and FilePath. In the constructors of the latter two classes, you can specify the relevant YPath attributes, for example, schema, start_index, and end_index for TablePath, and append and executable for FilePath. To learn more about TablePath, see the section.
Use these classes for working with paths in your code instead of formatting YPath literals manually.

There are other useful functions in the ypath module; here are some of them (see the sketch after this list):

  • ypath_join joins several paths into one path (a counterpart of os.path.join).
  • ypath_split: Returns the (head, tail) pair, where tail is the trailing path component and head is the remaining path (a counterpart of os.path.split).
  • ypath_dirname: Returns the path without the trailing component (a counterpart of os.path.dirname).
  • escape_ypath_literal: Escapes the string so that it can be used as a YPath component.
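
A sketch of these helpers (the directory and table names are made up; it assumes the functions are exported at the top level of yt.wrapper, otherwise import them from yt.wrapper.ypath):

import yt.wrapper as yt

name = "logs/2024-01-01"  # contains "/", so it must be escaped to stay a single path component
path = yt.ypath_join("//home/project", yt.escape_ypath_literal(name))
head, tail = yt.ypath_split(path)   # head == "//home/project"
parent = yt.ypath_dirname(path)     # same as head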

Python3 and byte strings

In YTsaurus, all strings are byte strings, whereas regular strings in Python 3 are Unicode strings. For this reason, the library applies the following conventions to structured data (dicts) in Python 3. When writing to and reading from tables in raw mode, the library uses byte strings for both input and output.
In non-raw mode, the following logic is enabled.

On read, strings are automatically decoded from UTF-8, including dict keys (to select a different encoding, use the encoding parameter of the format; see below). If a string cannot be decoded, a special YsonStringProxy object is returned. If you try to treat such an object as a string, you will get a NotUnicodeError. You can call two functions on such an object: yt.yson.is_unicode and yt.yson.get_bytes. The first returns False for YsonStringProxy, and the second returns the raw bytes that couldn't be decoded. The same functions can be called on regular strings: is_unicode returns True, and get_bytes accepts an optional encoding as its second argument (utf-8 by default) and returns s.encode(encoding).

Here's the intended pattern for handling structures that mix regular and byte strings when reading tables or running operations:

# if you simply need bytes
b = yt.yson.get_bytes(s)

# if you need to respond differently to byte strings and regular strings
if yt.yson.is_unicode(s):
    # Process a regular string
    ...
else:
    b = yt.yson.get_bytes(s)
    # Process a byte string
    ...

When writing data, you can leave a YsonStringProxy object as is (it will be automatically converted to a byte string), or return a byte string or a Unicode string. Unicode strings are encoded in UTF-8 (or in the encoding specified in the format).

Note that you can't mix bytes and str in dict keys. When encoding != None, the only way to specify a byte key is to use the yt.yson.make_byte_key function. The reason is that in Python 3 the strings "a" and b"a" are not equal, and it would be unacceptable for a dict like {"a": 1, b"a": 1} to be implicitly sent to the system as a structure with two identical "a" keys.
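
A sketch of writing a byte key with the default encoding (the table path is made up):

import yt.wrapper as yt
import yt.yson

# The key contains non-UTF-8 bytes, so it must be marked explicitly with make_byte_key.
row = {yt.yson.make_byte_key(b"raw\x80key"): 1, "name": "value"}
yt.write_table("//tmp/bytes_table", [row])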

If needed, you can disable the decoding logic or select another encoding by using the encoding parameter when creating a format. If the encoding parameter is specified and is None, the library expects all strings to be byte strings (both keys and values, on both read and write). If you attempt to serialize a dict that includes Unicode strings with encoding=None, in most cases you'll get an error.

If the encoding parameter is specified and is not None, the library handles Unicode strings, but expects the specified encoding from all data (instead of UTF-8 used by default).

An example of use with comments is in a separate section.

Batch requests

Low-level API:

  • execute_batch: Accepts a set of request descriptions in the list format and returns a set of results. A simple wrapper for the API command.

High level API:

  • create_batch_client: Creates a batch client. This function is available both on the client and globally. The batch client exposes as methods all the API functions supported in batch requests.
  • batch_apply: This method enables you to apply a function to a dataset, in batch mode. Returns a list of results.

Example:

client = yt.YtClient("cluster-name")

batch_client = client.create_batch_client()
list_rsp = batch_client.list("/")
exists_rsp = batch_client.exists("/")
batch_client.commit_batch()

print(list_rsp.get_result())
print(exists_rsp.get_result())

For more information about batch requests, see the tutorial.

Specifics of working with the batch client:

  • The create_batch_client method and the client configuration have the max_batch_size parameter with a default value of 100. When commit_batch is called, the accumulated requests are split into groups of at most max_batch_size and executed group by group. This is due to natural restrictions on the request size.

  • All requests are sent under the transaction of the client from which the batch client was constructed: if that client is used inside a transaction, all batch requests are executed in the context of this transaction.

  • By default, you can handle all the errors that arise in requests by looking at BatchResponse returned by the batch client methods.

    Example:

    client = yt.YtClient("cluster-name")
    batch_client = client.create_batch_client()
    list_rsp = batch_client.list("//some/node")
    if list_rsp.is_ok():
        # ...
    else:
        error = yt.YtResponseError(list_rsp.get_error())
        if error.is_resolve_error():
            # Handle the situation when the node for which the list has been created doesn't exist
            # ...
    

The create_batch_client method has the raise_errors parameter that you can set to True. Then, if at least one request fails, the YtBatchRequestFailedError exception will be thrown with all the errors. Example:

client = yt.YtClient("cluster-name")
batch_client = client.create_batch_client(raise_errors=True)
list_rsp = batch_client.list("//some/node")
try:
    batch_client.commit_batch()
except yt.YtBatchRequestFailedError as err:
    # Print the error message
    print(err.inner_errors[0]["message"])  # "Error resolving path //some/node"

Commands

The yt library allows running the commands in the system using the Python API. The public part of the library includes only the methods that are in yt/wrapper/__init__.py and yt/yson/__init__.py.

Some command options are shared by command classes. For more information, see Commands.

Working with Cypress

  • get: Get the value of the Cypress node. Read more.

  • set: Write a value to the Cypress node. Read more.

  • create: Create an empty Cypress node of the given type with the given attributes. Read more.

  • exists: Check whether the Cypress node exists. Read more.

  • remove: Delete the Cypress node. Read more.

  • list: Get a list of children for the path node. The absolute option enables the output of absolute paths instead of relative paths. Read more.

Examples:

yt.create("table", "//home/table", attributes={"mykey": "myvalue"}) # Output: <id of the created object>
yt.get("//home/table/@mykey")  # Output: "myvalue"
yt.create("map_node", "//home/dir") # Output: <id of the created object>
yt.exists("//home/dir")  # Output: True

yt.set("//home/string_node", "abcde")
yt.get("//home/string_node")  # Output: "abcde"
yt.get("//home/string_node", format="json")  # Output: '"abcde"'

yt.set("//home/string_node/@mykey", "value")  # Set the attribute
yt.get("//home/string_node/@mykey")  # Output: "value"

# Create a document and use it as a dictionary
yt.create("document", "//home/doc")
yt.set("//home/doc", {"number": 7, "vegetable": "tomato"})
yt.get("//home/doc")  # Output: {"number": 7, "vegetable": "tomato"}
yt.set("//home/doc/vegetable", "cabbage")
yt.get("//home/doc")  # Output: {"number": 7, "vegetable": "cabbage"}

# Create a document and append a number and a string to it
yt.create("document", "//home/doc-list")
yt.set("//home/doc-list", [])
yt.set("//home/doc-list/end", 7)
yt.set("//home/doc-list/end", "cabbage")
yt.get("//home/doc-list")  # Output: [7, "cabbage"]

  • copy and move: Copy/move the Cypress node. To learn more about the option values, see the section.

  • link: create a symlink to a Cypress node. Read more.
    To learn where the symlink points, read the value from the @path attribute. To access a link object, add & at the end of the path.

Examples:

yt.create("table", "//home/table")
yt.copy("//home/table", "//tmp/test/path/table")
# Error: the intermediate directories don't exist
yt.copy("//home/table", "//tmp/test/path/table", recursive=True)
yt.get("//home/table/@account")
# Output: sys
yt.get("//tmp/test/path/table/@account")
# Output: tmp

yt.move("//home/table", "//tmp/test/path/table")
# Error: the destination node already exists
yt.move("//home/table", "//tmp/test/path/table", force=True)
yt.get("//tmp/test/path/table/@account")
# Output: sys

yt.link("//tmp/test/path/table", "//home/table")
yt.get("//home/table/@path")
# Output: "//tmp/test/path/table"

An alias function for creating a directory:

  • mkdir: Creates a directory, that is, a node of the map_node type.

Alias functions for working with attributes

Note that these functions do not support access to nested attributes by design.
To access nested attributes, use regular Cypress verbs and navigation using YPath.

Merging files/tables:

Other commands:

  • find_free_subpath: Searches for a free node path that begins with the given path prefix.

  • search: Recursively traverses the subtree rooted at the given node. By default, it outputs the absolute paths of all nodes of the subtree. A number of filters let you select specific nodes. The attributes option specifies a list of attributes to retrieve with each node; the retrieved attributes are available in the .attributes field of the returned paths.

    Example:

    for table in yt.search(
        "//home",
        node_type=["table"],
        object_filter=lambda obj: obj.attributes.get("account") == "dev",
        attributes=["account"],
    ):
        print(table)
    

Working with files

For more information about files in Cypress, see the section.

  • read_file

    Read a file from Cypress to a local machine. Returns the ResponseStream object, which is a line iterator that has the following additional methods:

    • read: Read length bytes from the stream. If length==None, read the data to the end of the stream.
    • readline: Read a line (including "\n").
    • chunk_iter: An iterator by response chunks.

    The command supports retries (enabled by default). To enable/disable or increase the number of retries, use the read_retries configuration option (see read_table). To enable read retries, use the YT_RETRY_READ=1 variable.

  • write_file
    Write a file to Cypress. The command accepts a stream from which it reads data. The command supports retries; to set them up, use the write_retries configuration option (see write_table for details). The is_stream_compressed parameter indicates that the stream data is already compressed, so it can be transmitted without recompression (see the sketch after this list).

  • Files can be transmitted as arguments of an operation. In that case, they are written to the root of the directory where your jobs will be run. For more information, see the section and the example.

  • get_file_from_cache
    Returns a path to the cached file based on the specified md5 sum.

  • put_file_to_cache
    Upload the file existing at the given Cypress path to the cache. Note that the file should be uploaded to Cypress with a special option that enables md5 calculation.
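
A minimal sketch of the file commands (the path is made up):

import io
import yt.wrapper as yt

yt.write_file("//tmp/my_file", io.BytesIO(b"hello world\n"))  # pass a stream to read the data from
content = yt.read_file("//tmp/my_file").read()                # read() without length reads to the end
assert content == b"hello world\n"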

Working with tables

For more information about tables, see Static tables.

Data classes

The main method used to represent table rows is to use classes with fields annotated by types (similar to dataclasses). With this approach, you can effectively (de)serialize data, avoid errors, and work with complex types (structures, lists, and others) more conveniently. To define a data class, use the yt_dataclass decorator. For example:

@yt.yt_dataclass
class Row:
    id: int
    name: str
    robot: bool = False

The field type comes after the colon. This might be a regular Python type, a type from the typing module, or a special type such as OtherColumns. For more information, see Data classes. You can create objects of this class as follows: row = Row(id=123, name="foo"). All fields without default values must be passed to the constructor; otherwise, an exception is thrown. Fields with defaults (such as robot: bool = False) can be omitted.

The data classes support inheritance. For more information, see Data classes. See also the example.

Schemas

Each table in YTsaurus has a schema. The Python API has a dedicated TableSchema class. The primary way to create a schema is from a data class: schema = TableSchema.from_row_type(Row). In particular, this happens automatically when writing a table. However, sometimes you need to build a schema manually. To do this easily, use the builder interface. For example:

import yt.type_info.typing as ti

schema = yt.schema.TableSchema() \
    .add_column("id", ti.Int64, sort_order="ascending") \
    .add_column("name", ti.Utf8) \
    .add_column("clicks", ti.List[
        ti.Struct[
            "url": ti.String,
            "ts": ti.Optional[ti.Timestamp],
        ]
    ])

The column type should be a type from the type_info library.
Composite types (Optional, List, Struct, Tuple, etc.) are set up using square brackets.

You can specify a schema when creating or writing to (an empty) table (in the schema attribute of the TablePath class). To get a table schema, use:

schema = TableSchema.from_yson_type(yt.get("//path/to/table/@schema"))
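
A sketch that continues the builder example above: attach the schema when writing to a new table (the path is made up; it assumes TablePath accepts a TableSchema object in its schema attribute):

path = yt.TablePath("//tmp/new_table", schema=schema)
yt.write_table(path, [{"id": 1, "name": "foo", "clicks": []}])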

TablePath

All the commands that work with tables (including operations) accept not only strings as input and output tables, but also (where appropriate) the TablePath class. This class represents a path to a table with certain modifiers (a wrapper over YPath for tables). Its constructor accepts:

  • name: Path to a table in Cypress.
  • append: Append records to the table instead of overwriting it.
  • sorted_by: Set of columns by which the table should be sorted when written to.
  • columns: List of selected columns.
  • lower_key, upper_key, exact_key: Lower/upper/exact read boundary defined by a key. Used with sorted tables only.
  • start_index, end_index, exact_index: Lower/upper/exact read boundary defined by row indexes.
  • ranges: Specify an arbitrary set of ranges to be read.
  • schema: A table schema; it makes sense when creating a table or writing to an empty or non-existing table.
  • attributes: Set any additional attributes.

Ranges are half-open intervals (that is, they don't include the upper boundary). Note that some modifiers make sense only when reading from a table (everything related to ranges or columns), and some only when writing to a table (append, sorted_by). As name, you can pass a string with YPath modifiers and attributes; they will be parsed correctly and stored in the attributes field. In a TablePath object, the attributes field is both readable and writable.

Example:

@yt.yt_dataclass
class Row:
    x: str

table = "//tmp/some-table"
yt.write_table_structured(table, Row, [Row(x="a"), Row(x="c"), Row(x="b")])
yt.run_sort(table, sort_by=["x"])
ranged_path = yt.TablePath(table, lower_key="b")
list(yt.read_table_structured(ranged_path, Row))
# Output: [Row(x='b'), Row(x='c')]

Commands

  • create_temp_table

    Creates a temporary table in the path directory with the given prefix. If path is omitted, the directory is taken from the config option config["remote_temp_tables_directory"]. For convenience, there's the TempTable wrapper, which supports the with statement and accepts the same parameters.
    Example:

    with yt.TempTable("//home/user") as table1:
        with yt.TempTable("//home/user", "my") as table2:
            yt.write_table_structured(table1, Row, [Row(x=1)])
            yt.run_map(..., table1, table2, ...)
    
  • write_table_structured

    Writes rows of row_type (which must be a yt_dataclass) from input_stream to the table.
    If the table doesn't exist, it is created first, together with a schema. The command supports retries; you can configure them using the write_retries config option.

    Attention

    Writing with retries consumes more memory than regular writing, because the write operation buffers the rows into chunks before writing them
    (if a chunk fails to be written, it is retried). The default size of each chunk is 512 MB (see the configuration option).

    With the table_writer options, you can specify a number of system write parameters. To write raw or compressed data, use the write_table function.

    Example:

    @yt.yt_dataclass
    class Row:
        id: str
        ts: int
    
    yt.write_table_structured("//path/to/table", Row, [Row(id="a", ts=10)])
    

    When writing to an empty or non-existing table, the schema is created automatically.
    In more complex cases, you might need to build the schema manually. For more information, see this section and the example.

  • read_table_structured

    Reads the table as a sequence of rows of row_type (which must be a yt_dataclass).
    The command supports retries (enabled by default). You can set up retries using the read_retries configuration option.
    The table_reader (dict) option enables you to specify a number of system read parameters.
    The unordered (bool) option enables you to request unordered reading. In that case, the data might be read faster, but the read order isn't guaranteed.
    You can use the response_parameters (dict) option to pass a dict. This dict will be appended with special read command parameters (currently, there are two such parameters: start_row_index and approximate_row_count).

    The iterator returned supports the .with_context() method that returns an iterator on the (row, ctx) pairs. The second item enables you to get the indexes of the current row and range using the ctx.get_row_index() and ctx.get_range_index() methods (a similar iterator inside the job also enables you to get the table index: ctx.get_table_index()). See examples in the tutorial showing the context in regular reading and inside operations.

    A few more words about reading with retries: when retries are enabled, a transaction is created in the current context and a snapshot lock is taken on the table. The lock is held until the whole data stream is read or .close() is called on the stream or iterator. This behavior can cause errors. For example, the following code won't work: because of the nested read transaction, you won't be able to commit the transaction explicitly created in the code (the nested read transaction hasn't finished, because read_table_structured created an iterator that was never consumed).

    with Transaction():
        write_table_structured(table, Row, rows)
        read_table_structured(table, Row)
    

Examples:

@yt.yt_dataclass
class Row:
    id: str
    ts: int

yt.write_table_structured("//path/to/table", Row, [Row(id="a", ts=1)])
assert list(yt.read_table_structured("//path/to/table", Row)) == [Row(id="a", ts=1)]  # Ok

ranges = [
    {"lower_limit": {"key": ["a"]}, "upper_limit": {"key": ["b"]}},
    {"lower_limit": {"key": ["x"]}, "upper_limit": {"key": ["y"]}},
]
path = yt.TablePath("//other/table", columns=["time", "value"], ranges=ranges)
rows = yt.read_table_structured(path, Row)
for row, ctx in rows.with_context():
    # ctx.get_row_index() – index of row in table.
    # ctx.get_range_index() – index of range from requested ranges.
    # ...

Alias functions for working with tables:

  • row_count: Returns the number of records in the table.
  • is_empty: Checks whether the table is empty.
  • is_sorted: Checks whether the table is sorted.

Examples:

yt.write_table_structured("//home/table", Row, [Row(id="a", ts=2), Row(id="b", ts=3)])

sum = 0
for row in yt.read_table_structured("//home/table", Row):
    sum += row.ts
print(sum)  # Output: 5

yt.is_empty("//home/table")  # Output: False
yt.row_count("//home/table")  # Output: 2
yt.is_sorted("//home/table")  # Output: False

  • write_table
    An untyped analog of write_table_structured; avoid it where possible.

    Info

    Writes rows from input_stream to the table.
    If the table doesn't exist, it is created first. You can set up retries using the write_retries config option.
    See the example in the dedicated section.

    Attention

    Writing with retries consumes more memory than regular writing, because the write operation buffers the rows into chunks before writing them
    (if a chunk fails to be written, it is retried). The default size of each chunk is 512 MB (see the configuration option).

    With the table_writer option, you can specify a number of system write parameters. There is also the is_stream_compressed parameter, which indicates that the stream data is already compressed and can be transmitted without recompression. Keep in mind that when transmitting compressed data, you need to specify Content-Encoding via the configuration (config["proxy"]["content_encoding"] = "gzip") and set the raw=True option. The raw option regulates the format in which data is expected: if raw=False, an iterator over Python structures is expected; if raw=True, a string (or an iterator over strings) or a stream with data in the given format is expected.

    Examples:

    yt.write_table("//path/to/table", ({"a": i, "b": i * i} for i in range(100)))
    yt.write_table("//path/to/table", open("my_file.json"), format="json", raw=True)
    yt.write_table("//path/to/table", "a=1\na=2\n", format="dsv", raw=True)
    

    To write a table with a schema, you need to create it separately first.

      schema = [
          {"name": "id", "type": "string"},
          {"name": "timestamp", "type": "int64"},
          {"name": "some_json_info", "type": "any"},
      ]
      yt.create("table", "//path/to/table", attributes={"schema": schema})
      yt.write_table("//path/to/table", rows)
    
  • read_table
    An untyped analog of read_table_structured; avoid it where possible.

    Info

    Reads the table in the specified format. The return value depends on the raw option. If raw=False (the default), an iterator over records is returned; each record is a dict or a Record (in the case of the yamr format). If raw=True, a stream-like object is returned from which you can read data in the given format.
    The command supports retries (enabled by default). You can set up retries using the read_retries configuration option.

    When retries are enabled, reading is slower because the stream has to be parsed to count the number of rows already read.

    The table_reader (dict) option enables you to specify a number of system read parameters.

    With the control_attributes (dict) option, you can request a number of control attributes when reading data.

    The unordered (bool) option enables you to request unordered reading. In that case, the data might be read faster, but the read order isn't guaranteed.

    You can pass a dict in the response_parameters (dict) option; it will be populated with special read command parameters (in the current implementation, the two parameters are start_row_index and approximate_row_count).

    See the example in the dedicated section.

    A few more words about reading with retries: in this case, a transaction is created in the current context and a snapshot lock is taken on the table. The lock is held until the entire data stream is read or .close() is called on the returned stream or iterator. This behavior can cause errors. For example, the following code won't work: because of the nested read transaction, you won't be able to commit the transaction explicitly created in the code (the nested read transaction hasn't finished, because read_table created an iterator that was never consumed).

    with Transaction():
        write_table(table, rows)
        read_table(table, format="yson")
    

    Attention

    Reading with retries usually works slower than without them because you need to parse a stream of records.

Parallel reading of tables and files

The table is broken down into smaller ranges, assuming that the data is evenly distributed across rows. Each range is considered a separate stream. When you enable retries, the entire range, rather than individual table rows, will be retried. This approach enables you to skip data parsing and streamline the reading process.
The files are simply split into chunks of the specified size and read in parallel.

The configuration of parallel reading is stored in the read_parallel configuration section, which includes the following keys (see the sketch after this list):

  • enable: Enable parallel reading.
  • max_thread_count: Maximum number of threads.
  • data_size_per_thread: Amount of data loaded by each data thread.
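
A sketch of enabling parallel reads through the global config (the values are illustrative):

import yt.wrapper as yt

yt.config["read_parallel"]["enable"] = True
yt.config["read_parallel"]["max_thread_count"] = 16
yt.config["read_parallel"]["data_size_per_thread"] = 32 * 1024 * 1024
# Subsequent read_table / read_file calls will now fetch data in several threads.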

Specifics of parallel reading of data:

  • Requesting control attributes is not supported.
  • Table ranges with limits specified by keys aren't supported.
  • Parallel reading might be ineffective on tables that substantially vary in row sizes.

Acceleration example:

$ time yt read-table //sys/scheduler/event_log.2[:#1000000] --proxy cluster-name --format yson > /dev/null

real    1m46.608s
user    1m39.228s
sys     0m4.216s

$ time yt read-table //sys/scheduler/event_log.2[:#1000000] --proxy cluster-name --format yson --config "{read_parallel={enable=%true;max_thread_count=50;}}" > /dev/null

real    0m14.463s
user    0m12.312s
sys     0m4.304s

Measuring the speed

$ export YT_PROXY=cluster-name
$ yt read //sys/scheduler/event_log.2[:#20000000] --format json --config "{read_parallel={enable=%true;max_thread_count=50;}}" > scheduler_log_json
$ yt read //sys/scheduler/event_log.2[:#20000000] --format yson --config "{read_parallel={enable=%true;max_thread_count=50;}}" > scheduler_log_yson

$ ls -lah
total 153G
drwxrwxr-x 2 user group 4.0K Sep 29 21:23 .
drwxrwxr-x 7 user group 4.0K Sep 28 15:20 ..
-rw-r--r-- 1 user group  51G Sep 29 21:25 scheduler_log_json
-rw-r--r-- 1 user group  51G Sep 29 21:22 scheduler_log_yson
-rw-r--r-- 1 user group  51G Sep 29 21:20 test_file

$ time cat scheduler_log_yson | yt write //tmp/test_yson --format yson
real    36m51.653s
user    31m34.416s
sys     1m5.256s

$ time cat scheduler_log_json | yt write //tmp/test_json --format json
real    88m38.745s
user    21m7.188s
sys     1m1.400s

$ time cat test_file | yt upload //tmp/test_file
real    35m50.723s
user    17m31.232s
sys     1m39.132s

$ time cat scheduler_log_yson | yt write //tmp/test_yson --format yson --config "{write_parallel={enable=%true;max_thread_count=30;}}"
real    13m37.545s
user    37m20.516s
sys     4m16.436s

$ time cat scheduler_log_json | yt write //tmp/test_json --format json --config "{write_parallel={enable=%true;max_thread_count=30;}}"
real    3m53.308s
user    23m21.152s
sys     2m57.400s

$ time cat test_file | yt upload //tmp/test_file --config "{write_parallel={enable=%true;max_thread_count=30;}}"
real    1m49.368s
user    18m30.904s
sys     1m40.660s

Parallel writing of tables and files

To use this feature, do the following:

  1. Install the zlib_fork_safe package if you run Python 2.
  2. Enable the option in the config: config["write_parallel"]["enable"] = True.

After that, the standard commands write_table and write_file will work in multithreading mode.

The client configuration includes a write_parallel section with the following keys (see the sketch after this list):

  • enable (False by default) enables the option to write to tables and files in YTsaurus in parallel.
  • max_thread_count (10 by default): The maximum number of threads the data is written to in parallel.
  • unordered (False by default): Enables you to write rows to a table in an arbitrary order, streamlining the writing process.
  • concatenate_size (20 by default): A limit on the number of tables or files that the concatenate command receives as input.
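
A sketch of enabling parallel writes through the global config (the values are illustrative):

import yt.wrapper as yt

yt.config["write_parallel"]["enable"] = True
yt.config["write_parallel"]["max_thread_count"] = 30
yt.config["write_parallel"]["unordered"] = True  # allow rows to be written in arbitrary order
# Subsequent write_table / write_file calls will now upload data in several threads.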

How it works

The entire process of parallel writing looks like this:

  1. The input stream is split into rows, and the rows are grouped into chunks whose size is regulated by the /write_retries/chunk_size configuration option (the default value is 512 MB).
  2. The chunks are passed to a ThreadPool.
  3. Each thread receives a group of rows to write, compresses the data, and uploads it to a temporary table or file on the server.
  4. The main thread accumulates the temporary tables and files that have already been written.
  5. As soon as the number of written tables or files reaches /write_parallel/concatenate_size, they are merged.

Limitations

  • Parallel writing is unavailable when a stream of compressed data is passed as input.
  • If the path to the output table has the sorted_by attribute, parallel writing is also unavailable.
  • Parallel writing won't be effective in Python 2 unless you install zlib_fork_safe (because of the GIL).
  • Parallel writing won't be effective if the parser for your format is slow (for example, schemaful_dsv).
  • Because the input stream is split into chunks for parallel uploading to the cluster, the script consumes memory proportional to max_thread_count * chunk_size (in practice, the multiplier is about 2).

Q: Why is multithreaded writing many times faster for JSON than for YSON, while it's the other way around for single-threaded writing?
A: The main reason is that the input stream has to be split into rows. In JSON, this is easy: you split by \n. In YSON, it requires much more effort. Since this splitting is single-threaded, it becomes a bottleneck that throttles the entire writing pipeline.

Working with transactions and locks

  • Transaction: A single-threaded wrapper class for creating, committing, or aborting transactions. It supports the context manager syntax (the with statement): if the with block exits successfully, the transaction is committed; otherwise, it is aborted. All commands within the block run under the given transaction. Blocks can be nested. The ping parameter of the constructor (True by default) controls whether a pinging thread is started. Without pinging, the transaction will be aborted when its timeout expires.

Examples:

with yt.Transaction():
    yt.write_table_structured(table, Row, [Row(x=5)])  # the data will be written within the transaction
    # Once it exits the with, the transaction is committed.

with yt.Transaction():
    yt.write_table_structured(table, Row, [Row(x=6)])
    raise RuntimeError("Something went wrong")
    # The exception will abort the transaction and no one will see the changes

# Nested transactions
with yt.Transaction():
    yt.lock("//home/table", waitable=True, wait_for=60000)
    with yt.Transaction():
        yt.set("//home/table/@attr", "value")

If the pinging thread fails to ping the transaction, it calls thread.interrupt_main(). You can change this behavior using the config["ping_failed_mode"] option.
Available options (see the sketch after this list):

  1. pass: Do nothing.
  2. call_function: Call the function specified in the ping_failed_function config field.
  3. interrupt_main: Raise the KeyboardInterrupt exception in the main thread.
  4. send_signal: Send the SIGUSR1 signal to the process.
  5. terminate_process: Terminate the process.
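
A sketch of switching to a custom handler (the handler itself is hypothetical):

import yt.wrapper as yt

def on_ping_failed():
    print("transaction ping failed")

yt.config["ping_failed_mode"] = "call_function"
yt.config["ping_failed_function"] = on_ping_failed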

You can also manage transactions at a lower level using start_transaction, ping_transaction, commit_transaction, and abort_transaction (unlike with the Transaction context manager, subsequent commands won't automatically run under the created transaction).

These functions raise a YtResponseError exception with .is_resolve_error() == True when the transaction is not found.

  • lock: Acquires a lock on the specified node within the current transaction. For a waitable lock with wait_for specified, it waits for the lock to be acquired within wait_for milliseconds; if the lock isn't acquired within this time, an exception is thrown.

  • unlock: Removes all explicit locks on the node acquired by the current transaction, including locks that are already acquired and those that are enqueued. If there are no locks, it has no effect. If unlocking is impossible (because the locked node version contains changes compared to the original version), an exception is thrown.

    Note

    When the transaction is completed (either successfully or not), all the locks taken by it are released. This means that you only have to unlock when you need to release a node without completing the transaction.

Running operations

To learn about the operations available for table data, see the section.

All the functions that run operations start with the run_ prefix. These functions often have numerous parameters. Next, we will describe the parameters that are shared by all the operation run functions, as well as the parameter selection logic. For a full list of parameters for each specific function, see pydoc.

Operation run commands

SpecBuilder

We recommend using spec builders to specify the operation's runtime parameters.

The following classes are provided for populating the operation specifications:

The names of the methods correspond to option names in the specification listed here.

The builders used to populate the specification for an operation with custom jobs provide the begin_mapper, begin_reducer, and begin_reduce_combiner methods. Similarly, there are the begin_job_io, begin_map_job_io, begin_sort_job_io, begin_reduce_job_io, begin_partition_job_io, begin_merge_job_io methods in the appropriate spec builders.

Example:

import yt.wrapper as yt

if __name__ == "__main__":
    spec_builder = yt.spec_builders.MapSpecBuilder() \
        .input_table_paths("//tmp/input_table") \
        .output_table_paths("//tmp/output_table") \
        .begin_mapper() \
            .command("cat") \
            .format(yt.YsonFormat()) \
        .end_mapper()

    yt.run_operation(spec_builder)

Another two types of spec builders are provided for convenience:

  1. Spec builders for filling out I/O parameters:
  2. Spec builders for filling out the specification of a custom job:

See the example in the tutorial.

Other parameters of the operation run commands

The sync parameter is shared by all the operation run functions (True by default). If sync=True, when the function is called, the operation completion will be awaited synchronously. If sync=False, the Operation object will be returned to the client.

If the relevant parameter is missing in the appropriate spec builder, you can use the spec parameter of the dict type that enables you to explicitly set arbitrary options for the operation specification (the specified parameter has the highest priority when generating a specification). For example, you can specify the desired amount of data per job: yt.run_map(my_mapper, "//tmp/in", "//tmp/out", spec={"data_weight_per_job": 1024 * 1024}) (but in this particular case, it's more appropriate to use a builder method).

All the required specification parameters exist as individual options. Such parameters include the input and output table paths and the command (for operations with the custom code). For the sort, reduce, map_reduce, join_reduce operations, the sort_by, reduce_by, and join_by parameters are specified as individual options.

The source_table and destination_table parameters can accept as their input both a table path and a list of paths to tables, and correctly handle both cases, depending on whether the operation expects a table path or a set of table paths at the given point. Both the strings and TablePath objects are accepted as table paths.

Some nested specification parameters need to be modified fairly often. These include job_io and its table_writer part; such parameters are added to the appropriate parts of the specification. If the operation has multiple steps, the parameters specified in job_io propagate to all steps. For all operations that have the job_count parameter in the specification, this parameter is available as a separate option.

For all the operations that include the custom code, a separate option is created for memory_limit, as well as the format, input_format, and output_format parameters that enable you to specify the input and output data format. There are the yt_files and local_files parameters (and the files alias) enabling you to set the file paths in Cypress and the paths to local files (respectively) that will be passed to the job environment.

For the map_reduce operation, all the parameters of the custom script described in the previous paragraph are copied separately for mapper, reducer, and reduce-combiner.

As the binary, you can pass both the run command (as a string) and a callable Python object. The operations that access callable objects as binary are referred to as Python operations. If a run command is passed as binary, the format parameter is mandatory. However, the format is optional in the case of a callable object.

Any operation that writes data to destination_table has an option to append data to the existing table instead of overwriting it: pass a TablePath object with append=True as destination_table.

When running a map_reduce operation with multiple input tables, keep in mind that the YTsaurus client can add table switches to the input records. As a result, the switches might end up in the output stream (for example, if the mapper simply does yield row). This can cause an error about writing a record to a non-existent table. To avoid this, either disable the table index using the enable_input_table_index option, or delete the table index manually before writing the record to the output stream: del row["@table_index"].

For the run_map_reduce function, all the options specific to a given job exist in three copies: for the mapper, reducer, and reduce_combiner. That is, the function has the options map_local_files, reduce_local_files, reduce_combiner_local_files, etc. For a full list of options, see pydoc.
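
A sketch of the per-step options (the table paths, the "key" column, and helper.py are placeholders):

import yt.wrapper as yt

def mapper(row):
    yield row

def reducer(key, rows):
    yield {"key": key["key"], "count": sum(1 for _ in rows)}

yt.run_map_reduce(
    mapper, reducer,
    "//tmp/input", "//tmp/output",
    reduce_by=["key"],
    map_local_files=["./helper.py"],                  # local files shipped only to mapper jobs
    spec={"data_weight_per_job": 512 * 1024 * 1024},  # explicit spec options have the highest priority
)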

Working with operations and jobs

When running an operation with sync=False, it's more convenient to use the abort, suspend, resume, and complete methods of the Operation class than the corresponding module-level commands (see the Operation section).

  • run_job_shell: Run a job-shell for the job. The easiest way to use this function is with its analogous command in CLI.

Getting information about jobs and operations

An operation has a fairly non-trivial life cycle, and at different points in time information about it is available from different sources:

  1. Cypress (contains information about the running operations that haven't been archived yet)
  2. The Orchid of the controller agent (includes all current information about the running operations)
  3. Operation archive (includes information about completed operations)

Because you can get information about an operation from different sources (and not only the list of sources can change, but also the structure of each source), there exist the following methods that can collect information about an operation from the listed sources anytime.

  • get_operation: Get information about the operation based on its ID. A dict is returned with fields similar to the fields in the get_operation response.
  • list_operations: Get information about a set of operations based on filters. The meaning of the fields is similar to get_operation. For a list of filters, see Commands.
  • iterate_operations: Get an iterator for a set of operations. This function is similar to list_operations, but it doesn't set restrictions on the number of requested operations.

For example, to output the types of the three latest operations run by username:

from datetime import datetime
import yt.wrapper as yt
result = yt.list_operations(
    user="username",
    cursor_time=datetime.utcnow(),
    cursor_direction="past",
    limit=3,
)
print([op["type"] for op in result["operations"]])
## Output: ['sort', 'sort', 'sort']

Another example: finding all the running operations in user ephemeral pools:

for op in client.iterate_operations(state="running"):
    if "$" in op.get("pool"):
        print(op["id"], op["start_time"])

The information about the operation's jobs is available in the scheduler and in the archive. The following methods enable you to get information about jobs:

  • get_job: Get information about the job. A dict is returned with fields similar to the fields in the get_job response.
  • list_jobs: Get information about the set of jobs for the operation. The meaning of fields in the response is similar to that in get_job. For a list of filters, see Commands.
  • get_job_stderr: Get the stderr of the job.
  • get_job_input: Get the job's full input.
  • get_job_input_paths: Get the list of input tables (with row ranges) for the job.
  • get_job_spec: Get the specification of the job.
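A hedged sketch of inspecting the failed jobs of an operation (the operation ID is a placeholder, and the returned stderr object is assumed to support read()):

op_id = "1234-abcd-5678-ef90"
for job in yt.list_jobs(op_id)["jobs"]:
    if job.get("state") == "failed":
        print(yt.get_job_stderr(op_id, job["id"]).read())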

Attention

The functions described in this section may access the master servers, the scheduler, and the cluster nodes (note that accesses to the master servers and the scheduler don't scale well), so you shouldn't call them too often.

For debugging of failed jobs, it's convenient to use the job tool. This utility enables you to prepare the environment similar to the job environment and run it with the same input data.

Operation

The object of this class is returned by the operation run commands (run_*). It provides a small API for working with running or completed operations.
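A minimal sketch (op.wait() and op.id are used elsewhere in this document; abort, suspend, resume, and complete are the methods listed above):

op = yt.run_map("cat", "//tmp/input", "//tmp/output", sync=False)
print(op.id)   # the operation ID
op.wait()      # block until the operation completes
## for a running operation you can also call op.abort(), op.suspend(), op.resume(), or op.complete()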

OperationsTracker

To handle several operations conveniently, you can use OperationsTracker. You can also use it to work with operations running on different clusters. OperationsTracker has the following interface:

  • add: Add an Operation object to the set of tracked objects

  • add_by_id: Add the operation to the set of tracked operations using its ID

  • wait_all: Wait until all the operations are completed

  • abort_all: Abort all the tracked operations

Example:

with yt.OperationsTracker() as tracker:
    op1 = yt.run_map("sleep 10; cat", "//tmp/table1", "//tmp/table1_out", sync=False)
    op2 = yt.run_map("sleep 10; cat", "//tmp/table2", "//tmp/table2_out", sync=False)
    tracker.add(op1)
    tracker.add(op2)
    tracker.abort_all()
    tracker.add(yt.run_map("true", table, TEST_DIR + "/out", sync=False))

When exiting the with block, the tracker will wait until all the operations are completed. If an exception occurs, all the running operations are aborted.

Attention

Using the tracker without with is deprecated and we don't recommend it.

OperationsTrackerPool

The OperationsTrackerPool class is similar to OperationsTracker, but it additionally guarantees that there won't be more than pool_size operations running in parallel. As the input, the methods of the class accept one or more spec builders (see the appropriate section).
This class creates a background thread that gradually runs all the operations from the queue, maintaining the guarantees for the number of concurrently running operations.
Interface:

  • add: Accepts a spec builder as the input and adds it to the run queue
  • map: Accepts a list of spec builders as its input. The other parameters have the same meaning as in the add method.
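A hedged sketch of using the pool (passing the pool size to the constructor as pool_size is an assumption; spec_builder stands for any prepared spec builder):

tracker_pool = yt.OperationsTrackerPool(pool_size=4)   # pool_size: assumed constructor parameter
tracker_pool.add(spec_builder)                          # queue a single operation
tracker_pool.map([spec_builder_1, spec_builder_2])      # queue several operations at once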

Working with access rights

Examples:

yt.create("user", attributes={"name": "tester"})
yt.check_permission("tester", "read", "//sys")
## Output: {"action": "allow", ...}
yt.create("group", attributes={"name": "test_group"})
yt.add_member("tester", "test_group")
yt.get_attribute("//sys/groups/testers", "members")
## Output: ["tester"]
yt.remove_member("tester", "test_group")
yt.get_attribute("//sys/groups/testers", "members")
## Output: []

Working with dynamic tables

Dynamic tables implement an interface both for point reads and for key-based data writes with transaction support and a native SQL dialect.

The commands for working with dynamic tables fall into the following groups:

  • Commands for accessing the content of dynamic tables.
  • Commands related to mounting tables.
  • Commands related to sharding.
  • Other commands.
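A brief hedged sketch of a few of these commands (the table path is a placeholder, and the dynamic table is assumed to already exist with a matching schema):

yt.mount_table("//tmp/dyn_table")
yt.insert_rows("//tmp/dyn_table", [{"key": 1, "value": "a"}])
print(list(yt.lookup_rows("//tmp/dyn_table", [{"key": 1}])))
print(list(yt.select_rows("* from [//tmp/dyn_table] limit 10")))
yt.unmount_table("//tmp/dyn_table")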

Other commands

Converting tables

Transform. Occasionally, you might need to convert a table to a different compression codec, erasure codec, or chunk format. Such tasks are handled by the transform function.

This function starts the merge operation on the input table, recompressing the data according to the specified parameters. If the check_codecs option is set to True, the function checks whether the data has already been compressed and, if so, doesn't run the operation. If the optimize_for option is specified, the operation always runs and check_codecs is ignored, because there is no way to check which format the table chunks have.

Shuffle. To randomly shuffle the table rows, use the shuffle_table function.
The function runs the map_reduce operation that sorts the table by the added column with a random number.
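For example, a minimal sketch of both functions (the codec names here are only illustrative):

yt.transform("//tmp/table", compression_codec="brotli_6", erasure_codec="lrc_12_2_2")
yt.transform("//tmp/table", optimize_for="scan")   # check_codecs is ignored in this case
yt.shuffle_table("//tmp/table")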

Python objects as operations

Overview

To run the operation, you need to define a special yt.wrapper.TypedJob subclass and pass the object of this class to the operation run function (or specify it in the applicable field of the SpecBuilder).

Make sure to define, in the job class, the __call__(self, row) method (for the mapper) or __call__(self, rows) method (for the reducer). As input, this method accepts table rows (in the case of a reducer, a single call (__call__) corresponds to a set of rows with the same key). It has to return (by using yield) the rows that need to be written to the output table. If there are multiple output tables, use the wrapper class yt.wrapper.OutputRow, whose constructor accepts the row written and the table_index as a named parameter (see the example in the tutorial).
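A minimal sketch of a typed mapper (the Row dataclass and table paths are hypothetical):

import typing

@yt.yt_dataclass
class Row:
    name: str
    count: int

class CountFilter(yt.TypedJob):
    def __call__(self, row: Row) -> typing.Iterable[Row]:
        # keep only rows with a positive count
        if row.count > 0:
            yield row

yt.run_map(CountFilter(), "//tmp/input", "//tmp/output")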

In addition, you can define the following methods:

  • start(self): Called exactly once before processing the job records.
  • finish(self): Called once after processing the job records.

Just as __call__, these methods can generate new records using yield, which allows you to buffer or accumulate data. These methods aren't called for jobs labeled with @yt.aggregator, which receive the entire input.

You can also define the .prepare_operation(self, context, preparer) method, which is used to specify row types for input and output tables, as well as to modify the operation specification. For more information, see the Preparing an operation from a job section and the examples in the tutorial.

Preparing an operation from a job

To specify the input and output row types in the job class, you can use type hints (examples: one, two, three, and four) or override the .prepare_operation(self, context, preparer) method. The types are specified using the methods of the preparer object of the OperationPreparer type. Useful methods:

  1. inputs: Enables you to specify the input row type for multiple input tables (it must be a class with the decorator @yt.wrapper.yt_dataclass), a list of names for the columns that the job needs, as well as the renamings for the columns.
  2. outputs: Enables you to specify the output row type for multiple output tables (it must be a class with the decorator @yt.wrapper.yt_dataclass) and the schema that you want to output for these tables (by default, the schema is output from the data class).
  3. input and output: The counterparts of the methods above that accept a single table index.

The context object enables you to get information about the input and output streams: their number, schemas, and paths to tables.

See the examples in the tutorial: one and two.
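A hedged sketch of overriding prepare_operation (InRow and OutRow are hypothetical dataclasses, and the chained call style of the preparer is an assumption):

@yt.yt_dataclass
class InRow:
    name: str

@yt.yt_dataclass
class OutRow:
    name_length: int

class NameLengthMapper(yt.TypedJob):
    def prepare_operation(self, context, preparer):
        ## declare the row types for the single input and single output table
        preparer.input(0, type=InRow).output(0, type=OutRow)

    def __call__(self, row):
        yield OutRow(name_length=len(row.name))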

If you run MapReduce with multiple intermediate streams, you also need to override the .get_intermediate_stream_count(self) method, returning the number of intermediate streams from it. See example.

Decorators

You can mark functions or job classes with special decorators that change the expected interface of interaction with jobs.

  • aggregator is a decorator that allows you to indicate that a given mapper is an aggregator, that is, it accepts a row iterator as input rather than a single row.
  • raw is a decorator that allows you to specify that the function accepts a stream of raw data as input rather than parsed records.
  • raw_io is a decorator that allows you to indicate that the function will take records (rows) from stdin and write them to stdout.
  • reduce_aggregator is a decorator that enables you to specify that the reducer is an aggregator that accepts a generator of pairs where each pair is (a key, and records with this key) as input, rather than an iterator of records with a single key.
  • with_context is a decorator that enables you to request a context for the function. This context will include control attributes requested when running the operation.

Keep in mind that a decorator is implemented by setting an attribute on the function. That's why, for example, you cannot declare a function with a decorator and then apply functools.partial to it. If you need to pass certain parameters to the function at call time, it makes sense to create a class with a decorator (see the last example below).

You can find examples in the tutorial.
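For instance, a brief sketch of a mapper marked as an aggregator: it receives an iterator of rows instead of a single row.

@yt.aggregator
def count_mapper(rows):
    count = 0
    for row in rows:
        count += 1
    yield {"count": count}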

Pickling functions and environments

General structure

Here is a sequence of actions that occur when you run an operation that is a Python function:

  1. The library uses the dill module to transform the executed object (a class or a function) into a byte stream.
  2. The local runtime environment for the function is set up. This environment is transmitted into a job on the cluster and then used to start up your function properly.

After that, special code from the _py_runner.py module is executed on the cluster: in each job, it unpacks all the dependencies, reconstructs the function from the byte stream into a Python object, and runs it properly, reading data from stdin.


Examples of module filtering
yt.config["pickling"]["module_filter"] = lambda module: hasattr(module, "__file__") and \
    not module.__file__.endswith(".so")
yt.config["pickling"]["force_using_py_instead_of_pyc"] = True

## When working through a client, you can specify the filter in its config:
client = yt.wrapper.YtClient(config={"pickling": {"module_filter": lambda module: ...}})

You can also write your own dependency resolution function based on the default implementation:

def my_create_modules_archive():
    ...

yt.config["pickling"]["create_modules_archive_function"] = my_create_modules_archive

If you are using binary modules that are dynamically linked against libraries missing on the cluster, it makes sense to either bring those libraries to the cluster manually (be sure to set the LD_LIBRARY_PATH environment variable) or collect the dynamic libraries for your Python modules automatically. With automatic collection, ldd is invoked for each .so when the archive is built, and the found dependencies are packed along.
Here's how it's enabled:

yt.config["pickling"]["dynamic_libraries"]["enable_auto_collection"] = True

You can also specify a filter if you don't want certain libraries to be added. Example:

yt.config["pickling"]["dynamic_libraries"]["library_filter"] = lambda lib: not lib.startswith("/lib")

We recommend filtering out the libraries located in the /lib and /lib64 directories: these directories host various system libraries (for example, libcgroup), and using them on the cluster might result in strange errors. To filter such libraries, you can use the function from the example above.

Boilerplate filters for different cases:

  • If you see the error AttributeError: 'module' object has no attribute 'openssl_md_meth_names', filter out hashlib:

    yt.config["pickling"]["module_filter"] = lambda module: "hashlib" not in getattr(module, "__name__", "")

  • If you run Anaconda, you have to filter out hashlib (see the filter example above), as well as the .so libraries. To filter out .so libraries, you can use the following filter:

    yt.config["pickling"]["module_filter"] = (
        lambda module: hasattr(module, "__file__") and
        not module.__file__.endswith(".so")
    )
    

    Attention

    This filter also filters out YSON bindings. If you use YSON, it makes sense to add the library to exceptions:

    yt.config["pickling"]["module_filter"] = (
        lambda module: hasattr(module, "__file__") and
        (not module.__file__.endswith(".so") or module.__file__.endswith("yson_lib.so")
    )
    
  • You run your program under an updated python2.7 and get errors like ImportError: No module named urllib3 or ImportError: cannot import name _compare_digest, or you can't import the hmac module.

    To solve this problem, filter out hmac from the modules that you take with you (it imports a method that is missing from the python2.7.3 installed on the cluster).

    yt.config["pickling"]["module_filter"] = ... and getattr(module, "__name__", "") != "hmac"
    

Automatic filtering of .pyc and .so files is also supported for the cases when the Python version or the OS version differs between the cluster and the client. This option must first be enabled in the config:

yt.config["pickling"]["enable_modules_compatibility_filter"] = True

When a Python function runs in a job, all the modules present in the dependencies are unpacked and imported, including your main module. That's why the code that launches operations must be guarded by if __name__ == "__main__":; a proper implementation looks like this:

class Mapper(yt.TypedJob):
    ...

if __name__ == "__main__":
    yt.run_map(Mapper(), ...)

Porto layers

When running an operation, you can specify which file system image needs to be prepared before running your jobs.
There is a certain set of ready-to-use layers available at //porto_layers.

You can specify the path to the required layer using the layer_paths parameter in the job spec:

spec_builder = ReduceSpecBuilder() \
    .begin_reducer() \
        .command(reducer) \
        .layer_paths(["//porto_layers/ubuntu-precise-base.tar.xz"]) \
    .end_reducer() \
    ...
yt.run_operation(spec_builder)

tmpfs in jobs

Supporting tmpfs in jobs includes two parts:

  1. For Python operations, tmpfs is enabled by default: it is mounted to a special tmpfs directory and the module archive is unpacked into it. Additional memory needed for tmpfs is added to the limit specified by the user. The behavior is regulated by the options: pickling/enable_tmpfs_archive and pickling/add_tmpfs_archive_size_to_memory_limit.
  2. There is an option to automatically enable tmpfs for all the job's files: this option is called mount_sandbox_in_tmpfs/enable and is disabled by default. If you enable it, the specs of your operations will specify tmpfs_path="." and also set tmpfs_size equal to the total size of the files; tmpfs_size will also be added to memory_limit. Note that if you're using table files, the system can't know the size of the file on disk after formatting, so you need to specify it in the disk_size attribute of the path. You can also request additional tmpfs space if your job generates files at runtime: specify the required number of bytes in the mount_sandbox_in_tmpfs/additional_tmpfs_size option.

Statistics in jobs

As the job is running, the user can export their own statistics (for example, measure the execution time for certain steps in the job). The library provides the following functions:

  • write_statistics: Writes a dict with the collected statistics to an appropriate file descriptor. The function must be called from within the job.

The Operation class also has the get_job_statistics method for quick access to the operation statistics.

Example:

class WriteStatistics(yt.wrapper.TypedJob):
    def __call__(self, row: Row) -> Row:
        yt.write_statistics({"row_count": 1})
        yield row

yt.write_table_structured(table, [Row(x=1)])
op = yt.run_map(WriteStatistics(), table, table, sync=False)
op.wait()
print(op.get_job_statistics()["custom"])
## Output: {"row_count": {"$": {"completed": {"map": {"count": 1, "max": 1, "sum": 1, "min": 1}}}}}

Untyped Python operations

Besides typed jobs, untyped jobs are also supported.
Instead of a TypedJob class, a Python function or any callable object is used as the custom job. The function accepts a record (in the case of a mapper) or a key plus a record iterator (in the case of a reducer). In the latter case, the key is a dict-like object in which only the key columns are filled. The function must be a generator that uses yield to produce the records to be written to the output table.

Example of a filtering mapper:

def mapper(row):
    if row.get("type") == "job_started":
        yield row

Example of a reducer summing across the value column:

def reducer(key, rows):
    row = dict(key)
    row["sum"] = sum(r["value"] for r in rows)
    yield row

Decorators

In the non-typed API, the meaning of some decorators slightly changes compared to the typed API.

  • reduce_aggregator: Instead of accepting a single (key, records) pair, the reducer accepts an iterator of pairs, where each pair is (key, records with this key); see the sketch below.

Keep in mind that a decorator is implemented by setting an attribute on the function. That's why, for example, you cannot declare a function with a decorator and then apply functools.partial to it. If you need to pass certain parameters to the function at call time, it makes sense to create a class with a decorator (see the last example below).

You can find examples in the tutorial.
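A brief sketch of a reduce_aggregator reducer in the untyped API (it receives an iterator of (key, rows) pairs):

@yt.reduce_aggregator
def sum_reducer(row_groups):
    total = 0
    for key, rows in row_groups:
        total += sum(r["value"] for r in rows)
    yield {"total": total}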

Formats

By default, you do not have to specify formats when running Python operations.

YTsaurus stores structured data, and there is no predefined textual representation for it: all jobs run in streaming mode, and the user must explicitly specify the format in which their script expects to see the data in the input stream. When working with data from a Python function, however, you also deal with structured data, so a mapping can be established between the structured data representation in YTsaurus and in Python. In this case, the intermediate format used to transmit data to the job isn't as important.

Keep in mind that there is no perfect one-to-one correspondence between the structured data in YTsaurus and dicts in Python. That's why, in this case, there are some peculiarities (see below).

Structured data representation

The following key peculiarities exist in representing structured data in Python:

  1. Objects in table records in YTsaurus can have attributes (this only applies to columns of the "any" type). To represent them, the library provides special types that are inherited from the standard Python types but additionally have the attributes field. Because creating custom objects is expensive, such an object is created by default only if it has attributes; this behavior is regulated by the always_create_attributes option. YSON types are compared as follows: first the values of the elementary types, then the attributes. If the attributes aren't equal (for example, one object has attributes and another doesn't), the objects are considered non-equal. Keep this in mind when comparing with elementary Python types: to avoid depending on the presence of attributes, explicitly convert the object to the elementary type.

    Explanatory example:

    import yt.yson as yson
    s = yson.YsonString(b"a")
    s == b"a" # True
    s.attributes["b"] = "c"
    s == b"a" # False
    
    bytes(s) == b"a" # True
    
    other_s = yson.YsonString(b"a")
    other_s == s # False
    
    other_s.attributes["b"] = "c"
    other_s == s # True
    
  2. The YSON format uses two integer types: int64 and uint64. Python, on the other hand, has a single unbounded integer type. For this reason, when reading data, unsigned values are represented as YsonUint64, while signed values are represented as regular int. When writing an int, the behavior is automatic: numbers in the range [-2^63, 2^63) are written as signed, while numbers in the range [2^63, 2^64) are written as unsigned. You can always specify the type explicitly by creating the corresponding YSON object.
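    For example, forcing the unsigned representation explicitly (a minimal sketch; the table path is a placeholder):

    import yt.yson as yson
    yt.write_table("//tmp/ints", [{"signed": 5, "unsigned": yson.YsonUint64(5)}])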

  3. Unicode strings. As all strings in YTsaurus are byte strings, Python's Unicode strings are encoded to byte strings in UTF-8. When reading data in Python 3, an attempt is made to decode byte strings with the utf-8 decoder. If that fails, a special YsonStringProxy object is returned. For more information, see the appropriate section.

When writing data, YsonFormat correctly distinguishes between Python types and YSON types.

Control attributes

Apart from a stream of records, when reading data from a table (or from within a job), you can request various control attributes. Working with control attributes is format-dependent: for example, most control attributes can't be represented in formats different from Yamr, JSON, or YSON.

The Yamr format in the Python API correctly supports parsing of the row_index and table_index control attributes, which are represented as the recordIndex and tableIndex fields of Record-type objects.

In JSON format, the control attributes aren't processed in any special way. When requesting them, you have to process the control records within the stream.

YSON format has several modes for automated processing of control attributes. The mode is selected via the control_attributes_mode option, which can take the following values (important: for historical reasons, for the option to work correctly, you also need to set process_table_index=None when creating the format; by default, process_table_index is True, which enforces control_attributes_mode=row_fields):

  • iterator (default): When parsing the stream, the format will produce a record iterator. To get the control attributes from a job, use the context.
  • row_fields: The requested control attributes will be added as fields to each record. For example, if you request row_index, each record will have the @row_index field with the index of this record.
  • none: No special processing will be done for the control attributes: the client will get a stream of records where the entity-type records will have control attributes.

There are examples of switching between output tables using table_index with control_attributes_mode equal to iterator and to row_fields, as well as an example of getting the index of the current table in a reducer using the context.
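For instance, a hedged sketch of creating a format with the row_fields mode (the parameter names are the ones described above; mapper stands for any untyped mapper):

row_fields_format = yt.YsonFormat(control_attributes_mode="row_fields", process_table_index=None)
yt.run_map(mapper, "//tmp/input", "//tmp/output", format=row_fields_format)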

Other formats

By default, the library serializes data into YSON format because this format provides a proper and unambiguous representation for any data stored in YTsaurus tables. To handle the YSON format efficiently in Python, you need the native library. If you don't take this library with you into the job, you will get a runtime error (there is deliberately no fallback to the pure-Python YSON library because it's very slow). In this situation, you can switch to the JSON format.

Most other formats are text-based (that is, numbers have the same format as strings), so you will lose data typing.

If you use formats other than YSON and Yamr, records are always returned as dicts. For example, there is no automatic conversion of records from JSON to YSON.

Other

gRPC

The RPC proxy over gRPC transport isn't supported by the library. If you can't install the binary package with RPC bindings, you can work with gRPC on your own.

YSON

Together with the library to work with YTsaurus clusters, we also supply a library for YSON format. The library is available in the yt.yson module and implements the standard load, loads, dump, and dumps functions. In addition, it provides access to Yson types. The library also implements the following generally useful functions:

  • to_yson_type: Creating a YSON type from a Python object.
  • json_to_yson: Recursively converts a Python object from the JSON representation to a YSON representation. Learn more about the nuances of structured data representation in YTsaurus in JSON format.
  • yson_to_json: Recursively converts a Python object from YSON to JSON.
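For example, a minimal sketch of these functions:

import yt.yson as yson
data = yson.loads(b"{a=1; b=[2; 3]}")         # parse a YSON string into Python objects
print(yson.dumps(data, yson_format="pretty"))  # serialize it back to YSON
print(yson.yson_to_json(data))                 # {"a": 1, "b": [2, 3]}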

YSON bindings

The yson library has two implementations: one written in pure Python and one written as C++ bindings.
The pure-Python parser and writer are very slow and are only suitable for small amounts of data; in particular, without the bindings you won't be able to run operations or read tables in YSON format.

C++ bindings are delivered as Debian and pip packages.

The packages are built as a universal .so library with libcxx compiled into it: that's why they should work in any Debian-based system.
