Configurations

For a listing of all the options supported by Spark, see the documentation.

Basic options

Unless specified otherwise, most options are available starting with version 1.23.0.

| Name | Default value | Description |
| --- | --- | --- |
| spark.yt.write.batchSize | 500000 | The amount of data transferred in a single WriteTable transaction |
| spark.yt.write.miniBatchSize | 1000 | The size of a data portion transmitted to WriteTable |
| spark.yt.write.timeout | 60 seconds | The timeout for writing a single data block |
| spark.yt.write.typeV3.enabled (spark.yt.write.writingTypeV3.enabled before 1.75.2) | true | Write tables with a schema in type_v3 instead of type_v1 |
| spark.yt.read.vectorized.capacity | 1000 | The maximum number of rows in a batch when reading via the wire protocol |
| spark.yt.read.arrow.enabled | true | Use the Arrow format for reading (when possible) |
| spark.hadoop.yt.timeout | 300 seconds | The timeout for reading from YTsaurus |
| spark.yt.read.typeV3.enabled (spark.yt.read.parsingTypeV3.enabled before 1.75.2) | true | Read tables with a schema in type_v3 instead of type_v1 |
| spark.yt.read.keyColumnsFilterPushdown.enabled | true | Push down Spark query filters to reads from YTsaurus |
| spark.yt.read.keyColumnsFilterPushdown.union.enabled | false | Merge table ranges into a single continuous range when filter pushdown is enabled |
| spark.yt.read.keyColumnsFilterPushdown.ytPathCount.limit | 100 | The maximum number of ranges generated for a table by filters |
| spark.yt.transaction.timeout | 5 minutes | Write operation transaction timeout |
| spark.yt.transaction.pingInterval | 30 seconds | Ping interval for the write operation transaction |
| spark.yt.globalTransaction.enabled | false | Use a global transaction |
| spark.yt.globalTransaction.id | None | Id of the global transaction |
| spark.yt.globalTransaction.timeout | 5 minutes | Global transaction timeout |
| spark.hadoop.yt.user | - | Name of the YTsaurus user |
| spark.hadoop.yt.token | - | Token of the YTsaurus user |
| spark.yt.read.ytPartitioning.enabled | true | Use the YTsaurus API for table partitioning |
| spark.yt.read.planOptimization.enabled | false | Enable aggregation and join optimization rules for sorted data |
| spark.yt.read.keyPartitioningSortedTables.enabled | true | Use key-based table ranges for reading; required for plan optimization |
| spark.yt.read.keyPartitioningSortedTables.unionLimit | 1 | The maximum number of partitions that may need to be merged when key table ranges are used |
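
The options from the table are regular Spark configuration keys and can be set on an existing session before reading or writing. A minimal sketch, assuming a SPyT-enabled session and hypothetical table paths //home/project/example and //home/project/example_copy:

from spyt import spark_session

with spark_session() as spark:
    # Tune write and read behavior at runtime via Spark conf
    spark.conf.set("spark.yt.write.batchSize", "500000")
    spark.conf.set("spark.yt.read.arrow.enabled", "true")
    # Hypothetical table paths, used for illustration only
    df = spark.read.format("yt").load("//home/project/example")
    df.write.format("yt").save("//home/project/example_copy")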

Aggregation and join optimizations

By default, the Spark cluster ignores the sort metadata of YTsaurus tables and inserts Shuffle and Sort operations even when they are not required. Extra optimization rules were added for these cases. While the logical plan is being constructed, a rule adds a sort mark to the source vertices of sorted tables. Later, at the physical plan generation stage, the marks are transformed into nodes that do no data processing themselves but notify the optimizer about the existing sorting and partitioning.
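
The effect of these rules can be inspected on the query plan. A minimal sketch, assuming two sorted tables at the hypothetical paths //home/project/sorted_a and //home/project/sorted_b that share the key column "key":

from spyt import spark_session

with spark_session() as spark:
    # The optimization rules for sorted data are disabled by default
    spark.conf.set("spark.yt.read.planOptimization.enabled", "true")
    left = spark.read.yt("//home/project/sorted_a")    # hypothetical sorted table
    right = spark.read.yt("//home/project/sorted_b")   # hypothetical sorted table
    joined = left.join(right, "key").groupBy("key").count()
    # With the rules enabled, the physical plan should contain fewer Exchange/Sort nodes
    joined.explain()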

Static tables are partitioned by index ranges, but the spark.yt.read.keyPartitioningSortedTables.enabled option switches partitioning to key ranges. When index ranges are transformed into key ranges, the number of partitions may be reduced, which can increase the fraction of data processed by a single executor.
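
The impact on partitioning can be observed by checking the partition count of the resulting DataFrame. A minimal sketch, assuming a sorted table at the hypothetical path //home/project/sorted_a:

from spyt import spark_session

with spark_session() as spark:
    # Key-based partitioning of sorted tables is enabled by default
    spark.conf.set("spark.yt.read.keyPartitioningSortedTables.enabled", "true")
    # Allow merging more partitions when index ranges are converted to key ranges
    spark.conf.set("spark.yt.read.keyPartitioningSortedTables.unionLimit", "4")
    df = spark.read.yt("//home/project/sorted_a")  # hypothetical table path
    # A smaller count here means each executor processes a larger share of the data
    print(df.rdd.getNumPartitions())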

Additional options for cluster configuration

Additional options are passed in via --params:

spark-launch-yt \
  --proxy <cluster-name> \
  --discovery-path my_discovery_path \
  --params '{"spark_conf"={"spark.yt.jarCaching"="True";};"layer_paths"=["//.../ubuntu_xenial_app_lastest.tar.gz";...;];"operation_spec"={"max_failed_job_count"=100;};}' \
  --spark-cluster-version '1.36.0'

When configuring a task with spark-submit-yt, options can be passed either via --conf or via spark_conf_args:

spark-submit-yt \
  --proxy <cluster-name> \
  --discovery-path my_discovery_path \
  --deploy-mode cluster \
  --conf spark.sql.shuffle.partitions=1 \
  --conf spark.cores.max=1 \
  --conf spark.executor.cores=1 \
  yt:///sys/spark/examples/grouping_example.py
spark-submit-yt \
  --proxy <cluster-name> \
  --discovery-path my_discovery_path \
  --deploy-mode cluster \
  --spark_conf_args '{"spark.sql.shuffle.partitions":1,"spark.cores.max":1,"spark.executor.cores":1}' \
  yt:///sys/spark/examples/grouping_example.py

When launching programmatically, you can configure via spark_session.conf.set("...", "...").

Python example:

from spyt import spark_session

print("Hello world")
with spark_session() as spark:
    spark.conf.set("spark.yt.read.parsingTypeV3.enabled", "true")
    spark.read.yt("//sys/spark/examples/test_data").show()

Java example:

protected void doRun(String[] args, SparkSession spark, CompoundClient yt) {
    spark.conf.set("spark.sql.adaptive.enabled", "false");
    spark.read().format("yt").load("//sys/spark/examples/test_data").show();
}