log/writer.go

197 lines
3.9 KiB
Go
Raw Normal View History

package log
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
)
// Writer is the log output interface.
//
// Within this file, loggerService.processLog calls Log for payloads that are
// routed to a custom writer, and loggerService.flushWriters calls Run on every
// writer held in loggerService.Writers (on each runner tick and once more when
// the log channel is closed).
type Writer interface {
// Log receives one log entry together with a byte slice of log data
// (presumably the serialized form of the entry; confirm with the producer).
Log(LogEntry, []byte)
// Run is invoked periodically by the background runner. The built-in
// ConsoleWriter implements it as a no-op.
Run()
}
// logPayload is one unit of work queued on loggerService.LogChannel: a log
// entry, its byte data, and the destination it should be delivered to.
//
// processLog checks writer first and only falls back to file when writer is
// nil; a payload with both destinations nil is discarded without being written.
type logPayload struct {
entry LogEntry
buf []byte
writer Writer // target custom Writer
file *FileWriter // target file Writer
}
// loggerService manages the background writing of log entries.
//
// Producers enqueue payloads through writeAsync; a single goroutine
// (writerRunner) dequeues them, routes each one to its target, and flushes all
// writers on a 200ms ticker.
type loggerService struct {
// Running is set by Start and cleared by Stop; writeAsync discards payloads
// while it is false.
Running atomic.Bool
// StopChan is created by Start, closed by writerRunner when it exits, and
// reset to nil by Stop and Wait after they observe the close.
// NOTE(review): the field is read and written without synchronization.
StopChan chan bool
// LogChannel is the buffered queue between writeAsync and writerRunner.
// It is allocated in init and closed by Stop.
LogChannel chan logPayload
// Dropped counts payloads discarded by writeAsync because LogChannel was full.
Dropped atomic.Uint64
Writers atomic.Value // []Writer
// WriterLock is not used in the code visible here; presumably it serializes
// updates to Writers (confirm against the registration code).
WriterLock sync.Mutex
// Files holds the file writers that flushWriters runs on every flush,
// keyed by a string (presumably the file path; confirm with the code that fills it).
Files map[string]*FileWriter
// FilesLock guards Files; flushWriters takes it in read mode.
FilesLock sync.RWMutex
}
// LoggerService is the process-wide loggerService instance. Its channel,
// writer list and file map are populated by this package's init function.
var LoggerService = &loggerService{}
// ConsoleWriter is a Writer that prints log data to standard output.
// It carries no state, so the zero value is ready to use.
type ConsoleWriter struct{}
// Log prints data to standard output, followed by a newline, after passing it
// through Viewable. The entry argument is not used.
func (w *ConsoleWriter) Log(entry LogEntry, data []byte) {
	line := Viewable(string(data))
	fmt.Println(line)
}
// Run is the periodic hook required by the Writer interface. Log prints each
// line immediately, so there is nothing left to do here.
func (w *ConsoleWriter) Run() {}
func init() {
LoggerService.LogChannel = make(chan logPayload, 10000)
LoggerService.Writers.Store([]Writer{})
LoggerService.Files = make(map[string]*FileWriter)
RegisterWriterMaker("console", func(conf *Config) Writer {
return &ConsoleWriter{}
})
}
// writeAsync enqueues payload for the background runner without ever blocking
// the caller.
//
// The payload is silently discarded when the service is not running. When the
// queue is full the payload is discarded, the Dropped counter is incremented,
// and on the 1st, 1001st, 2001st, ... drop an error is reported through
// DefaultLogger (if one is set).
func writeAsync(payload logPayload) {
	// Best effort: swallow any panic raised below, e.g. a send that races with
	// Stop closing LogChannel.
	defer func() { _ = recover() }()

	if !LoggerService.Running.Load() {
		return
	}

	select {
	case LoggerService.LogChannel <- payload:
		return
	default:
		// Queue is full: fall through to the overload accounting.
	}

	total := LoggerService.Dropped.Add(1)
	if total%1000 != 1 || DefaultLogger == nil {
		return
	}
	// This may re-enter writeAsync (presumably DefaultLogger routes through it),
	// but the non-blocking send above means it cannot deadlock.
	DefaultLogger.Error(fmt.Sprintf("log channel full, dropped %d logs", total))
}
// GetDroppedLogs reports how many log payloads have been discarded so far
// because the log queue was full.
func GetDroppedLogs() uint64 {
	count := LoggerService.Dropped.Load()
	return count
}
// Start launches the background writer on the global LoggerService
// (kept for compatibility with the old API).
//
// Calling it while the service is already running is a no-op.
func Start() {
	// The legacy API has no context to forward; pass a non-nil background
	// context rather than nil. The returned error is always nil.
	_ = LoggerService.Start(context.Background(), nil)
}
// Stop shuts down the background writer on the global LoggerService
// (kept for compatibility with the old API).
//
// It blocks until the runner goroutine has exited. Calling it while the
// service is not running is a no-op.
func Stop() {
	// The legacy API has no context to forward; pass a non-nil background
	// context rather than nil. The returned error is always nil.
	_ = LoggerService.Stop(context.Background())
}
// Wait blocks until the background writer has stopped
// (kept for compatibility with the old API).
//
// It returns immediately when StopChan is nil, i.e. the service was never
// started or an earlier Stop/Wait already cleared the field.
func Wait() {
	// Read the field exactly once. Stop clears it after the runner exits, and
	// a second read could observe nil; receiving from a nil channel would
	// block forever.
	done := LoggerService.StopChan
	if done == nil {
		return
	}
	<-done
	LoggerService.StopChan = nil
}
// Start implements the starter.Service interface.
//
// The first caller to flip Running from false to true allocates a fresh
// StopChan and launches writerRunner in its own goroutine; every other call is
// a no-op. Both arguments are ignored and the result is always nil.
//
// NOTE(review): LogChannel is not recreated here. Stop closes it, so a runner
// started after a Stop sees the closed channel and exits immediately, and the
// next Stop would close the channel a second time. Confirm that restart is not
// a supported use.
func (s *loggerService) Start(_ context.Context, _ *Logger) error {
	if s.Running.CompareAndSwap(false, true) {
		s.StopChan = make(chan bool)
		go s.writerRunner()
	}
	return nil
}
// Stop implements the starter.Service interface.
//
// The first caller to flip Running from true to false closes LogChannel, which
// lets writerRunner drain the buffered payloads, flush every writer and exit;
// Stop then blocks until the runner signals completion by closing StopChan.
// Calls made while the service is not running are no-ops.
//
// The context argument is ignored, so the wait cannot be cancelled. The result
// is always nil.
func (s *loggerService) Stop(_ context.Context) error {
	if !s.Running.CompareAndSwap(true, false) {
		return nil
	}
	close(s.LogChannel)

	// Read the field exactly once. Wait clears it after the runner exits, and a
	// second read could observe nil; receiving from a nil channel would block
	// forever.
	if done := s.StopChan; done != nil {
		<-done
		s.StopChan = nil
	}
	return nil
}
// Health implements the starter.Service interface.
// No health condition is evaluated: the result is always nil.
func (s *loggerService) Health() error {
	return nil
}
// writerRunner is the background loop started by Start.
//
// It processes payloads from LogChannel and flushes all writers every 200ms.
// After each payload it opportunistically handles up to 100 more that are
// already queued before returning to the outer select, so the ticker still gets
// a chance to fire under sustained load. When LogChannel is closed and drained,
// it performs a final flush and returns; on return it closes StopChan (if set)
// to release Stop and Wait.
func (s *loggerService) writerRunner() {
	flushTick := time.NewTicker(200 * time.Millisecond)
	defer flushTick.Stop()
	defer func() {
		if s.StopChan != nil {
			close(s.StopChan)
		}
	}()

	// Upper bound on extra payloads handled per wake-up.
	const maxBatch = 100

	for {
		select {
		case first, open := <-s.LogChannel:
			if !open {
				s.flushWriters()
				return
			}
			s.processLog(first)

			// Try to process more queued payloads without blocking.
		drain:
			for extra := 0; extra < maxBatch; extra++ {
				select {
				case next, stillOpen := <-s.LogChannel:
					if !stillOpen {
						s.flushWriters()
						return
					}
					s.processLog(next)
				default:
					// Queue is momentarily empty.
					break drain
				}
			}
		case <-flushTick.C:
			s.flushWriters()
		}
	}
}
// processLog delivers one payload to its destination.
//
// A custom writer takes precedence over a file writer. File writes are stamped
// with the time of processing (time.Now()). A payload with neither destination
// set is ignored.
func (s *loggerService) processLog(payload logPayload) {
	switch {
	case payload.writer != nil:
		payload.writer.Log(payload.entry, payload.buf)
	case payload.file != nil:
		payload.file.Write(time.Now(), payload.buf)
	}
}
// flushWriters calls Run on every registered custom writer and then on every
// file writer. The file map is iterated under the read lock; the writer slice
// is taken as a snapshot from the atomic value.
func (s *loggerService) flushWriters() {
	// A failed type assertion (nothing stored, or a different type) simply
	// means there are no custom writers to run.
	if registered, ok := s.Writers.Load().([]Writer); ok {
		for i := range registered {
			registered[i].Run()
		}
	}

	s.FilesLock.RLock()
	for _, fw := range s.Files {
		fw.Run()
	}
	s.FilesLock.RUnlock()
}