@SqsListener results in an exception and does not work

Question

I have a very simple Spring Cloud AWS project running on Java 11.
Here is the pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.5.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.demo.arf</groupId>
	<artifactId>testsqs-boot</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>testsqs-boot</name>
	<description>Demo project for Spring Boot</description>
	<dependencies>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-dependencies</artifactId>
			<version>Hoxton.SR3</version>
			<type>pom</type>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
			<exclusions>
				<exclusion>
					<groupId>org.junit.vintage</groupId>
					<artifactId>junit-vintage-engine</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-aws</artifactId>
			<version>2.2.1.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-aws-messaging</artifactId>
			<version>2.2.1.RELEASE</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

Config class:

package com.demo.arf.testsqsboot;

import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import org.springframework.beans.factory.annotation.Value;
//import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
//import com.amazonaws.services.sqs.AmazonSQSAsync;
//import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;

@Configuration
public class SQSConfig {

	@Value("${cloud.aws.region.static}")
	private String region;

	@Value("${cloud.aws.credentials.access-key}")
	private String awsAccessKey;

	@Value("${cloud.aws.credentials.secret-key}")
	private String awsSecretKey;

	@Bean
	public QueueMessagingTemplate queueMessagingTemplate() {
		return new QueueMessagingTemplate(amazonSQSAsync());
	}

	public AmazonSQSAsync amazonSQSAsync() {
		return AmazonSQSAsyncClientBuilder.standard().withRegion(Regions.US_EAST_1)
				.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(awsAccessKey, awsSecretKey)))
				.build();
	}

	@Bean
	public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQS){
		SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
		factory.setAmazonSqs(amazonSQS);
		factory.setMaxNumberOfMessages(10);
		return factory;
	}
}

The controller class, which sends and receives messages:

package com.demo.arf.testsqsboot.controller;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SQSController {
    @Autowired
    private QueueMessagingTemplate queueMessagingTemplate;

    private static final Logger LOG = LoggerFactory.getLogger(SQSController.class);
    @GetMapping("/send-sqs-message")
    public String sendMessage() {
        String sqsEndPoint= "https://sqs.us-east-2.amazonaws.com/1234567879/my_queue";
        queueMessagingTemplate.convertAndSend(sqsEndPoint, MessageBuilder.withPayload("hello from Spring Boot").build());
        return "Hello SQS";
    }

    @SqsListener("my_queue")
    public void getMessage(String message) {
      LOG.info(" *********** Message from SQS Queue - "+message);
    }
}

application.yml:

server:
  port: 9001
cloud:
  aws:
    region:
      static: us-east-1
      auto: false
    credentials:
      access-key: "asdmnasdn"
      secret-key: "sfkjsdjksdkj"
    end-point:
      uri: https://sqs.us-east-2.amazonaws.com/1234567879/my_queue

Sending works fine, but when I add the listener, I get the following error during startup and the listener does not receive any messages:

2020-03-15 01:02:00.677  INFO 15423 --- [           main] o.s.web.context.ContextLoader            : Root WebApplicationContext: initialization completed in 3853 ms
2020-03-15 01:02:01.109  INFO 15423 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by com.amazonaws.util.XpathUtils (file:/Users/arf/.m2/repository/com/amazonaws/aws-java-sdk-core/1.11.415/aws-java-sdk-core-1.11.415.jar) to method com.sun.org.apache.xpath.internal.XPathContext.getDTMManager()
WARNING: Please consider reporting this to the maintainers of com.amazonaws.util.XpathUtils
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2020-03-15 01:02:01.749  WARN 15423 --- [           main] s.c.a.m.l.SimpleMessageListenerContainer : Ignoring queue with name 'my_queue': The queue does not exist.; nested exception is com.amazonaws.services.sqs.model.QueueDoesNotExistException: The specified queue does not exist for this wsdl version. (Service: AmazonSQS; Status Code: 400; Error Code: AWS.SimpleQueueService.NonExistentQueue; Request ID: 62821505-3f34-5434-a6ee)
2020-03-15 01:02:01.749  INFO 15423 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService

Also, one basic question: how does @SqsListener know where to find the AWS account info and the SQS URI?

Answer 1

Score: 1

I made it work with the following change in the config class. However, I wonder how most of the sample programs online work without this code (which constructs AmazonSQSAsync with withEndpointConfiguration).

	@Bean
	public QueueMessagingTemplate queueMessagingTemplate(AmazonSQSAsync amazonSQS) {
		return new QueueMessagingTemplate(amazonSQS);
	}

	@Bean
	@Primary
	public AmazonSQSAsync amazonSQS(AWSCredentialsProvider credentials) {
		return AmazonSQSAsyncClientBuilder.standard()
				.withCredentials(credentials)
				.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(localStackSqsUrl, awsRegion))
				.build();
	}

	@Bean
	@Primary
	public AWSCredentialsProvider awsCredentialsProvider() {
		return new AWSCredentialsProviderChain(
				new AWSStaticCredentialsProvider(
						new BasicAWSCredentials("local", "stack")));
	}



Answer 2

Score: 1

A couple of things.
- First, NEVER store your AK/SK in a properties or yml file like that. I can tell those are fake values, but you'll always want to pull these from ~/.aws/credentials or instance metadata. AWS client builders such as AmazonSQSAsyncClientBuilder do this automatically if you just call `.standard()`. No need for the credentials provider.
- Second, the same goes for the region.
- Third, I believe `@SqsListener` will use the `ContainerFactory` bean you defined earlier; at least that's how `@JmsListener` works.
- The error message you received says that your queue name was not found in your account in your selected region. You told it us-east-1, but in your send code you specified us-east-2. My guess based on your post is that your queue is in us-east-2, since your question came up about `@SqsListener`, not the `queueMessagingTemplate`.
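
To make the region mismatch in the last point concrete, here is a minimal sketch that uses only the JDK. It extracts the region from the queue URL used in the send code and compares it with the configured region. The class name `QueueRegionCheck` and the helpers `regionOf` and `sameRegion` are hypothetical names chosen for illustration; they are not part of Spring Cloud AWS or the AWS SDK. The host pattern assumes the standard `sqs.<region>.amazonaws.com` form, so custom endpoints will not match.

```java
import java.net.URI;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class QueueRegionCheck {

    // Assumed host shape for standard SQS endpoints, e.g. sqs.us-east-2.amazonaws.com
    private static final Pattern SQS_HOST =
            Pattern.compile("^sqs\\.([a-z0-9-]+)\\.amazonaws\\.com$");

    /** Returns the region embedded in an SQS queue URL, or empty if the host does not match. */
    public static Optional<String> regionOf(String queueUrl) {
        String host = URI.create(queueUrl).getHost();
        if (host == null) {
            return Optional.empty();
        }
        Matcher m = SQS_HOST.matcher(host);
        return m.matches() ? Optional.of(m.group(1)) : Optional.empty();
    }

    /** True only when the queue URL carries a region and it equals the configured one. */
    public static boolean sameRegion(String configuredRegion, String queueUrl) {
        return regionOf(queueUrl).map(configuredRegion::equals).orElse(false);
    }

    public static void main(String[] args) {
        // Values taken from the question: application.yml region and the URL used when sending
        String configuredRegion = "us-east-1";
        String queueUrl = "https://sqs.us-east-2.amazonaws.com/1234567879/my_queue";

        System.out.println("region in queue URL: " + regionOf(queueUrl).orElse("unknown"));
        System.out.println("matches configured region: " + sameRegion(configuredRegion, queueUrl));
    }
}
```

With the values from the question, this reports us-east-2 for the queue URL and no match with the configured us-east-1. That is consistent with the QueueDoesNotExistException raised when the listener looks up my_queue by name in the configured region.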



huangapple
  • Posted on March 15, 2020, 14:16:47
  • When reposting, please keep the link to this article: https://go.coder-hub.com/60690269.html