SPYT в Java
Настройка проекта, зависимости, сборка
Все примеры в данном разделе собраны с помощью maven
. См. POM файл.
- Все зависимости, которые нужны для запуска джобов в кластере, указываются в Vanilla-операции при старте кластера. Поэтому в приведённом примере все зависимости добавлены в
scope
–Provided
. Зависимости будут использоваться только для компиляции и не будут включены в.jar
для запуска. - с момента выхода Spark 3.0 проект использует 11 версию Java;
- код собран командой
mvn clean compile assembly:assembly
и выложен в YTsaurus://sys/spark/examples/spark-over-yt-examples-jar-with-dependencies.jar
. Код нужно обязательно выкладывать в YTsaurus, Spark не распределяет пользовательские файлы по воркерам.
Запуск примеров
Каждый пример состоит из одного класса, который унаследован от SparkAppJava
.SparkJavaApp
инициализирует Spark и правильно завершает работу.
Чтение данных из YTsaurus
package tech.ytsaurus.spyt.example;
import org.apache.spark.sql.SparkSession;
import tech.ytsaurus.client.CompoundClient;
import tech.ytsaurus.spyt.SparkAppJava;
public class SmokeTest extends SparkAppJava {
@Override
protected void doRun(String[] args, SparkSession spark, CompoundClient yt) {
spark.read().format("yt").load("/sys/spark/examples/test_data").show();
}
public static void main(String[] args) {
new SmokeTest().run(args);
}
}
-
Класс джоба унаследован от
SparkAppJava
. Вспомогательный метод методrun
инициализируетSparkSession
и вызывает абстрактный методdoRun
. В методеmain
данный метод вызывается у инстанса. -
SparkSession
— точка входа, через которую происходит взаимодействие с фреймворком. Самый часто используемый метод —read
, позволяющий читать данные из YTsaurus. -
spark.read().format("yt").load("/sys/spark/examples/test_data")
— чтение данных в форматеyt
по пути//sys/spark/examples/test_data
.
Результатом чтения является объект Dataset
, в Scala для него существует алиас DataFrame
. Датафреймы — это внутреннее представление схематизированных таблиц в Spark. Их можно фильтровать, джойнить между собой, добавлять новые колонки. После применения таких операций к датафрейму получается новый датафрейм и его можно использовать в дальнейшей работе. Датафрейм состоит из нескольких партиций, каждая из которых может обрабатываться отдельно. У датафрейма, который прочитан из YTsaurus, столько же партиций, сколько чанков было у таблицы в YTsaurus.
В приведённом примере у датафрейма вызывается метод show()
. В результате в stdout
выводится несколько первых строк датафрейма (по умолчанию 20). Результат в логах будет выглядеть следующим образом:
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
| 10|
+-----+
Запуск осуществляется командой spark-submit-yt
. В аргументах необходимо указать координаты кластера Spark, main class
и запускаемый файл .jar
:
spark-submit-yt \
--proxy <cluster-name> \
--discovery-path //my/discovery/path \
--deploy-mode cluster \
--class tech.ytsaurus.spark.example.SmokeTest \
yt:///sys/spark/examples/spark-over-yt-examples-jar-with-dependencies.jar
Использование UDF
package tech.ytsaurus.spyt.example;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;
import tech.ytsaurus.client.CompoundClient;
import tech.ytsaurus.spyt.SparkAppJava;
public class UdfExample extends SparkAppJava {
@Override
protected void doRun(String[] args, SparkSession spark, CompoundClient yt) {
Dataset<Row> df = spark.read().format("yt").load("/sys/spark/examples/example_1");
UserDefinedFunction splitUdf = functions.udf((String s) -> s.split("-")[1], DataTypes.StringType);
df
.filter(df.col("id").gt(5))
.select(splitUdf.apply(df.col("uuid")).as("value"))
.write().mode(SaveMode.Overwrite).format("yt")
.save("/sys/spark/examples/example_1_map");
}
public static void main(String[] args) {
new UdfExample().run(args);
}
}
-
Класс джоба унаследован от
SparkAppJava
, как в предыдущем примере. -
Чтение датафрейма из
//sys/spark/examples/example_1
. -
Для парсинга
uuid
и получения подстроки, необходимо написать UDF:- Создать лямбда-функцию для парсинга:
(String s) -> s.split("-")[1]
. - Завернуть её в объект с помощью
functions.udf
. - Сообщить Spark тип выходного значения
DataTypes.StringType
.
- Создать лямбда-функцию для парсинга:
-
Фильтрация датафрейма по колонке
id > 5
. -
Применение
udf
к колонкеuuid
, новая колонка называетсяvalue
. -
Сохранение результата в YTsaurus. По умолчанию Spark не перезаписывает существующие таблицы, а выкидывает исключение. Для перезаписи данных используйте
.mode(SaveMode.Overwrite)
.
Результат: //sys/spark/examples/example_1_map
.
Запуск осуществляется командой spark-submit-yt
:
spark-submit-yt \
--proxy <cluster-name> \
--discovery-path //my/discovery/path \
--deploy-mode cluster \
--class tech.ytsaurus.spark.example.UdfExample \
yt:///sys/spark/examples/spark-over-yt-examples-jar-with-dependencies.jar
Агрегации
package tech.ytsaurus.spyt.example;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import tech.ytsaurus.spyt.SparkAppJava;
import tech.ytsaurus.client.CompoundClient;
public class GroupingExample extends SparkAppJava {
@Override
protected void doRun(String[] args, SparkSession spark, CompoundClient yt) {
Dataset<Row> df = spark.read().format("yt").load("/sys/spark/examples/example_1");
Dataset<Row> dictDf = spark.read().format("yt").load("/sys/spark/examples/example_dict");
df
.join(dictDf, df.col("uuid").equalTo(dictDf.col("uuid")), "left_outer")
.groupBy("count")
.agg(functions.max("id").as("max_id"))
.repartition(1)
.write().mode(SaveMode.Overwrite).format("yt").save("/sys/spark/examples/example_1_agg");
}
public static void main(String[] args) {
new GroupingExample().run(args);
}
@Override
protected SparkConf getSparkConf() {
return super.getSparkConf().setAppName("Custom name");
}
}
-
Класс джоба унаследован от
SparkAppJava
, как в предыдущем примере. -
Чтение датафрейма из
//sys/spark/examples/example_1
. -
Чтение датафрейма из
//sys/spark/examples/example_dict
. -
Джойн по колонке
uuid
. В данном примере Spark оценит размер правого датафрейма и решит, что можно сделатьmap-side join
. В терминологии Spark такая ситуация называетсяbroadcast
. -
Группировка результата по
count
, найден максимальныйid
для каждогоcount
. Для группировки Spark сделает перетасовку (shuffle) датафрейма, чтобы записи с одинаковыми ключами оказались в одной партиции. Количество партиций после перетасовки определяется параметромspark.sql.shuffle.partitions
. По умолчанию число партиций равно200
, значение можно изменить при запуске джоба. -
Сохранение результата в YTsaurus. Количество чанков в сохраненной таблице будет равно количеству партиций у записываемого датафрейма. Партиций образовалось много (200), так как была сделана группировка. Для того чтобы при записи не порождать много мелких чанков, перед записью они собираются в одну партицию с помощью
repartition(1)
Результат: //sys/spark/examples/example_1_agg
.
Запуск осуществляется командой spark-submit-yt
. Дополнительно указывается spark.sql.shuffle.partitions
:
spark-submit-yt \
--proxy <cluster-name> \
--discovery-path //my/discovery/path \
--conf spark.sql.shuffle.partitions=8 \
--deploy-mode cluster \
--class tech.ytsaurus.spark.example.GroupingExample \
yt:///sys/spark/examples/spark-over-yt-examples-jar-with-dependencies.jar
Другие примеры
Пример запуска джоба и проверки выходных данных.
Внимание
В зависимости необходимо добавить клиент spark-yt-submit-client_2.12
. В свою очередь у клиента есть зависимости на классы Spark классы в runtime
. Не нужно добавлять данные зависимости в сборку джобов, которые запускаются в кластере. Зависимость на Spark должна быть в значении Provided
, поскольку классы Spark уже присутствуют в кластере. Данная зависимость используется только для запуска джобов.
SubmissionClient
— клиент для отправки джобов на конкретный кластер. Клиент находит координаты мастера поdiscovery_path
и взаимодействует с ним.- Параметры запускаемого джоба описываются в объекте
launcher
. Полный список методов объекта содержится в исходном коде. Методы соответствуют параметрамspark-submit
. - После отправки джоба на кластер можно проверять его статус при помощи метода
client.getStatus
. В примере указано время ожидания финального результата, но это необязательно.