Proxy模块

Server

type server struct {
    bfs    *bfs.Bfs
    bucket *ibucket.Bucket
    auth   *auth.Auth
    c      *conf.Config
}

初始化

  • 初始化directory、store模块交互Bfs
  • bucket管理器
  • 权限验证管理器
  • 启动对外httpserver
// StartApi init the http module.
func StartApi(c *conf.Config) (err error) {
    var s = &server{}
    s.c = c
    //directory、store模块交互
    s.bfs = bfs.New(c)
    //bucket管理
    if s.bucket, err = ibucket.New(); err != nil {
        return
    }
    //权限验证管理
    if s.auth, err = auth.New(c); err != nil {
        return
    }
    go func() {
        mux := http.NewServeMux()
        //设置请求路由规则
        mux.HandleFunc("/", s.do)
        mux.HandleFunc("/ping", s.ping)
        //启动对外httpserver
        server := &http.Server{
            Addr:         c.HttpAddr,
            Handler:      mux,
            ReadTimeout:  _httpServerReadTimeout,
            WriteTimeout: _httpServerWriteTimeout,
        }
        if err := server.ListenAndServe(); err != nil {
            return
        }
    }()
    return
}

处理入口

  • 判断请求方法,HEAD,GET,PUT,DELETE,不同的方法设置不同的函数处理指针
  • 解析url中的bucket,filename
  • 获取bucket详细信息
  • 权限验证
  • 执行处理函数:
    • GET请求,download
    • PUT请求,upload
    • DELETE请求,delete
type handler func(*ibucket.Item, string, string, http.ResponseWriter, *http.Request)

func (s *server) do(wr http.ResponseWriter, r *http.Request) {
    var (
        bucket string
        file   string
        token  string
        status int
        err    error
        h      handler
        item   *ibucket.Item
        upload = false
        read   = false
    )
    switch r.Method {
    case "HEAD", "GET":
        h = s.download
        read = true
    case "PUT":
        h = s.upload
        upload = true
    case "DELETE":
        h = s.delete
    default:
        http.Error(wr, "", http.StatusMethodNotAllowed)
        return
    }
    if bucket, file, status = s.parseURI(r, upload); status != http.StatusOK {
        http.Error(wr, "", status)
        return
    }
    //获取bucket详细信息
    if item, err = s.bucket.Get(bucket); err != nil {
        log.Errorf("bucket.Get(%s) error(%v)", bucket, err)
        http.Error(wr, "", http.StatusNotFound)
        return
    }
    // item not public must use authorize
    if !item.Public(read) {
        token = r.URL.Query().Get("token")
        if token == "" {
            token = r.Header.Get("Authorization")
        }
        if err = s.auth.Authorize(item, r.Method, bucket, file, token); err != nil {
            log.Errorf("authorize(%s, %s, %s, %s) by item: %v error(%v)", r.Method, bucket, file, token, item, err)
            http.Error(wr, "", http.StatusUnauthorized)
            return
        }
    }
    h(item, bucket, file, wr, r)
    return
}

GET,下载

  • Bfs.Get
func (s *server) download(item *ibucket.Item, bucket, file string, wr http.ResponseWriter, r *http.Request) {
    var (
        mtime  int64
        ctlen  int
        mine   string
        sha1   string
        start  = time.Now()
        src    io.ReadCloser
        status = http.StatusOK
        err    error
    )
    defer httpLog("download", r.URL.Path, &bucket, &file, start, &status, &err)
    if src, ctlen, mtime, sha1, mine, err = s.bfs.Get(bucket, file); err == nil {
        wr.Header().Set("Content-Length", strconv.Itoa(ctlen))
        wr.Header().Set("Content-Type", mine)
        wr.Header().Set("Server", "bfs")
        wr.Header().Set("Last-Modified", time.Unix(0, mtime).Format(http.TimeFormat))
        wr.Header().Set("Etag", sha1)
        if src != nil {
            if r.Method == "GET" {
                io.Copy(wr, src)
            }
            src.Close()
        }
    } else {
        if err == errors.ErrNeedleNotExist {
            status = http.StatusNotFound
        } else if err == errors.ErrStoreNotAvailable {
            status = http.StatusServiceUnavailable
        } else {
            status = http.StatusInternalServerError
        }
        http.Error(wr, "", status)
    }
    return
}

Bfs.Get

  • 拼接url,http://$directory_host:$directory_port/get
  • 发出http请求,返回MTime, Sha1,Mine,storeskey,cookie,vid
  • 遍历store
    • 拼接url,http://$store_host:$store_port/get
    • 发起http请求,将从store实例中获取图片
