MapReduce
How the MapReduce operation works:
- On cluster nodes, the map phase starts. Each user script in the map phase (mapper) receives a separate part of the input table for input. The output data of the map phase is partitioned based on the hash value of a key. The resulting partitioned data is stored locally on the disk (replicated once), at the nodes where the map jobs were executed.
- Next comes the reduce phase. Each user script in the reduce phase (reducer) handles one partition: as input, it receives all the records received from mappers whose hashed key has the specified value. Over the network, the reducer fetches the data corresponding to its partition from the cluster nodes where the map jobs were executed. This process is called shuffle, and it produces the highest load on the network. Each partition is sorted in memory by key, and if the keys match, by subkey. The sorted data then becomes input for a reduce job. The reduce job's output is written to the disk and replicated as needed — usually it is two synchronous replicas and one asynchronous, with one synchronous replica being written locally.
The MapReduce operation is similar to the Sort operation, but allows for executing a mapper user script before partitioning jobs, and a reducer user script after sorting jobs.
Attention
Functionally, MapReduce is not the same as Map + Sort + Reduce. Even if the user script in the reduce phase does not change key column values, you still cannot get a sorted table as the operation's output. Therefore, using MapReduce in combination with the sorted_by
attribute on the output table is almost always incorrect. The error will make itself known only on fairly large tables, which do not fit into a single reduce job in their entirety.
In the merged MapReduce operation:
- The user may go without specifying the mapper, in which case it is assumed to be identical, and the input table data is output from the map phase without processing. In YTsaurus terms, a user-defined mapper determines which jobs will be running: partition map jobs (if a user-defined mapper is present) or partition jobs (if there isn't one).
- If the partition is too large and cannot be stored in memory, it is impossible to sort it on a single cluster node. In these cases, the partition is divided into parts, each part is sorted separately, and the result is locally written to the disk. After that, multiway merge is launched, which merges the data and feeds it to a reduce job's input. In YTsaurus terms, sorting a small partition in memory is done through partition reduce jobs. If the partition is large, then partition sorting is performed with partition sort jobs, and the final merge and launch of the reducer is handled by sorted_reduce jobs.
- Until map jobs finish, you cannot start reduce jobs. This is because the YTsaurus system must supply reduce jobs with data ordered by subkey, and that sequence cannot be generated until the data (map job outputs) is completed. A barrier of sorts therefore exists between the map and reduce phases — they must not intersect in time.
If the scheduler detects a large partition that cannot be sorted within one job, it has an opportunity to save time and start processing the partitioned data before all map jobs finish. The scheduler, at its own discretion, divides the partition into several parts. The individual parts are then sorted on different cluster nodes by intermediate sort jobs, which take this data in over the network and sort it. This technique allows for a partial overlap of the phases, shortening the operation time.
MapReduce vs Map + Sort + Reduce
The merged MapReduce operation has several advantages over the Map + Sort + Reduce combination:
- Less I/O operations on intermediate data. Without the merge phase, map-sort-reduce typically writes intermediate data to disk three times: after the map phase, after the partition sort phase, and after sorting. Some of these writes may have a replication factor other than 1. For a MapReduce operation, a single write to disk will suffice, and the replication factor will be 1.
- No unnecessary barriers (synchronization points). With the map-sort-reduce combination, YTsaurus is forced to wait for all the map jobs (including the longest ones) to finish before starting partitioning. Similarly, sorting needs to be completed before the first reduce jobs can be run. As for the merged MapReduce operation, there is only one inevitable synchronization point: reduce jobs cannot be started before map jobs run their course.
- Better fault tolerance. MapReduce constitutes a single operation, so the scheduler does everything it can to complete it even when some intermediate data becomes unavailable. When it comes to a chain of independent operations, the scheduler is only concerned with the individual phases. The only way to combat this is to ramp up intermediate data replication, which severely impacts speed.
Usage notes
To merge the Map + Sort + Reduce combination into a single MapReduce operation, pass both the mapper and reducer options in the operation's specification.
In some cases, users may find the merged MapReduce operation not fast enough on default settings. The reasons may be:
-
Significant difference between the data volume at the mapper's input and output, which leads to the scheduler misjudging the number of partitions.
The scheduler estimates the partition number based on the amount of input data as it aims to ensure the partition fully fits into the sort job's memory (2 GB by default). In doing that, the scheduler assumes that the input for the mapper approximately equals its output.
In practice, the mapper may be filtering input data with an output-to-input ratio of 1:1000 or less. As a result, each partition's buffer gets something like several records, which leads to a large number of random, small-portioned reads from the disk in the shuffle process at the beginning of the sort phase.
Example: Let's say a partition map job is input with 2 GB of data, produces an output of 4 MB, and the
partition_count
is 1000. Then, the average pre-compression size of a block written by the partition map job will be around 2 KB. Such blocks cause a huge number of instances of random access to the disk.To combat this problem, the
map_selectivity_factor
option was added to an operation's specification, allowing users to set the approximate output-to-input ratio at the start of the operation and improve the scheduler's estimates. Alternatively, you can explicitly specify the desired number of reduce jobs using thepartition_count
option. -
"Heavy" mappers and a small number of parallel jobs due to memory shortage.
The scheduler allocates resources for a job based on the memory required for the user-provided code (
memory_limit
, set by the user in the operation's specification) and the memory for the system elements (JobProxy). For a map job (within the Map operation), the memory is made up ofmemory_limit
, the read buffer, and the write buffer.Example: Let's say the
memory_limit
is 250 MB and the buffer sizes are 100 + 100 MB, totaling 450 MB per job. If the fair_share distribution for the operation is 0.1 and the total amount of the cluster's resources is 11 TB of memory and 7000 CPUs, then the resources at the operation's disposal are 0.1 * 11 TB = 1.1 TB, and 0.1 * 7000 = 700 CPUs. Therefore, min(1100 GB / 450 MB, 700) = min(2500, 700) = 700 jobs can run in parallel.For partition map jobs, memory is the sum of
memory_limit
, read buffer, and partition buffer. For operations with many partitions, the scheduler allocates to the partition map job a large amount of memory for the partition buffer so as to avoid small blocks. For example, given 1000 partitions and a 2 GB buffer, the worst-case block size will be 2 GB / 1000 = 2 MB (with even data distribution). Going back to our previous example, memory amount for a partition map job will be 250 MB + 100 MB + 2 GB ≈ 2400 MB, and with 1.1 TB of available memory the scheduler will be able to run only min(1100 GB / 2400 MB, 700) = min(470, 700) = 470 jobs in parallel.If the mapper is "heavy" (high CPU usage per unit of output data), then tamping down on concurrent operations will lead to a significant hike in the execution time and cancel out any possible gains from merging the map and partition phases. Since the typical size of the output from the filter mapper is very small, such a mapper should also be considered "heavy".
The best solution in this situation is to use the Map + MapReduce combination (with a trivial mapper). This already results in higher intermediate data storage metrics compared to the Map + Sort + Reduce combination.
Measurement results
A good example of the improvement in performance is the classic WordCount problem. It boils down to a single MapReduce operation, wherein the mapper aggregates data by key.
The table shows the different times it took to solve a WordCount problem using the merged MapReduce operation, the Map + Sort + Reduce sequence, and Map + MapReduce (the last using a trivial mapper).
Operation | Time |
---|---|
MapReduce | 03:25 |
Map + MapReduce | 05:30 |
Map + Sort + Reduce | 08:10 |
These results were collected on a cluster with 200 nodes, each having 48 GB of RAM and 24 cores for jobs. The input data size was 1 TB.
MapReduce operation options
General parameters for all operation types are described in Operation options.
The MapReduce operation supports the following additional options (default values, if set, are specified in brackets):
mapper
— user-defined script for the map phase. If not specified, input table data is output by the operation without processing.reducer
— user-defined script for the reduce phase.reduce_combiner
— description of the reduce_combiner script (more details below).sort_by
(defaults toreduce_by
) — list of columns used for sorting data input at the reduce phase. The sequence of thereduce_by
fields should be set as the prefix of thesort_by
field sequence.reduce_by
— list of columns used for grouping.input_table_paths
— list of input tables with full paths (must not be empty).output_table_paths
— list of output tables.mapper_output_table_count
— number of tables fromoutput_table_paths
that will be output at the map phase. For such tables, the job'stable_index
is counted from one, and the intermediate output is a null output table.partition_count
,partition_data_size
— options that specify how many partitions are to be made in the sorting process.map_job_count
,data_weight_per_map_job
— options that indicate how many jobs should be run at the map phase (these serve as recommendations).data_weight_per_sort_job
— option to control the amount of data at thereduce_combiner
input (more details below).force_reduce_combiners
(false) — forces the launch ofreduce_combiner
(more details below).map_selectivity_factor
(1.0) — proportion of the original amount of data that remains after the map phase (the default value is 1.0, which means the expected size of the data remains unchanged during the map phase; 2.0 means the data size doubles, and so on).intermediate_data_replication_factor
(1) — replication factor for intermediate data.intermediate_data_account
(intermediate) — account to whose quota the transaction's intermediate data goes.intermediate_data_medium
(default) — type of medium to write chunks of intermediate data to.intermediate_compression_codec
(lz4) — codec used for compressing intermediate data.intermediate_data_acl
({action=allow;subjects=[everyone];permissions=[read]})
— permissions for accessing intermediate data that are set up following the map phase.map_job_io, sort_job_io, reduce_job_io
— I/O settings for the respective job types; in thereduce_job_io
option, thetable_writer
section is added for all jobs that write to output tables.sort_locality_timeout
(1 min) — time during which the scheduler waits for resources to free up on specific cluster nodes in order to start sorting all the parts of each partition on a node. This is necessary to ensure higher read locality in the course of the subsequent merging of sorted data.ordered
(false) — enables the logic that is similar to ordered map at the partition map phase (input data is divided into jobs in successive segments, with the input of each map phase job being fed rows according to the order contained in the input tables).pivot_keys
— list of keys to be used for data partitioning at the reduce phase. This option is exactly the same as the corresponding reduce option.compute_pivot_keys_from_samples
(false) — enables automatic key generation for data partitioning using samples from the table. The option can be enabled only ifmapper
is not specified.samples_per_partition
(1000) — number of keys for samples from the table for each partition.reducer/enable_input_table_index
/reduce_combiner/enable_input_table_index
(false) — whether to send the input table index to thereduce
/reduce_combine
phase (theenable_input_table_index
option is specified within thereducer
/reduce_combiner
section).mapper/output_streams
/reduce_combiner/output_streams
— description of intermediate output streams from themap
orreduce_combine
phase, more details below (theoutput_streams
option is specified within themapper
/reduce_combiner
section).disable_sorted_input_in_reducer
(false) — disables the sorting of inputs of reduce jobs. Other guarantees are maintained. In particular, all rows with the same keys fromreduce_by
columns will be sent to a single reduce job.
By default (and for historical reasons), MapReduce operations support table_index
only in a mapper. If you need to have table_index
in a reducer, you can enable the enable_input_table_index
option within the reducer
(or reduce_combiner
) section. In the future, this option may be enabled by default. To enable sorting by table_index
in a reducer, you need to add a column with the name $table_index
to sort_by
.
Working with large keys — reduce_combiner
The reduce phase has a special data processing stage for dealing with large keys. This stage's jobs are called reduce_combiner jobs. They run on parts of "large" partitions and facilitate a partial reduce without waiting for all partition parts to get sorted and for reduce with the final merge to start. As input, reduce jobs running on partitions like that receive merged outputs of several reduce_combiner
jobs.
Reduce_combiner
is triggered if the partition size exceeds data_weight_per_sort_job
. The amount of data in reduce_combiner
equals data_weight_per_sort_job
. The default value for data_weight_per_sort_job
is set in the scheduler configuration, but can be overridden via an operation's specification (in bytes).
You can also force-start reduce_combiner
by setting force_reduce_combiners
to true
. Reduce_combiner
receives a sorted stream of records as input (like a regular reducer). However, these records do not necessarily form a continuous range in sort_by
order. If two given records end up the reduce_combiner
job, there may be records between them (in sort_by
order) that are not in this job.
There are several restrictions on the reduce_combiner
output:
- The output must be sorted.
reduce_combiner
must not change keys — columns specified in the specification'ssort_by
field (if not specified — in thereduce_by
field).- There must be only one output table, as in the map phase of the
MapReduce
operation.
This means that any commutative and associative reducer can be used asreduce_combiner
in its original form.
Starting an operation with reduce_combiner
may look like this:
yt map-reduce --reduce-combiner cat --reducer cat --reduce-by cookies --src //statbox/access-log/2013-05-15 --dst //tmp/asd --format dsv
Intermediate streams
In ordinary cases, intermediate data streams between map
, reduce_combine
, and reduce
phases have no schema. However, a schema is required to work with composite data types and some efficient formats. To set data schemas for these intermediate streams, use the output_streams
option.
The option is specified in the mapper
/ reduce_combiner
sections and must contain a list of dictionaries with the following keys:
schema
— YSON description of the stream schema.
If the option is specified, the user code can assume that it writes to multiple virtual tables (by the number of items in the output_streams
list). The next phase gets combined data from all intermediate tables and can use table_index
of input data to determine which stream a particular row came from.
If both mapper/output_streams
and mapper_output_table_count
options are specified, then first comes intermediate data followed by output data. That is, intermediate data will have table_index = 0 ... len(mapper.output_streams)-1
while output tables will have table_index = len(mapper.output_streams) ... len(mapper.output_streams) + mapper_output_table_count
.
Example in the specification:
mapper = {
output_streams = [
{
"schema" = [
{
name=key;
type= ...
};
...
]
},
]
}
Example specification
Example of a MapReduce operation's specification:
{
partition_count = 100;
reduce_by = ["key"];
sort_by = ["key"; "subkey" ];
input_table_paths = [ "//tmp/input_table" ];
output_table_paths = [ "//tmp/first_output_table"; //tmp/second_output_table" ];
mapper = {
command = "python my_mapper.py";
file_paths = [ "//tmp/my_mapper.py" ];
tmpfs_path = ".";
tmpfs_size = 1000000;
copy_files = %true;
};
reducer = {
command = "python my_reducer.py";
file_paths = [ "//tmp/my_reducer.py" ];
};
}