Wait for AWS Athena query execution in Go SDK

Question

I have working code that runs an Athena query and waits for it to finish by polling the error returned from GetQueryResults:

func GetQueryResults(client *athena.Client, QueryID *string) []types.Row {

	params := &athena.GetQueryResultsInput{
		QueryExecutionId: QueryID,
	}

	data, err := client.GetQueryResults(context.TODO(), params)

	for err != nil {
		println(err.Error())
		time.Sleep(time.Second)
		data, err = client.GetQueryResults(context.TODO(), params)
	}

	return data.ResultSet.Rows
}

The problem is that if the query fails, I have no way to break out of the loop.

For example, in Python I can do something like:

    while athena.get_query_execution(QueryExecutionId=execution_id)["QueryExecution"][
        "Status"
    ]["State"] in ["RUNNING", "QUEUED"]:
        sleep(2)

I can do a check like strings.Contains(err.Error(),"FAILED") inside the for loop, but I am looking for a cleaner way.

I looked for an equivalent in Go but couldn't find one. Is there a function in the Go SDK that returns the execution status? And is there a better way to examine an error in Go than checking err != nil?

Answer 1

Score: 1

The SDK already provides retries.

Here is an example using aws-sdk-go-v2.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/athena"
	"github.com/aws/aws-sdk-go-v2/service/athena/types"
)

func main() {
	ctx, cancelFunc := context.WithTimeout(context.Background(), time.Minute*5)
	defer cancelFunc()

	// Load region and credentials from the environment or shared config files.
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		panic(err) // TODO: handle error
	}
	ath := athena.NewFromConfig(cfg)

	rows, err := GetQueryResults(ctx, ath, aws.String("query-id"), 10)
	if err != nil {
		panic(err) // TODO: handle error
	}

	fmt.Println(rows)
}

// GetQueryResults polls the query state every 5 seconds until it reaches a
// terminal state, then fetches the rows. attempts caps the SDK retries per call.
func GetQueryResults(ctx context.Context, client *athena.Client, QueryID *string, attempts int) ([]types.Row, error) {
	t := time.NewTicker(time.Second * 5)
	defer t.Stop()

	attemptsFunc := func(o *athena.Options) { o.RetryMaxAttempts = attempts }

WAIT:
	for {
		select {
		case <-t.C:
			out, err := client.GetQueryExecution(ctx, &athena.GetQueryExecutionInput{
				QueryExecutionId: QueryID,
			}, attemptsFunc)
			if err != nil {
				return nil, err
			}

			switch st := out.QueryExecution.Status; st.State {
			case types.QueryExecutionStateSucceeded:
				break WAIT
			case types.QueryExecutionStateCancelled, types.QueryExecutionStateFailed:
				// Report the terminal state instead of asking for results that do not exist.
				return nil, fmt.Errorf("query %s: %s", st.State, aws.ToString(st.StateChangeReason))
			}

		case <-ctx.Done():
			// Timed out or cancelled while the query was still queued or running.
			return nil, ctx.Err()
		}
	}

	data, err := client.GetQueryResults(ctx, &athena.GetQueryResultsInput{
		QueryExecutionId: QueryID,
	})
	if err != nil {
		return nil, err
	}

	return data.ResultSet.Rows, nil
}
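
On the second part of the question (examining an error beyond err != nil), the standard library's errors.As and errors.Is are the usual tools, and they avoid matching on error strings. The following is only a minimal sketch under stated assumptions, not the SDK's API: QueryFailedError, stateFunc, waitForQuery, failureDetails and demo are hypothetical names, the state strings are the ones from the Python snippet in the question, and the fake status function stands in for a real GetQueryExecution call.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// QueryFailedError is a hypothetical error type that carries the terminal
// state, so callers can inspect it instead of parsing the error string.
type QueryFailedError struct {
	State  string
	Reason string
}

func (e *QueryFailedError) Error() string {
	return fmt.Sprintf("query %s: %s", e.State, e.Reason)
}

// stateFunc is a hypothetical stand-in for a status call such as
// GetQueryExecution; it returns the current state and the reason for it.
type stateFunc func(ctx context.Context) (state, reason string, err error)

// waitForQuery checks the state immediately, then once per interval, until
// the query reaches a terminal state or ctx is done.
func waitForQuery(ctx context.Context, interval time.Duration, getState stateFunc) error {
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		state, reason, err := getState(ctx)
		if err != nil {
			return fmt.Errorf("checking query state: %w", err)
		}
		switch state {
		case "SUCCEEDED":
			return nil
		case "FAILED", "CANCELLED":
			return &QueryFailedError{State: state, Reason: reason}
		}
		select {
		case <-t.C: // still QUEUED or RUNNING; poll again
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// failureDetails reports whether err is, or wraps, a *QueryFailedError.
func failureDetails(err error) (state, reason string, ok bool) {
	var qf *QueryFailedError
	if errors.As(err, &qf) {
		return qf.State, qf.Reason, true
	}
	return "", "", false
}

// demo polls a fake that reports RUNNING twice and then FAILED, and wraps the
// resulting error to show that errors.As still finds it through the wrapping.
func demo() error {
	calls := 0
	fake := func(ctx context.Context) (string, string, error) {
		calls++
		if calls < 3 {
			return "RUNNING", "", nil
		}
		return "FAILED", "made-up reason for the demo", nil
	}
	err := waitForQuery(context.Background(), time.Millisecond, fake)
	return fmt.Errorf("report job: %w", err)
}

func main() {
	err := demo()
	if state, reason, ok := failureDetails(err); ok {
		fmt.Println("query ended in state", state, "because:", reason)
	} else if errors.Is(err, context.DeadlineExceeded) {
		fmt.Println("gave up waiting for the query")
	} else {
		fmt.Println("unexpected error:", err)
	}
}
```

The caller decides what to do per error kind: a failed query, a timeout, and a transport error each take a different branch, with no string comparison involved. The same errors.As pattern should also work for typed errors returned by the SDK itself.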

huangapple
  • Published on 2022-11-26 01:19:42
  • When reposting, please keep this link: https://go.coder-hub.com/74576109.html