Сортированные динамические таблицы

В данном разделе описано устройство сортированных динамических таблиц и действия, которые можно выполнять с данным видом таблиц.

Модель данных

Каждая сортированная динамическая таблица — набор строк, упорядоченных по ключу. Как и в случае статических таблиц, ключ может быть композитным, то есть состоящим из нескольких колонок. В отличие от статических таблиц, ключ в динамической сортированной таблице уникален.

Динамические сортированные таблицы должны быть строго схематизированы, то есть все имена колонок и их типы должны быть заранее указаны в схеме.

Поддерживаемые операции

Создание

Для создания динамической сортированной таблицы необходимо выполнить команду create table, указав в атрибутах схему и настройку dynamic=True. Схема должна соответствовать схеме сортированной таблицы.

CLI

yt create table //path/to/table --attributes \
'{dynamic=%true;schema=[{name=key;type=string;sort_order=ascending}; {name=value;type=string}]}'

Внимание

Важно указать в схеме таблицы хотя бы одну ключевую колонку. Если этого не сделать, таблица будет успешно создана, но окажется не сортированной, а упорядоченной. При этом для таблицы будет работать большинство типов select-запросов. Но все такие запросы будут сводиться к full scan данных.

Изменение схемы и типа таблицы

Поменять схему уже существующей динамической таблицы можно с помощью команды alter-table. Для успешного выполнения команды таблица должна быть отмонтирована, а новая схема — совместима со старой. При этом никаких изменений в записанных на диск данных не происходит, поскольку старые данные удовлетворяют новой схеме.

С помощью alter-table можно из статической таблицы сделать динамическую. Подробнее можно прочитать в разделе MapReduce по динамическим таблицам.

Чтение строки

Клиент может выполнять чтение строк по заданному ключу методом lookup. Для этого необходимо указать имя таблицы, временную метку, определяющую срез читаемых данных (<= t), а также имена интересующих колонок. Данный запрос является точечным, то есть требует указания всех компонент ключа. Существует разновидность вызовов API, позволяющих читать несколько строк по разным ключам за один запрос.

При чтении в пределах атомарной транзакции будет использоваться временная метка старта транзакции. Можно также указать конкретное числовое значение (например, приближенно отвечающее физическому моменту времени), а также одно из двух специальных значений:

  • sync_last_committed — следует прочитать самую свежую версию, гарантированно содержащую все изменения, выполненные в уже закоммиченных транзакциях;
  • async_last_committed — следует прочитать по возможности последнюю версию, но разрешается вернуть данные с небольшим (неспецифицированным, типично в пределах десятков миллисекунд) отставанием.

Метка async_last_committed может быть использована в тех случаях, когда консистентность чтения не требуется. Данный режим может работать быстрее при наличии конкуренции между чтениями и двухфазными коммитами. При двухфазном коммите строки таблицы блокируются специальным образом до тех пор, пока для транзакции не началась вторая стадия и выбрана метка коммита. Читатели, желающие получить самые последние данные, вынуждены ждать окончания первой фазы, так как до этого неизвестно, будет ли транзакция успешной.

Обе метки sync_last_committed, async_last_committed не гарантируют глобального консистентного среза. Данные, которые увидит читающий запрос, могут отвечать разным моментам времени как на уровне целых строк, так и на уровне отдельных колонок строк. Для консистентного чтения по всей таблице или для набора таблиц необходимо указывать конкретную временную метку.

Для таблицы с неатомарными изменениями режимы sync_last_committed и async_last_committed эквивалентны, так как двухфазного коммита не происходит.

Выполнение запроса

Система понимает SQL-подобный диалект, с помощью которого можно производить выборки и агрегацию по объемам данных в миллионы строк в режиме реального времени. Как и для операции чтения строк по ключу в запросе можно указать временную метку, относительно которой запрос следует выполнить.

