Примеры использования
- Базовый уровень
- Простой Map (TNode-версия)
- Простой Map (Protobuf-версия)
- Простой Map с Protobuf и использованием лямбда-выражений в C++
- Сортировка таблицы и простая операция Reduce
- Сложные типы и формат Protobuf (на примере Reduce)
- Reduce с несколькими входными таблицами
- Чтение файлов из операций
- Reduce с несколькими входными и несколькими выходными таблицами
- Reduce с несколькими входными и несколькими выходными таблицами (Protobuf-версия)
- Чтение и запись таблиц
- Передача состояния в джоб
- Операция MapReduce (Protobuf-версия)
- Операция MapReduce (версия с лямбда-выражениями)
- Подготовка операции в классе джоба
- Продвинутый уровень
Перед запуском примеров необходимо получить YT-токен.
Примеры запускаются обычным способом через ya
. Запускать их нужно из-под Linux
без аргументов. внутренняя история?
cd /path/to/arcadia
cd yt/cpp/mapreduce/examples/tutorial/simple_map_tnode
ya make
./simple_map_tnode
Рекомендуется установить уровень логирования в INFO
или DEBUG
, тогда программы будут выводить ссылки на веб-интерфейс, где можно следить за выполнением операций.
Уровень логирования устанавливается с помощью переменной окружения такой командой (см. так же документацию):
export YT_LOG_LEVEL=INFO
Для продакшн-процессов рекомендуется сохранять наиболее полные DEBUG
логи. В случае возникновения каких-либо проблем они помогут понять, что происходило.
Базовый уровень
Простой Map (TNode-версия)
Предположим, что существует таблица с логинами. Необходимо сделать таблицу с email-адресами, вычислив их из логина email = login + "@domain"
.
Для этого подойдёт простой маппер.
Пример лежит в yt/cpp/mapreduce/examples/tutorial/simple_map_tnode
тоже внутри
'yt/cpp/mapreduce/tutorial/simple_map_tnode/main.cpp'
?
Простой Map (Protobuf-версия)
Если известно, как устроена таблица, то может быть удобно воспользоваться форматом Protobuf, который:
- несколько быстрее;
- помогает избежать опечаток в названиях колонок и проверит работу с типами.
Пример лежит в yt/cpp/mapreduce/examples/tutorial/simple_map_protobuf. внутри
import "yt/yt_proto/yt/formats/extension.proto";
message TLoginRecord
{
optional string Name = 1 [(NYT.column_name) = "name"];
optional string Login = 2 [(NYT.column_name) = "login"];
}
message TEmailRecord
{
optional string Name = 1 [(NYT.column_name) = "name"];
optional string Email = 2 [(NYT.column_name) = "email"];
}
#include <yt/cpp/mapreduce/examples/tutorial/simple_map_protobuf/data.pb.h>
#include <yt/cpp/mapreduce/interface/client.h>
#include <yt/cpp/mapreduce/interface/config.h>
#include <util/stream/output.h>
#include <util/system/user.h>
using namespace NYT;
class TComputeEmailsMapper
: public IMapper<TTableReader<TLoginRecord>, TTableWriter<TEmailRecord>>
{
public:
void Do(TReader* reader, TWriter* writer) override
{
for (auto& cursor : *reader) {
const auto& loginRecord = cursor.GetRow();
TEmailRecord emailRecord;
emailRecord.SetName(loginRecord.GetName());
emailRecord.SetEmail(loginRecord.GetLogin() + "@yandex-team.ru");
writer->AddRow(emailRecord);
}
}
};
REGISTER_MAPPER(TComputeEmailsMapper)
int main() {
NYT::Initialize();
auto client = CreateClient("freud");
// Выходная табличка у нас будет лежать в tmp и содержать имя текущего пользователя.
const TString outputTable = "//tmp/" + GetUsername() + "-tutorial-emails-protobuf";
client->Map(
TMapOperationSpec()
.AddInput<TLoginRecord>("//home/dev/tutorial/staff_unsorted")
.AddOutput<TEmailRecord>(outputTable),
new TComputeEmailsMapper);
Cout << "Output table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << outputTable << Endl;
return 0;
}
Простой Map с Protobuf и использованием лямбда-выражений в C++
Некоторые достаточно простые операции можно представить в виде лямбда-выражений C++ с пустым capture-list (т.е. не захватывающих переменных) с использованием библиотеки yt/cpp/mapreduce/library/lambda.
Предыдущий пример может быть переписан так (data.proto
такой же):
(yt/cpp/mapreduce/examples/tutorial/simple_map_lambda)
#include <yt/cpp/mapreduce/examples/tutorial/simple_map_lambda/data.pb.h>
#include <yt/cpp/mapreduce/library/lambda/yt_lambda.h>
#include <yt/cpp/mapreduce/interface/config.h>
#include <util/stream/output.h>
#include <util/system/user.h>
struct TGlobalSettings {
TString MailSuffix;
Y_SAVELOAD_DEFINE(MailSuffix);
};
NYT::TSaveable<TGlobalSettings> GlobalSettings;
int main() {
NYT::Initialize();
auto client = NYT::CreateClient("freud");
// Выходная табличка у нас будет лежать в tmp и содержать имя текущего пользователя.
auto outputTable = NYT::TRichYPath("//tmp/" + GetUsername() + "-tutorial-emails-protobuf")
.CompressionCodec("zlib_9");
// TransformCopyIf запоминает содержимое GlobalSettings на момент своего запуска
// и восстанавливает его в job'ах, в которых вызывается лямбда.
// Этот способ требует аккуратности, если операции запускать из разных потоков.
GlobalSettings.MailSuffix = "@yandex-team.ru";
NYT::TransformCopyIf<TLoginRecord, TEmailRecord>(
client,
"//home/dev/tutorial/staff_unsorted",
outputTable,
[](auto& src, auto& dst) {
dst.SetName(src.GetName());
dst.SetEmail(src.GetLogin() + GlobalSettings.MailSuffix);
return true;
});
Cout << "Output table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << outputTable.Path_ << Endl;
return 0;
}
Сортировка таблицы и простая операция Reduce
Чтобы по той же таблице (внутри) посчитать статистику, сколько раз встречается то или иное имя, можно использовать операцию Reduce. Однако, Reduce можно запускать только на сортированных таблицах, поэтому таблицу нужно сначала отсортировать.
Пример лежит в yt/cpp/mapreduce/examples/tutorial/simple_reduce_tnode. внутри
#include <yt/cpp/mapreduce/interface/client.h>
#include <util/stream/output.h>
#include <util/system/user.h>
using namespace NYT;
class TCountNamesReduce
: public IReducer<TTableReader<TNode>, TTableWriter<TNode>>
{
public:
void Do(TReader* reader, TWriter* writer) override
{
// В Do приходят все записи с общим reduce ключом, т.е. в нашем случае с общим полем `name'.
const auto& row = reader->GetRow();
const auto name = row["name"];
ui32 count = 0;
for ([[maybe_unused]] auto& cursor : *reader) {
++count;
}
TNode result;
result["name"] = name;
result["count"] = count;
writer->AddRow(result);
}
};
REGISTER_REDUCER(TCountNamesReduce)
int main() {
NYT::Initialize();
auto client = CreateClient("freud");
const TString sortedTmpTable = "//tmp/" + GetUsername() + "-tutorial-tmp";
const TString outputTable = "//tmp/" + GetUsername() + "-tutorial-emails";
client->Sort(
TSortOperationSpec()
.AddInput("//home/dev/tutorial/staff_unsorted")
.Output(sortedTmpTable)
.SortBy({"name"}));
client->Reduce(
TReduceOperationSpec()
.ReduceBy({"name"})
.AddInput<TNode>(sortedTmpTable) // Входная таблица должна быть отсортирована по тем колонкам,
// по которым мы осуществляем reduce.
.AddOutput<TNode>(outputTable),
new TCountNamesReduce);
Cout << "Output table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << outputTable << Endl;
return 0;
}
Сложные типы и формат Protobuf (на примере Reduce)
Внимание
Фича пока в экспериментальном статусе, можно экспериментировать на hume, прод завязывать не стоит, интерфейсы могут поменяться
Пример: таблица с набором записей про ссылки, которые встречаются в веб-документах. DocTitle
— имя документа, Link
— структура с полями Host: string, Port: int32, Path: string
, OccurenceCount
— количество встречаний данной ссылки в документе.
Таблица отсортирована по DocTitle
. Необходимо собрать информацию о каждом документе в одну строку выходной таблицы.
Опишем protobuf-сообщения следующего вида:
import "yt/yt_proto/yt/formats/extension.proto";
message TUrl {
optional string Host = 1 [(NYT.column_name) = "Host"];
optional int32 Port = 2 [(NYT.column_name) = "Port"];
optional string Path = 3 [(NYT.column_name) = "Path"];
}
message TExtraInfo {
optional uint32 TotalOccurrenceCount = 1;
}
message TDoc {
// Помечаем, что поля данного сообщения будут сериализоваться
// в YT-режиме. Это означает, что им сопоставлены колонки таблицы
// с соответствующими (возможно, сложными) типами.
option (NYT.default_field_flags) = SERIALIZATION_YT;
optional string Title = 1 [(NYT.key_column_name) = "Title"];
repeated TUrl Links = 2 [(NYT.column_name) = "Links"];
repeated uint32 OccurrenceCounts = 3 [(NYT.column_name) = "UpdateTimes"];
// Данное поле специально помечено как сериализуемое в бинарном (PROTOBUF) режиме.
// В таблице эта колонка будет иметь тип "string".
optional TExtraInfo ExtraInfo = 4 [(NYT.column_name) = "ExtraInfo", (NYT.flags) = SERIALIZATION_PROTOBUF];
}
message TLinkEntry {
option (NYT.default_field_flags) = SERIALIZATION_YT;
optional string DocTitle = 1 [(NYT.column_name) = "DocTitle"];
optional TUrl Link = 2 [(NYT.column_name) = "Link"];
optional uint32 OccurrenceCount = 3 [(NYT.column_name) = "OccurrenceCount"];
}
TLinkEntry
соответствует строке входной таблицы, TDoc
— строке выходной.
Поля TDoc
имеют следующую семантику:
Title
— заголовок;Links
— список ссылок, упомянутых в документе;OccurenceCounts
— список количества вхождений соответствующих ссылок (длина этого списка равна длине спискаLinks
);ExtraInfo
— дополнительная информация, в данном случае включающая суммарное количество вхождений ссылок.
Внимание
(NYT.field_serialization_mode)
в сообщениях TDoc
и TLinkEntry
. Эта опция по умолчанию равна PROTOBUF
, то есть "сериализовать поле в виде последовательности байт". Выставленное нами значение YT
означает, что соответствующее вложенное сообщение будет сопоставлено сложному типу в схеме таблицы. В TDoc
для примера мы пометили поле ExtraInfo
опцией (NYT.serialization_mode) = PROTOBUF
(обратите внимание на тип соответствующего поля в выходной таблице).
Reducer пишется естественным образом.
#include <yt/cpp/mapreduce/examples/tutorial/protobuf_complex_types/data.pb.h>
#include <yt/cpp/mapreduce/interface/client.h>
#include <yt/cpp/mapreduce/interface/config.h>
#include <util/stream/output.h>
#include <util/system/user.h>
using namespace NYT;
// Редьюсер агрегирует информацию о ссылках на документ с данным заголовком.
class AggregateLinksReducer
: public IReducer<TTableReader<TLinkEntry>, TTableWriter<TDoc>>
{
public:
void Do(TReader* reader, TWriter* writer) override
{
TDoc doc;
for (auto& cursor : *reader) {
auto entry = cursor.MoveRow();
if (!doc.HasTitle()) {
doc.SetTitle(entry.GetDocTitle());
}
doc.AddLinks()->Swap(entry.MutableLink());
doc.AddOccurrenceCounts(entry.GetOccurrenceCount());
auto newCount = doc.GetExtraInfo().GetTotalOccurrenceCount() + entry.GetOccurrenceCount();
doc.MutableExtraInfo()->SetTotalOccurrenceCount(newCount);
}
writer->AddRow(doc);
}
};
REGISTER_REDUCER(AggregateLinksReducer)
int main() {
NYT::Initialize();
TString cluster = "hume";
auto client = CreateClient(cluster);
const TString sortedLinksTable = "//home/levysotsky/yt-tutorial/links-sorted-schematized";
Cout << "Sorted links table: https://yt.yandex-team.ru/" << cluster << "/#page=navigation&offsetMode=row&path=" << sortedLinksTable << Endl;
const TString outputTable = "//tmp/" + GetUsername() + "-tutorial-docs-protobuf";
// Обратите внимание на опцию `InferOutputSchema`,
// она заставляет навешивать на выходную таблицу схему.
client->Reduce(
TReduceOperationSpec()
.AddInput<TLinkEntry>(sortedLinksTable)
.AddOutput<TDoc>(outputTable)
.ReduceBy({"DocTitle"}),
new AggregateLinksReducer,
TOperationOptions()
.InferOutputSchema(true));
Cout << "Output table: https://yt.yandex-team.ru/" << cluster << "/#page=navigation&offsetMode=row&path=" << outputTable << Endl;
return 0;
}
Reduce с несколькими входными таблицами
внутренние ссылки:
Если кроме старой таблицы с пользователями, есть таблица, где записано, кто робот.
Следующая программа сделает таблицу, в которой останутся только пользователи-роботы.
Пример лежит в yt/cpp/mapreduce/examples/tutorial/multiple_input_reduce_tnode.
#include <yt/cpp/mapreduce/interface/client.h>
#include <util/stream/output.h>
#include <util/system/user.h>
using namespace NYT;
class TFilterRobotsReduce
: public IReducer<TTableReader<TNode>, TTableWriter<TNode>>
{
public:
void Do(TReader* reader, TWriter* writer) override {
TNode loginRow;
bool isRobot = false;
for (auto& cursor : *reader) {
const auto& curRow = cursor.GetRow();
// У нас есть информация о том из какой таблицы пришла запись.
auto tableIndex = cursor.GetTableIndex();
if (tableIndex == 0) {
// Таблица с логинами.
loginRow = curRow;
} else if (tableIndex == 1) {
// Таблица про роботов.
isRobot = curRow["is_robot"].AsBool();
} else {
// Какая-то фигня, такого индекса быть не может.
Y_ABORT();
}
}
if (isRobot) {
writer->AddRow(loginRow);
}
}
};
REGISTER_REDUCER(TFilterRobotsReduce)
int main() {
NYT::Initialize();
auto client = CreateClient("freud");
const TString sortedLoginTable = "//tmp/" + GetUsername() + "-tutorial-login-sorted";
const TString sortedIsRobotTable = "//tmp/" + GetUsername() + "-tutorial-is_robot-sorted";
const TString outputTable = "//tmp/" + GetUsername() + "-tutorial-robots";
client->Sort(
TSortOperationSpec()
.AddInput("//home/dev/tutorial/staff_unsorted")
.Output(sortedLoginTable)
.SortBy({"uid"}));
client->Sort(
TSortOperationSpec()
.AddInput("//home/dev/tutorial/is_robot_unsorted")
.Output(sortedIsRobotTable)
.SortBy({"uid"}));
client->Reduce(
TReduceOperationSpec()
.ReduceBy({"uid"})
.AddInput<TNode>(sortedLoginTable) // Таблицу с логинами мы добавляем первой, поэтому её TableIndex == 0
.AddInput<TNode>(sortedIsRobotTable) // Таблицу про роботов мы добавляем второй, поэтому её TableIndex == 1
.AddOutput<TNode>(outputTable),
new TFilterRobotsReduce);
Cout << "Output table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << outputTable << Endl;
return 0;
}
Чтение файлов из операций
внутренние ссылки:
Предыдущую задачу можно решать и по-другому: загрузить таблицу с роботами прямо на машины, где будут запускаться операции, и считать её там целиком в yhash_set
.
В таком случае можно не сортировать таблицу и использовать Map, а не Reduce. Этот способ используется, если таблица небольшая (до нескольких ГБ).
Входные таблицы те же, что и в предыдущем примере: с пользователями и роботами.
Следующая программа сделает таблицу, в которой останутся только пользователи-роботы.
Пример лежит в yt/cpp/mapreduce/examples/tutorial/map_tnode_with_file.
#include <yt/cpp/mapreduce/interface/client.h>
#include <yt/cpp/mapreduce/interface/io.h>
#include <util/generic/hash_set.h>
#include <util/stream/output.h>
#include <util/system/user.h>
using namespace NYT;
class TFilterRobotsMap
: public IMapper<TTableReader<TNode>, TTableWriter<TNode>>
{
public:
void Do(TReader* loginReader, TWriter* writer) override {
// Если мы прикрепляли табличку //path/to/table, то в операции мы будем видеть её под именем table
TIFStream stream("is_robot_unsorted");
auto isRobotReader = CreateTableReader<TNode>(&stream);
THashSet<i64> robotIds;
for (auto& cursor : *isRobotReader) {
const auto& curRow = cursor.GetRow();
if (curRow["is_robot"].AsBool()) {
robotIds.insert(curRow["uid"].AsInt64());
}
}
for (auto& cursor : *loginReader) {
const auto& curRow = cursor.GetRow();
if (robotIds.contains(curRow["uid"].AsInt64())) {
writer->AddRow(curRow);
}
}
}
};
REGISTER_MAPPER(TFilterRobotsMap)
int main() {
NYT::Initialize();
auto client = CreateClient("freud");
const TString loginTable = "//home/dev/tutorial/staff_unsorted";
const TString isRobotTable = "//home/dev/tutorial/is_robot_unsorted";
const TString outputTable = "//tmp/" + GetUsername() + "-tutorial-robots";
client->Map(
TMapOperationSpec()
.AddInput<TNode>(loginTable)
.MapperSpec(TUserJobSpec()
.AddFile(TRichYPath(isRobotTable).Format("yson"))) // Таблицу с роботами добавляем в виде файла
.AddOutput<TNode>(outputTable),
new TFilterRobotsMap);
Cout << "Output table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << outputTable << Endl;
return 0;
}
Reduce с несколькими входными и несколькими выходными таблицами
внутренние ссылки:
Теперь необходимо сделать почти то же самое что и в предыдущем примере, но записать сразу 2 выходных таблицы: и с людьми и с роботами.
Пример лежит в yt/cpp/mapreduce/examples/tutorial/multiple_input_multiple_output_reduce_tnode.
#include <yt/cpp/mapreduce/interface/client.h>
#include <util/stream/output.h>
#include <util/system/user.h>
using namespace NYT;
class TSplitHumanRobotsReduce
: public IReducer<TTableReader<TNode>, TTableWriter<TNode>>
{
public:
void Do(TReader* reader, TWriter* writer) override {
TNode loginRow;
bool isRobot = false;
for (auto& cursor : *reader) {
const auto& curRow = cursor.GetRow();
auto tableIndex = cursor.GetTableIndex();
if (tableIndex == 0) {
loginRow = curRow;
} else if (tableIndex == 1) {
isRobot = curRow["is_robot"].AsBool();
} else {
Y_ABORT();
}
}
// Второй аргумент метода `AddRow' указывает, в какую таблицу будет записано значение.
if (isRobot) {
writer->AddRow(loginRow, 0);
} else {
writer->AddRow(loginRow, 1);
}
}
};
REGISTER_REDUCER(TSplitHumanRobotsReduce)
int main() {
NYT::Initialize();
auto client = CreateClient("freud");
const TString sortedLoginTable = "//tmp/" + GetUsername() + "-tutorial-login-sorted";
const TString sortedIsRobotTable = "//tmp/" + GetUsername() + "-tutorial-is_robot-sorted";
const TString humanTable = "//tmp/" + GetUsername() + "-tutorial-humans";
const TString robotTable = "//tmp/" + GetUsername() + "-tutorial-robots";
client->Sort(
TSortOperationSpec()
.AddInput("//home/dev/tutorial/staff_unsorted")
.Output(sortedLoginTable)
.SortBy({"uid"}));
client->Sort(
TSortOperationSpec()
.AddInput("//home/dev/tutorial/is_robot_unsorted")
.Output(sortedIsRobotTable)
.SortBy({"uid"}));
client->Reduce(
TReduceOperationSpec()
.ReduceBy({"uid"})
.AddInput<TNode>(sortedLoginTable)
.AddInput<TNode>(sortedIsRobotTable)
.AddOutput<TNode>(robotTable) // выходная таблица номер 0
.AddOutput<TNode>(humanTable), // выходная таблица номер 1
new TSplitHumanRobotsReduce);
Cout << "Robot table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << robotTable << Endl;
Cout << "Human table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << humanTable << Endl;
return 0;
}
Reduce с несколькими входными и несколькими выходными таблицами (Protobuf-версия)
Чтобы сделать разные таблицы для людей и роботов, нужно переписать тот же самый reducer на protobuf. Таблица с людьми будет содержать поля login
, email
, name
. Таблица с роботами будет содержать поля login
и uid
. Потребуется завести отдельные типы protobuf сообщений для этих таблиц.
внутренние ссылки:
Пример лежит в yt/cpp/mapreduce/examples/tutorial/multiple_input_multiple_output_reduce_protobuf.
import "yt/yt_proto/yt/formats/extension.proto";
message TUserRecord {
optional int64 Uid = 1 [(NYT.column_name) = "uid"];
optional string Name = 2 [(NYT.column_name) = "name"];
optional string Login = 3 [(NYT.column_name) = "login"];
}
message TIsRobotRecord {
optional int64 Uid = 1 [(NYT.column_name) = "uid"];
optional bool IsRobot = 2 [(NYT.column_name) = "is_robot"];
}
message THumanRecord {
optional string Name = 1 [(NYT.column_name) = "name"];
optional string Email = 2 [(NYT.column_name) = "email"];
optional string Login = 3 [(NYT.column_name) = "login"];
}
message TRobotRecord {
optional int64 Uid = 1 [(NYT.column_name) = "uid"];
optional string Login = 2 [(NYT.column_name) = "login"];
}
#include <yt/cpp/mapreduce/examples/tutorial/multiple_input_multiple_output_reduce_protobuf/data.pb.h>
#include <yt/cpp/mapreduce/interface/client.h>
#include <yt/cpp/mapreduce/interface/config.h>
#include <util/stream/output.h>
#include <util/system/user.h>
using namespace NYT;
class TSplitHumanRobotsReduce
// Обратите внимание наш редьюс работает с несколькими типами записей
// как на вход так и на выход, поэтому мы используем ::google::protobuf::Message
: public IReducer<
TTableReader<::google::protobuf::Message>,
TTableWriter<::google::protobuf::Message>>
{
public:
void Do(TReader* reader, TWriter* writer) override {
TUserRecord userRecord;
bool isRobot = false;
for (auto& cursor : *reader) {
auto tableIndex = cursor.GetTableIndex();
// Мы знаем номер таблицы и поэтому мы можем запросить конкретный тип protobuf'а в этом месте.
// Тип protobuf сообщения передаётся шаблонным аргументом к методу `GetRow()'.
if (tableIndex == 0) {
userRecord = cursor.GetRow<TUserRecord>();
} else if (tableIndex == 1) {
const auto& isRobotRecord = cursor.GetRow<TIsRobotRecord>();
isRobot = isRobotRecord.GetIsRobot();
} else {
Y_ABORT();
}
}
// В AddRow мы можем передавать как TRobotRecord так и THumanRecord.
if (isRobot) {
TRobotRecord robotRecord;
robotRecord.SetUid(userRecord.GetUid());
robotRecord.SetLogin(userRecord.GetLogin());
writer->AddRow(robotRecord, 0);
} else {
THumanRecord humanRecord;
humanRecord.SetName(userRecord.GetName());
humanRecord.SetLogin(userRecord.GetLogin());
humanRecord.SetEmail(userRecord.GetLogin() + "@yandex-team.ru");
writer->AddRow(humanRecord, 1);
}
}
};
REGISTER_REDUCER(TSplitHumanRobotsReduce)
int main() {
NYT::Initialize();
auto client = CreateClient("freud");
const TString sortedUserTable = "//tmp/" + GetUsername() + "-tutorial-user-sorted";
const TString sortedIsRobotTable = "//tmp/" + GetUsername() + "-tutorial-is_robot-sorted";
const TString humanTable = "//tmp/" + GetUsername() + "-tutorial-humans";
const TString robotTable = "//tmp/" + GetUsername() + "-tutorial-robots";
client->Sort(
TSortOperationSpec()
.AddInput("//home/dev/tutorial/staff_unsorted")
.Output(sortedUserTable)
.SortBy({"uid"}));
client->Sort(
TSortOperationSpec()
.AddInput("//home/dev/tutorial/is_robot_unsorted")
.Output(sortedIsRobotTable)
.SortBy({"uid"}));
client->Reduce(
TReduceOperationSpec()
.ReduceBy({"uid"})
.AddInput<TUserRecord>(sortedUserTable)
.AddInput<TIsRobotRecord>(sortedIsRobotTable)
.AddOutput<TRobotRecord>(robotTable)
.AddOutput<THumanRecord>(humanTable),
new TSplitHumanRobotsReduce);
Cout << "Robot table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << robotTable << Endl;
Cout << "Human table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << humanTable << Endl;
return 0;
}
Чтение и запись таблиц
YT позволяет писать в таблицы и дописывать данные в них. По чтению таблиц тоже есть несколько режимов: можно читать как всю таблицу, так и отдельные диапазоны по номеру строки или ключу.
См. также разделы про чтение и запись данных и таблицы.
внутренние ссылки:
Пример лежит в yt/cpp/mapreduce/examples/tutorial/table_read_write_tnode.
#include <yt/cpp/mapreduce/interface/client.h>
#include <util/system/user.h>
using namespace NYT;
int main() {
NYT::Initialize();
auto client = CreateClient("freud");
const TString table = "//tmp/" + GetUsername() + "-read-write";
{
// Просто пишем данные в таблицу, если таблица существует, её перезапишут.
auto writer = client->CreateTableWriter<TNode>(table);
TNode row;
row["english"] = "one";
row["russian"] = "один";
writer->AddRow(row);
row["english"] = "two";
row["russian"] = "два";
writer->AddRow(row);
// Лучше не забывать звать метод Finish явно.
// Он конечно позовётся в деструкторе writer'а, но если случатся какие-нибудь ошибки,
// то у вас не будет возможности их поймать, потому что деструктор в случае ошибки абортит программу.
writer->Finish();
}
{
// Дописываем данные в конец таблицы,
// Для этого необходимо выставить опцию Append.
auto writer = client->CreateTableWriter<TNode>(TRichYPath(table).Append(true));
TNode row;
row["english"] = "three";
row["russian"] = "три";
writer->AddRow(row);
writer->Finish();
}
{
// Читаем всю таблицу
auto reader = client->CreateTableReader<TNode>(table);
Cout << "*** ALL TABLE ***" << Endl;
for (auto& cursor : *reader) { // reader имеет тот же самый интерфейс, что и reader в джобах
auto& row = cursor.GetRow();
Cout << "russian: " << row["russian"].AsString() << "; " << "english: " << row["english"].AsString() << Endl;
}
Cout << Endl;
}
{
// Читаем первые 2 строки.
auto reader = client->CreateTableReader<TNode>(TRichYPath(table).AddRange(
TReadRange()
.LowerLimit(TReadLimit().RowIndex(0))
.UpperLimit(TReadLimit().RowIndex(2)))); // читаем с 0й по 2ю строки, 2я невключительно
Cout << "*** FIRST TWO ROWS ***" << Endl;
for (auto& cursor : *reader) {
auto& row = cursor.GetRow();
Cout << "russian: " << row["russian"].AsString() << "; " << "english: " << row["english"].AsString() << Endl;
}
Cout << Endl;
}
{
// Мы можем отсортировать таблицу и читать записи по ключам.
client->Sort(TSortOperationSpec()
.SortBy({"english"})
.AddInput(table)
.Output(table));
auto reader = client->CreateTableReader<TNode>(TRichYPath(table).AddRange(
TReadRange()
.Exact(TReadLimit().Key({"three"})))); // если нужен один ключ а не диапазон, можно использовать
// Exact вместо LowerLimit / UpperLimit,
// Key указывает, что мы ищем запись по ключу и работает
// только для сортированных таблиц.
Cout << "*** EXACT KEY ***" << Endl;
for (auto& cursor : *reader) {
auto& row = cursor.GetRow();
Cout << "russian: " << row["russian"].AsString() << "; " << "english: " << row["english"].AsString() << Endl;
}
Cout << Endl;
}
return 0;
}
Передача состояния в джоб
Иногда нужно написать операцию, которая принимает некоторые аргументы. Например, написать программу, которая отфильтрует нашу таблицу по имени пользователя.
внутренние ссылки:
Пример лежит в yt/cpp/mapreduce/examples/tutorial/stateful_map_tnode.
#include <yt/cpp/mapreduce/interface/client.h>
#include <library/cpp/string_utils/levenshtein_diff/levenshtein_diff.h>
#include <util/stream/output.h>
#include <util/system/user.h>
using namespace NYT;
class TFilterMapper
: public IMapper<TTableReader<TNode>, TTableWriter<TNode>>
{
public:
Y_SAVELOAD_JOB(Pattern_, MaxDistance_); // Заклинание, которое говорит, какие переменные нужно передавать на сервер.
TFilterMapper() = default; // У джобы обязательно должен быть конструктор по умолчанию
TFilterMapper(TString pattern, double maxDistance)
: Pattern_(std::move(pattern))
, MaxDistance_(maxDistance)
{ }
void Do(TReader* reader, TWriter* writer) override {
for (auto& cursor : *reader) {
const auto& row = cursor.GetRow();
const auto& name = row["name"].AsString();
if (NLevenshtein::Distance(name, Pattern_) <= MaxDistance_) {
writer->AddRow(row);
}
}
}
private:
TString Pattern_;
size_t MaxDistance_ = 0;
};
REGISTER_MAPPER(TFilterMapper)
int main() {
NYT::Initialize();
auto client = CreateClient("freud");
const TString outputTable = "//tmp/" + GetUsername() + "-tutorial-output";
client->Map(
TMapOperationSpec()
.AddInput<TNode>("//home/dev/tutorial/staff_unsorted")
.AddOutput<TNode>(outputTable),
new TFilterMapper("Arkady", 2)); // Мы создаём объект TFilterMapper, и конструктор заполняет поля Pattern_ и MaxDistance_.
// Библиотека сериализует поля, указанные в Y_SAVELOAD_JOB, и загружает их на сервер.
// На сервере вызывается конструктор по умолчанию и сериализованные поля загружаются.
Cout << "Output table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << outputTable << Endl;
return 0;
}
Операция MapReduce (Protobuf-версия)
В YT есть слитная операция MapReduce, которая работает несколько быстрее нежели Map + Sort + Reduce. Чтобы по таблице с пользователями ещё раз посчитать статистику, сколько раз встречается то или иное имя, перед подсчётом нормализируем имена, приведя их к нижнему регистру. Это нужно, чтобы люди с именами АРКАДИЙ
и Аркадий
считались как одно.
внутренние ссылки:
Пример лежит в yt/cpp/mapreduce/examples/tutorial/mapreduce_protobuf.
#include <yt/cpp/mapreduce/interface/client.h>
#include <yt/cpp/mapreduce/examples/tutorial/mapreduce_protobuf/data.pb.h>
#include <util/stream/output.h>
#include <util/system/user.h>
#include <util/charset/utf8.h>
using namespace NYT;
//
// Для того чтобы запустить операцию mapreduce, нам нужны обычные классы Mapper'а и Reducer'а
// (эти классы даже можно использовать в других местах в отдельных операциях Map/Reduce).
//
class TNormalizeNameMapper
: public IMapper<
TTableReader<TLoginRecord>,
TTableWriter<TLoginRecord>>
{
public:
void Do(TReader* reader, TWriter* writer) override
{
for (auto& cursor : *reader) {
auto row = cursor.GetRow();
row.SetName(ToLowerUTF8(row.GetName()));
writer->AddRow(row);
}
}
};
REGISTER_MAPPER(TNormalizeNameMapper)
class TCountNameReducer
: public IReducer<
TTableReader<TLoginRecord>,
TTableWriter<TNameStatistics>>
{
public:
void Do(TReader* reader, TWriter* writer) override
{ TNameStatistics result;
ui64 count = 0;
for (auto& cursor : *reader) {
const auto& row = cursor.GetRow();
if (!result.HasName()) {
result.SetName(row.GetName());
}
++count;
}
result.SetCount(count);
writer->AddRow(result);
}
};
REGISTER_REDUCER(TCountNameReducer)
int main() {
NYT::Initialize();
auto client = CreateClient("freud");
const TString outputTable = "//tmp/" + GetUsername() + "-tutorial-output";
// Запуск операции MapReduce несильно отличается от запуска других операций.
// Нам надо указать список ключей, по которым мы будем редьюсить
// и два класса -- один Mapper и один Reducer.
client->MapReduce(
TMapReduceOperationSpec()
.ReduceBy({"name"})
.AddInput<TLoginRecord>("//home/dev/tutorial/staff_unsorted")
.AddOutput<TNameStatistics>(outputTable),
new TNormalizeNameMapper,
new TCountNameReducer);
Cout << "Output table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << outputTable << Endl;
return 0;
}
Операция MapReduce (версия с лямбда-выражениями)
Задача та же, что в предыдущем примере.
внутренние ссылки:
Этот пример лежит в yt/cpp/mapreduce/examples/tutorial/mapreduce_lambda.
#include <yt/cpp/mapreduce/examples/tutorial/mapreduce_lambda/data.pb.h>
#include <yt/cpp/mapreduce/library/lambda/yt_lambda.h>
#include <util/stream/output.h>
#include <util/system/user.h>
#include <util/charset/utf8.h>
using namespace NYT;
int main() {
NYT::Initialize();
auto client = CreateClient("freud");
// Эта таблица будет с некоторой вероятностью не отсортирована.
// Если вместо MapReduce() использовать MapReduceSorted(), тогда
// она будет дополнительно досортирована по "name" и это будет отражено в схеме
const TString outputTable = "//tmp/" + GetUsername() + "-tutorial-output";
MapReduce<TLoginRecord, TLoginRecord, TNameStatistics>(
client,
"//home/dev/tutorial/staff_unsorted",
outputTable,
{"name"}, // список ключей, по которым мы будем редьюсить
[](auto& src, auto& dst) { // mapper
dst.SetName(ToLowerUTF8(src.GetName()));
return true;
},
[](auto& /*src*/, auto& dst) { // reducer
// dst.SetName() не вызываем, т.к. когда по
// "name" редьюсим, это поле уже заполнено.
// так можно делать, т.к. конструктор протобуфа dst
// проинициализирует .Count в 0 (по умолчанию):
dst.SetCount(dst.GetCount() + 1);
});
Cout << "Output table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << outputTable << Endl;
return 0;
}
Подготовка операции в классе джоба
Есть возможность настраивать параметры операции, перегружая метод IJob::PrepareOperation
, подробности см. в разделе Описание.
import "yt/yt_proto/yt/formats/extension.proto";
message TGrepperRecord
{
optional string Key = 1;
optional bytes Other = 2 [(NYT.flags) = OTHER_COLUMNS];
};
#include <yt/cpp/mapreduce/examples/tutorial/prepare_operation/grepper.pb.h>
#include <yt/cpp/mapreduce/interface/client.h>
#include <yt/cpp/mapreduce/interface/config.h>
#include <util/stream/output.h>
#include <util/system/user.h>
using namespace NYT;
// Этот маппер отдаёт все строки, значение в колонке `column` которых равно `pattern`.
class TGrepper
: public IMapper<TTableReader<TGrepperRecord>, TTableWriter<TGrepperRecord>>
{
public:
TGrepper() = default;
TGrepper(TString column, TString pattern)
: Column_(column)
, Pattern_(pattern)
{ }
void Do(TReader* reader, TWriter* writer) override
{
for (const auto& cursor : *reader) {
auto row = cursor.GetRow();
if (row.GetKey() == Pattern_) {
writer->AddRow(row);
}
}
}
void PrepareOperation(const IOperationPreparationContext& context, TJobOperationPreparer& preparer) const override
{
auto schema = context.GetInputSchema(/* tableIndex */ 0);
// Выходная схема совпадает со входной во всём, кроме названия колонки, по которой ищем.
for (auto& column : schema.MutableColumns()) {
if (column.Name() == Column_) {
column.Name("Key");
}
}
preparer
.InputColumnRenaming(/* tableIndex */ 0, {{Column_, "Key"}})
.InputDescription<TGrepperRecord>(/* tableIndex */ 0)
// Выключаем автовывод схемы, т.к. он ничего не даст (у нас есть поле, помеченное OTHER_COLUMNS).
.OutputDescription<TGrepperRecord>(/* tableIndex */ 0, /* inferSchema */ false)
.OutputSchema(/* tableIndex */ 0, schema);
}
Y_SAVELOAD_JOB(Column_, Pattern_);
private:
TString Column_;
TString Pattern_;
};
REGISTER_MAPPER(TGrepper)
int main() {
NYT::Initialize();
auto client = CreateClient("freud");
// Выходная табличка у нас будет лежать в tmp и содержать имя текущего пользователя.
const TString outputTable = "//tmp/" + GetUsername() + "-tutorial-grepped-emails";
client->Map(
new TGrepper("login", "lev"),
"//home/dev/tutorial/staff_unsorted_schematized",
outputTable);
Cout << "Output table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << outputTable << Endl;
return 0;
}
Продвинутый уровень
Запуск нескольких операций параллельно
внутренние ссылки:
Для запуска нескольких операций параллельно существует класс TOperationTracker.
Пример лежит в yt/cpp/mapreduce/examples/tutorial/operation_tracker.
#include <yt/cpp/mapreduce/interface/client.h>
#include <yt/cpp/mapreduce/library/operation_tracker/operation_tracker.h> // OperationTracker живёт в этой библиотеке
#include <util/stream/output.h>
#include <util/system/user.h>
using namespace NYT;
int main() {
NYT::Initialize();
auto client = CreateClient("freud");
const TString sortedLoginTable = "//tmp/" + GetUsername() + "-tutorial-login-sorted";
const TString sortedIsRobotTable = "//tmp/" + GetUsername() + "-tutorial-is_robot-sorted";
const TString humanTable = "//tmp/" + GetUsername() + "-tutorial-humans";
const TString robotTable = "//tmp/" + GetUsername() + "-tutorial-robots";
TOperationTracker tracker;
tracker.AddOperation(
client->Sort(
TSortOperationSpec()
.AddInput("//home/dev/tutorial/staff_unsorted")
.Output(sortedLoginTable)
.SortBy({"uid"}),
TOperationOptions().Wait(false))); // Wait(false) говорит клиенту не ждать завершения операции
tracker.AddOperation(
client->Sort(
TSortOperationSpec()
.AddInput("//home/dev/tutorial/is_robot_unsorted")
.Output(sortedIsRobotTable)
.SortBy({"uid"}),
TOperationOptions().Wait(false)));
tracker.WaitAllCompleted(); // Ждёт когда обе операции завершатся, бросит исключение в случае ошибок.
return 0;
}
Reduce с enable_key_guarantee=false
внутренние ссылки:
Чтобы отфильтровать таблица с URL по регулярным выражениям, которые лежат в таблице с хостами (т.е. для каждого хоста свое регулярное выражение). Задачу можно было бы решить Reduce по колонке host
, но реальные данные часто устроены таким образом, что какие-то хосты имеют непропорционально много URL. В таблице с примером URL с хоста https://www.youtube.com
занимают больше половины таблицы. Джоб, который будет обрабатывать такой хост, займёт непропорционально много времени (ключ https://www.youtube.com
в такой ситуации называют ключом-монстром).
Но для решения задачи не нужно, чтобы все записи с одним хостом попадали на одну машину. Возможно распределить таблицу с URL между несколькими машинами: важно, чтобы вместе с URL обязательно был получен хост.
Reduce enable_key_guarantee=false
внутренние ссылки:
Пример лежит в yt/cpp/mapreduce/examples/tutorial/join_reduce_tnode.
#include <yt/cpp/mapreduce/interface/client.h>
#include <library/cpp/regex/pcre/regexp.h>
#include <util/system/user.h>
using namespace NYT;
class TFilterVideoRegexp
: public IReducer<TTableReader<TNode>, TTableWriter<TNode>>
{
public:
void Do(TReader* reader, TWriter* writer) override
{
// Так же как и с обычным Reducer'ом в каждый вызов метода Do
// придут записи с общим JoinBy ключом.
TMaybe<TRegExMatch> regex;
for (auto& cursor : *reader) {
auto row = cursor.GetRow();
if (cursor.GetTableIndex() == 0) { // таблица с хостами
const auto videoRegexp = row["video_regexp"].AsString();
// Дебажная печать, stderr можно будет посмотреть в web интерфейсе
Cerr << "Processing host: " << row["host"].AsString() << Endl;
if (!videoRegexp.empty()) {
regex = TRegExMatch(videoRegexp);
}
} else { // таблица с урлами
if (regex && regex->Match(row["path"].AsString().c_str())) {
writer->AddRow(row);
}
}
}
}
};
REGISTER_REDUCER(TFilterVideoRegexp)
int main() {
NYT::Initialize();
auto client = CreateClient("freud");
const TString outputTable = "//tmp/" + GetUsername() + "-tutorial-join-reduce";
client->Reduce(
TReduceOperationSpec()
.JoinBy({"host"})
.AddInput<TNode>(
TRichYPath("//home/dev/tutorial/host_video_regexp")
.Foreign(true)) // важно отметить хостовую таблицу как foreign
.AddInput<TNode>("//home/dev/tutorial/doc_title")
.AddOutput<TNode>(outputTable)
.EnableKeyGuarantee(false),
new TFilterVideoRegexp);
Cout << "Output table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << outputTable << Endl;
return 0;
}
Пакетные запросы
Есть возможность исполнять «лёгкие» запросы (создать/удалить таблицу, проверить её существование и т.д. ) пачками. Этот способ стоит использовать, если нужно выполнить большое количество однотипных операций. Пакетные (batch) запросы помогут заметно сэкономить время.
внутренние ссылки:
Пример лежит в yt/cpp/mapreduce/examples/tutorial/batch_request.
#include <yt/cpp/mapreduce/interface/client.h>
#include <util/system/user.h>
using namespace NYT;
int main() {
NYT::Initialize();
auto client = CreateClient("freud");
// Создаём batch запрос (это можно делать и из транзакции).
auto request = client->CreateBatchRequest();
// Добавляем запросы в batch
NThreading::TFuture<bool> docTitleExists = request->Exists("//home/dev/tutorial/doc_title");
NThreading::TFuture<bool> unexistingTableExists = request->Exists("//home/dev/tutorial/unexisting_table");
const TString outputTable = "//tmp/" + GetUsername() + "-tutorial-test-batch";
NThreading::TFuture<TNodeId> createResult = request->Create(outputTable, NT_TABLE);
// Выполняем batch запрос.
request->ExecuteBatch();
// Проверяем результаты.
Cout << "Table //home/dev/tutorial/doc_title exists: " << docTitleExists.GetValue() << Endl;
Cout << "Table //home/dev/tutorial/unexisting_table exists: " << unexistingTableExists.GetValue() << Endl;
try {
// Следует проверять все результаты с помощью GetValue(),
// т.к. отдельные запросы могут пофейлиться и тогда соответствующая TFuture будет содержать ошибку.
//
// Если запускать эту программу второй раз то Create пофейлится, потому что таблица уже существует.
createResult.GetValue();
} catch (const std::exception& ex) {
Cerr << "Create " << outputTable << " failed: " << ex.what() << Endl;
}
return 0;
}
Доставка дампа таблицы в джоб в виде файла
В YT в джоб можно доставлять файлы (как, например, описано ранее). Менее известной возможностью является доставка дампа таблицы в джоб. Это может быть удобно, если джобу требуется словарь, хранящийся в небольшой (едининичные ГБ) таблице. Однако не стоит использовать эту возможность для загрузки больших таблиц, для таких таблиц подход лучше не использовать. (почему не стоит так делать?)
внутренние ссылки:
Можно переписать с помощью новой технологии один из предыдущих примеров, где фильтровалась пользовательскую таблицу по таблице с роботами.
Пример лежит в yt/cpp/mapreduce/examples/tutorial/pass_table_as_file.
#include <yt/cpp/mapreduce/interface/client.h>
#include <util/generic/hash_set.h>
#include <util/stream/output.h>
#include <util/system/user.h>
using namespace NYT;
class TFilterRobotsMap
: public IMapper<TTableReader<TNode>, TTableWriter<TNode>>
{
public:
// Мы читаем дамп таблицы в методе Start, перед тем как обработать первую запись в Map-таблице.
// Настройка доставки таблицы в джобу происходит в функции main.
void Start(TWriter*) override {
// Нам в джобу доставили файлик "robot_table" с дампом таблицы.
TFileInput tableDump("robot_table");
// Есть функция CreateTableReader<>, которая умеет создавать читателя из любого IInputStream'а.
// Созданный читатель имеет интерфейс аналогичный другим читателям.
auto reader = CreateTableReader<TNode>(&tableDump);
for (auto& cursor : *reader) {
const auto& curRow = cursor.GetRow();
if (curRow["is_robot"].AsBool()) {
RobotUids.insert(curRow["uid"].AsInt64());
}
}
}
void Do(TReader* reader, TWriter* writer) override {
for (auto& cursor : *reader) {
const auto& curRow = cursor.GetRow();
if (RobotUids.contains(curRow["uid"].AsInt64())) {
writer->AddRow(curRow);
}
}
}
private:
THashSet<i64> RobotUids;
};
REGISTER_MAPPER(TFilterRobotsMap)
int main() {
NYT::Initialize();
auto client = CreateClient("freud");
const TString outputTable = "//tmp/" + GetUsername() + "-tutorial-robots";
client->Map(
TMapOperationSpec()
.MapperSpec(
TUserJobSpec()
// Самое интересное -- мы просим YT доставить нам табличку в виде файла.
.AddFile(
TRichYPath("//home/dev/tutorial/is_robot_unsorted") // Тут указываем таблицу, которую нам надо доставить.
.Format("yson") // Это формат, в котором таблица будет прочитана, нам нужен yson, чтобы TNode-читатель в джобе смог прочитать файл.
.FileName("robot_table") // Это имя файла, с дампом таблицы.
// Тут же можно было бы указать фильтрацию по колонкам или фильтрацию по номерам строк таблицы
// с помощью соответсвтующих методов TRichYPath, но нам ничего этого не надо.
))
.AddInput<TNode>("//home/dev/tutorial/staff_unsorted")
.AddOutput<TNode>(outputTable),
new TFilterRobotsMap);
Cout << "Output table: https://yt.yandex-team.ru/freud/#page=navigation&offsetMode=row&path=" << outputTable << Endl;
return 0;
}
Запись и получение job statistics
Во время работы операция накапливает множество статистик (есть документация и пост) (актуально?). Более того, из операции можно писать пользовательские статистики. Статистики можно читать с помощью С++.
Пример на простой Map со сбором статистик:
внутренние ссылки:
Пример лежит в yt/cpp/mapreduce/examples/tutorial/job_statistics.
#include <yt/cpp/mapreduce/interface/client.h>
#include <util/system/user.h>
using namespace NYT;
class TComputeEmailsMapper
: public IMapper<TTableReader<TNode>, TTableWriter<TNode>>
{
public:
void Do(TReader* reader, TWriter* writer) override
{
// Соберём несколько интересных статистик по сотрудникам
i64 fortyTwoCount = 0;
i64 shortLoginCount = 0;
for (const auto& cursor : *reader) {
const auto& row = cursor.GetRow();
auto login = row["login"].AsString();
TNode outRow;
outRow["name"] = row["name"];
outRow["email"] = login + "@yandex-team.ru";
writer->AddRow(outRow);
if (login.find("42") != TString::npos) {
++fortyTwoCount;
}
if (login.size() <= 4) {
++shortLoginCount;
}
}
// Пользовательскую статистику можно писать по пути (имени)
WriteCustomStatistics("names/with_42", fortyTwoCount);
WriteCustomStatistics(TNode() // А можно в виде мапы
("names", TNode()
("short", shortLoginCount)));
}
};
REGISTER_MAPPER(TComputeEmailsMapper)
int main() {
NYT::Initialize();
auto client = CreateClient("freud");
const TString outputTable = "//tmp/" + GetUsername() + "-tutorial-emails";
auto operation = client->Map(
TMapOperationSpec()
.AddInput<TNode>("//home/dev/tutorial/staff_unsorted")
.AddOutput<TNode>(outputTable),
new TComputeEmailsMapper);
auto jobStatistics = operation->GetJobStatistics();
// Выведем системные статистики
Cout << "time/total: " << jobStatistics.GetStatisticsAs<TDuration>("time/total").Sum() << Endl;
Cout << "data/input/row_count: " << jobStatistics.GetStatistics("data/input/row_count").Sum() << Endl;
// NB: для получения пользовательских статистик используется другой метод
Cout << "names/with_42: " << jobStatistics.GetCustomStatistics("names/with_42").Sum() << Endl;
Cout << "names/short: " << jobStatistics.GetCustomStatistics("names/short").Sum() << Endl;
return 0;
}
Базовая работа с динамическими таблицами
YT поддерживает динамические таблицы. ßßWrapper С++ предоставляет возможность совершать базовые операции с такими таблицами.
Внимание
В отличие от других примеров, для запуска этого примера придётся совершить некоторые дополнительные действия:
- По умолчанию у пользователей нет разрешений для создания динамических таблиц, поэтому надо получить права на создание и монтирование динамических таблиц на каком-нибудь кластере.
- Во время запуска примера нужно передать имя кластера и путь к тестовой таблице (она не должна существовать). Подробнее в комментариях в тексте программы.
внутренние ссылки:
Пример лежит в yt/cpp/mapreduce/examples/tutorial/dyntable_get_insert.
#include <yt/cpp/mapreduce/interface/client.h>
#include <yt/cpp/mapreduce/util/wait_for_tablets_state.h> // for WaitForTabletsState
#include <util/stream/output.h>
using namespace NYT;
int main(int argc, const char** argv) {
NYT::Initialize();
//
// Программа принимает аргументами имя кластера и путь к таблице, с которой она будет работать (таблица не должна существовать).
//
// По умолчанию у пользователей нет прав монтировать динамические таблицы, и перед запуском программы необходимо получить
// такие права (права на монтирование таблиц) на каком-нибудь кластере YT.
if (argc != 3) {
Cerr << "Usage:\n";
Cerr << '\t' << argv[0] << " <server-name> <path-to-dynamic-table>\n";
Cerr << '\n';
Cerr << "For example:\n";
Cerr << '\t' << argv[0] << " freud //home/ermolovd/test-dyntable" << Endl;
return 1;
}
auto client = CreateClient(argv[1]);
const TString dynamicTablePath = argv[2];
auto schema = TTableSchema()
.AddColumn("key", EValueType::VT_INT64, ESortOrder::SO_ASCENDING)
.AddColumn("value", EValueType::VT_STRING);
// ВАЖНО: при создании динамической таблицы нам нужно
// - указать атрибут dynamic: true
// - указать схему
// Это нужно сделать обязательно в вызове Create. Т.е. не получится сначала создать таблицу,
// а потом проставить эти атрибуты.
client->Create(
dynamicTablePath, NT_TABLE,
TCreateOptions()
.Force(true)
.Attributes(
TNode()
("dynamic", true)
("schema", schema.ToNode())));
// Для того чтобы начать работу с динамической таблицей, её необходимо "подмонтировать".
//
// Часто создание / монтирование / размонтирование таблиц делается отдельными административными скриптами,
// а приложение просто расчитывает, что таблицы существуют и уже подмонтированы.
//
// Мы для полноты примера подмонтируем таблицу, а в конце её размонтируем.
client->MountTable(dynamicTablePath);
Cout << "Waiting tablets are mounted..." << Endl;
// Функция MountTable (и UnmountTable) запускает асинхронный процесс монтирования,
// который может занять значительное время (десятки секунд) для больших таблиц.
// Нам необходимо дождаться завершения этого процесса.
WaitForTabletsState(client, dynamicTablePath, TS_MOUNTED);
// Вставлять значения в динтаблицу можно с помощью InsertRows
client->InsertRows(dynamicTablePath, {
TNode()("key", 1)("value", "один"),
TNode()("key", 42)("value", "сорок два"),
TNode()("key", 100500)("value", "стопятьсот"),
});
// Получать значения из динтаблицы можно с помощью LookupRows,
// возвращает список найденных строчек.
auto result = client->LookupRows(dynamicTablePath, {
TNode()("key", 100500),
TNode()("key", 42),
});
Cout << "====== LOOKUP RESULT ======" << Endl;
for (const auto& row : result) {
Cout << "key: " << row["key"].AsInt64() << " value: " << row["value"].AsString() << Endl;
}
Cout << "====== END LOOKUP RESULT ======" << Endl;
client->UnmountTable(dynamicTablePath);
Cout << "Waiting tablets are unmounted..." << Endl;
WaitForTabletsState(client, dynamicTablePath, TS_UNMOUNTED);
return 0;
}