Примеры использования

Перед запуском примеров необходимо получить YT-токен.

Запускать примеры нужно из-под Linux без аргументов.

cd /path/to/arcadia
cd yt/cpp/mapreduce/examples/tutorial/simple_map_tnode
ya make
 ./simple_map_tnode

Рекомендуется установить уровень логирования в INFO или DEBUG, тогда программы будут выводить ссылки на веб-интерфейс, где можно следить за выполнением операций.

Уровень логирования устанавливается с помощью переменной окружения такой командой (см. так же документацию):

export YT_LOG_LEVEL=INFO

Для продакшн-процессов рекомендуется сохранять наиболее полные DEBUG логи. В случае возникновения каких-либо проблем они помогут понять, что происходило.

Базовый уровень

Простой Map (TNode-версия)

Предположим, что существует таблица с логинами. Необходимо сделать таблицу с email-адресами, вычислив их из логина email = login + "@domain".

Для этого подойдёт простой маппер.

Пример лежит в yt/cpp/mapreduce/examples/tutorial/simple_map_tnode.

 'yt/cpp/mapreduce/tutorial/simple_map_tnode/main.cpp'

Простой Map (Protobuf-версия)

Если известно, как устроена таблица, то может быть удобно воспользоваться форматом Protobuf, который:

  • несколько быстрее;
  • помогает избежать опечаток в названиях колонок и проверит работу с типами.

Пример лежит в yt/cpp/mapreduce/examples/tutorial/simple_map_protobuf.

import "yt/yt_proto/yt/formats/extension.proto";

message TLoginRecord
{
    optional string Name = 1 [(NYT.column_name) = "name"];
    optional string Login = 2 [(NYT.column_name) = "login"];
}

message TEmailRecord
{
    optional string Name = 1 [(NYT.column_name) = "name"];
    optional string Email = 2 [(NYT.column_name) = "email"];
}

#include <yt/cpp/mapreduce/examples/tutorial/simple_map_protobuf/data.pb.h>

#include <yt/cpp/mapreduce/interface/client.h>
#include <yt/cpp/mapreduce/interface/config.h>

#include <util/stream/output.h>
#include <util/system/user.h>

using namespace NYT;

class TComputeEmailsMapper
    : public IMapper<TTableReader<TLoginRecord>, TTableWriter<TEmailRecord>>
{
public:
    void Do(TReader* reader, TWriter* writer) override
    {
        for (auto& cursor : *reader) {
            const auto& loginRecord = cursor.GetRow();

            TEmailRecord emailRecord;
            emailRecord.SetName(loginRecord.GetName());
            emailRecord.SetEmail(loginRecord.GetLogin() + "@yandex-team.ru");

            writer->AddRow(emailRecord);
        }
    }
};
REGISTER_MAPPER(TComputeEmailsMapper)

int main() {
    NYT::Initialize();

    auto client = CreateClient("freud");

    // Выходная табличка у нас будет лежать в tmp и содержать имя текущего пользователя.
    const TString outputTable = "//tmp/" + GetUsername() + "-tutorial-emails-protobuf";

    client->Map(
        TMapOperationSpec()
            .AddInput<TLoginRecord>("//home/tutorial/staff_unsorted")
            .AddOutput<TEmailRecord>(outputTable),
        new TComputeEmailsMapper);

    Cout << "Output table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << outputTable << Endl;

    return 0;
}

Простой Map с Protobuf и использованием лямбда-выражений в C++

Некоторые достаточно простые операции можно представить в виде лямбда-выражений C++ с пустым capture-list (т. е. не захватывающих переменных) с использованием библиотеки yt/cpp/mapreduce/library/lambda.

Предыдущий пример может быть переписан так (data.proto такой же):

(yt/cpp/mapreduce/examples/tutorial/simple_map_lambda)

#include <yt/cpp/mapreduce/examples/tutorial/simple_map_lambda/data.pb.h>

#include <yt/cpp/mapreduce/library/lambda/yt_lambda.h>
#include <yt/cpp/mapreduce/interface/config.h>

#include <util/stream/output.h>
#include <util/system/user.h>

struct TGlobalSettings {
    TString MailSuffix;
    Y_SAVELOAD_DEFINE(MailSuffix);
};
NYT::TSaveable<TGlobalSettings> GlobalSettings;

int main() {
    NYT::Initialize();

    auto client = NYT::CreateClient("freud");

    // Выходная табличка у нас будет лежать в tmp и содержать имя текущего пользователя.
    auto outputTable = NYT::TRichYPath("//tmp/" + GetUsername() + "-tutorial-emails-protobuf")
        .CompressionCodec("zlib_9");

    // TransformCopyIf запоминает содержимое GlobalSettings на момент своего запуска
    // и восстанавливает его в job'ах, в которых вызывается лямбда.
    // Этот способ требует аккуратности, если операции запускать из разных потоков.
    GlobalSettings.MailSuffix = "@yandex-team.ru";

    NYT::TransformCopyIf<TLoginRecord, TEmailRecord>(
        client,
        "//home/tutorial/staff_unsorted",
        outputTable,
        [](auto& src, auto& dst) {
            dst.SetName(src.GetName());
            dst.SetEmail(src.GetLogin() + GlobalSettings.MailSuffix);
            return true;
        });

    Cout << "Output table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << outputTable.Path_ << Endl;

    return 0;
}

Сортировка таблицы и простая операция Reduce

Чтобы по той же таблице посчитать статистику, сколько раз встречается то или иное имя, можно использовать операцию Reduce. Однако, Reduce можно запускать только на сортированных таблицах, поэтому таблицу нужно сначала отсортировать.

Пример лежит в yt/cpp/mapreduce/examples/tutorial/simple_reduce_tnode.

#include <yt/cpp/mapreduce/interface/client.h>

#include <util/stream/output.h>
#include <util/system/user.h>

using namespace NYT;

class TCountNamesReduce
    : public IReducer<TTableReader<TNode>, TTableWriter<TNode>>
{
public:
    void Do(TReader* reader, TWriter* writer) override
    {
        // В Do приходят все записи с общим reduce ключом, т.е. в нашем случае с общим полем `name'.
        const auto& row = reader->GetRow();
        const auto name = row["name"];

        ui32 count = 0;
        for ([[maybe_unused]] auto& cursor : *reader) {
            ++count;
        }

        TNode result;
        result["name"] = name;
        result["count"] = count;
        writer->AddRow(result);
    }
};
REGISTER_REDUCER(TCountNamesReduce)

int main() {
    NYT::Initialize();

    auto client = CreateClient("freud");

    const TString sortedTmpTable = "//tmp/" + GetUsername() + "-tutorial-tmp";
    const TString outputTable = "//tmp/" + GetUsername() + "-tutorial-emails";

    client->Sort(
        TSortOperationSpec()
            .AddInput("//home/tutorial/staff_unsorted")
            .Output(sortedTmpTable)
            .SortBy({"name"}));

    client->Reduce(
        TReduceOperationSpec()
            .ReduceBy({"name"})
            .AddInput<TNode>(sortedTmpTable) // Входная таблица должна быть отсортирована по тем колонкам,
                                             // по которым мы осуществляем reduce.
            .AddOutput<TNode>(outputTable),
        new TCountNamesReduce);

    Cout << "Output table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << outputTable << Endl;

    return 0;
}

