log/writer.go

181 lines
3.7 KiB
Go
Raw Normal View History

package log
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
)
// Writer 日志写入接口
type Writer interface {
Log(LogEntry, []byte)
Run()
}
// logPayload 包含路由信息的包裹
type logPayload struct {
entry LogEntry
buf []byte
writer Writer // 目标自定义 Writer
file *FileWriter // 目标文件 Writer
}
2026-05-13 00:29:17 +08:00
// writerService manages the background writing of log entries.
type writerService struct {
Running atomic.Bool
StopChan chan bool
LogChannel chan logPayload
Dropped atomic.Uint64
Writers atomic.Value // []Writer
WriterLock sync.Mutex
Files map[string]*FileWriter
FilesLock sync.RWMutex
}
var (
2026-05-13 00:29:17 +08:00
// WriterService is the global instance of defaultService.
WriterService = &writerService{}
)
// ConsoleWriter 控制台写入器
type ConsoleWriter struct {
}
func (w *ConsoleWriter) Log(entry LogEntry, data []byte) {
fmt.Println(Viewable(string(data)))
}
func (w *ConsoleWriter) Run() {
}
func init() {
2026-05-13 00:29:17 +08:00
WriterService.LogChannel = make(chan logPayload, 10000)
WriterService.Writers.Store([]Writer{})
WriterService.Files = make(map[string]*FileWriter)
RegisterWriterMaker("console", func(conf *Config) Writer {
return &ConsoleWriter{}
})
}
// writeAsync 异步写入日志
func writeAsync(payload logPayload) {
defer func() {
recover()
}()
2026-05-13 00:29:17 +08:00
if !WriterService.Running.Load() {
return
}
select {
2026-05-13 00:29:17 +08:00
case WriterService.LogChannel <- payload:
default:
// 丢弃或处理过载
2026-05-13 00:29:17 +08:00
dropped := WriterService.Dropped.Add(1)
if dropped%1000 == 1 {
if DefaultLogger != nil {
// 注意:这里可能会产生递归调用,但 select default 保证了不会死锁
DefaultLogger.Error(fmt.Sprintf("log channel full, dropped %d logs", dropped))
}
}
}
}
// GetDroppedLogs 获取被丢弃的日志数量
func GetDroppedLogs() uint64 {
2026-05-13 00:29:17 +08:00
return WriterService.Dropped.Load()
}
// Start implements starter.Service interface.
2026-05-13 00:29:17 +08:00
func (s *writerService) Start(_ context.Context, _ *Logger) error {
if !s.Running.CompareAndSwap(false, true) {
return nil
}
s.StopChan = make(chan bool)
go s.writerRunner()
return nil
}
// Stop implements starter.Service interface.
2026-05-13 00:29:17 +08:00
func (s *writerService) Stop(_ context.Context) error {
if s.Running.CompareAndSwap(true, false) {
close(s.LogChannel)
if s.StopChan != nil {
<-s.StopChan
s.StopChan = nil
}
}
return nil
}
// Status implements starter.Service interface.
func (s *writerService) Status() (string, error) {
if s.Running.Load() {
return fmt.Sprintf("queue: %d, dropped: %d", len(s.LogChannel), s.Dropped.Load()), nil
}
return "stopped", nil
}
2026-05-13 00:29:17 +08:00
func (s *writerService) writerRunner() {
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
defer func() {
if s.StopChan != nil {
close(s.StopChan)
}
}()
for {
select {
case payload, ok := <-s.LogChannel:
if !ok {
s.flushWriters()
return
}
s.processLog(payload)
// 尝试批量处理更多日志
batchCount := 0
for batchCount < 100 {
select {
case nextPayload, nextOk := <-s.LogChannel:
if !nextOk {
s.flushWriters()
return
}
s.processLog(nextPayload)
batchCount++
default:
batchCount = 100 // break outer loop
}
}
case <-ticker.C:
s.flushWriters()
}
}
}
2026-05-13 00:29:17 +08:00
func (s *writerService) processLog(payload logPayload) {
// 精准路由:根据包裹信息决定写入目标
if payload.writer != nil {
payload.writer.Log(payload.entry, payload.buf)
} else if payload.file != nil {
payload.file.Write(time.Now(), payload.buf)
}
}
2026-05-13 00:29:17 +08:00
func (s *writerService) flushWriters() {
curWriters, _ := s.Writers.Load().([]Writer)
for _, w := range curWriters {
w.Run()
}
s.FilesLock.RLock()
for _, f := range s.Files {
f.Run()
}
s.FilesLock.RUnlock()
}