Is it possible to reuse an HTTP Request Body multiple times without reading it into memory?

Question


My requirement is a large file upload using curl -T <file>. The server side is an HTTP server written in Rust; the implementation can be actix-web (or hyper, if another implementation is more suitable), speaking HTTP/1.1. With HTTP/1.1 the server can read the uploaded data stream only once, and I want to calculate the file's MD5, SHA-1, and SHA-256 (or BLAKE3) without reading the stream into memory, then upload it to S3. A solution that calculates the hashes in parallel would be even better, though it may be more complex; the key point is still reusing the stream.

It would be best not to implement this with channels: the stream would have to flow through multiple channels to compute the checksums, which may perform worse than reusing the stream for all the calculations at once.

This needs to be based on the Rust 2021 Edition and actix-web 4.x (the latest is 4.3.0); if tokio is used, it must be at least 1.0.0 (I am currently on 1.25.0). I ask because library version compatibility in Rust is poor, and there may be API incompatibilities between minor versions.

Can this requirement be implemented in Rust? In Go, there seems to be no better solution than io.Pipe, and io.Pipe's performance does not look good.

Thank you very much for your advice and patience!


Update: You can use either Go or Java for this requirement.

Answer 1

Score: 1


There's no such thing as doing calculations without putting the file in memory, at least on standard hardware. The solution is reading it in chunks with an implementer of BufRead, such as BufReader.

This is fully possible in synchronous Rust, but since you specified tokio, I made this async using tokio's IO types and traits. You can make the same thing synchronously with the standard library's IO types and traits, which are named similarly.
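As an aside, here is a minimal sketch of what that synchronous variant could look like (calculate_hashes_sync is a name I made up; it mirrors the async calculate_hashes shown below and uses the same digest::Update trait, which is explained after the async code):

use digest::Update;
use std::io::{BufRead, Write};

pub fn calculate_hashes_sync<R: BufRead, W: Write>(
    mut reader: R,
    hashers: &mut [&mut dyn Update],
    mut writer: W,
) -> std::io::Result<()> {
    loop {
        // Borrow the next chunk from the reader's internal buffer.
        let part = reader.fill_buf()?;
        if part.is_empty() {
            break; // EOF
        }
        for hasher in hashers.iter_mut() {
            hasher.update(part);
        }
        writer.write_all(part)?;
        // `part` borrows `reader`, so take the length before consuming.
        let part_len = part.len();
        reader.consume(part_len);
    }
    Ok(())
}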

First, a function that takes a reader, updates a slice of hash functions, and then writes those bytes to a writer. This is essentially tokio::io::copy_buf but with hash functions in the middle.

use digest::Update;
use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt};

pub async fn calculate_hashes<R, W>(
    mut reader: R,
    hashers: &mut [&mut dyn Update],
    mut writer: W,
) -> Result<(), std::io::Error>
where
    R: AsyncBufRead + Unpin,
    W: AsyncWrite + Unpin,
{
    loop {
        // Borrow the next chunk from the reader's internal buffer.
        let part = reader.fill_buf().await?;
        if part.is_empty() {
            break; // EOF
        }

        // Feed the same chunk to every hash function.
        for hasher in hashers.iter_mut() {
            hasher.update(part);
        }

        // Forward the chunk unchanged (e.g. to the S3 upload).
        writer.write_all(part).await?;

        // `part` borrows `reader`, so take its length before consuming.
        let part_len = part.len();
        reader.consume(part_len);
    }

    Ok(())
}

This uses a trait from the digest crate: Update. Luckily, there are already implementations of your specific hash functions that use digest: md-5, sha1, and sha2 (make sure you use versions that depend on the same version of digest, which conveniently are all at 0.10.5 at the moment).

Here's that function in use:

use md5::Md5;
use sha1::Sha1;
use sha2::{Digest, Sha256};

pub async fn md5_sha1_sha256<R, W>(
    reader: R,
    writer: W,
) -> Result<(Md5, Sha1, Sha256), std::io::Error>
where
    R: AsyncBufRead + Unpin,
    W: AsyncWrite + Unpin,
{
    let mut md5_hasher = Md5::new();
    let mut sha1_hasher = Sha1::new();
    let mut sha256_hasher = Sha256::new();

    // Each `&mut` coerces to `&mut dyn Update`, so one pass updates all three.
    calculate_hashes(
        reader,
        &mut [&mut md5_hasher, &mut sha1_hasher, &mut sha256_hasher],
        writer,
    )
    .await?;

    Ok((md5_hasher, sha1_hasher, sha256_hasher))
}
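For reference, here is a minimal sketch of driving that function; the file name upload.bin and the use of tokio::io::sink() as a stand-in for the real S3 upload stream are placeholders of mine, and finalize() comes from the digest crate's Digest trait:

use digest::Digest;
use tokio::fs::File;
use tokio::io::BufReader;

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    // BufReader reads the file in chunks, so only one buffer is in memory at a time.
    let file = File::open("upload.bin").await?;
    let (md5, sha1, sha256) =
        md5_sha1_sha256(BufReader::new(file), tokio::io::sink()).await?;

    // finalize() consumes each hasher and returns the raw digest bytes.
    println!("md5:    {:x}", md5.finalize());
    println!("sha1:   {:x}", sha1.finalize());
    println!("sha256: {:x}", sha256.finalize());
    Ok(())
}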

If you want to go further, you could make a wrapper type that implements AsyncWrite for any Update type; then you wouldn't even need to separate hashers from writer. You could also go the other way and split hashers into your specific hash function types, or a fixed number of generic parameters, which avoids dyn. I used a dyn slice here so that the answer stays helpful to others in less specific situations.
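A minimal sketch of that wrapper idea (HashWriter is a made-up name; it assumes the hasher is a plain Unpin struct, which the digest 0.10 hashers are):

use std::pin::Pin;
use std::task::{Context, Poll};

use digest::Update;
use tokio::io::AsyncWrite;

pub struct HashWriter<U: Update>(pub U);

impl<U: Update + Unpin> AsyncWrite for HashWriter<U> {
    fn poll_write(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, std::io::Error>> {
        // Hashing is synchronous and infallible, so the whole buffer is always "written".
        self.0.update(buf);
        Poll::Ready(Ok(buf.len()))
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
        Poll::Ready(Ok(()))
    }

    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
        Poll::Ready(Ok(()))
    }
}

With that, each hasher can be handed to anything expecting an AsyncWrite, for example tokio::io::copy.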

Here's a link to the whole thing on playground. It doesn't compile because of missing dependencies, though.

Answer 2

Score: 0


Here is one implementation idea in Go: calculate the checksums of the request body while writing it to the S3 storage.

// ...

md5Hasher := md5.New()
sha1Hasher := sha1.New()
sha256Hasher := sha256.New()

type Checksum struct {
    Md5    string `json:"md5"`
    Sha1   string `json:"sha1"`
    Sha256 string `json:"sha256"`
}

// Every read from tr also feeds the bytes read to all three hashers.
hasher := io.MultiWriter(md5Hasher, sha1Hasher, sha256Hasher)
tr := io.TeeReader(r.Body, hasher)

// Upload the file to S3; the body is streamed, never fully buffered here.
_, err := uploader.Upload(&s3manager.UploadInput{
    Bucket: aws.String("bucket"),
    Key:    aws.String("key"),
    Body:   tr,
})
if err != nil {
    log.Printf("failed to upload file, %v", err)
    w.WriteHeader(http.StatusInternalServerError)
    return
}

checksum := Checksum{
    Md5:    fmt.Sprintf("%x", md5Hasher.Sum(nil)),
    Sha1:   fmt.Sprintf("%x", sha1Hasher.Sum(nil)),
    Sha256: fmt.Sprintf("%x", sha256Hasher.Sum(nil)),
}

respBody, _ := json.MarshalIndent(checksum, "", "  ")
fmt.Printf("%s\n", respBody)

w.Write(respBody)

But now I am wondering: could I calculate the checksums first through io.MultiWriter and only then decide whether to upload the request body? At that point there seems to be no good way to reuse a request body that has already been consumed for the upload, short of reading it into memory or writing it to a temporary file.
