我正在编写一个概念验证(POC)程序,用来处理行数在 10 亿以上的超大文本文件,下面是我的初步尝试:
package main

import (
	"bufio"
	"fmt"
	"log"
	"os"
	"runtime"
	"sync"
	"time"
)

// main reads dump10.txt line by line, hands every line to a fixed pool of
// worker goroutines, waits for all of them to finish and then prints the
// elapsed time.
//
// Starting one goroutine per line ("go fmt.Println(...)") puts no bound on
// how many goroutines write to stdout at the same time and never waits for
// them, so the program could panic on huge inputs and could also exit before
// every line had been handled. A bounded pool plus a WaitGroup avoids both.
// Lines are handled concurrently, so output order is not guaranteed.
func main() {
	start := time.Now()

	file, err := os.Open("dump10.txt")
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	// A small buffer lets the reader run slightly ahead of the workers
	// without ever holding more than a bounded number of lines in memory.
	lines := make(chan string, 1024)

	var wg sync.WaitGroup
	for i := 0; i < runtime.GOMAXPROCS(0); i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for line := range lines {
				// Stand-in for the real per-line processing.
				fmt.Println(line)
			}
		}()
	}

	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		lines <- scanner.Text()
	}
	// Closing the channel ends the workers' range loops; wait until every
	// line that was read has actually been handled.
	close(lines)
	wg.Wait()

	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}

	secs := time.Since(start).Seconds()
	fmt.Printf("Took %.2fs", secs)
}
但是,运行这段程序时会出现如下错误:
panic: too many concurrent operations on a single file or socket (max 1048575)
我还没有在网上找到任何可以解决此特定错误的信息。我不确定这是否是文件描述符问题,错误中列出的最大值远高于我的ulimit -n限制500,000。
ulimit -n
处理这种情况的最佳方式是什么?
可能不太明显,需要说明一下:这里的 fmt.Println 只是一个占位,代表我在处理数据时实际要调用的函数。
fmt.Println
在考虑将处理过程并行化之前,您应该先研究输入数据和要执行的计算,确认并行化确实有意义。
必须按顺序处理的输入并不适合并行化,因为并行处理需要额外的复杂逻辑来保持顺序,而且很难事先评估这种策略能否带来收益。
同样,要想从并行化中获益,每项计算本身所花的时间必须超过同步各并行任务所需的开销。批量处理数据也许可以摊薄这一成本,但这样得到的算法会更复杂,还会带来其他不良副作用(例如额外的内存分配)。
否则,请勿并行化。
请参见下面各种计算时间长/短的实现示例及其产生的基准。
结论是:除非每项任务都是耗时较长、明显超过同步开销的计算,否则顺序处理会更快。
main.go
package main

import (
	"bufio"
	"fmt"
	"io"
	"runtime"
	"strings"
	"sync"
	"time"
)

const (
	// workerCount is the number of handlers allowed to run concurrently in
	// every parallel strategy.
	workerCount = 4
	// batchSize is the maximum number of lines processBulk puts in one batch.
	batchSize = 50
	// handlerDelay is how long the "long" handlers sleep to simulate work.
	handlerDelay = 5 * time.Millisecond
)

// main runs every strategy once over 1000 lines of 1000 bytes each and
// prints timing and memory statistics for each run.
func main() {
	data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
	run_line_short(data, true)
	run_line_long(data, true)
	run_line_short_workers(data, true)
	run_line_long_workers(data, true)
	run_bulk_short(data, true)
	run_bulk_long(data, true)
	run_seq_short(data, true)
	run_seq_long(data, true)
}

// runCase executes fn and panics if it returns an error. When stat is true
// the run is bracketed by stats(name): the header is printed before fn
// starts and the report is printed when runCase returns.
func runCase(name string, stat bool, fn func() error) {
	if stat {
		// stats(name) runs now; only the function it returns is deferred.
		defer stats(name)()
	}
	if err := fn(); err != nil {
		panic(err)
	}
}

// run_line_short runs process (one goroutine per line) with the short handler.
func run_line_short(data string, stat bool) {
	runCase("run_line_short", stat, func() error {
		return process(strings.NewReader(data), line_handler_short)
	})
}

// run_line_long runs process (one goroutine per line) with the long handler.
func run_line_long(data string, stat bool) {
	runCase("run_line_long", stat, func() error {
		return process(strings.NewReader(data), line_handler_long)
	})
}

// run_line_short_workers runs processWorkers with the short handler.
func run_line_short_workers(data string, stat bool) {
	runCase("run_line_short_workers", stat, func() error {
		return processWorkers(strings.NewReader(data), line_handler_short)
	})
}

