Store Http Server初始化

API Server初始化

  • 设置http路由函数
// StartApi start api http listen.
func StartApi(addr string, s *Server) {
    go func() {
        var (
            err      error
            serveMux = http.NewServeMux()
        )
        //下载路由
        serveMux.HandleFunc("/get", s.get)
        //上传路由
        serveMux.HandleFunc("/upload", s.upload)
        //批量上传路由
        serveMux.HandleFunc("/uploads", s.uploads)
        //删除路由
        serveMux.HandleFunc("/del", s.del)
        if err = http.ListenAndServe(addr, serveMux); err != nil {
            log.Errorf("http.ListenAndServe(\"%s\") error(%v)", addr, err)
            return
        }
    }()
    return
}

1.下载api实现

  • 下载图片的时候,需要提供vidkeycookie,vid用来定位volume所在的文件,key是一个用来定位needle所在的文件中的offset,cookie出于安全性考虑,做验证用的。
  • api说明
    • 提取vid,key,cookie参数
    • vidstore.Volumes搜索到volume.Volume
    • volume.Volume调用Read返回needle.Needle,根据keycookie读取needle
    • needle.Needle返回给客户端
      func (s *Server) get(wr http.ResponseWriter, r *http.Request) {
      var (
          v                *volume.Volume
          n                *needle.Needle
          err              error
          vid, key, cookie int64
          ret              = http.StatusOK
          params           = r.URL.Query()
          now              = time.Now()
      )
      if r.Method != "GET" && r.Method != "HEAD" {
          ret = http.StatusMethodNotAllowed
          http.Error(wr, "method not allowed", ret)
          return
      }
      defer HttpGetWriter(r, wr, now, &err, &ret)
      if !s.rl.Allow() {
          ret = http.StatusServiceUnavailable
          return
      }
      //提取vid
      if vid, err = strconv.ParseInt(params.Get("vid"), 10, 32); err != nil {
          log.Errorf("strconv.ParseInt(\"%s\") error(%v)", params.Get("vid"), err)
          ret = http.StatusBadRequest
          return
      }
      //提取key
      if key, err = strconv.ParseInt(params.Get("key"), 10, 64); err != nil {
          log.Errorf("strconv.ParseInt(\"%s\") error(%v)", params.Get("key"), err)
          ret = http.StatusBadRequest
          return
      }
      //提取 cookie
      if cookie, err = strconv.ParseInt(params.Get("cookie"), 10, 32); err != nil {
          log.Errorf("strconv.ParseInt(\"%s\") error(%v)", params.Get("cookie"), err)
          ret = http.StatusBadRequest
          return
      }
      //搜索对应的volumn
      if v = s.store.Volumes[int32(vid)]; v != nil {
          //根据key和cookie读取needle
          if n, err = v.Read(key, int32(cookie)); err == nil {
              wr.Header().Set("Content-Length", strconv.Itoa(len(n.Data)))
              //返回客户端
              if _, err = wr.Write(n.Data); err != nil {
                  log.Errorf("wr.Write() error(%v)", err)
                  err = nil // avoid HttpGetWriter write header twice
              }
              n.Close()
          } else {
              if err == errors.ErrNeedleDeleted || err == errors.ErrNeedleNotExist {
                  ret = http.StatusNotFound
              } else {
                  ret = http.StatusInternalServerError
              }
          }
      } else {
          ret = http.StatusNotFound
          err = errors.ErrVolumeNotExist
      }
      return
      }
      

