Masstransit: TransitionTo 不会在 SQLServer 中更新 Saga 状态(EntityFrameworkCoreSaga)。

huangapple go评论67阅读模式
英文:

Masstransit: TransitionTo doesn't update the Saga state in SQLServer (EntityFrameworkCoreSaga)

问题

我最近开始调查如何将 MassTransit 添加到我的项目中,并在此过程中遇到了这个问题。我不确定自己是否做得对(我是 MassTransit 新手),所以任何带有解释的评论都将很棒!

基本上,我有一个具有端点的 REST API(在被触发时)需要在单独的服务上触发一系列事件链(这些服务之间使用 RMQ 进行通信)。我决定使用具有持久状态的 Saga(EnitityFrameworkCore Saga)来实现,并且这是我的实现:

REST API 端点:

[HttpPost]
[SwaggerOperation(Summary = MethodDescriptions.StartExport)]
[SwaggerResponse(StatusCodes.Status200OK, type: typeof(ResponseModel))]
[SwaggerResponse(StatusCodes.Status400BadRequest, type: typeof(ResponseModel))]
[SwaggerResponse(StatusCodes.Status500InternalServerError, type: typeof(ResponseModel))]
//[ServiceFilter(typeof(ValidationFilterAttribute))]
[Route("start-export")]
public async Task<IActionResult> ExportProcessInitializeEvent([FromBody] ExportCommonRequest request)
{
    try
    {
        if (!request.IsRequestResponsePattern)
        {
            await _publishEndpoint.Publish<ExportProcessInitializeEvent>(new { request.ExportId });
        }

        return StatusCode(StatusCodes.Status200OK, new ResponseModel
        {
            Content = new NoContentResult(),
            StatusCode = StatusCodes.Status200OK,
            Message = string.Empty
        });
    }
    catch(Exception ex)
    {
        _logger.LogError(ex, $"Something went wrong while trying to start export (ExportId: {request.ExportId}).");
        return StatusCode(StatusCodes.Status500InternalServerError, new ResponseModel
        {
            Content = new NoContentResult(),
            StatusCode = StatusCodes.Status200OK,
            Message = ex.Message
        });
    }
}

它将事件发布到 RMQ,稍后在独立服务上被消耗。(注意:我不等待这些服务的响应,因为调用者将在事件链的最后一步收到有关导出状态的通知。

REST API MassTransit 设置:

// 添加 MassTransit 配置
public static void ConfigureMassTransit(this IServiceCollection services, IConfiguration configuration)
{
    //...(这部分内容较长,包括了对 RabbitMQ 的配置,以及各个消费者和请求客户端的添加)
}

Saga MassTransit 设置:

// 导出状态
public class ExportState : SagaStateMachineInstance
{
    //...(包括了导出状态的各种属性)
}

// DbContext
public class ExportStateMap : SagaClassMap<ExportState>
{
    //...(设置了导出状态的数据库映射)
}

public class ExportStateDbContext : SagaDbContext
{
    //...(SagaDbContext 的设置)
}

// Program.cs 配置
Host.CreateDefaultBuilder(args)
    .ConfigureAppConfiguration(builder =>
    {
        //...(这部分设置了环境变量和 JSON 文件的配置)
    })
    .ConfigureServices((hostContext, services) =>
    {
        //...(这部分设置了 MassTransit 的 SagaStateMachine 以及 RabbitMQ 的配置)
    })
    .Build()
    .Run();

// 导出状态机
public class ExportStateMachine : MassTransitStateMachine<ExportState>
{
    //...(这部分包括了各个事件、状态以及它们之间的流程定义)
}

// 消费者(包括了一系列消费者的定义)

如你所见,这是一个简单的 Console.WriteLine 链,当向 REST API 发送 POST 请求时,链被正确执行(按正确的顺序)。

但是,应该代表 Saga 当前状态的数据库记录并未发生变化。

看起来 TransitionTo(可能是所有 During,除了初始状态)根本没有被执行?我是不是在某种情况下对它们进行了覆盖?

如果有人能够解释我在这里做错了什么,那将会很棒。

英文:

I recently started investigating how to add MassTransit to my project, and I've stumbled on this issue along the way. I'm not sure that I'm doing all of this right (I'm new to MassTransit) so any comment with the explanation would be awesome!

Basically I have a REST API with the endpoint that (when hit) needs to trigger some chain of events on separate services (that are communicating between each other using RMQ). I've decided to implement Saga with persistent state (EnitityFrameworkCore Saga) and here's what I did:

REST API endpoint:

