Store模块组件初始化

Store组件
store组件负责管理volume,新增Free volume,将Free Volume上线。同时根据请求中的vid参数,将相应的下载、上传请求转发给volume组件。
定义
// Store save volumes.
type Store struct {
//volume meta file
//这个文件里面保存了当前store模块上线的volume,以及每个volume文件所在的路径,volume index所在的文件
vf *os.File
//这个文件里面保存了当前store模块所有未被使用的volume,以及每个volume文件所在的路径,volume index所在的文件。当调用新增volume api的时候,会从free volume中选出一个,删除掉,并在vf文件中新增加一行
fvf *os.File
//表示free volume的最大的id
FreeId int32
//当前上线的volume
Volumes map[int32]*volume.Volume // split volumes lock
//未上线的volumn
FreeVolumes []*volume.Volume
//zk客户端
zk *myzk.Zookeeper
conf *conf.Config
flock sync.Mutex // protect FreeId & saveIndex
vlock sync.Mutex // protect Volumes map
}
初始化流程
- 连接zk集群
- 打开
volume meta文件 - 打开
free volume meta文件 - 初始化,加载
volume meta和free volume meta文件
// NewStore
func NewStore(c *conf.Config) (s *Store, err error) {
s = &Store{}
//连接zk集群
if s.zk, err = myzk.NewZookeeper(c); err != nil {
return
}
s.conf = c
s.FreeId = 0
s.Volumes = make(map[int32]*volume.Volume)
//打开volume meta文件
if s.vf, err = os.OpenFile(c.Store.VolumeIndex, os.O_RDWR|os.O_CREATE|myos.O_NOATIME, 0664); err != nil {
log.Errorf("os.OpenFile(\"%s\") error(%v)", c.Store.VolumeIndex, err)
s.Close()
return nil, err
}
//打开free volume meta文件
if s.fvf, err = os.OpenFile(c.Store.FreeVolumeIndex, os.O_RDWR|os.O_CREATE|myos.O_NOATIME, 0664); err != nil {
log.Errorf("os.OpenFile(\"%s\") error(%v)", c.Store.FreeVolumeIndex, err)
s.Close()
return nil, err
}
//初始化,加载volume meta和 free volume meta文件
if err = s.init(); err != nil {
s.Close()
return nil, err
}
return
}
// init init the store.
func (s *Store) init() (err error) {
if err = s.parseFreeVolumeIndex(); err == nil {
err = s.parseVolumeIndex()
}
return
}
// parseFreeVolumeIndex parse free index from local.
func (s *Store) parseFreeVolumeIndex() (err error) {
var (
i int
id int32
bfile string
ifile string
v *volume.Volume
data []byte
ids []int32
lines []string
bfs []string
ifs []string
)
if data, err = ioutil.ReadAll(s.fvf); err != nil {
log.Errorf("ioutil.ReadAll() error(%v)", err)
return
}
lines = strings.Split(string(data), "\n")
//解析mata文件
if _, ids, bfs, ifs, err = s.parseIndex(lines); err != nil {
return
}
for i = 0; i < len(bfs); i++ {
id, bfile, ifile = ids[i], bfs[i], ifs[i]
//创建新的volumn
if v, err = newVolume(id, bfile, ifile, s.conf); err != nil {
return
}
v.Close()
//追加到FreeVolumn上
s.FreeVolumes = append(s.FreeVolumes, v)
if id = s.fileFreeId(bfile); id > s.FreeId {
s.FreeId = id
}
}
log.V(1).Infof("current max free volume id: %d", s.FreeId)
return
}
// parseVolumeIndex parse index from local config and zookeeper.
func (s *Store) parseVolumeIndex() (err error) {
var (
i int
ok bool
id int32
bfile string
ifile string
v *volume.Volume
data []byte
lids, zids []int32
lines []string
lbfs, lifs []string
zbfs, zifs []string
lim, zim map[int32]struct{}
)
if data, err = ioutil.ReadAll(s.vf); err != nil {
log.Errorf("ioutil.ReadAll() error(%v)", err)
return
}
//解析meta文件
lines = strings.Split(string(data), "\n")
if lim, lids, lbfs, lifs, err = s.parseIndex(lines); err != nil {
return
}
//从zk集群上同步volumn信息
if lines, err = s.zk.Volumes(); err != nil {
return
}
if zim, zids, zbfs, zifs, err = s.parseIndex(lines); err != nil {
return
}
// 遍历本地的local index,如果zk集群上没有,就把本地的增加到zk集群上
for i = 0; i < len(lbfs); i++ {
id, bfile, ifile = lids[i], lbfs[i], lifs[i]
if _, ok = s.Volumes[id]; ok {
continue
}
//新建volumn
if v, err = newVolume(id, bfile, ifile, s.conf); err != nil {
return
}
//保存到Volumes中
s.Volumes[id] = v
if _, ok = zim[id]; !ok {
if err = s.zk.AddVolume(id, v.Meta()); err != nil {
return
}
} else {
if err = s.zk.SetVolume(id, v.Meta()); err != nil {
return
}
}
}
// 遍历zk集群上的zk index,如果本地没有就把zk集群上的同步过来
for i = 0; i < len(zbfs); i++ {
id, bfile, ifile = zids[i], zbfs[i], zifs[i]
if _, ok = s.Volumes[id]; ok {
continue
}
// if not exists in local
if _, ok = lim[id]; !ok {
if v, err = newVolume(id, bfile, ifile, s.conf); err != nil {
return
}
s.Volumes[id] = v
}
}
//调用保存接口,将最新的volumn meta信息保存
err = s.saveVolumeIndex()
return
}
// parseIndex parse volume info from a index file.
// ------------------------------------
// | block_path,index_path,volume_id |
// | /bfs/block_1,/bfs/block_1.idx,1\n |
// | /bfs/block_2,/bfs/block_2.idx,2\n |
// -----------------------------------|
func (s *Store) parseIndex(lines []string) (im map[int32]struct{}, ids []int32, bfs, ifs []string, err error) {
var (
id int64
vid int32
line string
bfile string
ifile string
seps []string
)
im = make(map[int32]struct{})
for _, line = range lines {
if len(strings.TrimSpace(line)) == 0 {
continue
}
if seps = strings.Split(line, ","); len(seps) != 3 {
log.Errorf("volume index: \"%s\" format error", line)
err = errors.ErrStoreVolumeIndex
return
}
bfile = seps[0]
ifile = seps[1]
if id, err = strconv.ParseInt(seps[2], 10, 32); err != nil {
log.Errorf("volume index: \"%s\" format error", line)
return
}
vid = int32(id)
ids = append(ids, vid)
bfs = append(bfs, bfile)
ifs = append(ifs, ifile)
im[vid] = struct{}{}
log.V(1).Infof("parse volume index, id: %d, block: %s, index: %s", id, bfile, ifile)
}
return
}
Volume组件
每一个volume是由SuperBlock和Index两个组件组成,SuperBlock负责对图片数据文件的读写操作,SuperBlock的基本组成单位是Needle。Indexer负责对图片索引读写,通过Indexer可以快速定位到图片的Offset,Size。在Recovery的时候快速恢复,而不用对图片数据文件扫描一遍,如果在每次写Index文件的时候都使用sync同步的话成本会比较大,所以采用异步写可以提升系统的整体性能,假设在机器掉电之后,可能会丢失Index的最后写入的数据,怎么处理这种case?实际上在启动的时候先扫描Index文件,扫描完Index文件之后再对比Index的offset和volume的数据文件大小,将未来得及保存的needle的Index重新写入到index中即可。
volume meta文件
- 作用:标记正在使用的volumn以及空闲的volumn所在的文件路径,在store模块启动的时候需要加载
格式如下:
volume index file formaßt: ------------------------------------ | block_path,index_path,volume_id | | /bfs/block_1,/bfs/block_1.idx,1\n | | /bfs/block_2,/bfs/block_2.idx,2\n | -----------------------------------|
定义
// An store server contains many logic Volume, volume is superblock container.
type Volume struct {
//wait
wg sync.WaitGroup
lock sync.RWMutex
// meta
Id int32 `json:"id"`
Stats *stat.Stats `json:"stats"`
//superblock组件
Block *block.SuperBlock `json:"block"`
//index组件
Indexer *index.Indexer `json:"index"`
// data
// key:Key
// value: needle Cache
// NeedleCache needle meta data in memory.
// high 32bit = Offset
// low 32 bit = Size
// ----------------
// | int64 |
// ----------------
// | 32bit | 32bit |
// | offset | size |
// ----------------
needles map[int64]int64
ch chan uint32
conf *conf.Config
// compact
Compact bool `json:"compact"`
CompactOffset uint32 `json:"compact_offset"`
CompactTime int64 `json:"compact_time"`
compactKeys []int64
// status
closed bool
}
初始化
- 调用
Indexer的恢复接口,cache index到needles中- 传递一个函数指针给恢复接口
- 判断当前
Index代表的图片Offset和lastOffset大小关系 - 当前图片的大小+在文件中的偏移和
SuperBlock大小关系 - cache index到
needles中 - 记录当前offset
- 判断当前
- 传递一个函数指针给恢复接口
- 调用
SuperBlock的恢复接口,cache index到needles中- 传递一个函数指针给恢复接口
- 判断Needle是否有删除标记
- 没有删除标记,写入
Indexer中
- 没有删除标记,写入
- 判断Needle是否有删除标记
- 传递一个函数指针给恢复接口
- 校验恢复的出来的
Offset和Size大小 Indexer.Flush将index刷到磁盘上
// init recovery super block from index or super block.
func (v *Volume) init() (err error) {
var (
size int64
offset uint32
lastOffset uint32
)
// recovery from index
// 调用Indexer的恢复接口
if err = v.Indexer.Recovery(func(ix *index.Index) error {
// must no less than last offset
//判断当前Index代表的图片Offset和lastOffset
if ix.Offset < lastOffset {
log.Error("recovery index: %s lastoffset: %d error(%v)", ix, lastOffset, errors.ErrIndexOffset)
return errors.ErrIndexOffset
}
// WARN if index's offset more than the block, discard it.
if size = int64(ix.Size) + needle.BlockOffset(ix.Offset); size > v.Block.Size {
log.Error("recovery index: %s EOF", ix)
return errors.ErrIndexEOF
}
v.needles[ix.Key] = needle.NewCache(ix.Offset, ix.Size)
offset = ix.Offset + needle.NeedleOffset(int64(ix.Size))
lastOffset = ix.Offset
return nil
}); err != nil && err != errors.ErrIndexEOF {
return
}
// recovery from super block
if err = v.Block.Recovery(offset, func(n *needle.Needle, so, eo uint32) (err1 error) {
if n.Flag == needle.FlagOK {
if err1 = v.Indexer.Write(n.Key, so, n.TotalSize); err1 != nil {
return
}
} else {
so = needle.CacheDelOffset
}
v.needles[n.Key] = needle.NewCache(so, n.TotalSize)
return
}); err != nil {
return
}
// recheck offset, keep size and offset consistency
if v.Block.Size != needle.BlockOffset(v.Block.Offset) {
log.Errorf("block: %s [real size: %d, offset: %d] but [size: %d, offset: %d] not consistency",
v.Block.File, v.Block.Size, needle.NeedleOffset(v.Block.Size),
needle.BlockOffset(v.Block.Offset), v.Block.Offset)
return errors.ErrSuperBlockOffset
}
// flush index
err = v.Indexer.Flush()
return
}
Superblock组件

