MassTransit StateMachine not initiated

Question


I'm trying to configure a MassTransit StateMachine saga that should be triggered when a message is sent, but sending the message does not initiate the saga; the message remains in the queue. If I publish the message, it works, but I think that for this use-case, sending the message is the right approach.

What am I missing here?

I have the strong feeling that the Consumer Endpoint for the InitiateAccountDeletion queue is not being configured, but I would expect the ConfigureEndpoints() method to do so.

public class DeleteAccountSaga : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string? CurrentState { get; set; }
    public int FilesState { get; set; }
}

public class DeleteAccountStateMachine : MassTransitStateMachine<DeleteAccountSaga>
{
	// Files are being deleted.
    public State DeletingFiles { get; }
	// Files were deleted.
    public State FilesDeletionCompleted { get; }
	// The account and related data is being deleted from the database.
    public State DeletingAccount { get; }
	// The account and related data have been deleted from the database.
    public State AccountDeletionCompleted { get; }
	
	// This message should initiate the saga.
    public Event<InitiateAccountDeletion> InitiateAccountDeletion { get; }
	// This event indicates that files from a specific server (T1) were deleted.
    public Event<AccountFilesT1Deleted> AccountFilesT1Deleted { get; }
	// This event indicates that files from a specific server (T2) were deleted.
    public Event<AccountFilesT2Deleted> AccountFilesT2Deleted { get; }
	// Composite event that indicates that files from all servers were deleted.
    public Event FilesDeleted { get; }
	// This event indicates that the database records were deleted.
    public Event<AccountDeleted> AccountDeleted { get; }

    public DeleteAccountStateMachine()
    {
        InstanceState(x => x.CurrentState);

        Event(() => InitiateAccountDeletion, x =>
        {
            x.InsertOnInitial = true;

            x.CorrelateById(context => context.Message.AccountId);
            x.SetSagaFactory(context => new()
            {
                CorrelationId = context.Message.AccountId
            });
        });

        Event(() => AccountFilesT1Deleted, x => x.CorrelateById(context => context.Message.AccountId));
        Event(() => AccountFilesT2Deleted, x => x.CorrelateById(context => context.Message.AccountId));
        Event(() => AccountDeleted, x => x.CorrelateById(context => context.Message.AccountId));
		
        // The saga should be started when the InitiateAccountDeletion message is sent.
        // Afterwards, it should send DeleteFilesT1 and DeleteFilesT2 messages so that files from the corresponding servers are deleted.
        Initially(
            When(InitiateAccountDeletion)
                .Send(context => new DeleteFilesT1(context.Message.AccountId))
                .Send(context => new DeleteFilesT2(context.Message.AccountId))
                .TransitionTo(DeletingFiles)
        );
		
		// Indicate that all files were deleted.
        CompositeEvent(() => FilesDeleted, x => x.FilesState,
            AccountFilesT1Deleted,
            AccountFilesT2Deleted
        );
		
		// When all files have been deleted, send DeleteAccount message which will delete all data from the database.
        During(DeletingFiles,
            Ignore(InitiateAccountDeletion),
            When(FilesDeleted)
                .TransitionTo(FilesDeletionCompleted)
                .Send(context => new DeleteAccount(context.Saga.CorrelationId))
        );
		
        During(FilesDeletionCompleted,
            When(AccountDeleted)
                .TransitionTo(AccountDeletionCompleted)
                .Finalize()
        );

        SetCompletedWhenFinalized();
    }
}

public class Startup
{
	public void ConfigureServices(IServiceCollection services)
	{
		// [...]
		
		EndpointConvention.Map<InitiateAccountDeletion>(new Uri($"queue:{nameof(InitiateAccountDeletion)}"));
		EndpointConvention.Map<DeleteFilesT1>(new Uri($"queue:{nameof(DeleteFilesT1)}"));
        EndpointConvention.Map<DeleteFilesT2>(new Uri($"queue:{nameof(DeleteFilesT2)}"));
		EndpointConvention.Map<DeleteAccount>(new Uri($"queue:{nameof(DeleteAccount)}"));

        services.AddMassTransit(x =>
        {
            x.AddEntityFrameworkOutbox<AccountDbContext>(options =>
            {
                options.UsePostgres();
                options.UseBusOutbox();
            });

			// This registers:
			// - DeleteFilesT1Consumer
			// - DeleteFilesT2Consumer
			// - DeleteAccountConsumer
            x.AddConsumers(typeof(Startup).Assembly);

            x.AddSagaStateMachine<DeleteAccountStateMachine, DeleteAccountSaga>()
                .EntityFrameworkRepository(options =>
                {
                    options.ExistingDbContext<AccountDbContext>();
                    options.UsePostgres();
                });
			
			x.UsingAmazonSqs((context, cfg) =>
			{
				cfg.Host(configuration["AWS:Region"], (_) => { });
				cfg.ConfigureEndpoints(context);
			});
        });
    }
}

public class AccountController : ControllerBase
{
	[HttpDelete("{accountId}")]
    public async Task<IActionResult> Delete(Guid accountId)
    {
        var account = await _dbContext.Account.FindAsync(accountId);

        if (account == null)
            return NotFound("...");

        account.Status = "Deleting";
		
		// This message remains in the InitiateAccountDeletion queue. It doesn't get processed by the StateMachine.
        await _sendEndpointProvider.Send<InitiateAccountDeletion>(new(
            AccountId: account.AccountId
        ));

        await _dbContext.SaveChangesAsync();

        return Accepted();
    }
}

Update 1

It works if I change this:

EndpointConvention.Map<InitiateAccountDeletion>(new Uri($"queue:{nameof(InitiateAccountDeletion)}"));

to this:

EndpointConvention.Map<InitiateAccountDeletion>(new Uri($"queue:{nameof(DeleteTenant)}"));

Answer 1

Score: 0

With the default endpoint name formatter, the saga state machine will be configured on a receive endpoint named DeleteAccountSaga - which is not where you are sending the message.
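
One way to keep the send address in step with the saga's receive endpoint is to derive the queue name from the formatter instead of hard-coding it. The following is only a minimal sketch, assuming the default endpoint name formatter is in use (that is, nothing else has been registered through SetEndpointNameFormatter) and reusing the DeleteAccountSaga type from the question. SagaAddress and ForDeleteAccountSaga are hypothetical names introduced for illustration.

```csharp
using System;
using MassTransit;

public static class SagaAddress
{
    // Hypothetical helper: builds the short queue address for the receive endpoint
    // that ConfigureEndpoints is expected to create for DeleteAccountSaga.
    // Assumption: the bus uses the default endpoint name formatter.
    public static Uri ForDeleteAccountSaga()
    {
        // Ask the formatter for the saga's endpoint name rather than guessing it.
        var queueName = DefaultEndpointNameFormatter.Instance.Saga<DeleteAccountSaga>();

        return new Uri($"queue:{queueName}");
    }
}

// Possible usage in ConfigureServices, replacing the nameof(InitiateAccountDeletion) mapping:
// EndpointConvention.Map<InitiateAccountDeletion>(SagaAddress.ForDeleteAccountSaga());
```

If a different formatter is registered for the bus, the same formatter instance would have to be used here, otherwise the two names can drift apart again.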

I'd suggest increasing the log level and reviewing the logs to see how each endpoint is configured and where messages are being sent. It's clear that the endpoint convention you're configuring for that message type is not the correct destination.
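
As a rough illustration of raising the log level, the sketch below lowers the minimum level for log categories that start with "MassTransit" using the standard Microsoft.Extensions.Logging filter API. The category prefix is an assumption about how MassTransit names its loggers, and AddVerboseMassTransitLogging is a hypothetical helper name.

```csharp
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

public static class MassTransitLoggingExtensions
{
    // Hypothetical helper: enables Debug output for categories prefixed with
    // "MassTransit" (assumed prefix) and leaves other categories unchanged.
    public static IServiceCollection AddVerboseMassTransitLogging(this IServiceCollection services)
    {
        return services.AddLogging(logging =>
            logging.AddFilter("MassTransit", LogLevel.Debug));
    }
}

// Possible usage in ConfigureServices:
// services.AddVerboseMassTransitLogging();
```

With this in place, the startup output should make it easier to compare the configured receive endpoints with the address the message is actually sent to.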

It's also why Publish works since MassTransit knows how to properly route the message to the saga.
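
For comparison, a publish-based version of the controller call might look like the sketch below. It assumes InitiateAccountDeletion can be constructed from a single AccountId value, as the question's `new(AccountId: ...)` call suggests. AccountDeletionRequester is a hypothetical class introduced for illustration.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

public class AccountDeletionRequester
{
    private readonly IPublishEndpoint _publishEndpoint;

    public AccountDeletionRequester(IPublishEndpoint publishEndpoint)
    {
        _publishEndpoint = publishEndpoint;
    }

    // Publishes the message by type, so no endpoint convention or queue name is needed.
    public Task RequestDeletion(Guid accountId)
    {
        return _publishEndpoint.Publish(new InitiateAccountDeletion(accountId));
    }
}
```

Whether Publish or Send fits better depends on the use case. The sketch only shows that publishing avoids having to name the destination queue.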

  • Posted by huangapple on 2023-05-10 23:40:06
  • When reposting, please keep this article's link: https://go.coder-hub.com/76220335.html