Java interface with examples (ytsaurus-client)

YTsaurus provides a Java SDK, which is published to Maven.

A detailed description of the Java SDK is located here.

The examples are located at yt/java/ytsaurus-client-examples.

Before using the examples, read the instructions for obtaining the token.

You also need the tutorial tables on your YTsaurus cluster. They can be generated with this script:

./yt/yt/scripts/upload_tutorial_data/upload_tutorial_data.py --yt-directory //home/dev/tutorial --proxy <your-http-proxy> 

Every example can be run from its directory using gradle:

cd yt/java/ytsaurus-client-examples/<some-example>
YT_PROXY=<your-http-proxy> gradle run

Working with Cypress

The example is located at yt/java/ytsaurus-client-examples/cypress-operations-example/src/main/java/tech/ytsaurus/example/ExampleCypressOperations.java.

package tech.ytsaurus.example;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

import tech.ytsaurus.client.YTsaurusClient;
import tech.ytsaurus.client.request.GetNode;
import tech.ytsaurus.core.cypress.YPath;
import tech.ytsaurus.ysontree.YTreeNode;


/**
 * Walks through basic Cypress requests: listing a node, reading a node with
 * optional request parameters, and handling a request that fails.
 */
public class ExampleCypressOperations {
    private ExampleCypressOperations() {
    }

    public static void main(String[] args) {
        // *** Create the client ***

        // The cluster address is read from the YT_PROXY environment variable.
        String proxy = System.getenv("YT_PROXY");
        if (proxy == null || proxy.isEmpty()) {
            throw new IllegalArgumentException("Environment variable YT_PROXY is empty");
        }

        // The builder is the most convenient way to create a client; the cluster is the only
        // required parameter. The YT token will be picked up from `~/.yt/token`, and the
        // username from the operating system.
        //
        // The client has to be closed to shut down properly, so it is declared as a
        // try-with-resources resource.
        try (YTsaurusClient client = YTsaurusClient.builder().setCluster(proxy).build()) {
            // *** Simple requests ***

            // Most client methods return a CompletableFuture: the request is sent in the background.
            CompletableFuture<YTreeNode> rootListing = client.listNode("/");

            // join() waits for the result of the request.
            // The CompletableFuture documentation is recommended reading:
            // https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/CompletableFuture.html
            YTreeNode rootChildren = rootListing.join();
            System.out.println("listResult: " + rootChildren);

            // *** Using optional parameters in a request ***

            // Every request also has a full form that accepts a request object,
            // where advanced options can be specified.
            GetNode tutorialRequest = GetNode.builder()
                    .setPath(YPath.simple("//home/dev/tutorial"))
                    .setAttributes(List.of("account", "row_count"))
                    .setTimeout(Duration.ofSeconds(10))
                    .build();
            CompletableFuture<YTreeNode> tutorialNode = client.getNode(tutorialRequest);
            System.out.println("getResult: " + tutorialNode.join());

            // *** Errors ***

            // The method call itself will not throw
            // (except, maybe, on some kind of software error or misconfiguration).
            CompletableFuture<YTreeNode> missingListing =
                    client.listNode("//some/directory/that/does/not/exist");

            try {
                // If YT returns an error or a network error occurs while executing the request,
                // the corresponding exception is stored in the CompletableFuture
                // and surfaces from join().
                missingListing.join();
            } catch (CompletionException ex) {
                System.out.println("ERROR: " + ex);
            }
        }
    }
}

Reading and writing static tables (Entity version)

The example is located at yt/java/ytsaurus-client-examples/read-write-entity-example/src/main/java/tech/ytsaurus/example/ExampleReadWriteEntity.java.

package tech.ytsaurus.example;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;

import javax.persistence.Entity;

import tech.ytsaurus.client.AsyncReader;
import tech.ytsaurus.client.AsyncWriter;
import tech.ytsaurus.client.YTsaurusClient;
import tech.ytsaurus.client.request.CreateNode;
import tech.ytsaurus.client.request.ReadTable;
import tech.ytsaurus.client.request.WriteTable;
import tech.ytsaurus.core.cypress.CypressNodeType;
import tech.ytsaurus.core.cypress.YPath;
import tech.ytsaurus.core.tables.ColumnValueType;
import tech.ytsaurus.core.tables.TableSchema;

/**
 * Writes two rows to a static table and reads them back, using an
 * {@code @Entity}-annotated class to represent a table row.
 */
public class ExampleReadWriteEntity {
    private ExampleReadWriteEntity() {
    }

    /** A table row with the columns {@code english} and {@code russian}. */
    @Entity
    static class TableRow {
        private String english;
        private String russian;

        TableRow(String english, String russian) {
            this.english = english;
            this.russian = russian;
        }

        @Override
        public String toString() {
            return String.format("TableRow(\"%s, %s\")", english, russian);
        }
    }

