Java интерфейс на примерах (ytsaurus-client)
Java API опубликовано в maven.
Документацию можно найти здесь.
Примеры находятся в yt/java/ytsaurus-client-examples.
Перед использованием примеров прочитайте инструкцию по получению токена.
Для работы примеров на вашем кластере YTsaurus должны быть специальные таблицы с данными; их можно сгенерировать так:
./yt/yt/scripts/upload_tutorial_data/upload_tutorial_data.py --yt-directory //home/dev/tutorial --proxy <your-http-proxy>
Примеры можно запускать с помощью gradle из директории примера:
cd yt/java/ytsaurus-client-examples/<some-example>
YT_PROXY=<your-http-proxy> gradle run
Работа с Кипарисом
Пример находится в yt/java/ytsaurus-client-examples/cypress-operations-example/src/main/java/tech/ytsaurus/example/ExampleCypressOperations.java.
package tech.ytsaurus.example;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import tech.ytsaurus.client.YTsaurusClient;
import tech.ytsaurus.client.request.GetNode;
import tech.ytsaurus.core.cypress.YPath;
import tech.ytsaurus.ysontree.YTreeNode;
/**
 * Shows basic Cypress requests: listing a node, getting a node with optional
 * request parameters, and handling a request that fails.
 */
public class ExampleCypressOperations {
    private ExampleCypressOperations() {
    }

    /**
     * Reads the cluster address from the YT_PROXY environment variable.
     *
     * @return the non-empty value of YT_PROXY
     * @throws IllegalArgumentException if the variable is unset or empty
     */
    private static String clusterAddressFromEnv() {
        String address = System.getenv("YT_PROXY");
        if (address != null && !address.isEmpty()) {
            return address;
        }
        throw new IllegalArgumentException("Environment variable YT_PROXY is empty");
    }

    /**
     * Runs the example against the cluster named by YT_PROXY.
     *
     * @param args not used
     * @throws IllegalArgumentException if YT_PROXY is unset or empty
     */
    public static void main(String[] args) {
        // *** Create the client ***
        // The builder is the most convenient way to get a client; the cluster is the only
        // parameter that has to be specified.
        //
        // The YT token will be picked up from `~/.yt/token`, and the username from the operating system.
        YTsaurusClient client = YTsaurusClient.builder()
                .setCluster(clusterAddressFromEnv())
                .build();

        // close() must be called on the client for a proper shutdown;
        // try-with-resources takes care of that.
        try (client) {
            // *** Simple requests ***
            // Most client methods return a CompletableFuture: the request is sent in the
            // background and join() waits for its result.
            // See the CompletableFuture documentation:
            // https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/CompletableFuture.html
            YTreeNode rootListing = client.listNode("/").join();
            System.out.println("listResult: " + rootListing);

            // *** Using optional parameters in a request ***
            // Every request also has a full form that accepts a request object
            // carrying the advanced options.
            var getRequest = GetNode.builder()
                    .setPath(YPath.simple("//home/dev/tutorial"))
                    .setAttributes(List.of("account", "row_count"))
                    .setTimeout(Duration.ofSeconds(10))
                    .build();
            YTreeNode tutorialNode = client.getNode(getRequest).join();
            System.out.println("getResult: " + tutorialNode);

            // *** Errors ***
            // Calling the method does not throw
            // (maybe except when there is some kind of software error or misconfiguration).
            CompletableFuture<YTreeNode> missingListing =
                    client.listNode("//some/directory/that/does/not/exist");
            try {
                // A YT error or a network error is stored in the CompletableFuture
                // and surfaces when the result is requested.
                missingListing.join();
            } catch (CompletionException failure) {
                System.out.println("ERROR: " + failure);
            }
        }
    }
}
Чтение и запись статических таблиц (UserClass версия)
Пример находится в yt/java/ytsaurus-client-examples/read-write-entity-example/src/main/java/tech/ytsaurus/example/ExampleReadWriteEntity.java.
package tech.ytsaurus.example;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import javax.persistence.Entity;
import tech.ytsaurus.client.AsyncReader;
import tech.ytsaurus.client.AsyncWriter;
import tech.ytsaurus.client.YTsaurusClient;
import tech.ytsaurus.client.request.CreateNode;
import tech.ytsaurus.client.request.ReadTable;
import tech.ytsaurus.client.request.WriteTable;
import tech.ytsaurus.core.cypress.CypressNodeType;
import tech.ytsaurus.core.cypress.YPath;
import tech.ytsaurus.core.tables.ColumnValueType;
import tech.ytsaurus.core.tables.TableSchema;
/**
 * Writes two rows to a static table and reads them back, using an
 * {@code @Entity}-annotated user class as the row representation.
 */
public class ExampleReadWriteEntity {
    private ExampleReadWriteEntity() {
    }

    /** Row of the example table: a word in English and its Russian translation. */
    @Entity
    static class TableRow {
        private String english;
        private String russian;

        TableRow(String english, String russian) {
            this.english = english;
            this.russian = russian;
        }

        @Override
        public String toString() {
            // Each field is quoted separately.
            return String.format("TableRow(\"%s\", \"%s\")", english, russian);
        }
    }

