How can I authenticate in Dataflow + Apache Beam using Java or Kotlin if I have the content of the service account in a variable?
Question
I tried this code, but it doesn't work, even though the service account is correct.
val dataflowOptions = PipelineOptionsFactory.create().`as`(MyOptions::class.java)
dataflowOptions.jobName = "tesjob1"
dataflowOptions.project = getGcpProjectId()
dataflowOptions.region = getGcpRegion()
dataflowOptions.subnetwork = getGcpSubNetwork()
dataflowOptions.usePublicIps = false
dataflowOptions.runner = DataflowRunner::class.java
val serviceAccountJson = "" // holds the service-account key JSON content (omitted here)
val credentials: Credentials = GoogleCredentials.fromStream(ByteArrayInputStream(serviceAccountJson.toByteArray(Charsets.UTF_8)))
dataflowOptions.gcpCredential = credentials
dataflowOptions.setGcpTempLocation("gs://deib_bucket/temp")
return Pipeline.create(dataflowOptions)
Could you give me any suggestions to resolve the issue?
Answer 1
Score: 1
I resolved the issue: I couldn't use the service account because I needed to pass the scopes, so I changed the code as follows and now it works properly.
val gcpSCOPES: List<String> = listOf(
    "https://www.googleapis.com/auth/cloud-platform",
    "https://www.googleapis.com/auth/devstorage.full_control",
    "https://www.googleapis.com/auth/userinfo.email",
    "https://www.googleapis.com/auth/datastore",
    "https://www.googleapis.com/auth/pubsub"
)
val options: DataflowPipelineOptions = PipelineOptionsFactory.create().`as`(DataflowPipelineOptions::class.java)
options.jobName = "jobExample"
options.project = getGcpProjectId()
options.region = getGcpRegion()
options.subnetwork = getGcpSubNetwork()
options.usePublicIps = false
options.runner = DataflowRunner::class.java
options.gcpCredential = ServiceAccountCredentials.fromStream(ByteArrayInputStream(DeibDataSyncEnv.getGcpCredentials().toByteArray(Charsets.UTF_8)))
    .createScoped(gcpSCOPES)
options.serviceAccount = getGcpServiceAccountEmail()
options.setGcpTempLocation(DeibDataSyncEnv.getGcpTempLocation())
return Pipeline.create(options)
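A follow-up note: the change that matters here is the createScoped(...) call rather than the switch from GoogleCredentials to ServiceAccountCredentials (for a service-account key, GoogleCredentials.fromStream already returns a ServiceAccountCredentials instance). A minimal sketch of the same fix applied to the question's original code, reusing the gcpSCOPES list above; serviceAccountJson and dataflowOptions are the question's variables:

import com.google.auth.oauth2.GoogleCredentials
import java.io.ByteArrayInputStream

// Same objects as in the question; the only addition is createScoped(...).
// serviceAccountJson is assumed to hold the service-account key JSON content.
val scopedCredentials = GoogleCredentials
    .fromStream(ByteArrayInputStream(serviceAccountJson.toByteArray(Charsets.UTF_8)))
    .createScoped(gcpSCOPES)
dataflowOptions.gcpCredential = scopedCredentials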
Answer 2
Score: 0
To give you some more context on Dataflow job authentication and the use of a service account:
If you launch your Dataflow job from your local machine, you have to export the GOOGLE_APPLICATION_CREDENTIALS environment variable; unfortunately there is no other choice in this case.
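For reference, a minimal sketch of that local-machine case, assuming GOOGLE_APPLICATION_CREDENTIALS has been exported: Beam resolves Application Default Credentials on its own, so setting gcpCredential explicitly is optional and is only shown here to make the mechanism visible (the single cloud-platform scope is an assumption for brevity).

import com.google.auth.oauth2.GoogleCredentials
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions
import org.apache.beam.sdk.options.PipelineOptionsFactory

// Assumes GOOGLE_APPLICATION_CREDENTIALS points at a service-account key file;
// Application Default Credentials pick that file up automatically.
val adc = GoogleCredentials.getApplicationDefault()
    .createScoped(listOf("https://www.googleapis.com/auth/cloud-platform"))

val options = PipelineOptionsFactory.create().`as`(DataflowPipelineOptions::class.java)
options.gcpCredential = adc // optional: Beam falls back to ADC when this is left unset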
However, if you launch your job via a DAG orchestrator like Airflow on Cloud Composer, there is no need to pass a service account key file: authentication is handled by Airflow and the service account used by Cloud Composer.
You can also explore other solutions with Cloud Shell or Cloud Build to launch your job, but I think it is better to let the CI/CD part deploy the job and delegate the responsibility for running it to a pipeline orchestration tool like Airflow.
In a production environment, you can also use a Dataflow Flex Template to standardize the deployment of your Dataflow jobs, based on a bucket and a Docker image.
In this case, if you use a tool like Cloud Build, there is no need to pass a service account key file.
You can check the article I wrote, which shows a complete example of a CI/CD pipeline with a Dataflow Flex Template.
In production, and to get the best standardization for your jobs, I think a Flex Template is the best solution.
An example of the command to build the Flex Template:
gcloud dataflow flex-template build "$METADATA_TEMPLATE_FILE_PATH" \
--image-gcr-path "$LOCATION-docker.pkg.dev/$PROJECT_ID/$REPO_NAME/$IMAGE_NAME:$IMAGE_TAG" \
--sdk-language "$SDK_LANGUAGE" \
--flex-template-base-image "$FLEX_TEMPLATE_BASE_IMAGE" \
--metadata-file "$METADATA_FILE" \
--jar "$JAR" \
--env FLEX_TEMPLATE_JAVA_MAIN_CLASS="$FLEX_TEMPLATE_JAVA_MAIN_CLASS"
An example of the command to launch the job with the serviceAccount program argument:
gcloud dataflow flex-template run "$JOB_NAME-$(date +%Y%m%d-%H%M%S)" \
--template-file-gcs-location "$METADATA_TEMPLATE_FILE_PATH" \
--project="$PROJECT_ID" \
--region="$LOCATION" \
--temp-location="$TEMP_LOCATION" \
--staging-location="$STAGING_LOCATION" \
--parameters serviceAccount="$SA_EMAIL" \
--parameters inputJsonFile="$INPUT_FILE" \
--parameters inputFileSlogans="$SIDE_INPUT_FILE" \
--parameters teamLeagueDataset="$TEAM_LEAGUE_DATASET" \
--parameters teamStatsTable="$TEAM_STATS_TABLE" \
--parameters jobType="$JOB_TYPE" \
--parameters failureOutputDataset="$FAILURE_OUTPUT_DATASET" \
--parameters failureOutputTable="$FAILURE_OUTPUT_TABLE" \
--parameters failureFeatureName="$FAILURE_FEATURE_NAME"
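As a side note, each --parameters key in the run command is forwarded to the template's main class and resolved against its options interface by PipelineOptionsFactory. A hypothetical sketch of what such an interface could look like for two of the parameters above (the interface name and descriptions are made up; inputJsonFile and teamStatsTable come from the command):

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions
import org.apache.beam.sdk.options.Description
import org.apache.beam.sdk.options.PipelineOptionsFactory

// Hypothetical options interface for the Flex Template's main class;
// each --parameters key maps onto the property with the same name.
interface TeamStatsOptions : DataflowPipelineOptions {
    @get:Description("GCS path of the input JSON file")
    var inputJsonFile: String

    @get:Description("BigQuery table that receives the team stats")
    var teamStatsTable: String
}

fun parseOptions(args: Array<String>): TeamStatsOptions =
    PipelineOptionsFactory.fromArgs(*args).withValidation().`as`(TeamStatsOptions::class.java)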