directory模块

directory模块主要负责元数据管理,以及请求的调度。对外提供了上传、下载、删除等元数据管理api。 元数据分为两种:

  • 机器和volume相关数据:这部分数据主要是与store后端机器及其上存储的volume有关的元数据。这些数据一旦发生变化,directory需要即时感知到,同时数据量比较小,非常适合用zk存储。
  • 小文件的元数据,描述一个小文件的属性,这类数据一旦写入,几乎不需要变更,同时数据量非常大,选用hbase作为存储介质,能够很方便的扩展。

模块详细架构:

初始化

  • 设置http路由处理函数
    • /get
    • /upload
    • /del
    • /ping
  • 启动http服务
// server is the receiver for the HTTP API handlers; it carries the
// Directory that those handlers call into.
type server struct {
    // d is the Directory the handlers delegate metadata operations to.
    d *Directory
}

// StartApi launches the directory HTTP API on addr in a background goroutine
// and returns immediately. Requests are served by handlers bound to d.
// If http.ListenAndServe returns an error it is logged and the goroutine exits.
func StartApi(addr string, d *Directory) {
    srv := &server{d: d}
    go func() {
        // Route table: download, upload, delete and health check, in that order.
        routes := []struct {
            pattern string
            handler func(http.ResponseWriter, *http.Request)
        }{
            {"/get", srv.get},
            {"/upload", srv.upload},
            {"/del", srv.del},
            {"/ping", srv.ping},
        }
        mux := http.NewServeMux()
        for _, rt := range routes {
            mux.HandleFunc(rt.pattern, rt.handler)
        }
        if listenErr := http.ListenAndServe(addr, mux); listenErr != nil {
            log.Errorf("http.ListenAndServe(\"%s\") error(%v)", addr, listenErr)
        }
    }()
}

get

  • 判断方法是否是GET,不是GET出错
  • url中取出bucket,filename参数
  • Directory.GetStores方法,返回文件的vid,cookie,MTime,key,mine,sha1,stores
// get handles the download lookup endpoint.
//
// It rejects non-GET requests with 405 and requests missing the "bucket" or
// "filename" form value with 400. Otherwise it asks the Directory for the
// file's stores and copies the needle/file metadata into the response, which
// is handed to the deferred HttpGetWriter (presumably the response writer —
// defined elsewhere).
func (s *server) get(wr http.ResponseWriter, r *http.Request) {
    var (
        res    meta.Response
        needle *meta.Needle
        file   *meta.File
        err    error
    )
    if r.Method != http.MethodGet {
        http.Error(wr, "method not allowed", http.StatusMethodNotAllowed)
        return
    }
    bucket := r.FormValue("bucket")
    if bucket == "" {
        http.Error(wr, "bad request", http.StatusBadRequest)
        return
    }
    filename := r.FormValue("filename")
    if filename == "" {
        http.Error(wr, "bad request", http.StatusBadRequest)
        return
    }
    // From here on every exit path reports through res.
    defer HttpGetWriter(r, wr, time.Now(), &res)
    needle, file, res.Stores, err = s.d.GetStores(bucket, filename)
    if err != nil {
        log.Errorf("GetStores() error(%v)", err)
        // Project error codes pass through; anything else is an internal error.
        switch code := err.(type) {
        case errors.Error:
            res.Ret = int(code)
        default:
            res.Ret = errors.RetInternalErr
        }
        return
    }
    res.Ret = errors.RetOK
    res.Vid = needle.Vid
    res.Key = needle.Key
    res.Cookie = needle.Cookie
    res.Mine = file.Mine
    res.Sha1 = file.Sha1
    // Use the file's MTime when it is set, otherwise the needle's.
    res.MTime = needle.MTime
    if file.MTime != 0 {
        res.MTime = file.MTime
    }
}

upload

  • 判断方法是否是POST,不是POST出错
    • 从url参数中,取出bucket,filename,sha1,mine
  • Directory.UploadStores,存储vid,cookie,MTime,key,mine,sha1到hbase中,调度模块计算返回stores机器列表
