通过HTTP启动/停止和处理自定义调度程序

huangapple go评论98阅读模式
英文:

start/stop and handler custom scheduler via http

问题

我想创建一个调度程序,以便每秒执行一个任务,同时还希望有一个HTTP接口来停止/启动调度程序并获取更多统计信息。在阅读了有关定时器和计时器通道goroutine的更多信息后,我得出了以下结论:

https://gist.github.com/nbari/483c5b382c795bf290b5

package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

var timer *time.Ticker

func scheduler(seconds time.Duration) *time.Ticker {
	ticker := time.NewTicker(seconds * time.Second)
	go func() {
		for t := range ticker.C {
			// do stuff
			fmt.Println(t)
		}
	}()
	return ticker
}

func Start(timer *time.Ticker) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		timer = scheduler(1)
		w.Write([]byte("Starting scheduler"))
	})
}

func Stop(timer *time.Ticker) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		timer.Stop()
		w.Write([]byte("Stopping scheduler"))
	})
}

func main() {
	timer = scheduler(1)
	http.Handle("/start", Start(timer))
	http.Handle("/stop", Stop(timer))
	log.Fatal(http.ListenAndServe(":8080", nil))
}

上述代码是可行的,但我有一个全局变量"timer",我想知道是否有更好的实现方式,以及处理多个调度程序的方法。我正在考虑实现一种容器来存储所有的调度程序,但我希望能得到一些反馈,以帮助我找到更巧妙的解决方案。

英文:

I want to create a scheduler so that it executes a task every second for example, but also would like to have and http interface to stop/start the scheduler and get more stats/info, after reading more about timers & tickers, channels and gorutines I came out with this:

https://gist.github.com/nbari/483c5b382c795bf290b5

<!-- language: lang-golang -->

package main

import (
	&quot;fmt&quot;
	&quot;log&quot;
	&quot;net/http&quot;
	&quot;time&quot;
)

var timer *time.Ticker

func scheduler(seconds time.Duration) *time.Ticker {
	ticker := time.NewTicker(seconds * time.Second)
	go func() {
		for t := range ticker.C {
			// do stuff
			fmt.Println(t)
		}
	}()
	return ticker
}

func Start(timer *time.Ticker) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		timer = scheduler(1)
		w.Write([]byte(&quot;Starting scheduler&quot;))
	})
}

func Stop(timer *time.Ticker) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		timer.Stop()
		w.Write([]byte(&quot;Stoping scheduler&quot;))
	})
}

func main() {
	timer = scheduler(1)
	http.Handle(&quot;/start&quot;, Start(timer))
	http.Handle(&quot;/stop&quot;, Stop(timer))
	log.Fatal(http.ListenAndServe(&quot;:8080&quot;, nil))
}

The above code is working but I have a global "timer" variable, I would like to know if there is a better way to implement this and also a way for handle more than 1 scheduler, currently thinking on probably implementing kind of a container for all the scheduler but would like to have some feedbacks that could help me find clever solutions.

答案1

得分: 0

是的,有一种更好的方法:

在这里,你应该使用通道,并且像你建议的那样,使用某种数据结构来保存更多的调度器。

我想出了这个最基本的工作示例,展示了我会怎么做:

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// 调度器在每次从 t 接收到数据时运行 f,并在从 quit 接收到非阻塞数据时退出
type scheduler struct {
	t    <-chan time.Time
	quit chan struct{}
	f    func()
}

// 调度器池保存多个调度器
type schedulerPool struct {
	schedulers map[int]scheduler // 我在这里使用了 map,这样你可以使用更聪明的键
	counter    int
	mut        sync.Mutex
}

func newPool() *schedulerPool {
	return &schedulerPool{
		schedulers: make(map[int]scheduler),
	}
}

// start 添加并启动一个新的调度器,该调度器将在每个 interval 执行一次 f。
// 它返回生成的调度器的 ID,或者返回一个错误。
func (p *schedulerPool) start(interval time.Duration, f func()) (int, error) {
	p.mut.Lock()
	defer p.mut.Unlock()
	index := p.counter
	p.counter++

	if _, ok := p.schedulers[index]; ok {
		return 0, errors.New("key already in use")
	}

	sched := scheduler{
		t:    time.NewTicker(interval).C,
		quit: make(chan struct{}),
		f:    f,
	}

	p.schedulers[index] = sched
	go func() {
		for {
			select {
			case <-sched.t:
				sched.f()
			case <-sched.quit:
				return
			}
		}
	}()
	return index, nil
}

// stop 停止具有给定 ID 的调度器。
func (p *schedulerPool) stop(index int) error {
	p.mut.Lock()
	defer p.mut.Unlock()

	sched, ok := p.schedulers[index]
	if !ok {
		return errors.New("does not exist")
	}

	close(sched.quit)
	return nil
}

