Вставка из операций в динамические таблицы

YTsaurus позволяет указывать сортированные динамические таблицы в качестве выходных таблиц map-reduce операций (так называемый «bulk insert»).

Применение:

  • поставка данных в таблицы;
  • клиентская реализация DELETE WHERE;
  • фоновый процесс очистки строк с нетривиальным TTL.

Внимание

Если вы впервые запускаете map-reduce операцию с выводом в динамическую таблицу, пожалуйста, прочитайте эту страницу, особенно раздел «Когда bulk insert не нужен». Возможно, вашу задачу правильно решать иным образом.

Для запуска операций на production-кластерах необходимо указать параметр {allow_output_dynamic_tables=%true} в спецификации операции.

Когда bulk insert не нужен

Часто вставка из операций в динамические таблицы не является оптимальным способом решить задачу. Если вы попали сюда из алерта операции, прочитайте, пожалуйста, этот абзац. Возможно, вашу задачу нужно решать иначе.

  • Запись небольшого объёма данных в динамическую таблицу. Обычный способ вставки в динамическую таблицу — метод insert_rows. Если ваша задача — просто писать в таблицу небольшие объёмы, рекомендуется пользоваться этим API.
  • Создание динамической таблицы из готовых данных. YTsaurus позволяет конвертировать статические таблицы в динамические. Если есть готовые данные, из которых нужно создать динамическую таблицу один раз, стоит создать статическую таблицу, а затем сконвертировать её в динамическую командой alter-table. Подробности можно прочитать в разделе Конвертация статической таблицы в динамическую.
  • Изменение формата чанка, кодека компресии и т.д. При изменении атрибутов хранения статической таблицы рекомендуется использовать операцию Merge, чтобы применить изменения к существующим чанкам. В случае динамических таблиц для этого правильно использовать форсированную компактификацию. Если в динамическую таблицу регулярно записываются данные, компактификация и так будет запускаться регулярно, поэтому обычно необходимости явно вызывать форсированную компактификацию нет.

Особенности

У операций, в которых есть динамические выходные таблицы, есть ряд особенностей.

  • Операцию нельзя запускать под пользовательской транзакцией.
  • Выходные динамические таблицы могут быть только сортированными.
  • В динамическую таблицу можно писать как в режиме добавления данных, так и в режиме перезаписи. Это регулируется атрибутом <append=%true> на пути. В отличие от сортированных статических таблиц, при добавлении данных в динамическую таблицу на неё берётся shared-блокировка, поэтому несколько операций могут писать в таблицу одновременно. Отметим, что запись в режиме добавления — это upsert, а не append, то есть записывать данные не обязательно в конец таблицы.
  • Можно писать только в примонтированную или замороженную динамическую таблицу. Запись в замороженную таблицу настоятельно не рекомендуется по причине того, что в замороженных таблицах не работает компактификация; без неё чтение вскоре станет неэффективным, а запись заблокируется.
  • Писать в динамическую таблицу можно только сортированные данные с unique_keys=%true. Если пользовательские джобы таким свойством не обладают, стоит использовать промежуточную статическую таблицу, которую затем отсортировать с выходом в динамическую операцией sort.
  • Транзакция операции не похожа на классические таблетные транзакции. В частности, во время работы операции в выходную таблицу можно писать при помощи insert_rows, и если операция затрагивает те же строки, они будут перезатёрты.
  • Из операции можно писать данные в расширенном формате, который позволяет удалять строки и указывать режим агрегации.

Рекомендации по эффективности

