概要#
ファイルの処理は、基本的にはファイルの内容をディスクからメモリに読み込んで、変更などの操作を行い、ファイルが小さい場合(例:16MB)、ファイル全体を直接メモリに読み込むことができますが、ファイルサイズが 32G に達すると、この方法は明らかに適切ではありません。
私たちのアイデアは、メモリに読み込まれる内容を適切なサイズに縮小することで、ファイル全体の読み取りを完了することです。
この記事は 3 つのパートに分かれています:
- チャンク処理、バイナリファイルなどに適用されます。
- ブロック処理、テキストファイルに適用されます。
- 並列処理、読み取り処理の効率をさらに最適化できます。
チャンク処理#
チャンクとは、容量を持つスライスを作成し、ファイルごとにチャンク容量のサイズのコンテンツを読み取り、そのチャンクで処理することを指します。
package main
import (
"fmt"
"os"
)
func main() {
file, err := os.Open("largefile.txt")
if err != nil {
log.Fatal(err)
}
defer file.Close()
buf := make([]byte, 1024) // サイズ1024のバッファを作成
switch n, err := file.Read(buf[:]); true {
case n < 0:
fmt.Fprintf(os.Stderr, "error: %s\n", err.Error())
os.Exit(1)
case n == 0: // EOF
return nil
case n > 0:
handle(buf[0:nr])
}
}
}
ブロック処理#
その目的は、テキストファイルを指定された方法でブロックに分割し、1 つのブロックを読み取り、処理することです。
行を区切りとする処理を例に説明します:
package main
import (
"bufio"
"os"
)
func main() {
file, err := os.Open("file.txt")
if err != nil {
os.Exit(1)
}
defer file.Close()
// 最大ブロック容量を指定
const maxScanTokenSize = 64 * 1024 * 1024 // 64MB
buf := make([]byte)
// scannerはbufioパッケージで定義された構造体で、データの読み取りに便利なインターフェースを提供します。
// Scanメソッドを連続して呼び出すと、ファイル内の「トークン」を順番にスキャンし、トークン間のバイトをスキップします。
// トークンの仕様は、SplitFunc型の分割関数によって定義されます。デフォルトの分割関数は入力を行ごとに分割し、行末の改行を削除します。
// NewScannerメソッドにio.Readerを渡すことで、scannerを作成します。
scanner := bufio.NewScanner(file)
// Bufferはスキャン時に使用される初期バッファと割り当て可能な最大バッファサイズを設定します。
// 最大トークンサイズはmaxとcap(buf)のうち大きい方です。
// max <= cap(buf)の場合、Scanはこのバッファのみを使用し、割り当てを行いません。
// デフォルトでは、Scanは内部バッファを使用し、最大トークンサイズをMaxScanTokenSizeに設定します。
scanner.Buffer(buf, maxScanTokenSize)
for scanner.Scan() {
// 読み取ったブロックを取得
l := scanner.Text()
// 読み取ったコンテンツを処理
handle(l)
}
if err := scanner.Err(); err != nil {
// エラー処理
}
}
コードコメントで説明されているように、トークンの仕様は SplitFunc 型の分割関数によって定義されます。
分割記号をカスタマイズするために、SplitFunc 型の関数を自分で定義することもできます。
詳細については、scanner のドキュメントを参照してください。
並列処理の最適化#
Go 言語は並行処理の利点を持っており、現代の CPU は通常、マルチコアです。そのため、プログラムを並行処理に変更することで、単一の大規模なファイルの処理に対して大幅なパフォーマンス向上が期待できます。
まず、どこで並行処理を行うかを考えます。
ファイル処理のフローは、読み取り、処理の順です。
同期モードでは、1 つのブロックを読み取り、1 つのブロックを処理し、無限ループします。
しかし、実際には、1 つのブロックを処理する間に、次のブロックを読み取ることができます。つまり、次のブロックの読み取りと処理は前のブロックとは関係ありません。
さらに、複数のブロックを 1 つのグループにまとめ、部分的なリーダーが 1 つのグループを読み取ると、ハンドラーに処理を渡し、その間にブロックの次のグループを直接読み取ることができます。
したがって、ハンドラーの数を設定し、この値は私たちが作成したグループの数に相当し、グループのサイズも設定できます。つまり、1 つのグループに含まれるブロックの数です。
そして、グループの受け渡しにはチャネルを使用します。
最後に、ハンドラーの中で処理が完了したコンテンツを集約する必要があります。そのためには、コンバイナーが必要です。
これで、私たちは考えることができました。全体のプロセスには 3 つの主要なモジュールがあります:リーダー、ハンドラー、コンバイナー。各モジュールは関数で表現できます。そして、最終的にはメイン関数でこれらのモジュールを結びつけます。
リーダーモジュールのコード:
reader := func(ctx context.Context, group *[]string) <-chan []string {
out := make(chan []string)
scanner := bufio.NewScanner(f)
go func() {
defer close(out)
for {
scanned := scanner.Scan()
select {
case <-ctx.Done():
return
default:
r := scanner.Text()
if len(*group) == groupSize || !scanned {
out <- *group
*group = []string{}
}
*group = append(*group, r) }
if !scanned {
return
}
}
}()
return out
}
ハンドラーモジュールのコード:
handler := func(ctx context.Context, group <-chan []string) <-chan handled {
out := make(chan handled)
go func() {
defer close(out)
p := handled{}
for g := range group {
for _, r := range g {
// 何か処理する
}
}
out <- p
}()
return out
}
//
type handled struct {
//
}
コンバイナーモジュールのコード:
combiner := func(ctx context.Context, inputs ...<-chan handled) <-chan handled {
out := make(chan handled)
var wg sync.WaitGroup
com := func(p <-chan handled) {
defer wg.Done()
for in := range p {
select {
case <-ctx.Done():
case out <- in:
}
}
}
wg.Add(len(inputs))
for _, in := range inputs {
go com(in)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
最終的な実装:
group := []string{}
// 読み取り
Ch := reader(ctx, &group)
numOfHandler :=10
// 処理
handlersCh := make([]<-chan handled, numOfHandler)
for i := 0; i < numOHandler; i++ {
handlersCh[i] = handle(ctx, Ch)
}
// 結合
for handled := range combiner(ctx, handlersCh...) {
// 何か処理する
}