Configurations
For a listing of all the options supported by Spark, see the documentation.
Basic options
Most options are available starting with version 1.23.0, unless specified otherwise.
Name | Default value | Description |
---|---|---|
spark.yt.write.batchSize | 500000 | The amount of data transferred in a single WriteTable transaction |
spark.yt.write.miniBatchSize | 1000 | The size of a data part transmitted to WriteTable |
spark.yt.write.timeout | 60 seconds | The timeout for writing a single data block |
spark.yt.write.typeV3.enabled (spark.yt.write.writingTypeV3.enabled before 1.75.2) | true | Write tables with schema in type_v3 instead of type_v1 |
spark.yt.read.vectorized.capacity | 1000 | The maximum number of rows in a batch for reading via the wire protocol |
spark.yt.read.arrow.enabled | true | Use the Arrow format for reading (if possible) |
spark.hadoop.yt.timeout | 300 seconds | The timeout for reading from YTsaurus |
spark.yt.read.typeV3.enabled (spark.yt.read.parsingTypeV3.enabled before 1.75.2) | true | Read tables with schema in type_v3 instead of type_v1 |
spark.yt.read.keyColumnsFilterPushdown.enabled | true | Push down Spark query filters to reads from YTsaurus |
spark.yt.read.keyColumnsFilterPushdown.union.enabled | false | Merge table ranges into a single continuous range when filter pushdown is enabled |
spark.yt.read.keyColumnsFilterPushdown.ytPathCount.limit | 100 | The maximum number of ranges for a table generated by filters |
spark.yt.transaction.timeout | 5 minutes | Write operation transaction timeout |
spark.yt.transaction.pingInterval | 30 seconds | Ping interval for the write operation transaction |
spark.yt.globalTransaction.enabled | false | Use a global transaction |
spark.yt.globalTransaction.id | None | Id of the global transaction |
spark.yt.globalTransaction.timeout | 5 minutes | Global transaction timeout |
spark.hadoop.yt.user | - | Name of the YTsaurus user |
spark.hadoop.yt.token | - | Token of the YTsaurus user |
spark.yt.read.ytPartitioning.enabled | true | Use the YTsaurus API for table partitioning |
spark.yt.read.planOptimization.enabled | false | Enable aggregation and join optimization rules for sorted data |
spark.yt.read.keyPartitioningSortedTables.enabled | true | Use key table ranges for reading; required for plan optimization |
spark.yt.read.keyPartitioningSortedTables.unionLimit | 1 | The maximum number of partitions that may be merged when key table ranges are used |
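These options can be set like any other Spark configuration. A minimal Python sketch is shown below; the table path is hypothetical, and the options can also be supplied at cluster or task launch time, as described in the sections that follow.
from spyt import spark_session

with spark_session() as spark:
    # Read tables with type_v3 schemas and disable Arrow-based reading
    spark.conf.set("spark.yt.read.typeV3.enabled", "true")
    spark.conf.set("spark.yt.read.arrow.enabled", "false")
    # Hypothetical table path
    spark.read.yt("//home/my_project/my_table").show()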
Aggregation and join optimizations
By default, the Spark cluster ignores the sort metadata of YTsaurus tables and inserts Shuffle and Sort operations even when they are not required. Extra optimization rules were added for these cases. While the logical plan is being constructed, a rule adds a sort mark to the source vertices of sorted tables. Later, at the physical plan generation stage, the marks are transformed into nodes that do no data processing themselves but notify the optimizer about the sorting and partitioning.
Static tables are partitioned by index ranges, but the spark.yt.read.keyPartitioningSortedTables.enabled option switches partitioning to key ranges. When index ranges are transformed into key ranges, the number of partitions may be reduced, which can increase the fraction of data processed by a single executor.
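A minimal sketch of a join that can benefit from these rules; the table paths are hypothetical, and both tables are assumed to be sorted by the join key.
from spyt import spark_session

with spark_session() as spark:
    # Enable the optimization rules; key-range partitioning is required for them
    spark.conf.set("spark.yt.read.planOptimization.enabled", "true")
    spark.conf.set("spark.yt.read.keyPartitioningSortedTables.enabled", "true")
    # Hypothetical tables, both sorted by "id"
    left = spark.read.yt("//home/my_project/sorted_left")
    right = spark.read.yt("//home/my_project/sorted_right")
    # With the rules enabled, the optimizer can avoid extra Shuffle and Sort steps
    left.join(right, "id").show()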
Additional options for cluster configuration
Additional options are passed in via --params:
spark-launch-yt \
--proxy <cluster-name> \
--discovery-path my_discovery_path \
--params '{"spark_conf"={"spark.yt.jarCaching"="True";};"layer_paths"=["//.../ubuntu_xenial_app_lastest.tar.gz";...;];"operation_spec"={"max_failed_job_count"=100;};}' \
--spyt-version '2.2.0'
Spark configuration
When using spark-launch-yt to configure a cluster, --params '{"spark_conf"={...};}' is available as an option:
spark-launch-yt \
--proxy <cluster-name> \
--discovery-path my_discovery_path \
--params '{"spark_conf"={"spark.sql.shuffle.partitions":1,"spark.cores.max":1,"spark.executor.cores"=1};}' \
--spyt-version '2.2.0'
When using spark-submit-yt to configure a task, options can be passed either as individual --conf flags or via spark_conf_args:
spark-submit-yt \
--proxy <cluster-name> \
--discovery-path my_discovery_path \
--deploy-mode cluster \
--conf spark.sql.shuffle.partitions=1 \
--conf spark.cores.max=1 \
--conf spark.executor.cores=1 \
yt:///sys/spark/examples/grouping_example.py
spark-submit-yt \
--proxy <cluster-name> \
--discovery-path my_discovery_path \
--deploy-mode cluster \
--spark_conf_args '{"spark.sql.shuffle.partitions":1,"spark.cores.max":1,"spark.executor.cores"=1}' \
yt:///sys/spark/examples/grouping_example.py
When launching programmatically, you can configure options via spark_session.conf.set("...", "...").
Python example:
from spyt import spark_session

print("Hello world")

with spark_session() as spark:
    spark.conf.set("spark.yt.read.parsingTypeV3.enabled", "true")
    spark.read.yt("//sys/spark/examples/test_data").show()
Java example:
protected void doRun(String[] args, SparkSession spark, CompoundClient yt) {
    spark.conf.set("spark.sql.adaptive.enabled", "false");
    spark.read().format("yt").load("/sys/spark/examples/test_data").show();
}
Operation options
When using spark-launch-yt to configure a cluster, --params '{"operation_spec"={...};}' is available as an option. A list of all operation options is available in the documentation.
This is useful if you need to change the standard operation settings, for example, to increase the number of failed jobs after which the operation is considered failed.
spark-launch-yt \
--proxy <cluster-name> \
--discovery-path my_discovery_path \
--params '{"operation_spec"={"max_failed_job_count"=100;owners=[...]};}' \
--spyt-version '2.2.0'
Updating Python version
There are two ways to update the Python version:
- Install the required Python version:
  - Install the required Python version on the exec nodes.
  - Add the Python version and the path to the new interpreter to //home/spark/conf/global. After that, spark-submit-yt will be able to use it via the --python-version parameter.
- Build your own image with the required Python version.
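To check which interpreter the driver and the executors actually use after switching versions, a simple sketch like the following can help (it assumes a running SPYT cluster and uses the same session helper as the examples below):
import sys
from spyt import spark_session

with spark_session() as spark:
    # Python version on the driver
    print("driver:", sys.version)
    # Python version inside an executor task
    print("executor:", spark.sparkContext.parallelize([0], 1).map(lambda _: sys.version).first())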
Installing additional packages
You need to build an image with the required packages installed and use it as the base image for running the task.
Building an image with installed packages
An example Dockerfile for building a Python 3.12 image with the required packages installed:
# Dockerfile
FROM mirror.gcr.io/ubuntu:focal
USER root
RUN apt-get update && apt-get install -y software-properties-common
RUN add-apt-repository ppa:deadsnakes/ppa
RUN apt-get update && DEBIAN_FRONTEND=noninteractive TZ=Etc/UTC apt-get install -y \
containerd \
curl \
less \
gdb \
lsof \
strace \
telnet \
tini \
zstd \
unzip \
dnsutils \
iputils-ping \
lsb-release \
openjdk-11-jdk \
libidn11-dev \
python3.12 \
python3-pip \
python3.12-dev \
python3.12-distutils
RUN ln -s /usr/lib/jvm/java-11-openjdk-amd64 /opt/jdk11
RUN update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.12 1 \
&& update-alternatives --install /usr/bin/python python /usr/bin/python3.12 1
COPY ./requirements.txt /requirements.txt
# Ensure pip is installed correctly
RUN curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py \
&& python3.12 get-pip.py \
&& python3.12 -m pip install --upgrade pip setuptools wheel \
&& rm get-pip.py
RUN python3.12 -m pip install -r requirements.txt
# requirements.txt
ytsaurus-client==0.13.20
ytsaurus-spyt==2.4.0
pyspark==3.3.4
Create a cluster with a docker image
spark-launch-yt \
--params '{operation_spec={tasks={history={docker_image="MY_DOCKER_IMAGE"};master={docker_image="MY_DOCKER_IMAGE"};workers={docker_image="MY_DOCKER_IMAGE"}}}}'