Directory调度
调度器就是根据当前集群资源使用情况,将上传的请求调度到合适的store机器上
// Dispatcher
// get raw data and processed into memory for http reqs
type Dispatcher struct {
gids []int // for write eg: gid:1;2 gids: [1,1,2,2,2,2,2]
rand *rand.Rand
rlock sync.Mutex
}
func NewDispatcher() (d *Dispatcher) {
d = new(Dispatcher)
d.rand = rand.New(rand.NewSource(time.Now().UnixNano()))
return
}
重新计算
- 每次更新元数据的时候都会被执行
- 首先确定一批可以写入的机器列表
- 遍历每个可写的store机器的volume信息
- totalAdd,totalAddDelay,restSpace计算得分
// Update when zk updates
func (d *Dispatcher) Update(group map[int][]string,
store map[string]*meta.Store, volume map[int32]*meta.VolumeState,
storeVolume map[string][]int32) (err error) {
var (
gid int
i int
vid int32
gids []int
sid string
stores []string
restSpace, minScore, score int
totalAdd, totalAddDelay uint64
write, ok bool
storeMeta *meta.Store
volumeState *meta.VolumeState
)
gids = []int{}
for gid, stores = range group {
write = true
// check all stores can writeable by the group.
for _, sid = range stores {
if storeMeta, ok = store[sid]; !ok {
log.Errorf("idStore cannot match store: %s", sid)
break
}
if storeMeta == nil {
log.Warningf("storeMeta is null, %s", sid)
return
}
if !storeMeta.CanWrite() {
write = false
break
}
}
if !write {
continue
}
// calc score
//只有可以写入的机器才会执行
for _, sid = range stores {
totalAdd, totalAddDelay, restSpace, minScore = 0, 0, 0, 0
// get all volumes by the store.
//遍历每个可写的store机器的volume信息
for _, vid = range storeVolume[sid] {
volumeState = volume[vid]
if volumeState == nil {
log.Warningf("volumeState is nil, %d", vid)
return
}
totalAdd = totalAdd + volumeState.TotalWriteProcessed
restSpace = restSpace + int(volumeState.FreeSpace)
totalAddDelay = totalAddDelay + volumeState.TotalWriteDelay
}
//计算得分
score = d.calScore(int(totalAdd), int(totalAddDelay), restSpace)
if score < minScore || minScore == 0 {
minScore = score
}
}
for i = 0; i < minScore; i++ {
gids = append(gids, gid)
}
}
d.gids = gids
return
}
调度
// VolumeId get a volume id.
func (d *Dispatcher) VolumeId(group map[int][]string, storeVolume map[string][]int32) (vid int32, err error) {
var (
sid string
stores []string
gid int
vids []int32
)
if len(d.gids) == 0 {
err = errors.ErrStoreNotAvailable
return
}
d.rlock.Lock()
defer d.rlock.Unlock()
gid = d.gids[d.rand.Intn(len(d.gids))]
stores = group[gid]
if len(stores) == 0 {
err = errors.ErrZookeeperDataError
return
}
sid = stores[0]
vids = storeVolume[sid]
vid = vids[d.rand.Intn(len(vids))]
return
}