Triple 协议是 Dubbo3 的主力协议,完整兼容 gRPC over HTTP/2,并在协议层面扩展了负载均衡和流量控制相关机制。本文档旨在指导用户正确的使用 Triple 协议。
在开始前,需要决定服务使用的序列化方式,如果为新服务,推荐使用 protobuf 作为默认序列化,在性能和跨语言上的效果都会更好。如果是原有服务想进行协议升级,Triple 协议也已经支持其他序列化方式,如 Hessian / JSON 等
Protobuf
- 编写 IDL 文件
syntax = "proto3"; option java_multiple_files = true; option java_package = "org.apache.dubbo.hello"; option java_outer_classname = "HelloWorldProto"; option objc_class_prefix = "HLW"; package helloworld; // The request message containing the user's name. message HelloRequest { string name = 1; } // The response message containing the greetings message HelloReply { string message = 1; }
- 添加编译 Protobuf 的 extension 和 plugin (以 maven 为例)
<extensions> <extension> <groupId>kr.motd.maven</groupId> <artifactId>os-maven-plugin</artifactId> <version>1.6.1</version> </extension> </extensions> <plugins> <plugin> <groupId>org.xolstice.maven.plugins</groupId> <artifactId>protobuf-maven-plugin</artifactId> <version>0.6.1</version> <configuration> <protocArtifact>com.google.protobuf:protoc:3.7.1:exe:${os.detected.classifier}</protocArtifact> <pluginId>triple-java</pluginId> <outputDirectory>build/generated/source/proto/main/java</outputDirectory> </configuration> <executions> <execution> <goals> <goal>compile</goal> <goal>test-compile</goal> </goals> </execution> </executions> </plugin> </plugins>
- 构建/ 编译生成 Protobuf Message 类
$ mvn clean install
Unary 方式
- 编写 Java 接口
import org.apache.dubbo.hello.HelloReply; import org.apache.dubbo.hello.HelloRequest; public interface IGreeter { /** * <pre> * Sends a greeting * </pre> */ HelloReply sayHello(HelloRequest request); }
- 创建 Provider
public static void main(String[] args) throws InterruptedException { ServiceConfig<IGreeter> service = new ServiceConfig<>(); service.setInterface(IGreeter.class); service.setRef(new IGreeter1Impl()); // 这里需要显示声明使用的协议为triple service.setProtocol(new ProtocolConfig(CommonConstants.TRIPLE, 50051)); service.setApplication(new ApplicationConfig("demo-provider")); service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181")); service.export(); System.out.println("dubbo service started"); new CountDownLatch(1).await(); }
- 创建 Consumer
public static void main(String[] args) throws IOException { ReferenceConfig<IGreeter> ref = new ReferenceConfig<>(); ref.setInterface(IGreeter.class); ref.setCheck(false); ref.setInterface(IGreeter.class); ref.setCheck(false); ref.setProtocol(CommonConstants.TRIPLE); ref.setLazy(true); ref.setTimeout(100000); ref.setApplication(new ApplicationConfig("demo-consumer")); ref.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181")); final IGreeter iGreeter = ref.get(); System.out.println("dubbo ref started"); try { final HelloReply reply = iGreeter.sayHello(HelloRequest.newBuilder() .setName("name") .build()); TimeUnit.SECONDS.sleep(1); System.out.println("Reply:" + reply); } catch (Throwable t) { t.printStackTrace(); } System.in.read(); }
- 运行 Provider 和 Consumer ,可以看到请求正常返回了
Reply:message: “name”
stream 方式
- 编写 Java 接口
import org.apache.dubbo.hello.HelloReply; import org.apache.dubbo.hello.HelloRequest; public interface IGreeter { /** * <pre> * Sends greeting by stream * </pre> */ StreamObserver<HelloRequest> sayHello(StreamObserver<HelloReply> replyObserver); }
- 编写实现类
public class IStreamGreeterImpl implements IStreamGreeter { @Override public StreamObserver<HelloRequest> sayHello(StreamObserver<HelloReply> replyObserver) { return new StreamObserver<HelloRequest>() { private List<HelloReply> replyList = new ArrayList<>(); @Override public void onNext(HelloRequest helloRequest) { System.out.println("onNext receive request name:" + helloRequest.getName()); replyList.add(HelloReply.newBuilder() .setMessage("receive name:" + helloRequest.getName()) .build()); } @Override public void onError(Throwable cause) { System.out.println("onError"); replyObserver.onError(cause); } @Override public void onCompleted() { System.out.println("onComplete receive request size:" + replyList.size()); for (HelloReply reply : replyList) { replyObserver.onNext(reply); } replyObserver.onCompleted(); } }; } }
- 创建 Provider
public class StreamProvider { public static void main(String[] args) throws InterruptedException { ServiceConfig<IStreamGreeter> service = new ServiceConfig<>(); service.setInterface(IStreamGreeter.class); service.setRef(new IStreamGreeterImpl()); service.setProtocol(new ProtocolConfig(CommonConstants.TRIPLE, 50051)); service.setApplication(new ApplicationConfig("stream-provider")); service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181")); service.export(); System.out.println("dubbo service started"); new CountDownLatch(1).await(); } }
- 创建 Consumer
public class StreamConsumer { public static void main(String[] args) throws InterruptedException, IOException { ReferenceConfig<IStreamGreeter> ref = new ReferenceConfig<>(); ref.setInterface(IStreamGreeter.class); ref.setCheck(false); ref.setProtocol(CommonConstants.TRIPLE); ref.setLazy(true); ref.setTimeout(100000); ref.setApplication(new ApplicationConfig("stream-consumer")); ref.setRegistry(new RegistryConfig("zookeeper://mse-6e9fda00-p.zk.mse.aliyuncs.com:2181")); final IStreamGreeter iStreamGreeter = ref.get(); System.out.println("dubbo ref started"); try { StreamObserver<HelloRequest> streamObserver = iStreamGreeter.sayHello(new StreamObserver<HelloReply>() { @Override public void onNext(HelloReply reply) { System.out.println("onNext"); System.out.println(reply.getMessage()); } @Override public void onError(Throwable throwable) { System.out.println("onError:" + throwable.getMessage()); } @Override public void onCompleted() { System.out.println("onCompleted"); } }); streamObserver.onNext(HelloRequest.newBuilder() .setName("tony") .build()); streamObserver.onNext(HelloRequest.newBuilder() .setName("nick") .build()); streamObserver.onCompleted(); } catch (Throwable t) { t.printStackTrace(); } System.in.read(); } }
- 运行 Provider 和 Consumer ,可以看到请求正常返回了
onNext receive name:tony onNext receive name:nick onCompleted
其他序列化方式
省略上文中的 1-3 步,指定 Provider 和 Consumer 使用的协议即可完成协议升级。
示例程序
本文的示例程序可以在 triple-samples 找到