[HttpPost]
    [SwaggerOperation(Summary = MethodDescriptions.StartExport)]
    [SwaggerResponse(StatusCodes.Status200OK, type: typeof(ResponseModel))]
    [SwaggerResponse(StatusCodes.Status400BadRequest, type: typeof(ResponseModel))]
    [SwaggerResponse(StatusCodes.Status500InternalServerError, type: typeof(ResponseModel))]
    //[ServiceFilter(typeof(ValidationFilterAttribute))]
    [Route(&quot;start-export&quot;)]
    public async Task&lt;IActionResult&gt; ExportProcessInitializeEvent([FromBody] ExportCommonRequest request)
    {
        try
        {
            if (!request.IsRequestResponsePattern)
            {
                await _publishEndpoint.Publish&lt;ExportProcessInitializeEvent&gt;(new { request.ExportId });
            }

            return StatusCode(StatusCodes.Status200OK, new ResponseModel
            {
                Content = new NoContentResult(),
                StatusCode = StatusCodes.Status200OK,
                Message = string.Empty
            });
        }
        catch(Exception ex)
        {
            _logger.LogError(ex, $&quot;Something went wrong while trying to start export (ExportId: {request.ExportId}).&quot;);
            return StatusCode(StatusCodes.Status500InternalServerError, new ResponseModel
            {
                Content = new NoContentResult(),
                StatusCode = StatusCodes.Status200OK,
                Message = ex.Message
            });
        }
    }

it publishes event to RMQ that's later on consumed on separate service. (Notice: I'm not waitig for response from those services because the caller will be notified about the export status as a last step of the event chain)

REST API MassTransit setup:

public static void ConfigureMassTransit(this IServiceCollection services, IConfiguration configuration)
    {
        var messageBrokerQueueSettings = configuration.GetSection(&quot;MessageBroker:QueueSettings&quot;).Get&lt;MessageBrokerQueueSettings&gt;();

        services.AddMassTransit(x =&gt;
        {
            x.UsingRabbitMq((context, cfg) =&gt;
            {
                cfg.Host(messageBrokerQueueSettings.HostName, messageBrokerQueueSettings.VirtualHost, h =&gt; {
                    h.Username(messageBrokerQueueSettings.UserName);
                    h.Password(messageBrokerQueueSettings.Password);
                });

                cfg.ConfigureEndpoints(context);
            });

            x.AddConsumer&lt;ExportProcessInitializeEventConsumer&gt;();
            x.AddRequestClient&lt;ExportProcessInitializeEventConsumer&gt;();
            x.AddConsumer&lt;ExportProcessInitializeFaultEventConsumer&gt;();
            x.AddRequestClient&lt;ExportProcessInitializeFaultEventConsumer&gt;();

            x.AddConsumer&lt;DownloadMHTMLSEventConsumer&gt;();
            x.AddRequestClient&lt;DownloadMHTMLSEventConsumer&gt; ();
            x.AddConsumer&lt;DownloadMHTMLSFaultEventConsumer&gt;();
            x.AddRequestClient&lt;DownloadMHTMLSFaultEventConsumer&gt;();

            x.AddConsumer&lt;TakeScreenshotsEventConsumer&gt;();
            x.AddRequestClient&lt;TakeScreenshotsEventConsumer&gt;();
            x.AddConsumer&lt;TakeScreenshotsFaultEventConsumer&gt;();
            x.AddRequestClient&lt;TakeScreenshotsFaultEventConsumer&gt;();


            x.AddConsumer&lt;CreatePPTsEventConsumer&gt;();
            x.AddRequestClient&lt;CreatePPTsEventConsumer&gt;();
            x.AddConsumer&lt;CreatePPTsFaultEventConsumer&gt;();
            x.AddRequestClient&lt;CreatePPTsFaultEventConsumer&gt;();

            x.AddConsumer&lt;CreateResultZIPFileEventConsumer&gt;();
            x.AddRequestClient&lt;CreateResultZIPFileEventConsumer&gt;();
            x.AddConsumer&lt;CreateResultZIPFileFaultEventConsumer&gt;();
            x.AddRequestClient&lt;CreateResultZIPFileFaultEventConsumer&gt;();

            x.AddConsumer&lt;NotifyWebsiteInstanceEventConsumer&gt;();
            x.AddRequestClient&lt;NotifyWebsiteInstanceEventConsumer&gt;();
            x.AddConsumer&lt;NotifyWebsiteInstanceFaultEventConsumer&gt;();
            x.AddRequestClient&lt;NotifyWebsiteInstanceFaultEventConsumer&gt;();

            x.AddConsumer&lt;ExportProcessFinalizedEventConsumer&gt;();
            x.AddRequestClient&lt;ExportProcessFinalizedEventConsumer&gt;();
        });
    }