    public static void main(String[] args) {
        // You need to set up cluster address in YT_PROXY environment variable.
        var clusterAddress = System.getenv("YT_PROXY");
        if (clusterAddress == null || clusterAddress.isEmpty()) {
            throw new IllegalArgumentException("Environment variable YT_PROXY is empty");
        }

        YTsaurusClient client = YTsaurusClient.builder()
                .setCluster(clusterAddress)
                .build();

        try (client) {
            // The table is located in `//tmp` and contains the name of the current user.
            // The username is necessary in case two people run this example at the same time
            // so that they use different output tables.
            YPath table = YPath.simple("//tmp/" + System.getProperty("user.name") + "-read-write");

            // Create the table.

            TableSchema tableSchema = TableSchema.builder()
                    .addValue("english", ColumnValueType.STRING)
                    .addValue("russian", ColumnValueType.STRING)
                    .build().toWrite();

            client.createNode(CreateNode.builder()
                    .setPath(table)
                    .setType(CypressNodeType.TABLE)
                    .setAttributes(Map.of("schema", tableSchema.toYTree()))
                    .setIgnoreExisting(true)
                    .build()
            ).join();

            // Write the table.

            // Create the writer.
            AsyncWriter<TableRow> writer = client.writeTableV2(new WriteTable<>(table, TableRow.class)).join();

            writer.write(List.of(
                    new TableRow("one", "один"),
                    new TableRow("two", "два"))
            ).join();

            writer.finish().join();

            // Read the table.

            // Create the reader.
            AsyncReader<TableRow> reader = client.readTableV2(new ReadTable<>(table, TableRow.class)).join();

            List<TableRow> rows = new ArrayList<>();
            var executor = Executors.newSingleThreadExecutor();
            try {
                // Read all rows asynchronously; each row is handed to rows::add on the executor.
                reader.acceptAllAsync(rows::add, executor).join();
            } finally {
                // The executor's worker thread is not a daemon thread: without an explicit
                // shutdown it can keep the JVM alive after main() returns.
                executor.shutdown();
            }

            for (TableRow row : rows) {
                System.out.println("russian: " + row.russian + "; english: " + row.english);
            }
        }
    }
}

Reading and writing static tables (YTreeMapNode version)

The example is located at yt/java/ytsaurus-client-examples/read-write-ytree-example/src/main/java/tech/ytsaurus/example/ExampleReadWriteYTree.java.

package tech.ytsaurus.example;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import tech.ytsaurus.client.TableReader;
import tech.ytsaurus.client.TableWriter;
import tech.ytsaurus.client.YTsaurusClient;
import tech.ytsaurus.client.request.CreateNode;
import tech.ytsaurus.client.request.ReadTable;
import tech.ytsaurus.client.request.WriteTable;
import tech.ytsaurus.core.cypress.CypressNodeType;
import tech.ytsaurus.core.cypress.YPath;
import tech.ytsaurus.core.tables.ColumnValueType;
import tech.ytsaurus.core.tables.TableSchema;
import tech.ytsaurus.ysontree.YTree;
import tech.ytsaurus.ysontree.YTreeMapNode;

/**
 * Writes two rows to a static table and reads them back, representing
 * each row as a generic {@code YTreeMapNode}.
 */
public class ExampleReadWriteYTree {
    private ExampleReadWriteYTree() {
    }

    public static void main(String[] args) {
        // You need to set up cluster address in YT_PROXY environment variable.
        var clusterAddress = System.getenv("YT_PROXY");
        if (clusterAddress == null || clusterAddress.isEmpty()) {
            throw new IllegalArgumentException("Environment variable YT_PROXY is empty");
        }

        YTsaurusClient client = YTsaurusClient.builder()
                .setCluster(clusterAddress)
                .build();

        try (client) {
            // The table is located in `//tmp` and contains the name of the current user.
            // The username is necessary in case two people run this example at the same time
            // so that they use different output tables.
            YPath table = YPath.simple("//tmp/" + System.getProperty("user.name") + "-read-write");

            // Create the table.

            TableSchema tableSchema = TableSchema.builder()
                    .addValue("english", ColumnValueType.STRING)
                    .addValue("russian", ColumnValueType.STRING)
                    .build().toWrite();

            client.createNode(CreateNode.builder()
                    .setPath(table)
                    .setType(CypressNodeType.TABLE)
                    .setAttributes(Map.of("schema", tableSchema.toYTree()))
                    .setIgnoreExisting(true)
                    .build()
            ).join();

            // Write the table.

            // Create the writer.
            TableWriter<YTreeMapNode> writer = client.writeTable(
                    new WriteTable<>(table, YTreeMapNode.class)
            ).join();

            try {
                while (true) {
                    // It is necessary to wait for readyEvent before trying to write.
                    writer.readyEvent().join();

                    // If false is returned, then readyEvent must be waited for before trying again.
                    boolean accepted = writer.write(List.of(
                            YTree.mapBuilder()
                                    .key("english").value("one")
                                    .key("russian").value("один")
                                    .buildMap(),
                            YTree.mapBuilder()
                                    .key("english").value("two")
                                    .key("russian").value("два")
                                    .buildMap()
                    ), tableSchema);

                    if (accepted) {
                        break;
                    }
                }
            } catch (Exception ex) {
                throw new RuntimeException(ex);
            } finally {
                // Waiting for completion of writing. An exception might be thrown if something goes wrong.
                writer.close().join();
            }

            // Read the table.

            // Create the reader.
            TableReader<YTreeMapNode> reader = client.readTable(
                    new ReadTable<>(table, YTreeMapNode.class)
            ).join();

            List<YTreeMapNode> rows = new ArrayList<>();

            try {
                // We will read while we can.
                while (reader.canRead()) {
                    // We wait until we can continue reading.
                    reader.readyEvent().join();

                    // Collect batches until read() returns null, then wait for readyEvent again.
                    List<YTreeMapNode> currentRows;
                    while ((currentRows = reader.read()) != null) {
                        rows.addAll(currentRows);
                    }
                }
            } catch (Exception ex) {
                // Keep the original exception as the cause so the real failure is not lost.
                throw new RuntimeException("Failed to read", ex);
            } finally {
                reader.close().join();
            }

            for (YTreeMapNode row : rows) {
                System.out.println("russian: " + row.getString("russian") +
                        "; english: " + row.getString("english"));
            }
        }
    }
}

