Go:通过通道传递函数

huangapple go评论69阅读模式
英文:

Go: Passing functions through channels

问题

我正在尝试通过将函数放入队列中以后再调用它们来限制函数的调用频率。下面是我创建的一部分请求的切片,requestHandler函数以一定的速率处理每个请求。

我希望它能够接受各种类型和参数的函数,因此使用了interface{}类型。

我该如何通过通道传递函数并成功调用它们呢?

type request struct {
    function interface{}
    channel  chan interface{}
}

var requestQueue []request

func pushQueue(f interface{}, ch chan interface{}) {
    req := request{
        f,
        ch,
    }

    // 入队
    requestQueue = append(requestQueue, req)
}

func requestHandler() {
    for {
        if len(requestQueue) > 0 {
            // 出队
            req := requestQueue[len(requestQueue)-1]
            requestQueue = requestQueue[:len(requestQueue)-1]

            req.channel <- req.function
        }

        <-time.After(1200 * time.Millisecond)
    }
}

这是我想要实现的一个示例(GetLeagueEntries(string, string)和GetSummonerName(int, int)是函数):

ch := make(chan interface{})
pushQueue(l.GetLeagueEntries, ch)
pushQueue(l.GetSummonerName, ch)

leagues, _ := <-ch(string1, string2)
summoners, _ := <-ch(int1, int2)

希望对你有帮助!

英文:

I'm trying to rate limit the functions that I call by placing them through a queue to be accessed later. Below I have a slice of requests that I have created, and the requestHandler function processes each request at a certain rate.

I want it to accept all kinds of functions with different types of parameters, hence the interface{} type.

How would I be able to pass the functions through a channel and successfully call them?

type request struct {
    function interface{}
    channel  chan interface{}
}

var requestQueue []request

func pushQueue(f interface{}, ch chan interface{}) {
    req := request{
        f,
        ch,
    }

    //push
    requestQueue = append(requestQueue, req)
}

func requestHandler() {
    for {
        if len(requestQueue) &gt; 0 {
            //pop
            req := requestQueue[len(requestQueue)-1]
            requestQueue = requestQueue[:len(requestQueue)-1]

            req.channel &lt;- req.function
        }

        &lt;-time.After(1200 * time.Millisecond)
    }
}

Here is an example of what I'm trying to achieve (GetLeagueEntries(string, string) and GetSummonerName(int, int) are functions):

ch := make(chan interface{})
    pushQueue(l.GetLeagueEntries, ch)
    pushQueue(l.GetSummonerName, ch)

    leagues, _ := &lt;-ch(string1, string2)
    summoners, _ := &lt;-ch(int1, int2)

答案1

得分: 2

首先,我会将其写成以下形式:

leagues := server.GetLeagueEntries()
summoners := server.GetSummoners()

然后,将速率限制放入服务器中,可以使用其中一个速率限制库。

然而,可以使用接口来统一请求,并使用func类型来允许闭包(就像http.HandleFunc中那样):

type Command interface {
    Execute(server *Server)
}

type CommandFunc func(server *Server)
func (fn CommandFunc) Execute(server *Server) { fn(server) }

type GetLeagueEntries struct { Leagues []League }

func (entries *GetLeagueEntries) Execute(server *Server) {
    // ...
}

func GetSummonerName(id int, result *string) CommandFunc {
    return CommandFunc(func(server *Server){
        *result = "hello"
    })
}

get := GetLeagueEntries{}
requests <- &get

requests <- CommandFunc(func(server *Server){
    // ... 在这里处理一些东西
})

当然,这需要一些同步操作。

英文:

First, I would write it as:

leagues := server.GetLeagueEntries()
summoners := server.GetSummoners()

And, put the rate limiting into the server. With one of the rate-limiting libraries.

However, it is possible to use an interface to unify the requests, and use a func type to allow closures (as in http.HandleFunc):

type Command interface {
	Execute(server *Server)
}

type CommandFunc func(server *Server)
func (fn CommandFunc) Execute(server *Server) { fn(server) }

type GetLeagueEntries struct { Leagues []League }

func (entries *GetLeagueEntries) Execute(server *Server) {
	// ...
}

func GetSummonerName(id int, result *string) CommandFunc {
	return CommandFunc(func(server *Server){
		*result = &quot;hello&quot;
	})
}

get := GetLeagueEnties{}
requests &lt;- &amp;get

requests &lt;- CommandFunc(func(server *Server){
	// ... handle struff here
})

Of course, this needs some synchronization.

答案2

得分: 2

好的,这是代码的翻译结果:

好的这是代码https://play.golang.org/p/XZvb_4BaJF

*请注意这并不完美你有一个每秒执行一次的队列如果队列为空并且添加了新项新项可能要等待将近一秒才能执行*

但是这应该非常接近你所需要的 :)

