13.10. Triple 协议

Triple 协议是 Dubbo3 的主力协议,完整兼容 gRPC over HTTP/2,并在协议层面扩展了负载均衡和流量控制相关机制。本文档旨在指导用户正确的使用 Triple 协议。

在开始前,需要决定服务使用的序列化方式,如果为新服务,推荐使用 protobuf 作为默认序列化,在性能和跨语言上的效果都会更好。如果是原有服务想进行协议升级,Triple 协议也已经支持其他序列化方式,如 Hessian / JSON 等

Protobuf

  1. 编写 IDL 文件
    syntax = "proto3";
    option java_multiple_files = true;
    option java_package = "org.apache.dubbo.hello";
    option java_outer_classname = "HelloWorldProto";
    option objc_class_prefix = "HLW";
    package helloworld;
    // The request message containing the user's name.
    message HelloRequest {
      string name = 1;
    }
    // The response message containing the greetings
    message HelloReply {
      string message = 1;
    }
    
  2. 添加编译 Protobuf 的 extension 和 plugin (以 maven 为例)
       <extensions>
                <extension>
                    <groupId>kr.motd.maven</groupId>
                    <artifactId>os-maven-plugin</artifactId>
                    <version>1.6.1</version>
                </extension>
            </extensions>
            <plugins>
                <plugin>
                    <groupId>org.xolstice.maven.plugins</groupId>
                    <artifactId>protobuf-maven-plugin</artifactId>
                    <version>0.6.1</version>
                    <configuration>
                        <protocArtifact>com.google.protobuf:protoc:3.7.1:exe:${os.detected.classifier}</protocArtifact>
                        <pluginId>triple-java</pluginId>
                        <outputDirectory>build/generated/source/proto/main/java</outputDirectory>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>test-compile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
    
  3. 构建/ 编译生成 Protobuf Message 类
    $ mvn clean install
    

Unary 方式

  1. 编写 Java 接口
    import org.apache.dubbo.hello.HelloReply;
    import org.apache.dubbo.hello.HelloRequest;
    public interface IGreeter {
        /**
         * <pre>
         *  Sends a greeting
         * </pre>
         */
        HelloReply sayHello(HelloRequest request);
    }
    
  2. 创建 Provider
        public static void main(String[] args) throws InterruptedException {
            ServiceConfig<IGreeter> service = new ServiceConfig<>();
            service.setInterface(IGreeter.class);
            service.setRef(new IGreeter1Impl());
            // 这里需要显示声明使用的协议为triple 
            service.setProtocol(new ProtocolConfig(CommonConstants.TRIPLE, 50051));
            service.setApplication(new ApplicationConfig("demo-provider"));
            service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
            service.export();
            System.out.println("dubbo service started");
            new CountDownLatch(1).await();
        }
    
  3. 创建 Consumer
    public static void main(String[] args) throws IOException {
        ReferenceConfig<IGreeter> ref = new ReferenceConfig<>();
        ref.setInterface(IGreeter.class);
        ref.setCheck(false);
        ref.setInterface(IGreeter.class);
        ref.setCheck(false);
        ref.setProtocol(CommonConstants.TRIPLE);
        ref.setLazy(true);
        ref.setTimeout(100000);
        ref.setApplication(new ApplicationConfig("demo-consumer"));
        ref.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
        final IGreeter iGreeter = ref.get();
        System.out.println("dubbo ref started");
        try {
            final HelloReply reply = iGreeter.sayHello(HelloRequest.newBuilder()
                    .setName("name")
                    .build());
            TimeUnit.SECONDS.sleep(1);
            System.out.println("Reply:" + reply);
        } catch (Throwable t) {
            t.printStackTrace();
        }
        System.in.read();
    }
    
  4. 运行 Provider 和 Consumer ,可以看到请求正常返回了

    Reply:message: “name”

stream 方式

  1. 编写 Java 接口
    import org.apache.dubbo.hello.HelloReply;
    import org.apache.dubbo.hello.HelloRequest;
    public interface IGreeter {
        /**
        * <pre>
        *  Sends greeting by stream
        * </pre>
         */
        StreamObserver<HelloRequest> sayHello(StreamObserver<HelloReply> replyObserver);
    }
    
  2. 编写实现类
    public class IStreamGreeterImpl implements IStreamGreeter {
        @Override
        public StreamObserver<HelloRequest> sayHello(StreamObserver<HelloReply> replyObserver) {
            return new StreamObserver<HelloRequest>() {
                private List<HelloReply> replyList = new ArrayList<>();
                @Override
                public void onNext(HelloRequest helloRequest) {
                    System.out.println("onNext receive request name:" + helloRequest.getName());
                    replyList.add(HelloReply.newBuilder()
                        .setMessage("receive name:" + helloRequest.getName())
                        .build());
                }
                @Override
                public void onError(Throwable cause) {
                    System.out.println("onError");
                    replyObserver.onError(cause);
                }
                @Override
                public void onCompleted() {
                    System.out.println("onComplete receive request size:" + replyList.size());
                    for (HelloReply reply : replyList) {
                        replyObserver.onNext(reply);
                    }
                    replyObserver.onCompleted();
                }
            };
        }
    }
    
  3. 创建 Provider
    public class StreamProvider {
       public static void main(String[] args) throws InterruptedException {
           ServiceConfig<IStreamGreeter> service = new ServiceConfig<>();
           service.setInterface(IStreamGreeter.class);
           service.setRef(new IStreamGreeterImpl());
           service.setProtocol(new ProtocolConfig(CommonConstants.TRIPLE, 50051));
           service.setApplication(new ApplicationConfig("stream-provider"));
           service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
           service.export();
           System.out.println("dubbo service started");
           new CountDownLatch(1).await();
       }
    }
    
  4. 创建 Consumer
    public class StreamConsumer {
       public static void main(String[] args) throws InterruptedException, IOException {
           ReferenceConfig<IStreamGreeter> ref = new ReferenceConfig<>();
           ref.setInterface(IStreamGreeter.class);
           ref.setCheck(false);
           ref.setProtocol(CommonConstants.TRIPLE);
           ref.setLazy(true);
           ref.setTimeout(100000);
           ref.setApplication(new ApplicationConfig("stream-consumer"));
           ref.setRegistry(new RegistryConfig("zookeeper://mse-6e9fda00-p.zk.mse.aliyuncs.com:2181"));
           final IStreamGreeter iStreamGreeter = ref.get();
           System.out.println("dubbo ref started");
           try {
               StreamObserver<HelloRequest> streamObserver = iStreamGreeter.sayHello(new StreamObserver<HelloReply>() {
                   @Override
                   public void onNext(HelloReply reply) {
                       System.out.println("onNext");
                       System.out.println(reply.getMessage());
                   }
                   @Override
                   public void onError(Throwable throwable) {
                       System.out.println("onError:" + throwable.getMessage());
                   }
                   @Override
                   public void onCompleted() {
                       System.out.println("onCompleted");
                   }
               });
               streamObserver.onNext(HelloRequest.newBuilder()
                   .setName("tony")
                   .build());
               streamObserver.onNext(HelloRequest.newBuilder()
                   .setName("nick")
                   .build());
               streamObserver.onCompleted();
           } catch (Throwable t) {
               t.printStackTrace();
           }
           System.in.read();
       }
    }
    
  5. 运行 Provider 和 Consumer ,可以看到请求正常返回了

    onNext receive name:tony onNext receive name:nick onCompleted

其他序列化方式

省略上文中的 1-3 步,指定 Provider 和 Consumer 使用的协议即可完成协议升级。

示例程序

本文的示例程序可以在 triple-samples 找到