DynamoDB to Elasticsearch with AWS SDK v2?

Question

I came across this question and answer, showing how to push data from DynamoDB to Elasticsearch for full-text search indexing. Our application, however, is not making use of Lambdas. Instead, we're using Apache Camel to capture DynamoDB Streams events and want to push the records to Elasticsearch from there.

Since we are using AWS SDK v2, we are not receiving a DynamodbEvent or the corresponding DynamodbStreamRecord class containing the DynamoDB record. Instead, we receive a software.amazon.awssdk.services.dynamodb.model.Record object. Given that, how can we serialize this data and then index it in Elasticsearch? In the other question referenced, the record is converted to a JSON string and then sent to Elasticsearch. Is there a way to do this with the v2 Record class? The ItemUtils class mentioned in that answer no longer exists in v2, and I am not aware of another way to serialize the record.

Any help you can give is greatly appreciated!!

Answer 1

Score: 3

Similar to the example you provided, you can try something like the following:

public void processRecord(Record record, String index, String type, RestHighLevelClient esClient) throws Exception {
  // Get the operation type (INSERT, MODIFY or REMOVE)
  final OperationType operationType = record.eventName();
  // Obtain a reference to the actual DynamoDB stream record
  final StreamRecord streamRecord = record.dynamodb();
  // Get the ID. Assumes a single numeric attribute named "ID" as the partition key
  final Map<String, AttributeValue> keys = streamRecord.keys();
  final String recordId = keys.get("ID").n();

  // Each case has its own block so that the local variable names can be reused
  switch (operationType) {
    case INSERT: {
      if (!streamRecord.hasNewImage()) {
        throw new IllegalArgumentException("No new image when inserting");
      }
      Map<String, AttributeValue> newImage = streamRecord.newImage();
      // toJson is defined in https://github.com/aaronanderson/aws-java-sdk-v2-utils/blob/master/src/main/java/DynamoDBUtil.java
      // and included below
      JsonObject jsonObject = DynamoDBUtil.toJson(newImage);
      IndexRequest indexRequest = new IndexRequest(index, type, recordId);
      indexRequest.source(jsonObject.toString(), XContentType.JSON);
      IndexResponse indexResponse = esClient.index(indexRequest, RequestOptions.DEFAULT);
      System.out.println("New content successfully indexed: " + indexResponse);
      break;
    }
    case MODIFY: {
      if (!streamRecord.hasNewImage()) {
        throw new IllegalArgumentException("No new image when updating");
      }
      Map<String, AttributeValue> newImage = streamRecord.newImage();
      JsonObject jsonObject = DynamoDBUtil.toJson(newImage);
      UpdateRequest updateRequest = new UpdateRequest(index, type, recordId);
      updateRequest.doc(jsonObject.toString(), XContentType.JSON);
      UpdateResponse updateResponse = esClient.update(updateRequest, RequestOptions.DEFAULT);
      System.out.println("Content successfully updated: " + updateResponse);
      break;
    }
    case REMOVE: {
      DeleteRequest deleteRequest = new DeleteRequest(index, type, recordId);
      DeleteResponse deleteResponse = esClient.delete(deleteRequest, RequestOptions.DEFAULT);
      System.out.println("Successfully removed: " + deleteResponse);
      break;
    }
    default:
      throw new UnsupportedOperationException("Operation type " + operationType + " not supported");
  }
}

The toJson method is defined in this class: https://github.com/aaronanderson/aws-java-sdk-v2-utils/blob/master/src/main/java/DynamoDBUtil.java

The source code is reproduced here:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonArrayBuilder;
import javax.json.JsonNumber;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.json.JsonString;
import javax.json.JsonStructure;
import javax.json.JsonValue;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

/** This is a utility for converting DynamoDB AttributeValues to and from Java JSON-P objects */
public class DynamoDBUtil {

    public static void addList(String key, JsonObjectBuilder objectBuilder, List<JsonObject> items) {
        if (!items.isEmpty()) {
            JsonArrayBuilder builder = Json.createArrayBuilder();
            items.forEach(i -> builder.add(i));
            objectBuilder.add(key, builder.build());
        }

    }

    public static JsonArray toJson(List<AttributeValue> attributeValues) {
        if (attributeValues == null) {
            return null;
        }
        JsonArrayBuilder valueBuilder = Json.createArrayBuilder();
        for (AttributeValue a : attributeValues) {
            add(toJson(a), valueBuilder);
        }
        return valueBuilder.build();
    }

    public static JsonObject toJson(Map<String, AttributeValue> attributeValues) {
        if (attributeValues == null) {
            return null;
        }
        JsonObjectBuilder valueBuilder = Json.createObjectBuilder();
        for (Map.Entry<String, AttributeValue> a : attributeValues.entrySet()) {
            add(a.getKey(), toJson(a.getValue()), valueBuilder);
        }
        return valueBuilder.build();
    }

    public static void add(String key, Object value, JsonObjectBuilder object) {
        if (value instanceof JsonValue) {
            object.add(key, (JsonValue) value);
            // with json-p 1.0 can't create JsonString or JsonNumber so simply setting JsonValue not an option.
        } else if (value instanceof String) {
            object.add(key, (String) value);
        } else if (value instanceof BigDecimal) {
            object.add(key, (BigDecimal) value);
        } else if (value instanceof Boolean) {
            object.add(key, (Boolean) value);
        } else if (value == null || value.equals(JsonValue.NULL)) {
            object.addNull(key);
        }

    }

    public static void add(Object value, JsonArrayBuilder array) {
        if (value instanceof JsonValue) {
            array.add((JsonValue) value);
        } else if (value instanceof String) {
            array.add((String) value);
        } else if (value instanceof BigDecimal) {
            array.add((BigDecimal) value);
        } else if (value instanceof Boolean) {
            array.add((Boolean) value);
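        } else if (value == null || value.equals(JsonValue.NULL)) {
            // toJson(AttributeValue) returns null for binary values, so guard against a NullPointerException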
            array.addNull();
        }

    }

    public static Object toJson(AttributeValue attributeValue) {
        // with json-p 1.1 Json.createValue() can be used.

        if (attributeValue == null) {
            return null;
        }
        if (attributeValue.s() != null) {
            return attributeValue.s();
        }
        if (attributeValue.n() != null) {
            return new BigDecimal(attributeValue.n());
        }
        if (attributeValue.bool() != null) {
            // return attributeValue.bool() ? JsonValue.TRUE : JsonValue.FALSE;
            return attributeValue.bool();
        }

        if (attributeValue.b() != null) {
            // return Base64.getEncoder().encodeToString(attributeValue.b().array());
            return null;
        }

        if (attributeValue.nul() != null && attributeValue.nul()) {
            return JsonValue.NULL;
        }

        if (!attributeValue.m().isEmpty()) {
            return toJson(attributeValue.m());
        }
        if (!attributeValue.l().isEmpty()) {
            return toJson(attributeValue.l());
        }

        if (!attributeValue.ss().isEmpty()) {
            return attributeValue.ss();
        }

        if (!attributeValue.ns().isEmpty()) {
            return attributeValue.ns();
        }

        if (!attributeValue.bs().isEmpty()) {
            //return attributeValue.bs();
            return null;
        }
        return null;
    }

    public static Map<String, AttributeValue> toAttribute(JsonObject jsonObject) {
        Map<String, AttributeValue> attribute = new HashMap<>();
        jsonObject.entrySet().forEach(e -> {
            attribute.put(e.getKey(), toAttribute(e.getValue()));
        });
        return attribute;
    }

    public static List<AttributeValue> toAttribute(JsonArray jsonArray) {
        List<AttributeValue> attributes = new LinkedList<>();
        jsonArray.forEach(e -> {
            attributes.add(toAttribute(e));
        });
        return attributes;
    }

    public static AttributeValue toAttribute(JsonValue jsonValue) {
        if (jsonValue == null) {
            return null;
        }
        switch (jsonValue.getValueType()) {
        case STRING:
            return AttributeValue.builder().s(((JsonString) jsonValue).getString()).build();
        case OBJECT:
            return AttributeValue.builder().m(toAttribute((JsonObject) jsonValue)).build();
        case ARRAY:
            return AttributeValue.builder().l(toAttribute((JsonArray) jsonValue)).build();
        case NUMBER:
            return AttributeValue.builder().n(((JsonNumber) jsonValue).toString()).build();
        case TRUE:
            return AttributeValue.builder().bool(true).build();
        case FALSE:
            return AttributeValue.builder().bool(false).build();
        case NULL:
            return AttributeValue.builder().nul(true).build();
        }

        return null;
    }

    public static AttributeValue compress(Map<String, AttributeValue> attributeValues) throws IOException {
        return compress(toJson(attributeValues));
    }

    public static AttributeValue compress(List<AttributeValue> attributeValues) throws IOException {
        return compress(toJson(attributeValues));
    }

    public static AttributeValue compress(JsonStructure jsonStructure) throws IOException {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        Json.createWriter(outputStream).write(jsonStructure);
        outputStream.close();
        byte[] jsonBinary = outputStream.toByteArray();

        outputStream = new ByteArrayOutputStream();
        Deflater deflater = new Deflater();
        deflater.setInput(jsonBinary);
        deflater.finish();
        byte[] buffer = new byte[1024];
        while (!deflater.finished()) {
            int count = deflater.deflate(buffer); // returns the generated code... index
            outputStream.write(buffer, 0, count);
        }
        outputStream.close();
        jsonBinary = outputStream.toByteArray();

        return AttributeValue.builder().b(SdkBytes.fromByteArray(jsonBinary)).build();
    }

    public static JsonStructure decompress(AttributeValue attributeValue) throws IOException, DataFormatException {
        Inflater inflater = new Inflater();
        byte[] jsonBinary = attributeValue.b().asByteArray();
        inflater.setInput(jsonBinary);
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream(jsonBinary.length);
        byte[] buffer = new byte[1024];
        while (!inflater.finished()) {
            int count = inflater.inflate(buffer);
            outputStream.write(buffer, 0, count);
        }
        outputStream.close();
        byte[] output = outputStream.toByteArray();
        ByteArrayInputStream bis = new ByteArrayInputStream(output);
        return Json.createReader(bis).read();
    }

}
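
The compress and decompress helpers in DynamoDBUtil are built on the JDK's java.util.zip Deflater and Inflater. For anyone who wants to try that part in isolation, here is a minimal round-trip sketch that uses only the JDK. The class name ZlibRoundTripSketch is made up for illustration, and unlike the original it releases the native resources with end() and reports bad input as an unchecked exception.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Hypothetical standalone sketch of the Deflater/Inflater round trip; not part of DynamoDBUtil. */
final class ZlibRoundTripSketch {

    /** Compresses the input bytes with the default zlib settings. */
    static byte[] compress(byte[] input) {
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(input);
            deflater.finish();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[1024];
            while (!deflater.finished()) {
                int count = deflater.deflate(buffer); // number of compressed bytes written to buffer
                out.write(buffer, 0, count);
            }
            return out.toByteArray();
        } finally {
            deflater.end(); // release native memory
        }
    }

    /** Restores the original bytes; throws IllegalArgumentException on invalid or truncated data. */
    static byte[] decompress(byte[] compressed) {
        Inflater inflater = new Inflater();
        try {
            inflater.setInput(compressed);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[1024];
            while (!inflater.finished()) {
                int count = inflater.inflate(buffer);
                if (count == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    // Without this check a truncated stream would loop forever
                    throw new IllegalArgumentException("Truncated or unsupported compressed data");
                }
                out.write(buffer, 0, count);
            }
            return out.toByteArray();
        } catch (DataFormatException e) {
            throw new IllegalArgumentException("Not valid zlib data", e);
        } finally {
            inflater.end();
        }
    }
}
```

The JSON text would go in as UTF-8 bytes, and the compressed result is what DynamoDBUtil wraps in a binary AttributeValue via SdkBytes.fromByteArray.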

The DynamoDBUtil class is an updated version of the one originally introduced in this gist.

This post also provides a link to a Jackson AttributeValue serializer, if you prefer to use that library for JSON serialization.
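
Whichever library you pick, the conversion itself is the same idea: strip the DynamoDB type tag (S, N, BOOL, NULL, M, L) from each attribute and emit the bare value. The following is only a rough, dependency-free sketch of that idea, not the SDK's or Jackson's API. It assumes the image has already been represented as nested maps in DynamoDB's tagged JSON shape, for example {"ID": {"N": "101"}}; the class AttributeJsonSketch and its attr helper are hypothetical names, and sets and binary values are left out.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: unwraps DynamoDB-typed attributes and renders plain JSON text. */
final class AttributeJsonSketch {

    /** Builds one tagged attribute, e.g. attr("N", "101") for {"N": "101"}. */
    static Map<String, Object> attr(String tag, Object value) {
        return Collections.singletonMap(tag, value);
    }

    /** Converts an item of tagged attributes into a plain JSON object string. */
    static String toJson(Map<String, Map<String, Object>> item) {
        StringBuilder out = new StringBuilder("{");
        String separator = "";
        for (Map.Entry<String, Map<String, Object>> e : item.entrySet()) {
            out.append(separator).append(quote(e.getKey())).append(':').append(render(e.getValue()));
            separator = ",";
        }
        return out.append('}').toString();
    }

    /** Renders one tagged attribute; the wrapper map is expected to hold exactly one type tag. */
    @SuppressWarnings("unchecked")
    private static String render(Map<String, Object> typed) {
        Map.Entry<String, Object> tagged = typed.entrySet().iterator().next();
        Object value = tagged.getValue();
        switch (tagged.getKey()) {
            case "S":
                return quote((String) value);
            case "N":
                // DynamoDB transports numbers as strings; BigDecimal validates the text
                return new BigDecimal((String) value).toString();
            case "BOOL":
                return String.valueOf((Boolean) value);
            case "NULL":
                return "null";
            case "M":
                return toJson((Map<String, Map<String, Object>>) value);
            case "L": {
                List<String> parts = new ArrayList<>();
                for (Map<String, Object> element : (List<Map<String, Object>>) value) {
                    parts.add(render(element));
                }
                return "[" + String.join(",", parts) + "]";
            }
            default:
                throw new IllegalArgumentException("Unsupported type tag: " + tagged.getKey());
        }
    }

    /** Wraps a string in double quotes and escapes the characters JSON requires. */
    private static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    public static void main(String[] args) {
        Map<String, Map<String, Object>> newImage = new LinkedHashMap<>();
        newImage.put("ID", attr("N", "101"));
        newImage.put("Title", attr("S", "Book 101 Title"));
        System.out.println(toJson(newImage)); // prints {"ID":101,"Title":"Book 101 Title"}
    }
}
```

The resulting string is the kind of document body that can be handed to indexRequest.source(..., XContentType.JSON). In a real application a JSON library is the safer choice; the sketch is only meant to make the unwrapping step visible.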

Posted by huangapple on 2020-10-28 03:35:49
Source: https://go.coder-hub.com/64561687.html