MassTransit EntityFramework Saga management

Question

I have been trying to use the Devart PostgreSQL EF Core library, as it is what the company I work at uses. From what I can tell, the Npgsql library works fine with MassTransit.

However, I almost immediately ran into a problem with MassTransit and Devart:

Devart.Data.PostgreSql.PgSqlException (0x80004005): column "p0" does not exist

I tracked this down to the PostgresLockStatementFormatter.AppendColumn method.

https://github.com/MassTransit/MassTransit/blob/develop/src/Persistence/MassTransit.EntityFrameworkCoreIntegration/EntityFrameworkCoreIntegration/PostgresLockStatementFormatter.cs

MassTransit uses this to help build a SQL statement that the DbContext executes as raw SQL. This is basically the statement:

var queryable = dbContext.Set<MyEntity>().FromSqlRaw("SELECT * FROM \"my_entity\" WHERE \"correlation_id\" = :p0 FOR UPDATE", Guid.NewGuid());

It seems that the Devart libraries don't like parameters named @p0.

The correct format for Devart is :p0.

My approach was to replace the lock statement formatter with my own Devart-specific implementation:

using System.Text;
using MassTransit.EntityFrameworkCoreIntegration;

// Builds the row-locking SELECT using ":pN" parameter markers.
public class DevArtPostgresLockStatementFormatter : ILockStatementFormatter
{
    // Starts the statement: SELECT * FROM "schema"."table" WHERE
    public void Create(StringBuilder sb, string schema, string table)
    {
        sb.AppendFormat("SELECT * FROM {0} WHERE ", FormatTableName(schema, table));
    }

    // Appends one "column" = :pN predicate; predicates after the first are joined with AND.
    public void AppendColumn(StringBuilder sb, int index, string columnName)
    {
        if (index == 0)
            sb.AppendFormat("\"{0}\" = :p0", columnName);
        else
            sb.AppendFormat(" AND \"{0}\" = :p{1}", columnName, index);
    }

    // Finishes the statement with the row lock.
    public void Complete(StringBuilder sb)
    {
        sb.Append(" FOR UPDATE");
    }

    // Statement used by the outbox: locks the first row not already locked by another transaction.
    public void CreateOutboxStatement(StringBuilder sb, string schema, string table, string columnName)
    {
        sb.AppendFormat(@"SELECT * FROM {0} ORDER BY ""{1}"" LIMIT 1 FOR UPDATE SKIP LOCKED", FormatTableName(schema, table), columnName);
    }

    // Quotes the table name, prefixing the schema when one is given.
    static string FormatTableName(string schema, string table)
    {
        return string.IsNullOrEmpty(schema) ? $"\"{table}\"" : $"\"{schema}\".\"{table}\"";
    }
}
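
To check the generated SQL without touching a database, the formatter can be driven by hand. This is only a sketch: it assumes the DevArtPostgresLockStatementFormatter class above is compiled into the same project (with the MassTransit EF Core package referenced), and it assumes the three methods are called in the order Create, AppendColumn, Complete, which is what their names and the statement shown earlier suggest.

```csharp
using System;
using System.Text;

// Hypothetical console check; not part of MassTransit.
var formatter = new DevArtPostgresLockStatementFormatter();
var sb = new StringBuilder();

// Empty schema, so only the quoted table name is emitted.
formatter.Create(sb, "", "my_entity");

// One key column at index 0, so the marker is :p0.
formatter.AppendColumn(sb, 0, "correlation_id");

formatter.Complete(sb);

// Prints: SELECT * FROM "my_entity" WHERE "correlation_id" = :p0 FOR UPDATE
Console.WriteLine(sb.ToString());
```

The printed statement matches the one passed to FromSqlRaw above, which makes this a cheap way to compare against what the built-in formatter produces.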

This has to be registered in the service configuration:

x.AddSagaRepository<MyStateMachine>()
    .EntityFrameworkRepository(r =>
    {
        r.LockStatementProvider = new DevArtPostgresLockStatementProvider();
        r.ExistingDbContext<MyDbContext>();
    });
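
The configuration refers to DevArtPostgresLockStatementProvider, while the class listed earlier is the formatter. A minimal sketch of the provider that ties the two together could look like the following. It assumes MassTransit's SqlLockStatementProvider base class has a constructor taking a formatter and an enableSchemaCaching flag, which is how the built-in PostgresLockStatementProvider appears to be written; check this against the MassTransit version in use.

```csharp
using MassTransit.EntityFrameworkCoreIntegration;

// Assumption: SqlLockStatementProvider exposes a
// (ILockStatementFormatter formatter, bool enableSchemaCaching) constructor.
public class DevArtPostgresLockStatementProvider : SqlLockStatementProvider
{
    public DevArtPostgresLockStatementProvider(bool enableSchemaCaching = true)
        : base(new DevArtPostgresLockStatementFormatter(), enableSchemaCaching)
    {
    }
}
```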

This seems to fix my issue. Is there anywhere else the lock statement is used that I should be aware of?

I think the only other place I found is the durable outbox feature:

https://github.com/MassTransit/MassTransit/blob/develop/src/Persistence/MassTransit.EntityFrameworkCoreIntegration/EntityFrameworkCoreIntegration/EntityFrameworkOutboxContextFactory.cs

I can test this, but I thought it was worth asking in case anyone else has similar issues. Or perhaps it can help someone.

Thanks

Answer 1

Score: 1

As you have found by searching the codebase, the lock statement provider is used by both the saga repository and transactional outbox. It's the only dialect-specific component, and the right place to tweak the SQL for different providers.
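
Since the transactional outbox goes through the same provider, the Devart-specific one would need to be set there as well. A hedged sketch, assuming the Entity Framework outbox configurator exposes a settable LockStatementProvider property in the same way the saga repository configurator does, and that x is the registration configurator inside AddMassTransit:

```csharp
// Fragment of the AddMassTransit(x => { ... }) callback; transport setup omitted.
x.AddEntityFrameworkOutbox<MyDbContext>(o =>
{
    // Assumption: LockStatementProvider is settable on the outbox configurator.
    // Set it directly instead of calling a built-in helper such as UsePostgres(),
    // which would select the stock PostgreSQL provider.
    o.LockStatementProvider = new DevArtPostgresLockStatementProvider();

    o.UseBusOutbox();
});
```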

Posted by huangapple on July 4, 2023, 20:38:50
Source: https://go.coder-hub.com/76612733.html