    /**
     * Runs the example against the cluster named by YT_PROXY.
     *
     * @param args not used
     * @throws IllegalArgumentException if YT_PROXY is unset or empty
     */
    public static void main(String[] args) {
        // You need to set up cluster address in YT_PROXY environment variable.
        var clusterAddress = System.getenv("YT_PROXY");
        if (clusterAddress == null || clusterAddress.isEmpty()) {
            throw new IllegalArgumentException("Environment variable YT_PROXY is empty");
        }

        YTsaurusClient client = YTsaurusClient.builder()
                .setCluster(clusterAddress)
                .build();

        try (client) {
            // The table is located in `//tmp` and contains the name of the current user.
            // The username is necessary in case two people run this example at the same time
            // so that they use different output tables.
            YPath table = YPath.simple("//tmp/" + System.getProperty("user.name") + "-read-write");

            // Create table
            TableSchema tableSchema = TableSchema.builder()
                    .addValue("english", ColumnValueType.STRING)
                    .addValue("russian", ColumnValueType.STRING)
                    .build().toWrite();

            client.createNode(CreateNode.builder()
                    .setPath(table)
                    .setType(CypressNodeType.TABLE)
                    .setAttributes(Map.of("schema", tableSchema.toYTree()))
                    .setIgnoreExisting(true)
                    .build()
            ).join();

            // Write a table.

            // Create the writer.
            AsyncWriter<TableRow> writer = client.writeTableV2(new WriteTable<>(table, TableRow.class)).join();

            writer.write(List.of(
                    new TableRow("one", "один"),
                    new TableRow("two", "два"))
            ).join();

            writer.finish().join();

            // Read a table.

            // Create the reader.
            AsyncReader<TableRow> reader = client.readTableV2(new ReadTable<>(table, TableRow.class)).join();

            List<TableRow> rows = new ArrayList<>();

            var executor = Executors.newSingleThreadExecutor();
            try {
                // Read all rows asynchronously.
                reader.acceptAllAsync(rows::add, executor).join();
            } finally {
                // The executor's worker thread is non-daemon by default; without an explicit
                // shutdown it can keep the JVM alive after main returns.
                executor.shutdown();
            }

            for (TableRow row : rows) {
                System.out.println("russian: " + row.russian + "; english: " + row.english);
            }
        }
    }
}
Чтение и запись статических таблиц (YTreeMapNode версия)
Пример находится в yt/java/ytsaurus-client-examples/read-write-ytree-example/src/main/java/tech/ytsaurus/example/ExampleReadWriteYTree.java.
package tech.ytsaurus.example;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import tech.ytsaurus.client.TableReader;
import tech.ytsaurus.client.TableWriter;
import tech.ytsaurus.client.YTsaurusClient;
import tech.ytsaurus.client.request.CreateNode;
import tech.ytsaurus.client.request.ReadTable;
import tech.ytsaurus.client.request.WriteTable;
import tech.ytsaurus.core.cypress.CypressNodeType;
import tech.ytsaurus.core.cypress.YPath;
import tech.ytsaurus.core.tables.ColumnValueType;
import tech.ytsaurus.core.tables.TableSchema;
import tech.ytsaurus.ysontree.YTree;
import tech.ytsaurus.ysontree.YTreeMapNode;
/**
 * Writes two rows to a static table and reads them back, using the generic
 * {@code YTreeMapNode} as the row representation.
 */
public class ExampleReadWriteYTree {
    private ExampleReadWriteYTree() {
    }

