3.4. 事件驱动架构

在讨论事件驱动之前我们先思考上一节服务协同中用户注册例子描述的场景,其对应的用户服务伪代码如下:
// 用户服务
@Transaction
def register(user):Boolean = {
  // 完成注册
  doRegister(user)
  // 调用卡券服务生成新人优惠券
  var sendCouponResult = http.put("/coupon/{user.id}","{'kind':'register'}")
  if(sendCouponResult.error){
   throw sendCouponResult.error
  }
  if(user.inviter){
    // 如果存在邀请人,调用积分服务添加邀请关系
    var sendPointResult = http.put("/point/{user.inviter}",s"{'kind':'register','regUser':'${user.id}'}")
    if(sendPointResult.error){
      throw sendPointResult.error
    }
  }
  true
}

这是最简单直接的调用,从中我们可以发现有两个问题:

  • 服务耦合,从代码中更能看出用户服务除了完成自身操作外还调用要发优惠券、添加邀请关系,即用户服务要感知卡券服务及积分服务,彼此耦合了
  • 性能损失,存在服务间调用,注册需要等待其它服务返回,接口响应时间受限于卡券服务及积分服务的性能
  • 可用性降低,如果卡券服务或积分服务宕机则注册服务也会不可用,但业务流程上只要注册本身成功即可视为注册服务可用,发优惠券及奖励邀请人不是核心,如果后两者有问题可以执行一定的灾后补偿方案(详见后续章节的降级处理)

服务耦合上我们可以按上节逻辑引入“活动服务”以聚合发卡券和奖励邀请人,如:

// 用户服务
@Transaction
def register(user):Boolean = {
  // 完成注册
  doRegister(user)
  // 调用活动服务完成发卡券和奖励邀请人
  var sendPromotionResult = http.put("/promotion/{user.id}",s"{'kind':'register','inviter':'${user.inviter}'}")
  if(sendPromotionResult.error){
    throw sendPromotionResult.error
  }
  true
}

这一做法有一定程度上缓解了用户服务与其关联服务的耦合,但也未实现真正的解耦,并且任然存在性能及可用性问题。

性能及可用性的问题我们可以简单地这样处理:

// 用户服务
@Transaction
def register(user):Boolean = {
  // 完成注册
  doRegister(user)
  async{ // 使用异步代码块
      // 异步调用活动服务完成发卡券和奖励邀请人
      asyncHttp
      .put("/promotion/{user.id}",s"{'kind':'register','inviter':'${user.inviter}'}")
      .onSuccess(result => log.info("活动服务处理成功"))
      .onFailure(result => log.error("活动服务处理失败"))
  }
  true
}

我们将非核心逻辑放到新的线程中执行,用户注册在完成doRegister后即可返回,这可能也是我们比较常见的做法。要注意的是异步代码块中的IO操作也都要异步化,否则如果活动服务或其依赖的卡券、积分服务下线了或因代码bug导致异常,那会造成请求堆积,非异步化下会导致线程堆积,大量地消耗CPU与内存,从而拖垮核心功能。

❓ 使用有界队列的线程池(如Java的ArrayBlockingQueue)是否可以解决同步IO线程堆积导致拖垮核心功能的问题?答案是可以的,但带来的问题是线程到达临界值后无法再添加任务进而导致逻辑不被执行。

这一做法存在的问题是对活动服务还是有依赖,如果活动服务宕机那将无法处理发卡券和加积分。另外有些场景下会要求注册成功后通知其它业务系统以进行用户同步之类的需求,越多的调用需求导致越多的耦合,所以这时我们就会想到用MQ,加上MQ后我们的代码变会成这样:

// 用户服务
@Transaction
def register(user){
  // 完成注册
  doRegister(user)
  // 发送用户注册成功事件
  mq.publish("user.register.success",user)
}

我们在用户注册完成后发送用户注册成功事件,此时用户服务不知道也不需要知道哪些服务会消费这个事件,这样就完成了服务间的解耦,同时我们也不用担心接口性能受限三方服务的问题,而且这也完美地避开了请求重试导致数据一致性问题,以RabbitMQ为例,它可以实现“刚好一次消费(Exactly once)”的交付保证。

🔆 MQ的delivery guarantee(交付保证)一般有如下三个,不同的MQ实现支持程度不同:

  • At most once 消息可能会丢,但绝不会重复传输
  • At least once 消息绝不会丢,但可能会重复传输
  • Exactly once 每条消息肯定会被传输一次且仅传输一次,此方式对性能会有一定的影响,且支持的MQ有限