// Get
func (b *Bfs) Get(bucket, filename string) (src io.ReadCloser, ctlen int, mtime int64, sha1, mine string, err error) {
    var (
        i, ix, l int
        uri      string
        req      *http.Request
        resp     *http.Response
        res      meta.Response
        params   = url.Values{}
    )
    params.Set("bucket", bucket)
    params.Set("filename", filename)
    uri = fmt.Sprintf(_directoryGetApi, b.c.BfsAddr)
    if err = Http("GET", uri, params, nil, &res); err != nil {
        log.Errorf("GET called Http error(%v)", err)
        return
    }
    if res.Ret != errors.RetOK {
        log.Errorf("http.Get directory res.Ret: %d %s", res.Ret, uri)
        if res.Ret == errors.RetNeedleNotExist {
            err = errors.ErrNeedleNotExist
        } else {
            err = errors.ErrInternal
        }
        return
    }
    mtime = res.MTime
    sha1 = res.Sha1
    mine = res.Mine
    params = url.Values{}
    l = len(res.Stores)
    ix = _rand.Intn(l)
    for i = 0; i < l; i++ {
        params.Set("key", strconv.FormatInt(res.Key, 10))
        params.Set("cookie", strconv.FormatInt(int64(res.Cookie), 10))
        params.Set("vid", strconv.FormatInt(int64(res.Vid), 10))
        uri = fmt.Sprintf(_storeGetApi, res.Stores[(ix+i)%l]) + "?" + params.Encode()
        if req, err = http.NewRequest("GET", uri, nil); err != nil {
            continue
        }
        td := _timer.Start(5*time.Second, func() {
            _canceler(req)
        })
        if resp, err = _client.Do(req); err != nil {
            log.Errorf("_client.do(%s) error(%v)", uri, err)
            continue
        }
        td.Stop()
        if resp.StatusCode != http.StatusOK {
            resp.Body.Close()
            continue
        }
        src = resp.Body
        ctlen = int(resp.ContentLength)
        break
    }
    if err == nil && resp.StatusCode == http.StatusServiceUnavailable {
        err = errors.ErrStoreNotAvailable
    }
    return
}

PUT,上传

  • Bfs.upload
// upload upload file.
func (s *server) upload(item *ibucket.Item, bucket, file string, wr http.ResponseWriter, r *http.Request) {
    var (
        ok       bool
        body     []byte
        mine     string
        location string
        sha1sum  string
        ext      string
        sha      [sha1.Size]byte
        err      error
        uerr     errors.Error
        status   = http.StatusOK
        start    = time.Now()
    )
    defer httpLog("upload", r.URL.Path, &bucket, &file, start, &status, &err)
    defer retCode(wr, &status)
    if mine = r.Header.Get("Content-Type"); mine == "" {
        status = http.StatusBadRequest
        return
    }
    if ext = path.Base(mine); ext == "jpeg" {
        ext = "jpg"
    }
    if body, err = ioutil.ReadAll(r.Body); err != nil {
        status = http.StatusBadRequest
        log.Errorf("ioutil.ReadAll(r.Body) error(%s)", err)
        return
    }
    r.Body.Close()
    if len(body) > s.c.MaxFileSize {
        status = http.StatusRequestEntityTooLarge
        return
    }
    sha = sha1.Sum(body)
    sha1sum = hex.EncodeToString(sha[:])
    // if empty filename or endwith "/": dir
    if file == "" || strings.HasSuffix(file, "/") {
        file += sha1sum + "." + ext
    }
    if err = s.bfs.Upload(bucket, file, mine, sha1sum, body); err != nil && err != errors.ErrNeedleExist {
        if uerr, ok = (err).(errors.Error); ok {
            status = int(uerr)
        } else {
            status = http.StatusInternalServerError
        }
        return
    }
    location = s.getURI(bucket, file)
    wr.Header().Set("Location", location)
    wr.Header().Set("ETag", sha1sum)
    return
}

Bfs.Upload

  • bucket,filename,mine,sha1
  • 拼接directory url,http://$directory_host:$directory_port/upload
  • 发起http请求,将记录插入到hbase中
  • 对每个store实例,分别拼接url
    • key,cookie,vid
    • http://$store_host:$store_port/upload
    • 发起http请求,将图片存储到store实例中
