Чтение и запись под транзакцией
Подробнее о транзакциях можно узнать в разделе Транзакции.
По умолчанию Spark не создает транзакций на чтение и не берет никаких блокировок. Чтобы обеспечить консистентность чтения для изменяющихся таблиц, можно создавать транзакцию самостоятельно и перенаправлять её в чтение датафрейма Spark.
Пример на Python:
with yt.Transaction() as tr:
df = spark.read.option("transaction", tr.transaction_id).yt("//sys/spark/examples/example_1")
df.show()
В момент вызова spark.read.yt
берётся блокировка на таблицу (snapshot lock), то есть во время составления плана и до начала физического чтения. В приведённом примере два вызова show()
будут дважды читать таблицу, но обязательно вернут один и тот же результат:
with yt.Transaction() as tr:
df = spark.read.option("transaction", tr.transaction_id).yt("//sys/spark/examples/example_1")
df.show()
time.sleep(60)
df.show()
Примечание
Можно включить создание глобальной транзакции, которая открывается в начале сессии Spark и держится всё время работы джоба. При использовании глобальной транзакции необходимо учесть что джоб будет брать блокировки на все таблицы, которые читает, и будет продолжать держать их до завершения. Использование глобальной транзакции включается параметром конфигурации spark.yt.globalTransaction.enabled=true
.
В случае необходимости записи в таблицу под транзакцией нужно использовать опцию write_transaction
при сохранении датафрейма:
df.write.option('write_transaction', transaction_id).yt("//target/table/path")