2026-05-03 08:43:23 +08:00
|
|
|
package redis
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"strings"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/gomodule/redigo/redis"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
func (rd *Redis) Subscribe(name string, reset func(), received func([]byte)) bool {
|
2026-05-05 17:34:23 +08:00
|
|
|
rd.sub.lock.Lock()
|
|
|
|
|
defer rd.sub.lock.Unlock()
|
2026-05-03 09:47:14 +08:00
|
|
|
|
2026-05-05 17:34:23 +08:00
|
|
|
if rd.sub.subs == nil {
|
|
|
|
|
rd.sub.subs = make(map[string]*SubCallbacks)
|
2026-05-03 08:43:23 +08:00
|
|
|
}
|
2026-05-05 17:34:23 +08:00
|
|
|
rd.sub.subs[name] = &SubCallbacks{reset: reset, received: received}
|
|
|
|
|
if rd.sub.conn != nil {
|
|
|
|
|
err := rd.sub.conn.Subscribe(name)
|
2026-05-03 08:43:23 +08:00
|
|
|
if err != nil {
|
|
|
|
|
rd.logger.Error(err.Error(), "type", "redis", "action", "subscribe", "name", name)
|
|
|
|
|
} else {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (rd *Redis) Unsubscribe(name string) bool {
|
2026-05-05 17:34:23 +08:00
|
|
|
rd.sub.lock.Lock()
|
|
|
|
|
defer rd.sub.lock.Unlock()
|
2026-05-03 09:47:14 +08:00
|
|
|
|
2026-05-05 17:34:23 +08:00
|
|
|
if rd.sub.subs != nil {
|
|
|
|
|
delete(rd.sub.subs, name)
|
2026-05-03 08:43:23 +08:00
|
|
|
}
|
2026-05-05 17:34:23 +08:00
|
|
|
if rd.sub.conn != nil {
|
|
|
|
|
err := rd.sub.conn.Unsubscribe(name)
|
2026-05-03 08:43:23 +08:00
|
|
|
if err != nil {
|
|
|
|
|
rd.logger.Error(err.Error(), "type", "redis", "action", "unsubscribe", "name", name)
|
|
|
|
|
} else {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (rd *Redis) Start() {
|
2026-05-05 17:34:23 +08:00
|
|
|
rd.sub.lock.Lock()
|
|
|
|
|
if rd.sub.subs == nil {
|
|
|
|
|
rd.sub.subs = make(map[string]*SubCallbacks)
|
2026-05-03 08:43:23 +08:00
|
|
|
}
|
2026-05-05 17:34:23 +08:00
|
|
|
if rd.sub.running {
|
|
|
|
|
rd.sub.lock.Unlock()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
rd.sub.running = true
|
|
|
|
|
rd.sub.lock.Unlock()
|
2026-05-03 09:47:14 +08:00
|
|
|
|
2026-05-03 08:43:23 +08:00
|
|
|
subStartChan := make(chan bool)
|
|
|
|
|
go rd.receiveSub(subStartChan)
|
|
|
|
|
<-subStartChan
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (rd *Redis) receiveSub(subStartChan chan bool) {
|
|
|
|
|
for {
|
2026-05-05 17:34:23 +08:00
|
|
|
rd.sub.lock.RLock()
|
|
|
|
|
running := rd.sub.running
|
|
|
|
|
rd.sub.lock.RUnlock()
|
2026-05-03 09:47:14 +08:00
|
|
|
if !running {
|
2026-05-03 08:43:23 +08:00
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 开始接收订阅数据
|
2026-05-05 17:34:23 +08:00
|
|
|
rd.sub.lock.Lock()
|
|
|
|
|
if rd.sub.conn == nil {
|
|
|
|
|
// 获取一个全新的连接,并将超时设为 0 (无限) 以支持长连接订阅
|
|
|
|
|
conf := *rd.Config
|
|
|
|
|
conf.ReadTimeout = 0
|
|
|
|
|
conf.WriteTimeout = 0
|
|
|
|
|
conn, err := rd.pool.Dial()
|
2026-05-03 08:43:23 +08:00
|
|
|
if err != nil {
|
2026-05-05 17:34:23 +08:00
|
|
|
rd.sub.lock.Unlock()
|
2026-05-03 08:43:23 +08:00
|
|
|
time.Sleep(time.Second)
|
|
|
|
|
continue
|
|
|
|
|
}
|
2026-05-05 17:34:23 +08:00
|
|
|
// 订阅模式下,必须使用长连接,通过 Stop() 时关闭底层连接来强制退出
|
|
|
|
|
rd.sub.conn = &redis.PubSubConn{Conn: conn}
|
2026-05-03 08:43:23 +08:00
|
|
|
// 重新订阅
|
2026-05-05 17:34:23 +08:00
|
|
|
if len(rd.sub.subs) > 0 {
|
2026-05-03 08:43:23 +08:00
|
|
|
subs := make([]any, 0)
|
2026-05-05 17:34:23 +08:00
|
|
|
for k := range rd.sub.subs {
|
2026-05-03 08:43:23 +08:00
|
|
|
subs = append(subs, k)
|
|
|
|
|
}
|
2026-05-05 17:34:23 +08:00
|
|
|
err = rd.sub.conn.Subscribe(subs...)
|
2026-05-03 08:43:23 +08:00
|
|
|
if err != nil {
|
2026-05-05 17:34:23 +08:00
|
|
|
_ = rd.sub.conn.Close()
|
|
|
|
|
rd.sub.conn = nil
|
|
|
|
|
rd.sub.lock.Unlock()
|
2026-05-03 08:43:23 +08:00
|
|
|
time.Sleep(time.Second)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
// 重新连接时调用重置数据的回调
|
2026-05-05 17:34:23 +08:00
|
|
|
for _, v := range rd.sub.subs {
|
2026-05-03 08:43:23 +08:00
|
|
|
if v.reset != nil {
|
|
|
|
|
v.reset()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2026-05-05 17:34:23 +08:00
|
|
|
rd.sub.lock.Unlock()
|
2026-05-03 08:43:23 +08:00
|
|
|
|
|
|
|
|
if subStartChan != nil {
|
|
|
|
|
subStartChan <- true
|
|
|
|
|
subStartChan = nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
isErr := false
|
2026-05-05 17:34:23 +08:00
|
|
|
rd.sub.lock.RLock()
|
|
|
|
|
subConn := rd.sub.conn
|
|
|
|
|
rd.sub.lock.RUnlock()
|
2026-05-03 09:47:14 +08:00
|
|
|
|
|
|
|
|
if subConn == nil {
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
receiveObj := subConn.Receive()
|
2026-05-03 08:43:23 +08:00
|
|
|
switch v := receiveObj.(type) {
|
|
|
|
|
case redis.Message:
|
2026-05-05 17:34:23 +08:00
|
|
|
rd.sub.lock.RLock()
|
|
|
|
|
callback := rd.sub.subs[v.Channel]
|
|
|
|
|
rd.sub.lock.RUnlock()
|
2026-05-03 08:43:23 +08:00
|
|
|
if callback != nil && callback.received != nil {
|
|
|
|
|
callback.received(v.Data)
|
|
|
|
|
}
|
|
|
|
|
case redis.Subscription:
|
|
|
|
|
case redis.Pong:
|
|
|
|
|
case error:
|
|
|
|
|
if strings.Contains(v.Error(), "i/o timeout") {
|
|
|
|
|
break
|
|
|
|
|
}
|
2026-05-04 00:46:17 +08:00
|
|
|
// 使用 strings.Contains 是因为 redigo 的错误通常是自定义 string 类型
|
|
|
|
|
errMsg := v.Error()
|
|
|
|
|
if !strings.Contains(errMsg, "connection closed") && !strings.Contains(errMsg, "use of closed network connection") {
|
|
|
|
|
rd.logger.Error(errMsg, "type", "redis", "action", "receiveSub")
|
2026-05-03 08:43:23 +08:00
|
|
|
}
|
2026-05-05 17:34:23 +08:00
|
|
|
rd.sub.lock.Lock()
|
|
|
|
|
if rd.sub.conn != nil {
|
|
|
|
|
_ = rd.sub.conn.Close()
|
|
|
|
|
rd.sub.conn = nil
|
2026-05-03 08:43:23 +08:00
|
|
|
}
|
2026-05-05 17:34:23 +08:00
|
|
|
rd.sub.lock.Unlock()
|
2026-05-03 08:43:23 +08:00
|
|
|
isErr = true
|
|
|
|
|
}
|
2026-05-03 09:47:14 +08:00
|
|
|
|
2026-05-05 17:34:23 +08:00
|
|
|
rd.sub.lock.RLock()
|
|
|
|
|
running = rd.sub.running
|
|
|
|
|
rd.sub.lock.RUnlock()
|
2026-05-03 09:47:14 +08:00
|
|
|
if isErr || !running {
|
2026-05-03 08:43:23 +08:00
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2026-05-05 17:34:23 +08:00
|
|
|
rd.sub.lock.Lock()
|
|
|
|
|
if rd.sub.stopChan != nil {
|
|
|
|
|
rd.sub.stopChan <- true
|
2026-05-03 08:43:23 +08:00
|
|
|
}
|
2026-05-05 17:34:23 +08:00
|
|
|
rd.sub.lock.Unlock()
|
2026-05-03 08:43:23 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (rd *Redis) Stop() {
|
2026-05-05 17:34:23 +08:00
|
|
|
rd.sub.lock.Lock()
|
|
|
|
|
if rd.sub.running {
|
|
|
|
|
rd.sub.stopChan = make(chan bool)
|
|
|
|
|
rd.sub.running = false
|
|
|
|
|
if rd.sub.conn != nil {
|
2026-05-03 08:43:23 +08:00
|
|
|
// 取消订阅
|
2026-05-05 17:34:23 +08:00
|
|
|
if len(rd.sub.subs) > 0 {
|
|
|
|
|
_ = rd.sub.conn.Unsubscribe()
|
2026-05-03 08:43:23 +08:00
|
|
|
}
|
2026-05-05 17:34:23 +08:00
|
|
|
_ = rd.sub.conn.Close()
|
|
|
|
|
rd.sub.conn = nil
|
2026-05-03 08:43:23 +08:00
|
|
|
}
|
2026-05-05 17:34:23 +08:00
|
|
|
rd.sub.lock.Unlock()
|
|
|
|
|
<-rd.sub.stopChan
|
|
|
|
|
rd.sub.lock.Lock()
|
|
|
|
|
rd.sub.stopChan = nil
|
|
|
|
|
rd.sub.lock.Unlock()
|
2026-05-03 09:47:14 +08:00
|
|
|
} else {
|
2026-05-05 17:34:23 +08:00
|
|
|
rd.sub.lock.Unlock()
|
2026-05-03 08:43:23 +08:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (rd *Redis) PUBLISH(channel, data string) bool {
|
|
|
|
|
return rd.Do("PUBLISH "+channel, data).Bool()
|
|
|
|
|
}
|