yield not returning result immediately to caller api – C#8, IAsyncEnumerable


Question

I am using .NET 6. My use case is to stream data from "myapi" through a middle API (BFF) to a React client app.

I have code in the "myapi" endpoint that should yield each result as soon as it is received -

myapi code -

public async IAsyncEnumerable<string> GetStreamingResponse()
{            
    var rawAzureOpenAIRequest = new CompletionsRequest();
    rawAzureOpenAIRequest.ModelToUse = DefaultTextModelToUse;
    CompletionsOptions optns = new CompletionsOptions();
    optns.Prompts.Add("add 6+1 :");
    optns.Prompts.Add("below is the summary of technical consultant role in software");
    var azResponse = await _openAIRepository.GetStreamingResponse(rawAzureOpenAIRequest.ModelToUse, optns, canToken);

    await foreach (var choice in azResponse.Value.GetChoicesStreaming())
    {
        await foreach (var message in choice.GetTextStreaming())
        {
            yield return message;
            await Task.Delay(10000); // delay added only to test whether control returns after each yield
        }
    }
}

My consuming "middle BFF API" is below. My issue is that it does not hit the breakpoint in the consuming API after each yield return, i.e., control does not return to the consuming API after each yield return. As soon as a message is yield-returned from the first API above, I want the consuming API to receive it.

Consuming api code -

[HttpGet]
[Route("v1/testendpoint")]
public async Task Get()
{            
    using HttpClient Client = new();
    using HttpResponseMessage response = await Client.GetAsync(
        "http://localhost...",
        HttpCompletionOption.ResponseHeadersRead
    ).ConfigureAwait(false);

    response.EnsureSuccessStatusCode();

    Stream responseStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false);

    IAsyncEnumerable<object> messages = JsonSerializer.DeserializeAsyncEnumerable<object>(responseStream,
    new JsonSerializerOptions
    {
        PropertyNameCaseInsensitive = true,
        DefaultBufferSize = 10
    });

    Response.Headers.Add("Content-Type", "text/event-stream");

    await foreach (var message in messages)
    {
        // breakpoint here - it is not hit after each yield
        byte[] messageBytes = Encoding.ASCII.GetBytes("data:" + message + "\n\n");
        await Response.Body.WriteAsync(messageBytes, 0, messageBytes.Length);
        await Response.Body.FlushAsync();
    }
}

Could someone please explain why this is happening?

I tried adding a delay to check whether control returns to the consuming API after each yield return, but it does not.

I also tried hitting the first (yielding) API with the client-side code below, and it yields in batches.

fetch("http://localhost:7200/v1...", config)
    .then(async response => {
        const reader = response.body?.getReader();
        if (!reader) {
            return;
        }
        const decoder = new TextDecoder();
        while (true) {
            const { done, value } = await reader.read();
            if (done) break;
            var item = decoder.decode(value).replace(/\[|]/g, '').replace(/^,/, '');

            var parsedItem = JSON.parse(item);
            console.log(item + "\n");
            debugger;
        }
        reader.releaseLock();
    }, (reason) => {
        console.log(reason);
        debugger;
    });
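
One reason `JSON.parse` is fragile in the snippet above is that each `read()` chunk is not guaranteed to contain exactly one complete item: a chunk can hold a partial item or several items at once. A minimal sketch of more robust chunk handling, assuming newline-delimited items for simplicity (`createItemSplitter` is a hypothetical helper, not part of any library):

```javascript
// Hypothetical helper: buffer incoming text chunks and emit only complete
// newline-delimited JSON items, since a single read() may return a partial
// item or several items at once.
function createItemSplitter() {
  let buffer = "";
  return function push(chunk) {
    buffer += chunk;
    const parts = buffer.split("\n");
    buffer = parts.pop(); // the last piece may be incomplete; keep it buffered
    return parts.filter(p => p.length > 0).map(p => JSON.parse(p));
  };
}

// Example: one item split across two network chunks.
const push = createItemSplitter();
console.log(push('{"a":1}\n{"b"')); // first item complete, second still partial
console.log(push(':2}\n'));         // second item completes
```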

In the first (sending) API, the GetTextStreaming method has the following definition -

[image in the original post showing the GetTextStreaming definition]

UPDATE:

I am now trying to return the stream directly -

myapi code

public async Task<Stream> GetRawStreamingCompletionResponse()
{            
    var rawAzureOpenAIRequest = new CompletionsRequest();
    rawAzureOpenAIRequest.ModelToUse = DefaultTextModelToUse;
    CompletionsOptions optns = new CompletionsOptions();
    optns.Prompts.Add("add 6+1 :");
    optns.Prompts.Add("below is the summary of technical consultant role in software");

    var azResponse = await _openAIRepository
        .GetStreamingResponse(rawAzureOpenAIRequest.ModelToUse, optns, canToken);

    return azResponse.GetRawResponse().ContentStream;
}

In consuming api -

public async Task Get() {
    var stream = await Client.GetStreamAsync("http://localhost...");
    Response.Headers.Add("Content-Type", "text/event-stream");
    await stream.CopyToAsync(this.Response.Body);
    await Response.Body.FlushAsync();            
}

Answer 1

Score: 1


I think I found the reason behind it. It depends on when the System.Text.Json library flushes the response body while serializing an IAsyncEnumerable. It does not flush the response body after every step of the async enumeration; instead, it flushes when an internal buffer size limit inside the JSON serializer is reached.

According to their documentation, the default buffer size is 16,384 bytes.

I handled it by flushing the response after every yield.
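
A minimal sketch of that fix, assuming an ASP.NET Core controller: the endpoint writes each item to the response body itself and flushes per item, rather than returning an IAsyncEnumerable through the JSON output formatter. GetMessagesAsync is a placeholder for whatever IAsyncEnumerable&lt;string&gt; source is used.

```csharp
[HttpGet]
[Route("v1/streamingendpoint")]
public async Task Get(CancellationToken cancellationToken)
{
    Response.Headers.Add("Content-Type", "text/event-stream");

    // GetMessagesAsync stands in for any IAsyncEnumerable<string> source.
    await foreach (var message in GetMessagesAsync(cancellationToken))
    {
        byte[] bytes = Encoding.UTF8.GetBytes("data:" + message + "\n\n");
        await Response.Body.WriteAsync(bytes, cancellationToken);

        // Flush after every item so it reaches the client immediately,
        // instead of waiting for the serializer's internal buffer to fill.
        await Response.Body.FlushAsync(cancellationToken);
    }
}
```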

huangapple
  • Published on 2023-05-17 12:36:20
  • Please retain the source link when reposting: https://go.coder-hub.com/76268588.html