这段代码可以分为三个部分

1. 速率限制的队列执行器我称之为服务器我很糟糕地给东西命名- 服务器对函数一无所知它所做的只是启动一个永不停止的goroutine每秒弹出队列中最旧的函数并调用它我上面提到的问题就在这部分代码中如果你愿意我可以帮你修复它
2. 按钮点击功能 - 这展示了每次按钮点击如何使用服务器调用3个不同的函数你当然可以调用更多/更少的函数),并确保它们之间相隔1秒你甚至可以给任何一个函数添加超时模拟延迟),它们仍然会被1秒钟调用一次这是唯一需要使用通道的地方因为你希望尽快地进行所有函数调用如果第一个函数需要5秒钟你只希望等待1秒钟来调用第二个函数),然后等待它们完成所以你需要知道它们何时完成
3. 按钮点击模拟主函数- 这只是展示了3个按钮点击按预期工作你还可以将它们放在goroutine中模拟3个用户同时点击按钮它仍然可以工作

package main

import (
    "fmt"
    "sync"
    "time"
)

const (
    requestFreq = time.Second
)

type (
    // 单个请求
    request func()
    
    // 持有请求队列并在requestFreq时执行它们的服务器
    server struct {
        // 这将每requestFreq触发一次
        ticker     *time.Ticker
        
        requests []request
        // 用于处理请求切片的互斥锁
        sync.RWMutex
    }
)

var (
    createServerOnce sync.Once
    s *server
)

func main() {
    // 多个按钮点击:
    ButtonClick()
    ButtonClick()
    ButtonClick()
    
    fmt.Println("完成!")
}

// 按钮逻辑:

// 调用3个函数并返回3个不同的值。
// 每个函数至少相隔1秒调用一次。
func ButtonClick() (val1 int, val2 string, val3 bool) {
    iCh := make(chan int)
    sCh := make(chan string)
    bCh := make(chan bool)
    
    go func(){
        Server().AppendRequest(func() {
            t := time.Now()
            fmt.Println("调用函数1(时间:" + t.Format("15:04:05") + ")")
            // 做一些事情
            iCh <- 1
        })
    }()
    go func(){
        Server().AppendRequest(func() {
            t := time.Now()
            fmt.Println("调用函数2(时间:" + t.Format("15:04:05") + ")")
            // 做一些事情
            sCh <- "Yo"
        })
    }()
    go func(){
        Server().AppendRequest(func() {
            t := time.Now()
            fmt.Println("调用函数3(时间:" + t.Format("15:04:05") + ")")
            // 做一些事情
            bCh <- true
        })
    }()
    
    // 等待所有3个调用返回
    for count := 0; count < 3; count++ {
        select {
        case val1 = <-iCh:
        case val2 = <-sCh:
        case val3 = <-bCh:
        }
    }

    return
}

// 服务器逻辑

// 只创建一个服务器的工厂函数
func Server() *server {
    // 整个应用程序只有一个服务器
    createServerOnce.Do(func() {
        s = &server{ticker: time.NewTicker(requestFreq), requests: []request{}}

        // 启动一个线程来发出请求。
        go s.makeRequests()
    })
    return s
}
func (s *server) makeRequests() {
    if s == nil || s.ticker == nil {
        return
    }
    
    // 这将每requestFreq继续进行一次
    for _ = range s.ticker.C {
    
        var r request
        
        // 你不能直接访问s.requests,因为你在一个goroutine中,
        // 而在这个goroutine中,有人可能在外部添加新的请求,
        // 所以你必须使用锁。
        s.Lock()
        if len(s.requests) > 0 {
            // 我们在这里有一个锁,它阻塞了所有其他操作,
            // 所以只需将第一个请求移出,保存它并在执行任何工作之前释放锁。
            r = s.requests[0]
            s.requests = s.requests[1:]
        }
        s.Unlock()
        
        if r != nil {
            // 发出请求!
            r()
        }
    }
}
func (s *server) AppendRequest(r request) {
    if s == nil {
        return
    }
    s.Lock()
    s.requests = append(s.requests, r)
    s.Unlock()
}

希望对你有帮助!如果有任何问题,请随时问我。

英文:

Alright, here is the codez: https://play.golang.org/p/XZvb_4BaJF

Notice that it's not perfect. You have a queue that is executed every second. If the queue is empty and a new item is added, the new item can wait for almost a second before being executed.

But this should get you very close to what you need anyway Go:通过通道传递函数

