如何为Java 11的HttpRequest创建自定义的BodyPublisher

huangapple go评论83阅读模式
英文:

How to create a custom BodyPublisher for Java 11 HttpRequest

问题

我正在尝试创建一个自定义的BodyPublisher,用于反序列化我的JSON对象。当我创建请求并使用BodyPublishersofByteArray方法时,我可以简单地反序列化JSON,但我更愿意使用自定义的发布者。

public class CustomPublisher implements HttpRequest.BodyPublisher {
	private byte[] bytes;
	
	public CustomPublisher(ObjectNode jsonData) {
		...
		// 反序列化jsonData为字节数组
		...
	}
	
	@Override
	public long contentLength() {
		if (bytes == null) return 0;
		return bytes.length;
	}
	
	@Override
	public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
        CustomSubscription subscription = new CustomSubscription(subscriber, bytes);
        subscriber.onSubscribe(subscription);		
	}

    private class CustomSubscription implements Flow.Subscription {
         private final Flow.Subscriber<? super ByteBuffer> subscriber;
         private boolean cancelled;
         private Iterator<Byte> byterator;

         private CustomSubscription(Flow.Subscriber<? super ByteBuffer> subscriber, byte[] bytes) {
             this.subscriber = subscriber;
             this.cancelled = false;
             List<Byte> bytelist = new ArrayList<>();
             for(byte b : bytes) {
                 bytelist.add(b);
             }
             this.byterator = bytelist.iterator();
         }

         @Override
         public void request(long n) {
             if (cancelled) return;
             if (n < 0) {
                 subscriber.onError(new IllegalArgumentException());
             } else if (byterator.hasNext()) {
                 subscriber.onNext(ByteBuffer.wrap(new byte[]{byterator.next()}));
             } else {
                 subscriber.onComplete();
             }
         }

         @Override
         public void cancel() {
             this.cancelled = true;
         }
    }
}

这个实现可以工作,但只有当订阅的request方法以1作为参数调用时才有效。但这是当我与HttpRequest一起使用时发生的情况。

我相当肯定这不是创建自定义订阅的首选或最佳方式,但我尚未找到更好的方法来使其工作。如果有人能引导我走向更好的方向,我将不胜感激。

英文:

I'm trying to create a custom BodyPublisher that would deserialize my JSON object. I could just deserialize the JSON when I'm creating the request and use the ofByteArray method of BodyPublishers but I would rather use a custom publisher.

public class CustomPublisher implements HttpRequest.BodyPublisher {
	private byte[] bytes;
	
	public CustomPublisher(ObjectNode jsonData) {
		...
		// Deserialize jsonData to bytes
		...
	}
	
	@Override
	public long contentLength() {
		if(bytes == null) return 0;
		return bytes.length
	}
	
	@Override
	public void subscribe(Flow.Subscriber&lt;? super ByteBuffer&gt; subscriber) {
        CustomSubscription subscription = new CustomSubscription(subscriber, bytes);
        subscriber.onSubscribe(subscription);		
	}

