Store Indexer

Indexer写入时并不是直接写入磁盘,而是先写入内存中,然后定期将内存中的数据写入磁盘中,这么做可以减少磁盘的写入次数,提升性能。具体实现的时候采用循环数组这个Ring数据结构,实现原理就是:内部预先分配缓冲区数组,用两个指针分别用来表示读、写的位置,用两个计数器分别记录当前已经写入缓冲区、已经写入磁盘的Index计数,定期将缓冲区中的Index写入磁盘。

Ring类

// Ring is a fixed-size circular buffer of Index entries. A producer
// appends via Set/SetAdv while a consumer drains via Get/GetAdv; the
// monotonically increasing counters wn and rn differ by the number of
// entries currently buffered (written but not yet flushed to disk).
type Ring struct {
    // read
    // total number of indexes read out of the buffer, i.e. already
    // handed off for writing to disk
    rn int64
    // position in data of the next index to read
    rp int
    // write
    // total number of indexes written into the buffer
    wn int64
    // position in data of the next index to write
    wp int
    // info
    // capacity of the buffer
    num  int
    // pre-allocated index buffer
    data []Index
}

写Ring

  • 判断缓冲区中的Index数量
  • Ring.Set返回当前写入的index的引用
  • Ring.SetAdv标记写入缓冲区成功,将写入位置顺延到下一个
// Set reserves the slot at the current write position and returns a
// pointer to it so the caller can fill it in place; ErrRingFull is
// returned when no free slot remains. Call SetAdv afterwards to commit.
func (r *Ring) Set() (index *Index, err error) {
    if r.Buffered() < r.num {
        return &r.data[r.wp], nil
    }
    return nil, errors.ErrRingFull
}

// SetAdv commits the slot reserved by Set: it bumps the write counter
// and advances the write position, wrapping at the end of the buffer.
func (r *Ring) SetAdv() {
    r.wn++
    r.wp = (r.wp + 1) % r.num
}

// Buffered reports how many indexes are currently held in the ring,
// i.e. written into the buffer but not yet read out.
func (r *Ring) Buffered() int {
    pending := r.wn - r.rn
    return int(pending)
}

读Ring

  • 判断缓冲区的数量
  • Ring.Get返回当前读index的引用Index
  • Ring.GetAdv标记读成功,将读位置顺延到下一个
// Get returns a pointer to the entry at the current read position
// without consuming it; ErrRingEmpty is returned when nothing is
// buffered. Call GetAdv once the entry has been processed.
func (r *Ring) Get() (index *Index, err error) {
    if r.rn == r.wn {
        return nil, errors.ErrRingEmpty
    }
    return &r.data[r.rp], nil
}

// GetAdv consumes the entry returned by Get: it bumps the read counter
// and advances the read position, wrapping at the end of the buffer.
func (r *Ring) GetAdv() {
    r.rn++
    r.rp = (r.rp + 1) % r.num
}

Indexer

Indexer初始的时候打开volume配置的Indexer文件,使用Ring作为Index缓冲区,可以循环写入Index。定时、定量地从Ring缓冲区取出Index,序列化到内部的字节数组缓冲区;当缓冲区写入的次数达到一定阈值时,就强制将数据刷新到磁盘上。

// Indexer used for fast recovery super block needle cache.
// It buffers Index entries in a Ring, serializes them into an internal
// byte buffer, and periodically syncs that buffer to the index file.
type Indexer struct {
    wg     sync.WaitGroup
    // index file handle
    f      *os.File
    // event channel: a signal is sent here when buffered indexes
    // should be merged to disk
    signal chan int
    // circular buffer of pending Index entries
    ring   *Ring
    // buffer
    // serialization buffer: indexes are encoded to binary here before
    // being written to the file
    buf []byte
    // number of bytes currently used in buf (buffer cursor)
    bn  int
    File    string `json:"file"`
    LastErr error  `json:"last_err"`
    // current write offset of the index file
    Offset  int64  `json:"offset"`
    conf    *conf.Config
    // status
    // file offset up to which data is known to be synced to disk
    syncOffset int64
    closed     bool
    // number of flush calls since the last actual disk write
    write      int
}

Indexer接口

// Signal signal the write job merge index data. The send is
// non-blocking: if a notification is already pending it is simply
// coalesced. No-op once the indexer has been closed.
func (i *Indexer) Signal() {
    if !i.closed {
        select {
        case i.signal <- _ready:
        default:
            // a wake-up is already queued; nothing more to do
        }
    }
}

// Add append a index data to ring. When the number of buffered entries
// exceeds the configured merge threshold, the background merge job is
// signalled. Returns the sticky LastErr if the indexer has already
// entered a failed state.
func (i *Indexer) Add(key int64, offset uint32, size int32) (err error) {
    if i.LastErr != nil {
        return i.LastErr
    }
    var idx *Index
    if idx, err = i.ring.Set(); err != nil {
        i.LastErr = err
        return
    }
    // fill the reserved slot in place, then commit it
    idx.Key = key
    idx.Offset = offset
    idx.Size = size
    i.ring.SetAdv()
    // enough entries accumulated: ask the merge job to drain the ring
    if i.ring.Buffered() > i.conf.Index.MergeWrite {
        i.Signal()
    }
    return
}

