Proxy模块

Server
type server struct {
bfs *bfs.Bfs
bucket *ibucket.Bucket
auth *auth.Auth
c *conf.Config
}
初始化
- 初始化directory、store模块交互Bfs
- bucket管理器
- 权限验证管理器
- 启动对外httpserver
// StartApi init the http module.
func StartApi(c *conf.Config) (err error) {
var s = &server{}
s.c = c
//directory、store模块交互
s.bfs = bfs.New(c)
//bucket管理
if s.bucket, err = ibucket.New(); err != nil {
return
}
//权限验证管理
if s.auth, err = auth.New(c); err != nil {
return
}
go func() {
mux := http.NewServeMux()
//设置请求路由规则
mux.HandleFunc("/", s.do)
mux.HandleFunc("/ping", s.ping)
//启动对外httpserver
server := &http.Server{
Addr: c.HttpAddr,
Handler: mux,
ReadTimeout: _httpServerReadTimeout,
WriteTimeout: _httpServerWriteTimeout,
}
if err := server.ListenAndServe(); err != nil {
return
}
}()
return
}
处理入口
- 判断请求方法,
HEAD,GET,PUT,DELETE,不同的方法设置不同的函数处理指针
- 解析url中的bucket,filename
- 获取bucket详细信息
- 权限验证
- 执行处理函数:
- GET请求,
download
- PUT请求,
upload
- DELETE请求,
delete
type handler func(*ibucket.Item, string, string, http.ResponseWriter, *http.Request)
func (s *server) do(wr http.ResponseWriter, r *http.Request) {
var (
bucket string
file string
token string
status int
err error
h handler
item *ibucket.Item
upload = false
read = false
)
switch r.Method {
case "HEAD", "GET":
h = s.download
read = true
case "PUT":
h = s.upload
upload = true
case "DELETE":
h = s.delete
default:
http.Error(wr, "", http.StatusMethodNotAllowed)
return
}
if bucket, file, status = s.parseURI(r, upload); status != http.StatusOK {
http.Error(wr, "", status)
return
}
//获取bucket详细信息
if item, err = s.bucket.Get(bucket); err != nil {
log.Errorf("bucket.Get(%s) error(%v)", bucket, err)
http.Error(wr, "", http.StatusNotFound)
return
}
// item not public must use authorize
if !item.Public(read) {
token = r.URL.Query().Get("token")
if token == "" {
token = r.Header.Get("Authorization")
}
if err = s.auth.Authorize(item, r.Method, bucket, file, token); err != nil {
log.Errorf("authorize(%s, %s, %s, %s) by item: %v error(%v)", r.Method, bucket, file, token, item, err)
http.Error(wr, "", http.StatusUnauthorized)
return
}
}
h(item, bucket, file, wr, r)
return
}
GET,下载
func (s *server) download(item *ibucket.Item, bucket, file string, wr http.ResponseWriter, r *http.Request) {
var (
mtime int64
ctlen int
mine string
sha1 string
start = time.Now()
src io.ReadCloser
status = http.StatusOK
err error
)
defer httpLog("download", r.URL.Path, &bucket, &file, start, &status, &err)
if src, ctlen, mtime, sha1, mine, err = s.bfs.Get(bucket, file); err == nil {
wr.Header().Set("Content-Length", strconv.Itoa(ctlen))
wr.Header().Set("Content-Type", mine)
wr.Header().Set("Server", "bfs")
wr.Header().Set("Last-Modified", time.Unix(0, mtime).Format(http.TimeFormat))
wr.Header().Set("Etag", sha1)
if src != nil {
if r.Method == "GET" {
io.Copy(wr, src)
}
src.Close()
}
} else {
if err == errors.ErrNeedleNotExist {
status = http.StatusNotFound
} else if err == errors.ErrStoreNotAvailable {
status = http.StatusServiceUnavailable
} else {
status = http.StatusInternalServerError
}
http.Error(wr, "", status)
}
return
}
Bfs.Get
- 拼接url,
http://$directory_host:$directory_port/get
- 发出http请求,返回
MTime, Sha1,Mine,stores,key,cookie,vid
- 遍历store
- 拼接url,
http://$store_host:$store_port/get
- 发起http请求,将从store实例中获取图片
// Get
func (b *Bfs) Get(bucket, filename string) (src io.ReadCloser, ctlen int, mtime int64, sha1, mine string, err error) {
var (
i, ix, l int
uri string
req *http.Request
resp *http.Response
res meta.Response
params = url.Values{}
)
params.Set("bucket", bucket)
params.Set("filename", filename)
uri = fmt.Sprintf(_directoryGetApi, b.c.BfsAddr)
if err = Http("GET", uri, params, nil, &res); err != nil {
log.Errorf("GET called Http error(%v)", err)
return
}
if res.Ret != errors.RetOK {
log.Errorf("http.Get directory res.Ret: %d %s", res.Ret, uri)
if res.Ret == errors.RetNeedleNotExist {
err = errors.ErrNeedleNotExist
} else {
err = errors.ErrInternal
}
return
}
mtime = res.MTime
sha1 = res.Sha1
mine = res.Mine
params = url.Values{}
l = len(res.Stores)
ix = _rand.Intn(l)
for i = 0; i < l; i++ {
params.Set("key", strconv.FormatInt(res.Key, 10))
params.Set("cookie", strconv.FormatInt(int64(res.Cookie), 10))
params.Set("vid", strconv.FormatInt(int64(res.Vid), 10))
uri = fmt.Sprintf(_storeGetApi, res.Stores[(ix+i)%l]) + "?" + params.Encode()
if req, err = http.NewRequest("GET", uri, nil); err != nil {
continue
}
td := _timer.Start(5*time.Second, func() {
_canceler(req)
})
if resp, err = _client.Do(req); err != nil {
log.Errorf("_client.do(%s) error(%v)", uri, err)
continue
}
td.Stop()
if resp.StatusCode != http.StatusOK {
resp.Body.Close()
continue
}
src = resp.Body
ctlen = int(resp.ContentLength)
break
}
if err == nil && resp.StatusCode == http.StatusServiceUnavailable {
err = errors.ErrStoreNotAvailable
}
return
}
PUT,上传
// upload upload file.
func (s *server) upload(item *ibucket.Item, bucket, file string, wr http.ResponseWriter, r *http.Request) {
var (
ok bool
body []byte
mine string
location string
sha1sum string
ext string
sha [sha1.Size]byte
err error
uerr errors.Error
status = http.StatusOK
start = time.Now()
)
defer httpLog("upload", r.URL.Path, &bucket, &file, start, &status, &err)
defer retCode(wr, &status)
if mine = r.Header.Get("Content-Type"); mine == "" {
status = http.StatusBadRequest
return
}
if ext = path.Base(mine); ext == "jpeg" {
ext = "jpg"
}
if body, err = ioutil.ReadAll(r.Body); err != nil {
status = http.StatusBadRequest
log.Errorf("ioutil.ReadAll(r.Body) error(%s)", err)
return
}
r.Body.Close()
if len(body) > s.c.MaxFileSize {
status = http.StatusRequestEntityTooLarge
return
}
sha = sha1.Sum(body)
sha1sum = hex.EncodeToString(sha[:])
// if empty filename or endwith "/": dir
if file == "" || strings.HasSuffix(file, "/") {
file += sha1sum + "." + ext
}
if err = s.bfs.Upload(bucket, file, mine, sha1sum, body); err != nil && err != errors.ErrNeedleExist {
if uerr, ok = (err).(errors.Error); ok {
status = int(uerr)
} else {
status = http.StatusInternalServerError
}
return
}
location = s.getURI(bucket, file)
wr.Header().Set("Location", location)
wr.Header().Set("ETag", sha1sum)
return
}
Bfs.Upload
bucket,filename,mine,sha1
- 拼接directory url,
http://$directory_host:$directory_port/upload
- 发起http请求,将记录插入到hbase中
- 对每个store实例,分别拼接url
key,cookie,vid
http://$store_host:$store_port/upload
- 发起http请求,将图片存储到store实例中
// Upload
func (b *Bfs) Upload(bucket, filename, mine, sha1 string, buf []byte) (err error) {
var (
params = url.Values{}
uri string
host string
res meta.Response
sRet meta.StoreRet
)
params.Set("bucket", bucket)
params.Set("filename", filename)
params.Set("mine", mine)
params.Set("sha1", sha1)
uri = fmt.Sprintf(_directoryUploadApi, b.c.BfsAddr)
//发起http请求,将记录插入到hbase中
if err = Http("POST", uri, params, nil, &res); err != nil {
return
}
if res.Ret != errors.RetOK && res.Ret != errors.RetNeedleExist {
log.Errorf("http.Post directory res.Ret: %d %s", res.Ret, uri)
err = errors.ErrInternal
return
}
// same sha1sum.
if strings.HasPrefix(filename, sha1) && res.Ret == errors.RetNeedleExist {
err = errors.ErrNeedleExist
return
}
params = url.Values{}
for _, host = range res.Stores {
params.Set("key", strconv.FormatInt(res.Key, 10))
params.Set("cookie", strconv.FormatInt(int64(res.Cookie), 10))
params.Set("vid", strconv.FormatInt(int64(res.Vid), 10))
uri = fmt.Sprintf(_storeUploadApi, host)
if err = Http("POST", uri, params, buf, &sRet); err != nil {
return
}
if sRet.Ret != 1 {
log.Errorf("http.Post store sRet.Ret: %d %s %d %d %d", sRet.Ret, uri, res.Key, res.Cookie, res.Vid)
err = errors.ErrInternal
return
}
}
if res.Ret == errors.RetNeedleExist {
err = errors.ErrNeedleExist
}
log.Infof("bfs.upload bucket:%s filename:%s key:%d cookie:%d vid:%d", bucket, filename, res.Key, res.Cookie, res.Vid)
return
}
DELETE,删除
// delete
func (s *server) delete(item *ibucket.Item, bucket, file string, wr http.ResponseWriter, r *http.Request) {
var (
ok bool
err error
uerr errors.Error
status = http.StatusOK
start = time.Now()
)
defer httpLog("delete", r.URL.Path, &bucket, &file, start, &status, &err)
if err = s.bfs.Delete(bucket, file); err != nil {
if err == errors.ErrNeedleNotExist {
status = http.StatusNotFound
http.Error(wr, "", status)
} else {
if uerr, ok = (err).(errors.Error); ok {
status = int(uerr)
} else {
status = http.StatusInternalServerError
}
}
} else {
wr.Header().Set("Code", strconv.Itoa(status))
}
return
}
Bfs.Delete
- 拼接
http://$directory_host:$directory_port/del
bucket,filename
- 发起请求从hbase中删除
- 拼接
http://$store_host:$store_port/del
key,vid
- 发起请求从store中删除
// Delete
func (b *Bfs) Delete(bucket, filename string) (err error) {
var (
params = url.Values{}
host string
uri string
res meta.Response
sRet meta.StoreRet
)
params.Set("bucket", bucket)
params.Set("filename", filename)
uri = fmt.Sprintf(_directoryDelApi, b.c.BfsAddr)
if err = Http("POST", uri, params, nil, &res); err != nil {
log.Errorf("Delete called Http error(%v)", err)
return
}
if res.Ret != errors.RetOK {
log.Errorf("http.Get directory res.Ret: %d %s", res.Ret, uri)
if res.Ret == errors.RetNeedleNotExist {
err = errors.ErrNeedleNotExist
} else {
err = errors.ErrInternal
}
return
}
params = url.Values{}
for _, host = range res.Stores {
params.Set("key", strconv.FormatInt(res.Key, 10))
params.Set("vid", strconv.FormatInt(int64(res.Vid), 10))
uri = fmt.Sprintf(_storeDelApi, host)
if err = Http("POST", uri, params, nil, &sRet); err != nil {
log.Errorf("Update called Http error(%v)", err)
return
}
if sRet.Ret != 1 {
log.Errorf("Delete store sRet.Ret: %d %s", sRet.Ret, uri)
err = errors.ErrInternal
return
}
}
return
}