Structured Streaming
В версии SPYT 1.77.0 появилась поддержка стриминговых процессов поверх YTsaurus.
Хранилище чекпоинтов
YTsaurus может выступать в роли надежного хранилища оффсетов и других метаданных. Для этого необходимо указать опцию checkpointLocation
со значением yt:///...
. После чего вся метаинформация об этой задаче будет сохраняться по указанному пути.
Листинг 1 — Пример использования хранилища чекпоинтов
val numbers = spark
.readStream
.format("rate")
.option("rowsPerSecond", 1)
.load()
.select($"timestamp", floor(rand() * 10).as("num"))
val groupedNumbers = numbers
.withWatermark("timestamp", "5 seconds")
.groupBy(window($"timestamp", "5 seconds", "3 seconds"), $"num")
.count()
val job = groupedNumbers
.writeStream
.option("checkpointLocation", "yt:///home/spark-streaming/sample01/checkpoints")
.trigger(ProcessingTime(1000))
.foreachBatch { (frame: DataFrame, batchNum: Long) =>
frame.write.mode(SaveMode.Append).yt("//home/spark-streaming/sample01/result")
}
val query = job.start()
query.awaitTermination()
Стриминг очередей YTsaurus
YTsaurus имеет собственную реализацию очередей, основанных на упорядоченных динамических таблицах.
На текущий момент SPYT кластер способен оперировать только с данными, расположенными на том же кластере YTsaurus. Это ограничение накладывается и на консьюмеры/очереди.
Перед запуском стриминговой задачи необходимо создать и настроить очереди в соответствии с документацией. В случае чтения — создать таблицы очередей и консьюмеров, произвести регистрацию. Запись результатов стриминга производится в упорядоченную динамическую таблицу, созданную и примонтированную заранее.
После обработки очередной порции данных совершается коммит нового смещения, что позволяет уведомлять входную таблицу о возможности удалить неактуальные строки.
При использовании очередей на чтение и на запись действуют гарантии at-least-once.
Листинг 2 — Пример использования очередей на языке Scala
val numbers = spark
.readStream
.format("yt")
.option("consumer_path", "//home/spark-streaming/sample02/consumer")
.load("//home/spark-streaming/sample02/queue")
val job = numbers
.writeStream
.option("checkpointLocation", "yt:///home/spark-streaming/sample02/checkpoints")
.trigger(ProcessingTime(2000))
.format("yt")
.option("path", "//home/spark-streaming/sample02/result")
val query = job.start()
query.awaitTermination()
Листинг 3 — Пример использования очередей на языке Python
from pyspark.sql.functions import length
from spyt import spark_session
with spark_session() as spark:
df = spark \
.readStream \
.format("yt") \
.option("consumer_path", "//home/spark-streaming/sample02/consumer") \
.load("//home/spark-streaming/sample02/queue")
df\
.select("data") \
.withColumn('data_length', length("data")) \
.writeStream \
.format("yt") \
.option("checkpointLocation", "yt:///home/spark-streaming/sample02/checkpoints") \
.option("path", "//home/spark-streaming/sample02/result") \
.start().awaitTermination()