Конфигурации

Список всех поддерживаемых опций Spark содержится в документации.

Основные опции

Большинство опций доступны начиная с версии 1.23.0, если не указано иное.

Имя Значение по умолчанию Описание
spark.yt.write.batchSize 500000 Размер данных, отправляемых через одну операцию WriteTable
spark.yt.write.miniBatchSize 1000 Размер блока данных, отправляемого в WriteTable
spark.yt.write.timeout 60 seconds Ограничение на ожидание записи одного блока данных
spark.yt.write.typeV3.enabled (spark.yt.write.writingTypeV3.enabled до 1.75.2) true Запись таблиц со схемой в формате type_v3 вместо type_v1
spark.yt.read.vectorized.capacity 1000 Максимальное количество строк в батче при чтении через wire протокол
spark.yt.read.arrow.enabled true Использовать arrow формат для чтения данных (если это возможно)
spark.hadoop.yt.timeout 300 seconds Таймаут на чтение из YTsaurus
spark.yt.read.typeV3.enabled (spark.yt.read.parsingTypeV3.enabled до 1.75.2) true Чтение таблиц со схемой в формате type_v3 вместо type_v1
spark.yt.read.keyColumnsFilterPushdown.enabled true Использовать фильтры Spark-запроса для выборочного чтения из YTsaurus
spark.yt.read.keyColumnsFilterPushdown.union.enabled false Объединять все фильтры в непрерывный диапазон при выборочном чтении
spark.yt.read.keyColumnsFilterPushdown.ytPathCount.limit 100 Максимальное количество диапазонов таблицы при выборочном чтении
spark.yt.transaction.timeout 5 minutes Таймаут на транзакцию записывающей операции
spark.yt.transaction.pingInterval 30 seconds Периодичность пингования транзакции записывающей операции
spark.yt.globalTransaction.enabled false Использовать глобальную транзакцию
spark.yt.globalTransaction.id None Идентификатор глобальной транзакции
spark.yt.globalTransaction.timeout 5 minutes Таймаут глобальной транзакции
spark.hadoop.yt.user - Имя пользователя YTsaurus
spark.hadoop.yt.token - Токен пользователя YTsaurus
spark.yt.read.ytPartitioning.enabled true Использовать партиционирование таблиц средствами YTsaurus
spark.yt.read.planOptimization.enabled false Оптимизировать агрегации и джойны на сортированных входных данных
spark.yt.read.keyPartitioningSortedTables.enabled true Использовать партиционирование по ключам для сортированных таблиц, необходимо для оптимизации планов
spark.yt.read.keyPartitioningSortedTables.unionLimit 1 Максимальное количество объединений партиций при переходе от чтения по индексам к чтению по ключам

Оптимизация агрегаций и джойнов

Spark кластер при чтении игнорирует метаинформацию о сортированности таблиц YTsaurus, создавая планы с множеством Shuffle и Sort стадий. Для более эффективной работы были внедрены дополнительные правила оптимизации агрегаций и джойнов поверх сортированных данных. На этапе построения логического плана к вершинам чтения добавляются пометки о сортированности. Позже при создании физического плана эти пометки превращаются в физические вершины, которые не производят действий над данными, но уведомляют планировщик о способе сортировки и партиционирования данных.

Партиционирование статических таблиц производится по индексам строк, однако опция spark.yt.read.keyPartitioningSortedTables.enabled включает партиционирование и чтение по ключам. При таком переходе возможно уменьшение количества партиций, если ключи оказываются достаточно большими. Это может повлечь увеличение количества данных, которые приходятся на один экзекьютор.

Дополнительные опции конфигурации кластера

Дополнительные опции передаются через --params:

spark-launch-yt \
  --proxy <cluster-name> \
  --discovery-path my_discovery_path \
  --params '{"spark_conf"={"spark.yt.jarCaching"="True";};"layer_paths"=["//.../ubuntu_xenial_app_lastest.tar.gz";...;];"operation_spec"={"max_failed_job_count"=100;};}' \
  --spyt-version '2.2.0'

Spark configuration

При использовании spark-launch-yt для настройки кластера доступна опция --params '{"spark_conf"={...};}:

