Starting a Spark cluster

This section contains expanded instructions for starting a Spark cluster. Basic start operations are described in the Quick start section.

Attention!

A running Spark cluster statically occupies the resources allocated to it, so it is recommended to start the cluster in a separate computational pool with guaranteed resources. It makes sense to run one cluster per team and share its resources among several users. If Spark tasks are expected to run infrequently (less than once an hour), direct submission of tasks is recommended instead. An example of starting a shared cluster in a dedicated pool is shown below.
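
A minimal sketch of a shared team cluster started in a dedicated pool. The pool name, discovery path, and worker sizing below are placeholder assumptions; the flags themselves are described in the parameter table at the end of this section.

$ spark-launch-yt \
  --proxy <cluster_name> \
  --pool <team_pool> \
  --discovery-path //home/<team>/spark/discovery \
  --worker-num 5 \
  --worker-cores 16 \
  --worker-memory 64G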

Auto-scaler

To save computational pool resources when the load is low, Spark can be run in a special auto-scaler mode that reduces the resources used by the cluster in proportion to the load.

How it works

YTsaurus provides the update_operation_parameters method, which enables the parameters of a running operation to be changed. The number of jobs in an operation can be changed via the user_slots parameter: when it is modified, the scheduler stops some of the jobs or launches new ones (within the limit specified at the start of the operation).

Since the scheduler treats all jobs in an operation as identical, applying this scaling method to a Spark cluster started in the regular mode could stop the master, the history server, or workers on which Spark job drivers are running. To prevent disruptions to the Spark cluster's operation, the cluster is therefore started not as a single YTsaurus operation, but as several. One operation contains the dynamically changing set of workers and can scale within the limits configured at start time. The remaining one or two operations host the master and the history server (one operation) and, when drivers are launched on the cluster, the workers dedicated to drivers (a second operation). A sketch of changing user_slots by hand is shown below.
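
The auto-scaler issues this kind of update automatically; for illustration only, the same change can be sketched by hand with the YTsaurus CLI. This is an assumption-laden sketch: the update-op-parameters subcommand and the scheduling_options_per_pool_tree/resource_limits/user_slots parameter layout follow general YTsaurus scheduler conventions and are not part of SPYT itself.

# Assumed invocation: cap the worker operation at 5 user slots (jobs).
$ yt update-op-parameters <operation_id> \
  '{scheduling_options_per_pool_tree={<pool_tree>={resource_limits={user_slots=5}}}}'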

Starting a cluster with an auto-scaler

The spark-launch-yt launch script accepts the following additional parameters (the SPYT client library has similar parameters):

  • --autoscaler-period <period> is the frequency of auto-scaler runs and, potentially, of changes to the operation settings. The period is specified as <length><unit of measurement [d|h|min|s|ms|µs|ns]>.
  • --enable-multi-operation-mode enables starting Spark as multiple YTsaurus operations.
  • --enable-dedicated-driver-operation-mode launches the workers for drivers in a separate YTsaurus operation.
  • --driver-num <number of workers> allocates the specified number of workers for drivers.
  • --autoscaler-max-free-workers is the maximum number of free workers (all superfluous workers will be stopped).
  • --autoscaler-slot-increment-step is the step by which the number of workers is increased when the cluster is automatically expanded.

Example:

$ spark-launch-yt \
  --proxy <cluster_name> \
  --autoscaler-period 1s \
  --enable-multi-operation-mode \
  --discovery-path //discovery/path
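
A further hedged sketch that combines the remaining options listed above: a dedicated driver operation plus explicit scaling bounds. The numeric values are illustrative assumptions only.

$ spark-launch-yt \
  --proxy <cluster_name> \
  --discovery-path //discovery/path \
  --autoscaler-period 1s \
  --autoscaler-max-free-workers 2 \
  --autoscaler-slot-increment-step 2 \
  --enable-multi-operation-mode \
  --enable-dedicated-driver-operation-mode \
  --driver-num 1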

Updating a cluster

To update a Spark cluster, the following actions must be performed:

  1. Stop the YTsaurus operation that runs the current cluster. You can find a link to the operation using spark-discovery-yt. Alternatively, use the --abort-existing flag of the spark-launch-yt command: in this case the current cluster is stopped automatically before the new one starts.
  2. Start a new cluster using spark-launch-yt. The desired version can be specified via the --spark-cluster-version argument; if no version is specified, the latest available version is started (see the example after this list).
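
A minimal sketch of the update flow described above; the discovery path, version, and worker sizing are placeholder assumptions:

$ spark-launch-yt \
  --proxy <cluster_name> \
  --discovery-path //discovery/path \
  --abort-existing \
  --spark-cluster-version <new_version> \
  --worker-num <workers> \
  --worker-cores <cores> \
  --worker-memory <memory>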

spark-launch-yt parameters

To start an internal Spark cluster, use the spark-launch-yt command and pass a number of parameters to it. They are described in the table below; a combined example invocation is shown after the table.

Parameter | Required | Default value | Description | Starting with version
--discovery-path | yes | - | Path to the directory for service data on Cypress (Discovery path). | -
--proxy | no | Value from the YT_PROXY environment variable | YTsaurus cluster address. | -
--pool | no | - | YTsaurus compute pool that will be used to start the cluster. | -
--operation-alias | no | - | Alias of a YTsaurus operation with the Spark cluster. | -
--params | no | - | Additional YTsaurus operation parameters specified as a YSON string. Learn more on the configuration page. | -
--spyt-version | no | Version of the ytsaurus-spyt package on the client | SPYT version that will be used to start the cluster. | -
--preemption-mode | no | normal | Preemption mode used by the YTsaurus scheduler. Possible values: "normal" or "graceful". | -
--enable-mtn, --disable-mtn | no | --disable-mtn | Use Multi-Tenant Network (MTN) to start the cluster. When using MTN, you must also specify the network project via the --network-project parameter. | -
--network-project | no | - | Name of the network project where a cluster operation will run. | -
--prefer-ipv6, --prefer-ipv4 | no | --prefer-ipv4 | The type of IP addresses used for working with the cluster. If you use IPv6 addressing when working with Spark 3.4.0 and higher, set the SPARK_PREFER_IPV6=true environment variable. | -
--enable-tmpfs, --disable-tmpfs | no | --disable-tmpfs | Use part of the RAM allocated to the Spark worker to mount tmpfs. When using tmpfs, you must also specify the amount via the --tmpfs-limit parameter. | -
--tmpfs-limit | no | 8G | Amount of memory used for tmpfs. | -
--enable-tcp-proxy, --disable-tcp-proxy | no | --disable-tcp-proxy | Use a TCP proxy for external access to the cluster. | 1.72.0
--tcp-proxy-range-start | no | 30000 | Start of the range of ports used for the TCP proxy. | 1.72.0
--tcp-proxy-range-size | no | 100 | Size of the range of ports used for the TCP proxy. | 1.72.0
--enable-rpc-job-proxy, --disable-rpc-job-proxy | no | --enable-rpc-job-proxy | Use an RPC proxy embedded in a job proxy. If this option is disabled, a general RPC proxy will be used, which may result in cluster performance degradation. | 1.77.0
--enable-ytsaurus-shuffle, --disable-ytsaurus-shuffle | no | --disable-ytsaurus-shuffle | Use the YTsaurus Shuffle service. | 2.7.2
--rpc-job-proxy-thread-pool-size | no | 4 | Size of a thread pool for the RPC job proxy. | 1.77.0
--group-id | no | - | Discovery group ID. Used when starting a cluster with multiple operations. | -
--enable-squashfs, --disable-squashfs | no | --disable-squashfs | Use pre-configured SquashFS layers in YTsaurus jobs. | 2.6.0
--cluster-java-home | no | /opt/jdk11 | Path to JAVA_HOME in YTsaurus cluster containers. | 2.6.0
--master-memory-limit | no | 4G | Amount of memory allocated to the container with the Master server. | -
--master-port | no | 27001 | Port for Spark RPC calls on the Master server. | -
--worker-cores | yes | - | Number of CPU cores allocated to one worker. | -
--worker-memory | yes | - | Amount of memory allocated to a worker. This amount will further be distributed among Spark processes, such as the driver and executors running on that worker. | -
--worker-num | yes | - | Number of workers in the cluster. | -
--worker-cores-overhead | no | 0 | Extra CPU cores allocated to a worker. These cores will not be used directly by Spark applications, but are required for auxiliary processes launched along with the worker. | -
--worker-memory-overhead | no | 2G | Additional memory allocated to a worker. This amount is required for additional processes launched in the worker container, such as child processes run by executors. This amount is also needed when using pyspark, especially when working with Python UDFs. In this case, the executor starts a child Python process that executes the UDF code and requires additional memory. The standard recommended value for --worker-memory-overhead when working with Python UDFs is 40% of the amount allocated to the worker via the --worker-memory parameter. | -
--worker-timeout | no | 10 min | The maximum time a worker will wait to register on the Spark master. After this time, the worker process terminates with an error. The following time units are available: s (seconds), m or min (minutes), h (hours), and d (days). If no unit is specified, seconds are assumed. | -
--worker-gpu-limit | no | 0 | Number of GPUs on a worker. | -
--worker-disk-name | no | default | Name of the disk requested in the operation specification. For more information, see Disk requests for jobs. | -
--worker-disk-limit | no | - | Maximum disk capacity requested in the operation specification. For more information, see Disk requests for jobs. | -
--worker-disk-account | no | - | Account for requesting a disk in the operation specification. For more information, see Disk requests for jobs. | -
--worker-port | no | 27001 | Port for Spark RPC calls on the Worker server. | -
--enable-history-server, --disable-history-server | no | --enable-history-server | Flag for running the History Server as part of the cluster. | -
--history-server-memory-limit | no | 4G | Amount of memory for the History Server process. | -
--history-server-memory-overhead | no | 2G | Extra memory allocated for auxiliary processes in the container with the History Server. | -
--history-server-cpu-limit | no | 1 | Number of CPU cores allocated to the History Server. | -
--shs-location | no | --discovery-path/logs | Path to the directory that contains event logs and is used by the History Server. | -
--ssd-account | no | - | YTsaurus account used for SSD allocation. | -
--ssd-limit | no | 0 | The SSD disk capacity allocated to a YTsaurus job. | -
--abort-existing | no | - | Stop a running Spark cluster at the specified --discovery-path before starting a new one. | -
--cluster-log-level | no | INFO | Cluster logging level. | -
--enable-stderr-table, --disable-stderr-table | no | --disable-stderr-table | Write YTsaurus operation logs to a table. The table is located inside --discovery-path, in the logs directory. | -
--enable-advanced-event-log, --disable-advanced-event-log | no | --disable-advanced-event-log | Write Spark application event logs to dynamic YTsaurus tables. | -
--enable-worker-log-transfer, --disable-worker-log-transfer | no | --disable-worker-log-transfer | Transfer Spark application logs from a worker container to a table on Cypress. | -
--enable-worker-log-json-mode, --disable-worker-log-json-mode | no | --disable-worker-log-json-mode | Write worker logs in JSON format. | -
--worker-log-update-interval | no | 10 min | Frequency of transferring Spark application logs from a worker to Cypress. | -
--worker-log-table-ttl | no | 7d | Lifetime of a log table for Spark applications on Cypress. | -
--enable-dedicated-driver-operation-mode, --disable-dedicated-driver-operation-mode | no | --disable-dedicated-driver-operation-mode | Run a dedicated YTsaurus operation for a driver. | -
--driver-cores | no | --worker-cores | Number of CPU cores allocated to a worker used to run drivers. | -
--driver-memory | no | --worker-memory | Amount of memory allocated to a worker used to run drivers. | -
--driver-num | no | --worker-num | Number of workers running specifically for drivers. | -
--driver-cores-overhead | no | --worker-cores-overhead | Extra cores allocated to workers for drivers (see --worker-cores-overhead). | -
--driver-timeout | no | --worker-timeout | The maximum time a worker will await registration on the Spark master. After this time, the worker process terminates with an error. | -
--autoscaler-period, --autoscaler-metrics-port, --autoscaler-sliding-window, --autoscaler-max-free-workers, --autoscaler-slot-increment-step | no | - | Parameters used for working with an auto-scaler. | -
--enable-livy | no | false | Run the Livy server as part of the cluster. You can learn more about the Livy server here. | 1.74.0
--livy-driver-cores | no | 1 | Number of CPU cores allocated to a driver started via Livy. | 1.74.0
--livy-driver-memory | no | 1G | Amount of memory allocated to a driver started via Livy. | 1.74.0
--livy-max-sessions | no | 3 | Maximum number of sessions that can run on the Livy server simultaneously. | 1.74.0
--id | no | - | Deprecated. Left for backward compatibility. | -
--discovery-dir | no | - | Deprecated. Left for backward compatibility. | -
-h, --help | no | - | Command parameter reference. | -
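
For reference, a hedged end-to-end invocation that combines several optional parameters from the table with the required worker settings. All concrete values are illustrative assumptions rather than recommendations.

$ spark-launch-yt \
  --proxy <cluster_name> \
  --discovery-path //discovery/path \
  --worker-num 5 \
  --worker-cores 16 \
  --worker-memory 64G \
  --worker-memory-overhead 8G \
  --enable-tmpfs \
  --tmpfs-limit 16G \
  --enable-stderr-table \
  --enable-livy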