Store管理

  • AddVolume
  • AddFreeVolume
  • CompactVolume
  • DelVolume
  • BulkVolume

AddVolume

  • Free Volume中取出空闲的Volume然后进行上线
  • Free Volume Meta文件中删除一项,Store.saveFreeVolumeIndex
  • Volume Meta文件中新增一项,Store.saveVolumeIndex
  • zk集群上zk.AddVolume,同步Volume Meta到zk集群上
// freeVolume get a free volume.
func (s *Store) freeVolume(id int32) (v *volume.Volume, err error) {
    var (
        i                                        int
        bfile, nbfile, ifile, nifile, bdir, idir string
    )
    s.flock.Lock()
    defer s.flock.Unlock()
    if len(s.FreeVolumes) == 0 {
        err = errors.ErrStoreNoFreeVolume
        return
    }
    //第一个Free Volume
    v = s.FreeVolumes[0]
    s.FreeVolumes = s.FreeVolumes[1:]
    v.Id = id
    bfile, ifile = v.Block.File, v.Indexer.File
    bdir, idir = filepath.Dir(bfile), filepath.Dir(ifile)
    for {
        //${id}_${i}
        nbfile, nifile = s.file(id, bdir, idir, i)
        if !myos.Exist(nbfile) && !myos.Exist(nifile) {
            break
        }
        i++
    }
    log.Infof("rename block: %s to %s", bfile, nbfile)
    log.Infof("rename index: %s to %s", ifile, nifile)
    //重命名index file
    if err = os.Rename(ifile, nifile); err != nil {
        log.Errorf("os.Rename(\"%s\", \"%s\") error(%v)", ifile, nifile, err)
        v.Destroy()
        return
    }
    //重命名block file
    if err = os.Rename(bfile, nbfile); err != nil {
        log.Errorf("os.Rename(\"%s\", \"%s\") error(%v)", bfile, nbfile, err)
        v.Destroy()
        return
    }
    //重新设置volume的Block、Index文件名称
    v.Block.File = nbfile
    v.Indexer.File = nifile
    if err = v.Open(); err != nil {
        v.Destroy()
        return
    }
    //将最新的Free Volume Index信息保存到磁盘
    err = s.saveFreeVolumeIndex()
    return
}
// addVolume atomic add volume by copy-on-write.
func (s *Store) addVolume(id int32, nv *volume.Volume) {
    var (
        vid     int32
        v       *volume.Volume
        volumes = make(map[int32]*volume.Volume, len(s.Volumes)+1)
    )
    for vid, v = range s.Volumes {
        volumes[vid] = v
    }
    volumes[id] = nv
    // goroutine safe replace
    s.Volumes = volumes
}

// AddVolume add a new volume.
func (s *Store) AddVolume(id int32) (v *volume.Volume, err error) {
    var ov *volume.Volume
    // try check exists
    // 检查是否存在
    if ov = s.Volumes[id]; ov != nil {
        return nil, errors.ErrVolumeExist
    }
    // find a free volume
    // 选中一个Free Volumn
    if v, err = s.freeVolume(id); err != nil {
        return
    }
    s.vlock.Lock()
    if ov = s.Volumes[id]; ov == nil {
        //内部函数addVolume,从free volume中删除一项,并将更新后的最新的信息保存。
        s.addVolume(id, v)
        //将新的volume保存到meta文件中
        if err = s.saveVolumeIndex(); err == nil {
            // 同步信息到zk集群上
            err = s.zk.AddVolume(id, v.Meta())
        }
        if err != nil {
            log.Errorf("add volume: %d error(%v), local index or zookeeper index may save failed", id, err)
        }
    } else {
        err = errors.ErrVolumeExist
    }
    s.vlock.Unlock()
    if err == errors.ErrVolumeExist {
        v.Destroy()
    }
    return
}

// saveVolumeIndex save volumes index info to disk.
// 保存 Free Volume Meta 文件
func (s *Store) saveVolumeIndex() (err error) {
    var (
        tn, n int
        v     *volume.Volume
    )
    if _, err = s.vf.Seek(0, os.SEEK_SET); err != nil {
        log.Errorf("vf.Seek() error(%v)", err)
        return
    }
    //遍历Volumes每一项,写到磁盘中
    for _, v = range s.Volumes {
        if n, err = s.vf.WriteString(fmt.Sprintf("%s\n", string(v.Meta()))); err != nil {
            log.Errorf("vf.WriteString() error(%v)", err)
            return
        }
        tn += n
    }
    // sync刷磁盘
    if err = s.vf.Sync(); err != nil {
        log.Errorf("vf.Sync() error(%v)", err)
        return
    }
    //os.Truncate(name, size),,将文件进行截断
    if err = os.Truncate(s.conf.Store.VolumeIndex, int64(tn)); err != nil {
        log.Errorf("os.Truncate() error(%v)", err)
    }
    return
}

AddFreeVolume

  • 根据bdiridir以及FreeId获取文件名
  • 新建volume
  • 添加到Free Volume
  • Store.saveFreeVolumeIndex保存到meta文件中,启动的时候可以加载
