Configurations

For the full list of options supported by Spark, see the Spark documentation.

Basic options

Most options are available starting with version 1.23.0 unless otherwise specified.

| Name | Default value | Description |
| --- | --- | --- |
| spark.yt.write.batchSize | 500000 | Amount of data transferred in a single WriteTable transaction |
| spark.yt.write.miniBatchSize | 1000 | Amount of data in one part transferred to WriteTable |
| spark.yt.write.timeout | 60 seconds | Timeout for writing a single data block |
| spark.yt.write.typeV3.enabled (spark.yt.write.writingTypeV3.enabled before 1.75.2) | true | Write tables with schema in type_v3 instead of type_v1 |
| spark.yt.read.vectorized.capacity | 1000 | Maximum number of rows in a batch when reading via the wire protocol |
| spark.yt.read.arrow.enabled | true | Use the Arrow format for reading (if possible) |
| spark.hadoop.yt.timeout | 300 seconds | Timeout for reading from YTsaurus |
| spark.yt.read.typeV3.enabled (spark.yt.read.parsingTypeV3.enabled before 1.75.2) | true | Read tables with schema in type_v3 instead of type_v1 |
| spark.yt.read.keyColumnsFilterPushdown.enabled | true | Push Spark query filters down into YTsaurus reads |
| spark.yt.read.keyColumnsFilterPushdown.union.enabled | false | Merge table ranges into a single continuous range when filter pushdown is enabled |
| spark.yt.read.keyColumnsFilterPushdown.ytPathCount.limit | 100 | Maximum number of ranges per table generated by filters |
| spark.yt.transaction.timeout | 5 minutes | Timeout of the write operation transaction |
| spark.yt.transaction.pingInterval | 30 seconds | Ping interval of the write operation transaction |
| spark.yt.globalTransaction.enabled | false | Use a global transaction |
| spark.yt.globalTransaction.id | None | ID of the global transaction |
| spark.yt.globalTransaction.timeout | 5 minutes | Timeout of the global transaction |
| spark.hadoop.yt.user | - | YTsaurus user name |
| spark.hadoop.yt.token | - | YTsaurus user token |
| spark.yt.read.ytPartitioning.enabled | true | Use the YTsaurus API for table partitioning |
| spark.yt.read.planOptimization.enabled | false | Enable aggregation and join optimization rules for sorted data |
| spark.yt.read.keyPartitioningSortedTables.enabled | true | Read sorted tables by key ranges; required for plan optimization |
| spark.yt.read.keyPartitioningSortedTables.unionLimit | 1 | Maximum number of partitions merged when key ranges are used |
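Two options in the table were renamed in 1.75.2. If the same job code has to run against several SPYT versions, a small lookup can pick the key that each version expects. This is a minimal sketch. The helper names `option_name_for` and `parse_version` are hypothetical, only the two renames listed in the table are covered, and only plain numeric versions such as `2.2.0` are parsed.

```python
# Hypothetical helper: map current option names (from the table above)
# to the names used by SPYT releases older than 1.75.2.
RENAMED_IN_1_75_2 = {
    "spark.yt.write.typeV3.enabled": "spark.yt.write.writingTypeV3.enabled",
    "spark.yt.read.typeV3.enabled": "spark.yt.read.parsingTypeV3.enabled",
}


def parse_version(version: str) -> tuple:
    """Turn a plain dotted version such as '1.75.2' into a comparable tuple of ints."""
    return tuple(int(part) for part in version.split("."))


def option_name_for(name: str, spyt_version: str) -> str:
    """Return the option key expected by the given SPYT version."""
    if parse_version(spyt_version) < (1, 75, 2):
        # Older releases only know the pre-rename keys.
        return RENAMED_IN_1_75_2.get(name, name)
    return name


print(option_name_for("spark.yt.read.typeV3.enabled", "1.70.0"))
# spark.yt.read.parsingTypeV3.enabled
print(option_name_for("spark.yt.read.typeV3.enabled", "2.2.0"))
# spark.yt.read.typeV3.enabled
```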

Aggregation and join optimizations

Spark ignores the sort metadata of YTsaurus tables and inserts many Shuffle and Sort operations, even when they are not required. Additional optimization rules were added for these cases. While the logical plan is being built, a rule adds a sort mark to the source vertices of sorted tables. Later, when the physical plan is generated, these marks are turned into nodes that do no data processing themselves but inform the optimizer about the existing sorting and partitioning.

Static tables are partitioned by index ranges by default. The option spark.yt.read.keyPartitioningSortedTables.enabled enables partitioning by key ranges instead. Converting index ranges into key ranges may reduce the number of partitions, which increases the share of data processed by a single executor.
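The reduction in partitions can be seen in a simplified model. A key range cannot split rows that share a key, so index ranges that start inside the same run of equal keys collapse into a single key range. The sketch below models only that effect and is not SPYT's partitioning code. The function name and the fixed rows-per-partition chunking are assumptions made for illustration.

```python
from typing import Any, List, Optional, Sequence, Tuple


def index_to_key_ranges(
    keys: Sequence[Any], rows_per_partition: int
) -> List[Tuple[Any, Optional[Any]]]:
    """Simplified model of converting index ranges to key ranges.

    `keys` is the sorted key column of a static table. It is cut into index
    ranges of `rows_per_partition` rows, and each index range is replaced by a
    key range that starts at its first key. Index ranges starting with the same
    key collapse into one key range.
    Returns (lower_key_inclusive, upper_key_exclusive) pairs; None = unbounded.
    """
    lower_bounds: List[Any] = []
    for start in range(0, len(keys), rows_per_partition):
        key = keys[start]
        if not lower_bounds or lower_bounds[-1] != key:
            lower_bounds.append(key)
    upper_bounds: List[Optional[Any]] = list(lower_bounds[1:]) + [None]
    return list(zip(lower_bounds, upper_bounds))


keys = [1, 1, 1, 1, 1, 2, 3]
ranges = index_to_key_ranges(keys, rows_per_partition=2)
print(len(range(0, len(keys), 2)), "index ranges ->", len(ranges), "key ranges")
# 4 index ranges -> 2 key ranges
print(ranges)
# [(1, 3), (3, None)]
```

In this example the first key range holds six of the seven rows, which is the increased per-executor share described above.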

Additional options for cluster configuration

Additional options are passed with --params:

spark-launch-yt \
  --proxy <cluster-name> \
  --discovery-path my_discovery_path \
  --params '{"spark_conf"={"spark.yt.jarCaching"="True";};"layer_paths"=["//.../ubuntu_xenial_app_lastest.tar.gz";...;];"operation_spec"={"max_failed_job_count"=100;};}' \
  --spyt-version '2.2.0'
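The --params value is a YSON map (key=value pairs separated by ;), not JSON. When the map is assembled from code, a small serializer helps avoid quoting mistakes. The function `to_yson` below is a hypothetical sketch that handles only dicts, lists, strings, integers and booleans. The ytsaurus-client package ships its own YSON module (yt.yson), which is the better choice when it is available.

```python
import shlex
from typing import Any


def to_yson(value: Any) -> str:
    """Serialize a small subset of Python values to YSON text.

    Supported: dict (YSON map), list (YSON list), str, bool, int.
    Every map/list item is followed by ';', as in the examples above.
    """
    if isinstance(value, dict):
        items = "".join(f"{to_yson(str(k))}={to_yson(v)};" for k, v in value.items())
        return "{" + items + "}"
    if isinstance(value, list):
        return "[" + "".join(f"{to_yson(v)};" for v in value) + "]"
    if isinstance(value, bool):  # check bool before int: bool is a subclass of int
        return "%true" if value else "%false"
    if isinstance(value, int):
        return str(value)
    if isinstance(value, str):
        escaped = value.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'
    raise TypeError(f"unsupported type: {type(value).__name__}")


params = {
    "spark_conf": {"spark.yt.jarCaching": "True"},
    "operation_spec": {"max_failed_job_count": 100},
}
print(to_yson(params))
# {"spark_conf"={"spark.yt.jarCaching"="True";};"operation_spec"={"max_failed_job_count"=100;};}

# Shell-quoted form, ready to paste after spark-launch-yt.
print("--params", shlex.quote(to_yson(params)))
```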

Spark configuration

When configuring a cluster with spark-launch-yt, Spark options can be set with --params '{"spark_conf"={...};}':

spark-launch-yt \
  --proxy <cluster-name> \
  --discovery-path my_discovery_path \
  --params '{"spark_conf"={"spark.sql.shuffle.partitions"=1;"spark.cores.max"=1;"spark.executor.cores"=1;};}' \
  --spyt-version '2.2.0'

When submitting a job with spark-submit-yt, pass options either as separate --conf arguments or as a single map with --spark_conf_args:

spark-submit-yt \
  --proxy <cluster-name> \
  --discovery-path my_discovery_path \
  --deploy-mode cluster \
  --conf spark.sql.shuffle.partitions=1 \
  --conf spark.cores.max=1 \
  --conf spark.executor.cores=1 \
  yt:///sys/spark/examples/grouping_example.py

spark-submit-yt \
  --proxy <cluster-name> \
  --discovery-path my_discovery_path \
  --deploy-mode cluster \
  --spark_conf_args '{"spark.sql.shuffle.partitions":1,"spark.cores.max":1,"spark.executor.cores":1}' \
  yt:///sys/spark/examples/grouping_example.py
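Both submission forms above carry the same options. When the command is built from code, one dict can be rendered either way. The helpers `conf_flags` and `spark_conf_args` are hypothetical, and the --spark_conf_args value is assumed to be a JSON map.

```python
import json
import shlex
from typing import Dict, List, Union

ConfValue = Union[str, int, bool]


def conf_flags(conf: Dict[str, ConfValue]) -> List[str]:
    """Render options as repeated `--conf key=value` arguments."""
    args: List[str] = []
    for key, value in conf.items():
        if isinstance(value, bool):
            # str(True) would give 'True'; use the lowercase form shown in the table above.
            value = "true" if value else "false"
        args += ["--conf", f"{key}={value}"]
    return args


def spark_conf_args(conf: Dict[str, ConfValue]) -> List[str]:
    """Render the same options as one --spark_conf_args JSON map (assumed format)."""
    return ["--spark_conf_args", json.dumps(conf, separators=(",", ":"))]


conf = {
    "spark.sql.shuffle.partitions": 1,
    "spark.cores.max": 1,
    "spark.executor.cores": 1,
}
base = ["spark-submit-yt", "--proxy", "<cluster-name>",
        "--discovery-path", "my_discovery_path", "--deploy-mode", "cluster"]
script = "yt:///sys/spark/examples/grouping_example.py"

# Print both equivalent command lines, shell-quoted.
print(shlex.join(base + conf_flags(conf) + [script]))
print(shlex.join(base + spark_conf_args(conf) + [script]))
```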

When running a job programmatically, you can set options with spark.conf.set("...", "...").

Python example:

from spyt import spark_session

with spark_session() as spark:
    # Set an option at runtime (renamed from spark.yt.read.parsingTypeV3.enabled in 1.75.2)
    spark.conf.set("spark.yt.read.typeV3.enabled", "true")
    spark.read.yt("//sys/spark/examples/test_data").show()

Java example:

protected void doRun(String[] args, SparkSession spark, CompoundClient yt) {
    spark.conf.set("spark.sql.adaptive.enabled", "false");
    spark.read().format("yt").load("/sys/spark/examples/test_data").show();
}

Operation options

When configuring a cluster with spark-launch-yt, operation options can be set with --params '{"operation_spec"={...};}'. For the list of all operation options, see the YTsaurus documentation.
This is useful when you need to change the default operation settings, for example, to increase the number of failed jobs after which the operation is considered failed.

spark-launch-yt \
  --proxy <cluster-name> \
  --discovery-path my_discovery_path \
  --params '{"operation_spec"={"max_failed_job_count"=100;owners=[...]};}' \
  --spyt-version '2.2.0'

Updating Python version

There are two ways to update the Python version:

  1. Install the required Python version on the cluster:
    1. Install the required Python version on the exec nodes.
    2. Add the Python version and the path to the new interpreter to //home/spark/conf/global.
    3. After that, spark-submit-yt can use it via the --python-version parameter.
  2. Build your own image with the required Python version, as described below.

Installing additional packages

To install additional packages, build an image with the packages installed and use it as the base image for running the job.

Building an image with installed packages

Dockerfile example for building a python3.12 image with installed packages:

# Dockerfile
FROM mirror.gcr.io/ubuntu:focal

USER root

RUN apt-get update && apt-get install -y software-properties-common
RUN add-apt-repository ppa:deadsnakes/ppa

RUN apt-get update && DEBIAN_FRONTEND=noninteractive TZ=Etc/UTC apt-get install -y \
  containerd \
  curl \
  less \
  gdb \
  lsof \
  strace \
  telnet \
  tini \
  zstd \
  unzip \
  dnsutils \
  iputils-ping \
  lsb-release \
  openjdk-11-jdk \
  libidn11-dev \
  python3.12 \
  python3-pip \
  python3.12-dev

RUN ln -s /usr/lib/jvm/java-11-openjdk-amd64 /opt/jdk11

RUN update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.12 1 \
    && update-alternatives --install /usr/bin/python python /usr/bin/python3.12 1

COPY ./requirements.txt /requirements.txt

# Install pip for Python 3.12 (the python3-pip package targets the system Python 3.8 on focal)
RUN curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py \
    && python3.12 get-pip.py \
    && python3.12 -m pip install --upgrade pip setuptools wheel \
    && rm get-pip.py


RUN python3.12 -m pip install -r /requirements.txt

# requirements.txt
ytsaurus-client==0.13.20
ytsaurus-spyt==2.4.1
pyspark==3.3.4
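After the image is built, you can check that the interpreter inside it sees the pinned versions. The following is a minimal sketch. It assumes requirements.txt contains only exact `name==version` pins like the ones above, and the helpers `parse_pins` and `mismatches` are hypothetical, not part of SPYT.

```python
from importlib import metadata
from typing import Dict, List


def parse_pins(text: str) -> Dict[str, str]:
    """Parse exact `name==version` pins, ignoring blank lines and comments."""
    pins: Dict[str, str] = {}
    for line in text.splitlines():
        line = line.split("#", 1)[0].strip()
        if not line:
            continue
        name, sep, version = line.partition("==")
        if not sep:
            raise ValueError(f"not an exact pin: {line!r}")
        pins[name.strip()] = version.strip()
    return pins


def mismatches(pins: Dict[str, str]) -> List[str]:
    """Compare pins with the distributions installed for the running interpreter."""
    problems: List[str] = []
    for name, expected in pins.items():
        try:
            installed = metadata.version(name)
        except metadata.PackageNotFoundError:
            problems.append(f"{name}: not installed (expected {expected})")
            continue
        if installed != expected:
            problems.append(f"{name}: {installed} installed, {expected} expected")
    return problems
```

Inside the container, run it with python3.12 and pass the file contents, for example `mismatches(parse_pins(open("/requirements.txt").read()))`. An empty list means every pin is satisfied.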

Creating a cluster with a Docker image

spark-launch-yt \
  --proxy <cluster-name> \
  --discovery-path my_discovery_path \
  --params '{operation_spec={tasks={history={docker_image="MY_DOCKER_IMAGE"};master={docker_image="MY_DOCKER_IMAGE"};workers={docker_image="MY_DOCKER_IMAGE"}}}}'