MassTransit请求/响应 – 未找到指定地址的主机。

huangapple go评论70阅读模式
英文:

MassTransit request/response - The host was not found for the specified address

问题

I understand you only want the code-related parts to be translated. Here's the relevant code-related part:

I'm trying to implement an integration test scenario where Component A inside an IIS host requests some data from Component B (external component) over the RabbitMQ/Masstransit bus using the IRequestClient. The Component B has its own bus instance running. My goal is to "mock" the target consumer on Component B by registering a replacement consumer inside the test and have that respond to the same request.

The production code works fine. But when it comes to this test, the replacement consumer is invoked as it should and I can debug it, however as soon as `await context.RespondAsync(..);` is called, I get the following exception:

    MassTransit.Messages Error: 0 : R-FAULT rabbitmq://localhost:0/myVHost/Command.Bus.Messages.Data.IGetDataRequest_0.0.0.0 7a000000-9a3c-0005-b155-08db518a24c8 Bus.Messages.Data.IGetDataRequest MassTransit.MessageHandler<Bus.Messages.Data.IGetDataRequest>(00:00:03.2257322) The host was not found for the specified address: rabbitmq://localhost:0/myVHost/bus-MYLOCALHOST-testhost-xeyyyyr48oyym8zfbdpidntbrh?durable=false&amp;autodelete=true, MassTransit.EndpointNotFoundException: The host was not found for the specified address: rabbitmq://localhost:0/myVHost/bus-MYLOCALHOST-testhost-xeyyyyr48oyym8zfbdpidntbrh?durable=false&amp;autodelete=true
             at MassTransit.RabbitMqTransport.Transport.SendTransportProvider.GetSendTransport(Uri address)
             at MassTransit.RabbitMqTransport.Transport.SendTransportProvider.MassTransit.ISendTransportProvider.GetSendTransport(Uri address)
             at MassTransit.Transports.SendEndpointProvider.CreateSendEndpoint(Uri address)
             at MassTransit.Transports.SendEndpointCache`1.GetSendEndpointFromFactory(TKey address, SendEndpointFactory`1 factory)
             at GreenPipes.Caching.Internals.PendingValue`2.CreateValue()
             at MassTransit.Transports.SendEndpointCache`1.GetSendEndpoint(TKey key, SendEndpointFactory`1 factory)
             at MassTransit.Context.BaseConsumeContext.GetSendEndpoint(Uri address)
             at MassTransit.Context.BaseConsumeContext.RespondAsync[T](T message, IPipe`1 sendPipe)
    at MyTest.Helpers.Consumers.TestDataRequestConsumer.Consume(ConsumeContext`1 context) in C:\Projects\MyTest\Integration.MyTest\Helpers\Consumers\TestDataRequestConsumer.cs:line 56
       at RBus.MessageHandler.<Setup>b__3_0[T](ConsumeContext`1 c)
       at MassTransit.Pipeline.Filters.HandlerMessageFilter`1.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)

Here's the test setup:

[OneTimeSetUp]
public async Task SetUp()
{
   var webApp = new WebApplicationFactory<Startup.IisHost.Startup>()
            .WithWebHostBuilder(builder =>
            {
                builder.UseTestServer();

                builder.ConfigureTestServices(services =>
                {
                    services.Configure<DatabaseOptions>(o => o.ConnectionString = testEnvironment.DbConnectionString);
                    services.Configure<CustomerClientOptions>(configuration.GetSection("Customer"));
                    services.AddSingleton(subscriptionHelper);
                });
            });
        
    httpClient = webApp.CreateClient();

    var serviceCollection = new ServiceCollection();
    serviceCollection.Configure<CustomerClientOptions>(configuration.GetSection("Customer"));
    serviceCollection.AddCustomerClient();
    serviceCollection.AddTransient<IIdentityProvider, IdentityProvider>();
    serviceCollection.AddBus(configuration.GetSection("BusConfig"), x =>
    {
        x.WithIdentityProvider((provider, id) => provider.GetRequiredService<IIdentityProvider>().ProvideIdentityByCustomerId(id));
        x.WithRequestQueue<IGetDataRequest, TestDataRequestConsumer>();
    });

    var serviceProvider = serviceCollection.BuildServiceProvider();
    var busControl = serviceProvider.GetService<IBusControl>();

    await busControl.StartAsync().ConfigureAwait(false);

    // Further code is irrelevant and removed for brevity
}

I hope this helps! If you have any specific questions or need further assistance with this code, please feel free to ask.

英文:

I'm trying to implement an integration test scenario where Component A inside an IIS host requests some data from Component B (external component) over the RabbitMQ/Masstransit bus using the IRequestClient. The Component B has its own bus instance running. My goal is to "mock" the target consumer on Component B by registering a replacement consumer inside the test and have that respond to the same request.

The production code works fine. But when it comes to this test, the replacement consumer is invoked as it should and I can debug it, however as soon as await context.RespondAsync(..); is called, I get the following exception:

MassTransit.Messages Error: 0 : R-FAULT rabbitmq://localhost:0/myVHost/Command.Bus.Messages.Data.IGetDataRequest_0.0.0.0 7a000000-9a3c-0005-b155-08db518a24c8 Bus.Messages.Data.IGetDataRequest MassTransit.MessageHandler&lt;Bus.Messages.Data.IGetDataRequest&gt;(00:00:03.2257322) The host was not found for the specified address: rabbitmq://localhost:0/myVHost/bus-MYLOCALHOST-testhost-xeyyyyr48oyym8zfbdpidntbrh?durable=false&amp;autodelete=true, MassTransit.EndpointNotFoundException: The host was not found for the specified address: rabbitmq://localhost:0/myVHost/bus-MYLOCALHOST-testhost-xeyyyyr48oyym8zfbdpidntbrh?durable=false&amp;autodelete=true
at MassTransit.RabbitMqTransport.Transport.SendTransportProvider.GetSendTransport(Uri address)
at MassTransit.RabbitMqTransport.Transport.SendTransportProvider.MassTransit.ISendTransportProvider.GetSendTransport(Uri address)
at MassTransit.Transports.SendEndpointProvider.CreateSendEndpoint(Uri address)
at MassTransit.Transports.SendEndpointCache`1.GetSendEndpointFromFactory(TKey address, SendEndpointFactory`1 factory)
at GreenPipes.Caching.Internals.PendingValue`2.CreateValue()
at MassTransit.Transports.SendEndpointCache`1.GetSendEndpoint(TKey key, SendEndpointFactory`1 factory)
at MassTransit.Context.BaseConsumeContext.GetSendEndpoint(Uri address)
at MassTransit.Context.BaseConsumeContext.RespondAsync[T](T message, IPipe`1 sendPipe)
at MyTest.Helpers.Consumers.TestDataRequestConsumer.Consume(ConsumeContext`1 context) in C:\Projects\MyTest\Integration.MyTest\Helpers\Consumers\TestDataRequestConsumer.cs:line 56
at RBus.MessageHandler.&lt;Setup&gt;b__3_0[T](ConsumeContext`1 c)
at MassTransit.Pipeline.Filters.HandlerMessageFilter`1.GreenPipes.IFilter&lt;MassTransit.ConsumeContext&lt;TMessage&gt;&gt;.Send(ConsumeContext`1 context, IPipe`1 next)

Here's the test setup:

    [OneTimeSetUp]
public async Task SetUp()
{
var webApp = new WebApplicationFactory&lt;Startup.IisHost.Startup&gt;()
.WithWebHostBuilder(builder =&gt;
{
builder.UseTestServer();
builder.ConfigureTestServices(services =&gt;
{
services.Configure&lt;DatabaseOptions&gt;(o =&gt; o.ConnectionString = testEnvironment.DbConnectionString);
services.Configure&lt;CustomerClientOptions&gt;(configuration.GetSection(&quot;Customer&quot;));
services.AddSingleton(subscriptionHelper);
});
});
httpClient = webApp.CreateClient();
var serviceCollection = new ServiceCollection();
serviceCollection.Configure&lt;CustomerClientOptions&gt;(configuration.GetSection(&quot;Customer&quot;));
serviceCollection.AddCustomerClient();
serviceCollection.AddTransient&lt;IIdentityProvider, IdentityProvider&gt;();
serviceCollection.AddBus(configuration.GetSection(&quot;BusConfig&quot;), x =&gt;
{
x.WithIdentityProvider((provider, id) =&gt; provider.GetRequiredService&lt;IIdentityProvider&gt;().ProvideIdentityByCustomerId(id));
x.WithRequestQueue&lt;IGetDataRequest, TestDataRequestConsumer&gt;();
});
var serviceProvider = serviceCollection.BuildServiceProvider();
var busControl = serviceProvider.GetService&lt;IBusControl&gt;();
await busControl.StartAsync().ConfigureAwait(false);
// Further code is irrelevant and removed for brevity
}

In the Startup.IisHost.Startup ConfigureServices method I register the client which has the IRequestClient injected in the constructor, then add the bus.

        services.AddTransient&lt;IDataClient, DataClient&gt;();
services.AddBus(Configuration.GetSection(&quot;Bus&quot;), x =&gt;
{
x.WithIdentityProvider((provider, customerId) =&gt; provider.GetRequiredService&lt;IIdentityProvider&gt;().ProvideIdentityByCustomerId(customerId));
});
services.AddHostedService&lt;BusService&gt;();

The "actual" bus is started from within IHostedService:

    public BusService(ILogger&lt;BusService&gt; logger, IBusControl busControl)
{
this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
this.busControl = busControl ?? throw new ArgumentNullException(nameof(busControl));
}
public async Task StartAsync(CancellationToken cancellationToken)
{
try
{
await busControl.StartAsync(cancellationToken).ConfigureAwait(false);
}
catch (Exception e)
{
logger.LogCritical(e, &quot;Error while starting IIS Bus service&quot;);
throw;
}
}

Here's the request client:

internal class DataClient : IDataClient
{
private readonly IRequestClient requestClient;
public DataClient (IRequestClient requestClient)
{
this.requestClient = requestClient ?? throw new ArgumentNullException(nameof(requestClient));
}
/// &lt;inheritdoc/&gt;
public async Task&lt;IDictionary&lt;SomeData, OtherData&gt;&gt; GetData(IEnumerable&lt;string&gt; names, IEnumerable&lt;int&gt; ids)
{
if (names == null) throw new ArgumentNullException(nameof(names));
if (ids == null) throw new ArgumentNullException(nameof(ids));
var namesList = names.ToList();
var idsList = ids.ToList();
if (!idsList.Any())
{
return new Dictionary&lt;SomeData, OtherData&gt;();
}
var response = await requestClient.Request&lt;IGetDataRequest, IGetDataResponse&gt;(new
{
Names = namesList ,
Ids = idsList 
}).ConfigureAwait(false);
var riiDeviceCredentialsResponse = response.Message;
...
}
}

And here's the consumer (identical to the one in Component B):

public class TestDataRequestConsumer: IConsumer&lt;IGetDataRequest&gt;
{
private readonly GetDataOptions options;
public TestDataRequestConsumer(IOptions&lt;GetDataOptions&gt; options)
{
if (options == null) throw new ArgumentNullException(nameof(options));
this.options = options.Value;
}
public async Task Consume(ConsumeContext&lt;IGetDataRequest&gt; context)
{
// Do some stuff..
await context.RespondAsync(response).ConfigureAwait(false);
}
}

Both the test and Component B are using the same bus configuration:

&quot;UseInMemory&quot;: false,
&quot;HostScheme&quot;: &quot;rabbitmq&quot;,
&quot;HostName&quot;: &quot;localhost&quot;,
&quot;HostPort&quot;: 5672,
&quot;Username&quot;: &quot;myuser&quot;,
&quot;Password&quot;: &quot;mypassword&quot;,
&quot;VHostName&quot;: &quot;myVHost&quot;,
&quot;ServerVersion&quot;: &quot;0.0.0.0&quot;,
&quot;NumberOfRetry&quot;: 3,
&quot;SecBetweenRetry&quot;: 2,
&quot;PrefetchCount&quot;: 4,

When I look into the RabbitMQ management console, I can see that the messages are first added to the correct queue, but then later moved to the error queue.

The proprietary bus libraries I'm using are utilizing MassTransit 5.1.5.1633.

I'm not really sure where the problem might be. Any help and/or pointers for further investigation will be greatly appreciated.

Let me know if there's more info that's needed.

答案1

得分: 0

感谢Chris的帮助,通过将测试项目降级至.NET Core 3.1(.NET 5也可以),这个问题已经解决。另外,最新版本的MassTransit也可以解决问题,但在我的情况下,这个选项相对昂贵,所以在适当的时候,较低版本的.NET就足够了。

英文:

Thanks to Chris, this was resolved by downgrading the test project to .NET Core 3.1 (.NET 5 works too). Alternatively, the latest version of MassTransit would do the job, but in my case this options is rather costly, so in due time a lower .NET version will suffice.

huangapple
  • 本文由 发表于 2023年5月11日 03:24:36
  • 转载请务必保留本文链接:https://go.coder-hub.com/76221949.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定