// AddFreeVolume add free volumes.
func (s *Store) AddFreeVolume(n int, bdir, idir string) (sn int, err error) {
    var (
        i            int
        bfile, ifile string
        v            *volume.Volume
    )
    s.flock.Lock()
    for i = 0; i < n; i++ {
        s.FreeId++
        //根据bdir和idir以及FreeId获取文件名
        bfile, ifile = s.freeFile(s.FreeId, bdir, idir)
        if myos.Exist(bfile) || myos.Exist(ifile) {
            continue
        }
        //新建volume
        if v, err = newVolume(volumeFreeId, bfile, ifile, s.conf); err != nil {
            // if no free space, delete the file
            os.Remove(bfile)
            os.Remove(ifile)
            break
        }
        v.Close()
        //添加到Free Volume上
        s.FreeVolumes = append(s.FreeVolumes, v)
        sn++
    }
    //保存
    err = s.saveFreeVolumeIndex()
    s.flock.Unlock()
    return
}
// 保存 Free Volume Meta文件
// saveFreeVolumeIndex save free volumes index info to disk.
func (s *Store) saveFreeVolumeIndex() (err error) {
    var (
        tn, n int
        v     *volume.Volume
    )
    // 定位到文件开头
    if _, err = s.fvf.Seek(0, os.SEEK_SET); err != nil {
        log.Errorf("fvf.Seek() error(%v)", err)
        return
    }
    //遍历 FreeVolumes,将每一项分别追加到文件中
    for _, v = range s.FreeVolumes {
        if n, err = s.fvf.WriteString(fmt.Sprintf("%s\n", string(v.Meta()))); err != nil {
            log.Errorf("fvf.WriteString() error(%v)", err)
            return
        }
        tn += n
    }
    // sync刷磁盘
    if err = s.fvf.Sync(); err != nil {
        log.Errorf("fvf.saveFreeVolumeIndex Sync() error(%v)", err)
        return
    }
    //os.Truncate(name, size),,将文件进行截断
    if err = os.Truncate(s.conf.Store.FreeVolumeIndex, int64(tn)); err != nil {
        log.Errorf("os.Truncate() error(%v)", err)
    }
    return
}

CompactVolume

  • 将老的volume数据copy到新的volume
// CompactVolume compact a super block to another file.
func (s *Store) CompactVolume(id int32) (err error) {
    var (
        v, nv      *volume.Volume
        bdir, idir string
    )
    // try check volume
    if v = s.Volumes[id]; v != nil {
        if v.Compact {
            return errors.ErrVolumeInCompact
        }
    } else {
        return errors.ErrVolumeExist
    }
    // find a free volume
    if nv, err = s.freeVolume(id); err != nil {
        return
    }
    log.Infof("start compact volume: (%d) %s to %s", id, v.Block.File, nv.Block.File)
    // no lock here, Compact is no side-effect
    if err = v.StartCompact(nv); err != nil {
        nv.Destroy()
        v.StopCompact(nil)
        return
    }
    s.vlock.Lock()
    if v = s.Volumes[id]; v != nil {
        log.Infof("stop compact volume: (%d) %s to %s", id, v.Block.File, nv.Block.File)
        if err = v.StopCompact(nv); err == nil {
            // WARN no need update volumes map, use same object, only update
            // zookeeper the local index cause the block and index file changed.
            if err = s.saveVolumeIndex(); err == nil {
                err = s.zk.SetVolume(id, v.Meta())
            }
            if err != nil {
                log.Errorf("compact volume: %d error(%v), local index or zookeeper index may save failed", id, err)
            }
        }
    } else {
        // never happen
        err = errors.ErrVolumeExist
        log.Errorf("compact volume: %d not exist(may bug)", id)
    }
    s.vlock.Unlock()
    // WARN if failed, nv is free volume, if succeed nv replace with v.
    // Sleep untill anyone had old volume variables all processed.
    time.Sleep(_compactSleep)
    nv.Destroy()
    if err == nil {
        bdir, idir = filepath.Dir(nv.Block.File), filepath.Dir(nv.Indexer.File)
        _, err = s.AddFreeVolume(1, bdir, idir)
    }
    return
}

DelVolume

// delVolume atomic del volume by copy-on-write.
func (s *Store) delVolume(id int32) {
    var (
        vid     int32
        v       *volume.Volume
        volumes = make(map[int32]*volume.Volume, len(s.Volumes)-1)
    )
    for vid, v = range s.Volumes {
        volumes[vid] = v
    }
    delete(volumes, id)
    // goroutine safe replace
    s.Volumes = volumes
}

// DelVolume del the volume by volume id.
func (s *Store) DelVolume(id int32) (err error) {
    var v *volume.Volume
    s.vlock.Lock()
    if v = s.Volumes[id]; v != nil {
        if !v.Compact {
            s.delVolume(id)
            if err = s.saveVolumeIndex(); err == nil {
                err = s.zk.DelVolume(id)
            }
            if err != nil {
                log.Errorf("del volume: %d error(%v), local index or zookeeper index may save failed", id, err)
            }
        } else {
            err = errors.ErrVolumeInCompact
        }
    } else {
        err = errors.ErrVolumeNotExist
    }
    s.vlock.Unlock()
    // if succced or not meta data saved error, close volume
    if err == nil || (err != errors.ErrVolumeInCompact &&
        err != errors.ErrVolumeNotExist) {
        v.Destroy()
    }
    return
}

BulkVolume

  • 加载Volume
  • 添加到store的Volumes
  • 保存Volume meta信息
  • 同步到zk集群上
// BulkVolume copy a super block from another store server add to this server.
func (s *Store) BulkVolume(id int32, bfile, ifile string) (err error) {
    var v, nv *volume.Volume
    // recovery new block
    if nv, err = newVolume(id, bfile, ifile, s.conf); err != nil {
        return
    }
    s.vlock.Lock()
    if v = s.Volumes[id]; v == nil {
        s.addVolume(id, nv)
        if err = s.saveVolumeIndex(); err == nil {
            err = s.zk.AddVolume(id, nv.Meta())
        }
        if err != nil {
            log.Errorf("bulk volume: %d error(%v), local index or zookeeper index may save failed", id, err)
        }
    } else {
        err = errors.ErrVolumeExist
    }
    s.vlock.Unlock()
    return
}

results matching ""

    No results matching ""