Cannot transfer more than 250 MB with DataBufferUtils.write to Azure Blob Storage - is my Spring WebFlux usage correct?
Question
I want to stream large data files over REST directly to Azure Blob Storage. The problem I am facing: when I send a file of 250 MB over REST to Azure Blob Storage it works, but with 260 MB it gets stuck forever.

I made an example project where you can verify this behavior: https://github.com/git9999999/azure-blob-large-file-upload-problem

The question: is my reactive code correct?
@GetMapping(path = "/trigger-download-to-blob/{fileSizeInMb}")
public void triggerDownloadToBlob(@PathVariable int fileSizeInMb) {
    log.info("triggerDownload");
    var flux = this.webClient
            .get()
            .uri("/serve-file/" + fileSizeInMb)
            .accept(MediaType.APPLICATION_OCTET_STREAM)
            .exchangeToFlux(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()));
    var destination = "TestDownloadToAzureBlobStorage" + System.currentTimeMillis() + ".pdf";
    var blobClientTarget = this.containerClient.getBlobClient(destination);
    try (var outputStream = blobClientTarget.getBlockBlobClient().getBlobOutputStream(this.parallelTransferOptions, null, null, null, null)) {
        DataBufferUtils.write(flux, outputStream)
                .map(DataBufferUtils::release)
                .blockLast(Duration.ofHours(22));
        outputStream.flush();
    } catch (IOException e) {
        throw new IllegalStateException(e);
    }
    log.info("!!!!!!!!!!!!!!!!!!!!!!!!!!!! end download of {}", destination);
}
Answer 1
Score: 0
- The uploadFile method now accepts a Flux<DataBuffer> directly as a request part instead of using MultipartFile.
- The file data is streamed using reactive programming with DataBufferUtils.write; check the code below:
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.stereotype.Controller;
import org.springframework.util.StopWatch;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestPart;
import reactor.core.publisher.Flux;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.time.Duration;

@Controller
public class AzureBlobUploaderController {

    private final AzureBlobContainerClient containerClient;
    private final int bufferSize;
    private final int timeoutSeconds;

    public AzureBlobUploaderController(@Value("${azure.storage.containerName}") String containerName,
                                       @Value("${bufferSize}") int bufferSize,
                                       @Value("${timeoutSeconds}") int timeoutSeconds) {
        this.containerClient = new AzureBlobContainerClient(containerName);
        this.bufferSize = bufferSize;
        this.timeoutSeconds = timeoutSeconds;
    }

    @PostConstruct
    public void init() {
        // Create the container if it doesn't exist
        if (!containerClient.exists()) {
            containerClient.create();
        }
    }

    @PostMapping(path = "/upload")
    public void uploadFile(@RequestPart("file") Flux<DataBuffer> file) {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        String destinationBlobName = "UploadedFile_" + System.currentTimeMillis() + ".bin";
        var blobClient = containerClient.getBlobClient(destinationBlobName);
        var blobOutputStream = blobClient.getBlockBlobClient().getBlobOutputStream();
        // Stream the incoming DataBuffers to the blob output stream, releasing
        // each buffer after it has been written to avoid leaking memory
        DataBufferUtils.write(file, blobOutputStream)
                .map(DataBufferUtils::release)
                .doOnError(throwable -> {
                    // Handle the error gracefully
                    stopWatch.stop();
                    System.out.println("Error occurred during data streaming: " + throwable.getMessage());
                })
                .doFinally(signal -> {
                    try {
                        blobOutputStream.flush();
                        blobOutputStream.close();
                        if (stopWatch.isRunning()) {
                            stopWatch.stop();
                        }
                        System.out.println("File uploaded successfully in " + stopWatch.getTotalTimeMillis() + "ms");
                    } catch (IOException e) {
                        System.out.println("Error occurred while closing the output stream: " + e.getMessage());
                    }
                })
                .blockLast(Duration.ofSeconds(timeoutSeconds));
    }

    public static void main(String[] args) {
        // Replace with your Azure Blob Storage container name
        String containerName = "your-container-name";
        // Replace with your buffer size and timeout settings (example values)
        int bufferSize = 8192;
        int timeoutSeconds = 300;
        AzureBlobUploaderController controller = new AzureBlobUploaderController(containerName, bufferSize, timeoutSeconds);
    }
}
Output:

- With this POST request, I am able to upload a 1 GB file to my blob container.
- Here I can see the file in my container.
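As a minimal client-side sketch of how the /upload endpoint above could be exercised, assuming the application runs on localhost:8080 and a local test file exists (both are illustrative assumptions); the part name must match @RequestPart("file") on the controller:

import org.springframework.core.io.FileSystemResource;
import org.springframework.http.MediaType;
import org.springframework.http.client.MultipartBodyBuilder;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;

public class UploadClientExample {

    public static void main(String[] args) {
        // Assumed base URL of the application exposing the /upload endpoint
        WebClient webClient = WebClient.create("http://localhost:8080");

        // The part name "file" must match @RequestPart("file") on the controller
        MultipartBodyBuilder bodyBuilder = new MultipartBodyBuilder();
        bodyBuilder.part("file", new FileSystemResource("/tmp/large-test-file.bin")) // hypothetical path
                .contentType(MediaType.APPLICATION_OCTET_STREAM);

        webClient.post()
                .uri("/upload")
                .contentType(MediaType.MULTIPART_FORM_DATA)
                .body(BodyInserters.fromMultipartData(bodyBuilder.build()))
                .retrieve()
                .toBodilessEntity()
                .block(); // blocking only in this standalone example
    }
}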
Updated code:
public Mono<Void> uploadFileToBlobStorage(WebClient webClient, String containerName, String blobName, File file) {
    return webClient.put()
            .uri("/" + containerName + "/" + blobName)
            .header(HttpHeaders.CONTENT_LENGTH, String.valueOf(file.length()))
            .header(HttpHeaders.CONTENT_TYPE, "application/octet-stream")
            .body(BodyInserters.fromPublisher(getFileDataBufferFlux(file), DataBuffer.class))
            .retrieve()
            .bodyToMono(Void.class);
}

private Flux<DataBuffer> getFileDataBufferFlux(File file) {
    // Read the file reactively as a stream of DataBuffers
    return DataBufferUtils.read(file.toPath(), new DefaultDataBufferFactory(), DEFAULT_BUFFER_SIZE)
            .doFinally(signalType -> file.delete()); // Delete the file after reading its data
}
- getFileDataBufferFlux reads the file in a reactive manner, returning a Flux<DataBuffer>; the uploadFileToBlobStorage method uses the WebClient to perform a PUT request with the file's data as the request body. A usage sketch follows below.
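A minimal usage sketch, assuming it sits in the same class as the two methods above; the endpoint, container name, blob name, and file path are hypothetical, and a real Put Blob request against the Azure Blob REST API would additionally need authentication (for example a SAS token) and an x-ms-blob-type header, which the snippet above does not set:

// Minimal usage sketch; endpoint, container, blob name, and file path are
// hypothetical. A real Put Blob call would also need authentication (e.g. a
// SAS token) and an x-ms-blob-type header.
public void demoUpload() {
    WebClient webClient = WebClient.create("https://myaccount.blob.core.windows.net");
    File file = new File("/tmp/large-test-file.bin");
    uploadFileToBlobStorage(webClient, "my-container", "my-blob.bin", file)
            .doOnSuccess(ignored -> System.out.println("Upload completed"))
            .block(); // blocking here only for demonstration
}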
Answer 2
Score: 0
The problem was that I used the BlobContainerClient and not the BlobContainerAsyncClient. The BlobContainerAsyncClient has special APIs to handle the Flux "stuff".

Here is the code that I use now:
public void uploadAsync(Flux<DataBuffer> flux, Path destination) {
    BlobAsyncClient blobClientTarget = this.blobContainerAsyncClient.getBlobAsyncClient(relativePathUnix(destination));
    blobClientTarget.upload(flux.map(dataBuffer -> {
        ByteBuffer buffer = ByteBuffer.allocate(dataBuffer.readableByteCount());
        dataBuffer.toByteBuffer(buffer);
        DataBufferUtils.release(dataBuffer);
        return buffer;
    }), this.parallelTransferOptions).block();
}
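The snippet references blobContainerAsyncClient and parallelTransferOptions fields that are not shown; a minimal sketch of how they could be built with the Azure SDK, assuming a connection string in the environment and purely illustrative tuning values:

import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.models.ParallelTransferOptions;

public class UploaderConfig {

    // Assumed connection string from the environment and an assumed container name
    final BlobContainerAsyncClient blobContainerAsyncClient = new BlobServiceClientBuilder()
            .connectionString(System.getenv("AZURE_STORAGE_CONNECTION_STRING"))
            .buildAsyncClient()
            .getBlobContainerAsyncClient("my-container");

    // Illustrative tuning values: 4 MB blocks, up to 4 concurrent block uploads
    final ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions()
            .setBlockSizeLong(4L * 1024 * 1024)
            .setMaxConcurrency(4);
}

With the async client, upload consumes the Flux<ByteBuffer> with reactive backpressure instead of pushing the data through a blocking output stream, which matches the fix described above.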
And here is the example repo with the solution: https://github.com/git9999999/azure-blob-large-file-upload-problem

Here is the ticket that resolved the problem: https://github.com/Azure/azure-sdk-for-java/issues/35477