网络知识 娱乐 一天,把 Pulsar 客户端的性能提升3倍+!

一天,把 Pulsar 客户端的性能提升3倍+!

导读:大佬希望我帮忙排查一下一个性能问题,我⼀听内心当然是拒绝的啦,这种 Commit 挣得太难了,修个 NPE 之类的不香吗? 于是我拿起手机回复大佬:好的,我看⼀下。

作者介绍

林琳

腾讯云高级工程师

活跃于开源社区,Apache Pulsar Commiter

专注于中间件领域,在消息队列和微服务方向具有丰富的经验,负责 CKafka/CMQ/移动开发平台 的后端设计于开发工作,目前致力于打造稳定、高效和可扩展的基础组件与服务

故事的开端

话说,这个事情已经过去很久了,应该是在2020年的10月份,今天就拿出来炒一下冷饭吧。当时业务赶着上线,国庆节连续肝了6天,最后⼀天我想休息⼀下,于是像咸鱼一样躺在床上,如下所示:

突然 Pulsar 社区的大佬找我,希望我帮忙排查一下一个性能问题:Pulsar 的客户端消费分为 ConsumerImpl 和 MultiTopicsConsumerImpl 两种。

通常情况下,当⼀个 Topic 只有⼀个分区的时候,Builder 只会创建 ConsumerImpl,每个 ConsumerImpl 能连接到⼀个 Partition 消费。

当⼀个 Topic 有多个 Partition 的时候,则 Builder 会创建 MultiTopicsConsumerImpl,每个 MultiTopicsConsumerImpl 包含了多个 ConsumerImpl,即它会为每个 Partition 创建⼀个 ConsumerImpl。

多个 ConsumerImpl 各自接收消息,最终所有消息会汇总进 MultiTopicsConsumerImpl 的队列里,供业务使用。如下所示:

按道理,这么多个 ConsumerImpl ⼀起接收消息,又是多个 Partition 并行拉数据, MultiTopicsConsumerImpl 的性能应该远远超过单个 ConsumerImpl 才对。然而,现实是 MultiTopicsConsumerImpl 的性能只有 Consumer 的⼀半。

我⼀听,内心当然是拒绝的啦,这种 Commit 挣得太难了,修个 NPE 之类的不香吗?

于是我拿起手机回复大佬:好的,我看⼀下。

问题排查

要排查这个问题,首先我得有个 Pulsar 集群呀。于是我找了3台虚拟机,开始部署⼀个集群,然后......

(此处省略1W字)

最后,我用 Pulsar 自带的 perf 工具开始分别模拟单个 ConsumerImpl 和 MultiTopicsConsumerImpl 的消费,测试环境的配置如下:

  • 3台8核16G机器
  • Pulsar 的 Topic 创建4个 Partition
  • 消费时间2分钟
  • 使用 Pulsar 自带的 perf 工具,MultiTopicsConsumerImpl 的测试命令:
bin/pulsar-perf consume -u 'http://x.x.x.x:8080' -s my-sub-6 -sp Earliest -q
100000 persistent://public/default/p-topic

第⼀次的测试结果出来后,我发现大佬有点太乐观,因为 MultiTopicsConsumerImpl 的性能根本就没有 Consumer 的⼀半,情况比估计的还要差,只有七分之一左右。

MultiTopicsConsumerImpl 的性能结果:

Aggregated throughput stats --- 11715556 records received --- 68813.420 msg/s --- 537.605 Mbit/s

ConsumerImpl 的性能结果:

Aggregated throughput stats --- 78403434 records received --- 462640.204 msg/s --- 3614.377 Mbit/s

看完这个结果,我感觉这个起点很低,有巨大的上升空间。首先,我们需要知道时间都去哪了,于是我拉了⼀个火焰图看看CPU时间的消耗。

初步排查,发现业务线程占用了总体时间的 40.65%,其中 MessageReceived 里占用的总时间的 14%, 另外可重入的读写锁占用 8.22%。而 MessageReceived 里,其实也是锁占了很大的比重。锁差不多占用了 20% 的时间。于是,第⼀个优化方向就出现了 —— 去锁

去锁的方向主要有以下几个:

  1. 利用现有线程安全的 BlockingQueue,不再重复加锁。
  2. 降低锁获取频率。对于无法消除的锁,通过前置判断降低最终锁获取的频率。
  3. 修改逻辑实现方式,去除明显可以移除的锁。
  4. 都多写少的地方使用读写锁替换可重入锁。

移除后,重新跑性能测试,发现性能有了明显的提高,感觉公屏上飘过的都是 666:

//优化前
Aggregated throughput stats --- 11715556 records received --- 68813.420 msg/s -
-- 537.605 Mbit/s
 
//优化后
Aggregated throughput stats --- 25062077 records received --- 161656.814 msg/s
--- 1262.944 Mbit/s

还有有些地方强制去锁,对现有的⼀些逻辑有小变动,被拒绝了。例如有这样的⼀段代码:

//忽略掉加锁的部分
Message<T> msgPeeked = incomingMessages.peek();
while (msgPeeked != null && messages.canAdd(msgPeeked)) { Message<T> msg = incomingMessages.poll();
if (msg != null) { decreaseIncomingMessageSize(msg);
Message<T> interceptMsg = beforeConsume(msg); messages.add(interceptMsg);
}
msgPeeked = incomingMessages.peek();
}
result.complete(messages);
//忽略掉释放锁的部分

这段代码的逻辑大意是,先看看队列里第⼀条消息能不能放进集合里,可以就出队并放入。

我把这段 while 改为 do-while ,即不加锁,直接出队往集合里塞,直到塞完后的大小超过了,则结束。这种方式把先前的严格小于变为最多超过集合容量1条消息的大小,但是能借助本身就是线程安全的 BlockingQueue 实现去锁。但是由于行为有变化,CR 没有通过,只能还原回去。

继续回到刚才的火焰图,我们发现除了锁占用了很多的 CPU,Netty 相关的 API 也占用了不少,⼀共有 12.63%

主要是 AbstractEventExecutorGroup,也就是我们常见的 EventLoopGroup 消耗了大量的 CPU 资源。

Pulsar 中几乎所有的操作都是异步的,大量使用了 Java8 里的 CompletableFuture ,但是为什么会有这么多的 EventLoop 呢。看代码发现,Pulsar 里面为了实现异步延迟+循环拉取消息,又为了避免循环调用自己出现栈溢出,使用 Netty 的 EventLoop 作为线程池。

起初我以为是 EventLoop 处理请求比较繁忙引起的,顺着堆栈找到对应的代码,发现并不是。

Netty 的 EventLoop 采用了生产-消费模型,添加任务的线程如果是当前线程,则自己就消费掉了,没有唤醒动作。但是如果使用了 EpollEventLoop 并且添加任务的线程与处理线程不是同⼀个,生产线程会唤醒消费线程来处理任务,进而触发系统调用:

Native.eventFdWrite(this.eventFd.intValue(), 1L);

这个处理起来就比较简单了,如果能避免出现频繁的系统调用就能提升性能,直接使用Java自带的 ThreadPoolExecutor 替换掉就好了,ThreadPoolExecutor 设置使用 BlockingQueue 作为任务队列,性能比每次调用 eventFdWrite 要好。

为了对比单次 EventLoop 优化的效果,因此并没有在移除锁的基础上做,而是单独拉了⼀个分支。前后性能效果的对比如下:

//优化前
Aggregated throughput stats --- 11715556 records received --- 68813.420 msg/s -
-- 537.605 Mbit/s
 
//优化后
Aggregated throughput stats --- 18392800 records received --- 133314.602 msg/s
--- 1041.520 Mbit/s

剧终

去锁 + EventLoop 一共提升了近4倍的性能:


//MultiTopicsConsumerImpl优化前
Aggregated throughput stats --- 11715556 records received --- 68813.420 msg/s -
-- 537.605 Mbit/s
//MultiTopicsConsumerImpl优化后
Aggregated throughput stats --- 40140549 records received --- 275927.749 msg/s
--- 2155.686 Mbit/s
//ConsumerImpl
Aggregated throughput stats --- 78403434 records received --- 462640.204 msg/s
--- 3614.377 Mbit/s

最终的火焰图如下:

虽然整个优化的过程比较简单,技术含量高,但由于起点比较低,所以优化的效果还是很好的。最终性能只有单个 ConsumerImpl 的 50% 左右,因此还有继续提升的空间。这次的优化是基于已有的架构, 仅对实现做了调整,如果我们尝试对架构进行微调可以有更多的提升,欢迎小伙伴们⼀起来优化。

往期推荐

《超有料!万字详解腾讯微服务平台 TSF 的敏捷开发流程》

《火速围观!鹅厂中间件产品遭遇暴风吐槽!》

《看这里!鹅厂大佬深度解析 Apache Pulsar 五大应用场景》

扫描下方二维码关注本公众号,了解更多微服务、消息队列的相关信息!解锁超多鹅厂周边!