Сложные типы и формат Protobuf (на примере Reduce)

Внимание

Фича пока в экспериментальном статусе, можно экспериментировать на hume, прод завязывать не стоит, интерфейсы могут поменяться

Пример: таблица с набором записей про ссылки, которые встречаются в веб-документах. DocTitle — имя документа, Link — структура с полями Host: string, Port: int32, Path: string, OccurenceCount — количество встречаний данной ссылки в документе.
Таблица отсортирована по DocTitle. Необходимо собрать информацию о каждом документе в одну строку выходной таблицы.

Опишем protobuf-сообщения следующего вида:

import "yt/yt_proto/yt/formats/extension.proto";

message TUrl {
    optional string Host = 1 [(NYT.column_name) = "Host"];
    optional int32 Port = 2 [(NYT.column_name) = "Port"];
    optional string Path = 3 [(NYT.column_name) = "Path"];
}

message TExtraInfo {
    optional uint32 TotalOccurrenceCount = 1;
}

message TDoc {
    // Помечаем, что поля данного сообщения будут сериализоваться
    // в YT-режиме. Это означает, что им сопоставлены колонки таблицы
    // с соответствующими (возможно, сложными) типами.
    option (NYT.default_field_flags) = SERIALIZATION_YT;

    optional string Title = 1            [(NYT.key_column_name) = "Title"];
    repeated TUrl Links = 2              [(NYT.column_name) = "Links"];
    repeated uint32 OccurrenceCounts = 3 [(NYT.column_name) = "UpdateTimes"];

    // Данное поле специально помечено как сериализуемое в бинарном (PROTOBUF) режиме.
    // В таблице эта колонка будет иметь тип "string".
    optional TExtraInfo ExtraInfo = 4    [(NYT.column_name) = "ExtraInfo", (NYT.flags) = SERIALIZATION_PROTOBUF];
}

message TLinkEntry {
    option (NYT.default_field_flags) = SERIALIZATION_YT;
    optional string DocTitle = 1        [(NYT.column_name) = "DocTitle"];
    optional TUrl Link = 2              [(NYT.column_name) = "Link"];
    optional uint32 OccurrenceCount = 3 [(NYT.column_name) = "OccurrenceCount"];
}

TLinkEntry соответствует строке входной таблицы, TDoc — строке выходной.

Поля TDoc имеют следующую семантику:

  • Title — заголовок;
  • Links— список ссылок, упомянутых в документе;
  • OccurenceCounts — список количества вхождений соответствующих ссылок (длина этого списка равна длине списка Links);
  • ExtraInfo — дополнительная информация, в данном случае включающая суммарное количество вхождений ссылок.

Внимание

(NYT.field_serialization_mode) в сообщениях TDoc и TLinkEntry. Эта опция по умолчанию равна PROTOBUF, то есть "сериализовать поле в виде последовательности байт". Выставленное нами значение YT означает, что соответствующее вложенное сообщение будет сопоставлено сложному типу в схеме таблицы. В TDoc для примера мы пометили поле ExtraInfo опцией (NYT.serialization_mode) = PROTOBUF(обратите внимание на тип соответствующего поля в выходной таблице).

Reducer пишется естественным образом.

#include <yt/cpp/mapreduce/examples/tutorial/protobuf_complex_types/data.pb.h>

#include <yt/cpp/mapreduce/interface/client.h>
#include <yt/cpp/mapreduce/interface/config.h>

#include <util/stream/output.h>
#include <util/system/user.h>

using namespace NYT;

// Редьюсер агрегирует информацию о ссылках на документ с данным заголовком.
class AggregateLinksReducer
    : public IReducer<TTableReader<TLinkEntry>, TTableWriter<TDoc>>
{
public:
    void Do(TReader* reader, TWriter* writer) override
    {
        TDoc doc;
        for (auto& cursor : *reader) {
            auto entry = cursor.MoveRow();
            if (!doc.HasTitle()) {
                doc.SetTitle(entry.GetDocTitle());
            }
            doc.AddLinks()->Swap(entry.MutableLink());
            doc.AddOccurrenceCounts(entry.GetOccurrenceCount());
            auto newCount = doc.GetExtraInfo().GetTotalOccurrenceCount() + entry.GetOccurrenceCount();
            doc.MutableExtraInfo()->SetTotalOccurrenceCount(newCount);
        }
        writer->AddRow(doc);
    }
};
REGISTER_REDUCER(AggregateLinksReducer)

int main() {
    NYT::Initialize();

    TString cluster = "freud";
    auto client = CreateClient(cluster);

    const TString sortedLinksTable  = "//home/tutorial/links_sorted_schematized";

    Cout << "Sorted links table: https://yt.yandex-team.ru/" << cluster << "/#page=navigation&offsetMode=row&path=" << sortedLinksTable << Endl;

    const TString outputTable = "//tmp/" + GetUsername() + "-tutorial-docs-protobuf";

    // Обратите внимание на опцию `InferOutputSchema`,
    // она заставляет навешивать на выходную таблицу схему.
    client->Reduce(
        TReduceOperationSpec()
            .AddInput<TLinkEntry>(sortedLinksTable)
            .AddOutput<TDoc>(outputTable)
            .ReduceBy({"DocTitle"}),
        new AggregateLinksReducer,
        TOperationOptions()
            .InferOutputSchema(true));

    Cout << "Output table: https://yt.yandex-team.ru/" << cluster << "/#page=navigation&offsetMode=row&path=" << outputTable << Endl;

    return 0;
}

Reduce с несколькими входными таблицами

Если кроме старой таблицы с пользователями, есть таблица, где записано, кто робот.

Следующая программа сделает таблицу, в которой останутся только пользователи-роботы.

Пример лежит в yt/cpp/mapreduce/examples/tutorial/multiple_input_reduce_tnode.

#include <yt/cpp/mapreduce/interface/client.h>

#include <util/stream/output.h>
#include <util/system/user.h>

using namespace NYT;

class TFilterRobotsReduce
    : public IReducer<TTableReader<TNode>, TTableWriter<TNode>>
{
public:
    void Do(TReader* reader, TWriter* writer) override {
        TNode loginRow;
        bool isRobot = false;
        for (auto& cursor : *reader) {
            const auto& curRow = cursor.GetRow();

            // У нас есть информация о том из какой таблицы пришла запись.
            auto tableIndex = cursor.GetTableIndex();
            if (tableIndex == 0) {
                // Таблица с логинами.
                loginRow = curRow;
            } else if (tableIndex == 1) {
                // Таблица про роботов.
                isRobot = curRow["is_robot"].AsBool();
            } else {
                // Какая-то фигня, такого индекса быть не может.
                Y_ABORT();
            }
        }

        if (isRobot) {
            writer->AddRow(loginRow);
        }
    }
};
REGISTER_REDUCER(TFilterRobotsReduce)

