Golang Azure Blob存储,0B的Blob和覆盖下载的Blob数据。

huangapple go评论152阅读模式
英文:

Golang azure blob store, 0b blobs and overwrites downloaded blobs data

问题

目前正在使用的是https://github.com/Azure/azure-sdk-for-go。

概述:我目前正在从Azure Blob存储中下载一个Blob,解析该Blob,并将转录后的Blob上传回存储中的另一个名为filtered的文件夹。

问题:上传的Blob不在filtered文件夹中,而是在根目录中,并且Blob的大小为0B,没有数据。上传Blob似乎还会破坏我刚刚下载的Blob,导致Blob的大小为0B,没有数据。下载Blob工作正常,我能够获取数据的[]byte数组。

代码:

import (
	"bufio"
	"fmt"
	"os"
	"strings"
	"strconv"
	"math/big"
	"bytes"
	"io/ioutil"
	"github.com/Azure/azure-sdk-for-go/storage"
	"compress/gzip"
	"encoding/base64"
	"crypto/md5"
)

func main() {
	var filter bool = true //检查智能过滤器
	test := 0
	configfile, err := os.Open("config.txt") //打开配置文件
	check(err)                              //检查文件是否打开
	ConfigScanner := bufio.NewScanner(configfile) //打开缓冲区
	ConfigScanner.Scan()                           //获取序列号
	serialnum := ConfigScanner.Text()
	configfile.Close() //关闭配置文件
	CanLUT := ParseDBC("file.dbc") //解析关联的DBC文件
	check(err)                     //检查文件是否打开
	m := make(map[int64]string)    //最后一次看到的消息的映射
	//Azure API
	client, err := storage.NewBasicClient(accountName, accountKey) //从Azure获取客户端
	check(err)
	bsc := client.GetBlobService() //访问Blob服务
	cnt := bsc.GetContainerReference("containerblob")               //获取Blob的容器
	LBP := storage.ListBlobsParameters{}
	LBP.Prefix = "dev4/dev4" //只获取具有dev4/dev4前缀的Blob
	list, err := cnt.ListBlobs(LBP) //获取所有匹配的Blob列表
	check(err)
	for _, b := range list.Blobs { //从Azure中读取具有dev4/dev4前缀的所有Blob
		oa := make([]byte, 0)
		fmt.Println("getting blob: ", b.Name)
		readCloser, err := b.Get(nil) //获取Blob数据
		check(err)
		bytesRead, err := ioutil.ReadAll(readCloser) //将Blob数据读取到[]byte数组中
		check(err)
		if len(bytesRead) < 1 {
			continue
		}
		br := bytes.NewReader(bytesRead)
		zr, err := gzip.NewReader(br) //使用gzip读取器读取压缩数据
		check(err)
		uz, err := ioutil.ReadAll(zr) //uz解压缩文件的字节数组
		check(err)
		readCloser.Close() //关闭读取器
		zr.Close()         //关闭gzip读取器
		r := bytes.NewReader(uz)
		scanner := bufio.NewScanner(r)
		for scanner.Scan() { //循环处理输入文件中的每一行
			temp := ParseToFrame(scanner.Text()) //将行解析为可用的结构体
			_, exists := m[temp.nodeid]          //检查帧是否已经被看到并存储在哈希映射中
			if exists {                         //如果在映射中存在
				if ChkDuplicate(m, temp) { //消息是否重复?如果是,则不是重复消息,因此添加它
					m[temp.nodeid] = temp.data //更新哈希映射中的数据
					DecodeFrame(temp, &oa, CanLUT, filter, serialnum) //解码帧并将其输出到另一个文件中
				}
			} else { //在映射中不存在,因此添加它
				m[temp.nodeid] = temp.data
				DecodeFrame(temp, &oa, CanLUT, filter, serialnum) //解码帧并将其输出到另一个文件中
			}
		} //结束Blob文件
		filestr := strings.Split(b.Name, "_")[1]
		filestr = "filtered/filtered_" + filestr
		var buffout bytes.Buffer
		gz := gzip.NewWriter(&buffout)
		_, err = gz.Write(oa)
		check(err)
		gz.Flush()
		gz.Close()
		compressedData := buffout.Bytes()
		//将块Blob推送到Azure
		fmt.Println("uploading: ", filestr)
		clientnew, err := storage.NewBasicClient(accountName, accountKey) //从Azure获取客户端
		check(err)
		senderbsc := clientnew.GetBlobService() //访问Blob服务
		sendercnt := senderbsc.GetContainerReference("storeblob") //获取存储Blob的容器
		bblob := sendercnt.GetBlobReference("filtered_" + strings.Split(b.Name, "/")[1])
		err = bblob.CreateBlockBlob(nil)
		check(err)
		blockID := base64.StdEncoding.EncodeToString([]byte("00000"))
		err = bblob.PutBlock(blockID, compressedData, nil)
		check(err)
		list, err := b.GetBlockList(storage.BlockListTypeUncommitted, nil)
		check(err)
		uncommittedBlocksList := make([]storage.Block, len(list.UncommittedBlocks))
		for i := range list.UncommittedBlocks {
			uncommittedBlocksList[i].ID = list.UncommittedBlocks[i].Name
			uncommittedBlocksList[i].Status = storage.BlockStatusUncommitted
		}
		err = b.PutBlockList(uncommittedBlocksList, nil)
		//检查上传是否成功。
		CheckHash(&compressedData, filestr, sendercnt)
		check(err)
		if test == 0 {
			break //仅测试读取一个文件
		}
		test++
	} //结束for循环
} //结束main函数

