JavaでgRPC

はじめに

JavaのgRPCで、足し算(add)と合計(sum)を計算するサービスを作ってみる。

ディレクトリ構成

.
|-- pom.xml
`-- src
    |-- main
    |   |-- java
    |   |   `-- redj
    |   |       `-- grpc
    |   |           |-- CalcClient.java
    |   |           `-- CalcService.java
    |   `-- proto
    |       `-- calc.proto
    `-- test
        `-- java
            `-- redj
                `-- grpc
                    |-- GrpcServerRule.java
                    |-- GrpcTest.java
                    `-- ManagedChannelRule.java

ソースコード

pom.xml

.protoをコンパイルしてJavaコードを吐き出すプラグインprotobuf-maven-pluginを利用する。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    
    <groupId>redj-grpc</groupId>
    <artifactId>redj-grpc</artifactId>
    <version>1.0-SNAPSHOT</version>    
    <packaging>jar</packaging>
    
    <properties>
        <!-- configurations -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        
        <!-- plugin versios -->
        <os-maven-plugin.version>1.5.0.Final</os-maven-plugin.version>
        <protobuf-maven-plugin.version>0.5.0</protobuf-maven-plugin.version>
        
        <!-- dependency versions -->
        <junit.version>4.12</junit.version>
        <hamcrest.version>1.3</hamcrest.version>
        <grpc.version>1.6.1</grpc.version>
        <protobuf.version>3.3.0</protobuf.version>
    </properties>
    
    <build>
        <extensions>
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>${os-maven-plugin.version}</version>
            </extension>
        </extensions>

        <pluginManagement>
            <plugins>
            </plugins>
        </pluginManagement>

        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>${protobuf-maven-plugin.version}</version>
                <configuration>
                    <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
        
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        
        <dependency>
            <groupId>org.hamcrest</groupId>
            <artifactId>hamcrest-library</artifactId>
            <version>${hamcrest.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-netty</artifactId>
            <version>${grpc.version}</version>
        </dependency>
            
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-protobuf</artifactId>
            <version>${grpc.version}</version>
        </dependency>
            
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-stub</artifactId>
            <version>${grpc.version}</version>
        </dependency>
    </dependencies>
</project>

src/main/proto/calc.proto

gRPCのIDL。 …というよりは、データシリアライズを担当するProtocol Buffersのメッセージ定義という言い方が近いかも。 ここでは、足し算を行うAddと、合計を計算するSumの2のRPCを定義している、

syntax = "proto3";

option java_multiple_files = true;
option java_package = "redj.grpc.calc";
option java_outer_classname = "CalcProto";

package calc;

// The calc service
service Calc {
    // add
    rpc Add (AddRequest) returns (IntResponse);

    // sum
    rpc Sum (SumRequest) returns (IntResponse);
}

// The response containing the single int value.
message IntResponse {
    int32 value = 1;
}

// The request containing 2 int operands.
message AddRequest {
    int32 x = 1;
    int32 y = 2;
}

// The request containing int values.
message SumRequest {
    repeated int32 values = 1;
}

これをコンパイルすると、以下のクラスのソースが生成される。

  • target/generated-sources/protobuf/java
    • redj/grpc/calc/SumRequest.java
    • redj/grpc/calc/IntResponseOrBuilder.java
    • redj/grpc/calc/IntResponse.java
    • redj/grpc/calc/SumRequestOrBuilder.java
    • redj/grpc/calc/CalcProto.java
    • redj/grpc/calc/AddRequest.java
    • redj/grpc/calc/AddRequestOrBuilder.java
  • target/generated-sources/protobuf/grpc-java
    • redj/grpc/calc/CalcGrpc.java

生成されるソースのパッケージやクラス名は、calc.proto中のoption java_xxxで調整している。

src/main/java/redj/grpc/CalcService.java

足し算と合計を行うサービス。 サービスは、calc.protoから生成されたクラスを利用して実装する。 レスポンスの組み立てと送信が少々野暮ったい。

package redj.grpc;

import io.grpc.stub.StreamObserver;
import java.util.List;
import redj.grpc.calc.AddRequest;
import redj.grpc.calc.CalcGrpc;
import redj.grpc.calc.IntResponse;
import redj.grpc.calc.SumRequest;

public class CalcService extends CalcGrpc.CalcImplBase {

    @Override
    public void add(AddRequest request, StreamObserver<IntResponse> responseObserver) {
        System.out.println("calc add");

        IntResponse response = IntResponse
                .newBuilder()
                .setValue(request.getX() + request.getY())
                .build();

        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }

    @Override
    public void sum(SumRequest request, StreamObserver<IntResponse> responseObserver) {
        System.out.println("calc sum");

        IntResponse response = IntResponse
                .newBuilder()
                .setValue(sum(request.getValuesList()))
                .build();

        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }

    private int sum(List<Integer> values) {
        return values
                .stream()
                .mapToInt(value -> value)
                .sum();
    }
}

src/main/java/redj/grpc/CalcClient.java

クライアントも、calc.protoから生成されたクラスを利用して実装する。

package redj.grpc;

import io.grpc.ManagedChannel;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import redj.grpc.calc.AddRequest;
import redj.grpc.calc.CalcGrpc;
import redj.grpc.calc.IntResponse;
import redj.grpc.calc.SumRequest;

public class CalcClient {

    private final CalcGrpc.CalcBlockingStub stub;

    public CalcClient(ManagedChannel managedChannel) {
        this.stub = CalcGrpc.newBlockingStub(managedChannel);
    }

    public int add(int x, int y) {
        AddRequest request = AddRequest
                .newBuilder()
                .setX(x)
                .setY(y)
                .build();

        IntResponse response = stub.add(request);

        return response.getValue();
    }

    public int sum(int... values) {
        SumRequest request = SumRequest
                .newBuilder()
                .addAllValues(toList(values))
                .build();

        IntResponse response = stub.sum(request);

        return response.getValue();
    }

    private List<Integer> toList(int... values) {
        return Arrays
                .stream(values)
                .mapToObj(value -> value)
                .collect(Collectors.toList());
    }
}

src/test/java/redj/grpc/GrpcTest.java

gRPC用の組み込みサーバを立ち上げて、クライアントからアクセスするテスト。 サーバの起動停止はJUnitのRuleの機能を利用して、GrpcServerRuleというクラス内に隠蔽。 同様にサーバへの接続は、ManagedChannelRuleというクラス内に隠蔽。 ただし、複数のRuleの実行順序は未定義なので、RuleChainを利用して実行順序を指定する。

ちなみに、JUnitのRuleとは、@Before/@Afterの代替手段らしい。 たしかに、テストの前処理/後処理をRuleとしてテストクラスから切り離すとテストコードの見通しが良くなるし、他のテストでもRuleが再利用できて便利そう。

package redj.grpc;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;

public class GrpcTest {

    public GrpcServerRule grpcServerRule = new GrpcServerRule(8080, new CalcService());

    public ManagedChannelRule managedChannelRule = new ManagedChannelRule("localhost", 8080);

    @Rule
    public RuleChain ruleChain = RuleChain
            .outerRule(grpcServerRule)
            .around(managedChannelRule);

    @Test
    public void test() throws Exception {
        CalcClient calcClient = new CalcClient(managedChannelRule.getManagedChannel());
        System.out.println("add: " + calcClient.add(1, 2));
        System.out.println("sum: " + calcClient.sum(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
    }
}

src/test/java/redj/grpc/GrpcServerRule.java

サーバの起動/停止を、JUnitのRuleの一つであるTestWatcherを利用して実装する。

package redj.grpc;

import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;

public class GrpcServerRule extends TestWatcher {

    private final Server server;

    public GrpcServerRule(int port, BindableService... bindableServices) {
        ServerBuilder serverBuilder = ServerBuilder.forPort(port);

        for (BindableService bindableService : bindableServices) {
            serverBuilder.addService(bindableService);
        }

        this.server = serverBuilder.build();
    }

    @Override
    protected void starting(Description description) {
        System.out.println("start the server");

        try {
            server.start();
        } catch (IOException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    @Override
    protected void finished(Description description) {
        System.out.println("stop the server");

        if (server != null) {
            server.shutdownNow();
        }
    }
}

src/test/java/redj/grpc/ManagedChannelRule.java

クライアントとサーバの接続/切断を、JUnitのRuleの一つであるTestWatcherを利用して実装する。

package redj.grpc;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.concurrent.TimeUnit;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;

public class ManagedChannelRule extends TestWatcher {

    private final String host;

    private final int port;

    private ManagedChannel managedChannel;

    public ManagedChannelRule(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public ManagedChannel getManagedChannel() {
        return managedChannel;
    }

    @Override
    protected void starting(Description description) {
        System.out.println("connect to the server");

        this.managedChannel = ManagedChannelBuilder
                .forAddress(host, port)
                .usePlaintext(true)
                .build();
    }

    @Override
    protected void finished(Description description) {
        System.out.println("disconnect from the server");

        if (managedChannel != null) {
            managedChannel.shutdownNow();
            
            try {
                managedChannel.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        }
    }
}

ManagedChannelはそこそこ高機能で、スレッドーセーフだったり、再接続(リトライ?)機能があったりするみたい。 JavaDocを見ても詳しい仕様は読み取れないので、ソースを読まないと詳細は分からなさそう。

ビルドとテスト

ソースコード中のSystem.out.println()が想定通りに出力されていることを確認する。

$ mvn clean test
(略)
-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running redj.grpc.GrpcTest
start the server
connect to the server
calc add
add: 3
calc sum
sum: 55
disconnect from the server
stop the server
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.975 sec
(略)

参考