int main() {
    NYT::Initialize();

    auto client = CreateClient("freud");

    const TString sortedLoginTable = "//tmp/" + GetUsername() + "-tutorial-login-sorted";
    const TString sortedIsRobotTable = "//tmp/" + GetUsername() + "-tutorial-is_robot-sorted";
    const TString outputTable = "//tmp/" + GetUsername() + "-tutorial-robots";

    client->Sort(
        TSortOperationSpec()
            .AddInput("//home/tutorial/staff_unsorted")
            .Output(sortedLoginTable)
            .SortBy({"uid"}));

    client->Sort(
        TSortOperationSpec()
            .AddInput("//home/tutorial/is_robot_unsorted")
            .Output(sortedIsRobotTable)
            .SortBy({"uid"}));

    client->Reduce(
        TReduceOperationSpec()
            .ReduceBy({"uid"})
            .AddInput<TNode>(sortedLoginTable) // Таблицу с логинами мы добавляем первой, поэтому её TableIndex == 0
            .AddInput<TNode>(sortedIsRobotTable) // Таблицу про роботов мы добавляем второй, поэтому её TableIndex == 1
            .AddOutput<TNode>(outputTable),
        new TFilterRobotsReduce);

    Cout << "Output table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << outputTable << Endl;
    return 0;
}

Чтение файлов из операций

Предыдущую задачу можно решать и по-другому: загрузить таблицу с роботами прямо на машины, где будут запускаться операции, и считать её там целиком в yhash_set.

В таком случае можно не сортировать таблицу и использовать Map, а не Reduce. Этот способ используется, если таблица небольшая (до нескольких ГБ).

Входные таблицы те же, что и в предыдущем примере: с пользователями и роботами.

Следующая программа сделает таблицу, в которой останутся только пользователи-роботы.

Пример лежит в yt/cpp/mapreduce/examples/tutorial/map_tnode_with_file.

#include <yt/cpp/mapreduce/interface/client.h>
#include <yt/cpp/mapreduce/interface/io.h>

#include <util/generic/hash_set.h>
#include <util/stream/output.h>
#include <util/system/user.h>

using namespace NYT;

class TFilterRobotsMap
    : public IMapper<TTableReader<TNode>, TTableWriter<TNode>>
{
public:
    void Do(TReader* loginReader, TWriter* writer) override {
        // Если мы прикрепляли табличку //path/to/table, то в операции мы будем видеть её под именем table
        TIFStream stream("is_robot_unsorted");
        auto isRobotReader = CreateTableReader<TNode>(&stream);
        THashSet<i64> robotIds;
        for (auto& cursor : *isRobotReader) {
            const auto& curRow = cursor.GetRow();
            if (curRow["is_robot"].AsBool()) {
                robotIds.insert(curRow["uid"].AsInt64());
            }
        }

        for (auto& cursor : *loginReader) {
            const auto& curRow = cursor.GetRow();
            if (robotIds.contains(curRow["uid"].AsInt64())) {
                writer->AddRow(curRow);
            }
        }
    }
};
REGISTER_MAPPER(TFilterRobotsMap)

int main() {
    NYT::Initialize();

    auto client = CreateClient("freud");


    const TString loginTable = "//home/tutorial/staff_unsorted";
    const TString isRobotTable = "//home/tutorial/is_robot_unsorted";
    const TString outputTable = "//tmp/" + GetUsername() + "-tutorial-robots";

    client->Map(
        TMapOperationSpec()
            .AddInput<TNode>(loginTable)
            .MapperSpec(TUserJobSpec()
                .AddFile(TRichYPath(isRobotTable).Format("yson"))) // Таблицу с роботами добавляем в виде файла
            .AddOutput<TNode>(outputTable),
        new TFilterRobotsMap);

    Cout << "Output table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << outputTable << Endl;
    return 0;
}

Reduce с несколькими входными и несколькими выходными таблицами

Теперь необходимо сделать почти то же самое что и в предыдущем примере, но записать сразу 2 выходных таблицы: и с людьми и с роботами.

Пример лежит в yt/cpp/mapreduce/examples/tutorial/multiple_input_multiple_output_reduce_tnode.

#include <yt/cpp/mapreduce/interface/client.h>

#include <util/stream/output.h>
#include <util/system/user.h>

using namespace NYT;

class TSplitHumanRobotsReduce
    : public IReducer<TTableReader<TNode>, TTableWriter<TNode>>
{
public:
    void Do(TReader* reader, TWriter* writer) override {
        TNode loginRow;
        bool isRobot = false;
        for (auto& cursor : *reader) {
            const auto& curRow = cursor.GetRow();

            auto tableIndex = cursor.GetTableIndex();
            if (tableIndex == 0) {
                loginRow = curRow;
            } else if (tableIndex == 1) {
                isRobot = curRow["is_robot"].AsBool();
            } else {
                Y_ABORT();
            }
        }

        // Второй аргумент метода `AddRow' указывает, в какую таблицу будет записано значение.
        if (isRobot) {
            writer->AddRow(loginRow, 0);
        } else {
            writer->AddRow(loginRow, 1);
        }
    }
};
REGISTER_REDUCER(TSplitHumanRobotsReduce)

int main() {
    NYT::Initialize();

    auto client = CreateClient("freud");

    const TString sortedLoginTable = "//tmp/" + GetUsername() + "-tutorial-login-sorted";
    const TString sortedIsRobotTable = "//tmp/" + GetUsername() + "-tutorial-is_robot-sorted";
    const TString humanTable = "//tmp/" + GetUsername() + "-tutorial-humans";
    const TString robotTable = "//tmp/" + GetUsername() + "-tutorial-robots";

    client->Sort(
        TSortOperationSpec()
            .AddInput("//home/tutorial/staff_unsorted")
            .Output(sortedLoginTable)
            .SortBy({"uid"}));

    client->Sort(
        TSortOperationSpec()
            .AddInput("//home/tutorial/is_robot_unsorted")
            .Output(sortedIsRobotTable)
            .SortBy({"uid"}));

    client->Reduce(
        TReduceOperationSpec()
            .ReduceBy({"uid"})
            .AddInput<TNode>(sortedLoginTable)
            .AddInput<TNode>(sortedIsRobotTable)
            .AddOutput<TNode>(robotTable)  // выходная таблица номер 0
            .AddOutput<TNode>(humanTable), // выходная таблица номер 1
        new TSplitHumanRobotsReduce);

    Cout << "Robot table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << robotTable << Endl;
    Cout << "Human table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << humanTable << Endl;

    return 0;
}

Reduce с несколькими входными и несколькими выходными таблицами (Protobuf-версия)

Чтобы сделать разные таблицы для людей и роботов, нужно переписать тот же самый reducer на protobuf. Таблица с людьми будет содержать поля login, email, name. Таблица с роботами будет содержать поля login и uid. Потребуется завести отдельные типы protobuf сообщений для этих таблиц.

Пример лежит в yt/cpp/mapreduce/examples/tutorial/multiple_input_multiple_output_reduce_protobuf.

import "yt/yt_proto/yt/formats/extension.proto";

message TUserRecord {
    optional int64 Uid = 1    [(NYT.column_name) = "uid"];
    optional string Name = 2  [(NYT.column_name) = "name"];
    optional string Login = 3 [(NYT.column_name) = "login"];
}

