Store管理
- AddVolume
- AddFreeVolume
- CompactVolume
- DelVolume
- BulkVolume
AddVolume
- 从
Free Volume中取出空闲的Volume然后进行上线
- 从
Free Volume Meta文件中删除一项,Store.saveFreeVolumeIndex
- 在
Volume Meta文件中新增一项,Store.saveVolumeIndex
- 在
zk集群上zk.AddVolume,同步Volume Meta到zk集群上
// freeVolume get a free volume.
func (s *Store) freeVolume(id int32) (v *volume.Volume, err error) {
var (
i int
bfile, nbfile, ifile, nifile, bdir, idir string
)
s.flock.Lock()
defer s.flock.Unlock()
if len(s.FreeVolumes) == 0 {
err = errors.ErrStoreNoFreeVolume
return
}
//第一个Free Volume
v = s.FreeVolumes[0]
s.FreeVolumes = s.FreeVolumes[1:]
v.Id = id
bfile, ifile = v.Block.File, v.Indexer.File
bdir, idir = filepath.Dir(bfile), filepath.Dir(ifile)
for {
//${id}_${i}
nbfile, nifile = s.file(id, bdir, idir, i)
if !myos.Exist(nbfile) && !myos.Exist(nifile) {
break
}
i++
}
log.Infof("rename block: %s to %s", bfile, nbfile)
log.Infof("rename index: %s to %s", ifile, nifile)
//重命名index file
if err = os.Rename(ifile, nifile); err != nil {
log.Errorf("os.Rename(\"%s\", \"%s\") error(%v)", ifile, nifile, err)
v.Destroy()
return
}
//重命名block file
if err = os.Rename(bfile, nbfile); err != nil {
log.Errorf("os.Rename(\"%s\", \"%s\") error(%v)", bfile, nbfile, err)
v.Destroy()
return
}
//重新设置volume的Block、Index文件名称
v.Block.File = nbfile
v.Indexer.File = nifile
if err = v.Open(); err != nil {
v.Destroy()
return
}
//将最新的Free Volume Index信息保存到磁盘
err = s.saveFreeVolumeIndex()
return
}
// addVolume atomic add volume by copy-on-write.
func (s *Store) addVolume(id int32, nv *volume.Volume) {
var (
vid int32
v *volume.Volume
volumes = make(map[int32]*volume.Volume, len(s.Volumes)+1)
)
for vid, v = range s.Volumes {
volumes[vid] = v
}
volumes[id] = nv
// goroutine safe replace
s.Volumes = volumes
}
// AddVolume add a new volume.
func (s *Store) AddVolume(id int32) (v *volume.Volume, err error) {
var ov *volume.Volume
// try check exists
// 检查是否存在
if ov = s.Volumes[id]; ov != nil {
return nil, errors.ErrVolumeExist
}
// find a free volume
// 选中一个Free Volumn
if v, err = s.freeVolume(id); err != nil {
return
}
s.vlock.Lock()
if ov = s.Volumes[id]; ov == nil {
//内部函数addVolume,从free volume中删除一项,并将更新后的最新的信息保存。
s.addVolume(id, v)
//将新的volume保存到meta文件中
if err = s.saveVolumeIndex(); err == nil {
// 同步信息到zk集群上
err = s.zk.AddVolume(id, v.Meta())
}
if err != nil {
log.Errorf("add volume: %d error(%v), local index or zookeeper index may save failed", id, err)
}
} else {
err = errors.ErrVolumeExist
}
s.vlock.Unlock()
if err == errors.ErrVolumeExist {
v.Destroy()
}
return
}
// saveVolumeIndex save volumes index info to disk.
// 保存 Free Volume Meta 文件
func (s *Store) saveVolumeIndex() (err error) {
var (
tn, n int
v *volume.Volume
)
if _, err = s.vf.Seek(0, os.SEEK_SET); err != nil {
log.Errorf("vf.Seek() error(%v)", err)
return
}
//遍历Volumes每一项,写到磁盘中
for _, v = range s.Volumes {
if n, err = s.vf.WriteString(fmt.Sprintf("%s\n", string(v.Meta()))); err != nil {
log.Errorf("vf.WriteString() error(%v)", err)
return
}
tn += n
}
// sync刷磁盘
if err = s.vf.Sync(); err != nil {
log.Errorf("vf.Sync() error(%v)", err)
return
}
//os.Truncate(name, size),,将文件进行截断
if err = os.Truncate(s.conf.Store.VolumeIndex, int64(tn)); err != nil {
log.Errorf("os.Truncate() error(%v)", err)
}
return
}
AddFreeVolume
- 根据
bdir和idir以及FreeId获取文件名
- 新建volume
- 添加到
Free Volume上
Store.saveFreeVolumeIndex保存到meta文件中,启动的时候可以加载
// AddFreeVolume add free volumes.
func (s *Store) AddFreeVolume(n int, bdir, idir string) (sn int, err error) {
var (
i int
bfile, ifile string
v *volume.Volume
)
s.flock.Lock()
for i = 0; i < n; i++ {
s.FreeId++
//根据bdir和idir以及FreeId获取文件名
bfile, ifile = s.freeFile(s.FreeId, bdir, idir)
if myos.Exist(bfile) || myos.Exist(ifile) {
continue
}
//新建volume
if v, err = newVolume(volumeFreeId, bfile, ifile, s.conf); err != nil {
// if no free space, delete the file
os.Remove(bfile)
os.Remove(ifile)
break
}
v.Close()
//添加到Free Volume上
s.FreeVolumes = append(s.FreeVolumes, v)
sn++
}
//保存
err = s.saveFreeVolumeIndex()
s.flock.Unlock()
return
}
// 保存 Free Volume Meta文件
// saveFreeVolumeIndex save free volumes index info to disk.
func (s *Store) saveFreeVolumeIndex() (err error) {
var (
tn, n int
v *volume.Volume
)
// 定位到文件开头
if _, err = s.fvf.Seek(0, os.SEEK_SET); err != nil {
log.Errorf("fvf.Seek() error(%v)", err)
return
}
//遍历 FreeVolumes,将每一项分别追加到文件中
for _, v = range s.FreeVolumes {
if n, err = s.fvf.WriteString(fmt.Sprintf("%s\n", string(v.Meta()))); err != nil {
log.Errorf("fvf.WriteString() error(%v)", err)
return
}
tn += n
}
// sync刷磁盘
if err = s.fvf.Sync(); err != nil {
log.Errorf("fvf.saveFreeVolumeIndex Sync() error(%v)", err)
return
}
//os.Truncate(name, size),,将文件进行截断
if err = os.Truncate(s.conf.Store.FreeVolumeIndex, int64(tn)); err != nil {
log.Errorf("os.Truncate() error(%v)", err)
}
return
}
CompactVolume
- 将老的
volume数据copy到新的volume中
// CompactVolume compact a super block to another file.
func (s *Store) CompactVolume(id int32) (err error) {
var (
v, nv *volume.Volume
bdir, idir string
)
// try check volume
if v = s.Volumes[id]; v != nil {
if v.Compact {
return errors.ErrVolumeInCompact
}
} else {
return errors.ErrVolumeExist
}
// find a free volume
if nv, err = s.freeVolume(id); err != nil {
return
}
log.Infof("start compact volume: (%d) %s to %s", id, v.Block.File, nv.Block.File)
// no lock here, Compact is no side-effect
if err = v.StartCompact(nv); err != nil {
nv.Destroy()
v.StopCompact(nil)
return
}
s.vlock.Lock()
if v = s.Volumes[id]; v != nil {
log.Infof("stop compact volume: (%d) %s to %s", id, v.Block.File, nv.Block.File)
if err = v.StopCompact(nv); err == nil {
// WARN no need update volumes map, use same object, only update
// zookeeper the local index cause the block and index file changed.
if err = s.saveVolumeIndex(); err == nil {
err = s.zk.SetVolume(id, v.Meta())
}
if err != nil {
log.Errorf("compact volume: %d error(%v), local index or zookeeper index may save failed", id, err)
}
}
} else {
// never happen
err = errors.ErrVolumeExist
log.Errorf("compact volume: %d not exist(may bug)", id)
}
s.vlock.Unlock()
// WARN if failed, nv is free volume, if succeed nv replace with v.
// Sleep untill anyone had old volume variables all processed.
time.Sleep(_compactSleep)
nv.Destroy()
if err == nil {
bdir, idir = filepath.Dir(nv.Block.File), filepath.Dir(nv.Indexer.File)
_, err = s.AddFreeVolume(1, bdir, idir)
}
return
}
DelVolume
// delVolume atomic del volume by copy-on-write.
func (s *Store) delVolume(id int32) {
var (
vid int32
v *volume.Volume
volumes = make(map[int32]*volume.Volume, len(s.Volumes)-1)
)
for vid, v = range s.Volumes {
volumes[vid] = v
}
delete(volumes, id)
// goroutine safe replace
s.Volumes = volumes
}
// DelVolume del the volume by volume id.
func (s *Store) DelVolume(id int32) (err error) {
var v *volume.Volume
s.vlock.Lock()
if v = s.Volumes[id]; v != nil {
if !v.Compact {
s.delVolume(id)
if err = s.saveVolumeIndex(); err == nil {
err = s.zk.DelVolume(id)
}
if err != nil {
log.Errorf("del volume: %d error(%v), local index or zookeeper index may save failed", id, err)
}
} else {
err = errors.ErrVolumeInCompact
}
} else {
err = errors.ErrVolumeNotExist
}
s.vlock.Unlock()
// if succced or not meta data saved error, close volume
if err == nil || (err != errors.ErrVolumeInCompact &&
err != errors.ErrVolumeNotExist) {
v.Destroy()
}
return
}
BulkVolume
- 加载
Volume
- 添加到store的
Volumes
- 保存
Volume meta信息
- 同步到
zk集群上
// BulkVolume copy a super block from another store server add to this server.
func (s *Store) BulkVolume(id int32, bfile, ifile string) (err error) {
var v, nv *volume.Volume
// recovery new block
if nv, err = newVolume(id, bfile, ifile, s.conf); err != nil {
return
}
s.vlock.Lock()
if v = s.Volumes[id]; v == nil {
s.addVolume(id, nv)
if err = s.saveVolumeIndex(); err == nil {
err = s.zk.AddVolume(id, nv.Meta())
}
if err != nil {
log.Errorf("bulk volume: %d error(%v), local index or zookeeper index may save failed", id, err)
}
} else {
err = errors.ErrVolumeExist
}
s.vlock.Unlock()
return
}