Store模块组件初始化

Store组件

store组件负责管理volume,新增Free volume,将Free Volume上线。同时根据请求中的vid参数,将相应的下载、上传请求转发给volume组件。

定义

// Store save volumes.
// Store manages the volumes of one store node: the online volumes
// (Volumes) and the pool of free, not-yet-online volumes (FreeVolumes),
// both of which are persisted in local meta files (vf and fvf).
type Store struct {
    // vf is the volume meta file: it lists the volumes currently online in
    // this store, with each volume's block file path and index file path.
    vf          *os.File
    // fvf is the free volume meta file: it lists all volumes of this store
    // that are not in use yet, with their block and index file paths. When
    // the add-volume API is called, one free volume is picked from here,
    // removed, and a new line for it is appended to vf.
    fvf         *os.File
    // FreeId is the largest free volume id, as derived by fileFreeId from
    // the free volumes' block file names while the free index is parsed.
    FreeId      int32
    // Volumes holds the volumes currently online, keyed by volume id.
    Volumes     map[int32]*volume.Volume // split volumes lock
    // FreeVolumes holds the volumes that are not online yet.
    FreeVolumes []*volume.Volume
    // zk is the ZooKeeper client.
    zk          *myzk.Zookeeper
    conf        *conf.Config
    flock       sync.Mutex // protect FreeId & saveIndex
    vlock       sync.Mutex // protect Volumes map
}

初始化流程

  • 连接zk集群
  • 打开volume meta文件
  • 打开free volume meta文件
  • 初始化,加载volume meta和free volume meta文件
// NewStore creates a Store from the config. It connects to the ZooKeeper
// cluster, opens the volume meta file and the free volume meta file
// (creating either one if it is missing), and then loads both through init.
//
// On any failure it returns (nil, err). After the ZooKeeper connection
// succeeds, later failures also call s.Close() so that whatever was opened
// can be released. NOTE(review): this assumes Close tolerates nil file
// handles; confirm.
func NewStore(c *conf.Config) (s *Store, err error) {
    s = &Store{}
    // Connect to the ZooKeeper cluster first; nothing else is open yet.
    if s.zk, err = myzk.NewZookeeper(c); err != nil {
        // Do not hand back a half-initialized Store. Every other failure
        // path returns nil, so this one does too.
        return nil, err
    }
    s.conf = c
    s.FreeId = 0
    s.Volumes = make(map[int32]*volume.Volume)
    // Open the volume meta file.
    if s.vf, err = os.OpenFile(c.Store.VolumeIndex, os.O_RDWR|os.O_CREATE|myos.O_NOATIME, 0664); err != nil {
        log.Errorf("os.OpenFile(\"%s\") error(%v)", c.Store.VolumeIndex, err)
        s.Close()
        return nil, err
    }
    // Open the free volume meta file.
    if s.fvf, err = os.OpenFile(c.Store.FreeVolumeIndex, os.O_RDWR|os.O_CREATE|myos.O_NOATIME, 0664); err != nil {
        log.Errorf("os.OpenFile(\"%s\") error(%v)", c.Store.FreeVolumeIndex, err)
        s.Close()
        return nil, err
    }
    // Load the free volume meta file and the volume meta file.
    if err = s.init(); err != nil {
        s.Close()
        return nil, err
    }
    return s, nil
}

// init loads the store's persisted state. The free volume index is loaded
// first, then the online volume index. The first error stops loading and is
// returned.
func (s *Store) init() (err error) {
    if err = s.parseFreeVolumeIndex(); err != nil {
        return
    }
    return s.parseVolumeIndex()
}

// parseFreeVolumeIndex loads the free volume meta file (s.fvf).
//
// Each entry is opened with newVolume, closed again immediately, and
// appended to s.FreeVolumes. s.FreeId is raised to the largest id that
// fileFreeId derives from the entries' block file names. The first error
// stops the scan and is returned.
func (s *Store) parseFreeVolumeIndex() (err error) {
    var (
        raw        []byte
        vids       []int32
        blockFiles []string
        indexFiles []string
    )
    if raw, err = ioutil.ReadAll(s.fvf); err != nil {
        log.Errorf("ioutil.ReadAll() error(%v)", err)
        return
    }
    // Parse the meta file; the id set (first result) is not needed here.
    if _, vids, blockFiles, indexFiles, err = s.parseIndex(strings.Split(string(raw), "\n")); err != nil {
        return
    }
    for n, blockFile := range blockFiles {
        var fv *volume.Volume
        if fv, err = newVolume(vids[n], blockFile, indexFiles[n], s.conf); err != nil {
            return
        }
        // Free volumes are kept in the pool closed.
        fv.Close()
        s.FreeVolumes = append(s.FreeVolumes, fv)
        if freeID := s.fileFreeId(blockFile); freeID > s.FreeId {
            s.FreeId = freeID
        }
    }
    log.V(1).Infof("current max free volume id: %d", s.FreeId)
    return
}

