Is there any way to decrease consumer lag while using Apache beam KafkaIO.readFromKafka?
Question
I want to consume roughly 200 GB of data per day from Azure Event Hubs and publish it to a Pub/Sub topic, for which I have deployed the Dataflow job below.
Desired processing rate: 2,314,814 B/s, i.e. about 2.3 MB/s (200 GB / 86,400 s)
Current processing rate: 408,000 B/s, i.e. about 0.4 MB/s
Machine type: n1-standard-2
maxNumWorkers: 4
apache_beam version: 2.46.0
Runner v2: Enabled
Streaming Engine: Enabled
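The settings above would typically be supplied as Dataflow pipeline options when launching the job. A minimal sketch of that mapping (not part of the original post; the project and region values are placeholders, and the flag spellings assume the Python SDK's standard Dataflow options):

from apache_beam.options.pipeline_options import PipelineOptions

# Sketch only: values mirror the setup listed above; project/region are placeholders.
dataflow_options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=gcp_project_name',      # placeholder
    '--region=us-central1',            # assumed region
    '--streaming',
    '--machine_type=n1-standard-2',
    '--max_num_workers=4',
    '--enable_streaming_engine',
    '--experiments=use_runner_v2',
])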
from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions, StandardOptions


class encode(beam.DoFn):
    def process(self, element):
        """
        Record attributes passed from the ReadFromKafka transform: 'topic', 'value',
        'count', 'headers', 'index', 'key', 'offset', 'partition',
        'timestamp', 'timestampTypeId', 'timestampTypeName'.
        :return: Message value as bytes
        """
        if hasattr(element, 'value'):
            value = element.value
        elif isinstance(element, tuple):
            value = element[1]
        else:
            raise RuntimeError('unknown record type: %s' % type(element))
        yield value if isinstance(value, bytes) else value.encode('utf-8')


def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(StandardOptions).streaming = True

    kafka_consumer_config = {
        'bootstrap.servers': 'Event-hub-name.servicebus.windows.net:9093',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'PLAIN',
        'sasl.jaas.config': 'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Password";',
        'group.id': 'reporting',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': 'True',
        'max.poll.records': '180000',
        'session.timeout.ms': '45000',
        'max.partition.fetch.bytes': '600000',
        'fetch.max.bytes': '28000000',
        'fetch.max.wait.ms': '500',
        'fetch.min.bytes': '8000000',
        'receive.buffer.bytes': '-1'
    }

    pubsub_topic = 'projects/gcp_project_name/topics/pubsub_topic_name'

    p = beam.Pipeline(options=pipeline_options)
    (p
     | "read from kafka" >> ReadFromKafka(consumer_config=kafka_consumer_config,
                                          topics=['products'],
                                          with_metadata=False)
     | "encode the message" >> beam.ParDo(encode())
     | "Publish to Pub/Sub" >> beam.io.WriteToPubSub(pubsub_topic)
    )
    p.run().wait_until_finish()


if __name__ == '__main__':
    run()
The following Kafka consumer parameters were tweaked:
'max.poll.records': '180000',
'session.timeout.ms': '45000',
'max.partition.fetch.bytes': '600000',
'fetch.max.bytes': '28000000',
'fetch.max.wait.ms': '500',
'fetch.min.bytes': '8000000',
Please suggest how I can achieve 2.3 MB/s throughput from ReadFromKafka.
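One experiment that is sometimes suggested for Dataflow streaming pipelines (not something attempted in the original post) is inserting a Reshuffle after the Kafka read to break fusion between the source and the downstream encode/write steps, so they can be scaled independently. A sketch of that variant, reusing the names from the pipeline above:

# Sketch only: same pipeline with a Reshuffle added after the read; measure before keeping it.
(p
 | "read from kafka" >> ReadFromKafka(consumer_config=kafka_consumer_config,
                                      topics=['products'], with_metadata=False)
 | "break fusion" >> beam.Reshuffle()   # redistributes elements across workers
 | "encode the message" >> beam.ParDo(encode())
 | "Publish to Pub/Sub" >> beam.io.WriteToPubSub(pubsub_topic)
)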
Answer 1
Score: 1
The primary limiter on Kafka throughput in general is the number of partitions of a topic. I'd recommend increasing the number of partitions on your Kafka topic, so that Beam can scale up.
In addition, I'd recommend a higher cap on the maximum number of machines, for the same reason.
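As an illustration of this suggestion: KafkaIO's read parallelism is bounded by the topic's partition count, so adding partitions gives Beam more splits to distribute across workers. A hedged sketch using kafka-python's admin client (the connection values are the same placeholders as in the question, the target count of 16 is only an example, and whether partitions can be added at all depends on the Event Hubs tier):

# Illustrative only: add partitions to the 'products' topic via the Kafka admin API.
from kafka.admin import KafkaAdminClient, NewPartitions

admin = KafkaAdminClient(
    bootstrap_servers='Event-hub-name.servicebus.windows.net:9093',
    security_protocol='SASL_SSL',
    sasl_mechanism='PLAIN',
    sasl_plain_username='$ConnectionString',
    sasl_plain_password='Password',  # placeholder connection string
)
admin.create_partitions({'products': NewPartitions(total_count=16)})  # example target

With more partitions available, raising the worker cap (e.g. --max_num_workers above 4) gives autoscaling room to actually use them.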
Answer 2
Score: 0
I suggest checking the job metrics (https://cloud.google.com/dataflow/docs/guides/using-monitoring-intf) as well when tuning the Kafka parameters.
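In addition to the built-in metrics, a custom Beam counter can make per-step throughput visible in the same Dataflow monitoring view. A minimal sketch (not part of the original code) that adds such a counter to the encode DoFn:

# Sketch only: count processed messages with a custom Beam metric.
import apache_beam as beam
from apache_beam.metrics.metric import Metrics

class encode(beam.DoFn):
    def __init__(self):
        self.messages_read = Metrics.counter(self.__class__, 'messages_read')

    def process(self, element):
        self.messages_read.inc()  # shows up under custom counters in the Dataflow UI
        value = element[1] if isinstance(element, tuple) else element.value
        yield value if isinstance(value, bytes) else value.encode('utf-8')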