Импорт данных из Hive, S3, MongoDB и других систем
Импорт данных из внешних систем в YTsaurus реализован через SPYT.
На данной странице приведена инструкция по импорту данных с помощью скрипта import.py. Этот скрипт позволяет импортировать данные из Hive, Hadoop, S3, а также из СУБД, поддерживающих протокол JDBC. Для импорта из других систем хранения данных — например, из MongoDB — вы можете использовать SPYT напрямую, обратившись к этой системе с помощью соответствующего Spark Datasource.
Установите зависимости
jar-зависимости для чтения из Hive предоставляются вместе с пакетом pyspark
. Пакеты для работы с SPYT (включая pyspark
), должны быть установлены на системе, из которой вызывается импорт.
Для чтения из СУБД, которая поддерживает JDBC-протокол, нужно скачать JDBC-драйвер этой СУБД.
Файл connectors/pom.xml
содержит конфиг для Maven, в котором указаны JDBC-драйвера для MySQL и PostgreSQL. Эти драйвера можно скачать следующим образом:
~/yt/connectors$ mvn dependency:copy-dependencies
mvn
скачает jar-файлы с драйверами для PostgreSQL и MySQL в директорию target/dependency
.
Если необходимо импортировать данные из другой СУБД, добавьте JDBC-драйвер этой СУБД в качестве зависимости в pom.xml
, и запустите команду $ mvn dependency:copy-dependencies
Запустите импорт данных
Для импорта данных следует запустить скрипт import.py
:
$ ./import.py \
# Опции импорта.
# ...
Опции должны идентифицировать:
- источник данных;
- путь к импортируемым данным;
- путь в YTsaurus, куда должны быть записаны импортируемые данные.
Список всех опций приведён в конце этого раздела, в секции Опции. Ниже приведены примеры импорта данных из разных систем.
Hive
Чтобы импортировать данные из Hive:
- Загрузите на Кипарис файлы
hadoop-aws-*.jar
иaws-java-sdk-bundle-*.jar
из локальной директорииtarget/dependency
. - Запустите импорт данных следующей командой:
$ ./import.py \ --metastore master_host:9083 \ --warehouse-dir /path/to/hive/warehouse \ --input hive:database_name.table_name \ --output //path/in/yt/table --proxy https://ytsaurus.company.net \ --num-executors 5 \ --executor-memory 4G \ --executor-memory-overhead 2G \ --jars yt:///path/to/jars/aws-java-sdk-bundle-1.11.901.jar \ yt:///path/to/jars/hadoop-aws-3.3.1.jar
Альтернативно, можно передать в Hive SQL-запрос для исполнения и импортировать результат этого запроса. Для этого используйте префикс hive_sql
для описания исходных данных:
$ ./import.py \
...
--input hive_sql:database_name:SELECT * FROM action_log WHERE action_date > '2023-01-01' \
...
HDFS
Для импорта файла из HDFS используйте спецификатор с указанием формата этого файла, а также адрес HDFS NameNode:
$ ./import.py \
...
--input text:hdfs://namenode/path/to/text/file
...
import.py
также поддерживает форматы parquet, orc.
СУБД, поддерживающие JDBC-протокол
Чтобы импортировать данные из JDBC-совместимых СУБД (например, из PostgreSQL), запустите команду:
$ ./import.py \
--jdbc postgresql \
--jdbc-server pg_host:5432 \
--jdbc-user user \
--jdbc-password '' \ # Получить пароль из консольного ввода
--input jdbc:database_name.table_name \
--output //path/in/yt/table
Чтобы передать в СУБД запрос для исполнения и импортировать результат, можно использовать префикс jdbc_sql
:
$ ./import.py \
...
--input jdbc_sql:database_name:SELECT * FROM users WHERE signup_date > '2023-01-01' \
...
S3
Чтобы импортировать данные из S3:
-
Загрузите на Кипарис файлы
hadoop-aws-*.jar
иaws-java-sdk-bundle-*.jar
из локальной директорииtarget/dependency
. -
Запустите импорт данных следующей командой:
./import.py \ --input parquet:s3a://bucket/path/to/data/sample-parquet \ --output //home/tables/import_from_s3 \ --s3-access-key <S3 Access Key> \ --s3-secret-key <S3 Secret Key> \ --s3-endpoint <S3 endpoint> \ --proxy https://ytsaurus.company.net \ --num-executors 5 \ --executor-memory 4G \ --executor-memory-overhead 2G \ --jars yt:///path/to/jars/aws-java-sdk-bundle-1.11.901.jar \ yt:///path/to/jars/hadoop-aws-3.3.1.jar
Описание опций приведено ниже.
Опции
import.py
поддерживает следующие опции:
Опция | Описание |
---|---|
--num-executors |
Число экзекьютеров для операции импорта (по умолчанию 1) |
--cores-per-executor |
Резервация для числа CPU-ядер на экзекьютер (по умолчанию 1) |
--ram-per-core |
RAM-резервация на каждое ядро (по умолчанию 2GB) |
--jdbc |
Тип JDBC-драйвера для СУБД. Например mysql или postgresql |
--jdbc-server |
host:port сервера СУБД |
--jdbc-user |
Имя пользователя в СУБД |
--jdbc-password |
Пароль для СУБД. Если передан пустой флаг, получить из консоли |
--jars |
Дополнительные JAR-библиотеки. По умолчанию, target/dependency/jar/*.jar |
--input |
Объект для импорта, можно указывать несколько раз |
--output |
Путь для записи в YTsaurus. Должно быть указано одно значение на каждое значение --input |
Если операция импорта данных предусматривает запуск кластера SPYT, следующие опции командной строки могут использоваться для конфигурации этого кластера:
Опция | Описание |
---|---|
--proxy |
Адрес прокси в кластере YTsaurus, где будет работать SPYT |
--pool |
Пул ресурсов в YTsaurus для кластера SPYT |
--executor-timeout |
Таймаут для Spark-экзекьютеров |
--executor-tmpfs-limit |
Размер tmpfs-партиции для Spark экзекьютеров |
--executor-memory |
Объём оперативной памяти, выделяемой каждому экзекьютеру |
--executor-memory-overhead |
Дополнительный объём памяти, который выделяется экзекьютерам сверх основной памяти. Например, если основная память 4 ГБ и дополнительная 2 ГБ, то общий объём памяти, запрашиваемый у кластера, будет 6 ГБ |
В случае импорта данных с S3:
Опция | Описание |
---|---|
--s3-access-key |
Ключ доступа (Access Key ID), который идентифицирует пользователя или приложение в S3 |
--s3-secret-key |
Секретный ключ (Secret Access Key), связанный с ключом доступа |
--s3-endpoint |
URL-адрес (endpoint) S3-хранилища |
Поддерживаются следующие спецификаторы для импортируемых данных:
Спецификатор | Описание |
---|---|
hive |
Таблица в Hive, вида db_name.table_name |
hive_sql |
SQL-запрос над Hive, вида db_name:sql statement |
jdbc |
Таблица в СУБД, вида db_name.table_name |
jdbc_sql |
SQL-запрос для СУБД, вида `db_name:sql statement' |
text |
Текстовый файл в HDFS |
parquet |
parquet-файл в HDFS |
orc |
orc-файл из в HDFS |
По умолчанию при записи таблицы в YTsaurus подразумевается, что таблица не существует. Если таблица уже существует, можно её перезаписать или дополнить, используя спецификатор overwrite
или append
. Например: --output overwrite:/path/to/yt
Преобразования типов
Импорт сложных типов из внешних систем может поддерживаться лишь частично. Система типов YTsaurus не имеет взаимно однозначного соответствия с системами типов других систем хранения данных. При импорте SPYT сохранит тип по мере возможности, и преобразует значение в строку в тех случаях, когда не удалось вывести правильный тип. При необходимости используйте SQL для корректного преобразования типов.
Диапазоны значений одного типа могут оказаться разными. Например, YTsaurus date
не хранит даты ранее Юникс-эпохи, 1 января 1970 года. Попытка записать в YTsaurus более ранние даты приведёт к ошибке. Даты раньше Юникс-эпохи в YTsaurus можно хранить как строки (например, используя преобразование to_char(date_value, 'YYYY-MM-DD')
в PostgreSQL), или как целые числа (date_value - '1970-01-01'
в PostgreSQL).