// run_line_long_workers runs processWorkers with the long handler.
func run_line_long_workers(data string, stat bool) {
	runCase("run_line_long_workers", stat, func() error {
		return processWorkers(strings.NewReader(data), line_handler_long)
	})
}

// run_bulk_short runs processBulk with the short batch handler.
func run_bulk_short(data string, stat bool) {
	runCase("run_bulk_short", stat, func() error {
		return processBulk(strings.NewReader(data), bulk_handler_short)
	})
}

// run_bulk_long runs processBulk with the long batch handler.
func run_bulk_long(data string, stat bool) {
	runCase("run_bulk_long", stat, func() error {
		return processBulk(strings.NewReader(data), bulk_handler_long)
	})
}

// run_seq_short runs processSeq with the short handler.
func run_seq_short(data string, stat bool) {
	runCase("run_seq_short", stat, func() error {
		return processSeq(strings.NewReader(data), line_handler_short)
	})
}

// run_seq_long runs processSeq with the long handler.
func run_seq_long(data string, stat bool) {
	runCase("run_seq_long", stat, func() error {
		return processSeq(strings.NewReader(data), line_handler_long)
	})
}

// line_handler_short simulates a trivial per-line computation.
func line_handler_short(k string) error {
	_ = len(k)
	return nil
}

// line_handler_long simulates a slow per-line computation by sleeping for
// handlerDelay before touching the line.
func line_handler_long(k string) error {
	time.Sleep(handlerDelay)
	_ = len(k)
	return nil
}

// bulk_handler_short simulates a trivial computation on every line of a batch.
func bulk_handler_short(b []string) error {
	for _, k := range b {
		_ = len(k)
	}
	return nil
}

// bulk_handler_long simulates a slow computation on a batch.
//
// NOTE: it sleeps once per batch, not once per line, so its timings are not
// directly comparable with line_handler_long.
func bulk_handler_long(b []string) error {
	time.Sleep(handlerDelay)
	for _, k := range b {
		_ = len(k)
	}
	return nil
}

// stats prints a header for name, starts a timer and returns a function
// that prints the elapsed time together with a snapshot of runtime.MemStats.
func stats(name string) func() {
	fmt.Printf("======================\n")
	fmt.Printf("%v\n", name)
	start := time.Now()
	return func() {
		fmt.Printf("time to run %v\n", time.Since(start))
		var ms runtime.MemStats
		runtime.ReadMemStats(&ms)
		fmt.Printf("Alloc: %d MB, TotalAlloc: %d MB, Sys: %d MB\n",
			ms.Alloc/1024/1024, ms.TotalAlloc/1024/1024, ms.Sys/1024/1024)
		fmt.Printf("Mallocs: %d, Frees: %d\n", ms.Mallocs, ms.Frees)
		fmt.Printf("HeapAlloc: %d MB, HeapSys: %d MB, HeapIdle: %d MB\n",
			ms.HeapAlloc/1024/1024, ms.HeapSys/1024/1024, ms.HeapIdle/1024/1024)
		fmt.Printf("HeapObjects: %d\n", ms.HeapObjects)
		fmt.Printf("\n")
	}
}

// firstError drains errs until it is closed and returns the first non-nil
// error received, or nil if there was none. Draining to the end keeps
// senders from blocking forever on the unbuffered channel.
func firstError(errs <-chan error) error {
	var first error
	for err := range errs {
		if first == nil && err != nil {
			first = err
		}
	}
	return first
}

// process reads r line by line and runs h on each line in its own
// goroutine, with at most workerCount handlers running at a time. It keeps
// going after a handler error and returns the first error reported, whether
// by a handler or by the scanner.
func process(r io.Reader, h func(string) error) error {
	errs := make(chan error)
	tokens := make(chan struct{}, workerCount)

	go func() {
		var wg sync.WaitGroup
		scanner := bufio.NewScanner(r)
		for scanner.Scan() {
			tokens <- struct{}{} // blocks while workerCount handlers are running
			wg.Add(1)
			go func(line string) {
				defer wg.Done()
				if err := h(line); err != nil {
					errs <- err
				}
				<-tokens // release the slot
			}(scanner.Text())
		}
		wg.Wait()
		if err := scanner.Err(); err != nil {
			errs <- err
		}
		close(errs)
	}()

	return firstError(errs)
}

// processWorkers reads r line by line and feeds the lines to workerCount
// long-lived worker goroutines that each call h. It keeps going after a
// handler error and returns the first error reported, whether by a handler
// or by the scanner.
func processWorkers(r io.Reader, h func(string) error) error {
	errs := make(chan error)
	input := make(chan string)

	var wg sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for line := range input {
				if err := h(line); err != nil {
					errs <- err
				}
			}
		}()
	}

	go func() {
		scanner := bufio.NewScanner(r)
		for scanner.Scan() {
			input <- scanner.Text()
		}
		close(input)
		wg.Wait()
		if err := scanner.Err(); err != nil {
			errs <- err
		}
		close(errs)
	}()

	return firstError(errs)
}

