英文:
MassTransit.RequestFaultException : The 'message' request faulted: Cannot access a disposed object
问题
I have two micro services with rabbitmq and masstransit v8 latest version that I send request from micro A to micro B and get data from database then response the result.
In application every thing work great but in integration test with inmemorytestharness I get this error MassTransit.RequestFaultException : The EventBus.Messages.Contracts.Services.Discounts.Currencies.Query.GetCurrencyByName.GetCurrencyByNameRequest request faulted: Cannot access a disposed object.
Message for request:
public class GetCurrencyByNameRequest
{
public string Name { get; set; }
}
Message for response:
public class GetCurrencyByNameResponse
{
public string Id { get; set; }
public string Name { get; set; }
}
Consumer:
public class GetCurrencyByNameConsumer : IConsumer<GetCurrencyByNameRequest>
{
private readonly IUnitOfWork _uow;
public GetCurrencyByNameConsumer(IUnitOfWork uow)
{
_uow = uow;
}
public async Task Consume(ConsumeContext<GetCurrencyByNameRequest> context)
{
var filter = Builders<Currency>.Filter.Eq(x => x.CurrencyName, new CurrencyName(context.Message.Name));
var currency = await _uow.GenericRepository<Currency>()
.GetSingleDocumentByFilterAsync(filter, context.CancellationToken);
if (currency == null)
{
await context.RespondAsync(new GetCurrencyByNameResponse
{
Id = string.Empty,
Name = string.Empty
});
}
else
{
await context.RespondAsync(new GetCurrencyByNameResponse
{
Id = currency.Id,
Name = currency.CurrencyName
});
}
}
}
Masstransit configuration in micro A:
services.AddMassTransit(x =>
{
var entryAssembly = AppDomain.CurrentDomain.GetAssemblies()
.FirstOrDefault(x => x.FullName.Contains("Payment.Application"));
x.SetKebabCaseEndpointNameFormatter();
x.AddConsumers(entryAssembly);
x.UsingRabbitMq((context, cfg) =>
{
//cfg.Host(new Uri(configuration["RabbitMqSettings:Uri"]));
cfg.Host(configuration["RabbitMqSettings:Host"], configuration["RabbitMqSettings:VirtualHost"], h =>
{
h.Username(configuration["RabbitMqSettings:Username"]);
h.Password(configuration["RabbitMqSettings:Password"]);
});
cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
cfg.UseMessageRetry(r => r.Immediate(5));
cfg.UseInMemoryOutbox();
cfg.ConfigureEndpoints(context);
});
});
Masstransit configuration in micro B:
services.AddMassTransit(x =>
{
var entryAssembly = AppDomain.CurrentDomain.GetAssemblies()
.FirstOrDefault(x => x.FullName.Contains("Discount.Application"));
x.SetKebabCaseEndpointNameFormatter();
x.AddConsumers(entryAssembly);
x.UsingRabbitMq((context, cfg) =>
{
// cfg.Host(new Uri(configuration["RabbitMqSettings:Uri"]));
cfg.Host(configuration["RabbitMqSettings:Host"], configuration["RabbitMqSettings:VirtualHost"], h =>
{
h.Username(configuration["RabbitMqSettings:Username"]);
h.Password(configuration["RabbitMqSettings:Password"]);
});
cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
cfg.UseMessageRetry(r => r.Immediate(5));
cfg.UseInMemoryOutbox();
cfg.ConfigureEndpoints(context);
});
});
Masstransit inmemorytestharness configuration:
public static InMemoryTestHarness TestHarness { get; set; } = default!;
public static IBus TestHarnessBus { get; set; } = default!;
public static async Task UseHarnessAsync()
{
await using var provider = new ServiceCollection()
.AddScoped<IUnitOfWork, UnitOfWork>()
.AddMassTransitInMemoryTestHarness(x =>
{
x.SetKebabCaseEndpointNameFormatter();
x.AddConsumer<GetCurrencyByNameConsumer>();
// .Endpoint(x=>x.Name = "get-currency-by-name-request");
x.AddConsumerTestHarness<GetCurrencyByNameConsumer>();
})
.AddGenericRequestClient()
.BuildServiceProvider(true);
TestHarness = provider.GetRequiredService<InMemoryTestHarness>();
await TestHarness.Start().ConfigureAwait(false);
TestHarnessBus = provider.GetRequiredService<IBus>();
}
IntegrationTest:
[Test]
public async Task CreatePackageCommand_InsertToDb_ReturnPackage()
{
IRequestClient<GetCurrencyByNameRequest>? client = TestHarnessBus.CreateRequestClient<GetCurrencyByNameRequest>();
await Task.Delay(20000);
var req = new GetCurrencyByNameRequest
{
Name = "Dollar"
};
//error occurs in this line
var response = await client.GetResponse<GetCurrencyByNameResponse>(req);
(await TestHarness.Sent.Any<GetCurrencyByNameRequest>()).Should().BeTrue();
(await TestHarness.Consumed.Any<GetCurrencyByNameResponse>()).Should().BeTrue();
var consumerHarness = GetConsumerTestHarness<GetCurrencyByNameConsumer>();
(await consumerHarness.Consumed.Any<GetCurrencyByNameRequest>()).Should().BeTrue();
var command = new CreatePackageCommand
{
IsActive = true,
Description = new Translation
{
Arabic = StringHelper.RandomString(10),
English = StringHelper.RandomString(10),
Persian = StringHelper.RandomString(10),
},
Sort = 1,
Name = new Translation
{
Arabic = StringHelper.RandomString(10),
English = StringHelper.RandomString(10),
Persian = StringHelper.RandomString(10),
},
Price = 999,
CurrencyName = "Dollar"
};
var packageId = await SendAsync(command);
}
英文:
I have two micro services with rabbitmq and masstransit v8 latest version that I send request from micro A to micro B and get data from database then response the result.
In application every thing work great but in integration test with inmemorytestharness I get this error MassTransit.RequestFaultException : The EventBus.Messages.Contracts.Services.Discounts.Currencies.Query.GetCurrencyByName.GetCurrencyByNameRequest request faulted: Cannot access a disposed object.
Message for request:
public class GetCurrencyByNameRequest
{
public string Name { get; set; }
}
Message for response:
public class GetCurrencyByNameResponse
{
public string Id { get; set; }
public string Name { get; set; }
}
Consumer:
public class GetCurrencyByNameConsumer : IConsumer<GetCurrencyByNameRequest>
{
private readonly IUnitOfWork _uow;
public GetCurrencyByNameConsumer(IUnitOfWork uow)
{
_uow = uow;
}
public async Task Consume(ConsumeContext<GetCurrencyByNameRequest> context)
{
var filter = Builders<Currency>.Filter.Eq(x => x.CurrencyName, new CurrencyName(context.Message.Name));
var currency = await _uow.GenericRepository<Currency>()
.GetSingleDocumentByFilterAsync(filter, context.CancellationToken);
if (currency == null)
{
await context.RespondAsync(new GetCurrencyByNameResponse
{
Id = string.Empty,
Name = string.Empty
});
}
else
{
await context.RespondAsync(new GetCurrencyByNameResponse
{
Id = currency.Id,
Name = currency.CurrencyName
});
}
}
}
Masstransit configuration in micro A:
services.AddMassTransit(x =>
{
var entryAssembly = AppDomain.CurrentDomain.GetAssemblies()
.FirstOrDefault(x => x.FullName.Contains("Payment.Application"));
x.SetKebabCaseEndpointNameFormatter();
x.AddConsumers(entryAssembly);
x.UsingRabbitMq((context, cfg) =>
{
//cfg.Host(new Uri(configuration["RabbitMqSettings:Uri"]));
cfg.Host(configuration["RabbitMqSettings:Host"], configuration["RabbitMqSettings:VirtualHost"], h =>
{
h.Username(configuration["RabbitMqSettings:Username"]);
h.Password(configuration["RabbitMqSettings:Password"]);
});
cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
cfg.UseMessageRetry(r => r.Immediate(5));
cfg.UseInMemoryOutbox();
cfg.ConfigureEndpoints(context);
});
});
Masstransit configuration in micro B
services.AddMassTransit(x =>
{
var entryAssembly = AppDomain.CurrentDomain.GetAssemblies()
.FirstOrDefault(x => x.FullName.Contains("Discount.Application"));
x.SetKebabCaseEndpointNameFormatter();
x.AddConsumers(entryAssembly);
x.UsingRabbitMq((context, cfg) =>
{
// cfg.Host(new Uri(configuration["RabbitMqSettings:Uri"]));
cfg.Host(configuration["RabbitMqSettings:Host"], configuration["RabbitMqSettings:VirtualHost"], h =>
{
h.Username(configuration["RabbitMqSettings:Username"]);
h.Password(configuration["RabbitMqSettings:Password"]);
});
cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
cfg.UseMessageRetry(r => r.Immediate(5));
cfg.UseInMemoryOutbox();
cfg.ConfigureEndpoints(context);
});
});
Masstransit inmemorytestharness configuration
public static InMemoryTestHarness TestHarness { get; set; } = default!;
public static IBus TestHarnessBus { get; set; } = default!;
public static async Task UseHarnessAsync()
{
await using var provider = new ServiceCollection()
.AddScoped<IUnitOfWork, UnitOfWork>()
.AddMassTransitInMemoryTestHarness(x =>
{
x.SetKebabCaseEndpointNameFormatter();
x.AddConsumer<GetCurrencyByNameConsumer>();
// .Endpoint(x=>x.Name = "get-currency-by-name-request");
x.AddConsumerTestHarness<GetCurrencyByNameConsumer>();
})
.AddGenericRequestClient()
.BuildServiceProvider(true);
TestHarness = provider.GetRequiredService<InMemoryTestHarness>();
await TestHarness.Start().ConfigureAwait(false);
TestHarnessBus = provider.GetRequiredService<IBus>();
}
IntegrationTest
[Test]
public async Task CreatePackageCommand_InsertToDb_ReturnPackage()
{
IRequestClient<GetCurrencyByNameRequest>? client = TestHarnessBus.CreateRequestClient<GetCurrencyByNameRequest>();
await Task.Delay(20000);
var req = new GetCurrencyByNameRequest
{
Name = "Dollar"
};
//error occurs in this line
var response = await client.GetResponse<GetCurrencyByNameResponse>(req);
(await TestHarness.Sent.Any<GetCurrencyByNameRequest>()).Should().BeTrue();
(await TestHarness.Consumed.Any<GetCurrencyByNameResponse>()).Should().BeTrue();
var consumerHarness = GetConsumerTestHarness<GetCurrencyByNameConsumer>();
(await consumerHarness.Consumed.Any<GetCurrencyByNameRequest>()).Should().BeTrue();
var command = new CreatePackageCommand
{
IsActive = true,
Description = new Translation
{
Arabic = StringHelper.RandomString(10),
English = StringHelper.RandomString(10),
Persian = StringHelper.RandomString(10),
},
Sort = 1,
Name = new Translation
{
Arabic = StringHelper.RandomString(10),
English = StringHelper.RandomString(10),
Persian = StringHelper.RandomString(10),
},
Price = 999,
CurrencyName = "Dollar"
};
var packageId = await SendAsync(command);
}
答案1
得分: 2
public static async Task UseHarnessAsync()
{
await using var provider = new ServiceCollection()
一旦该方法退出,它将释放服务提供程序。
<details>
<summary>英文:</summary>
Well.
```lang-cs
public static async Task UseHarnessAsync()
{
await using var provider = new ServiceCollection()
As soon as that method exits, it's going to dispose of the service provider.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论