Поскольку данные в системе фактически являются сортированными по набору ключевых колонок, при исполнении система использует данное свойство для сокращения объема прочитанного. В частности, пользовательский запрос анализируется и из него выводятся key ranges — диапазоны в пространстве ключей, объединение которых покрывает всю область поиска. Это позволяет выполнять эффективный range scan одним select-запросом. Подробнее можно прочитать в разделе Язык запросов.

При построении запроса, следует помнить, что если система не сможет вывести нетривиальные key ranges, то произойдет full scan данных. Full scan случится, например, если в качестве ключевых колонок указать key1, key2, а в запросе задать фильтрацию лишь по key2.

Запись строки

Клиент может выполнить запись данных методом insert_rows в пределах активной транзакции. Для этого он должен сообщить записываемые строки. В каждой такой строке должны присутствовать все ключевые поля. Часть полей данных из указанных в схеме может отсутствовать.

Семантически если строки с указанным ключом в таблице нет, то она появляется. Если же строка с таким ключом уже есть, то происходит перезапись части колонок.

При указании части полей существует 2 режима:

  • overwrite (по умолчанию) — все неуказанные поля обновляют свои значения на null;
  • update — включается опцией update == true. В таком случае сохранится предыдущее значение. В этом режиме необходимо передать все колонки, помеченные атрибутом required.

Примечание

При чтении и выполнении SQL запроса видны данные только на момент начала транзакции. Изменения, записанные в пределах той же транзакции, для чтения недоступны.

Удаление строки

В пределах транзакции клиент может удалить строку или набор строк, сообщив соответствующие ключи.

Семантически если строка с указанным ключом присутствовала в таблице, то она будет удалена. Если же строки не было, то никаких изменений не наступит.

Как и в большинстве MVCC-систем, удаление в системе YTsaurus сводится к записи строки без данных, но со специальным маркером tombstone, сигнализирующим об удалении. Это означает, что освобождение дискового пространства от удалённых строк происходит не сразу, а отложенным образом. Также удаление строк не вызывает немедленного ускорения при чтении: напротив, поскольку при чтении происходит слияние данных, оно замедляется.

Блокировка строки

Клиент может заблокировать строки в пределах транзакции. Блокировка дает гарантию того, что строка в течение текущей транзакции не будет изменена в других транзакциях. Одну строку можно заблокировать сразу из нескольких транзакций. Можно указывать отдельные lock группы колонок, которые будут заблокированы, а также режим блокировки weak или strong.

Удаление старых данных (TTL)

В процессе слияния чанков таблета часть данных может быть признана устаревшей и удалена.

Примечание

Удаление строк в транзакции фактически лишь записывает специальный маркер, но не освобождает память.

Значения атрибутов min_data_versions, max_data_versions, min_data_ttl, max_data_ttl показывают, можно ли удалить данные. Значение может быть удалено, если одновременно выполнено два условия:

  • нет ни одного запрета удалять данное значение;
  • есть хотя бы одно разрешение удалить данное значение.

Чтобы понять, для каких значений существуют разрешения и запреты и какие типы они имеют, можно мысленно отсортировать все значения в данной строке и данной колонке таблицы: (t1, v1), (t2, v2), ..., (tn, vn), где ti — временные метки, а vi — сами значения. Временные метки считаются упорядоченными по убыванию. Кроме того, команда удаления строки таблицы порождает специальное значение для всех её колонок: семантически оно не равно null, так как после записи null в строки их невозможно удалить. Тогда правила удаления таковы:

  • первые min_data_versions значений нельзя удалять по соображениям числа версий;
  • значения, записанные менее чем min_data_ttl до текущего момента нельзя удалять по соображениям времени;
  • значения, следующие за первыми max_data_versions, можно удалять по соображениям числа версий;
  • значения, записанные более давно чем max_data_ttl от текущего момента, можно удалять по соображениям времени.

Настройки по умолчанию:

  • min_data_versions = 1;
  • max_data_versions = 1;
  • min_data_ttl = 1800000 (30 min);
  • max_data_ttl = 1800000 (30 min).