message TIsRobotRecord {
    optional int64 Uid = 1    [(NYT.column_name) = "uid"];
    optional bool IsRobot = 2 [(NYT.column_name) = "is_robot"];
}

message THumanRecord {
    optional string Name = 1  [(NYT.column_name) = "name"];
    optional string Email = 2 [(NYT.column_name) = "email"];
    optional string Login = 3 [(NYT.column_name) = "login"];
}

message TRobotRecord {
    optional int64 Uid = 1    [(NYT.column_name) = "uid"];
    optional string Login = 2 [(NYT.column_name) = "login"];
}

#include <yt/cpp/mapreduce/examples/tutorial/multiple_input_multiple_output_reduce_protobuf/data.pb.h>

#include <yt/cpp/mapreduce/interface/client.h>
#include <yt/cpp/mapreduce/interface/config.h>

#include <util/stream/output.h>
#include <util/system/user.h>

using namespace NYT;

class TSplitHumanRobotsReduce
    // Обратите внимание наш редьюс работает с несколькими типами записей
    // как на вход так и на выход, поэтому мы используем ::google::protobuf::Message
    : public IReducer<
        TTableReader<::google::protobuf::Message>,
        TTableWriter<::google::protobuf::Message>>
{
public:
    void Do(TReader* reader, TWriter* writer) override {
        TUserRecord userRecord;
        bool isRobot = false;
        for (auto& cursor : *reader) {
            auto tableIndex = cursor.GetTableIndex();
            // Мы знаем номер таблицы и поэтому мы можем запросить конкретный тип protobuf'а в этом месте.
            // Тип protobuf сообщения передаётся шаблонным аргументом к методу `GetRow()'.
            if (tableIndex == 0) {
                userRecord = cursor.GetRow<TUserRecord>();
            } else if (tableIndex == 1) {
                const auto& isRobotRecord = cursor.GetRow<TIsRobotRecord>();
                isRobot = isRobotRecord.GetIsRobot();
            } else {
                Y_ABORT();
            }
        }

        // В AddRow мы можем передавать как TRobotRecord так и THumanRecord.
        if (isRobot) {
            TRobotRecord robotRecord;
            robotRecord.SetUid(userRecord.GetUid());
            robotRecord.SetLogin(userRecord.GetLogin());
            writer->AddRow(robotRecord, 0);
        } else {
            THumanRecord humanRecord;
            humanRecord.SetName(userRecord.GetName());
            humanRecord.SetLogin(userRecord.GetLogin());
            humanRecord.SetEmail(userRecord.GetLogin() + "@yandex-team.ru");
            writer->AddRow(humanRecord, 1);
        }
    }
};
REGISTER_REDUCER(TSplitHumanRobotsReduce)

int main() {
    NYT::Initialize();

    auto client = CreateClient("freud");

    const TString sortedUserTable = "//tmp/" + GetUsername() + "-tutorial-user-sorted";
    const TString sortedIsRobotTable = "//tmp/" + GetUsername() + "-tutorial-is_robot-sorted";
    const TString humanTable = "//tmp/" + GetUsername() + "-tutorial-humans";
    const TString robotTable = "//tmp/" + GetUsername() + "-tutorial-robots";

    client->Sort(
        TSortOperationSpec()
            .AddInput("//home/tutorial/staff_unsorted")
            .Output(sortedUserTable)
            .SortBy({"uid"}));

    client->Sort(
        TSortOperationSpec()
            .AddInput("//home/tutorial/is_robot_unsorted")
            .Output(sortedIsRobotTable)
            .SortBy({"uid"}));

    client->Reduce(
        TReduceOperationSpec()
            .ReduceBy({"uid"})
            .AddInput<TUserRecord>(sortedUserTable)
            .AddInput<TIsRobotRecord>(sortedIsRobotTable)
            .AddOutput<TRobotRecord>(robotTable)
            .AddOutput<THumanRecord>(humanTable),
        new TSplitHumanRobotsReduce);

    Cout << "Robot table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << robotTable << Endl;
    Cout << "Human table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << humanTable << Endl;

    return 0;
}

Чтение и запись таблиц

YTsaurus позволяет писать в таблицы и дописывать данные в них. По чтению таблиц тоже есть несколько режимов: можно читать как всю таблицу, так и отдельные диапазоны по номеру строки или ключу. См. также разделы про чтение и запись данных и таблицы.

Пример лежит в yt/cpp/mapreduce/examples/tutorial/table_read_write_tnode.

#include <yt/cpp/mapreduce/interface/client.h>

#include <util/system/user.h>

using namespace NYT;

int main() {
    NYT::Initialize();

    auto client = CreateClient("freud");

    const TString table = "//tmp/" + GetUsername() + "-read-write";

    {
        // Просто пишем данные в таблицу, если таблица существует, её перезапишут.
        auto writer = client->CreateTableWriter<TNode>(table);

        TNode row;
        row["english"] = "one";
        row["russian"] = "один";
        writer->AddRow(row);

        row["english"] = "two";
        row["russian"] = "два";
        writer->AddRow(row);

        // Лучше не забывать звать метод Finish явно.
        // Он конечно позовётся в деструкторе writer'а, но если случатся какие-нибудь ошибки,
        // то у вас не будет возможности их поймать, потому что деструктор в случае ошибки абортит программу.
        writer->Finish();
    }
    {
        // Дописываем данные в конец таблицы,
        // Для этого необходимо выставить опцию Append.
        auto writer = client->CreateTableWriter<TNode>(TRichYPath(table).Append(true));

        TNode row;
        row["english"] = "three";
        row["russian"] = "три";
        writer->AddRow(row);

        writer->Finish();
    }
    {
        // Читаем всю таблицу
        auto reader = client->CreateTableReader<TNode>(table);
        Cout << "*** ALL TABLE ***" << Endl;
        for (auto& cursor : *reader) { // reader имеет тот же самый интерфейс, что и reader в джобах
            auto& row = cursor.GetRow();
            Cout << "russian: " << row["russian"].AsString() << "; " << "english: " << row["english"].AsString() << Endl;
        }
        Cout << Endl;
    }
    {
        // Читаем первые 2 строки.
        auto reader = client->CreateTableReader<TNode>(TRichYPath(table).AddRange(
                TReadRange()
                    .LowerLimit(TReadLimit().RowIndex(0))
                    .UpperLimit(TReadLimit().RowIndex(2)))); // читаем с 0й по 2ю строки, 2я невключительно

        Cout << "*** FIRST TWO ROWS ***" << Endl;
        for (auto& cursor : *reader) {
            auto& row = cursor.GetRow();
            Cout << "russian: " << row["russian"].AsString() << "; " << "english: " << row["english"].AsString() << Endl;
        }
        Cout << Endl;
    }
    {
        // Мы можем отсортировать таблицу и читать записи по ключам.
        client->Sort(TSortOperationSpec()
            .SortBy({"english"})
            .AddInput(table)
            .Output(table));

        auto reader = client->CreateTableReader<TNode>(TRichYPath(table).AddRange(
                TReadRange()
                    .Exact(TReadLimit().Key({"three"})))); // если нужен один ключ а не диапазон, можно использовать
                                                           // Exact вместо LowerLimit / UpperLimit,
                                                           // Key указывает, что мы ищем запись по ключу и работает
                                                           // только для сортированных таблиц.

        Cout << "*** EXACT KEY ***" << Endl;
        for (auto& cursor : *reader) {
            auto& row = cursor.GetRow();
            Cout << "russian: " << row["russian"].AsString() << "; " << "english: " << row["english"].AsString() << Endl;
        }
        Cout << Endl;
    }

    return 0;
}

