Directory模块初始化

directory模块负责管理元数据,图片元数据(bucket, filename, vid,key,cookie,mine)通过hbase存储。集群机器元数据是存储在zookeeper中,directory使用机器元数据信息建立vid到store机器的映射关系,同时需要根据每个store机器管理的volume的统计元数据(一段时间内读写次数、容量)对图片上传请求进行合理调度。

定义

  • 同步/rack元数据,storeid->volumeid作用是在调度的时候选中storeid,然后再随机选volumeid
  • 同步/group元数据,groupid->storeids作用是在调度的时候,判断某一个group下面的store是否全部可用
  • 同步/volume元数据,volumeid->storeids作用是在处理上传、下载请求时,根据volumeid直接定位到store机器
// Directory
// id means store serverid; vid means volume id; gid means group id
type Directory struct {
    // STORE
    store       map[string]*meta.Store // store_server_id:store_info
    //storeid->volumeids映射关系
    storeVolume map[string][]int32     // store_server_id:volume_ids

    // GROUP
    storeGroup map[string]int   // store_server_id:group
    //groupid->stores映射关系
    group      map[int][]string // group_id:store_servers

    // VOLUME
    volume      map[int32]*meta.VolumeState // volume_id:volume_state
    //volume->store映射关系
    volumeStore map[int32][]string          // volume_id:store_server_id
    //产生唯一的图片id
    genkey     *snowflake.Genkey  // snowflake client for gen key
    //hbase客户端
    hBase      *hbase.HBaseClient // hBase client
    //调度器
    dispatcher *Dispatcher        // dispatch for write or read reqs

    config *conf.Config
    //zk客户端
    zk     *myzk.Zookeeper
}

初始化入口

  • 初始化zk客户端
  • 初始化snowflake客户端
  • 初始化hbase客户端
  • 初始化调度器
  • 启动协程,从zk上同步元数据
// NewDirectory
func NewDirectory(config *conf.Config) (d *Directory, err error) {
    d = &Directory{}
    d.config = config
    //初始化zk客户端
    if d.zk, err = myzk.NewZookeeper(config); err != nil {
        return
    }
    //初始化snowflake客户端
    if d.genkey, err = snowflake.NewGenkey(config.Snowflake.ZkAddrs, config.Snowflake.ZkPath, config.Snowflake.ZkTimeout.Duration, config.Snowflake.WorkId); err != nil {
        return
    }
    //初始化hbase客户端
    if err = hbase.Init(config); err != nil {
        return
    }
    d.hBase = hbase.NewHBaseClient()
    //初始化调度引擎
    d.dispatcher = NewDispatcher()
    //从zk上同步元数据
    go d.SyncZookeeper()
    return
}

同步zk元数据

  • 同步store的元数据,位于zk的/rack路径下面
  • 同步group的元数据,位于zk的/group路径下面
  • 同步volume的元数据,位于zk的/volume路径下面
  • 重新更新调度模块
// SyncZookeeper Synchronous zookeeper data to memory
func (d *Directory) SyncZookeeper() {
    var (
        sev <-chan zk.Event
        err error
    )
    for {
        //同步/rack路径上的元数据
        if sev, err = d.syncStores(); err != nil {
            log.Errorf("syncStores() called error(%v)", err)
            time.Sleep(retrySleep)
            continue
        }
        //同步/group上的元数据
        if err = d.syncGroups(); err != nil {
            log.Errorf("syncGroups() called error(%v)", err)
            time.Sleep(retrySleep)
            continue
        }
        //同步/volume上的元数据
        if err = d.syncVolumes(); err != nil {
            log.Errorf("syncVolumes() called error(%v)", err)
            time.Sleep(retrySleep)
            continue
        }
        //计算调度器
        if err = d.dispatcher.Update(d.group, d.store, d.volume, d.storeVolume); err != nil {
            log.Errorf("Update() called error(%v)", err)
            time.Sleep(retrySleep)
            continue
        }
        select {
        case <-sev:
            log.Infof("stores status change or new store")
            break
        case <-time.After(d.config.Zookeeper.PullInterval.Duration):
            log.Infof("pull from zk")
            break
        }
    }
}

