Can not transfer more than 250MB with DataBufferUtils.write to Azure Blob Storage – Spring Webflux usage correct?

huangapple go评论142阅读模式
英文:

Can not transfer more than 250MB with DataBufferUtils.write to Azure Blob Storage - Spring Webflux usage correct?

问题

我喜欢直接通过REST传输大数据文件到Azure Blob Storage。现在我面临的问题是,当我将一个大小为250MB的过滤器通过REST发送到Azure Blob Storage时,但是当大小为260MB时,它会永远卡住。

我创建了一个示例项目,您可以验证这种行为。https://github.com/git9999999/azure-blob-large-file-upload-problem

问题是我的响应式代码正确吗?

https://github.com/git9999999/azure-blob-large-file-upload-problem/blob/main/downloader/src/main/java/com/azureproblem/blob/controller/AzureBlobBugDownloaderController.java#L131

@GetMapping(path = "/trigger-download-to-blob/{fileSizeInMb}")
public void triggerDownloadToBlob(@PathVariable int fileSizeInMb) {
    log.info("triggerDownload");

    var flux = this.webClient
        .get()
        .uri("/serve-file/" + fileSizeInMb)
        .accept(MediaType.APPLICATION_OCTET_STREAM)
        .exchangeToFlux(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()));

    var destination = "TestDownloadToAzureBlobStorage" + System.currentTimeMillis() + ".pdf";

    var blobClientTarget = this.containerClient.getBlobClient(destination);

    try (var outputStream = blobClientTarget.getBlockBlobClient().getBlobOutputStream(this.parallelTransferOptions, null, null, null, null)) {
        DataBufferUtils.write(flux, outputStream)
            .map(DataBufferUtils::release)
            .blockLast(Duration.ofHours(22));
        outputStream.flush();
    } catch (IOException e) {
        throw new IllegalStateException(e);
    }

    log.info("!!!!!!!!!!!!!!!!!!!!!!!!!!!!   end download of {}", destination);
}
英文:

I like to stream large data files over REST directly to a Azure Blob Storage.
Now i face the problem that when I send a filter with 250MB over REST to the Azure Blob Storage, but with 260MB it stuck for ever.

I made an example Project where you can verify this behavior. https://github.com/git9999999/azure-blob-large-file-upload-problem

Question is my Reactive Code correct?

https://github.com/git9999999/azure-blob-large-file-upload-problem/blob/main/downloader/src/main/java/com/azureproblem/blob/controller/AzureBlobBugDownloaderController.java#L131

    @GetMapping(path = "/trigger-download-to-blob/{fileSizeInMb}")
    public void triggerDownloadToBlob(@PathVariable int fileSizeInMb) {
        log.info("triggerDownload");

        var flux = this.webClient
            .get()
            .uri("/serve-file/" + fileSizeInMb)
            .accept(MediaType.APPLICATION_OCTET_STREAM)
            .exchangeToFlux(clientResponse -> clientResponse.body(BodyExtractors.toDataBuffers()));

        var destination = "TestDownloadToAzureBlobStorage" + System.currentTimeMillis() + ".pdf";

        var blobClientTarget = this.containerClient.getBlobClient(destination);

        try (var outputStream = blobClientTarget.getBlockBlobClient().getBlobOutputStream(this.parallelTransferOptions, null, null, null, null)) {
            DataBufferUtils.write(flux, outputStream)
                .map(DataBufferUtils::release)
                .blockLast(Duration.ofHours(22));
            outputStream.flush();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }

        log.info("!!!!!!!!!!!!!!!!!!!!!!!!!!!!   end download of {}", destination);
    }

答案1

得分: 0

  • uploadFile方法现在直接接受Flux<DataBuffer>作为请求的一部分,而不再使用MultipartFile

  • 使用响应式编程的DataBufferUtils.write来流式传输文件数据,请查看下面的代码:

import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.util.StopWatch;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestPart;
import org.springframework.web.multipart.MultipartFile;
import reactor.core.publisher.Flux;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.time.Duration;

@Controller
public class AzureBlobUploaderController {

    // 省略其它部分
    // ...