func main() {
	// 获取一个新的池
	pool := newPool()

	// 启动一个调度器
	idx1, err := pool.start(time.Second, func() { fmt.Println("hello 1") })
	if err != nil {
		panic(err)
	}

	// 启动另一个调度器
	idx2, err := pool.start(time.Second, func() { fmt.Println("hello 2") })
	if err != nil {
		panic(err)
	}

	// 等待一段时间
	time.Sleep(3 * time.Second)

	// 停止第二个调度器
	err = pool.stop(idx2)
	if err != nil {
		panic(err)
	}

	// 再等待一段时间
	time.Sleep(3 * time.Second)

	// 停止第一个调度器
	err = pool.stop(idx1)
	if err != nil {
		panic(err)
	}
}

playground 上查看它。

请注意,不能保证第一个调度器总共运行六次,第二个调度器运行三次。

还请注意我做了一些其他的事情:

  • 必须同步地访问映射
  • 生成 ID 必须同步进行
  • 你应该检查键是否实际上在映射中
  • 使用通道来启动、停止和控制调度器(例如,添加一个名为 beep 的通道,让调度器向控制台输出一些内容(比如 beep))
  • 为你想要执行的函数定义一个精确签名的类型,例如 type schedFunc func(int, int)

显然,你需要将这些内容封装在 http 处理程序中,可能从查询字符串中获取 ID 或参数,或者进行其他一些花哨的操作。这只是为了演示。

英文:

Yes, there is a better way:

You should use channels here, and, as you suggested, some data structure to hold more schedulers.

I came up with this, it's a most basic working example of what I'd do:

package main
import (
&quot;errors&quot;
&quot;fmt&quot;
&quot;sync&quot;
&quot;time&quot;
)
// a scheduler runs f on every receive on t and exits when receiving from quit is non-blocking
type scheduler struct {
t    &lt;-chan time.Time
quit chan struct{}
f    func()
}
// a schedulerPool holds multiple schedulers
type schedulerPool struct {
schedulers map[int]scheduler //I used a map here so you can use more clever keys
counter    int
mut        sync.Mutex
}
func newPool() *schedulerPool {
return &amp;schedulerPool{
schedulers: make(map[int]scheduler),
}
}
// start adds and starts a new scheduler that will execute f every interval.
// It returns the generated ID for the scheduler, or an error.
func (p *schedulerPool) start(interval time.Duration, f func()) (int, error) {
p.mut.Lock()
defer p.mut.Unlock()
index := p.counter
p.counter++
if _, ok := p.schedulers[index]; ok {
return 0, errors.New(&quot;key already in use&quot;)
}
sched := scheduler{
t:    time.NewTicker(interval).C,
quit: make(chan struct{}),
f:    f,
}
p.schedulers[index] = sched
go func() {
for {
select {
case &lt;-sched.t:
sched.f()
case &lt;-sched.quit:
return
}
}
}()
return index, nil
}
// stop stops the scheduler with the given ID.
func (p *schedulerPool) stop(index int) error {
p.mut.Lock()
defer p.mut.Unlock()
sched, ok := p.schedulers[index]
if !ok {
return errors.New(&quot;does not exist&quot;)
}
close(sched.quit)
return nil
}
func main() {
// get a new pool
pool := newPool()
// start a scheduler
idx1, err := pool.start(time.Second, func() { fmt.Println(&quot;hello 1&quot;) })
if err != nil {
panic(err)
}
// start another scheduler
idx2, err := pool.start(time.Second, func() { fmt.Println(&quot;hello 2&quot;) })
if err != nil {
panic(err)
}
// wait some time
time.Sleep(3 * time.Second)
// stop the second scheduler
err = pool.stop(idx2)
if err != nil {
panic(err)
}
// wait some more
time.Sleep(3 * time.Second)
// stop the first scheduler
err = pool.stop(idx1)
if err != nil {
panic(err)
}
}

Check it out on the playground.

Note that there is no guarantee that the first scheduler will run six times in total and the second one three times.

Also note a few other things I've done:

  • Map access has to be synchronized
  • Generating IDs must be synchronized
  • You should check if the key is actually in the map
  • Use channels to start, stop and control schedulers (for example, add a channel beep to have a scheduler dump something to the console (like &quot;beep&quot;))
  • Define a type for the exact signature of function you want to execute, for example type schedFunc func(int, int)

Obviously, you need to wrap that stuff inside of http handlers, probably take the ID or parameters from the querystring, or some other fancy stuff. This is just for demonstration.

huangapple
  • 本文由 发表于 2015年10月29日 19:33:44
  • 转载请务必保留本文链接:https://go.coder-hub.com/33413105.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定