Исходя из настроек по умолчанию, хотя бы одно (последнее) значение будет сохраняться всегда, как и все значения, записанные за последние 30 минут. При этом ограничивается время, на протяжении которого транзакция может оставаться консистентной (система не допускает длинные транзакции).

Используя перечисленные параметры, можно строить гибкие политики хранения. Например, min_data_versions = 0, max_data_versions = 1, min_data_ttl = 0, max_data_ttl = 86400000 (1 day) разрешают удалять любые данные старше одного дня, сохраняя за последний день только одну версию.

Примечание

Указанные параметры дают системе возможность удалять данные, но не принуждают ее к этому. Операция слияния чанков и удаления данных является фоновой.

Если необходимо принудительно очистить данные, воспользуйтесь атрибутом forced_compaction_revision:

yt set //table/@forced_compaction_revision 1; yt remount-table //table

Приведённый набор команд запускает компактификацию всех данных, записанных до текущего момента. Таким образом, будут удалены как лишние версии-дубликаты, так и логически удаленные данные. Данная операция создает моментальную нагрузку на кластер, которая зависит от объема многоверсионных данных, поэтому данная операция считается административным вмешательством.

Внимание

Установка forced_compaction_revision вызывает сильную нагрузку на кластер. Не рекомендуется использовать данный атрибут без особой необходимости и понимания последствий.

Ещё один популярный сценарий возникает, когда в таблицу добавляются свежие записи по (в среднем) возрастающим ключам, а старые данные при этом удаляются также по (в среднем) возрастающим ключам. В конце таблицы возникают партиции c данными, которые логически удалены, т. е. для них присутствуют tombstones. Данные партиции не будут сжиматься, пока количество чанков в них мало. Размер таблицы, хранимый на диске, постоянно растёт, хотя количество неудалённых данных в ней останется на постоянном уровне. Возможное решение — указать параметр auto_compaction_period, задающий периодичность, с которой партиции будут форсированно компактифицироваться.

Агрегирующие колонки

В случае, если сценарий работы с данными подразумевает постоянное прибавление дельт к значениям, уже записанным в таблице, стоит воспользоваться агрегирующими колонками. В схеме для колонки указывается атрибут aggregate=sum или другая агрегирующая функция. Далее можно делать запись в такую колонку с семантикой прибавления к значению, уже записанному в таблице.

Чтения старого значения не происходит, в таблицу записывается только дельта. Реальное суммирование происходит при чтении. Значения в таблицах хранятся вместе с временной меткой. При чтении из агрегирующей колонки значения, соответствующие одному ключу и одной колонке, но с разными временными метками, суммируются. Для оптимизации, на стадии компактификации старые данные в агрегирующей колонке суммируются, и в таблице остаётся только одно значение, соответствующее их сумме.

Поддерживаются следующие агрегирующие функции: sum, min, max или first.

По умолчанию происходит перезапись того, что находится в таблице. Чтобы записать дельту, в команде записи необходимо указать опцию aggregate=true.

Построчный кеш

Для точечного чтения строк (чтения по полному ключу) из динамической таблицы есть вызов API lookup rows.

В базовом сценарии таблица хранится на диске (hdd, ssd, nvme). Единицей чтения данных с диска, а также единицей сжатия кодеком (lz, gzip, zstd) является блок. При чтении блоков с диска кеширование данных происходит на нескольких уровнях — на уровне операционной системы и на уровне процесса ноды YTsaurus. В процессе ноды YTsaurus блоки кешируются как в сжатом, так и разжатом виде.

Если в кешах не оказывается нужного блока, содержащего ключ, в общем случае происходит чтение блока с диска, передача по сети, разжатие и чтение блока в соответствии с форматом чанков таблицы (unversioned/versioned, scan/lookup). Всё это может занимать довольно большое время. Кроме этого, создается нагрузка на диск, сеть и CPU.

