2026-05-03 08:43:23 +08:00
|
|
|
package redis
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"strings"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/gomodule/redigo/redis"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
func (rd *Redis) Subscribe(name string, reset func(), received func([]byte)) bool {
|
2026-05-03 09:47:14 +08:00
|
|
|
rd.subLock.Lock()
|
|
|
|
|
defer rd.subLock.Unlock()
|
|
|
|
|
|
2026-05-03 08:43:23 +08:00
|
|
|
if rd.subs == nil {
|
|
|
|
|
rd.subs = make(map[string]*SubCallbacks)
|
|
|
|
|
}
|
|
|
|
|
rd.subs[name] = &SubCallbacks{reset: reset, received: received}
|
|
|
|
|
if rd.subConn != nil {
|
|
|
|
|
err := rd.subConn.Subscribe(name)
|
|
|
|
|
if err != nil {
|
|
|
|
|
rd.logger.Error(err.Error(), "type", "redis", "action", "subscribe", "name", name)
|
|
|
|
|
} else {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (rd *Redis) Unsubscribe(name string) bool {
|
2026-05-03 09:47:14 +08:00
|
|
|
rd.subLock.Lock()
|
|
|
|
|
defer rd.subLock.Unlock()
|
|
|
|
|
|
2026-05-03 08:43:23 +08:00
|
|
|
if rd.subs != nil {
|
|
|
|
|
delete(rd.subs, name)
|
|
|
|
|
}
|
|
|
|
|
if rd.subConn != nil {
|
|
|
|
|
err := rd.subConn.Unsubscribe(name)
|
|
|
|
|
if err != nil {
|
|
|
|
|
rd.logger.Error(err.Error(), "type", "redis", "action", "unsubscribe", "name", name)
|
|
|
|
|
} else {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (rd *Redis) Start() {
|
2026-05-03 09:47:14 +08:00
|
|
|
rd.subLock.Lock()
|
2026-05-03 08:43:23 +08:00
|
|
|
if rd.subs == nil {
|
|
|
|
|
rd.subs = make(map[string]*SubCallbacks)
|
|
|
|
|
}
|
|
|
|
|
rd.SubRunning = true
|
2026-05-03 09:47:14 +08:00
|
|
|
rd.subLock.Unlock()
|
|
|
|
|
|
2026-05-03 08:43:23 +08:00
|
|
|
subStartChan := make(chan bool)
|
|
|
|
|
go rd.receiveSub(subStartChan)
|
|
|
|
|
<-subStartChan
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (rd *Redis) receiveSub(subStartChan chan bool) {
|
|
|
|
|
for {
|
2026-05-03 09:47:14 +08:00
|
|
|
rd.subLock.RLock()
|
|
|
|
|
running := rd.SubRunning
|
|
|
|
|
rd.subLock.RUnlock()
|
|
|
|
|
if !running {
|
2026-05-03 08:43:23 +08:00
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 开始接收订阅数据
|
2026-05-03 09:47:14 +08:00
|
|
|
rd.subLock.Lock()
|
2026-05-03 08:43:23 +08:00
|
|
|
if rd.subConn == nil {
|
|
|
|
|
conn, err := rd.GetConnection()
|
|
|
|
|
if err != nil {
|
2026-05-03 09:47:14 +08:00
|
|
|
rd.subLock.Unlock()
|
2026-05-03 08:43:23 +08:00
|
|
|
time.Sleep(time.Second)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
rd.subConn = &redis.PubSubConn{Conn: conn}
|
|
|
|
|
// 重新订阅
|
|
|
|
|
if len(rd.subs) > 0 {
|
|
|
|
|
subs := make([]any, 0)
|
|
|
|
|
for k := range rd.subs {
|
|
|
|
|
subs = append(subs, k)
|
|
|
|
|
}
|
|
|
|
|
err = rd.subConn.Subscribe(subs...)
|
|
|
|
|
if err != nil {
|
|
|
|
|
_ = rd.subConn.Close()
|
|
|
|
|
rd.subConn = nil
|
2026-05-03 09:47:14 +08:00
|
|
|
rd.subLock.Unlock()
|
2026-05-03 08:43:23 +08:00
|
|
|
time.Sleep(time.Second)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
// 重新连接时调用重置数据的回调
|
|
|
|
|
for _, v := range rd.subs {
|
|
|
|
|
if v.reset != nil {
|
|
|
|
|
v.reset()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2026-05-03 09:47:14 +08:00
|
|
|
rd.subLock.Unlock()
|
2026-05-03 08:43:23 +08:00
|
|
|
|
|
|
|
|
if subStartChan != nil {
|
|
|
|
|
subStartChan <- true
|
|
|
|
|
subStartChan = nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
isErr := false
|
2026-05-03 09:47:14 +08:00
|
|
|
rd.subLock.RLock()
|
|
|
|
|
subConn := rd.subConn
|
|
|
|
|
rd.subLock.RUnlock()
|
|
|
|
|
|
|
|
|
|
if subConn == nil {
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
receiveObj := subConn.Receive()
|
2026-05-03 08:43:23 +08:00
|
|
|
switch v := receiveObj.(type) {
|
|
|
|
|
case redis.Message:
|
2026-05-03 09:47:14 +08:00
|
|
|
rd.subLock.RLock()
|
2026-05-03 08:43:23 +08:00
|
|
|
callback := rd.subs[v.Channel]
|
2026-05-03 09:47:14 +08:00
|
|
|
rd.subLock.RUnlock()
|
2026-05-03 08:43:23 +08:00
|
|
|
if callback != nil && callback.received != nil {
|
|
|
|
|
callback.received(v.Data)
|
|
|
|
|
}
|
|
|
|
|
case redis.Subscription:
|
|
|
|
|
case redis.Pong:
|
|
|
|
|
case error:
|
|
|
|
|
if strings.Contains(v.Error(), "i/o timeout") {
|
|
|
|
|
break
|
|
|
|
|
}
|
2026-05-04 00:46:17 +08:00
|
|
|
// 使用 strings.Contains 是因为 redigo 的错误通常是自定义 string 类型
|
|
|
|
|
errMsg := v.Error()
|
|
|
|
|
if !strings.Contains(errMsg, "connection closed") && !strings.Contains(errMsg, "use of closed network connection") {
|
|
|
|
|
rd.logger.Error(errMsg, "type", "redis", "action", "receiveSub")
|
2026-05-03 08:43:23 +08:00
|
|
|
}
|
2026-05-03 09:47:14 +08:00
|
|
|
rd.subLock.Lock()
|
2026-05-03 08:43:23 +08:00
|
|
|
if rd.subConn != nil {
|
|
|
|
|
_ = rd.subConn.Close()
|
|
|
|
|
rd.subConn = nil
|
|
|
|
|
}
|
2026-05-03 09:47:14 +08:00
|
|
|
rd.subLock.Unlock()
|
2026-05-03 08:43:23 +08:00
|
|
|
isErr = true
|
|
|
|
|
}
|
2026-05-03 09:47:14 +08:00
|
|
|
|
|
|
|
|
rd.subLock.RLock()
|
|
|
|
|
running = rd.SubRunning
|
|
|
|
|
rd.subLock.RUnlock()
|
|
|
|
|
if isErr || !running {
|
2026-05-03 08:43:23 +08:00
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if rd.subStopChan != nil {
|
|
|
|
|
rd.subStopChan <- true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (rd *Redis) Stop() {
|
2026-05-03 09:47:14 +08:00
|
|
|
rd.subLock.Lock()
|
2026-05-03 08:43:23 +08:00
|
|
|
if rd.SubRunning {
|
|
|
|
|
rd.subStopChan = make(chan bool)
|
|
|
|
|
rd.SubRunning = false
|
|
|
|
|
if rd.subConn != nil {
|
|
|
|
|
// 取消订阅
|
|
|
|
|
if len(rd.subs) > 0 {
|
|
|
|
|
_ = rd.subConn.Unsubscribe()
|
|
|
|
|
}
|
|
|
|
|
// 读一次再关闭可以防止Close时阻塞
|
|
|
|
|
_ = rd.subConn.ReceiveWithTimeout(50 * time.Millisecond)
|
|
|
|
|
_ = rd.subConn.Close()
|
|
|
|
|
rd.subConn = nil
|
|
|
|
|
}
|
2026-05-03 09:47:14 +08:00
|
|
|
rd.subLock.Unlock()
|
2026-05-03 08:43:23 +08:00
|
|
|
<-rd.subStopChan
|
|
|
|
|
rd.subStopChan = nil
|
2026-05-03 09:47:14 +08:00
|
|
|
} else {
|
|
|
|
|
rd.subLock.Unlock()
|
2026-05-03 08:43:23 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (rd *Redis) PUBLISH(channel, data string) bool {
|
|
|
|
|
return rd.Do("PUBLISH "+channel, data).Bool()
|
|
|
|
|
}
|