// parseVolumeIndex reconciles the local volume meta file (s.vf) with the
// volume list stored in ZooKeeper and then persists the merged result.
//
//  1. Every local entry not already in s.Volumes is opened and registered
//     in s.Volumes. Its Meta() is then published to ZooKeeper: SetVolume if
//     ZooKeeper already lists the id, AddVolume otherwise. Local paths
//     therefore take precedence over ZooKeeper's for the same id.
//  2. Every ZooKeeper entry that is neither in s.Volumes nor in the local
//     file is opened and registered in s.Volumes.
//  3. saveVolumeIndex writes the result back.
//
// The first error stops the process and is returned.
func (s *Store) parseVolumeIndex() (err error) {
    var (
        raw                   []byte
        zkLines               []string
        localSet, zkSet       map[int32]struct{}
        localIDs, zkIDs       []int32
        localBlocks, localIdx []string
        zkBlocks, zkIdx       []string
    )
    if raw, err = ioutil.ReadAll(s.vf); err != nil {
        log.Errorf("ioutil.ReadAll() error(%v)", err)
        return
    }
    if localSet, localIDs, localBlocks, localIdx, err = s.parseIndex(strings.Split(string(raw), "\n")); err != nil {
        return
    }
    // Fetch the volume list from the ZooKeeper cluster.
    if zkLines, err = s.zk.Volumes(); err != nil {
        return
    }
    if zkSet, zkIDs, zkBlocks, zkIdx, err = s.parseIndex(zkLines); err != nil {
        return
    }
    // Step 1: open local volumes and make ZooKeeper reflect each of them.
    for n, vid := range localIDs {
        if _, loaded := s.Volumes[vid]; loaded {
            continue
        }
        var lv *volume.Volume
        if lv, err = newVolume(vid, localBlocks[n], localIdx[n], s.conf); err != nil {
            return
        }
        s.Volumes[vid] = lv
        if _, known := zkSet[vid]; known {
            err = s.zk.SetVolume(vid, lv.Meta())
        } else {
            err = s.zk.AddVolume(vid, lv.Meta())
        }
        if err != nil {
            return
        }
    }
    // Step 2: pull in volumes that only ZooKeeper knows about.
    for n, vid := range zkIDs {
        if _, loaded := s.Volumes[vid]; loaded {
            continue
        }
        if _, local := localSet[vid]; local {
            continue
        }
        var zv *volume.Volume
        if zv, err = newVolume(vid, zkBlocks[n], zkIdx[n], s.conf); err != nil {
            return
        }
        s.Volumes[vid] = zv
    }
    // Step 3: persist the merged volume meta information.
    return s.saveVolumeIndex()
}

// parseIndex parses volume entries from index lines. The lines come either
// from a local meta file or from the volume list fetched from ZooKeeper.
//
//  ------------------------------------
// | block_path,index_path,volume_id    |
// | /bfs/block_1,/bfs/block_1.idx,1\n  |
// | /bfs/block_2,/bfs/block_2.idx,2\n  |
//  ------------------------------------
//
// Each line is trimmed of surrounding whitespace before it is parsed, so a
// trailing "\r" from CRLF line endings is tolerated. Blank lines are skipped.
//
// It returns the set of parsed ids (im) and, in input order, parallel slices
// of ids, block file paths (bfs) and index file paths (ifs). A line without
// exactly three comma-separated fields yields errors.ErrStoreVolumeIndex. An
// id that is not a valid 32-bit decimal integer yields the strconv error.
func (s *Store) parseIndex(lines []string) (im map[int32]struct{}, ids []int32, bfs, ifs []string, err error) {
    var (
        id    int64
        vid   int32
        line  string
        bfile string
        ifile string
        seps  []string
    )
    im = make(map[int32]struct{})
    for _, line = range lines {
        // Parse the trimmed text, not the raw line, so that trailing
        // whitespace such as "\r" cannot make ParseInt fail on the id.
        if line = strings.TrimSpace(line); len(line) == 0 {
            continue
        }
        if seps = strings.Split(line, ","); len(seps) != 3 {
            log.Errorf("volume index: \"%s\" format error", line)
            err = errors.ErrStoreVolumeIndex
            return
        }
        bfile = seps[0]
        ifile = seps[1]
        if id, err = strconv.ParseInt(seps[2], 10, 32); err != nil {
            log.Errorf("volume index: \"%s\" format error", line)
            return
        }
        vid = int32(id)
        ids = append(ids, vid)
        bfs = append(bfs, bfile)
        ifs = append(ifs, ifile)
        im[vid] = struct{}{}
        log.V(1).Infof("parse volume index, id: %d, block: %s, index: %s", id, bfile, ifile)
    }
    return
}

