banner
Alex Wu

Alex Wu

college student in CS | web3 | Gopher | lifelong grower | cat lover https://wureny.xyz/
github
x
email

Go处理大型文件

overview#

处理文件本质上就是将文件内容从磁盘加载到内存中,再进行修改等操作,当文件不大时(比如 16MB),我们可以直接将整个文件加载到内存中,当文件大小高达 32G 时,这种方式显然并不合适。

我们的思路便是将每一次读取到内存中的内容缩小到合适的大小,这样即可完成整个文件的读取。

本文分为三部分:

  1. 分片处理,适用于二进制等文件;
  2. 分块处理,适用于文本文件;
  3. 并行处理,可对读取处理的效率进一步优化。

分片处理#

所谓分片,就是指创建一个带有容量的切片,每次对文件都读取切片容量个大小的内容至该切片中处理。

package main

import (
	"fmt"
	"os"
)

func main() {
	file, err := os.Open("largefile.txt")
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	buf := make([]byte, 1024) // 创建一个大小为1024的缓冲区
	switch n, err := file.Read(buf[:]); true {
        case n < 0:
            fmt.Fprintf(os.Stderr, "error: %s\n", err.Error())
            os.Exit(1)
        case n == 0: // EOF
            return nil
        case n > 0:
            handle(buf[0:nr])
        }
    }
}

分块处理#

其宗旨是对文本文件按照规定的方式进行分割成块,每一次读取并处理一个块;

我们按照以行为分割方式的处理为例:

package main

import (
    "bufio"
    "os"
)

func main() {
    file, err := os.Open("file.txt")
    if err != nil {
				os.Exit(1)
    }
    defer file.Close()

		//规定最大块容量
    const maxScanTokenSize = 64 * 1024 * 1024 // 64MB
    buf := make([]byte)

		//scanner是bufio中定义的结构,提供了一个方便的接口,用于读取数据;
		//连续调用Scan方法将逐步遍历文件中的“标记”,跳过标记之间的字节;
		//标记的规范由类型为SplitFunc的拆分函数定义;默认拆分函数将输入按行拆分,并去除行终止符;
		//通过NewScanner方法传入io.Reader创建scanner。
    scanner := bufio.NewScanner(file)

		//Buffer在扫描时设置初始缓冲区和可分配的最大缓冲区大小;
		//最大标记大小是max和cap(buf)中较大的值;
		//如果max <= cap(buf),Scan将仅使用此缓冲区,不进行分配;
		//默认情况下,Scan使用内部缓冲区,并将最大标记大小设置为MaxScanTokenSize。
    scanner.Buffer(buf, maxScanTokenSize)

    for scanner.Scan() {
		//获取每一次读到的块
        l := scanner.Text()
		//处理读取的内容
				handle(l)
    }

    if err := scanner.Err(); err != nil {
        //处理错误
    }
}

按照代码注释中所讲:标记的规范由类型为 SplitFunc 的拆分函数定义;

我们可以自定义 SplitFunc 类型的函数来规定分隔符;

具体可以参考scanner 的文档介绍

并行优化#

Go 语言具有天然的并发优势,而现代 cpu 往往是多核,所以可以考虑将程序改为并行处理,这会给我们处理单个大型文件带来性能上的巨大提升;

首先思考并行在哪里;

文件处理的流程是读取,然后处理;

在同步模式下,读一块,处理一块,不断循环;

但事实上,在处理一块的时候,完全可以读取下一块,也就是说,下一块的读取与处理和上一块是不相干的;

进一步,我们还可以将多个块打包为一组,每次读取部分 reader 读取完一个组,交给处理部分 handler 去处理,在此期间无需堵塞,直接读取下一个组中的块,当读完组后,交给另一个 handler 处理;

于是我们可以设置 handler 的数目,这个值相当于我们划分出来的组的数量,还可以设置组的大小,也就是一个组包含几个块;

而组的交付则使用 channel 来进行;

而最终,我们还要将多个 handler 中的处理完成的内容汇总,才能得到最后的结果,于是我们还需要一个 combiner;

现在,我们考虑好了,整个过程有三大模块:reader,handler,combiner,每一个模块都可以用函数去表示,最终在主函数中产生联系;

reader 模块代码:

reader := func(ctx context.Context, group *[]string) <-chan []string {
		out := make(chan []string)
		scanner := bufio.NewScanner(f)

		go func() {
			defer close(out) 
			for {
				scanned := scanner.Scan()

				select {
				case <-ctx.Done():
					return
				default:
					r := scanner.Text()
					if len(*group) == groupSize || !scanned {
						out <- *group
						*group = []string{} 					
}
					*group = append(*group, r) 				}

				if !scanned {
					return
				}
			}
		}()

		return out
	}

handler 模块代码

handler := func(ctx context.Context, group <-chan []string) <-chan handled {
		out := make(chan handled)

		go func() {
			defer close(out)

			p := handled{}
			for g := range group {
				for _, r := range g {
					// do something
				}
			}
			out <- p
		}()

		return out
	}

//
type handled struct {
	//
}

combiner 模块代码

combiner := func(ctx context.Context, inputs ...<-chan handled) <-chan handled {
		out := make(chan handled)

		var wg sync.WaitGroup
		com := func(p <-chan handled) {
			defer wg.Done()

			for in := range p {
				select {
				case <-ctx.Done():
				case out <- in:
				}
			}
		}

		wg.Add(len(inputs))
		for _, in := range inputs {
			go com(in)
		}

		go func() {
			wg.Wait()
			close(out)
		}()

		return out
	}

最终实现


group := []string{}
//read
Ch := reader(ctx, &group)

numOfHandler :=10

//handle
handlersCh := make([]<-chan handled, numOfHandler)
for i := 0; i < numOHandler; i++ {
  handlersCh[i] = handle(ctx, Ch)
}

//combine
for handled := range combiner(ctx, handlersCh...) {
  // do something
}
加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。