Configurations
For a list of all options supported by Spark, see the Spark documentation.
Basic options
Most options are available starting with version 1.23.0 unless otherwise specified.
Name | Default value | Description |
---|---|---|
spark.yt.write.batchSize | 500000 | The amount of data transferred in a single WriteTable transaction |
spark.yt.write.miniBatchSize | 1000 | The amount of data in a single portion passed to WriteTable |
spark.yt.write.timeout | 60 seconds | Timeout for writing a single data block |
spark.yt.write.typeV3.enabled (spark.yt.write.writingTypeV3.enabled before 1.75.2) | true | Write tables with a schema in type_v3 instead of type_v1 |
spark.yt.read.vectorized.capacity | 1000 | The maximum number of rows in a batch when reading via the wire protocol |
spark.yt.read.arrow.enabled | true | Use the Arrow format for reading, if possible |
spark.hadoop.yt.timeout | 300 seconds | Timeout for reading from YTsaurus |
spark.yt.read.typeV3.enabled (spark.yt.read.parsingTypeV3.enabled before 1.75.2) | true | Read tables with a schema in type_v3 instead of type_v1 |
spark.yt.read.keyColumnsFilterPushdown.enabled | true | Push Spark query filters down to reads from YTsaurus |
spark.yt.read.keyColumnsFilterPushdown.union.enabled | false | Merge table ranges into a single continuous range when filter pushdown is enabled |
spark.yt.read.keyColumnsFilterPushdown.ytPathCount.limit | 100 | The maximum number of ranges per table generated by filters |
spark.yt.transaction.timeout | 5 minutes | Write operation transaction timeout |
spark.yt.transaction.pingInterval | 30 seconds | Ping interval for the write operation transaction |
spark.yt.globalTransaction.enabled | false | Use a global transaction |
spark.yt.globalTransaction.id | None | ID of the global transaction |
spark.yt.globalTransaction.timeout | 5 minutes | Global transaction timeout |
spark.hadoop.yt.user | - | YTsaurus user name |
spark.hadoop.yt.token | - | YTsaurus user token |
spark.yt.read.ytPartitioning.enabled | true | Use the YTsaurus API for table partitioning |
spark.yt.read.planOptimization.enabled | false | Enable aggregation and join optimization rules for sorted data |
spark.yt.read.keyPartitioningSortedTables.enabled | true | Use key table ranges for reading; required for plan optimization |
spark.yt.read.keyPartitioningSortedTables.unionLimit | 1 | The maximum number of partitions to merge when key table ranges are used |
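Two of the filter pushdown options interact. When union.enabled is set, the ranges produced by a filter are merged into one continuous range. This reduces the number of ranges to read but also reads the rows that lie between them. The following is a simplified Python model under stated assumptions: a single integer key column, half-open ranges, and a hypothetical helper `plan_ranges` that is not part of SPYT. The table does not say what SPYT does when the range count exceeds ytPathCount.limit, so the sketch only reports whether the limit holds.

```python
from typing import List, Optional, Tuple

# Half-open key range [lower, upper) on a single integer key column;
# None marks an unbounded side. Simplified model, not SPYT's code.
KeyRange = Tuple[Optional[int], Optional[int]]


def plan_ranges(
    ranges: List[KeyRange], union_enabled: bool, path_count_limit: int
) -> Tuple[List[KeyRange], bool]:
    """Hypothetical helper: return the ranges to read and whether their
    count stays within path_count_limit."""
    if union_enabled and ranges:
        lowers = [lo for lo, _ in ranges]
        uppers = [hi for _, hi in ranges]
        # Any unbounded side makes the merged range unbounded on that side.
        lower = None if None in lowers else min(lowers)
        upper = None if None in uppers else max(uppers)
        ranges = [(lower, upper)]
    return ranges, len(ranges) <= path_count_limit


# A filter such as `key IN (1, 5, 9)` yields three point ranges.
point_ranges = [(1, 2), (5, 6), (9, 10)]
print(plan_ranges(point_ranges, union_enabled=False, path_count_limit=100))
# ([(1, 2), (5, 6), (9, 10)], True)
print(plan_ranges(point_ranges, union_enabled=True, path_count_limit=100))
# ([(1, 10)], True): one range, but keys 2-4 and 6-8 are read as well
```

The default of union.enabled (false) keeps reads exact. Enabling it trades extra scanned rows for fewer ranges per table.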
Aggregation and join optimizations
Spark ignores the sort metadata of YTsaurus tables and creates many Shuffle and Sort operations even when they are not required. Additional optimization rules were added for these cases; they are enabled with spark.yt.read.planOptimization.enabled. While the logical plan is being constructed, a rule adds a sort mark to the source vertices of sorted tables. Later, at the physical plan generation stage, these marks are transformed into nodes that do not process any data but notify the optimizer about sorting and partitioning.
Static tables are partitioned by index ranges, but the spark.yt.read.keyPartitioningSortedTables.enabled option enables partitioning by key ranges. When index ranges are transformed into key ranges, the number of partitions may decrease, which increases the share of data processed by a single executor.
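Here is a simplified model of why the partition count can shrink. If a key value straddles an index boundary, the boundary has to move so that the key stays in one partition, and neighboring partitions can then collapse into one. `to_key_partitions` is a hypothetical name that works on an in-memory list of sorted keys. It does not reflect how SPYT actually computes key ranges or applies unionLimit.

```python
from bisect import bisect_right
from typing import List, Sequence, Tuple


def to_key_partitions(keys: Sequence[int], index_bounds: Sequence[int]) -> List[Tuple[int, int]]:
    """Align index-range partitions of a sorted table to key boundaries.

    keys: the key of every row, sorted ascending (in-memory stand-in for a table).
    index_bounds: row-index boundaries such as [0, 4, 8, 12]; the first must be 0
    and the last len(keys).
    Returns [start, end) row ranges in which no key value spans two partitions.
    """
    n = len(keys)
    aligned = [0]
    for b in index_bounds[1:-1]:
        if 0 < b < n:
            # Push the boundary past all rows sharing the key of row b - 1.
            b = bisect_right(keys, keys[b - 1], lo=b)
        if aligned[-1] < b < n:  # drop boundaries that collapsed into earlier ones
            aligned.append(b)
    aligned.append(n)
    return list(zip(aligned, aligned[1:]))


keys = [1, 1, 1, 2, 2, 2, 2, 2, 2, 3, 3, 4]
print(to_key_partitions(keys, [0, 4, 8, 12]))
# [(0, 9), (9, 12)]: three index partitions become two key partitions
```

In this example, key 2 occupies rows 3 to 8, so the first key partition grows from 4 rows to 9. Moving boundaries forward rather than backward is an arbitrary choice made for the sketch.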
Additional options for cluster configuration
Additional options are passed via --params:
```bash
spark-launch-yt \
  --proxy <cluster-name> \
  --discovery-path my_discovery_path \
  --params '{"spark_conf"={"spark.yt.jarCaching"="True";};"layer_paths"=["//.../ubuntu_xenial_app_lastest.tar.gz";...;];"operation_spec"={"max_failed_job_count"=100;};}' \
  --spark-cluster-version '1.36.0'
```
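The --params value is written in YSON text syntax: maps look like `{key=value;}` and lists look like `[item;]`. Building this string by hand is error-prone, so a small serializer can generate it from a Python dict instead. `to_yson` is a hypothetical helper that covers only dicts, lists, strings, integers and booleans. The YTsaurus Python client ships its own YSON library (the `yt.yson` module), which is the safer choice when it is installed.

```python
def to_yson(value) -> str:
    """Hypothetical helper: serialize simple Python values to YSON text."""
    if isinstance(value, bool):  # check bool before int: bool is an int subclass
        return "%true" if value else "%false"
    if isinstance(value, int):
        return str(value)
    if isinstance(value, str):
        # Minimal escaping for ASCII strings: backslashes and double quotes.
        escaped = value.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'
    if isinstance(value, list):
        return "[" + "".join(to_yson(v) + ";" for v in value) + "]"
    if isinstance(value, dict):
        return "{" + "".join(f"{to_yson(k)}={to_yson(v)};" for k, v in value.items()) + "}"
    raise TypeError(f"unsupported type: {type(value).__name__}")


params = {
    "spark_conf": {"spark.yt.jarCaching": "True"},
    "operation_spec": {"max_failed_job_count": 100},
}
print(to_yson(params))
# {"spark_conf"={"spark.yt.jarCaching"="True";};"operation_spec"={"max_failed_job_count"=100;};}
```

Passing the result as a single element of an argument list (for example `["spark-launch-yt", ..., "--params", to_yson(params)]` with `subprocess.run`) also avoids shell quoting issues.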
When configuring a task with spark-submit-yt, Spark options can be passed either as separate --conf flags or as a single JSON object with the --spark_conf_args option:
```bash
spark-submit-yt \
  --proxy <cluster-name> \
  --discovery-path my_discovery_path \
  --deploy-mode cluster \
  --conf spark.sql.shuffle.partitions=1 \
  --conf spark.cores.max=1 \
  --conf spark.executor.cores=1 \
  yt:///sys/spark/examples/grouping_example.py
```
```bash
spark-submit-yt \
  --proxy <cluster-name> \
  --discovery-path my_discovery_path \
  --deploy-mode cluster \
  --spark_conf_args '{"spark.sql.shuffle.partitions":1,"spark.cores.max":1,"spark.executor.cores":1}' \
  yt:///sys/spark/examples/grouping_example.py
```
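Both spark-submit-yt forms carry the same settings. The sketch below derives either form from one Python dict, so the values cannot drift apart between scripts. The helper names `as_conf_flags` and `as_spark_conf_args` are hypothetical. Generating the --spark_conf_args value with `json.dumps` guarantees valid JSON, with `key:value` pairs rather than `key=value`.

```python
import json
from typing import Dict, List, Union

ConfValue = Union[str, int]


def as_conf_flags(conf: Dict[str, ConfValue]) -> List[str]:
    """Expand settings into repeated `--conf key=value` arguments."""
    args: List[str] = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


def as_spark_conf_args(conf: Dict[str, ConfValue]) -> List[str]:
    """Pack the same settings into one `--spark_conf_args` JSON argument."""
    return ["--spark_conf_args", json.dumps(conf, separators=(",", ":"))]


conf = {"spark.sql.shuffle.partitions": 1, "spark.cores.max": 1, "spark.executor.cores": 1}
argv = [
    "spark-submit-yt",
    "--proxy", "<cluster-name>",
    "--discovery-path", "my_discovery_path",
    "--deploy-mode", "cluster",
    *as_spark_conf_args(conf),
    "yt:///sys/spark/examples/grouping_example.py",
]
print(argv[-2])
# {"spark.sql.shuffle.partitions":1,"spark.cores.max":1,"spark.executor.cores":1}
# subprocess.run(argv, check=True) would launch the job once <cluster-name> is replaced.
```

Swapping `*as_spark_conf_args(conf)` for `*as_conf_flags(conf)` produces the first form instead.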
When launching programmatically, you can set options on the Spark session with spark.conf.set("...", "...").
Python example:
```python
from spyt import spark_session

print("Hello world")
with spark_session() as spark:
    # Before SPYT 1.75.2 this option was named spark.yt.read.parsingTypeV3.enabled
    spark.conf.set("spark.yt.read.typeV3.enabled", "true")
    spark.read.yt("//sys/spark/examples/test_data").show()
```
Java example:
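```java
// Application body: the Spark session and the YTsaurus client are passed in to doRun
protected void doRun(String[] args, SparkSession spark, CompoundClient yt) {
    // In Java, SparkSession.conf() is a method, not a field
    spark.conf().set("spark.sql.adaptive.enabled", "false");
    spark.read().format("yt").load("/sys/spark/examples/test_data").show();
}
```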