SqlBulkCopy DataTables as they are added to a DataSet

# Question

I'd like to parse values from a CSV into DataTable chunks, add them to a DataSet, and then use SqlBulkCopy to insert the DataTables into a single table in SQL. The original CSV can range from 4 GB to 8 GB, and I need to avoid reading the entire file into memory, hence the chunking. I loosely based my chunking on this [post][1], and I use [LumenWorks][2] to parse the CSV values.

As soon as a DataTable is added to the DataSet, I want to use SqlBulkCopy to insert it into my SQL table while the next DataTable is being created. After the SqlBulkCopy completes, I want to remove the DataTable to release the memory.

My first thought is to run the chunking method asynchronously without awaiting it, then run a while loop that checks for the existence of the next DataTable in the DataSet. If the DataTable exists, bulk copy it; if its row count is less than the row limit, it is the last chunk, so stop the while loop.

Am I going about this the wrong way? If not, how can I do something like this?

```csharp
string filePath = @"C:\Users\user\Downloads\Testing\file - Copy.csv";
DataSet ds = new DataSet();

bool continueInsert = true;
int rowLimit = 100000;
int tableNumber = 0;

// Start this, but do not wait for it to complete before starting the while loop
ChunkCSV(filePath, ds, rowLimit);

// Run SqlBulkCopy if the DataTable exists
while (continueInsert)
{
    if (ds.Tables.Contains("tbl_" + tableNumber))
    {
        DataTable dataTable = ds.Tables["tbl_" + tableNumber];

        // SqlBulkCopy dataTable code HERE

        if (ds.Tables["tbl_" + tableNumber].Rows.Count < rowLimit)
        {
            continueInsert = false;
        }

        // Remove the DataTable from the DataSet to release memory
        ds.Tables.Remove("tbl_" + tableNumber);

        tableNumber++;
    }
    else
    {
        Thread.Sleep(1000);
    }
}
```
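
For the "SqlBulkCopy dataTable code HERE" placeholder, a minimal sketch of what that step might look like follows, using the loop's `dataTable` variable; the `connectionString` and the destination table name `dbo.MyTable` are hypothetical placeholders:

```csharp
// Minimal sketch of the bulk-copy step for one chunk.
// connectionString and "dbo.MyTable" are hypothetical placeholders.
using (var connection = new SqlConnection(connectionString))
{
    connection.Open();
    using (var bulkCopy = new SqlBulkCopy(connection))
    {
        bulkCopy.DestinationTableName = "dbo.MyTable";
        bulkCopy.BulkCopyTimeout = 0; // disable the 30-second default for large chunks
        bulkCopy.WriteToServer(dataTable);
    }
}
```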

Here is my chunking code:

```csharp
private static void ChunkCSV(string filePath, DataSet dataSet, int rowLimit)
{
    char delimiter = ',';

    DataTable dtChunk = null;
    int tableNumber = 0;
    int chunkRowCount = 0;
    bool firstLineOfChunk = true;

    using (var sr = new StreamReader(filePath))
    using (CsvReader csv = new CsvReader(sr, false, delimiter, '\"', '\0', '\0', ValueTrimmingOptions.All, 65536))
    {
        int fieldCount = csv.FieldCount;
        string[] row = new string[fieldCount];

        // Add fields when necessary
        csv.MissingFieldAction = MissingFieldAction.ReplaceByEmpty;

        while (csv.ReadNextRecord())
        {
            if (firstLineOfChunk)
            {
                firstLineOfChunk = false;
                dtChunk = CreateDataTable(fieldCount, tableNumber);
            }

            DataRow dataRow = dtChunk.NewRow();

            csv.CopyCurrentRecordTo(row);
            for (int f = 0; f < fieldCount; f++)
            {
                dataRow[f] = row[f];
            }

            dtChunk.Rows.Add(dataRow);
            chunkRowCount++;

            if (chunkRowCount == rowLimit)
            {
                firstLineOfChunk = true;
                chunkRowCount = 0;
                tableNumber++;
                dataSet.Tables.Add(dtChunk);
                dtChunk = null;
            }
        }
    }

    if (dtChunk != null)
    {
        dataSet.Tables.Add(dtChunk);
    }
}

private static DataTable CreateDataTable(int fieldCount, int tableNumber)
{
    DataTable dt = new DataTable("tbl_" + tableNumber);

    for (int i = 0; i < fieldCount; i++)
    {
        dt.Columns.Add("Column_" + i);
    }

    return dt;
}
```

[1]: https://www.codeproject.com/Articles/543789/Parse-CSV-file-chunk-by-chunk-and-save-in-database
[2]: https://www.codeproject.com/Articles/9258/A-Fast-CSV-Reader
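
As for "start this, but do not wait for it to complete": since `ChunkCSV` as written is synchronous, one way to launch it without awaiting is a background task, sketched below. Note that `DataSet`/`DataTableCollection` are not thread-safe for concurrent writes and reads, so sharing the DataSet between the producer task and the polling loop would itself need synchronization.

```csharp
// Sketch: push the synchronous ChunkCSV onto a worker thread so the
// polling while-loop can run concurrently. The shared DataSet is not
// thread-safe, so this design needs additional synchronization.
Task producer = Task.Run(() => ChunkCSV(filePath, ds, rowLimit));
```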

<details>
<summary>英文:</summary>

I&#39;d like to parse values from a csv into datatable chunks, add them to a dataset, and then use SQLBulkCopy to insert the datatables into a single table in SQL.  The original csv can range from 4 GB to 8 GB, and I need to avoid reading the entire thing into memory, hence the chunking.  I loosely based my chunking on this [post][1]. I use [LumenWorks][2] to parse the csv values.  


As soon as a datatable is added to the dataset, I want to use SqlBulkCopy to insert it into my SQL table, all while the next datatable is being created.  After the SqlBulkCopy completes, I want to remove the datatable to release the memory.  

My first thought is to run the chunking method asynchronously without await, then run a while loop that checks for the existence of a next datatable in the dataset.  If the datatable exists, then bulk copy.  If the datatable row count is less then the row limit, then it is the last chunk and stop while loop.

Am I going about this the wrong way?  If not, how can I do something like this?

            string filePath = @&quot;C:\Users\user\Downloads\Testing\file - Copy.csv&quot;;
            DataSet ds = new DataSet();

            bool continueInsert = true;
            int rowLimit = 100000;
            int tableNumber = 0;

            //Start this, but do not wait for it to complete before starting while loop
            ChunkCSV(filePath, ds, rowLimit);
 
            //Run SqlBulkCopy if datatable exists 
            while (continueInsert)
            {
                if (ds.Tables.Contains(&quot;tbl_&quot; + tableNumber))
                {
                    DataTable dataTable = ds.Tables[&quot;tbl_&quot; + tableNumber];

                    //SqlBulkCopy dataTable code HERE

                    if (ds.Tables[&quot;tbl_&quot; + tableNumber].Rows.Count &lt; rowLimit)
                    {
                        continueInsert = false;
                    }

                    //Remove datatable from dataset to release memory
                    ds.Tables.Remove(&quot;tbl_&quot; + tableNumber);

                    tableNumber++;
                }
                else
                {
                    Thread.Sleep(1000);
                }
            }


Here is my chunking code:

        private static void ChunkCSV(string filePath, DataSet dataSet, int rowLimit)
        {
            char delimiter = &#39;,&#39;;

            DataTable dtChunk = null;
            int tableNumber = 0;
            int chunkRowCount = 0;
            bool firstLineOfChunk = true;

            using (var sr = new StreamReader(filePath))
            using (CsvReader csv = new CsvReader(sr, false, delimiter, &#39;\&quot;&#39;, &#39;

<details>
<summary>英文:</summary>
I&#39;d like to parse values from a csv into datatable chunks, add them to a dataset, and then use SQLBulkCopy to insert the datatables into a single table in SQL.  The original csv can range from 4 GB to 8 GB, and I need to avoid reading the entire thing into memory, hence the chunking.  I loosely based my chunking on this [post][1]. I use [LumenWorks][2] to parse the csv values.  
As soon as a datatable is added to the dataset, I want to use SqlBulkCopy to insert it into my SQL table, all while the next datatable is being created.  After the SqlBulkCopy completes, I want to remove the datatable to release the memory.  
My first thought is to run the chunking method asynchronously without await, then run a while loop that checks for the existence of a next datatable in the dataset.  If the datatable exists, then bulk copy.  If the datatable row count is less then the row limit, then it is the last chunk and stop while loop.
Am I going about this the wrong way?  If not, how can I do something like this?
string filePath = @&quot;C:\Users\user\Downloads\Testing\file - Copy.csv&quot;;
DataSet ds = new DataSet();
bool continueInsert = true;
int rowLimit = 100000;
int tableNumber = 0;
//Start this, but do not wait for it to complete before starting while loop
ChunkCSV(filePath, ds, rowLimit);
//Run SqlBulkCopy if datatable exists 
while (continueInsert)
{
if (ds.Tables.Contains(&quot;tbl_&quot; + tableNumber))
{
DataTable dataTable = ds.Tables[&quot;tbl_&quot; + tableNumber];
//SqlBulkCopy dataTable code HERE
if (ds.Tables[&quot;tbl_&quot; + tableNumber].Rows.Count &lt; rowLimit)
{
continueInsert = false;
}
//Remove datatable from dataset to release memory
ds.Tables.Remove(&quot;tbl_&quot; + tableNumber);
tableNumber++;
}
else
{
Thread.Sleep(1000);
}
}
Here is my chunking code:
private static void ChunkCSV(string filePath, DataSet dataSet, int rowLimit)
{
char delimiter = &#39;,&#39;;
DataTable dtChunk = null;
int tableNumber = 0;
int chunkRowCount = 0;
bool firstLineOfChunk = true;
using (var sr = new StreamReader(filePath))
using (CsvReader csv = new CsvReader(sr, false, delimiter, &#39;\&quot;&#39;, &#39;\0&#39;, &#39;\0&#39;, ValueTrimmingOptions.All, 65536))
{
int fieldCount = csv.FieldCount;
string[] row = new string[fieldCount];
//Add fields when necessary
csv.MissingFieldAction = MissingFieldAction.ReplaceByEmpty;
while (csv.ReadNextRecord())
{
if (firstLineOfChunk)
{
firstLineOfChunk = false;
dtChunk = CreateDataTable(fieldCount, tableNumber);
}
DataRow dataRow = dtChunk.NewRow();
csv.CopyCurrentRecordTo(row);
for (int f = 0; f &lt; fieldCount; f++)
{
dataRow[f] = row[f];
}
dtChunk.Rows.Add(dataRow);
chunkRowCount++;
if (chunkRowCount == rowLimit)
{
firstLineOfChunk = true;
chunkRowCount = 0;
tableNumber++;
dataSet.Tables.Add(dtChunk);
dtChunk = null;
}
}
}
if (dtChunk != null)
{
dataSet.Tables.Add(dtChunk);
}
}
private static DataTable CreateDataTable(int fieldCount, int tableNumber)
{
DataTable dt = new DataTable(&quot;tbl_&quot; + tableNumber);
for(int i = 0; i &lt; fieldCount; i++)
{
dt.Columns.Add(&quot;Column_&quot; + i);
}
return dt;
}
[1]: https://www.codeproject.com/Articles/543789/Parse-CSV-file-chunk-by-chunk-and-save-in-database
[2]: https://www.codeproject.com/Articles/9258/A-Fast-CSV-Reader
</details>
# Answer 1

**Score**: 3

There's no reason to use a DataTable to begin with.

Use the SqlBulkCopy.WriteToServer(IDataReader) overload, and you can stream the whole file directly to SQL Server. And use SqlBulkCopy.BatchSize if you don't want all the rows loaded in a single transaction.

For example:

```csharp
using (var sr = new StreamReader(filePath))
using (CsvReader csv = new CsvReader(sr, false, delimiter, '\"', '\0', '\0', ValueTrimmingOptions.All, 65536))
{
    bulkCopy.WriteToServer(csv);
}
```
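
To make that snippet self-contained, here is a minimal end-to-end sketch of the streaming approach; the `connectionString` and destination table `dbo.MyTable` are placeholders, and the CsvReader arguments mirror the question's:

```csharp
// End-to-end sketch of streaming the CSV straight into SQL Server.
// connectionString and "dbo.MyTable" are hypothetical placeholders.
using (var connection = new SqlConnection(connectionString))
{
    connection.Open();

    using (var sr = new StreamReader(filePath))
    using (var csv = new CsvReader(sr, false, ',', '\"', '\0', '\0', ValueTrimmingOptions.All, 65536))
    using (var bulkCopy = new SqlBulkCopy(connection))
    {
        csv.MissingFieldAction = MissingFieldAction.ReplaceByEmpty;

        bulkCopy.DestinationTableName = "dbo.MyTable";
        bulkCopy.BatchSize = 100000;  // commit every 100k rows instead of one huge transaction
        bulkCopy.BulkCopyTimeout = 0; // no timeout for a multi-GB load

        // LumenWorks' CsvReader implements IDataReader, so rows stream
        // through without the whole file ever being held in memory.
        bulkCopy.WriteToServer(csv);
    }
}
```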
&#39;, &#39;

<details>
<summary>英文:</summary>
I&#39;d like to parse values from a csv into datatable chunks, add them to a dataset, and then use SQLBulkCopy to insert the datatables into a single table in SQL.  The original csv can range from 4 GB to 8 GB, and I need to avoid reading the entire thing into memory, hence the chunking.  I loosely based my chunking on this [post][1]. I use [LumenWorks][2] to parse the csv values.  
As soon as a datatable is added to the dataset, I want to use SqlBulkCopy to insert it into my SQL table, all while the next datatable is being created.  After the SqlBulkCopy completes, I want to remove the datatable to release the memory.  
My first thought is to run the chunking method asynchronously without await, then run a while loop that checks for the existence of a next datatable in the dataset.  If the datatable exists, then bulk copy.  If the datatable row count is less then the row limit, then it is the last chunk and stop while loop.
Am I going about this the wrong way?  If not, how can I do something like this?
string filePath = @&quot;C:\Users\user\Downloads\Testing\file - Copy.csv&quot;;
DataSet ds = new DataSet();
bool continueInsert = true;
int rowLimit = 100000;
int tableNumber = 0;
//Start this, but do not wait for it to complete before starting while loop
ChunkCSV(filePath, ds, rowLimit);
//Run SqlBulkCopy if datatable exists 
while (continueInsert)
{
if (ds.Tables.Contains(&quot;tbl_&quot; + tableNumber))
{
DataTable dataTable = ds.Tables[&quot;tbl_&quot; + tableNumber];
//SqlBulkCopy dataTable code HERE
if (ds.Tables[&quot;tbl_&quot; + tableNumber].Rows.Count &lt; rowLimit)
{
continueInsert = false;
}
//Remove datatable from dataset to release memory
ds.Tables.Remove(&quot;tbl_&quot; + tableNumber);
tableNumber++;
}
else
{
Thread.Sleep(1000);
}
}
Here is my chunking code:
private static void ChunkCSV(string filePath, DataSet dataSet, int rowLimit)
{
char delimiter = &#39;,&#39;;
DataTable dtChunk = null;
int tableNumber = 0;
int chunkRowCount = 0;
bool firstLineOfChunk = true;
using (var sr = new StreamReader(filePath))
using (CsvReader csv = new CsvReader(sr, false, delimiter, &#39;\&quot;&#39;, &#39;\0&#39;, &#39;\0&#39;, ValueTrimmingOptions.All, 65536))
{
int fieldCount = csv.FieldCount;
string[] row = new string[fieldCount];
//Add fields when necessary
csv.MissingFieldAction = MissingFieldAction.ReplaceByEmpty;
while (csv.ReadNextRecord())
{
if (firstLineOfChunk)
{
firstLineOfChunk = false;
dtChunk = CreateDataTable(fieldCount, tableNumber);
}
DataRow dataRow = dtChunk.NewRow();
csv.CopyCurrentRecordTo(row);
for (int f = 0; f &lt; fieldCount; f++)
{
dataRow[f] = row[f];
}
dtChunk.Rows.Add(dataRow);
chunkRowCount++;
if (chunkRowCount == rowLimit)
{
firstLineOfChunk = true;
chunkRowCount = 0;
tableNumber++;
dataSet.Tables.Add(dtChunk);
dtChunk = null;
}
}
}
if (dtChunk != null)
{
dataSet.Tables.Add(dtChunk);
}
}
private static DataTable CreateDataTable(int fieldCount, int tableNumber)
{
DataTable dt = new DataTable(&quot;tbl_&quot; + tableNumber);
for(int i = 0; i &lt; fieldCount; i++)
{
dt.Columns.Add(&quot;Column_&quot; + i);
}
return dt;
}
[1]: https://www.codeproject.com/Articles/543789/Parse-CSV-file-chunk-by-chunk-and-save-in-database
[2]: https://www.codeproject.com/Articles/9258/A-Fast-CSV-Reader
</details>
# 答案1
**得分**: 3
没有理由一开始就使用 DataTable。
使用 [SqlBulkCopy.WriteToServer(IDataReader)][1] 重载,可以直接将整个文件流传输到 SQL Server。如果不想在单个事务中加载所有行,请使用 [SqlBulkCopy.BatchSize][2]。
例如:
```csharp
using (var sr = new StreamReader(filePath))
using (CsvReader csv = new CsvReader(sr, false, delimiter, '&#39;&#39;', '&#39;\0&#39;', '&#39;\0&#39;', ValueTrimmingOptions.All, 65536))
{
bulkCopy.WriteToServer(csv);
}
&#39;, ValueTrimmingOptions.All, 65536)) { int fieldCount = csv.FieldCount; string[] row = new string[fieldCount]; //Add fields when necessary csv.MissingFieldAction = MissingFieldAction.ReplaceByEmpty; while (csv.ReadNextRecord()) { if (firstLineOfChunk) { firstLineOfChunk = false; dtChunk = CreateDataTable(fieldCount, tableNumber); } DataRow dataRow = dtChunk.NewRow(); csv.CopyCurrentRecordTo(row); for (int f = 0; f &lt; fieldCount; f++) { dataRow[f] = row[f]; } dtChunk.Rows.Add(dataRow); chunkRowCount++; if (chunkRowCount == rowLimit) { firstLineOfChunk = true; chunkRowCount = 0; tableNumber++; dataSet.Tables.Add(dtChunk); dtChunk = null; } } } if (dtChunk != null) { dataSet.Tables.Add(dtChunk); } } private static DataTable CreateDataTable(int fieldCount, int tableNumber) { DataTable dt = new DataTable(&quot;tbl_&quot; + tableNumber); for(int i = 0; i &lt; fieldCount; i++) { dt.Columns.Add(&quot;Column_&quot; + i); } return dt; } [1]: https://www.codeproject.com/Articles/543789/Parse-CSV-file-chunk-by-chunk-and-save-in-database [2]: https://www.codeproject.com/Articles/9258/A-Fast-CSV-Reader </details> # 答案1 **得分**: 3 没有理由一开始就使用 DataTable。 使用 [SqlBulkCopy.WriteToServer(IDataReader)][1] 重载,可以直接将整个文件流传输到 SQL Server。如果不想在单个事务中加载所有行,请使用 [SqlBulkCopy.BatchSize][2]。 例如: ```csharp using (var sr = new StreamReader(filePath)) using (CsvReader csv = new CsvReader(sr, false, delimiter, '&#39;&#39;', '&#39;

<details>
<summary>英文:</summary>
I&#39;d like to parse values from a csv into datatable chunks, add them to a dataset, and then use SQLBulkCopy to insert the datatables into a single table in SQL.  The original csv can range from 4 GB to 8 GB, and I need to avoid reading the entire thing into memory, hence the chunking.  I loosely based my chunking on this [post][1]. I use [LumenWorks][2] to parse the csv values.  
As soon as a datatable is added to the dataset, I want to use SqlBulkCopy to insert it into my SQL table, all while the next datatable is being created.  After the SqlBulkCopy completes, I want to remove the datatable to release the memory.  
My first thought is to run the chunking method asynchronously without await, then run a while loop that checks for the existence of a next datatable in the dataset.  If the datatable exists, then bulk copy.  If the datatable row count is less then the row limit, then it is the last chunk and stop while loop.
Am I going about this the wrong way?  If not, how can I do something like this?
string filePath = @&quot;C:\Users\user\Downloads\Testing\file - Copy.csv&quot;;
DataSet ds = new DataSet();
bool continueInsert = true;
int rowLimit = 100000;
int tableNumber = 0;
//Start this, but do not wait for it to complete before starting while loop
ChunkCSV(filePath, ds, rowLimit);
//Run SqlBulkCopy if datatable exists 
while (continueInsert)
{
if (ds.Tables.Contains(&quot;tbl_&quot; + tableNumber))
{
DataTable dataTable = ds.Tables[&quot;tbl_&quot; + tableNumber];
//SqlBulkCopy dataTable code HERE
if (ds.Tables[&quot;tbl_&quot; + tableNumber].Rows.Count &lt; rowLimit)
{
continueInsert = false;
}
//Remove datatable from dataset to release memory
ds.Tables.Remove(&quot;tbl_&quot; + tableNumber);
tableNumber++;
}
else
{
Thread.Sleep(1000);
}
}
Here is my chunking code:
private static void ChunkCSV(string filePath, DataSet dataSet, int rowLimit)
{
char delimiter = &#39;,&#39;;
DataTable dtChunk = null;
int tableNumber = 0;
int chunkRowCount = 0;
bool firstLineOfChunk = true;
using (var sr = new StreamReader(filePath))
using (CsvReader csv = new CsvReader(sr, false, delimiter, &#39;\&quot;&#39;, &#39;\0&#39;, &#39;\0&#39;, ValueTrimmingOptions.All, 65536))
{
int fieldCount = csv.FieldCount;
string[] row = new string[fieldCount];
//Add fields when necessary
csv.MissingFieldAction = MissingFieldAction.ReplaceByEmpty;
while (csv.ReadNextRecord())
{
if (firstLineOfChunk)
{
firstLineOfChunk = false;
dtChunk = CreateDataTable(fieldCount, tableNumber);
}
DataRow dataRow = dtChunk.NewRow();
csv.CopyCurrentRecordTo(row);
for (int f = 0; f &lt; fieldCount; f++)
{
dataRow[f] = row[f];
}
dtChunk.Rows.Add(dataRow);
chunkRowCount++;
if (chunkRowCount == rowLimit)
{
firstLineOfChunk = true;
chunkRowCount = 0;
tableNumber++;
dataSet.Tables.Add(dtChunk);
dtChunk = null;
}
}
}
if (dtChunk != null)
{
dataSet.Tables.Add(dtChunk);
}
}
private static DataTable CreateDataTable(int fieldCount, int tableNumber)
{
DataTable dt = new DataTable(&quot;tbl_&quot; + tableNumber);
for(int i = 0; i &lt; fieldCount; i++)
{
dt.Columns.Add(&quot;Column_&quot; + i);
}
return dt;
}
[1]: https://www.codeproject.com/Articles/543789/Parse-CSV-file-chunk-by-chunk-and-save-in-database
[2]: https://www.codeproject.com/Articles/9258/A-Fast-CSV-Reader
</details>
# 答案1
**得分**: 3
没有理由一开始就使用 DataTable。
使用 [SqlBulkCopy.WriteToServer(IDataReader)][1] 重载,可以直接将整个文件流传输到 SQL Server。如果不想在单个事务中加载所有行,请使用 [SqlBulkCopy.BatchSize][2]。
例如:
```csharp
using (var sr = new StreamReader(filePath))
using (CsvReader csv = new CsvReader(sr, false, delimiter, '&#39;&#39;', '&#39;\0&#39;', '&#39;\0&#39;', ValueTrimmingOptions.All, 65536))
{
bulkCopy.WriteToServer(csv);
}
&#39;', '&#39;

<details>
<summary>英文:</summary>
I&#39;d like to parse values from a csv into datatable chunks, add them to a dataset, and then use SQLBulkCopy to insert the datatables into a single table in SQL.  The original csv can range from 4 GB to 8 GB, and I need to avoid reading the entire thing into memory, hence the chunking.  I loosely based my chunking on this [post][1]. I use [LumenWorks][2] to parse the csv values.  
As soon as a datatable is added to the dataset, I want to use SqlBulkCopy to insert it into my SQL table, all while the next datatable is being created.  After the SqlBulkCopy completes, I want to remove the datatable to release the memory.  
My first thought is to run the chunking method asynchronously without await, then run a while loop that checks for the existence of a next datatable in the dataset.  If the datatable exists, then bulk copy.  If the datatable row count is less then the row limit, then it is the last chunk and stop while loop.
Am I going about this the wrong way?  If not, how can I do something like this?
string filePath = @&quot;C:\Users\user\Downloads\Testing\file - Copy.csv&quot;;
DataSet ds = new DataSet();
bool continueInsert = true;
int rowLimit = 100000;
int tableNumber = 0;
//Start this, but do not wait for it to complete before starting while loop
ChunkCSV(filePath, ds, rowLimit);
//Run SqlBulkCopy if datatable exists 
while (continueInsert)
{
if (ds.Tables.Contains(&quot;tbl_&quot; + tableNumber))
{
DataTable dataTable = ds.Tables[&quot;tbl_&quot; + tableNumber];
//SqlBulkCopy dataTable code HERE
if (ds.Tables[&quot;tbl_&quot; + tableNumber].Rows.Count &lt; rowLimit)
{
continueInsert = false;
}
//Remove datatable from dataset to release memory
ds.Tables.Remove(&quot;tbl_&quot; + tableNumber);
tableNumber++;
}
else
{
Thread.Sleep(1000);
}
}
Here is my chunking code:
private static void ChunkCSV(string filePath, DataSet dataSet, int rowLimit)
{
char delimiter = &#39;,&#39;;
DataTable dtChunk = null;
int tableNumber = 0;
int chunkRowCount = 0;
bool firstLineOfChunk = true;
using (var sr = new StreamReader(filePath))
using (CsvReader csv = new CsvReader(sr, false, delimiter, &#39;\&quot;&#39;, &#39;\0&#39;, &#39;\0&#39;, ValueTrimmingOptions.All, 65536))
{
int fieldCount = csv.FieldCount;
string[] row = new string[fieldCount];
//Add fields when necessary
csv.MissingFieldAction = MissingFieldAction.ReplaceByEmpty;
while (csv.ReadNextRecord())
{
if (firstLineOfChunk)
{
firstLineOfChunk = false;
dtChunk = CreateDataTable(fieldCount, tableNumber);
}
DataRow dataRow = dtChunk.NewRow();
csv.CopyCurrentRecordTo(row);
for (int f = 0; f &lt; fieldCount; f++)
{
dataRow[f] = row[f];
}
dtChunk.Rows.Add(dataRow);
chunkRowCount++;
if (chunkRowCount == rowLimit)
{
firstLineOfChunk = true;
chunkRowCount = 0;
tableNumber++;
dataSet.Tables.Add(dtChunk);
dtChunk = null;
}
}
}
if (dtChunk != null)
{
dataSet.Tables.Add(dtChunk);
}
}
private static DataTable CreateDataTable(int fieldCount, int tableNumber)
{
DataTable dt = new DataTable(&quot;tbl_&quot; + tableNumber);
for(int i = 0; i &lt; fieldCount; i++)
{
dt.Columns.Add(&quot;Column_&quot; + i);
}
return dt;
}
[1]: https://www.codeproject.com/Articles/543789/Parse-CSV-file-chunk-by-chunk-and-save-in-database
[2]: https://www.codeproject.com/Articles/9258/A-Fast-CSV-Reader
</details>
# 答案1
**得分**: 3
没有理由一开始就使用 DataTable。
使用 [SqlBulkCopy.WriteToServer(IDataReader)][1] 重载,可以直接将整个文件流传输到 SQL Server。如果不想在单个事务中加载所有行,请使用 [SqlBulkCopy.BatchSize][2]。
例如:
```csharp
using (var sr = new StreamReader(filePath))
using (CsvReader csv = new CsvReader(sr, false, delimiter, '&#39;&#39;', '&#39;\0&#39;', '&#39;\0&#39;', ValueTrimmingOptions.All, 65536))
{
bulkCopy.WriteToServer(csv);
}
&#39;', ValueTrimmingOptions.All, 65536)) { bulkCopy.WriteToServer(csv); }