类定义
- 第一次打开
block文件时候,预分配block物理空间,写入header - 非第一次打开
block文件时候,首先解析header
// An Volume contains one superblock and many needles.
type SuperBlock struct {
//读文件句柄
r *os.File
//写文件句柄
w *os.File
conf *conf.Config
File string `json:"file"`
Offset uint32 `json:"offset"`
Size int64 `json:"size"`
LastErr error `json:"last_err"`
Ver byte `json:"ver"`
magic []byte `json:"-"`
Padding uint32 `json:"padding"`
// status
closed bool
write int
syncOffset uint32
}
初始化
// NewSuperBlock creae a new super block.
func NewSuperBlock(file string, c *conf.Config) (b *SuperBlock, err error) {
b = &SuperBlock{}
b.conf = c
b.File = file
b.closed = false
b.write = 0
b.syncOffset = 0
b.Padding = needle.PaddingSize
if b.w, err = os.OpenFile(file, os.O_WRONLY|os.O_CREATE|myos.O_NOATIME, 0664); err != nil {
log.Errorf("os.OpenFile(\"%s\") error(%v)", file, err)
b.Close()
return nil, err
}
if b.r, err = os.OpenFile(file, os.O_RDONLY|myos.O_NOATIME, 0664); err != nil {
log.Errorf("os.OpenFile(\"%s\") error(%v)", file, err)
b.Close()
return nil, err
}
if err = b.init(); err != nil {
log.Errorf("block: %s init() error(%v)", file, err)
b.Close()
return nil, err
}
return
}
// 写入super block的meta信息
// writeMeta write block meta info.
--------------
| magic number | ---- 4bytes
| version | ---- 1byte
| padding | ---- aligned with needle padding size (for furtuer used)
--------------
func (b *SuperBlock) writeMeta() (err error) {
// magic
if _, err = b.w.Write(_magic); err != nil {
return
}
// ver
if _, err = b.w.Write(_ver); err != nil {
return
}
// padding
_, err = b.w.Write(_padding)
return
}
// parseMeta parse block meta info.
func (b *SuperBlock) parseMeta() (err error) {
var buf = make([]byte, _headerSize)
if _, err = b.r.Read(buf[:_headerSize]); err != nil {
return
}
b.magic = buf[_magicOffset : _magicOffset+_magicSize]
b.Ver = buf[_verOffset : _verOffset+_verSize][0]
if !bytes.Equal(b.magic, _magic) {
return errors.ErrSuperBlockMagic
}
if b.Ver == Ver1 {
return errors.ErrSuperBlockVer
}
return
}
//第一次打开`block`文件时候,预分配block物理空间,写入header
//非第一次打开`block`文件时候,首先解析`header`
// init init block file, add/parse meta info.
func (b *SuperBlock) init() (err error) {
var stat os.FileInfo
if stat, err = b.r.Stat(); err != nil {
log.Errorf("block: %s Stat() error(%v)", b.File, err)
return
}
if b.Size = stat.Size(); b.Size == 0 {
//预分配block物理空间
if err = myos.Fallocate(b.w.Fd(), myos.FALLOC_FL_KEEP_SIZE, 0, _maxSize); err != nil {
log.Errorf("block: %s Fallocate() error(%s)", b.File, err)
return
}
//写入header
if err = b.writeMeta(); err != nil {
log.Errorf("block: %s writeMeta() error(%v)", b.File, err)
return
}
b.Size = _headerSize
} else {
//解析header
if err = b.parseMeta(); err != nil {
log.Errorf("block: %s parseMeta() error(%v)", b.File, err)
return
}
//seek到头
if _, err = b.w.Seek(_headerOffset, os.SEEK_SET); err != nil {
log.Errorf("block: %s Seek() error(%v)", b.File, err)
return
}
}
b.Offset = needle.NeedleOffset(_headerOffset)
return
}
Indexer组件
类定义
// Index for fast recovery super block needle cache in memory, index is async
// append the needle meta data.
//
// index file format:
// ---------------
// | super block |
// ---------------
// | needle | ----------------
// | needle | | key (int64) |
// | needle | ----> | offset (uint) |
// | needle | | size (int32) |
// | ...... | ----------------
// | ...... | int bigendian
//
// field | explanation
// --------------------------------------------------
// key | needle key (photo id)
// offset | needle offset in super block (aligned)
// size | needle data size
// Indexer used for fast recovery super block needle cache.
type Ring struct {
// read
rn int64
rp int
// write
wn int64
wp int
// info
num int
data []Index
}
type Indexer struct {
wg sync.WaitGroup
f *os.File
signal chan int
ring *Ring
// buffer
buf []byte
bn int
File string `json:"file"`
LastErr error `json:"last_err"`
Offset int64 `json:"offset"`
conf *conf.Config
//status
syncOffset int64
closed bool
write int
}
// Index index data.
type Index struct {
Key int64
Offset uint32
Size int32
}
初始化
// NewIndexer new a indexer for async merge index data to disk.
func NewIndexer(file string, conf *conf.Config) (i *Indexer, err error) {
var stat os.FileInfo
i = &Indexer{}
i.File = file
i.closed = false
i.syncOffset = 0
i.conf = conf
// must align size
i.ring = NewRing(conf.Index.RingBuffer)
i.bn = 0
i.buf = make([]byte, conf.Index.BufferSize)
if i.f, err = os.OpenFile(file, os.O_RDWR|os.O_CREATE|myos.O_NOATIME, 0664); err != nil {
log.Errorf("os.OpenFile(\"%s\") error(%v)", file, err)
return nil, err
}
if stat, err = i.f.Stat(); err != nil {
log.Errorf("index: %s Stat() error(%v)", i.File, err)
return nil, err
}
if stat.Size() == 0 {
if err = myos.Fallocate(i.f.Fd(), myos.FALLOC_FL_KEEP_SIZE, 0, _fallocSize); err != nil {
log.Errorf("index: %s fallocate() error(err)", i.File, err)
i.Close()
return nil, err
}
}
i.wg.Add(1)
i.signal = make(chan int, 1)
go i.merge()
return
}