如何在WebFlux中异步解析/编写JSON?ObjectMapper的方法是阻塞的。

huangapple go评论64阅读模式
英文:

How to async parse/write json in webflux? ObjectMapper methods are blocking

问题

我看到自2.9版本起,Jackson支持非阻塞功能,但如何在Webflux中使用它?有示范吗?

英文:

I saw that jackson supports non-blocking since 2.9, but how to use it with webflux? Is there a demo?

答案1

得分: 1

以下是如何使用WebClient来对Github的“列出仓库”API进行GET请求的示例代码:

public Flux<GithubRepo> listGithubRepositories(String username, String token) {
     return webClient.get()
            .uri("/user/repos")
            .header("Authorization", "Basic " + Base64Utils
                    .encodeToString((username + ":" + token).getBytes(UTF_8)))
            .retrieve()
            .bodyToFlux(GithubRepo.class);
}

假设我们有一个名为GithubRepo的类,它符合Github的API响应格式,上述函数将返回一个GithubRepo对象的Flux流。

英文:

Here is how you can use WebClient to make a GET request to Github’s List Repositories API

public Flux&lt;GithubRepo&gt; listGithubRepositories(String username, String token) {
     return webClient.get()
            .uri(&quot;/user/repos&quot;)
            .header(&quot;Authorization&quot;, &quot;Basic &quot; + Base64Utils
                    .encodeToString((username + &quot;:&quot; + token).getBytes(UTF_8)))
            .retrieve()
            .bodyToFlux(GithubRepo.class);
}

Assuming we have a class named GithubRepo that confirms to the Github’s API response, the above function will return a Flux of GithubRepo objects.

答案2

得分: 0

如果您正在寻找阻塞的ObjectMapper,可以尝试使用subscribeOn方法在诸如有界弹性线程池中进行操作。并使用Mono.fromCallable对其进行包装。

英文:

Just in case if you are looking for blocking ObjectMapper, try with subscribeOn method in a thread pool such as bounded elastic. And wrap it with Mono.fromCallable

答案3

得分: -1

这是代码的中文翻译部分:

我在这里找到了展示实现的代码 - 查看我下面的修改

https://github.com/mmimica/async-jackson/blob/master/src/main/java/com/mmimica/ajackson/AsyncJsonParser.java

请求处理器

    public Mono<ServerResponse> myHttpHandler(ServerRequest request) {
        return this.handlerJsonRequestBodyParser.parse(request)
        .flatMap(map
                -> this.myService.doSomething(x,y)
        )
        .flatMap(response -> ServerResponse
                .ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(id, String.class)
                .log()
        );

    }

类的实现

    package lsp.order.handler;

    import java.io.IOException;
    import java.util.Map;
    import lsp.order.exception.BadJsonRequestBodyException;
    import lsp.order.util.AsyncJsonParser;
    import org.springframework.core.io.ByteArrayResource;
    import org.springframework.stereotype.Component;
    import org.springframework.web.reactive.function.server.ServerRequest;
    import reactor.core.publisher.Mono;

    @Component
    public class HandlerJsonRequestBodyParser {

    public Mono<Map<String, Object>> parse(ServerRequest request) {

        return request.bodyToMono(ByteArrayResource.class)
                .map(ByteArrayResource::getByteArray)
                .flatMap(byteArray -> {
                    try {
                        AsyncJsonParser parser = new AsyncJsonParser();
                        return parser.parse(byteArray);
                    } catch (IOException ex) {
                        return Mono.error(new BadJsonRequestBodyException());
                    }
                });

        }

    }

