How to async parse/write JSON in WebFlux? ObjectMapper methods are blocking
Question
I saw that Jackson has supported non-blocking parsing since 2.9, but how do I use it with WebFlux? Is there a demo?
Answer 1
Score: 1
Here is how you can use WebClient to make a GET request to GitHub's List Repositories API:
public Flux<GithubRepo> listGithubRepositories(String username, String token) {
    return webClient.get()
            .uri("/user/repos")
            // HTTP Basic auth: base64("username:token")
            .header("Authorization", "Basic " + Base64Utils
                    .encodeToString((username + ":" + token).getBytes(UTF_8)))
            .retrieve()
            // Decode the JSON array in the response body into GithubRepo elements
            .bodyToFlux(GithubRepo.class);
}
Assuming we have a class named GithubRepo that conforms to GitHub's API response, the above function returns a Flux of GithubRepo objects.
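For reference, here is a minimal sketch of what such a GithubRepo class might look like. The answer does not show its class, so this is an assumption: the two fields are only a small subset chosen for illustration (name and full_name are keys in GitHub's repository JSON), and every other key is ignored through @JsonIgnoreProperties.

```java
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical mapping class: only two of the many fields GitHub returns.
@JsonIgnoreProperties(ignoreUnknown = true)
public class GithubRepo {
    private String name;

    // The JSON key is snake_case, the Java field is camelCase.
    @JsonProperty("full_name")
    private String fullName;

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getFullName() { return fullName; }
    public void setFullName(String fullName) { this.fullName = fullName; }

    // Small helper so the mapping can be checked without an HTTP call.
    public static GithubRepo fromJson(String json) throws Exception {
        return new ObjectMapper().readValue(json, GithubRepo.class);
    }
}
```

The fromJson helper is only there to try the mapping locally; with WebClient the decoding is done by bodyToFlux.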
Answer 2
Score: 0
If you need to keep using the blocking ObjectMapper, wrap the call in Mono.fromCallable and run it with subscribeOn on a thread pool such as the bounded elastic scheduler.
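A minimal sketch of that idea, assuming reactor-core and jackson-databind are on the classpath. The class and method names (BlockingJsonOffload, parse, write) are made up for this example.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Hypothetical helper: keeps the blocking ObjectMapper calls off the event loop.
public class BlockingJsonOffload {
    // ObjectMapper is thread-safe once configured, so one shared instance is enough.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Parsing is deferred until subscription and runs on a bounded elastic worker.
    public static Mono<JsonNode> parse(String json) {
        return Mono.fromCallable(() -> MAPPER.readTree(json))
                .subscribeOn(Schedulers.boundedElastic());
    }

    // Same pattern for serialization.
    public static Mono<String> write(Object value) {
        return Mono.fromCallable(() -> MAPPER.writeValueAsString(value))
                .subscribeOn(Schedulers.boundedElastic());
    }
}
```

Mono.fromCallable accepts code that throws checked exceptions, so a JsonProcessingException surfaces as an error signal on the Mono instead of needing a try/catch. In a handler you would chain on the result (for example with flatMap) rather than calling block().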
Answer 3
Score: -1
<details>
<summary>English:</summary>
I found this code showing an implementation; my modified version is further down.
https://github.com/mmimica/async-jackson/blob/master/src/main/java/com/mmimica/ajackson/AsyncJsonParser.java
Request handler:
public Mono<ServerResponse> myHttpHandler(ServerRequest request) {
    return this.handlerJsonRequestBodyParser.parse(request)
            // x, y and id are placeholders that the original snippet leaves undefined
            .flatMap(map -> this.myService.doSomething(x, y))
            .flatMap(response -> ServerResponse
                    .ok()
                    .contentType(MediaType.APPLICATION_JSON)
                    .body(id, String.class)
                    .log()
            );
}
Class implementation:
package lsp.order.handler;

import java.io.IOException;
import java.util.Map;
import lsp.order.exception.BadJsonRequestBodyException;
import lsp.order.util.AsyncJsonParser;
import org.springframework.core.io.ByteArrayResource;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import reactor.core.publisher.Mono;

@Component
public class HandlerJsonRequestBodyParser {

    public Mono<Map<String, Object>> parse(ServerRequest request) {
        // bodyToMono(ByteArrayResource.class) buffers the whole request body before parsing starts
        return request.bodyToMono(ByteArrayResource.class)
                .map(ByteArrayResource::getByteArray)
                .flatMap(byteArray -> {
                    try {
                        // The parser keeps state, so a new one is created per request
                        AsyncJsonParser parser = new AsyncJsonParser();
                        return parser.parse(byteArray);
                    } catch (IOException ex) {
                        return Mono.error(new BadJsonRequestBodyException());
                    }
                });
    }
}
I modified it so that it can be used as is:
package lsp.order.util;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.async.ByteArrayFeeder;
import com.fasterxml.jackson.core.json.async.NonBlockingJsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Map;
import reactor.core.publisher.Mono;

/**
 * @credit https://github.com/mmimica/async-jackson
 */
public class AsyncJsonParser {

    private final NonBlockingJsonParser parser;
    private final Stack stack = new Stack();
    private String fieldName;

    public AsyncJsonParser() throws IOException {
        JsonFactory factory = new JsonFactory();
        parser = (NonBlockingJsonParser) factory.createNonBlockingByteArrayParser();
    }
    public Mono<Map<String, Object>> parse(byte[] bytes) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());
        ByteArrayFeeder feeder = parser.getNonBlockingInputFeeder();
        boolean consumed = false;
        while (!consumed) {
            // The whole byte array is fed in one go
            if (feeder.needMoreInput()) {
                feeder.feedInput(bytes, 0, bytes.length);
                consumed = true;
            }
            JsonToken event;
            // Pull tokens until the parser runs out of buffered input
            while ((event = parser.nextToken()) != JsonToken.NOT_AVAILABLE) {
                JsonNode root = buildTree(event);
                if (root != null) {
                    // The root container was closed: the tree is complete
                    Map<String, Object> jsonMap = mapper.treeToValue(root, Map.class);
                    return Mono.just(jsonMap);
                }
            }
        }
        return Mono.error(new RuntimeException("Cannot parse JSON"));
    }
    private static final class Stack {

        private final LinkedList<JsonNode> list = new LinkedList<>();

        JsonNode pop() {
            return list.removeLast();
        }

        JsonNode top() {
            if (list.isEmpty()) {
                return null;
            }
            return list.getLast();
        }

        void push(JsonNode n) {
            list.add(n);
        }

        boolean isEmpty() {
            return list.isEmpty();
        }
    }
    /**
     * @return The root node once the whole tree is built, otherwise null.
     */
    private JsonNode buildTree(JsonToken event) throws IOException {
        switch (event) {
            case FIELD_NAME:
                assert !stack.isEmpty();
                fieldName = parser.getCurrentName();
                return null;
            case START_OBJECT:
                stack.push(createNode(stack.top()));
                return null;
            case START_ARRAY:
                stack.push(createArray(stack.top()));
                return null;
            case END_OBJECT:
            case END_ARRAY:
                assert !stack.isEmpty();
                JsonNode current = stack.pop();
                // An empty stack means the outermost container was just closed
                return stack.isEmpty() ? current : null;
            case VALUE_NUMBER_INT:
                assert !stack.isEmpty();
                addLong(stack.top(), parser.getLongValue());
                return null;
            case VALUE_STRING:
                assert !stack.isEmpty();
                addString(stack.top(), parser.getValueAsString());
                return null;
            case VALUE_NUMBER_FLOAT:
                assert !stack.isEmpty();
                // Read as double: getFloatValue() would silently drop precision
                addFloat(stack.top(), parser.getDoubleValue());
                return null;
            case VALUE_NULL:
                assert !stack.isEmpty();
                addNull(stack.top());
                return null;
            case VALUE_TRUE:
                assert !stack.isEmpty();
                addBoolean(stack.top(), true);
                return null;
            case VALUE_FALSE:
                assert !stack.isEmpty();
                addBoolean(stack.top(), false);
                return null;
            default:
                throw new RuntimeException("Unknown json event " + event);
        }
    }

    // Creates an object node attached to the current container, or a new root
    private JsonNode createNode(JsonNode current) {
        if (current instanceof ObjectNode) {
            return ((ObjectNode) current).putObject(fieldName);
        } else if (current instanceof ArrayNode) {
            return ((ArrayNode) current).addObject();
        } else {
            return JsonNodeFactory.instance.objectNode();
        }
    }

    // Creates an array node attached to the current container, or a new root
    private JsonNode createArray(JsonNode current) {
        if (current instanceof ObjectNode) {
            return ((ObjectNode) current).putArray(fieldName);
        } else if (current instanceof ArrayNode) {
            return ((ArrayNode) current).addArray();
        } else {
            return JsonNodeFactory.instance.arrayNode();
        }
    }

    private void addLong(JsonNode current, long v) {
        assert current != null;
        if (current instanceof ObjectNode) {
            ((ObjectNode) current).put(fieldName, v);
        } else {
            ((ArrayNode) current).add(v);
        }
    }

    private void addString(JsonNode current, String s) {
        assert current != null;
        if (current instanceof ObjectNode) {
            ((ObjectNode) current).put(fieldName, s);
        } else {
            ((ArrayNode) current).add(s);
        }
    }

    private void addFloat(JsonNode current, double f) {
        assert current != null;
        if (current instanceof ObjectNode) {
            ((ObjectNode) current).put(fieldName, f);
        } else {
            ((ArrayNode) current).add(f);
        }
    }

    private void addNull(JsonNode current) {
        assert current != null;
        if (current instanceof ObjectNode) {
            ((ObjectNode) current).putNull(fieldName);
        } else {
            ((ArrayNode) current).addNull();
        }
    }

    private void addBoolean(JsonNode current, boolean b) {
        assert current != null;
        if (current instanceof ObjectNode) {
            ((ObjectNode) current).put(fieldName, b);
        } else {
            ((ArrayNode) current).add(b);
        }
    }
}
</details>
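Note that the parser in this answer receives the whole body as a single byte array, so it never has to wait for more input. To illustrate what incremental feeding looks like, here is a minimal sketch using jackson-core only (2.9 or later). ChunkedTokenizer and tokenize are made-up names for this example; it only collects token names instead of building a tree.

```java
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.async.ByteArrayFeeder;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical example class: feeds JSON to the non-blocking parser chunk by chunk.
public class ChunkedTokenizer {

    public static List<String> tokenize(byte[]... chunks) throws IOException {
        List<String> tokens = new ArrayList<>();
        try (JsonParser parser = new JsonFactory().createNonBlockingByteArrayParser()) {
            ByteArrayFeeder feeder = (ByteArrayFeeder) parser.getNonBlockingInputFeeder();
            for (byte[] chunk : chunks) {
                // The third argument is the end offset, not a length
                feeder.feedInput(chunk, 0, chunk.length);
                drain(parser, tokens);
            }
            // Tell the parser no more bytes will arrive, then collect what is left
            feeder.endOfInput();
            drain(parser, tokens);
        }
        return tokens;
    }

    // Reads tokens until the parser needs more input (NOT_AVAILABLE) or is done (null)
    private static void drain(JsonParser parser, List<String> tokens) throws IOException {
        JsonToken token;
        while ((token = parser.nextToken()) != null && token != JsonToken.NOT_AVAILABLE) {
            tokens.add(token.name());
        }
    }

    public static void main(String[] args) throws IOException {
        // The string value is split across the two chunks on purpose
        byte[] first = "{\"name\":\"we".getBytes(StandardCharsets.UTF_8);
        byte[] second = "bflux\",\"ok\":true}".getBytes(StandardCharsets.UTF_8);
        System.out.println(tokenize(first, second));
    }
}
```

In a WebFlux handler the chunks would come from the request body's data buffers rather than from hard-coded arrays; that wiring is left out here.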