英文:
How to handle large file upload to Google Bucket?
问题
我有以下的Golang代码用于将文件上传到Google Bucket:
package main
import (
"context"
"fmt"
"io"
"net/http"
"os"
"time"
"cloud.google.com/go/storage"
)
// Server and upload configuration.
var serverPort = ":8444"
// Path to the service-account JSON key consumed via
// GOOGLE_APPLICATION_CREDENTIALS in uploadToBucket.
var googleCredential = "./credential.json"
// Destination bucket and folder (object-name prefix) for uploads.
var googleBucket = "g1-BUCKET-001"
var googleFolder = "test_folder/"
// uploadToBucket handles POST /upload: it reads the multipart form
// field "myFile" from the request and streams it into the configured
// Google Cloud Storage bucket under googleFolder.
//
// Fixes over the original: the ParseMultipartForm error is checked,
// error responses carry an HTTP status code, and the double-formatting
// fmt.Fprintf(w, fmt.Sprintf(...)) anti-pattern (a format-string
// injection risk when err contains '%') is removed.
func uploadToBucket(w http.ResponseWriter, r *http.Request) {
	t1 := time.Now()
	fmt.Println("File Upload Endpoint Hit")
	// 10 << 20 (10 MiB) caps the memory used while parsing the form;
	// larger parts spill to temporary files, so this is NOT an upload
	// size limit.
	if err := r.ParseMultipartForm(10 << 20); err != nil {
		w.WriteHeader(http.StatusBadRequest)
		fmt.Fprintf(w, "上传文件时出错:%v", err)
		return
	}
	file, handler, err := r.FormFile("myFile")
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		fmt.Fprintf(w, "上传文件时出错:%v", err)
		return
	}
	defer file.Close()
	fmt.Printf("已上传的文件:%+v\n", handler.Filename)
	fmt.Printf("文件大小:%+v\n", handler.Size)
	fmt.Printf("MIME 头:%+v\n", handler.Header)
	// Point the client library at the service-account credentials.
	os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", googleCredential)
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Fprintf(w, "创建 storage.NewClient 时出错:%v", err)
		return
	}
	defer client.Close()
	fmt.Println("存储桶客户端已创建。")
	// Generous timeout so multi-GB uploads are not cancelled midway.
	ctx, cancel := context.WithTimeout(ctx, time.Second*7200)
	defer cancel()
	destFilePath := googleFolder + handler.Filename
	fmt.Printf("目标存储桶:gs://%s/。\n", googleBucket)
	fmt.Printf("目标文件路径:%s。\n", destFilePath)
	// DoesNotExist prevents overwriting an existing object.
	o := client.Bucket(googleBucket).Object(destFilePath)
	o = o.If(storage.Conditions{DoesNotExist: true})
	// Stream the uploaded file into the bucket object.
	wc := o.NewWriter(ctx)
	if _, err = io.Copy(wc, file); err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Fprintf(w, "io.Copy 出错:%v", err)
		return
	}
	if err := wc.Close(); err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Fprintf(w, "Writer.Close() 出错:%v", err)
		return
	}
	fmt.Printf("%s 已上传到 gs://%s/%s。\n", handler.Filename, googleBucket, googleFolder)
	t2 := time.Now()
	fmt.Printf("开始时间:%+v\n", t1)
	fmt.Printf("结束时间:%+v\n", t2)
	fmt.Printf("时间差:%+v\n", t2.Sub(t1))
	// Report success to the client.
	fmt.Fprintf(w, "文件上传成功\n")
}
// setupRoutes registers the upload handler and starts the HTTP server.
// http.ListenAndServe blocks for the life of the process and always
// returns a non-nil error; the original silently discarded it.
func setupRoutes() {
	http.HandleFunc("/upload", uploadToBucket)
	if err := http.ListenAndServe(serverPort, nil); err != nil {
		fmt.Println("HTTP server error:", err)
	}
}
// Entry point: setupRoutes blocks serving HTTP for the process lifetime.
func main() {
	setupRoutes()
}
它可以正常处理大约100MB的文件。但是当文件大小超过1GB时,等待时间会很长。用户可能会认为代码停止工作并在完成之前退出。目前,他们只会在最后得到这一行:
fmt.Fprintf(w, "文件上传成功\n")
我该如何实现一种方式来给用户一些反馈,比如一个完成进度条?
英文:
I have the following Golang code to upload a file to Google bucket:
package main
import (
"context"
"fmt"
"io"
"net/http"
"os"
"time"
"cloud.google.com/go/storage"
)
// Server and upload configuration.
var serverPort = ":8444"
// Path to the service-account JSON key consumed via
// GOOGLE_APPLICATION_CREDENTIALS in uploadToBucket.
var googleCredential = "./credential.json"
// Destination bucket and folder (object-name prefix) for uploads.
var googleBucket = "g1-BUCKET-001"
var googleFolder = "test_folder/"
// uploadToBucket handles POST /upload: it reads the multipart form
// field "myFile" from the request and streams it into the configured
// Google Cloud Storage bucket under googleFolder.
//
// Fixes over the original: the ParseMultipartForm error is checked,
// error responses carry an HTTP status code, and the double-formatting
// fmt.Fprintf(w, fmt.Sprintf(...)) anti-pattern (a format-string
// injection risk when err contains '%') is removed.
func uploadToBucket(w http.ResponseWriter, r *http.Request) {
	t1 := time.Now()
	fmt.Println("File Upload Endpoint Hit")
	// 10 << 20 (10 MiB) caps the memory used while parsing the form;
	// larger parts spill to temporary files, so this is NOT an upload
	// size limit.
	if err := r.ParseMultipartForm(10 << 20); err != nil {
		w.WriteHeader(http.StatusBadRequest)
		fmt.Fprintf(w, "Error uploading file: %v", err)
		return
	}
	file, handler, err := r.FormFile("myFile")
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		fmt.Fprintf(w, "Error uploading file: %v", err)
		return
	}
	defer file.Close()
	fmt.Printf("Uploaded File: %+v\n", handler.Filename)
	fmt.Printf("File Size: %+v\n", handler.Size)
	fmt.Printf("MIME Header: %+v\n", handler.Header)
	// Point the client library at the service-account credentials.
	os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", googleCredential)
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Fprintf(w, "Error creating storage.NewClient: %v", err)
		return
	}
	defer client.Close()
	fmt.Println("Bucket client created.")
	// Generous timeout so multi-GB uploads are not cancelled midway.
	ctx, cancel := context.WithTimeout(ctx, time.Second*7200)
	defer cancel()
	destFilePath := googleFolder + handler.Filename
	fmt.Printf("Target bucket: gs://%s/.\n", googleBucket)
	fmt.Printf("Destination file path: %s.\n", destFilePath)
	// DoesNotExist prevents overwriting an existing object.
	o := client.Bucket(googleBucket).Object(destFilePath)
	o = o.If(storage.Conditions{DoesNotExist: true})
	// Stream the uploaded file into the bucket object.
	wc := o.NewWriter(ctx)
	if _, err = io.Copy(wc, file); err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Fprintf(w, "io.Copy error: %v", err)
		return
	}
	if err := wc.Close(); err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Fprintf(w, "Writer.Close() error: %v", err)
		return
	}
	fmt.Printf("%s uploaded to gs://%s/%s.\n", handler.Filename, googleBucket, googleFolder)
	t2 := time.Now()
	fmt.Printf("Time start: %+v\n", t1)
	fmt.Printf("Time end: %+v\n", t2)
	fmt.Printf("Time diff: %+v\n", t2.Sub(t1))
	// Report success to the client.
	fmt.Fprintf(w, "Successfully Uploaded File\n")
}
// setupRoutes registers the upload handler and starts the HTTP server.
// http.ListenAndServe blocks for the life of the process and always
// returns a non-nil error; the original silently discarded it.
func setupRoutes() {
	http.HandleFunc("/upload", uploadToBucket)
	if err := http.ListenAndServe(serverPort, nil); err != nil {
		fmt.Println("HTTP server error:", err)
	}
}
// Entry point: setupRoutes blocks serving HTTP for the process lifetime.
func main() {
	setupRoutes()
}
It works fine with file around 100MB. However when it gets to 1GB+, the wait time is so long. The user may think the code stops working and quit before it can finish. For now, all they get is this line at the end:
fmt.Fprintf(w, "Successfully Uploaded File\n")
How can I implement a way to give the user some feedback, like a completion bar for instance?
答案1
得分: 1
这里有一种方法可以为任意的io.Writer
(例如从o.NewWriter(ctx)
获得的)添加进度跟踪。使用io.MultiWriter
可以将写操作复制到多个写入器中。其中一个可以是你的o.NewWriter(ctx)
,另一个可以是一个简单地计算字节数并与总大小进行比较的io.Writer
实现。
然后,你可以运行一个goroutine,定期通过写入到stdout(或其他地方)来更新进度。
请参考这个示例,使用这个概念验证实现:
package main
import (
"fmt"
"io"
"os"
"github.com/farrellit/writeprogress"
)
// main demonstrates progress tracking for an arbitrary io.Writer:
// io.MultiWriter duplicates every write into both the real destination
// and a byte-counting progress writer from the writeprogress package.
func main() {
	// Source: pseudo-random bytes; destination: discarded.
	in, err := os.Open("/dev/urandom")
	if err != nil {
		panic(err)
	}
	out, err := os.OpenFile("/dev/null", os.O_WRONLY, 0)
	if err != nil {
		panic(err)
	}
	defer in.Close()
	defer out.Close()
	// Copy exactly 1e6 bytes so progress can be computed against a
	// known total.
	length := int64(1e6)
	wp := writeprogress.NewProgressWriter(uint64(length))
	// Watch invokes the callback as progress advances; the returned
	// channel appears to signal completion — NOTE(review): confirm
	// against the writeprogress API (its error is discarded here).
	d, _ := wp.Watch(func(p float64) { fmt.Printf("\r%2.0f%%", p*100) })
	if b, err := io.Copy(
		io.MultiWriter(out, wp),
		&io.LimitedReader{R: in, N: length},
	); err != nil {
		panic(err)
	} else {
		// Wait for the watcher to finish before printing the summary.
		<-d
		fmt.Printf("\n%d/%d %2.0f%%\n", b, length, wp.GetProgress()*100)
	}
}
你可以使用相同的方法,将o.NewWriter(ctx)
和由stat
确定的文件长度结合起来。如果你想要更漂亮的效果,你还可以与https://github.com/schollz/progressbar等工具一起使用。
英文:
Here's one option to add progress tracking to an arbitrary io.Writer
such as you get from o.NewWriter(ctx)
. Using io.MultiWriter
you can duplicate writes to multiple writers. One can be your o.NewWriter(ctx)
and another can be an implementation of io.Writer
that simply counts bytes and can compare against a total size.
You could then run a goroutine that could periodically update progress by writing to stdout (or something else).
Take a look at this example, which uses the following proof-of-concept implementation:
package main
import (
"fmt"
"io"
"os"
"github.com/farrellit/writeprogress"
)
// main demonstrates progress tracking for an arbitrary io.Writer:
// io.MultiWriter duplicates every write into both the real destination
// and a byte-counting progress writer from the writeprogress package.
func main() {
	// Source: pseudo-random bytes; destination: discarded.
	in, err := os.Open("/dev/urandom")
	if err != nil {
		panic(err)
	}
	out, err := os.OpenFile("/dev/null", os.O_WRONLY, 0)
	if err != nil {
		panic(err)
	}
	defer in.Close()
	defer out.Close()
	// Copy exactly 1e6 bytes so progress can be computed against a
	// known total.
	length := int64(1e6)
	wp := writeprogress.NewProgressWriter(uint64(length))
	// Watch invokes the callback as progress advances; the returned
	// channel appears to signal completion — NOTE(review): confirm
	// against the writeprogress API (its error is discarded here).
	d, _ := wp.Watch(func(p float64) { fmt.Printf("\r%2.0f%%", p*100) })
	if b, err := io.Copy(
		io.MultiWriter(out, wp),
		&io.LimitedReader{R: in, N: length},
	); err != nil {
		panic(err)
	} else {
		// Wait for the watcher to finish before printing the summary.
		<-d
		fmt.Printf("\n%d/%d %2.0f%%\n", b, length, wp.GetProgress()*100)
	}
}
You can do the same thing with your o.NewWriter(ctx)
and a file length determined by stat
. You could also use it in conjunction with something like https://github.com/schollz/progressbar if you want something prettier.
答案2
得分: 0
以下是一个工作示例,它在每个部分上传后流式传输响应。更改您的存储桶、文件源、目标对象名称和内容类型。
使用Curl命令访问localhost:8080,您将收到分块响应。然后,将该逻辑注入到您的代码中,使用JSON响应或其他方式。
我之前的问题是由于我的测试中chunkSize
小于5MiB。
package main
import (
"bytes"
"encoding/xml"
"fmt"
"golang.org/x/net/context"
"golang.org/x/oauth2/google"
"io/ioutil"
"log"
"net/http"
)
const (
	// Upload target and source file; edit for your environment.
	bucket = "gib-multiregion-us"
	fileName = "./20220909_ListePieces.csv"
	objectName = "test-multipart.csv"
	contentType = "text/csv"
	// Minimum part size is 5 MiB, in multiples of 256 KiB.
	// NOTE(review): "chunckSize" is a misspelling of chunkSize.
	chunckSize = 5242880 + (256 * 1024 * 0)
	// GCS XML API host and query-parameter / header names used by the
	// multipart-upload calls below.
	storageapi = "storage.googleapis.com"
	uploadsExtension = "uploads"
	partNumberExtension = "partNumber"
	uploadIdExtension = "uploadId"
	etagKey = "ETag"
)
// httpClient is the authenticated client initialized in main.
var httpClient *http.Client
// main builds an authenticated HTTP client from Application Default
// Credentials and serves the multipart-upload demo on :8080.
func main() {
	ctx := context.Background()
	c, err := google.DefaultClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	httpClient = c
	http.HandleFunc("/", multipartUpload)
	// ListenAndServe always returns a non-nil error; surface it
	// instead of silently discarding it as the original did.
	log.Fatal(http.ListenAndServe(":8080", nil))
}
// multipartUpload uploads fileName to the bucket through the GCS XML
// multipart-upload API, streaming a chunked HTTP response with a
// progress line back to the client after every uploaded part.
//
// Fixes over the original: every part response body is closed (the
// original leaked one per part), each part's status code is checked
// before its ETag is recorded, and superfluous WriteHeader calls after
// the first Flush (headers are already sent by then) are removed.
func multipartUpload(w http.ResponseWriter, r *http.Request) {
	url := fmt.Sprintf("https://%s.%s/%s", bucket, storageapi, objectName)
	// Step 1: initiate the upload session; the XML response carries
	// the uploadId used by every subsequent request.
	resp, err := httpClient.Post(fmt.Sprintf("%s?%s", url, uploadsExtension), contentType, nil)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Println("初始化时出错:", err)
		fmt.Fprintf(w, "初始化时出错:%s\n", err)
		return
	}
	respbody, err := ioutil.ReadAll(resp.Body)
	// Close eagerly rather than defer: this handler issues many more
	// requests that reuse the resp variable.
	resp.Body.Close()
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Println("读取初始化响应时出错:", err)
		fmt.Fprintf(w, "读取初始化响应时出错:%s\n", err)
		return
	}
	var initMultiPart InitiateMultipartUploadResult
	if err = xml.Unmarshal(respbody, &initMultiPart); err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Println("解析初始化响应时出错:", err)
		fmt.Fprintf(w, "解析初始化响应时出错:%s\n", err)
		return
	}
	uploadId := initMultiPart.UploadId
	// Step 2: read the source file. NOTE: this buffers the whole file
	// in memory; fine for a demo, stream per-chunk reads for huge files.
	content, err := ioutil.ReadFile(fileName)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Println("读取输入文件时出错:", err)
		fmt.Fprintf(w, "读取输入文件时出错:%s\n", err)
		return
	}
	// The writer must support flushing so progress lines reach the
	// client before the handler returns.
	flusher, ok := w.(http.Flusher)
	if !ok {
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Println("无法创建流式刷新器:", err)
		fmt.Fprintf(w, "无法创建流式刷新器:%s\n", err)
		return
	}
	w.Header().Set("Transfer-Encoding", "chunked")
	flusher.Flush()
	// Step 3: PUT each part, recording the ETag GCS returns for it.
	complMulti := CompleteMultipartUpload{}
	for counter := 0; counter*chunckSize < len(content); counter++ {
		end := (counter + 1) * chunckSize
		if end > len(content) {
			end = len(content) // last part may be shorter
		}
		toSend := content[counter*chunckSize : end]
		req, err := http.NewRequest(http.MethodPut, fmt.Sprintf("%s?%s=%d&%s=%s", url, partNumberExtension, counter+1, uploadIdExtension, uploadId), bytes.NewReader(toSend))
		if err != nil {
			fmt.Println("创建分块请求时出错:", err)
			fmt.Fprintf(w, "创建分块请求时出错:%s\n", err)
			return
		}
		resp, err = httpClient.Do(req)
		if err != nil {
			fmt.Println("提交分块时出错:", err)
			fmt.Fprintf(w, "提交分块时出错:%s\n", err)
			return
		}
		etag := resp.Header.Get(etagKey)
		// Close every part response (the original leaked these) so the
		// underlying connection can be reused.
		resp.Body.Close()
		if resp.StatusCode != http.StatusOK {
			fmt.Println("提交分块状态出错:", resp.Status)
			fmt.Fprintf(w, "提交分块状态出错:%s\n", resp.Status)
			return
		}
		fmt.Printf("上传完成:%d%%\n", 100*counter*chunckSize/len(content))
		fmt.Fprintf(w, "上传完成:%d%%\n", 100*counter*chunckSize/len(content))
		flusher.Flush()
		complMulti.Parts = append(complMulti.Parts, Part{
			PartNumber: counter + 1,
			ETag:       etag,
		})
	}
	payload, err := xml.Marshal(complMulti)
	if err != nil {
		fmt.Println("编组完整多部分时出错:", err)
		fmt.Fprintf(w, "编组完整多部分时出错:%s\n", err)
		return
	}
	// Step 4: complete the upload; GCS assembles the parts.
	resp, err = httpClient.Post(fmt.Sprintf("%s?%s=%s", url, uploadIdExtension, uploadId), "application/xml", bytes.NewReader(payload))
	if err != nil {
		fmt.Println("发送完成时出错:", err)
		fmt.Fprintf(w, "发送完成时出错:%s\n", err)
		return
	}
	defer resp.Body.Close()
	if resp.StatusCode == http.StatusOK {
		fmt.Fprintf(w, "完成")
		fmt.Println("完成")
	} else {
		b, _ := ioutil.ReadAll(resp.Body)
		fmt.Printf("错误 %s,响应体:%s\n", resp.Status, string(b))
		fmt.Fprintf(w, "错误 %s,响应体:%s\n", resp.Status, string(b))
	}
}
// InitiateMultipartUploadResult models the XML response of the
// "initiate multipart upload" call; UploadId identifies the session
// used by every subsequent part request.
type InitiateMultipartUploadResult struct {
	InitiateMultipartUploadResult xml.Name `xml:"InitiateMultipartUploadResult"`
	Bucket                        string   `xml:"Bucket"`
	Key                           string   `xml:"Key"`
	UploadId                      string   `xml:"UploadId"`
}