Как правило, при точечных чтениях читается достаточно малая доля данных из блока (меньше 1% процента). Из-за этого при отсутствии блоков, содержащих читаемые ключи, в кеше uncompressed блоков много работы будет сделано впустую.

Для критичного в latency сценария чтения данных по полным ключам из динамической таблицы был единственный вариант — положить данные полностью в память в разжатом виде (атрибут таблицы @in_memory_mode=uncompressed). Минусом такого подхода является то, что для больших таблиц требуется много оперативной памяти.

Довольно частым является сценарий работы с таблицей, когда есть некоторый ограниченный объём часто читаемых данных. Другими словами, вероятность чтения по различным ключам в некоторый момент времени не одинакова. Если есть такой working set горячих данных, то можно воспользоваться lookup кешом.

Lookup cache представляет собой построчный LRU кеш данных — то есть кешируются отдельные строки таблицы. Для кеширования отдельных строк можно было бы пользоваться отдельным инструментом. Например, memcached: cначала делать запрос к memcached, а в случае отсутствия данных в кеше читать из YTsaurus и добавлять в кеш.

Проблема в том, что данные в таблице могут постоянно меняться: если в некоторый момент времени строка таблицы есть в кеше, в этот момент может происходить транзакция, которая изменит или удалит эту строку. Таким образом, для обеспечения консистентности и изоляции данных при чтении необходимо синхронизировать кеш с изменениями данных в таблице. Поэтому кеш встроен в YTsaurus.

Для использования кеша необходимы следующие действия:

  • Запросить квоту в категории памяти lookup_rows_cache (аналогично квоте tablet_static для in memory таблиц).
  • Выставить у таблиц атрибут @lookup_cache_rows_ratio, указывающий, какую долю строк в каждом таблете нужно кешировать. Можно начинать выставлять с долей процента (0.0005) и увеличивать до нескольких процентов (0.03).
  • Выставить у таблицы атрибут @enable_lookup_cache_by_default в значение %true.
  • Сделать remount-table.

После этого все lookup запросы к таблице будут обслуживаться с использованием кеша.

Если же для некоторых запросов нужно отключить использование кеша, это можно сделать, указав опцию use_lookup_cache в самом запросе lookup rows. Это может быть полезно, когда есть фоновый процесс, который периодически меняет данные во всей таблице (делает lookup и последующий insert). Если же делать эти lookup с использование кеша, данные из него могут вымываться. С другой стороны, если таким периодическим процессом происходит update только небольшой части данных в таблице, использование кеша может быть полезно для всех lookup запросов, так как ускорит выполнение периодического процесса.

По умолчанию опция use_lookup_cache имеет значение null. Если use_lookup_cache равен null, кеш работает в зависимости от атрибута @enable_lookup_cache_by_default на таблице (также должен быть сконфигурирован объём кеша). Если в use_lookup_cache передаётся значение true или false, то использование кеша для lookup запроса определяется этой опцией.

Атрибут таблицы @lookup_cache_rows_per_tablet устаревший, его не стоит использовать. Проблема с ним в том, что разные таблеты могут иметь разное количество данных. При фиксировании размера на таблет кеш может работать неэффективно. Кроме этого, при изменении настроек шардирования и росте таблицы фиксированный размер кеша часто перестаёт соответствовать оптимальному.

Если используются ханки, то может быть полезен режим, когда таблица находится в памяти @in_memory_mode=uncompressed, а также включен lookup cache. В этом случае данные в памяти содержат ссылки на ханки. Как правило, объём данных в памяти не очень большой. Lookup cache позволит кешировать ханки и сокращать их чтение с диска, а также потребление сети.

Метрики для диагностики работы lookup cache.

При эффективной работе кеша должно снижаться чтение блоков с диска/из кеша блоков.

Общие метрики lookup:

Метрики чтения с диска и передачи по сети:

  • yt.tablet_node.lookup.chunk_reader_statistics.data_bytes_read_from_disk.rate
  • yt.tablet_node.lookup.chunk_reader_statistics.data_bytes_read_from_cache.rate
  • yt.tablet_node.lookup.chunk_reader_statistics.data_bytes_transmitted.rate

В случае использования ханков ещё метрики:

  • yt.tablet_node.lookup.hunks.chunk_reader_statistics.data_bytes_read_from_disk.rate
  • yt.tablet_node.lookup.hunks.chunk_reader_statistics.data_bytes_transmitted.rate

Метрика времени ответов на запросы lookup rows на таблетной ноде:

  • yt.tablet_node.lookup.duration

Метрики специфичные для lookup cache:

  • yt.tablet_node.lookup.cache_hits.rate - количество ключей в секунду, попадающих в кеш.
  • yt.tablet_node.lookup.cache_misses.rate - количество ключей в секунду, которых нет в кеше.
  • yt.tablet_node.lookup.cache_inserts.rate - количество ключей в секунду, добавляемых в кеш. Это значение может отличаться от cache_misses, если ключей нет вообще в таблице, либо закончилась память для кеша (категории памяти lookup_rows_cache). Отсутствие запрашиваемых ключей в таблице можно определить по метрике yt.tablet_node.lookup.missing_row_count.rate. Чтение отсутствующих ключей всегда приводит к обращению к диску (если таблица не в памяти). Чтобы этого не происходило стоит включить key xor filter.
  • yt.tablet_node.lookup.cache_outdated.rate - количество ключей в секунду, которые есть в кеше, но они устарели из-за того, что их не удалось вовремя обновить, если закончилась память для кеша. При вревышении квоты на память (категория памяти lookup_rows_cache) возникает невозможность обновления уже существующей строчки в кеше. При этом старое значение остается в кеше, но уже не может быть прочитано.

В процессе работы с кешом происходит garbage collection. По этим метрикам можно отслеживать реальный объем используемой памяти.

  • yt.tablet_node.row_cache.slab_allocator.lookup.alive_items - количество текущих аллокаций.
  • yt.tablet_node.row_cache.slab_allocator.lookup.allocated_items.rate - количество аллокаций в секунду.
  • yt.tablet_node.row_cache.slab_allocator.lookup.freed_items.rate - количество деаллокаций в секунду.
  • yt.tablet_node.row_cache.slab_allocator.lookup.arena_size - размеры арен (множества аллокаций одного размера) в байтах.

У этих метрик тег rank, соответствующий размеру аллокации (в байтах): 16, 32, 48, 64, 96, 128, 192, 256, 384, 512, 768, 1022, 1536, 2048, 3072, 4096, 6144, 8192, 12288, 16384, 22576, 32768, large. Large соответствует размерам аллокации больше 32768 байт.

Фильтрация несуществующих ключей

Зачастую паттерн нагрузки на таблицу таков, что чтение по несуществующим ключам является значительной долей запросов. Для того чтобы быстро понимать, что такого ключа в таблице нет, в динамических таблицах реализован механизм фильтрации ключей. Для этого используется xor-фильтр, аналогичный по функциональности фильтру Блума.

Фильтр может быть полезен в двух сценариях:

  • читаемого ключа нет в таблице.
  • читаемый ключ записывается редко, в этом случае мы читаем все чанки партиции, но в большинстве чанков ключа заведомо нет.

Фильтрация поддержана для lookup-запросов, а также для select-запросов, включающих условие на префикс ключа.

Включение фильтра