Working with dynamic tables

The example is located at yt/java/ytsaurus-client-examples/dynamic-table-example/src/main/java/tech/ytsaurus/example/ExampleDynamicTable.java.

package tech.ytsaurus.example;

import java.util.List;
import java.util.Map;

import tech.ytsaurus.client.ApiServiceTransaction;
import tech.ytsaurus.client.YTsaurusClient;
import tech.ytsaurus.client.request.CreateNode;
import tech.ytsaurus.client.request.LookupRowsRequest;
import tech.ytsaurus.client.request.ModifyRowsRequest;
import tech.ytsaurus.client.request.StartTransaction;
import tech.ytsaurus.client.request.TransactionType;
import tech.ytsaurus.client.rows.UnversionedRow;
import tech.ytsaurus.client.rows.UnversionedRowset;
import tech.ytsaurus.core.cypress.CypressNodeType;
import tech.ytsaurus.core.cypress.YPath;
import tech.ytsaurus.core.tables.ColumnSchema;
import tech.ytsaurus.core.tables.ColumnSortOrder;
import tech.ytsaurus.core.tables.TableSchema;
import tech.ytsaurus.typeinfo.TiType;
import tech.ytsaurus.ysontree.YTree;


/**
 * Creates a dynamic table, mounts it, inserts three rows, looks one of them up,
 * and unmounts the table again.
 */
public class ExampleDynamicTable {
    private ExampleDynamicTable() {
    }

    public static void main(String[] args) {
        //
        // The program takes two arguments: the name of the cluster and the path to the table
        // it will work with (the table must not exist).
        //
        // By default, users do not have permissions to mount dynamic tables, so you must obtain
        // such permissions (permissions to mount tables) on some YT cluster before starting the program.
        if (args.length != 2) {
            throw new IllegalArgumentException("Incorrect arguments count");
        }
        final String cluster = args[0];
        final String tablePath = args[1];

        // Schema: a sorted int64 `key` column and a string `value` column, with unique keys.
        ColumnSchema keyColumn = ColumnSchema.builder("key", TiType.int64())
                .setSortOrder(ColumnSortOrder.ASCENDING)
                .build();
        ColumnSchema valueColumn = ColumnSchema.builder("value", TiType.string()).build();
        TableSchema schema = TableSchema.builder()
                .setUniqueKeys(true)
                .add(keyColumn)
                .add(valueColumn)
                .build();

        try (YTsaurusClient client = YTsaurusClient.builder().setCluster(cluster).build()) {
            // ATTENTION: when creating a dynamic table, we need to
            //  - specify the attribute dynamic: true
            //  - specify the schema
            // Both must be passed in the createNode call itself:
            // creating the table first and setting these attributes afterwards will not work.
            client.createNode(
                    CreateNode.builder()
                            .setPath(YPath.simple(tablePath))
                            .setType(CypressNodeType.TABLE)
                            .setAttributes(Map.of(
                                    "dynamic", YTree.booleanNode(true),
                                    "schema", schema.toYTree()
                            ))
                            .build()
            ).join();

            // A dynamic table has to be "mounted" before it can be used.
            //
            // Creating/mounting/unmounting tables is usually done by dedicated administrative scripts,
            // and the application simply assumes that the tables exist and are already mounted.
            //
            // For completeness, this example mounts the table here and unmounts it at the end.
            client.mountTable(tablePath).join();

            // Fill the table.
            StartTransaction writeOptions = new StartTransaction(TransactionType.Tablet);
            try (ApiServiceTransaction writeTx = client.startTransaction(writeOptions).join()) {
                // Values are inserted into a dynamic table with modifyRows.
                var insertRequest = ModifyRowsRequest.builder()
                        .setPath(tablePath)
                        .setSchema(schema)
                        .addInsert(List.of(1, "a"))
                        .addInsert(List.of(2, "b"))
                        .addInsert(List.of(3, "c"))
                        .build();
                writeTx.modifyRows(insertRequest).join();
                writeTx.commit().join();
            }

            // Read the table.
            StartTransaction readOptions = new StartTransaction(TransactionType.Tablet);
            try (ApiServiceTransaction readTx = client.startTransaction(readOptions).join()) {
                // Values are fetched from a dynamic table with lookupRows.
                // It returns an UnversionedRowset consisting of the rows it found.
                var lookupRequest = LookupRowsRequest.builder()
                        .setPath(tablePath)
                        .setSchema(schema.toLookup())
                        .addFilter(1)
                        .build();
                UnversionedRowset found = readTx.lookupRows(lookupRequest).join();

                System.out.println("====== LOOKUP RESULT ======");
                for (UnversionedRow foundRow : found.getRows()) {
                    System.out.println(foundRow.toYTreeMap(schema, true));
                }
                System.out.println("====== END LOOKUP RESULT ======");

                readTx.commit().join();
            }

            // Unmount the table.
            client.unmountTable(tablePath).join();
        }
    }
}

