概述#
處理檔案本質上就是將檔案內容從磁碟載入到記憶體中,再進行修改等操作,當檔案不大時(比如 16MB),我們可以直接將整個檔案載入到記憶體中,當檔案大小高達 32G 時,這種方式顯然並不合適。
我們的思路便是將每一次讀取到記憶體中的內容縮小到合適的大小,這樣即可完成整個檔案的讀取。
本文分為三部分:
- 分片處理,適用於二進制等檔案;
 - 分塊處理,適用於文本檔案;
 - 並行處理,可對讀取處理的效率進一步優化。
 
分片處理#
所謂分片,就是指創建一個帶有容量的切片,每次對檔案都讀取切片容量個大小的內容至該切片中處理。
package main
import (
	"fmt"
	"os"
)
func main() {
	file, err := os.Open("largefile.txt")
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()
	buf := make([]byte, 1024) // 創建一個大小為1024的緩衝區
	switch n, err := file.Read(buf[:]); true {
        case n < 0:
            fmt.Fprintf(os.Stderr, "error: %s\n", err.Error())
            os.Exit(1)
        case n == 0: // EOF
            return nil
        case n > 0:
            handle(buf[0:nr])
        }
    }
}
分塊處理#
其宗旨是對文本檔案按照規定的方式進行分割成塊,每一次讀取並處理一個塊;
我們按照以行為分割方式的處理為例:
package main
import (
    "bufio"
    "os"
)
func main() {
    file, err := os.Open("file.txt")
    if err != nil {
				os.Exit(1)
    }
    defer file.Close()
		//規定最大塊容量
    const maxScanTokenSize = 64 * 1024 * 1024 // 64MB
    buf := make([]byte)
		//scanner是bufio中定義的結構,提供了一個方便的介面,用於讀取資料;
		//連續調用Scan方法將逐步遍歷檔案中的“標記”,跳過標記之間的位元組;
		//標記的規範由類型為SplitFunc的拆分函數定義;默認拆分函數將輸入按行拆分,並去除行終止符;
		//通過NewScanner方法傳入io.Reader創建scanner。
    scanner := bufio.NewScanner(file)
		//Buffer在掃描時設定初始緩衝區和可分配的最大緩衝區大小;
		//最大標記大小是max和cap(buf)中較大的值;
		//如果max <= cap(buf),Scan將僅使用此緩衝區,不進行分配;
		//默認情況下,Scan使用內部緩衝區,並將最大標記大小設定為MaxScanTokenSize。
    scanner.Buffer(buf, maxScanTokenSize)
    for scanner.Scan() {
		//獲取每一次讀到的塊
        l := scanner.Text()
		//處理讀取的內容
				handle(l)
    }
    if err := scanner.Err(); err != nil {
        //處理錯誤
    }
}
按照程式碼註釋中所講:標記的規範由類型為 SplitFunc 的拆分函數定義;
我們可以自定義 SplitFunc 類型的函數來規定分隔符;
具體可以參考scanner 的文件介紹
並行優化#
Go 語言具有天然的並行優勢,而現代 cpu 往往是多核,所以可以考慮將程式改為並行處理,這會給我們處理單個大型檔案帶來性能上的巨大提升;
首先思考並行在哪裡;
檔案處理的流程是讀取,然後處理;
在同步模式下,讀一塊,處理一塊,不斷循環;
但事實上,在處理一塊的時候,完全可以讀取下一塊,也就是說,下一塊的讀取與處理和上一塊是不相干的;
進一步,我們還可以將多個塊打包為一組,每次讀取部分 reader 讀取完一個組,交給處理部分 handler 去處理,在此期間無需堵塞,直接讀取下一個組中的塊,當讀完組後,交給另一個 handler 處理;
於是我們可以設定 handler 的數目,這個值相當於我們劃分出來的組的數量,還可以設定組的大小,也就是一個組包含幾個塊;
而組的交付則使用 channel 來進行;
而最終,我們還要將多個 handler 中的處理完成的內容匯總,才能得到最後的結果,於是我們還需要一個 combiner;
現在,我們考慮好了,整個過程有三大模塊:reader,handler,combiner,每一個模塊都可以用函數去表示,最終在主函數中產生聯繫;
reader 模塊程式碼:
reader := func(ctx context.Context, group *[]string) <-chan []string {
		out := make(chan []string)
		scanner := bufio.NewScanner(f)
		go func() {
			defer close(out) 
			for {
				scanned := scanner.Scan()
				select {
				case <-ctx.Done():
					return
				default:
					r := scanner.Text()
					if len(*group) == groupSize || !scanned {
						out <- *group
						*group = []string{} 					
}
					*group = append(*group, r) 				}
				if !scanned {
					return
				}
			}
		}()
		return out
	}
handler 模塊程式碼
handler := func(ctx context.Context, group <-chan []string) <-chan handled {
		out := make(chan handled)
		go func() {
			defer close(out)
			p := handled{}
			for g := range group {
				for _, r := range g {
					// do something
				}
			}
			out <- p
		}()
		return out
	}
//
type handled struct {
	//
}
combiner 模塊程式碼
combiner := func(ctx context.Context, inputs ...<-chan handled) <-chan handled {
		out := make(chan handled)
		var wg sync.WaitGroup
		com := func(p <-chan handled) {
			defer wg.Done()
			for in := range p {
				select {
				case <-ctx.Done():
				case out <- in:
				}
			}
		}
		wg.Add(len(inputs))
		for _, in := range inputs {
			go com(in)
		}
		go func() {
			wg.Wait()
			close(out)
		}()
		return out
	}
最終實現
group := []string{}
//read
Ch := reader(ctx, &group)
numOfHandler :=10
//handle
handlersCh := make([]<-chan handled, numOfHandler)
for i := 0; i < numOHandler; i++ {
  handlersCh[i] = handle(ctx, Ch)
}
//combine
for handled := range combiner(ctx, handlersCh...) {
  // do something
}