banner
Alex Wu

Alex Wu

college student in CS | web3 | Gopher | lifelong grower | cat lover https://wureny.xyz/
github
x
email

處理大型檔案的 Go

概述#

處理檔案本質上就是將檔案內容從磁碟載入到記憶體中,再進行修改等操作,當檔案不大時(比如 16MB),我們可以直接將整個檔案載入到記憶體中,當檔案大小高達 32G 時,這種方式顯然並不合適。

我們的思路便是將每一次讀取到記憶體中的內容縮小到合適的大小,這樣即可完成整個檔案的讀取。

本文分為三部分:

  1. 分片處理,適用於二進制等檔案;
  2. 分塊處理,適用於文本檔案;
  3. 並行處理,可對讀取處理的效率進一步優化。

分片處理#

所謂分片,就是指創建一個帶有容量的切片,每次對檔案都讀取切片容量個大小的內容至該切片中處理。

package main

import (
	"fmt"
	"os"
)

func main() {
	file, err := os.Open("largefile.txt")
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	buf := make([]byte, 1024) // 創建一個大小為1024的緩衝區
	switch n, err := file.Read(buf[:]); true {
        case n < 0:
            fmt.Fprintf(os.Stderr, "error: %s\n", err.Error())
            os.Exit(1)
        case n == 0: // EOF
            return nil
        case n > 0:
            handle(buf[0:nr])
        }
    }
}

分塊處理#

其宗旨是對文本檔案按照規定的方式進行分割成塊,每一次讀取並處理一個塊;

我們按照以行為分割方式的處理為例:

package main

import (
    "bufio"
    "os"
)

func main() {
    file, err := os.Open("file.txt")
    if err != nil {
				os.Exit(1)
    }
    defer file.Close()

		//規定最大塊容量
    const maxScanTokenSize = 64 * 1024 * 1024 // 64MB
    buf := make([]byte)

		//scanner是bufio中定義的結構,提供了一個方便的介面,用於讀取資料;
		//連續調用Scan方法將逐步遍歷檔案中的“標記”,跳過標記之間的位元組;
		//標記的規範由類型為SplitFunc的拆分函數定義;默認拆分函數將輸入按行拆分,並去除行終止符;
		//通過NewScanner方法傳入io.Reader創建scanner。
    scanner := bufio.NewScanner(file)

		//Buffer在掃描時設定初始緩衝區和可分配的最大緩衝區大小;
		//最大標記大小是max和cap(buf)中較大的值;
		//如果max <= cap(buf),Scan將僅使用此緩衝區,不進行分配;
		//默認情況下,Scan使用內部緩衝區,並將最大標記大小設定為MaxScanTokenSize。
    scanner.Buffer(buf, maxScanTokenSize)

    for scanner.Scan() {
		//獲取每一次讀到的塊
        l := scanner.Text()
		//處理讀取的內容
				handle(l)
    }

    if err := scanner.Err(); err != nil {
        //處理錯誤
    }
}

按照程式碼註釋中所講:標記的規範由類型為 SplitFunc 的拆分函數定義;

我們可以自定義 SplitFunc 類型的函數來規定分隔符;

具體可以參考scanner 的文件介紹

並行優化#

Go 語言具有天然的並行優勢,而現代 cpu 往往是多核,所以可以考慮將程式改為並行處理,這會給我們處理單個大型檔案帶來性能上的巨大提升;

首先思考並行在哪裡;

檔案處理的流程是讀取,然後處理;

在同步模式下,讀一塊,處理一塊,不斷循環;

但事實上,在處理一塊的時候,完全可以讀取下一塊,也就是說,下一塊的讀取與處理和上一塊是不相干的;

進一步,我們還可以將多個塊打包為一組,每次讀取部分 reader 讀取完一個組,交給處理部分 handler 去處理,在此期間無需堵塞,直接讀取下一個組中的塊,當讀完組後,交給另一個 handler 處理;

於是我們可以設定 handler 的數目,這個值相當於我們劃分出來的組的數量,還可以設定組的大小,也就是一個組包含幾個塊;

而組的交付則使用 channel 來進行;

而最終,我們還要將多個 handler 中的處理完成的內容匯總,才能得到最後的結果,於是我們還需要一個 combiner;

現在,我們考慮好了,整個過程有三大模塊:reader,handler,combiner,每一個模塊都可以用函數去表示,最終在主函數中產生聯繫;

reader 模塊程式碼:

reader := func(ctx context.Context, group *[]string) <-chan []string {
		out := make(chan []string)
		scanner := bufio.NewScanner(f)

		go func() {
			defer close(out) 
			for {
				scanned := scanner.Scan()

				select {
				case <-ctx.Done():
					return
				default:
					r := scanner.Text()
					if len(*group) == groupSize || !scanned {
						out <- *group
						*group = []string{} 					
}
					*group = append(*group, r) 				}

				if !scanned {
					return
				}
			}
		}()

		return out
	}

handler 模塊程式碼

handler := func(ctx context.Context, group <-chan []string) <-chan handled {
		out := make(chan handled)

		go func() {
			defer close(out)

			p := handled{}
			for g := range group {
				for _, r := range g {
					// do something
				}
			}
			out <- p
		}()

		return out
	}

//
type handled struct {
	//
}

combiner 模塊程式碼

combiner := func(ctx context.Context, inputs ...<-chan handled) <-chan handled {
		out := make(chan handled)

		var wg sync.WaitGroup
		com := func(p <-chan handled) {
			defer wg.Done()

			for in := range p {
				select {
				case <-ctx.Done():
				case out <- in:
				}
			}
		}

		wg.Add(len(inputs))
		for _, in := range inputs {
			go com(in)
		}

		go func() {
			wg.Wait()
			close(out)
		}()

		return out
	}

最終實現


group := []string{}
//read
Ch := reader(ctx, &group)

numOfHandler :=10

//handle
handlersCh := make([]<-chan handled, numOfHandler)
for i := 0; i < numOHandler; i++ {
  handlersCh[i] = handle(ctx, Ch)
}

//combine
for handled := range combiner(ctx, handlersCh...) {
  // do something
}
載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。