以上是要翻译的内容。

英文:

Currently using: https://github.com/Azure/azure-sdk-for-go

overview: I'm currently downloading a blob from the azure blob store, parsing the blob and uploading a transcribed blob back to the store in another folder called filtered.

Problem: The blob uploaded is not in the folder filtered but in the root directory and the blob is 0B with no data. The blob upload also seems to destroy the blob i just downloaded resulting in the blob being 0B with no data. Downloading the blob works fine and i'm able to get the []byte array of the data.

Code:

import (
&quot;bufio&quot;
&quot;fmt&quot;
&quot;os&quot;
&quot;strings&quot;
&quot;strconv&quot;
&quot;math/big&quot;
&quot;bytes&quot;
&quot;io/ioutil&quot;
&quot;github.com/Azure/azure-sdk-for-go/storage&quot;
&quot;compress/gzip&quot;
&quot;encoding/base64&quot;
&quot;crypto/md5&quot;
)
func main() {
var filter bool = true													//check smart filter
test := 0
configfile, err := os.Open(&quot;config.txt&quot;) 			    				//open configfile
check(err)																//check file opened
ConfigScanner := bufio.NewScanner(configfile) 							//open buffer
ConfigScanner.Scan()													//get serial number
serialnum := ConfigScanner.Text()
configfile.Close()														//close the config file
CanLUT := ParseDBC(&quot;file.dbc&quot;)											//parse the associated DBC file
check(err)																//check file opened
m := make(map[int64]string)       										//map of last seen message
//Azure API
client, err := storage.NewBasicClient(accountName, accountKey)			//get client from azure
check(err)
bsc := client.GetBlobService()											//access blob service
cnt := bsc.GetContainerReference(&quot;containerblob&quot;)							//get container of the blob
LBP := storage.ListBlobsParameters{}									
LBP.Prefix = &quot;dev4/dev4&quot;												//only get blobs with dev4/dev4 prefix
list, err := cnt.ListBlobs(LBP)											//get list of all matching blobs
check(err)
for _, b := range list.Blobs {											//read all blobs from azure with prefix dev4/dev4
oa := make([]byte,0)
fmt.Println(&quot;getting blob: &quot;,b.Name)
readCloser, err := b.Get(nil)										//get blob data
check(err)
bytesRead, err := ioutil.ReadAll(readCloser)						//read blob data to byte[]
check(err)
if len(bytesRead) &lt; 1 {
continue
}
br := bytes.NewReader(bytesRead)
zr, err := gzip.NewReader(br)										//use gzip reader for zipped data
check(err)
uz, err := ioutil.ReadAll(zr) 										//uz byte[] of unzipped file
check(err)
readCloser.Close()													//close the reader
zr.Close()															//close gzip reader
r := bytes.NewReader(uz)
scanner := bufio.NewScanner(r)
for scanner.Scan() {												//loop on each line in the input file
temp := ParseToFrame(scanner.Text())							//parse the line into a usable struct
_, exists := m[temp.nodeid]										//check if the frame has alread been seen and is stored in the hashmap
if exists { 													//if exists in the map
if ChkDuplicate(m, temp) { 									//is the msg a duplicate? if true the message isnt so add it
m[temp.nodeid] = temp.data 								//update the data to the hashmap
DecodeFrame(temp, &amp;oa, CanLUT, filter, serialnum)  		//decode the frame and output it to another file
}
} else { 														//DNE in map so add it
m[temp.nodeid] = temp.data
DecodeFrame(temp, &amp;oa, CanLUT,filter, serialnum) 			//decode the frame and output it to another file
}
}//end blob file
filestr := strings.Split(b.Name, &quot;_&quot;)[1]
filestr = &quot;filtered/filtered_&quot; + filestr
var buffout bytes.Buffer
gz := gzip.NewWriter(&amp;buffout)
_, err = gz.Write(oa)
check(err)
gz.Flush()
gz.Close()
compressedData := buffout.Bytes()
//push block blob to azure
fmt.Println(&quot;uploading: &quot;,filestr)
clientnew, err := storage.NewBasicClient(accountName, accountKey)			//get client from azure
check(err)
senderbsc := clientnew.GetBlobService()											//access blob service
sendercnt := senderbsc.GetContainerReference(&quot;storeblob&quot;)							//get container of store blob
bblob := sendercnt.GetBlobReference(&quot;filtered_&quot; + strings.Split(b.Name, &quot;/&quot;)[1])
err = bblob.CreateBlockBlob(nil)
check(err)
blockID := base64.StdEncoding.EncodeToString([]byte(&quot;00000&quot;))
err = bblob.PutBlock(blockID, compressedData, nil)
check(err)
list, err := b.GetBlockList(storage.BlockListTypeUncommitted, nil)
check(err)
uncommittedBlocksList := make([]storage.Block, len(list.UncommittedBlocks))
for i := range list.UncommittedBlocks {
uncommittedBlocksList[i].ID = list.UncommittedBlocks[i].Name
uncommittedBlocksList[i].Status = storage.BlockStatusUncommitted
}
err = b.PutBlockList(uncommittedBlocksList, nil)
//check if upload was good.
CheckHash(&amp;compressedData,filestr,sendercnt)
check(err)
if(test == 0){
break		//test only read one file
}
test++
}//end for blobs	 
}//end main