    @PostMapping(path = "/upload")
    public void uploadFile(@RequestPart("file") Flux<DataBuffer> file) {
        // 省略其它部分
        // ...
    }

    // 省略其它部分
    // ...

    public static void main(String[] args) {
        // 省略其它部分
        // ...
    }
}

输出:
Can not transfer more than 250MB with DataBufferUtils.write to Azure Blob Storage – Spring Webflux usage correct?

  • 通过POST请求,我能够将1GB文件上传到我的Blob容器。

Can not transfer more than 250MB with DataBufferUtils.write to Azure Blob Storage – Spring Webflux usage correct?

  • 在这里,我可以看到我的容器中的文件。

Can not transfer more than 250MB with DataBufferUtils.write to Azure Blob Storage – Spring Webflux usage correct?

Can not transfer more than 250MB with DataBufferUtils.write to Azure Blob Storage – Spring Webflux usage correct?

更新后的代码:

public Mono<Void> uploadFileToBlobStorage(WebClient webClient, String containerName, String blobName, File file) {
    return webClient.put()
            .uri("/" + containerName + "/" + blobName)
            .header(HttpHeaders.CONTENT_LENGTH, String.valueOf(file.length()))
            .header(HttpHeaders.CONTENT_TYPE, "application/octet-stream")
            .body(BodyInserters.fromPublisher(getFileDataBufferFlux(file), DataBuffer.class))
            .retrieve()
            .bodyToMono(Void.class);
}

private Flux<DataBuffer> getFileDataBufferFlux(File file) {
    return DataBufferUtils.read(FileChannel::open, file.toPath(), DEFAULT_BUFFER_SIZE)
            .doFinally(signalType -> file.delete()); // 在读取数据后删除文件
}
  • getFileDataBufferFlux以响应式方式读取文件,返回Flux<DataBuffer>uploadFileToBlobStorage方法使用WebClient执行PUT请求,并将文件数据作为请求体。
英文:
  • The uploadFile method now accepts a Flux<DataBuffer> directly as a request part instead of using MultipartFile

  • The file data is streamed using reactive programming with DataBufferUtils.write, Check the below code:

import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.util.StopWatch;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestPart;
import org.springframework.web.multipart.MultipartFile;
import reactor.core.publisher.Flux;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.time.Duration;

@Controller
public class AzureBlobUploaderController {

    private final AzureBlobContainerClient containerClient;
    private final int bufferSize;
    private final int timeoutSeconds;

    public AzureBlobUploaderController(@Value("${azure.storage.containerName}") String containerName,
                                       @Value("${bufferSize}") int bufferSize,
                                       @Value("${timeoutSeconds}") int timeoutSeconds) {
        this.containerClient = new AzureBlobContainerClient(containerName);
        this.bufferSize = bufferSize;
        this.timeoutSeconds = timeoutSeconds;
    }

    @PostConstruct
    public void init() {
        // Create the container if it doesn't exist
        if (!containerClient.exists()) {
            containerClient.create();
        }
    }

    @PostMapping(path = "/upload")
    public void uploadFile(@RequestPart("file") Flux<DataBuffer> file) {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();

        String destinationBlobName = "UploadedFile_" + System.currentTimeMillis() + ".bin";
        var blobClient = containerClient.getBlobClient(destinationBlobName);
        var blobOutputStream = blobClient.getBlockBlobClient().getBlobOutputStream();

        DataBufferUtils.write(file, blobOutputStream, bufferSize)
                .doOnError(throwable -> {
                    // Handle the error gracefully
                    stopWatch.stop();
                    System.out.println("Error occurred during data streaming: " + throwable.getMessage());
                })
                .doFinally(signal -> {
                    try {
                        blobOutputStream.flush();
                        blobOutputStream.close();
                        stopWatch.stop();
                        System.out.println("File uploaded successfully " + stopWatch.getTotalTimeMillis() + "ms");
                    } catch (IOException e) {
                        System.out.println("Error occurred while closing the output stream: " + e.getMessage());
                    }
                })
                .blockLast(Duration.ofSeconds(timeoutSeconds));
    }

