Skip to content

RPC streaming

Andrei Pangin edited this page Apr 12, 2019 · 1 revision

What is RPC streaming

Streaming allows to transfer a large (potentially unlimited) stream of data through an existing RPC connection.

  • It reuses existing sockets of remote service connection, both on client and server side.
  • It is possible to stream Java objects. They will be automatically (de-)serialized using one-nio protocol.
  • Streaming is optimized for throughput. Data is buffered in off-heap memory to reduce the amount of copying and I/O operations.
  • Streaming works with a fixed-size buffer. It does not need to store all the transferred data in the heap.

API

Streaming requires one-nio version 1.1.0 or newer.

A stream is opened by a regular remote call. The method must return one of the following types from one.nio.rpc.stream package.

RpcStream

This is a low level stream for implementing your own protocols. It implements ObjectInput and ObjectOutput interfaces and allows to read and write both primitive types and Java objects. RpcStream supports read(ByteBuffer) and write(ByteBuffer) methods for direct off-heap memory I/O.

SendStream

Allows sending objects of type S from a client to a server. It has the method

send(S object)

ReceiveStream

Provides a way to transfer objects of type R from a server to a client. It has the method

R receive()

BidiStream<S, R>

A bidirectional stream that allows to send objects of type S one way and receive back the objects of type R. Extends SendStream<S> and ReceiveStream<R>.

It is possible to send and receive objects in arbitrary order, even concurrently in different threads. Mind the buffering though. If you'd like to receive a response after sending an object, you need to make sure that the objects has been actually sent to the network by calling flush(). Alternatively you may use sendAndGet() method which is an equivalent for

R sendAndGet(S object) {
    send(object);
    flush();
    return receive();
}

All above streams have the following common methods:

// Get the address of the remote endpoint
InetSocketAddress getRemoteAddress();
 
// Total number of bytes transferred from and to this stream
long getBytesRead();
long getBytesWritten();

Client side usage

Declare a remote method

@RemoteMethod
ReceiveStream<User> getGroupMembers(long groupId);

Call the method on the client side, get the stream, and read it up to the end. Usually the end-of-stream is denoted by null object, but you may use your own end-of-stream mark as well.

User user;
while ((user = stream.receive()) != null) {
    // do something
}

⚠️ The stream must be closed after data exchange is over. While the stream is open, the corresponding connection is busy. Any stream operation may throw IOException, so the good practice is to wrap all the stream I/O into a try-finally or try-with-resource block.

try (ReceiveStream<User> stream = remoteClient.getGroupMembers(groupId)) {
    for (User user; (user = stream.receive()) != null; ) {
        // process user
    }
} catch (IOException | ClassNotFoundException e) {
    log.error("Something goes wrong!", e);
}

Stream operations are synchronous, i.e. receive() call blocks until it gets some incoming data or timeout occurs or IOException is thrown. Each individual stream I/O operation has the timeout defined in the configuration of a remote service (timeout or readTimeout parameter in the connection URL).

Server side usage

The implementation of a remote method should return a stream created by a factory method create(). The method accepts StreamHandler argument which is usually implemented with lambda. This lambda is your code for dealing with a stream on the server side. This code will be executed as soon as the remote method returns.

StreamHandler will receive a stream of the opposite type. For instance, if a method returns ReceiveStream<R>, the StreamHandler will work with SendStream<R>. Similarly, if a remote method returns BidiStream<S, R>, the StreamHandler will accept BidiStream<R, S>, and so on.

ReceiveStream<User> getGroupMembers(long groupId) {
    return ReceiveStream.create(stream -> {
        long lastId = 0;
        List<User> chunk;
        while ((chunk = loadGroupMembers(groupId, lastId, CHUNK_SIZE)) != null) {
            for (User user : chunk) {
                stream.send(user);
                lastId = user.getId();
            }
        }
        send(null);  // remeber that client expects null as an end-of-stream mark
    });
}

On the server side the stream lifecycle is controlled by one-nio. There is no need to close the stream - it will be closed automatically when lambda execution completes. Up to this moment the connection is considered busy, and the thread processing the remote method is also occupied. Similary to the client I/O, all stream operations are synchronous.

Examples

1. Cache bootstrap (loading data from a replica)

Client side

@RemoteMethod
ReceiveStream<IFullUser> bootstrap();
 
...
 
try (ReceiveStream<IFullUser> stream = cacheClient.bootstrap()) {
    log.info("Bootstrapping from " + cacheClient.getRemoteAddress());
 
    IFullUser user;
    while ((user = stream.receive()) != null) {
        localCache.put(user.getId(), user);
    }
 
    log.info("Loaded " + cacheClient.getBytesRead() / (1024*1024) + "MB");
} catch (Exception e) {
    log.error("Bootstrap failed", e);
}

Server side

ReceiveStream<IFullUser> bootstrap() {
    return ReceiveStream.create(stream -> {
        log.info("Sending data to " + stream.getRemoteAddress());
 
        localCache.forEach(stream::send);
        stream.send(null);
 
        log.info("Sent " + stream.getBytesWritten() / (1024*1024) + "MB");
    });
}

2. Computation of a path length

Client side

@RemoteMethod
BidiStream<GeoPoint, Double> getPathLength();
 
...
 
List<GeoPoint> path = getPath(...);
try (BidiStream<GeoPoint, Double> stream = geoClient.getPathLength()) {
    for (GeoPoint point : path) {
        stream.send(point);
    }
    return stream.sendAndGet(null);
} catch (IOException | ClassNotFoundException e) {
    LoggerUtil.operationFailure(...);
    throw e;
}

Server side

BidiStream<GeoPoint, Double> getPathLength() {
    return BidiStream.create(stream -> {
        double distance = 0;
        GeoPoint current = stream.receive();
        for (GeoPoint next; (next = stream.receive()) != null; current = next) {
            distance += calcDistance(current, next);
        }
        stream.send(distance);
    });
}