日志埋坑优化实操记录

目前线上日志处理流为:
一条日志先写文件->生产者写redis list->消费者读redsi list->写MySQL/ES
当初这样设计主要是为了:

但是这套方案下来,生产速度稍微大点出现消费速度慢的问题,观察日志后发现,一条日志 处理要起码7次网络io,pop 的消费速度最高80/s,还不如让消费者直接写ES,通过打耗时日志,发现比较显著的问题:

针对读写redis慢的问题 #

后面怀疑是不是go的代码有问题,同样的redis cluster用python试了一下,push的速度很快符合redis的毫秒级操作, 试着更换go的redis代码包后发现,在golang中对redis进行push/pop慢(单次耗时达50ms+)的原因在于这个redis client的v6版本包的cluster代码有问题,不过使用单例时速度倒正常(1ms以下),后面通过升级到v8版本后,测试发现可以解决,而v8版本对应线上的redis 6不会有问题.

针对入库慢导致消费慢问题 #

日志的消费入库路径为 list ->ES ->MySQL ->redis
发现这个瓶颈之后,在消费端使用golang channel进行缓存到内存优化, 把写库慢的MySQL操作放到channel中进行异步入库,先写ES,这样可以保证数据最快入库(这个方案可能在程序突然推出时出事丢失日志的情况,但可以通过 在生产端从日志文件中重写日志到队列中恢复,日志重跑不影响数据的幂等性);

或者使用简单又能满足当前迸发大小的方案, 生产者直接写ES, 后面需要的统计数据 离线定时计算(这个改动较大,出事故风险大,没实践)

最终优化后对不同入库速率50,100,150,200,300进行测试后,发现入库速率200/s左右为比较理想的入库速率,为了防止写库过于频繁错误产生,特意用uber的ratelimit进行限流入库

写数据的问题的一些思考总结: #

针对批处理读写问题 #

这里实现了一个golang的批处理代码:从channel中消费数据,数据量达到阈值或者到达时间超时阈值时,进行数据的批处理执行

package batcher

import (
	"time"

	"github.com/linnv/logx"
)

type Element string

func BatchJobs(input <-chan Element, exit <-chan struct{}, maxItems int, maxTimeout time.Duration) (batch []Element) {
	for {
		for keepGoing := true; keepGoing; {
			expire := time.After(maxTimeout)
			for {
				select {
				case onelog, ok := <-input:
					//empty or closed-channel
					if !ok {
						logx.Debugf("closed input\n")
						keepGoing = false
						goto done
					}

					if onelog == "" {
						continue
					}

					batch = append(batch, onelog)
					if len(batch) == maxItems {
						logx.Debugf("full items %d batchjobs persist now\n", maxItems)
						goto done
					}

				case <-expire:
					logx.Debugf("expire batchjobs persist now %d item(s)\n", len(batch))
					goto done
				}

			}

		done:
			if !keepGoing {
				select {
				case <-exit:
					logx.Debugf("exit batchjobs\n")
					return
				default:
				}
			}
			logx.Debugf("need to handle batchjobs: %+v\n", len(batch))
			return
		}
	}
}

func BatchJobsWithExecutor(input <-chan Element, exit <-chan struct{}, maxItems int, maxTimeout time.Duration, executor func(doBatchers []Element) error) {
	for {
		for keepGoing := true; keepGoing; {
			expire := time.After(maxTimeout)
			var batch []Element
			for {
				select {
				case onelog, ok := <-input:
					//empty or closed-channel
					if !ok {
						logx.Debugf("closed input do batcher and done\n")
						keepGoing = false
						goto done
					}

					if onelog == "" {
						continue
					}

					batch = append(batch, onelog)
					if len(batch) == maxItems {
						logx.Debugf("full items %d batchjobs persist now\n", maxItems)
						goto done
					}

				case <-expire:
					logx.Debugf("expire batchjobs persist now %d item(s)\n", len(batch))
					goto done
				}
			}

		done:
			if !keepGoing {
				select {
				case <-exit:
					logx.Debugf("exit batchjobs\n")
					return
				default:
				}
			}
			logx.Debugf("need to handle batchjobs: %+v\n", len(batch))
			if err := executor(batch); err != nil {
				logx.Warnf("won' retry on err: %+v\n", err)
				// return
			}
		}
	}
}

2022-07-20