我修改了它以便按原样使用

    package lsp.order.util;

    import com.fasterxml.jackson.core.JsonFactory;
    import com.fasterxml.jackson.core.JsonToken;
    import com.fasterxml.jackson.core.async.ByteArrayFeeder;
    import com.fasterxml.jackson.core.json.async.NonBlockingJsonParser;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ArrayNode;
    import com.fasterxml.jackson.databind.node.ObjectNode;
    import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
    import com.fasterxml.jackson.databind.node.JsonNodeFactory;
    import java.io.IOException;
    import java.util.LinkedList;
    import java.util.Map;
    import reactor.core.publisher.Mono;

    /**
     * @credit https://github.com/mmimica/async-jackson
     */
    public class AsyncJsonParser {

    private final NonBlockingJsonParser parser;

    private final Stack stack = new Stack();

    private String fieldName;

    public AsyncJsonParser() throws IOException {

        JsonFactory factory = new JsonFactory();
        parser = (NonBlockingJsonParser) factory.createNonBlockingByteArrayParser();

    }

    public Mono<Map<String, Object>> parse(byte[] bytes) throws IOException {

        ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());

        ByteArrayFeeder feeder = parser.getNonBlockingInputFeeder();
        boolean consumed = false;
        while (!consumed) {
            if (feeder.needMoreInput()) {
                feeder.feedInput(bytes, 0, bytes.length);
                consumed = true;
            }

            JsonToken event;
            while ((event = parser.nextToken()) != JsonToken.NOT_AVAILABLE) {
                JsonNode root = buildTree(event);
                if (root != null) {
                    Map<String, Object> jsonMap = mapper.treeToValue(root, Map.class);
                    return Mono.just(jsonMap);
                }
            }
        }

        return Mono.error(new RuntimeException("Cannot parse JSON"));

    }

    private static final class Stack {

        private final LinkedList<JsonNode> list = new LinkedList<>();

        JsonNode pop() {
            return list.removeLast();
        }

        JsonNode top() {
            if (list.isEmpty()) {
                return null;
            }
            return list.getLast();
        }

        void push(JsonNode n) {
            list.add(n);
        }

        boolean isEmpty() {
            return list.isEmpty();
        }
    }

    /**
     * @return 当整个树构建完成时的根节点。
     *
     */
    private JsonNode buildTree(JsonToken event) throws IOException {
        switch (event) {
            case FIELD_NAME:
                assert !stack.isEmpty();
                fieldName = parser.getCurrentName();
                return null;

            case START_OBJECT:
                stack.push(createNode(stack.top()));
                return null;

            case START_ARRAY:
                stack.push(createArray(stack.top()));
                return null;

            case END_OBJECT:
            case END_ARRAY:
                assert !stack.isEmpty();
                JsonNode current = stack.pop();
                if (stack.isEmpty()) {
                    return current;
                } else {
                    return null;
                }

            case VALUE_NUMBER_INT:
                assert !stack.isEmpty();
                addLong(stack.top(), parser.getLongValue());
                return null;

            case VALUE_STRING:
                assert !stack.isEmpty();
                addString(stack.top(), parser.getValueAsString());
                return null;

            case VALUE_NUMBER_FLOAT:
                assert !stack.isEmpty();
                addFloat(stack.top(), parser.getFloatValue());
                return null;

            case VALUE_NULL:
                assert !stack.isEmpty();
                addNull(stack.top());
                return null;

            case VALUE_TRUE:
                assert !stack.isEmpty();
                addBoolean(stack.top(), true);
                return null;

            case VALUE_FALSE:
                assert !stack.isEmpty();
                addBoolean(stack.top(), false);
                return null;

            default:
                throw new RuntimeException("未知的 JSON 事件 " + event);
        }
    }

    private JsonNode createNode(JsonNode current) {
        if (ObjectNode.class.isInstance(current)) {
            return ObjectNode.class.cast(current).putObject(fieldName);
        } else if (ArrayNode.class.isInstance(current)) {
            return ArrayNode.class.cast(current).addObject();
        } else {
            return JsonNodeFactory.instance.objectNode();
        }
    }

    private JsonNode createArray(JsonNode current) {
        if (ObjectNode.class.isInstance(current)) {
            return ObjectNode.class.cast(current).putArray(fieldName);
        } else if (ArrayNode.class.isInstance(current)) {
            return ArrayNode.class.cast(current).addArray();
        } else {
            return JsonNodeFactory.instance.arrayNode();
        }
    }

    private void addLong(JsonNode current, long v) {
        assert current != null;

        if (ObjectNode.class.isInstance(current)) {
            ObjectNode.class.cast(current).put(fieldName, v);
        } else {
            ArrayNode.class.cast(current).add(v);
        }
    }

    private void addString(JsonNode current, String s) {
        assert current != null;

        if (ObjectNode.class.isInstance(current)) {
            ObjectNode.class.cast(current).put(fieldName, s);
        } else {
            ArrayNode.class.cast(current).add(s);
        }
    }

    private void addFloat(JsonNode current, float f) {
        assert current != null;

        if (ObjectNode.class.isInstance(current)) {
            ObjectNode.class.cast(current).put(fieldName, f);
        } else {
            ArrayNode.class.cast(current).add(f);
        }
    }

    private void addNull(JsonNode current) {
        assert current != null;

        if (ObjectNode.class.isInstance(current)) {
            ObjectNode.class.cast(current).putNull(fieldName);
        } else {
            ArrayNode.class.cast(current).addNull();
        }
    }

    private void addBoolean(JsonNode current, boolean b) {
        assert current != null

<details>
<summary>英文:</summary>

I found this code showing the implementation -- see how I modified it below.

https://github.com/mmimica/async-jackson/blob/master/src/main/java/com/mmimica/ajackson/AsyncJsonParser.java

Request Handler:

    public Mono&lt;ServerResponse&gt; myHttpHandler(ServerRequest request) {
        return this.handlerJsonRequestBodyParser.parse(request)
        .flatMap(map
                -&gt; this.myService.doSomething(x,y)
        )
        .flatMap(response -&gt; ServerResponse
                .ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(id, String.class)
                .log()
        );

    }

Class implement:

    package lsp.order.handler;

    import java.io.IOException;
    import java.util.Map;
    import lsp.order.exception.BadJsonRequestBodyException;
    import lsp.order.util.AsyncJsonParser;
    import org.springframework.core.io.ByteArrayResource;
    import org.springframework.stereotype.Component;
    import org.springframework.web.reactive.function.server.ServerRequest;
    import reactor.core.publisher.Mono;

    @Component
    public class HandlerJsonRequestBodyParser {

    public Mono&lt;Map&lt;String, Object&gt;&gt; parse(ServerRequest request) {

        return request.bodyToMono(ByteArrayResource.class)
                .map(ByteArrayResource::getByteArray)
                .flatMap(byteArray -&gt; {
                    try {
                        AsyncJsonParser parser = new AsyncJsonParser();
                        return parser.parse(byteArray);
                    } catch (IOException ex) {
                        return Mono.error(new BadJsonRequestBodyException());
                    }
                });

        }

    }

I modified it to use it as is

    package lsp.order.util;

    import com.fasterxml.jackson.core.JsonFactory;
    import com.fasterxml.jackson.core.JsonToken;
    import com.fasterxml.jackson.core.async.ByteArrayFeeder;
    import com.fasterxml.jackson.core.json.async.NonBlockingJsonParser;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ArrayNode;
    import com.fasterxml.jackson.databind.node.ObjectNode;
    import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
    import com.fasterxml.jackson.databind.node.JsonNodeFactory;
    import java.io.IOException;
    import java.util.LinkedList;
    import java.util.Map;
    import reactor.core.publisher.Mono;

    /**
     * @credit https://github.com/mmimica/async-jackson
     */
    public class AsyncJsonParser {

    private final NonBlockingJsonParser parser;

    private final Stack stack = new Stack();

    private String fieldName;

    public AsyncJsonParser() throws IOException {

        JsonFactory factory = new JsonFactory();
        parser = (NonBlockingJsonParser) factory.createNonBlockingByteArrayParser();

    }

    public Mono&lt;Map&lt;String, Object&gt;&gt; parse(byte[] bytes) throws IOException {

        ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());
        
        ByteArrayFeeder feeder = parser.getNonBlockingInputFeeder();
        boolean consumed = false;
        while (!consumed) {
            if (feeder.needMoreInput()) {
                feeder.feedInput(bytes, 0, bytes.length);
                consumed = true;
            }
    
            JsonToken event;
            while ((event = parser.nextToken()) != JsonToken.NOT_AVAILABLE) {
                JsonNode root = buildTree(event);
                if (root != null) {
                    Map&lt;String, Object&gt; jsonMap = mapper.treeToValue(root, Map.class);
                    return Mono.just(jsonMap);
                }
            }
        }
        
        return Mono.error(new RuntimeException(&quot;Cannot parse JSON&quot;));

    }

    private static final class Stack {

        private final LinkedList&lt;JsonNode&gt; list = new LinkedList&lt;&gt;();

        JsonNode pop() {
            return list.removeLast();
        }

        JsonNode top() {
            if (list.isEmpty()) {
                return null;
            }
            return list.getLast();
        }

        void push(JsonNode n) {
            list.add(n);
        }

        boolean isEmpty() {
            return list.isEmpty();
        }
    }

    /**
     * @return The root node when the whole tree is built.
     *
     */
    private JsonNode buildTree(JsonToken event) throws IOException {
        switch (event) {
            case FIELD_NAME:
                assert !stack.isEmpty();
                fieldName = parser.getCurrentName();
                return null;

            case START_OBJECT:
                stack.push(createNode(stack.top()));
                return null;

            case START_ARRAY:
                stack.push(createArray(stack.top()));
                return null;

            case END_OBJECT:
            case END_ARRAY:
                assert !stack.isEmpty();
                JsonNode current = stack.pop();
                if (stack.isEmpty()) {
                    return current;
                } else {
                    return null;
                }

            case VALUE_NUMBER_INT:
                assert !stack.isEmpty();
                addLong(stack.top(), parser.getLongValue());
                return null;

            case VALUE_STRING:
                assert !stack.isEmpty();
                addString(stack.top(), parser.getValueAsString());
                return null;

            case VALUE_NUMBER_FLOAT:
                assert !stack.isEmpty();
                addFloat(stack.top(), parser.getFloatValue());
                return null;

            case VALUE_NULL:
                assert !stack.isEmpty();
                addNull(stack.top());
                return null;

            case VALUE_TRUE:
                assert !stack.isEmpty();
                addBoolean(stack.top(), true);
                return null;

            case VALUE_FALSE:
                assert !stack.isEmpty();
                addBoolean(stack.top(), false);
                return null;

            default:
                throw new RuntimeException(&quot;Unknown json event &quot; + event);
        }
    }

    private JsonNode createNode(JsonNode current) {
        if (ObjectNode.class.isInstance(current)) {
            return ObjectNode.class.cast(current).putObject(fieldName);
        } else if (ArrayNode.class.isInstance(current)) {
            return ArrayNode.class.cast(current).addObject();
        } else {
            return JsonNodeFactory.instance.objectNode();
        }
    }

    private JsonNode createArray(JsonNode current) {
        if (ObjectNode.class.isInstance(current)) {
            return ObjectNode.class.cast(current).putArray(fieldName);
        } else if (ArrayNode.class.isInstance(current)) {
            return ArrayNode.class.cast(current).addArray();
        } else {
            return JsonNodeFactory.instance.arrayNode();
        }
    }

    private void addLong(JsonNode current, long v) {
        assert current != null;

        if (ObjectNode.class.isInstance(current)) {
            ObjectNode.class.cast(current).put(fieldName, v);
        } else {
            ArrayNode.class.cast(current).add(v);
        }
    }

    private void addString(JsonNode current, String s) {
        assert current != null;

        if (ObjectNode.class.isInstance(current)) {
            ObjectNode.class.cast(current).put(fieldName, s);
        } else {
            ArrayNode.class.cast(current).add(s);
        }
    }

    private void addFloat(JsonNode current, float f) {
        assert current != null;

        if (ObjectNode.class.isInstance(current)) {
            ObjectNode.class.cast(current).put(fieldName, f);
        } else {
            ArrayNode.class.cast(current).add(f);
        }
    }

    private void addNull(JsonNode current) {
        assert current != null;

        if (ObjectNode.class.isInstance(current)) {
            ObjectNode.class.cast(current).putNull(fieldName);
        } else {
            ArrayNode.class.cast(current).addNull();
        }
    }

    private void addBoolean(JsonNode current, boolean b) {
        assert current != null;

        if (ObjectNode.class.isInstance(current)) {
            ObjectNode.class.cast(current).put(fieldName, b);
        } else {
            ArrayNode.class.cast(current).add(b);
        }
    }

    }

</details>



huangapple
  • 本文由 发表于 2020年5月29日 11:13:04
  • 转载请务必保留本文链接:https://go.coder-hub.com/62078075.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定