Checking an API with Go for changes in data continuously

Question

I'm trying to poll an API to keep a time series of traffic data, and save that data to Postgres whenever it changes.

At the moment I've got an implementation that looks roughly like this:

//this needs to check the api for new information every X seconds
func Poll(req *http.Request, client *http.Client) ([]byte, error) {
	r := rand.New(rand.NewSource(99))
	c := time.Tick(10 * time.Second)
	for _ = range c {
		//Download the current contents of the URL and do something with it
		response, err := client.Do(req)
		data, _ := io.ReadAll(response.Body)

		if err != nil {
			return nil, err
		}
		return data, nil
		// add a bit of jitter
		jitter := time.Duration(r.Int31n(5000)) * time.Millisecond
		time.Sleep(jitter)
	}

}



func main() {

	client := &http.Client{
		Timeout: time.Second * 60 * 60 * 600,
	}
	url := "https://data-exchange-api.vicroads.vic.gov.au/bluetooth_data/links"
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return err
	}
	req.Header.Set("Ocp-Apim-Subscription-Key", "xx")

	// response, err := client.Do(req)
	data, err := Poll(req, client)
	fmt.Println(string(data))

}

I will write a comparison function next.

Basically, I'm trying to work out how to make sure the loop actually runs the query in the first place and returns an appropriate value.

I think this implementation is probably not very good, and I'm not sure how to implement it properly. Could I get some pointers?

Answer 1

Score: 1

Your problem is a typical producer/consumer scenario: your Poll() function produces the response data, which your main() function consumes (perhaps to save it in Postgres).
Goroutines and channels solve this kind of problem well.

The polling can run in a goroutine that sends the response data to main() over a channel. The polling can also fail (a request error or an I/O error), so the error should be sent to main() as well.

First define a new type to hold polled data and an error:

type PollResponse struct {
	Data []byte
	Err error
}

In the Poll() function, start a goroutine to do the polling and return a channel that carries the data out of the goroutine:

func Poll(req *http.Request, client *http.Client) (ch chan PollResponse) {
	ch = make(chan PollResponse) // Buffered channel is also good
	go func() {
		// Closing the channel ends the range loop in main().
		defer close(ch)
		r := rand.New(rand.NewSource(99))
		c := time.Tick(10 * time.Second)

		for range c {
			res, err := client.Do(req)
			pollRes := PollResponse{}
			if err != nil {
				pollRes.Data, pollRes.Err = nil, err
				ch <- pollRes
				break
			}
			pollRes.Data, pollRes.Err = io.ReadAll(res.Body)
			res.Body.Close() // release the connection before the next poll
			ch <- pollRes
			if pollRes.Err != nil {
				break
			}
			jitter := time.Duration(r.Int31n(5000)) * time.Millisecond
			time.Sleep(jitter)
		}
	}()
	return
}

And finally, in the main() function, call Poll() and read from the channel to get the poll responses:

func main() {
	client := &http.Client{
		Timeout: time.Second * 60 * 60 * 600,
	}
	url := "https://data-exchange-api.vicroads.vic.gov.au/bluetooth_data/links"

	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return
	}
	req.Header.Set("Ocp-Apim-Subscription-Key", "xx")

	pollCh := Poll(req, client)

	for item := range pollCh {
		if item.Err == nil {
			fmt.Println(string(item.Data)) // or save it to postgres database
		}
	}
}
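
The consumer loop above handles every response, whether or not the data changed. Since the goal in the question is to save only when something has changed, the loop could first check each payload against the previous one. A minimal sketch, assuming a hypothetical changeDetector type (the name and the choice of SHA-256 are illustrative; comparing the raw bytes with bytes.Equal would work just as well):

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// changeDetector is a hypothetical helper that remembers the hash of the
// last payload it was shown.
type changeDetector struct {
	seen bool
	last [sha256.Size]byte
}

// Changed reports whether data differs from the previous payload.
// The very first payload always counts as a change.
func (d *changeDetector) Changed(data []byte) bool {
	sum := sha256.Sum256(data)
	if d.seen && sum == d.last {
		return false
	}
	d.seen, d.last = true, sum
	return true
}

func main() {
	d := &changeDetector{}
	for _, payload := range []string{`{"v":1}`, `{"v":1}`, `{"v":2}`} {
		fmt.Println(payload, d.Changed([]byte(payload)))
	}
	// Prints:
	// {"v":1} true
	// {"v":1} false
	// {"v":2} true
}
```

In main(), the body of the range loop would then become something like `if item.Err == nil && d.Changed(item.Data) { ... }`.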

Answer 2

Score: 1

Range over the ticker channel. On each iteration, get the data, check whether it has changed, and process it. The key point is to process the data inside the loop instead of returning it from the function.

Assuming that you have the following function:

// processChangedData updates the database with new
// data from the API endpoint.
func processChangedData(data []byte) error {
	// implement save to postgres
	return nil
}

Use the following function to poll:

func Poll(client *http.Client) error {

	url := "https://data-exchange-api.vicroads.vic.gov.au/bluetooth_data/links"

	// Use NewTicker instead of Tick so we can clean up
	// the ticker on return from the function.
	t := time.NewTicker(10 * time.Second)
	defer t.Stop()

	var prev []byte

	for range t.C {

		// Create a new request object for each request.
		req, err := http.NewRequest("GET", url, nil)
		if err != nil {
			return err
		}
		req.Header.Set("Ocp-Apim-Subscription-Key", "xx")

		resp, err := client.Do(req)
		if err != nil {
			// Edit error handling to match application
			// requirements. I return an error here. Continuing
			// the loop is also an option.
			return err
		}

		data, err := io.ReadAll(resp.Body)

		// Ensure that body is closed before handling errors
		// below.
		resp.Body.Close()

		if err != nil {
			// Edit error handling to match application
			// requirements. I return an error here. Continuing
			// the loop is also an option.
			return err
		}

		if resp.StatusCode != http.StatusOK {
			// Edit error handling to match application
			// requirements. I return an error here. Continuing
			// the loop is also an option.
			return fmt.Errorf("bad status %d", resp.StatusCode)
		}

		if bytes.Equal(data, prev) {
			continue
		}
		prev = data

		if err := processChangedData(data); err != nil {
			// Edit error handling to match application
			// requirements. I return an error here. Continuing
			// the loop is also an option.
			return err
		}
	}
	panic("unexpected break from loop")
}

huangapple
  • Posted on 2021-06-29 16:01:52
  • Please keep this link when reposting: https://go.coder-hub.com/68174740.html