Импорт данных из Hive, S3, MongoDB и других систем
Импорт данных из внешних систем в YTsaurus реализован через SPYT.
Ниже на этой странице приведена инструкция для запуска скрипта import.py
. Этот скрипт реализует импорт данных из Hadoop, Hive, а также из СУБД, поддерживающих протокол JDBC.
Для импорта из других систем хранения данных (например, S3 или MongoDB), вы можете использовать SPYT напрямую, обратившись к этой системе с помощью соответствующего Spark Datasource.
Запуск SPYT
Для запуска SPYT-кластера можно воспользоваться инструкцией.
Также, import.py
может запустить кластер перед выполнением операции импорта данных. Для этого, передайте следующие опции командной строки:
$ ./import.py \
--start-spyt true \
--discovery_path //path/to/discovery \
--proxy yt_proxy_host:port
Если флаг --start-spyt
не был передан, то import.py
ожидает, что кластер уже запущен.
Зависимости
jar-зависимости для чтения из Hive предоставляются вместе с пакетом pyspark
. Пакеты для работы с SPYT (включая pyspark
), должны быть установлены на системе, из которой вызывается импорт.
Для чтения из СУБД, которая поддерживает JDBC-протокол, нужно скачать JDBC-драйвер этой СУБД.
Файл connectors/pom.xml
содержит конфиг для Maven, в котором указаны JDBC-драйвера для MySQL и PostgreSQL. Эти драйвера можно скачать следующим образом:
~/yt/connectors$ mvn dependency:copy-dependencies
mvn
скачает jar-файлы с драйверами для PostgreSQL и MySQL в директорию target/dependency
.
Если необходимо импортировать данные из другой СУБД, добавьте JDBC-драйвер этой СУБД в качестве зависимости в pom.xml
, и запустите команду $ mvn dependency:copy-dependencies
import.py
При запуске import.py
необходимо передать discovery-путь для SPYT-кластера
$ ./import.py --discovery_path //path/to/discovery \
... # остальные опциии
Дополнительные опции должны идентифицировать источник данных, путь к импортируемым данным, а также путь в YTsaurus, куда должны быть записанны импортируемые данные.
Для Hive:
$ ./import.py --discovery_path //path/to/discovery \
--metastore master_host:9083 \
--warehouse-dir /path/to/hive/warehouse \
--input hive:database_name.table_name \
--output //path/in/yt/table
Альтернативно, можно передать в Hive SQL-запрос для исполнения, и импортировать результат этого запроса.
Для этого, используйте префикс hive_sql
для описания исходных данных:
$ ./import.py --discovery_path //path/to/discovery \
...
--input hive_sql:database_name:SELECT * FROM action_log WHERE action_date > '2023-01-01' \
...
Для СУБД, поддерживающих JDBC-протокол (например, PostgreSQL), используйте следующую команду:
$ ./import.py --discovery_path //path/to/discovery \
--jdbc postgresql \
--jdbc-server pg_host:5432 \
--jdbc-user user \
--jdbc-password '' \ # Получить пароль из консольного ввода
--input jdbc:database_name.table_name \
--output //path/in/yt/table
Чтобы передать в СУБД запрос для исполнения, и импортировать результат,
можно использовать префикс jdbc_sql
:
$ ./import.py --discovery_path //path/to/discovery \
...
--input jdbc_sql:database_name:SELECT * FROM users WHERE signup_date > '2023-01-01' \
...
Для импорта файла из HDFS, используйте спецификатор с указанием формата этого файла, а также
адрес HDFS NameNode:
$ ./import.py --discovery_path //path/to/discovery \
...
--input text:hdfs://namenode/path/to/text/file
...
import.py также поддерживает форматы parquet, orc.
Опции
import.py поддерживает следующие опции:
Опция | Описание |
---|---|
--discovery-path |
обязательная опция - путь, идентифицирующий SPYT-кластер |
--num-executors |
число воркеров для операции импорта (по умолчанию 1) |
--cores-per-executor |
резервация для числа CPU-ядер на воркер (по умолчанию 1) |
--ram-per-core |
RAM-резервация на каждое ядро (по умолчанию 2GB) |
--jdbc |
тип JDBC-драйвера для СУБД. Например mysql или postgresql |
--jdbc-server |
host:port сервера СУБД |
--jdbc-user |
имя пользователя в СУБД |
--jdbc-password |
пароль для СУБД. Если передан пустой флаг, получить из консоли |
--jars |
дополнительные JAR-библиотеки. По умолчанию, target/dependency/jar/*.jar |
--input |
объект для импорта, можно указывать несколько раз |
--output |
путь для записи в YTsaurus. Должно быть указано одно значение на каждое значение --input |
Если операция импорта данных предусматривает запуск кластера SPYT, следующие опции командной строки могут использоваться для конфигурации этого кластера:
Argument | Description |
---|---|
--start-spyt |
указание для import.py запустить кластер SPYT |
--proxy |
адрес прокси в кластере YTsaurus, где будет работать SPYT |
--pool |
пул ресурсов в YTsaurus для кластера SPYT |
--spark-cluster-version |
версия SPYT |
--executor-timeout |
Таймаут для Spark-воркеров |
--executor-tmpfs-limit |
Размер tmpfs-партиции для Spark воркеров |
Поддерживаются следующие спецификаторы для импортируемых данных:
Спецификатор | Описание |
---|---|
hive |
таблица в Hive, вида db_name.table_name |
hive_sql |
SQL-запрос над Hive, вида db_name:sql statement |
jdbc |
таблица в СУБД, вида db_name.table_name |
jdbc_sql |
SQL-запрос для СУБД, вида `db_name:sql statement' |
text |
текстовый файл в HDFS |
parquet |
parquet-файл в HDFS |
orc |
orc-файл из в HDFS |
По умолчанию при записи таблицы в YTsaurus подразумевается, что таблица не существует. Если
таблица уже существует, можно ее перезаписать или дополнить, используя спецификатор overwrite
или
append
, например: --output overwrite:/path/to/yt
Преобразования типов
Импорт сложных типов из внешних систем может поддерживаться лишь частично. Система типов YTsaurus не имеет взаимно однозначного соответствия с системами типов других систем хранения данных. При иморте, SPYT сохранит тип по мере возможности, и преобразует значение в строку в тех случаях, когда не удалось вывести правильный тип. При необходимости, используйте SQL для корректного преобразования типов.
Диапазоны значений одного типа могут оказаться разными. Например, YTsaurus date
не хранит даты ранее Юникс-эпохи, 1 января 1970 года. Попытка записать в YTsaurus более ранние даты приведет к ошибке. Даты раньше Юникс-эпохи в YTsaurus можно хранить как строки (например, используя преобразование to_char(date_value, 'YYYY-MM-DD')
в PostgreSQL), или как целые числа (date_value - '1970-01-01'
в PostgreSQL).