Streaming high frequency data with Python requests API – latency issues

huangapple go评论69阅读模式
英文:

Streaming high frequency data with Python requests API - latency issues

问题

我正在使用requests来订阅高频数据流。在繁忙时段,消息的时间戳发送给我的时间戳和我在脚本中处理它们的时间戳之间存在显著的延迟。在安静时段,这种延迟通常约为500毫秒。在繁忙时段,它会增加到超过50秒。以下是一些更多的观察结果:

  1. 我在繁忙时段查看了我的机器资源使用情况,CPU负载和内存几乎没有上升。

  2. 在繁忙时段,延迟从开始运行脚本时的约<1秒开始,但随着脚本的运行,延迟增加到50秒。因此,这种延迟不是数据发送方固有的,而是在我的脚本中进行的一些处理导致的。随着我的脚本运行,延迟会不断增加。

因此,我得出结论问题出在我对数据的处理上。以下是我用来处理数据的方法。

该函数基本上将字典对象发送回回调函数进行进一步处理。每个字典都是由流API发送的JSON字典。

def receive_stream(callback):

    s = requests.Session()

    with s.get(...stream=True) as resp:

        buffer = ''
        
        for line in resp.iter_lines():
            line = line.decode('utf-8')
            json_dict = None

            if line == '}':
                json_dict = buffer + line
                buffer = ''
            else:
                buffer = buffer + line

            if json_dict is not None:
                parsed_json = json.loads(json_dict) 

                if parsed_json['type'] != 'hb': 
                    t = Thread(target=callback, args=(parsed_json))
                    t.start()

注意: 回调函数每50条消息测量一次延迟(取平均值),并将其计算为 datetime.datetime.now() - JSON 字典中的时间戳。

如果我在上面的函数中测量延迟并删除回调,几乎没有什么变化--相同的观察结果适用。因此,下游处理不是问题(而且我将其发送到另一个线程中,所以不应该是问题)。

我的问题:

  1. 我处理传入数据行的方式是否本质上低效,以至于在繁忙时段会有大量未处理的行排队等待?是否可能是json.loads()line.decode()造成的?<---我必须调用后者吗?

  2. 我使用线程的方式是否有问题?我认为下游处理的成本不是特别高,它只是使用zmq发送消息并测量延迟,删除回调几乎不会影响这个问题。我应该使用队列吗?

谢谢。

英文:

I am using requests to subscribe to a high frequency data stream. During busy times, there is a significant latency between the timestamp of the messages being sent to me and the timestamp of when I am able to process them in my script. During quiet times, this latency is consistently around 500 ms. During busy times, it rises to over 50 seconds. Here are some more observations:

  1. I looked at the resource usage of my machine during busy times and CPU load and memory hardly rise.

  2. The latency during busy times begins (when I started my script) at around <1s but as the script runs, the latency increases to 50s. Therefore, this latency is not inherent to the sender of the data but to some processing going on in my script. As my script runs, the latency gets higher and higher.

Therefore, I am concluding that the problem is with my processing of the data. Here is what I am doing to process the data.

The function essentially sends dict objects back to a callback for further processing. Each dict is a JSON dict being sent by the streaming API.

def receive_stream(callback):

s = requests.Session()

with s.get(...stream=True) as resp:

	buffer = &#39;&#39;

	for line in resp.iter_lines():

    	line = line.decode(&#39;utf-8&#39;)
        json_dict = None

        if line == &#39;}&#39;:
        	json_dict = buffer + line
            buffer = &#39;&#39;
        else:
            buffer = buffer + line

         if json_dict is not None:
         	parsed_json = json.loads(json_dict) 

         	if parsed_json[&#39;type&#39;] != &#39;hb&#39;: 
         		t = Thread(target=callback, args=(parsed_json))
         		t.start()

Note: The callback function measures the latency over every 50 messages (takes a mean) or so and calculates it as date time.datetime.now() - the timestamp in the json dict being sent to it.

If I measure the latency in this function above AND remove the callback, it makes little difference -- same observations apply. Therefore, the downstream processing is not the issue (plus I am sending it off to another thread, so it shouldn't be)

My questions:

  1. Is the way I am processing the incoming lines of data inherently inefficient, so that during busy times, there is a big backlog of lines that are unprocessed? Could it be the json.loads() or line.decode() <--- I have to call the latter?

  2. Is the way I am using threads, the problem? I don't think the downstream processing is particularly costly, it just sends a message using zmq and measures latency and removing the callback altogether, makes little difference to this problem. Should I be using a queue?

Thanks

答案1

得分: 1

  1. 处理传入数据行的方式是否天生低效,以至于在繁忙时期会有大量未处理的行排队等待?可能是的。

  2. 我们可以考虑一些关于从文件/流中惰性读取多个 JSON 值的答案来寻找将流分割成单独的 JSON 对象的替代方法,但你现在的方法看起来并不太糟糕。只要它是正确的。对于通用的 JSON 来说,这不正确(它对 JSON 字符串的格式做出了不合理的假设),但对于你的特定数据而言可能还可以接受。

  3. 我使用线程的方式是否有问题?

这是我首先会寻找改进的地方。

在任何其他操作之前,我会检查是否需要(额外的)线程。如果你只在一个线程中执行所有操作,是否可以获得足够的性能?

  1. 我认为下游处理并不特别昂贵,它只是使用 zmq 发送消息并测量延迟,删除回调几乎对这个问题没有什么影响。我应该使用队列吗?

如果你需要多个线程,那么是的,你可能应该使用队列。具体来说,你应该将所有消息排入一个单独的线程进行处理。启动线程是昂贵的,同时运行许多线程也是昂贵的,而且你实际上不希望第一条消息的处理与第二条消息等共享 CPU —— 相反,你希望消息按顺序处理,尽快处理完。

此外,CPython VM 不支持并行。一次只能有一个线程执行 Python 字节码。你不需要管理这个,但你应该了解这些影响。

此外,你可能想尝试在读取线程中处理的操作与在处理线程中处理的操作之间的区别。例如,你可以将原始字符串分派到处理线程而不是首先将它们解析为 JSON 对象。你甚至可能需要三个线程 —— 一个读取流并将其分成字符串,另一个将其解析为 JSON 对象,第三个处理这些对象。

英文:

> 1. Is the way I am processing the incoming lines of data inherently inefficient so that during busy times, there is a big backlog of lines that are unprocessed? Could it be the json.loads() or line.decode() <--- I have to call the latter?

Possibly.

You might consider some of the answers to How I can I lazily read multiple JSON values from a file/stream in Python? for alternative approaches to splitting your stream into separate JSON objects, but what you're doing now doesn't look too bad. As long as it's correct. It is not correct for general JSON (it makes unwarranted assumptions about the format of the JSON strings), but it might be fine for your particular data.

> 2. Is the way I am using threads, the problem?

This is where I would look first for an improvement.

Before anything else, I would check whether you need (additional) threads at all. Do you get adequate performance if you just do everything in one thread?

> I don't think the downstream processing is particularly costly, it just sends a message using zmq and measures latency and removing the callback altogether, makes little difference to this problem. Should I be using a queue?

If you need multiple threads then yes, you probably should be using a queue. Specifically, you should be queueing all the messages to a single thread for processing. Launching threads is expensive, and having many threads running at the same time is expensive, and you don't actually want processing of the first message to share CPU with processing the second, and so forth -- rather, you want the messages in order, as fast as you can get them.

Plus, the CPython VM is not parallel. Only one thread at a time can execute Python bytecode. You don't have to manage that, but you should be aware of the implications.

Also, you may want to experiment with what processing you do in the reader thread vs what you do in the processor thread. For example, you might dispatch the raw strings to the processor thread instead of parsing them to JSON objects first. You might even want three threads -- one that reads the stream and breaks it into strings, another that parses those into JSON objects, and a third that processes the objects.

huangapple
  • 本文由 发表于 2023年6月8日 04:59:48
  • 转载请务必保留本文链接:https://go.coder-hub.com/76427084.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定