Golang Redis PubSub 超时

huangapple go评论80阅读模式
英文:

Golang Redis PubSub timeout

问题

到目前为止,我一直在做这个:

import (
    _redis "gopkg.in/redis.v3"
    "strconv"
    "time"
)

type Redis struct {
    Connector *_redis.Client
    PubSub    *_redis.PubSub
}

var redis *Redis = nil

func NewRedis() bool {
    if redis == nil {
        redis = new(Redis)
        redis.Connector = _redis.NewClient(&_redis.Options{
            Addr:     config.RedisHostname + ":" + strconv.FormatInt(config.RedisPort, 10),
            Password: "",
            DB:       0,
        })
        Logger.Log(nil, "Connected to Redis")
        err := redis.Init()
        if err != nil {
            Logger.Fatal(nil, "Cannot setup Redis:", err.Error())
            return false
        }
        return true
    }
    return false
}

func (this *Redis) Init() error {
    pubsub, err := this.Connector.Subscribe("test")
    if err != nil {
        return err
    }
    defer pubsub.Close()
    this.PubSub = pubsub
    for {
        msgi, err := this.PubSub.ReceiveTimeout(100 * time.Millisecond)
        if err != nil {
            Logger.Error(nil, "PubSub error:", err.Error())
            err = this.PubSub.Ping("")
            if err != nil {
                Logger.Error(nil, "PubSub failure:", err.Error())
                break
            }
            continue
        }
        switch msg := msgi.(type) {
        case *_redis.Message:
            Logger.Log(nil, "Received", msg.Payload, "on channel", msg.Channel)
        }
    }
    return nil
}

我的 Connector 是一个 redis.Client,它是工作正常的,因为我能够发布消息。

当我运行程序时,我得到以下错误:
PubSub error: WSARecv tcp 127.0.0.1:64505: i/o timeout

你有任何关于我做错了什么的想法吗?我正在使用这个包:https://github.com/go-redis/redis

英文:

So far I've been doing this:

import (
_redis "gopkg.in/redis.v3"
"strconv"
"time"
)
type Redis struct {
Connector	*_redis.Client
PubSub 		*_redis.PubSub
}
var redis *Redis = nil
func NewRedis() bool {
if redis == nil {
redis = new(Redis)
redis.Connector = _redis.NewClient(&_redis.Options{
Addr: config.RedisHostname + ":" + strconv.FormatInt(config.RedisPort, 10),
Password: "",
DB: 0,
})
Logger.Log(nil, "Connected to Redis")
err := redis.Init()
if err != nil {
Logger.Fatal(nil, "Cannot setup Redis:", err.Error())
return false
}
return true
}
return false
}
func (this *Redis) Init() error {
pubsub, err := this.Connector.Subscribe("test")
if err != nil {
return err
}
defer pubsub.Close()
this.PubSub = pubsub
for {
msgi, err := this.PubSub.ReceiveTimeout(100 * time.Millisecond)
if err != nil {
Logger.Error(nil, "PubSub error:", err.Error())
err = this.PubSub.Ping("")
if err != nil {
Logger.Error(nil, "PubSub failure:", err.Error())
break
}
continue
}
switch msg := msgi.(type) {
case *_redis.Message:
Logger.Log(nil, "Received", msg.Payload, "on channel", msg.Channel)
}
}
return nil
}

My Connector is a redis.Client, it's working because I was able to publish messages as well.

When I run my program, I get the following error:
PubSub error: WSARecv tcp 127.0.0.1:64505: i/o timeout

Do you have any idea of what I'm doing wrong ? I'm using this package: https://github.com/go-redis/redis

答案1

得分: 5

一些需要注意的事项:

  • (实现细节)当 Redis 进入 PubSub 模式后,此后在该套接字上发生的唯一事情就是 PubSub 事件,这就是为什么 go-redis 中的 PubSub 被抽象成了自己的类型。
  • 一个 PubSub 客户端可以在单个订阅者中订阅多个主题,因此在整个过程中会有订阅/取消订阅事件。
  • 该接口同时具有 Receive()ReceiveTimeout(duration) 方法,两者都返回下一个 事件;这些事件可以是订阅/取消订阅事件和消息事件(你不一定知道是哪个),它们之间唯一的区别是 Receive 会一直阻塞,直到有新的消息,而 ReceiveTimeout 在超时时会报错。

在这种情况下,除非你的消息频率一直超过每秒 10 条(换句话说,消息之间的间隔小于 100 毫秒),否则使用如此短的超时是低效的;我认为由于 golang 具有 goroutine,你几乎不应该在真实应用中使用 ReceiveTimeout,或者使用足够长的超时,比如一分钟。

基于此,你的接收循环应该如下所示:

for {
    msgi, err := this.PubSub.Receive()
    if err != nil {
        Logger.Error(nil, "PubSub error:", err.Error())
        return err
    }
    switch msg := msgi.(type) {
        case *_redis.Message:
            Logger.Log(nil, "Received", msg.Payload, "on channel", msg.Channel)
        default:
            Logger.Log(nil, "Got control message", msg)
    }
}

如果你的应用程序确实需要使用超时,那么你应该使用类型断言来断言表示超时的 *net.OpError,并将其与其他更严重的错误区分开来。

英文:

Some things to note:

  • (implementation detail) When redis goes into PubSub mode, the only thing that happens on that socket afterwards is PubSub events, which is why PubSub in go-redis is abstracted into its own type
  • A PubSub client can potentially subscribe to multiple topics in a single subscriber, hence why there are subscribe/unsubscribe events throughout.
  • the interface has both Receive() and ReceiveTimeout(duration) methods, both of which return the next event on the wire; which can be subscribe/unsubscribe events and message events; (you don't necessarily know which) the only difference between them that Receive blocks forever until there's a new message, and ReceiveTimeout will error on timeout.

With that in mind, unless you have messages far more than 10/second consistently (in other words, <100 milliseconds between messages), it's inefficient to use that short of a timeout; and I'd argue that due to golang having goroutines, you should almost never use ReceiveTimeout for real applications, or use a sufficiently long timeout like a minute.

with that in mind, your receive loop should look like:

    for {
msgi, err := this.PubSub.Receive()
if err != nil {
Logger.Error(nil, &quot;PubSub error:&quot;, err.Error())
return err
}
switch msg := msgi.(type) {
case *_redis.Message:
Logger.Log(nil, &quot;Received&quot;, msg.Payload, &quot;on channel&quot;, msg.Channel)
default:
Logger.Log(nil, &quot;Got control message&quot;, msg)
}
}

If your application really warranted using a timeout, then you should use a type assertion to assert the *net.OpError that signifies a timeout and distinguish it from other more serious errors.

huangapple
  • 本文由 发表于 2015年9月1日 00:19:45
  • 转载请务必保留本文链接:https://go.coder-hub.com/32315648.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定