Store下载

Volume.Read

  • 根据函数参数key定位到needles的cache data 代表Offset,TotalSize的nc
  • needle.NewReader(key, nc),初始化Needle n,将nc的值解析到Needle nOffset,TotalSize
  • 调用Volume.read返回needle.Needle
    • BlockReadAt方法
    • 校验Key,Size
    • 判断是否needle是否删除,如果已经被删除,需要更新needle cache data
  • 校验Cookie
// Read get a needle by key and cookie and write to wr.
func (v *Volume) Read(key int64, cookie int32) (n *needle.Needle, err error) {
    var (
        ok bool
        nc int64
    )
    v.lock.RLock()
    if nc, ok = v.needles[key]; !ok {
        err = errors.ErrNeedleNotExist
    }
    v.lock.RUnlock()
    if err == nil {
        if n = needle.NewReader(key, nc); n.Offset != needle.CacheDelOffset {
            if err = v.read(n); err == nil {
                if n.Cookie != cookie {
                    err = errors.ErrNeedleCookie
                }
            }
        } else {
            err = errors.ErrNeedleDeleted
        }
        if err != nil {
            n.Close()
            n = nil
        }
    }
    return
}

Volume.read

  • 调用SuperBlock.ReadAt方法,读取数据到Needle n中
  • 判断Key,TotalSize,Flag
  • 更新Volume统计数据
func (v *Volume) read(n *needle.Needle) (err error) {
    var (
        key  = n.Key
        size = n.TotalSize
        now  = time.Now().UnixNano()
    )
    // TODO iops limit
    // pread syscall is atomic, no lock
    if err = v.Block.ReadAt(n); err != nil {
        return
    }
    if n.Key != key {
        return errors.ErrNeedleKey
    }
    if n.TotalSize != size {
        return errors.ErrNeedleSize
    }
    if log.V(1) {
        log.Infof("get needle key: %d, cookie: %d, offset: %d, size: %d", n.Key, n.Cookie, n.Offset, size)
        log.Infof("%v\n", n)
    }
    // needles map may be out-dated, recheck
    if n.Flag == needle.FlagDel {
        v.lock.Lock()
        v.needles[key] = needle.NewCache(needle.CacheDelOffset, size)
        v.lock.Unlock()
        err = errors.ErrNeedleDeleted
    } else {
        atomic.AddUint64(&v.Stats.TotalGetProcessed, 1)
        atomic.AddUint64(&v.Stats.TotalReadBytes, uint64(size))
        atomic.AddUint64(&v.Stats.TotalGetDelay, uint64(time.Now().UnixNano()-now))
    }
    return
}

SuperBlock.ReadAt

  • 读数据文件
  • Needle.Parse
func (b *SuperBlock) ReadAt(n *needle.Needle) (err error) {
    if b.LastErr != nil {
        return b.LastErr
    }
    if _, err = b.r.ReadAt(n.Buffer(), needle.BlockOffset(n.Offset)); err == nil {
        err = n.Parse()
    } else {
        b.LastErr = err
    }
    return
}

results matching ""

    No results matching ""