はじめに
JavaのgRPCで、足し算(add)と合計(sum)を計算するサービスを作ってみる。
.
|-- pom.xml
`-- src
|-- main
| |-- java
| | `-- redj
| | `-- grpc
| | |-- CalcClient.java
| | `-- CalcService.java
| `-- proto
| `-- calc.proto
`-- test
`-- java
`-- redj
`-- grpc
|-- GrpcServerRule.java
|-- GrpcTest.java
`-- ManagedChannelRule.java
.protoをコンパイルしてJavaコードを吐き出すプラグインprotobuf-maven-pluginを利用する。
xml version="1.0" encoding="UTF-8"
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlnsxsi="http://www.w3.org/2001/XMLSchema-instance" xsischemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>redj-grpc</groupId>
<artifactId>redj-grpc</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<projectbuildsourceEncoding>UTF-8</projectbuildsourceEncoding>
<mavencompilersource>1.8</mavencompilersource>
<mavencompilertarget>1.8</mavencompilertarget>
<os-maven-pluginversion>1.5.0.Final</os-maven-pluginversion>
<protobuf-maven-pluginversion>0.5.0</protobuf-maven-pluginversion>
<junitversion>4.12</junitversion>
<hamcrestversion>1.3</hamcrestversion>
<grpcversion>1.6.1</grpcversion>
<protobufversion>3.3.0</protobufversion>
</properties>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>${os-maven-plugin.version}</version>
</extension>
</extensions>
<pluginManagement>
<plugins>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>${protobuf-maven-plugin.version}</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<version>${hamcrest.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
</dependencies>
</project>
src/main/proto/calc.proto
gRPCのIDL。
…というよりは、データシリアライズを担当するProtocol Buffersのメッセージ定義という言い方が近いかも。
ここでは、足し算を行うAddと、合計を計算するSumの2のRPCを定義している、
syntax = "proto3";
option java_multiple_files = true;
option java_package = "redj.grpc.calc";
option java_outer_classname = "CalcProto";
package calc;
// The calc service
service Calc {
// add
rpc Add (AddRequest) returns (IntResponse);
// sum
rpc Sum (SumRequest) returns (IntResponse);
}
// The response containing the single int value.
message IntResponse {
int32 value = 1;
}
// The request containing 2 int operands.
message AddRequest {
int32 x = 1;
int32 y = 2;
}
// The request containing int values.
message SumRequest {
repeated int32 values = 1;
}
これをコンパイルすると、以下のクラスのソースが生成される。
- target/generated-sources/protobuf/java
- redj/grpc/calc/SumRequest.java
- redj/grpc/calc/IntResponseOrBuilder.java
- redj/grpc/calc/IntResponse.java
- redj/grpc/calc/SumRequestOrBuilder.java
- redj/grpc/calc/CalcProto.java
- redj/grpc/calc/AddRequest.java
- redj/grpc/calc/AddRequestOrBuilder.java
- target/generated-sources/protobuf/grpc-java
- redj/grpc/calc/CalcGrpc.java
生成されるソースのパッケージやクラス名は、calc.proto中のoption java_xxxで調整している。
src/main/java/redj/grpc/CalcService.java
足し算と合計を行うサービス。
サービスは、calc.protoから生成されたクラスを利用して実装する。
レスポンスの組み立てと送信が少々野暮ったい。
package redj.grpc;
import io.grpc.stub.StreamObserver;
import java.util.List;
import redj.grpc.calc.AddRequest;
import redj.grpc.calc.CalcGrpc;
import redj.grpc.calc.IntResponse;
import redj.grpc.calc.SumRequest;
public class CalcService extends CalcGrpc.CalcImplBase {
@Override
public void add(AddRequest request, StreamObserver<IntResponse> responseObserver) {
System.out.println("calc add");
IntResponse response = IntResponse
.newBuilder()
.setValue(request.getX() + request.getY())
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
@Override
public void sum(SumRequest request, StreamObserver<IntResponse> responseObserver) {
System.out.println("calc sum");
IntResponse response = IntResponse
.newBuilder()
.setValue(sum(request.getValuesList()))
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
private int sum(List<Integer> values) {
return values
.stream()
.mapToInt(value -> value)
.sum();
}
}
src/main/java/redj/grpc/CalcClient.java
クライアントも、calc.protoから生成されたクラスを利用して実装する。
package redj.grpc;
import io.grpc.ManagedChannel;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import redj.grpc.calc.AddRequest;
import redj.grpc.calc.CalcGrpc;
import redj.grpc.calc.IntResponse;
import redj.grpc.calc.SumRequest;
public class CalcClient {
private final CalcGrpc.CalcBlockingStub stub;
public CalcClient(ManagedChannel managedChannel) {
this.stub = CalcGrpc.newBlockingStub(managedChannel);
}
public int add(int x, int y) {
AddRequest request = AddRequest
.newBuilder()
.setX(x)
.setY(y)
.build();
IntResponse response = stub.add(request);
return response.getValue();
}
public int sum(int... values) {
SumRequest request = SumRequest
.newBuilder()
.addAllValues(toList(values))
.build();
IntResponse response = stub.sum(request);
return response.getValue();
}
private List<Integer> toList(int... values) {
return Arrays
.stream(values)
.mapToObj(value -> value)
.collect(Collectors.toList());
}
}
src/test/java/redj/grpc/GrpcTest.java
gRPC用の組み込みサーバを立ち上げて、クライアントからアクセスするテスト。
サーバの起動停止はJUnitのRuleの機能を利用して、GrpcServerRuleというクラス内に隠蔽。
同様にサーバへの接続は、ManagedChannelRuleというクラス内に隠蔽。
ただし、複数のRuleの実行順序は未定義なので、RuleChainを利用して実行順序を指定する。
ちなみに、JUnitのRuleとは、@Before/@Afterの代替手段らしい。
たしかに、テストの前処理/後処理をRuleとしてテストクラスから切り離すとテストコードの見通しが良くなるし、他のテストでもRuleが再利用できて便利そう。
package redj.grpc;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
public class GrpcTest {
public GrpcServerRule grpcServerRule = new GrpcServerRule(8080, new CalcService());
public ManagedChannelRule managedChannelRule = new ManagedChannelRule("localhost", 8080);
@Rule
public RuleChain ruleChain = RuleChain
.outerRule(grpcServerRule)
.around(managedChannelRule);
@Test
public void test() throws Exception {
CalcClient calcClient = new CalcClient(managedChannelRule.getManagedChannel());
System.out.println("add: " + calcClient.add(1, 2));
System.out.println("sum: " + calcClient.sum(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
}
}
src/test/java/redj/grpc/GrpcServerRule.java
サーバの起動/停止を、JUnitのRuleの一つであるTestWatcherを利用して実装する。
package redj.grpc;
import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
public class GrpcServerRule extends TestWatcher {
private final Server server;
public GrpcServerRule(int port, BindableService... bindableServices) {
ServerBuilder serverBuilder = ServerBuilder.forPort(port);
for (BindableService bindableService : bindableServices) {
serverBuilder.addService(bindableService);
}
this.server = serverBuilder.build();
}
@Override
protected void starting(Description description) {
System.out.println("start the server");
try {
server.start();
} catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
@Override
protected void finished(Description description) {
System.out.println("stop the server");
if (server != null) {
server.shutdownNow();
}
}
}
src/test/java/redj/grpc/ManagedChannelRule.java
クライアントとサーバの接続/切断を、JUnitのRuleの一つであるTestWatcherを利用して実装する。
package redj.grpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.util.concurrent.TimeUnit;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
public class ManagedChannelRule extends TestWatcher {
private final String host;
private final int port;
private ManagedChannel managedChannel;
public ManagedChannelRule(String host, int port) {
this.host = host;
this.port = port;
}
public ManagedChannel getManagedChannel() {
return managedChannel;
}
@Override
protected void starting(Description description) {
System.out.println("connect to the server");
this.managedChannel = ManagedChannelBuilder
.forAddress(host, port)
.usePlaintext(true)
.build();
}
@Override
protected void finished(Description description) {
System.out.println("disconnect from the server");
if (managedChannel != null) {
managedChannel.shutdownNow();
try {
managedChannel.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
}
}
ManagedChannelはそこそこ高機能で、スレッドーセーフだったり、再接続(リトライ?)機能があったりするみたい。
JavaDocを見ても詳しい仕様は読み取れないので、ソースを読まないと詳細は分からなさそう。
ビルドとテスト
ソースコード中のSystem.out.println()が想定通りに出力されていることを確認する。
$ mvn clean test
(略)
-------------------------------------------------------
T E S T S
-------------------------------------------------------
Running redj.grpc.GrpcTest
start the server
connect to the server
calc add
add: 3
calc sum
sum: 55
disconnect from the server
stop the server
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.975 sec
(略)
参考