Is there any way to decrease consumer lag while using Apache Beam KafkaIO.readFromKafka?


Question

I want to consume approximately 200 GB of data per day from Azure Event Hubs and publish it to a Pub/Sub topic, for which I have deployed the Dataflow job below.

Desired processing rate: 2314814 B/s (~2.3 MB/s)
Current processing rate: 408000 B/s (~0.408 MB/s)

Machine type: n1-standard-2
maxNumWorkers: 4
apache_beam version: 2.46.0
Runner v2: enabled
Streaming Engine: enabled
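
For context, here is a minimal sketch of the Dataflow launch options implied by the setup above; the actual launch command is not shown in the post, so the project, region, and bucket values are placeholders:

# Hypothetical launch flags reconstructed from the setup described above.
# Project, region, and GCS paths are placeholders, not values from the post.
pipeline_args = [
    '--runner=DataflowRunner',
    '--project=gcp_project_name',           # placeholder
    '--region=us-central1',                 # placeholder
    '--temp_location=gs://my-bucket/tmp',   # placeholder
    '--streaming',
    '--worker_machine_type=n1-standard-2',
    '--max_num_workers=4',
    '--enable_streaming_engine',
    '--experiments=use_runner_v2',
]
# These flags would be parsed by PipelineOptions(pipeline_args) in run() below.

The pipeline code as deployed: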

from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions, StandardOptions


class encode(beam.DoFn):
    def process(self, element):
        """
        Record attributes passed from the ReadFromKafka transform: 'topic', 'value',
            'count', 'headers', 'index', 'key', 'offset', 'partition',
            'timestamp', 'timestampTypeId', 'timestampTypeName'.

        :return: Message value as bytes (WriteToPubSub expects bytes payloads)
        """
        if hasattr(element, 'value'):
            value = element.value
        elif isinstance(element, tuple):
            value = element[1]
        else:
            raise RuntimeError('unknown record type: %s' % type(element))
        yield value if isinstance(value, bytes) else value.encode("UTF-8")


def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    global cloud_options
    global custom_options

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(StandardOptions).streaming = True

    kafka_consumer_config = {
        'bootstrap.servers': 'Event-hub-name.servicebus.windows.net:9093',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'PLAIN',
        "sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Password";',
        'group.id': 'reporting',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': 'True',
        'max.poll.records': '180000',
        'session.timeout.ms': '45000',
        'max.partition.fetch.bytes': '600000',
        'fetch.max.bytes': '28000000',
        'fetch.max.wait.ms': '500',
        'fetch.min.bytes': '8000000',
        'receive.buffer.bytes': '-1'
    }

    pubsub_topic = 'projects/gcp_project_name/topics/pubsub_topic_name'
    p = beam.Pipeline(options=pipeline_options)

    (p
     | "从Kafka读取" >> ReadFromKafka(consumer_config=kafka_consumer_config, topics=['products'],
                                  with_metadata=False)
     | 'encode the message' >> beam.ParDo(encode())
     | "Publish to Pub/Sub" >> beam.io.WriteToPubSub(pubsub_topic)
     )
    p.run().wait_until_finish()


if __name__ == '__main__':
    run()

The following Kafka consumer parameters were tweaked (annotated in the sketch after the list):

'max.poll.records': '180000',
'session.timeout.ms': '45000',
'max.partition.fetch.bytes': '600000',
'fetch.max.bytes': '28000000',
'fetch.max.wait.ms': '500',
'fetch.min.bytes': '8000000',
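
A sketch of what each of these settings controls, following the standard Kafka consumer configuration (the values are simply the ones used above):

# Annotated copy of the tuned settings; the comments describe the standard
# Kafka consumer semantics of each key.
tuned_consumer_settings = {
    'max.poll.records': '180000',           # max records returned by a single poll() call
    'session.timeout.ms': '45000',          # consumer-group session/heartbeat timeout
    'max.partition.fetch.bytes': '600000',  # max bytes returned per partition in one fetch
    'fetch.max.bytes': '28000000',          # max bytes returned by one fetch request overall
    'fetch.max.wait.ms': '500',             # broker waits at most this long for fetch.min.bytes
    'fetch.min.bytes': '8000000',           # broker holds the fetch response until this many bytes are available
}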

Please suggest how I can achieve 2.3 MB/s throughput from ReadFromKafka.


Answer 1

Score: 1

The primary limiter on Kafka throughput is generally the number of partitions of a topic. I'd recommend increasing the number of partitions on your Kafka topic so that Beam can scale up.

In addition, I'd recommend a higher cap on the maximum number of worker machines, for the same reason (see the sketch below).
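
A minimal sketch of what raising that cap could look like when constructing the pipeline options; the value 16 is illustrative, not from the answer:

from apache_beam.options.pipeline_options import PipelineOptions

# Illustrative only: raise the autoscaling ceiling so additional workers can
# read the extra partitions in parallel (equivalent to --max_num_workers).
pipeline_options = PipelineOptions(
    pipeline_args,
    streaming=True,
    max_num_workers=16,
)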


Answer 2

Score: 0

I suggest you also check the job metrics (https://cloud.google.com/dataflow/docs/guides/using-monitoring-intf) while tuning the Kafka parameters.
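
If custom counters help with that, here is a small sketch (not part of the answer) of adding a Beam metric to the existing DoFn so per-element throughput also appears in the job metrics; the namespace and counter name are arbitrary:

import apache_beam as beam
from apache_beam.metrics import Metrics

class encode(beam.DoFn):
    def __init__(self):
        # The counter is reported under the chosen namespace in the
        # Dataflow job metrics / Beam metrics API.
        self.records_out = Metrics.counter('kafka_to_pubsub', 'records_out')

    def process(self, element):
        self.records_out.inc()  # count every record forwarded to Pub/Sub
        value = element[1] if isinstance(element, tuple) else element.value
        yield value if isinstance(value, bytes) else value.encode('UTF-8')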