Передача состояния в джоб

Иногда нужно написать операцию, которая принимает некоторые аргументы. Например, написать программу, которая отфильтрует исходную таблицу по имени пользователя.

Пример лежит в yt/cpp/mapreduce/examples/tutorial/stateful_map_tnode.

#include <yt/cpp/mapreduce/interface/client.h>

#include <library/cpp/string_utils/levenshtein_diff/levenshtein_diff.h>

#include <util/stream/output.h>
#include <util/system/user.h>

using namespace NYT;

class TFilterMapper
    : public IMapper<TTableReader<TNode>, TTableWriter<TNode>>
{
public:
    Y_SAVELOAD_JOB(Pattern_, MaxDistance_); // Заклинание, которое говорит, какие переменные нужно передавать на сервер.

    TFilterMapper() = default; // У джобы обязательно должен быть конструктор по умолчанию

    TFilterMapper(TString pattern, double maxDistance)
        : Pattern_(std::move(pattern))
        , MaxDistance_(maxDistance)
    { }


    void Do(TReader* reader, TWriter* writer) override {
        for (auto& cursor : *reader) {
            const auto& row = cursor.GetRow();
            const auto& name = row["name"].AsString();
            if (NLevenshtein::Distance(name, Pattern_) <= MaxDistance_) {
                writer->AddRow(row);
            }
        }
    }

private:
    TString Pattern_;
    size_t MaxDistance_ = 0;
};
REGISTER_MAPPER(TFilterMapper)

int main() {
    NYT::Initialize();

    auto client = CreateClient("freud");

    const TString outputTable = "//tmp/" + GetUsername() + "-tutorial-output-statefull-map";

    client->Map(
        TMapOperationSpec()
        .AddInput<TNode>("//home/tutorial/staff_unsorted")
        .AddOutput<TNode>(outputTable),
        new TFilterMapper("Arkady", 2)); // Мы создаём объект TFilterMapper, и конструктор заполняет поля Pattern_ и MaxDistance_.
                                         // Библиотека сериализует поля, указанные в Y_SAVELOAD_JOB, и загружает их на сервер.
                                         // На сервере вызывается конструктор по умолчанию и сериализованные поля загружаются.

    Cout << "Output table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << outputTable << Endl;

    return 0;
}

Операция MapReduce (Protobuf-версия)

В YTsaurus есть слитная операция MapReduce, которая работает несколько быстрее нежели Map + Sort + Reduce. Чтобы по таблице с пользователями ещё раз посчитать статистику, сколько раз встречается то или иное имя, перед подсчётом нормализируем имена, приведя их к нижнему регистру. Это нужно, чтобы люди с именами АРКАДИЙ и Аркадий считались как одно.

Пример лежит в yt/cpp/mapreduce/examples/tutorial/mapreduce_protobuf.

#include <yt/cpp/mapreduce/interface/client.h>

#include <yt/cpp/mapreduce/examples/tutorial/mapreduce_protobuf/data.pb.h>

#include <util/stream/output.h>
#include <util/system/user.h>
#include <util/charset/utf8.h>

using namespace NYT;

//
// Для того чтобы запустить операцию mapreduce, нам нужны обычные классы Mapper'а и Reducer'а
// (эти классы даже можно использовать в других местах в отдельных операциях Map/Reduce).
//

class TNormalizeNameMapper
    : public IMapper<
        TTableReader<TLoginRecord>,
        TTableWriter<TLoginRecord>>
{
public:
    void Do(TReader* reader, TWriter* writer) override
    {
        for (auto& cursor : *reader) {
            auto row = cursor.GetRow();
            row.SetName(ToLowerUTF8(row.GetName()));
            writer->AddRow(row);
        }
    }
};
REGISTER_MAPPER(TNormalizeNameMapper)

class TCountNameReducer
    : public IReducer<
        TTableReader<TLoginRecord>,
        TTableWriter<TNameStatistics>>
{
public:
    void Do(TReader* reader, TWriter* writer) override
    { TNameStatistics result;
        ui64 count = 0;
        for (auto& cursor : *reader) {
            const auto& row = cursor.GetRow();
            if (!result.HasName()) {
                result.SetName(row.GetName());
            }
            ++count;
        }
        result.SetCount(count);
        writer->AddRow(result);
    }
};
REGISTER_REDUCER(TCountNameReducer)

int main() {
    NYT::Initialize();

    auto client = CreateClient("freud");

    const TString outputTable = "//tmp/" + GetUsername() + "-tutorial-output";

    // Запуск операции MapReduce несильно отличается от запуска других операций.
    // Нам надо указать список ключей, по которым мы будем редьюсить
    // и два класса -- один Mapper и один Reducer.
    client->MapReduce(
        TMapReduceOperationSpec()
            .ReduceBy({"name"})
            .AddInput<TLoginRecord>("//home/tutorial/staff_unsorted")
            .AddOutput<TNameStatistics>(outputTable),
        new TNormalizeNameMapper,
        new TCountNameReducer);

    Cout << "Output table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << outputTable << Endl;

    return 0;
}

Операция MapReduce (версия с лямбда-выражениями)

Задача та же, что в предыдущем примере.

Этот пример лежит в yt/cpp/mapreduce/examples/tutorial/mapreduce_lambda.

#include <yt/cpp/mapreduce/examples/tutorial/mapreduce_lambda/data.pb.h>

#include <yt/cpp/mapreduce/library/lambda/yt_lambda.h>

#include <util/stream/output.h>
#include <util/system/user.h>
#include <util/charset/utf8.h>

using namespace NYT;

int main() {
    NYT::Initialize();

    auto client = CreateClient("freud");

    // Эта таблица будет с некоторой вероятностью не отсортирована.
    // Если вместо MapReduce() использовать MapReduceSorted(), тогда
    // она будет дополнительно досортирована по "name" и это будет отражено в схеме
    const TString outputTable = "//tmp/" + GetUsername() + "-tutorial-output";

    MapReduce<TLoginRecord, TLoginRecord, TNameStatistics>(
        client,
        "//home/tutorial/staff_unsorted",
        outputTable,
        {"name"}, // список ключей, по которым мы будем редьюсить
        [](auto& src, auto& dst) { // mapper
            dst.SetName(ToLowerUTF8(src.GetName()));
            return true;
        },
        [](auto& /*src*/, auto& dst) { // reducer
            // dst.SetName() не вызываем, т.к. когда по
            // "name" редьюсим, это поле уже заполнено.

            // так можно делать, т.к. конструктор протобуфа dst
            // проинициализирует .Count в 0 (по умолчанию):
            dst.SetCount(dst.GetCount() + 1);
        });

    Cout << "Output table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << outputTable << Endl;

    return 0;
}

