SPYT в Java

Настройка проекта, зависимости, сборка

Все примеры в данном разделе собраны с помощью maven. См. POM файл.

  • Все зависимости, которые нужны для запуска джобов в кластере, указываются в Vanilla-операции при старте кластера. Поэтому в приведённом примере все зависимости добавлены в scopeProvided. Зависимости будут использоваться только для компиляции и не будут включены в .jar для запуска.
  • с момента выхода Spark 3.0 проект использует 11 версию Java;
  • код собран командой mvn clean compile assembly:assembly и выложен в YTsaurus: //sys/spark/examples/spark-over-yt-examples-jar-with-dependencies.jar. Код нужно обязательно выкладывать в YTsaurus, Spark не распределяет пользовательские файлы по воркерам.

Запуск примеров

Каждый пример состоит из одного класса, который унаследован от SparkAppJava.SparkJavaApp инициализирует Spark и правильно завершает работу.

Чтение данных из YTsaurus

package tech.ytsaurus.spyt.example;

import org.apache.spark.sql.SparkSession;

import tech.ytsaurus.client.CompoundClient;
import tech.ytsaurus.spyt.SparkAppJava;

public class SmokeTest extends SparkAppJava {
    @Override
    protected void doRun(String[] args, SparkSession spark, CompoundClient yt) {
        spark.read().format("yt").load("/sys/spark/examples/test_data").show();
    }

    public static void main(String[] args) {
        new SmokeTest().run(args);
    }
}

  1. Класс джоба унаследован от SparkAppJava. Вспомогательный метод метод run инициализирует SparkSession и вызывает абстрактный метод doRun. В методе main данный метод вызывается у инстанса.

  2. SparkSession — точка входа, через которую происходит взаимодействие с фреймворком. Самый часто используемый метод — read, позволяющий читать данные из YTsaurus.

  3. spark.read().format("yt").load("/sys/spark/examples/test_data") — чтение данных в формате yt по пути //sys/spark/examples/test_data.

Результатом чтения является объект Dataset, в Scala для него существует алиас DataFrame. Датафреймы — это внутреннее представление схематизированных таблиц в Spark. Их можно фильтровать, джойнить между собой, добавлять новые колонки. После применения таких операций к датафрейму получается новый датафрейм и его можно использовать в дальнейшей работе. Датафрейм состоит из нескольких партиций, каждая из которых может обрабатываться отдельно. У датафрейма, который прочитан из YTsaurus, столько же партиций, сколько чанков было у таблицы в YTsaurus.

В приведённом примере у датафрейма вызывается метод show(). В результате в stdout выводится несколько первых строк датафрейма (по умолчанию 20). Результат в логах будет выглядеть следующим образом:

+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
|    6|
|    7|
|    8|
|    9|
|   10|
+-----+

Запуск осуществляется командой spark-submit-yt. В аргументах необходимо указать координаты кластера Spark, main class и запускаемый файл .jar:

spark-submit-yt \
  --proxy <cluster-name> \
  --discovery-path //my/discovery/path \
  --deploy-mode cluster \
  --class tech.ytsaurus.spark.example.SmokeTest \
  yt:///sys/spark/examples/spark-over-yt-examples-jar-with-dependencies.jar

Использование UDF

package tech.ytsaurus.spyt.example;

import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

import tech.ytsaurus.client.CompoundClient;
import tech.ytsaurus.spyt.SparkAppJava;

public class UdfExample extends SparkAppJava {
    @Override
    protected void doRun(String[] args, SparkSession spark, CompoundClient yt) {
        Dataset<Row> df = spark.read().format("yt").load("/sys/spark/examples/example_1");
        UserDefinedFunction splitUdf = functions.udf((String s) -> s.split("-")[1], DataTypes.StringType);

        df
          .filter(df.col("id").gt(5))
          .select(splitUdf.apply(df.col("uuid")).as("value"))
          .write().mode(SaveMode.Overwrite).format("yt")
          .save("/sys/spark/examples/example_1_map");
    }