Indexer Merge

  • Ring中取出Index
  • Index.WriteIndex序列化成字节数组,写入Indexer缓冲区
  • Indexer.flush操作
// Write append index needle to disk: the record is encoded big-endian
// into the serialization buffer, flushing first if it would not fit.
// WARN can't concurrency with merge and write.
// ONLY used in super block recovery!!!!!!!!!!!
func (i *Indexer) Write(key int64, offset uint32, size int32) (err error) {
    if i.LastErr != nil {
        return i.LastErr
    }
    // make room first: force a flush when the next record would
    // overflow the buffer
    if i.bn+_indexSize >= i.conf.Index.BufferSize {
        if err = i.flush(true); err != nil {
            return
        }
    }
    // encode key, offset and size at the current buffer cursor
    n := i.bn
    binary.BigEndian.PutInt64(i.buf[n:], key)
    n += _keySize
    binary.BigEndian.PutUint32(i.buf[n:], offset)
    n += _offsetSize
    binary.BigEndian.PutInt32(i.buf[n:], size)
    n += _sizeSize
    i.bn = n
    return i.flush(false)
}

// mergeRing get index data from ring then write to disk. Draining the
// ring to empty is the normal termination and yields a nil error; a
// Write failure is logged and returned.
func (i *Indexer) mergeRing() (err error) {
    for {
        var idx *Index
        if idx, err = i.ring.Get(); err != nil {
            // ring is empty: not an error for the caller
            return nil
        }
        if err = i.Write(idx.Key, idx.Offset, idx.Size); err != nil {
            log.Errorf("index: %s Write() error(%v)", i.File, err)
            return
        }
        i.ring.GetAdv()
    }
}

// merge merge from ring index data, then write to disk. It loops until
// a non-ready signal arrives or an error occurs, waking on either an
// explicit Signal or the configured merge delay; before exiting it does
// a final best-effort drain and forced flush.
func (i *Indexer) merge() {
    log.Infof("index: %s write job start", i.File)
    for {
        var sig int
        select {
        case sig = <-i.signal:
        case <-time.After(i.conf.Index.MergeDelay.Duration):
            // timer expiry behaves like a ready signal
            sig = _ready
        }
        if sig != _ready {
            break
        }
        if err := i.mergeRing(); err != nil {
            break
        }
        if err := i.flush(false); err != nil {
            break
        }
    }
    // best effort: persist whatever is still buffered before exiting
    i.mergeRing()
    i.flush(true)
    i.wg.Done()
    log.Warningf("index: %s write job exit", i.File)
}

Index Flush

  • 判断是否强制同步、判断写入次数是否满足强制同步
  • 如果满足同步条件,则写入文件
  • 更新当前文件写Offset,缓冲区偏移bn,写入缓冲区次数write
  • 判断是否用syncfilerange这种方式还是datasync这种强制同步磁盘
// flush the in-memory data flush to disk.
// Unless force is set, the actual write is skipped until flush has been
// called i.conf.Index.SyncWrite times; then the serialization buffer is
// appended to the file and synced — via sync_file_range or fdatasync
// depending on config — and the synced range is dropped from the page
// cache with fadvise(DONTNEED). Any failure is recorded in LastErr.
func (i *Indexer) flush(force bool) (err error) {
    var (
        fd     uintptr
        offset int64
        size   int64
    )
    // count this call first; without force, only every SyncWrite-th
    // call actually touches the disk
    if i.write++; !force && i.write < i.conf.Index.SyncWrite {
        return
    }
    if _, err = i.f.Write(i.buf[:i.bn]); err != nil {
        i.LastErr = err
        log.Errorf("index: %s Write() error(%v)", i.File, err)
        return
    }
    // buffer handed to the OS: advance the file offset, reset the
    // buffer cursor and the write counter
    i.Offset += int64(i.bn)
    i.bn = 0
    i.write = 0
    // [syncOffset, Offset) is the range not yet known to be durable
    offset = i.syncOffset
    size = i.Offset - i.syncOffset
    fd = i.f.Fd()
    if i.conf.Index.Syncfilerange {
        // initiate write-out of just the dirty byte range
        if err = myos.Syncfilerange(fd, offset, size, myos.SYNC_FILE_RANGE_WRITE); err != nil {
            i.LastErr = err
            log.Errorf("index: %s Syncfilerange() error(%v)", i.File, err)
            return
        }
    } else {
        // sync file data (not metadata) for the whole file
        if err = myos.Fdatasync(fd); err != nil {
            i.LastErr = err
            log.Errorf("index: %s Fdatasync() error(%v)", i.File, err)
            return
        }
    }
    // drop the synced range from the page cache; syncOffset advances
    // only on success so a failed fadvise retries the same range
    if err = myos.Fadvise(fd, offset, size, myos.POSIX_FADV_DONTNEED); err == nil {
        i.syncOffset = i.Offset
    } else {
        log.Errorf("index: %s Fadvise() error(%v)", i.File, err)
        i.LastErr = err
    }
    return
}

// Flush flush writer buffer: forces any buffered index data out to
// disk, returning the sticky LastErr if the indexer already failed.
func (i *Indexer) Flush() (err error) {
    if i.LastErr != nil {
        return i.LastErr
    }
    return i.flush(true)
}

results matching ""

    No results matching ""