Map operation (UserClass version)

The example is located at yt/java/ytsaurus-client-examples/map-entity-example/src/main/java/tech/ytsaurus/example/ExampleMapEntity.java.

package tech.ytsaurus.example;

import javax.persistence.Entity;

import tech.ytsaurus.client.YTsaurusClient;
import tech.ytsaurus.client.operations.MapSpec;
import tech.ytsaurus.client.operations.Mapper;
import tech.ytsaurus.client.operations.MapperSpec;
import tech.ytsaurus.client.operations.Operation;
import tech.ytsaurus.client.operations.Statistics;
import tech.ytsaurus.client.request.MapOperation;
import tech.ytsaurus.core.cypress.YPath;
import tech.ytsaurus.core.operations.OperationContext;
import tech.ytsaurus.core.operations.Yield;
import tech.ytsaurus.lang.NonNullApi;

/**
 * Runs a map operation that turns staff records into (email, name) rows,
 * using {@code @Entity}-annotated classes for the input and output rows.
 */
public class ExampleMapEntity {
    private ExampleMapEntity() {
    }

    /** Input row with the columns {@code login}, {@code name} and {@code uid}. */
    @Entity
    static class InputStaffInfo {
        private String login;
        private String name;
        private long uid;

        public String getLogin() {
            return login;
        }

        public String getName() {
            return name;
        }

        public long getUid() {
            return uid;
        }
    }

    /** Output row with the columns {@code email} and {@code name}. */
    @Entity
    static class OutputStaffInfo {
        private String email;
        private String name;

        OutputStaffInfo() {
        }

        OutputStaffInfo(String email, String name) {
            this.email = email;
            this.name = name;
        }
    }

    // The mapper class must implement the appropriate interface.
    // Generic type arguments are the classes to represent the input and output objects.
    @NonNullApi
    public static class SimpleMapper implements Mapper<InputStaffInfo, OutputStaffInfo> {
        @Override
        public void map(InputStaffInfo entry, Yield<OutputStaffInfo> yield, Statistics statistics,
                        OperationContext context) {
            String name = entry.getName();
            String email = entry.getLogin().concat("@ytsaurus.tech");

            // The constructor takes (email, name); passing the arguments in the other order
            // would silently swap the two output columns.
            yield.yield(new OutputStaffInfo(email, name));
        }
    }

    public static void main(String[] args) {
        // You need to set up cluster address in YT_PROXY environment variable.
        var clusterAddress = System.getenv("YT_PROXY");
        if (clusterAddress == null || clusterAddress.isEmpty()) {
            throw new IllegalArgumentException("Environment variable YT_PROXY is empty");
        }

        YTsaurusClient client = YTsaurusClient.builder()
                .setCluster(clusterAddress)
                .build();

        // The output table is located in `//tmp` and contains the name of the current user.
        // The username is necessary in case two people run this example at the same time
        // so that they use different output tables.
        YPath outputTable = YPath.simple("//tmp/" + System.getProperty("user.name") + "-tutorial-emails");

        try (client) {
            // NOTE(review): the setup script in this guide uploads the tutorial data to
            // //home/dev/tutorial; confirm that this input path matches where your tables live.
            Operation op = client.map(
                    MapOperation.builder()
                            .setSpec(MapSpec.builder()
                                    .setInputTables(YPath.simple("//home/tutorial/staff_unsorted").withRange(0, 2))
                                    .setOutputTables(outputTable)
                                    .setMapperSpec(new MapperSpec(new SimpleMapper()))
                                    .build())
                            .build()
            ).join();

            System.err.println("Operation was finished (OperationId: " + op.getId() + ")");
            System.err.println("Status: " + op.getStatus().join());
        }

        System.err.println(
                "Output table: " + outputTable
        );
    }
}

Map operation (YTreeMapNode version)

The example is located at yt/java/ytsaurus-client-examples/map-ytree-example/src/main/java/tech/ytsaurus/example/ExampleMapYTree.java.

package tech.ytsaurus.example;

import tech.ytsaurus.client.YTsaurusClient;
import tech.ytsaurus.client.operations.MapSpec;
import tech.ytsaurus.client.operations.Mapper;
import tech.ytsaurus.client.operations.MapperSpec;
import tech.ytsaurus.client.operations.Operation;
import tech.ytsaurus.client.operations.Statistics;
import tech.ytsaurus.client.request.MapOperation;
import tech.ytsaurus.core.cypress.YPath;
import tech.ytsaurus.core.operations.OperationContext;
import tech.ytsaurus.core.operations.Yield;
import tech.ytsaurus.lang.NonNullApi;
import tech.ytsaurus.ysontree.YTree;
import tech.ytsaurus.ysontree.YTreeMapNode;


/**
 * Runs a map operation that turns staff records into (name, email) rows,
 * representing every row as a generic {@code YTreeMapNode}.
 */
public class ExampleMapYTree {
    private ExampleMapYTree() {
    }

