How to determine size of IEnumerable in bytes in order to get a good batch size?

## Question

I'm writing records into CSV format in order to upload them as a file through an external API that has a size limit on uploaded files. My method for writing records to memory looks like this:

```C#
using System.Globalization;
using CsvHelper;

public async Task<byte[]> WriteToMemoryAsync<T>(IEnumerable<T> recordsToWrite) where T : class
{
    using (var memoryStream = new MemoryStream())
    {
        using (var writer = new StreamWriter(memoryStream))
        using (var csv = new CsvWriter(writer, new CultureInfo("sv-SE")))
        {
            await csv.WriteRecordsAsync(recordsToWrite);
        }
        return memoryStream.ToArray();
    }
}
```

My current approach to batching looks like this:

```C#
public async Task<Dictionary<int, byte[]>> BatchWriteToMemoryAsync<T>(IEnumerable<T> recordsToWrite) where T : class
{
    var maxBatchSize = 50_000;
    var nrOfBatches = (int)Math.Ceiling((double)recordsToWrite.Count() / maxBatchSize);
    Dictionary<int, byte[]> records = new();
    for (int batchNr = 0; batchNr < nrOfBatches; batchNr++)
    {
        records.Add(batchNr, await WriteToMemoryAsync<T>(recordsToWrite));
    }
    return records;
}
```

The problem is that the batch size is somewhat arbitrary. Depending on how large the objects of type `T` are, this might fail.

> Is there any way to get the size in bytes of the `IEnumerable<T>` in order to get an approximation of the file size, and then determine the number of batches based on that?

> Is there any other way to approach this?
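There is no general way to get the serialized byte size of an `IEnumerable<T>` up front, but one rough workaround is to serialize a small sample of records, measure the bytes, and extrapolate. The sketch below is illustrative only; `rowFormatter` is a hypothetical stand-in for however one record is rendered as a CSV line (CsvHelper would normally do this), so the sample can be measured without any serializer dependency:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

public static class BatchEstimator
{
    // rowFormatter is a hypothetical stand-in for rendering one record as a CSV line.
    public static int EstimateBatchCount<T>(
        IReadOnlyList<T> records, Func<T, string> rowFormatter, long fileSizeLimit, int sampleSize = 100)
    {
        if (records.Count == 0) return 0;

        // Serialize a sample and measure its UTF-8 byte count ("\r\n" per row).
        var sampleCount = Math.Min(sampleSize, records.Count);
        long sampleBytes = records
            .Take(sampleCount)
            .Sum(r => (long)Encoding.UTF8.GetByteCount(rowFormatter(r) + "\r\n"));

        // Extrapolate the average row size to the full set, then divide by the per-file limit.
        var bytesPerRecord = (double)sampleBytes / sampleCount;
        var estimatedTotalBytes = bytesPerRecord * records.Count;
        return (int)Math.Ceiling(estimatedTotalBytes / fileSizeLimit);
    }
}
```

Note this only gives an estimate: records larger than the sampled ones can still push a batch over the limit, so a hard guarantee requires checking the stream size while writing.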

## Edit ##

I've now implemented the first solution that Magnus suggested. However, there is a problem with flushing the written record before checking the length of the stream, as the stream has already passed the size limit at that point.

I created a test to simulate the problem, and it fails because the batch size hits 1009 B for the first batch.
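One way to avoid that overshoot is to measure each record's serialized size before committing it to the current batch, and to start a new batch whenever the next row would push it past the limit. The sketch below is a hypothetical illustration of the idea; it uses a caller-supplied `rowFormatter` delegate (a stand-in for CsvHelper's per-record output) so each row's byte size is known before it is appended:

```csharp
using System;
using System.Collections.Generic;
using System.Text;

public static class StrictBatcher
{
    // rowFormatter is a hypothetical stand-in for rendering one record as a CSV line.
    public static IEnumerable<byte[]> BatchWriteToMemory<T>(
        IEnumerable<T> records, Func<T, string> rowFormatter, int fileSizeLimit)
    {
        var current = new List<byte>();
        foreach (var record in records)
        {
            var row = Encoding.UTF8.GetBytes(rowFormatter(record) + "\r\n");

            // Close the current batch *before* appending a row that would overflow it.
            if (current.Count > 0 && current.Count + row.Length > fileSizeLimit)
            {
                yield return current.ToArray();
                current.Clear();
            }
            current.AddRange(row);
        }

        if (current.Count > 0)
            yield return current.ToArray();
    }
}
```

A single row larger than `fileSizeLimit` would still produce an oversized batch, so that case needs to be rejected or handled separately.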

```C#
[Fact]
public void WhenWritingToMemoryInBatches_ShouldEnsureFileSizeDoesNotExceedLimit()
{
    // Arrange
    var records = GenerateTestRecords(100);
    var fileSizeLimit = 1_000; // 1000 B limit
    var csvHandler = new CsvHandler();

    // Act
    var file = csvHandler.BatchWriteToMemory(records, fileSizeLimit);

    // Assert
    Assert.All(file, f => Assert.True(f.Length < fileSizeLimit, $"Expected fileSize to be less than {fileSizeLimit}. Actual fileSize was {f.Length}."));
}

private IEnumerable<TestRecord> GenerateTestRecords(int amountOfRecords)
{
    List<TestRecord> records = new();
    for (int i = 0; i < amountOfRecords; i++)
    {
        records.Add(new TestRecord
        {
            StringType = $"String {i}",
            IntType = 1,
        });
    }
    return records;
}

private class TestRecord
{
    public string? StringType { get; set; }
    public int IntType { get; set; }
}
```
# Answer 1

**Score**: 2

What if you instead send all your items to `WriteToMemory`, then check the size of the stream, and once it reaches the desired size, return the items written so far and initialize a new stream for the next batch? So `WriteToMemory` will return a set of batches instead.

You might not want to flush after every written record; find an appropriate flush interval.
```csharp
public static IEnumerable<byte[]> WriteToMemory<T>(IEnumerable<T> recordsToWrite)
{
    var memoryStream = new MemoryStream();
    var writer = new StreamWriter(memoryStream);
    var csv = new CsvWriter(writer, new CultureInfo("sv-SE"));
    try
    {
        foreach (var r in recordsToWrite)
        {
            csv.WriteRecord(r);
            csv.Flush(); // might want to flush after every x items instead of each one
            if (memoryStream.Length >= 1024)
            {
                // Dispose to flush any remaining buffered data into the stream;
                // MemoryStream.ToArray still works after the stream is closed.
                csv.Dispose();
                writer.Dispose();
                memoryStream.Dispose();
                yield return memoryStream.ToArray();
                memoryStream = new MemoryStream();
                writer = new StreamWriter(memoryStream);
                csv = new CsvWriter(writer, new CultureInfo("sv-SE"));
            }
        }
        csv.Flush();
        if (memoryStream.Length > 0)
            yield return memoryStream.ToArray();
    }
    finally
    {
        csv.Dispose();
        writer.Dispose();
        memoryStream.Dispose();
    }
}
```

To avoid holding large amounts of byte arrays in memory, you can instead pass a delegate into the method that creates the stream (for example, a file on disk).

```csharp
public class Program
{
    private static int count = 0;

    public static async Task Main()
    {
        await WriteToStreamAsync(Enumerable.Range(0, 10_000), () => File.Create($"C:\\temp\\files\\file{count++}.csv"));
    }

    public static async Task WriteToStreamAsync<T>(IEnumerable<T> recordsToWrite, Func<Stream> createFile)
    {
        var stream = createFile();
        var writer = new StreamWriter(stream);
        var csv = new CsvWriter(writer, new CultureInfo("sv-SE"));
        try
        {
            var i = 0;
            foreach (var r in recordsToWrite)
            {
                csv.WriteRecord(r);
                if (++i % 100 == 0) // find some good interval
                    await csv.FlushAsync();
                if (stream.Length >= 1024)
                {
                    await csv.DisposeAsync();
                    await writer.DisposeAsync();
                    await stream.DisposeAsync();
                    stream = createFile();
                    writer = new StreamWriter(stream);
                    csv = new CsvWriter(writer, new CultureInfo("sv-SE"));
                }
            }
        }
        finally
        {
            await csv.DisposeAsync();
            await writer.DisposeAsync();
            await stream.DisposeAsync();
        }
    }
}
```
huangapple
  • Posted on 2023-03-08 16:57:32
  • Please keep this link when reposting: https://go.coder-hub.com/75671043.html