2.上传API实现

  • 上传图片的时候需要提供vidkeycookie
  • api说明
    • 提取请求中的vidkeycookie参数
    • vidstore.Volumes搜索到volume.Volume
    • needle.NewWriter新建一个Needle
    • Needle从post参数中读取图片二进制数据
    • volume.Volume写入Needle
  • api实现
    func (s *Server) upload(wr http.ResponseWriter, r *http.Request) {
      var (
          vid    int64
          key    int64
          cookie int64
          size   int64
          err    error
          str    string
          v      *volume.Volume
          n      *needle.Needle
          file   multipart.File
          res    = map[string]interface{}{}
      )
      if r.Method != "POST" {
          http.Error(wr, "method not allowed", http.StatusMethodNotAllowed)
          return
      }
      defer HttpPostWriter(r, wr, time.Now(), &err, res)
      if !s.wl.Allow() {
          err = errors.ErrServiceUnavailable
          return
      }
      if err = checkContentLength(r, s.conf.NeedleMaxSize); err != nil {
          return
      }
      //提取vid
      str = r.FormValue("vid")
      if vid, err = strconv.ParseInt(str, 10, 32); err != nil {
          log.Errorf("strconv.ParseInt(\"%s\") error(%v)", str, err)
          err = errors.ErrParam
          return
      }
      //提取key
      str = r.FormValue("key")
      if key, err = strconv.ParseInt(str, 10, 64); err != nil {
          log.Errorf("strconv.ParseInt(\"%s\") error(%v)", str, err)
          err = errors.ErrParam
          return
      }
      //提取cookie
      str = r.FormValue("cookie")
      if cookie, err = strconv.ParseInt(str, 10, 32); err != nil {
          log.Errorf("strconv.ParseInt(\"%s\") error(%v)", str, err)
          err = errors.ErrParam
          return
      }
      if file, _, err = r.FormFile("file"); err != nil {
          log.Errorf("r.FormFile() error(%v)", err)
          err = errors.ErrInternal
          return
      }
      if size, err = checkFileSize(file, s.conf.NeedleMaxSize); err == nil {
          //查找volumn
          if v = s.store.Volumes[int32(vid)]; v != nil {
              //创建needle
              n = needle.NewWriter(key, int32(cookie), int32(size))
              //读取图片数据
              if err = n.ReadFrom(file); err == nil {
                  //volumn写入needle
                  err = v.Write(n)
              }
              n.Close()
          } else {
              err = errors.ErrVolumeNotExist
          }
      }
      file.Close()
      return
    }
    

3.批量上传API实现

func (s *Server) uploads(wr http.ResponseWriter, r *http.Request) {
    var (
        i, nn   int
        err     error
        vid     int64
        key     int64
        cookie  int64
        size    int64
        str     string
        keys    []string
        cookies []string
        v       *volume.Volume
        file    multipart.File
        fh      *multipart.FileHeader
        fhs     []*multipart.FileHeader
        ns      *needle.Needles
        res     = map[string]interface{}{}
    )
    if r.Method != "POST" {
        http.Error(wr, "method not allowed", http.StatusMethodNotAllowed)
        return
    }
    defer HttpPostWriter(r, wr, time.Now(), &err, res)
    if !s.wl.Allow() {
        err = errors.ErrServiceUnavailable
        return
    }
    str = r.FormValue("vid")
    if vid, err = strconv.ParseInt(str, 10, 32); err != nil {
        log.Errorf("strconv.ParseInt(\"%s\") error(%v)", str, err)
        err = errors.ErrParam
        return
    }
    keys = r.MultipartForm.Value["keys"]
    cookies = r.MultipartForm.Value["cookies"]
    if len(keys) != len(cookies) {
        log.Errorf("param length not match, keys: %d, cookies: %d", len(keys), len(cookies))
        err = errors.ErrParam
        return
    }
    fhs = r.MultipartForm.File["file"]
    nn = len(fhs)
    if len(keys) != nn {
        log.Errorf("param length not match, keys: %d, cookies: %d, files: %d", len(keys), len(cookies), len(fhs))
        err = errors.ErrParam
        return
    }
    ns = needle.NewNeedles(nn)
    for i, fh = range fhs {
        if key, err = strconv.ParseInt(keys[i], 10, 64); err != nil {
            log.Errorf("strconv.ParseInt(\"%s\") error(%v)", keys[i], err)
            err = errors.ErrParam
            break
        }
        if cookie, err = strconv.ParseInt(cookies[i], 10, 32); err != nil {
            log.Errorf("strconv.ParseInt(\"%s\") error(%v)", cookies[i], err)
            err = errors.ErrParam
            break
        }
        if file, err = fh.Open(); err != nil {
            log.Errorf("fh.Open() error(%v)", err)
            break
        }
        if size, err = checkFileSize(file, s.conf.NeedleMaxSize); err == nil {
            err = ns.ReadFrom(key, int32(cookie), int32(size), file)
        }
        file.Close()
        if err != nil {
            break
        }
    }
    if err == nil {
        if v = s.store.Volumes[int32(vid)]; v != nil {
            err = v.Writes(ns)
        } else {
            err = errors.ErrVolumeNotExist
        }
    }
    ns.Close()
    return
}