    // A mapper has to implement the Mapper interface.
    // Its generic type arguments are the classes that represent the input and output rows;
    // here both are the universal YTreeMapNode, which allows working with an arbitrary table.
    @NonNullApi
    public static class SimpleMapper implements Mapper<YTreeMapNode, YTreeMapNode> {
        @Override
        public void map(YTreeMapNode entry, Yield<YTreeMapNode> yield, Statistics statistics,
                        OperationContext context) {
            // Output row: the input `name` plus an email derived from the input `login`.
            YTreeMapNode result = YTree.builder().beginMap()
                    .key("name").value(entry.getString("name"))
                    .key("email").value(entry.getString("login").concat("@ytsaurus.tech"))
                    .buildMap();

            yield.yield(result);
        }
    }

    public static void main(String[] args) {
        // The cluster address is read from the YT_PROXY environment variable.
        String proxy = System.getenv("YT_PROXY");
        if (proxy == null || proxy.isEmpty()) {
            throw new IllegalArgumentException("Environment variable YT_PROXY is empty");
        }

        // The output table lives in `//tmp` and its name contains the current user's name,
        // so that two people running this example at the same time
        // write to different output tables.
        String user = System.getProperty("user.name");
        YPath outputTable = YPath.simple("//tmp/" + user + "-tutorial-emails");

        try (YTsaurusClient client = YTsaurusClient.builder().setCluster(proxy).build()) {
            // NOTE(review): the setup script in this guide uploads the tutorial data to
            // //home/dev/tutorial; confirm that this input path matches where your tables live.
            var mapSpec = MapSpec.builder()
                    .setInputTables(YPath.simple("//home/tutorial/staff_unsorted").withRange(0, 2))
                    .setOutputTables(outputTable)
                    .setMapperSpec(new MapperSpec(new SimpleMapper()))
                    .build();

            Operation mapOperation = client.map(
                    MapOperation.builder()
                            .setSpec(mapSpec)
                            .build()
            ).join();

            System.err.println("Operation was finished (OperationId: " + mapOperation.getId() + ")");
            System.err.println("Status: " + mapOperation.getStatus().join());
        }

        System.err.println("Output table: " + outputTable);
    }
}

Reduce operation (UserClass version)

The example is located at yt/java/ytsaurus-client-examples/reduce-entity-example/src/main/java/tech/ytsaurus/example/ExampleReduceEntity.java.

package tech.ytsaurus.example;

import java.util.Iterator;

import javax.persistence.Entity;

import tech.ytsaurus.client.YTsaurusClient;
import tech.ytsaurus.client.operations.Operation;
import tech.ytsaurus.client.operations.ReduceSpec;
import tech.ytsaurus.client.operations.ReducerSpec;
import tech.ytsaurus.client.operations.ReducerWithKey;
import tech.ytsaurus.client.operations.SortSpec;
import tech.ytsaurus.client.operations.Statistics;
import tech.ytsaurus.client.request.ReduceOperation;
import tech.ytsaurus.client.request.SortOperation;
import tech.ytsaurus.core.cypress.YPath;
import tech.ytsaurus.core.operations.Yield;
import tech.ytsaurus.lang.NonNullApi;

/**
 * Sorts the staff table by {@code name} and runs a reduce operation that counts
 * the rows for every name, using {@code @Entity}-annotated row classes.
 */
public class ExampleReduceEntity {
    private ExampleReduceEntity() {
    }

    /** Input row with the columns {@code login}, {@code name} and {@code uid}. */
    @Entity
    static class InputStaffInfo {
        private String login;
        private String name;
        private long uid;

        public String getLogin() {
            return login;
        }

        public String getName() {
            return name;
        }

        public long getUid() {
            return uid;
        }
    }

    /** Output row with the columns {@code name} and {@code count}. */
    @Entity
    static class OutputStaffInfo {
        private String name;
        private int count;

        OutputStaffInfo() {
        }

        OutputStaffInfo(String name, int count) {
            this.name = name;
            this.count = count;
        }
    }

    // A reducer has to implement the appropriate interface.
    // The first two generic type arguments are the classes that represent the input and output rows;
    // the third type argument of ReducerWithKey is the type of the key.
    @NonNullApi
    public static class SimpleReducer implements ReducerWithKey<InputStaffInfo, OutputStaffInfo, String> {
        @Override
        public String key(InputStaffInfo entry) {
            return entry.getName();
        }

        @Override
        public void reduce(String key, Iterator<InputStaffInfo> input, Yield<OutputStaffInfo> yield,
                           Statistics statistics) {
            // Each reduce call receives all rows that share one reduce key,
            // i.e. in this case all rows with the same `name`.
            int rowsForKey = 0;
            for (; input.hasNext(); input.next()) {
                rowsForKey++;
            }

            yield.yield(new OutputStaffInfo(key, rowsForKey));
        }
    }

