Checking status of AWS Data Pipeline using Go SDK

Question

Situation: I have 2 data pipelines that run on-demand. Pipeline B cannot run until Pipeline A has completed. I'm trying to automate running both pipelines in a single script/program but I'm unsure how to do all of this in Go.

I have some Go code that activates a data pipeline:

    // Needs "fmt" plus, from github.com/aws/aws-sdk-go:
    // aws, aws/session and service/datapipeline.
    func awsActivatePipeline(pipelineID, region string) (*datapipeline.ActivatePipelineOutput, error) {
        svc := datapipeline.New(session.New(&aws.Config{Region: aws.String(region)}))
        input := &datapipeline.ActivatePipelineInput{
            PipelineId: aws.String(pipelineID),
        }
        result, err := svc.ActivatePipeline(input)
        if err != nil {
            fmt.Println("error activating pipeline: ", err)
            // Return the error so the caller can tell that activation failed.
            return nil, err
        }
        fmt.Println(result)
        return result, nil
    }

After activating it, I want to monitor that pipeline and determine when it has finished so that I can run the second pipeline. This is similar to what the list-runs CLI command shows, but I'm not sure what the corresponding Go function would be.

    $ aws datapipeline list-runs --region us-west-2 --pipeline-id df-EXAMPLE
           Name                                                Scheduled Start      Status
           ID                                                  Started              Ended
    ---------------------------------------------------------------------------------------------------
       1.  EC2ResourceObj                                      2017-09-12T17:49:55  FINISHED
           @EC2ResourceObj_2017-09-12T17:49:55                 2017-09-12T17:49:58  2017-09-12T17:56:52
       2.  Installation                                        2017-09-12T17:49:55  FINISHED
           @Installation_@ShellCommandActivityObj_2017-09-12T  2017-09-12T17:49:57  2017-09-12T17:54:09
       3.  S3OutputLocation                                    2017-09-12T17:49:55  FINISHED
           @S3OutputLocation_2017-09-12T17:49:55               2017-09-12T17:49:58  2017-09-12T17:54:50
       4.  ShellCommandActivityObj                             2017-09-12T17:49:55  FINISHED
           @ShellCommandActivityObj_2017-09-12T17:49:55        2017-09-12T17:49:57  2017-09-12T17:54:49

So once all actions are marked 'FINISHED', I want to activate my second pipeline. What's the best way to accomplish this?

Answer 1

Score: 0

FYI in case anyone else comes across this, this is how I resolved this:

A Go function that calls the AWS API to describe the objects/actions of a data pipeline and returns true if all of them are finished:

    // awsDescribeObjects reports whether every requested pipeline object has
    // status FINISHED. The Object type and the regexes re, re1, re2 and re3
    // are defined elsewhere in the program (not shown here).
    func awsDescribeObjects(pipelineID, region string, objects []string) bool {
        var r Object
        svc := datapipeline.New(session.New(&aws.Config{Region: aws.String(region)}))
        input := &datapipeline.DescribeObjectsInput{
            PipelineId: aws.String(pipelineID),
            ObjectIds:  aws.StringSlice(objects),
        }
        result, err := svc.DescribeObjects(input)
        if err != nil {
            fmt.Println("error describing pipeline objects: ", err)
            return false
        }
        // Rewrite the SDK's string output into JSON that Object can decode.
        result2 := re.ReplaceAllString(result.String(), `"$1"$2`) // add "" around keys
        result3 := re1.ReplaceAllString(result2, `$3$2`)          // remove key and string/ref value from fields struct
        result4 := strings.Replace(result3, "@", "", -1)          // remove @ from keys and values
        result5 := re2.ReplaceAllString(result4, `$1$3$5$7$9`)    // remove "" from timestamps
        result6 := re3.ReplaceAllString(result5, `$1,`)           // remove {} from fields struct
        if err := json.Unmarshal([]byte(result6), &r); err != nil {
            fmt.Println("error decoding pipeline objects: ", err)
            return false
        }
        // Start from true only if there is something to check; any status
        // other than FINISHED then makes the result false for good. (The
        // original overwrote the flag on every field, so only the last
        // field decided the outcome.)
        finished := len(r.PipelineObjects) > 0
        for _, obj := range r.PipelineObjects {
            for _, fld := range obj.Fields {
                fmt.Printf("%v STATUS: %v\n", obj.Name, fld.Status)
                if fld.Status != "FINISHED" {
                    finished = false
                }
            }
        }
        return finished
    }
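The comments in the function above suggest that each pipeline object carries its fields as key/value pairs, so the status could probably be read from the structured result instead of from a rewritten string. Here is a minimal sketch of that check, not the author's method. It assumes the status is stored under the key `@status`, and it uses local stand-in types `field` and `pipelineObject` (hypothetical simplifications shaped like the SDK's `Field` and `PipelineObject`, with plain strings instead of pointers) so that it runs without the SDK.

```go
package main

import "fmt"

// field and pipelineObject are local stand-ins (assumptions for this
// sketch), shaped like the SDK's Field and PipelineObject types.
type field struct {
	Key         string
	StringValue string
}

type pipelineObject struct {
	Name   string
	Fields []field
}

// statusOf returns the value of the "@status" field, or "" if absent.
func statusOf(o pipelineObject) string {
	for _, f := range o.Fields {
		if f.Key == "@status" {
			return f.StringValue
		}
	}
	return ""
}

// allFinished reports whether every object has status FINISHED.
// An empty slice counts as not finished, so an empty lookup never
// triggers the next pipeline.
func allFinished(objs []pipelineObject) bool {
	if len(objs) == 0 {
		return false
	}
	for _, o := range objs {
		if statusOf(o) != "FINISHED" {
			return false
		}
	}
	return true
}

func main() {
	objs := []pipelineObject{
		{Name: "EC2ResourceObj", Fields: []field{{Key: "@status", StringValue: "FINISHED"}}},
		{Name: "ShellCommandActivityObj", Fields: []field{{Key: "@status", StringValue: "RUNNING"}}},
	}
	// One object is still RUNNING, so this prints false.
	fmt.Println(allFinished(objs))
}
```

With the real SDK types the same loop would dereference the pointer fields, for example with `aws.StringValue`.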

My main Go function:

    // action, object, pipeline and region are pointers to command-line
    // options that are defined elsewhere in the program (not shown here).
    func main() {
        if *action == "describe" {
            obj := strings.Split(*object, ",")
            // Check up to 21 times, 5 minutes apart.
            for i := 0; i <= 20; i++ {
                f := awsDescribeObjects(*pipeline, *region, obj)
                fmt.Printf("%v - Status Check %v - Finished?: %v\n", time.Now(), i, f)
                if f {
                    fmt.Println("FINISHED describing pipeline complete")
                    break
                }
                // Give up right after the last check instead of sleeping first.
                if i == 20 {
                    fmt.Println("TIME OUT - describe pipeline timed out, max time reached")
                    os.Exit(1)
                }
                time.Sleep(5 * time.Minute)
            }
        }
    }

Shell script that calls the Go executable:

    # As posted, the leading "echo" only prints each command (a dry run).
    # Remove it to actually run the command.

    # PIPELINE 1
    echo "Starting Pipeline 1..."
    echo ./runpipeline.linux -region $REGION1 -pipeline-id $PIPELINEID1 -action activate
    echo sleep 1m
    echo ./runpipeline.linux -region $REGION1 -pipeline-id $PIPELINEID1 -action describe -object ShellCommandActivityObj
    echo "Pipeline 1 complete"

    # PIPELINE 2
    echo "Starting Pipeline 2..."
    echo ./runpipeline.linux -region $REGION2 -pipeline-id $PIPELINEID2 -action activate
    echo sleep 1m
    echo ./runpipeline.linux -region $REGION2 -pipeline-id $PIPELINEID2 -action describe -object ShellCommandActivityObj,CliActivity
    echo "Pipeline 2 complete"
    echo "FINISHED"

Posted by huangapple on 2017-09-13 03:32:08
Source: https://go.coder-hub.com/46184057.html