package redis import ( "strings" "time" "github.com/gomodule/redigo/redis" ) func (rd *Redis) Subscribe(name string, reset func(), received func([]byte)) bool { rd.sub.lock.Lock() defer rd.sub.lock.Unlock() if rd.sub.subs == nil { rd.sub.subs = make(map[string]*SubCallbacks) } rd.sub.subs[name] = &SubCallbacks{reset: reset, received: received} if rd.sub.conn != nil { err := rd.sub.conn.Subscribe(name) if err != nil { rd.logger.Error(err.Error(), "type", "redis", "action", "subscribe", "name", name) } else { return true } } return false } func (rd *Redis) Unsubscribe(name string) bool { rd.sub.lock.Lock() defer rd.sub.lock.Unlock() if rd.sub.subs != nil { delete(rd.sub.subs, name) } if rd.sub.conn != nil { err := rd.sub.conn.Unsubscribe(name) if err != nil { rd.logger.Error(err.Error(), "type", "redis", "action", "unsubscribe", "name", name) } else { return true } } return false } func (rd *Redis) Start() { rd.sub.lock.Lock() if rd.sub.subs == nil { rd.sub.subs = make(map[string]*SubCallbacks) } if rd.sub.running { rd.sub.lock.Unlock() return } rd.sub.running = true rd.sub.lock.Unlock() subStartChan := make(chan bool) go rd.receiveSub(subStartChan) <-subStartChan } func (rd *Redis) receiveSub(subStartChan chan bool) { for { rd.sub.lock.RLock() running := rd.sub.running rd.sub.lock.RUnlock() if !running { break } // 开始接收订阅数据 rd.sub.lock.Lock() if rd.sub.conn == nil { // 获取一个全新的连接,并将超时设为 0 (无限) 以支持长连接订阅 conf := *rd.Config conf.ReadTimeout = 0 conf.WriteTimeout = 0 conn, err := rd.pool.Dial() if err != nil { rd.sub.lock.Unlock() time.Sleep(time.Second) continue } // 订阅模式下,必须使用长连接,通过 Stop() 时关闭底层连接来强制退出 rd.sub.conn = &redis.PubSubConn{Conn: conn} // 重新订阅 if len(rd.sub.subs) > 0 { subs := make([]any, 0) for k := range rd.sub.subs { subs = append(subs, k) } err = rd.sub.conn.Subscribe(subs...) if err != nil { _ = rd.sub.conn.Close() rd.sub.conn = nil rd.sub.lock.Unlock() time.Sleep(time.Second) continue } // 重新连接时调用重置数据的回调 for _, v := range rd.sub.subs { if v.reset != nil { v.reset() } } } } rd.sub.lock.Unlock() if subStartChan != nil { subStartChan <- true subStartChan = nil } for { isErr := false rd.sub.lock.RLock() subConn := rd.sub.conn rd.sub.lock.RUnlock() if subConn == nil { break } receiveObj := subConn.Receive() switch v := receiveObj.(type) { case redis.Message: rd.sub.lock.RLock() callback := rd.sub.subs[v.Channel] rd.sub.lock.RUnlock() if callback != nil && callback.received != nil { callback.received(v.Data) } case redis.Subscription: case redis.Pong: case error: if strings.Contains(v.Error(), "i/o timeout") { break } // 使用 strings.Contains 是因为 redigo 的错误通常是自定义 string 类型 errMsg := v.Error() if !strings.Contains(errMsg, "connection closed") && !strings.Contains(errMsg, "use of closed network connection") { rd.logger.Error(errMsg, "type", "redis", "action", "receiveSub") } rd.sub.lock.Lock() if rd.sub.conn != nil { _ = rd.sub.conn.Close() rd.sub.conn = nil } rd.sub.lock.Unlock() isErr = true } rd.sub.lock.RLock() running = rd.sub.running rd.sub.lock.RUnlock() if isErr || !running { break } } } rd.sub.lock.Lock() if rd.sub.stopChan != nil { rd.sub.stopChan <- true } rd.sub.lock.Unlock() } func (rd *Redis) Stop() { rd.sub.lock.Lock() if rd.sub.running { rd.sub.stopChan = make(chan bool) rd.sub.running = false if rd.sub.conn != nil { // 取消订阅 if len(rd.sub.subs) > 0 { _ = rd.sub.conn.Unsubscribe() } _ = rd.sub.conn.Close() rd.sub.conn = nil } rd.sub.lock.Unlock() <-rd.sub.stopChan rd.sub.lock.Lock() rd.sub.stopChan = nil rd.sub.lock.Unlock() } else { rd.sub.lock.Unlock() } } func (rd *Redis) PUBLISH(channel, data string) bool { return rd.Do("PUBLISH "+channel, data).Bool() }