Lambda expressions
In some YTsaurus operations, the calculations are so simple that creating a separate class for each of them using the C++ API feels excessive. In such cases, you can describe the operations with lambda expressions.
Warning

Such lambda expressions currently can't capture variables, meaning they must have an empty capture list; this makes them equivalent to C functions. The restriction exists because more general lambdas are problematic to serialize correctly in modern C++. To pass parameters to a lambda, you can sometimes use `static const` and `TSaveable` global variables.
Warning
This only works in programs built statically for Linux, so you can't use settings like `JobBinaryLocalPath`.
Using the yt_lambda library
Link the `yt_lambda` library and include the following header in your program:

```cpp
#include <yt/cpp/mapreduce/library/lambda/yt_lambda.h>
```
Initialize YTsaurus and get the client. Pass the client or a transaction as the first parameter to the functions described in this document.
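A minimal setup sketch (the cluster name below is a placeholder; substitute your own):

```cpp
#include <yt/cpp/mapreduce/interface/client.h>
#include <yt/cpp/mapreduce/library/lambda/yt_lambda.h>

int main(int argc, const char** argv) {
    NYT::Initialize(argc, argv);  // must be called before any other YT API use
    auto client = NYT::CreateClient("cluster-name");  // placeholder cluster name

    // Pass `client` (or a transaction) as the first argument
    // to the yt_lambda functions described below.
    return 0;
}
```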
The functions in this library combine well with records in protobuf format.
Operations overview
All of the operations in this library take as input lambda-expressed operators on a single record, which are then applied repeatedly. In general, an operation can also be viewed as a chain of type conversions; for example, `CopyIf<T>` maps T → T, `TransformCopyIf<R, W>` maps R → W, and `MapReduce<R, TMapped, W>` chains R → TMapped → W.
Additive operations
A variant of ordinary operations where Reduce has the same input and output type and is associative. In this case, the Reduce transformation is even simpler to implement, and it can be used as a combiner for free.
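For example, summing a counter field is associative, so the same lambda can serve both as the reducer and as the combiner. A minimal sketch, assuming a hypothetical `TStatsMsg` protobuf message with a `Count` field:

```cpp
// Associative fold: (a + b) + c == a + (b + c), so YT may also apply
// this operation as a combiner on partial ranges of records.
// A capture-less lambda converts to a plain function pointer:
void (*sumCounts)(const TStatsMsg&, TStatsMsg&) =
    [](const TStatsMsg& src, TStatsMsg& dst) {
        dst.SetCount(src.GetCount() + dst.GetCount());
    };
```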
CopyIf
This function filters records in the table without modifying them.
```cpp
template <class T>
void CopyIf(const IClientBasePtr& client, const TRichYPath& from, const TRichYPath& to, bool (*p)(const T&));
```
| Parameter | Description |
|---|---|
| `T` | Record type (`TNode` or a specific protobuf message type) |
| `from` | Input table name |
| `to` | Output table name |
| `p` | Filtering predicate |
Notes:

- The order of records in the table isn't changed: the operation is run with the `Ordered` flag set, which may slightly slow it down.
- A known defect: if there is only one input table, and it has a schema and is sorted, the sorting information doesn't reach the output table schema.
Application
```cpp
NYT::CopyIf<TMyProtoMsg>(client, "//home/project/inTable", "//home/project/outTable",
    [](auto& rec) { return rec.GetSomeField() > 10000; });
```
TransformCopyIf, or Map
This function performs a classic Map: it reads each record from a table of one format, converts it, and writes it to a table of another format. Records for which the mapper returns `false` aren't included in the output table.
```cpp
template <class R, class W>
void TransformCopyIf(const IClientBasePtr& client, const TRichYPath& from, const TRichYPath& to, bool (*mapper)(const R&, W&));
```
| Parameter | Description |
|---|---|
| `R` | Input record type (`TNode` or a specific protobuf message type) |
| `W` | Output record type (`TNode` or a specific protobuf message type) |
| `from` | Input table name |
| `to` | Output table name |
| `mapper` | A conversion function that takes a `src` record of type `R` and fills the `dst` record of type `W`. For `dst` to be written, return `true`. For performance reasons, the library passes as `dst` the same buffer variable, which isn't even cleared between `mapper` calls. This is generally not a problem, because the mapper usually fills in the same `dst` fields. |
Note:

- The order of records in the table isn't changed: the operation is run with the `Ordered` flag set, which may slightly slow it down.
Application
```cpp
NYT::TransformCopyIf<TDataMsg, TStatsMsg>(
    client, "//home/project/inTable", "//home/project/outTable",
    [](auto& src, auto& dst) {
        dst.SetField(src.GetField());
        return true;
    });
```
AdditiveReduce
```cpp
template <class T, class TFin>
void AdditiveReduce(
    const IClientBasePtr& client,
    const TKeyBase<TRichYPath>& from,
    const TRichYPath& to,
    const TKeyColumns& reduceFields,
    void (*reducer)(const T&, T&),
    bool (*finalizer)(const T&, TFin&));
```
| Parameter | Description |
|---|---|
| `T` | Input and output record type for `reducer` (`TNode` or a specific protobuf message type) |
| `TFin` (opt) | Optional parameter. Output record type (`TNode` or a specific protobuf message type). Specified only in the version with a finalizer |
| `from` | Name of an input table (or tables) |
| `to` | Output table name |
| `reduceFields` | The keys whose identical values are used for sorting and folding (Reduce) |
| `reducer` | A conversion function that takes a new `src` record of type `T` and modifies the `dst` record of the same type `T`. `dst` initially contains the first record of the current range of records with identical `reduceFields` keys. If the current range contains only one record, the reducer isn't called. You don't need to write anything into the `dst` fields that correspond to the keys: they are already filled in |
| `finalizer` (opt) | Optional parameter. A conversion function that takes the `src` record of type `T` and fills the `dst` record of type `TFin`. Called once after Reduce has processed a range of input records, to produce the final version of the output record. `dst` is default-initialized. You don't need to write anything into the `dst` fields that correspond to the keys: they are already filled in |
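A usage sketch, assuming a hypothetical `TStatsMsg` protobuf message with a `Name` key field and a `Count` field, and an input table sorted by the key:

```cpp
NYT::AdditiveReduce<TStatsMsg>(
    client, "//home/project/inTable", "//home/project/outTable",
    "name", // reduce key, matches TStatsMsg::Name
    [](auto& src, auto& dst) {
        // dst starts out as the first record of the equal-key range;
        // only the non-key fields need to be folded in.
        dst.SetCount(src.GetCount() + dst.GetCount());
    });
```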
Reduce
```cpp
template <class R, class W, class TFin>
void Reduce(
    const IClientBasePtr& client,
    const TKeyBase<TRichYPath>& from,
    const TRichYPath& to,
    const TKeyColumns& reduceFields,
    void (*reducer)(const R&, W&),
    bool (*finalizer)(const W&, TFin&));
```
| Parameter | Description |
|---|---|
| `R` | Input record type (`TNode` or a specific protobuf message type) |
| `W` | Reducer's output record type (`TNode` or protobuf) |
| `TFin` (opt) | Optional parameter. Output record type (`TNode` or a specific protobuf message type). Specified only in the version with a finalizer |
| `from` | Name of an input table (or tables) |
| `to` | Output table name |
| `reduceFields` | The keys whose identical values are used for sorting and folding (Reduce) |
| `reducer` | A conversion function that takes a new `src` record of type `R` and modifies the `dst` record of type `W`. `dst` is default-initialized (for `TNode` this means an empty `TNode`, that is, `Undefined`; protobuf is initialized with zeros and empty strings, which is usually more practical). You don't need to write anything into the `dst` fields that correspond to the keys: they are already filled in |
| `finalizer` (opt) | Optional parameter. A conversion function that takes the `src` record of type `W` and fills the `dst` record of type `TFin`. Called once after Reduce has processed a range of input records, to produce the final version of the output record. `dst` is default-initialized. You don't need to write anything into the `dst` fields that correspond to the keys: they are already filled in |
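A usage sketch, assuming the same hypothetical `TDataMsg` and `TStatsMsg` protobuf types as in the other examples and an input table sorted by the key:

```cpp
NYT::Reduce<TDataMsg, TStatsMsg>(
    client, "//home/project/inTable", "//home/project/outTable",
    "name", // reduce key, matches TStatsMsg::Name
    [](auto& src, auto& dst) {
        // dst is default-initialized once per key range;
        // its key fields are already filled in.
        dst.SetCount(dst.GetCount() + src.GetSomeCountField());
    });
```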
AdditiveMapReduce[Sorted]
The `AdditiveMapReduce` function generates a merged MapReduce operation: it converts records of the input table (or tables), sorts the result, and performs Reduce. The reducer must be an associative binary operation on same-type records; most often, this is addition of numbers. As a bonus, YT can use such a reducer as a combiner if needed.

Unfortunately, merged MapReduce operations in YT always generate an unsorted table. The workaround is the `AdditiveMapReduceSorted` function, which invokes `AdditiveMapReduce` and then sorts the output table.
```cpp
template <class R, class W, class TFin>
void AdditiveMapReduce(
    const IClientBasePtr& client,
    const TKeyBase<TRichYPath>& from,
    const TRichYPath& to,
    const TKeyColumns& reduceFields,
    bool (*mapper)(const R&, W&),
    void (*reducer)(const W&, W&),
    bool (*finalizer)(const W&, TFin&));
```
| Parameter | Description |
|---|---|
| `R` | Input record type (`TNode` or a specific protobuf message type) |
| `W` | Output record type for `reducer` (`TNode` or a specific protobuf message type) |
| `TFin` (opt) | Optional parameter. Output record type (`TNode` or a specific protobuf message type). Specified only in the version with a finalizer |
| `from` | Name of an input table (or tables) |
| `to` | Output table name |
| `reduceFields` | The keys whose identical values are used for sorting and folding (Reduce) |
| `mapper` | A conversion function that takes a `src` record of type `R` and fills the `dst` record of type `W`. For `dst` to be written, return `true`. `mapper` may be `nullptr`. For performance reasons, the library passes as `dst` the same buffer variable, which isn't even cleared between `mapper` calls. This is generally not a problem, because the mapper usually fills in the same `dst` fields. |
| `reducer` | A conversion function that takes a new `src` record of type `W` and modifies the `dst` record of the same type `W`. `dst` initially contains the first record of the current range of records with identical `reduceFields` keys. If the current range contains only one record, the reducer isn't called. You don't need to write anything into the `dst` fields that correspond to the keys: they are already filled in |
| `finalizer` (opt) | Optional parameter. A conversion function that takes the `src` record of type `W` and fills the `dst` record of type `TFin`. Called once after Reduce has processed a range of input records, to produce the final version of the output record. `dst` is default-initialized. You don't need to write anything into the `dst` fields that correspond to the keys: they are already filled in |
Application
```cpp
NYT::AdditiveMapReduceSorted<TDataMsg, TStatsMsg>(
    client,
    inTable, // or inTables
    outTable,
    "name", // sort key, matches TStatsMsg::Name
    [](auto& src, auto& dst) {
        // dst is a buffer that may contain garbage.
        // We don't need to clear it because we set all fields.
        dst.SetName(src.GetSomeField());
        dst.SetCount(src.GetSomeCountField());
        return true;
    },
    [](auto& src, auto& dst) {
        // dst is initialized with the first record of the equal-key range.
        // This lambda is called starting from the 2nd record.
        // dst.Name already holds the correct value and must not be modified.
        dst.SetCount(src.GetCount() + dst.GetCount());
    });
```
MapReduce[Sorted]
The `MapReduce` function generates a merged MapReduce operation: it converts records of the input table (or tables), sorts the result, and performs Reduce. Unlike `AdditiveMapReduce`, the reducer function doesn't have to be associative, so it can have different input and output record types (and thus can't be used as a combiner). To make the output clearer, you can use an additional lambda function, a finalizer, which is called on the reducer output to generate the final version of the record.

Unfortunately, merged MapReduce operations in YT always generate an unsorted table. The workaround is the `MapReduceSorted` function, which invokes `MapReduce` and then sorts the output table.
```cpp
template <class R, class TMapped, class TReducerData, class W>
void MapReduce(
    const IClientBasePtr& client,
    const TKeyBase<TRichYPath>& from,
    const TRichYPath& to,
    const TKeyColumns& reduceFields,
    bool (*mapper)(const R& src, TMapped& dst),
    void (*reducer)(const TMapped& src, TReducerData& dst),
    bool (*finalizer)(const TReducerData& src, W& dst));
```
| Parameter | Description |
|---|---|
| `R` | Input record type (`TNode` or a specific protobuf message type) |
| `TMapped` | Output record type for `mapper` and input record type for `reducer` (`TNode` or protobuf) |
| `TReducerData` (opt) | Optional parameter. Output record type for the reducer and input record type for the finalizer (any type, including a plain C++ struct). Specified only in the MapReduce version with a finalizer |
| `W` | Output record type (`TNode` or a specific protobuf message type) |
| `from` | Name of an input table (or tables) |
| `to` | Output table name |
| `reduceFields` | The keys whose identical values are used for sorting and folding (Reduce) |
| `mapper` | A conversion function that takes a `src` record of type `R` and fills the `dst` record of type `TMapped`. For `dst` to be written, return `true`. `mapper` may be `nullptr`. For performance reasons, the library passes as `dst` the same buffer variable, which isn't even cleared between `mapper` calls. This is generally not a problem, because the mapper usually fills in the same `dst` fields. |
| `reducer` | A conversion function that takes a new `src` record of type `TMapped` and modifies the `dst` record of type `W` (or `TReducerData` when using a finalizer). `dst` is default-initialized (for `TNode` this means an empty `TNode`, that is, `Undefined`; protobuf is initialized with zeros and empty strings, which is usually more practical). You don't need to write anything into the `dst` fields that correspond to the keys: they are already filled in |
| `finalizer` (opt) | Optional parameter. A conversion function that takes the `src` record of type `TReducerData` and fills the `dst` record of type `W`. Called once after the reducer has processed a range of input records, to produce the final version of the output record. `dst` is default-initialized. You don't need to write anything into the `dst` fields that correspond to the keys: they are already filled in |
Application
```protobuf
message TSimpleKeyValue {
    required string Key = 1 [(NYT.column_name) = "key"];
    required uint64 Value = 2 [(NYT.column_name) = "value"];
}

message TKeyStat {
    required string Key = 1 [(NYT.column_name) = "key"];
    required double Mean = 2 [(NYT.column_name) = "mean"];
    required double Sigma = 3 [(NYT.column_name) = "sigma"];
}
```

```cpp
struct TDispersionData {
    ui64 Count = 0;
    long double Sum = 0.;
    long double SumSquared = 0.;
};
```
```cpp
#include <cmath> // for std::sqrt

void CalculateDispersion(IClientPtr& client, TString input, TString output) {
    NYT::MapReduceSorted<TDataMsg, TSimpleKeyValue, TDispersionData, TKeyStat>(
        client,
        input,
        output,
        "key",
        [](auto& src, auto& dst) { // mapper: TDataMsg -> TSimpleKeyValue
            dst.SetKey(src.GetSomeField());
            dst.SetValue(src.GetSomeCountField());
            return true;
        },
        [](auto& src, auto& dst) { // reducer: TSimpleKeyValue -> TDispersionData
            double value = src.GetValue();
            dst.Count++;
            dst.Sum += value;
            dst.SumSquared += value * value;
        },
        [](auto& src, auto& dst) { // finalizer: TDispersionData -> TKeyStat
            double mean = (double)src.Sum / src.Count;
            double dispersion = (double)src.SumSquared / src.Count - mean * mean;
            dst.SetMean(mean);
            dst.SetSigma(std::sqrt(dispersion));
        });
}
```
MapReduceCombined[Sorted]
The `MapReduceCombined` function generates a merged MapReduce operation: it converts records of the input table (or tables), sorts the result, and performs Reduce similarly to `MapReduce`, but with the combiner always enabled. To make the output clearer, you can use an additional lambda function, a finalizer, which is called on the reducer output to generate the final version of the record.

Unfortunately, merged MapReduce operations in YT always generate an unsorted table. The workaround is the `MapReduceCombinedSorted` function, which invokes `MapReduceCombined` and then sorts the output table.
```cpp
template <class R, class TMapped, class W, class TFin>
void MapReduceCombined(
    const IClientBasePtr& client,
    const TKeyBase<TRichYPath>& from,
    const TRichYPath& to,
    const TKeyColumns& reduceFields,
    bool (*mapper)(const R&, TMapped&),
    void (*combiner)(const TMapped&, W&),
    void (*reducer)(const W&, W&),
    bool (*finalizer)(const W&, TFin&));
```
| Parameter | Description |
|---|---|
| `R` | Input record type (`TNode` or a specific protobuf message type) |
| `TMapped` | Output record type for `mapper` and input record type for `combiner` (`TNode` or protobuf) |
| `W` | Output record type for the combiner; input record type for the reducer and finalizer (`TNode` or a specific protobuf message type) |
| `TFin` (opt) | Optional parameter. Output record type for the finalizer (`TNode` or a specific protobuf message type). Specified only in the version with a finalizer |
| `from` | Name of an input table (or tables) |
| `to` | Output table name |
| `reduceFields` | The keys whose identical values are used for sorting and folding (Reduce) |
| `mapper` | A conversion function that takes a `src` record of type `R` and fills the `dst` record of type `TMapped`. For `dst` to be written, return `true`. `mapper` may be `nullptr`. For performance reasons, the library passes as `dst` the same buffer variable, which isn't even cleared between `mapper` calls. This is generally not a problem, because the mapper usually fills in the same `dst` fields. |
| `combiner` | A conversion function that takes a new `src` record of type `TMapped` and modifies the `dst` record of type `W`. Essentially, it does what the reducer does in `MapReduce`. `dst` is default-initialized (for `TNode` this means an empty `TNode`, that is, `Undefined`; protobuf is initialized with zeros and empty strings, which is usually more practical). You don't need to write anything into the `dst` fields that correspond to the keys: they are already filled in |
| `reducer` | An associative conversion function that takes a new `src` record and modifies the `dst` record of the same type `W`. `dst` initially contains the first record of the current range of records with identical `reduceFields` keys. If the current range contains only one record, the reducer isn't called. You don't need to write anything into the `dst` fields that correspond to the keys: they are already filled in |
| `finalizer` (opt) | Optional parameter. A conversion function that takes the `src` record of type `W` and fills the `dst` record of type `TFin`. Called once after Reduce has processed a range of input records, to produce the final version of the output record. `dst` is default-initialized. You don't need to write anything into the `dst` fields that correspond to the keys: they are already filled in |
Application
In addition to the protobuf messages defined in the example for MapReduce:
```protobuf
message TDispersionDataMsg {
    required string Key = 1 [(NYT.column_name) = "key"];
    required uint64 Count = 2 [(NYT.column_name) = "count"];
    required double Sum = 3 [(NYT.column_name) = "sum"];
    required double SumSquared = 4 [(NYT.column_name) = "sum_squared"];
}
```
The function itself:
```cpp
#include <cmath> // for std::sqrt

void CalculateDispersionWithCombiner(IClientPtr& client, TString input, TString output) {
    NYT::MapReduceCombinedSorted<TDataMsg, TSimpleKeyValue, TDispersionDataMsg, TKeyStat>(
        client,
        input,
        output,
        "key",
        [](auto& src, auto& dst) { // mapper: TDataMsg -> TSimpleKeyValue
            dst.SetKey(src.GetSomeField());
            dst.SetValue(src.GetSomeCountField());
            return true;
        },
        [](auto& src, auto& dst) { // combiner: TSimpleKeyValue -> TDispersionDataMsg
            double value = src.GetValue();
            dst.SetCount(dst.GetCount() + 1);
            dst.SetSum(dst.GetSum() + value);
            dst.SetSumSquared(dst.GetSumSquared() + value * value);
        },
        [](auto& src, auto& dst) { // reducer: folds partial TDispersionDataMsg records
            dst.SetCount(src.GetCount() + dst.GetCount());
            dst.SetSum(src.GetSum() + dst.GetSum());
            dst.SetSumSquared(src.GetSumSquared() + dst.GetSumSquared());
        },
        [](auto& src, auto& dst) { // finalizer: TDispersionDataMsg -> TKeyStat
            double mean = src.GetSum() / src.GetCount();
            double dispersion = src.GetSumSquared() / src.GetCount() - mean * mean;
            dst.SetMean(mean);
            dst.SetSigma(std::sqrt(dispersion));
        });
}
```
Choosing between MapReduce and MapReduceCombined
The `AdditiveMapReduce` function, which also provides a combiner, leaves it up to YT to decide whether to use the combiner. The `MapReduceCombined` function always forces the system to use the combiner, even when omitting it would be more efficient. The reason is typing: if the combiner were optional, the reducer could receive records of different types (directly from map or from combine), which would make the lambdas problematic to write.

In practice, the observed effect is as follows: `MapReduceCombined` operations consume a few percent more CPU on the cluster than otherwise identical `MapReduce` operations, but complete much faster. Since using `MapReduceCombined` is as simple as using `MapReduce`, it may be worth trying both during development and keeping the one that better suits your needs.
Combining lambdas with the regular API
You may want to use a lambda when starting operations in the standard way via `IClientBase` methods: for example, when the mapper is a large class but the reducer remains simple. In that case, it makes sense to use the named wrapper classes (`TCopyIfMapper`, `TTransformMapper`, `TAdditiveReducer`, etc.).
Example
```cpp
auto createReducer = []() {
    return new TAdditiveReducer<TOutStatsProto>([](auto& src, auto& dst) {
        dst.SetCount(src.GetCount() + dst.GetCount());
    });
};

client->MapReduce(
    TMapReduceOperationSpec()
        .AddInput<TInputProto>(inTable)
        .AddOutput<TOutStatsProto>(outTable)
        .ReduceBy(fields),
    new TMyComplicatedMapper(),
    createReducer(), // combiner
    createReducer(), // reducer
    TOperationOptions().Spec(TNode()("map_selectivity_factor", 0.005)));
```
TSaveable — global variables accessible from lambda operations
If you create a global variable of the `NYT::TSaveable<TYourStruct>` type, where `TYourStruct` supports ysaveload serialization, then all the `yt_lambda` functions described above will save this variable into the job state when they start and restore it on the cluster before executing the job.
Example:
```cpp
struct TGlobalSettings {
    TString MailSuffix;
    Y_SAVELOAD_DEFINE(MailSuffix);
};

NYT::TSaveable<TGlobalSettings> GlobalSettings;

int main() {
    GlobalSettings.MailSuffix = "@domain-name";
    // ...
    // use GlobalSettings.MailSuffix in lambdas
}
```
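The saved value can then be read from inside an operation lambda. A sketch, assuming hypothetical `TUserMsg` and `TMailMsg` protobuf message types:

```cpp
// GlobalSettings is saved with the job state and restored on the
// cluster before the lambda runs, so reading it here is safe.
NYT::TransformCopyIf<TUserMsg, TMailMsg>(
    client, "//home/project/users", "//home/project/mails",
    [](auto& src, auto& dst) {
        dst.SetEmail(src.GetLogin() + GlobalSettings.MailSuffix);
        return true;
    });
```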
Attention

- Naturally, the contents of `TSaveable` aren't saved in ordinary (non-lambda) operations. In the example above, you can't use the `TSaveable` variable from the `TMyComplicatedMapper` class, but you can use it from the lambda created inside `createReducer()`. If necessary, you can manually add calls to `TSaveablesRegistry::Get()->SaveAll(stream)` and `LoadAll(stream)` in the `Save` and `Load` functions of your mapper or reducer.
- Don't use `TSaveable` if these global variables may be modified and operations started from different threads; this can lead to data races and crashes.
- You can create only one `TSaveable` variable for each user-defined type. This is intentional and won't change in the near future.