This commit is contained in:
AI Engineer 2026-05-13 00:29:17 +08:00
parent 13418b5365
commit a2b1055f5d
5 changed files with 31 additions and 51 deletions

View File

@ -97,7 +97,7 @@ func (w *esWriter) Run() {
w.lock.Unlock() w.lock.Unlock()
// 超过100条数据 或 过了1秒 发送数据 // 超过100条数据 或 过了1秒 发送数据
if queueLen > 100 || (queueLen > 0 && (now > w.last || !LoggerService.Running.Load())) { if queueLen > 100 || (queueLen > 0 && (now > w.last || !WriterService.Running.Load())) {
w.lock.Lock() w.lock.Lock()
sendings := w.queue sendings := w.queue
w.queue = make([]string, 0) w.queue = make([]string, 0)

View File

@ -20,6 +20,7 @@ func TestSplitTag(t *testing.T) {
File: logFile, File: logFile,
SplitTag: splitTag, SplitTag: splitTag,
} }
log.WriterService.Start(nil, nil)
logger := log.NewLogger(conf) logger := log.NewLogger(conf)
// 1. 记录第一条日志 // 1. 记录第一条日志

View File

@ -78,29 +78,27 @@ func NewLogger(conf Config) *Logger {
if m, ok := writerMakers[writerName]; ok { if m, ok := writerMakers[writerName]; ok {
if w := m(&conf); w != nil { if w := m(&conf); w != nil {
logger.writer = w logger.writer = w
LoggerService.WriterLock.Lock() WriterService.WriterLock.Lock()
cur := LoggerService.Writers.Load().([]Writer) cur := WriterService.Writers.Load().([]Writer)
newW := append(cur, w) newW := append(cur, w)
LoggerService.Writers.Store(newW) WriterService.Writers.Store(newW)
LoggerService.WriterLock.Unlock() WriterService.WriterLock.Unlock()
Start()
} }
} }
} else { } else {
if conf.SplitTag != "" { if conf.SplitTag != "" {
LoggerService.FilesLock.RLock() WriterService.FilesLock.RLock()
logger.file = LoggerService.Files[conf.File+conf.SplitTag] logger.file = WriterService.Files[conf.File+conf.SplitTag]
LoggerService.FilesLock.RUnlock() WriterService.FilesLock.RUnlock()
if logger.file == nil { if logger.file == nil {
logger.file = &FileWriter{ logger.file = &FileWriter{
fileName: conf.File, fileName: conf.File,
splitTag: conf.SplitTag, splitTag: conf.SplitTag,
} }
LoggerService.FilesLock.Lock() WriterService.FilesLock.Lock()
LoggerService.Files[conf.File+conf.SplitTag] = logger.file WriterService.Files[conf.File+conf.SplitTag] = logger.file
LoggerService.FilesLock.Unlock() WriterService.FilesLock.Unlock()
} }
Start()
} else { } else {
fp, err := os.OpenFile(conf.File, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) fp, err := os.OpenFile(conf.File, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err == nil { if err == nil {
@ -124,7 +122,7 @@ func (logger *Logger) asyncWrite(entry LogEntry) {
} }
func (logger *Logger) writeBuf(entry LogEntry, buf []byte) { func (logger *Logger) writeBuf(entry LogEntry, buf []byte) {
if LoggerService.Running.Load() { if WriterService.Running.Load() {
writeAsync(logPayload{ writeAsync(logPayload{
entry: entry, entry: entry,
buf: buf, buf: buf,

View File

@ -28,8 +28,7 @@ func TestLoggerReliability(t *testing.T) {
} }
wg.Wait() wg.Wait()
Stop() WriterService.Stop(nil)
Wait()
file, err := os.Open(logFile) file, err := os.Open(logFile)
if err != nil { if err != nil {

View File

@ -22,8 +22,8 @@ type logPayload struct {
file *FileWriter // 目标文件 Writer file *FileWriter // 目标文件 Writer
} }
// loggerService manages the background writing of log entries. // writerService manages the background writing of log entries.
type loggerService struct { type writerService struct {
Running atomic.Bool Running atomic.Bool
StopChan chan bool StopChan chan bool
LogChannel chan logPayload LogChannel chan logPayload
@ -36,8 +36,8 @@ type loggerService struct {
} }
var ( var (
// LoggerService is the global instance of loggerService. // WriterService is the global instance of defaultService.
LoggerService = &loggerService{} WriterService = &writerService{}
) )
// ConsoleWriter 控制台写入器 // ConsoleWriter 控制台写入器
@ -52,9 +52,9 @@ func (w *ConsoleWriter) Run() {
} }
func init() { func init() {
LoggerService.LogChannel = make(chan logPayload, 10000) WriterService.LogChannel = make(chan logPayload, 10000)
LoggerService.Writers.Store([]Writer{}) WriterService.Writers.Store([]Writer{})
LoggerService.Files = make(map[string]*FileWriter) WriterService.Files = make(map[string]*FileWriter)
RegisterWriterMaker("console", func(conf *Config) Writer { RegisterWriterMaker("console", func(conf *Config) Writer {
return &ConsoleWriter{} return &ConsoleWriter{}
@ -66,14 +66,14 @@ func writeAsync(payload logPayload) {
defer func() { defer func() {
recover() recover()
}() }()
if !LoggerService.Running.Load() { if !WriterService.Running.Load() {
return return
} }
select { select {
case LoggerService.LogChannel <- payload: case WriterService.LogChannel <- payload:
default: default:
// 丢弃或处理过载 // 丢弃或处理过载
dropped := LoggerService.Dropped.Add(1) dropped := WriterService.Dropped.Add(1)
if dropped%1000 == 1 { if dropped%1000 == 1 {
if DefaultLogger != nil { if DefaultLogger != nil {
// 注意:这里可能会产生递归调用,但 select default 保证了不会死锁 // 注意:这里可能会产生递归调用,但 select default 保证了不会死锁
@ -85,29 +85,11 @@ func writeAsync(payload logPayload) {
// GetDroppedLogs 获取被丢弃的日志数量 // GetDroppedLogs 获取被丢弃的日志数量
func GetDroppedLogs() uint64 { func GetDroppedLogs() uint64 {
return LoggerService.Dropped.Load() return WriterService.Dropped.Load()
}
// Start 启动写入器 (兼容旧 API)
func Start() {
_ = LoggerService.Start(nil, nil)
}
// Stop 停止写入器 (兼容旧 API)
func Stop() {
_ = LoggerService.Stop(nil)
}
// Wait 等待写入器停止 (兼容旧 API)
func Wait() {
if LoggerService.StopChan != nil {
<-LoggerService.StopChan
LoggerService.StopChan = nil
}
} }
// Start implements starter.Service interface. // Start implements starter.Service interface.
func (s *loggerService) Start(_ context.Context, _ *Logger) error { func (s *writerService) Start(_ context.Context, _ *Logger) error {
if !s.Running.CompareAndSwap(false, true) { if !s.Running.CompareAndSwap(false, true) {
return nil return nil
} }
@ -117,7 +99,7 @@ func (s *loggerService) Start(_ context.Context, _ *Logger) error {
} }
// Stop implements starter.Service interface. // Stop implements starter.Service interface.
func (s *loggerService) Stop(_ context.Context) error { func (s *writerService) Stop(_ context.Context) error {
if s.Running.CompareAndSwap(true, false) { if s.Running.CompareAndSwap(true, false) {
close(s.LogChannel) close(s.LogChannel)
if s.StopChan != nil { if s.StopChan != nil {
@ -129,11 +111,11 @@ func (s *loggerService) Stop(_ context.Context) error {
} }
// Health implements starter.Service interface. // Health implements starter.Service interface.
func (s *loggerService) Health() error { func (s *writerService) Health() error {
return nil return nil
} }
func (s *loggerService) writerRunner() { func (s *writerService) writerRunner() {
ticker := time.NewTicker(200 * time.Millisecond) ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop() defer ticker.Stop()
@ -173,7 +155,7 @@ func (s *loggerService) writerRunner() {
} }
} }
func (s *loggerService) processLog(payload logPayload) { func (s *writerService) processLog(payload logPayload) {
// 精准路由:根据包裹信息决定写入目标 // 精准路由:根据包裹信息决定写入目标
if payload.writer != nil { if payload.writer != nil {
payload.writer.Log(payload.entry, payload.buf) payload.writer.Log(payload.entry, payload.buf)
@ -182,7 +164,7 @@ func (s *loggerService) processLog(payload logPayload) {
} }
} }
func (s *loggerService) flushWriters() { func (s *writerService) flushWriters() {
curWriters, _ := s.Writers.Load().([]Writer) curWriters, _ := s.Writers.Load().([]Writer)
for _, w := range curWriters { for _, w := range curWriters {
w.Run() w.Run()