Запуск операций по динамическим таблицам

В данном разделе описано, как запускать MapReduce операции по динамическим таблицам, сказано о консистентности данных. Приведены примеры конвертации статической таблицы в динамическую и обратно.

Всё описанное в равной степени относится ко всем типам операций — map, merge и т.д., а не только непосредственно map-reduce, а также к команде read-table.

Обзор

В системе YTsaurus существует возможность запускать MapReduce операции поверх сортированных и упорядоченных динамических таблиц. Также возможно быстро (на уровне метаданных) преобразовать статическую таблицу в динамическую.

Поскольку статические и динамические таблицы изначально создавались под разные сценарии работы, существует несколько принципиальных отличий статических таблиц от динамических (как сортированных, так и упорядоченных).

  1. При вставке в динамическую таблицу данные в первую очередь попадают в журнал и специальное хранилище в памяти (т. н. dynamic store), и только после этого записываются в чанки на дисках в виде версионированых строк (пока данные не записаны в чанки, отказоустойчивость гарантируется журналом). Данные статических таблиц всегда записываются сразу в чанки на дисках;
  2. Все ключи в динамической сортированной таблице уникальны. Строки таблицы однозначно определяются своим ключом;
  3. Все данные в динамической сортированной таблице версионированы: при вставке в динамическую сортированную таблицу данным присваивается метка времени коммита. При чтении можно указать временную метку для получения версии строки таблицы на указанный временной срез;
  4. В сортированных динамических таблицах периодически происходит компактификация: старые версии данных удаляются, чанки перезаписываются;
  5. Ни сортированные, ни упорядоченные динамические таблицы не позволяют делать сквозную индексацию по номерам строк: в сортированных это полностью невозможно, в упорядоченных возможно внутри каждого таблета отдельно.

Запуск операций по динамическим таблицам

Данная секция относится как к сортированным, так и к упорядоченным таблицам.

Динамическая таблица может быть указана как входная (или одна из входных) для MapReduce операции. Интерфейс MapReduce операции такой же как и у статических таблиц с тем отличием, что вместо пути к статической таблице указывается путь к динамической. Динамическая таблица при этом может быть как примонтирована, так и отмонтирована. Использованию динамических таблиц как выходных для операции посвящена отдельная статья.

Поскольку обычные MapReduce операции работают с чанками, а динамические таблицы хранят часть данных в памяти в dynamic stores, в запуске операций по динамическим таблицам есть некоторая специфика. По умолчанию данные из dynamic stores из операций не видны. Есть несколько способов это обойти.

Подходящий в большинстве случаев способ — указать на таблице атрибут @enable_dynamic_store_read=%true (предварительно её отмонтировав), после чего все данные будут видны из операций. Это принятый подход, если вам нужны свежие данные, однако есть побочные эффекты:

  • результат последовательных запусков одной и той же операции поверх одной таблицы (даже под snapshot lock) может отличаться, поскольку второй запуск прочитает больше свежих строк. По этой причине в YQL отключено кеширование для таблиц с @enable_dynamic_store_read. Для идемпотентности стоит явно указывать timestamp или набор row index-ов;
  • если на таблице установлен атрибут @merge_rows_on_flush, обеспечить консистентное чтение по timestamp при включённом @enable_dynamic_store_read на данный момент невозможно;
  • если один таблет будут читать сотни джобов, есть риск перегрузить таблетные ноды.

Если по каким-то причинам этот режим использовать не хочется, есть ещё ряд способов:

  1. Отмонтировать таблицу. В отмонтированном состоянии все данные динамической таблицы хранятся в чанках;
  2. Заморозить таблицу командой freeze-table и дождаться когда все таблеты будут находиться в состоянии frozen. В таком состоянии таблицы все данные записаны в чанки, запись новых данных запрещена, но таблица доступна на чтение командами lookup-rows и select-rows. Можно выполнять MapReduce операции;
  3. Заказать состояние таблицы на такой момент времени в прошлом, чтобы все данные уже попали в чанки;
  4. Ничего не предпринимать и получить данные из чанков как в пункте 3, но без гарантии, что все ключи будут относиться к одной версии таблицы (в разных таблетах данные записываются на диск в разное время, синхронизации между ними нет).

Примечание

