Переключение таблиц
Операции могут иметь несколько входных и несколько выходных таблиц. В данном разделе описано, как переключать чтение и запись между ними.
В пользовательском коде операций Map и Reduce можно получать данные о том, из какой таблицы пришла запись, и выбирать, в какую таблицу отправить запись. Реализуется это с помощью дескрипторов или через переключатели таблиц.
Примечание
Для пользователя есть несколько способов указать, в какую таблицу отправить строку: можно указать дескриптор таблицы, а можно использовать переключатель.
Технически нет ограничений на одновременное использование этих механизмов, но на практике не рекомендуется использовать эти механизмы одновременно.
Внимание
Когда у операции типа MapReduce не указан mapper, на входе reducer не будет данных о том, из каких входных таблиц пришли записи.
Дескрипторы
Существуют правила нумерации дескрипторов, которые позволят пользовательскому коду (джобу) записать строку Х в таблицу 0, а строку Y в таблицу 1.
Когда выходная таблица одна — джоб пишет все записи в дескриптор 1 (в stdout).
Когда выходных таблиц несколько, применяются правила нумерации, формула 3k+1, где k — индекс выходной таблицы, начинается с 0:
- Таблица 0 — дескриптор 1;
- Таблица 1 — дескриптор 4;
- Таблица 2 — дескриптор 7;
- ...
Переключатели таблиц
Соответствие между номерами дескрипторов и выходными таблицами можно менять в процессе работы джоба с помощью переключателей таблиц.
Схему настройки переключателей входных таблиц можно прочесть в разделе Настройки ввода/вывода.
Форма переключателей зависит от формата выходных данных.
YSON
В YSON и для входных и для выходных таблиц в потоке записей может встретиться entity с атрибутами. Это управляющие команды, встроенные в поток данных. Например: table_index=N
, где N — integer. Эта команда «переключает» поток таким образом, что последующие записи относятся к другой таблице с индексом N. Это верно и для входных, и для выходных таблиц.
Допустим у нашей операции на выходе 2 выходные таблицы.
По умолчанию для отправки данных в таблицу с индексом 0 необходимо записать их в дескриптор 1, а для отправки в таблицу 1 — в дескриптор 4.
Предположим, что джоб записал в дескриптор 4 такую последовательность записей:
{a=1};
<table_index=0>#;
{b=2};
В таком случае запись a=1
попадет в таблицу 1, а запись b=2
в таблицу 0, и оба дескриптора после этого будут писать в таблицу 0.
Внимание
Порядок строк при записи в одну таблицу через два дескриптора не определен.
Переключатели работают независимо для каждого выходного дескриптора.
Допустим, дескриптор N соответствует таблице X. При записи в дескриптор N 10 строк без переключателей, они попадут в таблицу X ровно в том порядке, в котором были записаны в дескриптор. Никаких других строк между ними не появится.
Допустим дескрипторы N и M соответствуют таблице X. При записи 5 строк в дескриптор N и 5 строк в дескриптор M, все строки окажутся в таблице X, однако порядок их будет неизвестен. Даже при условии, что пользовательский код пишет сначала только в дескриптор N, а потом только в дескриптор M, нельзя гарантировать, что строки из N будут лежать в таблице X раньше строк из M, или что они не перемешаются.
JSON
Работает по аналогии с YSON.
Переключатель таблицы:
'{"$value": null, "$attributes": {"table_index": 1}}\n'
DSV
Строго говоря, формат и его производные не поддерживают переключатели как отдельную управляющую команду в потоке данных. Каждая запись данных должна содержать информацию об индексе таблицы.
Примечание
DSV не поддерживает переключение выходных таблиц.
У входных таблиц при включенной опции формата enable_table_index
каждая строка будет дополнена служебным полем, содержащим индекс таблицы. По умолчанию имя поля — @table_index
. Имя поля можно изменить через опцию формата table_index_column
.
Все строки из таблицы //path/to/table
, которая имела номер N в списке входных таблиц операции, будут иметь одинаковое значение поля @table_index
, равное N.
SCHEMAFUL_DSV
При установке опции enable_table_index=true
для таблиц в формате SCHEMAFUL_DSV во всех строках таблицы первым полем, перед колонами схемы, будет записан индекс таблицы.
Примеры
# -*- coding: utf-8 -*-
import yt.wrapper as yt
from datetime import datetime
def parse_time(time_str):
"2012-10-19T11:22:58.190448Z"
return datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%fZ")
def get_duration(rowA, rowB):
if rowA["event_type"] == "job_completed":
rowA, rowB = rowB, rowA
delta = parse_time(rowB["timestamp"]) - parse_time(rowA["timestamp"])
return delta.total_seconds() + (delta.microseconds / 1000000.0)
# По-дефолту map-функция принимает на вход одну запись.
# Любая функция, запускаемая в качестве операции должна быть генератором.
def filter_event_type(row):
if row.get("event_type") in ["job_started", "job_completed"]:
yield row
# В случае reduce-функции на вход приходит ключ (это мапа из ключевых колонок в их значения) и список записей.
# У каждой записи ключевые колонки равны тому, что лежит в key
def calculate_duration(key, rows):
rows = list(rows)
if len(rows) == 2:
yield {"id": key["id"], "duration": get_duration(*rows)}
else:
assert len(rows) == 1
row = rows[0]
# Выставить номер выходной таблицы можно через специальное поле
row["@table_index"] = 1
yield row
if __name__ == "__main__":
yt.config["tabular_data_format"] = yt.JsonFormat(control_attributes_mode="row_fields")
yt.run_map_reduce(filter_event_type, calculate_duration,
"//tmp/forbeginners/event_log",
["//tmp/forbeginners/durations", "//tmp/forbeginners/filtered"],
reduce_by="id")
Пример переключения между выходными таблицами с table_index
при control_attributes_mode="iterator"
:
import yt.wrapper as yt
import random
@yt.aggregator
@yt.with_context
def mapper(rows, context):
for row in rows:
input_table_index = context.table_index
sum = 0
if input_table_index == 0:
sum += int(row["value"])
else:
sum -= int(row["value"])
output_table_index = random.randint(0, 1)
# функция позволяет создать запись, являющую переключателем на таблицу с указанным индексом.
yield yt.create_table_switch(output_table_index)
yield {"sum": sum}
if __name__ == "__main__":
yt.run_map(mapper,
["//tmp/input1", "//tmp/input2", "//tmp/input3"],
["//tmp/output1", "//tmp/output2"],
format=yt.YsonFormat(control_attributes_mode="iterator"))