Checking status of AWS Data Pipeline using Go SDK

Question

Situation: I have two data pipelines that run on demand. Pipeline B cannot run until Pipeline A has completed. I'm trying to automate running both pipelines in a single script/program, but I'm unsure how to do all of this in Go.

I have some Go code that activates a data pipeline:

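import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/datapipeline"
)

func awsActivatePipeline(pipelineID, region string) (*datapipeline.ActivatePipelineOutput, error) {
	svc := datapipeline.New(session.Must(session.NewSession(&aws.Config{Region: aws.String(region)})))
	input := &datapipeline.ActivatePipelineInput{
		PipelineId: aws.String(pipelineID),
	}
	result, err := svc.ActivatePipeline(input)
	if err != nil {
		// Propagate the error instead of printing it and reporting success.
		return nil, fmt.Errorf("error activating pipeline %s: %w", pipelineID, err)
	}
	fmt.Println(result)
	return result, nil
}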

After activating, I want to monitor that pipeline and determine when it has finished so that I can run the second pipeline. The list-runs CLI command shows this information, but I'm not sure what the corresponding Go SDK function is:

$ aws datapipeline list-runs --region us-west-2 --pipeline-id df-EXAMPLE
       Name                                                Scheduled Start      Status                 
       ID                                                  Started              Ended              
---------------------------------------------------------------------------------------------------
   1.  EC2ResourceObj                                      2017-09-12T17:49:55  FINISHED               
       @EC2ResourceObj_2017-09-12T17:49:55                 2017-09-12T17:49:58  2017-09-12T17:56:52

   2.  Installation                                        2017-09-12T17:49:55  FINISHED               
       @Installation_@ShellCommandActivityObj_2017-09-12T  2017-09-12T17:49:57  2017-09-12T17:54:09

   3.  S3OutputLocation                                    2017-09-12T17:49:55  FINISHED               
       @S3OutputLocation_2017-09-12T17:49:55               2017-09-12T17:49:58  2017-09-12T17:54:50

   4.  ShellCommandActivityObj                             2017-09-12T17:49:55  FINISHED               
       @ShellCommandActivityObj_2017-09-12T17:49:55        2017-09-12T17:49:57  2017-09-12T17:54:49

So once all actions are marked 'FINISHED', I want to activate my second pipeline. What's the best way to accomplish this?
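For reference: as far as I know, the Go SDK has no ListRuns call. In the AWS CLI, list-runs is a client-side convenience command which, as I understand it, calls QueryObjects with the INSTANCE sphere to find the run instances and then DescribeObjects to fetch them; the Status column comes from each object's "@status" field. The sketch below shows only that last step, reading "@status" out of an object's key/value fields. Field and PipelineObject are hypothetical, simplified stand-ins for datapipeline.Field and datapipeline.PipelineObject (whose members are *string, read with aws.StringValue), so the example runs without the SDK.

```go
package main

// Field and PipelineObject are hypothetical stand-ins for the SDK's
// datapipeline.Field and datapipeline.PipelineObject. The real types use
// *string members, which you would read with aws.StringValue.
type Field struct {
	Key         string
	StringValue string
}

type PipelineObject struct {
	Id     string
	Name   string
	Fields []Field
}

// statusOf returns the value of the object's "@status" field,
// or "" if the object has no such field.
func statusOf(obj PipelineObject) string {
	for _, f := range obj.Fields {
		if f.Key == "@status" {
			return f.StringValue
		}
	}
	return ""
}

// runStatuses maps each object ID to its "@status", similar to the
// ID and Status columns printed by aws datapipeline list-runs.
func runStatuses(objs []PipelineObject) map[string]string {
	out := make(map[string]string, len(objs))
	for _, o := range objs {
		out[o.Id] = statusOf(o)
	}
	return out
}
```

Keying the result by object ID rather than by name keeps separate runs of the same component apart.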

Answer 1

Score: 0

In case anyone else comes across this, here is how I resolved it.

A Go function that calls the AWS API to describe the objects/actions of a data pipeline and returns true if all of them are finished:

// Imports for the whole program (this function and main).
import (
	"flag"
	"fmt"
	"os"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/datapipeline"
)

// awsDescribeObjects returns true only if every requested pipeline object
// reports an "@status" of FINISHED. It reads the fields from the SDK structs
// directly instead of reshaping result.String() into JSON with regexes.
func awsDescribeObjects(pipelineID, region string, objects []string) bool {
	svc := datapipeline.New(session.Must(session.NewSession(&aws.Config{Region: aws.String(region)})))
	input := &datapipeline.DescribeObjectsInput{
		PipelineId: aws.String(pipelineID),
		ObjectIds:  aws.StringSlice(objects),
	}
	result, err := svc.DescribeObjects(input)
	if err != nil {
		fmt.Println("error describing pipeline objects: ", err)
		return false
	}
	// If fewer objects come back than were requested, don't report success.
	if len(result.PipelineObjects) < len(objects) {
		return false
	}
	finished := true
	for _, obj := range result.PipelineObjects {
		status := ""
		for _, field := range obj.Fields {
			if aws.StringValue(field.Key) == "@status" {
				status = aws.StringValue(field.StringValue)
				break
			}
		}
		fmt.Printf("%v STATUS: %v\n", aws.StringValue(obj.Name), status)
		// One unfinished object means the pipeline is not done yet; keep
		// looping so every object's status is still printed.
		if status != "FINISHED" {
			finished = false
		}
	}
	return finished
}
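The describe check only looks for FINISHED, so if an activity fails, the loop in main keeps polling until it times out. A minimal sketch of a stricter decision, which looks at every status before deciding and stops early on a failure. classify and Outcome are hypothetical names, and the set of failure statuses is an assumption to check against the AWS Data Pipeline documentation on pipeline and component status.

```go
package main

// Outcome summarises the "@status" values of a set of pipeline objects.
type Outcome int

const (
	StillRunning Outcome = iota // at least one object is not in a terminal state
	AllFinished                 // every object reported FINISHED
	Failed                      // at least one object reached a failure status
)

// failureStatuses is an ASSUMED list of terminal failure states; verify it
// against the AWS Data Pipeline status documentation before relying on it.
var failureStatuses = map[string]bool{
	"FAILED":         true,
	"CASCADE_FAILED": true,
	"CANCELED":       true,
	"TIMEDOUT":       true,
}

// classify inspects every status before deciding, so an early unfinished
// status is never overwritten by a later FINISHED one.
func classify(statuses []string) Outcome {
	if len(statuses) == 0 {
		return StillRunning // nothing reported yet
	}
	allFinished := true
	for _, s := range statuses {
		if failureStatuses[s] {
			return Failed
		}
		if s != "FINISHED" {
			allFinished = false
		}
	}
	if allFinished {
		return AllFinished
	}
	return StillRunning
}
```

Returning Failed separately lets the caller exit immediately with a non-zero status instead of sleeping through the remaining checks.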

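My main Go function:

var (
	action   = flag.String("action", "", "activate or describe")
	pipeline = flag.String("pipeline-id", "", "ID of the data pipeline")
	region   = flag.String("region", "", "AWS region of the pipeline")
	object   = flag.String("object", "", "comma-separated IDs of the objects to check")
)

func main() {
	flag.Parse()
	switch *action {
	case "activate":
		// awsActivatePipeline is the function from the question.
		if _, err := awsActivatePipeline(*pipeline, *region); err != nil {
			fmt.Println(err)
			os.Exit(1)
		}
	case "describe":
		obj := strings.Split(*object, ",")
		// Check up to 21 times, sleeping 5 minutes between checks.
		const maxChecks = 21
		for i := 0; i < maxChecks; i++ {
			f := awsDescribeObjects(*pipeline, *region, obj)
			fmt.Printf("%v - Status Check %v - Finished?: %v\n", time.Now(), i, f)
			if f {
				fmt.Println("FINISHED describing pipeline complete")
				return
			}
			if i < maxChecks-1 {
				time.Sleep(5 * time.Minute)
			}
		}
		fmt.Println("TIME OUT - describe pipeline timed out, max time reached")
		os.Exit(1)
	}
}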

Shell script that runs the Go executable:

#!/bin/sh
# Abort as soon as a step fails, e.g. when the describe check times out and exits with status 1.
set -e

#PIPELINE 1
echo "Starting Pipeline 1..."
./runpipeline.linux -region "$REGION1" -pipeline-id "$PIPELINEID1" -action activate
sleep 1m
./runpipeline.linux -region "$REGION1" -pipeline-id "$PIPELINEID1" -action describe -object ShellCommandActivityObj
echo "Pipeline 1 complete"
#PIPELINE 2
echo "Starting Pipeline 2..."
./runpipeline.linux -region "$REGION2" -pipeline-id "$PIPELINEID2" -action activate
sleep 1m
./runpipeline.linux -region "$REGION2" -pipeline-id "$PIPELINEID2" -action describe -object ShellCommandActivityObj,CliActivity
echo "Pipeline 2 complete"
echo "FINISHED"

huangapple
  • Published on September 13, 2017 at 03:32:08
  • When reposting, please keep the link to this article: https://go.coder-hub.com/46184057.html