概述#
處理檔案本質上就是將檔案內容從磁碟載入到記憶體中,再進行修改等操作,當檔案不大時(比如 16MB),我們可以直接將整個檔案載入到記憶體中,當檔案大小高達 32G 時,這種方式顯然並不合適。
我們的思路便是將每一次讀取到記憶體中的內容縮小到合適的大小,這樣即可完成整個檔案的讀取。
本文分為三部分:
- 分片處理,適用於二進制等檔案;
- 分塊處理,適用於文本檔案;
- 並行處理,可對讀取處理的效率進一步優化。
分片處理#
所謂分片,就是指創建一個帶有容量的切片,每次對檔案都讀取切片容量個大小的內容至該切片中處理。
package main
import (
"fmt"
"os"
)
func main() {
file, err := os.Open("largefile.txt")
if err != nil {
log.Fatal(err)
}
defer file.Close()
buf := make([]byte, 1024) // 創建一個大小為1024的緩衝區
switch n, err := file.Read(buf[:]); true {
case n < 0:
fmt.Fprintf(os.Stderr, "error: %s\n", err.Error())
os.Exit(1)
case n == 0: // EOF
return nil
case n > 0:
handle(buf[0:nr])
}
}
}
分塊處理#
其宗旨是對文本檔案按照規定的方式進行分割成塊,每一次讀取並處理一個塊;
我們按照以行為分割方式的處理為例:
package main
import (
"bufio"
"os"
)
func main() {
file, err := os.Open("file.txt")
if err != nil {
os.Exit(1)
}
defer file.Close()
//規定最大塊容量
const maxScanTokenSize = 64 * 1024 * 1024 // 64MB
buf := make([]byte)
//scanner是bufio中定義的結構,提供了一個方便的介面,用於讀取資料;
//連續調用Scan方法將逐步遍歷檔案中的“標記”,跳過標記之間的位元組;
//標記的規範由類型為SplitFunc的拆分函數定義;默認拆分函數將輸入按行拆分,並去除行終止符;
//通過NewScanner方法傳入io.Reader創建scanner。
scanner := bufio.NewScanner(file)
//Buffer在掃描時設定初始緩衝區和可分配的最大緩衝區大小;
//最大標記大小是max和cap(buf)中較大的值;
//如果max <= cap(buf),Scan將僅使用此緩衝區,不進行分配;
//默認情況下,Scan使用內部緩衝區,並將最大標記大小設定為MaxScanTokenSize。
scanner.Buffer(buf, maxScanTokenSize)
for scanner.Scan() {
//獲取每一次讀到的塊
l := scanner.Text()
//處理讀取的內容
handle(l)
}
if err := scanner.Err(); err != nil {
//處理錯誤
}
}
按照程式碼註釋中所講:標記的規範由類型為 SplitFunc 的拆分函數定義;
我們可以自定義 SplitFunc 類型的函數來規定分隔符;
具體可以參考scanner 的文件介紹
並行優化#
Go 語言具有天然的並行優勢,而現代 cpu 往往是多核,所以可以考慮將程式改為並行處理,這會給我們處理單個大型檔案帶來性能上的巨大提升;
首先思考並行在哪裡;
檔案處理的流程是讀取,然後處理;
在同步模式下,讀一塊,處理一塊,不斷循環;
但事實上,在處理一塊的時候,完全可以讀取下一塊,也就是說,下一塊的讀取與處理和上一塊是不相干的;
進一步,我們還可以將多個塊打包為一組,每次讀取部分 reader 讀取完一個組,交給處理部分 handler 去處理,在此期間無需堵塞,直接讀取下一個組中的塊,當讀完組後,交給另一個 handler 處理;
於是我們可以設定 handler 的數目,這個值相當於我們劃分出來的組的數量,還可以設定組的大小,也就是一個組包含幾個塊;
而組的交付則使用 channel 來進行;
而最終,我們還要將多個 handler 中的處理完成的內容匯總,才能得到最後的結果,於是我們還需要一個 combiner;
現在,我們考慮好了,整個過程有三大模塊:reader,handler,combiner,每一個模塊都可以用函數去表示,最終在主函數中產生聯繫;
reader 模塊程式碼:
reader := func(ctx context.Context, group *[]string) <-chan []string {
out := make(chan []string)
scanner := bufio.NewScanner(f)
go func() {
defer close(out)
for {
scanned := scanner.Scan()
select {
case <-ctx.Done():
return
default:
r := scanner.Text()
if len(*group) == groupSize || !scanned {
out <- *group
*group = []string{}
}
*group = append(*group, r) }
if !scanned {
return
}
}
}()
return out
}
handler 模塊程式碼
handler := func(ctx context.Context, group <-chan []string) <-chan handled {
out := make(chan handled)
go func() {
defer close(out)
p := handled{}
for g := range group {
for _, r := range g {
// do something
}
}
out <- p
}()
return out
}
//
type handled struct {
//
}
combiner 模塊程式碼
combiner := func(ctx context.Context, inputs ...<-chan handled) <-chan handled {
out := make(chan handled)
var wg sync.WaitGroup
com := func(p <-chan handled) {
defer wg.Done()
for in := range p {
select {
case <-ctx.Done():
case out <- in:
}
}
}
wg.Add(len(inputs))
for _, in := range inputs {
go com(in)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
最終實現
group := []string{}
//read
Ch := reader(ctx, &group)
numOfHandler :=10
//handle
handlersCh := make([]<-chan handled, numOfHandler)
for i := 0; i < numOHandler; i++ {
handlersCh[i] = handle(ctx, Ch)
}
//combine
for handled := range combiner(ctx, handlersCh...) {
// do something
}