Importing data from Hive, S3, MongoDB, etc.
Importing data into YTsaurus from external systems is performed with SPYT.
This document contains instructions for using the import.py script. This script imports data from Hadoop, Hive, or from database systems that support the JDBC protocol.
To import data from other systems not supported by import.py (for example, from S3 or MongoDB), you can use SPYT directly, reading from the external system via the corresponding Spark Data Source.
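For example, a minimal sketch of such an import from S3 might look like the Python snippet below. It is not part of import.py: it assumes the spyt Python package (which provides spark_session and the .yt read/write helpers), assumes that the hadoop-aws jar and the fs.s3a.* endpoint and credentials are already configured for the cluster, and uses placeholder bucket and table paths. Importing from MongoDB would follow the same pattern with the MongoDB Spark connector instead.

from spyt import spark_session

# Placeholders: replace the bucket, input path, and output table with real values.
# S3 access (fs.s3a.* settings and the hadoop-aws jar) must be configured for the cluster.
with spark_session() as spark:
    # Read Parquet files from S3 through the standard Spark data source.
    df = spark.read.parquet("s3a://my-bucket/path/to/data")

    # The .yt() writer method is added by the spyt package.
    df.write.yt("//path/in/yt/table")

A script like this is typically submitted to the SPYT cluster with spark-submit-yt; see the SPYT documentation for the details of job submission.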
Starting SPYT
To start a SPYT cluster, follow the instructions.
Alternatively, import.py can launch the cluster for you before starting the import operation. To start a SPYT cluster using import.py, provide the following command-line flags:
$ ./import.py \
--start-spyt true \
--discovery_path //path/to/discovery \
--proxy yt_proxy_host:port
If the --start-spyt flag is not set, import.py expects the SPYT cluster to already be up and running.
Dependencies
Logic for reading data from external systems into Spark is implemented in external libraries, packaged as jar files. The jar dependencies for interacting with Hive are provided together with the pyspark Python package. pyspark is also required for interacting with SPYT.
To read data from a database that supports JDBC, you have to download the JDBC driver specific to that system.
connectors/pom.xml is a Maven config that includes JDBC drivers for MySQL and PostgreSQL as dependencies. To download these drivers, run the command below:
~/yt/connectors$ mvn dependency:copy-dependencies
mvn will download the jar files with JDBC drivers for PostgreSQL and MySQL into target/dependency.
To import from a different database, add the JDBC driver for that database to pom.xml and run mvn dependency:copy-dependencies again.
import.py
When launching import.py, provide the SPYT discovery path as a command-line argument.
$ ./import.py --discovery_path //path/to/discovery \
... # all other options
Additional command-line arguments identify the data source, the path to the imported data within that data source, and the path in YTsaurus where the imported data should be stored.
To import from Hive, run:
$ ./import.py --discovery_path //path/to/discovery \
--metastore master_host:9083 \
--warehouse-dir /path/to/hive/warehouse \
--input hive:database_name.table_name \
--output //path/in/yt/table
Alternatively, provide an SQL query to be executed by Hive, using the hive_sql input specifier. The query results will be stored in YTsaurus.
$ ./import.py --discovery_path //path/to/discovery \
...
--input hive_sql:database_name:SELECT * FROM action_log WHERE action_date > '2023-01-01' \
...
To import from a database using the JDBC protocol (for example, from PostgreSQL), run:
$ ./import.py --discovery_path //path/to/discovery \
--jdbc postgresql \
--jdbc-server pg_host:5432 \
--jdbc-user user \
--jdbc-password '' \ # Read the password from console input
--input jdbc:database_name.table_name \
--output //path/in/yt/table
To import the results of an SQL statement, use the jdbc_sql input specifier:
$ ./import.py --discovery_path //path/to/discovery \
...
--input jdbc_sql:database_name:SELECT * FROM users WHERE signup_date > '2023-01-01' \
...
To import a file from HDFS, use an input specifier indicating the file format. The path to the file must include the HDFS NameNode address:
$ ./import.py --discovery_path //path/to/discovery \
...
--input text:hdfs://namenode/path/to/text/file
...
The text, parquet, and orc file formats are supported.
Command line arguments
import.py supports the following arguments:
Argument | Description |
---|---|
--discovery-path | required; path that identifies the SPYT cluster |
--num-executors | number of workers for distributed operations (1 by default) |
--cores-per-executor | number of reserved CPU cores per worker (1 by default) |
--ram-per-core | amount of RAM reserved per core (2 GB by default) |
--jdbc | type of JDBC driver, for example mysql or postgresql |
--jdbc-server | database host:port |
--jdbc-user | user name to log in to the database |
--jdbc-password | password to log in to the database; if empty, it is read from the terminal |
--jars | additional jar dependencies (target/dependency/jar/*.jar by default) |
--input | path to import; this flag may appear multiple times |
--output | path to write in YTsaurus; for every --input flag, one --output must be provided |
To configure the SPYT cluster started as part of the import operation, use the following arguments:
Argument | Description |
---|---|
--start-spyt | instructs import.py to start a SPYT cluster |
--proxy | YTsaurus proxy of the cluster where SPYT should run |
--pool | resource pool in YTsaurus to run SPYT in |
--spark-cluster-version | SPYT version |
--executor-timeout | timeout for idle Spark workers |
--executor-tmpfs-limit | size of the tmpfs partition for Spark workers |
The following input specifiers are supported:
Specifier | Description |
---|---|
hive | table name in Hive with the database name, in db_name.table_name format |
hive_sql | SQL query to run in Hive, in db_name:sql statement format |
jdbc | table in a JDBC database, in db_name.table_name format |
jdbc_sql | SQL query for a JDBC database, in db_name:sql statement format |
text | text file in HDFS |
parquet | parquet file in HDFS |
orc | orc file in HDFS |
Type conversions
Importing complex types is only partially supported. The type system of YTsaurus does not exactly match its counterparts in other storage systems. When importing data, SPYT will try to keep the type on a best-effort basis. However, a value may get converted to a string when no matching type in YTsaurus can be inferred. When necessary, use SQL to convert types explicitly.
The ranges of values for the same type may also differ between YTsaurus and other systems. For example, the YTsaurus date type only stores calendar dates starting from the Unix epoch, January 1st, 1970. An attempt to write an earlier date will cause a runtime error. It is still possible to store earlier dates as strings (for example, by applying a to_char(date_value, 'YYYY-MM-DD') conversion in PostgreSQL) or as integers (date_value - date '1970-01-01' in PostgreSQL).
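If the data is imported with SPYT directly rather than through import.py, the same adjustment can be made on the Spark side before writing. Below is a minimal sketch, assuming a DataFrame df already read from the source system (as in the S3 example above) and a hypothetical birth_date column that may contain pre-1970 dates:

from pyspark.sql import functions as F

# Convert the date column to a string such as '1965-04-12' so that values
# before the Unix epoch can be stored in YTsaurus without a runtime error.
df_fixed = df.withColumn("birth_date", F.date_format("birth_date", "yyyy-MM-dd"))

# The .yt() writer method is added by the spyt package.
df_fixed.write.yt("//path/in/yt/table")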