4.删除API实现

删除图片的时候需要提供vidkey

func (s *Server) del(wr http.ResponseWriter, r *http.Request) {
    var (
        err      error
        key, vid int64
        str      string
        v        *volume.Volume
        res      = map[string]interface{}{}
    )
    if r.Method != "POST" {
        http.Error(wr, "method not allowed", http.StatusMethodNotAllowed)
        return
    }
    defer HttpPostWriter(r, wr, time.Now(), &err, res)
    if !s.dl.Allow() {
        err = errors.ErrServiceUnavailable
        return
    }
    //提取key
    str = r.PostFormValue("key")
    if key, err = strconv.ParseInt(str, 10, 64); err != nil {
        log.Errorf("strconv.ParseInt(\"%s\") error(%v)", str, err)
        err = errors.ErrParam
        return
    }
    //提取vid
    str = r.PostFormValue("vid")
    if vid, err = strconv.ParseInt(str, 10, 32); err != nil {
        log.Errorf("strconv.ParseInt(\"%s\") error(%v)", str, err)
        err = errors.ErrParam
        return
    }
    //找到vid对应的volumn
    if v = s.store.Volumes[int32(vid)]; v != nil {
        //volumn删除key
        err = v.Delete(key)
    } else {
        err = errors.ErrVolumeNotExist
    }
    return
}

Admin Server初始化

  • 新增Free Volume
  • 新增Volume
  • 复制Volume
  • 压缩Volume
// StartAdmin start admin http listen.
func StartAdmin(addr string, s *Server) {
    go func() {
        var (
            err      error
            serveMux = http.NewServeMux()
        )
        //探测API
        serveMux.HandleFunc("/probe", s.probe)
        //复制 volumn id到本机
        serveMux.HandleFunc("/bulk_volume", s.bulkVolume)
        //对volumn进行compact
        serveMux.HandleFunc("/compact_volume", s.compactVolume)
        //新增volumn
        serveMux.HandleFunc("/add_volume", s.addVolume)
        //新增 free volumn
        serveMux.HandleFunc("/add_free_volume", s.addFreeVolume)
        if err = http.ListenAndServe(addr, serveMux); err != nil {
            log.Errorf("http.ListenAndServe(\"%s\") error(%v)", addr, err)
            return
        }
    }()
    return
}

1.探测API

func (s *Server) probe(wr http.ResponseWriter, r *http.Request) {
    var (
        v      *volume.Volume
        err    error
        vid    int64
        ret    = http.StatusOK
        params = r.URL.Query()
        now    = time.Now()
    )
    if r.Method != "HEAD" {
        ret = http.StatusMethodNotAllowed
        http.Error(wr, "method not allowed", ret)
        return
    }
    defer HttpGetWriter(r, wr, now, &err, &ret)
    if vid, err = strconv.ParseInt(params.Get("vid"), 10, 32); err != nil {
        log.Errorf("strconv.ParseInt(\"%s\") error(%v)", params.Get("vid"), err)
        ret = http.StatusBadRequest
        return
    }
    if v = s.store.Volumes[int32(vid)]; v != nil {
        if err = v.Probe(); err != nil {
            if err == errors.ErrNeedleDeleted || err == errors.ErrNeedleNotExist {
                ret = http.StatusNotFound
            } else {
                ret = http.StatusInternalServerError
            }
        }
    } else {
        ret = http.StatusNotFound
        err = errors.ErrVolumeNotExist
    }
    return
}

2.复制Volumn API

