How to trigger a Cloud Dataflow pipeline job from a Cloud Function in Java?


Question

I have a requirement to trigger a Cloud Dataflow pipeline from Cloud Functions, and the Cloud Function must be written in Java. The trigger for the Cloud Function is Google Cloud Storage's Finalize/Create event, i.e. when a file is uploaded to a GCS bucket, the Cloud Function must trigger the Cloud Dataflow pipeline.

When I create a Dataflow pipeline (batch) and execute it, it creates a Dataflow pipeline template and a Dataflow job.

But when I create the Cloud Function in Java and upload a file, the status just shows "ok", and it does not trigger the Dataflow pipeline.

Cloud Function code:

package com.example;

import com.example.Example.GCSEvent;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.CreateJobFromTemplateRequest;
import com.google.api.services.dataflow.model.RuntimeEnvironment;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;

import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.HashMap;
import java.util.logging.Logger;

public class Example implements BackgroundFunction<GCSEvent> {
    private static final Logger logger = Logger.getLogger(Example.class.getName());

    @Override
    public void accept(GCSEvent event, Context context) throws IOException, GeneralSecurityException {
        logger.info("Event: " + context.eventId());
        logger.info("Event Type: " + context.eventType());

        HttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport();
        JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();

        GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
        HttpRequestInitializer requestInitializer = new HttpCredentialsAdapter(credentials);

        Dataflow dataflowService = new Dataflow.Builder(httpTransport, jsonFactory, requestInitializer)
                .setApplicationName("Google Dataflow function Demo")
                .build();

        String projectId = "my-project-id";

        RuntimeEnvironment runtimeEnvironment = new RuntimeEnvironment();
        runtimeEnvironment.setBypassTempDirValidation(false);
        runtimeEnvironment.setTempLocation("gs://my-dataflow-job-bucket/tmp");
        CreateJobFromTemplateRequest createJobFromTemplateRequest = new CreateJobFromTemplateRequest();
        createJobFromTemplateRequest.setEnvironment(runtimeEnvironment);
        createJobFromTemplateRequest.setLocation("us-central1");
        createJobFromTemplateRequest.setGcsPath("gs://my-dataflow-job-bucket-staging/templates/cloud-dataflow-template");
        createJobFromTemplateRequest.setJobName("Dataflow-Cloud-Job");
        createJobFromTemplateRequest.setParameters(new HashMap<String,String>());
        createJobFromTemplateRequest.getParameters().put("inputFile","gs://cloud-dataflow-bucket-input/*.txt");
        dataflowService.projects().templates().create(projectId,createJobFromTemplateRequest);

        throw new UnsupportedOperationException("Not supported yet.");
    }

    public static class GCSEvent {
        String bucket;
        String name;
        String metageneration;
    }
}



pom.xml (Cloud Function):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cloudfunctions</groupId>
    <artifactId>http-function</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.target>11</maven.compiler.target>
        <maven.compiler.source>11</maven.compiler.source>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/com.google.auth/google-auth-library-credentials -->
        <dependency>
            <groupId>com.google.auth</groupId>
            <artifactId>google-auth-library-credentials</artifactId>
            <version>0.21.1</version>
        </dependency>
        <dependency>
            <groupId>com.google.apis</groupId>
            <artifactId>google-api-services-dataflow</artifactId>
            <version>v1b3-rev207-1.20.0</version>
        </dependency>
        <dependency>
            <groupId>com.google.cloud.functions</groupId>
            <artifactId>functions-framework-api</artifactId>
            <version>1.0.1</version>
        </dependency>
        <dependency>
            <groupId>com.google.auth</groupId>
            <artifactId>google-auth-library-oauth2-http</artifactId>
            <version>0.21.1</version>
        </dependency>
    </dependencies>

    <!-- Required for Java 11 functions in the inline editor -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <excludes>
                        <exclude>.google/</exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Cloud Function logs (screenshot omitted): the function finishes with status "ok", but no Dataflow job is started.

I went through the blogs below (added for reference), where a Dataflow pipeline is triggered from Cloud Storage via a Cloud Function, but the code is written in Node.js or Python, whereas my Cloud Function must be written in Java.

Triggering Dataflow pipeline via cloud functions in Node.js

https://dzone.com/articles/triggering-dataflow-pipelines-with-cloud-functions

Triggering dataflow pipeline via cloud functions using python

https://medium.com/google-cloud/how-to-kick-off-a-dataflow-pipeline-via-cloud-functions-696927975d4e

Any help on this is very much appreciated.

Answer 1

Score: 5

RuntimeEnvironment runtimeEnvironment = new RuntimeEnvironment();
runtimeEnvironment.setBypassTempDirValidation(false);
runtimeEnvironment.setTempLocation("gs://karthiksfirstbucket/temp1");

LaunchTemplateParameters launchTemplateParameters = new LaunchTemplateParameters();
launchTemplateParameters.setEnvironment(runtimeEnvironment);
launchTemplateParameters.setJobName("newJob" + (new Date()).getTime());

// Template parameters expected by the Word_Count template.
Map<String, String> params = new HashMap<String, String>();
params.put("inputFile", "gs://karthiksfirstbucket/sample.txt");
params.put("output", "gs://karthiksfirstbucket/count1");
launchTemplateParameters.setParameters(params);
writer.write("4"); // "writer" comes from the surrounding function in the complete example linked below (debug output)

// launch() only builds the request; execute() actually submits the job to Dataflow.
Dataflow.Projects.Templates.Launch launch = dataflowService.projects().templates().launch(projectId, launchTemplateParameters);
launch.setGcsPath("gs://dataflow-templates-us-central1/latest/Word_Count");
launch.execute();

The above code launches a template and runs the Dataflow pipeline:

  1. using Application Default Credentials (which can be swapped for user or service-account credentials; a sketch of loading a service-account key follows below),
  2. in the default region (which can be changed),
  3. creating a job for every HTTP trigger (the trigger can be changed).
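
As a minimal sketch of point 1, assuming a downloaded service-account key file (the file path and helper class below are placeholders, not part of the original answer), the credentials passed to the Dataflow client could be built like this:

import com.google.auth.oauth2.GoogleCredentials;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Collections;

public class CredentialsHelper {
    // Hypothetical helper: loads an explicit service-account key instead of relying
    // on Application Default Credentials. The key path is a placeholder.
    static GoogleCredentials fromServiceAccountKey(String keyPath) throws IOException {
        try (FileInputStream keyStream = new FileInputStream(keyPath)) {
            return GoogleCredentials.fromStream(keyStream)
                    .createScoped(Collections.singleton("https://www.googleapis.com/auth/cloud-platform"));
        }
    }
}

The resulting GoogleCredentials can then be wrapped in an HttpCredentialsAdapter exactly as in the question's code.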

The complete code can be found here:

https://github.com/karthikeyan1127/Java_CloudFunction_DataFlow/blob/master/Hello.java
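
For completeness, below is a minimal sketch (not the answerer's exact code) of how this launch snippet might be folded back into the asker's GCS-triggered BackgroundFunction. The project ID, bucket names and template path are placeholders copied from the question and must be replaced; note that, unlike the question's code, the request is actually submitted by calling execute():

package com.example;

import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.LaunchTemplateParameters;
import com.google.api.services.dataflow.model.RuntimeEnvironment;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

public class TemplateLauncher implements BackgroundFunction<TemplateLauncher.GCSEvent> {
    private static final Logger logger = Logger.getLogger(TemplateLauncher.class.getName());

    // Placeholder values taken from the question; replace with your own.
    private static final String PROJECT_ID = "my-project-id";
    private static final String TEMPLATE_PATH = "gs://my-dataflow-job-bucket-staging/templates/cloud-dataflow-template";
    private static final String TEMP_LOCATION = "gs://my-dataflow-job-bucket/tmp";

    @Override
    public void accept(GCSEvent event, Context context) throws Exception {
        logger.info("Triggered by: gs://" + event.bucket + "/" + event.name);

        HttpTransport httpTransport = GoogleNetHttpTransport.newTrustedTransport();
        JsonFactory jsonFactory = JacksonFactory.getDefaultInstance();
        GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
        HttpRequestInitializer requestInitializer = new HttpCredentialsAdapter(credentials);

        Dataflow dataflowService = new Dataflow.Builder(httpTransport, jsonFactory, requestInitializer)
                .setApplicationName("gcs-triggered-dataflow-launcher")
                .build();

        RuntimeEnvironment environment = new RuntimeEnvironment();
        environment.setTempLocation(TEMP_LOCATION);

        // Pass the uploaded object to the template as its "inputFile" parameter.
        Map<String, String> parameters = new HashMap<>();
        parameters.put("inputFile", String.format("gs://%s/%s", event.bucket, event.name));

        LaunchTemplateParameters launchParameters = new LaunchTemplateParameters();
        launchParameters.setEnvironment(environment);
        launchParameters.setJobName("gcs-triggered-job-" + new Date().getTime());
        launchParameters.setParameters(parameters);

        // launch() builds the request; execute() submits it (the question's code never called execute()).
        Dataflow.Projects.Templates.Launch launch =
                dataflowService.projects().templates().launch(PROJECT_ID, launchParameters);
        launch.setGcsPath(TEMPLATE_PATH);
        launch.execute();

        logger.info("Dataflow template launch request submitted");
    }

    public static class GCSEvent {
        String bucket;
        String name;
        String metageneration;
    }
}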


Answer 2

Score: 0

This is my solution using the newer Dataflow dependencies (launching a Flex Template):

public class Example implements BackgroundFunction<Example.GCSEvent> {
    private static final Logger logger = Logger.getLogger(Example.class.getName());

    @Override
    public void accept(GCSEvent event, Context context) throws Exception {
        String filename = event.name;

        logger.info("Processing file: " + filename);
        logger.info("Bucket name: " + event.bucket);

        String projectId = "cedar-router-268801";
        String region = "us-central1";
        String tempLocation = "gs://cedar-router-beam-poc/temp";
        String templateLocation = "gs://cedar-router-beam-poc/template/poc-template.json";

        logger.info("Path: " + String.format("gs://%s/%s", event.bucket, filename));
        String scenario = filename.substring(0, 3); // it comes as TWO or ONE

        logger.info("Scenario: " + scenario);

        Map<String, String> params = Map.of("sourceFile", String.format("%s/%s", event.bucket, filename),
                "scenario", scenario,
                "years", "2013,2014",
                "targetFile", "gs://cedar-router-beam-poc-kms/result/testfile");
        
        extractedJob(projectId, region, tempLocation, templateLocation, params);
    }

    private static void extractedJob(String projectId,
                                     String region,
                                     String tempLocation,
                                     String templateLocation,
                                     Map<String, String> params) throws Exception {

        HttpTransport httpTransport = GoogleApacheHttpTransport.newTrustedTransport();
        JsonFactory jsonFactory = GsonFactory.getDefaultInstance();
        GoogleCredentials credentials = GoogleCredentials.getApplicationDefault();
        HttpRequestInitializer httpRequestInitializer = new RetryHttpRequestInitializer(ImmutableList.of(404));
        ChainingHttpRequestInitializer chainingHttpRequestInitializer =
                new ChainingHttpRequestInitializer(new HttpCredentialsAdapter(credentials), httpRequestInitializer);

        Dataflow dataflowService = new Dataflow.Builder(httpTransport, jsonFactory, chainingHttpRequestInitializer)
                .setApplicationName("Dataflow from Cloud Function")
                .build();

        FlexTemplateRuntimeEnvironment runtimeEnvironment = new FlexTemplateRuntimeEnvironment();
        runtimeEnvironment.setTempLocation(tempLocation);

        LaunchFlexTemplateParameter launchFlexTemplateParameter = new LaunchFlexTemplateParameter();
        launchFlexTemplateParameter.setEnvironment(runtimeEnvironment);
        String jobName = params.get("sourceFile").substring(34, 49).replace("_","");
        logger.info("Job name: " + jobName);
        launchFlexTemplateParameter.setJobName("job" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")));

        launchFlexTemplateParameter.setContainerSpecGcsPath(templateLocation);
        launchFlexTemplateParameter.setParameters(params);

        LaunchFlexTemplateRequest launchFlexTemplateRequest = new LaunchFlexTemplateRequest();
        launchFlexTemplateRequest.setLaunchParameter(launchFlexTemplateParameter);


        Launch launch = dataflowService.projects()
                .locations()
                .flexTemplates()
                .launch(projectId, region, launchFlexTemplateRequest);

        launch.execute();
        logger.info("Running job");
    }


    public static class GCSEvent {
        String bucket;
        String name;
        String metageneration;
    }
}
Just adapt it to your case.
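
The parameters map passed at launch time is handed to the templated pipeline as pipeline options. As an illustration only (the interface below is hypothetical and simply mirrors the parameter names used above; it is not part of this answer), the matching options interface on the Beam pipeline side might look like:

import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

// Hypothetical options interface a matching Flex Template pipeline could declare so that
// the "sourceFile", "scenario", "years" and "targetFile" launch parameters are recognized.
public interface PocOptions extends PipelineOptions {
    @Description("GCS path of the uploaded source file")
    String getSourceFile();
    void setSourceFile(String value);

    @Description("Scenario code derived from the file name prefix")
    String getScenario();
    void setScenario(String value);

    @Description("Comma-separated list of years to process")
    String getYears();
    void setYears(String value);

    @Description("GCS path of the output file")
    String getTargetFile();
    void setTargetFile(String value);
}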
