英文:
How to create a custom BodyPublisher for Java 11 HttpRequest
问题
我正在尝试创建一个自定义的BodyPublisher
,用于反序列化我的JSON对象。当我创建请求并使用BodyPublishers
的ofByteArray
方法时,我可以简单地反序列化JSON,但我更愿意使用自定义的发布者。
public class CustomPublisher implements HttpRequest.BodyPublisher {
private byte[] bytes;
public CustomPublisher(ObjectNode jsonData) {
...
// 反序列化jsonData为字节数组
...
}
@Override
public long contentLength() {
if (bytes == null) return 0;
return bytes.length;
}
@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
CustomSubscription subscription = new CustomSubscription(subscriber, bytes);
subscriber.onSubscribe(subscription);
}
private class CustomSubscription implements Flow.Subscription {
private final Flow.Subscriber<? super ByteBuffer> subscriber;
private boolean cancelled;
private Iterator<Byte> byterator;
private CustomSubscription(Flow.Subscriber<? super ByteBuffer> subscriber, byte[] bytes) {
this.subscriber = subscriber;
this.cancelled = false;
List<Byte> bytelist = new ArrayList<>();
for(byte b : bytes) {
bytelist.add(b);
}
this.byterator = bytelist.iterator();
}
@Override
public void request(long n) {
if (cancelled) return;
if (n < 0) {
subscriber.onError(new IllegalArgumentException());
} else if (byterator.hasNext()) {
subscriber.onNext(ByteBuffer.wrap(new byte[]{byterator.next()}));
} else {
subscriber.onComplete();
}
}
@Override
public void cancel() {
this.cancelled = true;
}
}
}
这个实现可以工作,但只有当订阅的request
方法以1作为参数调用时才有效。但这是当我与HttpRequest一起使用时发生的情况。
我相当肯定这不是创建自定义订阅的首选或最佳方式,但我尚未找到更好的方法来使其工作。如果有人能引导我走向更好的方向,我将不胜感激。
英文:
I'm trying to create a custom BodyPublisher
that would deserialize my JSON object. I could just deserialize the JSON when I'm creating the request and use the ofByteArray
method of BodyPublishers
but I would rather use a custom publisher.
public class CustomPublisher implements HttpRequest.BodyPublisher {
private byte[] bytes;
public CustomPublisher(ObjectNode jsonData) {
...
// Deserialize jsonData to bytes
...
}
@Override
public long contentLength() {
if(bytes == null) return 0;
return bytes.length
}
@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
CustomSubscription subscription = new CustomSubscription(subscriber, bytes);
subscriber.onSubscribe(subscription);
}
private CustomSubscription implements Flow.Subscription {
private final Flow.Subscriber<? super ByteBuffer> subscriber;
private boolean cancelled;
private Iterator<Byte> byterator;
private CustomSubscription(Flow.Subscriber<? super ByteBuffer> subscriber, byte[] bytes) {
this.subscriber = subscriber;
this.cancelled = false;
List<Byte> bytelist = new ArrayList<>();
for(byte b : bytes) {
bytelist.add(b);
}
this.byterator = bytelist.iterator();
}
@Override
public void request(long n) {
if(cancelled) return;
if(n < 0) {
subscriber.onError(new IllegalArgumentException());
} else if(byterator.hasNext()) {
subscriber.onNext(ByteBuffer.wrap(new byte[]{byterator.next()));
} else {
subscriber.onComplete();
}
}
@Override
public void cancel() {
this.cancelled = true;
}
}
}
This implementation works, but only if subscriptions request
method gets called with 1 as a parameter. But that's what happens when I am using it with the HttpRequest.
I'm pretty sure this is not any way preferred or optimal way of creating the custom subscription but I have yet to found better way to make it work.
I would greatly appreciate if anyone can lead me to a better path.
答案1
得分: 1
你是正确的,不要将其转换为字节数组,因为这将对大型对象造成内存问题。
我不建议尝试编写自定义的发布者。相反,只需利用工厂方法HttpRequest.BodyPublishers.ofInputStream。
HttpRequest.BodyPublisher publisher =
HttpRequest.BodyPublishers.ofInputStream(() -> {
PipedInputStream in = new PipedInputStream();
ForkJoinPool.commonPool().submit(() -> {
try (PipedOutputStream out = new PipedOutputStream(in)) {
objectMapper.writeTree(
objectMapper.getFactory().createGenerator(out),
jsonData);
}
return null;
});
return in;
});
正如您所注意到的,您可以使用HttpRequest.BodyPublishers.ofByteArray
。这对于相对较小的对象是可以的,但我习惯性地进行可扩展性编程。假设代码不需要扩展的问题在于其他开发人员可能会假设可以安全地传递大型对象,而没有意识到性能的影响。
编写自己的请求主体发布者将是很多工作。它的subscribe
方法是从Flow.Publisher继承的。
subscribe
方法的文档以以下内容开头:
如果可能的话,添加给定的Subscriber。
每次调用您的subscribe
方法时,您需要将Subscriber添加到某种集合中,需要创建Flow.Subscription的实现,并立即将其传递给订阅者的onSubscribe
方法。您的Subscription实现对象需要在订阅方法调用时通过调用相应的Subscriber的(不仅仅是任何Subscriber的)onNext方法发送一个或多个ByteBuffer,并且一旦发送了所有数据,必须调用相同Subscriber的onComplete()
方法。此外,Subscription实现对象需要处理cancel
请求。
您可以通过扩展SubmissionPublisher来简化这些操作,它是Flow.Publisher的默认实现,然后向其添加一个contentLength()
方法。但是,正如SubmissionPublisher文档所示,即使是最小的工作实现也仍然需要相当多的工作。
HttpRequest.BodyPublishers.of…
方法将为您完成所有这些工作。ofByteArray
适用于小对象,但ofInputStream
将适用于您可能传递的任何对象。
英文:
You are right to avoid making a byte array out of it, as that would create memory issues for large objects.
I wouldn’t try to write a custom publisher. Rather, just take advantage of the factory method HttpRequest.BodyPublishers.ofInputStream.
HttpRequest.BodyPublisher publisher =
HttpRequest.BodyPublishers.ofInputStream(() -> {
PipedInputStream in = new PipedInputStream();
ForkJoinPool.commonPool().submit(() -> {
try (PipedOutputStream out = new PipedOutputStream(in)) {
objectMapper.writeTree(
objectMapper.getFactory().createGenerator(out),
jsonData);
}
return null;
});
return in;
});
As you have noted, you can use HttpRequest.BodyPublishers.ofByteArray
. That is fine for relatively small objects, but I program for scalability out of habit. The problem with assuming code won’t need to scale is that other developers will assume it is safe to pass large objects, without realizing the impact on performance.
Writing your own body publisher will be a lot of work. Its subscribe
method is inherited from Flow.Publisher.
The documentation for the subscribe
method starts with this:
> Adds the given Subscriber if possible.
Each time your subscribe
method is called, you need to add the Subscriber to some sort of colllection, you need to create an implementation of Flow.Subscription, and you need to immediately pass it to the subscriber’s onSubscribe
method. Your Subscription implementation object needs to send back one or more ByteBuffers, only when the Subscription’s request
method is called, by invoking the corresponding Subscriber’s (not just any Subscriber’s) onNext method, and once you’ve sent all of the data, you must call the same Subscriber’s onComplete()
method. On top of that, the Subscription implementation object needs to handle cancel
requests.
You can make a lot of this easier by extending SubmissionPublisher, which is a default implementation of Flow.Publisher, and then adding a contentLength()
method to it. But as the SubmissionPublisher documentation shows, you still have a fair amount of work to do, for even a minimal working implementation.
The HttpRequest.BodyPublishers.of… methods will do all of this for you. ofByteArray
is okay for small objects, but ofInputStream
will work for any object you could ever pass in.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论