RPC-прокси

В данном разделе приведены примеры работы с YT через RPC-прокси для разных языков программирования: C++, Java, Python.

С++

Пример на Github

В примере реализована программа, через которую можно работать с динамическими таблицами в интерактивном режиме.

Формат вызова:

$ ./rpc_proxy_sample --config config.yson --user <user> --token $(cat ~/.yt/token) 2> /dev/null

Работа с Кипарисом:

list //sys
get //sys

Чтение данных средствами языка запросов (SelectRows):

select timestamp, host, rack, utc_time, data FROM [//home/dev/autorestart_nodes_copy] LIMIT 10

Запрос строк по ключу (LookupRows):

ulookup //home/dev/autorestart_nodes_copy timestamp;host;rack;utc_time;data <id=0>1486113922563016;<id=1>"s04-sas.cluster-name";<id=2>"SAS2.4.3-13" <id=0>1486113924172063;<id=1>"s04-sas.cluster-name";<id=2>"SAS2.4.3-13" <id=0>1486113992045484;<id=1>"s04-sas.cluster-name";<id=2>"SAS2.4.3-13" <id=0>1486113992591731;<id=1>"s04-sas.cluster-name";<id=2>"SAS2.4.3-13"  <id=0>1486113997734536;<id=1>"n4137-sas.cluster-name";<id=2>"SAS2.4.3-13"

Вставка строк (aka WriteRows):

upsert //home/dev/autorestart_nodes_copy timestamp;host;rack;utc_time;data <id=0>123;<id=1>"host123";<id=2>"rack123";<id=3>"utc_time1";<id=4>"data1" <id=0>567;<id=1>"host567";<id=2>"rack567";<id=3>"utc_time2";<id=4>"data2"

Удаление данных (DeleteRows):

delete //home/dev/autorestart_nodes_copy timestamp;host;rack <id=0>123;<id=1>"host123";<id=2>"rack123"

Пример работы с транзакциями:

if (ValidateSignature("delete", {"path", "columns", "..."}, tokens)) {
    auto path = tokens[1];
    TPrepareRows prepareRows(tokens);

    auto tx = Client_->StartTransaction(NTransactionClient::ETransactionType::Tablet).Get();
    if (!ValidateResult(tx)) return;

    tx.Value()->DeleteRows(path, prepareRows.NameTable, prepareRows.Rows);
    auto result = tx.Value()->Commit().Get();
    if (!ValidateResult(result)) return;

    Cout << "Committed" << Endl;
}

Java

Исходные коды представленных примеров находятся в GitHub.

Создание клиента:

   public static YtClient createYtClient(BusConnector connector, String user, String token)
    {
        return new YtClient(connector, "cluster-name", RpcCredentials(user, token));
    }

Реализация Select запроса:

package ru.yandex.yt.ytclient.examples;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import ru.yandex.yt.ytclient.tables.ColumnSchema;
import ru.yandex.yt.ytclient.wire.UnversionedRow;
import ru.yandex.yt.ytclient.wire.UnversionedRowset;
import ru.yandex.yt.ytclient.wire.UnversionedValue;
import ru.yandex.yt.ytclient.ytree.YTreeMapNode;

public class SelectRowsExample {
    private static final Logger logger = LoggerFactory.getLogger(SelectRowsExample.class);

    public static void main(String[] args) {
        ExamplesUtil.runExampleWithBalancing(client -> {
            long t0 = System.nanoTime();
            UnversionedRowset rowset = client.selectRows(
                    "timestamp, host, rack, utc_time, data FROM [//home/dev/autorestart_nodes_copy] LIMIT 10")
                    .join();
            long t1 = System.nanoTime();
            logger.info("Request time: {}", (t1 - t0) / 1000000.0);
            logger.info("Result schema:");
            for (ColumnSchema column : rowset.getSchema().getColumns()) {
                logger.info("    {}", column.getName());
            }
            for (UnversionedRow row : rowset.getRows()) {
                logger.info("Row:");
                for (UnversionedValue value : row.getValues()) {
                    logger.info("    value: {}", value);
                }
            }
            for (YTreeMapNode row : rowset.getYTreeRows()) {
                logger.info("Row: {}", row);
            }
        });
    }
}

Работа с Кипарисом:

package ru.yandex.yt.ytclient.examples;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GetNodeExample {
    private static final Logger logger = LoggerFactory.getLogger(GetNodeExample.class);

    public static void main(String[] args) {
        ExamplesUtil.runExample(client -> {
            logger.info("Table dynamic: {}", client.getNode("//home/dev/autorestart_nodes_copy/@dynamic").join());
            logger.info("Table schema: {}", client.getNode("//home/dev/autorestart_nodes_copy/@schema").join());
        });
    }
}

Работа с Кипарисом:

package ru.yandex.yt.ytclient.examples;

import java.util.Random;

import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;

import ru.yandex.yt.rpcproxy.ETransactionType;
import ru.yandex.yt.ytclient.misc.YtGuid;
import ru.yandex.yt.ytclient.proxy.ApiServiceTransaction;
import ru.yandex.yt.ytclient.proxy.ApiServiceTransactionOptions;
import ru.yandex.yt.ytclient.proxy.request.ObjectType;

public class CypressExample {

    public static void main(String[] args) {
        try {
            OptionParser parser = new OptionParser();

            OptionSpec<String> proxyOpt = parser.accepts("proxy", "proxy (see //sys/rpc_proxies)")
                    .withRequiredArg().ofType(String.class);

            OptionSet option = parser.parse(args);

            String [] hosts = null;

            if (option.hasArgument(proxyOpt)) {
                String line = option.valueOf(proxyOpt);
                hosts = line.split(",");
            } else {
                parser.printHelpOn(System.out);
                System.exit(1);
            }

            Random rnd = new Random();
            String host = hosts[rnd.nextInt(hosts.length)];

            ExamplesUtil.runExample(client -> {
                try {
                    ApiServiceTransactionOptions transactionOptions =
                            new ApiServiceTransactionOptions(ETransactionType.MASTER);

                    String node = "//tmp/test-node-cypress-example";
                    ApiServiceTransaction t = client.startTransaction(transactionOptions).get();


                    t.existsNode(node).thenAccept(result -> {
                        try {
                            if (result) {
                                t.removeNode(node);
                            }
                        } catch (Throwable e) {
                            throw new RuntimeException(e);
                        }
                    }).get();

                    YtGuid guid = t.createNode(node, ObjectType.Table).get();
                    /*
                    Map<String, YTreeNode> data = new HashMap<String, YTreeNode>();
                    data.put("k1", new YTreeInt64Node(10, new HashMap<>()));
                    data.put("k2", new YTreeInt64Node(31337, new HashMap<>()));
                    data.put("str", new YTreeStringNode("stroka"));
                    t.setNode(node, new YTreeMapNode(data)).get();
                    */
                    t.commit();

                    ApiServiceTransaction t2 = client.startTransaction(transactionOptions).get();
                    t2.linkNode(node, node + "-link");
                    t2.moveNode(node, node + "-moved");
                    t2.commit();

                    client.removeNode(node + "-link");

                } catch (Throwable e) {
                    System.out.println(e);
                    e.printStackTrace();
                    System.exit(-1);
                }
            }, ExamplesUtil.getUser(), ExamplesUtil.getToken(), host);

            System.exit(0);
        } catch (Throwable e) {
            System.out.println(e);
            e.printStackTrace();
            System.exit(-1);
        }
    }
}

Выполнение LookupRows:

package ru.yandex.yt.ytclient.examples;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import ru.yandex.yt.ytclient.proxy.LookupRowsRequest;
import ru.yandex.yt.ytclient.tables.ColumnSchema;
import ru.yandex.yt.ytclient.tables.ColumnValueType;
import ru.yandex.yt.ytclient.tables.TableSchema;
import ru.yandex.yt.ytclient.wire.UnversionedRow;
import ru.yandex.yt.ytclient.wire.UnversionedRowset;
import ru.yandex.yt.ytclient.wire.UnversionedValue;
import ru.yandex.yt.ytclient.ytree.YTreeMapNode;

public class LookupRowsExample {
    private static final Logger logger = LoggerFactory.getLogger(LookupRowsExample.class);

    public static void main(String[] args) {
        TableSchema schema = new TableSchema.Builder()
                .addKey("timestamp", ColumnValueType.INT64)
                .addKey("host", ColumnValueType.STRING)
                .addKey("rack", ColumnValueType.STRING)
                .addValue("utc_time", ColumnValueType.STRING)
                .addValue("data", ColumnValueType.STRING)
                .build();
        ExamplesUtil.runExampleWithBalancing(client -> {
            long t0 = System.nanoTime();
            LookupRowsRequest request = new LookupRowsRequest("//home/dev/autorestart_nodes_copy", schema.toLookup())
                    .addFilter(1486113922563016L, "s04-sas.cluster-name", "SAS2.4.3-13")
                    .addFilter(1486113924172063L, "s04-sas.cluster-name", "SAS2.4.3-13")
                    .addFilter(1486113992045484L, "s04-sas.cluster-name", "SAS2.4.3-13")
                    .addFilter(1486113992591731L, "s04-sas.cluster-name", "SAS2.4.3-13")
                    .addFilter(1486113997734536L, "n4137-sas.cluster-name", "SAS2.4.3-13")
                    .addLookupColumns("utc_time", "data");
            long t1 = System.nanoTime();
            UnversionedRowset rowset = client.lookupRows(request).join();
            long t2 = System.nanoTime();
            logger.info("Request time: {}ms + {}ms", (t1 - t0) / 1000000.0, (t2 - t1) / 1000000.0);
            logger.info("Result schema:");
            for (ColumnSchema column : rowset.getSchema().getColumns()) {
                logger.info("    {}", column.getName());
            }
            for (UnversionedRow row : rowset.getRows()) {
                logger.info("Row:");
                for (UnversionedValue value : row.getValues()) {
                    logger.info("    value: {}", value);
                }
            }
            for (YTreeMapNode row : rowset.getYTreeRows()) {
                logger.info("Row: {}", row);
            }
        });
    }
}

Выполнение ModifyRows:

package ru.yandex.yt.ytclient.examples;

import java.util.Arrays;
import java.util.UUID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import ru.yandex.yt.rpcproxy.ETransactionType;
import ru.yandex.yt.ytclient.proxy.ApiServiceTransaction;
import ru.yandex.yt.ytclient.proxy.ApiServiceTransactionOptions;
import ru.yandex.yt.ytclient.proxy.LookupRowsRequest;
import ru.yandex.yt.ytclient.proxy.ModifyRowsRequest;
import ru.yandex.yt.ytclient.tables.ColumnSchema;
import ru.yandex.yt.ytclient.tables.ColumnValueType;
import ru.yandex.yt.ytclient.tables.TableSchema;
import ru.yandex.yt.ytclient.wire.UnversionedRow;
import ru.yandex.yt.ytclient.wire.UnversionedRowset;
import ru.yandex.yt.ytclient.wire.UnversionedValue;
import ru.yandex.yt.ytclient.ytree.YTreeBuilder;
import ru.yandex.yt.ytclient.ytree.YTreeMapNode;

public class ModifyRowsExample {
    private static final Logger logger = LoggerFactory.getLogger(ModifyRowsExample.class);

    public static void main(String[] args) {
        TableSchema schema = new TableSchema.Builder()
                .addKey("timestamp", ColumnValueType.INT64)
                .addKey("host", ColumnValueType.STRING)
                .addKey("rack", ColumnValueType.STRING)
                .addValue("utc_time", ColumnValueType.STRING)
                .addValue("data", ColumnValueType.STRING)
                .build();
        ExamplesUtil.runExample(client -> {
            ApiServiceTransactionOptions transactionOptions =
                    new ApiServiceTransactionOptions(ETransactionType.MASTER);
            try (ApiServiceTransaction transaction = client.startTransaction(transactionOptions).join()) {
                logger.info("Transaction started: {} (timestamp={}, ping={})",
                        transaction.getId(),
                        transaction.getStartTimestamp(),
                        transaction.isPing());

                transaction.ping().join();
                logger.info("Transaction ping succeeded!");

                ModifyRowsRequest request =
                        new ModifyRowsRequest("//home/dev/autorestart_nodes_copy", schema)
                                .addInsert(Arrays.asList(10, "myhost1", "myrack1", "utc_time1", "data1"))
                                .addInsert(Arrays.asList(11, "myhost2", "myrack2", "utc_time2", "data2"))
                                .addUpdate(new YTreeBuilder()
                                        .beginMap()
                                        .key("timestamp").value(1486190036109192L)
                                        .key("host").value("n0344-sas.cluster-name")
                                        .key("rack").value("SAS2.4.3-15")
                                        .key("data").value("XXX " + UUID.randomUUID().toString())
                                        .buildMap()
                                        .mapValue())
                                .addUpdate(new YTreeBuilder()
                                        .beginMap()
                                        .key("timestamp").value(1486190037953802L)
                                        .key("host").value("s03-sas.cluster-name")
                                        .key("rack").value("SAS2.4.3-15")
                                        .key("data").value("XXX " + UUID.randomUUID().toString())
                                        .buildMap()
                                        .mapValue());
                long t0 = System.nanoTime();
                transaction.modifyRows(request).join();
                long t1 = System.nanoTime();

                logger.info("Request time: {}ms", (t1 - t0) / 1000000.0);

                t0 = System.nanoTime();

                LookupRowsRequest lookup = new LookupRowsRequest("//home/dev/autorestart_nodes_copy", schema.toLookup())
                        .addFilter(1486190036109192L, "n0344-sas.cluster-name", "SAS2.4.3-15")
                        .addFilter(1486190037953802L, "s03-sas.cluster-name", "SAS2.4.3-15")
                        .addLookupColumns("timestamp", "data");


                t1 = System.nanoTime();
                UnversionedRowset rowset = client.lookupRows(lookup).join();
                long t2 = System.nanoTime();
                logger.info("Request time: {}ms + {}ms", (t1 - t0) / 1000000.0, (t2 - t1) / 1000000.0);
                logger.info("Result schema:");
                for (ColumnSchema column : rowset.getSchema().getColumns()) {
                    logger.info("    {}", column.getName());
                }
                for (UnversionedRow row : rowset.getRows()) {
                    logger.info("Row:");
                    for (UnversionedValue value : row.getValues()) {
                        logger.info("    value: {}", value);
                    }
                }
                for (YTreeMapNode row : rowset.getYTreeRows()) {
                    logger.info("Row: {}", row);
                }

                transaction.commit().join();
                logger.info("Transaction committed!");
            }
        });
    }
}

Python

Для работы через Python необходимо установить пакет с биндингами, pypi-пакет называется ytsaurus-rpc-driver. После этого необходимо указать в качестве бэкенда rpc.

Работа с динамическими таблицами:

import yt.wrapper as yt

def main():
    client = yt.YtClient("cluster-name", config={"backend": "rpc"})
    schema = [
        {"name": "x", "type": "int64", "sort_order": "ascending"},
        {"name": "y", "type": "int64"},
        {"name": "z", "type": "int64"}
    ]

    table = "//home/ignat/dynamic_table"
    client.create("table", table, attributes={"schema": schema, "dynamic": True})
    client.mount_table(table, sync=True)
    client.insert_rows(table, [{"x": 0, "y": 99}])

    for iter in xrange(5):
        with client.Transaction(type="tablet"):
            rows = list(client.lookup_rows(table, [{"x": 0}]))
            if len(rows) == 1 and rows[0]["y"] <= 100:
                rows[0]["y"] += 1
                client.insert_rows(table, rows)

    print list(client.select_rows("* from [{}]".format(table)))

if __name__ == "__main__":
    main()

Для того чтобы клиент ходил к проксям определённой роли (например, my_role), нужно указать в конфиге опцию proxy_role:

client = yt.YtClient("cluster-name", config={"backend": "rpc", "driver_config": {"proxy_role": "my_role"}})

Мы также советуем включать дебажные логи клиентской библиотеки. Такие логи сильно помогут при возникновении проблем. Помимо обычного способа через переменную окружения YT_LOG_LEVEL=debug можно явно настроить логирование внутри rpc proxy client. Для этого нужно передать конфиг логирования через параметр driver_logging_config:

logging_config = {
    'rules': [{'min_level': 'debug', 'writers': ['file']}],
    'writers': {'file': {'file_name': logging_file_name, 'type': 'file'}}
}
client = yt.YtClient("cluster-name", config={"backend": "rpc", "driver_logging_config": logging_config})