141 lines
2.6 KiB
Go
141 lines
2.6 KiB
Go
package log
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net/http"
|
|
"net/url"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"apigo.cc/go/cast"
|
|
)
|
|
|
|
type ESWriter struct {
|
|
config *Config
|
|
url string
|
|
user string
|
|
password string
|
|
group string
|
|
lock sync.Mutex
|
|
queue []string
|
|
last int64
|
|
client *http.Client
|
|
prefix string
|
|
}
|
|
|
|
func NewESWriter(conf *Config) Writer {
|
|
w := &ESWriter{
|
|
config: conf,
|
|
queue: make([]string, 0),
|
|
client: &http.Client{},
|
|
}
|
|
|
|
esUrl, err := url.Parse(conf.File)
|
|
if err != nil {
|
|
if DefaultLogger != nil {
|
|
// 这里不能直接调用 DefaultLogger 的方法,防止死循环
|
|
log.Printf("invalid es url: %s, error: %v", conf.File, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if esUrl.User != nil {
|
|
w.user = esUrl.User.Username()
|
|
w.password, _ = esUrl.User.Password()
|
|
esUrl.User = nil
|
|
}
|
|
|
|
if esUrl.Scheme == "ess" {
|
|
esUrl.Scheme = "https"
|
|
} else {
|
|
esUrl.Scheme = "http"
|
|
}
|
|
|
|
if timeout := esUrl.Query().Get("timeout"); timeout != "" {
|
|
w.client.Timeout = cast.Duration(timeout)
|
|
}
|
|
|
|
if len(esUrl.Path) > 1 {
|
|
w.group = strings.ReplaceAll(esUrl.Path[1:], "/", ".")
|
|
}
|
|
|
|
esUrl.Path = "_bulk"
|
|
esUrl.RawQuery = ""
|
|
w.url = esUrl.String()
|
|
|
|
if w.group != "" {
|
|
w.prefix = fmt.Sprintf("{\"index\":{\"_index\":\"%s.%s\"}}", w.group, w.config.Name)
|
|
} else {
|
|
w.prefix = fmt.Sprintf("{\"index\":{\"_index\":\"%s\"}}", w.config.Name)
|
|
}
|
|
|
|
return w
|
|
}
|
|
|
|
func (w *ESWriter) Log(data []byte) {
|
|
if len(data) == 0 {
|
|
return
|
|
}
|
|
dataString := string(data)
|
|
|
|
w.lock.Lock()
|
|
w.queue = append(w.queue, w.prefix, dataString)
|
|
w.lock.Unlock()
|
|
}
|
|
|
|
var responseOkBytes = []byte("\"errors\":false")
|
|
|
|
func (w *ESWriter) Run() {
|
|
now := time.Now().Unix()
|
|
w.lock.Lock()
|
|
queueLen := len(w.queue)
|
|
w.lock.Unlock()
|
|
|
|
// 超过100条数据 或 过了1秒 发送数据
|
|
if queueLen > 100 || (queueLen > 0 && (now > w.last || !writerRunning.Load())) {
|
|
w.lock.Lock()
|
|
sendings := w.queue
|
|
w.queue = make([]string, 0)
|
|
w.lock.Unlock()
|
|
|
|
var body bytes.Buffer
|
|
for _, s := range sendings {
|
|
body.WriteString(s)
|
|
body.WriteByte('\n')
|
|
}
|
|
|
|
req, err := http.NewRequest("POST", w.url, &body)
|
|
if err != nil {
|
|
log.Println("es request creation failed", err)
|
|
return
|
|
}
|
|
|
|
req.Header.Set("Content-Type", "application/json")
|
|
if w.user != "" {
|
|
req.SetBasicAuth(w.user, w.password)
|
|
}
|
|
|
|
res, err := w.client.Do(req)
|
|
if err != nil {
|
|
log.Println("es sent failed", err)
|
|
return
|
|
}
|
|
defer res.Body.Close()
|
|
|
|
result, err := io.ReadAll(res.Body)
|
|
if err != nil {
|
|
log.Println("es result read failed", err)
|
|
return
|
|
}
|
|
|
|
if !bytes.Contains(result, responseOkBytes) {
|
|
log.Printf("es sent errors: %s, data: %s", string(result), body.String())
|
|
}
|
|
w.last = now
|
|
}
|
|
}
|