答案1

得分: 1

如@DavidMakogon所说,您可以使用Azure Storage SDK for Go的API CreateBlockBlobFromReader,将实现io.Reader接口的任何读取器上传到Azure Blob Storage。

以下是我的示例代码。

accountName := "<your-storage-account-name>"
accountKey := "<your-storage-account-key>"
client, _ := storage.NewBasicClient(accountName, accountKey)
blobClinet := client.GetBlobService()
containerName := "mycontainer"
container := blobClinet.GetContainerReference(containerName)
// 两种上传的示例方式
// 1. 从字符串读取器上传文本 blob
blobName := "upload.txt"
blob := container.GetBlobReference(blobName)
strReader := strings.NewReader("upload text to blob from string reader")
blob.CreateBlockBlobFromReader(strReader, nil)
// 2. 从文件读取器上传文件
fileName := "hello.png"
file, _ := os.Open(fileName)
blobName := "hello.png"
blob := container.GetBlobReference(blobName)
blob.CreateBlockBlobFromReader(file, nil)

希望对您有所帮助。

英文:

As @DavidMakogon said, you can use the API CreateBlockBlobFromReader of Azure Storage SDK for Go to upload from any reader implements the interface io.Reader to Azure Blob Storage.

Here are my sample code as below.

accountName := &quot;&lt;your-storage-account-name&gt;&quot;
accountKey := &quot;&lt;your-storage-account-key&gt;&quot;
client, _ := storage.NewBasicClient(accountName, accountKey)
blobClinet := client.GetBlobService()
containerName := &quot;mycontainer&quot;
container := blobClinet.GetContainerReference(containerName)
// Two sample ways for uploading
// 1. Upload a text blob from string reader
blobName := &quot;upload.txt&quot;
blob := container.GetBlobReference(blobName)
strReader := strings.NewReader(&quot;upload text to blob from string reader&quot;)
blob.CreateBlockBlobFromReader(strReader, nil)
// 2. Upload a file from file reader
fileName := &quot;hello.png&quot;
file, _ := os.Open(fileName)
blobName := &quot;hello.png&quot;
blob := container.GetBlobReference(blobName)
blob.CreateBlockBlobFromReader(file, nil)