    public static void main(String[] args) {
        // The cluster address is read from the YT_PROXY environment variable.
        String proxy = System.getenv("YT_PROXY");
        if (proxy == null || proxy.isEmpty()) {
            throw new IllegalArgumentException("Environment variable YT_PROXY is empty");
        }

        // Both tables live in `//tmp` and carry the current user's name.
        String user = System.getProperty("user.name");
        YPath outputTable = YPath.simple("//tmp/" + user + "-tutorial-emails");
        YPath sortedTmpTable = YPath.simple("//tmp/" + user + "-tutorial-tmp");

        try (YTsaurusClient client = YTsaurusClient.builder().setCluster(proxy).build()) {
            // Sort the input table by `name`.
            // NOTE(review): the setup script in this guide uploads the tutorial data to
            // //home/dev/tutorial; confirm that this input path matches where your tables live.
            var sortSpec = SortSpec.builder()
                    .setInputTables(YPath.simple("//home/tutorial/staff_unsorted"))
                    .setOutputTable(sortedTmpTable)
                    .setSortBy("name")
                    .build();
            client.sort(SortOperation.builder().setSpec(sortSpec).build()).join();

            // Reduce the sorted table by the same column.
            var reduceSpec = ReduceSpec.builder()
                    .setInputTables(sortedTmpTable)
                    .setOutputTables(outputTable)
                    .setReduceBy("name")
                    .setReducerSpec(new ReducerSpec(new SimpleReducer()))
                    .build();
            Operation reduceOperation = client.reduce(
                    ReduceOperation.builder().setSpec(reduceSpec).build()
            ).join();

            System.err.println("Operation was finished (OperationId: " + reduceOperation.getId() + ")");
            System.err.println("Status: " + reduceOperation.getStatus().join());
        }

        System.err.println("Output table: " + outputTable);
    }
}

Reduce operation (YTreeMapNode version)

The example is located at yt/java/ytsaurus-client-examples/reduce-ytree-example/src/main/java/tech/ytsaurus/example/ExampleReduceYTree.java.

package tech.ytsaurus.example;

import java.util.Iterator;

import tech.ytsaurus.client.YTsaurusClient;
import tech.ytsaurus.client.operations.Operation;
import tech.ytsaurus.client.operations.ReduceSpec;
import tech.ytsaurus.client.operations.ReducerSpec;
import tech.ytsaurus.client.operations.ReducerWithKey;
import tech.ytsaurus.client.operations.SortSpec;
import tech.ytsaurus.client.operations.Statistics;
import tech.ytsaurus.client.request.ReduceOperation;
import tech.ytsaurus.client.request.SortOperation;
import tech.ytsaurus.core.cypress.YPath;
import tech.ytsaurus.core.operations.Yield;
import tech.ytsaurus.lang.NonNullApi;
import tech.ytsaurus.ysontree.YTree;
import tech.ytsaurus.ysontree.YTreeMapNode;


public class ExampleReduceYTree {
    private ExampleReduceYTree() {
    }

    /**
     * Reducer that emits, for every distinct {@code name}, one row holding that name and the number of
     * input rows that carried it.
     * <p>
     * A reducer has to implement the appropriate interface. The first two generic arguments are the classes
     * used for input and output rows; here both are the universal {@code YTreeMapNode}, which lets you work
     * with an arbitrary table. The third generic argument of {@code ReducerWithKey} is the type of the key.
     */
    @NonNullApi
    public static class SimpleReducer implements ReducerWithKey<YTreeMapNode, YTreeMapNode, String> {
        /** Returns the reduce key of a row: the value of its {@code name} field. */
        @Override
        public String key(YTreeMapNode entry) {
            return entry.getString("name");
        }

        /**
         * Receives all rows that share one reduce key (here: the same {@code name}) and yields a single
         * row with the fields {@code name} and {@code count}.
         */
        @Override
        public void reduce(String key, Iterator<YTreeMapNode> input, Yield<YTreeMapNode> yield,
                           Statistics statistics) {
            // The rows themselves are not inspected; they are only drained and counted.
            int rowCount = 0;
            for (; input.hasNext(); ++rowCount) {
                input.next();
            }

            yield.yield(YTree.builder().beginMap()
                    .key("name").value(key)
                    .key("count").value(rowCount)
                    .buildMap());
        }
    }

    /**
     * Reads the cluster address from the YT_PROXY environment variable.
     *
     * @throws IllegalArgumentException if the variable is unset or empty
     */
    private static String requireClusterAddress() {
        String address = System.getenv("YT_PROXY");
        boolean present = address != null && !address.isEmpty();
        if (!present) {
            throw new IllegalArgumentException("Environment variable YT_PROXY is empty");
        }
        return address;
    }

    public static void main(String[] args) {
        // You need to set up cluster address in YT_PROXY environment variable.
        YTsaurusClient client = YTsaurusClient.builder()
                .setCluster(requireClusterAddress())
                .build();

        // Both tables live under //tmp and are prefixed with the current OS user name.
        String tablePrefix = "//tmp/" + System.getProperty("user.name");
        YPath outputTable = YPath.simple(tablePrefix + "-tutorial-emails");
        YPath sortedTmpTable = YPath.simple(tablePrefix + "-tutorial-tmp");

        try (client) {
            // Step 1: sort the input table by `name` into the temporary table.
            var sortSpec = SortSpec.builder()
                    .setInputTables(YPath.simple("//home/tutorial/staff_unsorted"))
                    .setOutputTable(sortedTmpTable)
                    .setSortBy("name")
                    .build();
            client.sort(SortOperation.builder().setSpec(sortSpec).build()).join();

            // Step 2: reduce the sorted table by `name` with SimpleReducer.
            var reduceSpec = ReduceSpec.builder()
                    .setInputTables(sortedTmpTable)
                    .setOutputTables(outputTable)
                    .setReduceBy("name")
                    .setReducerSpec(new ReducerSpec(new SimpleReducer()))
                    .build();
            Operation op = client.reduce(ReduceOperation.builder().setSpec(reduceSpec).build()).join();

            System.err.println("Operation was finished (OperationId: " + op.getId() + ")");
            System.err.println("Status: " + op.getStatus().join());
        }

        System.err.println("Output table: " + outputTable);
    }
}

