directory模块
directory模块主要负责元数据管理,以及请求的调度。对外提供了上传、下载、删除等元数据管理api。 元数据分为两种:
- 机器和volume相关数据,这部分数据主要和store后端机器以及其上存储的volume有关的元数据,这些数据一旦发生变化,directory需要即时感知到,同时数据量比较小,非常适合用zk存储。
- 小文件的元数据,描述一个小文件的属性,这类数据一旦写入,几乎不需要变更,同时数据量非常大,选用hbase作为存储介质,能够很方便的扩展。
模块详细架构:
初始化
- 设置http路由处理函数
/get
/upload
/del
- 启动http服务
type server struct {
d *Directory
}
// StartApi start api http listen.
func StartApi(addr string, d *Directory) {
var s = &server{d: d}
go func() {
var (
err error
serveMux = http.NewServeMux()
)
//设置下载请求路由
serveMux.HandleFunc("/get", s.get)
//设置上传请求路由
serveMux.HandleFunc("/upload", s.upload)
//设置删除请求路由
serveMux.HandleFunc("/del", s.del)
//设置健康检查路由
serveMux.HandleFunc("/ping", s.ping)
if err = http.ListenAndServe(addr, serveMux); err != nil {
log.Errorf("http.ListenAndServe(\"%s\") error(%v)", addr, err)
return
}
}()
return
}
get
- 判断方法是否是
GET
,不是GET
出错 - url中取出
bucket
,filename
参数 Directory.GetStores
方法,返回文件的vid
,cookie
,MTime
,key
,mine
,sha1
,stores
func (s *server) get(wr http.ResponseWriter, r *http.Request) {
var (
ok bool
bucket string
filename string
res meta.Response
n *meta.Needle
f *meta.File
uerr errors.Error
err error
)
if r.Method != "GET" {
http.Error(wr, "method not allowed", http.StatusMethodNotAllowed)
return
}
if bucket = r.FormValue("bucket"); bucket == "" {
http.Error(wr, "bad request", http.StatusBadRequest)
return
}
if filename = r.FormValue("filename"); filename == "" {
http.Error(wr, "bad request", http.StatusBadRequest)
return
}
defer HttpGetWriter(r, wr, time.Now(), &res)
if n, f, res.Stores, err = s.d.GetStores(bucket, filename); err != nil {
log.Errorf("GetStores() error(%v)", err)
if uerr, ok = err.(errors.Error); ok {
res.Ret = int(uerr)
} else {
res.Ret = errors.RetInternalErr
}
return
}
res.Ret = errors.RetOK
res.Key = n.Key
res.Cookie = n.Cookie
res.Vid = n.Vid
res.Mine = f.Mine
if f.MTime != 0 {
res.MTime = f.MTime
} else {
res.MTime = n.MTime
}
res.Sha1 = f.Sha1
return
}
upload
- 判断方法是否是
POST
,- 从url参数中,取出
bucket
,filename
,sha1
,mine
- 从url参数中,取出
Directory.UploadStores
,存储vid
,cookie
,MTime
,key
,mine
,sha1
到hbase中,调度模块计算返回stores机器列表
func (s *server) upload(wr http.ResponseWriter, r *http.Request) {
var (
err error
n *meta.Needle
f *meta.File
bucket string
res meta.Response
ok bool
uerr errors.Error
)
if r.Method != "POST" {
http.Error(wr, "method not allowed", http.StatusMethodNotAllowed)
return
}
f = new(meta.File)
if bucket = r.FormValue("bucket"); bucket == "" {
http.Error(wr, "bad request", http.StatusBadRequest)
return
}
if f.Filename = r.FormValue("filename"); f.Filename == "" {
http.Error(wr, "bad request", http.StatusBadRequest)
return
}
if f.Sha1 = r.FormValue("sha1"); f.Sha1 == "" {
http.Error(wr, "bad request", http.StatusBadRequest)
return
}
if f.Mine = r.FormValue("mine"); f.Mine == "" {
http.Error(wr, "bad request", http.StatusBadRequest)
return
}
defer HttpUploadWriter(r, wr, time.Now(), &res)
res.Ret = errors.RetOK
if n, res.Stores, err = s.d.UploadStores(bucket, f); err != nil {
if err == errors.ErrNeedleExist {
// update file data
res.Ret = errors.RetNeedleExist
if n, _, res.Stores, err = s.d.GetStores(bucket, f.Filename); err != nil {
log.Errorf("GetStores() error(%v)", err)
if uerr, ok = err.(errors.Error); ok {
res.Ret = int(uerr)
} else {
res.Ret = errors.RetInternalErr
}
return
}
} else {
log.Errorf("UploadStores() error(%v)", err)
if uerr, ok = err.(errors.Error); ok {
res.Ret = int(uerr)
} else {
res.Ret = errors.RetInternalErr
}
return
}
}
res.Key = n.Key
res.Cookie = n.Cookie
res.Vid = n.Vid
return
}
del
- 判断方法是否是
POST
,- 从url参数中,取出
bucket
,filename
- 从url参数中,取出
Directory.DelStores
,hbase中元数据设置状态删除,返回stores机器列表
func (s *server) del(wr http.ResponseWriter, r *http.Request) {
var (
err error
n *meta.Needle
bucket string
filename string
res meta.Response
ok bool
uerr errors.Error
)
if r.Method != "POST" {
http.Error(wr, "method not allowed", http.StatusMethodNotAllowed)
return
}
if bucket = r.FormValue("bucket"); bucket == "" {
http.Error(wr, "bad request", http.StatusBadRequest)
return
}
if filename = r.FormValue("filename"); filename == "" {
http.Error(wr, "bad request", http.StatusBadRequest)
return
}
defer HttpDelWriter(r, wr, time.Now(), &res)
if n, res.Stores, err = s.d.DelStores(bucket, filename); err != nil {
log.Errorf("DelStores() error(%v)", err)
if uerr, ok = err.(errors.Error); ok {
res.Ret = int(uerr)
} else {
res.Ret = errors.RetInternalErr
}
return
}
res.Ret = errors.RetOK
res.Key = n.Key
res.Cookie = n.Cookie
res.Vid = n.Vid
return
}