SPYT в Java
Настройка проекта, зависимости, сборка
Все примеры в данном разделе собраны с помощью maven. См. POM файл.
- Все зависимости, которые нужны для запуска джобов в кластере, указываются в Vanilla-операции при старте кластера. Поэтому в приведённом примере все зависимости добавлены в
scope–Provided. Зависимости будут использоваться только для компиляции и не будут включены в.jarдля запуска. - с момента выхода Spark 3.0 проект использует 11 версию Java;
- код собран командой
mvn clean compile assembly:assemblyи выложен в YTsaurus://sys/spark/examples/spark-over-yt-examples-jar-with-dependencies.jar. Код нужно обязательно выкладывать в YTsaurus, Spark не распределяет пользовательские файлы по воркерам.
Запуск примеров
Каждый пример состоит из одного класса, который унаследован от SparkAppJava.SparkJavaApp инициализирует Spark и правильно завершает работу. Данный способ нужно использовать в случае работы с внутренним standalone Spark кластером. В случае, если предполагается использовать запуск задач с использованием планировщика YTsaurus, для создания SparkSession нужно воспользоваться стандартными рекомендациями Spark:
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
class MySparkApplication {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
SparkSession spark = SparkSession.builder.config(conf).getOrCreate();
try {
// Application code
} finally {
spark.stop();
}
}
}
Чтение данных из YTsaurus
-
Класс джоба унаследован от
SparkAppJava. Вспомогательный метод методrunинициализируетSparkSessionи вызывает абстрактный методdoRun. В методеmainданный метод вызывается у инстанса. -
SparkSession— точка входа, через которую происходит взаимодействие с фреймворком. Самый часто используемый метод —read, позволяющий читать данные из YTsaurus. -
spark.read().format("yt").load("/sys/spark/examples/test_data")— чтение данных в форматеytпо пути//sys/spark/examples/test_data.
Результатом чтения является объект Dataset, в Scala для него существует алиас DataFrame. Датафреймы — это внутреннее представление схематизированных таблиц в Spark. Их можно фильтровать, джойнить между собой, добавлять новые колонки. После применения таких операций к датафрейму получается новый датафрейм и его можно использовать в дальнейшей работе. Датафрейм состоит из нескольких партиций, каждая из которых может обрабатываться отдельно. У датафрейма, который прочитан из YTsaurus, столько же партиций, сколько чанков было у таблицы в YTsaurus.
В приведённом примере у датафрейма вызывается метод show(). В результате в stdout выводится несколько первых строк датафрейма (по умолчанию 20). Результат в логах будет выглядеть следующим образом:
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
| 10|
+-----+
Запуск осуществляется командой spark-submit-yt. В аргументах необходимо указать координаты кластера Spark, main class и запускаемый файл .jar:
spark-submit-yt \
--proxy <cluster-name> \
--discovery-path //my/discovery/path \
--deploy-mode cluster \
--class tech.ytsaurus.spark.example.SmokeTest \
yt:///sys/spark/examples/spark-over-yt-examples-jar-with-dependencies.jar
Использование UDF
-
Класс джоба унаследован от
SparkAppJava, как в предыдущем примере. -
Чтение датафрейма из
//sys/spark/examples/example_1. -
Для парсинга
uuidи получения подстроки, необходимо написать UDF:- Создать лямбда-функцию для парсинга:
(String s) -> s.split("-")[1]. - Завернуть её в объект с помощью
functions.udf. - Сообщить Spark тип выходного значения
DataTypes.StringType.
- Создать лямбда-функцию для парсинга:
-
Фильтрация датафрейма по колонке
id > 5. -
Применение
udfк колонкеuuid, новая колонка называетсяvalue. -
Сохранение результата в YTsaurus. По умолчанию Spark не перезаписывает существующие таблицы, а выкидывает исключение. Для перезаписи данных используйте
.mode(SaveMode.Overwrite).
Результат: //sys/spark/examples/example_1_map.
Запуск осуществляется командой spark-submit-yt:
spark-submit-yt \
--proxy <cluster-name> \
--discovery-path //my/discovery/path \
--deploy-mode cluster \
--class tech.ytsaurus.spark.example.UdfExample \
yt:///sys/spark/examples/spark-over-yt-examples-jar-with-dependencies.jar
Агрегации
-
Класс джоба унаследован от
SparkAppJava, как в предыдущем примере. -
Чтение датафрейма из
//sys/spark/examples/example_1. -
Чтение датафрейма из
//sys/spark/examples/example_dict. -
Джойн по колонке
uuid. В данном примере Spark оценит размер правого датафрейма и решит, что можно сделатьmap-side join. В терминологии Spark такая ситуация называетсяbroadcast. -
Группировка результата по
count, найден максимальныйidдля каждогоcount. Для группировки Spark сделает перетасовку (shuffle) датафрейма, чтобы записи с одинаковыми ключами оказались в одной партиции. Количество партиций после перетасовки определяется параметромspark.sql.shuffle.partitions. По умолчанию число партиций равно200, значение можно изменить при запуске джоба. -
Сохранение результата в YTsaurus. Количество чанков в сохраненной таблице будет равно количеству партиций у записываемого датафрейма. Партиций образовалось много (200), так как была сделана группировка. Для того чтобы при записи не порождать много мелких чанков, перед записью они собираются в одну партицию с помощью
repartition(1)
Результат: //sys/spark/examples/example_1_agg.
Запуск осуществляется командой spark-submit-yt. Дополнительно указывается spark.sql.shuffle.partitions:
spark-submit-yt \
--proxy <cluster-name> \
--discovery-path //my/discovery/path \
--conf spark.sql.shuffle.partitions=8 \
--deploy-mode cluster \
--class tech.ytsaurus.spark.example.GroupingExample \
yt:///sys/spark/examples/spark-over-yt-examples-jar-with-dependencies.jar
Другие примеры
Пример запуска джоба и проверки выходных данных.
Внимание
В зависимости необходимо добавить клиент spark-yt-submit-client_2.12. В свою очередь у клиента есть зависимости на классы Spark классы в runtime. Не нужно добавлять данные зависимости в сборку джобов, которые запускаются в кластере. Зависимость на Spark должна быть в значении Provided, поскольку классы Spark уже присутствуют в кластере. Данная зависимость используется только для запуска джобов.
SubmissionClient— клиент для отправки джобов на конкретный кластер. Клиент находит координаты мастера поdiscovery_pathи взаимодействует с ним.- Параметры запускаемого джоба описываются в объекте
launcher. Полный список методов объекта содержится в исходном коде. Методы соответствуют параметрамspark-submit. - После отправки джоба на кластер можно проверять его статус при помощи метода
client.getStatus. В примере указано время ожидания финального результата, но это необязательно.