Подготовка операции в классе джоба

Есть возможность настраивать параметры операции, перегружая метод IJob::PrepareOperation, подробности см. в разделе Описание.

import "yt/yt_proto/yt/formats/extension.proto";

message TGrepperRecord
{
    optional string Key = 1;
    optional bytes Other = 2 [(NYT.flags) = OTHER_COLUMNS];
};

#include <yt/cpp/mapreduce/examples/tutorial/prepare_operation/grepper.pb.h>

#include <yt/cpp/mapreduce/interface/client.h>
#include <yt/cpp/mapreduce/interface/config.h>

#include <util/stream/output.h>
#include <util/system/user.h>

using namespace NYT;

// Этот маппер отдаёт все строки, значение в колонке `column` которых равно `pattern`.
class TGrepper
    : public IMapper<TTableReader<TGrepperRecord>, TTableWriter<TGrepperRecord>>
{
public:
    TGrepper() = default;
    TGrepper(TString column, TString pattern)
        : Column_(column)
        , Pattern_(pattern)
    { }

    void Do(TReader* reader, TWriter* writer) override
    {
        for (const auto& cursor : *reader) {
            auto row = cursor.GetRow();
            if (row.GetKey() == Pattern_) {
                writer->AddRow(row);
            }
        }
    }

    void PrepareOperation(const IOperationPreparationContext& context, TJobOperationPreparer& preparer) const override
    {
        auto schema = context.GetInputSchema(/* tableIndex */ 0);
        // Выходная схема совпадает со входной во всём, кроме названия колонки, по которой ищем.
        for (auto& column : schema.MutableColumns()) {
            if (column.Name() == Column_) {
                column.Name("Key");
            }
        }
        preparer
            .InputColumnRenaming(/* tableIndex */ 0, {{Column_, "Key"}})
            .InputDescription<TGrepperRecord>(/* tableIndex */ 0)
            // Выключаем автовывод схемы, т.к. он ничего не даст (у нас есть поле, помеченное OTHER_COLUMNS).
            .OutputDescription<TGrepperRecord>(/* tableIndex */ 0, /* inferSchema */ false)
            .OutputSchema(/* tableIndex */ 0, schema);
    }

    Y_SAVELOAD_JOB(Column_, Pattern_);

private:
    TString Column_;
    TString Pattern_;
};
REGISTER_MAPPER(TGrepper)

int main() {
    NYT::Initialize();

    auto client = CreateClient("freud");

    // Выходная табличка у нас будет лежать в tmp и содержать имя текущего пользователя.
    const TString outputTable = "//tmp/" + GetUsername() + "-tutorial-grepped-emails";

    client->Map(
        new TGrepper("login", "lev"),
        "//home/tutorial/staff_unsorted_schematized",
        outputTable);

    Cout << "Output table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << outputTable << Endl;

    return 0;
}

Продвинутый уровень

Запуск нескольких операций параллельно

Для запуска нескольких операций параллельно существует класс TOperationTracker.

Пример лежит в yt/cpp/mapreduce/examples/tutorial/operation_tracker.

#include <yt/cpp/mapreduce/interface/client.h>
#include <yt/cpp/mapreduce/library/operation_tracker/operation_tracker.h> // OperationTracker живёт в этой библиотеке

#include <util/stream/output.h>
#include <util/system/user.h>

using namespace NYT;

int main() {
    NYT::Initialize();

    auto client = CreateClient("freud");

    const TString sortedLoginTable = "//tmp/" + GetUsername() + "-tutorial-login-sorted";
    const TString sortedIsRobotTable = "//tmp/" + GetUsername() + "-tutorial-is_robot-sorted";
    const TString humanTable = "//tmp/" + GetUsername() + "-tutorial-humans";
    const TString robotTable = "//tmp/" + GetUsername() + "-tutorial-robots";

    TOperationTracker tracker;
    tracker.AddOperation(
        client->Sort(
            TSortOperationSpec()
                .AddInput("//home/tutorial/staff_unsorted")
                .Output(sortedLoginTable)
                .SortBy({"uid"}),
            TOperationOptions().Wait(false))); // Wait(false) говорит клиенту не ждать завершения операции

    tracker.AddOperation(
        client->Sort(
            TSortOperationSpec()
                .AddInput("//home/tutorial/is_robot_unsorted")
                .Output(sortedIsRobotTable)
                .SortBy({"uid"}),
            TOperationOptions().Wait(false)));

    tracker.WaitAllCompleted(); // Ждёт когда обе операции завершатся, бросит исключение в случае ошибок.

    return 0;
}

Reduce с enable_key_guarantee=false

Предположим, нужно отфильтровать таблицу с URL по регулярным выражениям, которые лежат в отдельной таблице с хостами (т. е. для каждого хоста свое регулярное выражение). Задачу можно было бы решить Reduce по колонке host, но реальные данные часто устроены таким образом, что какие-то хосты имеют непропорционально много URL. В таблице с примером URL с хоста https://www.youtube.com занимают больше половины таблицы. Джоб, который будет обрабатывать такой хост, займёт непропорционально много времени (ключ https://www.youtube.com в такой ситуации называют ключом-монстром).

Но для решения задачи не нужно, чтобы все записи с одним хостом попадали на одну машину. Возможно распределить таблицу с URL между несколькими машинами: важно, чтобы вместе с URL обязательно был получен хост.

host1, url1
host1, url1
host1, url3
host1, url3
host1, url2
host1, url2
host1, url4
host1, url4
host1, url5
host1, url5
host2, url1
host2, url1
host2, url2
host2, url2
host3, url1
host3, url1
host1, data1
host1, data1
host2, data1
host2, data1
host3, data1
host3, data1
host1, data1
host1, data1
host1, url1
host1, url1
host1, url3
host1, url3
host1, url2
host1, url2
host1, url4
host1, url4
host2, url1
host2, url1
host2, url2
host2, url2
host3, url1
host3, url1
host2, data1
host2, data1
host3, data1
host3, data1
Table 1
Table 1
Table 2
Table 2
Job 1
Job 1
Job 2
Job 2
Job 3
Job 3
Reduce
Reduce

Layer 1
host1, url1
host1, url1
host1, url3
host1, url3
host1, url2
host1, url2
host1, url4
host1, url4
host1, url5
host1, url5
host2, url1
host2, url1
host2, url2
host2, url2
host3, url1
host3, url1
host1, data1
host1, data1
host2, data1
host2, data1
host3, data1
host3, data1
host1, data1
host1, data1
host1, url1
host1, url1
host1, url2
host1, url2
host2, url1
host2, url1
host2, url2
host2, url2
host3, url1
host3, url1
host2, data1
host2, data1
host3, data1
host3, data1
Table 1
Table 1
Table 2
Table 2
Job 1
Job 1
Job 3
Job 3
Job 4
Job 4
host1, data1
host1, data1
host1, url3
host1, url3
host1, url4
host1, url4
Job 2
Job 2
Урлы с одного хоста теперь обрабатываются несколькими джобами
Урлы с одного хоста теперь обрабатываются несколькими джобами
Эту таблицу отмечаем как foreign
Эту таблицу отмечаем как foreign

Reduce

