Dataflow Python flex template fails with "Java must be installed" error
Question
I'm running a flex template for a Pub/Sub Lite to BigQuery Dataflow job.
This is my code:
from __future__ import annotations

import argparse
import json
import logging

import apache_beam.io.gcp.pubsublite as psub_lite
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Defines the BigQuery schema for the output table.
schema = 'trip_id:INTEGER,vendor_id:INTEGER,trip_distance:FLOAT,fare_amount:STRING,store_and_fwd_flag:STRING'


class ModifyDataForBQ(beam.DoFn):
    def process(self, pubsub_message, *args, **kwargs):
        # Decode the Pub/Sub Lite message payload into a dict for BigQuery.
        obj = json.loads(pubsub_message.message.data.decode("utf-8"))
        yield obj


def run(
    subscription_id: str,
    dataset: str,
    table: str,
    beam_args: list[str] = None,
) -> None:
    options = PipelineOptions(beam_args, save_main_session=True, streaming=True)
    table = '{}.{}'.format(dataset, table)
    p = beam.Pipeline(options=options)
    pubsub_pipeline = (
        p
        | 'Read from pubsub lite topic' >> psub_lite.ReadFromPubSubLite(subscription_path=subscription_id)
        | 'Print Message' >> beam.ParDo(ModifyDataForBQ())
        | 'Write Record to BigQuery' >> beam.io.WriteToBigQuery(
            table=table,
            schema=schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        )
    )
    result = p.run()
    result.wait_until_finish()


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--subscription_id",
        type=str,
        help="Pub/Sub Lite subscription path.",
        default=None,
    )
    parser.add_argument(
        "--dataset",
        type=str,
        help="BigQuery dataset name.",
        default=None,
    )
    parser.add_argument(
        "--table",
        type=str,
        help="BigQuery destination table name.",
        default=None,
    )
    args, beam_args = parser.parse_known_args()
    run(
        subscription_id=args.subscription_id,
        dataset=args.dataset,
        table=args.table,
        beam_args=beam_args,
    )
This is my Dockerfile:
FROM gcr.io/dataflow-templates-base/python3-template-launcher-base

ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="/template/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="/template/streaming_beam.py"

COPY . /template

RUN apt-get update \
    # Install a JDK (needed by Beam's cross-language expansion service) plus build deps.
    && apt-get install -y openjdk-11-jdk libffi-dev git \
    && rm -rf /var/lib/apt/lists/* \
    # Upgrade pip and install the requirements.
    && pip install --no-cache-dir --upgrade pip \
    && pip install --no-cache-dir -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE \
    # Download the requirements to speed up launching the Dataflow job.
    && pip download --no-cache-dir --dest /tmp/dataflow-requirements-cache -r $FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE

ENV JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
ENV PIP_NO_DEPS=True

ENTRYPOINT ["/opt/google/dataflow/python_template_launcher"]
This is how I'm building the template:
gcloud dataflow flex-template build gs://my-bucket-xxxx/templates/streaming-beam-sql.json \
--image-gcr-path "us-central1-docker.pkg.dev/xxxx-xxx-2/dataflow-pubsublite-bigquery/test:latest" \
--sdk-language "PYTHON" \
--flex-template-base-image "PYTHON3" \
--metadata-file "metadata.json" \
--py-path "." \
--env "FLEX_TEMPLATE_PYTHON_PY_FILE=streaming_beam.py" \
--env "FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE=requirements.txt" \
--project "xxxx-xxx-2"
Now I'm invoking the template:
gcloud dataflow flex-template run "streaming-beam-sql" \
--template-file-gcs-location gs://my-bucket-xxxx/templates/streaming-beam-sql.json \
--project "xxxx-xxx-2" \
--parameters "subscription_id=projects/xxxx-xxx-/locations/us-central1/subscriptions/data-streaming-xxxx-subscription,dataset=omer_poc,table=trip2"
Pipeline launch fails, and in the logs, I see the following error:
INFO 2023-06-08T22:27:23.260235Z INFO:root:Starting a JAR-based expansion service from JAR /root/.apache_beam/cache/jars/beam-sdks-java-io-google-cloud-platform-expansion-service-2.41.0.jar
INFO 2023-06-08T22:27:23.261209Z ERROR:apache_beam.utils.subprocess_server:Error bringing up service
INFO 2023-06-08T22:27:23.261252Z Traceback (most recent call last):
INFO 2023-06-08T22:27:23.261270Z File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/subprocess_server.py", line 79, in start
INFO 2023-06-08T22:27:23.261296Z endpoint = self.start_process()
INFO 2023-06-08T22:27:23.261313Z File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/subprocess_server.py", line 181, in start_process
INFO 2023-06-08T22:27:23.261329Z 'Java must be installed on this system to use this '
INFO 2023-06-08T22:27:23.261343Z RuntimeError: Java must be installed on this system to use this transform/runner.
I've followed Google's tutorials and workshop materials but can't find what the problem is. Please help.
Update: I already installed JDK 11 as part of my Dockerfile. I also verified that JAVA_HOME is set in the image, and Java is accessible.
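A quick way to verify this locally is a sketch along these lines (assuming the image tag from the build command above is what actually got pushed; the check itself is only an illustration):
docker run --rm --entrypoint bash \
  us-central1-docker.pkg.dev/xxxx-xxx-2/dataflow-pubsublite-bigquery/test:latest \
  -c 'echo $JAVA_HOME && java -version'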
Answer 1
Score: 2
I was having the same issue. The problem is with the way you are building the template using gcloud dataflow flex-template build: when you use the --flex-template-base-image parameter, your Dockerfile is ignored.
You need to follow the instructions from the documentation and build in two steps. First, build the image this way; this will use your Dockerfile and push the image to Artifact Registry:
gcloud builds submit --tag "$TEMPLATE_IMAGE" .
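For reference, a minimal sketch of the shell variables this command and the next one assume; the values below are placeholders taken from the paths used in the question, not part of the original answer:
export TEMPLATE_IMAGE="us-central1-docker.pkg.dev/xxxx-xxx-2/dataflow-pubsublite-bigquery/test:latest"
export TEMPLATE_GCS_PATH="gs://my-bucket-xxxx/templates/streaming-beam-sql.json"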
Once the image is built, build the flex template:
gcloud dataflow flex-template build ${TEMPLATE_GCS_PATH} \
--image "${TEMPLATE_IMAGE}" \
--sdk-language "PYTHON" \
--metadata-file "metadata.json"
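Once the template is rebuilt this way, launching it should not need any changes; for example, reusing the exact run command from the question:
gcloud dataflow flex-template run "streaming-beam-sql" \
  --template-file-gcs-location gs://my-bucket-xxxx/templates/streaming-beam-sql.json \
  --project "xxxx-xxx-2" \
  --parameters "subscription_id=projects/xxxx-xxx-/locations/us-central1/subscriptions/data-streaming-xxxx-subscription,dataset=omer_poc,table=trip2"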
Hope this helps.