英文:
MassTransit StateMachine not initiated
问题
The corrected translation of your update is as follows:
更新 1
将这个 EndpointConvention.Map<InitiateAccountDeletion>(new Uri($"queue:{nameof(InitiateAccountDeletion)}"));
更改为这个 EndpointConvention.Map<InitiateAccountDeletion>(new Uri($"queue:{nameof(DeleteTenant)}"));
即可解决问题。
英文:
I'm trying to configure a MassTransit StateMachine saga that should be triggered when a message is sent, but sending the message does not initiate the saga; the message remains in the queue. If I publish the message, it works, but I think that for this use-case, sending the message is the right approach.
What am I missing here?
I have the strong feeling that the Consumer Endpoint for the InitiateAccountDeletion queue is not being configured, but I would expect the ConfigureEndpoints() method to do so.
public class DeleteAccountSaga : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string? CurrentState { get; set; }
public int FilesState { get; set; }
}
public class DeleteAccountStateMachine : MassTransitStateMachine<DeleteAccountSaga>
{
// Files are being deleted.
public State DeletingFiles { get; }
// Files were deleted.
public State FilesDeletionCompleted { get; }
// The account and related data is being deleted from the database.
public State DeletingAccount { get; }
// The account and related data have been deleted from the database.
public State AccountDeletionCompleted { get; }
// This message should initiate the saga.
public Event<InitiateAccountDeletion> InitiateAccountDeletion { get; }
// This event indicates that files from a specific server (T1) were deleted.
public Event<AccountFilesT1Deleted> AccountFilesT1Deleted { get; }
// This event indicates that files from a specific server (T2) were deleted.
public Event<AccountFilesT2Deleted> AccountFilesT2Deleted { get; }
// Composite event that indicates that files from all servers were deleted.
public Event AccountFilesDeleted { get; }
// This event indicates that the database records were deleted.
public Event<AccountDeleted> AccountDeleted { get; }
public DeleteAccountStateMachine()
{
InstanceState(x => x.CurrentState);
Event(() => InitiateAccountDeletion, x =>
{
x.InsertOnInitial = true;
x.CorrelateById(context => context.Message.AccountId);
x.SetSagaFactory(context => new()
{
CorrelationId = context.Message.AccountId
});
});
Event(() => AccountFilesT1Deleted, x => x.CorrelateById(context => context.Message.AccountId));
Event(() => AccountFilesT2Deleted, x => x.CorrelateById(context => context.Message.AccountId));
Event(() => AccountDeleted, x => x.CorrelateById(context => context.Message.AccountId));
// The saga should be started when the InitiateAccountDeletion message is sent.
// After, it should send DeleteFilesT1 and DeleteFilesT2 messages so that files from corresponsing servers are deleted.
Initially(
When(InitiateAccountDeletion)
.Send(context => new DeleteFilesT1(context.Message.AccountId))
.Send(context => new DeleteFilesT2(context.Message.AccountId))
.TransitionTo(DeletingFiles)
);
// Indicate that all files were deleted.
CompositeEvent(() => FilesDeleted, x => x.FilesState,
AccountFilesT1Deleted,
AccountFilesT2Deleted
);
// When all files have been deleted, send DeleteAccount message which will delete all data from the database.
During(DeletingFiles,
Ignore(InitiateAccountDeletion),
When(FilesDeleted)
.TransitionTo(FilesDeletionCompleted)
.Send(context => new DeleteAccount(context.Saga.CorrelationId))
);
During(FilesDeletionCompleted,
When(AccountDeleted)
.TransitionTo(AccountDeletionCompleted)
.Finalize()
);
SetCompletedWhenFinalized();
}
}
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
// [...]
EndpointConvention.Map<InitiateAccountDeletion>(new Uri($"queue:{nameof(InitiateAccountDeletion)}"));
EndpointConvention.Map<DeleteFilesT1>(new Uri($"queue:{nameof(DeleteFilesT1)}"));
EndpointConvention.Map<DeleteFilesT2>(new Uri($"queue:{nameof(DeleteFilesT2)}"));
EndpointConvention.Map<DeleteAccount>(new Uri($"queue:{nameof(DeleteAccount)}"));
services.AddMassTransit(x =>
{
x.AddEntityFrameworkOutbox<AccountDbContext>(options =>
{
options.UsePostgres();
options.UseBusOutbox();
});
// This registers:
// - DeleteFilesT1Consumer
// - DeleteFilesT2Consumer
// - DeleteAccountConsumer
x.AddConsumers(typeof(Startup).Assembly);
x.AddSagaStateMachine<DeleteAccountStateMachine, DeleteAccountSaga>()
.EntityFrameworkRepository(options =>
{
options.ExistingDbContext<AccountDbContext>();
options.UsePostgres();
});
x.UsingAmazonSqs((context, cfg) =>
{
cfg.Host(configuration["AWS:Region"], (_) => { });
cfg.ConfigureEndpoints(context);
});
});
}
}
public class AccountController : ControllerBase
{
[HttpDelete("{accountId}")]
public async Task<IActionResult> Delete(Guid accountId)
{
var account = await _dbContext.Account.FindAsync(accountId);
if (account == null)
return NotFound("...");
account.Status = "Deleting";
// This message remains in the InitiateAccountDeletion queue. It doesn't get processed by the StateMachine.
await _sendEndpointProvider.Send<InitiateAccountDeletion>(new(
AccountId: account.AccountId
));
await _dbContext.SaveChangesAsync();
return Accepted();
}
}
Update 1
It works by changing this EndpointConvention.Map<InitiateAccountDeletion>(new Uri($"queue:{nameof(InitiateAccountDeletion)}"));
to this EndpointConvention.Map<InitiateAccountDeletion>(new Uri($"queue:{nameof(DeleteTenant)}"));
答案1
得分: 0
使用默认的端点名称格式化程序,saga状态机将配置在名为DeleteAccountSaga
的接收端点上 - 这不是您发送消息的位置。
我建议增加日志级别并查看日志,以了解每个端点的配置以及消息发送到何处。很明显,您为该消息类型配置的端点约定不是正确的目标。
这也是为什么“发布(Publish)”有效的原因,因为MassTransit知道如何将消息正确路由到saga。
英文:
With the default endpoint name formatter, the saga state machine will be configured on a receive endpoint named DeleteAccountSaga
- which is not where you are sending the message.
I'd suggest increasing the log level and reviewing the logs to see how each endpoint is configured and where messages are being sent. It's clear that the endpoint convention you're configuring for that message type is not the correct destination.
> It's also why Publish works since MassTransit knows how to properly route the message to the saga.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论