This code can be split into 3 section:

  1. The rate limited queue executor, which I call the server (I'm horrible at naming things) - The server doesn't know anything about the functions. All it does is start a never-ending goroutine that pops the oldest function in the queue, once every second, and calls it. The issue that I talked about above is in this section of the code BTW and I could help you fix it if you want.

  2. The Button Click functionality - This shows you how each button click could call 3 diff functions (you could obviously make more/less function calls) using the server and make sure that they are each 1 second apart from each other. You can even add a timeout to any of the functions (to fake latency) and they would still get called 1 second apart. This is the only place that you need channels because you want to make all the function calls as fast as possible (if the first function takes 5 seconds, you only want to wait 1 second to call the second function) and then wait for them to finish so you need to know when they are all done.

  3. The Button Click simulation (the main func) - this just shows that 3 button clicks would work as expected. You can also put them in a goroutine to simulate 3 users clicking the button at the same time and it would still work.

    package main
    import (
    &quot;fmt&quot;
    &quot;sync&quot;
    &quot;time&quot;
    )
    const (
    requestFreq = time.Second
    )
    type (
    // A single request
    request func()
    // The server that will hold a queue of requests and make them once a requestFreq
    server struct {
    // This will tick once per requestFreq
    ticker     *time.Ticker
    requests []request
    // Mutex for working with the request slice
    sync.RWMutex
    }
    )
    var (
    createServerOnce sync.Once
    s *server
    )
    func main() {
    // Multiple button clicks:
    ButtonClick()
    ButtonClick()
    ButtonClick()
    fmt.Println(&quot;Done!&quot;)
    }
    // BUTTON LOGIC:
    // Calls 3 functions and returns 3 diff values.
    // Each function is called at least 1 second appart.
    func ButtonClick() (val1 int, val2 string, val3 bool) {
    iCh := make(chan int)
    sCh := make(chan string)
    bCh := make(chan bool)
    go func(){
    Server().AppendRequest(func() {
    t := time.Now()
    fmt.Println(&quot;Calling func1 (time: &quot; + t.Format(&quot;15:04:05&quot;) + &quot;)&quot;)
    // do some stuff
    iCh &lt;- 1
    })
    }()
    go func(){
    Server().AppendRequest(func() {
    t := time.Now()
    fmt.Println(&quot;Calling func2 (time: &quot; + t.Format(&quot;15:04:05&quot;) + &quot;)&quot;)
    // do some stuff
    sCh &lt;- &quot;Yo&quot;
    })
    }()
    go func(){
    Server().AppendRequest(func() {
    t := time.Now()
    fmt.Println(&quot;Calling func3 (time: &quot; + t.Format(&quot;15:04:05&quot;) + &quot;)&quot;)
    // do some stuff
    bCh &lt;- true
    })
    }()
    // Wait for all 3 calls to come back
    for count := 0; count &lt; 3; count++ {
    select {
    case val1 = &lt;-iCh:
    case val2 = &lt;-sCh:
    case val3 = &lt;-bCh:
    }
    }
    return
    }
    // SERVER LOGIC
    // Factory function that will only create a single server
    func Server() *server {
    // Only one server for the entire application
    createServerOnce.Do(func() {
    s = &amp;server{ticker: time.NewTicker(requestFreq), requests: []request{}}
    // Start a thread to make requests.
    go s.makeRequests()
    })
    return s
    }
    func (s *server) makeRequests() {
    if s == nil || s.ticker == nil {
    return
    }
    // This will keep going once per each requestFreq
    for _ = range s.ticker.C {
    var r request
    // You can&#39;t just access s.requests because you are in a goroutine
    // here while someone could be adding new requests outside of the 
    // goroutine so you have to use locks.
    s.Lock()
    if len(s.requests) &gt; 0 {
    // We have a lock here, which blocks all other operations 
    // so just shift the first request out, save it and give 
    // the lock back before doing any work.
    r = s.requests[0]
    s.requests = s.requests[1:]
    }
    s.Unlock()
    if r != nil {
    // make the request!
    r()
    }
    }
    }
    func (s *server) AppendRequest(r request) {
    if s == nil {
    return
    }
    s.Lock()
    s.requests = append(s.requests, r)
    s.Unlock()
    }
    

答案3

得分: 0

我本来以为使用信号量或工作池会更容易。这样,你可以有一定数量的工作线程来执行任务。也可以有多个工作池。

你需要这些调用是并发/异步的吗?如果不需要,并且它们可以按顺序调用,你可以使用可配置的延迟(这是一种不太好的方法)。

尝试使用工作池或信号量,而不是函数通道。

英文:

I would have thought it easier to use some sort of semaphore or worker pool. That way you have limited number of workers who can do anything. It would be possible to have multiple worker pools too.

Do you need any of these calls to be concurrent/asynchronous? If not, and they can be called in order you could have configurable sleep (a nasty hack mind).

Try out a worker pool or semaphore rather than a chan of functions.

huangapple
  • 本文由 发表于 2016年1月7日 14:32:34
  • 转载请务必保留本文链接:https://go.coder-hub.com/34648705.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定