    public static void main(String[] args) {
        new UdfExample().run(args);
    }
}

  1. Класс джоба унаследован от SparkAppJava, как в предыдущем примере.

  2. Чтение датафрейма из //sys/spark/examples/example_1.

  3. Для парсинга uuid и получения подстроки, необходимо написать UDF:

    • Создать лямбда-функцию для парсинга: (String s) -> s.split("-")[1].
    • Завернуть её в объект с помощью functions.udf.
    • Сообщить Spark тип выходного значения DataTypes.StringType.
  4. Фильтрация датафрейма по колонке id > 5.

  5. Применение udf к колонке uuid, новая колонка называется value.

  6. Сохранение результата в YTsaurus. По умолчанию Spark не перезаписывает существующие таблицы, а выкидывает исключение. Для перезаписи данных используйте .mode(SaveMode.Overwrite).

Результат: //sys/spark/examples/example_1_map.

Запуск осуществляется командой spark-submit-yt:

spark-submit-yt \
  --proxy <cluster-name> \
  --discovery-path //my/discovery/path \
  --deploy-mode cluster \
  --class tech.ytsaurus.spark.example.UdfExample \
  yt:///sys/spark/examples/spark-over-yt-examples-jar-with-dependencies.jar

Агрегации

package tech.ytsaurus.spyt.example;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;

import tech.ytsaurus.spyt.SparkAppJava;
import tech.ytsaurus.client.CompoundClient;

public class GroupingExample extends SparkAppJava {
    @Override
    protected void doRun(String[] args, SparkSession spark, CompoundClient yt) {
        Dataset<Row> df = spark.read().format("yt").load("/sys/spark/examples/example_1");
        Dataset<Row> dictDf = spark.read().format("yt").load("/sys/spark/examples/example_dict");

        df
          .join(dictDf, df.col("uuid").equalTo(dictDf.col("uuid")), "left_outer")
          .groupBy("count")
          .agg(functions.max("id").as("max_id"))
          .repartition(1)
          .write().mode(SaveMode.Overwrite).format("yt").save("/sys/spark/examples/example_1_agg");
    }

    public static void main(String[] args) {
        new GroupingExample().run(args);
    }

    @Override
    protected SparkConf getSparkConf() {
        return super.getSparkConf().setAppName("Custom name");
    }
}

  1. Класс джоба унаследован от SparkAppJava, как в предыдущем примере.

  2. Чтение датафрейма из //sys/spark/examples/example_1.

  3. Чтение датафрейма из //sys/spark/examples/example_dict.

  4. Джойн по колонке uuid. В данном примере Spark оценит размер правого датафрейма и решит, что можно сделать map-side join. В терминологии Spark такая ситуация называется broadcast.

  5. Группировка результата по count, найден максимальный id для каждого count. Для группировки Spark сделает перетасовку (shuffle) датафрейма, чтобы записи с одинаковыми ключами оказались в одной партиции. Количество партиций после перетасовки определяется параметром spark.sql.shuffle.partitions. По умолчанию число партиций равно 200, значение можно изменить при запуске джоба.

  6. Сохранение результата в YTsaurus. Количество чанков в сохраненной таблице будет равно количеству партиций у записываемого датафрейма. Партиций образовалось много (200), так как была сделана группировка. Для того чтобы при записи не порождать много мелких чанков, перед записью они собираются в одну партицию с помощью repartition(1)

Результат: //sys/spark/examples/example_1_agg.

Запуск осуществляется командой spark-submit-yt. Дополнительно указывается spark.sql.shuffle.partitions:

spark-submit-yt \
  --proxy <cluster-name> \
  --discovery-path //my/discovery/path \
  --conf spark.sql.shuffle.partitions=8 \
  --deploy-mode cluster \
  --class tech.ytsaurus.spark.example.GroupingExample \
  yt:///sys/spark/examples/spark-over-yt-examples-jar-with-dependencies.jar

Другие примеры

Пример запуска джоба и проверки выходных данных.

Внимание

В зависимости необходимо добавить клиент spark-yt-submit-client_2.12. В свою очередь у клиента есть зависимости на классы Spark классы в runtime. Не нужно добавлять данные зависимости в сборку джобов, которые запускаются в кластере. Зависимость на Spark должна быть в значении Provided, поскольку классы Spark уже присутствуют в кластере. Данная зависимость используется только для запуска джобов.

  1. SubmissionClient — клиент для отправки джобов на конкретный кластер. Клиент находит координаты мастера по discovery_path и взаимодействует с ним.
  2. Параметры запускаемого джоба описываются в объекте launcher. Полный список методов объекта содержится в исходном коде. Методы соответствуют параметрам spark-submit.
  3. После отправки джоба на кластер можно проверять его статус при помощи метода client.getStatus. В примере указано время ожидания финального результата, но это необязательно.