Store上传
Volume.Write
- 更新传递过来的
Needle n
的Offset
- 调用
Block.Write
方法,在super block
中写入needle
Indexer.Add
写入key
对应的信息,Offset
和TotalSize
- 更新统计信息
// Write add a needle, if key exists append to super block, then update
// needle cache offset to new offset.
func (v *Volume) Write(n *needle.Needle) (err error) {
var (
ok bool
nc int64
offset uint32
now = time.Now().UnixNano()
)
v.lock.Lock()
n.Offset = v.Block.Offset
if err = v.Block.Write(n); err == nil {
if err = v.Indexer.Add(n.Key, n.Offset, n.TotalSize); err == nil {
nc, ok = v.needles[n.Key]
v.needles[n.Key] = needle.NewCache(n.Offset, n.TotalSize)
}
}
v.lock.Unlock()
if err == nil {
if log.V(1) {
log.Infof("add needle, offset: %d, size: %d", n.Offset, n.TotalSize)
log.Info(n)
}
if ok {
offset, _ = needle.Cache(nc)
v.del(offset)
}
atomic.AddUint64(&v.Stats.TotalWriteProcessed, 1)
atomic.AddUint64(&v.Stats.TotalWriteBytes, uint64(n.TotalSize))
atomic.AddUint64(&v.Stats.TotalWriteDelay, uint64(time.Now().UnixNano()-now))
}
return
}
SuperBlock.Write
sync_file_range
可以将文件的部分范围作为目标,将对应范围内的脏页刷回磁盘,而不是整个文件的范围。好处是,当我们对大文件进行了修改时,如果修改了大量的数据块,我们最后fsync
的时候,可能会很慢。即使fdatasync
,也是有问题的,例如这个大文件的长度在我们的修改过程中发生了变化,那么fdatasync
将同时写metadata
,而对于文件系统来说,单个文件系统的写metadata
是串行的,这势必导致影响其他用户操作metadata
(如创建文件)。
sync_file_range
是绝对不会写metadata
的,所以用它非常合适,每次对文件做了小范围的修改时,立即调用sync_file_range
,把对应的脏数据刷到磁盘,那么在结束对文件的修改后,再调用fdatasync (flush dirty data page), fsync(flush dirty data+metadata page
)都是很块的。
sync_file_range
的几个flag, 注意SYNC_FILE_RANGE_WRITE
是异步的,所以如果你要达到以上目的话,那么最好不要使用异步模式,或者至少在调用fdatasync
和fsync
前,使用SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE |
SYNC_FILE_RANGE_WAIT_AFTER
做一次全文件范围的sync_file_range
。从而保证在调用fdatasync
或fsync
前,该文件的dirty page
已经全部刷到磁盘了。
可以调用posix_fadvise
函数来设置一些操作文件的方式,调用该函数后内核会把所指定的范围从页面缓冲区回收,也就是清除缓存。网上有人用这个来消除测试I/O的效率时cache的影响。advice的值可以是下面的几种:
POSIX_FADV_NORMAL
:没有任何意见;POSIX_FADV_RANDOM
:程序打算随机读写,内核禁用预读功能,每次读取最少量的数据;POSIX_FADV_SEQUENTIALP
:打算顺序的方式访问,内核把预读的大小扩大一倍;POSIX_FADV_WILLNEED
:在不久的将来程序将访问该段内容,内核开启预读,把它们读入;POSIX_FADV_NOREUSE
:将来打算访问当只访问一次,但内核行为如同4;POSIX_FADV_DONTNEED
:在不久的将来应用程序不打算访问指定范围中的页面,内核从页缓冲中删除指定的范围。
- 1.写文件句柄写入
needle
- 2.
flush
一把 - 3.更新
SuperBlock
的Offset
和Size
func (b *SuperBlock) flush(force bool) (err error) {
var (
fd uintptr
offset int64
size int64
)
// sync write operation after N write
//判断是否需要sync磁盘
if b.write++; !force && b.write < b.conf.Block.SyncWrite {
return
}
b.write = 0
offset = needle.BlockOffset(b.syncOffset)
size = needle.BlockOffset(b.Offset - b.syncOffset)
fd = b.w.Fd()
//true
//是否可以使用sync_file_range
if b.conf.Block.Syncfilerange {
if err = myos.Syncfilerange(fd, offset, size, myos.SYNC_FILE_RANGE_WRITE); err != nil {
log.Errorf("block: %s Syncfilerange() error(%v)", b.File, err)
b.LastErr = err
return
}
} else {
if err = myos.Fdatasync(fd); err != nil {
log.Errorf("block: %s Fdatasync() error(%v)", b.File, err)
b.LastErr = err
return
}
}
//在不久的将来应用程序不打算访问指定范围中的页面,内核从页缓冲中删除指定的范围。
if err = myos.Fadvise(fd, offset, size, myos.POSIX_FADV_DONTNEED); err == nil {
b.syncOffset = b.Offset
} else {
log.Errorf("block: %s Fadvise() error(%v)", b.File, err)
b.LastErr = err
}
return
}
// Write write needle to the block.
func (b *SuperBlock) Write(n *needle.Needle) (err error) {
if b.LastErr != nil {
return b.LastErr
}
if _maxOffset-n.IncrOffset < b.Offset {
err = errors.ErrSuperBlockNoSpace
return
}
//写入needle
if _, err = b.w.Write(n.Buffer()); err == nil {
// flush一下
err = b.flush(false)
} else {
b.LastErr = err
return
}
//更新Offset和Size
b.Offset += n.IncrOffset
b.Size += int64(n.TotalSize)
return
}
Indexer.Add
异步写入,先写入ring
这个循环数组中,当缓冲区满了之后或者一定时间之内,将缓冲区中的对象写入到文件中
// Add append a index data to ring.
func (i *Indexer) Add(key int64, offset uint32, size int32) (err error) {
var index *Index
if i.LastErr != nil {
return i.LastErr
}
if index, err = i.ring.Set(); err != nil {
i.LastErr = err
return
}
index.Key = key
index.Offset = offset
index.Size = size
i.ring.SetAdv()
if i.ring.Buffered() > i.conf.Index.MergeWrite {
i.Signal()
}
return
}