如何使用服务器发送事件(Server-Sent Events)流式传输JSON数据

huangapple go评论64阅读模式
英文:

How to stream JSON data using Server-Sent Events

问题

设置服务器推送事件(Server-Sent Events)相对简单,尤其是使用FastAPI时,可以像这样做:

def fake_data_streamer():
    for i in range(10):
        yield "some streamed data"
        time.sleep(0.5)

@app.get('/')
async def main():
    return StreamingResponse(fake_data_streamer())

在进行HTTP GET请求时,连接将每隔0.5秒返回一次"some streamed data"。

但是,如果我想要流式传输一些结构化数据,比如JSON呢?

例如,我希望数据是JSON格式的,像这样:

def fake_data_streamer():
    for i in range(10):
        yield json.dumps({'result': 'a lot of streamed data', "seriously": ["so", "much", "data"]}, indent=4)
        time.sleep(0.5)

这是否是一个适当的服务器端实现?这是否会导致客户端接收到部分形式的负载数据,尽管在纯文本情况下这没问题,但却使JSON无法解析?

如果这是可以接受的,那么你可以如何从客户端端读取这些数据?类似这样:

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            while True:
                chunk = await resp.content.readuntil(b"\n")
                await asyncio.sleep(1)
                if not chunk:
                    break
                print(chunk)

尽管我不确定要确保客户端始终接收到完整形式的JSON事件时,适当的分隔符/读取模式是什么。

或者,这是否是实现流式传输完整形式JSON事件的完全不适当的方式。

供参考,OpenAI在其API中实现了完整JSON对象的流式传输:https://github.com/openai/openai-cookbook/blob/main/examples/How_to_stream_completions.ipynb

英文:

Setting up Server-Sent events is relatively simple - especially using FastAPI - you can do something like this:

def fake_data_streamer():
for i in range(10):
    yield "some streamed data"
    time.sleep(0.5)

@app.get('/')
async def main():
    return StreamingResponse(fake_data_streamer())

And upon an HTTP GET, the connection will return "some streamed data" every 0.5 seconds.

What if I wanted to stream some structured data though, like JSON?

For example, I want the data to be JSON, so something like:

def fake_data_streamer():
    for i in range(10):
        yield json.dumps({'result': 'a lot of streamed data', "seriously": ["so", "much", "data"]}, indent=4)
        time.sleep(0.5)

Is this a proper server side implementation? Does this pose a risk of the client receiving partially formed payloads - which is fine in the plaintext case, but makes the JSON un-parseable?

If this is okay, how would you read this from the client side? Something like this:

async def main():
async with aiohttp.ClientSession() as session:
    async with session.get(url) as resp:
        while True:
            chunk = await resp.content.readuntil(b"\n")
            await asyncio.sleep(1)
            if not chunk:
                break
            print(chunk)

Although I am not sure what the proper separator / reading mode is appropriate to ensure that the client always receives fully formed JSON events.

Or, is this a completely improper way to accomplish streaming of fully formed JSON events.

For reference, OpenAI achieves streaming of complete JSON objects in their API: https://github.com/openai/openai-cookbook/blob/main/examples/How_to_stream_completions.ipynb

答案1

得分: 1

为了澄清并提供一些背景信息,OpenAI的客户端API会对服务器发送的事件进行后处理,将其转换为格式良好的JSON。但是“原始”事件按照服务器发送事件的规范仅以 data: 的形式发送,该规范在这里:https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events。你提到的IPython笔记本中有指向这个规范以及笔记本介绍中的其他资源的链接。

你可以查询OpenAI的终端点以查看原始的服务器事件:

https://asciinema.org/a/TKDYxqh6pgCNN0hX6tdcSgfzU

我正在使用curl命令:

curl https://api.openai.com/v1/chat/completions -u :$OPENAI_API_KEY  -H 'Content-Type: application/json' -d '{"stream": true,"model": "gpt-3.5-turbo-0613", "messages": [ {"role": "user", "content": "用一句话总结莎士比亚的奥赛罗"}]}'

现在来回答你的问题。没有什么可以阻止你做你打算做的事情,规范中的示例代码表示你可以通过在你的data:事件的末尾放置 "\n\n" 来非常简单地做到这一点,然后让客户端解析器解析它(在它看到相关的头部 Content-Type: text/event-stream 后)。Python中的Requests库实际上会自动为你执行此操作,使用 response_obj.iter_lines() 函数。Python还有一个易于阅读的SSE客户端,它可以解析此事件并为你提供完整的 data: 行。

英文:

To clarify and provide some context, OpenAI's Client API post processes Server-sent events to make them into nice JSON. But the "raw" events are sent per data: only spec for Server Sent events, which is here: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events. There are links in the ipython notebook you mentioned that point to this spec and other resources in the notebooks introduction.

You can query the open AI end point to see raw Server-events yourself:

https://asciinema.org/a/TKDYxqh6pgCNN0hX6tdcSgfzU

I am using the curl command:

curl https://api.openai.com/v1/chat/completions -u :$OPENAI_API_KEY  -H 'Content-Type: application/json' -d '{"stream": true,"model": "gpt-3.5-turbo-0613", "messages": [ {"role": "user", "content": "Summarize Othello by Shaekspeare in a one line"}]}'

Now to answer your question. There is nothing preventing you from doing what you are intending, the sample code in the spec says you can do this very simply by placing a "\n\n" at the end of your data: event and have a client parser parse this(After it sees the relevant header "Content-Type: text/event-stream"). Requests library in python actually does this automatically for you using the response_obj.iter_lines() function. Python also has a SSE Client library that is easy to read that can parse this event and give you the full data: lines.

huangapple
  • 本文由 发表于 2023年6月16日 09:57:03
  • 转载请务必保留本文链接:https://go.coder-hub.com/76486517.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定