Volume组件

每一个volume由SuperBlock和Indexer两个组件组成。SuperBlock负责对图片数据文件的读写,其基本组成单位是Needle;Indexer负责对图片索引的读写,通过Indexer可以快速定位到图片的Offset和Size,从而在Recovery时快速恢复,而不必把整个图片数据文件扫描一遍。如果每次写Index文件时都调用sync同步,成本会比较大,所以采用异步写来提升系统的整体性能。但这样一来,机器掉电后可能会丢失Index最后写入的数据,该如何处理?实际上,启动时先扫描Index文件,扫描完之后再对比Index中记录的offset与volume数据文件的大小,把未来得及写入index的needle重新写入index即可。

volume meta文件

  • 作用:标记正在使用的volume以及空闲的volume所在的文件路径,在store模块启动的时候需要加载
  • 格式如下:

    volume index file format:
     ------------------------------------
    | block_path,index_path,volume_id   |
    | /bfs/block_1,/bfs/block_1.idx,1\n |
    | /bfs/block_2,/bfs/block_2.idx,2\n |
     -----------------------------------|
    

定义

// Volume is one logical volume of a store server; a store server holds many
// of them. A Volume pairs a SuperBlock (the data file holding the needles)
// with an Indexer (the index file) and keeps an in-memory cache of needle
// locations.
type Volume struct {
    // wait (wait group; what it waits for is not visible here, TODO confirm)
    wg   sync.WaitGroup
    lock sync.RWMutex
    // meta
    Id      int32             `json:"id"`
    Stats   *stat.Stats       `json:"stats"`
    // Block is the SuperBlock component: reads and writes the data file.
    Block   *block.SuperBlock `json:"block"`
    // Indexer is the index component: reads and writes the index file.
    Indexer *index.Indexer    `json:"index"`
    // data
    // needles caches needle metadata in memory.
    // key: needle key
    // value: needle cache, packed into one int64:
    //   high 32 bit = Offset
    //   low 32 bit  = Size
    //  ----------------
    // |      int64     |
    //  ----------------
    // | 32bit  | 32bit |
    // | offset | size  |
    //  ----------------
    needles map[int64]int64
    ch      chan uint32
    conf    *conf.Config
    // compact
    Compact       bool   `json:"compact"`
    CompactOffset uint32 `json:"compact_offset"`
    CompactTime   int64  `json:"compact_time"`
    compactKeys   []int64
    // status
    closed bool
}

初始化

  • 调用Indexer的恢复接口,cache index到needles中
    • 传递一个函数指针给恢复接口
      • 判断当前Index代表的图片Offset与lastOffset的大小关系(Offset不能小于lastOffset)
      • 判断当前图片的大小加上其在文件中的偏移是否超过SuperBlock的大小
      • cache index到needles
      • 记录当前offset
  • 调用SuperBlock的恢复接口,cache index到needles中
    • 传递一个函数指针给恢复接口
      • 判断Needle是否有删除标记
        • 没有删除标记,写入Indexer
  • 校验恢复出来的Offset与Size是否一致
  • 调用Indexer.Flush将index刷到磁盘上