// processBulk reads r line by line, groups the lines into batches of at
// most batchSize and feeds the batches to workerCount worker goroutines
// that each call h. It keeps going after a handler error and returns the
// first error reported, whether by a handler or by the scanner.
func processBulk(r io.Reader, h func([]string) error) error {
	errs := make(chan error)
	input := make(chan []string)

	var wg sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for bulk := range input {
				if err := h(bulk); err != nil {
					errs <- err
				}
			}
		}()
	}

	go func() {
		scanner := bufio.NewScanner(r)
		bulk := make([]string, 0, batchSize)
		for scanner.Scan() {
			bulk = append(bulk, scanner.Text())
			if len(bulk) == batchSize {
				input <- bulk
				// The worker now owns the slice that was sent; start a
				// fresh one instead of reusing its backing array.
				bulk = make([]string, 0, batchSize)
			}
		}
		if len(bulk) > 0 {
			input <- bulk
		}
		close(input)

		// The scanner error must be sent before errs is closed, and errs
		// may only be closed once no worker can send on it any more. Doing
		// both here, in order, avoids a send on a closed channel.
		wg.Wait()
		if err := scanner.Err(); err != nil {
			errs <- err
		}
		close(errs)
	}()

	return firstError(errs)
}

// processSeq reads r line by line and calls h on each line in the calling
// goroutine. It stops at the first handler error and returns it; otherwise
// it returns the scanner's error, if any.
func processSeq(r io.Reader, h func(string) error) error {
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		if err := h(scanner.Text()); err != nil {
			return err
		}
	}
	return scanner.Err()
}
main_test.go
package main import ( "strings" "testing" ) func Benchmark_run_line_short(b *testing.B) { data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000) for i := 0; i < b.N; i++ { run_line_short(data, false) } } func Benchmark_run_line_long(b *testing.B) { data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000) for i := 0; i < b.N; i++ { run_line_long(data, false) } } func Benchmark_run_line_short_workers(b *testing.B) { data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000) for i := 0; i < b.N; i++ { run_line_short_workers(data, false) } } func Benchmark_run_line_long_workers(b *testing.B) { data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000) for i := 0; i < b.N; i++ { run_line_long_workers(data, false) } } func Benchmark_run_bulk_short(b *testing.B) { data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000) for i := 0; i < b.N; i++ { run_bulk_short(data, false) } } func Benchmark_run_bulk_long(b *testing.B) { data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000) for i := 0; i < b.N; i++ { run_bulk_long(data, false) } } func Benchmark_run_seq_short(b *testing.B) { data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000) for i := 0; i < b.N; i++ { run_seq_short(data, false) } } func Benchmark_run_seq_long(b *testing.B) { data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000) for i := 0; i < b.N; i++ { run_seq_long(data, false) } }
结果
$ go run main.go ====================== run_line_short time to run 2.747827ms Alloc: 2 MB, TotalAlloc: 2 MB, Sys: 68 MB Mallocs: 1378, Frees: 1 HeapAlloc: 2 MB, HeapSys: 63 MB, HeapIdle: 61 MB HeapObjects: 1377 ====================== run_line_long time to run 1.30987804s Alloc: 3 MB, TotalAlloc: 3 MB, Sys: 68 MB Mallocs: 5619, Frees: 5 HeapAlloc: 3 MB, HeapSys: 63 MB, HeapIdle: 59 MB HeapObjects: 5614 ====================== run_line_short_workers time to run 4.54926ms Alloc: 1 MB, TotalAlloc: 4 MB, Sys: 68 MB Mallocs: 6648, Frees: 5743 HeapAlloc: 1 MB, HeapSys: 63 MB, HeapIdle: 61 MB HeapObjects: 905 ====================== run_line_long_workers time to run 1.29874118s Alloc: 2 MB, TotalAlloc: 5 MB, Sys: 68 MB Mallocs: 10670, Frees: 5747 HeapAlloc: 2 MB, HeapSys: 63 MB, HeapIdle: 60 MB HeapObjects: 4923 ====================== run_bulk_short time to run 1.279059ms Alloc: 3 MB, TotalAlloc: 6 MB, Sys: 68 MB Mallocs: 11695, Frees: 5751 HeapAlloc: 3 MB, HeapSys: 63 MB, HeapIdle: 59 MB HeapObjects: 5944 ====================== run_bulk_long time to run 31.328652ms Alloc: 1 MB, TotalAlloc: 7 MB, Sys: 68 MB Mallocs: 12728, Frees: 11361 HeapAlloc: 1 MB, HeapSys: 63 MB, HeapIdle: 61 MB HeapObjects: 1367 ====================== run_seq_short time to run 956.991µs Alloc: 3 MB, TotalAlloc: 8 MB, Sys: 68 MB Mallocs: 13746, Frees: 11160 HeapAlloc: 3 MB, HeapSys: 63 MB, HeapIdle: 59 MB HeapObjects: 2586 ====================== run_seq_long time to run 5.195705859s Alloc: 1 MB, TotalAlloc: 9 MB, Sys: 68 MB Mallocs: 17766, Frees: 15973 HeapAlloc: 1 MB, HeapSys: 63 MB, HeapIdle: 61 MB HeapObjects: 1793 [mh-cbon@Host-001 bulk] $ go test -bench=. 
-benchmem -count=4 goos: linux goarch: amd64 pkg: test/bulk Benchmark_run_line_short-4 1000 1750824 ns/op 1029354 B/op 1005 allocs/op Benchmark_run_line_short-4 1000 1747408 ns/op 1029348 B/op 1005 allocs/op Benchmark_run_line_short-4 1000 1757826 ns/op 1029352 B/op 1005 allocs/op Benchmark_run_line_short-4 1000 1758427 ns/op 1029352 B/op 1005 allocs/op Benchmark_run_line_long-4 1 1303037704 ns/op 2253776 B/op 4075 allocs/op Benchmark_run_line_long-4 1 1305074974 ns/op 2247792 B/op 4032 allocs/op Benchmark_run_line_long-4 1 1305353658 ns/op 2246320 B/op 4013 allocs/op Benchmark_run_line_long-4 1 1305725817 ns/op 2247792 B/op 4031 allocs/op Benchmark_run_line_short_workers-4 1000 2148354 ns/op 1029366 B/op 1005 allocs/op Benchmark_run_line_short_workers-4 1000 2139629 ns/op 1029370 B/op 1005 allocs/op Benchmark_run_line_short_workers-4 1000 1983352 ns/op 1029359 B/op 1005 allocs/op Benchmark_run_line_short_workers-4 1000 1909968 ns/op 1029363 B/op 1005 allocs/op Benchmark_run_line_long_workers-4 1 1298321093 ns/op 2247856 B/op 4013 allocs/op Benchmark_run_line_long_workers-4 1 1299846127 ns/op 2246384 B/op 4012 allocs/op Benchmark_run_line_long_workers-4 1 1300003625 ns/op 2246288 B/op 4011 allocs/op Benchmark_run_line_long_workers-4 1 1302779911 ns/op 2246256 B/op 4011 allocs/op Benchmark_run_bulk_short-4 2000 704358 ns/op 1082154 B/op 1011 allocs/op Benchmark_run_bulk_short-4 2000 708563 ns/op 1082147 B/op 1011 allocs/op Benchmark_run_bulk_short-4 2000 714687 ns/op 1082148 B/op 1011 allocs/op Benchmark_run_bulk_short-4 2000 705546 ns/op 1082156 B/op 1011 allocs/op Benchmark_run_bulk_long-4 50 31411412 ns/op 1051497 B/op 1088 allocs/op Benchmark_run_bulk_long-4 50 31513018 ns/op 1051544 B/op 1088 allocs/op Benchmark_run_bulk_long-4 50 31539311 ns/op 1051502 B/op 1088 allocs/op Benchmark_run_bulk_long-4 50 31564940 ns/op 1051505 B/op 1088 allocs/op Benchmark_run_seq_short-4 2000 574346 ns/op 1028632 B/op 1002 allocs/op Benchmark_run_seq_short-4 3000 572857 ns/op 
1028464 B/op 1002 allocs/op Benchmark_run_seq_short-4 2000 580493 ns/op 1028632 B/op 1002 allocs/op Benchmark_run_seq_short-4 3000 572240 ns/op 1028464 B/op 1002 allocs/op Benchmark_run_seq_long-4 1 5196313302 ns/op 2245792 B/op 4005 allocs/op Benchmark_run_seq_long-4 1 5199995649 ns/op 2245792 B/op 4005 allocs/op Benchmark_run_seq_long-4 1 5200460425 ns/op 2245792 B/op 4005 allocs/op Benchmark_run_seq_long-4 1 5201080570 ns/op 2245792 B/op 4005 allocs/op PASS ok test/bulk 68.944s
注意:令我惊讶的是,run_line_short_workers 比 run_line_short 慢。我无法解释这个结果,不过用 pprof 做更深入的分析应该可以找到答案。
run_line_short_workers
run_line_short