enable_key_guarantee=false

Пример лежит в yt/cpp/mapreduce/examples/tutorial/join_reduce_tnode.

#include <yt/cpp/mapreduce/interface/client.h>

#include <library/cpp/regex/pcre/regexp.h>

#include <util/system/user.h>

using namespace NYT;

class TFilterVideoRegexp
    : public IReducer<TTableReader<TNode>, TTableWriter<TNode>>
{
public:
    void Do(TReader* reader, TWriter* writer) override
    {
        // Так же как и с обычным Reducer'ом в каждый вызов метода Do
        // придут записи с общим JoinBy ключом.
        TMaybe<TRegExMatch> regex;
        for (auto& cursor : *reader) {
            auto row = cursor.GetRow();
            if (cursor.GetTableIndex() == 0) { // таблица с хостами
                const auto videoRegexp = row["video_regexp"].AsString();

                // Дебажная печать, stderr можно будет посмотреть в web интерфейсе
                Cerr << "Processing host: " << row["host"].AsString() << Endl;
                if (!videoRegexp.empty()) {
                    regex = TRegExMatch(videoRegexp);
                }
            } else { // таблица с урлами
                if (regex && regex->Match(row["path"].AsString().c_str())) {
                    writer->AddRow(row);
                }
            }
        }
    }
};
REGISTER_REDUCER(TFilterVideoRegexp)

int main() {
    NYT::Initialize();

    auto client = CreateClient("freud");

    const TString outputTable = "//tmp/" + GetUsername() + "-tutorial-join-reduce";
    const TString outputTmpVideoTable = "//tmp/" + GetUsername() + "-tutorial-join-reduce-video-sorted";
    const TString outputTmpDocTable = "//tmp/" + GetUsername() + "-tutorial-join-reduce-doc-sorted";

    client->Sort(TSortOperationSpec()
        .SortBy({"host"})
        .AddInput("//home/tutorial/host_video_regexp")
        .Output(outputTmpVideoTable));

    client->Sort(TSortOperationSpec()
        .SortBy({"host"})
        .AddInput("//home/tutorial/doc_title")
        .Output(outputTmpDocTable));

    client->Reduce(
        TReduceOperationSpec()
        .JoinBy({"host"})
        .AddInput<TNode>(
            TRichYPath(outputTmpVideoTable)
            .Foreign(true)) // важно отметить хостовую таблицу как foreign
        .AddInput<TNode>(outputTmpDocTable)
        .AddOutput<TNode>(outputTable)
        .EnableKeyGuarantee(false),
        new TFilterVideoRegexp);

    Cout << "Output table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << outputTable << Endl;

    return 0;
}

Пакетные запросы

Есть возможность исполнять «лёгкие» запросы (создать/удалить таблицу, проверить её существование и т.д. ) пачками. Этот способ стоит использовать, если нужно выполнить большое количество однотипных операций. Пакетные (batch) запросы помогут заметно сэкономить время.

Пример лежит в yt/cpp/mapreduce/examples/tutorial/batch_request.

#include <yt/cpp/mapreduce/interface/client.h>

#include <util/system/user.h>

using namespace NYT;

int main() {
    NYT::Initialize();

    auto client = CreateClient("freud");

    // Создаём batch запрос (это можно делать и из транзакции).
    auto request = client->CreateBatchRequest();

    // Добавляем запросы в batch
    NThreading::TFuture<bool> docTitleExists = request->Exists("//home/tutorial/doc_title");
    NThreading::TFuture<bool> unexistingTableExists = request->Exists("//home/tutorial/unexisting_table");

    const TString outputTable = "//tmp/" + GetUsername() + "-tutorial-test-batch";
    NThreading::TFuture<TNodeId> createResult = request->Create(outputTable, NT_TABLE);

    // Выполняем batch запрос.
    request->ExecuteBatch();

    // Проверяем результаты.
    Cout << "Table //home/tutorial/doc_title exists: " << docTitleExists.GetValue() << Endl;
    Cout << "Table //home/tutorial/unexisting_table exists: " << unexistingTableExists.GetValue() << Endl;

    try {
        // Следует проверять все результаты с помощью GetValue(),
        // т.к. отдельные запросы могут пофейлиться и тогда соответствующая TFuture будет содержать ошибку.
        //
        // Если запускать эту программу второй раз то Create пофейлится, потому что таблица уже существует.
        createResult.GetValue();
    } catch (const std::exception& ex) {
        Cerr << "Create " << outputTable << " failed: " << ex.what() << Endl;
    }

    return 0;
}

Доставка дампа таблицы в джоб в виде файла

В YTsaurus в джоб можно доставлять файлы (как, например, описано ранее). Менее известной возможностью является доставка дампа таблицы в джоб. Это может быть удобно, если джобу требуется словарь, хранящийся в небольшой (едининичные гигабайты) таблице. Однако не стоит использовать этот подход для загрузки больших таблиц — если размер таблицы несколько десятков гигабайт, такая таблица не поместится на одном узле кластера, на котором запускаются джобы.

Пример лежит в yt/cpp/mapreduce/examples/tutorial/pass_table_as_file.

#include <yt/cpp/mapreduce/interface/client.h>

#include <util/generic/hash_set.h>
#include <util/stream/output.h>
#include <util/system/user.h>

using namespace NYT;

class TFilterRobotsMap
    : public IMapper<TTableReader<TNode>, TTableWriter<TNode>>
{
public:
    // Мы читаем дамп таблицы в методе Start, перед тем как обработать первую запись в Map-таблице.
    // Настройка доставки таблицы в джобу происходит в функции main.
    void Start(TWriter*) override {
        // Нам в джобу доставили файлик "robot_table" с дампом таблицы.
        TFileInput tableDump("robot_table");

        // Есть функция CreateTableReader<>, которая умеет создавать читателя из любого IInputStream'а.
        // Созданный читатель имеет интерфейс аналогичный другим читателям.
        auto reader = CreateTableReader<TNode>(&tableDump);
        for (auto& cursor : *reader) {
            const auto& curRow = cursor.GetRow();
            if (curRow["is_robot"].AsBool()) {
                RobotUids.insert(curRow["uid"].AsInt64());
            }
        }
    }

    void Do(TReader* reader, TWriter* writer) override {
        for (auto& cursor : *reader) {
            const auto& curRow = cursor.GetRow();
            if (RobotUids.contains(curRow["uid"].AsInt64())) {
                writer->AddRow(curRow);
            }
        }
    }

private:
    THashSet<i64> RobotUids;
};
REGISTER_MAPPER(TFilterRobotsMap)

int main() {
    NYT::Initialize();

    auto client = CreateClient("freud");

    const TString outputTable = "//tmp/" + GetUsername() + "-tutorial-robots";

    client->Map(
        TMapOperationSpec()
            .MapperSpec(
                TUserJobSpec()
                // Самое интересное -- мы просим YT доставить нам табличку в виде файла.
                .AddFile(
                    TRichYPath("//home/tutorial/is_robot_unsorted") // Тут указываем таблицу, которую нам надо доставить.
                    .Format("yson") // Это формат, в котором таблица будет прочитана, нам нужен yson, чтобы TNode-читатель в джобе смог прочитать файл.
                    .FileName("robot_table") // Это имя файла, с дампом таблицы.
                    // Тут же можно было бы указать фильтрацию по колонкам или фильтрацию по номерам строк таблицы
                    // с помощью соответсвтующих методов TRichYPath, но нам ничего этого не надо.
                ))
            .AddInput<TNode>("//home/tutorial/staff_unsorted")
            .AddOutput<TNode>(outputTable),
        new TFilterRobotsMap);

    Cout << "Output table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << outputTable << Endl;
    return 0;
}

