DynamoDB to Elasticsearch with AWS SDK v2?
Question
I came across this question and answer, showing how to push data from DynamoDB to Elasticsearch for full-text search indexing. Our application, however, is not making use of Lambdas. Instead, we're using Apache Camel to capture DynamoDB Streams events and want to push the records to Elasticsearch from there.
Since we are using AWS SDK v2, we're not capturing a DynamodbEvent class or corresponding DynamodbStreamRecord record class containing the DynamoDB record. Instead, we are receiving a software.amazon.awssdk.services.dynamodb.model.Record object. Given that, how can we serialize and subsequently index this data in Elasticsearch? In the other question referenced, the record is converted to a JSON string and then sent over to Elasticsearch. Is there a way to do this with the v2 Record class? The ItemUtils class mentioned in the answer no longer exists, so I was unaware of another way to serialize it.
Any help you can give is greatly appreciated!!
Answer 1
Score: 3
Similar to the example you provided, you can try something like the following:
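// Elasticsearch 6.x/7.x high-level REST client; package names differ in later client versions
import java.util.Map;
import javax.json.JsonObject;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.OperationType;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.awssdk.services.dynamodb.model.StreamRecord;

public void processRecord(Record record, String index, String type, RestHighLevelClient esClient) throws Exception {
    // Get operation
    final OperationType operationType = record.eventName();
    // Obtain a reference to actual DynamoDB stream record
    final StreamRecord streamRecord = record.dynamodb();
    // Get ID. Assume single numeric attribute as partition key
    final Map<String, AttributeValue> keys = streamRecord.keys();
    final String recordId = keys.get("ID").n();

    // Each case has its own block so the local variable names can be reused
    switch (operationType) {
        case INSERT: {
            if (!streamRecord.hasNewImage()) {
                throw new IllegalArgumentException("No new image when inserting");
            }
            Map<String, AttributeValue> newImage = streamRecord.newImage();
            // Where toJson is defined here https://github.com/aaronanderson/aws-java-sdk-v2-utils/blob/master/src/main/java/DynamoDBUtil.java
            // and included below
            JsonObject jsonObject = DynamoDBUtil.toJson(newImage);
            IndexRequest indexRequest = new IndexRequest(index, type, recordId);
            indexRequest.source(jsonObject.toString(), XContentType.JSON);
            IndexResponse indexResponse = esClient.index(indexRequest, RequestOptions.DEFAULT);
            System.out.println("New content successfully indexed: " + indexResponse);
            break;
        }
        case MODIFY: {
            if (!streamRecord.hasNewImage()) {
                throw new IllegalArgumentException("No new image when updating");
            }
            Map<String, AttributeValue> newImage = streamRecord.newImage();
            JsonObject jsonObject = DynamoDBUtil.toJson(newImage);
            UpdateRequest updateRequest = new UpdateRequest(index, type, recordId);
            // Partial-document update with the new image
            updateRequest.doc(jsonObject.toString(), XContentType.JSON);
            UpdateResponse updateResponse = esClient.update(updateRequest, RequestOptions.DEFAULT);
            System.out.println("Content successfully updated: " + updateResponse);
            break;
        }
        case REMOVE: {
            DeleteRequest deleteRequest = new DeleteRequest(index, type, recordId);
            DeleteResponse deleteResponse = esClient.delete(deleteRequest, RequestOptions.DEFAULT);
            System.out.println("Successfully removed: " + deleteResponse);
            break;
        }
        default:
            throw new UnsupportedOperationException("Operation type " + operationType + " not supported");
    }
}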
The toJson method is defined in this class: https://github.com/aaronanderson/aws-java-sdk-v2-utils/blob/master/src/main/java/DynamoDBUtil.java
The source code is reproduced here:
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

import javax.json.Json;
import javax.json.JsonArray;
import javax.json.JsonArrayBuilder;
import javax.json.JsonNumber;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.json.JsonString;
import javax.json.JsonStructure;
import javax.json.JsonValue;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

/** This is a utility for converting DynamoDB AttributeValues to and from Java JSON-P objects */
public class DynamoDBUtil {

    public static void addList(String key, JsonObjectBuilder objectBuilder, List<JsonObject> items) {
        if (!items.isEmpty()) {
            JsonArrayBuilder builder = Json.createArrayBuilder();
            items.forEach(i -> builder.add(i));
            objectBuilder.add(key, builder.build());
        }
    }

    public static JsonArray toJson(List<AttributeValue> attributeValues) {
        if (attributeValues == null) {
            return null;
        }
        JsonArrayBuilder valueBuilder = Json.createArrayBuilder();
        for (AttributeValue a : attributeValues) {
            add(toJson(a), valueBuilder);
        }
        return valueBuilder.build();
    }

    public static JsonObject toJson(Map<String, AttributeValue> attributeValues) {
        if (attributeValues == null) {
            return null;
        }
        JsonObjectBuilder valueBuilder = Json.createObjectBuilder();
        for (Map.Entry<String, AttributeValue> a : attributeValues.entrySet()) {
            add(a.getKey(), toJson(a.getValue()), valueBuilder);
        }
        return valueBuilder.build();
    }

    public static void add(String key, Object value, JsonObjectBuilder object) {
        if (value instanceof JsonValue) {
            object.add(key, (JsonValue) value);
            // with json-p 1.0 can't create JsonString or JsonNumber so simply setting JsonValue not an option.
        } else if (value instanceof String) {
            object.add(key, (String) value);
        } else if (value instanceof BigDecimal) {
            object.add(key, (BigDecimal) value);
        } else if (value instanceof Boolean) {
            object.add(key, (Boolean) value);
        } else if (value == null || value.equals(JsonValue.NULL)) {
            object.addNull(key);
        }
    }

    public static void add(Object value, JsonArrayBuilder array) {
        if (value instanceof JsonValue) {
            array.add((JsonValue) value);
        } else if (value instanceof String) {
            array.add((String) value);
        } else if (value instanceof BigDecimal) {
            array.add((BigDecimal) value);
        } else if (value instanceof Boolean) {
            array.add((Boolean) value);
        } else if (value == null || value.equals(JsonValue.NULL)) {
            // null check added: toJson(AttributeValue) returns null for binary values,
            // which would otherwise cause a NullPointerException here
            array.addNull();
        }
    }

    public static Object toJson(AttributeValue attributeValue) {
        // with json-p 1.1 Json.createValue() can be used.
        if (attributeValue == null) {
            return null;
        }
        if (attributeValue.s() != null) {
            return attributeValue.s();
        }
        if (attributeValue.n() != null) {
            return new BigDecimal(attributeValue.n());
        }
        if (attributeValue.bool() != null) {
            // return attributeValue.bool() ? JsonValue.TRUE : JsonValue.FALSE;
            return attributeValue.bool();
        }
        if (attributeValue.b() != null) {
            // Binary values are not converted; they end up as JSON null
            // return Base64.getEncoder().encodeToString(attributeValue.b().array());
            return null;
        }
        if (attributeValue.nul() != null && attributeValue.nul()) {
            return JsonValue.NULL;
        }
        if (!attributeValue.m().isEmpty()) {
            return toJson(attributeValue.m());
        }
        if (!attributeValue.l().isEmpty()) {
            return toJson(attributeValue.l());
        }
        // String and number sets are returned as a raw List<String>. The add() methods
        // have no List branch, so as written these values are not added to the output.
        if (!attributeValue.ss().isEmpty()) {
            return attributeValue.ss();
        }
        if (!attributeValue.ns().isEmpty()) {
            return attributeValue.ns();
        }
        if (!attributeValue.bs().isEmpty()) {
            //return attributeValue.bs();
            return null;
        }
        return null;
    }

    public static Map<String, AttributeValue> toAttribute(JsonObject jsonObject) {
        Map<String, AttributeValue> attribute = new HashMap<>();
        jsonObject.entrySet().forEach(e -> {
            attribute.put(e.getKey(), toAttribute(e.getValue()));
        });
        return attribute;
    }

    public static List<AttributeValue> toAttribute(JsonArray jsonArray) {
        List<AttributeValue> attributes = new LinkedList<>();
        jsonArray.forEach(e -> {
            attributes.add(toAttribute(e));
        });
        return attributes;
    }

    public static AttributeValue toAttribute(JsonValue jsonValue) {
        if (jsonValue == null) {
            return null;
        }
        switch (jsonValue.getValueType()) {
            case STRING:
                return AttributeValue.builder().s(((JsonString) jsonValue).getString()).build();
            case OBJECT:
                return AttributeValue.builder().m(toAttribute((JsonObject) jsonValue)).build();
            case ARRAY:
                return AttributeValue.builder().l(toAttribute((JsonArray) jsonValue)).build();
            case NUMBER:
                return AttributeValue.builder().n(((JsonNumber) jsonValue).toString()).build();
            case TRUE:
                return AttributeValue.builder().bool(true).build();
            case FALSE:
                return AttributeValue.builder().bool(false).build();
            case NULL:
                return AttributeValue.builder().nul(true).build();
        }
        return null;
    }

    public static AttributeValue compress(Map<String, AttributeValue> attributeValues) throws IOException {
        return compress(toJson(attributeValues));
    }

    public static AttributeValue compress(List<AttributeValue> attributeValues) throws IOException {
        return compress(toJson(attributeValues));
    }

    public static AttributeValue compress(JsonStructure jsonStructure) throws IOException {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        Json.createWriter(outputStream).write(jsonStructure);
        outputStream.close();
        byte[] jsonBinary = outputStream.toByteArray();

        outputStream = new ByteArrayOutputStream();
        Deflater deflater = new Deflater();
        deflater.setInput(jsonBinary);
        deflater.finish();
        byte[] buffer = new byte[1024];
        while (!deflater.finished()) {
            int count = deflater.deflate(buffer); // number of compressed bytes written to buffer
            outputStream.write(buffer, 0, count);
        }
        outputStream.close();
        jsonBinary = outputStream.toByteArray();

        return AttributeValue.builder().b(SdkBytes.fromByteArray(jsonBinary)).build();
    }

    public static JsonStructure decompress(AttributeValue attributeValue) throws IOException, DataFormatException {
        Inflater inflater = new Inflater();
        byte[] jsonBinary = attributeValue.b().asByteArray();
        inflater.setInput(jsonBinary);
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream(jsonBinary.length);
        byte[] buffer = new byte[1024];
        while (!inflater.finished()) {
            int count = inflater.inflate(buffer); // number of uncompressed bytes written to buffer
            outputStream.write(buffer, 0, count);
        }
        outputStream.close();
        byte[] output = outputStream.toByteArray();
        ByteArrayInputStream bis = new ByteArrayInputStream(output);
        return Json.createReader(bis).read();
    }
}
This class is an updated version of the one originally introduced in this gist.
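The compress and decompress methods in the class are not needed for indexing into Elasticsearch; they pack a JSON document into a binary attribute. For readers who want to see that round trip in isolation, here is a minimal sketch using only java.util.zip. The class name DeflateRoundTrip and the guard against a truncated stream are my own additions for illustration, not part of the linked utility.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Illustrative only: the same Deflater/Inflater loop as above, without the AWS or JSON-P types. */
final class DeflateRoundTrip {

    static byte[] deflate(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        while (!deflater.finished()) {
            int count = deflater.deflate(buffer); // compressed bytes written to buffer
            out.write(buffer, 0, count);
        }
        deflater.end(); // release native resources
        return out.toByteArray();
    }

    static byte[] inflate(byte[] compressed) throws DataFormatException {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        while (!inflater.finished()) {
            int count = inflater.inflate(buffer); // uncompressed bytes written to buffer
            if (count == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                // Added guard: without it, truncated input would loop forever
                inflater.end();
                throw new DataFormatException("Incomplete or unsupported deflate stream");
            }
            out.write(buffer, 0, count);
        }
        inflater.end();
        return out.toByteArray();
    }

    public static void main(String[] args) throws DataFormatException {
        byte[] json = "{\"ID\":42,\"title\":\"Dune\"}".getBytes(StandardCharsets.UTF_8);
        byte[] restored = inflate(deflate(json));
        System.out.println(new String(restored, StandardCharsets.UTF_8));
    }
}
```

For very small documents the compressed form can be larger than the input, so this is only worth doing for sizeable payloads.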
This post also provides a link to a Jackson AttributeValue serializer, if you prefer to use that library for JSON serialization.
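Whichever library you pick, the core of the serialization is the same: strip DynamoDB's type tags (S, N, BOOL, and so on) and emit plain JSON. The sketch below shows that idea with no dependencies. It does not use the SDK's AttributeValue class; instead it assumes each typed value is modeled as a one-entry map such as {"N": "42"}, and the class name DynamoJsonSketch is hypothetical. Unlike the utility above, it writes string and number sets as JSON arrays, and it assumes binary values have already been Base64-encoded to text.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Illustrative only: typed values are one-entry maps like {"S": "abc"}, not SDK AttributeValues. */
final class DynamoJsonSketch {

    /** Converts one typed value into a JSON fragment. */
    @SuppressWarnings("unchecked")
    static String unwrap(Map<String, Object> typed) {
        Map.Entry<String, Object> e = typed.entrySet().iterator().next();
        Object v = e.getValue();
        switch (e.getKey()) {
            case "S":
            case "B": // assumption: binary is already Base64 text in this model
                return quote((String) v);
            case "N":
                return (String) v; // DynamoDB carries numbers as strings
            case "BOOL":
                return String.valueOf(v);
            case "NULL":
                return "null";
            case "SS":
                return ((List<String>) v).stream().map(DynamoJsonSketch::quote)
                        .collect(Collectors.joining(",", "[", "]"));
            case "NS":
                return ((List<String>) v).stream()
                        .collect(Collectors.joining(",", "[", "]"));
            case "L":
                return ((List<Map<String, Object>>) v).stream().map(DynamoJsonSketch::unwrap)
                        .collect(Collectors.joining(",", "[", "]"));
            case "M":
                return toJson((Map<String, Map<String, Object>>) v);
            default:
                throw new IllegalArgumentException("Unknown type tag: " + e.getKey());
        }
    }

    /** Converts a whole item (attribute name to typed value) into a JSON object string. */
    static String toJson(Map<String, Map<String, Object>> item) {
        return item.entrySet().stream()
                .map(en -> quote(en.getKey()) + ":" + unwrap(en.getValue()))
                .collect(Collectors.joining(",", "{", "}"));
    }

    /** Minimal JSON string escaping. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    public static void main(String[] args) {
        Map<String, Map<String, Object>> item = new LinkedHashMap<>();
        item.put("ID", Map.of("N", "42"));
        item.put("title", Map.of("S", "Dune"));
        item.put("tags", Map.of("SS", List.of("a", "b")));
        System.out.println(toJson(item)); // prints {"ID":42,"title":"Dune","tags":["a","b"]}
    }
}
```

The resulting string is the kind of document you would pass to indexRequest.source(..., XContentType.JSON) in processRecord. With the real SDK you would branch on the AttributeValue accessors (s(), n(), m(), and so on) rather than on map keys.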