    public static void main(String[] args) {
        // Replace with your Azure Blob Storage container name
        String containerName = "your-container-name";

        // Replace with your buffer size and timeout settings
        int bufferSize = ;
        int timeoutSeconds = ;

        AzureBlobUploaderController controller = new AzureBlobUploaderController(containerName, bufferSize, timeoutSeconds);
    }
}

Output :
Can not transfer more than 250MB with DataBufferUtils.write to Azure Blob Storage – Spring Webflux usage correct?

  • Here by the post request, I am able to upload the 1GB file to my blob container.

Can not transfer more than 250MB with DataBufferUtils.write to Azure Blob Storage – Spring Webflux usage correct?

  • Here I am able to see the file in my container.

Can not transfer more than 250MB with DataBufferUtils.write to Azure Blob Storage – Spring Webflux usage correct?

Can not transfer more than 250MB with DataBufferUtils.write to Azure Blob Storage – Spring Webflux usage correct?

Updated code:

public Mono<Void> uploadFileToBlobStorage(WebClient webClient, String containerName, String blobName, File file) {
return webClient.put()
.uri("/" + containerName + "/" + blobName)
.header(HttpHeaders.CONTENT_LENGTH, String.valueOf(file.length()))
.header(HttpHeaders.CONTENT_TYPE, "application/octet-stream")
.body(BodyInserters.fromPublisher(getFileDataBufferFlux(file), DataBuffer.class))
.retrieve()
.bodyToMono(Void.class);
}
private Flux<DataBuffer> getFileDataBufferFlux(File file) {
return DataBufferUtils.read(FileChannel::open, file.toPath(), DEFAULT_BUFFER_SIZE)
.doFinally(signalType -> file.delete()); // Delete the file after reading its data
}
  • The getFileDataBufferFlux reads the file in a reactive manner, returning a Flux<DataBuffer> the uploadFileToBlobStorage method uses the WebClient to perform a PUT request with the file's data as the request body.

答案2

得分: 0

问题是我使用了BlobContainerClient而不是BlobContainerAsyncClientBlobContainerAsyncClient具有处理Flux-"Stuff"的特殊API。

这是我现在使用的代码:

 public void uploadAsync(Flux<DataBuffer> flux, Path destination) {
        BlobAsyncClient blobClientTarget = this.blobContainerAsyncClient.getBlobAsyncClient(relativePathUnix(destination));
        blobClientTarget.upload(flux.map((dataBuffer) -> {
            ByteBuffer buffer = ByteBuffer.allocate(dataBuffer.readableByteCount());
            dataBuffer.toByteBuffer(buffer);
            DataBufferUtils.release(dataBuffer);
            return buffer;
        }), this.parallelTransferOptions).block();
    }

这是带有解决方案的示例存储库:https://github.com/git9999999/azure-blob-large-file-upload-problem

这是解决问题的工单:https://github.com/Azure/azure-sdk-for-java/issues/35477

英文:

The Problem was that I used the BlobContainerClient and not the BlobContainerAsyncClient. The BlobContainerAsyncClient has special API's to handle the Flux-"Stuff"

Hear is the code that I use now:

 public void uploadAsync(Flux&lt;DataBuffer&gt; flux, Path destination) {
BlobAsyncClient blobClientTarget = this.blobContainerAsyncClient.getBlobAsyncClient(relativePathUnix(destination));
blobClientTarget.upload(flux.map((dataBuffer) -&gt; {
ByteBuffer buffer = ByteBuffer.allocate(dataBuffer.readableByteCount());
dataBuffer.toByteBuffer(buffer);
DataBufferUtils.release(dataBuffer);
return buffer;
}), this.parallelTransferOptions).block();
}

And Here is the Example repo with solution https://github.com/git9999999/azure-blob-large-file-upload-problem

Here the Ticket that solve the Problem: https://github.com/Azure/azure-sdk-for-java/issues/35477

huangapple
  • 本文由 发表于 2023年6月29日 00:33:53
  • 转载请务必保留本文链接:https://go.coder-hub.com/76575117.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定