MapReduce operation (UserClass version)

The example is located at yt/java/ytsaurus-client-examples/map-reduce-entity-example/src/main/java/tech/ytsaurus/example/ExampleMapReduceEntity.java.

package tech.ytsaurus.example;

import java.util.Iterator;

import javax.persistence.Column;
import javax.persistence.Entity;

import tech.ytsaurus.client.YTsaurusClient;
import tech.ytsaurus.client.operations.MapReduceSpec;
import tech.ytsaurus.client.operations.Mapper;
import tech.ytsaurus.client.operations.MapperSpec;
import tech.ytsaurus.client.operations.Operation;
import tech.ytsaurus.client.operations.ReducerSpec;
import tech.ytsaurus.client.operations.ReducerWithKey;
import tech.ytsaurus.client.operations.Statistics;
import tech.ytsaurus.client.request.MapReduceOperation;
import tech.ytsaurus.core.cypress.YPath;
import tech.ytsaurus.core.operations.OperationContext;
import tech.ytsaurus.core.operations.Yield;
import tech.ytsaurus.lang.NonNullApi;

public class ExampleMapReduceEntity {

    private ExampleMapReduceEntity() {
    }

    /** Row type consumed by {@link SimpleMapper}: exposes {@code login}, {@code name} and {@code uid}. */
    @Entity
    static class InputStaffInfo {
        private String login;
        private String name;
        private long uid;

        public String getLogin() {
            return login;
        }

        public String getName() {
            return name;
        }

        public long getUid() {
            return uid;
        }
    }

    /**
     * Intermediate row produced by {@link SimpleMapper} and consumed by {@link SimpleReducer}:
     * a name plus its length (the latter annotated with the column name {@code name_length}).
     */
    @Entity
    static class StaffNameLength {
        private String name;
        @Column(name = "name_length")
        private int nameLength;

        StaffNameLength() {
        }

        StaffNameLength(String name, int nameLength) {
            this.name = name;
            this.nameLength = nameLength;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getNameLength() {
            return nameLength;
        }

        public void setNameLength(int nameLength) {
            this.nameLength = nameLength;
        }
    }

    /**
     * Output row produced by {@link SimpleReducer}: a name plus the sum of the lengths collected for it
     * (annotated with the column name {@code sum_name_length}).
     */
    @Entity
    static class OutputStaffInfo {
        private String name;
        @Column(name = "sum_name_length")
        private int sumNameLength;

        OutputStaffInfo() {
        }

        OutputStaffInfo(String name, int sumNameLength) {
            this.name = name;
            this.sumNameLength = sumNameLength;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getSumNameLength() {
            return sumNameLength;
        }

        public void setSumNameLength(int sumNameLength) {
            this.sumNameLength = sumNameLength;
        }
    }

    /** Maps every input row to a {@link StaffNameLength} holding the row's name and that name's length. */
    @NonNullApi
    public static class SimpleMapper implements Mapper<InputStaffInfo, StaffNameLength> {
        @Override
        public void map(InputStaffInfo entry, Yield<StaffNameLength> yield, Statistics statistics,
                        OperationContext context) {
            String staffName = entry.getName();
            int length = staffName.length();
            yield.yield(new StaffNameLength(staffName, length));
        }
    }

    /** Sums the name lengths of all rows sharing one name and emits a single {@link OutputStaffInfo}. */
    @NonNullApi
    public static class SimpleReducer implements ReducerWithKey<StaffNameLength, OutputStaffInfo, String> {
        /** Returns the reduce key of a row: its name. */
        @Override
        public String key(StaffNameLength entry) {
            return entry.getName();
        }

        /**
         * Receives all rows that share one reduce key (here: the same {@code name}) and yields one row
         * carrying the key and the accumulated name length.
         */
        @Override
        public void reduce(String key, Iterator<StaffNameLength> input, Yield<OutputStaffInfo> yield,
                           Statistics statistics) {
            int total = 0;
            while (input.hasNext()) {
                total += input.next().getNameLength();
            }

            yield.yield(new OutputStaffInfo(key, total));
        }
    }

    /**
     * Reads the cluster address from the YT_PROXY environment variable.
     *
     * @throws IllegalArgumentException if the variable is unset or empty
     */
    private static String requireClusterAddress() {
        String address = System.getenv("YT_PROXY");
        boolean present = address != null && !address.isEmpty();
        if (!present) {
            throw new IllegalArgumentException("Environment variable YT_PROXY is empty");
        }
        return address;
    }

