目前线上日志处理流为:
一条日志先写文件->生产者写redis list->消费者读redsi list->写MySQL/ES
当初这样设计主要是为了:
- 先写文件: 防止数据要进行恢复,可以进行重写
- 写redis list: 防止直接写MySQL,迸发写过大,写失败,(还可以批量写MySQL/ES),不用kafka的原因纯是不想引入过多中间件
- 在consumer这里消费后写入redsi set中进行数据计算聚合: 比如有效对话轮数,结点数等
但是这套方案下来,生产速度稍微大点出现消费速度慢的问题,观察日志后发现,一条日志 处理要起码7次网络io,pop 的消费速度最高80/s,还不如让消费者直接写ES,通过打耗时日志,发现比较显著的问题:
- pop/push to redis慢(也可能是其它服务频繁读写redis导致服务变慢),单次耗时达50ms+,
- 日志decode比较慢耗时不少达40ms+(json Unmarshal)
- 写MySQL最慢:100ms+,比写redis慢10x(实际上慢100x),写ES最快:4-5ms,
针对读写redis慢的问题 #
后面怀疑是不是go的代码有问题,同样的redis cluster用python试了一下,push的速度很快符合redis的毫秒级操作, 试着更换go的redis代码包后发现,在golang中对redis进行push/pop慢(单次耗时达50ms+)的原因在于这个redis client的v6版本包的cluster代码有问题,不过使用单例时速度倒正常(1ms以下),后面通过升级到v8版本后,测试发现可以解决,而v8版本对应线上的redis 6不会有问题.
针对入库慢导致消费慢问题 #
日志的消费入库路径为 list ->ES ->MySQL ->redis
发现这个瓶颈之后,在消费端使用golang channel进行缓存到内存优化, 把写库慢的MySQL操作放到channel中进行异步入库,先写ES,这样可以保证数据最快入库(这个方案可能在程序突然推出时出事丢失日志的情况,但可以通过 在生产端从日志文件中重写日志到队列中恢复,日志重跑不影响数据的幂等性);
或者使用简单又能满足当前迸发大小的方案, 生产者直接写ES, 后面需要的统计数据 离线定时计算(这个改动较大,出事故风险大,没实践)
最终优化后对不同入库速率50,100,150,200,300进行测试后,发现入库速率200/s左右为比较理想的入库速率,为了防止写库过于频繁错误产生,特意用uber的ratelimit进行限流入库
写数据的问题的一些思考总结: #
- 是upsert还是单read/query或者单write/insert: query(索引/慢查询问题),update:query&&write(数据大小/类型/批处理问题);
- 批量read/write(考虑一次写的最大数据大小限制 ,MySQL/ES的批量写入大小限制 );
- 读写失败,重试的策略
- 数据库的写大概耗时,redis:1ms-, ES:5ms左右,MySQL:100ms+
针对批处理读写问题 #
这里实现了一个golang的批处理代码:从channel中消费数据,数据量达到阈值或者到达时间超时阈值时,进行数据的批处理执行
package batcher
import (
"time"
"github.com/linnv/logx"
)
type Element string
func BatchJobs(input <-chan Element, exit <-chan struct{}, maxItems int, maxTimeout time.Duration) (batch []Element) {
for {
for keepGoing := true; keepGoing; {
expire := time.After(maxTimeout)
for {
select {
case onelog, ok := <-input:
//empty or closed-channel
if !ok {
logx.Debugf("closed input\n")
keepGoing = false
goto done
}
if onelog == "" {
continue
}
batch = append(batch, onelog)
if len(batch) == maxItems {
logx.Debugf("full items %d batchjobs persist now\n", maxItems)
goto done
}
case <-expire:
logx.Debugf("expire batchjobs persist now %d item(s)\n", len(batch))
goto done
}
}
done:
if !keepGoing {
select {
case <-exit:
logx.Debugf("exit batchjobs\n")
return
default:
}
}
logx.Debugf("need to handle batchjobs: %+v\n", len(batch))
return
}
}
}
func BatchJobsWithExecutor(input <-chan Element, exit <-chan struct{}, maxItems int, maxTimeout time.Duration, executor func(doBatchers []Element) error) {
for {
for keepGoing := true; keepGoing; {
expire := time.After(maxTimeout)
var batch []Element
for {
select {
case onelog, ok := <-input:
//empty or closed-channel
if !ok {
logx.Debugf("closed input do batcher and done\n")
keepGoing = false
goto done
}
if onelog == "" {
continue
}
batch = append(batch, onelog)
if len(batch) == maxItems {
logx.Debugf("full items %d batchjobs persist now\n", maxItems)
goto done
}
case <-expire:
logx.Debugf("expire batchjobs persist now %d item(s)\n", len(batch))
goto done
}
}
done:
if !keepGoing {
select {
case <-exit:
logx.Debugf("exit batchjobs\n")
return
default:
}
}
logx.Debugf("need to handle batchjobs: %+v\n", len(batch))
if err := executor(batch); err != nil {
logx.Warnf("won' retry on err: %+v\n", err)
// return
}
}
}
}