Запуск кластера Spark

В данном разделе приведена расширенная инструкция по запуску кластера Spark. Базовые операции по запуску описаны в разделе Быстрый старт.

Внимание

Запущенный кластер Spark статически занимает выданные ему ресурсы. По этой причине рекомендуется запускать кластер в отдельном вычислительном пуле с гарантированными ресурсами. Целесообразно поднимать один кластер на команду и переиспользовать ресурсы между несколькими пользователями. Если не планируется высокой интенсивности запуска Spark задач (чаще одного раза в час) — рекомендуется воспользоваться способом запуска задач напрямую.

Автоскейлер

Для экономии ресурсов вычислительного пула в случае низкой нагрузки на Spark возможно включить специальный режим работы, обеспечивающий пропорциональное уменьшение потребляемых ресурсов – автоскейлер.

Как это работает

У операций YTsaurus есть метод update_operation_parameters, позволяющий изменять параметры операции. Можно менять число джобов в операции через параметр user_slots. При изменении параметра планировщик остановит часть джобов или наоборот запустит новые (в пределах лимита, заданного при старте операции). Поскольку планировщик считает, что все джобы в операции одинаковы, в обычном режиме работы Spark подобный способ масштабирования мог бы привести к потере мастера или history-сервера, а так же воркеров, на которых выполняются драйверы джобов Spark. Чтобы недопустить нарушение работы Spark-кластера, реализован механизм его запуска не в виде одной YTsaurus-операции, а в виде нескольких. В этом случае одна операция выделяется под динамически изменяемый набор воркеров и может масштабироваться в заданных при старте лимитах. В одной или двух других операциях выполняются мастер и history-server (одна операция), драйвер (при запуске драйверов на кластере, вторая операция).

Как запустить кластер с автоскейлером

Используются дополнительные параметры для скрипта запуска spark-launch-yt или аналогичные параметры клиентской библиотеки SPYT:

  • autoscaler-period <период> — частота запуска автоскейлера и (потенциально) смены настройки операции, период задается в виде <длина><единица измерения [d|h|min|s|ms|µs|ns]>;
  • enable-multi-operation-mode — включить режим запуска Spark в нескольких YTsaurus-операциях;
  • enable-dedicated-driver-operation-mode — запускать воркеры для драйверов в отдельной YTsaurus-операции;
  • driver-num <кол-во воркеров> — выделить под драйвер определенное количество воркеров;
  • autoscaler-max-free-workers — максимальное значение свободных воркеров (все лишние воркеры будут остановлены);
  • autoscaler-slot-increment-step — шаг увеличения количества воркеров при автоматическом расширении кластера.

Пример:

$ spark-launch-yt
--proxy <cluster_name>
--autoscaler-period 1s
--enable-multi-operation-mode
--discovery-path //discovery/path

Как обновить кластер

Для того чтобы обновить Spark-кластер, необходимо проделать следующие шаги:

  1. Остановить операцию с текущим кластером в YTsaurus. Ссылку на операцию можно найти с помощью spark-discovery-yt. Также можно воспользоваться флагом --abort-existing команды spark-launch-yt, в этом случае текущий кластер будет остановлен перед стартом нового кластера.
  2. Запустить кластер с помощью spark-launch-yt. Можно указать в аргументе spark-cluster-version нужную версию. Если версия не указана, то будет запущена последняя версия.

Параметры команды spark-launch-yt

Для запуска внутреннего Spark кластера используется команда spark-launch-yt, которой необходимо передавать ряд параметров. Они описаны в таблице ниже:

Параметр Обязательный Значение по умолчанию Описание С какой версии
--discovery-path да - Путь к директории для служебных данных на Кипарисе (Discovery path) -
--proxy нет Значение из переменной окружения YT_PROXY Адрес YTsaurus кластера -
--pool нет - Вычислительный пул YTsaurus, который будет использован для запуска кластера -
--operation-alias нет - Алиас YTsaurus операции со Spark кластером -
--params нет - Дополнительные параметры YTsaurus операции, указываются в виде YSON строки. Подробнее - на странице конфигурации -
--spyt-version нет Версия пакета ytsaurus-spyt на клиенте Версия SPYT, которая будет использована для запуска кластера -
--preemption-mode нет normal Режим вытеснения, используемый планировщиком YTsaurus, допустимые значения 'normal' или 'graceful' -
--enable-mtn, --disable-mtn нет --disable-mtn Использовать Multi-Tenant Network (MTN) для запуска кластера. При использовании MTN также должен быть указан сетевой проект через параметр --network-project -
--network-project нет - Название сетевого проекта, в котором будет запущена операция с кластером -
--prefer-ipv6, --prefer-ipv4 нет --prefer-ipv4 Тип IP адресов, используемых при работе с кластером. В случае использования IPv6 адресации при работе со Spark 3.4.0 и выше необходимо выставлять переменную окружения SPARK_PREFER_IPV6=true -
--enable-tmpfs, --disable-tmpfs нет --disable-tmpfs Использовать часть RAM, выделенной Spark worker-у для монтирования tmpfs. При использовании tmpfs необходимо также указать объём через параметр --tmpfs-limit -
--tmpfs-limit нет 8G Объём памяти, используемой для tmpfs -
--enable-tcp-proxy, --disable-tcp-proxy нет --disable-tcp-proxy Использовать TCP proxy для доступа на кластер извне 1.72.0
--tcp-proxy-range-start нет 30000 Начало диапазона портов, используемых для TCP proxy 1.72.0
--tcp-proxy-range-size нет 100 Размер диапазона портов для TCP proxy 1.72.0
--enable-rpc-job-proxy, --disable-rpc-job-proxy нет --enable-rpc-job-proxy Использовать rpc прокси, встроенный в job прокси. При отключении этой опции будет использована общая rpc прокси, что может привести к деградации производительности кластера 1.77.0
--rpc-job-proxy-thread-pool-size нет 4 Размер пула потоков для rpc job proxy 1.77.0
--group-id нет - Идентификатор discovery группы. Используется при запуске кластера с использованием нескольких операций -
--enable-squashfs, --disable-squashfs нет --disable-squashfs Использовать предварительно подготовленные слои Squash FS в YTsaurus джобах 2.6.0
--cluster-java-home нет /opt/jdk11 Путь к JAVA HOME в YTsaurus контейнерах кластера 2.6.0
--master-memory-limit нет 4G Объём памяти, выделяемой контейнеру с Master сервером -
--master-port нет 27001 Порт для Spark RPC вызовов у Master сервера -
--worker-cores да - Количество ядер CPU, выделяемых одному воркеру -
--worker-memory да - Объём памяти, выделяемый воркеру. Этот объём будет в дальнейшем распределяться между Spark процессами, такими как драйвер и экзекьюторы, запускаемыми на этом воркере -
--worker-num да - Количество воркеров в кластере -
--worker-cores-overhead нет 0 Дополнительный объём ядер CPU, выделяемых воркеру. Эти ядра не будут использоваться непосредственно Spark приложениями, а требуются для вспомогательных процессов, запущенных вместе с воркером -
--worker-memory-overhead нет 2G Дополнительный объём памяти, выделяемой воркеру. Данный объём необходим для работы дополнительных процессов, запускаемых в контейнере воркера, например для дочерних процессов, запускаемых экзекьюторами. Также этот объём нужен в случае использования pyspark, особенно при работе с Python udf. В этом случае экзекьютор будет запускать дочерний Python процесс, в котором будет исполняться код udf, и которому также будет требоваться дополнительная память. Стандартная рекомендация значения для --worker-memory-overhead при работе с Python udf - 40% от объёма, выделяемого воркеру через параметр --worker-memory -
--worker-timeout нет 10 min Максимальное время, которое воркер будет ожидать регистрацию на Spark мастере. По истечении этого времени процесс воркера будет завершён с ошибкой. Доступны следующие единицы времени: s (секунды), m или min (минуты), h (часы), d (сутки). Если единица не указана - значение берётся в секундах -
--worker-gpu-limit нет 0 Количество GPU на воркере -
--worker-disk-name нет default Название диска для заказа в спецификации операции, подробнее в разделе заказ диска в джобах -
--worker-disk-limit нет - Максимальный объём диска при заказе в спецификации операции, подробнее в разделе заказ диска в джобах -
--worker-disk-account нет - Аккаунт, используемый при заказе диска в спецификации операции, подробнее в разделе заказ диска в джобах -
--worker-port нет 27001 Порт для Spark RPC вызовов у Worker сервера -
--enable-history-server, --disable-history-server нет --enable-history-server Флаг для запуска History Server в составе кластера -
--history-server-memory-limit нет 4G Объём памяти для процесса History Server -
--history-server-memory-overhead нет 2G Дополнительный объём памяти, выделяемой для вспомогательных процессов в контейнере с History Server -
--history-server-cpu-limit нет 1 Количество ядер CPU, выделяемых для History Server-а -
--shs-location нет --discovery-path/logs Путь к директории с event log-ами, используемый в History Server -
--ssd-account нет - YTsaurus Аккаунт, используемый для выделения SSD -
--ssd-limit нет 0 Объём SSD диска, выделяемый для YTsaurus джобы -
--abort-existing нет - Остановка запущенного Spark кластера на указанном --discovery-path перед запуском нового -
--cluster-log-level нет INFO Уровень логирования на кластере -
--enable-stderr-table, --disable-stderr-table нет --disable-stderr-table Запись логов YTsaurus операции в таблицу. Таблица находится внутри --discovery-path в директории logs -
--enable-advanced-event-log, --disable-advanced-event-log нет --disable-advanced-event-log Запись event log-ов Spark приложений в динамические таблицы YTsaurus -
--enable-worker-log-transfer, --disable-worker-log-transfer нет --disable-worker-log-transfer Трансфер логов Spark приложений из контейнера с воркером в таблицу на Кипарисе -
--enable-worker-log-json-mode, --disable-worker-log-json-mode нет --disable-worker-log-json-mode Запись логов воркера в формате JSON -
--worker-log-update-interval нет 10 min Периодичность переноса логов Spark приложений с воркера на Кипарис -
--worker-log-table-ttl нет 7d Время жизни таблицы с логами Spark приложений на Кипарисе -
--enable-dedicated-driver-operation-mode, --disable-dedicated-driver-operation-mode нет --disable-dedicated-driver-operation-mode Запуск выделенной YTsaurus операции для драйвера -
--driver-cores нет --worker-cores Число ядер CPU, выделяемых для воркера, используемого для запуска драйверов -
--driver-memory нет --worker-memory Объём памяти, выделяемой для воркера, используемого для запуска драйверов -
--driver-num нет --worker-num Количество воркеров, запускаемых специально для драйверов -
--driver-cores-overhead нет --worker-cores-overhead Дополнительное количество ядер, выделяемое воркерам для драйверов, см. --worker-cores-overhead -
--driver-timeout нет --worker-timeout Максимальное время, которое воркер будет ожидать регистрацию на Spark мастере. По истечении этого времени процесс воркера будет завершён с ошибкой -
--autoscaler-period, --autoscaler-metrics-port, --autoscaler-sliding-window, --autoscaler-max-free-workers, --autoscaler-slot-increment-step нет - Параметры, используемые для работы с автоскейлером -
--enable-livy нет false Запустить Livy сервер в составе кластера. Подробнее про Livy сервер описано на соответвтвующей странице 1.74.0
--livy-driver-cores нет 1 Количество ядер CPU, выделяемых драйверу, запускаемому через Livy 1.74.0
--livy-driver-memory нет 1G Объём памяти CPU, выделяемых драйверу, запускаемому через Livy 1.74.0
--livy-max-sessions нет 3 Максимальное количество сессий, которые одновременно можно запустить на Livy сервере 1.74.0
--id нет - Deprecated. Оставлено для обратной совместимости -
--discovery-dir нет - Deprecated. Оставлено для обратной совместимости -
-h, --help нет - Справка по параметрам команды -