// init rebuilds the volume's in-memory needle cache (v.needles).
//
//  1. Replay the index file through Indexer.Recovery. Each entry's offset
//     must not be smaller than the previous one (else ErrIndexOffset). Its
//     end must lie within the block file; otherwise ErrIndexEOF is returned
//     from the callback, which init tolerates. Valid entries are cached, and
//     the needle offset just past the last one is remembered.
//  2. Scan the block file from that offset with Block.Recovery. This picks
//     up needles whose index entries were not written yet. Needles flagged
//     FlagOK are appended to the index and cached. Other needles are cached
//     with needle.CacheDelOffset.
//  3. Verify that the block's byte size and its needle offset agree, then
//     flush the index.
func (v *Volume) init() (err error) {
    var (
        size       int64
        offset     uint32
        lastOffset uint32
    )
    // Recover from the index.
    if err = v.Indexer.Recovery(func(ix *index.Index) error {
        // The offset must be no less than the last offset.
        if ix.Offset < lastOffset {
            // Errorf, not Error: the message uses printf verbs.
            log.Errorf("recovery index: %v lastoffset: %d error(%v)", ix, lastOffset, errors.ErrIndexOffset)
            return errors.ErrIndexOffset
        }
        // WARN: if the index entry reaches past the end of the block, discard it.
        if size = int64(ix.Size) + needle.BlockOffset(ix.Offset); size > v.Block.Size {
            log.Errorf("recovery index: %v EOF", ix)
            return errors.ErrIndexEOF
        }
        v.needles[ix.Key] = needle.NewCache(ix.Offset, ix.Size)
        // The next needle offset; the block scan below resumes here.
        offset = ix.Offset + needle.NeedleOffset(int64(ix.Size))
        lastOffset = ix.Offset
        return nil
    }); err != nil && err != errors.ErrIndexEOF {
        return
    }
    // Recover the rest from the super block.
    if err = v.Block.Recovery(offset, func(n *needle.Needle, so, eo uint32) (err1 error) {
        if n.Flag == needle.FlagOK {
            if err1 = v.Indexer.Write(n.Key, so, n.TotalSize); err1 != nil {
                return
            }
        } else {
            so = needle.CacheDelOffset
        }
        v.needles[n.Key] = needle.NewCache(so, n.TotalSize)
        return
    }); err != nil {
        return
    }
    // Recheck the offset to keep size and offset consistent.
    if v.Block.Size != needle.BlockOffset(v.Block.Offset) {
        log.Errorf("block: %s [real size: %d, offset: %d] but [size: %d, offset: %d] not consistency",
            v.Block.File, v.Block.Size, needle.NeedleOffset(v.Block.Size),
            needle.BlockOffset(v.Block.Offset), v.Block.Offset)
        return errors.ErrSuperBlockOffset
    }
    // Flush the index.
    err = v.Indexer.Flush()
    return
}

Superblock组件

Facebook Haystack设计

类定义

  • 第一次打开block文件时候,预分配block物理空间,写入header
  • 非第一次打开block文件时候,首先解析header
// SuperBlock is the data file of a Volume (each Volume has one). The file
// starts with a header (see writeMeta) followed by needles. The SuperBlock
// keeps separate read and write handles on the same file.
type SuperBlock struct {
    // r is the read-only handle (opened O_RDONLY).
    r       *os.File
    // w is the write-only handle (opened O_WRONLY|O_CREATE).
    w       *os.File
    conf    *conf.Config
    File    string `json:"file"`
    // Offset is kept in needle units: init sets it to
    // needle.NeedleOffset(_headerOffset), and Volume.init checks it against
    // Size via needle.BlockOffset.
    Offset  uint32 `json:"offset"`
    // Size is the block file size in bytes.
    Size    int64  `json:"size"`
    LastErr error  `json:"last_err"`
    Ver     byte   `json:"ver"`
    magic   []byte `json:"-"`
    Padding uint32 `json:"padding"`
    // status
    closed     bool
    write      int
    syncOffset uint32
}

初始化