Над таблицей с включённым @enable_dynamic_store_read можно запускать операции, игнорирующие dynamic stores и читающие только из чанков. Для этого надо явно указать enable_dynamic_store_read = %false в корне спеки.

Запуск операций по состоянию сортированной динамической таблицы на определённый момент времени

По умолчанию в MapReduce операцию попадает самая последняя версия строки, которая содержится в чанках. Если таблица примонтирована, то версии в чанках могут быть неконсистентны: для двух разных строк из одной и той же транзакции может оказаться, что одна строка уже находится в чанке, а вторая всё ещё находится в оперативной памяти. В итоге в MapReduce операции будет видна только часть строк данной транзакции. В случае включённого @enable_dynamic_store_read неконсистентность также возможна, поскольку джобы читают разные части таблицы в разное время, и между чтениями могла произойти запись. В тех случаях, когда важно иметь консистентное состояние таблицы в MapReduce операции, необходимо указать временную метку в специальном атрибуте YPath: <timestamp=123456789>//path/to/dynamic/sorted/table.

На timestamp есть нижнее и верхнее ограничения:

  • поскольку данные периодически компактифицируются, timestamp должен быть не меньше чем значение атрибута @retained_timestamp. Компактификация очищает старые данные, но только те, что предшествуют @retained_timestamp;
  • если на таблице указан атрибут @enable_dynamic_store_read, то верхнего ограничения нет. Однако если указать timestamp из будущего, то консистентность обеспечена не будет;
  • если @enable_dynamic_store_read не указан, то timestamp должен быть меньше чем временная метка тех строк, которые ещё не были записаны на диск. Узнать временную метку можно через атрибут таблицы unflushed_timestamp.

Таким образом, в случае включённого чтения из dynamic stores рекомендуется брать текущий timestamp (его можно получить командой generate_timestamp). При чтении из таблицы-реплики генерировать timestamp нужно на метакластере. Если чтение из dynamic stores не включено, нужно использовать unflushed_timestamp - 1.

При указании timestamp планировщик проверит, что значение временной метки попадает в допустимый интервал.

У примонтированной таблицы значения атрибутов unflushed_timestamp и retained_timestamp постоянно меняются, поэтому при работе с таблицей на неё стоит брать snapshot lock. Нет гарантий, что retained_timestamp всегда меньше unflushed_timestamp. Перечисленные значения зависят от настроек компактификации и процесса записи данных в чанки и, если указанное условие слишком часто нарушается, придётся поменять настройки таблицы.

Полезно обратить внимание на следующие атрибуты:

  • dynamic_store_auto_flush_period — периодичность сброса данных на диск. Имеет смысл при отключённом @enable_dynamic_store_read. Значение по умолчанию — 15 минут;
  • min_data_ttl — определяет, насколько старые данные не должны подвергаться компактификации. Подробнее можно прочитать в разделе Динамические сортированные таблицы.

Примечание

В текущей реализации не поддерживаются операции с указанием timestamp над таблицами, примонтированными в неатомарном режиме.

Запуск операций по упорядоченным таблицам