Запись и получение job statistics

Во время работы операция накапливает множество статистик (читайте документацию).

Пример на простой Map со сбором статистик: yt/cpp/mapreduce/examples/tutorial/job_statistics:

#include <yt/cpp/mapreduce/interface/client.h>
#include <util/system/user.h>

using namespace NYT;

class TComputeEmailsMapper
    : public IMapper<TTableReader<TNode>, TTableWriter<TNode>>
{
    public:
        void Do(TReader* reader, TWriter* writer) override
        {
            // Соберём несколько интересных статистик по сотрудникам
            i64 fortyTwoCount = 0;
            i64 shortLoginCount = 0;
            for (const auto& cursor : *reader) {
                const auto& row = cursor.GetRow();
                auto login = row["login"].AsString();
                TNode outRow;
                outRow["name"] = row["name"];
                outRow["email"] = login + "@yandex-team.ru";
                writer->AddRow(outRow);

                if (login.find("42") != TString::npos) {
                    ++fortyTwoCount;
                }
                if (login.size() <= 4) {
                    ++shortLoginCount;
                }
            }

            // Пользовательскую статистику можно писать по пути (имени)
            WriteCustomStatistics("names/with_42", fortyTwoCount);
            WriteCustomStatistics(TNode() // А можно в виде мапы
                ("names", TNode()
                    ("short", shortLoginCount)));
        }
};
REGISTER_MAPPER(TComputeEmailsMapper)

int main() {
    NYT::Initialize();

    auto client = CreateClient("freud");
    const TString outputTable = "//tmp/" + GetUsername() + "-tutorial-emails";

    auto operation = client->Map(
        TMapOperationSpec()
            .AddInput<TNode>("//home/tutorial/staff_unsorted")
            .AddOutput<TNode>(outputTable),
        new TComputeEmailsMapper);

    auto jobStatistics = operation->GetJobStatistics();
    // Выведем системные статистики
    Cout << "time/total: " << jobStatistics.GetStatisticsAs<TDuration>("time/total").Sum() << Endl;
    Cout << "data/input/row_count: " << jobStatistics.GetStatistics("data/input/row_count").Sum() << Endl;

    // NB: для получения пользовательских статистик используется другой метод
    Cout << "names/with_42: " << jobStatistics.GetCustomStatistics("names/with_42").Sum() << Endl;
    Cout << "names/short: " << jobStatistics.GetCustomStatistics("names/short").Sum() << Endl;
    return 0;
}

Базовая работа с динамическими таблицами

YTsaurus поддерживает динамические таблицы. Wrapper С++ предоставляет возможность совершать базовые операции с такими таблицами.

Внимание

В отличие от других примеров, для запуска этого примера придётся совершить некоторые дополнительные действия:

  • По умолчанию у пользователей нет разрешений для создания динамических таблиц, поэтому надо получить права на создание и монтирование динамических таблиц на каком-нибудь кластере.
  • Во время запуска примера нужно передать имя кластера и путь к тестовой таблице (она не должна существовать). Подробнее в комментариях в тексте программы.

Пример лежит в yt/cpp/mapreduce/examples/tutorial/dyntable_get_insert.

#include <yt/cpp/mapreduce/interface/client.h>

#include <yt/cpp/mapreduce/util/wait_for_tablets_state.h> // for WaitForTabletsState

#include <util/stream/output.h>

using namespace NYT;

int main(int argc, const char** argv) {
    NYT::Initialize();

    //
    // Программа принимает аргументами имя кластера и путь к таблице, с которой она будет работать (таблица не должна существовать).
    //
    // По умолчанию у пользователей нет прав монтировать динамические таблицы, и перед запуском программы необходимо получить
    // такие права (права на монтирование таблиц) на каком-нибудь кластере YT.
    if (argc != 3) {
        Cerr << "Usage:\n";
        Cerr << '\t' << argv[0] << " <server-name> <path-to-dynamic-table>\n";
        Cerr << '\n';
        Cerr << "For example:\n";
        Cerr << '\t' << argv[0] << " freud //home/ermolovd/test-dyntable" << Endl;
        return 1;
    }

    auto client = CreateClient(argv[1]);
    const TString dynamicTablePath = argv[2];

    auto schema = TTableSchema()
        .AddColumn("key", EValueType::VT_INT64, ESortOrder::SO_ASCENDING)
        .AddColumn("value", EValueType::VT_STRING);

    // ВАЖНО: при создании динамической таблицы нам нужно
    //  - указать атрибут dynamic: true
    //  - указать схему
    // Это нужно сделать обязательно в вызове Create. Т.е. не получится сначала создать таблицу,
    // а потом проставить эти атрибуты.
    client->Create(
        dynamicTablePath, NT_TABLE,
        TCreateOptions()
        .Force(true)
        .Attributes(
            TNode()
                ("dynamic", true)
                ("schema", schema.ToNode())));

    // Для того чтобы начать работу с динамической таблицей, её необходимо "подмонтировать".
    //
    // Часто создание / монтирование / размонтирование таблиц делается отдельными административными скриптами,
    // а приложение просто расчитывает, что таблицы существуют и уже подмонтированы.
    //
    // Мы для полноты примера подмонтируем таблицу, а в конце её размонтируем.
    client->MountTable(dynamicTablePath);
    Cout << "Waiting tablets are mounted..." << Endl;

    // Функция MountTable (и UnmountTable) запускает асинхронный процесс монтирования,
    // который может занять значительное время (десятки секунд) для больших таблиц.
    // Нам необходимо дождаться завершения этого процесса.
    WaitForTabletsState(client, dynamicTablePath, TS_MOUNTED);

    // Вставлять значения в динтаблицу можно с помощью InsertRows
    client->InsertRows(dynamicTablePath, {
        TNode()("key", 1)("value", "один"),
        TNode()("key", 42)("value", "сорок два"),
        TNode()("key", 100500)("value", "стопятьсот"),
    });

    // Получать значения из динтаблицы можно с помощью LookupRows,
    // возвращает список найденных строчек.
    auto result = client->LookupRows(dynamicTablePath, {
        TNode()("key", 100500),
        TNode()("key", 42),
    });

    Cout << "====== LOOKUP RESULT ======" << Endl;
    for (const auto& row : result) {
        Cout << "key: " << row["key"].AsInt64() << " value: " << row["value"].AsString() << Endl;
    }
    Cout << "====== END LOOKUP RESULT ======" << Endl;

    client->UnmountTable(dynamicTablePath);
    Cout << "Waiting tablets are unmounted..." << Endl;
    WaitForTabletsState(client, dynamicTablePath, TS_UNMOUNTED);

    return 0;
}

Следующая