// NewSuperBlock opens the block file at path file, creating it if it is
// missing. It opens separate write and read handles and then initializes or
// validates the header via init. On any failure the partially built block
// is closed and (nil, err) is returned.
func NewSuperBlock(file string, c *conf.Config) (b *SuperBlock, err error) {
    b = &SuperBlock{
        conf:       c,
        File:       file,
        closed:     false,
        write:      0,
        syncOffset: 0,
        Padding:    needle.PaddingSize,
    }
    // abort releases whatever was opened so far and reports the failure.
    abort := func(cause error) (*SuperBlock, error) {
        b.Close()
        return nil, cause
    }
    if b.w, err = os.OpenFile(file, os.O_WRONLY|os.O_CREATE|myos.O_NOATIME, 0664); err != nil {
        log.Errorf("os.OpenFile(\"%s\") error(%v)", file, err)
        return abort(err)
    }
    if b.r, err = os.OpenFile(file, os.O_RDONLY|myos.O_NOATIME, 0664); err != nil {
        log.Errorf("os.OpenFile(\"%s\") error(%v)", file, err)
        return abort(err)
    }
    if err = b.init(); err != nil {
        log.Errorf("block: %s init() error(%v)", file, err)
        return abort(err)
    }
    return b, nil
}

// 写入super block的meta信息
// writeMeta write block meta info.
  --------------
 | magic number |   ---- 4bytes
 | version      |   ---- 1byte
 | padding      |   ---- aligned with needle padding size (for furtuer  used)
  --------------
func (b *SuperBlock) writeMeta() (err error) {
    // magic
    if _, err = b.w.Write(_magic); err != nil {
        return
    }
    // ver
    if _, err = b.w.Write(_ver); err != nil {
        return
    }
    // padding
    _, err = b.w.Write(_padding)
    return
}

// parseMeta reads the header from the current position of the read handle
// b.r, which is the start of the file when it is called from init. It then
// validates the header.
//
// It returns the read error if fewer than _headerSize bytes can be read (for
// example io.EOF for a truncated file), errors.ErrSuperBlockMagic if the
// magic number does not match, and errors.ErrSuperBlockVer if the version is
// rejected.
func (b *SuperBlock) parseMeta() (err error) {
    var (
        n, got int
        buf    = make([]byte, _headerSize)
    )
    // A single Read may return fewer bytes than requested without an error.
    // Keep reading until the header is complete (the same contract as
    // io.ReadFull), so that a partial buffer is never parsed as a header.
    for n < len(buf) && err == nil {
        got, err = b.r.Read(buf[n:])
        n += got
    }
    if n < len(buf) {
        // The loop only stops early on a non-nil error.
        return
    }
    err = nil
    b.magic = buf[_magicOffset : _magicOffset+_magicSize]
    b.Ver = buf[_verOffset : _verOffset+_verSize][0]
    if !bytes.Equal(b.magic, _magic) {
        return errors.ErrSuperBlockMagic
    }
    // NOTE(review): this rejects a header whose version equals Ver1. If Ver1
    // is the version that writeMeta emits (via _ver), the comparison looks
    // inverted (probably meant !=). Confirm against the definition of _ver.
    if b.Ver == Ver1 {
        return errors.ErrSuperBlockVer
    }
    return
}

// init prepares the block file behind b.r and b.w.
//
// An empty file is a new block. Its physical space (_maxSize bytes) is
// preallocated with FALLOC_FL_KEEP_SIZE, the header is written, and b.Size
// becomes _headerSize.
//
// A non-empty file is an existing block. Its header is parsed and
// validated, and the write handle is sought to _headerOffset (presumably
// the end of the header).
//
// In both cases b.Offset ends up as needle.NeedleOffset(_headerOffset).
func (b *SuperBlock) init() (err error) {
    var info os.FileInfo
    if info, err = b.r.Stat(); err != nil {
        log.Errorf("block: %s Stat() error(%v)", b.File, err)
        return
    }
    b.Size = info.Size()
    switch {
    case b.Size == 0:
        // New block: reserve the physical space up front, then write the header.
        if err = myos.Fallocate(b.w.Fd(), myos.FALLOC_FL_KEEP_SIZE, 0, _maxSize); err != nil {
            log.Errorf("block: %s Fallocate() error(%s)", b.File, err)
            return
        }
        if err = b.writeMeta(); err != nil {
            log.Errorf("block: %s writeMeta() error(%v)", b.File, err)
            return
        }
        b.Size = _headerSize
    default:
        // Existing block: validate the header, then reposition the writer.
        if err = b.parseMeta(); err != nil {
            log.Errorf("block: %s parseMeta() error(%v)", b.File, err)
            return
        }
        if _, err = b.w.Seek(_headerOffset, os.SEEK_SET); err != nil {
            log.Errorf("block: %s Seek() error(%v)", b.File, err)
            return
        }
    }
    b.Offset = needle.NeedleOffset(_headerOffset)
    return
}

