小编典典

如何在最短的时间内插入1000万条记录?

c#

我有一个文件(具有1000万条记录),如下所示:

    line1
    line2
    line3
    line4
   .......
    ......
    10 million lines

因此,基本上我想将1000万条记录插入数据库。所以我阅读了文件并将其上传到SQL Server。

C#代码

System.IO.StreamReader file = 
    new System.IO.StreamReader(@"c:\test.txt");
while((line = file.ReadLine()) != null)
{
    // insertion code goes here
    //DAL.ExecuteSql("insert into table1 values("+line+")");
}

file.Close();

但是插入会花费很长时间。如何使用C#在最短的时间内插入1000万条记录?

更新1:
批量插入:

BULK INSERT DBNAME.dbo.DATAs
FROM 'F:\dt10000000\dt10000000.txt'
WITH
(

     ROWTERMINATOR =' \n'
  );

我的桌子如下:

DATAs
(
     DatasField VARCHAR(MAX)
)

但我收到以下错误:

消息4866,级别16,状态1,第1行
大容量加载失败。数据文件中第1行第1列的列太长。请验证是否正确指定了字段终止符和行终止符。

消息7399,级别16,状态1,第1行
链接服务器“(空)”的OLE DB访问接口“ BULK”报告了一个错误。提供程序未提供有关该错误的任何信息。

消息7330,级别16,状态2,行1
无法从OLE DB访问接口“ BULK”获取链接服务器“(空)”的行。

下面的代码有效:

BULK INSERT DBNAME.dbo.DATAs
FROM 'F:\dt10000000\dt10000000.txt'
WITH
(
    FIELDTERMINATOR = '\t',
    ROWTERMINATOR = '\n'
);

阅读 260

收藏
2020-05-19

共1个答案

小编典典

请不要
创建一个DataTable负载通过BulkCopy。对于较小的数据集,这是一个好的解决方案,但是绝对没有理由在调用数据库之前将所有1000万行加载到内存中。

最好的选择(在BCP/ BULK INSERT/
之外OPENROWSET(BULK...))是通过表值参数(TVP)将文件中的内容流式传输到数据库中。通过使用TVP,您可以打开文件,读取一行并发送一行直到完成,然后关闭该文件。此方法的内存占用量仅为一行。我写了一篇文章,《从应用程序将数据流式传输到SQL
Server
2008》
,其中有这种情况的示例。

结构的简单概述如下。我假设与上面的问题所示的导入表和字段名称相同。

所需的数据库对象:

-- First: You need a User-Defined Table Type
CREATE TYPE ImportStructure AS TABLE (Field VARCHAR(MAX));
GO

-- Second: Use the UDTT as an input param to an import proc.
--         Hence "Tabled-Valued Parameter" (TVP)
CREATE PROCEDURE dbo.ImportData (
   @ImportTable    dbo.ImportStructure READONLY
)
AS
SET NOCOUNT ON;

-- maybe clear out the table first?
TRUNCATE TABLE dbo.DATAs;

INSERT INTO dbo.DATAs (DatasField)
    SELECT  Field
    FROM    @ImportTable;

GO

下面是利用上述SQL对象的C#应用​​代码。请注意,不是填充对象(例如,DataTable)然后执行存储过程,而是通过执行存储过程来启动文件内容的读取,而不是执行存储过程。Stored
Proc的输入参数不是变量。它是方法的返回值GetFileContents。当SqlCommand调用ExecuteNonQuery打开文件,读取行并将该行通过IEnumerable<SqlDataRecord>yield return构造发送到SQL Server
,然后关闭文件时,将调用该方法。存储过程只是看到一个表变量@ImportTable,一旦数据开始更新,就可以访问该表变量(
注意:数据确实会保留很短的时间,即使tempdb中的内容不是全部 )。

using System.Collections;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using Microsoft.SqlServer.Server;

private static IEnumerable<SqlDataRecord> GetFileContents()
{
   SqlMetaData[] _TvpSchema = new SqlMetaData[] {
      new SqlMetaData("Field", SqlDbType.VarChar, SqlMetaData.Max)
   };
   SqlDataRecord _DataRecord = new SqlDataRecord(_TvpSchema);
   StreamReader _FileReader = null;

   try
   {
      _FileReader = new StreamReader("{filePath}");

      // read a row, send a row
      while (!_FileReader.EndOfStream)
      {
         // You shouldn't need to call "_DataRecord = new SqlDataRecord" as
         // SQL Server already received the row when "yield return" was called.
         // Unlike BCP and BULK INSERT, you have the option here to create a string
         // call ReadLine() into the string, do manipulation(s) / validation(s) on
         // the string, then pass that string into SetString() or discard if invalid.
         _DataRecord.SetString(0, _FileReader.ReadLine());
         yield return _DataRecord;
      }
   }
   finally
   {
      _FileReader.Close();
   }
}

GetFileContents上面的方法用作存储过程的输入参数值,如下所示:

public static void test()
{
   SqlConnection _Connection = new SqlConnection("{connection string}");
   SqlCommand _Command = new SqlCommand("ImportData", _Connection);
   _Command.CommandType = CommandType.StoredProcedure;

   SqlParameter _TVParam = new SqlParameter();
   _TVParam.ParameterName = "@ImportTable";
   _TVParam.TypeName = "dbo.ImportStructure";
   _TVParam.SqlDbType = SqlDbType.Structured;
   _TVParam.Value = GetFileContents(); // return value of the method is streamed data
   _Command.Parameters.Add(_TVParam);

   try
   {
      _Connection.Open();

      _Command.ExecuteNonQuery();
   }
   finally
   {
      _Connection.Close();
   }

   return;
}

补充笔记:

  1. 经过一些修改,上面的C#代码可以适用于批处理数据。
  2. 稍作修改,即可将以上C#代码修改为在多个字段中发送(上面链接的“ Steaming Data …”文章中显示的示例在2个字段中传递)。
  3. 您还可以SELECT在proc语句中操纵每个记录的值。
  4. 您也可以在proc中使用WHERE条件过滤掉行。
  5. 您可以多次访问TVP表变量。它是READONLY,但不是“ forward only”。
  6. 优势SqlBulkCopy
    1. SqlBulkCopy仅用于INSERT,而使用TVP则允许以任何方式使用数据:您可以调用MERGE;您可以DELETE根据某些条件;您可以将数据分成多个表;等等。
    2. 由于TVP并非仅用于INSERT,因此不需要单独的登台表即可将数据转储到其中。
    3. 您可以通过调用ExecuteReader而不是从数据库获取数据ExecuteNonQuery。例如,如果导入表IDENTITY上有一个字段,则DATAs可以在上添加一个OUTPUT子句以INSERT回传INSERTED.[ID](假设IDIDENTITY字段的名称)。或者,您可以传回完全不同的查询的结果,或者两者都传回,因为可以通过发送和访问多个结果集Reader.NextResult()。当使用时,不可能从数据库取回信息。在SqlBulkCopy这里,有几个人想确切地做到这一点(至少关于新创建的IDENTITY值)。
    4. 有关为什么整个过程有时更快,即使将磁盘中的数据获取到SQL Server的速度稍慢的更多信息,请参见SQL Server客户咨询团队的本白皮书:最大化TVP的吞吐量
2020-05-19