SPYT in Python
Steps to launch
- Write some code.
- Post your code and dependencies to YTsaurus: the main .py file and the dependencies in .py, .zip, or .egg.
- Build a binary file and post it to YTsaurus (Spark 3.2.2+).
- Run spark-submit-yt to submit to the inner standalone cluster, or spark-submit to submit directly to YTsaurus (available from version 1.76.0).
Differences for submitting directly to YTsaurus
All of the examples below are written for use with the inner Spark standalone cluster. Creating the SparkSession object differs when submitting directly to YTsaurus: instead of using the with spark_session() context manager or the spyt.connect() function, create the object explicitly, as recommended by Spark:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('My Application').getOrCreate()
... # Application code
spark.stop()
Running with no dependencies
Main file code
from spyt import spark_session
print("Hello world")
with spark_session() as spark:
    spark.read.yt("path_to_file").show()
Launch
spark-submit-yt \
--proxy <cluster-name> \
--discovery-path //my_discovery_path \
--deploy-mode cluster \
YT_path_to_file
Running with dependencies
Main file code
from my_lib import plus5
from pyspark.sql.functions import col
from spyt import spark_session
print("Hello world")
with spark_session() as spark:
    spark.read.yt("path_to_file")\
        .withColumn("value", plus5(col("value")))\
        .write.mode("overwrite").yt("path_to_file")
Dependency
my_lib.zip containing my_lib.py:
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
plus5 = udf(lambda x: x + 5, LongType())
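The archive can be built, for example, with Python's standard zipfile module (a minimal sketch; it assumes my_lib.py is in the current directory):

import zipfile

# Package the dependency module into my_lib.zip so it can be passed via --py-files.
with zipfile.ZipFile("my_lib.zip", "w") as zf:
    zf.write("my_lib.py")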
Code and dependencies in YTsaurus
To be posted to YTsaurus (one way to upload the files is sketched below):
- A file, such as deps_example.py.
- Dependencies, such as my_lib.zip.
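A minimal sketch of uploading both files with the YTsaurus Python client; the destination paths are placeholders, replace them with your own directory:

import yt.wrapper as yt

yt.config["proxy"]["url"] = "<cluster-name>"

# Placeholder destination paths for illustration only.
for local_path, yt_path in [("deps_example.py", "//home/user/deps_example.py"),
                            ("my_lib.zip", "//home/user/my_lib.zip")]:
    yt.create("file", yt_path, ignore_existing=True)
    with open(local_path, "rb") as f:
        yt.write_file(yt_path, f)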
Launch
spark-submit-yt \
--proxy <cluster-name> \
--discovery-path //my_discovery_path \
--deploy-mode cluster \
--py-files YT_path_to_lib \
YT_path_to_file
Running with configurations
Spark includes many useful configuration options that you can specify when running a job, such as spark.sql.shuffle.partitions. You can also control the amount of resources per job via spark.cores.max and spark.executor.memory. For more information, see the Spark documentation.
Attention!
In Standalone mode, the num-executors and spark.executor.instances settings have no effect; the number of executors is determined by the spark.cores.max parameter.
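When submitting directly to YTsaurus and creating the SparkSession explicitly (see above), the same settings can be passed through the standard PySpark builder. A minimal sketch; the values are illustrative only:

from pyspark.sql import SparkSession

# Configuration passed at session creation time.
spark = (
    SparkSession.builder
    .appName('My Application')
    .config("spark.sql.shuffle.partitions", "1")
    .config("spark.cores.max", "1")
    .config("spark.executor.memory", "2G")
    .getOrCreate()
)
... # Application code
spark.stop()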
Main file code
from pyspark.sql.functions import col
from spyt import spark_session
print("Hello world")
with spark_session() as spark:
    spark.read.yt("path_to_file")\
        .withColumn("id_mod_10", col("id") % 10)\
        .groupBy("id_mod_10")\
        .count()\
        .write.mode("overwrite").yt("path_to_file")
Code in YTsaurus
You need to post this file to YTsaurus.
Launch
spark-submit-yt \
--proxy <cluster-name> \
--id test \
--discovery-path //my_discovery_path \
--deploy-mode cluster \
--conf spark.sql.shuffle.partitions=1 \
--conf spark.cores.max=1 \
--conf spark.executor.cores=1 \
YT_path_to_file
Other examples
You can find additional examples in SPYT in Jupyter.
In regular jobs, you can create the spark object the same way as in Jupyter, via a call to the connector, or alternatively via with spark_session() as in the examples above (see the sketch below). The difference between the options is minimal: in Jupyter you pass resource settings through the connector, while for regular jobs you would normally set them at launch time, so that the code does not have to be reposted if, for instance, the amount of data grows.
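A minimal sketch of creating the session via the connector. The keyword arguments shown here are assumptions for illustration only; the actual signature of spyt.connect() is defined in the spyt package:

import spyt

# Hypothetical keyword arguments; check spyt.connect() in your spyt version
# for the exact parameter names.
spark = spyt.connect(
    yt_proxy="cluster_name",
    discovery_path="//my_discovery_path",
)
try:
    spark.read.yt("path_to_file").show()
finally:
    spark.stop()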
Example of starting a job and checking its status:
import spyt
import time
from spyt.submit import java_gateway, SparkSubmissionClient, SubmissionStatus
user = "user_name"
token = spyt.utils.default_token()
yt_proxy = "cluster_name"
discovery_path = "//my_discovery_path"
spyt_version = "1.4.1"
with java_gateway() as gateway:
    client = SparkSubmissionClient(gateway, yt_proxy, discovery_path, spyt_version, user, token)
    launcher = (
        client
        .new_launcher()
        .set_app_resource("yt:///sys/spark/examples/smoke_test.py")
        .set_conf("spark.pyspark.python", "/opt/python3.7/bin/python3.7")
    )
    submission_id = client.submit(launcher)
    status = client.get_status(submission_id)
    # Poll until the job reaches a final state
    while not SubmissionStatus.is_final(status):
        status = client.get_status(submission_id)
        time.sleep(10)
    # Check the final result of the submission
    SubmissionStatus.is_success(status)
    SubmissionStatus.is_failure(status)
- The client operates via Py4J, calling the required RestSubmissionClient methods. The with java_gateway() as gateway statement brings the JVM up with the right classpath and shuts it down cleanly on exit.
- You can use the launch_gateway and shutdown_gateway methods to control the creation of the JVM manually (see the sketch after this list).
- SparkSubmissionClient is a client for submitting jobs to a specific cluster. It finds the location of the master via discovery_path and communicates with it.
- The parameters of the job to be started are described in the launcher object. A complete list of the object's methods is available in the code; they match the spark-submit parameters.
- After launch, you can check the job status using the client.get_status method. The example above polls until a final status is returned.
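A minimal sketch of managing the JVM manually; it assumes launch_gateway and shutdown_gateway are importable from spyt.submit alongside java_gateway, and their exact signatures may differ:

from spyt.submit import launch_gateway, shutdown_gateway, SparkSubmissionClient

# Assumption: manual lifecycle helpers live in spyt.submit next to java_gateway.
# yt_proxy, discovery_path, spyt_version, user, token are defined as in the example above.
gateway = launch_gateway()
try:
    client = SparkSubmissionClient(gateway, yt_proxy, discovery_path, spyt_version, user, token)
    ... # Build a launcher and submit, as in the example above
finally:
    shutdown_gateway(gateway)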