overview#
处理文件本质上就是将文件内容从磁盘加载到内存中,再进行修改等操作,当文件不大时(比如 16MB),我们可以直接将整个文件加载到内存中,当文件大小高达 32G 时,这种方式显然并不合适。
我们的思路便是将每一次读取到内存中的内容缩小到合适的大小,这样即可完成整个文件的读取。
本文分为三部分:
- 分片处理,适用于二进制等文件;
- 分块处理,适用于文本文件;
- 并行处理,可对读取处理的效率进一步优化。
分片处理#
所谓分片,就是指创建一个带有容量的切片,每次对文件都读取切片容量个大小的内容至该切片中处理。
package main
import (
"fmt"
"os"
)
func main() {
file, err := os.Open("largefile.txt")
if err != nil {
log.Fatal(err)
}
defer file.Close()
buf := make([]byte, 1024) // 创建一个大小为1024的缓冲区
switch n, err := file.Read(buf[:]); true {
case n < 0:
fmt.Fprintf(os.Stderr, "error: %s\n", err.Error())
os.Exit(1)
case n == 0: // EOF
return nil
case n > 0:
handle(buf[0:nr])
}
}
}
分块处理#
其宗旨是对文本文件按照规定的方式进行分割成块,每一次读取并处理一个块;
我们按照以行为分割方式的处理为例:
package main
import (
"bufio"
"os"
)
func main() {
file, err := os.Open("file.txt")
if err != nil {
os.Exit(1)
}
defer file.Close()
//规定最大块容量
const maxScanTokenSize = 64 * 1024 * 1024 // 64MB
buf := make([]byte)
//scanner是bufio中定义的结构,提供了一个方便的接口,用于读取数据;
//连续调用Scan方法将逐步遍历文件中的“标记”,跳过标记之间的字节;
//标记的规范由类型为SplitFunc的拆分函数定义;默认拆分函数将输入按行拆分,并去除行终止符;
//通过NewScanner方法传入io.Reader创建scanner。
scanner := bufio.NewScanner(file)
//Buffer在扫描时设置初始缓冲区和可分配的最大缓冲区大小;
//最大标记大小是max和cap(buf)中较大的值;
//如果max <= cap(buf),Scan将仅使用此缓冲区,不进行分配;
//默认情况下,Scan使用内部缓冲区,并将最大标记大小设置为MaxScanTokenSize。
scanner.Buffer(buf, maxScanTokenSize)
for scanner.Scan() {
//获取每一次读到的块
l := scanner.Text()
//处理读取的内容
handle(l)
}
if err := scanner.Err(); err != nil {
//处理错误
}
}
按照代码注释中所讲:标记的规范由类型为 SplitFunc 的拆分函数定义;
我们可以自定义 SplitFunc 类型的函数来规定分隔符;
具体可以参考scanner 的文档介绍
并行优化#
Go 语言具有天然的并发优势,而现代 cpu 往往是多核,所以可以考虑将程序改为并行处理,这会给我们处理单个大型文件带来性能上的巨大提升;
首先思考并行在哪里;
文件处理的流程是读取,然后处理;
在同步模式下,读一块,处理一块,不断循环;
但事实上,在处理一块的时候,完全可以读取下一块,也就是说,下一块的读取与处理和上一块是不相干的;
进一步,我们还可以将多个块打包为一组,每次读取部分 reader 读取完一个组,交给处理部分 handler 去处理,在此期间无需堵塞,直接读取下一个组中的块,当读完组后,交给另一个 handler 处理;
于是我们可以设置 handler 的数目,这个值相当于我们划分出来的组的数量,还可以设置组的大小,也就是一个组包含几个块;
而组的交付则使用 channel 来进行;
而最终,我们还要将多个 handler 中的处理完成的内容汇总,才能得到最后的结果,于是我们还需要一个 combiner;
现在,我们考虑好了,整个过程有三大模块:reader,handler,combiner,每一个模块都可以用函数去表示,最终在主函数中产生联系;
reader 模块代码:
reader := func(ctx context.Context, group *[]string) <-chan []string {
out := make(chan []string)
scanner := bufio.NewScanner(f)
go func() {
defer close(out)
for {
scanned := scanner.Scan()
select {
case <-ctx.Done():
return
default:
r := scanner.Text()
if len(*group) == groupSize || !scanned {
out <- *group
*group = []string{}
}
*group = append(*group, r) }
if !scanned {
return
}
}
}()
return out
}
handler 模块代码
handler := func(ctx context.Context, group <-chan []string) <-chan handled {
out := make(chan handled)
go func() {
defer close(out)
p := handled{}
for g := range group {
for _, r := range g {
// do something
}
}
out <- p
}()
return out
}
//
type handled struct {
//
}
combiner 模块代码
combiner := func(ctx context.Context, inputs ...<-chan handled) <-chan handled {
out := make(chan handled)
var wg sync.WaitGroup
com := func(p <-chan handled) {
defer wg.Done()
for in := range p {
select {
case <-ctx.Done():
case out <- in:
}
}
}
wg.Add(len(inputs))
for _, in := range inputs {
go com(in)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
最终实现
group := []string{}
//read
Ch := reader(ctx, &group)
numOfHandler :=10
//handle
handlersCh := make([]<-chan handled, numOfHandler)
for i := 0; i < numOHandler; i++ {
handlersCh[i] = handle(ctx, Ch)
}
//combine
for handled := range combiner(ctx, handlersCh...) {
// do something
}