Overview#
Processing files essentially involves loading the file content from the disk into memory and performing modifications. When the file is small (e.g., 16MB), we can directly load the entire file into memory. However, this approach is not suitable when the file size reaches 32GB.
Our approach is to reduce the size of each content loaded into memory to an appropriate size, allowing us to complete the reading of the entire file.
This article is divided into three parts:
- Chunk processing, suitable for binary files.
- Block processing, suitable for text files.
- Parallel processing, further optimizing the efficiency of reading and processing.
Chunk Processing#
Chunk processing refers to creating a slice with a specified capacity and reading a chunk of content from the file into the slice for processing.
package main
import (
"fmt"
"os"
)
func main() {
file, err := os.Open("largefile.txt")
if err != nil {
log.Fatal(err)
}
defer file.Close()
buf := make([]byte, 1024) // Create a buffer with a size of 1024
switch n, err := file.Read(buf[:]); true {
case n < 0:
fmt.Fprintf(os.Stderr, "error: %s\n", err.Error())
os.Exit(1)
case n == 0: // EOF
return nil
case n > 0:
handle(buf[0:nr])
}
}
}
Block Processing#
The purpose of block processing is to divide a text file into blocks according to a specified method and read and process one block at a time.
Let's take the example of processing based on line separation:
package main
import (
"bufio"
"os"
)
func main() {
file, err := os.Open("file.txt")
if err != nil {
os.Exit(1)
}
defer file.Close()
// Specify the maximum block capacity
const maxScanTokenSize = 64 * 1024 * 1024 // 64MB
buf := make([]byte)
// The scanner is a structure defined in bufio that provides a convenient interface for reading data.
// By continuously calling the Scan method, the "tokens" in the file will be traversed step by step, skipping the bytes between the tokens.
// The specification of the tokens is defined by the split function, which is of type SplitFunc.
// The default split function splits the input by lines and removes the line terminator.
// The scanner is created by passing an io.Reader to the NewScanner method.
scanner := bufio.NewScanner(file)
// The Buffer sets the initial buffer and the maximum buffer size that can be allocated during scanning.
// The maximum token size is the larger value between max and cap(buf).
// If max <= cap(buf), Scan will only use this buffer and will not allocate a new one.
// By default, Scan uses an internal buffer and sets the maximum token size to MaxScanTokenSize.
scanner.Buffer(buf, maxScanTokenSize)
for scanner.Scan() {
// Get each block read
l := scanner.Text()
// Process the read content
handle(l)
}
if err := scanner.Err(); err != nil {
// Handle errors
}
}
As mentioned in the code comments: The specification of the tokens is defined by the split function.
We can customize a split function of type SplitFunc to specify the delimiter.
For more information, refer to the documentation of scanner.
Parallel Optimization#
Go language has inherent advantages in concurrency, and modern CPUs are often multi-core. Therefore, we can consider converting the program to parallel processing, which can greatly improve the performance of processing a single large file.
First, let's consider where parallelism can be applied.
The file processing flow involves reading and then processing.
In synchronous mode, we read one chunk and process one chunk in a continuous loop.
However, in reality, while processing one chunk, we can read the next chunk. In other words, the reading and processing of the next chunk are independent of the previous chunk.
Furthermore, we can bundle multiple chunks into a group. Each time a part of the reader finishes reading a group, it can be handed over to the processing part (handler) for processing. During this period, there is no need to block, and we can directly read the chunks in the next group. When a group is finished, it can be handed over to another handler for processing.
Therefore, we can set the number of handlers, which corresponds to the number of groups we divide, and we can also set the size of each group, which is the number of chunks in a group.
The delivery of groups can be done using channels.
Finally, we need to combine the processed content from multiple handlers to obtain the final result, so we also need a combiner.
Now that we have figured it out, the whole process has three main modules: reader, handler, and combiner. Each module can be represented by a function, and they are connected in the main function.
Reader module code:
reader := func(ctx context.Context, group *[]string) <-chan []string {
out := make(chan []string)
scanner := bufio.NewScanner(f)
go func() {
defer close(out)
for {
scanned := scanner.Scan()
select {
case <-ctx.Done():
return
default:
r := scanner.Text()
if len(*group) == groupSize || !scanned {
out <- *group
*group = []string{}
}
*group = append(*group, r) }
if !scanned {
return
}
}
}()
return out
}
Handler module code:
handler := func(ctx context.Context, group <-chan []string) <-chan handled {
out := make(chan handled)
go func() {
defer close(out)
p := handled{}
for g := range group {
for _, r := range g {
// do something
}
}
out <- p
}()
return out
}
//
type handled struct {
//
}
Combiner module code:
combiner := func(ctx context.Context, inputs ...<-chan handled) <-chan handled {
out := make(chan handled)
var wg sync.WaitGroup
com := func(p <-chan handled) {
defer wg.Done()
for in := range p {
select {
case <-ctx.Done():
case out <- in:
}
}
}
wg.Add(len(inputs))
for _, in := range inputs {
go com(in)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
Final implementation:
group := []string{}
// Read
Ch := reader(ctx, &group)
numOfHandler :=10
// Handle
handlersCh := make([]<-chan handled, numOfHandler)
for i := 0; i < numOHandler; i++ {
handlersCh[i] = handle(ctx, Ch)
}
// Combine
for handled := range combiner(ctx, handlersCh...) {
// do something
}