// CompleteMultipartUpload is the request body of the "complete" call.
// The original declared an unexported xml.Name field, which
// encoding/xml ignores (go vet flags the dead tag); the idiomatic
// exported XMLName field pins the element name explicitly.
type CompleteMultipartUpload struct {
	XMLName xml.Name `xml:"CompleteMultipartUpload"`
	Parts   []Part   `xml:"Part"`
}

// Part records one uploaded part number and the ETag the server
// returned for it.
type Part struct {
	XMLName    xml.Name `xml:"Part"`
	PartNumber int      `xml:"PartNumber"`
	ETag       string   `xml:"ETag"`
}
英文:
Here is a working example that streams the response after each part upload. Change your bucket, file source, target object name, and content type.
Curl localhost:8080 and you will receive a chunked response. Then inject that logic into your code, with a JSON response or whatever you prefer.
My previous issue came from the chunkSize being below 5 MiB in my tests.
package main
import (
"bytes"
"encoding/xml"
"fmt"
"golang.org/x/net/context"
"golang.org/x/oauth2/google"
"io/ioutil"
"log"
"net/http"
)
const (
	// Upload target and source file; edit for your environment.
	bucket = "gib-multiregion-us"
	fileName = "./20220909_ListePieces.csv"
	objectName = "test-multipart.csv"
	contentType = "text/csv"
	// Minimum part size is 5 MiB, in multiples of 256 KiB.
	// NOTE(review): "chunckSize" is a misspelling of chunkSize.
	chunckSize = 5242880 + (256 * 1024 * 0)
	// GCS XML API host and query-parameter / header names used by the
	// multipart-upload calls below.
	storageapi = "storage.googleapis.com"
	uploadsExtension = "uploads"
	partNumberExtension = "partNumber"
	uploadIdExtension = "uploadId"
	etagKey = "ETag"
)
// httpClient is the authenticated client initialized in main.
var httpClient *http.Client
// main builds an authenticated HTTP client from Application Default
// Credentials and serves the multipart-upload demo on :8080.
func main() {
	ctx := context.Background()
	c, err := google.DefaultClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	httpClient = c
	http.HandleFunc("/", multipartUpload)
	// ListenAndServe always returns a non-nil error; surface it
	// instead of silently discarding it as the original did.
	log.Fatal(http.ListenAndServe(":8080", nil))
}
// multipartUpload uploads fileName to the bucket through the GCS XML
// multipart-upload API, streaming a chunked HTTP response with a
// progress line back to the client after every uploaded part.
//
// Fixes over the original: every part response body is closed (the
// original leaked one per part), each part's status code is checked
// before its ETag is recorded, and superfluous WriteHeader calls after
// the first Flush (headers are already sent by then) are removed.
func multipartUpload(w http.ResponseWriter, r *http.Request) {
	url := fmt.Sprintf("https://%s.%s/%s", bucket, storageapi, objectName)
	// Step 1: initiate the upload session; the XML response carries
	// the uploadId used by every subsequent request.
	resp, err := httpClient.Post(fmt.Sprintf("%s?%s", url, uploadsExtension), contentType, nil)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Println("error during init: ", err)
		fmt.Fprintf(w, "error during init: %s\n", err)
		return
	}
	respbody, err := ioutil.ReadAll(resp.Body)
	// Close eagerly rather than defer: this handler issues many more
	// requests that reuse the resp variable.
	resp.Body.Close()
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Println("error during reading init resp: ", err)
		fmt.Fprintf(w, "error during reading init resp: %s\n", err)
		return
	}
	var initMultiPart InitiateMultipartUploadResult
	if err = xml.Unmarshal(respbody, &initMultiPart); err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Println("error during unmarshalling init resp: ", err)
		fmt.Fprintf(w, "error during unmarshalling init resp: %s\n", err)
		return
	}
	uploadId := initMultiPart.UploadId
	// Step 2: read the source file. NOTE: this buffers the whole file
	// in memory; fine for a demo, stream per-chunk reads for huge files.
	content, err := ioutil.ReadFile(fileName)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Println("error during reading input file: ", err)
		fmt.Fprintf(w, "error during reading input file: %s\n", err)
		return
	}
	// The writer must support flushing so progress lines reach the
	// client before the handler returns.
	flusher, ok := w.(http.Flusher)
	if !ok {
		w.WriteHeader(http.StatusInternalServerError)
		fmt.Println("Impossible to create the stream flusher: ", err)
		fmt.Fprintf(w, "Impossible to create the stream flusher: %s\n", err)
		return
	}
	w.Header().Set("Transfer-Encoding", "chunked")
	flusher.Flush()
	// Step 3: PUT each part, recording the ETag GCS returns for it.
	complMulti := CompleteMultipartUpload{}
	for counter := 0; counter*chunckSize < len(content); counter++ {
		end := (counter + 1) * chunckSize
		if end > len(content) {
			end = len(content) // last part may be shorter
		}
		toSend := content[counter*chunckSize : end]
		req, err := http.NewRequest(http.MethodPut, fmt.Sprintf("%s?%s=%d&%s=%s", url, partNumberExtension, counter+1, uploadIdExtension, uploadId), bytes.NewReader(toSend))
		if err != nil {
			fmt.Println("error chunk req creation: ", err)
			fmt.Fprintf(w, "error chunk req creation: %s\n", err)
			return
		}
		resp, err = httpClient.Do(req)
		if err != nil {
			fmt.Println("error chunk submission: ", err)
			fmt.Fprintf(w, "error chunk submission: %s\n", err)
			return
		}
		etag := resp.Header.Get(etagKey)
		// Close every part response (the original leaked these) so the
		// underlying connection can be reused.
		resp.Body.Close()
		if resp.StatusCode != http.StatusOK {
			fmt.Println("error chunk submission status: ", resp.Status)
			fmt.Fprintf(w, "error chunk submission status: %s\n", resp.Status)
			return
		}
		fmt.Printf("upload is done at %d%%\n", 100*counter*chunckSize/len(content))
		fmt.Fprintf(w, "upload is done at %d%%\n", 100*counter*chunckSize/len(content))
		flusher.Flush()
		complMulti.Parts = append(complMulti.Parts, Part{
			PartNumber: counter + 1,
			ETag:       etag,
		})
	}
	payload, err := xml.Marshal(complMulti)
	if err != nil {
		fmt.Println("error marshal complete multi: ", err)
		fmt.Fprintf(w, "error marshal complete multi: %s\n", err)
		return
	}
	// Step 4: complete the upload; GCS assembles the parts.
	resp, err = httpClient.Post(fmt.Sprintf("%s?%s=%s", url, uploadIdExtension, uploadId), "application/xml", bytes.NewReader(payload))
	if err != nil {
		fmt.Println("error during send complete: ", err)
		fmt.Fprintf(w, "error during send complete: %s\n", err)
		return
	}
	defer resp.Body.Close()
	if resp.StatusCode == http.StatusOK {
		fmt.Fprintf(w, "done")
		fmt.Println("done")
	} else {
		b, _ := ioutil.ReadAll(resp.Body)
		fmt.Printf("error %s with body %s\n", resp.Status, string(b))
		fmt.Fprintf(w, "error %s with body %s\n", resp.Status, string(b))
	}
}
// InitiateMultipartUploadResult models the XML response of the
// "initiate multipart upload" call; UploadId identifies the session
// used by every subsequent part request.
type InitiateMultipartUploadResult struct {
	InitiateMultipartUploadResult xml.Name `xml:"InitiateMultipartUploadResult"`
	Bucket                        string   `xml:"Bucket"`
	Key                           string   `xml:"Key"`
	UploadId                      string   `xml:"UploadId"`
}

// CompleteMultipartUpload is the request body of the "complete" call.
// The original declared an unexported xml.Name field, which
// encoding/xml ignores (go vet flags the dead tag); the idiomatic
// exported XMLName field pins the element name explicitly.
type CompleteMultipartUpload struct {
	XMLName xml.Name `xml:"CompleteMultipartUpload"`
	Parts   []Part   `xml:"Part"`
}

// Part records one uploaded part number and the ETag the server
// returned for it.
type Part struct {
	XMLName    xml.Name `xml:"Part"`
	PartNumber int      `xml:"PartNumber"`
	ETag       string   `xml:"ETag"`
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论