func (s *Server) bulkVolume(wr http.ResponseWriter, r *http.Request) {
    var (
        err          error
        vid          int64
        bfile, ifile string
        res          = map[string]interface{}{}
    )
    if r.Method != "POST" {
        http.Error(wr, "method not allowed", http.StatusMethodNotAllowed)
        return
    }
    defer HttpPostWriter(r, wr, time.Now(), &err, res)
    bfile = r.FormValue("bfile")
    ifile = r.FormValue("ifile")
    if vid, err = strconv.ParseInt(r.FormValue("vid"), 10, 32); err != nil {
        log.Errorf("strconv.ParseInt(\"%s\") error(%v)", r.FormValue("vid"), err)
        err = errors.ErrParam
        return
    }
    go func() {
        log.Infof("bulk volume: %d start", vid)
        err = s.store.BulkVolume(int32(vid), bfile, ifile)
        log.Infof("bulk volume: %d stop", vid)
    }()
    return
}

3.Compact Volumn API

func (s *Server) compactVolume(wr http.ResponseWriter, r *http.Request) {
    var (
        err error
        vid int64
        res = map[string]interface{}{}
    )
    if r.Method != "POST" {
        http.Error(wr, "method not allowed", http.StatusMethodNotAllowed)
        return
    }
    defer HttpPostWriter(r, wr, time.Now(), &err, res)
    if vid, err = strconv.ParseInt(r.FormValue("vid"), 10, 32); err != nil {
        log.Errorf("strconv.ParseInt(\"%s\") error(%v)", r.FormValue("vid"), err)
        err = errors.ErrParam
        return
    }
    // long time processing, not block, we can from info stat api get status.
    go func() {
        log.Infof("compact volume: %d start", vid)
        if err = s.store.CompactVolume(int32(vid)); err != nil {
            log.Errorf("s.CompactVolume() error(%v)", err)
        }
        log.Infof("compact volume: %d stop", vid)
    }()
    return
}

4.新增 Volumn

  • 将从本地Free Volumn中取一个出来进行上线,需要外部传入的参数就是vid,表示新增的volumn id
  • api说明
    • 提取参数vid
    • store.AddVolume
func (s *Server) addVolume(wr http.ResponseWriter, r *http.Request) {
    var (
        err error
        vid int64
        res = map[string]interface{}{}
    )
    if r.Method != "POST" {
        http.Error(wr, "method not allowed", http.StatusMethodNotAllowed)
        return
    }
    defer HttpPostWriter(r, wr, time.Now(), &err, res)
    //提取vid参数
    if vid, err = strconv.ParseInt(r.FormValue("vid"), 10, 32); err != nil {
        log.Errorf("strconv.ParseInt(\"%s\") error(%v)", r.FormValue("vid"), err)
        err = errors.ErrParam
        return
    }
    log.Infof("add volume: %d", vid)
    //调用store模块的新增volumn api
    _, err = s.store.AddVolume(int32(vid))
    return
}

5.新增 Free Volumn

  • 新增Free Volumn相当于初始化一个空的Volumn,设置其bdiridir以及n数量,但是这个volumn并不会被使用
  • api说明
    • 提取参数bdir,idir,n
    • store.AddFreeVolume
  • api实现
func (s *Server) addFreeVolume(wr http.ResponseWriter, r *http.Request) {
    var (
        err        error
        sn         int
        n          int64
        bdir, idir string
        res        = map[string]interface{}{}
    )
    if r.Method != "POST" {
        http.Error(wr, "method not allowed", http.StatusMethodNotAllowed)
        return
    }
    defer HttpPostWriter(r, wr, time.Now(), &err, res)
    //提取bdir何idir
    bdir, idir = r.FormValue("bdir"), r.FormValue("idir")
    //提取n参数,表示要新增多少个
    if n, err = strconv.ParseInt(r.FormValue("n"), 10, 32); err != nil {
        log.Errorf("strconv.ParseInt(\"%s\") error(%v)", r.FormValue("vid"), err)
        err = errors.ErrParam
        return
    }
    log.Infof("add free volume: %d", n)
    //调用store模块的新增Free Volumn api
    sn, err = s.store.AddFreeVolume(int(n), bdir, idir)
    res["succeed"] = sn
    return
}

results matching ""

    No results matching ""