Hope it helps.

答案2

得分: 0

压缩数据 := buffout.Bytes()
// 将块 Blob 推送到 Azure
fmt.Println("上传中:", filestr)
blockID := base64.StdEncoding.EncodeToString([]byte("00001"))
newblob := cnt.GetBlobReference(filestr)
err = newblob.CreateBlockBlobFromReader(bytes.NewReader(compressedData), nil)
check(err)
err = newblob.PutBlock(blockID, compressedData, nil)
check(err)
list, err := newblob.GetBlockList(storage.BlockListTypeUncommitted, nil)
check(err)
uncommittedBlocksList := make([]storage.Block, len(list.UncommittedBlocks))
for i := range list.UncommittedBlocks {
uncommittedBlocksList[i].ID = list.UncommittedBlocks[i].Name
uncommittedBlocksList[i].Status = storage.BlockStatusUncommitted
}
err = newblob.PutBlockList(uncommittedBlocksList, nil)
check(err)

这修复了我的问题,原来是我在调用时有一个拼写错误。

list, err := b.GetBlockList(storage.BlockListTypeUncommitted, nil)

这导致 Azure 同时获取了一个名为 filestr 的新 Blob,并覆盖了原始 Blob。

英文:
    compressedData := buffout.Bytes()
//push block blob to azure
fmt.Println(&quot;uploading: &quot;,filestr)
blockID := base64.StdEncoding.EncodeToString([]byte(&quot;00001&quot;))
newblob := cnt.GetBlobReference(filestr)
err = newblob.CreateBlockBlobFromReader(bytes.NewReader(compressedData),nil)
check(err)
err = newblob.PutBlock(blockID, compressedData, nil)
check(err)
list, err := newblob.GetBlockList(storage.BlockListTypeUncommitted, nil)
check(err)
uncommittedBlocksList := make([]storage.Block, len(list.UncommittedBlocks))
for i := range list.UncommittedBlocks {
uncommittedBlocksList[i].ID = list.UncommittedBlocks[i].Name
uncommittedBlocksList[i].Status = storage.BlockStatusUncommitted
}
err = newblob.PutBlockList(uncommittedBlocksList, nil)
check(err)

This fixed my problem looking at the original i had a typo calling.

list, err := b.GetBlockList(storage.BlockListTypeUncommitted, nil)

This caused azure to get both a new blob called filestr and to overwrite the original blob.

huangapple
  • 本文由 发表于 2017年8月3日 05:05:57
  • 转载请务必保留本文链接:https://go.coder-hub.com/45470989.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定