Masstransit EntityFramework Saga 管理

huangapple go评论83阅读模式

Masstransit EntityFramework Saga management


我一直在尝试使用Devart Postgres EFCore库,因为这是我工作的公司使用的库。从我所了解的情况来看,NpgSql库与Mass Transit一起运行得很好。

但是,我几乎立刻遇到了与Mass Transit和Devart相关的问题。

Devart.Data.PostgreSql.PgSqlException (0x80004005): 列 "p0" 不存在



  1. var queryable = dbContext.Set<MyEntity>().FromSqlRaw("SELECT * FROM \"my_entity\" WHERE \"correlation_id\" = :p0 FOR UPDATE", Guid.NewGuid());




  1. public class DevArtPostgresLockStatementFormatter : ILockStatementFormatter
  2. {
  3. public void Create(StringBuilder sb, string schema, string table)
  4. {
  5. sb.AppendFormat("SELECT * FROM {0} WHERE ", FormatTableName(schema, table));
  6. }
  7. public void AppendColumn(StringBuilder sb, int index, string columnName)
  8. {
  9. if (index == 0)
  10. sb.AppendFormat("\"{0}\" = :p0", columnName);
  11. else
  12. sb.AppendFormat(" AND \"{0}\" = :p{1}", columnName, index);
  13. }
  14. public void Complete(StringBuilder sb)
  15. {
  16. sb.Append(" FOR UPDATE");
  17. }
  18. public void CreateOutboxStatement(StringBuilder sb, string schema, string table, string columnName)
  19. {
  20. sb.AppendFormat(@"SELECT * FROM {0} ORDER BY ""{1}"" LIMIT 1 FOR UPDATE SKIP LOCKED", FormatTableName(schema, table), columnName);
  21. }
  22. static string FormatTableName(string schema, string table)
  23. {
  24. return string.IsNullOrEmpty(schema) ? $"\"{table}\"" : $"\"{schema}\".\"{table}\"";
  25. }
  26. }


  1. x.AddSagaRepository<MyStateMachine>()
  2. .EntityFrameworkRepository(r =>
  3. {
  4. r.LockStatementProvider = new DevArtPostgresLockStatementProvider();
  5. r.ExistingDbContext<MyDbContext>();
  6. });






I have been trying to use the Devart Postgres EFCore library as this is used at the company I work at. The NpgSql lib works fine with Mass Transit from what I can tell.

However I almost immediately ran into a problem with Mass Transit and Devart.

Devart.Data.PostgreSql.PgSqlException (0x80004005): column &quot;p0&quot; does not exist

I tracked this down to the PostgresLockStatementFormatter.AppendColumn method.

Masstransit uses this to help build a sql statement which is executed as RawSql by the DbContext. This is basically the statement:

  1. var queryable = dbContext.Set&lt;MyEntity&gt;().FromSqlRaw(&quot; SELECT * FROM \&quot;my_entity\&quot; WHERE \&quot;correlation_id\&quot; = :p0 FOR UPDATE&quot;, Guid.NewGuid());

It seems that the Devart libs don't like the use of parameters named @p0

The correct format for Devart is :p0

My approach was to replace the LockStatmentFormatter method with my own implementation specific to Devart:

  1. public class DevArtPostgresLockStatementFormatter : ILockStatementFormatter
  2. {
  3. public void Create(StringBuilder sb, string schema, string table)
  4. {
  5. sb.AppendFormat(&quot;SELECT * FROM {0} WHERE &quot;, FormatTableName(schema, table));
  6. }
  7. public void AppendColumn(StringBuilder sb, int index, string columnName)
  8. {
  9. if (index == 0)
  10. sb.AppendFormat(&quot;\&quot;{0}\&quot; = :p0&quot;, columnName);
  11. else
  12. sb.AppendFormat(&quot; AND \&quot;{0}\&quot; = :p{1}&quot;, columnName, index);
  13. }
  14. public void Complete(StringBuilder sb)
  15. {
  16. sb.Append(&quot; FOR UPDATE&quot;);
  17. }
  18. public void CreateOutboxStatement(StringBuilder sb, string schema, string table, string columnName)
  19. {
  20. sb.AppendFormat(@&quot;SELECT * FROM {0} ORDER BY &quot;&quot;{1}&quot;&quot; LIMIT 1 FOR UPDATE SKIP LOCKED&quot;, FormatTableName(schema, table), columnName);
  21. }
  22. static string FormatTableName(string schema, string table)
  23. {
  24. return string.IsNullOrEmpty(schema) ? $&quot;\&quot;{table}\&quot;&quot; : $&quot;\&quot;{schema}\&quot;.\&quot;{table}\&quot;&quot;;
  25. }
  26. }

This has to be placed in the service config section:

  1. x.AddSagaRepository&lt;MyStateMachine&gt;()
  2. .EntityFrameworkRepository(r =&gt;
  3. {
  4. r.LockStatementProvider = new DevArtPostgresLockStatementProvider();
  5. r.ExistingDbContext&lt;MyDbContext&gt;();
  6. });

So this seems to fix my issue. I wonder is there anything else I should be aware of where it's also used.

This was my only real find in the durable Outbox feature I think?

I can test this but thought it was worth asking the question incase anyone has similar issues. Or perhaps it can help someone.



得分: 1



As you have found by searching the codebase, the lock statement provider is used by both the saga repository and transactional outbox. It's the only dialect-specific component, and the right place to tweak the SQL for different providers.

  • 本文由 发表于 2023年7月4日 20:38:50
  • 转载请务必保留本文链接:



:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:
