# How to determine size of IEnumerable in bytes in order to get a good batch size?

## Question
I'm writing records into CSV format in order to upload them as a file through an external API that has a file size limit on uploads. My method for writing records to memory looks like this:
```C#
using System.Globalization;
using CsvHelper;

public async Task<byte[]> WriteToMemoryAsync<T>(IEnumerable<T> recordsToWrite) where T : class
{
    using (var memoryStream = new MemoryStream())
    {
        using (var writer = new StreamWriter(memoryStream))
        using (var csv = new CsvWriter(writer, new CultureInfo("sv-SE")))
        {
            await csv.WriteRecordsAsync(recordsToWrite);
        }
        return memoryStream.ToArray();
    }
}
```
My current approach to batching looks like this:
```C#
public async Task<Dictionary<int, byte[]>> BatchWriteToMemoryAsync<T>(IEnumerable<T> recordsToWrite) where T : class
{
    var maxBatchSize = 50_000;
    var nrOfBatches = (int)Math.Ceiling((double)recordsToWrite.Count() / maxBatchSize);
    Dictionary<int, byte[]> records = new();
    for (int batchNr = 0; batchNr < nrOfBatches; batchNr++)
    {
        // take the next slice of records for this batch
        var batch = recordsToWrite.Skip(batchNr * maxBatchSize).Take(maxBatchSize);
        records.Add(batchNr, await WriteToMemoryAsync<T>(batch));
    }
    return records;
}
```
The problem is that the batch size is somewhat arbitrary. Depending on the size of the objects ``T`` contains, this might fail.

> Is there any way to get the size in bytes of the ``IEnumerable<T>`` in order to get an approximation of the file size, and then determine the number of batches based on that?

> Is there any other way to approach this?
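For reference, one rough way to get such an approximation (a sketch, not a definitive implementation): serialize a small sample of the records with the same `WriteToMemoryAsync` above and extrapolate. The sample size of 100 and the 10% headroom are assumptions, not values from the API.

```C#
// Sketch: approximate bytes per record from a sample, then derive a batch size.
// Assumes the WriteToMemoryAsync method above; sample size and headroom are arbitrary.
public async Task<int> EstimateBatchSizeAsync<T>(IEnumerable<T> records, int fileSizeLimit) where T : class
{
    var sample = records.Take(100).ToList();
    if (sample.Count == 0)
        return 1;

    var sampleBytes = await WriteToMemoryAsync(sample); // same CSV settings as the real write
    var bytesPerRecord = (double)sampleBytes.Length / sample.Count;

    // Leave ~10% headroom, since individual records vary around the sample average.
    return Math.Max(1, (int)(fileSizeLimit * 0.9 / bytesPerRecord));
}
```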
## Edit
I've now implemented the first solution that Magnus suggested. However, there is a problem with flushing the written record before checking the length of the stream, as the stream has already passed the size limit at that point.

I created a test to simulate the problem, and it fails because the batch size hits 1009 B for the first batch.
```C#
[Fact]
public void WhenWritingToMemoryInBatches_ShouldEnsureFileSizeDoesNotExceedLimit()
{
    // Arrange
    var records = GenerateTestRecords(100);
    var fileSizeLimit = 1_000; // 1000 B limit
    var csvHandler = new CsvHandler();

    // Act
    var file = csvHandler.BatchWriteToMemory(records, fileSizeLimit);

    // Assert
    Assert.All(file, f => Assert.True(f.Length < fileSizeLimit, $"Expected fileSize to be less than {fileSizeLimit}. Actual fileSize was {f.Length}."));
}

private IEnumerable<TestRecord> GenerateTestRecords(int amountOfRecords)
{
    List<TestRecord> records = new();
    for (int i = 0; i < amountOfRecords; i++)
    {
        records.Add(new TestRecord
        {
            StringType = $"String {i}",
            IntType = 1,
        });
    }
    return records;
}

private class TestRecord
{
    public string? StringType { get; set; }
    public int IntType { get; set; }
}
```
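One way around that flush-timing problem, as a sketch under the assumption that CsvHelper's `CsvWriter` is used as above (header writing omitted, as in the suggested solution; `fileSizeLimit` is the same illustrative limit as in the test): render each record to a scratch `StringWriter` first, so its size is known before it is committed to the current batch.

```C#
// Sketch: measure each record before committing it, so a batch never exceeds the limit.
public static IEnumerable<byte[]> BatchWriteToMemory<T>(IEnumerable<T> records, int fileSizeLimit)
{
    var culture = new CultureInfo("sv-SE");
    var batch = new MemoryStream();
    var writer = new StreamWriter(batch); // defaults to UTF-8, matching GetByteCount below

    foreach (var record in records)
    {
        // Render the record on a scratch writer to learn its size up front.
        string line;
        using (var scratch = new StringWriter())
        using (var csv = new CsvWriter(scratch, culture))
        {
            csv.WriteRecord(record);
            csv.NextRecord();
            csv.Flush();
            line = scratch.ToString();
        }

        writer.Flush();
        if (batch.Length > 0 && batch.Length + Encoding.UTF8.GetByteCount(line) > fileSizeLimit)
        {
            writer.Dispose();
            yield return batch.ToArray(); // ToArray still works on a closed MemoryStream
            batch = new MemoryStream();
            writer = new StreamWriter(batch);
        }

        writer.Write(line);
    }

    writer.Flush();
    if (batch.Length > 0)
        yield return batch.ToArray();
    writer.Dispose();
}
```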
## Answer 1

**Score**: 2
What if you instead send all your items to `WriteToMemory`, check the size of the stream after each record, and once it reaches the desired size, return the items written and initialize a new stream for the next batch? `WriteToMemory` will then return a set of batches instead.

You might not want to flush after every written record; find an appropriate flush interval.
```csharp
public static IEnumerable<byte[]> WriteToMemory<T>(IEnumerable<T> recordsToWrite)
{
    var memoryStream = new MemoryStream();
    var writer = new StreamWriter(memoryStream);
    var csv = new CsvWriter(writer, new CultureInfo("sv-SE"));
    try
    {
        foreach (var r in recordsToWrite)
        {
            csv.WriteRecord(r);
            csv.Flush(); // might want to flush after every x items instead of each one
            if (memoryStream.Length >= 1024)
            {
                csv.Dispose();
                writer.Dispose();
                memoryStream.Dispose();
                yield return memoryStream.ToArray(); // ToArray works on a closed MemoryStream
                memoryStream = new MemoryStream();
                writer = new StreamWriter(memoryStream);
                csv = new CsvWriter(writer, new CultureInfo("sv-SE"));
            }
        }
        csv.Flush();
        if (memoryStream.Length > 0)
            yield return memoryStream.ToArray();
    }
    finally
    {
        csv.Dispose();
        writer.Dispose();
        memoryStream.Dispose();
    }
}
```
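For completeness, a hypothetical caller that maps the yielded batches back to the `Dictionary<int, byte[]>` shape used in the question:

```csharp
// Hypothetical usage of the generator above.
var batches = new Dictionary<int, byte[]>();
var batchNr = 0;
foreach (var batch in WriteToMemory(records))
    batches.Add(batchNr++, batch);
```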
To avoid holding large amounts of byte arrays in memory, you can instead pass a delegate into the method to create the stream (for example, a file on disk).
```csharp
public class Program
{
    private static int count = 0;

    public static async Task Main()
    {
        await WriteToStreamAsync(Enumerable.Range(0, 10_000), () => File.Create($"C:\\temp\\files\\file{count++}.csv"));
    }

    public static async Task WriteToStreamAsync<T>(IEnumerable<T> recordsToWrite, Func<Stream> createFile)
    {
        var stream = createFile();
        var writer = new StreamWriter(stream);
        var csv = new CsvWriter(writer, new CultureInfo("sv-SE"));
        try
        {
            var i = 0;
            foreach (var r in recordsToWrite)
            {
                csv.WriteRecord(r);
                if (++i % 100 == 0) // find some good interval
                    await csv.FlushAsync();
                if (stream.Length >= 1024)
                {
                    await csv.DisposeAsync();
                    await writer.DisposeAsync();
                    await stream.DisposeAsync();
                    stream = createFile();
                    writer = new StreamWriter(stream);
                    csv = new CsvWriter(writer, new CultureInfo("sv-SE"));
                }
            }
        }
        finally
        {
            await csv.DisposeAsync();
            await writer.DisposeAsync();
            await stream.DisposeAsync();
        }
    }
}
```