// upload handles the upload scheduling endpoint.
//
// It rejects non-POST requests with 405 and requests missing any of the
// "bucket", "filename", "sha1" or "mine" form values with 400. Otherwise it
// calls Directory.UploadStores; if that reports errors.ErrNeedleExist the
// response code becomes RetNeedleExist and the existing needle is looked up
// via GetStores. The result is handed to the deferred HttpUploadWriter
// (presumably the response writer — defined elsewhere).
func (s *server) upload(wr http.ResponseWriter, r *http.Request) {
    var (
        res    meta.Response
        needle *meta.Needle
        err    error
    )
    if r.Method != http.MethodPost {
        http.Error(wr, "method not allowed", http.StatusMethodNotAllowed)
        return
    }
    file := new(meta.File)
    bucket := r.FormValue("bucket")
    if bucket == "" {
        http.Error(wr, "bad request", http.StatusBadRequest)
        return
    }
    file.Filename = r.FormValue("filename")
    if file.Filename == "" {
        http.Error(wr, "bad request", http.StatusBadRequest)
        return
    }
    file.Sha1 = r.FormValue("sha1")
    if file.Sha1 == "" {
        http.Error(wr, "bad request", http.StatusBadRequest)
        return
    }
    file.Mine = r.FormValue("mine")
    if file.Mine == "" {
        http.Error(wr, "bad request", http.StatusBadRequest)
        return
    }
    defer HttpUploadWriter(r, wr, time.Now(), &res)

    // retCode maps an error to the response code: project error codes pass
    // through, anything else is reported as an internal error.
    retCode := func(e error) int {
        if code, isCode := e.(errors.Error); isCode {
            return int(code)
        }
        return errors.RetInternalErr
    }

    res.Ret = errors.RetOK
    needle, res.Stores, err = s.d.UploadStores(bucket, file)
    switch {
    case err == nil:
        // Fresh upload: needle and stores are already set.
    case err == errors.ErrNeedleExist:
        // The needle already exists: report that and return its current location.
        res.Ret = errors.RetNeedleExist
        needle, _, res.Stores, err = s.d.GetStores(bucket, file.Filename)
        if err != nil {
            log.Errorf("GetStores() error(%v)", err)
            res.Ret = retCode(err)
            return
        }
    default:
        log.Errorf("UploadStores() error(%v)", err)
        res.Ret = retCode(err)
        return
    }
    res.Vid = needle.Vid
    res.Key = needle.Key
    res.Cookie = needle.Cookie
}

del

  • 判断方法是否是POST,不是POST出错
    • 从url参数中,取出bucket,filename
  • Directory.DelStores,hbase中元数据设置状态删除,返回stores机器列表
// del handles the delete endpoint.
//
// It rejects non-POST requests with 405 and requests missing the "bucket" or
// "filename" form value with 400. Otherwise it calls Directory.DelStores and
// copies the returned needle's identifiers into the response, which is handed
// to the deferred HttpDelWriter (presumably the response writer — defined
// elsewhere).
func (s *server) del(wr http.ResponseWriter, r *http.Request) {
    var (
        res    meta.Response
        needle *meta.Needle
        err    error
    )
    if r.Method != http.MethodPost {
        http.Error(wr, "method not allowed", http.StatusMethodNotAllowed)
        return
    }
    bucket := r.FormValue("bucket")
    if bucket == "" {
        http.Error(wr, "bad request", http.StatusBadRequest)
        return
    }
    filename := r.FormValue("filename")
    if filename == "" {
        http.Error(wr, "bad request", http.StatusBadRequest)
        return
    }
    // From here on every exit path reports through res.
    defer HttpDelWriter(r, wr, time.Now(), &res)
    needle, res.Stores, err = s.d.DelStores(bucket, filename)
    if err != nil {
        log.Errorf("DelStores() error(%v)", err)
        // Project error codes pass through; anything else is an internal error.
        switch code := err.(type) {
        case errors.Error:
            res.Ret = int(code)
        default:
            res.Ret = errors.RetInternalErr
        }
        return
    }
    res.Ret = errors.RetOK
    res.Vid = needle.Vid
    res.Key = needle.Key
    res.Cookie = needle.Cookie
}

results matching ""

    No results matching ""