Переключение таблиц

Операции могут иметь несколько входных и несколько выходных таблиц. В данном разделе описано, как переключать чтение и запись между ними.

В пользовательском коде операций Map и Reduce можно получать данные о том, из какой таблицы пришла запись, и выбирать, в какую таблицу отправить запись. Реализуется это с помощью дескрипторов или через переключатели таблиц.

Примечание

Для пользователя есть несколько способов указать, в какую таблицу отправить строку: можно указать дескриптор таблицы, а можно использовать переключатель.

Технически нет ограничений на одновременное использование этих механизмов, но на практике не рекомендуется использовать эти механизмы одновременно.

Внимание

Когда у операции типа MapReduce не указан mapper, на входе reducer не будет данных о том, из каких входных таблиц пришли записи.

Дескрипторы

Существуют правила нумерации дескрипторов, которые позволят пользовательскому коду (джобу) записать строку Х в таблицу 0, а строку Y в таблицу 1.

Когда выходная таблица одна — джоб пишет все записи в дескриптор 1 (в stdout).
Когда выходных таблиц несколько, применяются правила нумерации, формула 3k+1, где k — индекс выходной таблицы, начинается с 0:

  • Таблица 0 — дескриптор 1;
  • Таблица 1 — дескриптор 4;
  • Таблица 2 — дескриптор 7;
  • ...

Переключатели таблиц

Соответствие между номерами дескрипторов и выходными таблицами можно менять в процессе работы джоба с помощью переключателей таблиц.

Схему настройки переключателей входных таблиц можно прочесть в разделе Настройки ввода/вывода.

Форма переключателей зависит от формата выходных данных.

YSON

В YSON и для входных и для выходных таблиц в потоке записей может встретиться entity с атрибутами. Это управляющие команды, встроенные в поток данных. Например: table_index=N, где N — integer. Эта команда «переключает» поток таким образом, что последующие записи относятся к другой таблице с индексом N. Это верно и для входных, и для выходных таблиц.

Допустим у нашей операции на выходе 2 выходные таблицы.
По умолчанию для отправки данных в таблицу с индексом 0 необходимо записать их в дескриптор 1, а для отправки в таблицу 1 — в дескриптор 4.
Предположим, что джоб записал в дескриптор 4 такую последовательность записей:

{a=1}; 
<table_index=0>#;
{b=2}; 

В таком случае запись a=1 попадет в таблицу 1, а запись b=2 в таблицу 0, и оба дескриптора после этого будут писать в таблицу 0.

Внимание

Порядок строк при записи в одну таблицу через два дескриптора не определен.

Переключатели работают независимо для каждого выходного дескриптора.

Допустим, дескриптор N соответствует таблице X. При записи в дескриптор N 10 строк без переключателей, они попадут в таблицу X ровно в том порядке, в котором были записаны в дескриптор. Никаких других строк между ними не появится.

Допустим дескрипторы N и M соответствуют таблице X. При записи 5 строк в дескриптор N и 5 строк в дескриптор M, все строки окажутся в таблице X, однако порядок их будет неизвестен. Даже при условии, что пользовательский код пишет сначала только в дескриптор N, а потом только в дескриптор M, нельзя гарантировать, что строки из N будут лежать в таблице X раньше строк из M, или что они не перемешаются.

JSON

Работает по аналогии с YSON.

Переключатель таблицы:

'{"$value": null, "$attributes": {"table_index": 1}}\n'

DSV

Строго говоря, формат и его производные не поддерживают переключатели как отдельную управляющую команду в потоке данных. Каждая запись данных должна содержать информацию об индексе таблицы.

Примечание

DSV не поддерживает переключение выходных таблиц.

У входных таблиц при включенной опции формата enable_table_index каждая строка будет дополнена служебным полем, содержащим индекс таблицы. По умолчанию имя поля — @table_index. Имя поля можно изменить через опцию формата table_index_column.

Все строки из таблицы //path/to/table, которая имела номер N в списке входных таблиц операции, будут иметь одинаковое значение поля @table_index, равное N.

SCHEMAFUL_DSV

При установке опции enable_table_index=true для таблиц в формате SCHEMAFUL_DSV во всех строках таблицы первым полем, перед колонами схемы, будет записан индекс таблицы.

Примеры

# -*- coding: utf-8 -*-
import yt.wrapper as yt

from datetime import datetime

def parse_time(time_str):
    "2012-10-19T11:22:58.190448Z"
    return datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%fZ")

def get_duration(rowA, rowB):
    if rowA["event_type"] == "job_completed":
        rowA, rowB = rowB, rowA
    delta = parse_time(rowB["timestamp"]) - parse_time(rowA["timestamp"])
    return delta.total_seconds() + (delta.microseconds / 1000000.0)

# По-дефолту map-функция принимает на вход одну запись. 
# Любая функция, запускаемая в качестве операции должна быть генератором. 
def filter_event_type(row):
    if row.get("event_type") in ["job_started", "job_completed"]:
        yield row

# В случае reduce-функции на вход приходит ключ (это мапа из ключевых колонок в их значения) и список записей. 
# У каждой записи ключевые колонки равны тому, что лежит в key
def calculate_duration(key, rows):
    rows = list(rows)
    if len(rows) == 2:
        yield {"id": key["id"], "duration": get_duration(*rows)}
    else:
        assert len(rows) == 1
        row = rows[0]
        # Выставить номер выходной таблицы можно через специальное поле
        row["@table_index"] = 1
        yield row

if __name__ == "__main__":
    yt.config["tabular_data_format"] = yt.JsonFormat(control_attributes_mode="row_fields")
    yt.run_map_reduce(filter_event_type, calculate_duration,
                      "//tmp/forbeginners/event_log",
                      ["//tmp/forbeginners/durations", "//tmp/forbeginners/filtered"],
                      reduce_by="id")

Пример переключения между выходными таблицами с table_index при control_attributes_mode="iterator":

import yt.wrapper as yt

import random

@yt.aggregator
@yt.with_context
def mapper(rows, context):
    for row in rows:
        input_table_index = context.table_index
        sum = 0
        if input_table_index == 0:
            sum += int(row["value"])
        else:
            sum -= int(row["value"])

        output_table_index = random.randint(0, 1)
        # функция позволяет создать запись, являющую переключателем на таблицу с указанным индексом.
        yield yt.create_table_switch(output_table_index)
        yield {"sum": sum}

if __name__ == "__main__":
    yt.run_map(mapper,
               ["//tmp/input1", "//tmp/input2", "//tmp/input3"],
               ["//tmp/output1", "//tmp/output2"],
               format=yt.YsonFormat(control_attributes_mode="iterator"))
Предыдущая
Следующая