    private CustomSubscription implements Flow.Subscription {
         private final Flow.Subscriber&lt;? super ByteBuffer&gt; subscriber;
         private boolean cancelled;
         private Iterator&lt;Byte&gt; byterator;

         private CustomSubscription(Flow.Subscriber&lt;? super ByteBuffer&gt; subscriber, byte[] bytes) {
             this.subscriber = subscriber;
             this.cancelled = false;
             List&lt;Byte&gt; bytelist = new ArrayList&lt;&gt;();
             for(byte b : bytes) {
                 bytelist.add(b);
             }
             this.byterator = bytelist.iterator();
         }

         @Override
         public void request(long n) {
             if(cancelled) return;
             if(n &lt; 0) {
                 subscriber.onError(new IllegalArgumentException());
             } else if(byterator.hasNext()) {
                 subscriber.onNext(ByteBuffer.wrap(new byte[]{byterator.next()));
             } else {
                 subscriber.onComplete();
             }
         }

         @Override
         public void cancel() {
             this.cancelled = true;
         }
    }
}

This implementation works, but only if subscriptions request method gets called with 1 as a parameter. But that's what happens when I am using it with the HttpRequest.

I'm pretty sure this is not any way preferred or optimal way of creating the custom subscription but I have yet to found better way to make it work.

I would greatly appreciate if anyone can lead me to a better path.

答案1

得分: 1

你是正确的,不要将其转换为字节数组,因为这将对大型对象造成内存问题。

我不建议尝试编写自定义的发布者。相反,只需利用工厂方法HttpRequest.BodyPublishers.ofInputStream

HttpRequest.BodyPublisher publisher =
    HttpRequest.BodyPublishers.ofInputStream(() -> {
        PipedInputStream in = new PipedInputStream();

        ForkJoinPool.commonPool().submit(() -> {
            try (PipedOutputStream out = new PipedOutputStream(in)) {
                objectMapper.writeTree(
                    objectMapper.getFactory().createGenerator(out),
                    jsonData);
            }
            return null;
        });

        return in;
    });

正如您所注意到的,您可以使用HttpRequest.BodyPublishers.ofByteArray。这对于相对较小的对象是可以的,但我习惯性地进行可扩展性编程。假设代码不需要扩展的问题在于其他开发人员可能会假设可以安全地传递大型对象,而没有意识到性能的影响。

编写自己的请求主体发布者将是很多工作。它的subscribe方法是从Flow.Publisher继承的。

subscribe方法的文档以以下内容开头:

如果可能的话,添加给定的Subscriber。

每次调用您的subscribe方法时,您需要将Subscriber添加到某种集合中,需要创建Flow.Subscription的实现,并立即将其传递给订阅者的onSubscribe方法。您的Subscription实现对象需要在订阅方法调用时通过调用相应的Subscriber的(不仅仅是任何Subscriber的)onNext方法发送一个或多个ByteBuffer,并且一旦发送了所有数据,必须调用相同Subscriber的onComplete()方法。此外,Subscription实现对象需要处理cancel请求。

您可以通过扩展SubmissionPublisher来简化这些操作,它是Flow.Publisher的默认实现,然后向其添加一个contentLength()方法。但是,正如SubmissionPublisher文档所示,即使是最小的工作实现也仍然需要相当多的工作。

HttpRequest.BodyPublishers.of…方法将为您完成所有这些工作。ofByteArray适用于小对象,但ofInputStream将适用于您可能传递的任何对象。

英文:

You are right to avoid making a byte array out of it, as that would create memory issues for large objects.

I wouldn’t try to write a custom publisher. Rather, just take advantage of the factory method HttpRequest.BodyPublishers.ofInputStream.

HttpRequest.BodyPublisher publisher =
HttpRequest.BodyPublishers.ofInputStream(() -&gt;  {
PipedInputStream in = new PipedInputStream();
ForkJoinPool.commonPool().submit(() -&gt; {
try (PipedOutputStream out = new PipedOutputStream(in)) {
objectMapper.writeTree(
objectMapper.getFactory().createGenerator(out),
jsonData);
}
return null;
});
return in;
});

As you have noted, you can use HttpRequest.BodyPublishers.ofByteArray. That is fine for relatively small objects, but I program for scalability out of habit. The problem with assuming code won’t need to scale is that other developers will assume it is safe to pass large objects, without realizing the impact on performance.

Writing your own body publisher will be a lot of work. Its subscribe method is inherited from Flow.Publisher.

The documentation for the subscribe method starts with this:

> Adds the given Subscriber if possible.

Each time your subscribe method is called, you need to add the Subscriber to some sort of colllection, you need to create an implementation of Flow.Subscription, and you need to immediately pass it to the subscriber’s onSubscribe method. Your Subscription implementation object needs to send back one or more ByteBuffers, only when the Subscription’s request method is called, by invoking the corresponding Subscriber’s (not just any Subscriber’s) onNext method, and once you’ve sent all of the data, you must call the same Subscriber’s onComplete() method. On top of that, the Subscription implementation object needs to handle cancel requests.

You can make a lot of this easier by extending SubmissionPublisher, which is a default implementation of Flow.Publisher, and then adding a contentLength() method to it. But as the SubmissionPublisher documentation shows, you still have a fair amount of work to do, for even a minimal working implementation.

The HttpRequest.BodyPublishers.of… methods will do all of this for you. ofByteArray is okay for small objects, but ofInputStream will work for any object you could ever pass in.

huangapple
  • 本文由 发表于 2020年8月7日 20:43:06
  • 转载请务必保留本文链接:https://go.coder-hub.com/63302128.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定