package redis import ( "strings" "time" "github.com/gomodule/redigo/redis" ) func (rd *Redis) Subscribe(name string, reset func(), received func([]byte)) bool { rd.subLock.Lock() defer rd.subLock.Unlock() if rd.subs == nil { rd.subs = make(map[string]*SubCallbacks) } rd.subs[name] = &SubCallbacks{reset: reset, received: received} if rd.subConn != nil { err := rd.subConn.Subscribe(name) if err != nil { rd.logger.Error(err.Error(), "type", "redis", "action", "subscribe", "name", name) } else { return true } } return false } func (rd *Redis) Unsubscribe(name string) bool { rd.subLock.Lock() defer rd.subLock.Unlock() if rd.subs != nil { delete(rd.subs, name) } if rd.subConn != nil { err := rd.subConn.Unsubscribe(name) if err != nil { rd.logger.Error(err.Error(), "type", "redis", "action", "unsubscribe", "name", name) } else { return true } } return false } func (rd *Redis) Start() { rd.subLock.Lock() if rd.subs == nil { rd.subs = make(map[string]*SubCallbacks) } rd.SubRunning = true rd.subLock.Unlock() subStartChan := make(chan bool) go rd.receiveSub(subStartChan) <-subStartChan } func (rd *Redis) receiveSub(subStartChan chan bool) { for { rd.subLock.RLock() running := rd.SubRunning rd.subLock.RUnlock() if !running { break } // 开始接收订阅数据 rd.subLock.Lock() if rd.subConn == nil { conn, err := rd.GetConnection() if err != nil { rd.subLock.Unlock() time.Sleep(time.Second) continue } rd.subConn = &redis.PubSubConn{Conn: conn} // 重新订阅 if len(rd.subs) > 0 { subs := make([]any, 0) for k := range rd.subs { subs = append(subs, k) } err = rd.subConn.Subscribe(subs...) if err != nil { _ = rd.subConn.Close() rd.subConn = nil rd.subLock.Unlock() time.Sleep(time.Second) continue } // 重新连接时调用重置数据的回调 for _, v := range rd.subs { if v.reset != nil { v.reset() } } } } rd.subLock.Unlock() if subStartChan != nil { subStartChan <- true subStartChan = nil } for { isErr := false rd.subLock.RLock() subConn := rd.subConn rd.subLock.RUnlock() if subConn == nil { break } receiveObj := subConn.Receive() switch v := receiveObj.(type) { case redis.Message: rd.subLock.RLock() callback := rd.subs[v.Channel] rd.subLock.RUnlock() if callback != nil && callback.received != nil { callback.received(v.Data) } case redis.Subscription: case redis.Pong: case error: if strings.Contains(v.Error(), "i/o timeout") { break } // 使用 strings.Contains 是因为 redigo 的错误通常是自定义 string 类型 errMsg := v.Error() if !strings.Contains(errMsg, "connection closed") && !strings.Contains(errMsg, "use of closed network connection") { rd.logger.Error(errMsg, "type", "redis", "action", "receiveSub") } rd.subLock.Lock() if rd.subConn != nil { _ = rd.subConn.Close() rd.subConn = nil } rd.subLock.Unlock() isErr = true } rd.subLock.RLock() running = rd.SubRunning rd.subLock.RUnlock() if isErr || !running { break } } } if rd.subStopChan != nil { rd.subStopChan <- true } } func (rd *Redis) Stop() { rd.subLock.Lock() if rd.SubRunning { rd.subStopChan = make(chan bool) rd.SubRunning = false if rd.subConn != nil { // 取消订阅 if len(rd.subs) > 0 { _ = rd.subConn.Unsubscribe() } // 读一次再关闭可以防止Close时阻塞 _ = rd.subConn.ReceiveWithTimeout(50 * time.Millisecond) _ = rd.subConn.Close() rd.subConn = nil } rd.subLock.Unlock() <-rd.subStopChan rd.subStopChan = nil } else { rd.subLock.Unlock() } } func (rd *Redis) PUBLISH(channel, data string) bool { return rd.Do("PUBLISH "+channel, data).Bool() }