SPYT в Java
Настройка проекта, зависимости, сборка
Все примеры в данном разделе собраны с помощью maven
. См. POM файл.
- Все зависимости, которые нужны для запуска джобов в кластере, указываются в Vanilla-операции при старте кластера. Поэтому в приведённом примере все зависимости добавлены в
scope
–Provided
. Зависимости будут использоваться только для компиляции и не будут включены в.jar
для запуска. - с момента выхода Spark 3.0 проект использует 11 версию Java;
- код собран командой
mvn clean compile assembly:assembly
и выложен в YTsaurus://sys/spark/examples/spark-over-yt-examples-jar-with-dependencies.jar
. Код нужно обязательно выкладывать в YTsaurus, Spark не распределяет пользовательские файлы по воркерам.
Запуск примеров
Каждый пример состоит из одного класса, который унаследован от SparkAppJava
.SparkJavaApp
инициализирует Spark и правильно завершает работу. Данный способ нужно использовать в случае работы с внутренним standalone Spark кластером. В случае, если предполагается использовать запуск задач с использованием планировщика YTsaurus, для создания SparkSession
нужно воспользоваться стандартными рекомендациями Spark:
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
class MySparkApplication {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
SparkSession spark = SparkSession.builder.config(conf).getOrCreate();
try {
// Application code
} finally {
spark.stop();
}
}
}
Чтение данных из YTsaurus
-
Класс джоба унаследован от
SparkAppJava
. Вспомогательный метод методrun
инициализируетSparkSession
и вызывает абстрактный методdoRun
. В методеmain
данный метод вызывается у инстанса. -
SparkSession
— точка входа, через которую происходит взаимодействие с фреймворком. Самый часто используемый метод —read
, позволяющий читать данные из YTsaurus. -
spark.read().format("yt").load("/sys/spark/examples/test_data")
— чтение данных в форматеyt
по пути//sys/spark/examples/test_data
.
Результатом чтения является объект Dataset
, в Scala для него существует алиас DataFrame
. Датафреймы — это внутреннее представление схематизированных таблиц в Spark. Их можно фильтровать, джойнить между собой, добавлять новые колонки. После применения таких операций к датафрейму получается новый датафрейм и его можно использовать в дальнейшей работе. Датафрейм состоит из нескольких партиций, каждая из которых может обрабатываться отдельно. У датафрейма, который прочитан из YTsaurus, столько же партиций, сколько чанков было у таблицы в YTsaurus.
В приведённом примере у датафрейма вызывается метод show()
. В результате в stdout
выводится несколько первых строк датафрейма (по умолчанию 20). Результат в логах будет выглядеть следующим образом:
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
| 10|
+-----+
Запуск осуществляется командой spark-submit-yt
. В аргументах необходимо указать координаты кластера Spark, main class
и запускаемый файл .jar
:
spark-submit-yt \
--proxy <cluster-name> \
--discovery-path //my/discovery/path \
--deploy-mode cluster \
--class tech.ytsaurus.spark.example.SmokeTest \
yt:///sys/spark/examples/spark-over-yt-examples-jar-with-dependencies.jar
Использование UDF
-
Класс джоба унаследован от
SparkAppJava
, как в предыдущем примере. -
Чтение датафрейма из
//sys/spark/examples/example_1
. -
Для парсинга
uuid
и получения подстроки, необходимо написать UDF:- Создать лямбда-функцию для парсинга:
(String s) -> s.split("-")[1]
. - Завернуть её в объект с помощью
functions.udf
. - Сообщить Spark тип выходного значения
DataTypes.StringType
.
- Создать лямбда-функцию для парсинга:
-
Фильтрация датафрейма по колонке
id > 5
. -
Применение
udf
к колонкеuuid
, новая колонка называетсяvalue
. -
Сохранение результата в YTsaurus. По умолчанию Spark не перезаписывает существующие таблицы, а выкидывает исключение. Для перезаписи данных используйте
.mode(SaveMode.Overwrite)
.
Результат: //sys/spark/examples/example_1_map
.
Запуск осуществляется командой spark-submit-yt
:
spark-submit-yt \
--proxy <cluster-name> \
--discovery-path //my/discovery/path \
--deploy-mode cluster \
--class tech.ytsaurus.spark.example.UdfExample \
yt:///sys/spark/examples/spark-over-yt-examples-jar-with-dependencies.jar
Агрегации
-
Класс джоба унаследован от
SparkAppJava
, как в предыдущем примере. -
Чтение датафрейма из
//sys/spark/examples/example_1
. -
Чтение датафрейма из
//sys/spark/examples/example_dict
. -
Джойн по колонке
uuid
. В данном примере Spark оценит размер правого датафрейма и решит, что можно сделатьmap-side join
. В терминологии Spark такая ситуация называетсяbroadcast
. -
Группировка результата по
count
, найден максимальныйid
для каждогоcount
. Для группировки Spark сделает перетасовку (shuffle) датафрейма, чтобы записи с одинаковыми ключами оказались в одной партиции. Количество партиций после перетасовки определяется параметромspark.sql.shuffle.partitions
. По умолчанию число партиций равно200
, значение можно изменить при запуске джоба. -
Сохранение результата в YTsaurus. Количество чанков в сохраненной таблице будет равно количеству партиций у записываемого датафрейма. Партиций образовалось много (200), так как была сделана группировка. Для того чтобы при записи не порождать много мелких чанков, перед записью они собираются в одну партицию с помощью
repartition(1)
Результат: //sys/spark/examples/example_1_agg
.
Запуск осуществляется командой spark-submit-yt
. Дополнительно указывается spark.sql.shuffle.partitions
:
spark-submit-yt \
--proxy <cluster-name> \
--discovery-path //my/discovery/path \
--conf spark.sql.shuffle.partitions=8 \
--deploy-mode cluster \
--class tech.ytsaurus.spark.example.GroupingExample \
yt:///sys/spark/examples/spark-over-yt-examples-jar-with-dependencies.jar
Другие примеры
Пример запуска джоба и проверки выходных данных.
Внимание
В зависимости необходимо добавить клиент spark-yt-submit-client_2.12
. В свою очередь у клиента есть зависимости на классы Spark классы в runtime
. Не нужно добавлять данные зависимости в сборку джобов, которые запускаются в кластере. Зависимость на Spark должна быть в значении Provided
, поскольку классы Spark уже присутствуют в кластере. Данная зависимость используется только для запуска джобов.
SubmissionClient
— клиент для отправки джобов на конкретный кластер. Клиент находит координаты мастера поdiscovery_path
и взаимодействует с ним.- Параметры запускаемого джоба описываются в объекте
launcher
. Полный список методов объекта содержится в исходном коде. Методы соответствуют параметрамspark-submit
. - После отправки джоба на кластер можно проверять его статус при помощи метода
client.getStatus
. В примере указано время ожидания финального результата, но это необязательно.