yield does not return results immediately to the calling API - C# 8, IAsyncEnumerable
Question
I am using .NET 6. My use case is to stream data from "myapi" through a "middle api (BFF)" to a client app written in React.
I have code in a "myapi" endpoint that should yield each result as soon as it receives it -
myapi code -
    public async IAsyncEnumerable<string> GetStreamingResponse()
    {
        var rawAzureOpenAIRequest = new CompletionsRequest();
        rawAzureOpenAIRequest.ModelToUse = DefaultTextModelToUse;
        CompletionsOptions optns = new CompletionsOptions();
        optns.Prompts.Add("add 6+1 :");
        optns.Prompts.Add("below is the summary of technical consultant role in software");
        var azResponse = await _openAIRepository.GetStreamingResponse(rawAzureOpenAIRequest.ModelToUse, optns, canToken);
        await foreach (var choice in azResponse.Value.GetChoicesStreaming())
        {
            await foreach (var message in choice.GetTextStreaming())
            {
                yield return message;
                // Artificial delay, added only to observe when the caller receives each item.
                await Task.Delay(10000);
            }
        }
    }
My consuming "middle bff api" is shown below. The issue is that the breakpoint in the consuming API is not hit after each yield return; that is, control does not return to the consuming API after each yield return. I want the consuming API to receive each message as soon as the first API above yields it.
Consuming api code -
    [HttpGet]
    [Route("v1/testendpoint")]
    public async Task Get()
    {
        using HttpClient Client = new();
        using HttpResponseMessage response = await Client.GetAsync(
            "http://localhost...",
            HttpCompletionOption.ResponseHeadersRead
        ).ConfigureAwait(false);
        response.EnsureSuccessStatusCode();
        Stream responseStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false);
        IAsyncEnumerable<object> messages = JsonSerializer.DeserializeAsyncEnumerable<object>(responseStream,
            new JsonSerializerOptions
            {
                PropertyNameCaseInsensitive = true,
                DefaultBufferSize = 10
            });
        Response.Headers.Add("Content-Type", "text/event-stream");
        await foreach (var message in messages)
        {
            // Breakpoint set here; it is not hit after each yield return.
            byte[] messageBytes = ASCIIEncoding.ASCII.GetBytes("data:" + message + "\n\n");
            await Response.Body.WriteAsync(messageBytes, 0, messageBytes.Length);
            await Response.Body.FlushAsync();
        }
    }
Could someone please explain why this is happening?
I added a delay to check whether control returns to the consuming API after each yield return, but it does not.
I also tried calling the first (yielding) API with the client-side code below, and the results arrive in batches.
    fetch("http://localhost:7200/v1...", config)
        .then(async response => {
            const reader = response.body?.getReader();
            if (!reader) {
                return;
            }
            const decoder = new TextDecoder();
            while (true) {
                const { done, value } = await reader.read();
                if (done) break;
                var item = decoder.decode(value).replace(/\[|]/g, '').replace(/^,/, '');
                var parsedItem = JSON.parse(item);
                console.log(item + "\n");
                debugger;
            }
            reader.releaseLock();
        }, (reason) => {
            console.log(reason);
            debugger;
        });
In the first sending api, the GetTextStreaming method has the following definition -
UPDATE:
I am now trying to return the stream directly -
myapi code
    public async Task<Stream> GetRawStreamingCompletionResponse()
    {
        var rawAzureOpenAIRequest = new CompletionsRequest();
        rawAzureOpenAIRequest.ModelToUse = DefaultTextModelToUse;
        CompletionsOptions optns = new CompletionsOptions();
        optns.Prompts.Add("add 6+1 :");
        optns.Prompts.Add("below is the summary of technical consultant role in software");
        var azResponse = await _openAIRepository
            .GetStreamingResponse(rawAzureOpenAIRequest.ModelToUse, optns, canToken);
        return azResponse.GetRawResponse().ContentStream;
    }
In consuming api -
    public async Task Get()
    {
        var stream = await Client.GetStreamAsync("http://localhost...");
        Response.Headers.Add("Content-Type", "text/event-stream");
        stream.CopyToAsync(this.Response.Body);
        await Response.Body.FlushAsync();
    }
Answer 1
Score: 1
I think I found part of the reason. It depends on when the System.Text.Json library flushes the response body while serializing an IAsyncEnumerable. It does not flush the response body after every step of the async enumeration; instead, it flushes when a buffer internal to the JSON serializer reaches its size limit.
Their documentation page lists the default buffer size as 16,384 bytes.
I handled it by flushing the response after every yield.
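One way to flush after every yielded item is sketched below, assuming an ASP.NET Core minimal API on .NET 6 with the web SDK's implicit usings enabled. The route `/v1/stream`, the `FakeMessages` helper, and the one-second delay are hypothetical stand-ins for the real endpoint and the Azure OpenAI stream. The idea is that the endpoint writes each item to the response itself and calls `FlushAsync`, instead of returning the `IAsyncEnumerable` and leaving the JSON serializer to decide when its buffer is sent.

```csharp
using System.Runtime.CompilerServices;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;

var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();

// Hypothetical route; each item is written and flushed individually.
app.MapGet("/v1/stream", async (HttpContext context) =>
{
    var ct = context.RequestAborted;
    context.Response.ContentType = "text/event-stream";

    await foreach (var message in FakeMessages(ct))
    {
        // Server-sent events framing: "data: <payload>" followed by a blank line.
        await context.Response.WriteAsync($"data: {message}\n\n", ct);

        // Push the bytes to the client now rather than waiting for a buffer to fill.
        await context.Response.Body.FlushAsync(ct);
    }
});

app.Run();

// Hypothetical stand-in for the upstream stream: one item per second.
static async IAsyncEnumerable<string> FakeMessages(
    [EnumeratorCancellation] CancellationToken ct = default)
{
    for (var i = 0; i < 5; i++)
    {
        yield return $"message {i}";
        await Task.Delay(1000, ct);
    }
}
```

With this shape the consumer no longer receives a JSON array, so it would read the response line by line (for example with a `StreamReader` over a response obtained using `HttpCompletionOption.ResponseHeadersRead`) rather than through `JsonSerializer.DeserializeAsyncEnumerable`.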