Why is the piped input stream closed?
Question
I run a small Java 11 HTTP server that processes multipart stream requests containing zipped data. The data is read and unzipped with piped streams. Sometimes an `IOException: Pipe closed` occurs, although the unzipped files are fine. I cannot spot the flaw in my code. Any help is appreciated.
The exception stack trace:

```
[java] 27016 [HTTP-Dispatcher] ERROR c.example.AhFormatter - e9137279-7c15-41a3-9783-eb029a975767 - Error while processing finish request
[java] java.io.IOException: Pipe closed
[java]     at java.base/java.io.PipedInputStream.checkStateForReceive(PipedInputStream.java:260)
[java]     at java.base/java.io.PipedInputStream.receive(PipedInputStream.java:226)
[java]     at java.base/java.io.PipedOutputStream.write(PipedOutputStream.java:149)
[java]     at java.base/java.io.InputStream.transferTo(InputStream.java:705)
[java]     at com.example.upload.MultipartStream.readBodyData(MultipartStream.java:469)
[java]     at com.example.AhFormatter.processPdfData(AhFormatter.java:291)
```
The following method processes the multipart stream:
```java
private void processPdfData(MultipartStream multipartStream) throws IOException, InterruptedException {
    String headers = multipartStream.readHeaders();
    if (!headers.contains("name=\"pdf\"")) {
        logger.warn("{} - Header with name pdf not found", taskId);
        throw new IllegalStateException();
    }
    PipedInputStream pipedInputStream = new PipedInputStream();
    PipedOutputStream pipedOutputStream = new PipedOutputStream();
    pipedInputStream.connect(pipedOutputStream);
    // Reading side of the pipe: unzips whatever arrives and closes the input when done
    Thread unzipThread = new Thread(() -> {
        try {
            Zips.unzip(pipedInputStream, targetPath, true);
        } catch (IOException e) {
            logger.error(format("%s - Error during unzip", taskId), e);
            throw new ZipException(e);
        }
    }, "PipedZipStream");
    unzipThread.setName("UnzipThread");
    unzipThread.setUncaughtExceptionHandler((thread, throwable) -> {
        logger.error(format("%s - Uncaught exception while unzipping", taskId), throwable);
        server.stop(0);
        stopServer.set(true);
    });
    unzipThread.start();
    // Writing side of the pipe: this is the call that fails with "Pipe closed"
    multipartStream.readBodyData(pipedOutputStream);
    unzipThread.join();
    pipedOutputStream.close();
    pipedInputStream.close();
}
```
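The stack trace shows the failure on the writing side. `PipedOutputStream.write` throws `Pipe closed` once the reading side has called `close()` on the connected `PipedInputStream`. The sketch below reproduces the same message without any zip or multipart code. It is not from the original post, and the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;

public class PipeClosedDemo {

    /** Writes once, lets the reader close its end, writes again and returns the error message. */
    public static String writeAfterReaderClosed() throws IOException, InterruptedException {
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream(in);

        // Reader: consumes one byte, then closes its end early,
        // similar to an unzip thread closing the pipe when it considers itself done.
        Thread reader = new Thread(() -> {
            try {
                in.read();
                in.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        reader.start();

        out.write(1);
        out.flush();   // wakes the waiting reader instead of relying on its wait timeout
        reader.join(); // from here on the reader has closed its end of the pipe

        try {
            out.write(2); // the writer still has data, but the reader is gone
            return "no exception";
        } catch (IOException e) {
            return e.getMessage();
        } finally {
            out.close();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(writeAfterReaderClosed()); // prints "Pipe closed"
    }
}
```

The join makes the sequence deterministic. In the real server, the same ordering only happens when the unzip thread closes its end before the writer has pushed its last bytes.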
And the unzip method:
```java
public static void unzip(InputStream zip, Path targetDirectory, boolean close) throws IOException {
    ZipInputStream zipStream = new ZipInputStream(zip);
    try {
        ZipEntry zipEntry;
        while ((zipEntry = zipStream.getNextEntry()) != null) {
            File targetFile = guardAgainstZipSlip(targetDirectory, zipEntry);
            if (zipEntry.isDirectory()) {
                if (!targetFile.isDirectory() && !targetFile.mkdirs()) {
                    throw new IOException("Failed to create directory " + targetFile);
                }
                continue;
            }
            // fix for Windows-created archives
            File parent = targetFile.getParentFile();
            if (!parent.isDirectory() && !parent.mkdirs()) {
                throw new IOException("Failed to create directory " + parent);
            }
            copy(zipStream, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
        }
    } finally {
        if (close) {
            // closing the ZipInputStream also closes the wrapped stream (here: the PipedInputStream)
            zipStream.close();
        }
    }
}

private static File guardAgainstZipSlip(Path destinationDir, ZipEntry zipEntry) throws IOException {
    File targetFile = new File(destinationDir.toFile(), zipEntry.getName());
    if (!targetFile.getCanonicalPath().startsWith(destinationDir.toFile().getCanonicalPath() + File.separator)) {
        throw new IOException("Entry is outside of the target dir: " + zipEntry.getName());
    }
    return targetFile;
}
```
The `MultipartStream` class is from Apache (Commons FileUpload). I copied the class into my project because I didn't want to pull in a whole library for one class.
My understanding is that the unzip thread keeps running until the `ZipInputStream` has no entries left, and then it shuts down. This should happen once the multipart stream has written the last bytes of the zip into the piped output stream.
When I don't close the streams explicitly, I don't see this exception; at least all my tests so far pass. But what could be wrong with closing the streams?
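The assumption that the last entry coincides with the last bytes can be checked in isolation. `ZipInputStream.getNextEntry()` returns `null` as soon as the next header is not a local file header. That point is reached at the archive's central directory, which comes after all entries. As a result, the central directory and the end record at the tail of the archive are never read. The sketch below uses an in-memory archive instead of the HTTP upload, and all names are hypothetical. It counts how many bytes of the underlying stream are still unread after the last entry.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

public class ZipTrailingBytesDemo {

    /** Builds an in-memory zip with the given number of small text entries. */
    public static byte[] buildZip(int entries) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ZipOutputStream zip = new ZipOutputStream(bytes)) {
            for (int i = 0; i < entries; i++) {
                zip.putNextEntry(new ZipEntry("file-" + i + ".txt"));
                zip.write(("content " + i).getBytes(StandardCharsets.UTF_8));
                zip.closeEntry();
            }
        }
        return bytes.toByteArray();
    }

    /** Reads every entry, then reports how many bytes the ZipInputStream never consumed. */
    public static int bytesLeftAfterLastEntry(byte[] archive) throws IOException {
        ByteArrayInputStream raw = new ByteArrayInputStream(archive);
        ZipInputStream zip = new ZipInputStream(raw);
        int entries = 0;
        while (zip.getNextEntry() != null) {
            entries++; // getNextEntry() skips the unread data of the previous entry
        }
        // raw is deliberately inspected instead of closed: what remains is the zip's tail
        return raw.available();
    }

    public static void main(String[] args) throws IOException {
        byte[] archive = buildZip(50);
        System.out.println("archive size: " + archive.length);
        System.out.println("left unread:  " + bytesLeftAfterLastEntry(archive));
    }
}
```

With a pipe in place of the `ByteArrayInputStream`, those unread bytes are exactly what the writer may still be sending when the unzip thread closes its end.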
Answer 1
Score: 0
The comments from g00se kept me going. The `readBodyData` method touches two streams: the stream from the HTTP request and the `PipedOutputStream`. This method closes neither of them. Internally it uses `public long transferTo(OutputStream out) throws IOException`, which doesn't close either stream.
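The `transferTo` behaviour can be verified on its own. Per its Javadoc, `InputStream.transferTo` does not close either stream. Whoever owns the `PipedOutputStream` must therefore close it, or the reader never sees end-of-stream. The tiny sketch below, with a hypothetical class name, records whether `close()` was ever called on the target.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class TransferToDemo {

    /** An output stream that remembers whether close() was called. */
    static class CloseTrackingStream extends ByteArrayOutputStream {
        boolean closed;

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    /** Copies the bytes with transferTo and reports whether the target was closed. */
    public static boolean targetClosedAfterTransfer(byte[] data) throws IOException {
        CloseTrackingStream target = new CloseTrackingStream();
        long copied = new ByteArrayInputStream(data).transferTo(target);
        if (copied != data.length) {
            throw new IllegalStateException("unexpected byte count " + copied);
        }
        return target.closed; // transferTo leaves closing to the caller
    }

    public static void main(String[] args) throws IOException {
        System.out.println(targetClosedAfterTransfer(new byte[] {1, 2, 3})); // prints false
    }
}
```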
After moving this part into another thread, i.e. out of the controlling thread that creates the pipes and starts the data shuffling, my errors disappeared.
The solution now consists of three independent threads. The first is the controlling thread, which now just creates the pipe. The second thread handles the unzipping as before. The third thread is new and copies the data. The streams are closed inside the threads. Please have a look at the following code for details.
Controlling thread
```java
private void processPdfData(MultipartStream multipartStream) throws IOException, InterruptedException {
    String headers = multipartStream.readHeaders();
    if (!headers.contains("name=\"pdf\"")) {
        logger.warn("{} - Header with name pdf not found", taskId);
        throw new IllegalStateException();
    }
    final PipedInputStream pipedInputStream = new PipedInputStream();
    final PipedOutputStream pipedOutputStream = new PipedOutputStream();
    pipedInputStream.connect(pipedOutputStream);
    UnzipThread unzipThread = new UnzipThread(pipedInputStream, targetPath);
    unzipThread.setUncaughtExceptionHandler((thread, throwable) -> {
        logger.error(format("%s - Uncaught exception while unzipping", taskId), throwable);
        server.stop(0);
        stopServer.set(true);
    });
    ZipReaderThread zipReaderThread = new ZipReaderThread(multipartStream, pipedOutputStream);
    zipReaderThread.setUncaughtExceptionHandler((thread, throwable) -> {
        logger.error(format("%s - Uncaught exception while transferring bytes to unzip thread", taskId), throwable);
        server.stop(0);
        stopServer.set(true);
    });
    unzipThread.start();
    zipReaderThread.start();
    unzipThread.join();
}
```
Unzip thread

```java
package com.example.dita;

import com.example.Zips;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;

public class UnzipThread extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(UnzipThread.class);
    private final InputStream stream;
    private final Path targetDirectory;

    public UnzipThread(InputStream stream, Path targetDirectory) {
        this.stream = stream;
        this.targetDirectory = targetDirectory;
        this.setName("UnzipThread");
    }

    @Override
    public void run() {
        try {
            Zips.unzip(stream, targetDirectory, true);
        } catch (IOException e) {
            logger.error("Error during unzip", e);
            throw new ZipException(e);
        }
    }
}
```
Data copy thread

```java
package com.example.dita;

import com.example.dita.upload.MultipartStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.OutputStream;

public class ZipReaderThread extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(ZipReaderThread.class);
    private final MultipartStream multipartStream;
    private final OutputStream stream;

    public ZipReaderThread(MultipartStream multipartStream, OutputStream stream) {
        this.multipartStream = multipartStream;
        this.stream = stream;
        this.setName("ZipReaderThread");
    }

    @Override
    public void run() {
        // try-with-resources closes the pipe even if readBodyData fails,
        // so the unzip thread always sees end-of-stream
        try (OutputStream out = stream) {
            multipartStream.readBodyData(out);
        } catch (IOException e) {
            logger.error("Error during shuffling bytes into unzipper", e);
            throw new ZipException(e);
        }
    }
}
```
Unfortunately, I cannot explain why it doesn't work with just two threads. From what I understood from the Java docs, it should have worked. Anyway, it seems to work now, and I hope this answer helps others too.
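One likely explanation, offered as a hedged sketch rather than a verified diagnosis of this server, involves how `ZipInputStream` works. It stops at the central directory, so the unzip thread can close the `PipedInputStream` while the writer is still copying the archive's trailing bytes. The next write then fails with `Pipe closed`. Whether this happens depends on timing: if the remaining tail fits into the pipe's buffer (1024 bytes by default) before the reader closes, no write fails. That would fit the intermittent failures. With two threads, this can be avoided by two changes:

1. The reader drains the rest of the pipe before closing it.
2. The writer closes its end (also on failure) before anyone waits for the reader.

In the sketch, a `ByteArrayInputStream` stands in for `MultipartStream.readBodyData`. Entries are counted instead of written to disk. Both are assumptions made to keep the example self-contained, and the names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

public class DrainingPipeDemo {

    /** Reads all entries (only counted here), then drains the rest before closing the pipe. */
    static int unzipAndDrain(InputStream in) throws IOException {
        try (in) { // Java 9+: the pipe is closed only after draining
            ZipInputStream zip = new ZipInputStream(in);
            int entries = 0;
            while (zip.getNextEntry() != null) {
                entries++; // real code would copy the entry to disk here
            }
            // ZipInputStream stopped at the central directory; consume the tail until
            // the writer closes its end, so no write ever hits a reader-closed pipe.
            in.transferTo(OutputStream.nullOutputStream());
            return entries;
        }
    }

    /** Main thread writes into the pipe, a second thread unzips; returns the entry count. */
    public static int pipeThroughTwoThreads(byte[] archive) throws Exception {
        PipedInputStream pipeIn = new PipedInputStream();
        PipedOutputStream pipeOut = new PipedOutputStream(pipeIn);
        AtomicInteger entries = new AtomicInteger();
        AtomicReference<Throwable> failure = new AtomicReference<>();

        Thread unzipThread = new Thread(() -> {
            try {
                entries.set(unzipAndDrain(pipeIn));
            } catch (IOException e) {
                failure.set(e);
            }
        }, "UnzipThread");
        unzipThread.start();

        // Stand-in for multipartStream.readBodyData(pipeOut); closing signals end-of-stream
        try (pipeOut) {
            new ByteArrayInputStream(archive).transferTo(pipeOut);
        }
        unzipThread.join(); // safe: the writer is already closed, so the drain terminates

        if (failure.get() != null) {
            throw new IOException("unzip failed", failure.get());
        }
        return entries.get();
    }

    /** Builds an in-memory zip with the given number of small text entries. */
    public static byte[] buildZip(int entries) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ZipOutputStream zip = new ZipOutputStream(bytes)) {
            for (int i = 0; i < entries; i++) {
                zip.putNextEntry(new ZipEntry("file-" + i + ".txt"));
                zip.write(("content " + i).getBytes(StandardCharsets.UTF_8));
                zip.closeEntry();
            }
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(pipeThroughTwoThreads(buildZip(50))); // prints 50
    }
}
```

The order matters. If the writer instead joined the unzip thread before closing its end, as in the question's code, the drain would wait for an end-of-stream that never comes.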