Structured Streaming
SPYT 1.77.0 introduced support for Spark Structured Streaming jobs over YTsaurus.
Checkpoint storage
YTsaurus can serve as reliable storage for offsets and other streaming metadata. To enable this, set the checkpointLocation option to a yt:///... path; all subsequent metadata for the job is saved under that path.
Listing 1 — Example of using checkpoint storage
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions.{floor, rand, window}
import org.apache.spark.sql.streaming.Trigger.ProcessingTime
import spark.implicits._
import tech.ytsaurus.spyt._

// Generate one random number per second from the built-in "rate" source
val numbers = spark
  .readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .load()
  .select($"timestamp", floor(rand() * 10).as("num"))

// Count the numbers in 5-second windows sliding every 3 seconds
val groupedNumbers = numbers
  .withWatermark("timestamp", "5 seconds")
  .groupBy(window($"timestamp", "5 seconds", "3 seconds"), $"num")
  .count()

// Keep checkpoints in YTsaurus; append every micro-batch to a YT table
val job = groupedNumbers
  .writeStream
  .option("checkpointLocation", "yt:///home/spark-streaming/sample01/checkpoints")
  .trigger(ProcessingTime(1000))
  .foreachBatch { (frame: DataFrame, batchNum: Long) =>
    frame.write.mode(SaveMode.Append).yt("//home/spark-streaming/sample01/result")
  }

val query = job.start()
query.awaitTermination()
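The Scala listing has a close Python counterpart. The sketch below is not taken from the SPYT documentation: it assumes the spyt spark_session helper (also used in Listing 3 below) and assumes that a batch write through format("yt") behaves like the Scala .yt writer helper.

from pyspark.sql.functions import floor, rand, window
from spyt import spark_session

with spark_session() as spark:
    numbers = spark \
        .readStream \
        .format("rate") \
        .option("rowsPerSecond", 1) \
        .load() \
        .select("timestamp", floor(rand() * 10).alias("num"))

    grouped = numbers \
        .withWatermark("timestamp", "5 seconds") \
        .groupBy(window("timestamp", "5 seconds", "3 seconds"), "num") \
        .count()

    # Appending through format("yt") is an assumption here, mirroring the
    # Scala .yt writer helper above.
    def write_batch(frame, batch_num):
        frame.write.mode("append").format("yt") \
            .save("//home/spark-streaming/sample01/result")

    query = grouped \
        .writeStream \
        .option("checkpointLocation", "yt:///home/spark-streaming/sample01/checkpoints") \
        .trigger(processingTime="1 second") \
        .foreachBatch(write_batch) \
        .start()
    query.awaitTermination()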
Streaming YTsaurus queues
YTsaurus has its own implementation of queues based on ordered dynamic tables.
Currently, an SPYT cluster can only operate on data located on the same YTsaurus cluster; this limitation also applies to queues and consumers.
Before starting a streaming job, create and configure the queues as described in the documentation. For reading, create the queue and consumer tables, then register the consumer. Streaming results are written to an ordered dynamic table that must be created and mounted in advance; a setup sketch is shown below.
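The exact commands depend on the cluster and client versions. As a minimal sketch, the setup for the examples below might look like this with the yt.wrapper Python client; the payload schema, the treat_as_queue_consumer attribute, and the register_queue_consumer call are assumptions to verify against the YTsaurus queue documentation.

import yt.wrapper as yt

client = yt.YtClient(proxy="<cluster-proxy>")
base = "//home/spark-streaming/sample02"

# Queue: an ordered dynamic table (schema without sorted columns).
# A single string payload column is an assumption for this example.
client.create("table", base + "/queue", attributes={
    "dynamic": True,
    "schema": [{"name": "data", "type": "string"}],
})

# Consumer: a sorted dynamic table; this schema follows the queue
# documentation and should be verified for your cluster version.
client.create("table", base + "/consumer", attributes={
    "dynamic": True,
    "treat_as_queue_consumer": True,  # assumption: consumer marker attribute
    "schema": [
        {"name": "queue_cluster", "type": "string", "sort_order": "ascending", "required": True},
        {"name": "queue_path", "type": "string", "sort_order": "ascending", "required": True},
        {"name": "partition_index", "type": "uint64", "sort_order": "ascending", "required": True},
        {"name": "offset", "type": "uint64", "required": True},
    ],
})

# Result: an ordered dynamic table the streaming job appends to;
# its columns must match the DataFrame being written.
client.create("table", base + "/result", attributes={
    "dynamic": True,
    "schema": [{"name": "data", "type": "string"}],
})

for name in ("queue", "consumer", "result"):
    client.mount_table(base + "/" + name, sync=True)

# Let the consumer commit offsets for this queue.
client.register_queue_consumer(base + "/queue", base + "/consumer", vital=False)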
After each portion of data has been processed, a new offset is committed, signaling that the processed rows can be removed from the input table.
When queues are used for both reading and writing, the at-least-once delivery guarantee applies.
Listing 2 — Example of using queues in Scala
import org.apache.spark.sql.streaming.Trigger.ProcessingTime

// Read from a YTsaurus queue, tracking progress through the registered consumer
val numbers = spark
  .readStream
  .format("yt")
  .option("consumer_path", "//home/spark-streaming/sample02/consumer")
  .load("//home/spark-streaming/sample02/queue")

// Write each micro-batch to the pre-created ordered dynamic table
val job = numbers
  .writeStream
  .option("checkpointLocation", "yt:///home/spark-streaming/sample02/checkpoints")
  .trigger(ProcessingTime(2000))
  .format("yt")
  .option("path", "//home/spark-streaming/sample02/result")

val query = job.start()
query.awaitTermination()
Listing 3 — Example of using queues in Python
from pyspark.sql.functions import length
from spyt import spark_session

with spark_session() as spark:
    df = spark \
        .readStream \
        .format("yt") \
        .option("consumer_path", "//home/spark-streaming/sample02/consumer") \
        .load("//home/spark-streaming/sample02/queue")

    df \
        .select("data") \
        .withColumn("data_length", length("data")) \
        .writeStream \
        .format("yt") \
        .option("checkpointLocation", "yt:///home/spark-streaming/sample02/checkpoints") \
        .option("path", "//home/spark-streaming/sample02/result") \
        .start().awaitTermination()
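Once the job is running, its output can be spot-checked with an ordinary batch read. The snippet below assumes that reading the result table through format("yt") works the same way as in the streaming examples above.

from spyt import spark_session

with spark_session() as spark:
    # Batch read of the result table to verify what the stream has written
    spark.read.format("yt") \
        .load("//home/spark-streaming/sample02/result") \
        .show()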