同步store

  • 获取到全部rack
  • 遍历rack,获取rack下所有的store
  • 遍历store,获取store下的volume
  • 建立storeid->volumeids映射关系
  • 建立storeid->统计信息映射关系
// Stores get all the store nodes and set a watcher
func (d *Directory) syncStores() (ev <-chan zk.Event, err error) {
    var (
        storeMeta              *meta.Store
        store                  map[string]*meta.Store
        storeVolume            map[string][]int32
        rack, str, volume      string
        racks, stores, volumes []string
        data                   []byte
        vid                    int
    )
    // get all rack
    //获取到全部rack
    if racks, ev, err = d.zk.WatchRacks(); err != nil {
        return
    }
    store = make(map[string]*meta.Store)
    storeVolume = make(map[string][]int32)
    for _, rack = range racks {
        // get all stores in the rack
        if stores, err = d.zk.Stores(rack); err != nil {
            return
        }
        for _, str = range stores {
            // get store
            if data, err = d.zk.Store(rack, str); err != nil {
                return
            }
            storeMeta = new(meta.Store)
            if err = json.Unmarshal(data, storeMeta); err != nil {
                log.Errorf("json.Unmarshal() error(%v)", err)
                return
            }
            // get all volumes in the store
            if volumes, err = d.zk.StoreVolumes(rack, str); err != nil {
                return
            }
            storeVolume[storeMeta.Id] = []int32{}
            for _, volume = range volumes {
                if vid, err = strconv.Atoi(volume); err != nil {
                    log.Errorf("wrong volume:%s", volume)
                    continue
                }
                storeVolume[storeMeta.Id] = append(storeVolume[storeMeta.Id], int32(vid))
            }
            store[storeMeta.Id] = storeMeta
        }
    }
    d.store = store
    d.storeVolume = storeVolume
    return
}

同步group

  • 获取所有的group
  • 遍历group,获取group下的stores
  • 建立groupid->stores
  • 建立storeid->groupid映射关系
// syncGroups get all groups and set a watcher.
func (d *Directory) syncGroups() (err error) {
    var (
        gid            int
        str            string
        groups, stores []string
        group          map[int][]string
        storeGroup     map[string]int
    )
    // get all groups
    if groups, err = d.zk.Groups(); err != nil {
        return
    }
    group = make(map[int][]string)
    storeGroup = make(map[string]int)
    for _, str = range groups {
        // get all stores by the group
        if stores, err = d.zk.GroupStores(str); err != nil {
            return
        }
        if gid, err = strconv.Atoi(str); err != nil {
            log.Errorf("wrong group:%s", str)
            continue
        }
        group[gid] = stores
        for _, str = range stores {
            storeGroup[str] = gid
        }
    }
    d.group = group
    d.storeGroup = storeGroup
    return
}

同步volume

  • 获取所有的volume
  • 遍历volume,获取每个volume下面的stores
  • 建立volumeid->stores映射关系
  • 建立volume->统计信息映射关系
// Volumes get all volumes in zk
func (d *Directory) syncVolumes() (err error) {
    var (
        vid             int
        str             string
        volumes, stores []string
        data            []byte
        volumeState     *meta.VolumeState
        volume          map[int32]*meta.VolumeState
        volumeStore     map[int32][]string
    )
    // get all volumes
    if volumes, err = d.zk.Volumes(); err != nil {
        return
    }
    volume = make(map[int32]*meta.VolumeState)
    volumeStore = make(map[int32][]string)
    for _, str = range volumes {
        // get the volume
        if data, err = d.zk.Volume(str); err != nil {
            return
        }
        volumeState = new(meta.VolumeState)
        if err = json.Unmarshal(data, volumeState); err != nil {
            log.Errorf("json.Unmarshal() error(%v)", err)
            return
        }
        if vid, err = strconv.Atoi(str); err != nil {
            log.Errorf("wrong volume:%s", str)
            continue
        }
        volume[int32(vid)] = volumeState
        // get the stores by the volume
        if stores, err = d.zk.VolumeStores(str); err != nil {
            return
        }
        volumeStore[int32(vid)] = stores
    }
    d.volume = volume
    d.volumeStore = volumeStore
    return
}

results matching ""

    No results matching ""