    /**
     * Runs the example against the cluster named by YT_PROXY.
     *
     * @param args not used
     * @throws IllegalArgumentException if YT_PROXY is unset or empty
     * @throws RuntimeException if writing or reading the table fails; the original failure is the cause
     */
    public static void main(String[] args) {
        // You need to set up cluster address in YT_PROXY environment variable.
        var clusterAddress = System.getenv("YT_PROXY");
        if (clusterAddress == null || clusterAddress.isEmpty()) {
            throw new IllegalArgumentException("Environment variable YT_PROXY is empty");
        }

        YTsaurusClient client = YTsaurusClient.builder()
                .setCluster(clusterAddress)
                .build();

        try (client) {
            // The table is located in `//tmp` and contains the name of the current user.
            // The username is necessary in case two people run this example at the same time
            // so that they use different output tables.
            YPath table = YPath.simple("//tmp/" + System.getProperty("user.name") + "-read-write");

            // Create table
            TableSchema tableSchema = TableSchema.builder()
                    .addValue("english", ColumnValueType.STRING)
                    .addValue("russian", ColumnValueType.STRING)
                    .build().toWrite();

            client.createNode(CreateNode.builder()
                    .setPath(table)
                    .setType(CypressNodeType.TABLE)
                    .setAttributes(Map.of("schema", tableSchema.toYTree()))
                    .setIgnoreExisting(true)
                    .build()
            ).join();

            // Write a table.

            // Create the writer.
            TableWriter<YTreeMapNode> writer = client.writeTable(
                    new WriteTable<>(table, YTreeMapNode.class)
            ).join();

            try {
                while (true) {
                    // It is necessary to wait for readyEvent before trying to write.
                    writer.readyEvent().join();

                    // If false is returned, then readyEvent must be waited for before trying again.
                    boolean accepted = writer.write(List.of(
                            YTree.mapBuilder()
                                    .key("english").value("one")
                                    .key("russian").value("один")
                                    .buildMap(),
                            YTree.mapBuilder()
                                    .key("english").value("two")
                                    .key("russian").value("два")
                                    .buildMap()
                    ), tableSchema);

                    if (accepted) {
                        break;
                    }
                }
            } catch (Exception ex) {
                throw new RuntimeException(ex);
            } finally {
                // Waiting for completion of writing. An exception might be thrown if something goes wrong.
                writer.close().join();
            }

            // Read a table.

            // Create the reader.
            TableReader<YTreeMapNode> reader = client.readTable(
                    new ReadTable<>(table, YTreeMapNode.class)
            ).join();

            List<YTreeMapNode> rows = new ArrayList<>();

            try {
                // We will read while we can.
                while (reader.canRead()) {
                    // We wait until we can continue reading.
                    reader.readyEvent().join();

                    // read() returns null when no more rows are available right now.
                    List<YTreeMapNode> currentRows;
                    while ((currentRows = reader.read()) != null) {
                        rows.addAll(currentRows);
                    }
                }
            } catch (Exception ex) {
                // Keep the original failure as the cause so that it is not lost.
                throw new RuntimeException("Failed to read", ex);
            } finally {
                reader.close().join();
            }

            for (YTreeMapNode row : rows) {
                System.out.println("russian: " + row.getString("russian") +
                        "; english: " + row.getString("english"));
            }
        }
    }
}
Работа с динамическими таблицами
Пример находится в yt/java/ytsaurus-client-examples/dynamic-table-example/src/main/java/tech/ytsaurus/example/ExampleDynamicTable.java.
package tech.ytsaurus.example;
import java.util.List;
import java.util.Map;
import tech.ytsaurus.client.ApiServiceTransaction;
import tech.ytsaurus.client.YTsaurusClient;
import tech.ytsaurus.client.request.CreateNode;
import tech.ytsaurus.client.request.LookupRowsRequest;
import tech.ytsaurus.client.request.ModifyRowsRequest;
import tech.ytsaurus.client.request.StartTransaction;
import tech.ytsaurus.client.request.TransactionType;
import tech.ytsaurus.client.rows.UnversionedRow;
import tech.ytsaurus.client.rows.UnversionedRowset;
import tech.ytsaurus.core.cypress.CypressNodeType;
import tech.ytsaurus.core.cypress.YPath;
import tech.ytsaurus.core.tables.ColumnSchema;
import tech.ytsaurus.core.tables.ColumnSortOrder;
import tech.ytsaurus.core.tables.TableSchema;
import tech.ytsaurus.typeinfo.TiType;
import tech.ytsaurus.ysontree.YTree;
/**
 * Creates a dynamic table, mounts it, inserts three rows, looks one of them up
 * and unmounts the table again.
 */
public class ExampleDynamicTable {
    private ExampleDynamicTable() {
    }

    /**
     * Builds the schema of the example table: a sorted int64 {@code key} column
     * and a string {@code value} column, with unique keys.
     */
    private static TableSchema keyValueSchema() {
        return TableSchema.builder()
                .setUniqueKeys(true)
                .add(ColumnSchema.builder("key", TiType.int64())
                        .setSortOrder(ColumnSortOrder.ASCENDING)
                        .build())
                .add(ColumnSchema.builder("value", TiType.string()).build())
                .build();
    }

    /** Inserts three sample rows within a single tablet transaction and commits it. */
    private static void insertSampleRows(YTsaurusClient client, String path, TableSchema schema) {
        try (ApiServiceTransaction tx =
                     client.startTransaction(new StartTransaction(TransactionType.Tablet)).join()) {
            // modifyRows is the way to insert values into a dynamic table.
            var insertRequest = ModifyRowsRequest.builder()
                    .setPath(path)
                    .setSchema(schema)
                    .addInsert(List.of(1, "a"))
                    .addInsert(List.of(2, "b"))
                    .addInsert(List.of(3, "c"))
                    .build();
            tx.modifyRows(insertRequest).join();
            tx.commit().join();
        }
    }

    /** Looks up the row with key 1 within a tablet transaction and prints what was found. */
    private static void lookupAndPrint(YTsaurusClient client, String path, TableSchema schema) {
        try (ApiServiceTransaction tx =
                     client.startTransaction(new StartTransaction(TransactionType.Tablet)).join()) {
            // lookupRows fetches values from a dynamic table;
            // the result is a rowset made of the rows that were found.
            var lookupRequest = LookupRowsRequest.builder()
                    .setPath(path)
                    .setSchema(schema.toLookup())
                    .addFilter(1)
                    .build();
            UnversionedRowset found = tx.lookupRows(lookupRequest).join();

            System.out.println("====== LOOKUP RESULT ======");
            for (UnversionedRow foundRow : found.getRows()) {
                System.out.println(foundRow.toYTreeMap(schema, true));
            }
            System.out.println("====== END LOOKUP RESULT ======");

            tx.commit().join();
        }
    }

