redis/subscribe.go

184 lines
3.7 KiB
Go

package redis
import (
"strings"
"time"
"github.com/gomodule/redigo/redis"
)
// Subscribe registers the callbacks for channel name and, if a subscription
// connection is currently open, subscribes to the channel on it.
//
// reset is called by the receive loop after it has re-subscribed on a fresh
// connection; received is called with the payload of every message that
// arrives on the channel. Either callback may be nil.
//
// The registration is always stored. The return value is true only when a
// connection was open and the subscribe call on it returned no error.
func (rd *Redis) Subscribe(name string, reset func(), received func([]byte)) bool {
	rd.subLock.Lock()
	defer rd.subLock.Unlock()

	// The map is created lazily so Subscribe also works before Start.
	if rd.subs == nil {
		rd.subs = make(map[string]*SubCallbacks)
	}
	rd.subs[name] = &SubCallbacks{reset: reset, received: received}

	// Without a connection there is nothing to send yet; the receive loop
	// subscribes to every registered channel when it connects.
	if rd.subConn == nil {
		return false
	}
	if err := rd.subConn.Subscribe(name); err != nil {
		rd.logger.Error(err.Error(), "type", "redis", "action", "subscribe", "name", name)
		return false
	}
	return true
}
// Unsubscribe removes the callbacks registered for channel name and, if a
// subscription connection is currently open, unsubscribes from the channel
// on it.
//
// The return value is true only when a connection was open and the
// unsubscribe call on it returned no error.
func (rd *Redis) Unsubscribe(name string) bool {
	rd.subLock.Lock()
	defer rd.subLock.Unlock()

	// delete is a no-op on a nil map, so no nil check is required here.
	delete(rd.subs, name)

	if rd.subConn == nil {
		return false
	}
	if err := rd.subConn.Unsubscribe(name); err != nil {
		rd.logger.Error(err.Error(), "type", "redis", "action", "unsubscribe", "name", name)
		return false
	}
	return true
}
// Start marks the subscription loop as running, launches receiveSub in its
// own goroutine and blocks until that goroutine sends its start signal.
func (rd *Redis) Start() {
	started := make(chan bool)

	rd.subLock.Lock()
	if rd.subs == nil {
		rd.subs = make(map[string]*SubCallbacks)
	}
	rd.SubRunning = true
	rd.subLock.Unlock()

	go rd.receiveSub(started)

	// Wait for the receive goroutine to report that it has started.
	<-started
}
// receiveSub is the subscription receive loop; Start runs it in its own
// goroutine.
//
// While rd.SubRunning is true it makes sure a pub/sub connection exists
// (subscribing to every registered channel and invoking the reset callbacks
// whenever a new connection is created), then dispatches each incoming
// message to the received callback registered for its channel. A receive
// error other than a read timeout closes the connection so that the outer
// loop establishes a new one.
//
// subStartChan, when non-nil, receives exactly one value: either as soon as a
// connection is in place, or when the loop exits without ever having obtained
// one, so the caller waiting on it is never left blocked. On exit a value is
// also sent on rd.subStopChan if that channel is set (Stop waits for it).
func (rd *Redis) receiveSub(subStartChan chan bool) {
	for {
		rd.subLock.RLock()
		running := rd.SubRunning
		rd.subLock.RUnlock()
		if !running {
			break
		}

		// Start receiving subscription data: (re)connect first if needed.
		rd.subLock.Lock()
		if rd.subConn == nil {
			conn, err := rd.GetConnection()
			if err != nil {
				// Retry after a short pause; the running flag is re-checked
				// at the top of the loop.
				rd.subLock.Unlock()
				time.Sleep(time.Second)
				continue
			}
			rd.subConn = &redis.PubSubConn{Conn: conn}

			// Re-subscribe to every registered channel.
			if len(rd.subs) > 0 {
				subs := make([]any, 0, len(rd.subs))
				for k := range rd.subs {
					subs = append(subs, k)
				}
				err = rd.subConn.Subscribe(subs...)
				if err != nil {
					_ = rd.subConn.Close()
					rd.subConn = nil
					rd.subLock.Unlock()
					time.Sleep(time.Second)
					continue
				}

				// On (re)connection, invoke the callbacks that reset data.
				// Note that they run while rd.subLock is held.
				for _, v := range rd.subs {
					if v.reset != nil {
						v.reset()
					}
				}
			}
		}
		rd.subLock.Unlock()

		// Tell the starter that a connection is ready; only done once.
		if subStartChan != nil {
			subStartChan <- true
			subStartChan = nil
		}

		for {
			isErr := false

			rd.subLock.RLock()
			subConn := rd.subConn
			rd.subLock.RUnlock()
			if subConn == nil {
				// The connection was closed elsewhere (e.g. by Stop).
				break
			}

			receiveObj := subConn.Receive()
			switch v := receiveObj.(type) {
			case redis.Message:
				rd.subLock.RLock()
				callback := rd.subs[v.Channel]
				rd.subLock.RUnlock()
				if callback != nil && callback.received != nil {
					callback.received(v.Data)
				}
			case redis.Subscription:
			case redis.Pong:
			case error:
				if strings.Contains(v.Error(), "i/o timeout") {
					// This break leaves only the switch: a read timeout is
					// not handled as a connection failure here, so the loop
					// goes on to re-check the running flag.
					break
				}
				// Errors caused by the connection being closed are expected
				// during shutdown and are not logged.
				if !strings.Contains(v.Error(), "connection closed") && !strings.Contains(v.Error(), "use of closed network connection") {
					rd.logger.Error(v.Error(), "type", "redis", "action", "receiveSub")
				}
				rd.subLock.Lock()
				if rd.subConn != nil {
					_ = rd.subConn.Close()
					rd.subConn = nil
				}
				rd.subLock.Unlock()
				isErr = true
			}

			rd.subLock.RLock()
			running = rd.SubRunning
			rd.subLock.RUnlock()
			if isErr || !running {
				break
			}
		}
	}

	// If the loop is exiting before a connection was ever established, the
	// starter is still waiting on subStartChan; release it so it cannot hang.
	if subStartChan != nil {
		subStartChan <- true
	}

	if rd.subStopChan != nil {
		rd.subStopChan <- true
	}
}
// Stop shuts down the subscription receive loop and waits for it to exit.
// It does nothing when the loop is not marked as running.
func (rd *Redis) Stop() {
	rd.subLock.Lock()
	if !rd.SubRunning {
		rd.subLock.Unlock()
		return
	}

	// The stop channel must exist before the running flag is cleared, because
	// the receive loop sends on it when it exits.
	rd.subStopChan = make(chan bool)
	rd.SubRunning = false

	if conn := rd.subConn; conn != nil {
		// Cancel the subscriptions.
		if len(rd.subs) > 0 {
			_ = conn.Unsubscribe()
		}
		// Reading once before closing prevents Close from blocking.
		_ = conn.ReceiveWithTimeout(50 * time.Millisecond)
		_ = conn.Close()
		rd.subConn = nil
	}
	rd.subLock.Unlock()

	// Wait, with the lock released, for the receive loop to confirm its exit.
	<-rd.subStopChan
	rd.subStopChan = nil
}
// PUBLISH sends data to channel by passing the command string
// "PUBLISH "+channel and data to rd.Do, and returns the result's Bool() value.
// NOTE(review): presumably true means the publish succeeded; the exact meaning
// depends on Do's result type, which is defined elsewhere. Confirm.
func (rd *Redis) PUBLISH(channel, data string) bool {
	command := "PUBLISH " + channel
	result := rd.Do(command, data)
	return result.Bool()
}