    public static void main(String[] args) {
        // You need to set up cluster address in YT_PROXY environment variable.
        YTsaurusClient client = YTsaurusClient.builder()
                .setCluster(requireClusterAddress())
                .build();

        YPath inputTable = YPath.simple("//home/tutorial/staff_unsorted");
        YPath outputTable = YPath.simple("//tmp/" + System.getProperty("user.name") + "-tutorial-emails");

        try (client) {
            // Describe the whole map-reduce first, then submit it and wait for the result.
            var spec = MapReduceSpec.builder()
                    .setInputTables(inputTable)
                    .setOutputTables(outputTable)
                    .setReduceBy("name")
                    .setSortBy("name")
                    .setReducerSpec(new ReducerSpec(new SimpleReducer()))
                    .setMapperSpec(new MapperSpec(new SimpleMapper()))
                    .build();
            var request = MapReduceOperation.builder().setSpec(spec).build();
            Operation op = client.mapReduce(request).join();

            System.err.println("Operation was finished (OperationId: " + op.getId() + ")");
            System.err.println("Status: " + op.getStatus().join());
        }

        System.err.println("Output table: " + outputTable);
    }
}

MapReduce operation (YTreeMapNode version)

The example is located at yt/java/ytsaurus-client-examples/map-reduce-ytree-example/src/main/java/tech/ytsaurus/example/ExampleMapReduceYTree.java.

package tech.ytsaurus.example;

import java.util.Iterator;

import tech.ytsaurus.client.YTsaurusClient;
import tech.ytsaurus.client.operations.MapReduceSpec;
import tech.ytsaurus.client.operations.Mapper;
import tech.ytsaurus.client.operations.MapperSpec;
import tech.ytsaurus.client.operations.Operation;
import tech.ytsaurus.client.operations.ReducerSpec;
import tech.ytsaurus.client.operations.ReducerWithKey;
import tech.ytsaurus.client.operations.Statistics;
import tech.ytsaurus.client.request.MapReduceOperation;
import tech.ytsaurus.core.cypress.YPath;
import tech.ytsaurus.core.operations.OperationContext;
import tech.ytsaurus.core.operations.Yield;
import tech.ytsaurus.lang.NonNullApi;
import tech.ytsaurus.ysontree.YTree;
import tech.ytsaurus.ysontree.YTreeMapNode;


public class ExampleMapReduceYTree {

    private ExampleMapReduceYTree() {
    }

    /**
     * Maps every input row to a row with two fields: {@code name} (copied from the input) and
     * {@code name_length} (the length of that name).
     */
    @NonNullApi
    public static class SimpleMapper implements Mapper<YTreeMapNode, YTreeMapNode> {
        @Override
        public void map(YTreeMapNode entry, Yield<YTreeMapNode> yield, Statistics statistics,
                        OperationContext context) {
            String staffName = entry.getString("name");

            yield.yield(YTree.builder().beginMap()
                    .key("name").value(staffName)
                    .key("name_length").value(staffName.length())
                    .buildMap());
        }
    }

    /**
     * Sums the {@code name_length} values of all rows sharing one {@code name} and emits a single row
     * with the fields {@code name} and {@code sum_name_length}.
     */
    @NonNullApi
    public static class SimpleReducer implements ReducerWithKey<YTreeMapNode, YTreeMapNode, String> {
        /** Returns the reduce key of a row: the value of its {@code name} field. */
        @Override
        public String key(YTreeMapNode entry) {
            return entry.getString("name");
        }

        /** Receives all rows that share one reduce key, i.e. the same value of the field {@code name}. */
        @Override
        public void reduce(String key, Iterator<YTreeMapNode> input, Yield<YTreeMapNode> yield,
                           Statistics statistics) {
            int total = 0;
            while (input.hasNext()) {
                total += input.next().getInt("name_length");
            }

            yield.yield(YTree.builder().beginMap()
                    .key("name").value(key)
                    .key("sum_name_length").value(total)
                    .buildMap());
        }
    }

    /**
     * Reads the cluster address from the YT_PROXY environment variable.
     *
     * @throws IllegalArgumentException if the variable is unset or empty
     */
    private static String requireClusterAddress() {
        String address = System.getenv("YT_PROXY");
        boolean present = address != null && !address.isEmpty();
        if (!present) {
            throw new IllegalArgumentException("Environment variable YT_PROXY is empty");
        }
        return address;
    }

    public static void main(String[] args) {
        // You need to set up cluster address in YT_PROXY environment variable.
        YTsaurusClient client = YTsaurusClient.builder()
                .setCluster(requireClusterAddress())
                .build();

        YPath inputTable = YPath.simple("//home/tutorial/staff_unsorted");
        YPath outputTable = YPath.simple("//tmp/" + System.getProperty("user.name") + "-tutorial-emails");

        try (client) {
            // Describe the whole map-reduce first, then submit it and wait for the result.
            var spec = MapReduceSpec.builder()
                    .setInputTables(inputTable)
                    .setOutputTables(outputTable)
                    .setReduceBy("name")
                    .setSortBy("name")
                    .setReducerSpec(new ReducerSpec(new SimpleReducer()))
                    .setMapperSpec(new MapperSpec(new SimpleMapper()))
                    .build();
            var request = MapReduceOperation.builder().setSpec(spec).build();
            Operation op = client.mapReduce(request).join();

            System.err.println("Operation was finished (OperationId: " + op.getId() + ")");
            System.err.println("Status: " + op.getStatus().join());
        }

        System.err.println("Output table: " + outputTable);
    }
}