Exactly once推荐在业务代码中实现,这会比MQ中的保证更灵活。我们可以通过自行设计的幂等处理在At least once的基础上去重从而实现Exactly once,幂等处理后续章节会介绍。

细心的读者一定会想到这样一个问题:如果三方服务消费失败怎么回滚?这是个好问题,只是对于我们用户注册这个场景是不存在,因为只要注册成功,发优惠券及奖励邀请人是业务上要求必须成功的,即使失败也不应该导致注册失败。但这种场景是特例,引入MQ解耦后只有解决消费方错误可以引发生产方回滚才能有更大的适用场景。这个问题我在接下来的分布式事务处理上再展开讨论。

我通过上面的例子可以发现,使用MQ后可以更好地解耦、异步化后可以更好的提升性能,而这正是事件驱动架构(EDA)的优势所在。

当然并不是引入MQ就是EDA了,后者还需要很多的特性支持,比如核心CQRS、Domain Event、Event Sourcing等。EDA是个很大的话题,笔者坦言在这块上个人经验有限,无法展开叙述,当然这也不是本文的重点,之所以写这个章节一方面是让读者了解EDA中最核心的消息系统有什么优势,在很多场景下我们应该优先考虑使用MQ,另一方面EDA虽不为主流微服务架构采用,但它却是微服务不可忽视的实现方案,抛砖引玉,有兴趣的读者可以自行查阅相关文档。

🔆 事件驱动架构(Event-Driven Architecture)与SOA一样是一种软件架构风格。相较而言,SOA关注的是静态架构,而EDA关注的是动态的、数据流的架构。 详见此处 ❓ EDA与服务编制

前面我们讲了微服务以服务协同的实现为主,但从本节分析EDA更像是服务编制的一种实现。的确,我们更可以把MQ看成ESB的一变体,所以说没有绝对好差的方案,我们更应该以辨证的态度看待技术。

MQ的使用场景是什么?在不同场合下笔者看到好几次就这个问题展开的讨论。对这个问题我们首先应该肯定的是MQ或是事件驱动肯定是更优雅的方案,但它也存在一定的局限,比如几个核心限制:

  • 性能与可用性,MQ虽然用异步化解决了非核心流程对核心活动的性能及可用性影响,但这又受限于MQ自身的性能及可用性表现,并且MQ是相对中心化的服务,可用性会成为系统的瓶颈,所以有朋友认为MQ的引入无非是将风险从业务服务转嫁给了MQ,当然这个说法有些偏激
  • 复杂,MQ虽然可以完美地解耦各个服务,但过多的异步化调用会让业务流程变得复杂,可读性差
  • 请求-响应模式支持有限,主流的MQ只有RabbitMQ支持RPC方式的请求-响应模式,其它MQ需要自己手工实现,并且引入MQ后这种请求-响应的模式会有比较大的性能损失

了解到这几个限制后我们可以对照下系统的需求,如果我们的系统对性能及可用性有着很高的要求,比如TPS几百万那么可能要慎重评估,否则市场上的MQ有侧重性能的Kafka、侧重一致性的RabbitMQ以及比较均衡的RocketMQ,还有ZeroMQ、Hazelcast等MQ的变种,一般情况下都可以找到适合需求的MQ,引入MQ后调用的复杂度问题是对架构及研发团队的考验,架构上要明确服务间调用,研发过程中有文档跟进,运维时要重点关注MQ的情况,对于普遍的请求-响应模式这不是MQ的强项,不要生搬硬套。

一般情况下我们都会引入MQ,笔者建议是在团队能力及工期允许的范围内尽可能地将大部分场景使用MQ实现服务调用异步化,并且如果存在如下场景那一定要用事件解耦:

  • 一个服务内存在核心与非核心调用,非核心调用的失败不影响核心调用且非核心调用的性能、可用性并不能保证,比如上文的注册场景
  • 请求耗时长且请求方可以接受回调方式,比如风控请求场景
  • 需要高可用保证,确保一致性的情况,比如支付请求

以车贷通为例,我们在风控系统、公共服务层的各个服务中大量应用了事件解耦。

下一节:在前几节我们确定了服务的划分、通讯协议的选择及接口的设计等,那接下我们考虑这样一个问题:我们将车贷通拆分成了30个左右的服务,这些服务怎么调用呢?