SAGA MassTransit setup:

public class ExportState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; }
    public DateTime ExportStartDate { get; set; }
    public DateTime? ExportEndDate { get; set; }
    public byte[] RowVersion { get; set; }
}

DbContext:

public class ExportStateMap : SagaClassMap&lt;ExportState&gt;
{
    protected override void Configure(EntityTypeBuilder&lt;ExportState&gt; entity, ModelBuilder model)
    {
        entity.Property(x =&gt; x.CurrentState).HasMaxLength(64);
        entity.Property(x =&gt; x.ExportStartDate);
        entity.Property(x =&gt; x.ExportEndDate);
        entity.Property(x =&gt; x.RowVersion).IsRowVersion();
    }
}

public class ExportStateDbContext : SagaDbContext
{
    public ExportStateDbContext(DbContextOptions options)
    : base(options)
    {
    }

    protected override IEnumerable&lt;ISagaClassMap&gt; Configurations
    {
        get { yield return new ExportStateMap(); }
    }
}

Program.cs

Host.CreateDefaultBuilder(args)
.ConfigureAppConfiguration(builder =&gt;
{
    var configurationBuilder = new ConfigurationBuilder();
    var configuration = configurationBuilder.AddEnvironmentVariables().AddJsonFile(&quot;appsettings.json&quot;)
        .AddJsonFile($&quot;appsettings.{Environment.GetEnvironmentVariable(&quot;ASPNETCORE_ENVIRONMENT&quot;)}.json&quot;)
        .Build();

    builder.Sources.Clear();
    builder.AddConfiguration(configuration);
})
.ConfigureServices((hostContext, services) =&gt;
{
    var messageBrokerQueueSettings = hostContext.Configuration.GetSection(&quot;MessageBroker:QueueSettings&quot;).Get&lt;MessageBrokerQueueSettings&gt;();
    var messageBrokerPersistenceSettings = hostContext.Configuration.GetSection(&quot;MessageBroker:StateMachinePersistence&quot;).Get&lt;MessageBrokerPersistenceSettings&gt;();

    services.AddMassTransit(x =&gt;
    {
        x.AddSagaStateMachine&lt;ExportStateMachine, ExportState&gt;().EntityFrameworkRepository(r =&gt;
        {
            r.ConcurrencyMode = ConcurrencyMode.Optimistic;

            r.AddDbContext&lt;DbContext, ExportStateDbContext&gt;((provider, builder) =&gt;
            {
                builder.UseSqlServer(messageBrokerPersistenceSettings.ConnectionString, m =&gt;
                {
                    m.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
                    m.MigrationsHistoryTable($&quot;__{nameof(ExportStateDbContext)}&quot;);
                });
            });
        });

        x.UsingRabbitMq((context, cfg) =&gt;
        {
            cfg.Host(messageBrokerQueueSettings.HostName, messageBrokerQueueSettings.VirtualHost, h =&gt;
            {
                h.Username(messageBrokerQueueSettings.UserName);
                h.Password(messageBrokerQueueSettings.Password);
            });

            cfg.ConfigureEndpoints(context);
        });
    });
})
.Build()
.Run();

ExportStateMachine:

public class ExportStateMachine : MassTransitStateMachine&lt;ExportState&gt;
{
    public ExportStateMachine()
    {
        #region Events Definitions

        Event(() =&gt; ExportProcessInitializeEvent);
        Event(() =&gt; ExportProcessInitializeFaultEvent,
            x =&gt; x.CorrelateById(context =&gt; context.InitiatorId ?? context.Message.Message.ExportId));

        Event(() =&gt; DownloadMHTMLsEvent);
        Event(() =&gt; DownloadMHTMLsFaultEvent,
            x =&gt; x.CorrelateById(context =&gt; context.InitiatorId ?? context.Message.Message.ExportId));

        Event(() =&gt; TakeScreenshotsEvent);
        Event(() =&gt; TakeScreenshotsFaultEvent,
            x =&gt; x.CorrelateById(context =&gt; context.InitiatorId ?? context.Message.Message.ExportId));

        Event(() =&gt; CreatePPTsEvent);
        Event(() =&gt; CreatePPTsFaultEvent,
            x =&gt; x.CorrelateById(context =&gt; context.InitiatorId ?? context.Message.Message.ExportId));

        Event(() =&gt; CreateResultZIPFileEvent);
        Event(() =&gt; CreateResultZIPFileFaultEvent,
            x =&gt; x.CorrelateById(context =&gt; context.InitiatorId ?? context.Message.Message.ExportId));

        Event(() =&gt; NotifyWebsiteInstanceEvent);
        Event(() =&gt; NotifyWebsiteInstanceFaultEvent,
            x =&gt; x.CorrelateById(context =&gt; context.InitiatorId ?? context.Message.Message.ExportId));

        Event(() =&gt; ExportProcessFinalizedEvent);

        #endregion

        InstanceState(x =&gt; x.CurrentState);

        #region Flow

        During(Initial,
            When(ExportProcessInitializeEvent)
                .Then(x =&gt; x.Saga.ExportStartDate = DateTime.UtcNow)
                .TransitionTo(ExportProcessInitializedState));

        During(ExportProcessInitializedState,
            When(DownloadMHTMLsEvent)
                .TransitionTo(DownloadMHTMLsState));

        During(DownloadMHTMLsState,
            When(TakeScreenshotsEvent)
                .TransitionTo(TakeScreenshotsState));

        During(TakeScreenshotsState,
            When(CreatePPTsEvent)
                .TransitionTo(CreatePPTsState));

        During(CreatePPTsState,
            When(CreateResultZIPFileEvent)
                .TransitionTo(CreateResultZIPFileState));

        During(CreateResultZIPFileState,
            When(NotifyWebsiteInstanceEvent)
                .TransitionTo(NotifyWebsiteInstanceState));

        #endregion

        #region Fault-Companse State

        // if the export fails on any of the steps that are in the chain, it automatically should default to last event and that&#39;s notifying MW instance that export failed.

        DuringAny(When(ExportProcessInitializeFaultEvent)
            .TransitionTo(ExportProcessInitializedFaultedState)
            .Then(context =&gt; context.Publish&lt;NotifyWebsiteInstanceEvent&gt;(new { context.Message })));

        DuringAny(When(DownloadMHTMLsFaultEvent)
            .TransitionTo(DownloadMHTMLsFaultedState)
            .Then(context =&gt; context.Publish&lt;NotifyWebsiteInstanceEvent&gt;(new { context.Message })));

        DuringAny(When(TakeScreenshotsFaultEvent)
            .TransitionTo(TakeScreenshotsFaultedState)
            .Then(context =&gt; context.Publish&lt;NotifyWebsiteInstanceEvent&gt;(new { context.Message })));

        DuringAny(When(CreatePPTsFaultEvent)
            .TransitionTo(CreatePPTsFaultedState)
            .Then(context =&gt; context.Publish&lt;NotifyWebsiteInstanceEvent&gt;(new { context.Message })));

        DuringAny(When(CreateResultZIPFileFaultEvent)
            .TransitionTo(CreateResultZIPFileFaultedState)
            .Then(context =&gt; context.Publish&lt;NotifyWebsiteInstanceEvent&gt;(new { context.Message })));

        DuringAny(When(ExportProcessFinalizedEvent)
                .Then(x =&gt; x.Saga.ExportEndDate = DateTime.UtcNow)
                .TransitionTo(ExportProcessFinalizedState));
        #endregion
    }

    #region Events

    public Event&lt;ExportProcessInitializeEvent&gt; ExportProcessInitializeEvent { get; }
    public Event&lt;Fault&lt;ExportProcessInitializeEvent&gt;&gt; ExportProcessInitializeFaultEvent { get; }

    public Event&lt;DownloadMHTMLsEvent&gt; DownloadMHTMLsEvent { get; }
    public Event&lt;Fault&lt;DownloadMHTMLsEvent&gt;&gt; DownloadMHTMLsFaultEvent { get; }

    public Event&lt;TakeScreenshotsEvent&gt; TakeScreenshotsEvent { get; }
    public Event&lt;Fault&lt;TakeScreenshotsEvent&gt;&gt; TakeScreenshotsFaultEvent { get; }

    public Event&lt;CreatePPTsEvent&gt; CreatePPTsEvent { get; }
    public Event&lt;Fault&lt;CreatePPTsEvent&gt;&gt; CreatePPTsFaultEvent { get; }

    public Event&lt;CreateResultZIPFileEvent&gt; CreateResultZIPFileEvent { get; }
    public Event&lt;Fault&lt;CreateResultZIPFileEvent&gt;&gt; CreateResultZIPFileFaultEvent { get; }

    public Event&lt;NotifyWebsiteInstanceEvent&gt; NotifyWebsiteInstanceEvent { get; }
    public Event&lt;Fault&lt;NotifyWebsiteInstanceEvent&gt;&gt; NotifyWebsiteInstanceFaultEvent { get; }

    public Event&lt;ExportProcessFinalizedEvent&gt; ExportProcessFinalizedEvent { get; }
    #endregion

