Store上传

Volume.Write

  • 更新传递过来的Needle nOffset
  • 调用Block.Write方法,在super block中写入needle
  • Indexer.Add写入key对应的信息,OffsetTotalSize
  • 更新统计信息
// Write add a needle, if key exists append to super block, then update
// needle cache offset to new offset.
func (v *Volume) Write(n *needle.Needle) (err error) {
    var (
        ok     bool
        nc     int64
        offset uint32
        now    = time.Now().UnixNano()
    )
    v.lock.Lock()
    n.Offset = v.Block.Offset
    if err = v.Block.Write(n); err == nil {
        if err = v.Indexer.Add(n.Key, n.Offset, n.TotalSize); err == nil {
            nc, ok = v.needles[n.Key]
            v.needles[n.Key] = needle.NewCache(n.Offset, n.TotalSize)
        }
    }
    v.lock.Unlock()
    if err == nil {
        if log.V(1) {
            log.Infof("add needle, offset: %d, size: %d", n.Offset, n.TotalSize)
            log.Info(n)
        }
        if ok {
            offset, _ = needle.Cache(nc)
            v.del(offset)
        }
        atomic.AddUint64(&v.Stats.TotalWriteProcessed, 1)
        atomic.AddUint64(&v.Stats.TotalWriteBytes, uint64(n.TotalSize))
        atomic.AddUint64(&v.Stats.TotalWriteDelay, uint64(time.Now().UnixNano()-now))
    }
    return
}

SuperBlock.Write

sync_file_range 可以将文件的部分范围作为目标,将对应范围内的脏页刷回磁盘,而不是整个文件的范围。好处是,当我们对大文件进行了修改时,如果修改了大量的数据块,我们最后fsync的时候,可能会很慢。即使fdatasync,也是有问题的,例如这个大文件的长度在我们的修改过程中发生了变化,那么fdatasync将同时写metadata,而对于文件系统来说,单个文件系统的写metadata是串行的,这势必导致影响其他用户操作metadata(如创建文件)。 sync_file_range是绝对不会写metadata的,所以用它非常合适,每次对文件做了小范围的修改时,立即调用sync_file_range,把对应的脏数据刷到磁盘,那么在结束对文件的修改后,再调用fdatasync (flush dirty data page), fsync(flush dirty data+metadata page)都是很块的。

sync_file_range的几个flag, 注意SYNC_FILE_RANGE_WRITE是异步的,所以如果你要达到以上目的话,那么最好不要使用异步模式,或者至少在调用fdatasyncfsync前,使用SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE | SYNC_FILE_RANGE_WAIT_AFTER做一次全文件范围的sync_file_range。从而保证在调用fdatasyncfsync前,该文件的dirty page已经全部刷到磁盘了。

可以调用posix_fadvise函数来设置一些操作文件的方式,调用该函数后内核会把所指定的范围从页面缓冲区回收,也就是清除缓存。网上有人用这个来消除测试I/O的效率时cache的影响。advice的值可以是下面的几种:

  • POSIX_FADV_NORMAL:没有任何意见;
  • POSIX_FADV_RANDOM:程序打算随机读写,内核禁用预读功能,每次读取最少量的数据;
  • POSIX_FADV_SEQUENTIALP:打算顺序的方式访问,内核把预读的大小扩大一倍;
  • POSIX_FADV_WILLNEED:在不久的将来程序将访问该段内容,内核开启预读,把它们读入;
  • POSIX_FADV_NOREUSE:将来打算访问当只访问一次,但内核行为如同4;
  • POSIX_FADV_DONTNEED:在不久的将来应用程序不打算访问指定范围中的页面,内核从页缓冲中删除指定的范围。
  • 1.写文件句柄写入needle
  • 2.flush一把
  • 3.更新SuperBlockOffsetSize
func (b *SuperBlock) flush(force bool) (err error) {
    var (
        fd     uintptr
        offset int64
        size   int64
    )
    // sync write operation after N write
    //判断是否需要sync磁盘
    if b.write++; !force && b.write < b.conf.Block.SyncWrite {
        return
    }
    b.write = 0
    offset = needle.BlockOffset(b.syncOffset)
    size = needle.BlockOffset(b.Offset - b.syncOffset)
    fd = b.w.Fd()
    //true
    //是否可以使用sync_file_range
    if b.conf.Block.Syncfilerange {
        if err = myos.Syncfilerange(fd, offset, size, myos.SYNC_FILE_RANGE_WRITE); err != nil {
            log.Errorf("block: %s Syncfilerange() error(%v)", b.File, err)
            b.LastErr = err
            return
        }
    } else {
        if err = myos.Fdatasync(fd); err != nil {
            log.Errorf("block: %s Fdatasync() error(%v)", b.File, err)
            b.LastErr = err
            return
        }
    }
    //在不久的将来应用程序不打算访问指定范围中的页面,内核从页缓冲中删除指定的范围。
    if err = myos.Fadvise(fd, offset, size, myos.POSIX_FADV_DONTNEED); err == nil {
        b.syncOffset = b.Offset
    } else {
        log.Errorf("block: %s Fadvise() error(%v)", b.File, err)
        b.LastErr = err
    }
    return
}

// Write write needle to the block.
func (b *SuperBlock) Write(n *needle.Needle) (err error) {
    if b.LastErr != nil {
        return b.LastErr
    }
    if _maxOffset-n.IncrOffset < b.Offset {
        err = errors.ErrSuperBlockNoSpace
        return
    }
    //写入needle
    if _, err = b.w.Write(n.Buffer()); err == nil {
        // flush一下
        err = b.flush(false)
    } else {
        b.LastErr = err
        return
    }
    //更新Offset和Size
    b.Offset += n.IncrOffset
    b.Size += int64(n.TotalSize)
    return
}

Indexer.Add

异步写入,先写入ring这个循环数组中,当缓冲区满了之后或者一定时间之内,将缓冲区中的对象写入到文件中

// Add append a index data to ring.
func (i *Indexer) Add(key int64, offset uint32, size int32) (err error) {
    var index *Index
    if i.LastErr != nil {
        return i.LastErr
    }
    if index, err = i.ring.Set(); err != nil {
        i.LastErr = err
        return
    }
    index.Key = key
    index.Offset = offset
    index.Size = size
    i.ring.SetAdv()
    if i.ring.Buffered() > i.conf.Index.MergeWrite {
        i.Signal()
    }
    return
}

results matching ""

    No results matching ""