    /**
     * Runs the example.
     *
     * <p>By default, users do not have permissions to mount dynamic tables, and you must obtain
     * such permissions on some YT cluster before starting the program.
     *
     * @param args exactly two values: the cluster name and the path to the table to work with
     *             (the table must not exist)
     * @throws IllegalArgumentException if the number of arguments is not two
     */
    public static void main(String[] args) {
        if (args.length != 2) {
            throw new IllegalArgumentException("Incorrect arguments count");
        }
        final String cluster = args[0];
        final String path = args[1];

        TableSchema schema = keyValueSchema();

        YTsaurusClient client = YTsaurusClient.builder()
                .setCluster(cluster)
                .build();

        try (client) {
            // ATTENTION: a dynamic table needs both the attribute dynamic: true and a schema,
            // and both must be passed in the createNode call itself. Creating a table first
            // and setting these attributes afterwards will not work.
            client.createNode(CreateNode.builder()
                    .setPath(YPath.simple(path))
                    .setType(CypressNodeType.TABLE)
                    .setAttributes(Map.of(
                            "dynamic", YTree.booleanNode(true),
                            "schema", schema.toYTree()
                    ))
                    .build()
            ).join();

            // A dynamic table has to be "mounted" before it can be used.
            //
            // Creating/mounting/unmounting tables is usually done by dedicated administrative scripts,
            // and the application simply assumes that the tables exist and are already mounted.
            //
            // Here the table is mounted and, at the end, unmounted for completeness of the example.
            client.mountTable(path).join();

            // Fill the table.
            insertSampleRows(client, path, schema);

            // Read the table.
            lookupAndPrint(client, path, schema);

            // Unmount the table.
            client.unmountTable(path).join();
        }
    }
}
Map операция (UserClass версия)
Пример находится в yt/java/ytsaurus-client-examples/map-entity-example/src/main/java/tech/ytsaurus/example/ExampleMapEntity.java.
package tech.ytsaurus.example;
import javax.persistence.Entity;
import tech.ytsaurus.client.YTsaurusClient;
import tech.ytsaurus.client.operations.MapSpec;
import tech.ytsaurus.client.operations.Mapper;
import tech.ytsaurus.client.operations.MapperSpec;
import tech.ytsaurus.client.operations.Operation;
import tech.ytsaurus.client.operations.Statistics;
import tech.ytsaurus.client.request.MapOperation;
import tech.ytsaurus.core.cypress.YPath;
import tech.ytsaurus.core.operations.OperationContext;
import tech.ytsaurus.core.operations.Yield;
import tech.ytsaurus.lang.NonNullApi;
/**
 * Runs a map operation whose input and output rows are represented by
 * {@code @Entity}-annotated user classes.
 */
public class ExampleMapEntity {
    private ExampleMapEntity() {
    }

    /** Input row: the columns of the staff table that the mapper reads. */
    @Entity
    static class InputStaffInfo {
        private String login;
        private String name;
        private long uid;

        public String getLogin() {
            return login;
        }

        public String getName() {
            return name;
        }

        public long getUid() {
            return uid;
        }
    }

    /** Output row: an e-mail address and the name it belongs to. */
    @Entity
    static class OutputStaffInfo {
        private String email;
        private String name;

        OutputStaffInfo() {
        }

        OutputStaffInfo(String email, String name) {
            this.email = email;
            this.name = name;
        }
    }

    // The mapper class must implement the appropriate interface.
    // Generic type arguments are the classes to represent the input and output objects.
    @NonNullApi
    public static class SimpleMapper implements Mapper<InputStaffInfo, OutputStaffInfo> {
        /**
         * Emits one output row per input row, with the e-mail built from the login.
         */
        @Override
        public void map(InputStaffInfo entry, Yield<OutputStaffInfo> yield, Statistics statistics,
                        OperationContext context) {
            String name = entry.getName();
            String email = entry.getLogin().concat("@ytsaurus.tech");

            // The constructor takes (email, name); passing them the other way round
            // would store the name in the `email` column and vice versa.
            yield.yield(new OutputStaffInfo(email, name));
        }
    }

    /**
     * Runs the example against the cluster named by YT_PROXY.
     *
     * @param args not used
     * @throws IllegalArgumentException if YT_PROXY is unset or empty
     */
    public static void main(String[] args) {
        // You need to set up cluster address in YT_PROXY environment variable.
        var clusterAddress = System.getenv("YT_PROXY");
        if (clusterAddress == null || clusterAddress.isEmpty()) {
            throw new IllegalArgumentException("Environment variable YT_PROXY is empty");
        }

        YTsaurusClient client = YTsaurusClient.builder()
                .setCluster(clusterAddress)
                .build();

        // The output table is located in `//tmp` and contains the name of the current user.
        // The username is necessary in case two people run this example at the same time
        // so that they use different output tables.
        YPath outputTable = YPath.simple("//tmp/" + System.getProperty("user.name") + "-tutorial-emails");

        try (client) {
            // NOTE(review): the data-generation command in this guide uploads to
            // //home/dev/tutorial — confirm that this input path exists on your cluster.
            Operation op = client.map(
                    MapOperation.builder()
                            .setSpec(MapSpec.builder()
                                    .setInputTables(YPath.simple("//home/tutorial/staff_unsorted").withRange(0, 2))
                                    .setOutputTables(outputTable)
                                    .setMapperSpec(new MapperSpec(new SimpleMapper()))
                                    .build())
                            .build()
            ).join();

            System.err.println("Operation was finished (OperationId: " + op.getId() + ")");
            System.err.println("Status: " + op.getStatus().join());
        }

        System.err.println(
                "Output table: " + outputTable
        );
    }
}
Map операция (YTreeMapNode версия)
Пример находится в yt/java/ytsaurus-client-examples/map-ytree-example/src/main/java/tech/ytsaurus/example/ExampleMapYTree.java.
package tech.ytsaurus.example;
import tech.ytsaurus.client.YTsaurusClient;
import tech.ytsaurus.client.operations.MapSpec;
import tech.ytsaurus.client.operations.Mapper;
import tech.ytsaurus.client.operations.MapperSpec;
import tech.ytsaurus.client.operations.Operation;
import tech.ytsaurus.client.operations.Statistics;
import tech.ytsaurus.client.request.MapOperation;
import tech.ytsaurus.core.cypress.YPath;
import tech.ytsaurus.core.operations.OperationContext;
import tech.ytsaurus.core.operations.Yield;
import tech.ytsaurus.lang.NonNullApi;
import tech.ytsaurus.ysontree.YTree;
import tech.ytsaurus.ysontree.YTreeMapNode;
/**
 * Runs a map operation whose input and output rows are represented by the
 * generic {@code YTreeMapNode}.
 */
