查看原文
其他

Golang 秒读 16GB 大文件

推荐关注↓

【导读】go读取大文件怎么做?本文巧妙使用bufio达成了读取大文件的目标,来看看如何实践吧!

现代计算机系统每天都会产生大量的数据,随着系统规模的增大,把产生的所有调试数据存储到数据库是不可行的,因为它们产生以后就不会被改变,只是用来分析和解决故障。因此把它存储在本地磁盘上是一个很好的办法。

在这我们打算使用GOLANG读取一个16GB大小,上百万行内容的txt或者log文件。跟我一起来吧。

LET‘s CODE

首先我们使用GO标准库中的os.File来打开文件

f, err := os.Open(fileName)
 if err != nil {
   fmt.Println("cannot able to read the file", err)
   return
 }
// UPDATE: close after checking error
defer file.Close()  //Do not forget to close the file

当文件被打开以后,我们有两个选择

  1. 逐行读取,这能帮助我们减少内存的使用,但会耗费大量的时间在IO上。
  2. 一次性读取整个文件到内存中进行处理,这将显著的增加内存使用,但是会节省大量的时间。

但是要注意,我们的文件大小是16GB,因此我们不可能把它一次性的加载到内存中。但是第一个选择也不适合,因为我们想在几秒内处理完成。

因此,仔细想想,我们也许还有第三个选择,我们不去一次性读取整个文件到内存中,而是分段读取,想想看GO的标准库中是不是有个bufio.NewReader()呢?

r := bufio.NewReader(f)
for {
  buf := make([]byte,4*1024//the chunk size
  n, err := r.Read(buf) //loading chunk into buffer
  buf = buf[:n]
  if n == 0 {
       if err == io.EOF {
         break
       }
       if err != nil {
         fmt.Println(err)
         break
       }
       return err
    }
}

一旦我们有个一个分块,我们就可以开启一个goroutine去处理它。因此上面的代码可以做如下修改。

//sync pools to reuse the memory and decrease the preassure on //Garbage Collector
linesPool := sync.Pool{New: func() interface{} {
        lines := make([]byte500*1024)
        return lines
}}
stringPool := sync.Pool{New: func() interface{} {
          lines := ""
          return lines
}}
slicePool := sync.Pool{New: func() interface{} {
           lines := make([]string100)
           return lines
}}
r := bufio.NewReader(f)
var wg sync.WaitGroup //wait group to keep track off all threads
for {
 
     buf := linesPool.Get().([]byte)
     n, err := r.Read(buf)
     buf = buf[:n]
     if n == 0 {
        if err == io.EOF {
            break
        }
        if err != nil {
            fmt.Println(err)
            break
        }
 
        return err
     }
     nextUntillNewline, err := r.ReadBytes('\n')//read entire line
 
     if err != io.EOF {
         buf = append(buf, nextUntillNewline...)
     }
 
     wg.Add(1)
     go func() { 
 
        //process each chunk concurrently
        //start -> log start time, end -> log end time
 
        ProcessChunk(buf, &linesPool, &stringPool, &slicePool,     start, end)
        wg.Done()
 
     }()
    }
    wg.Wait()
}

上面的代码做了两个优化:

  1. sync.Pool可以减轻GC的压力,我们可以重复使用已分配的内存,减少内存消耗,加快处理速度。
  2. goroutine帮助我们并发处理多个切块,显著加快处理速度。

接下来我们就来实现ProcessChunk函数,来处理如下格式的日志文件

2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,...\n

我们将根据命令行提供的时间戳来提取日志

func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) {
//another wait group to process every chunk further                             
      var wg2 sync.WaitGroup
logs := stringPool.Get().(string)
logs = string(chunk)
linesPool.Put(chunk) //put back the chunk in pool
//split the string by "\n", so that we have slice of logs
      logsSlice := strings.Split(logs, "\n")
stringPool.Put(logs) //put back the string pool
chunkSize := 100 //process the bunch of 100 logs in thread
n := len(logsSlice)
noOfThread := n / chunkSize
if n%chunkSize != 0 { //check for overflow 
         noOfThread++
      }
length := len(logsSlice)
//traverse the chunk
     for i := 0; i < length; i += chunkSize {
 
         wg2.Add(1)
//process each chunk in saperate chunk
         go func(s int, e int) {
            for i:= s; i<e;i++{
               text := logsSlice[i]
if len(text) == 0 {
                  continue
               }
 
            logParts := strings.SplitN(text, ","2)
            logCreationTimeString := logParts[0]
            logCreationTime, err := time.Parse("2006-01-  02T15:04:05.0000Z", logCreationTimeString)
if err != nil {
                 fmt.Printf("\n Could not able to parse the time :%s       for log : %v", logCreationTimeString, text)
                 return
            }
// check if log's timestamp is inbetween our desired period
          if logCreationTime.After(start) && logCreationTime.Before(end) {
 
            fmt.Println(text)
           }
        }
        textSlice = nil
        wg2.Done()
 
     }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
   //passing the indexes for processing
}  
   wg2.Wait() //wait for a chunk to finish
   logsSlice = nil
}

使用上面的代码来打开处理16GB的日志文件,测试花费时间大概是25s。


转自:寒城

链接:zhuanlan.zhihu.com/p/184937550



- EOF -

推荐阅读  点击标题可跳转

1、详解 Golang 内存映射文件 mmap

2、揭晓 Go 语言真实现状!

3、Go 语言错误处理的优雅实现


关注「程序员的那些事」加星标,不错过圈内事

点赞和在看就是最大的支持❤️

: . Video Mini Program Like ,轻点两下取消赞 Wow ,轻点两下取消在看

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存