Go: One Billion Row Challenge

Published: February 1, 2024
The One Billion Row Challenge (1BRC) is summarized as follows:
Write a Go program for retrieving temperature measurement values from a text file and calculating the min, mean, and max temperature per weather station. There’s just one caveat: the file has 1,000,000,000 rows!
 
Here is an example of what the text file looks like:
Hamburg;12.0
Bulawayo;8.9
Palembang;38.8
St. John's;15.2
Cracow;12.6
...
  • This comes out to about 13 GB of data
 
In the article, the author describes 9 iterations that optimize the Go program from 1m45s to just 4 seconds.

Iteration 1: Simple Go program

func r1(inputPath string, output io.Writer) error {
	type stats struct {
		min, max, sum float64
		count         int64
	}

	f, err := os.Open(inputPath)
	if err != nil {
		return err
	}
	defer f.Close()

	stationStats := make(map[string]stats)

	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		line := scanner.Text()
		station, tempStr, hasSemi := strings.Cut(line, ";")
		if !hasSemi {
			continue
		}
		temp, err := strconv.ParseFloat(tempStr, 64)
		if err != nil {
			return err
		}
		s, ok := stationStats[station]
		if !ok {
			s.min = temp
			s.max = temp
			s.sum = temp
			s.count = 1
		} else {
			s.min = min(s.min, temp)
			s.max = max(s.max, temp)
			s.sum += temp
			s.count++
		}
		stationStats[station] = s
	}

	stations := make([]string, 0, len(stationStats))
	for station := range stationStats {
		stations = append(stations, station)
	}
	sort.Strings(stations)

	fmt.Fprint(output, "{")
	for i, station := range stations {
		if i > 0 {
			fmt.Fprint(output, ", ")
		}
		s := stationStats[station]
		mean := s.sum / float64(s.count)
		fmt.Fprintf(output, "%s=%.1f/%.1f/%.1f", station, s.min, mean, s.max)
	}
	fmt.Fprint(output, "}\n")
	return nil
}
The above code ran in 1m45s

Iteration 2: Reducing number of hash operations

It turns out that in the above, we’re doing a lot more hashing than we have to.
For each line, we’re hashing the string key (station) twice: once when we try to fetch the value from the map, and once when we update the map.
To avoid that, we can use a map[string]*stats (pointer values) and update the pointed-to struct, instead of using a map[string]stats and writing the struct back into the map on every line.
Running a CPU profiler on the Go program:
$ ./go-1brc -cpuprofile=cpu.prof -revision=1 measurements-10000000.txt >measurements-10000000.out
Processed 131.6MB in 965.888929ms
$ go tool pprof -http=: cpu.prof
[CPU profile screenshot for iteration 1]
Map operations are taking a full 30% of the time: 12.24% for assign and 17.35% for lookup.
s := stationStats[station]
if s == nil {
	stationStats[station] = &stats{
		min:   temp,
		max:   temp,
		sum:   temp,
		count: 1,
	}
} else {
	s.min = min(s.min, temp)
	s.max = max(s.max, temp)
	s.sum += temp
	s.count++
}
In the above code, most lines hit the else branch, so the hash operation is done only once per line: for the lookup.
This reduced the runtime by 14 seconds, to 1m31s.

Iteration 3: Avoiding standard library parseFloat

The standard library function handles a ton of edge cases we don't need to support: our temperatures always have 2 or 3 digits in the format 1.2 or 34.5 (some with a minus sign in front).
Instead of copying and allocating a string when we use Scanner.Text, we can directly access a byte slice via Scanner.Bytes
Combined with writing custom logic for parsing temperatures, we reduce the operation time by 36 seconds, down to 55s.
negative := false
index := 0
if tempBytes[index] == '-' {
	index++
	negative = true
}
temp := float64(tempBytes[index] - '0') // parse first digit
index++
if tempBytes[index] != '.' {
	temp = temp*10 + float64(tempBytes[index]-'0') // parse optional second digit
	index++
}
index++                                    // skip '.'
temp += float64(tempBytes[index]-'0') / 10 // parse decimal digit
if negative {
	temp = -temp
}

Iteration 4: Fixed point integers

Floating-point instructions are generally slower than integer ones
Since each temperature has a single decimal digit, we can use fixed point integers to represent them
For example, 34.5 => 345
type stats struct {
	min, max, count int32
	sum             int64
}
The only other change is dividing the fixed-point integer by 10 when printing:
mean := float64(s.sum) / float64(s.count) / 10
fmt.Fprintf(output, "%s=%.1f/%.1f/%.1f", station,
	float64(s.min)/10, mean, float64(s.max)/10)
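The parsing side changes in the same way: the byte-level parser from iteration 3 can emit tenths of a degree directly as an int32, with no floating-point work at all. A minimal sketch of that idea (the function name parseTemp is mine, not the article's, and it assumes the input always matches N.N or NN.N with an optional leading minus):

```go
package main

import "fmt"

// parseTemp converts a temperature like "-34.5" directly into tenths
// of a degree (-345), skipping float parsing entirely.
func parseTemp(b []byte) int32 {
	negative := false
	i := 0
	if b[i] == '-' {
		negative = true
		i++
	}
	temp := int32(b[i] - '0') // first digit
	i++
	if b[i] != '.' { // optional second integer digit
		temp = temp*10 + int32(b[i]-'0')
		i++
	}
	i++                            // skip '.'
	temp = temp*10 + int32(b[i]-'0') // tenths digit
	if negative {
		temp = -temp
	}
	return temp
}

func main() {
	fmt.Println(parseTemp([]byte("34.5"))) // 345
	fmt.Println(parseTemp([]byte("-8.9"))) // -89
}
```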
Using integers cut the time down from 55.8 seconds to 51.0 seconds

Iteration 5: Avoid bytes.Cut

Observing the CPU profiler again:
[CPU profile screenshot for iteration 4]
The author looks for the easiest available optimization: bytes.Cut takes 6% of the time.
Instead of using bytes.Cut, we can parse each line from the end, extracting the temperature and the station name directly:
end := len(line)
tenths := int32(line[end-1] - '0')
ones := int32(line[end-3] - '0') // line[end-2] is '.'
var temp int32
var semicolon int
if line[end-4] == ';' { // positive N.N temperature
	temp = ones*10 + tenths
	semicolon = end - 4
} else if line[end-4] == '-' { // negative -N.N temperature
	temp = -(ones*10 + tenths)
	semicolon = end - 5
} else {
	tens := int32(line[end-4] - '0')
	if line[end-5] == ';' { // positive NN.N temperature
		temp = tens*100 + ones*10 + tenths
		semicolon = end - 5
	} else { // negative -NN.N temperature
		temp = -(tens*100 + ones*10 + tenths)
		semicolon = end - 6
	}
}
station := line[:semicolon]
The above leads to a small improvement: from 51.0 seconds to 46.0 seconds.

Iteration 6: Avoid bufio.Scanner

As the CPU profile above shows, bufio.Scanner takes up 15% of the time.
To find the end of each line, the scanner has to look through all the bytes to find the newline character. Then we process many of the bytes again to parse the temperature and find the ';'. So let’s try to combine those steps and throw bufio.Scanner out the window.
The implementation involves:
  1. Allocating a 1MB buffer and reading the file into it in chunks
  2. Finding the last newline character in each chunk
  3. Processing the complete lines in each chunk, carrying the leftover partial line over to the next read
buf := make([]byte, 1024*1024)
readStart := 0
for {
	n, err := f.Read(buf[readStart:])
	if err != nil && err != io.EOF {
		return err
	}
	if readStart+n == 0 {
		break
	}
	chunk := buf[:readStart+n]

	newline := bytes.LastIndexByte(chunk, '\n')
	if newline < 0 {
		break
	}
	remaining := chunk[newline+1:]
	chunk = chunk[:newline+1]

	for {
		station, after, hasSemi := bytes.Cut(chunk, []byte(";"))
		// ... parse the temperature and update stationStats ...
	}
	// ... copy remaining to the start of buf; readStart = len(remaining) ...
}
Removing bufio.Scanner and doing our own scanning cut down the time from 46.0 seconds to 41.3 seconds.

Iteration 7: Use custom hash table

At this point the map operations dominate the profile, taking 56% of the time.
Implementing a custom hash table instead of using Go’s map has two advantages:
  1. We can hash the station name as we look for the ';', avoiding processing bytes twice.
  2. We can store each key in our hash table as a byte slice, avoiding the need to convert each key to a string (which would allocate and copy for every line).
The implemented hash table uses the FNV-1a hash algorithm with linear probing: if there’s a collision, use the next empty slot.
Code for this is omitted since there is not much new to learn from it.
The custom hash table cuts down the time from 41.3 seconds to 25.8s.

Iteration 8: Process chunks in parallel

The idea is to split the file into similar-sized chunks (one for each CPU core), fire up a thread (in Go, a goroutine) to process each chunk, and then merge the results at the end.
// Determine non-overlapping parts for file split (each part has offset and size).
parts, err := splitFile(inputPath, maxGoroutines)
if err != nil {
	return err
}

// Start a goroutine to process each part, returning results on a channel.
resultsCh := make(chan map[string]r8Stats)
for _, part := range parts {
	go r8ProcessPart(inputPath, part.offset, part.size, resultsCh)
}

// Wait for the results to come back in and aggregate them.
totals := make(map[string]r8Stats)
for i := 0; i < len(parts); i++ {
	result := <-resultsCh
	for station, s := range result {
		ts, ok := totals[station]
		if !ok {
			totals[station] = r8Stats{
				min:   s.min,
				max:   s.max,
				sum:   s.sum,
				count: s.count,
			}
			continue
		}
		ts.min = min(ts.min, s.min)
		ts.max = max(ts.max, s.max)
		ts.sum += s.sum
		ts.count += s.count
		totals[station] = ts
	}
}
splitFile looks at the size of the file, divides that by the number of parts we want, and then seeks to each part, reading 100 bytes before the end and looking for the last newline to ensure each part ends with a full line.
r8ProcessPart is shown below:
func r8ProcessPart(inputPath string, fileOffset, fileSize int64,
	resultsCh chan map[string]r8Stats,
) {
	file, err := os.Open(inputPath)
	if err != nil {
		panic(err)
	}
	defer file.Close()
	_, err = file.Seek(fileOffset, io.SeekStart)
	if err != nil {
		panic(err)
	}
	f := io.LimitedReader{R: file, N: fileSize}

	stationStats := make(map[string]r8Stats)

	scanner := bufio.NewScanner(&f)
	for scanner.Scan() {
		// ... same processing as r1 ...
	}

	resultsCh <- stationStats
}
Processing the input file in parallel takes the time from 1 minute 45 seconds to 24.3 seconds.

Final iteration: Combining iterations 7 and 8

This final version cut down the time from 24.3 seconds to 3.99 seconds, a huge win.

Conclusion

Thomas Wuerthinger (with credit to others) created the fastest overall solution to the original challenge in Java
On top of the above solution, other fast solutions have tried:
  1. memory-mapped files
  2. unrolled loops, non-branching parsing code, and other low-level tricks
 
For those interested, there are also 1BRC challenges on the database level