Indexer组件

类定义

// Index for fast recovery super block needle cache in memory, index is async
// append the needle meta data.
//
// index file format:
//  ---------------
// | super   block |
//  ---------------
// |     needle    |           ----------------
// |     needle    |          |  key (int64)   |
// |     needle    | ---->    |  offset (uint) |
// |     needle    |          |  size (int32)  |
// |     ......    |           ----------------
// |     ......    |             int bigendian
//
// field     | explanation
// --------------------------------------------------
// key       | needle key (photo id)
// offset    | needle offset in super block (aligned)
// size      | needle data size
// Indexer used for fast recovery super block needle cache.

// Ring is the ring buffer of Index entries owned by an Indexer (created
// with NewRing(conf.Index.RingBuffer) in NewIndexer). Its methods are not
// shown here, so the field notes below are hedged.
type Ring struct {
    // read side: rn presumably counts reads and rp is presumably the next
    // read position in data (TODO confirm against the Ring methods).
    rn int64
    rp int
    // write side: wn and wp are presumably the write counterparts of rn and rp.
    wn int64
    wp int
    // info: num is presumably the capacity; data holds the buffered entries.
    num  int
    data []Index
}

// Indexer writes needle index entries to the index file asynchronously; the
// index is used to rebuild a volume's needle cache quickly on recovery.
type Indexer struct {
    // wg tracks the merge goroutine (NewIndexer calls wg.Add(1) before
    // starting it).
    wg     sync.WaitGroup
    // f is the index file, opened O_RDWR|O_CREATE.
    f      *os.File
    // signal is a 1-buffered channel, presumably used to notify the merge
    // goroutine (TODO confirm).
    signal chan int
    // ring queues index entries (sized by conf.Index.RingBuffer).
    ring   *Ring
    // buffer: buf is sized by conf.Index.BufferSize; bn is presumably the
    // number of bytes currently buffered.
    buf []byte
    bn  int
    File    string `json:"file"`
    LastErr error  `json:"last_err"`
    Offset  int64  `json:"offset"`
    conf    *conf.Config
    //status
    syncOffset int64
    closed     bool
    write      int
}

// Index is one index record: a needle key, the needle's aligned offset in
// the super block, and the needle's size.
type Index struct {
    Key    int64
    Offset uint32
    Size   int32
}

初始化

// NewIndexer opens the index file, creating it if it is missing, and starts
// the background merge goroutine that merges index data to disk
// asynchronously.
//
// The ring buffer is sized by conf.Index.RingBuffer and the write buffer by
// conf.Index.BufferSize. A new, empty file gets _fallocSize bytes
// preallocated with FALLOC_FL_KEEP_SIZE. On failure it returns (nil, err).
func NewIndexer(file string, conf *conf.Config) (i *Indexer, err error) {
    var stat os.FileInfo
    i = &Indexer{}
    i.File = file
    i.closed = false
    i.syncOffset = 0
    i.conf = conf
    // must align size
    i.ring = NewRing(conf.Index.RingBuffer)
    i.bn = 0
    i.buf = make([]byte, conf.Index.BufferSize)
    if i.f, err = os.OpenFile(file, os.O_RDWR|os.O_CREATE|myos.O_NOATIME, 0664); err != nil {
        log.Errorf("os.OpenFile(\"%s\") error(%v)", file, err)
        return nil, err
    }
    if stat, err = i.f.Stat(); err != nil {
        log.Errorf("index: %s Stat() error(%v)", i.File, err)
        // The Indexer is discarded, so nobody else can close this handle.
        // Closing is best-effort; the Stat error is the one reported.
        _ = i.f.Close()
        return nil, err
    }
    if stat.Size() == 0 {
        if err = myos.Fallocate(i.f.Fd(), myos.FALLOC_FL_KEEP_SIZE, 0, _fallocSize); err != nil {
            // The format string was missing its verb ("error(err)").
            log.Errorf("index: %s fallocate() error(%v)", i.File, err)
            // NOTE(review): Close runs before signal is created and before
            // the merge goroutine starts; confirm that it tolerates that state.
            i.Close()
            return nil, err
        }
    }
    i.wg.Add(1)
    i.signal = make(chan int, 1)
    go i.merge()
    return
}

results matching ""

    No results matching ""