    #region States

    public State ExportProcessInitializedState { get; private set; }
    public State ExportProcessInitializedFaultedState { get; private set; }

    public State DownloadMHTMLsState { get; private set; }
    public State DownloadMHTMLsFaultedState { get; private set; }

    public State TakeScreenshotsState { get; private set; }
    public State TakeScreenshotsFaultedState { get; private set; }

    public State CreatePPTsState { get; private set; }
    public State CreatePPTsFaultedState { get; private set; }

    public State CreateResultZIPFileState { get; private set; }
    public State CreateResultZIPFileFaultedState { get; private set; }

    public State NotifyWebsiteInstanceState { get; private set; }
    public State NotifyWebsiteInstanceFaultedState { get; private set; }

    public State ExportProcessFinalizedState { get; private set; }

    #endregion
}

Some consumers:

public class ExportProcessInitializeEventConsumer : ConsumerBase&lt;ExportProcessInitializeEvent&gt;
{
    protected override async Task ConsumeInternal(ConsumeContext&lt;ExportProcessInitializeEvent&gt; context)
    {
        Console.WriteLine($&quot;Export initialized {context.Message.ExportId}&quot;);

        await context.Publish&lt;DownloadMHTMLsEvent&gt;(new { context.Message.ExportId });
    }
}

public class DownloadMHTMLSEventConsumer : ConsumerBase&lt;DownloadMHTMLsEvent&gt;
{
    protected override async Task ConsumeInternal(ConsumeContext&lt;DownloadMHTMLsEvent&gt; context)
    {
        Console.WriteLine($&quot;All Export MHTML files downloaded successfully {context.Message.ExportId}&quot;);

        await context.Publish&lt;TakeScreenshotsEvent&gt;(new { context.Message.ExportId });
    }
}

.
.
.

public class NotifyWebsiteInstanceEventConsumer : ConsumerBase&lt;NotifyWebsiteInstanceEvent&gt;
    {
        protected override async Task ConsumeInternal(ConsumeContext&lt;NotifyWebsiteInstanceEvent&gt; context)
        {
            Console.WriteLine($&quot;Website instance is notified about export status {context.Message.ExportId}&quot;);

            await context.Publish&lt;ExportProcessFinalizedEvent&gt;(new { context.Message.ExportId });
        }
    }

// and the last one in the chain
public class ExportProcessFinalizedEventConsumer : ConsumerBase&lt;ExportProcessFinalizedEvent&gt;
    {
        protected override Task ConsumeInternal(ConsumeContext&lt;ExportProcessFinalizedEvent&gt; context)
        {
            Console.WriteLine($&quot;Export process finalized {context.Message.ExportId}&quot;);

            return Task.CompletedTask;
        }
    }

As you can see it's a simple chain of Console.WriteLines and when send a Post request to the REST API, the chain is executed correctly (in the right order):

Masstransit: TransitionTo 不会在 SQLServer 中更新 Saga 状态(EntityFrameworkCoreSaga)。

but the record in the database that should represent the current state of the Saga doesn't change:

Masstransit: TransitionTo 不会在 SQLServer 中更新 Saga 状态(EntityFrameworkCoreSaga)。

It seems like TransitionTos (possible all Durings except the initial one) aren't being executed at all? Am I overriding those somehow?

It would be great if someone could explain what I'm doing wrong here?

答案1

得分: 1

  1. 你需要从配置中移除每一个 AddRequestClient 调用。它们对你没有任何作用,因为你是按照消费者类型而不是实际消息类型来添加它们的。而且你根本没有配置请求客户端,所以可以将它去掉。MassTransit 已经内置了一个通用的请求客户端。

  2. TransitionTo 不会将任何东西保存到数据库。该saga仅在事件完成处理(即构成事件行为的所有状态机活动都已完成)后才会持久化。由于你没有提供你的状态机,我不太清楚它在做什么。状态应在saga消耗了事件后发生变化。

英文:
  1. You need to remove every single one of the AddRequestClient calls from your configuration. They aren't doing anything for you since you're adding them specifying a consumer type instead of an actual message type. And since you aren't actually configuring the request client at all, leave it out. MassTransit already has a built-in generic request client.

  2. TransitionTo doesn't save anything to the database. The saga is only persisted once the event is finished being processed (meaning all state machine activities that make up the event behavior have completed). Since you didn't post your state machine, I really have no idea what it is doing. The state should change once the event has been consumed by the saga.

huangapple
  • 本文由 发表于 2023年2月9日 03:41:55
  • 转载请务必保留本文链接:https://go.coder-hub.com/75390964.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定