public class ExampleMapYTree {
    private ExampleMapYTree() {
    }

    // A mapper implements the Mapper interface; its generic type arguments are the classes
    // that represent input and output rows. Here both are the universal YTreeMapNode,
    // which allows you to work with an arbitrary table.
    @NonNullApi
    public static class SimpleMapper implements Mapper<YTreeMapNode, YTreeMapNode> {
        /**
         * Emits one row per input row, carrying the name and an e-mail built from the login.
         */
        @Override
        public void map(YTreeMapNode entry, Yield<YTreeMapNode> yield, Statistics statistics,
                        OperationContext context) {
            YTreeMapNode mapped = YTree.builder().beginMap()
                    .key("name").value(entry.getString("name"))
                    .key("email").value(entry.getString("login").concat("@ytsaurus.tech"))
                    .buildMap();
            yield.yield(mapped);
        }
    }

    /**
     * Reads the cluster address from the YT_PROXY environment variable.
     *
     * @return the non-empty value of YT_PROXY
     * @throws IllegalArgumentException if the variable is unset or empty
     */
    private static String clusterAddressFromEnv() {
        String address = System.getenv("YT_PROXY");
        if (address != null && !address.isEmpty()) {
            return address;
        }
        throw new IllegalArgumentException("Environment variable YT_PROXY is empty");
    }

    /**
     * Runs the example against the cluster named by YT_PROXY.
     *
     * @param args not used
     * @throws IllegalArgumentException if YT_PROXY is unset or empty
     */
    public static void main(String[] args) {
        YTsaurusClient client = YTsaurusClient.builder()
                .setCluster(clusterAddressFromEnv())
                .build();

        // The output table lives in `//tmp` and carries the current user's name, so that
        // two people running this example at the same time use different output tables.
        YPath outputTable = YPath.simple("//tmp/" + System.getProperty("user.name") + "-tutorial-emails");

        try (client) {
            var mapSpec = MapSpec.builder()
                    .setInputTables(YPath.simple("//home/tutorial/staff_unsorted").withRange(0, 2))
                    .setOutputTables(outputTable)
                    .setMapperSpec(new MapperSpec(new SimpleMapper()))
                    .build();

            Operation op = client.map(MapOperation.builder().setSpec(mapSpec).build()).join();

            System.err.println("Operation was finished (OperationId: " + op.getId() + ")");
            System.err.println("Status: " + op.getStatus().join());
        }

        System.err.println("Output table: " + outputTable);
    }
}
Reduce операция (UserClass версия)
Пример находится в yt/java/ytsaurus-client-examples/reduce-entity-example/src/main/java/tech/ytsaurus/example/ExampleReduceEntity.java.
package tech.ytsaurus.example;
import java.util.Iterator;
import javax.persistence.Entity;
import tech.ytsaurus.client.YTsaurusClient;
import tech.ytsaurus.client.operations.Operation;
import tech.ytsaurus.client.operations.ReduceSpec;
import tech.ytsaurus.client.operations.ReducerSpec;
import tech.ytsaurus.client.operations.ReducerWithKey;
import tech.ytsaurus.client.operations.SortSpec;
import tech.ytsaurus.client.operations.Statistics;
import tech.ytsaurus.client.request.ReduceOperation;
import tech.ytsaurus.client.request.SortOperation;
import tech.ytsaurus.core.cypress.YPath;
import tech.ytsaurus.core.operations.Yield;
import tech.ytsaurus.lang.NonNullApi;
/**
 * Sorts the staff table by {@code name} and then runs a reduce operation that
 * counts rows per name, using {@code @Entity}-annotated user classes for rows.
 */
public class ExampleReduceEntity {
    private ExampleReduceEntity() {
    }

    /** Input row: the columns of the staff table. */
    @Entity
    static class InputStaffInfo {
        private String login;
        private String name;
        private long uid;

        public String getLogin() {
            return login;
        }

        public String getName() {
            return name;
        }

        public long getUid() {
            return uid;
        }
    }

    /** Output row: a name and the number of input rows that had it. */
    @Entity
    static class OutputStaffInfo {
        private String name;
        private int count;

        OutputStaffInfo() {
        }

        OutputStaffInfo(String name, int count) {
            this.name = name;
            this.count = count;
        }
    }

    // A reducer implements the ReducerWithKey interface. The first two generic type arguments
    // are the classes that represent input and output rows; the third one is the type of the key.
    @NonNullApi
    public static class SimpleReducer implements ReducerWithKey<InputStaffInfo, OutputStaffInfo, String> {
        /** Uses the {@code name} field as the reduce key. */
        @Override
        public String key(InputStaffInfo entry) {
            return entry.getName();
        }

        /**
         * Counts the rows of one key group and emits a single output row for it.
         */
        @Override
        public void reduce(String key, Iterator<InputStaffInfo> input, Yield<OutputStaffInfo> yield,
                           Statistics statistics) {
            // All rows with a common reduce key come to reduce, i.e. in this case with a common field `name`.
            int rowsSeen;
            for (rowsSeen = 0; input.hasNext(); rowsSeen++) {
                input.next();
            }
            yield.yield(new OutputStaffInfo(key, rowsSeen));
        }
    }

