MassTransit.RequestFaultException:’message’ 请求发生故障:无法访问已释放的对象

huangapple go评论107阅读模式
英文:

MassTransit.RequestFaultException : The 'message' request faulted: Cannot access a disposed object

问题

I have two microservices that communicate over RabbitMQ using the latest version of MassTransit v8. Microservice A sends a request to microservice B, which reads data from the database and responds with the result.
In the application everything works great, but in an integration test using InMemoryTestHarness I get this error: MassTransit.RequestFaultException : The EventBus.Messages.Contracts.Services.Discounts.Currencies.Query.GetCurrencyByName.GetCurrencyByNameRequest request faulted: Cannot access a disposed object.

Message for request:

  1. public class GetCurrencyByNameRequest
  2. {
  3. public string Name { get; set; }
  4. }

Message for response:

  1. public class GetCurrencyByNameResponse
  2. {
  3. public string Id { get; set; }
  4. public string Name { get; set; }
  5. }

Consumer:

  1. public class GetCurrencyByNameConsumer : IConsumer<GetCurrencyByNameRequest>
  2. {
  3. private readonly IUnitOfWork _uow;
  4. public GetCurrencyByNameConsumer(IUnitOfWork uow)
  5. {
  6. _uow = uow;
  7. }
  8. public async Task Consume(ConsumeContext<GetCurrencyByNameRequest> context)
  9. {
  10. var filter = Builders<Currency>.Filter.Eq(x => x.CurrencyName, new CurrencyName(context.Message.Name));
  11. var currency = await _uow.GenericRepository<Currency>()
  12. .GetSingleDocumentByFilterAsync(filter, context.CancellationToken);
  13. if (currency == null)
  14. {
  15. await context.RespondAsync(new GetCurrencyByNameResponse
  16. {
  17. Id = string.Empty,
  18. Name = string.Empty
  19. });
  20. }
  21. else
  22. {
  23. await context.RespondAsync(new GetCurrencyByNameResponse
  24. {
  25. Id = currency.Id,
  26. Name = currency.CurrencyName
  27. });
  28. }
  29. }
  30. }

Masstransit configuration in micro A:

  1. services.AddMassTransit(x =>
  2. {
  3. var entryAssembly = AppDomain.CurrentDomain.GetAssemblies()
  4. .FirstOrDefault(x => x.FullName.Contains("Payment.Application"));
  5. x.SetKebabCaseEndpointNameFormatter();
  6. x.AddConsumers(entryAssembly);
  7. x.UsingRabbitMq((context, cfg) =>
  8. {
  9. //cfg.Host(new Uri(configuration["RabbitMqSettings:Uri"]));
  10. cfg.Host(configuration["RabbitMqSettings:Host"], configuration["RabbitMqSettings:VirtualHost"], h =>
  11. {
  12. h.Username(configuration["RabbitMqSettings:Username"]);
  13. h.Password(configuration["RabbitMqSettings:Password"]);
  14. });
  15. cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
  16. cfg.UseMessageRetry(r => r.Immediate(5));
  17. cfg.UseInMemoryOutbox();
  18. cfg.ConfigureEndpoints(context);
  19. });
  20. });

Masstransit configuration in micro B:

  1. services.AddMassTransit(x =>
  2. {
  3. var entryAssembly = AppDomain.CurrentDomain.GetAssemblies()
  4. .FirstOrDefault(x => x.FullName.Contains("Discount.Application"));
  5. x.SetKebabCaseEndpointNameFormatter();
  6. x.AddConsumers(entryAssembly);
  7. x.UsingRabbitMq((context, cfg) =>
  8. {
  9. // cfg.Host(new Uri(configuration["RabbitMqSettings:Uri"]));
  10. cfg.Host(configuration["RabbitMqSettings:Host"], configuration["RabbitMqSettings:VirtualHost"], h =>
  11. {
  12. h.Username(configuration["RabbitMqSettings:Username"]);
  13. h.Password(configuration["RabbitMqSettings:Password"]);
  14. });
  15. cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
  16. cfg.UseMessageRetry(r => r.Immediate(5));
  17. cfg.UseInMemoryOutbox();
  18. cfg.ConfigureEndpoints(context);
  19. });
  20. });

Masstransit inmemorytestharness configuration:

  1. public static InMemoryTestHarness TestHarness { get; set; } = default!;
  2. public static IBus TestHarnessBus { get; set; } = default!;
  3. public static async Task UseHarnessAsync()
  4. {
  5. await using var provider = new ServiceCollection()
  6. .AddScoped<IUnitOfWork, UnitOfWork>()
  7. .AddMassTransitInMemoryTestHarness(x =>
  8. {
  9. x.SetKebabCaseEndpointNameFormatter();
  10. x.AddConsumer<GetCurrencyByNameConsumer>();
  11. // .Endpoint(x=>x.Name = "get-currency-by-name-request");
  12. x.AddConsumerTestHarness<GetCurrencyByNameConsumer>();
  13. })
  14. .AddGenericRequestClient()
  15. .BuildServiceProvider(true);
  16. TestHarness = provider.GetRequiredService<InMemoryTestHarness>();
  17. await TestHarness.Start().ConfigureAwait(false);
  18. TestHarnessBus = provider.GetRequiredService<IBus>();
  19. }

IntegrationTest:

  1. [Test]
  2. public async Task CreatePackageCommand_InsertToDb_ReturnPackage()
  3. {
  4. IRequestClient<GetCurrencyByNameRequest>? client = TestHarnessBus.CreateRequestClient<GetCurrencyByNameRequest>();
  5. await Task.Delay(20000);
  6. var req = new GetCurrencyByNameRequest
  7. {
  8. Name = "Dollar"
  9. };
  10. //error occurs in this line
  11. var response = await client.GetResponse<GetCurrencyByNameResponse>(req);
  12. (await TestHarness.Sent.Any<GetCurrencyByNameRequest>()).Should().BeTrue();
  13. (await TestHarness.Consumed.Any<GetCurrencyByNameResponse>()).Should().BeTrue();
  14. var consumerHarness = GetConsumerTestHarness<GetCurrencyByNameConsumer>();
  15. (await consumerHarness.Consumed.Any<GetCurrencyByNameRequest>()).Should().BeTrue();
  16. var command = new CreatePackageCommand
  17. {
  18. IsActive = true,
  19. Description = new Translation
  20. {
  21. Arabic = StringHelper.RandomString(10),
  22. English = StringHelper.RandomString(10),
  23. Persian = StringHelper.RandomString(10),
  24. },
  25. Sort = 1,
  26. Name = new Translation
  27. {
  28. Arabic = StringHelper.RandomString(10),
  29. English = StringHelper.RandomString(10),
  30. Persian = StringHelper.RandomString(10),
  31. },
  32. Price = 999,
  33. CurrencyName = "Dollar"
  34. };
  35. var packageId = await SendAsync(command);
  36. }
英文:

I have two microservices that communicate over RabbitMQ using the latest version of MassTransit v8. Microservice A sends a request to microservice B, which reads data from the database and responds with the result.
In the application everything works great, but in an integration test using InMemoryTestHarness I get this error: MassTransit.RequestFaultException : The EventBus.Messages.Contracts.Services.Discounts.Currencies.Query.GetCurrencyByName.GetCurrencyByNameRequest request faulted: Cannot access a disposed object.

Message for request:

  1. public class GetCurrencyByNameRequest
  2. {
  3. public string Name { get; set; }
  4. }

Message for response:

  1. public class GetCurrencyByNameResponse
  2. {
  3. public string Id { get; set; }
  4. public string Name { get; set; }
  5. }

Consumer:

  1. public class GetCurrencyByNameConsumer : IConsumer<GetCurrencyByNameRequest>
  2. {
  3. private readonly IUnitOfWork _uow;
  4. public GetCurrencyByNameConsumer(IUnitOfWork uow)
  5. {
  6. _uow = uow;
  7. }
  8. public async Task Consume(ConsumeContext<GetCurrencyByNameRequest> context)
  9. {
  10. var filter = Builders<Currency>.Filter.Eq(x => x.CurrencyName, new CurrencyName(context.Message.Name));
  11. var currency = await _uow.GenericRepository<Currency>()
  12. .GetSingleDocumentByFilterAsync(filter, context.CancellationToken);
  13. if (currency == null)
  14. {
  15. await context.RespondAsync(new GetCurrencyByNameResponse
  16. {
  17. Id = string.Empty,
  18. Name = string.Empty
  19. });
  20. }
  21. else
  22. {
  23. await context.RespondAsync(new GetCurrencyByNameResponse
  24. {
  25. Id = currency.Id,
  26. Name = currency.CurrencyName
  27. });
  28. }
  29. }
  30. }

Masstransit configuration in micro A:

  1. services.AddMassTransit(x =>
  2. {
  3. var entryAssembly = AppDomain.CurrentDomain.GetAssemblies()
  4. .FirstOrDefault(x => x.FullName.Contains("Payment.Application"));
  5. x.SetKebabCaseEndpointNameFormatter();
  6. x.AddConsumers(entryAssembly);
  7. x.UsingRabbitMq((context, cfg) =>
  8. {
  9. //cfg.Host(new Uri(configuration["RabbitMqSettings:Uri"]));
  10. cfg.Host(configuration["RabbitMqSettings:Host"], configuration["RabbitMqSettings:VirtualHost"], h =>
  11. {
  12. h.Username(configuration["RabbitMqSettings:Username"]);
  13. h.Password(configuration["RabbitMqSettings:Password"]);
  14. });
  15. cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
  16. cfg.UseMessageRetry(r => r.Immediate(5));
  17. cfg.UseInMemoryOutbox();
  18. cfg.ConfigureEndpoints(context);
  19. });
  20. });

Masstransit configuration in micro B

  1. services.AddMassTransit(x =>
  2. {
  3. var entryAssembly = AppDomain.CurrentDomain.GetAssemblies()
  4. .FirstOrDefault(x => x.FullName.Contains("Discount.Application"));
  5. x.SetKebabCaseEndpointNameFormatter();
  6. x.AddConsumers(entryAssembly);
  7. x.UsingRabbitMq((context, cfg) =>
  8. {
  9. // cfg.Host(new Uri(configuration["RabbitMqSettings:Uri"]));
  10. cfg.Host(configuration["RabbitMqSettings:Host"], configuration["RabbitMqSettings:VirtualHost"], h =>
  11. {
  12. h.Username(configuration["RabbitMqSettings:Username"]);
  13. h.Password(configuration["RabbitMqSettings:Password"]);
  14. });
  15. cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
  16. cfg.UseMessageRetry(r => r.Immediate(5));
  17. cfg.UseInMemoryOutbox();
  18. cfg.ConfigureEndpoints(context);
  19. });
  20. });

Masstransit inmemorytestharness configuration

  1. public static InMemoryTestHarness TestHarness { get; set; } = default!;
  2. public static IBus TestHarnessBus { get; set; } = default!;
  3. public static async Task UseHarnessAsync()
  4. {
  5. await using var provider = new ServiceCollection()
  6. .AddScoped<IUnitOfWork, UnitOfWork>()
  7. .AddMassTransitInMemoryTestHarness(x =>
  8. {
  9. x.SetKebabCaseEndpointNameFormatter();
  10. x.AddConsumer<GetCurrencyByNameConsumer>();
  11. // .Endpoint(x=>x.Name = "get-currency-by-name-request");
  12. x.AddConsumerTestHarness<GetCurrencyByNameConsumer>();
  13. })
  14. .AddGenericRequestClient()
  15. .BuildServiceProvider(true);
  16. TestHarness = provider.GetRequiredService<InMemoryTestHarness>();
  17. await TestHarness.Start().ConfigureAwait(false);
  18. TestHarnessBus = provider.GetRequiredService<IBus>();
  19. }

IntegrationTest

  1. [Test]
  2. public async Task CreatePackageCommand_InsertToDb_ReturnPackage()
  3. {
  4. IRequestClient<GetCurrencyByNameRequest>? client = TestHarnessBus.CreateRequestClient<GetCurrencyByNameRequest>();
  5. await Task.Delay(20000);
  6. var req = new GetCurrencyByNameRequest
  7. {
  8. Name = "Dollar"
  9. };
  10. //error occurs in this line
  11. var response = await client.GetResponse<GetCurrencyByNameResponse>(req);
  12. (await TestHarness.Sent.Any<GetCurrencyByNameRequest>()).Should().BeTrue();
  13. (await TestHarness.Consumed.Any<GetCurrencyByNameResponse>()).Should().BeTrue();
  14. var consumerHarness = GetConsumerTestHarness<GetCurrencyByNameConsumer>();
  15. (await consumerHarness.Consumed.Any<GetCurrencyByNameRequest>()).Should().BeTrue();
  16. var command = new CreatePackageCommand
  17. {
  18. IsActive = true,
  19. Description = new Translation
  20. {
  21. Arabic = StringHelper.RandomString(10),
  22. English = StringHelper.RandomString(10),
  23. Persian = StringHelper.RandomString(10),
  24. },
  25. Sort = 1,
  26. Name = new Translation
  27. {
  28. Arabic = StringHelper.RandomString(10),
  29. English = StringHelper.RandomString(10),
  30. Persian = StringHelper.RandomString(10),
  31. },
  32. Price = 999,
  33. CurrencyName = "Dollar"
  34. };
  35. var packageId = await SendAsync(command);
  36. }

答案1

得分: 2

  1. public static async Task UseHarnessAsync()
  2. {
  3. await using var provider = new ServiceCollection()

一旦该方法退出,它将释放服务提供程序。

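英文:

Well.

  1. public static async Task UseHarnessAsync()
  2. {
  3. await using var provider = new ServiceCollection()

As soon as that method exits, it's going to dispose of the service provider. The static TestHarness and TestHarnessBus were resolved from that provider, so by the time the test calls GetResponse they depend on a container that has already been disposed, which is what surfaces as "Cannot access a disposed object". Keep the provider alive for the duration of the tests and dispose it in the test teardown instead.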

huangapple
  • 本文由 发表于 2023年7月28日 05:37:44
  • 转载请务必保留本文链接:https://go.coder-hub.com/76783539.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定