How to determine size of IEnumerable in bytes in order to get a good batch size?

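Question

I'm writing records to CSV so that they can be uploaded as a file through an external API that limits the size of uploaded files. My method for writing records to memory looks like this: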

```csharp
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Threading.Tasks;
using CsvHelper;

public async Task<byte[]> WriteToMemoryAsync<T>(IEnumerable<T> recordsToWrite) where T : class
{
    using (var memoryStream = new MemoryStream())
    {
        using (var writer = new StreamWriter(memoryStream))
        using (var csv = new CsvWriter(writer, new CultureInfo("sv-SE")))
        {
            await csv.WriteRecordsAsync(recordsToWrite);
        }

        // ToArray still works after the writer has closed the stream.
        return memoryStream.ToArray();
    }
}
```

My current approach to batching looks like this:

```csharp
public async Task<Dictionary<int, byte[]>> BatchWriteToMemoryAsync<T>(IEnumerable<T> recordsToWrite) where T : class
{
    var maxBatchSize = 50_000;

    var nrOfBatches = (int)Math.Ceiling((double)recordsToWrite.Count() / maxBatchSize);

    Dictionary<int, byte[]> records = new();
    for (int batchNr = 0; batchNr < nrOfBatches; batchNr++)
    {
        // Write only the records that belong to this batch.
        var batch = recordsToWrite.Skip(batchNr * maxBatchSize).Take(maxBatchSize);
        records.Add(batchNr, await WriteToMemoryAsync<T>(batch));
    }

    return records;
}
```

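The problem is that the batch size is somewhat arbitrary. Depending on how large the objects of type `T` are, an upload might still exceed the limit and fail.

> Is there any way to get the size in bytes of the `IEnumerable<T>` in order to get an approximation of the file size and then determine the number of batches based on this?

> Is there any other way to approach this?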

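Edit

I've now implemented the first solution that Magnus suggested. However, there is a problem with flushing the written record before checking the length of the stream: by the time the check runs, the stream has already passed the size limit.

I created a test to reproduce the problem. It fails because the first batch reaches 1009 B.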

```csharp
[Fact]
public void WhenWritingToMemoryInBatches_ShouldEnsureFileSizeDoesNotExceedLimit()
{
    // Arrange
    var records = GenerateTestRecords(100);

    var fileSizeLimit = 1_000;  // 1000 B limit

    var csvHandler = new CsvHandler();

    // Act
    var file = csvHandler.BatchWriteToMemory(records, fileSizeLimit);

    // Assert
    Assert.All(file, f => Assert.True(f.Length < fileSizeLimit, $"Expected fileSize to be less than {fileSizeLimit}. Actual fileSize was {f.Length}."));
}

private IEnumerable<TestRecord> GenerateTestRecords(int amountOfRecords)
{
    List<TestRecord> records = new();
    for (int i = 0; i < amountOfRecords; i++)
    {
        records.Add(new TestRecord
        {
            StringType = $"String {i}",
            IntType = 1,
        });
    }
    return records;
}

private class TestRecord
{
    public string? StringType { get; set; }
    public int IntType { get; set; }
}
```



# Answer 1
**Score**: 2

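Instead, you could pass all items to `WriteToMemory` and check the size of the stream as you write. Once the stream reaches the desired size, return what has been written so far and start a new stream for the next batch. `WriteToMemory` then returns a sequence of batches.

You might not want to flush after every written record; find an appropriate flush interval.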

```csharp
public static IEnumerable<byte[]> WriteToMemory<T>(IEnumerable<T> recordsToWrite)
{
    var memoryStream = new MemoryStream();
    var writer = new StreamWriter(memoryStream);
    var csv = new CsvWriter(writer, new CultureInfo("sv-SE"));

    try
    {
        foreach (var r in recordsToWrite)
        {
            csv.WriteRecord(r);
            csv.NextRecord(); // WriteRecord does not end the row by itself.
            csv.Flush(); // You might want to flush after every x items instead of after each one.
            if (memoryStream.Length >= 1024)
            {
                csv.Dispose();
                writer.Dispose();
                memoryStream.Dispose();

                // MemoryStream.ToArray still works after the stream is disposed.
                yield return memoryStream.ToArray();

                memoryStream = new MemoryStream();
                writer = new StreamWriter(memoryStream);
                csv = new CsvWriter(writer, new CultureInfo("sv-SE"));
            }
        }

        csv.Flush();
        if (memoryStream.Length > 0)
            yield return memoryStream.ToArray();
    }
    finally
    {
        csv.Dispose();
        writer.Dispose();
        memoryStream.Dispose();
    }
}
```
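The loop above checks the stream length only after a record has been written, so a batch can overshoot the limit by up to one record, which is the 1009 B case described in the question's edit. One way around this is to measure each record's encoded size first and start a new batch before appending. The following is a minimal sketch under assumptions that are not part of the original answer: every record has already been rendered to a single CSV line (for example by writing it to a `StringWriter`), the output is UTF-8 with `\r\n` line endings, and each batch repeats the header. `SizeLimitedBatcher` and `Split` are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

public static class SizeLimitedBatcher
{
    // Splits pre-rendered CSV lines into batches whose byte length is
    // strictly smaller than maxBytes. Each batch starts with the header.
    public static IEnumerable<byte[]> Split(IEnumerable<string> lines, string header, int maxBytes)
    {
        byte[] headerBytes = Encoding.UTF8.GetBytes(header + "\r\n");

        using var buffer = new MemoryStream();
        buffer.Write(headerBytes, 0, headerBytes.Length);
        bool hasRecords = false;

        foreach (var line in lines)
        {
            byte[] lineBytes = Encoding.UTF8.GetBytes(line + "\r\n");

            // A record that cannot fit even in an otherwise empty batch is an error.
            if (headerBytes.Length + lineBytes.Length >= maxBytes)
                throw new InvalidOperationException("A single record does not fit within the size limit.");

            // Roll over BEFORE appending, so the limit is never crossed.
            if (buffer.Length + lineBytes.Length >= maxBytes)
            {
                yield return buffer.ToArray(); // ToArray copies the written bytes.
                buffer.SetLength(0);           // Reuse the same stream for the next batch.
                buffer.Write(headerBytes, 0, headerBytes.Length);
                hasRecords = false;
            }

            buffer.Write(lineBytes, 0, lineBytes.Length);
            hasRecords = true;
        }

        // Emit the last, partially filled batch if it contains any records.
        if (hasRecords)
            yield return buffer.ToArray();
    }
}
```

Calling `SizeLimitedBatcher.Split(lines, "StringType,IntType", 1_000)` with lines such as `"String 0,1"` should yield batches that all satisfy the `f.Length < fileSizeLimit` assertion from the question's test. The trade-off is that each record is encoded once up front, which costs a small extra allocation per record.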

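To avoid holding large numbers of byte arrays in memory, you can instead pass the method a delegate that creates the stream (for example, a file on disk).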

```csharp
public class Program
{
    private static int count = 0;

    public static async Task Main()
    {
        await WriteToStreamAsync(Enumerable.Range(0, 10_000), () => File.Create($"C:\\temp\\files\\file{count++}.csv"));
    }

    public static async Task WriteToStreamAsync<T>(IEnumerable<T> recordsToWrite, Func<Stream> createFile)
    {
        var stream = createFile();
        var writer = new StreamWriter(stream);
        var csv = new CsvWriter(writer, new CultureInfo("sv-SE"));

        try
        {
            var i = 0;
            foreach (var r in recordsToWrite)
            {
                csv.WriteRecord(r);
                await csv.NextRecordAsync(); // WriteRecord does not end the row by itself.
                if (++i % 100 == 0) // Find a suitable interval.
                    await csv.FlushAsync();
                if (stream.Length >= 1024)
                {
                    await csv.DisposeAsync();
                    await writer.DisposeAsync();
                    await stream.DisposeAsync();

                    // Start the next file.
                    stream = createFile();
                    writer = new StreamWriter(stream);
                    csv = new CsvWriter(writer, new CultureInfo("sv-SE"));
                }
            }
        }
        finally
        {
            await csv.DisposeAsync();
            await writer.DisposeAsync();
            await stream.DisposeAsync();
        }
    }
}
```
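The question also asks whether the file size can be approximated up front. One rough option is to render a small sample of records, measure the average number of bytes per row, and derive a records-per-batch count with a safety margin. The sketch below is only an estimate and cannot guarantee the limit when record sizes vary widely. `BatchSizeEstimator`, `toCsvLine`, `sampleSize` and `safetyFactor` are hypothetical names, and the code assumes UTF-8 output with two-byte line endings.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

public static class BatchSizeEstimator
{
    // Estimates how many records fit in maxBytes, based on a sample.
    // toCsvLine is a caller-supplied function that renders one record
    // as a single CSV line without the trailing line break.
    public static int EstimateRecordsPerBatch<T>(
        IEnumerable<T> records,
        Func<T, string> toCsvLine,
        int maxBytes,
        int sampleSize = 100,
        double safetyFactor = 0.8)
    {
        long sampledBytes = 0;
        int sampled = 0;

        foreach (var record in records.Take(sampleSize))
        {
            // +2 accounts for the assumed "\r\n" line ending.
            sampledBytes += Encoding.UTF8.GetByteCount(toCsvLine(record)) + 2;
            sampled++;
        }

        if (sampled == 0)
            return 0; // Nothing to write, so no batch is needed.

        double averageBytesPerRecord = (double)sampledBytes / sampled;

        // Leave headroom for the header row and for records larger than average.
        return Math.Max(1, (int)(maxBytes * safetyFactor / averageBytesPerRecord));
    }
}
```

The result could replace the fixed `maxBatchSize` from the question. Because it is an approximation, checking the actual stream length while writing remains the safer choice when the limit is strict.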

huangapple
  • Posted on 2023-03-08 16:57:32
  • Source: https://go.coder-hub.com/75671043.html