    /**
     * Reads the cluster address from the YT_PROXY environment variable.
     *
     * @return the non-empty value of YT_PROXY
     * @throws IllegalArgumentException if the variable is unset or empty
     */
    private static String clusterAddressFromEnv() {
        String address = System.getenv("YT_PROXY");
        if (address != null && !address.isEmpty()) {
            return address;
        }
        throw new IllegalArgumentException("Environment variable YT_PROXY is empty");
    }

    /**
     * Runs the example against the cluster named by YT_PROXY.
     *
     * @param args not used
     * @throws IllegalArgumentException if YT_PROXY is unset or empty
     */
    public static void main(String[] args) {
        YTsaurusClient client = YTsaurusClient.builder()
                .setCluster(clusterAddressFromEnv())
                .build();

        String userName = System.getProperty("user.name");
        YPath outputTable = YPath.simple("//tmp/" + userName + "-tutorial-emails");
        YPath sortedTmpTable = YPath.simple("//tmp/" + userName + "-tutorial-tmp");

        try (client) {
            // Sort the input table by `name`.
            var sortSpec = SortSpec.builder()
                    .setInputTables(YPath.simple("//home/tutorial/staff_unsorted"))
                    .setOutputTable(sortedTmpTable)
                    .setSortBy("name")
                    .build();
            client.sort(SortOperation.builder().setSpec(sortSpec).build()).join();

            // Reduce the sorted table by the same column.
            var reduceSpec = ReduceSpec.builder()
                    .setInputTables(sortedTmpTable)
                    .setOutputTables(outputTable)
                    .setReduceBy("name")
                    .setReducerSpec(new ReducerSpec(new SimpleReducer()))
                    .build();
            Operation op = client.reduce(ReduceOperation.builder().setSpec(reduceSpec).build()).join();

            System.err.println("Operation was finished (OperationId: " + op.getId() + ")");
            System.err.println("Status: " + op.getStatus().join());
        }

        System.err.println("Output table: " + outputTable);
    }
}
Reduce операция (YTreeMapNode версия)
Пример находится в yt/java/ytsaurus-client-examples/reduce-ytree-example/src/main/java/tech/ytsaurus/example/ExampleReduceYTree.java.
package tech.ytsaurus.example;
import java.util.Iterator;
import tech.ytsaurus.client.YTsaurusClient;
import tech.ytsaurus.client.operations.Operation;
import tech.ytsaurus.client.operations.ReduceSpec;
import tech.ytsaurus.client.operations.ReducerSpec;
import tech.ytsaurus.client.operations.ReducerWithKey;
import tech.ytsaurus.client.operations.SortSpec;
import tech.ytsaurus.client.operations.Statistics;
import tech.ytsaurus.client.request.ReduceOperation;
import tech.ytsaurus.client.request.SortOperation;
import tech.ytsaurus.core.cypress.YPath;
import tech.ytsaurus.core.operations.Yield;
import tech.ytsaurus.lang.NonNullApi;
import tech.ytsaurus.ysontree.YTree;
import tech.ytsaurus.ysontree.YTreeMapNode;
/**
 * Sorts the staff table by {@code name} and then runs a reduce operation that
 * counts rows per name, using the generic {@code YTreeMapNode} for rows.
 */
public class ExampleReduceYTree {
    private ExampleReduceYTree() {
    }

    // A reducer implements the ReducerWithKey interface. The first two generic type arguments
    // are the classes that represent input and output rows — here the universal YTreeMapNode,
    // which allows you to work with an arbitrary table. The third one is the type of the key.
    @NonNullApi
    public static class SimpleReducer implements ReducerWithKey<YTreeMapNode, YTreeMapNode, String> {
        /** Uses the {@code name} column as the reduce key. */
        @Override
        public String key(YTreeMapNode entry) {
            return entry.getString("name");
        }

        /**
         * Counts the rows of one key group and emits a single output row for it.
         */
        @Override
        public void reduce(String key, Iterator<YTreeMapNode> input, Yield<YTreeMapNode> yield, Statistics statistics) {
            // All rows with a common reduce key come to reduce, i.e. in this case with a common field `name`.
            int rowsSeen;
            for (rowsSeen = 0; input.hasNext(); rowsSeen++) {
                input.next();
            }

            yield.yield(YTree.builder().beginMap()
                    .key("name").value(key)
                    .key("count").value(rowsSeen)
                    .buildMap());
        }
    }

    /**
     * Reads the cluster address from the YT_PROXY environment variable.
     *
     * @return the non-empty value of YT_PROXY
     * @throws IllegalArgumentException if the variable is unset or empty
     */
    private static String clusterAddressFromEnv() {
        String address = System.getenv("YT_PROXY");
        if (address != null && !address.isEmpty()) {
            return address;
        }
        throw new IllegalArgumentException("Environment variable YT_PROXY is empty");
    }

