Store Indexer
Indexer写入时并不是直接写入磁盘,而是先写入内存中,然后定期将内存中的
数据写入磁盘中,这么做可以减少磁盘的写入次数,提升性能。具体实现的时候采用循环数组这个Ring数据结构,实现原理就是:内部预先分配缓冲区数组,用两个指针分别用来表示读、写的位置,用两个计数器记录当前已经写入缓冲区、磁盘的Index计数。将Index写入磁盘。
Ring类
type Ring struct {
// read
//当前读的数量,即写入磁盘的Index数量
rn int64
//当前缓冲区可读的index
rp int
// write
//当前已经写的数量,即写入缓冲区的Index数量
wn int64
//当前缓冲区可写的index
wp int
// info
//缓冲区最大个数
num int
//Index缓冲区
data []Index
}
写Ring
- 判断缓冲区中的
Index数量 Ring.Set返回当前写入的index的引用Ring.SetAdv标记写入缓冲区成功,将写入位置顺延到下一个
func (r *Ring) Set() (index *Index, err error) {
if r.Buffered() >= r.num {
return nil, errors.ErrRingFull
}
index = &r.data[r.wp]
return
}
func (r *Ring) SetAdv() {
if r.wp++; r.wp >= r.num {
r.wp = 0
}
r.wn++
}
func (r *Ring) Buffered() int {
return int(r.wn - r.rn)
}
读Ring
- 判断缓冲区的数量
Ring.Get返回当前读index的引用IndexRing.GetAdv标记读成功,将读位置顺延到下一个
func (r *Ring) Get() (index *Index, err error) {
if r.wn == r.rn {
return nil, errors.ErrRingEmpty
}
index = &r.data[r.rp]
return
}
func (r *Ring) GetAdv() {
if r.rp++; r.rp >= r.num {
r.rp = 0
}
r.rn++
}
Indexer
Indexer初始的时候打开volume配置的Indexer文件,使用Ring作为Index缓冲区,可以循环写入Index。定时、定量从Ring缓冲区取出Index序列化成自己数组缓冲区,当缓冲区写入的次数满足一定,就强制刷新数据到磁盘上。
// Indexer used for fast recovery super block needle cache.
type Indexer struct {
wg sync.WaitGroup
//Index写入文件句柄
f *os.File
//事件管道,需要merge的时候,发送信号到这个管道中
signal chan int
//Index循环写入工具类
ring *Ring
// buffer
//内部缓冲区,在写入文件之前需要先将Index转换成二进制byte
buf []byte
//缓冲区index
bn int
File string `json:"file"`
LastErr error `json:"last_err"`
//Index文件写入的offset
Offset int64 `json:"offset"`
conf *conf.Config
// status
//同步的offset
syncOffset int64
closed bool
//记录缓冲区写入的次数
write int
}
Indexer接口
// Signal signal the write job merge index data.
func (i *Indexer) Signal() {
if i.closed {
return
}
select {
case i.signal <- _ready:
default:
}
}
// Add append a index data to ring.
func (i *Indexer) Add(key int64, offset uint32, size int32) (err error) {
var index *Index
if i.LastErr != nil {
return i.LastErr
}
if index, err = i.ring.Set(); err != nil {
i.LastErr = err
return
}
index.Key = key
index.Offset = offset
index.Size = size
//从Ring循环数组中取出可写位置
i.ring.SetAdv()
//Ring缓冲区中数量足够多,需要merge,发送信号
if i.ring.Buffered() > i.conf.Index.MergeWrite {
i.Signal()
}
return
}
Indexer Merge
- 从
Ring中取出Index Index.Write将Index序列化成字节数组,写入Indexer缓冲区Indexer.flush操作
// Write append index needle to disk.
// WARN can't concurrency with merge and write.
// ONLY used in super block recovery!!!!!!!!!!!
func (i *Indexer) Write(key int64, offset uint32, size int32) (err error) {
if i.LastErr != nil {
return i.LastErr
}
if i.bn+_indexSize >= i.conf.Index.BufferSize {
// buffer full
if err = i.flush(true); err != nil {
return
}
}
binary.BigEndian.PutInt64(i.buf[i.bn:], key)
i.bn += _keySize
binary.BigEndian.PutUint32(i.buf[i.bn:], offset)
i.bn += _offsetSize
binary.BigEndian.PutInt32(i.buf[i.bn:], size)
i.bn += _sizeSize
err = i.flush(false)
return
}
// mergeRing get index data from ring then write to disk.
func (i *Indexer) mergeRing() (err error) {
var index *Index
for {
if index, err = i.ring.Get(); err != nil {
err = nil
break
}
if err = i.Write(index.Key, index.Offset, index.Size); err != nil {
log.Errorf("index: %s Write() error(%v)", i.File, err)
break
}
i.ring.GetAdv()
}
return
}
// merge merge from ring index data, then write to disk.
func (i *Indexer) merge() {
var (
err error
sig int
)
log.Infof("index: %s write job start", i.File)
for {
select {
case sig = <-i.signal:
case <-time.After(i.conf.Index.MergeDelay.Duration):
sig = _ready
}
if sig != _ready {
break
}
if err = i.mergeRing(); err != nil {
break
}
if err = i.flush(false); err != nil {
break
}
}
i.mergeRing()
i.flush(true)
i.wg.Done()
log.Warningf("index: %s write job exit", i.File)
return
}
Index Flush
- 判断是否强制同步、判断写入次数是否满足强制同步
- 如果满足同步条件,则写入文件
- 更新当前文件写
Offset,缓冲区偏移bn,写入缓冲区次数write - 判断是否用
syncfilerange这种方式还是datasync这种强制同步磁盘
// flush the in-memory data flush to disk.
func (i *Indexer) flush(force bool) (err error) {
var (
fd uintptr
offset int64
size int64
)
if i.write++; !force && i.write < i.conf.Index.SyncWrite {
return
}
if _, err = i.f.Write(i.buf[:i.bn]); err != nil {
i.LastErr = err
log.Errorf("index: %s Write() error(%v)", i.File, err)
return
}
i.Offset += int64(i.bn)
i.bn = 0
i.write = 0
offset = i.syncOffset
size = i.Offset - i.syncOffset
fd = i.f.Fd()
if i.conf.Index.Syncfilerange {
if err = myos.Syncfilerange(fd, offset, size, myos.SYNC_FILE_RANGE_WRITE); err != nil {
i.LastErr = err
log.Errorf("index: %s Syncfilerange() error(%v)", i.File, err)
return
}
} else {
if err = myos.Fdatasync(fd); err != nil {
i.LastErr = err
log.Errorf("index: %s Fdatasync() error(%v)", i.File, err)
return
}
}
if err = myos.Fadvise(fd, offset, size, myos.POSIX_FADV_DONTNEED); err == nil {
i.syncOffset = i.Offset
} else {
log.Errorf("index: %s Fadvise() error(%v)", i.File, err)
i.LastErr = err
}
return
}
// Flush flush writer buffer.
func (i *Indexer) Flush() (err error) {
if i.LastErr != nil {
return i.LastErr
}
err = i.flush(true)
return
}