Пакетная обработка запросов

В данном разделе содержится информация о пакетной обработке запросов (batch-запросах), приведены примеры использования для Python, C++ и Java API.

При работе с системой YTsaurus каждая команда порождает отдельный запрос к мастер-серверу. Запрос имеет свою стоимость, зачастую более высокую, чем выполнение команды. Поэтому объединение нескольких команд в один запрос может существенно ускорить процессы, которые посылают много легких команд к Кипарису, и основную часть времени находятся в ожидании ответа. Такой запрос называется Batch-запросом. Мастер-сервер Кипариса выполнит команды из batch-запроса в произвольном порядке и вернет все полученные результаты. Ошибки, возникающие при выполнении отдельных команд, не влияют на другие команды.

Concurrency

Batch-запрос на мастере Кипариса выполняется как набор независимых команд, конкретный порядок в выполнении команд не гарантируется. Все команды учитываются в квоте команд пользователя. У Batch-запроса существует параметр concurrency, который регулирует параллельность исполнения команд. Значение параметра ограничивает количество одновременно выполняющихся команд, тем самым позволяет не превышать допустимое число одновременных обращений к мастеру для пользователя, в противном случае в ответ на запрос вернётся ошибка User 'user' has exceeded its request queue size limit. По умолчанию параметр имеет значение 50 (одновременно выполняющихся команд), а ограничение у пользователя равно 100, что позволит избегать ошибок с превышением ограничения. Превышение ограничения представляет собой не фатальную ошибку, в случае ее возникновения рекомендуется делать перезапрос. При использовании высокоуровневых SDK перезапрос будет делаться автоматически.

Python API

В python API доступен метод create_batch_client(), который позволяет, имея простого клиента (client = yt.YtClient(<cluster-name>)) , создать клиента для объединения нескольких команд в один batch-запрос (client.create_batch_client()). Пример использования представлен ниже:

import yt.wrapper as yt

if __name__ == "__main__":
    client = yt.YtClient(<cluster-name>)

    batch_client = client.create_batch_client()
    list_rsp = batch_client.list("/")
    exists_rsp = batch_client.exists("/")
    batch_client.commit_batch()

    print list_rsp.get_result()
    print exists_rsp.get_result()

#['cooked_logs', 'home', 'logs', 'projects', 'statbox', 'sys', 'tmp', 'user_sessions', 'userdata', 'userfeat', 'userstats']
#true

В результате выполнения запроса с помощью batch_client в ответ возвращается специальный объект типа BatchResponse, который обладает следующей структурой, представленной ниже.

class BatchResponse(object):
    ...

    def get_result(self):
        ...
 
    def get_error(self):
        ...
        
    def is_ok(self):
        ...

Указать формат возвращаемых данных отдельной команды через batch_client нельзя. Причина в том, что формат возвращаемых данных един для всего batch-запроса.

C++ API

В С++ API для выполнения batch-запроса необходимо:

  • создать объект типа TBatchRequest;
  • с его помощью задать команды (методы TBatchRequest возвращают TFuture на соответствующий тип);
  • передать его в метод ExecuteBatch у клиента;
  • достать результаты из полученных ранее TFuture.

При этом, если какие-то команды завершились с ошибкой, узнать об этом можно, вызвав future.GetValue или future.HasException (т. е. вызов ExecuteBatch не будет возвращать ошибку). Пример использования представлен ниже.

#include <yt/cpp/mapreduce/interface/client.h>
#include <yt/cpp/mapreduce/common/helpers.h>

using namespace NYT;

int main()
{
    auto client = CreateClient(<cluster-name>);

    TBatchRequest batchRequest;
    auto listRsp = batchRequest.List("/", TListOptions().MaxSize(5));
    auto existsRsp = batchRequest.Exists("//tmp");

    client->ExecuteBatch(batchRequest);

    for (auto item : listRsp.GetValue()) {
        Cerr << NodeToYsonString(item) << Endl;
    }
    Cerr << NodeToYsonString(existsRsp.GetValue()) << Endl;

    return 0;
}

/* Output of program:
"tmp"
"projects"
"logs"
"userfeat"
"cooked_logs"
%true
*/

Java API

В Java API доступен метод executeBatch, который позволяет, имея клиента для работы с Кипарисом yt.cypress(), создать объект типа BatchRequest для выполнения нескольких команд в рамках одного batch-запроса. Все команды, отправленные через BatchRequest, возвращают объекты типа future, которые будут выполнены после вызова метода execute. Пример использования представлен ниже.

BatchRequest request = yt.cypress().executeBatch(transactionId, pingAncestorTransactions, Option.empty());
ListF<CompletableFuture<YTreeNode>> futures = Cf.arrayList();

for (YPath path : spec.getInputTables()) {
	futures.add(request.get(transactionId, pingAncestorTransactions, path.attribute("sorted"), Cf.set()));
}

request.execute().join();

boolean result = futures.forAll(x -> {
	YTreeNode node = x.join();
	return node.isBooleanNode() && node.boolValue();
});
В этой статье: