Asp.Net Core: Signalr: 队列: 将组名传递到队列

huangapple go评论124阅读模式
英文:

Asp.Net Core: Signalr: Queue: Pass group name into queue

问题

I have an asp.net core project using signalr. I'm trying to pass group name as a parameter to my backgroundservice via a queue. The, in my hub, I wrote a function to join the group with the group name passed in from the javascript. I get just one message in my browser.

Here's my code:

LiveDataHub.cs:

public sealed class LiveDataHub : Hub
{
    private readonly IMyBackgroundQueue _queue;
    public LiveDataHub(IMyBackgroundQueue queue) => _queue = queue;

    public async Task JoinToGroup(string group)
    {
        Console.WriteLine("New group joined: " + group);
        await _queue.QueueAsync(group);
        await Groups.AddToGroupAsync(Context.ConnectionId, group);
        await Clients.Group(group).SendAsync("Send", $"{Context.ConnectionId} has joined the group {group}.");
    }
}

MyBackgroundQueue.cs:

public interface IMyBackgroundQueue
{
    ValueTask QueueAsync(string item);

    IAsyncEnumerable<string> DequeueAllAsync(
        CancellationToken cancellationToken);
}

public class MyBackgroundQueue : IMyBackgroundQueue
{
    //private readonly Channel<string> _channel = Channel.CreateUnbounded<string>();
    private readonly Channel<string> _channel;
    public MyBackgroundQueue(int capacity)
    {
        var options = new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait
        };
        _channel = Channel.CreateBounded<string>(options);
    }
    public ValueTask QueueAsync(string item) => _channel.Writer.WriteAsync(item);
    public IAsyncEnumerable<string> DequeueAllAsync(CancellationToken ct) =>
        _channel.Reader.ReadAllAsync(ct);
}

MyBackgroundService.cs:

public class MyBackgroundService : BackgroundService
{
    private readonly IHubContext<LiveDataHub> _hub;
    public IMyBackgroundQueue _queue { get; }
    public MyBackgroundService(IHubContext<LiveDataHub> hub, IMyBackgroundQueue queue)
    {
        _hub = hub;
        _queue = queue;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var eventMessage = new Models.EventMessage($"Id_{Guid.NewGuid():N}", $"Title_{Guid.NewGuid():N}", DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss"));
            await foreach (var _item in _queue.DequeueAllAsync(stoppingToken))
            {
                Console.WriteLine("group name: " + _item.ToString());
                await _hub.Clients.Group(_item.ToString()).SendAsync("onMessageReceived", eventMessage, stoppingToken);
            }
            await Task.Delay(1000, stoppingToken);
        }
    }

    public override async Task StopAsync(CancellationToken stoppingToken)
    {
        await base.StopAsync(stoppingToken);
    }
}

Startup.cs:

public void ConfigureServices(IServiceCollection services)
{
    services.AddRazorPages();
    services.AddSignalR(hubOptions => { hubOptions.EnableDetailedErrors = true; });

    services.AddHostedService<MyBackgroundService>();
    services.AddSingleton<IMyBackgroundQueue, MyBackgroundQueue>(ctx => {
        return new MyBackgroundQueue(100);
    }); // I think the error is here on this line but I can't seem to figure out what it is
}

javascript.js:

const signalrConnection = new signalR.HubConnectionBuilder()
    .withUrl("/messagebroker")
    .configureLogging(signalR.LogLevel.Information)
    .build();

signalrConnection.start().then(function () {
    console.log("SignalR Hub Connected");

    signalrConnection.invoke("JoinToGroup", "Group01")
        .catch(function (err) {
            console.error(err.toString());
        });

}).catch(function (err) {
    signalrConnection.stop();
    console.error(err.toString());
});
英文:

I have an asp.net core project using signalr. I'm trying to pass group name as a parameter to my backgroundservice via a queue. The, in my hub, I wrote a function to join the group with the group name passed in from the javascript. I get just one message in my browser.

Here's my code:

LiveDataHub.cs:

public sealed class LiveDataHub : Hub     
{           
    private readonly IMyBackgroundQueue _queue;
    public LiveDataHub(IMyBackgroundQueue queue) =&gt; _queue = queue;

    public async Task JoinToGroup(string group)
    {
        Console.WriteLine(&quot;New group joined: &quot; + group);            
        await _queue.QueueAsync(group);
        await Groups.AddToGroupAsync(Context.ConnectionId, group);
        await Clients.Group(group).SendAsync(&quot;Send&quot;, $&quot;{Context.ConnectionId} has joined the group {group}.&quot;);
    }
}

MyBackgroundQueue.cs

public interface IMyBackgroundQueue
{
    ValueTask QueueAsync(string item);

    IAsyncEnumerable&lt;string&gt; DequeueAllAsync(
        CancellationToken cancellationToken);
}

public class MyBackgroundQueue : IMyBackgroundQueue
{
    //private readonly Channel&lt;string&gt; _channel = Channel.CreateUnbounded&lt;string&gt;();
    private readonly Channel&lt;string&gt; _channel;
    public MyBackgroundQueue(int capacity)
    {            
        var options = new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait
        };
        _channel = Channel.CreateBounded&lt;string&gt;(options);
    }
    public ValueTask QueueAsync(string item) =&gt; _channel.Writer.WriteAsync(item);        
    public IAsyncEnumerable&lt;string&gt; DequeueAllAsync(CancellationToken ct) =&gt;
        _channel.Reader.ReadAllAsync(ct);
}

MyBackgroundService.cs:

public class MyBackgroundService : BackgroundService
{
    private readonly IHubContext&lt;LiveDataHub&gt; _hub;
    public IMyBackgroundQueue _queue { get; }
    public MyBackgroundService(IHubContext&lt;LiveDataHub&gt; hub, IMyBackgroundQueue queue)
    {
        _hub = hub;
        _queue = queue;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var eventMessage = new Models.EventMessage($&quot;Id_{ Guid.NewGuid():N}&quot;, $&quot;Title_{Guid.NewGuid():N}&quot;, DateTime.Now.ToString(&quot;yyyy-MM-dd hh:mm:ss&quot;));                
            await foreach(var _item in _queue.DequeueAllAsync(stoppingToken))
            {
                Console.WriteLine(&quot;group name: &quot; + _item.ToString());
                await _hub.Clients.Group(_item.ToString()).SendAsync(&quot;onMessageReceived&quot;, eventMessage, stoppingToken);
            }
            await Task.Delay(1000, stoppingToken);
        }


        
    }
    public override async Task StopAsync(CancellationToken stoppingToken)
    {
        await base.StopAsync(stoppingToken);
    }
}

Startup.cs:

public void ConfigureServices(IServiceCollection services)
    {
        services.AddRazorPages();
        services.AddSignalR(hubPotions=&gt; { hubPotions.EnableDetailedErrors = true; });

        services.AddHostedService&lt;MyBackgroundService&gt;();            
        services.AddSingleton&lt;IMyBackgroundQueue, MyBackgroundQueue&gt;(ctx =&gt; {
            return new MyBackgroundQueue(100);
        }); // I think the error is here on this line but I can&#39;t seem to figure out what it is
    }

javascript.js

const signalrConnection = new signalR.HubConnectionBuilder()
.withUrl(&quot;/messagebroker&quot;)
.configureLogging(signalR.LogLevel.Information)
.build();

signalrConnection.start().then(function () {
console.log(&quot;SignalR Hub Connected&quot;);

signalrConnection.invoke(&quot;JoinToGroup&quot;, &quot;Group01&quot;)
    .catch(function (err) {
        console.error(err.toString());
    });

}).catch(function (err) {
    signalrConnection.stop();
    console.error(err.toString());
});

答案1

得分: 0

以下是您要翻译的内容:

There is a problem with your _queue.DequeueAllAsync(stoppingToken) method, which causes this thread to wait all the time, so you can only enter while (!stoppingToken.IsCancellationRequested) when the program starts and when you click JoinToGroup.

You can use TryRead instead of ReadAllAsync:

MyBackgroundQueue:

public interface IMyBackgroundQueue
{
    //...
    IEnumerable<string> DequeueAllAsync(
        CancellationToken cancellationToken);
}

public class MyBackgroundQueue : IMyBackgroundQueue
{
    //...
    public IEnumerable<string> DequeueAllAsync(CancellationToken ct)
    {
        while (_channel.Reader.TryRead(out string? item))
        {
            yield return item;
        }
    }
}

MyBackgroundService:

public class MyBackgroundService : BackgroundService
{
    //...
    while (!stoppingToken.IsCancellationRequested)
    {
        var eventMessage = new Models.EventMessage($"Id_{ Guid.NewGuid():N}", $"Title_{Guid.NewGuid():N}", DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss"));

        foreach (var _item in _queue.DequeueAllAsync(stoppingToken))
        {
            Console.WriteLine("group name: " + _item.ToString());
            await _hub.Clients.Group(_item.ToString()).SendAsync("Send", eventMessage, stoppingToken);
        }
        await Task.Delay(1000, stoppingToken);
    }
    //...
}

Note the way to register MyBackgroundQueue:

services.AddSingleton<IMyBackgroundQueue>(ctx => {
     var queueCapacity = 100;
    return new MyBackgroundQueue(queueCapacity);
});

Refer to Queued background tasks.

In addition, you can only send a message when you click JoinToGroup, because when you store the groupName in the Queue, it will be consumed in the background service, which means that every time you click, it will be stored in the Queue once, and after one second will delete it, groupName only exists in Queue for one second.

There is no TryPeek() in Channel Class, but in ChannelReader Class for .NET 6+. You can take a look at it if you have a business need.

About how to stop the background service, you can refer to the two methods in this link.

For example, in LiveDataHub:

public sealed class LiveDataHub : Hub     
{
    IHostApplicationLifetime _lifetime;
    public ChatHub(IHostApplicationLifetime lifeTime)
    {
        _lifetime = lifeTime;
    }
    public override async Task OnDisconnectedAsync(Exception exception)
    {
        _lifetime.StopApplication();
        await base.OnDisconnectedAsync(exception);
    }
}

In this way, it will stop your application.

Another way is to call StopAsync() in OnDisconnectedAsync, usage is similar to this.

Edit:

I found that there seems to be a problem with the second method of stopping the background service, it doesn't work in my code. I found a solution, hope it can help you.

MyBackgroundService:

public interface ICustomServiceStopper : IHostedService
{
    Task StopAsync(CancellationToken token = default);
}

public class MyBackgroundService : BackgroundService, ICustomServiceStopper
{
    private static MyBackgroundService _instance;
    public static MyBackgroundService Instance => _instance;
    private readonly IHubContext<ChatHub> _hub;
    public IMyBackgroundQueue _queue { get; }
    public MyBackgroundService(IHubContext<ChatHub> hub, IMyBackgroundQueue queue)
    {
        _hub = hub;
        _queue = queue;
        if (_instance == null)
        {
            _instance = this;
        }
    }
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        //...
    }
    public Task StopAsync(CancellationToken token = default)
    { 
        base.StopAsync(token);
        return Task.CompletedTask;
    }
}

In Startup.cs:

services.AddHostedService<MyBackgroundService>();
services.AddSingleton<ICustomServiceStopper, MyBackgroundService >(serviceProvider => {
    return MyBackgroundService.Instance;
});

LiveDataHub:

public sealed class LiveDataHub : Hub     
{
    private readonly IMyBackgroundQueue _queue;
    private ICustomServiceStopper _customServiceStopper;
    public ChatHub(IMyBackgroundQueue queue, ICustomServiceStopper customServiceStopper)
    {
        _queue = queue;
        _customServiceStopper= customServiceStopper;
    }
    public override async Task OnDisconnectedAsync(Exception exception)
    {
        await _customServiceStopper.StopAsync();
        await base.OnDisconnectedAsync(exception);
    }
}
英文:

There is a problem with your _queue.DequeueAllAsync(stoppingToken) method, which causes this thread to wait all the time, so you can only enter while (!stoppingToken.IsCancellationRequested) when the program starts and when you click JoinToGroup.

You can use TryRead instead of ReadAllAsync:

MyBackgroundQueue:

public interface IMyBackgroundQueue
{
    //...
    IEnumerable&lt;string&gt; DequeueAllAsync(
        CancellationToken cancellationToken);
}
public class MyBackgroundQueue : IMyBackgroundQueue
{
    //...
    public IEnumerable&lt;string&gt; DequeueAllAsync(CancellationToken ct)
    {
        while (_channel.Reader.TryRead(out string? item))
        {
            yield return item;
        }
    }
}

MyBackgroundService:

public class MyBackgroundService : BackgroundService
{
    //...
    while (!stoppingToken.IsCancellationRequested)
    {
        var eventMessage = new Models.EventMessage($&quot;Id_{ Guid.NewGuid():N}&quot;, $&quot;Title_{Guid.NewGuid():N}&quot;, DateTime.Now.ToString(&quot;yyyy-MM-dd hh:mm:ss&quot;));

        foreach (var _item in _queue.DequeueAllAsync(stoppingToken))
        {
            Console.WriteLine(&quot;group name: &quot; + _item.ToString());
            await _hub.Clients.Group(_item.ToString()).SendAsync(&quot;Send&quot;, eventMessage, stoppingToken);
        }
        await Task.Delay(1000,stoppingToken);
    }
    //...
}

Note the way to register MyBackgroundQueue:

services.AddSingleton&lt;IMyBackgroundQueue&gt;(ctx =&gt; {
     var queueCapacity = 100;
    return new MyBackgroundQueue(queueCapacity);
});

Refer to Queued background tasks.

In addition, you can only send a message when you click JoinToGroup, because when you store the groupName in the Queue, it will be consumed in the background service, which means that every time you click, it will be stored in the Queue once, and after one second will delete it, groupName only exists in Queue for one second.

There is no TryPeek() in Channel<T> Class, but in ChannelReader<T> Class for .NET 6+. You can take a look at it if you have a business need.

About how to stop background service, you can refer to the two methods in this link.

For example, in LiveDataHub:

public sealed class LiveDataHub : Hub     
{
    IHostApplicationLifetime _lifetime;
    public ChatHub(IHostApplicationLifetime lifeTime)
    {
        _lifetime = lifeTime;
    }
    public override async Task OnDisconnectedAsync(Exception exception)
    {
        _lifetime.StopApplication();
        await base.OnDisconnectedAsync(exception);
    }
}

In this way, it will stop your application.

Another way is to call StopAsync() in OnDisconnectedAsync, usage is similar to this.

Edit:

I found that there seems to be a problem with the second method of stopping the background service, it doesn't work in my code. I found a solution, hope it can help you.

MyBackgroundService:

public interface ICustomServiceStopper : IHostedService
{
    Task StopAsync(CancellationToken token = default);
}
public class MyBackgroundService : BackgroundService, ICustomServiceStopper
{
    private static MyBackgroundService _instance;
    public static MyBackgroundService Instance =&gt; _instance;
    private readonly IHubContext&lt;ChatHub&gt; _hub;
    public IMyBackgroundQueue _queue { get; }
    public MyBackgroundService(IHubContext&lt;ChatHub&gt; hub, IMyBackgroundQueue queue)
    {
        _hub = hub;
        _queue = queue;
        if (_instance == null)
        {
            _instance = this;
        }
    }
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        //...
    }
    public Task StopAsync(CancellationToken token = default)
    { 
        base.StopAsync(token);
        return Task.CompletedTask;
    }
}

In Startup.cs:

services.AddHostedService&lt;MyBackgroundService&gt;();
services.AddSingleton&lt;ICustomServiceStopper, MyBackgroundService &gt;(serviceProvider =&gt;
{
    return MyBackgroundService.Instance;
});

LiveDataHub:

public sealed class LiveDataHub : Hub     
{
    private readonly IMyBackgroundQueue _queue;
    private ICustomServiceStopper _customServiceStopper;
    public ChatHub(IMyBackgroundQueue queue, ICustomServiceStopper customServiceStopper)
    {
        _queue = queue;
        _customServiceStopper= customServiceStopper;
    }
    public override async Task OnDisconnectedAsync(Exception exception)
    {
        await _customServiceStopper.StopAsync();
        await base.OnDisconnectedAsync(exception);
    }
}

huangapple
  • 本文由 发表于 2023年3月12日 11:57:34
  • 转载请务必保留本文链接:https://go.coder-hub.com/75710976.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定