英文:
Asp.Net Core: Signalr: Queue: Pass group name into queue
问题
I have an asp.net core project using signalr. I'm trying to pass group name as a parameter to my backgroundservice via a queue. The, in my hub, I wrote a function to join the group with the group name passed in from the javascript. I get just one message in my browser.
Here's my code:
LiveDataHub.cs:
public sealed class LiveDataHub : Hub
{
private readonly IMyBackgroundQueue _queue;
public LiveDataHub(IMyBackgroundQueue queue) => _queue = queue;
public async Task JoinToGroup(string group)
{
Console.WriteLine("New group joined: " + group);
await _queue.QueueAsync(group);
await Groups.AddToGroupAsync(Context.ConnectionId, group);
await Clients.Group(group).SendAsync("Send", $"{Context.ConnectionId} has joined the group {group}.");
}
}
MyBackgroundQueue.cs:
public interface IMyBackgroundQueue
{
ValueTask QueueAsync(string item);
IAsyncEnumerable<string> DequeueAllAsync(
CancellationToken cancellationToken);
}
public class MyBackgroundQueue : IMyBackgroundQueue
{
//private readonly Channel<string> _channel = Channel.CreateUnbounded<string>();
private readonly Channel<string> _channel;
public MyBackgroundQueue(int capacity)
{
var options = new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.Wait
};
_channel = Channel.CreateBounded<string>(options);
}
public ValueTask QueueAsync(string item) => _channel.Writer.WriteAsync(item);
public IAsyncEnumerable<string> DequeueAllAsync(CancellationToken ct) =>
_channel.Reader.ReadAllAsync(ct);
}
MyBackgroundService.cs:
public class MyBackgroundService : BackgroundService
{
private readonly IHubContext<LiveDataHub> _hub;
public IMyBackgroundQueue _queue { get; }
public MyBackgroundService(IHubContext<LiveDataHub> hub, IMyBackgroundQueue queue)
{
_hub = hub;
_queue = queue;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var eventMessage = new Models.EventMessage($"Id_{Guid.NewGuid():N}", $"Title_{Guid.NewGuid():N}", DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss"));
await foreach (var _item in _queue.DequeueAllAsync(stoppingToken))
{
Console.WriteLine("group name: " + _item.ToString());
await _hub.Clients.Group(_item.ToString()).SendAsync("onMessageReceived", eventMessage, stoppingToken);
}
await Task.Delay(1000, stoppingToken);
}
}
public override async Task StopAsync(CancellationToken stoppingToken)
{
await base.StopAsync(stoppingToken);
}
}
Startup.cs:
public void ConfigureServices(IServiceCollection services)
{
services.AddRazorPages();
services.AddSignalR(hubOptions => { hubOptions.EnableDetailedErrors = true; });
services.AddHostedService<MyBackgroundService>();
services.AddSingleton<IMyBackgroundQueue, MyBackgroundQueue>(ctx => {
return new MyBackgroundQueue(100);
}); // I think the error is here on this line but I can't seem to figure out what it is
}
javascript.js:
const signalrConnection = new signalR.HubConnectionBuilder()
.withUrl("/messagebroker")
.configureLogging(signalR.LogLevel.Information)
.build();
signalrConnection.start().then(function () {
console.log("SignalR Hub Connected");
signalrConnection.invoke("JoinToGroup", "Group01")
.catch(function (err) {
console.error(err.toString());
});
}).catch(function (err) {
signalrConnection.stop();
console.error(err.toString());
});
英文:
I have an asp.net core project using signalr. I'm trying to pass group name as a parameter to my backgroundservice via a queue. The, in my hub, I wrote a function to join the group with the group name passed in from the javascript. I get just one message in my browser.
Here's my code:
LiveDataHub.cs:
public sealed class LiveDataHub : Hub
{
private readonly IMyBackgroundQueue _queue;
public LiveDataHub(IMyBackgroundQueue queue) => _queue = queue;
public async Task JoinToGroup(string group)
{
Console.WriteLine("New group joined: " + group);
await _queue.QueueAsync(group);
await Groups.AddToGroupAsync(Context.ConnectionId, group);
await Clients.Group(group).SendAsync("Send", $"{Context.ConnectionId} has joined the group {group}.");
}
}
MyBackgroundQueue.cs
public interface IMyBackgroundQueue
{
ValueTask QueueAsync(string item);
IAsyncEnumerable<string> DequeueAllAsync(
CancellationToken cancellationToken);
}
public class MyBackgroundQueue : IMyBackgroundQueue
{
//private readonly Channel<string> _channel = Channel.CreateUnbounded<string>();
private readonly Channel<string> _channel;
public MyBackgroundQueue(int capacity)
{
var options = new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.Wait
};
_channel = Channel.CreateBounded<string>(options);
}
public ValueTask QueueAsync(string item) => _channel.Writer.WriteAsync(item);
public IAsyncEnumerable<string> DequeueAllAsync(CancellationToken ct) =>
_channel.Reader.ReadAllAsync(ct);
}
MyBackgroundService.cs:
public class MyBackgroundService : BackgroundService
{
private readonly IHubContext<LiveDataHub> _hub;
public IMyBackgroundQueue _queue { get; }
public MyBackgroundService(IHubContext<LiveDataHub> hub, IMyBackgroundQueue queue)
{
_hub = hub;
_queue = queue;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var eventMessage = new Models.EventMessage($"Id_{ Guid.NewGuid():N}", $"Title_{Guid.NewGuid():N}", DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss"));
await foreach(var _item in _queue.DequeueAllAsync(stoppingToken))
{
Console.WriteLine("group name: " + _item.ToString());
await _hub.Clients.Group(_item.ToString()).SendAsync("onMessageReceived", eventMessage, stoppingToken);
}
await Task.Delay(1000, stoppingToken);
}
}
public override async Task StopAsync(CancellationToken stoppingToken)
{
await base.StopAsync(stoppingToken);
}
}
Startup.cs:
public void ConfigureServices(IServiceCollection services)
{
services.AddRazorPages();
services.AddSignalR(hubPotions=> { hubPotions.EnableDetailedErrors = true; });
services.AddHostedService<MyBackgroundService>();
services.AddSingleton<IMyBackgroundQueue, MyBackgroundQueue>(ctx => {
return new MyBackgroundQueue(100);
}); // I think the error is here on this line but I can't seem to figure out what it is
}
javascript.js
const signalrConnection = new signalR.HubConnectionBuilder()
.withUrl("/messagebroker")
.configureLogging(signalR.LogLevel.Information)
.build();
signalrConnection.start().then(function () {
console.log("SignalR Hub Connected");
signalrConnection.invoke("JoinToGroup", "Group01")
.catch(function (err) {
console.error(err.toString());
});
}).catch(function (err) {
signalrConnection.stop();
console.error(err.toString());
});
答案1
得分: 0
以下是您要翻译的内容:
There is a problem with your _queue.DequeueAllAsync(stoppingToken)
method, which causes this thread to wait all the time, so you can only enter while (!stoppingToken.IsCancellationRequested)
when the program starts and when you click JoinToGroup
.
You can use TryRead
instead of ReadAllAsync
:
MyBackgroundQueue:
public interface IMyBackgroundQueue
{
//...
IEnumerable<string> DequeueAllAsync(
CancellationToken cancellationToken);
}
public class MyBackgroundQueue : IMyBackgroundQueue
{
//...
public IEnumerable<string> DequeueAllAsync(CancellationToken ct)
{
while (_channel.Reader.TryRead(out string? item))
{
yield return item;
}
}
}
MyBackgroundService:
public class MyBackgroundService : BackgroundService
{
//...
while (!stoppingToken.IsCancellationRequested)
{
var eventMessage = new Models.EventMessage($"Id_{ Guid.NewGuid():N}", $"Title_{Guid.NewGuid():N}", DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss"));
foreach (var _item in _queue.DequeueAllAsync(stoppingToken))
{
Console.WriteLine("group name: " + _item.ToString());
await _hub.Clients.Group(_item.ToString()).SendAsync("Send", eventMessage, stoppingToken);
}
await Task.Delay(1000, stoppingToken);
}
//...
}
Note the way to register MyBackgroundQueue
:
services.AddSingleton<IMyBackgroundQueue>(ctx => {
var queueCapacity = 100;
return new MyBackgroundQueue(queueCapacity);
});
Refer to Queued background tasks.
In addition, you can only send a message when you click JoinToGroup
, because when you store the groupName
in the Queue
, it will be consumed in the background service, which means that every time you click, it will be stored in the Queue
once, and after one second will delete it, groupName
only exists in Queue
for one second.
There is no TryPeek() in Channel
About how to stop the background service, you can refer to the two methods in this link.
For example, in LiveDataHub
:
public sealed class LiveDataHub : Hub
{
IHostApplicationLifetime _lifetime;
public ChatHub(IHostApplicationLifetime lifeTime)
{
_lifetime = lifeTime;
}
public override async Task OnDisconnectedAsync(Exception exception)
{
_lifetime.StopApplication();
await base.OnDisconnectedAsync(exception);
}
}
In this way, it will stop your application.
Another way is to call StopAsync()
in OnDisconnectedAsync
, usage is similar to this.
Edit:
I found that there seems to be a problem with the second method of stopping the background service, it doesn't work in my code. I found a solution, hope it can help you.
MyBackgroundService:
public interface ICustomServiceStopper : IHostedService
{
Task StopAsync(CancellationToken token = default);
}
public class MyBackgroundService : BackgroundService, ICustomServiceStopper
{
private static MyBackgroundService _instance;
public static MyBackgroundService Instance => _instance;
private readonly IHubContext<ChatHub> _hub;
public IMyBackgroundQueue _queue { get; }
public MyBackgroundService(IHubContext<ChatHub> hub, IMyBackgroundQueue queue)
{
_hub = hub;
_queue = queue;
if (_instance == null)
{
_instance = this;
}
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
//...
}
public Task StopAsync(CancellationToken token = default)
{
base.StopAsync(token);
return Task.CompletedTask;
}
}
In Startup.cs:
services.AddHostedService<MyBackgroundService>();
services.AddSingleton<ICustomServiceStopper, MyBackgroundService >(serviceProvider => {
return MyBackgroundService.Instance;
});
LiveDataHub:
public sealed class LiveDataHub : Hub
{
private readonly IMyBackgroundQueue _queue;
private ICustomServiceStopper _customServiceStopper;
public ChatHub(IMyBackgroundQueue queue, ICustomServiceStopper customServiceStopper)
{
_queue = queue;
_customServiceStopper= customServiceStopper;
}
public override async Task OnDisconnectedAsync(Exception exception)
{
await _customServiceStopper.StopAsync();
await base.OnDisconnectedAsync(exception);
}
}
英文:
There is a problem with your _queue.DequeueAllAsync(stoppingToken)
method, which causes this thread to wait all the time, so you can only enter while (!stoppingToken.IsCancellationRequested)
when the program starts and when you click JoinToGroup
.
You can use TryRead
instead of ReadAllAsync
:
MyBackgroundQueue:
public interface IMyBackgroundQueue
{
//...
IEnumerable<string> DequeueAllAsync(
CancellationToken cancellationToken);
}
public class MyBackgroundQueue : IMyBackgroundQueue
{
//...
public IEnumerable<string> DequeueAllAsync(CancellationToken ct)
{
while (_channel.Reader.TryRead(out string? item))
{
yield return item;
}
}
}
MyBackgroundService:
public class MyBackgroundService : BackgroundService
{
//...
while (!stoppingToken.IsCancellationRequested)
{
var eventMessage = new Models.EventMessage($"Id_{ Guid.NewGuid():N}", $"Title_{Guid.NewGuid():N}", DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss"));
foreach (var _item in _queue.DequeueAllAsync(stoppingToken))
{
Console.WriteLine("group name: " + _item.ToString());
await _hub.Clients.Group(_item.ToString()).SendAsync("Send", eventMessage, stoppingToken);
}
await Task.Delay(1000,stoppingToken);
}
//...
}
Note the way to register MyBackgroundQueue
:
services.AddSingleton<IMyBackgroundQueue>(ctx => {
var queueCapacity = 100;
return new MyBackgroundQueue(queueCapacity);
});
Refer to Queued background tasks.
In addition, you can only send a message when you click JoinToGroup
, because when you store the groupName
in the Queue
, it will be consumed in the background service, which means that every time you click, it will be stored in the Queue
once, and after one second will delete it, groupName
only exists in Queue
for one second.
There is no TryPeek() in Channel<T> Class, but in ChannelReader<T> Class for .NET 6+. You can take a look at it if you have a business need.
About how to stop background service, you can refer to the two methods in this link.
For example, in LiveDataHub
:
public sealed class LiveDataHub : Hub
{
IHostApplicationLifetime _lifetime;
public ChatHub(IHostApplicationLifetime lifeTime)
{
_lifetime = lifeTime;
}
public override async Task OnDisconnectedAsync(Exception exception)
{
_lifetime.StopApplication();
await base.OnDisconnectedAsync(exception);
}
}
In this way, it will stop your application.
Another way is to call StopAsync()
in OnDisconnectedAsync
, usage is similar to this.
Edit:
I found that there seems to be a problem with the second method of stopping the background service, it doesn't work in my code. I found a solution, hope it can help you.
MyBackgroundService:
public interface ICustomServiceStopper : IHostedService
{
Task StopAsync(CancellationToken token = default);
}
public class MyBackgroundService : BackgroundService, ICustomServiceStopper
{
private static MyBackgroundService _instance;
public static MyBackgroundService Instance => _instance;
private readonly IHubContext<ChatHub> _hub;
public IMyBackgroundQueue _queue { get; }
public MyBackgroundService(IHubContext<ChatHub> hub, IMyBackgroundQueue queue)
{
_hub = hub;
_queue = queue;
if (_instance == null)
{
_instance = this;
}
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
//...
}
public Task StopAsync(CancellationToken token = default)
{
base.StopAsync(token);
return Task.CompletedTask;
}
}
In Startup.cs:
services.AddHostedService<MyBackgroundService>();
services.AddSingleton<ICustomServiceStopper, MyBackgroundService >(serviceProvider =>
{
return MyBackgroundService.Instance;
});
LiveDataHub:
public sealed class LiveDataHub : Hub
{
private readonly IMyBackgroundQueue _queue;
private ICustomServiceStopper _customServiceStopper;
public ChatHub(IMyBackgroundQueue queue, ICustomServiceStopper customServiceStopper)
{
_queue = queue;
_customServiceStopper= customServiceStopper;
}
public override async Task OnDisconnectedAsync(Exception exception)
{
await _customServiceStopper.StopAsync();
await base.OnDisconnectedAsync(exception);
}
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论