// Upload
func (b *Bfs) Upload(bucket, filename, mine, sha1 string, buf []byte) (err error) {
    var (
        params = url.Values{}
        uri    string
        host   string
        res    meta.Response
        sRet   meta.StoreRet
    )
    params.Set("bucket", bucket)
    params.Set("filename", filename)
    params.Set("mine", mine)
    params.Set("sha1", sha1)
    uri = fmt.Sprintf(_directoryUploadApi, b.c.BfsAddr)
    //发起http请求,将记录插入到hbase中
    if err = Http("POST", uri, params, nil, &res); err != nil {
        return
    }
    if res.Ret != errors.RetOK && res.Ret != errors.RetNeedleExist {
        log.Errorf("http.Post directory res.Ret: %d %s", res.Ret, uri)
        err = errors.ErrInternal
        return
    }
    // same sha1sum.
    if strings.HasPrefix(filename, sha1) && res.Ret == errors.RetNeedleExist {
        err = errors.ErrNeedleExist
        return
    }

    params = url.Values{}
    for _, host = range res.Stores {
        params.Set("key", strconv.FormatInt(res.Key, 10))
        params.Set("cookie", strconv.FormatInt(int64(res.Cookie), 10))
        params.Set("vid", strconv.FormatInt(int64(res.Vid), 10))
        uri = fmt.Sprintf(_storeUploadApi, host)
        if err = Http("POST", uri, params, buf, &sRet); err != nil {
            return
        }
        if sRet.Ret != 1 {
            log.Errorf("http.Post store sRet.Ret: %d  %s %d %d %d", sRet.Ret, uri, res.Key, res.Cookie, res.Vid)
            err = errors.ErrInternal
            return
        }
    }
    if res.Ret == errors.RetNeedleExist {
        err = errors.ErrNeedleExist
    }
    log.Infof("bfs.upload bucket:%s filename:%s key:%d cookie:%d vid:%d", bucket, filename, res.Key, res.Cookie, res.Vid)
    return
}

DELETE,删除

  • Bfs.Delete
// delete
func (s *server) delete(item *ibucket.Item, bucket, file string, wr http.ResponseWriter, r *http.Request) {
    var (
        ok     bool
        err    error
        uerr   errors.Error
        status = http.StatusOK
        start  = time.Now()
    )
    defer httpLog("delete", r.URL.Path, &bucket, &file, start, &status, &err)
    if err = s.bfs.Delete(bucket, file); err != nil {
        if err == errors.ErrNeedleNotExist {
            status = http.StatusNotFound
            http.Error(wr, "", status)
        } else {
            if uerr, ok = (err).(errors.Error); ok {
                status = int(uerr)
            } else {
                status = http.StatusInternalServerError
            }
        }
    } else {
        wr.Header().Set("Code", strconv.Itoa(status))
    }
    return
}

Bfs.Delete

  • 拼接http://$directory_host:$directory_port/del
  • bucket,filename
  • 发起请求从hbase中删除
  • 拼接http://$store_host:$store_port/del
  • key,vid
  • 发起请求从store中删除
// Delete
func (b *Bfs) Delete(bucket, filename string) (err error) {
    var (
        params = url.Values{}
        host   string
        uri    string
        res    meta.Response
        sRet   meta.StoreRet
    )
    params.Set("bucket", bucket)
    params.Set("filename", filename)
    uri = fmt.Sprintf(_directoryDelApi, b.c.BfsAddr)
    if err = Http("POST", uri, params, nil, &res); err != nil {
        log.Errorf("Delete called Http error(%v)", err)
        return
    }
    if res.Ret != errors.RetOK {
        log.Errorf("http.Get directory res.Ret: %d %s", res.Ret, uri)
        if res.Ret == errors.RetNeedleNotExist {
            err = errors.ErrNeedleNotExist
        } else {
            err = errors.ErrInternal
        }
        return
    }

    params = url.Values{}
    for _, host = range res.Stores {
        params.Set("key", strconv.FormatInt(res.Key, 10))
        params.Set("vid", strconv.FormatInt(int64(res.Vid), 10))
        uri = fmt.Sprintf(_storeDelApi, host)
        if err = Http("POST", uri, params, nil, &sRet); err != nil {
            log.Errorf("Update called Http error(%v)", err)
            return
        }
        if sRet.Ret != 1 {
            log.Errorf("Delete store sRet.Ret: %d  %s", sRet.Ret, uri)
            err = errors.ErrInternal
            return
        }
    }
    return
}

results matching ""

    No results matching ""