    /**
     * Runs the example against the cluster named by YT_PROXY.
     *
     * @param args not used
     * @throws IllegalArgumentException if YT_PROXY is unset or empty
     */
    public static void main(String[] args) {
        YTsaurusClient client = YTsaurusClient.builder()
                .setCluster(clusterAddressFromEnv())
                .build();

        String userName = System.getProperty("user.name");
        YPath outputTable = YPath.simple("//tmp/" + userName + "-tutorial-emails");
        YPath sortedTmpTable = YPath.simple("//tmp/" + userName + "-tutorial-tmp");

        try (client) {
            // Sort the input table by `name`.
            var sortSpec = SortSpec.builder()
                    .setInputTables(YPath.simple("//home/tutorial/staff_unsorted"))
                    .setOutputTable(sortedTmpTable)
                    .setSortBy("name")
                    .build();
            client.sort(SortOperation.builder().setSpec(sortSpec).build()).join();

            // Reduce the sorted table by the same column.
            var reduceSpec = ReduceSpec.builder()
                    .setInputTables(sortedTmpTable)
                    .setOutputTables(outputTable)
                    .setReduceBy("name")
                    .setReducerSpec(new ReducerSpec(new SimpleReducer()))
                    .build();
            Operation op = client.reduce(ReduceOperation.builder().setSpec(reduceSpec).build()).join();

            System.err.println("Operation was finished (OperationId: " + op.getId() + ")");
            System.err.println("Status: " + op.getStatus().join());
        }

        System.err.println("Output table: " + outputTable);
    }
}
MapReduce операция (UserClass версия)
Пример находится в yt/java/ytsaurus-client-examples/map-reduce-entity-example/src/main/java/tech/ytsaurus/example/ExampleMapReduceEntity.java.
package tech.ytsaurus.example;
import java.util.Iterator;
import javax.persistence.Column;
import javax.persistence.Entity;
import tech.ytsaurus.client.YTsaurusClient;
import tech.ytsaurus.client.operations.MapReduceSpec;
import tech.ytsaurus.client.operations.Mapper;
import tech.ytsaurus.client.operations.MapperSpec;
import tech.ytsaurus.client.operations.Operation;
import tech.ytsaurus.client.operations.ReducerSpec;
import tech.ytsaurus.client.operations.ReducerWithKey;
import tech.ytsaurus.client.operations.Statistics;
import tech.ytsaurus.client.request.MapReduceOperation;
import tech.ytsaurus.core.cypress.YPath;
import tech.ytsaurus.core.operations.OperationContext;
import tech.ytsaurus.core.operations.Yield;
import tech.ytsaurus.lang.NonNullApi;
/**
 * Runs a map-reduce operation that sums name lengths per name, using
 * {@code @Entity}-annotated user classes for input, intermediate and output rows.
 */
public class ExampleMapReduceEntity {
    private ExampleMapReduceEntity() {
    }

    /** Input row: the columns of the staff table. */
    @Entity
    static class InputStaffInfo {
        private String login;
        private String name;
        private long uid;

        public String getLogin() {
            return login;
        }

        public String getName() {
            return name;
        }

        public long getUid() {
            return uid;
        }
    }

    /** Intermediate row produced by the mapper: a name and its length. */
    @Entity
    static class StaffNameLength {
        private String name;
        @Column(name = "name_length")
        private int nameLength;

        StaffNameLength() {
        }

        StaffNameLength(String name, int nameLength) {
            this.name = name;
            this.nameLength = nameLength;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getNameLength() {
            return nameLength;
        }

        public void setNameLength(int nameLength) {
            this.nameLength = nameLength;
        }
    }

    /** Output row produced by the reducer: a name and the summed length for it. */
    @Entity
    static class OutputStaffInfo {
        private String name;
        @Column(name = "sum_name_length")
        private int sumNameLength;

        OutputStaffInfo() {
        }

        OutputStaffInfo(String name, int sumNameLength) {
            this.name = name;
            this.sumNameLength = sumNameLength;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getSumNameLength() {
            return sumNameLength;
        }

        public void setSumNameLength(int sumNameLength) {
            this.sumNameLength = sumNameLength;
        }
    }

    /** Map stage: turns every staff row into a (name, name length) row. */
    @NonNullApi
    public static class SimpleMapper implements Mapper<InputStaffInfo, StaffNameLength> {
        @Override
        public void map(InputStaffInfo entry, Yield<StaffNameLength> yield, Statistics statistics,
                        OperationContext context) {
            final String staffName = entry.getName();
            yield.yield(new StaffNameLength(staffName, staffName.length()));
        }
    }

    /** Reduce stage: sums the name lengths of all rows that share a name. */
    @NonNullApi
    public static class SimpleReducer implements ReducerWithKey<StaffNameLength, OutputStaffInfo, String> {
        /** Uses the {@code name} field as the reduce key. */
        @Override
        public String key(StaffNameLength entry) {
            return entry.getName();
        }

        @Override
        public void reduce(String key, Iterator<StaffNameLength> input, Yield<OutputStaffInfo> yield,
                           Statistics statistics) {
            // All rows with a common reduce key come to reduce, i.e. in this case with a common field `name`.
            int total = 0;
            while (input.hasNext()) {
                total += input.next().getNameLength();
            }
            yield.yield(new OutputStaffInfo(key, total));
        }
    }

    /**
     * Reads the cluster address from the YT_PROXY environment variable.
     *
     * @return the non-empty value of YT_PROXY
     * @throws IllegalArgumentException if the variable is unset or empty
     */
    private static String clusterAddressFromEnv() {
        String address = System.getenv("YT_PROXY");
        if (address != null && !address.isEmpty()) {
            return address;
        }
        throw new IllegalArgumentException("Environment variable YT_PROXY is empty");
    }

