Directory模块初始化
directory模块负责管理元数据,图片元数据(bucket, filename, vid,key,cookie,mine)通过hbase存储。集群机器元数据是存储在zookeeper中,directory使用机器元数据信息建立vid到store机器的映射关系,同时需要根据每个store机器管理的volume的统计元数据(一段时间内读写次数、容量)对图片上传请求进行合理调度。
定义
- 同步
/rack元数据,storeid->volumeid作用是在调度的时候选中storeid,然后再随机选volumeid - 同步
/group元数据,groupid->storeids作用是在调度的时候,判断某一个group下面的store是否全部可用 - 同步
/volume元数据,volumeid->storeids作用是在处理上传、下载请求时,根据volumeid直接定位到store机器
// Directory
// id means store serverid; vid means volume id; gid means group id
type Directory struct {
// STORE
store map[string]*meta.Store // store_server_id:store_info
//storeid->volumeids映射关系
storeVolume map[string][]int32 // store_server_id:volume_ids
// GROUP
storeGroup map[string]int // store_server_id:group
//groupid->stores映射关系
group map[int][]string // group_id:store_servers
// VOLUME
volume map[int32]*meta.VolumeState // volume_id:volume_state
//volume->store映射关系
volumeStore map[int32][]string // volume_id:store_server_id
//产生唯一的图片id
genkey *snowflake.Genkey // snowflake client for gen key
//hbase客户端
hBase *hbase.HBaseClient // hBase client
//调度器
dispatcher *Dispatcher // dispatch for write or read reqs
config *conf.Config
//zk客户端
zk *myzk.Zookeeper
}
初始化入口
- 初始化zk客户端
- 初始化snowflake客户端
- 初始化hbase客户端
- 初始化调度器
- 启动协程,从zk上同步元数据
// NewDirectory
func NewDirectory(config *conf.Config) (d *Directory, err error) {
d = &Directory{}
d.config = config
//初始化zk客户端
if d.zk, err = myzk.NewZookeeper(config); err != nil {
return
}
//初始化snowflake客户端
if d.genkey, err = snowflake.NewGenkey(config.Snowflake.ZkAddrs, config.Snowflake.ZkPath, config.Snowflake.ZkTimeout.Duration, config.Snowflake.WorkId); err != nil {
return
}
//初始化hbase客户端
if err = hbase.Init(config); err != nil {
return
}
d.hBase = hbase.NewHBaseClient()
//初始化调度引擎
d.dispatcher = NewDispatcher()
//从zk上同步元数据
go d.SyncZookeeper()
return
}
同步zk元数据
- 同步store的元数据,位于zk的/rack路径下面
- 同步group的元数据,位于zk的/group路径下面
- 同步volume的元数据,位于zk的/volume路径下面
- 重新更新调度模块
// SyncZookeeper Synchronous zookeeper data to memory
func (d *Directory) SyncZookeeper() {
var (
sev <-chan zk.Event
err error
)
for {
//同步/rack路径上的元数据
if sev, err = d.syncStores(); err != nil {
log.Errorf("syncStores() called error(%v)", err)
time.Sleep(retrySleep)
continue
}
//同步/group上的元数据
if err = d.syncGroups(); err != nil {
log.Errorf("syncGroups() called error(%v)", err)
time.Sleep(retrySleep)
continue
}
//同步/volume上的元数据
if err = d.syncVolumes(); err != nil {
log.Errorf("syncVolumes() called error(%v)", err)
time.Sleep(retrySleep)
continue
}
//计算调度器
if err = d.dispatcher.Update(d.group, d.store, d.volume, d.storeVolume); err != nil {
log.Errorf("Update() called error(%v)", err)
time.Sleep(retrySleep)
continue
}
select {
case <-sev:
log.Infof("stores status change or new store")
break
case <-time.After(d.config.Zookeeper.PullInterval.Duration):
log.Infof("pull from zk")
break
}
}
}
同步store
- 获取到全部rack
- 遍历rack,获取rack下所有的store
- 遍历store,获取store下的volume
- 建立storeid->volumeids映射关系
- 建立storeid->统计信息映射关系
// Stores get all the store nodes and set a watcher
func (d *Directory) syncStores() (ev <-chan zk.Event, err error) {
var (
storeMeta *meta.Store
store map[string]*meta.Store
storeVolume map[string][]int32
rack, str, volume string
racks, stores, volumes []string
data []byte
vid int
)
// get all rack
//获取到全部rack
if racks, ev, err = d.zk.WatchRacks(); err != nil {
return
}
store = make(map[string]*meta.Store)
storeVolume = make(map[string][]int32)
for _, rack = range racks {
// get all stores in the rack
if stores, err = d.zk.Stores(rack); err != nil {
return
}
for _, str = range stores {
// get store
if data, err = d.zk.Store(rack, str); err != nil {
return
}
storeMeta = new(meta.Store)
if err = json.Unmarshal(data, storeMeta); err != nil {
log.Errorf("json.Unmarshal() error(%v)", err)
return
}
// get all volumes in the store
if volumes, err = d.zk.StoreVolumes(rack, str); err != nil {
return
}
storeVolume[storeMeta.Id] = []int32{}
for _, volume = range volumes {
if vid, err = strconv.Atoi(volume); err != nil {
log.Errorf("wrong volume:%s", volume)
continue
}
storeVolume[storeMeta.Id] = append(storeVolume[storeMeta.Id], int32(vid))
}
store[storeMeta.Id] = storeMeta
}
}
d.store = store
d.storeVolume = storeVolume
return
}
同步group
- 获取所有的group
- 遍历group,获取group下的stores
- 建立groupid->stores
- 建立storeid->groupid映射关系
// syncGroups get all groups and set a watcher.
func (d *Directory) syncGroups() (err error) {
var (
gid int
str string
groups, stores []string
group map[int][]string
storeGroup map[string]int
)
// get all groups
if groups, err = d.zk.Groups(); err != nil {
return
}
group = make(map[int][]string)
storeGroup = make(map[string]int)
for _, str = range groups {
// get all stores by the group
if stores, err = d.zk.GroupStores(str); err != nil {
return
}
if gid, err = strconv.Atoi(str); err != nil {
log.Errorf("wrong group:%s", str)
continue
}
group[gid] = stores
for _, str = range stores {
storeGroup[str] = gid
}
}
d.group = group
d.storeGroup = storeGroup
return
}
同步volume
- 获取所有的volume
- 遍历volume,获取每个volume下面的stores
- 建立volumeid->stores映射关系
- 建立volume->统计信息映射关系
// Volumes get all volumes in zk
func (d *Directory) syncVolumes() (err error) {
var (
vid int
str string
volumes, stores []string
data []byte
volumeState *meta.VolumeState
volume map[int32]*meta.VolumeState
volumeStore map[int32][]string
)
// get all volumes
if volumes, err = d.zk.Volumes(); err != nil {
return
}
volume = make(map[int32]*meta.VolumeState)
volumeStore = make(map[int32][]string)
for _, str = range volumes {
// get the volume
if data, err = d.zk.Volume(str); err != nil {
return
}
volumeState = new(meta.VolumeState)
if err = json.Unmarshal(data, volumeState); err != nil {
log.Errorf("json.Unmarshal() error(%v)", err)
return
}
if vid, err = strconv.Atoi(str); err != nil {
log.Errorf("wrong volume:%s", str)
continue
}
volume[int32(vid)] = volumeState
// get the stores by the volume
if stores, err = d.zk.VolumeStores(str); err != nil {
return
}
volumeStore[int32(vid)] = stores
}
d.volume = volume
d.volumeStore = volumeStore
return
}