service/server.go

215 lines
5.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"apigo.cc/go/discover"
"apigo.cc/go/log"
"apigo.cc/go/redis"
"apigo.cc/go/safe"
"context"
"fmt"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"net"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"
)
// GlobalDiscoverer 供服务框架内部使用的发现实例
var GlobalDiscoverer *discover.Discoverer
// AsyncServer 异步服务实例
type AsyncServer struct {
server *http.Server
listener net.Listener
Addr string
stopChan chan os.Signal
startChan chan bool
useDiscover bool
discoverer *discover.Discoverer
}
// AsyncStart 异步启动服务
func AsyncStart() *AsyncServer {
as := &AsyncServer{
startChan: make(chan bool, 1),
stopChan: make(chan os.Signal, 1),
}
go as.start()
<-as.startChan
return as
}
func (as *AsyncServer) start() {
listenStr := Config.Listen
as.useDiscover = false
if listenStr == "" {
listenStr = ":0,h2c"
as.useDiscover = true
}
// 解析第一个监听配置
part := strings.Split(listenStr, "|")[0]
addr, opts, _ := strings.Cut(part, ",")
protocol := "http"
for _, opt := range strings.Split(opts, ",") {
opt = strings.ToLower(strings.TrimSpace(opt))
if opt == "h2c" || opt == "h2" {
protocol = opt
}
}
if !strings.Contains(addr, ":") {
addr = ":" + addr
}
// 检查是否需要启动服务发现
if Config.App == "" {
Config.App = GetDefaultName()
}
appName := Config.App
if appName != "" || Config.Register != "" {
as.useDiscover = true
}
// 初始化服务器唯一标识 (8位物理上限 3,844/s)
serverId = IDMaker.Get8Bytes4KPerSecond()
// 初始化分布式 ID 生成器
if Config.IdServer != "" {
rd := redis.GetRedis(Config.IdServer, log.New(serverId))
if rd.Error == nil {
IDMaker = redis.NewIDMaker(rd)
}
}
listener, err := net.Listen("tcp", addr)
if err != nil {
log.DefaultLogger.Error("failed to listen", "addr", addr, "error", err.Error())
as.startChan <- false
return
}
as.listener = listener
as.Addr = listener.Addr().String()
serverAddr = as.Addr
// 如果使用了随机端口且没有明确指定不需要服务发现,则开启
if addr == ":0" || strings.HasSuffix(addr, ":0") {
as.useDiscover = true
}
h2s := &http2.Server{}
var handler http.Handler = &RouteHandler{}
if protocol == "h2c" {
handler = h2c.NewHandler(handler, h2s)
}
as.server = &http.Server{
Handler: handler,
ReadTimeout: time.Duration(Config.ReadTimeout) * time.Millisecond,
ReadHeaderTimeout: time.Duration(Config.ReadHeaderTimeout) * time.Millisecond,
WriteTimeout: time.Duration(Config.WriteTimeout) * time.Millisecond,
IdleTimeout: time.Duration(Config.IdleTimeout) * time.Millisecond,
MaxHeaderBytes: Config.MaxHeaderBytes,
}
// 启动服务发现
if as.useDiscover {
_, port, _ := net.SplitHostPort(as.Addr)
ip := GetServerIp()
discoverAddr := fmt.Sprintf("%s:%s", ip, port)
// 转换配置
discConf := discover.Config{
Weight: Config.Weight,
CallRetryTimes: 10, // Default
Calls: make(map[string]discover.CallConfig),
}
if discConf.Weight <= 0 {
discConf.Weight = 100
}
for name, call := range Config.Calls {
dc := discover.CallConfig{
Http2: call.Http2,
SSL: call.SSL,
}
if call.Timeout > 0 {
dc.Timeout = time.Duration(call.Timeout) * time.Millisecond
} else if Config.RedirectTimeout > 0 {
dc.Timeout = time.Duration(Config.RedirectTimeout) * time.Millisecond
}
if call.Token != "" {
dc.Token = safe.NewSafeBuf([]byte(call.Token))
}
discConf.Calls[name] = dc
}
// 解析必需的 Register支持环境变量 fallback
registry := Config.Register
if registry == "" {
registry = os.Getenv("DISCOVER_REGISTRY")
}
if registry == "" {
registry = "127.0.0.1:6379::15" // Default fallback
}
as.discoverer = discover.Start(registry, appName, discoverAddr, log.DefaultLogger, discConf)
GlobalDiscoverer = as.discoverer
if as.discoverer != nil {
log.DefaultLogger.Info("discover registered", "app", appName, "addr", discoverAddr)
}
}
signal.Notify(as.stopChan, os.Interrupt, syscall.SIGTERM)
go func() {
log.DefaultLogger.Info("service starting", "addr", as.Addr, "proto", protocol)
as.startChan <- true
if err := as.server.Serve(listener); err != nil && err != http.ErrServerClosed {
log.DefaultLogger.Error("server error", "error", err.Error())
}
}()
}
// Stop 停止服务
func (as *AsyncServer) Stop() {
log.DefaultLogger.Info("service stopping")
if as.discoverer != nil {
as.discoverer.Stop()
}
stopTimeout := time.Duration(Config.StopTimeout) * time.Millisecond
if stopTimeout <= 0 {
stopTimeout = 5 * time.Second
}
ctx, cancel := context.WithTimeout(context.Background(), stopTimeout)
defer cancel()
if err := as.server.Shutdown(ctx); err != nil {
log.DefaultLogger.Error("server shutdown error", "error", err.Error())
}
log.DefaultLogger.Info("service stopped")
}
// Wait 等待服务结束 (信号监听)
func (as *AsyncServer) Wait() {
<-as.stopChan
as.Stop()
}
// Start 同步启动服务
func Start() {
AsyncStart().Wait()
}