nsq源码阅读(5)-至少被成功投递一次的实现
保证只被成功投递一次?
上次阅读了 Message 投递的整个数据流程(正常情况下的流程); 阅读中, 心中的一个疑问还是没有解开, 就是 “消息如何保证 被 且 只被 成功投递一次?”; 并且粗略阅读过程中, 发现消息被发送给Client之前, 会被塞到另外一个 InFlight 队列中, 更加疑惑, 既然消息已经做过了发送, 为什么还要塞到另外一个队列里?
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
for {
select {
case msg := <-memoryMsgChan:
//TODO 这个是什么作用? 下面不是已经发送了? 怎么又塞到一个队列里?
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
err = p.SendMessage(client, msg, &buf)
}
}
}
查看nsq 的文档, 在 FEATURES & GUARANTEES 一章, 列出了 4个 保证:
- messages are not durable (by default)
- messages are delivered at least once
- messages received are un-ordered
- consumers eventually find all topic producers
好嘛… nsq 并没有保证 “仅一次”, 只保证了 “至少一次”
至少一次的实现
这个函数叫做channle.StartInFlightTimeout(), 根据字面意思理解, InFlight
, 开始”投递了”, InFlightTimeout
投递超时, StartInFlightTimeout
开始一个(防止)投递超时的(动作)
看下函数实现
func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout time.Duration) error {
now := time.Now()
msg.clientID = clientID
msg.deliveryTS = now
// msg的 优先级是 超时的时间戳
msg.pri = now.Add(timeout).UnixNano()
//放到缓存, 按msg.ID, 记录一下, 估计方便之后查找: c.inFlightMessages[msg.ID]
err := c.pushInFlightMessage(msg)
if err != nil {
return err
}
//放到一个叫 Inflight的队列里
c.addToInFlightPQ(msg)
return nil
}
函数里, “至少一次” 如何实现, 并没有很清楚, 只不过又塞到另一个InFlight队列里;这里插入队列, 所以还是查找这个队列在哪里被”读”, 找到在 channel.processInFlightQueue()
这个函数的功能是 把 InFlightQueue 里, 优先级 小于 参数 t 的, 全部重新发送
//把 InFlightQueue 里, 优先级 小于 参数 t 的, 全部重新发送
func (c *Channel) processInFlightQueue(t int64) bool {
// 先 检查是否已经 退出
c.exitMutex.RLock()
defer c.exitMutex.RUnlock()
if c.Exiting() {
return false
}
dirty := false
for {
c.inFlightMutex.Lock()
// 如果 栈顶元素的优先级 小于参数
// 弹出 栈顶元素并返回
msg, _ := c.inFlightPQ.PeekAndShift(t)
c.inFlightMutex.Unlock()
//如果 栈顶 元素的优先级 大于参数, 返回 nil
if msg == nil {
// 没有大于 指定参数优先级的元素, 什么也不做, 返回 deirty = false
goto exit
}
//标记是 "脏" 的
dirty = true
//把之前存起来的 msg 取出来
//TODO: inFlightPQ 里存的不就是msg 么, 怎么又存一次?
_, err := c.popInFlightMessage(msg.clientID, msg.ID)
if err != nil {
goto exit
}
atomic.AddUint64(&c.timeoutCount, 1)
c.RLock()
client, ok := c.clients[msg.clientID]
c.RUnlock()
if ok {
//找出这个msg 原来是发给哪个client 发的
//通知它这个msg timeout了, nsqd 要重发了
client.TimedOutMessage()
}
// 重发, 重新塞入队列: channel.memoryMsgChan <- m:
c.doRequeue(msg)
} //循环直到队列里 没有满足条件的
exit:
return dirty
}
看来, 这个消息 “至少被投递一次” 的保证实现的有点粗暴? 直接起个延时, 超时了就重新发送?
再网上追追, processInFlightQueue 的调用在哪里:
// 这里应该是 "生产者/消费者" 模式
// 这个函数是 队列扫描 "消费者", 消费的是 扫描 channel InFlightQueue 和 DeferredQueue 的任务
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
for {
select {
case c := <-workCh:
// InFlightQueue 是 container/head 包实现的一个优先级队列, 队列的顶部的优先级最小
// head 是一个 堆数据结构, 优先级队列 是一个 小根堆
// TODO: 堆算法 https://idisfkj.github.io/2016/06/19/%E7%AE%97%E6%B3%95-%E4%B8%83-%E5%A0%86%E6%8E%92%E5%BA%8F/
// 然后将这个 优先级队列做了一个转换, 当做 "超时队列" 来用,
// 具体办法是将 超时时间 作为优先级
// 那么, 队列的顶端的任务的 "优先级最小", 也就是它 应该"最早超时"
now := time.Now().UnixNano()
dirty := false
if c.processInFlightQueue(now) {
dirty = true
}
// DeferredQueue 也是一个优先级队列
// 然后同样将这个 优先级队列 转换为 延时队列 来使用
// 将 任务"触发(发动)时间" 当做优先级, 放到队列里
if c.processDeferredQueue(now) {
dirty = true
}
responseCh <- dirty
//注意这个地方, 跟 之前的close(exitChan) 用法不同
//这里是启动多个worker, 然后当判断worker太多了, 需要关闭一个多余的worker时
//给 closeCh <- 1 发个消息, 利用golang chan 随机分发的特性
//这样就会随机的关闭掉一个 worker, 也就是随机退出一个 queueScanWorker 的 循环
case <-closeCh:
return
}
}
}
把 InFlightQueue 的 写入和读取结合起来看:
//写入
msg.pri = now.Add(timeout).UnixNano() // now + timetou 作为 优先级
c.addToInFlightPQ(msg)
//读取
now := time.Now().UnixNano() //跟 当前实现比较, 也就是比较是否超时
dirty := false
if c.processInFlightQueue(now) {
dirty = true
}
现在重新看一下 processInFlightQueue(), 就明白 这个函数的 整个作用是: 把 InFlightQueue 里, 超时的 msg 全部重新发送出去
//把 InFlightQueue 里, 超时的 msg 全部重新发送出去
func (c *Channel) processInFlightQueue(t int64) bool {
dirty := false
for {
// t 是当前时间 now, 假如 inFlightPQ 顶端的优先级(也就是超时时间) 小于 now
// 那返回的msg 就是 "超时" 了
msg, _ := c.inFlightPQ.PeekAndShift(t)
//标记是 "脏" 的
dirty = true
// 重发, 重新塞入队列: channel.memoryMsgChan <- m:
c.doRequeue(msg)
}
}
queueScanWorker
现在已经知道了, “至少投递一次” 这个保证是由 queueScanWorker 不断的扫描 InFlightQueue 实现的, 之前猜测 queueScanWorker 用了 “生产者/消费者模式”, 所以再来看下 queueScanWorker 的启动
nsqd 启动的时候就 起了一个 queueScanLoop 线程, 间隔性的派发 scan 任务, 并适时调整 worker 的数量
func (n *NSQD) Main() {
n.waitGroup.Wrap(func() {
n.queueScanLoop()
})
}
func (n *NSQD) queueScanLoop() {
//任务派发 队列
workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
//任务结果 队列
responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
// 用来优雅关闭
closeCh := make(chan int)
workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)
channels := n.channels()
// 确切一点, 这里应该叫 resizeWorkerPool
// 就是调整 queue scan 任务的 worker 的数量
// 当然, 一开始就是启动一些 worker
n.resizePool(len(channels), workCh, responseCh, closeCh)
for {
select {
case <-workTicker.C: // 开始一次任务的派发
if len(channels) == 0 {
continue
}
case <-refreshTicker.C: // 重新调整 worker 数量
channels = n.channels()
n.resizePool(len(channels), workCh, responseCh, closeCh)
continue
case <-n.exitChan: // 退出
goto exit
}
num := n.getOpts().QueueScanSelectionCount
if num > len(channels) {
num = len(channels)
}
loop:
// 随机取出几个 channel, 派发给 worker 进行 扫描
for _, i := range util.UniqRands(num, len(channels)) {
workCh <- channels[i]
}
// 接收 扫描结果, 统一 有多少 channel 是 "脏" 的
numDirty := 0
for i := 0; i < num; i++ {
if <-responseCh {
numDirty++
}
}
// 假如 "脏" 的 "比例" 大于阀值, 则不等待 workTicker
// 马上进行下一轮 扫描
if float64(numDirty) / float64(num) > n.getOpts().QueueScanDirtyPercent {
goto loop
}
}
exit:
n.logf("QUEUESCAN: closing")
close(closeCh)
workTicker.Stop()
refreshTicker.Stop()
}
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
// 设置 queueScanWorker 的 数量 为 当前 nsqd 所有channel 个数的 1/4
idealPoolSize := int(float64(num) * 0.25)
if idealPoolSize < 1 {
idealPoolSize = 1
} else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
}
for {
//queueScanWorker 多了就减少一个, 少了就增加一个
//一直循环, 直到 queueScanWorker 的数量满足要求
if idealPoolSize == n.poolSize {
break
} else if idealPoolSize < n.poolSize {
// queueScanWorker 多了, 减少一个
// 利用 chan 的特性, 向closeCh 推一个消息, 这样 所有的 worCh 就会随机有一个收到这个消息, 然后关闭
// 细节: 这里跟 exitCh 的用法不同, exitCh 是要告知 "所有的" looper 退出, 所以使用的是 close(exitCh) 的用法
// 而如果想 让其中 一个 退出, 则使用 exitCh <- 1 的用法
closeCh <- 1
n.poolSize--
} else {
// queueScanWorker 少了, 增加一个
n.waitGroup.Wrap(func() {
n.queueScanWorker(workCh, responseCh, closeCh)
})
n.poolSize++
}
}
}
Message 的”完成”
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
switch {
// 当一个客户端成功接收到msg 并处理完成, 按照协议会向nsqd 发送一个 "FIN" 命令通知nsqd, 这时候nsqd 会将这条msg 从InFlight队列中删除
case bytes.Equal(params[0], []byte("FIN")):
return p.FIN(client, params)
}
func (p *protocolV2) FIN(client *clientV2, params [][]byte) ([]byte, error) {
id, err := getMessageID(params[1])
err = client.Channel.FinishMessage(client.ID, *id)
// 做些 client 计数
client.FinishedMessage()
return nil, nil
}
func (c *Channel) FinishMessage(clientID int64, id MessageID) error {
//删除 inFlightMessages 缓存
msg, err := c.popInFlightMessage(clientID, id)
if err != nil {
return err
}
// 从超时队列中删除
c.removeFromInFlightPQ(msg)
if c.e2eProcessingLatencyStream != nil {
c.e2eProcessingLatencyStream.Insert(msg.Timestamp)
}
return nil
}
nsqd 收到客户端发来的”FIN” 只会, 就会从超时队列中删除这条msg; 这个时候, 一条msg 在nsqd 中的流转完成
细节1: 减少类型推断, 提高性能
InFlightQueue 和 DeferredQueue 为什么要实现两次? DeferredQueue 用 head 包实现, InFlightQueue 自己又实现了一次heap, 其实跟 DeferredQueue 不是一样的么?
之前两个就真是是一样的, 后来有一个提交,里面的注释是: this eliminates the use of container/heap and the associated cost of boxing and interface type assertions.
https://github.com/nsqio/nsq/commit/74bfde101934700cb0cd980d01b6dfe2fe5a6a53
意思就是说, 这些 队列里 存的是 Message 这个类型, 如果使用 heap, 需要存到 heap.Item 的 Value 里,而这个value 是一个 interface{} , 赋值 和 取值 都需要做类型推断 和 包装,那么作为 InFlightQueue 这个 “高负荷” 的队列, 减少这种 “类型推断和包装” , 有利于提高性能
写个test试一下
type Item struct {
d1 int
d2 int
}
func BenchmarkT1(b *testing.B) {
q := make([]*Item, 0) // 不需要类型推断的 slice
for i := 0; i < b.N; i++ {
q = append(q, &Item{i, i})
}
for _, hero := range q {
hero.d1++
}
}
func BenchmarkT2(b *testing.B) {
q := make([]interface{}, 0)
for i := 0; i < b.N; i++ {
q = append(q, &Item{i, i})
}
for _, hero := range q {
hero.(*Item).d1++ // 需要做类型推断
}
}
结果还是很明显, 做类型推断的更耗时, 并且有50%的多余负载:
BenchmarkT1-8 10000000 241 ns/op
BenchmarkT2-8 5000000 332 ns/op