Алгоритмы динамических таблиц сконфигурированы для работы с определённым размером чанков (порядка 50-200 MB). Если чанки будут сильно меньше или больше, компактификация может работать неэффективно, а в ряде случаев таблица может временно стать недоступной для записи. Для соблюдения размеров чанков есть ряд рекомендаций.

  • При запуске merge-операции в динамическую таблицу из статической таблицы с совместимой схемой чанки входной таблицы могут быть «телепортированы» в выходную таблицу. Совместимость схемы означает, что входная таблица отсортирована как минимум по тем же колонкам, что и выходная, и в схеме указан атрибут unique_keys=%true. Поскольку чанки статических таблиц имеют больший размер, стоит использовать один из двух вариантов:
    • указывать опцию {force_transform=%true} в спецификации merge-операции: эта опция запрещает телепортацию, данные будут перезаписаны в формате, подходящем для динамических таблиц;
    • использовать специальные параметры job_io при подготовке статической таблицы, которую предполагается вставлять в динамическую. Подробнее об этих параметрах можно прочитать в разделе Конвертация статической таблицы в динамическую.
  • Если каждый джоб операции порождает мало выходных данных, это приведёт к маленьким чанкам на выходе операции. В этом случае следует либо уменьшать число джобов в операции, либо вставлять данные в промежуточную статическую таблицу, а затем с помощью merge-операции с {force_transform=%true} вставлять данные в динамическую таблицу.

Транзакционная модель

Как и в случае обычных операций, коммит bulk insert атомарный: изменения либо целиком попадут во все выходные таблицы, либо не попадут ни в одну.

В момент коммита операции все динамические таблицы, в которые происходит вставка, блокируются. Блокировка распространяется на любые записи, а также на чтения с timestamp, большим времени начала блокировки, и SyncLastCommittedTimestamp. Чтения с AsyncLastCommittedTimestamp заблокированы не будут. Если коммит операции завершается успешно, таблица разблокируется и в ней появляются данные. Timestamp всех вставленных строк равен timestamp коммита операции. В частности, timestamp вставленных строк во всех затронутых таблицах будет одинаковым.

Bulk insert конфликтует только с теми таблетными транзакциями, которые затрагивают момент коммита (блокировки). Если таблетная транзакция началась до блокировки, то она не сможет закоммититься после. Либо (скоре всего) произойдёт abort, либо коммит операции будет дожидаться коммита транзакции.

Несколько bulk insert операций c <append=%true> могут работать и даже быть закоммичены параллельно. Построчных блокировок нет: система позволяет редактировать одну и ту же строку разным операциям.

Удаления и расширенный формат записи

Расширенный формат позволяет удалять строки из таблицы, устанавливать режим агрегации (aggregate в insert_rows) и режим перезаписи (update в insert_rows). Чтобы им воспользоваться, нужно указать на пути к выходной таблице атрибут schema_modification=unversioned_update и писать из операции в специальной схеме:

  • ключевые колонки остаются без изменений;
  • добавляется обязательная колонка с именем $change_type, указывающая, является ли строка записью или удалением;
  • каждая неключевая колонка с именем name заменяется на две колонки с именами $value:name и $flags:name. Первая будет содержать непосредственно значение, а вторая управлять режимами агрегации и перезаписи.

Названия служебных колонок можно взять из api.

$change_type принимает значения из enum-а ERowModificationType. Допустимы только значения Write и Delete.

Название Значение
ERowModificationType::Write 0
ERowModificationType::Delete 1

Флаги значений принимают битовую маску значений enum-а EUnversionedUpdateDataFlags. Комбинация missing | aggregate не имеет смысла, хотя и допустима. Если флаги для колонки не указаны, они считаются равными 0.

Название Значение Пояснение
EUnversionedUpdateDataFlags::Missing 1 Если бит установлен, значение будет проигнорировано, иначе перезаписано (если значение отсутствует, в таблицу будет записан Null).
EUnversionedUpdateDataFlags::Aggregate 2 Если бит установлен, будет применена агрегация, иначе значение обновится.

Предположим, у нас есть таблица с ключевой колонкой user_name (string), колонкой age (uint64) и агрегирующей колонкой balance (int64). Тогда расширенная схема будет выглядеть так:

{name="user_name"; type=string; sort_order=ascending}
{name="$change_type"; type=uint64; required=%true}
{name="$value:age"; type=uint64}
{name="$flags:age"; type=uint64}
{name="$value:balance"; type=int64}
{name="$flags:balance"; type=uint64}