Упорядоченные динамические таблицы довольно похожи на статические. Отличия с точки зрения операций в том, что индексация по номерам строк происходит внутри каждого таблета отдельно (причём индексация может начинаться не с нуля из-за trim'а), а также что свежие данные в каждом таблете находятся в dynamic store.

Примечание

Эффект trim'а может быть заметен в операции не сразу. Ситуация, при которой в операцию попали строки, удалённые через trim, считается допустимой.

Индексация

В операциях по упорядоченным таблицам доступен контрольный атрибут enable_tablet_index. Атрибут enable_row_index же имеет иную семантику, нежели в статических таблицах: возвращаемый индекс строки считается относительно начала таблета. Нумерация строк внутри таблета логическая, то есть даже если сделать trim, то номера существующих строк не изменятся.

Row index и tablet index также можно указывать в диапазонах ypath. Каждая строчка в упорядоченной таблице индексируется парой (tablet_index, row_index). Нижняя граница lower_limit = {tablet_index = a; row_index = b} допускает строки из таблета a с номерами b и больше, а также все строки из таблетов с номерами больше a. Верхняя граница upper_limit = {tablet_index = c; row_index = d} допускает все строки из таблетов с номерами меньше c, а также строки из таблета c с номером строго меньше d.

В границе можно указать только tablet_index. Нижняя граница вида {tablet_index = e} допускает все строки таблетов с номерами не менее e; верхняя граница такого вида допускает все строки таблетов с номерами строго меньше e.

Внимание

Есть ограничение на количество диапазонов в YPath. Если количество таблетов упорядоченной таблицы превышает 100 и вы планируете указывать в операции по одному диапазону на каждый таблет упорядоченной таблицы, стоит обсудить это с администраторами кластера.

Допустимые границы индексов

Как и в случае с timestamp для сортированных таблиц, на row index в упорядоченной таблице есть верхнее и нижнее ограничения в пределах таблета. Нижнее ограничение связано с удалением строк через trim. Верхнее связано с хранением строк в dynamic store и имеет смысл, если вам не подходит enable_dynamic_store_read и вы хотите запускать операцию по примонтированной таблице.

Ограничения можно получить из атрибута таблицы @tablets. В значении, соответствующем каждому таблету, есть поля trimmed_row_count и flushed_row_count.

В отличие от timestamp, корректность индексов для упорядоченных таблиц не валидирируется: если указать индексы за пределами указанных ограничений, операция просто прочитает меньше данных, чем ожидалось.

Конвертация статической таблицы в динамическую

Для конвертации статической таблицы в динамическую необходимо выполнить команду alter-table:

yt alter-table --dynamic //tmp/t
yt.alter_table("//tmp/t", dynamic=True)

В конвертации есть несколько тонкостей, на которые стоит обратить внимание.

  • Команда alter-table выполнится и для несортированной таблицы. В результате получится динамическая упорядоченная таблица. Для таких таблиц команда insert-rows работает не так, как для сортированных, а команда select-rows на тех же запросах будет работать значительно медленнее.

  • Все ключи динамической сортированной таблицы уникальны, поэтому статическая таблица также должна содержать только уникальные ключи. Система определяет данный факт по атрибуту @unique_keys у схемы. Выставить данный атрибут на схему существующей непустой таблицы с помощью команды alter-table нельзя. Необходимо специально создавать таблицу с указанным атрибутом unique_keys=true у схемы. Уникальность ключей будет провалидирована при записи. В случае, когда таблица уже создана, все ключи в ней уникальны, но у схемы выставлен атрибут unique_keys=false, необходимо создать новую таблицу с исправленной схемой и скопировать в неё данные с помощью операции Merge.

  • Типичные размеры чанков и блоков для динамических таблиц меньше, чем для статических. Если таблицу предполагается конвертировать в динамическую, стоит выставить определённые параметры в спеке операции, порождающей эту таблицу:

    "job_io": {"table_writer": {"block_size": 256 * 2**10, "desired_chunk_size": 100 * 2**20}}
    

    Если в операции несколько типов джобов, конфиг необходимо указывать только для job_io последнего типа джобов в операции (merge_job_io для операции Sort, reduce_job_io для операции MapReduce). Если в операции используется автоматическое слияние чанков, аналогичный конфиг необходимо указать в секции "automerge": {"job_io": {...}}.

    Подробнее про job_io для различных типов операций можно прочитать в разделе Настройки операций.

  • Проверить размеры чанков и блоков получившейся таблицы можно с помощью следующих атрибутов таблицы:

    • чанки: @compressed_data_size / @chunk_count
    • блоки: @chunk_format_statistics/*/max_block_size (значение атрибута – это словарь, содержащий значение max_block_size для каждого типа чанков в таблице).
Подробнее про размеры чанков и блоков

Динамические таблицы, как правило, используются для быстрого поиска по ключу. Данные в чанках разбиты на блоки, и при чтении с диска читается сразу весь блок. Таблица по умолчанию сжата определённым кодеком, разжатие будет применяться также для всего блока. Чтобы не тратить слишком много вычислений на поиск одной строки, стоит уменьшить размер блока. Блоки статических таблиц по умолчанию имеют размер 16 мегабайт, тогда как блоки динамических таблиц — 256 килобайт. Но если статическую таблицу конвертировать в динамическую, то размер блока останется прежним как у исходной статической таблицы. Поэтому при создании статической таблицы необходимо принудительно уменьшить размер блока, указав в спецификации настройки block_size.

Внимание

В текущей реализации системы YTsaurus при попытке сделать запрос в динамическую таблицу выполняется проверка, что максимальный размер блока не превышает 512 килобайт. В противном случае запрос завершится с ошибкой "Maximum block size limit violated". Это сделано для того, чтобы избежать медленных чтений из таблиц. Такая же ошибка возникнет и у динамической таблицы, подгружаемой в память, при попытке загрузить чанк с большими блоками. Для решения проблемы можно либо компактифицировать таблицу, либо увеличить ограничение на размер блока до 32 мегабайт с помощью атрибута max_unversioned_block_size (необходимо установить атрибут на таблице, после чего вызвать remount_table).

Таблеты в динамических таблицах делятся на партиции. Типичный размер партиции составляет несколько сотен мегабайт. В том случае, если чанки становятся больше чем размер партиции, запускается фоновый процесс, принудительно разбивающий чанки на более мелкие. Полезно при создании динамической таблицы также указать в спецификации настройки desired_chunk_size.

В таком случае чанки получатся достаточно мелкими (по 100 мегабайт). В противном случае чанки будут по умолчанию большими (512Мб и более). Когда таблица пошардирована на шарды (таблеты), такие чанки могут попадать в несколько таблетов и при вычислении размера таблицы будут учитываться несколько раз, в результате размер динамической таблицы может оказаться в несколько раз больше исходной статической.

В том случае, когда таблицу предполагается держать в памяти (управляется атрибутом @in_memory_mode), для достижения наибольшей производительности рекомендуется выполнить форсированную компактификацию. Необходимость вызвана следующим:

  • один и тот же чанк попадает в разные таблеты, а чанки разных таблетов подгружаются в память независимо (таблеты могут быть на разных узлах кластера). В результате затрачивается больше памяти, чем необходимо;
  • ридеры для чанков статических таблиц не оптимизированы для случая, когда данные уже в памяти.

Если динамическую таблицу предполагается использовать только для чтения, стоит примонтировать её в «замороженном» состоянии (управляется флагом --freeze команды mount-table). Это запретит запись, отключит компактификацию и снизит накладные расходы на обслуживание таблицы. Стоит делать это лишь в случае, когда размеры чанков и блоков соответствуют рекомендованным.

Пример

Ниже приведён пример скрипта, демонстрирующий, как можно сохранить сортированную динамическую таблицу в статическую и вспоследствии восстановить обратно. Данную операцию можно осуществлять для создания резервных копий динамических таблиц.

Обратите внимание, что скрипт не является production-ready и приведён только для ознакомления. В частности, он сохраняет только некоторые атрибуты таблиц и никак не обратабывает ошибки.

Преобразование динамической таблицы в статическую и обратно
#!/usr/bin/python3
import yt.wrapper as yt
import argparse

def dump(src, dst):
    with yt.Transaction():
        # Take snapshot lock for source table.
        node_id = yt.lock(src, mode="snapshot")["node_id"]
        # Get timestamp of flushed data
        # (not required if @enable_dynamic_store_read is set).
        ts = yt.get(f"#{node_id}/@unflushed_timestamp") - 1
        # Create table and save vital attributes.
        yt.create("table", dst, attributes={
            "optimize_for": yt.get(f"#{node_id}/@optimize_for"),
            "schema": yt.get(f"#{node_id}/@schema"),
            "_yt_dump_restore_pivot_keys": yt.get(f"#{node_id}/@pivot_keys"),
        })
        # Dump table contents.
        yt.run_merge(yt.TablePath(f"#{node_id}", attributes={"timestamp": ts}), dst, mode="ordered")

def restore(src, dst):
    # Create destination table.
    yt.create("table", dst, attributes={
        "optimize_for": yt.get(f"{src}/@optimize_for"),
        "schema": yt.get(f"{src}/@schema"),
    })
    # Make blocks smaller (required for real-time lookups).
    yt.run_merge(src, dst, mode="ordered", spec={
        "job_io": {"table_writer": {"block_size": 256 * 2**10, "desired_chunk_size": 100 * 2**20}},
        "force_transform": True,
    })
    # Make table dynamic.
    yt.alter_table(dst, dynamic=True)
    # Restore tablet structure.
    yt.reshard_table(dst, yt.get(f"{src}/@_yt_dump_restore_pivot_keys"), sync=True)

if __name__ ==  "__main__":
    parser = argparse.ArgumentParser(description="Dynamic tables dump-restore tool")
    mode = parser.add_mutually_exclusive_group(required=True)
    mode.add_argument("--dump", nargs=2, metavar=("SOURCE", "DESTINATION"), help="Dump dynamic table to static")
    mode.add_argument("--restore", nargs=2, metavar=("SOURCE", "DESTINATION"), help="Restore dynamic table from static")
    args = parser.parse_args()
    if args.dump:
        dump(*args.dump)
    if args.restore:
        restore(*args.restore)

Использование вычисляемых колонок

Для более равномерного распределения нагрузки по кластеру, динамические таблицы необходимо шардировать (разбивать на таблеты). Для равномерного распределения нагрузки между таблетами может оказаться удобным добавить перед всеми ключевыми колонками дополнительную колонку, в которую записывать хеш от той части ключа, по которой логично выполнять шардирование, например, хеш от первой колонки. В результате получится таблица, ключи которой равномерно распределены в диапазоне [0, 2^64-1].

Добавленная таким образом колонка является вычисляемой. Формулу для вычисления необходимо указывать в схеме колонки в поле expression. Поддержка вычисляемых колонок добавлена во все операции, кроме Sort. При записи в таблицу значение таких колонок будет вычисляться. Чтобы подготовить статическую таблицу для конвертации в динамическую, необходимо в первую очередь сделать таблицу в несортированной схеме (но с вычисляемыми колонками), затем отсортировать её с помощью операции Sort.

Ниже приведён пример создания динамической таблицы с вычисляемыми колонками из статической. Вместо write_rows запись в таблицу могла бы быть осуществлена с помощью операций Map, Reduce, MapReduce или Merge. В последнем случае в спецификации необходимо указать {'schema_inference_mode': 'from_output'}, чтобы данные валидировались по схеме выходной таблицы. Также при указании schema_inference_mode необходимо создавать выходную таблицу операции вручную, явно указывая требуемую схему. Если этого не сделать, используемое API попробует самостоятельно создать выходную таблицу, используя схему, выведенную из входной таблицы, что в данном случае приведёт к ошибкам.

Создание динамической таблицы с вычисляемыми колонками
import yt.wrapper as yt
import yt.yson as yson
import time

# Unsorted schema.
schema = [
    {"name": "hash", "type": "uint64", "expression": "farm_hash(key)"},
    {"name": "key", "type": "uint64"},
    {"name": "value", "type": "string"}]

# Sorted schema with unique keys attribute.
sorted_schema = yson.YsonList([
    {"name": "hash", "type": "uint64", "expression": "farm_hash(key)", "sort_order": "ascending"},
    {"name": "key", "type": "uint64", "sort_order": "ascending"},
    {"name": "value", "type": "string"}])
sorted_schema.attributes["unique_keys"] = True

# Create table
table = "//tmp/table"
yt.remove(table, force=True)
yt.create_table(table, attributes={"schema": schema})

# Write rows into table. Computed columns are omitted: YTsaurus will evaluate them.
yt.write_table(table, [{"key": 2, "value": "2"}, {"key": 1, "value": "1"}])

# Sort table. Resulting table schema has unique_keys=True.
yt.run_sort(table, yt.TablePath(table, attributes={"schema": sorted_schema}), sort_by=["hash", "key"], spec={
    "partition_job_io": {"table_writer": {"block_size": 256 * 2**10}},
    "merge_job_io": {"table_writer": {"block_size": 256 * 2**10}},
    "sort_job_io": {"table_writer": {"block_size": 256 * 2**10}}})

# Alter table into dynamic.
yt.alter_table(table, dynamic=True)

# Mount table and wait until all tablets are mounted.
yt.mount_table(table, sync=True)

# Print rows.
for row in yt.select_rows("* from [{}]".format(table)):
    print row

Примеры запуска операций

Запуск Map операции:

yt map 'grep some_value' --src //path/to/dynamic/table --dst //tmp/output --spec '{pool = "project-pool"; job_count = 100; }'

Запуск Reduce операции:

yt reduce 'python my_reducer.py' --src //path/to/dynamic/table --dst '<sorted_by = ["key"]>'//tmp/output