如何使用通道实现一个函数?

huangapple go评论88阅读模式
英文:

How to implement a function using channels

问题

有一个在goroutine中运行的函数:

func (c *controlUC) WebhookPool() {
    for {
        if len(c.webhookPool) == 0 {
            continue
        }
        for i := 0; i < len(c.webhookPool); i++ {
            if !c.webhookPool[i].LastSentTime.IsZero() && time.Now().Before(c.webhookPool[i].LastSentTime.Add(GetDelayBySentCount(c.webhookPool[i].SendCount))) {
                continue
            }
            var headers = make(map[string]string)
            headers["Content-type"] = "application/json"
            _, statusCode, err := c.fhttpClient.Request("POST", c.webhookPool[i].Path, c.webhookPool[i].Body, nil, headers)
            if err != nil {
                c.logger.Error(err)
                return
            }
            if statusCode != 200 {
                if c.webhookPool[i].SendCount >= 2 {
                    c.webhookPool = append(c.webhookPool[:i], c.webhookPool[i+1:]...)
                    i--
                    continue
                }
                c.webhookPool[i].SendCount++
            } else {
                c.webhookPool = append(c.webhookPool[:i], c.webhookPool[i+1:]...)
                i--
                continue
            }
            c.webhookPool[i].LastSentTime = time.Now()
        }
    }
}
// webhookPool []models.WebhookPoolElem
type WebhookPoolElem struct {
    SendCount    int
    LastSentTime time.Time
    Path         string
    Body         []byte
}

webhookPoolElem元素被添加到c.webhookpool中,然后向服务器发送请求(路径取自WebhookPoolElem.path)。如果服务器返回的状态码不是200,那么我需要在X秒后再次发送请求(根据SendCountGetDelayBySentCount()中返回不同的时间)。尝试次数是有限的(c.webhookpool[i].SendCount >= 2)。

但也许这个函数需要通过通道来实现?如果是这样,应该如何实现?

英文:

There is a function that is running in goroutine:

func (c *controlUC) WebhookPool() {
for {
if len(c.webhookPool) == 0 {
continue
}
for i := 0; i &lt; len(c.webhookPool); i++ {
if !c.webhookPool[i].LastSentTime.IsZero() &amp;&amp; time.Now().Before(c.webhookPool[i].LastSentTime.Add(GetDelayBySentCount(c.webhookPool[i].SendCount))) {
continue
}
var headers = make(map[string]string)
headers[&quot;Content-type&quot;] = &quot;application/json&quot;
_, statusCode, err := c.fhttpClient.Request(&quot;POST&quot;, c.webhookPool[i].Path, c.webhookPool[i].Body, nil, headers)
if err != nil {
c.logger.Error(err)
return
}
if statusCode != 200 {
if c.webhookPool[i].SendCount &gt;= 2 {
c.webhookPool = append(c.webhookPool[:i], c.webhookPool[i+1:]...)
i--
continue
}
c.webhookPool[i].SendCount++
} else {
c.webhookPool = append(c.webhookPool[:i], c.webhookPool[i+1:]...)
i--
continue
}
c.webhookPool[i].LastSentTime = time.Now()
}
}
}
// webhookPool []models.WebhookPoolElem
type WebhookPoolElem struct {
SendCount    int
LastSentTime time.Time
Path         string
Body         []byte
}

The webhookPoolElem element is added to c.webhookpool, after which a request is sent to the server (the path is taken from WebhookPoolElem.path). If the server returned a non - 200 200, then I need to send the request again, after X seconds (taken from GetDelayBySentCount(), depending on SendCount returns different times). The number of attempts is limited (c.webhookpool[i].SendCount &gt;= 2)

But maybe this function needs to be done through channels? If so, how?

答案1

得分: 1

假设controlUC接收器有一个字段webhookPool chan WebhookPoolElem,并且使用webhookPool: make(chan WebhookPoolElem, n)进行初始化,其中n是缓冲区大小。

你可以接收元素,并且可以将c.webhookPool[i]替换为elem。重写如下:

func (c *controlUC) WebhookPool() {
	for {
		elem, open := <-c.webhookPool
		if !open {
			return
		}

		if !elem.LastSentTime.IsZero() && time.Now().Before(elem.LastSentTime.Add(GetDelayBySentCount(elem.SendCount))) {
			continue
		}
		// 我省略了http请求
		if statusCode != 200 {
			if elem.SendCount >= 2 {
				// 从通道中丢弃消息,无需执行任何操作
				continue
			}
			elem.SendCount++
			elem.LastSentTime = time.Now()
			c.webhookPool <- elem // 再次入队
		}
	}
}

我建议使用带缓冲的通道,这样最后的发送操作c.webhookPool <- elem不会阻塞,但最好将发送操作放在select语句中,这样如果发送无法进行,无论缓冲区如何,goroutine都不会阻塞:

select {
    case c.webhookPool <- elem:
        // 发送成功
    default:
        // 无法发送
}
英文:

Lets say controlUC receiver has a field webhookPool chan WebhookPoolElem and init as webhookPool: make(chan WebhookPoolElem, n) with n as buffer.

You can receive elements and more or less replace c.webhookPool[i] to elem. Rewrite like this:

func (c *controlUC) WebhookPool() {
for {
elem, open := &lt;-c.webhookPool
if !open {
return
}
if !elem.LastSentTime.IsZero() &amp;&amp; time.Now().Before(elem.LastSentTime.Add(GetDelayBySentCount(elem.SendCount))) {
continue
}
// I omit http request
if statusCode != 200 {
if elem.SendCount &gt;= 2 {
// drop message from channel, no need to do anything
continue
}
elem.SendCount++
elem.LastSentTime = time.Now()
c.webhookPool &lt;- elem // enqueue again
}
}

I suggest buffered channel so the last send c.webhookPool &lt;- elem does not block, but it's best if you place the send in a select so if the send can not proceed regardless of the buffer, the goroutine doesn't block:

select {
case c.webhookPool &lt;- elem:
// success
default:
// can not send 
}

huangapple
  • 本文由 发表于 2021年9月30日 16:34:51
  • 转载请务必保留本文链接:https://go.coder-hub.com/69388935.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定