Тогда для удаления строки нужна запись вида

{
  "user_name"="vasya";
  "$change_type"=1; // delete
}

А для обновления баланса (и сохранения возраста) — запись вида

{
  "user_name"="vasya";
  "$change_type"=0; // write
  "$flags:age"=1; // missing
  "$value:balance"=100500;
  "$flags:balance"=2; // aggregate
}

В расширенном формате также можно создавать промежуточные статические таблицы, которые потом будут вставлены в динамические с помощью sort или merge. Это может быть нужно крайне редко.

DELETE WHERE через input query

С помощью bulk insert можно эффективно удалять много строк из таблицы по условию, используя input query. С помощью SQL-подобного языка запросов можно фильтровать строки, подающиеся на вход операции. Используется такой же синтаксис, как в select-rows. Например, так можно удалить из таблицы все строки с чётным ключом.

input_query = "1u as [$change_type], key1, key2 where key1 % 2 = 0"
yt.run_merge(
    table,
    yt.TablePath(
        table,
        append=True,
        attributes={"schema_modification": "unversioned_update"}),
    mode="ordered",
    spec={"input_query": input_query})

Операция прочитает все строки из таблицы, оставит подходящие под условие, преобразует в формат удаления и запишет обратно в таблицу.

Если при фильтрации остаётся малое число строк, рекомендуется запускать операцию в две стадии: сначала сформировать список ключей, подлежащих удалению, а затем merge-операцией с input query и расширенной схемой вставить удаления в динамическую таблицу. Подробнее читайте в разделе Рекомендации по эффективности.

Важно

Как и в случае delete_rows, удаление не удаляет строки из таблицы, а записывает tombstone, которые впоследствии будут скомпактифицированы. Если удалить из таблицы большую долю данных, запросы могут начать произвольно тормозить, потому что на каждую полезную строку придётся прочитать много лишних удалённых.

При регулярных удалениях в большинстве сценариев компактификация будет запускаться автоматически, но в некоторых случаях может понадобиться настройка таблицы.

Дополнительные поля в спецификации

Спецификация операции может содержать все те же поля, что и обычно, за небольшими исключениями.

  • для указания конфига table writer используйте опцию dynamic_table_writer, которая указывается там же, где и обычный table_writer*_job_io). Чанки и блоки динамических таблиц должны быть меньше, чем чанки статических таблиц. Значение по умолчанию — {desired_chunk_size=100MB; block_size=256KB}.

Особенности компактификации

Этот раздел посвящён деталям реализации. Если вы хотите использовать bulk insert для больших объёмов, стоит его прочитать.

Сортированные динамические таблицы внутри напоминают LSM-дерево. Используемая структура данных эффективно работает с равномерным, пусть и достаточно большим, потоком записи. Записанные данные компактифицируются в фоновом режиме, укладываясь в чанки предсказуемым образом.

При вставке из операции в таблицу добавляется сразу много новых чанков, минимум по одному на джоб. Эти чанки не согласованы со структурой таблицы, а «свалены в кучу». После нескольких операций доступ к данным станет неэффективным, поэтому их надо переложить.

С компактификацией есть две проблемы. Во-первых, нужны ресурсы. Одна нода может компактифицировать со скоростью порядка 100 Mb/s, при этом обрабатываются не только свежие данные, но и старые. Поэтому, если в бандле одна нода, то делать bulk insert по десяткам гигабайт в минуту скорее всего не получится, поскольку структура не будет успевать восстанавливаться.

Во-вторых, эвристики компактификации не идеальны. Если в таблице окажется много очень маленьких чанков, алгоритм станет менее эффективным. Если в таблице окажется очень большой чанк с широким диапазоном ключей — тоже. В худшем случае таблица залипнет, и починить её можно будет только вручную.

Bulk insert может привести к ряду необычных сценариев. Для многих из них предусмотрены правильные эвристики, но вряд ли учтено абсолютно всё. Поэтому тестовый запуск на реальных объёмах — это важный шаг.