Store Http Server初始化
API Server初始化
- 设置http路由函数
// StartApi serves the store's public HTTP API on addr.
//
// The routes are registered and the listener is started inside a background
// goroutine, so this call returns immediately. When http.ListenAndServe
// returns an error it is logged and the goroutine exits; the error is not
// reported back to the caller.
func StartApi(addr string, s *Server) {
	go func() {
		routes := []struct {
			pattern string
			handler func(http.ResponseWriter, *http.Request)
		}{
			{"/get", s.get},         // download
			{"/upload", s.upload},   // single upload
			{"/uploads", s.uploads}, // batch upload
			{"/del", s.del},         // delete
		}
		mux := http.NewServeMux()
		for _, rt := range routes {
			mux.HandleFunc(rt.pattern, rt.handler)
		}
		if err := http.ListenAndServe(addr, mux); err != nil {
			log.Errorf("http.ListenAndServe(\"%s\") error(%v)", addr, err)
		}
	}()
}
1.下载api实现
- 下载图片的时候,需要提供vid、key、cookie:vid用来定位volume所在的文件,key用来定位needle在文件中的offset,cookie出于安全性考虑,做验证用的。
- api说明
  - 提取vid、key、cookie参数
  - 用vid在store.Volumes搜索到volume.Volume
  - volume.Volume调用Read,根据key和cookie读取needle,返回needle.Needle
  - 将
