我有一个文件(具有1000万条记录),如下所示:
line1 line2 line3 line4 ....... ...... 10 million lines
因此,基本上我想将1000万条记录插入数据库。所以我阅读了文件并将其上传到SQL Server。
C#代码
System.IO.StreamReader file = new System.IO.StreamReader(@"c:\test.txt"); while((line = file.ReadLine()) != null) { // insertion code goes here //DAL.ExecuteSql("insert into table1 values("+line+")"); } file.Close();
但是插入会花费很长时间。如何使用C#在最短的时间内插入1000万条记录?
更新1: 批量插入:
BULK INSERT DBNAME.dbo.DATAs FROM 'F:\dt10000000\dt10000000.txt' WITH ( ROWTERMINATOR =' \n' );
我的桌子如下:
DATAs ( DatasField VARCHAR(MAX) )
但我收到以下错误:
消息4866,级别16,状态1,第1行 大容量加载失败。数据文件中第1行第1列的列太长。请验证是否正确指定了字段终止符和行终止符。 消息7399,级别16,状态1,第1行 链接服务器“(空)”的OLE DB访问接口“ BULK”报告了一个错误。提供程序未提供有关该错误的任何信息。 消息7330,级别16,状态2,行1 无法从OLE DB访问接口“ BULK”获取链接服务器“(空)”的行。
消息4866,级别16,状态1,第1行 大容量加载失败。数据文件中第1行第1列的列太长。请验证是否正确指定了字段终止符和行终止符。
消息7399,级别16,状态1,第1行 链接服务器“(空)”的OLE DB访问接口“ BULK”报告了一个错误。提供程序未提供有关该错误的任何信息。
消息7330,级别16,状态2,行1 无法从OLE DB访问接口“ BULK”获取链接服务器“(空)”的行。
下面的代码有效:
BULK INSERT DBNAME.dbo.DATAs FROM 'F:\dt10000000\dt10000000.txt' WITH ( FIELDTERMINATOR = '\t', ROWTERMINATOR = '\n' );
请不要 不 创建一个DataTable负载通过BulkCopy。对于较小的数据集,这是一个好的解决方案,但是绝对没有理由在调用数据库之前将所有1000万行加载到内存中。
DataTable
最好的选择(在BCP/ BULK INSERT/ 之外OPENROWSET(BULK...))是通过表值参数(TVP)将文件中的内容流式传输到数据库中。通过使用TVP,您可以打开文件,读取一行并发送一行直到完成,然后关闭该文件。此方法的内存占用量仅为一行。我写了一篇文章,《从应用程序将数据流式传输到SQL Server 2008》,其中有这种情况的示例。
BCP
BULK INSERT
OPENROWSET(BULK...)
结构的简单概述如下。我假设与上面的问题所示的导入表和字段名称相同。
所需的数据库对象:
-- First: You need a User-Defined Table Type CREATE TYPE ImportStructure AS TABLE (Field VARCHAR(MAX)); GO -- Second: Use the UDTT as an input param to an import proc. -- Hence "Tabled-Valued Parameter" (TVP) CREATE PROCEDURE dbo.ImportData ( @ImportTable dbo.ImportStructure READONLY ) AS SET NOCOUNT ON; -- maybe clear out the table first? TRUNCATE TABLE dbo.DATAs; INSERT INTO dbo.DATAs (DatasField) SELECT Field FROM @ImportTable; GO
下面是利用上述SQL对象的C#应用代码。请注意,不是填充对象(例如,DataTable)然后执行存储过程,而是通过执行存储过程来启动文件内容的读取,而不是执行存储过程。Stored Proc的输入参数不是变量。它是方法的返回值GetFileContents。当SqlCommand调用ExecuteNonQuery打开文件,读取行并将该行通过IEnumerable<SqlDataRecord>和yield return构造发送到SQL Server ,然后关闭文件时,将调用该方法。存储过程只是看到一个表变量@ImportTable,一旦数据开始更新,就可以访问该表变量( 注意:数据确实会保留很短的时间,即使tempdb中的内容不是全部 )。
GetFileContents
SqlCommand
ExecuteNonQuery
IEnumerable<SqlDataRecord>
yield return
using System.Collections; using System.Data; using System.Data.SqlClient; using System.IO; using Microsoft.SqlServer.Server; private static IEnumerable<SqlDataRecord> GetFileContents() { SqlMetaData[] _TvpSchema = new SqlMetaData[] { new SqlMetaData("Field", SqlDbType.VarChar, SqlMetaData.Max) }; SqlDataRecord _DataRecord = new SqlDataRecord(_TvpSchema); StreamReader _FileReader = null; try { _FileReader = new StreamReader("{filePath}"); // read a row, send a row while (!_FileReader.EndOfStream) { // You shouldn't need to call "_DataRecord = new SqlDataRecord" as // SQL Server already received the row when "yield return" was called. // Unlike BCP and BULK INSERT, you have the option here to create a string // call ReadLine() into the string, do manipulation(s) / validation(s) on // the string, then pass that string into SetString() or discard if invalid. _DataRecord.SetString(0, _FileReader.ReadLine()); yield return _DataRecord; } } finally { _FileReader.Close(); } }
GetFileContents上面的方法用作存储过程的输入参数值,如下所示:
public static void test() { SqlConnection _Connection = new SqlConnection("{connection string}"); SqlCommand _Command = new SqlCommand("ImportData", _Connection); _Command.CommandType = CommandType.StoredProcedure; SqlParameter _TVParam = new SqlParameter(); _TVParam.ParameterName = "@ImportTable"; _TVParam.TypeName = "dbo.ImportStructure"; _TVParam.SqlDbType = SqlDbType.Structured; _TVParam.Value = GetFileContents(); // return value of the method is streamed data _Command.Parameters.Add(_TVParam); try { _Connection.Open(); _Command.ExecuteNonQuery(); } finally { _Connection.Close(); } return; }
补充笔记:
SELECT
SqlBulkCopy
MERGE
DELETE
ExecuteReader
IDENTITY
DATAs
OUTPUT
INSERT
INSERTED.[ID]
ID
Reader.NextResult()