diff --git a/es_writer.go b/es_writer.go index 0b26786..cd084cd 100644 --- a/es_writer.go +++ b/es_writer.go @@ -97,7 +97,7 @@ func (w *esWriter) Run() { w.lock.Unlock() // 超过100条数据 或 过了1秒 发送数据 - if queueLen > 100 || (queueLen > 0 && (now > w.last || !LoggerService.Running.Load())) { + if queueLen > 100 || (queueLen > 0 && (now > w.last || !WriterService.Running.Load())) { w.lock.Lock() sendings := w.queue w.queue = make([]string, 0) diff --git a/functional_test.go b/functional_test.go index 19011f7..a7a3da5 100644 --- a/functional_test.go +++ b/functional_test.go @@ -20,6 +20,7 @@ func TestSplitTag(t *testing.T) { File: logFile, SplitTag: splitTag, } + log.WriterService.Start(nil, nil) logger := log.NewLogger(conf) // 1. 记录第一条日志 diff --git a/logger.go b/logger.go index 855f954..0bf7645 100644 --- a/logger.go +++ b/logger.go @@ -78,29 +78,27 @@ func NewLogger(conf Config) *Logger { if m, ok := writerMakers[writerName]; ok { if w := m(&conf); w != nil { logger.writer = w - LoggerService.WriterLock.Lock() - cur := LoggerService.Writers.Load().([]Writer) + WriterService.WriterLock.Lock() + cur := WriterService.Writers.Load().([]Writer) newW := append(cur, w) - LoggerService.Writers.Store(newW) - LoggerService.WriterLock.Unlock() - Start() + WriterService.Writers.Store(newW) + WriterService.WriterLock.Unlock() } } } else { if conf.SplitTag != "" { - LoggerService.FilesLock.RLock() - logger.file = LoggerService.Files[conf.File+conf.SplitTag] - LoggerService.FilesLock.RUnlock() + WriterService.FilesLock.RLock() + logger.file = WriterService.Files[conf.File+conf.SplitTag] + WriterService.FilesLock.RUnlock() if logger.file == nil { logger.file = &FileWriter{ fileName: conf.File, splitTag: conf.SplitTag, } - LoggerService.FilesLock.Lock() - LoggerService.Files[conf.File+conf.SplitTag] = logger.file - LoggerService.FilesLock.Unlock() + WriterService.FilesLock.Lock() + WriterService.Files[conf.File+conf.SplitTag] = logger.file + WriterService.FilesLock.Unlock() } - Start() } else { fp, err := os.OpenFile(conf.File, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) if err == nil { @@ -124,7 +122,7 @@ func (logger *Logger) asyncWrite(entry LogEntry) { } func (logger *Logger) writeBuf(entry LogEntry, buf []byte) { - if LoggerService.Running.Load() { + if WriterService.Running.Load() { writeAsync(logPayload{ entry: entry, buf: buf, diff --git a/reliability_test.go b/reliability_test.go index 7b1f752..eb26c37 100644 --- a/reliability_test.go +++ b/reliability_test.go @@ -28,8 +28,7 @@ func TestLoggerReliability(t *testing.T) { } wg.Wait() - Stop() - Wait() + WriterService.Stop(nil) file, err := os.Open(logFile) if err != nil { diff --git a/writer.go b/writer.go index bb6c15e..10a933f 100644 --- a/writer.go +++ b/writer.go @@ -22,8 +22,8 @@ type logPayload struct { file *FileWriter // 目标文件 Writer } -// loggerService manages the background writing of log entries. -type loggerService struct { +// writerService manages the background writing of log entries. +type writerService struct { Running atomic.Bool StopChan chan bool LogChannel chan logPayload @@ -36,8 +36,8 @@ type loggerService struct { } var ( - // LoggerService is the global instance of loggerService. - LoggerService = &loggerService{} + // WriterService is the global instance of defaultService. + WriterService = &writerService{} ) // ConsoleWriter 控制台写入器 @@ -52,9 +52,9 @@ func (w *ConsoleWriter) Run() { } func init() { - LoggerService.LogChannel = make(chan logPayload, 10000) - LoggerService.Writers.Store([]Writer{}) - LoggerService.Files = make(map[string]*FileWriter) + WriterService.LogChannel = make(chan logPayload, 10000) + WriterService.Writers.Store([]Writer{}) + WriterService.Files = make(map[string]*FileWriter) RegisterWriterMaker("console", func(conf *Config) Writer { return &ConsoleWriter{} @@ -66,14 +66,14 @@ func writeAsync(payload logPayload) { defer func() { recover() }() - if !LoggerService.Running.Load() { + if !WriterService.Running.Load() { return } select { - case LoggerService.LogChannel <- payload: + case WriterService.LogChannel <- payload: default: // 丢弃或处理过载 - dropped := LoggerService.Dropped.Add(1) + dropped := WriterService.Dropped.Add(1) if dropped%1000 == 1 { if DefaultLogger != nil { // 注意:这里可能会产生递归调用,但 select default 保证了不会死锁 @@ -85,29 +85,11 @@ func writeAsync(payload logPayload) { // GetDroppedLogs 获取被丢弃的日志数量 func GetDroppedLogs() uint64 { - return LoggerService.Dropped.Load() -} - -// Start 启动写入器 (兼容旧 API) -func Start() { - _ = LoggerService.Start(nil, nil) -} - -// Stop 停止写入器 (兼容旧 API) -func Stop() { - _ = LoggerService.Stop(nil) -} - -// Wait 等待写入器停止 (兼容旧 API) -func Wait() { - if LoggerService.StopChan != nil { - <-LoggerService.StopChan - LoggerService.StopChan = nil - } + return WriterService.Dropped.Load() } // Start implements starter.Service interface. -func (s *loggerService) Start(_ context.Context, _ *Logger) error { +func (s *writerService) Start(_ context.Context, _ *Logger) error { if !s.Running.CompareAndSwap(false, true) { return nil } @@ -117,7 +99,7 @@ func (s *loggerService) Start(_ context.Context, _ *Logger) error { } // Stop implements starter.Service interface. -func (s *loggerService) Stop(_ context.Context) error { +func (s *writerService) Stop(_ context.Context) error { if s.Running.CompareAndSwap(true, false) { close(s.LogChannel) if s.StopChan != nil { @@ -129,11 +111,11 @@ func (s *loggerService) Stop(_ context.Context) error { } // Health implements starter.Service interface. -func (s *loggerService) Health() error { +func (s *writerService) Health() error { return nil } -func (s *loggerService) writerRunner() { +func (s *writerService) writerRunner() { ticker := time.NewTicker(200 * time.Millisecond) defer ticker.Stop() @@ -173,7 +155,7 @@ func (s *loggerService) writerRunner() { } } -func (s *loggerService) processLog(payload logPayload) { +func (s *writerService) processLog(payload logPayload) { // 精准路由:根据包裹信息决定写入目标 if payload.writer != nil { payload.writer.Log(payload.entry, payload.buf) @@ -182,7 +164,7 @@ func (s *loggerService) processLog(payload logPayload) { } } -func (s *loggerService) flushWriters() { +func (s *writerService) flushWriters() { curWriters, _ := s.Writers.Load().([]Writer) for _, w := range curWriters { w.Run()