needle.Needle返回给客户端func (s *Server) get(wr http.ResponseWriter, r *http.Request) { var ( v *volume.Volume n *needle.Needle err error vid, key, cookie int64 ret = http.StatusOK params = r.URL.Query() now = time.Now() ) if r.Method != "GET" && r.Method != "HEAD" { ret = http.StatusMethodNotAllowed http.Error(wr, "method not allowed", ret) return } defer HttpGetWriter(r, wr, now, &err, &ret) if !s.rl.Allow() { ret = http.StatusServiceUnavailable return } //提取vid if vid, err = strconv.ParseInt(params.Get("vid"), 10, 32); err != nil { log.Errorf("strconv.ParseInt(\"%s\") error(%v)", params.Get("vid"), err) ret = http.StatusBadRequest return } //提取key if key, err = strconv.ParseInt(params.Get("key"), 10, 64); err != nil { log.Errorf("strconv.ParseInt(\"%s\") error(%v)", params.Get("key"), err) ret = http.StatusBadRequest return } //提取 cookie if cookie, err = strconv.ParseInt(params.Get("cookie"), 10, 32); err != nil { log.Errorf("strconv.ParseInt(\"%s\") error(%v)", params.Get("cookie"), err) ret = http.StatusBadRequest return } //搜索对应的volumn if v = s.store.Volumes[int32(vid)]; v != nil { //根据key和cookie读取needle if n, err = v.Read(key, int32(cookie)); err == nil { wr.Header().Set("Content-Length", strconv.Itoa(len(n.Data))) //返回客户端 if _, err = wr.Write(n.Data); err != nil { log.Errorf("wr.Write() error(%v)", err) err = nil // avoid HttpGetWriter write header twice } n.Close() } else { if err == errors.ErrNeedleDeleted || err == errors.ErrNeedleNotExist { ret = http.StatusNotFound } else { ret = http.StatusInternalServerError } } } else { ret = http.StatusNotFound err = errors.ErrVolumeNotExist } return }
- 提取
2.上传API实现
- 上传图片的时候需要提供vid、key、cookie
- api说明
  - 提取请求中的vid、key、cookie参数
  - 用vid在store.Volumes搜索到volume.Volume
  - needle.NewWriter新建一个Needle
  - Needle从post参数中读取图片二进制数据
  - volume.Volume写入Needle
- 提取请求中的
- api实现
func (s *Server) upload(wr http.ResponseWriter, r *http.Request) { var ( vid int64 key int64 cookie int64 size int64 err error str string v *volume.Volume n *needle.Needle file multipart.File res = map[string]interface{}{} ) if r.Method != "POST" { http.Error(wr, "method not allowed", http.StatusMethodNotAllowed) return } defer HttpPostWriter(r, wr, time.Now(), &err, res) if !s.wl.Allow() { err = errors.ErrServiceUnavailable return } if err = checkContentLength(r, s.conf.NeedleMaxSize); err != nil { return } //提取vid str = r.FormValue("vid") if vid, err = strconv.ParseInt(str, 10, 32); err != nil { log.Errorf("strconv.ParseInt(\"%s\") error(%v)", str, err) err = errors.ErrParam return } //提取key str = r.FormValue("key") if key, err = strconv.ParseInt(str, 10, 64); err != nil { log.Errorf("strconv.ParseInt(\"%s\") error(%v)", str, err) err = errors.ErrParam return } //提取cookie str = r.FormValue("cookie") if cookie, err = strconv.ParseInt(str, 10, 32); err != nil { log.Errorf("strconv.ParseInt(\"%s\") error(%v)", str, err) err = errors.ErrParam return } if file, _, err = r.FormFile("file"); err != nil { log.Errorf("r.FormFile() error(%v)", err) err = errors.ErrInternal return } if size, err = checkFileSize(file, s.conf.NeedleMaxSize); err == nil { //查找volumn if v = s.store.Volumes[int32(vid)]; v != nil { //创建needle n = needle.NewWriter(key, int32(cookie), int32(size)) //读取图片数据 if err = n.ReadFrom(file); err == nil { //volumn写入needle err = v.Write(n) } n.Close() } else { err = errors.ErrVolumeNotExist } } file.Close() return }
3.批量上传API实现
// uploads handles the batch-upload route: it stores several uploaded files as
// needles in one volume with a single Writes call.
//
// Only POST is accepted. The form value "vid" selects the volume. The
// multipart form must carry the repeated values "keys" and "cookies" and the
// repeated file part "file"; all three must have the same length and are
// paired by position. Failures are reported through err, whose address is
// handed to the deferred HttpPostWriter together with the result map
// (presumably it renders the response; its body is not visible here).
func (s *Server) uploads(wr http.ResponseWriter, r *http.Request) {
	var (
		err    error
		vid    int64
		key    int64
		cookie int64
		size   int64
		str    string
		file   multipart.File
		res    = map[string]interface{}{}
	)
	if r.Method != "POST" {
		http.Error(wr, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	defer HttpPostWriter(r, wr, time.Now(), &err, res)
	// s.wl gates write requests (presumably a rate limiter).
	if !s.wl.Allow() {
		err = errors.ErrServiceUnavailable
		return
	}
	// FormValue also triggers multipart parsing when the body is multipart.
	str = r.FormValue("vid")
	if vid, err = strconv.ParseInt(str, 10, 32); err != nil {
		log.Errorf("strconv.ParseInt(\"%s\") error(%v)", str, err)
		err = errors.ErrParam
		return
	}
	// r.MultipartForm stays nil when the body is not multipart/form-data
	// (e.g. vid passed in the query string with a plain body). Reject the
	// request instead of dereferencing nil below.
	if r.MultipartForm == nil {
		log.Errorf("r.MultipartForm is nil, request is not multipart/form-data")
		err = errors.ErrParam
		return
	}
	keys := r.MultipartForm.Value["keys"]
	cookies := r.MultipartForm.Value["cookies"]
	if len(keys) != len(cookies) {
		log.Errorf("param length not match, keys: %d, cookies: %d", len(keys), len(cookies))
		err = errors.ErrParam
		return
	}
	fhs := r.MultipartForm.File["file"]
	if len(keys) != len(fhs) {
		log.Errorf("param length not match, keys: %d, cookies: %d, files: %d", len(keys), len(cookies), len(fhs))
		err = errors.ErrParam
		return
	}
	ns := needle.NewNeedles(len(fhs))
	// Registered after HttpPostWriter, so the needles are released before the
	// response writer runs, on every exit path.
	defer ns.Close()
	for i, fh := range fhs {
		if key, err = strconv.ParseInt(keys[i], 10, 64); err != nil {
			log.Errorf("strconv.ParseInt(\"%s\") error(%v)", keys[i], err)
			err = errors.ErrParam
			return
		}
		if cookie, err = strconv.ParseInt(cookies[i], 10, 32); err != nil {
			log.Errorf("strconv.ParseInt(\"%s\") error(%v)", cookies[i], err)
			err = errors.ErrParam
			return
		}
		if file, err = fh.Open(); err != nil {
			log.Errorf("fh.Open() error(%v)", err)
			return
		}
		if size, err = checkFileSize(file, s.conf.NeedleMaxSize); err == nil {
			err = ns.ReadFrom(key, int32(cookie), int32(size), file)
		}
		// Close each part as soon as it has been consumed rather than
		// deferring, so open files do not pile up across the loop.
		file.Close()
		if err != nil {
			return
		}
	}
	// Every part was read successfully; write the whole batch to the volume.
	v := s.store.Volumes[int32(vid)]
	if v == nil {
		err = errors.ErrVolumeNotExist
		return
	}
	err = v.Writes(ns)
}
4.删除API实现
删除图片的时候需要提供vid、key
// del handles the delete route: it removes the needle identified by "key"
// from the volume identified by "vid".
//
// Only POST is accepted. Both parameters are read from the POST form body;
// "key" is parsed as a 64-bit and "vid" as a 32-bit base-10 integer.
// Failures are reported through err, whose address is handed to the deferred
// HttpPostWriter together with the result map (presumably it renders the
// response; its body is not visible here).
func (s *Server) del(wr http.ResponseWriter, r *http.Request) {
	var (
		err error
		res = map[string]interface{}{}
	)
	if r.Method != "POST" {
		http.Error(wr, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	defer HttpPostWriter(r, wr, time.Now(), &err, res)
	// s.dl gates delete requests (presumably a rate limiter).
	if !s.dl.Allow() {
		err = errors.ErrServiceUnavailable
		return
	}
	// Needle key.
	rawKey := r.PostFormValue("key")
	key, parseErr := strconv.ParseInt(rawKey, 10, 64)
	if parseErr != nil {
		log.Errorf("strconv.ParseInt(\"%s\") error(%v)", rawKey, parseErr)
		err = errors.ErrParam
		return
	}
	// Volume id.
	rawVid := r.PostFormValue("vid")
	vid, parseErr := strconv.ParseInt(rawVid, 10, 32)
	if parseErr != nil {
		log.Errorf("strconv.ParseInt(\"%s\") error(%v)", rawVid, parseErr)
		err = errors.ErrParam
		return
	}
	// Look up the volume and delete the key from it.
	vol := s.store.Volumes[int32(vid)]
	if vol == nil {
		err = errors.ErrVolumeNotExist
		return
	}
	err = vol.Delete(key)
}
Admin Server初始化
- 新增Free Volume
- 新增Volume
- 复制Volume
- 压缩Volume
// StartAdmin serves the store's administrative HTTP API on addr.
//
// The routes are registered and the listener is started inside a background
// goroutine, so this call returns immediately. When http.ListenAndServe
// returns an error it is logged and the goroutine exits; the error is not
// reported back to the caller.
func StartAdmin(addr string, s *Server) {
	go func() {
		routes := []struct {
			pattern string
			handler func(http.ResponseWriter, *http.Request)
		}{
			{"/probe", s.probe},                   // probe a volume
			{"/bulk_volume", s.bulkVolume},        // copy a volume onto this node
			{"/compact_volume", s.compactVolume},  // compact a volume
			{"/add_volume", s.addVolume},          // add a volume
			{"/add_free_volume", s.addFreeVolume}, // add free volumes
		}
		mux := http.NewServeMux()
		for _, rt := range routes {
			mux.HandleFunc(rt.pattern, rt.handler)
		}
		if err := http.ListenAndServe(addr, mux); err != nil {
			log.Errorf("http.ListenAndServe(\"%s\") error(%v)", addr, err)
		}
	}()
}
1.探测API
// probe handles the probe route: it calls Probe on the volume named by the
// "vid" query parameter and maps the outcome to an HTTP status.
//
// Only HEAD is accepted; any other method is answered with 405 directly.
// An unparsable vid yields 400. An unknown volume, or a Probe error of
// ErrNeedleDeleted / ErrNeedleNotExist, yields 404; any other Probe error
// yields 500. The status and error are handed by pointer to the deferred
// HttpGetWriter (presumably it finishes the response; its body is not
// visible here).
func (s *Server) probe(wr http.ResponseWriter, r *http.Request) {
	var (
		err    error
		status = http.StatusOK
		query  = r.URL.Query()
		start  = time.Now()
	)
	if r.Method != "HEAD" {
		status = http.StatusMethodNotAllowed
		http.Error(wr, "method not allowed", status)
		return
	}
	defer HttpGetWriter(r, wr, start, &err, &status)
	rawVid := query.Get("vid")
	var vid int64
	if vid, err = strconv.ParseInt(rawVid, 10, 32); err != nil {
		log.Errorf("strconv.ParseInt(\"%s\") error(%v)", rawVid, err)
		status = http.StatusBadRequest
		return
	}
	vol := s.store.Volumes[int32(vid)]
	if vol == nil {
		status = http.StatusNotFound
		err = errors.ErrVolumeNotExist
		return
	}
	err = vol.Probe()
	switch err {
	case nil:
		// Probe succeeded; status stays 200.
	case errors.ErrNeedleDeleted, errors.ErrNeedleNotExist:
		status = http.StatusNotFound
	default:
		status = http.StatusInternalServerError
	}
}
2.复制Volume API
// bulkVolume handles the bulk-volume admin route: it starts a background
// store.BulkVolume call for the volume id "vid" with the form values "bfile"
// and "ifile".
//
// Only POST is accepted. The handler responds as soon as the parameters have
// been validated; the bulk operation itself runs asynchronously and its
// outcome is only logged.
func (s *Server) bulkVolume(wr http.ResponseWriter, r *http.Request) {
	var (
		err          error
		vid          int64
		bfile, ifile string
		res          = map[string]interface{}{}
	)
	if r.Method != "POST" {
		http.Error(wr, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	defer HttpPostWriter(r, wr, time.Now(), &err, res)
	bfile = r.FormValue("bfile")
	ifile = r.FormValue("ifile")
	rawVid := r.FormValue("vid")
	if vid, err = strconv.ParseInt(rawVid, 10, 32); err != nil {
		log.Errorf("strconv.ParseInt(\"%s\") error(%v)", rawVid, err)
		err = errors.ErrParam
		return
	}
	go func() {
		log.Infof("bulk volume: %d start", vid)
		// Use a goroutine-local error: the handler's err is handed by
		// pointer to the deferred HttpPostWriter, so assigning it from here
		// would race with the response being written.
		if bulkErr := s.store.BulkVolume(int32(vid), bfile, ifile); bulkErr != nil {
			log.Errorf("s.store.BulkVolume() error(%v)", bulkErr)
		}
		log.Infof("bulk volume: %d stop", vid)
	}()
}
3.Compact Volume API
// compactVolume handles the compact-volume admin route: it starts a
// background store.CompactVolume call for the volume id "vid".
//
// Only POST is accepted. The handler responds as soon as vid has been
// validated; compaction runs asynchronously and its outcome is only logged.
func (s *Server) compactVolume(wr http.ResponseWriter, r *http.Request) {
	var (
		err error
		vid int64
		res = map[string]interface{}{}
	)
	if r.Method != "POST" {
		http.Error(wr, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	defer HttpPostWriter(r, wr, time.Now(), &err, res)
	rawVid := r.FormValue("vid")
	if vid, err = strconv.ParseInt(rawVid, 10, 32); err != nil {
		log.Errorf("strconv.ParseInt(\"%s\") error(%v)", rawVid, err)
		err = errors.ErrParam
		return
	}
	// long time processing, not block, we can from info stat api get status.
	go func() {
		log.Infof("compact volume: %d start", vid)
		// Use a goroutine-local error: the handler's err is handed by
		// pointer to the deferred HttpPostWriter, so assigning it from here
		// would race with the response being written.
		if compactErr := s.store.CompactVolume(int32(vid)); compactErr != nil {
			log.Errorf("s.CompactVolume() error(%v)", compactErr)
		}
		log.Infof("compact volume: %d stop", vid)
	}()
}
4.新增 Volume
- 将从本地Free Volume中取一个出来进行上线,需要外部传入的参数就是vid,表示新增的volume id
- api说明
  - 提取参数vid
  - 调用store.AddVolume
- 提取参数
// addVolume handles the add-volume admin route: it calls store.AddVolume
// with the volume id taken from the form value "vid".
//
// Only POST is accepted. An unparsable vid is reported as ErrParam;
// otherwise the error returned by AddVolume is reported. Both travel through
// err, whose address is handed to the deferred HttpPostWriter (presumably it
// renders the response; its body is not visible here).
func (s *Server) addVolume(wr http.ResponseWriter, r *http.Request) {
	var (
		err error
		res = map[string]interface{}{}
	)
	if r.Method != "POST" {
		http.Error(wr, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	defer HttpPostWriter(r, wr, time.Now(), &err, res)
	// Volume id to bring online.
	rawVid := r.FormValue("vid")
	vid, parseErr := strconv.ParseInt(rawVid, 10, 32)
	if parseErr != nil {
		log.Errorf("strconv.ParseInt(\"%s\") error(%v)", rawVid, parseErr)
		err = errors.ErrParam
		return
	}
	log.Infof("add volume: %d", vid)
	// Delegate to the store; the returned volume is not needed here.
	_, err = s.store.AddVolume(int32(vid))
}
5.新增 Free Volume
- 新增Free Volume相当于初始化一个空的Volume,设置其bdir和idir以及n数量,但是这个volume并不会被使用
- api说明
  - 提取参数bdir、idir、n
  - 调用store.AddFreeVolume
- 提取参数
- api实现
// addFreeVolume handles the add-free-volume admin route: it calls
// store.AddFreeVolume with the count "n" and the form values "bdir" and
// "idir".
//
// Only POST is accepted. "n" is parsed as a 32-bit base-10 integer; a parse
// failure is reported as ErrParam. The count returned by AddFreeVolume is
// stored in the result map under "succeed", whether or not an error was also
// returned.
func (s *Server) addFreeVolume(wr http.ResponseWriter, r *http.Request) {
	var (
		err        error
		sn         int
		n          int64
		bdir, idir string
		res        = map[string]interface{}{}
	)
	if r.Method != "POST" {
		http.Error(wr, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	defer HttpPostWriter(r, wr, time.Now(), &err, res)
	// Directories passed through to the store.
	bdir, idir = r.FormValue("bdir"), r.FormValue("idir")
	// Number of free volumes to add.
	rawN := r.FormValue("n")
	if n, err = strconv.ParseInt(rawN, 10, 32); err != nil {
		// Log the value that actually failed to parse ("n"); the original
		// logged the unrelated "vid" form value here.
		log.Errorf("strconv.ParseInt(\"%s\") error(%v)", rawN, err)
		err = errors.ErrParam
		return
	}
	log.Infof("add free volume: %d", n)
	// Delegate to the store and report how many volumes were added.
	sn, err = s.store.AddFreeVolume(int(n), bdir, idir)
	res["succeed"] = sn
}