Для включения фильтра необходимо:

  • Выделить память в бандле под key filter block cache (рекомендуемое значение 500 MB). Это можно сделать самостоятельно в меню Edit Bundle -> Resources, в бандлах не переведенных на новую ресурсную модель значение нужно установить через динамический конфиг нод в поле data_node/block_cache/xor_filter/capacity. Это может сделать администратор кластера.
  • Включить сохранение фильтров при записи. Для этого нужно установить на таблице атрибут @chunk_writer/key_filter/enable = %true. Если атрибут @chunk_writer на таблице отсутствует, нужно установить его в значение {key_filter={enable=%true}}.
  • Если нужна фильтрация для select-запросов, нужно аналогично включить запись через атрибут @chunk_writer/key_filter_prefix и указать в нем параметр prefix_lengths.
  • Выставить @mount_config/enable_key_filter_for_lookup = %true. Эта опция включает использование фильтров как для lookup-запросов, так и для select-запросов.
  • Сделать remount-table на таблице.
  • Для добавления фильтра во все существующие чанки можно использовать forced compaction.

Параметры фильтра

Параметры записи фильтра, указываются в @chunk_writer/key_filter для lookup-запросов и в @chunk_writer/key_filter_prefix для select-запросов:

  • enable - необходимо ли создавать фильтр для блоков чанка.
  • false_positive_rate - процент неверных ответов фильтра (фильтр не отфильтровал ключ, но его на самом деле нет) значение по умолчанию 1 / 256. Чем выше точность, тем больше места будет занимать фильтр.
  • bits_per_key - количество бит в фильтре на ключ.
  • prefix_lengths - список длин префиксов ключей, для которых необходимо создать фильтр. Параметр валиден только для @chunk_writer/key_filter_prefix.

false_positive_rate и bits_per_key не могут быть выставлены одновременно. При выставленном false_positive_rate bits_per_key будет выведен из него. Рекомендуется использовать именно false_positive_rate.

Метрики для диагностики работы фильтра

При эффективной работе фильтра должно снижаться чтение блоков с диска/из кеша блоков.

Для удобного мониторинга эффективности использования фильтра есть уже готовые дашборды Key Filter (потабличный) и Bundle UI Key Filter (побандловый, доступен в интерфейсе бандла).

Общие метрики

Для оценки целесообразности включения фильтра полезно оценить отношение суммарного числа чанков, в которых был произведен поиск ключа, к числу прочитанных ключей (рекомендуем рассмотреть включение фильтра при соотношениие больше 3):

  • yt.tablet_node.lookup.unmerged_missing_row_count.rate / yt.tablet_node.lookup.row_count.rate

Метрики для lookup

Метрики чтений с диска/из кеша:

  • yt.tablet_node.lookup.chunk_reader_statistics.data_bytes_read_from_cache.rate - количество байт, прочитанных lookup-запросами из кеша блоков.
  • yt.tablet_node.lookup.chunk_reader_statistics.data_bytes_read_from_disk.rate - количество байт, прочитанных lookup-запросами с диска.

Метрики, специфичные для фильтра:

  • yt.tablet_node.lookup.key_filter.input_key_count.rate - количество ключей, поступающих на вход фильтру.
  • yt.tablet_node.lookup.key_filter.filtered_out_key_count.rate - количество отфильтрованных фильтром ключей.
  • yt.tablet_node.lookup.key_filter.false_positive_key_count.rate - количество ключей, которых на самом деле не существовало, но фильтр их не отфильтровал.

Метрики для select

Метрики чтений с диска/из кеша:

  • yt.tablet_node.select.chunk_reader_statistics.data_bytes_read_from_cache.rate - количество байт прочитанных select-запросами из кеша блоков.
  • yt.tablet_node.select.chunk_reader_statistics.data_bytes_read_from_disk.rate - количество байт прочитанных select-запросами с диска.

Метрики, специфичные для фильтра:

  • yt.tablet_node.select.range_filter.input_range_count.rate - количество интервалов, поступающих на вход фильтру.
  • yt.tablet_node.select.range_filter.filtered_out_range_count.rate - количество отфильтрованных фильтром интервалов.
  • yt.tablet_node.select.range_filter.false_positive_range_count.rate - количество интервалов, которых на самом деле не существовало, но фильтр их не отфильтровал.
Предыдущая
Следующая