Directory调度

调度器根据当前集群的资源使用情况,将上传请求调度到合适的 store 机器上。

// Dispatcher picks the store group that should receive an upload.
// It keeps a precomputed, weighted list of writable group ids in memory so
// that VolumeId can dispatch a request with a random draw.
type Dispatcher struct {
    // gids is the weighted pool of writable group ids built by Update: a group
    // id is appended once per point of its score, e.g. groups 1 and 2 with
    // scores 2 and 5 give [1,1,2,2,2,2,2].
    gids  []int
    rand  *rand.Rand // random source used to draw from gids and from a store's volume list
    rlock sync.Mutex // held by VolumeId while it uses rand
}

// NewDispatcher returns a Dispatcher whose random source is seeded with the
// current time in nanoseconds. Its group list is empty until Update is called,
// so VolumeId reports ErrStoreNotAvailable until then.
func NewDispatcher() (d *Dispatcher) {
    seed := time.Now().UnixNano()
    d = &Dispatcher{rand: rand.New(rand.NewSource(seed))}
    return d
}

重新计算

  • 每次更新元数据的时候都会被执行
  • 首先确定一批可以写入的机器列表
    • 遍历每个可写的store机器的volume信息
    • 累加各 volume 的 totalAdd、totalAddDelay、restSpace,并据此计算得分
// Update rebuilds the weighted list of writable group ids; it is meant to be
// called whenever the metadata (zk) changes.
//
// Parameters:
//   - group:       group id -> ids of the stores in that group.
//   - store:       store id -> store metadata.
//   - volume:      volume id -> volume state.
//   - storeVolume: store id -> ids of the volumes on that store.
//
// A group is writable only if every one of its stores is present in store and
// reports CanWrite. For a writable group, each store is scored with calScore
// from the summed write count, write delay and free space of its volumes; the
// group's weight is the lowest score among its stores, and the group id is
// appended to the list that many times (a weight <= 0 contributes nothing).
//
// The returned error is always nil. If a nil store or volume entry is found,
// the update is abandoned and the previous list is kept.
// NOTE(review): that early exit is silent to the caller (only a warning is
// logged) — confirm this best-effort behavior is intended.
func (d *Dispatcher) Update(group map[int][]string,
    store map[string]*meta.Store, volume map[int32]*meta.VolumeState,
    storeVolume map[string][]int32) (err error) {
    gids := []int{}
    for gid, stores := range group {
        // Every store of the group must be known and writable.
        writable := true
        for _, sid := range stores {
            storeMeta, ok := store[sid]
            if !ok {
                log.Errorf("idStore cannot match store: %s", sid)
                // An unknown store cannot be written to, so the whole group
                // is skipped instead of being scored as if it were healthy.
                writable = false
                break
            }
            if storeMeta == nil {
                log.Warningf("storeMeta is null, %s", sid)
                return nil
            }
            if !storeMeta.CanWrite() {
                writable = false
                break
            }
        }
        if !writable {
            continue
        }

        // The group's weight is the minimum score over its stores. The
        // minimum is tracked per group with an explicit "seen" flag so that a
        // genuine score of 0 is not mistaken for "not set yet" and a group
        // without stores does not inherit another group's weight.
        var (
            minScore int
            scored   bool
        )
        for _, sid := range stores {
            var (
                totalAdd, totalAddDelay uint64
                restSpace               int
            )
            // Aggregate the statistics of every volume on this store.
            for _, vid := range storeVolume[sid] {
                volumeState := volume[vid]
                if volumeState == nil {
                    log.Warningf("volumeState is nil, %d", vid)
                    return nil
                }
                totalAdd += volumeState.TotalWriteProcessed
                restSpace += int(volumeState.FreeSpace)
                totalAddDelay += volumeState.TotalWriteDelay
            }
            score := d.calScore(int(totalAdd), int(totalAddDelay), restSpace)
            if !scored || score < minScore {
                minScore = score
                scored = true
            }
        }
        for i := 0; i < minScore; i++ {
            gids = append(gids, gid)
        }
    }
    d.gids = gids
    return nil
}

调度

// VolumeId picks a volume id for a new write.
//
// It draws a group id at random from the weighted list built by Update, takes
// the first store of that group, and then draws one of that store's volumes at
// random.
//
// Parameters:
//   - group:       group id -> ids of the stores in that group.
//   - storeVolume: store id -> ids of the volumes on that store.
//
// Errors:
//   - errors.ErrStoreNotAvailable if there is currently no writable group.
//   - errors.ErrZookeeperDataError if the chosen group has no stores, or its
//     first store has no volumes.
//
// On error the returned vid is 0.
func (d *Dispatcher) VolumeId(group map[int][]string, storeVolume map[string][]int32) (vid int32, err error) {
    // Read d.gids once: Update replaces the slice wholesale, so the length
    // check and the indexing below must look at the same slice, otherwise
    // Intn could be called with 0 and panic.
    gids := d.gids
    if len(gids) == 0 {
        return 0, errors.ErrStoreNotAvailable
    }
    d.rlock.Lock()
    defer d.rlock.Unlock()
    gid := gids[d.rand.Intn(len(gids))]
    stores := group[gid]
    if len(stores) == 0 {
        return 0, errors.ErrZookeeperDataError
    }
    vids := storeVolume[stores[0]]
    if len(vids) == 0 {
        // Intn panics for n <= 0; a store without volumes is treated as
        // inconsistent metadata, like a group without stores.
        return 0, errors.ErrZookeeperDataError
    }
    return vids[d.rand.Intn(len(vids))], nil
}

results matching ""

    No results matching ""