Job collectives

Job collectives are a mechanism that lets you process a single data batch with multiple jobs. This is useful for computations that involve multiple processes running together, such as distributed GPU computations or parallel data processing that needs more than one job per batch.

To learn how to set up job collectives in an operation, see Configuration.

Basic concept

Operations with job collectives function similarly to regular operations, but with one key difference: each data batch is processed not by a single job, but by a collective of multiple jobs.

This means that:

  • Job slicing works exactly the same way as in regular operations, and you configure it in the spec the same way.
  • In terms of the operation's data stream, a collective functions as a single job, receiving one input data batch and generating one output data batch.

Note

Job collectives are not the same as gang operations. A gang operation restarts all its jobs when any non-backup job is restarted, whereas job collectives restart jobs only in a single collective (the one processing a given data batch).

Use cases

Distributed GPU computations

A typical use case is data processing using multiple GPUs. The master job reads the data and distributes it among slave jobs, each of which performs computations on its own GPU.

Parallel processing with aggregation

Slave jobs perform parallel computations, while the master job aggregates their results and writes them to the output table.
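The division of labor in this pattern can be sketched in Python. This is a local simulation of the roles, not YT-specific code: the sharding scheme and the squaring workload are illustrative assumptions, and the function calls stand in for the network exchange between jobs.

```python
# Local sketch of the "parallel processing with aggregation" pattern:
# slaves each process a shard of the batch, the master merges their
# results and would write them to the output table.

def slave_compute(shard):
    # Illustrative workload: each slave squares its shard of the input.
    return [x * x for x in shard]

def master_aggregate(partial_results):
    # The master (rank 0) merges slave outputs into one result.
    merged = []
    for part in partial_results:
        merged.extend(part)
    return sum(merged)

batch = list(range(10))
size = 3  # collective size: 1 master + 2 slaves
shards = [batch[i::size - 1] for i in range(size - 1)]  # split among slaves
result = master_aggregate(slave_compute(s) for s in shards)
```

In a real collective, `slave_compute` would run in separate jobs and the partial results would travel to the master over the network.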

Gang operations vs. operations with job collectives

Gang operations are a type of Vanilla operation, so for operations of any other type, job collectives are the only option.
For scenarios that require a Vanilla operation, you can use either a gang operation or a Vanilla operation with job collectives. The difference comes down to a few details:

  • Gang operations provide stronger guarantees, which, in particular, make restarts more efficient (see Restarting a gang operation).
  • In contrast to gang operations, operations with job collectives do not support backup jobs (see Backup jobs).
  • In contrast to gang operations, operations with job collectives can have output tables (see Limitations and considerations).

Collective structure

A collective comprises multiple jobs working together to process a single data batch. Each job in a collective is assigned a rank (an integer from 0 to size - 1 where size is the collective size).

  • Master job (rank 0): Reads input data and writes results to output tables. From the system's point of view, the master job represents the collective in its interactions with the data stream.
  • Slave jobs (ranks 1, 2, ...): Auxiliary jobs that do not have access to the operation’s input/output data streams. They perform computations and exchange data with the master job over the network.
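In job code, the role split follows directly from the rank. A minimal Python sketch (the `collective_role` helper is illustrative, not part of any YT SDK):

```python
import os

def collective_role(rank: int) -> str:
    """Map a collective rank to the job's role: rank 0 is the master."""
    if rank < 0:
        raise ValueError("rank must be non-negative")
    return "master" if rank == 0 else "slave"

# Inside a job, the rank comes from the environment; the default of 0
# is only so the sketch runs outside a collective.
rank = int(os.environ.get("YT_COLLECTIVE_MEMBER_RANK", "0"))
if collective_role(rank) == "master":
    pass  # read the input batch, coordinate slaves, write the output
else:
    pass  # compute on this job's share and send results to the master
```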

Collective lifecycle

  1. When the controller agent initiates the processing of a data batch, it creates a collective of multiple jobs instead of just one job.
  2. The master job receives input data via stdin (for Map/Reduce/MapReduce operations) exactly the same way as a regular job.
  3. All jobs in a collective can interact with each other over the network using YT_COLLECTIVE_ID for identification.
  4. Only the master job can write output data.
  5. A collective is considered completed when the master job has completed. This means that if slave jobs finish unsuccessfully after the master job has completed (from the controller agent's point of view), the data is still considered processed.

Failure handling

If any job in a collective finishes unsuccessfully (fails or aborts), and the master job has not completed yet, the following happens:

  • All other jobs in the collective are aborted.
  • The collective is restarted entirely, just as a regular job would be.
  • Other collectives in that operation continue running independently.

Supported operations

Job collectives are supported in all operations with user code:

Operation   Where to specify collective_options
Map         In the mapper specification
Reduce      In the reducer specification
MapReduce   In the mapper, reducer, or reduce_combiner specification
Vanilla     In the task specification

Environment variables

The following environment variables are available to jobs running in a collective:

Variable                    Description
YT_COLLECTIVE_MEMBER_RANK   Job's rank in the collective (0 for the master job; 1, 2, ... for slave jobs)
YT_COLLECTIVE_ID            The collective's unique ID (currently matches the job_id of the master job, but this property is not guaranteed and may change in the future)

Jobs use these variables to:

  • Determine their role in the collective.
  • Find each other for interaction over the network.
  • Coordinate their work.
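As a concrete illustration of the network interaction, the sketch below runs a master and a slave as local threads: the master listens on a TCP port, and the slave connects and sends a result tagged with the collective ID. In a real collective the jobs run on different hosts and need an out-of-band way to agree on the master's endpoint; the localhost socket here is purely an assumption of this demo.

```python
import os
import socket
import threading

# YT_COLLECTIVE_ID identifies the collective; defaulted here so the
# sketch also runs outside a collective.
collective_id = os.environ.get("YT_COLLECTIVE_ID", "demo-collective")

# The listening socket is created up front so the slave cannot connect
# before the master is ready. Port 0 lets the OS pick a free port; real
# jobs must agree on the endpoint out of band.
server = socket.create_server(("127.0.0.1", 0))
port = server.getsockname()[1]
results = {}

def master_job():
    # Rank 0: accept one slave connection and record its result.
    conn, _ = server.accept()
    with conn:
        tag, value = conn.recv(1024).decode().split(":")
    if tag == collective_id:  # verify the peer belongs to this collective
        results["value"] = int(value)

def slave_job(value):
    # Rank > 0: send a computed value to the master, tagged with the ID.
    with socket.create_connection(("127.0.0.1", port)) as conn:
        conn.sendall(f"{collective_id}:{value}".encode())

t = threading.Thread(target=master_job)
t.start()
slave_job(42)
t.join()
server.close()
```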

Configuration

To integrate job collectives, add the collective_options parameter to the job specification:

{
  collective_options = {
    size = 2;  # Number of jobs in the collective
  };
}

collective_options parameters

Parameter   Type   Description
size        int    Number of jobs in the collective. Must be greater than 1.

Specification examples

Vanilla operation with job collectives

In this example, a Vanilla operation runs with a collective of two jobs. The master job (rank 0) writes the result to the output table, while the slave job (rank 1) does nothing.

{
  tasks = {
    worker = {
      job_count = 1;
      output_table_paths = ["//tmp/output"];
      format = "yson";
      command = "if [ \"$YT_COLLECTIVE_MEMBER_RANK\" == 0 ]; then echo '{result=42}'; fi";
      collective_options = {
        size = 2;
      };
      close_stdout_if_unused = %true;
    };
  };
}

Map operation with job collectives

{
  mapper = {
    command = "if [ \"$YT_COLLECTIVE_MEMBER_RANK\" == 0 ]; then cat; fi";
    collective_options = {
      size = 2;
    };
    close_stdout_if_unused = %true;
  };
}

MapReduce operation with job collectives

In this MapReduce operation, job collectives are used at all of its stages: mapper, reduce_combiner, and reducer.

{
  mapper = {
    command = "if [ \"$YT_COLLECTIVE_MEMBER_RANK\" == 0 ]; then cat; fi";
    collective_options = {
      size = 2;
    };
    close_stdout_if_unused = %true;
  };
  reduce_combiner = {
    command = "if [ \"$YT_COLLECTIVE_MEMBER_RANK\" == 0 ]; then cat; fi";
    collective_options = {
      size = 2;
    };
    close_stdout_if_unused = %true;
  };
  reducer = {
    command = "if [ \"$YT_COLLECTIVE_MEMBER_RANK\" == 0 ]; then cat; fi";
    collective_options = {
      size = 2;
    };
    close_stdout_if_unused = %true;
  };
  force_reduce_combiners = %true;
}

Limitations

  • Incompatibility with gang_options: You cannot use job collectives together with gang operations. If you attempt to specify both parameters, the operation will fail with an error.

  • Data output from the master job only: Only the master job (rank 0) can write data to output tables.

    Tip

    Use the close_stdout_if_unused = %true option in the job specification to explicitly close stdout for slave jobs and avoid accidental writes.

  • Job interruption: You can interrupt only the master job. Attempting to interrupt a slave job results in the job being aborted and the collective being restarted.

See also