java - Unable to delete the message from SQS after it is processed through Camel

Question

I'm unable to delete the message from SQS after it has been processed. I tried several approaches described here on StackOverflow, but none of them worked for me.

Here is the code that sends the message to SQS:

@Override
public void sendTransaction(TransactionModel transactionToSend, String preText) throws EMException {
    try {
        Long companyID = transactionToSend.getCompanyID();
        validateRequest(companyID, transactionToSend.getCostCenterID());
        VendorTransaction vtr = ediManager.convertTransactionToVendorTransaction(transactionToSend);
        EDIOrderRequest ediOrderRequest = ediManager.createEDIOrderRequest(transactionToSend);

        String messageBody = getSQSMessageBody(vtr);
        SendMessageRequest sendMessageRequest = new SendMessageRequest(getQueueName(), messageBody);

        setMessageAttributes(sendMessageRequest, transactionToSend, ediOrderRequest);

        AmazonSQSAsync client = ServerConnector.getServerBean(AWSSQSService.ILocal.class).getSQSClient();
        SendMessageResult result = client.sendMessage(sendMessageRequest);
        if (result == null) {
            throw new EDIException("An error occurred while sending the message.");
        }

        log.info("Transaction: " + transactionToSend.getId() + " for a company: " + companyID + " sent. Message id is: " + result.getMessageId());
    } catch (Exception e) {
        e.printStackTrace();
        log.error(String.format("Error sending transaction: %s", e.getMessage()));
        throw new EMException(-1, String.format("Error sending transaction: %s", e.getMessage()), e);
    }
}

private void setMessageAttributes(SendMessageRequest sendMessageRequest, TransactionModel transactionToSend, EDIOrderRequest ediOrderRequest) {
    addMessageAttributesEntry(sendMessageRequest, "companyId", transactionToSend.getCompanyID());
    addMessageAttributesEntry(sendMessageRequest, "standard", ediConfig.getMapOfParameters().get("EDI_STD"));
    addMessageAttributesEntry(sendMessageRequest, "endpoint", ediConfig.getTargetAddress());
    addMessageAttributesEntry(sendMessageRequest, "prefix", ediConfig.getMapOfParameters().get("FILE_PREFIX"));
    addMessageAttributesEntry(sendMessageRequest, "requestId", ediOrderRequest != null ? ediOrderRequest.getId() : null);
}

Here is the route itself:

@Override
public void configure() {

    // on exception deal with EDIOrder stuff
    onException(Exception.class).handled(true).process(failedOrdersProcessor).end();

    String destination = from != null ? from : (sqsQueue);
    destination += "&attributeNames=All&messageAttributeNames=All";
    log.info(destination);
    from(destination)
            .to("direct:ediOrder");
    // order processing
    from("direct:ediOrder")
            .choice()
            .when(body().isInstanceOf(String.class))
            .process(exchange -> {
                //filter out duplicates
                String body = "undefined";
                try {
                    body = exchange.getIn().getBody().toString();
                    VendorTransaction transaction = objectMapper.readValue(body, VendorTransaction.class);
                    if (!IDEMPOTENT_ORDERS_REPOSITORY.containsKey(transaction.getDeduplicatableIdentifier())) {
                        IDEMPOTENT_ORDERS_REPOSITORY.put(transaction.getDeduplicatableIdentifier(), transaction);
                    } else {
                        exchange.getIn().setHeader(Exchange.DUPLICATE_MESSAGE, Boolean.TRUE);
                        if (transaction.getDeduplicatableIdentifier() != null) {
                            exchange.getIn().setHeader(ConfigKey.DEDUPLICATE_IDENTIFIER.getKey(), transaction.getDeduplicatableIdentifier());
                        }
                    }
                } catch (IOException e) {
                    log.error("Unable to convert the body to VendorTransaction, body:\n" + body + "\n" + e.getMessage());
                }
            })
            .end()
            .filter(header(Exchange.DUPLICATE_MESSAGE).isEqualTo(Boolean.TRUE))
            .bean(DuplicateOrderProcessor.class)
            .stop()
            .end()
            .bean(SQSOrderQueueBean.class)
            .choice()
            .when(body().isInstanceOf(EDIOrder.class))
            .recipientList(simple("${body.endpoint}"))
            .log("Uploaded ${body.genericFile.fileName} to ${body.endpointNoPassword}")
            .bean(OrderCleaner.class)
            .endChoice()
            .when(body().isInstanceOf(VendorTransactionContainer.class))
            .log("Send to direct:confirmation")
            .to(RouteConstants.DIRECT_CONFIRMATION)
            .endChoice()
            .end();

The file gets processed and sent to the server, but the message is not deleted from the queue afterwards.

Here is the warning I get in the log:

WARN  o.a.c.component.aws.sqs.SqsConsumer.log - Error occurred during deleting message. This exception is ignored.. Exchange[ID-AD-MacBook-Pro-local-1353421234311-0-1]. Caused by: [com.amazonaws.services.sqs.model.AmazonSQSException - The request must contain the parameter MessageHandle. (Service: AmazonSQS; Status Code: 400; Error Code: MissingParameter; Request ID: e3252adsaf-1242-2244-a5f6-1b51e2e47e9c)]com.amazonaws.services.sqs.model.AmazonSQSException: The request must contain the parameter MessageHandle. (Service: AmazonSQS; Status Code: 400; Error Code: MissingParameter; Request ID: e3252adsaf-1242-2244-a5f6-1b51e2e47e9c)

In case this looks like a duplicate: there are a few similar questions here on StackOverflow, but none of their answers helped me resolve the issue.

Answer 1

Score: 0

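I fixed this by copying the headers from the in message onto the exchange.getOut() message. When we set the body like this:

exchange.getOut().setBody(body);

a new Message instance is created, so the headers from exchange.getIn(), and everything else on the in message, are not present on the exchange.getOut() message. That would explain the error: the SQS consumer appears to take the receipt handle of the received message from the message headers when it issues the delete request, so once the headers are lost the request goes out without it.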
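To make the effect concrete, here is a minimal, self-contained sketch of the behaviour described in the answer. It does not use Camel itself: the `Message` and `Exchange` classes below are hypothetical stand-ins that only model the relevant part (a lazily created, empty out message), and the header name `CamelAwsSqsReceiptHandle` is assumed to be the one under which camel-aws-sqs stores the receipt handle. Treat it as an illustration under those assumptions rather than the library's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

public class OutMessageHeadersSketch {

    // Assumed name of the header in which camel-aws-sqs keeps the receipt handle.
    static final String RECEIPT_HANDLE = "CamelAwsSqsReceiptHandle";

    /** Hypothetical stand-in for a Camel Message: a body plus a header map. */
    static final class Message {
        private Object body;
        private Map<String, Object> headers = new HashMap<>();

        Object getBody() { return body; }
        void setBody(Object body) { this.body = body; }
        Object getHeader(String name) { return headers.get(name); }
        void setHeader(String name, Object value) { headers.put(name, value); }
        Map<String, Object> getHeaders() { return headers; }
        void setHeaders(Map<String, Object> headers) { this.headers = new HashMap<>(headers); }
    }

    /** Hypothetical stand-in for a Camel Exchange; getOut() lazily creates an empty Message. */
    static final class Exchange {
        private final Message in = new Message();
        private Message out;

        Message getIn() { return in; }

        Message getOut() {
            if (out == null) {
                out = new Message(); // fresh instance: no body, no headers
            }
            return out;
        }
    }

    /** Builds an exchange that looks like one delivered by an SQS consumer. */
    static Exchange received(String body, String receiptHandle) {
        Exchange exchange = new Exchange();
        exchange.getIn().setBody(body);
        exchange.getIn().setHeader(RECEIPT_HANDLE, receiptHandle);
        return exchange;
    }

    /** The problematic pattern: only the body reaches the out message. */
    static Message setBodyOnly(Exchange exchange, Object newBody) {
        exchange.getOut().setBody(newBody);
        return exchange.getOut();
    }

    /** The fix from the answer: copy the in headers, then set the body. */
    static Message setBodyAndCopyHeaders(Exchange exchange, Object newBody) {
        exchange.getOut().setHeaders(exchange.getIn().getHeaders());
        exchange.getOut().setBody(newBody);
        return exchange.getOut();
    }

    public static void main(String[] args) {
        Message lost = setBodyOnly(received("order-1", "handle-1"), "converted");
        Message kept = setBodyAndCopyHeaders(received("order-2", "handle-2"), "converted");
        System.out.println("body only:      " + lost.getHeader(RECEIPT_HANDLE));
        System.out.println("headers copied: " + kept.getHeader(RECEIPT_HANDLE));
    }
}
```

In the sketch, the first call leaves the receipt handle header unset (`null`), while the second preserves it. In a real Camel 2.x processor the corresponding calls would be `exchange.getOut().setHeaders(exchange.getIn().getHeaders())` before `exchange.getOut().setBody(body)`. An alternative that avoids the copy altogether is to change the body in place with `exchange.getIn().setBody(body)`, which keeps the existing headers.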

huangapple
  • Posted on July 22, 2020 at 21:42:24
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63035608.html