Structured Streaming
SPYT version 1.77.0 introduces support for streaming processes on top of YTsaurus.
General scheme of Spark Structured Streaming in SPYT

Key concepts
Queue — any ordered dynamic table.
Consumer — a sorted table with a fixed schema. A consumer has a many‑to‑many relationship with queues and acts as a consumer of one or more queues. The consumer’s task is to store offsets for partitions of the queues being read.
Source — a data source. In streaming over the Queue API, this is a queue (an ordered dynamic table).
Sink — a data sink. For example, an output dynamic table.
Microbatch — a batch of data processed in a single streaming iteration.
Streaming Query — a continuously running process that processes a data stream in microbatches. It reads data from the Source, applies specified transformations, and writes the results to the Sink. Created by calling the start() method of the DataStreamWriter class.
Checkpoint storage
YTsaurus can serve as reliable storage for offsets and other metadata. To use it, set the checkpointLocation option to a path of the form yt:///... and all metadata for the task will be saved at that path.
Listing 1 — Example of using checkpoint storage
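import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions.{floor, rand, window}
import org.apache.spark.sql.streaming.Trigger.ProcessingTime
import tech.ytsaurus.spyt._   // assumed source of the .yt(...) writer method
import spark.implicits._      // enables the $"column" syntax

// Synthetic source: one row per second with a random digit in "num".
val numbers = spark
  .readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .load()
  .select($"timestamp", floor(rand() * 10).as("num"))

// Count digits in 5-second windows that slide every 3 seconds.
val groupedNumbers = numbers
  .withWatermark("timestamp", "5 seconds")
  .groupBy(window($"timestamp", "5 seconds", "3 seconds"), $"num")
  .count()

// Offsets and state metadata are checkpointed to YTsaurus.
val job = groupedNumbers
  .writeStream
  .option("checkpointLocation", "yt:///tmp/spark-streaming/sample01/checkpoints")
  .trigger(ProcessingTime(1000))
  .foreachBatch { (frame: DataFrame, batchNum: Long) =>
    frame.write.mode(SaveMode.Append).yt("//tmp/spark-streaming/sample01/result")
  }

val query = job.start()
query.awaitTermination()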
Streaming queues in YTsaurus
YTsaurus provides its own implementation of queues based on ordered dynamic tables.
Note
Currently, Spark Streaming running on a YTsaurus cluster can only operate on data located in the same YTsaurus cluster.
Before starting a streaming task, create and configure queues according to the documentation. For reading, create queue and consumer tables and register them. Streaming results are written to a pre‑created and mounted ordered dynamic table.
After processing the latest data batch, a new offset is committed, notifying the input table that it can delete obsolete rows.
When using queues for reading and writing, at‑least‑once guarantees apply.
Listing 2 — Example of using queues in Scala
val numbers = spark
  .readStream
  .format("yt")
  .option("consumer_path", "//tmp/spark-streaming/sample02/consumer")
  .load("//tmp/spark-streaming/sample02/queue")

val job = numbers
  .writeStream
  .option("checkpointLocation", "yt:///tmp/spark-streaming/sample02/checkpoints")
  .trigger(ProcessingTime(2000))
  .format("yt")
  .option("path", "//tmp/spark-streaming/sample02/result")

val query = job.start()
query.awaitTermination()
Listing 3 — Example of using queues in Python
from pyspark.sql.functions import length
from spyt import spark_session

with spark_session() as spark:
    # Read from the queue; offsets are tracked in the consumer table.
    df = spark \
        .readStream \
        .format("yt") \
        .option("consumer_path", "//tmp/spark-streaming/sample02/consumer") \
        .load("//tmp/spark-streaming/sample02/queue")

    # Add the length of "data" and write the result to the output table.
    df \
        .select("data") \
        .withColumn('data_length', length("data")) \
        .writeStream \
        .format("yt") \
        .option("checkpointLocation", "yt:///tmp/spark-streaming/sample02/checkpoints") \
        .option("path", "//tmp/spark-streaming/sample02/result") \
        .start().awaitTermination()
Microbatch processing mechanism in Streaming Query
1. Batch initialization
   The driver starts processing a new microbatch #N:
   - Retrieves the current offsets (lowerIndex) for all queue partitions.
   - Calculates the upperIndex for each partition using the formula:
     upperIndex = min( lowerIndex + max_rows_per_partition, current_end_of_queue )
2. Comparing lowerIndex and upperIndex
   For each partition:
   - If lowerIndex < upperIndex:
     - Calls the advanceConsumer method to commit offsets for batch #N‑1 in the consumer.
     - Generates tasks for executors.
   - Otherwise: skips processing (empty batch).
3. Execution on executors
   Each executor for its partition:
   - Reads data using the pullConsumer method.
   - Applies transformations (if any).
   - Writes data to the output table.
4. Creating checkpoint files on Cypress for batch #N
   - A file in the offsets directory.
   - A file in the commits directory.
5. New iteration
   - The system moves to batch #N+1.
   - Repeats steps 1–3 with new offsets.
   - Only in step 2 of iteration #N+1 is batch #N committed if there are available rows.
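The planning part of steps 1 and 2 can be sketched in plain Python. This is an illustration only, not SPYT code: plan_batch and its arguments are hypothetical names, offsets are modeled as dictionaries keyed by partition index, and the end of the queue is assumed to be an exclusive bound.

```python
from typing import Dict, Optional, Tuple


def plan_batch(
    lower: Dict[int, int],
    end_of_queue: Dict[int, int],
    max_rows_per_partition: Optional[int] = None,
) -> Dict[int, Tuple[int, int]]:
    """Return {partition: (lowerIndex, upperIndex)} for non-empty partitions.

    Hypothetical helper that mirrors the formula
    upperIndex = min(lowerIndex + max_rows_per_partition, current_end_of_queue).
    """
    ranges = {}
    for partition, lower_index in lower.items():
        upper_index = end_of_queue[partition]
        if max_rows_per_partition is not None:
            upper_index = min(lower_index + max_rows_per_partition, upper_index)
        # Step 2: only partitions with lowerIndex < upperIndex produce tasks.
        if lower_index < upper_index:
            ranges[partition] = (lower_index, upper_index)
    return ranges


# Three partitions: a long backlog, a short backlog, and a fully read one.
ranges = plan_batch(
    lower={0: 100, 1: 40, 2: 7},
    end_of_queue={0: 5000, 1: 45, 2: 7},
    max_rows_per_partition=1000,
)
print(ranges)  # {0: (100, 1100), 1: (40, 45)}
```

Partition 2 has no unread rows, so it is left out of the plan, which corresponds to the "empty batch" branch.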
Working with offsets
Currently, offsets are stored in two locations:
- In checkpoint files that Spark creates automatically in the checkpointLocation directory.
- In the consumer table.
Determining lowerIndex and upperIndex for each partition
- Spark tries to find the latest checkpoint file on Cypress:
  - If the file exists, it retrieves an offset structure containing lowerIndex for each partition.
  - If not, it uses offsets from the consumer.
- For each input queue partition, it retrieves the maximum $row_index (upper_row_index).
- It analyzes the max_rows_per_partition option:
  - If set, it calculates upperIndex for each partition using the formula:
    upperIndex = min( lowerIndex + max_rows_per_partition, upper_row_index )
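The lookup order described above can be written as a short Python sketch. The function names are hypothetical, and the branch for an unset max_rows_per_partition is an assumption based on the default behavior of reading all available rows.

```python
from typing import Dict, Optional


def resolve_lower_index(
    checkpoint_offsets: Optional[Dict[int, int]],
    consumer_offsets: Dict[int, int],
) -> Dict[int, int]:
    """Prefer the latest checkpoint file; fall back to the consumer table."""
    if checkpoint_offsets is not None:
        return dict(checkpoint_offsets)
    return dict(consumer_offsets)


def resolve_upper_index(
    lower: Dict[int, int],
    upper_row_index: Dict[int, int],
    max_rows_per_partition: Optional[int],
) -> Dict[int, int]:
    """Apply upperIndex = min(lowerIndex + max_rows_per_partition, upper_row_index)."""
    if max_rows_per_partition is None:
        # Assumption: without a limit, everything up to upper_row_index is read.
        return dict(upper_row_index)
    return {
        p: min(lower[p] + max_rows_per_partition, upper_row_index[p])
        for p in lower
    }


# First start: no checkpoint file yet, so the consumer offsets are used.
lower = resolve_lower_index(None, consumer_offsets={0: 10, 1: 0})
upper = resolve_upper_index(lower, {0: 2500, 1: 300}, max_rows_per_partition=1000)
print(lower, upper)  # {0: 10, 1: 0} {0: 1010, 1: 300}
```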
Possible offset desynchronization
Due to the late call of the commit method implemented in Spark itself, offsets in the consumer may lag behind offsets in checkpoint files by one batch if:
- Batch #N is fully processed, but batch #N+1 has not yet been initialized.
- Batch #N+1 is empty and will not be processed because there are no (or no more) unread rows in the input queue.
In this case, the offsets in the latest checkpoint file correspond to the upperIndex of batch #N, while the offset field in the consumer still corresponds to the upperIndex of batch #N‑1.
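A toy model makes the one-batch lag visible. The ToyStream class below is hypothetical and models a single-partition queue. It reproduces only the order of operations described in this section (the consumer is advanced at the start of the next non-empty batch), not the actual SPYT implementation.

```python
class ToyStream:
    """Toy model of deferred consumer commits; not SPYT code."""

    def __init__(self, queue_end: int, max_rows: int):
        self.queue_end = queue_end      # rows currently in the queue
        self.max_rows = max_rows        # stands in for max_rows_per_partition
        self.checkpoint_offset = 0      # offset in the latest checkpoint file
        self.consumer_offset = 0        # offset stored in the consumer table

    def run_batch(self) -> bool:
        lower = self.checkpoint_offset
        upper = min(lower + self.max_rows, self.queue_end)
        if lower >= upper:
            return False                # empty batch: nothing is committed
        # The previous batch is committed to the consumer only now.
        self.consumer_offset = lower
        # ... executors would read rows [lower, upper) and write them here ...
        self.checkpoint_offset = upper  # checkpoint files for this batch
        return True


stream = ToyStream(queue_end=250, max_rows=100)
while stream.run_batch():
    pass
print(stream.checkpoint_offset, stream.consumer_offset)  # 250 200

# Once new rows arrive, the next batch catches the consumer up.
stream.queue_end = 300
stream.run_batch()
print(stream.checkpoint_offset, stream.consumer_offset)  # 300 250
```

After the queue is drained, the checkpoint points past the last batch while the consumer still points at its start, which is the desynchronization described above.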
Write guarantees
SPYT Structured Streaming supports several levels of write guarantees. By default, at‑least‑once applies: on restarts, a microbatch may be processed again, leading to duplicates. If duplicates are not acceptable, tools are available to achieve exactly‑once.
| Mode | Guarantee | When to use |
|---|---|---|
| Non‑transactional (default) | at‑least‑once | When duplicates are acceptable: logs, metrics, cache warming. No additional configuration required |
| Transactional mode (SPYT 2.10+) | exactly‑once for any transformations | When data accuracy is critical: financial analytics, ML features, incremental data mart construction |
| Idempotent receiver | exactly‑once for stateless 1:1 transformations | When you want to avoid the additional load on the RPC proxy from transactional mode, or when maintaining legacy code |
For a comparison of modes, operation schemes, and instructions on enabling them, see the article Exactly‑once guarantee.
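To see why a replayed microbatch produces duplicates under at‑least‑once, and why an idempotent write does not, consider the following generic sketch. It does not show how SPYT implements either mode. The sinks are plain Python containers, and keying rows by their source row index is an assumption made for the illustration.

```python
def replayed_batches():
    """Yield batch 0, then batch 1 twice, as if the job restarted mid-batch."""
    batch0 = [(0, "a"), (1, "b")]   # (source row index, payload)
    batch1 = [(2, "c"), (3, "d")]
    yield batch0
    yield batch1
    yield batch1                    # replay after a restart


append_sink = []                    # plain append: duplicates survive
keyed_sink = {}                     # upsert by source row index: replay is harmless

for batch in replayed_batches():
    for row_index, payload in batch:
        append_sink.append(payload)
        keyed_sink[row_index] = payload

print(len(append_sink))  # 6
print(len(keyed_sink))   # 4
```

The keyed variant only works when each input row maps to exactly one output row with a stable key, which matches the "stateless 1:1 transformations" restriction in the table.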
Configuring the number of rows per batch
By default, Spark attempts to read all available rows in a queue in a single streaming microbatch. For queues with many unread rows, this can lead to an OutOfMemoryError on executors. Two settings limit the number of rows handled at a time:
- The max_rows_per_partition option sets the maximum number of rows that can be read from a single queue partition within one batch. For example:
  - If a queue consists of 3 partitions and max_rows_per_partition is 1 000, no more than 1 000 rows per partition will be read, that is, no more than 3 000 rows per batch. Partitions are distributed evenly among executors. If there are at least as many executors as partitions, each will process no more than one partition.
  - In the simplest case, when a queue consists of one partition, max_rows_per_partition effectively sets the row limit per batch.
- The configuration parameter spark.yt.write.dynBatchSize sets the maximum number of rows that can be written to a dynamic table in one call of the modifyRows command. For example:
  - If an executor reads 1 000 rows and spark.yt.write.dynBatchSize is 100, 10 ModifyRowsRequest instances will be sequentially generated, each containing 100 rows.
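The arithmetic behind both examples can be checked with a few lines of Python. The variable names are illustrative, and rounding up when the row count is not a multiple of the write batch size is an assumption.

```python
import math

partitions = 3
max_rows_per_partition = 1000
dyn_batch_size = 100            # value of spark.yt.write.dynBatchSize

# Upper bound on rows read in one microbatch across all partitions.
max_rows_per_batch = partitions * max_rows_per_partition

# Write requests issued by one executor that read 1000 rows.
rows_read_by_executor = 1000
modify_rows_requests = math.ceil(rows_read_by_executor / dyn_batch_size)

print(max_rows_per_batch, modify_rows_requests)  # 3000 10
```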
Listing 5 — Using the max_rows_per_partition option
from pyspark.sql import SparkSession

# consumer_path, queue_path, checkpoints_path and result_table_path
# are expected to be defined by the caller.
spark = SparkSession.builder.appName('streaming example') \
    .config("spark.yt.write.dynBatchSize", 100) \
    .getOrCreate()

df = spark \
    .readStream \
    .format("yt") \
    .option("consumer_path", consumer_path) \
    .option("max_rows_per_partition", 1000) \
    .load(queue_path)

query = df \
    .writeStream \
    .outputMode("append") \
    .format("yt") \
    .option("checkpointLocation", checkpoints_path) \
    .option("path", result_table_path) \
    .start()
Composite types
To process composite data types with streaming, you must enable the parsing_type_v3 and write_type_v3 options, just as for batch jobs.
Listing 6 — Processing composite types in Structured Streaming
df = spark \
    .readStream \
    .format("yt") \
    .option("consumer_path", consumer_path) \
    .option("parsing_type_v3", "true") \
    .load(queue_path)

query = df \
    .writeStream \
    .outputMode("append") \
    .format("yt") \
    .option("write_type_v3", True) \
    .option("checkpointLocation", checkpoints_path) \
    .option("path", result_table_path) \
    .start()
Spark Structured Streaming parameters set via Spark methods
| Parameter | Description | Required | Default value |
|---|---|---|---|
| format | Format. Specified separately for readStream and writeStream | Yes (for Structured Streaming over dynamic tables, you must specify yt) | None |
| load | Input queue | Yes | None |
| outputMode | Write mode | No | "append" |
Options and parameters
For the complete reference of streaming options (DataFrameReader/Writer options, service columns, version compatibility matrix), see the Streaming options page. For Spark session parameters (including spark.yt.streaming.transactional and spark.yt.write.dynBatchSize), see the Configuration parameters page.
Monitoring in Spark webUI
The Jobs, Stages, and SQL/DataFrames pages, as in a regular Spark application, are used to monitor jobs, execution stages, and query plans.
The Environment page is used to view all Spark session configuration parameters and some metrics.
On the Executors page, you can see the number of active, failed, and completed tasks and assess how optimally the executor cores are loaded. You can also view memory usage statistics. In case of a memory leak, it is sometimes useful to go to Thread Dump or Heap Histogram to find the cause.

For streaming processes, the webUI displays a Structured Streaming page by default. Here you can view statistics for active and completed streams. In particular, it is useful to check what error caused a Streaming Query to fail.

By clicking Run ID, you can view more detailed statistics for a specific Streaming Query.

Best practices
- Properly configure the max_rows_per_partition option and the spark.yt.write.dynBatchSize config to limit the number of rows processed per batch. Do not set them too high, or you risk an OutOfMemoryError. Do not set them too low either, or batches will be created too frequently, which significantly increases the load on master servers and proxy servers in YTsaurus.
- Set a low value for the spark.sql.streaming.minBatchesToRetain config. This parameter sets the minimum number of recent batches whose metadata must be stored. These are files in the checkpoint directory on Cypress and objects in the driver’s internal structures. By default, this config’s value is 100. For streaming without stateful transformations, it is sufficient to set --conf spark.sql.streaming.minBatchesToRetain=2. This saves chunks on Cypress and driver memory.
- When creating a Spark session, set the following configuration parameters:
  - .config("spark.streaming.stopGracefullyOnShutdown", True): gracefully complete processing of all created batches and clean up resources before stopping the stream.
  - .config("spark.streaming.stopGracefullyOnShutdown.timeout", ...): time for graceful shutdown in milliseconds.
  - .config("spark.sql.adaptive.enabled", False): regular adaptive execution does not work for streaming, so disable it.
  - .config("spark.sql.streaming.adaptiveExecution.enabled", True): enable special adaptive execution for streaming jobs.
- Configure memory allocation using the spark.memory.fraction and spark.memory.storageFraction parameters, for example:
  - .config("spark.memory.fraction", 0.5): 50 % of the JVM heap (after Spark's fixed 300 MB reserve) is shared by execution and storage; the rest is left for user data structures.
  - .config("spark.memory.storageFraction", 0.2): 20 % of that shared region is reserved for cached data and protected from eviction, leaving up to 80 % for data processing (suitable if there is no caching).
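As a worked example of the memory settings, the sketch below follows Spark's documented unified memory model, in which spark.memory.fraction is applied to the JVM heap minus a fixed 300 MB reserve and spark.memory.storageFraction is applied to the resulting region. The memory_regions helper and the 4396 MB heap size are illustrative assumptions.

```python
RESERVED_MB = 300  # fixed reserve in Spark's unified memory model


def memory_regions(heap_mb: float, memory_fraction: float, storage_fraction: float) -> dict:
    """Split an executor heap into the regions controlled by the two settings."""
    usable = heap_mb - RESERVED_MB
    unified = usable * memory_fraction                # execution + storage
    protected_storage = unified * storage_fraction    # cache immune to eviction
    return {
        "unified_mb": unified,
        "protected_storage_mb": protected_storage,
        "user_mb": usable - unified,                  # user data structures
    }


regions = memory_regions(heap_mb=4396, memory_fraction=0.5, storage_fraction=0.2)
for name, size in regions.items():
    print(name, round(size, 1))
```

With these inputs, 2048 MB is shared by execution and storage, about 410 MB of it is protected for cached data, and another 2048 MB remains for user data structures.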