Конфигурации
Список всех поддерживаемых опций Spark содержится в документации.
Основные опции
Большинство опций доступны, начиная с версии 1.23.0, spark.yt.write.writingTypeV3.enabled
– с версии 1.24.0.
spark.yt.write.batchSize
: по умолчанию –500 000
, размер данных, отправляемых через одну операциюWriteTable
;spark.yt.write.miniBatchSize
: по умолчанию –1000
, размер данных, отправляемых на запись в YTsaurus;spark.yt.write.timeout
: по умолчанию –60 seconds
, ограничение на ожидание записи одного мини-батча;spark.yt.write.writingTypeV3.enabled
: по умолчанию –false
, запись таблиц со схемой в формате type_v3, по умолчаниюtype_v1
;spark.yt.read.vectorized.capacity
: по умолчанию –1000
, максимальное количество строк в батче при чтении черезwire protocol ()
;spark.yt.read.arrow.enabled
: по умолчанию –true
, использование считывания батчами при возможности;spark.yt.read.parsingTypeV3.enabled
: по умолчанию –false
, чтение таблиц со схемой в формате type_v3, по умолчаниюtype_v1
;spark.yt.read.keyColumnsFilterPushdown.enabled
: по умолчанию –false
илиPredicate pushdown
. Использование фильтров Spark-запроса для чтения из YTsaurus оптимизирует объем полученных данных из YTsaurus и, соответственно, уменьшает время чтения. При формировании пути добавляется range необходимых строк;spark.yt.read.keyColumnsFilterPushdown.union.enabled
: по умолчанию –false
, при пробросе фильтров происходит объединение в один и из таблицы запрашивается непрерывный диапазон строк;spark.yt.read.keyColumnsFilterPushdown.ytPathCount.limit
: по умолчанию –100
, максимальное количество диапазонов, на которое распадется Spark-запрос чтения;spark.yt.transaction.timeout
: по умолчанию –5 minutes
, timeout на транзакцию записывающей операции;spark.yt.transaction.pingInterval
: по умолчанию –30 seconds
;spark.yt.globalTransaction.enabled
: по умолчанию –false
, использование глобальной транзакции;spark.yt.globalTransaction.id
: по умолчанию –None
, id созданной глобальной транзакции;spark.yt.globalTransaction.timeout
: по умолчанию –2 minutes
, timeout глобальной транзакции;spark.hadoop.yt.user
: по умолчанию – первое доступное: переменные окруженияYT_SECURE_VAULT_YT_USER
илиYT_USER
,user.name
из сис. свойств, пользователь для YTsaurus;spark.hadoop.yt.user
: по умолчанию – первое доступное: переменные окруженияYT_SECURE_VAULT_YT_TOKEN
илиYT_TOKEN
, содержимое файла~/.yt/token
, токен для YTsaurus;spark.hadoop.yt.timeout
: по умолчанию –300 seconds
, timeout на чтения из YTsaurus.
Дополнительные опции
Дополнительные опции передаются через --params
:
spark-launch-yt \
--proxy <cluster-name> \
--discovery-path my_discovery_path \
--params '{"spark_conf"={"spark.yt.jarCaching"="True";};"layer_paths"=["//.../ubuntu_xenial_app_lastest.tar.gz";...;];"operation_spec"={"max_failed_job_count"=100;};}' \
--spark-cluster-version '1.36.0'
При использовании spark-submit-yt
для настройки задачи существует опция spark_conf_args
:
spark-submit-yt \
--proxy <cluster-name> \
--discovery-path my_discovery_path \
--deploy-mode cluster \
--conf spark.sql.shuffle.partitions=1 \
--conf spark.cores.max=1 \
--conf spark.executor.cores=1 \
yt:///sys/spark/examples/grouping_example.py
spark-submit-yt \
--proxy <cluster-name> \
--discovery-path my_discovery_path \
--deploy-mode cluster \
--spark_conf_args '{"spark.sql.shuffle.partitions":1,"spark.cores.max":1,"spark.executor.cores"=1}' \
yt:///sys/spark/examples/grouping_example.py
При запуске из кода можно производить настройку через spark_session.conf.set("...", "...")
.
Пример на Python:
from spyt import spark_session
print("Hello world")
with spark_session() as spark:
spark.conf.set("spark.yt.read.parsingTypeV3.enabled", "true")
spark.read.yt("//sys/spark/examples/test_data").show()
Пример на Java:
protected void doRun(String[] args, SparkSession spark, CompoundClient yt) {
spark.conf.set("spark.sql.adaptive.enabled", "false");
spark.read().format("yt").load("/sys/spark/examples/test_data").show();
}