// redis/subscribe.go (197 lines, 4.2 KiB, Go)

package redis
import (
"strings"
"time"
"github.com/gomodule/redigo/redis"
)
// Subscribe registers the callbacks for channel name and, when a subscription
// connection is currently open, subscribes to the channel on it immediately.
//
// reset is called by the receive loop after it has re-subscribed on a new
// connection; received is called with the payload of every message delivered
// on the channel. Either callback may be nil.
//
// The result is true only when the channel was subscribed on a live
// connection. It is false when no connection exists yet or when the subscribe
// call failed (the error is logged); the callbacks stay registered in both
// cases.
func (rd *Redis) Subscribe(name string, reset func(), received func([]byte)) bool {
	rd.sub.lock.Lock()
	defer rd.sub.lock.Unlock()

	// Lazily create the registry so Subscribe works before Start is called.
	if rd.sub.subs == nil {
		rd.sub.subs = make(map[string]*SubCallbacks)
	}
	callbacks := &SubCallbacks{reset: reset, received: received}
	rd.sub.subs[name] = callbacks

	if rd.sub.conn == nil {
		return false
	}
	if subErr := rd.sub.conn.Subscribe(name); subErr != nil {
		rd.logger.Error(subErr.Error(), "type", "redis", "action", "subscribe", "name", name)
		return false
	}
	return true
}
// Unsubscribe removes the callbacks registered for channel name and, when a
// subscription connection is currently open, unsubscribes from the channel on
// it.
//
// The result is true only when the unsubscribe command was issued on a live
// connection without error. It is false when no connection exists or when the
// call failed (the error is logged); the callbacks are removed in both cases.
func (rd *Redis) Unsubscribe(name string) bool {
	rd.sub.lock.Lock()
	defer rd.sub.lock.Unlock()

	// delete is a no-op on a nil map, so the registry needs no nil check.
	delete(rd.sub.subs, name)

	if rd.sub.conn == nil {
		return false
	}
	if unsubErr := rd.sub.conn.Unsubscribe(name); unsubErr != nil {
		rd.logger.Error(unsubErr.Error(), "type", "redis", "action", "unsubscribe", "name", name)
		return false
	}
	return true
}
// Start launches the background receive goroutine unless it is already
// running, then blocks until that goroutine signals on its start channel.
// Calling Start while the goroutine is running returns immediately.
func (rd *Redis) Start() {
	// Flip the running flag under the lock; report whether another caller had
	// already started the receiver.
	alreadyRunning := func() bool {
		rd.sub.lock.Lock()
		defer rd.sub.lock.Unlock()

		if rd.sub.subs == nil {
			rd.sub.subs = make(map[string]*SubCallbacks)
		}
		if rd.sub.running {
			return true
		}
		rd.sub.running = true
		return false
	}()
	if alreadyRunning {
		return
	}

	ready := make(chan bool)
	go rd.receiveSub(ready)
	<-ready
}
// receiveSub is the body of the background goroutine launched by Start. While
// the running flag is set it keeps a subscription connection open,
// re-subscribes every registered channel after each (re)connect, and passes
// incoming messages to the matching received callback.
//
// subStartChan is signalled once, after the first connection has been set up.
// If the loop ends before that happens (the running flag was cleared while
// connecting kept failing), the channel is closed instead so that a caller
// blocked in Start is released. On exit the goroutine also signals
// rd.sub.stopChan, when it is set, which is what Stop waits for.
func (rd *Redis) receiveSub(subStartChan chan bool) {
	// isRunning reads the running flag under the read lock.
	isRunning := func() bool {
		rd.sub.lock.RLock()
		defer rd.sub.lock.RUnlock()
		return rd.sub.running
	}

	// connect makes sure rd.sub.conn is set, dialing and re-subscribing when
	// it is not. It returns the reset callbacks that have to run because a new
	// connection was subscribed, and false when the connection could not be
	// set up (the error is logged).
	connect := func() (resets []func(), ok bool) {
		rd.sub.lock.Lock()
		defer rd.sub.lock.Unlock()

		if rd.sub.conn != nil {
			return nil, true
		}

		conn, err := rd.pool.Dial()
		if err != nil {
			rd.logger.Error(err.Error(), "type", "redis", "action", "receiveSub", "step", "dial")
			return nil, false
		}
		// Subscriptions need a long-lived connection; Stop closes it to force
		// a blocked receive to return.
		subConn := &redis.PubSubConn{Conn: conn}

		if len(rd.sub.subs) > 0 {
			channels := make([]any, 0, len(rd.sub.subs))
			for name, callbacks := range rd.sub.subs {
				channels = append(channels, name)
				if callbacks != nil && callbacks.reset != nil {
					resets = append(resets, callbacks.reset)
				}
			}
			if err = subConn.Subscribe(channels...); err != nil {
				rd.logger.Error(err.Error(), "type", "redis", "action", "receiveSub", "step", "resubscribe")
				_ = subConn.Close() // best effort: the connection is discarded anyway
				return nil, false
			}
		}

		rd.sub.conn = subConn
		return resets, true
	}

	for isRunning() {
		resets, ok := connect()
		if !ok {
			time.Sleep(time.Second)
			continue
		}

		// Tell subscribers that the connection was re-established so they can
		// reset their data. This runs outside the lock so a callback may call
		// Subscribe or Unsubscribe without deadlocking.
		for _, reset := range resets {
			reset()
		}

		if subStartChan != nil {
			subStartChan <- true
			subStartChan = nil
		}

		for {
			rd.sub.lock.RLock()
			subConn := rd.sub.conn
			rd.sub.lock.RUnlock()
			if subConn == nil {
				break
			}

			// A subscription may stay idle for a long time, so wait without a
			// read deadline when the connection allows overriding it;
			// otherwise fall back to the connection's default timeout.
			var received any
			if _, canOverride := subConn.Conn.(redis.ConnWithTimeout); canOverride {
				received = subConn.ReceiveWithTimeout(0)
			} else {
				received = subConn.Receive()
			}

			connBroken := false
			switch v := received.(type) {
			case redis.Message:
				rd.sub.lock.RLock()
				callback := rd.sub.subs[v.Channel]
				rd.sub.lock.RUnlock()
				if callback != nil && callback.received != nil {
					callback.received(v.Data)
				}
			case redis.Subscription, redis.Pong:
				// Nothing to do for subscription confirmations and pongs.
			case error:
				// Errors are matched by substring rather than by type or
				// sentinel value.
				errMsg := v.Error()
				if strings.Contains(errMsg, "i/o timeout") {
					// A read timeout is not handled as a failure here; leave
					// the switch and try again.
					break
				}
				// A closed connection is the expected outcome of Stop, so it
				// is not logged.
				if !strings.Contains(errMsg, "connection closed") && !strings.Contains(errMsg, "use of closed network connection") {
					rd.logger.Error(errMsg, "type", "redis", "action", "receiveSub")
				}
				rd.sub.lock.Lock()
				if rd.sub.conn != nil {
					_ = rd.sub.conn.Close() // best effort: the connection is being dropped
					rd.sub.conn = nil
				}
				rd.sub.lock.Unlock()
				connBroken = true
			}

			if connBroken || !isRunning() {
				break
			}
		}
	}

	// Start is still waiting if the first connection was never established;
	// closing the channel releases it.
	if subStartChan != nil {
		close(subStartChan)
	}

	rd.sub.lock.Lock()
	if rd.sub.stopChan != nil {
		rd.sub.stopChan <- true
	}
	rd.sub.lock.Unlock()
}
// Stop shuts the receive goroutine down and waits for it to finish. It clears
// the running flag, unsubscribes (when any channel is registered) and closes
// the subscription connection, then blocks until the goroutine signals
// rd.sub.stopChan. Calling Stop while nothing is running returns immediately.
func (rd *Redis) Stop() {
	rd.sub.lock.Lock()
	if !rd.sub.running {
		rd.sub.lock.Unlock()
		return
	}

	rd.sub.running = false
	rd.sub.stopChan = make(chan bool)
	if activeConn := rd.sub.conn; activeConn != nil {
		// Unsubscribe from everything before closing the connection.
		if len(rd.sub.subs) > 0 {
			_ = activeConn.Unsubscribe()
		}
		_ = activeConn.Close()
		rd.sub.conn = nil
	}
	rd.sub.lock.Unlock()

	// The receive goroutine signals this channel just before it returns.
	<-rd.sub.stopChan

	rd.sub.lock.Lock()
	rd.sub.stopChan = nil
	rd.sub.lock.Unlock()
}
// PUBLISH passes the command string "PUBLISH <channel>" together with data to
// rd.Do and returns the result converted with Bool().
func (rd *Redis) PUBLISH(channel, data string) bool {
	command := "PUBLISH " + channel
	result := rd.Do(command, data)
	return result.Bool()
}