英文:
java - Unable to delete the message from the SQS after it gets processed through the camel
问题
这是您要翻译的内容:
"I'm being unable to delete the message from the SQS after it gets processed... I tried several ways explained here on StackOverflow, but none of them worked for me...
Here is the SQS part I'm sending
@Override
public void sendTransaction(TransactionModel transactionToSend, String preText) throws EMException {
// ...
}
private void setMessageAttributes(SendMessageRequest sendMessageRequest, TransactionModel transactionToSend, EDIOrderRequest ediOrderRequest) {
// ...
}
Here is the route itself
@Override
public void configure() {
// ...
}
The file gets processed and sent to the server, but I'm unable to delete the message after it gets processed.
Here is the stack trace I get:
WARN o.a.c.component.aws.sqs.SqsConsumer.log - Error occurred during deleting message. This exception is ignored.. Exchange[ID-AD-MacBook-Pro-local-1353421234311-0-1]. Caused by: [com.amazonaws.services.sqs.model.AmazonSQSException - The request must contain the parameter MessageHandle. (Service: AmazonSQS; Status Code: 400; Error Code: MissingParameter; Request ID: e3252adsaf-1242-2244-a5f6-1b51e2e47e9c)]com.amazonaws.services.sqs.model.AmazonSQSException: The request must contain the parameter MessageHandle. (Service: AmazonSQS; Status Code: 400; Error Code: MissingParameter; Request ID: e3252adsaf-1242-2244-a5f6-1b51e2e47e9c)
If someone thinks that this could be a duplicate question, there are few answers which are similar to this one, here on StackOverflow, but none of them helped me resolve the issue..."
Is there anything else you would like to know or clarify?
英文:
I'm being unable to delete the message from the SQS after it gets processed... I tried several ways explained here on StackOverflow, but none of them worked for me...
Here is the SQS part I'm sending
@Override
public void sendTransaction(TransactionModel transactionToSend, String preText) throws EMException {
try {
Long companyID = transactionToSend.getCompanyID();
validateRequest(companyID, transactionToSend.getCostCenterID());
VendorTransaction vtr = ediManager.convertTransactionToVendorTransaction(transactionToSend);
EDIOrderRequest ediOrderRequest = ediManager.createEDIOrderRequest(transactionToSend);
String messageBody = getSQSMessageBody(vtr);
SendMessageRequest sendMessageRequest = new SendMessageRequest(getQueueName(), messageBody);
setMessageAttributes(sendMessageRequest, transactionToSend, ediOrderRequest);
AmazonSQSAsync client = ServerConnector.getServerBean(AWSSQSService.ILocal.class).getSQSClient();
SendMessageResult result = client.sendMessage(sendMessageRequest);
if (result == null) {
throw new EDIException("An error occurred while sending the message.");
}
log.info("Transaction: " + transactionToSend.getId() + " for a company: " + companyID + " sent. Message id is: " + result.getMessageId());
} catch (Exception e) {
e.printStackTrace();
log.error(String.format("Error sending transaction: %s", e.getMessage()));
throw new EMException(-1, String.format("Error sending transaction: %s", e.getMessage()), e);
}
}
private void setMessageAttributes(SendMessageRequest sendMessageRequest, TransactionModel transactionToSend, EDIOrderRequest ediOrderRequest) {
addMessageAttributesEntry(sendMessageRequest, "companyId", transactionToSend.getCompanyID());
addMessageAttributesEntry(sendMessageRequest, "standard", ediConfig.getMapOfParameters().get("EDI_STD"));
addMessageAttributesEntry(sendMessageRequest, "endpoint", ediConfig.getTargetAddress());
addMessageAttributesEntry(sendMessageRequest, "prefix", ediConfig.getMapOfParameters().get("FILE_PREFIX"));
addMessageAttributesEntry(sendMessageRequest, "requestId", ediOrderRequest != null ? ediOrderRequest.getId() : null);
}
Here is the route itself
@Override
public void configure() {
// on exception deal with EDIOrder stuff
onException(Exception.class).handled(true).process(failedOrdersProcessor).end();
String destination = from != null ? from : (sqsQueue);
destination += "&attributeNames=All&messageAttributeNames=All";
log.info(destination);
from(destination)
.to("direct:ediOrder");
// order processing
from("direct:ediOrder")
.choice()
.when(body().isInstanceOf(String.class))
.process(exchange -> {
//filter out duplicates
String body = "undefined";
try {
body = exchange.getIn().getBody().toString();
VendorTransaction transaction = objectMapper.readValue(body, VendorTransaction.class);
if (!IDEMPOTENT_ORDERS_REPOSITORY.containsKey(transaction.getDeduplicatableIdentifier())) {
IDEMPOTENT_ORDERS_REPOSITORY.put(transaction.getDeduplicatableIdentifier(), transaction);
} else {
exchange.getIn().setHeader(Exchange.DUPLICATE_MESSAGE, Boolean.TRUE);
if (transaction.getDeduplicatableIdentifier() != null) {
exchange.getIn().setHeader(ConfigKey.DEDUPLICATE_IDENTIFIER.getKey(), transaction.getDeduplicatableIdentifier());
}
}
} catch (IOException e) {
log.error("Unable to convert the body to VendorTransaction, body:\n" + body + "\n" + e.getMessage());
}
})
.end()
.filter(header(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE))
.bean(DuplicateOrderProcessor.class)
.stop()
.end()
.bean(SQSOrderQueueBean.class)
.choice()
.when(body().isInstanceOf(EDIOrder.class))
.recipientList(simple("${body.endpoint}"))
.log("Uploaded ${body.genericFile.fileName} to ${body.endpointNoPassword}")
.bean(OrderCleaner.class)
.endChoice()
.when(body().isInstanceOf(VendorTransactionContainer.class))
.log("Send to direct:confirmation")
.to(RouteConstants.DIRECT_CONFIRMATION)
.endChoice()
.end();
The file gets processed and sent to the server, but I'm unable to delete the message after it gets processed.
Here is the stack trace I get:
WARN o.a.c.component.aws.sqs.SqsConsumer.log - Error occurred during deleting message. This exception is ignored.. Exchange[ID-AD-MacBook-Pro-local-1353421234311-0-1]. Caused by: [com.amazonaws.services.sqs.model.AmazonSQSException - The request must contain the parameter MessageHandle. (Service: AmazonSQS; Status Code: 400; Error Code: MissingParameter; Request ID: e3252adsaf-1242-2244-a5f6-1b51e2e47e9c)]com.amazonaws.services.sqs.model.AmazonSQSException: The request must contain the parameter MessageHandle. (Service: AmazonSQS; Status Code: 400; Error Code: MissingParameter; Request ID: e3252adsaf-1242-2244-a5f6-1b51e2e47e9c)
If someone thinks that this could be a duplicate question, there are few answers which are similar to this one, here on StackOverflow, but none of them helped me resolve the issue...
答案1
得分: 0
我已通过将标头传递给exchange.getOut()
消息来解决此问题,因为当我们像这样设置主体时:
exchange.getOut().setBody(body);
我们正在创建一个新的消息实例,因此来自exchange.getIn()
的标头和来自getIn()
的其他所有内容都不会出现在exchange.getOut()
消息中。
英文:
I've fixed this issue by passing the headers to exchange.getOut() message as when we set the body like this
exchange.getOut().setBody(body);
we are creating a new instance of a Message, so the headers from exchange.getIn() and everything else from getIn() won't be present in exchange.getOut() message.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论