Store下载

Volume.Read
- 根据函数参数
key
定位到needles
的cache data 代表Offset
,TotalSize
的nc
needle.NewReader(key, nc)
,初始化Needle n
,将nc的值解析到Needle n
的Offset
,TotalSize
- 调用
Volume.read
返回needle.Needle
Block
的ReadAt
方法
- 校验
Key
,Size
- 判断是否
needle
是否删除,如果已经被删除,需要更新needle cache data
- 校验
Cookie
// Read get a needle by key and cookie and write to wr.
func (v *Volume) Read(key int64, cookie int32) (n *needle.Needle, err error) {
var (
ok bool
nc int64
)
v.lock.RLock()
if nc, ok = v.needles[key]; !ok {
err = errors.ErrNeedleNotExist
}
v.lock.RUnlock()
if err == nil {
if n = needle.NewReader(key, nc); n.Offset != needle.CacheDelOffset {
if err = v.read(n); err == nil {
if n.Cookie != cookie {
err = errors.ErrNeedleCookie
}
}
} else {
err = errors.ErrNeedleDeleted
}
if err != nil {
n.Close()
n = nil
}
}
return
}
Volume.read
- 调用
SuperBlock.ReadAt
方法,读取数据到Needle n中
- 判断
Key
,TotalSize
,Flag
- 更新Volume统计数据
func (v *Volume) read(n *needle.Needle) (err error) {
var (
key = n.Key
size = n.TotalSize
now = time.Now().UnixNano()
)
// TODO iops limit
// pread syscall is atomic, no lock
if err = v.Block.ReadAt(n); err != nil {
return
}
if n.Key != key {
return errors.ErrNeedleKey
}
if n.TotalSize != size {
return errors.ErrNeedleSize
}
if log.V(1) {
log.Infof("get needle key: %d, cookie: %d, offset: %d, size: %d", n.Key, n.Cookie, n.Offset, size)
log.Infof("%v\n", n)
}
// needles map may be out-dated, recheck
if n.Flag == needle.FlagDel {
v.lock.Lock()
v.needles[key] = needle.NewCache(needle.CacheDelOffset, size)
v.lock.Unlock()
err = errors.ErrNeedleDeleted
} else {
atomic.AddUint64(&v.Stats.TotalGetProcessed, 1)
atomic.AddUint64(&v.Stats.TotalReadBytes, uint64(size))
atomic.AddUint64(&v.Stats.TotalGetDelay, uint64(time.Now().UnixNano()-now))
}
return
}
SuperBlock.ReadAt
func (b *SuperBlock) ReadAt(n *needle.Needle) (err error) {
if b.LastErr != nil {
return b.LastErr
}
if _, err = b.r.ReadAt(n.Buffer(), needle.BlockOffset(n.Offset)); err == nil {
err = n.Parse()
} else {
b.LastErr = err
}
return
}