How can I authenticate in Dataflow + Apache Beam using Java or Kotlin if I have the content of the service account in a variable?

Question

I tried this code, but it doesn't work, even though the service account is correct.

val dataflowOptions = PipelineOptionsFactory.create().`as`(MyOptions::class.java)
dataflowOptions.jobName = "tesjob1"
dataflowOptions.project = getGcpProjectId()
dataflowOptions.region = getGcpRegion()
dataflowOptions.subnetwork = getGcpSubNetwork()
dataflowOptions.usePublicIps = false
dataflowOptions.runner = DataflowRunner::class.java

val serviceAccountJson = ""

val credentials: Credentials = GoogleCredentials.fromStream(ByteArrayInputStream(serviceAccountJson.toByteArray(Charsets.UTF_8)))
dataflowOptions.gcpCredential = credentials

dataflowOptions.setGcpTempLocation("gs://deib_bucket/temp")
return Pipeline.create(dataflowOptions)

Could you give me any suggestions to resolve the issue?

Answer 1

Score: 1

I resolved the issue. The service account credentials did not work because I needed to pass the scopes. After I changed the code as follows, it worked properly.

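import com.google.auth.oauth2.ServiceAccountCredentials
import java.io.ByteArrayInputStream
import org.apache.beam.runners.dataflow.DataflowRunner
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.options.PipelineOptionsFactory

// OAuth scopes requested for the service account credentials.
val gcpSCOPES: List<String> = listOf(
    "https://www.googleapis.com/auth/cloud-platform",
    "https://www.googleapis.com/auth/devstorage.full_control",
    "https://www.googleapis.com/auth/userinfo.email",
    "https://www.googleapis.com/auth/datastore",
    "https://www.googleapis.com/auth/pubsub"
)

// `as` is a Kotlin keyword, so the Java method must be called with backticks.
val options: DataflowPipelineOptions = PipelineOptionsFactory.create().`as`(DataflowPipelineOptions::class.java)
options.jobName = "jobExample"
options.project = getGcpProjectId()
options.region = getGcpRegion()
options.subnetwork = getGcpSubNetwork()
options.usePublicIps = false
options.runner = DataflowRunner::class.java
// Build the credentials from the JSON content held in a variable, then attach the scopes.
options.gcpCredential = ServiceAccountCredentials.fromStream(ByteArrayInputStream(DeibDataSyncEnv.getGcpCredentials().toByteArray(Charsets.UTF_8)))
    .createScoped(gcpSCOPES)
// Email of the service account that the Dataflow workers run as.
options.serviceAccount = getGcpServiceAccountEmail()

options.setGcpTempLocation(DeibDataSyncEnv.getGcpTempLocation())
// Create the pipeline from the options configured above.
return Pipeline.create(options)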

Answer 2

Score: 0

To help you, here is some more explanation of Dataflow job authentication and the use of a service account:

If you launch your Dataflow job from your local machine, you have to export the GOOGLE_APPLICATION_CREDENTIALS environment variable; unfortunately, there is no other choice in this case.
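
When the key content is held in a variable rather than in a file, one possible way to follow this advice is to write the content to a temporary file and point GOOGLE_APPLICATION_CREDENTIALS at it. The following is only a minimal sketch: the variable name SERVICE_ACCOUNT_JSON and the placeholder value are assumptions made for illustration and do not come from the original post.

```shell
#!/usr/bin/env bash
# Assumption: SERVICE_ACCOUNT_JSON (hypothetical name) holds the full key file content.
# The fallback below is a placeholder so the sketch runs on its own; it is not a usable key.
SERVICE_ACCOUNT_JSON="${SERVICE_ACCOUNT_JSON:-}"
if [ -z "$SERVICE_ACCOUNT_JSON" ]; then
  SERVICE_ACCOUNT_JSON='{"type": "service_account"}'
fi

# Write the content to a temporary file that only the current user can read.
KEY_FILE="$(mktemp)"
chmod 600 "$KEY_FILE"
printf '%s' "$SERVICE_ACCOUNT_JSON" > "$KEY_FILE"

# Point Application Default Credentials at that file for processes started from this shell.
export GOOGLE_APPLICATION_CREDENTIALS="$KEY_FILE"

# Remove the key file when this shell exits.
trap 'rm -f "$KEY_FILE"' EXIT
```

The job has to be launched from the same shell session (or the same script), because the exported variable is only visible to processes started from it.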

However, if you launch your job via a DAG orchestrator like Airflow with Cloud Composer, there is no need to pass a service account (SA) key file. Authentication is handled by Airflow with the SA used by Cloud Composer.

You can also explore other solutions such as Cloud Shell or Cloud Build to launch your job, but I think it's better for the CI/CD part to deploy the job and to delegate the responsibility for job execution to a pipeline orchestration tool like Airflow.

In a production environment, you can also use a Dataflow Flex Template to standardize the deployment of your Dataflow jobs, based on a bucket and a Docker image.

In this case, if you use a tool like Cloud Build, there is no need to pass a service account key file.

You can check this article I wrote, which shows a complete example of a CI/CD pipeline with a Dataflow Flex Template.

In production, and to get the best standardization of your jobs, I think a Flex Template should be the best solution.

An example of a command to deploy a job with a Flex Template:

gcloud dataflow flex-template build "$METADATA_TEMPLATE_FILE_PATH" \
  --image-gcr-path "$LOCATION-docker.pkg.dev/$PROJECT_ID/$REPO_NAME/$IMAGE_NAME:$IMAGE_TAG" \
  --sdk-language "$SDK_LANGUAGE" \
  --flex-template-base-image "$FLEX_TEMPLATE_BASE_IMAGE" \
  --metadata-file "$METADATA_FILE" \
  --jar "$JAR" \
  --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="$FLEX_TEMPLATE_JAVA_MAIN_CLASS"

An example of a command to launch the job with the serviceAccount program argument:

gcloud dataflow flex-template run "$JOB_NAME-$(date +%Y%m%d-%H%M%S)" \
  --template-file-gcs-location "$METADATA_TEMPLATE_FILE_PATH" \
  --project="$PROJECT_ID" \
  --region="$LOCATION" \
  --temp-location="$TEMP_LOCATION" \
  --staging-location="$STAGING_LOCATION" \
  --parameters serviceAccount="$SA_EMAIL" \
  --parameters inputJsonFile="$INPUT_FILE" \
  --parameters inputFileSlogans="$SIDE_INPUT_FILE" \
  --parameters teamLeagueDataset="$TEAM_LEAGUE_DATASET" \
  --parameters teamStatsTable="$TEAM_STATS_TABLE" \
  --parameters jobType="$JOB_TYPE" \
  --parameters failureOutputDataset="$FAILURE_OUTPUT_DATASET" \
  --parameters failureOutputTable="$FAILURE_OUTPUT_TABLE" \
  --parameters failureFeatureName="$FAILURE_FEATURE_NAME"

huangapple
  • Published on May 10, 2023, 23:42:12
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76220357.html