How to stream the UploadPart S3 operation from an incoming request to AWS S3 using Go?

Question

Context

My team and I are building a reverse proxy that intercepts all outgoing requests to S3, so that we can audit and control access from our different apps.

We have successfully implemented almost all operations by streaming the content of the files. For instance, to upload in a single operation we use s3manager.Uploader to stream the body of the incoming request (an io.Reader) to S3, and to download (both the single and multipart flavors) we use plain io.Copy to write out the response from s3.GetObjectOutput.Body (an io.ReadCloser).

The problem:

The only operation we still have not been able to implement via streaming is upload-part (in the context of a multipart upload). The problem is that s3.UploadPartInput needs an aws.ReadSeekCloser, and to pass it the body of the incoming request you need to buffer that body somewhere (for example, in memory).

This is what we have so far:

func (ph *VaultProxyHandler) HandleUploadPart(w http.ResponseWriter, r *http.Request, s3api s3iface.S3API, bucket string, key string, uploadID string, part int64) {
	buf := bytes.NewBuffer(nil)
    
	// This loads the entire body into memory.
	if _, err := io.Copy(buf, r.Body); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	payload := buf.Bytes()

	input := &s3.UploadPartInput{
		Bucket:     aws.String(bucket),
		Key:        aws.String(key),
		UploadId:   aws.String(uploadID),
		PartNumber: aws.Int64(part),
		Body:       aws.ReadSeekCloser(bytes.NewReader(payload)),
	}

	output, err := s3api.UploadPart(input)

	// and so on...
}

Question:

Is there a way to stream the body of an incoming UploadPart request to S3? (By "stream" I mean without storing the entire body in memory.)

Answer 1

Score: 0

In the end, I found a way to reverse-proxy an incoming UploadPart request as a stream: build the request with the AWS SDK and sign it with an unsigned payload.

Here is a basic example:

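import (
	"io"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	v4 "github.com/aws/aws-sdk-go/aws/signer/v4"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3iface"
)

type AwsService struct {
	Region   string
	S3Client s3iface.S3API
	Signer   *v4.Signer
}

func NewAwsService(region string, accessKey string, secretKey string, sessionToken string) (*AwsService, error) {
	creds := credentials.NewStaticCredentials(accessKey, secretKey, sessionToken)
	awsConfig := aws.NewConfig().
		WithRegion(region).
		WithCredentials(creds).
		WithCredentialsChainVerboseErrors(true)
	sess, err := session.NewSession(awsConfig)
	if err != nil {
		return nil, err
	}
	svc := s3.New(sess)

	// Sign without hashing the body, so the body does not have to be read up front.
	signer := v4.NewSigner(creds)
	v4.WithUnsignedPayload(signer)

	return &AwsService{
		Region:   region,
		S3Client: svc,
		Signer:   signer,
	}, nil
}

func (s *AwsService) UploadPart(bucket string, key string, part int, uploadID string, payloadReader io.Reader, contentLength int64) (string, error) {
	input := &s3.UploadPartInput{
		Bucket:        aws.String(bucket),
		Key:           aws.String(key),
		UploadId:      aws.String(uploadID),
		PartNumber:    aws.Int64(int64(part)),
		ContentLength: aws.Int64(contentLength),
		// Wraps the plain io.Reader; the body is passed through without buffering.
		Body: aws.ReadSeekCloser(payloadReader),
	}

	req, output := s.S3Client.UploadPartRequest(input)

	// Check the signing error here; previously req.Send() overwrote it.
	if _, err := s.Signer.Sign(req.HTTPRequest, req.Body, s3.ServiceName, s.Region, time.Now()); err != nil {
		return "", err
	}
	if err := req.Send(); err != nil {
		return "", err
	}

	// aws.StringValue returns "" instead of panicking if ETag is nil.
	return aws.StringValue(output.ETag), nil
}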

Then, it can be invoked from a handler:

// awsService is assumed to be a package-level *AwsService created with NewAwsService.
func HandleUploadPart(w http.ResponseWriter, r *http.Request) {
	query := r.URL.Query()
	bucket := query.Get("bucket")
	key := query.Get("key")
	uploadID := query.Get("upload-id")

	// A malformed part number or Content-Length is a client error, hence 400.
	part, err := strconv.Atoi(query.Get("part"))
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	contentLength, err := strconv.ParseInt(r.Header.Get("Content-Length"), 10, 64)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	// UploadPart takes no region argument; the region is fixed in the AwsService.
	etag, err := awsService.UploadPart(bucket, key, part, uploadID, r.Body, contentLength)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Header().Set("ETag", etag)
}

Downsides:

  • The client must know the content length in advance and send it.
  • The payload cannot be signed.

huangapple
  • Published on May 21, 2021 at 00:53:10
  • When reposting, please keep the link to this article: https://go.coder-hub.com/67624774.html