spark-launch-yt \
  --proxy <cluster-name> \
  --discovery-path my_discovery_path \
  --params '{"spark_conf"={"spark.sql.shuffle.partitions":1,"spark.cores.max":1,"spark.executor.cores"=1};}' \
  --spyt-version '2.2.0'

При использовании spark-submit-yt для настройки задачи существует опция spark_conf_args:

spark-submit-yt \
  --proxy <cluster-name> \
  --discovery-path my_discovery_path \
  --deploy-mode cluster \
  --conf spark.sql.shuffle.partitions=1 \
  --conf spark.cores.max=1 \
  --conf spark.executor.cores=1 \
  yt:///sys/spark/examples/grouping_example.py
spark-submit-yt \
  --proxy <cluster-name> \
  --discovery-path my_discovery_path \
  --deploy-mode cluster \
  --spark_conf_args '{"spark.sql.shuffle.partitions":1,"spark.cores.max":1,"spark.executor.cores"=1}' \
  yt:///sys/spark/examples/grouping_example.py

При запуске из кода можно производить настройку через spark_session.conf.set("...", "...").

Пример на Python:

from spyt import spark_session

print("Hello world")
with spark_session() as spark:
    spark.conf.set("spark.yt.read.parsingTypeV3.enabled", "true")
    spark.read.yt("//sys/spark/examples/test_data").show()

Пример на Java:

protected void doRun(String[] args, SparkSession spark, CompoundClient yt) {
    spark.conf.set("spark.sql.adaptive.enabled", "false");
    spark.read().format("yt").load("/sys/spark/examples/test_data").show();
}

Настройки операций

При использовании spark-launch-yt для настройки кластера доступна опция --params '{"operation_spec"={...};}. Список всех поддерживаемых опций.
Это будет полезно, если необходимо изменить стандартные настройки операции например для увеличения количество failed джобов, после которого операция считается failed.

spark-launch-yt \
  --proxy <cluster-name> \
  --discovery-path my_discovery_path \
  --params '{"operation_spec"={"max_failed_job_count"=100;owners=[...]};}' \
  --spyt-version '2.2.0'

Обновление версии python

Существуют два способа обновления версии python:

  1. Установить необходимую версию python:
    1. Установить необходимую версию python на exeс nodes
    2. Добавит версию python //home/spark/conf/global и путь к новому интерпретатору.
    3. После этого в spark-submit-yt будет возможность использовать его. Параметр --python-version
  2. Собрать свой образ с необходимой версией python

Установка дополнительных пакетов

Необходимо собрать образ с установленными пакетами и использовать его в качестве базового образа для запуска задачи.

Сборка образа с установленными пакетами

Сборка образа

Пример Dockerfile для сборки образа python3.12 с установленными пакетами:

# Dockerfile
FROM mirror.gcr.io/ubuntu:focal

USER root

RUN apt-get update && apt-get install -y software-properties-common
RUN add-apt-repository ppa:deadsnakes/ppa

RUN apt-get update && DEBIAN_FRONTEND=noninteractive TZ=Etc/UTC apt-get install -y \
  containerd \
  curl \
  less \
  gdb \
  lsof \
  strace \
  telnet \
  tini \
  zstd \
  unzip \
  dnsutils \
  iputils-ping \
  lsb-release \
  openjdk-11-jdk \
  libidn11-dev \
  python3.12 \
  python3-pip \
  python3.12-dev \
  python3.12-distutils

RUN ln -s /usr/lib/jvm/java-11-openjdk-amd64 /opt/jdk11

RUN update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.12 1 \
    && update-alternatives --install /usr/bin/python python /usr/bin/python3.12 1

COPY ./requirements.txt /requirements.txt

# Ensure pip is installed correctly
RUN curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py \
    && python3.12 get-pip.py \
    && python3.12 -m pip install --upgrade pip setuptools wheel \
    && rm get-pip.py


RUN python3.12 -m pip install -r requirements.txt
# requirements.txt
ytsaurus-client==0.13.18
ytsaurus-spyt==2.3.0
pyspark==3.3.4

Запуск кластера с docker образом

spark-launch-yt \
--params '{operation_spec={tasks={history={docker_image="MY_DOCKER_IMAGE"};master={docker_image="MY_DOCKER_IMAGE"};workers={docker_image="MY_DOCKER_IMAGE"}}}}'
Предыдущая
Следующая