    /**
     * Runs the example against the cluster named by YT_PROXY.
     *
     * @param args not used
     * @throws IllegalArgumentException if YT_PROXY is unset or empty
     */
    public static void main(String[] args) {
        YTsaurusClient client = YTsaurusClient.builder()
                .setCluster(clusterAddressFromEnv())
                .build();

        YPath inputTable = YPath.simple("//home/tutorial/staff_unsorted");
        YPath outputTable = YPath.simple("//tmp/" + System.getProperty("user.name") + "-tutorial-emails");

        try (client) {
            var mapReduceSpec = MapReduceSpec.builder()
                    .setInputTables(inputTable)
                    .setOutputTables(outputTable)
                    .setReduceBy("name")
                    .setSortBy("name")
                    .setReducerSpec(new ReducerSpec(new SimpleReducer()))
                    .setMapperSpec(new MapperSpec(new SimpleMapper()))
                    .build();

            Operation op = client.mapReduce(
                    MapReduceOperation.builder().setSpec(mapReduceSpec).build()
            ).join();

            System.err.println("Operation was finished (OperationId: " + op.getId() + ")");
            System.err.println("Status: " + op.getStatus().join());
        }

        System.err.println("Output table: " + outputTable);
    }
}
MapReduce операция (YTreeMapNode версия)
Пример находится в yt/java/ytsaurus-client-examples/map-reduce-ytree-example/src/main/java/tech/ytsaurus/example/ExampleMapReduceYTree.java.
package tech.ytsaurus.example;
import java.util.Iterator;
import tech.ytsaurus.client.YTsaurusClient;
import tech.ytsaurus.client.operations.MapReduceSpec;
import tech.ytsaurus.client.operations.Mapper;
import tech.ytsaurus.client.operations.MapperSpec;
import tech.ytsaurus.client.operations.Operation;
import tech.ytsaurus.client.operations.ReducerSpec;
import tech.ytsaurus.client.operations.ReducerWithKey;
import tech.ytsaurus.client.operations.Statistics;
import tech.ytsaurus.client.request.MapReduceOperation;
import tech.ytsaurus.core.cypress.YPath;
import tech.ytsaurus.core.operations.OperationContext;
import tech.ytsaurus.core.operations.Yield;
import tech.ytsaurus.lang.NonNullApi;
import tech.ytsaurus.ysontree.YTree;
import tech.ytsaurus.ysontree.YTreeMapNode;
/**
 * Runs a map-reduce operation that sums name lengths per name, using the
 * generic {@code YTreeMapNode} for all rows.
 */
public class ExampleMapReduceYTree {
    private ExampleMapReduceYTree() {
    }

    /** Map stage: turns every input row into a row with {@code name} and {@code name_length}. */
    @NonNullApi
    public static class SimpleMapper implements Mapper<YTreeMapNode, YTreeMapNode> {
        @Override
        public void map(YTreeMapNode entry, Yield<YTreeMapNode> yield, Statistics statistics,
                        OperationContext context) {
            final String staffName = entry.getString("name");
            yield.yield(YTree.builder().beginMap()
                    .key("name").value(staffName)
                    .key("name_length").value(staffName.length())
                    .buildMap());
        }
    }

    /** Reduce stage: sums {@code name_length} over all rows that share a name. */
    @NonNullApi
    public static class SimpleReducer implements ReducerWithKey<YTreeMapNode, YTreeMapNode, String> {
        /** Uses the {@code name} column as the reduce key. */
        @Override
        public String key(YTreeMapNode entry) {
            return entry.getString("name");
        }

        @Override
        public void reduce(String key, Iterator<YTreeMapNode> input, Yield<YTreeMapNode> yield, Statistics statistics) {
            // All rows with a common reduce key come to reduce, i.e. in this case with a common field `name`.
            int total = 0;
            while (input.hasNext()) {
                total += input.next().getInt("name_length");
            }

            yield.yield(YTree.builder().beginMap()
                    .key("name").value(key)
                    .key("sum_name_length").value(total)
                    .buildMap());
        }
    }

    /**
     * Reads the cluster address from the YT_PROXY environment variable.
     *
     * @return the non-empty value of YT_PROXY
     * @throws IllegalArgumentException if the variable is unset or empty
     */
    private static String clusterAddressFromEnv() {
        String address = System.getenv("YT_PROXY");
        if (address != null && !address.isEmpty()) {
            return address;
        }
        throw new IllegalArgumentException("Environment variable YT_PROXY is empty");
    }

    /**
     * Runs the example against the cluster named by YT_PROXY.
     *
     * @param args not used
     * @throws IllegalArgumentException if YT_PROXY is unset or empty
     */
    public static void main(String[] args) {
        YTsaurusClient client = YTsaurusClient.builder()
                .setCluster(clusterAddressFromEnv())
                .build();

        YPath inputTable = YPath.simple("//home/tutorial/staff_unsorted");
        YPath outputTable = YPath.simple("//tmp/" + System.getProperty("user.name") + "-tutorial-emails");

        try (client) {
            var mapReduceSpec = MapReduceSpec.builder()
                    .setInputTables(inputTable)
                    .setOutputTables(outputTable)
                    .setReduceBy("name")
                    .setSortBy("name")
                    .setReducerSpec(new ReducerSpec(new SimpleReducer()))
                    .setMapperSpec(new MapperSpec(new SimpleMapper()))
                    .build();

            Operation op = client.mapReduce(
                    MapReduceOperation.builder().setSpec(mapReduceSpec).build()
            ).join();

            System.err.println("Operation was finished (OperationId: " + op.getId() + ")");
            System.err.println("Status: " + op.getStatus().join());
        }

        System.err.println("Output table: " + outputTable);
    }
}