Binlog分析

pika_binlog.h/cc是Binlog实现的地方,文件中包含两个类Binlog和Version,其中Binlog负责将写命令持久化到本地磁盘中,类似于redis的aof文件,里面存储的就是redis的原生命令。Version文件实际上记录的是当前Binlog正在写的文件编号以及写入的offset。有了Binlog为什么还要有Version?因为有了Version这个meta文件,主从同步可以实现得非常简单:在主从同步的时候,slave端将本地的Binlog编号和offset发送过来,master端只要和本机的Binlog编号以及offset比较一下,就可以判断是否需要同步DB文件,以及应该从哪个Binlog编号对应的哪个offset开始发送相应的数据。

Binlog数据文件

  • Binlog文件物理视图:和leveldb中的log文件存储格式一样,一个log文件会被切割成以64KB为单位的物理Block,每次读取都以一个Block作为基本读取单位,所以从物理布局来讲,一个log文件就是由连续的64KB大小的Block构成的。

    文件格式

  • 在应用的视野里是看不到这些Block的,应用看到的是一系列的记录(在leveldb中是Key/Value对,在Binlog中则是一条条写命令)。在Binlog内部,会将一条这样的数据看做一条记录,另外在这个数据前增加一个记录头,用来记载一些管理信息,以方便内部处理(从EmitPhysicalRecord的实现可以看到,记录头中依次写入3字节的数据长度(低字节在前)和1字节的记录类型)。下面几个图分别是逻辑视图和物理视图。

    Record物理视图

    Binlog逻辑视图

    Record逻辑视图

    Binlog物理视图
  • Binlog初始化流程:

    • PikaServer构造函数中初始化Binlog句柄,这个句柄作为PikaServer的成员变量,直接用来写Binlog
    • 判断${log_path}路径下面是否有manifest文件,manifest文件保存的实际上是Version类序列化的结果
      • 如果有manifest文件,读取manifest文件,得到Binlog文件编号以及offset
      • 如果没有manifest文件,新建manifest文件,并保存
    • 初始化Binlog文件句柄PosixMmapFile,和Version句柄MmapRWFile类型的文件。
  • Binlog写流程
    • 外部调用Lock加锁
    • 外部调用Put写Binlog
      • 检查当前Binlog文件大小是否已经超过最大值,如果已经超过,需要切换Binlog文件
        • Binlog文件编号递增,然后初始化句柄,设置Version句柄
        • 调用InitLogFile初始化
      • Produce写入文件
        • 根据Block大小分别写入kFullType/kFirstType/kMiddleType/kLastType
        • EmitPhysicalRecord最终写入到磁盘文件
        • Version句柄保存当前文件编号,以及offset
    • 外部调用Unlock解锁

Binlog实现类

// Binlog: append-only log of write commands, stored as a sequence of numbered
// files under binlog_path_, plus a manifest (Version) that records the number
// of the file being written and the write offset inside it.
class Binlog
{
 public:
 // Binlog_path: directory holding the binlog files and the manifest.
 // file_size: size after which Put() rolls over to a new binlog file
 //            (default 100 * 1024 * 1024 bytes).
  Binlog(const std::string& Binlog_path, const int file_size = 100 * 1024 * 1024);
  ~Binlog();
  // Threads writing the binlog concurrently must serialize through these;
  // Put() expects the mutex to be held.
  void Lock()         { mutex_.Lock(); }
  void Unlock()       { mutex_.Unlock(); }
  // Append one record to the binlog (caller holds Lock()).
  Status Put(const std::string &item);
  Status Put(const char* item, int len);
  // Get information about the binlog file currently being written
  // (its file number and the write offset inside it).
  Status GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset);
  // Set the meta information (file number and offset) of the binlog file
  // currently being written.
  Status SetProducerStatus(uint32_t filenum, uint64_t pro_offset);
  // NOTE(review): definition not shown here; presumably appends `len` bytes of
  // blank padding to `file` — confirm.
  static Status AppendBlank(slash::WritableFile *file, uint64_t len);
  // Handle of the binlog file currently being written. Put() deletes and
  // replaces this handle when rolling to a new file, so callers must not keep
  // the returned pointer across a Put().
  slash::WritableFile *queue() { return queue_; }
  // Maximum binlog file size; Put() rolls over once the file grows past it.
  uint64_t file_size() {
    return file_size_;
  }
  // Path prefix of the binlog data files (binlog_path_ + kBinlogPrefix); the
  // name of file N is built with NewFileName(filename, N).
  std::string filename;
 private:
  // NOTE(review): definition not shown; called after a binlog file is opened,
  // presumably to (re)initialize block_offset_ for it — confirm.
  void InitLogFile();
  // Write one physical record (header + fragment) and advance the offsets.
  Status EmitPhysicalRecord(RecordType t, const char *ptr, size_t n, int *temp_pro_offset);
  // Split one logical record into physical records across blocks.
  Status Produce(const Slice &item, int *pro_offset);
  // Manifest holding pro_num_ / pro_offset_ of the file being written.
  Version* version_;
  // Binlog file currently being written; deleted by Put() on roll-over.
  slash::WritableFile *queue_;
  // Manifest file backing version_.
  slash::RWFile *versionfile_;
  // Serializes writers; see Lock() / Unlock().
  slash::Mutex mutex_;
  // Number of the binlog file currently being written.
  uint32_t pro_num_;
  // Write position inside the current kBlockSize block.
  int block_offset_;
  // Directory holding the binlog files and the manifest.
  const std::string binlog_path_;
  // Roll-over threshold, see file_size().
  uint64_t file_size_;
  // No copying allowed (copy operations are declared private).
  Binlog(const Binlog&);
  void operator=(const Binlog&);
};

初始化

/*
 * Binlog
 *
 * Opens (or creates) the binlog stored under `binlog_path`.
 *
 * - If the manifest (binlog_path_ + kManifest) does not exist, binlog file
 *   number 0 is created and a fresh manifest (Version) is saved.
 * - Otherwise the manifest is loaded to recover the current file number and
 *   write offset, and that binlog file is reopened for appending at the saved
 *   offset.
 *
 * A binlog that cannot be opened cannot be written, and a constructor has no
 * way to report an error. Every open/load failure is therefore fatal
 * (LOG(FATAL) aborts the process). This replaces continuing with NULL handles
 * that would be dereferenced below or on the first Put().
 *
 * @param binlog_path  directory for binlog files; created if missing. It is
 *                     concatenated directly with the file prefixes, so it is
 *                     expected to end with a path separator (TODO confirm).
 * @param file_size    size after which Put() rolls over to a new binlog file.
 */
Binlog::Binlog(const std::string& binlog_path, const int file_size) :
    version_(NULL),
    queue_(NULL),
    versionfile_(NULL),
    pro_num_(0),
    binlog_path_(binlog_path),
    file_size_(file_size) {
  Status s;
  // Create the binlog directory.
  slash::CreateDir(binlog_path_);
  // Prefix of the binlog data files (kBinlogPrefix = "write2file").
  filename = binlog_path_ + kBinlogPrefix;
  // Binlog meta file (kManifest = "manifest").
  const std::string manifest = binlog_path_ + kManifest;
  std::string profile;

  if (!slash::FileExists(manifest)) {
    // Fresh binlog: start at file number pro_num_ (0) with a new manifest.
    LOG(INFO) << "Binlog: Manifest file not exist, we create a new one.";
    profile = NewFileName(filename, pro_num_);
    s = slash::NewWritableFile(profile, &queue_);
    if (!s.ok()) {
      LOG(FATAL) << "Binlog: new " << profile << " " << s.ToString();
    }
    s = slash::NewRWFile(manifest, &versionfile_);
    if (!s.ok()) {
      LOG(FATAL) << "Binlog: new versionfile error " << s.ToString();
    }
    version_ = new Version(versionfile_);
    version_->StableSave();
  } else {
    // Existing binlog: recover the producer position from the manifest.
    LOG(INFO) << "Binlog: Find the exist file.";
    s = slash::NewRWFile(manifest, &versionfile_);
    if (!s.ok()) {
      LOG(FATAL) << "Binlog: open versionfile error " << s.ToString();
    }
    version_ = new Version(versionfile_);
    s = version_->Init();
    if (!s.ok()) {
      LOG(FATAL) << "Binlog: init versionfile error " << s.ToString();
    }
    pro_num_ = version_->pro_num_;

    profile = NewFileName(filename, pro_num_);
    DLOG(INFO) << "Binlog: open profile " << profile;
    // Reopen the current binlog file, continuing at the saved offset.
    s = slash::AppendWritableFile(profile, &queue_, version_->pro_offset_);
    if (!s.ok()) {
      LOG(FATAL) << "Binlog: open " << profile << " error " << s.ToString();
    }
    uint64_t filesize = queue_->Filesize();
    DLOG(INFO) << "Binlog: filesize is " << filesize;
  }
  InitLogFile();
}

Binlog写入

// Appends `item` to the binlog as one logical record. If the current binlog
// file has grown past file_size_, it first rolls over to a new file.
// On success the manifest (version_) is updated with the new write offset.
//
// Several threads may write the binlog, so the caller must serialize them.
// Note: mutex lock should be held (call Lock() / Unlock() around Put()).
//
// @return OK; the error from opening the next binlog file during a roll-over
//         (the binlog then stays on the current file); or the first write
//         error reported by Produce().
Status Binlog::Put(const std::string &item) {
  Status s;
  /* Check to roll log file */
  uint64_t filesize = queue_->Filesize();
  if (filesize > file_size_) {
    // Open the next file before dropping the current one. If the open fails,
    // queue_, pro_num_ and the manifest stay valid and unchanged; deleting
    // queue_ first would leave a NULL handle for Produce() to dereference.
    slash::WritableFile *next_queue = NULL;
    std::string profile = NewFileName(filename, pro_num_ + 1);
    s = slash::NewWritableFile(profile, &next_queue);
    if (!s.ok()) {
      LOG(WARNING) << "Binlog: new " << profile << " " << s.ToString();
      return s;
    }
    delete queue_;
    queue_ = next_queue;
    pro_num_++;
    {
      // The guard must be a named object. An unnamed temporary
      // `slash::RWLock(...);` is destroyed at the end of its statement, so
      // the fields below would be written without holding the lock.
      slash::RWLock l(&(version_->rwlock_), true);
      version_->pro_offset_ = 0;
      version_->pro_num_ = pro_num_;
      version_->StableSave();
    }
    InitLogFile();
  }

  int pro_offset = 0;
  s = Produce(Slice(item.data(), item.size()), &pro_offset);
  if (s.ok()) {
    // Publish the new write offset under the version lock (named guard).
    slash::RWLock l(&(version_->rwlock_), true);
    version_->pro_offset_ = pro_offset;
    version_->StableSave();
  }
  return s;
}

// Appends the `len` bytes at `item` to the binlog as one logical record. If
// the current binlog file has grown past file_size_, it first rolls over to
// a new file. On success the manifest (version_) is updated with the new
// write offset.
//
// Note: mutex lock should be held (call Lock() / Unlock() around Put()).
//
// @return OK; the error from opening the next binlog file during a roll-over
//         (the binlog then stays on the current file); or the first write
//         error reported by Produce().
Status Binlog::Put(const char* item, int len) {
  Status s;
  /* Check to roll log file */
  uint64_t filesize = queue_->Filesize();
  if (filesize > file_size_) {
    // Open the next file before dropping the current one. If the open fails,
    // queue_, pro_num_ and the manifest stay valid and unchanged; deleting
    // queue_ first would leave a NULL handle for Produce() to dereference.
    slash::WritableFile *next_queue = NULL;
    std::string profile = NewFileName(filename, pro_num_ + 1);
    s = slash::NewWritableFile(profile, &next_queue);
    if (!s.ok()) {
      LOG(WARNING) << "Binlog: new " << profile << " " << s.ToString();
      return s;
    }
    delete queue_;
    queue_ = next_queue;
    pro_num_++;
    {
      // The guard must be a named object. An unnamed temporary
      // `slash::RWLock(...);` is destroyed at the end of its statement, so
      // the fields below would be written without holding the lock.
      slash::RWLock l(&(version_->rwlock_), true);
      version_->pro_offset_ = 0;
      version_->pro_num_ = pro_num_;
      version_->StableSave();
    }
    InitLogFile();
  }

  int pro_offset = 0;
  s = Produce(Slice(item, len), &pro_offset);
  if (s.ok()) {
    // Publish the new write offset under the version lock (named guard).
    slash::RWLock l(&(version_->rwlock_), true);
    version_->pro_offset_ = pro_offset;
    version_->StableSave();
  }
  return s;
}

// Writes one physical record, a header followed by `n` payload bytes from
// `ptr`, to the current binlog file and then flushes it.
//
// Header bytes: [0..2] payload length, least-significant byte first;
//               [3]    record type `t`.
// NOTE(review): only these 4 header bytes are filled in. If kHeaderSize is
// larger than 4, the remaining header bytes are written uninitialized;
// confirm that kHeaderSize == 4.
//
// block_offset_ and *temp_pro_offset are advanced by kHeaderSize + n whether
// or not the writes succeeded. The returned status reports the first failure.
Status Binlog::EmitPhysicalRecord(RecordType t, const char *ptr, size_t n, int *temp_pro_offset) {
  // The length field is 3 bytes wide, and the whole record must fit in the
  // remainder of the current block.
  assert(n <= 0xffffff);
  assert(block_offset_ + kHeaderSize + n <= kBlockSize);

  char header[kHeaderSize];
  for (int i = 0; i < 3; ++i) {
    header[i] = static_cast<char>((n >> (8 * i)) & 0xff);
  }
  header[3] = static_cast<char>(t);

  // Header, payload, flush: stop at the first failure.
  Status s = queue_->Append(Slice(header, kHeaderSize));
  if (s.ok()) {
    s = queue_->Append(Slice(ptr, n));
  }
  if (s.ok()) {
    s = queue_->Flush();
  }

  const size_t record_size = kHeaderSize + n;
  block_offset_ += static_cast<int>(record_size);
  *temp_pro_offset += record_size;
  return s;
}

// Splits `item` into physical records that fit the kBlockSize blocks of the
// current binlog file, and writes them through EmitPhysicalRecord().
//
// If the item fits in the space left in the current block, it is written as
// one kFullType record. Otherwise it is written as a kFirstType fragment,
// zero or more kMiddleType fragments and a final kLastType fragment. When
// fewer than kHeaderSize bytes remain in the current block, that tail is
// zero-filled and writing continues at the start of the next block.
//
// @param item             bytes to append.
// @param temp_pro_offset  out: starts from version_->pro_offset_ and is
//                         advanced by every byte written (padding, headers
//                         and payload). On success it is the new write offset.
// @return OK, or the first error from writing the padding or a record.
Status Binlog::Produce(const Slice &item, int *temp_pro_offset) {
  Status s;
  const char *ptr = item.data();
  size_t left = item.size();
  bool begin = true;
  // Offset inside the current binlog file.
  *temp_pro_offset = version_->pro_offset_;
  do {
    const int leftover = static_cast<int>(kBlockSize) - block_offset_;
    assert(leftover >= 0);
    // Not even a record header fits in the rest of this block.
    if (static_cast<size_t>(leftover) < kHeaderSize) {
      if (leftover > 0) {
        // Zero-fill the block trailer. The literal supplies 8 zero bytes
        // (7 explicit + terminator), which assumes kHeaderSize <= 8.
        s = queue_->Append(Slice("\x00\x00\x00\x00\x00\x00\x00", leftover));
        if (!s.ok()) {
          // Without the padding, every following record would start at the
          // wrong position inside its block.
          return s;
        }
        *temp_pro_offset += leftover;
      }
      // Continue at the start of the next block.
      block_offset_ = 0;
    }

    // Payload bytes that fit in this block after the record header.
    const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
    // If everything left fits, this fragment ends the logical record.
    const size_t fragment_length = (left < avail) ? left : avail;
    RecordType type;
    const bool end = (left == fragment_length);
    if (begin && end) {
      type = kFullType;
    } else if (begin) {
      type = kFirstType;
    } else if (end) {
      type = kLastType;
    } else {
      type = kMiddleType;
    }
    s = EmitPhysicalRecord(type, ptr, fragment_length, temp_pro_offset);
    ptr += fragment_length;
    left -= fragment_length;
    begin = false;
  } while (s.ok() && left > 0);
  return s;
}

Binlog meta文件

// Version: in-memory view of the binlog manifest. It holds the number of the
// binlog file being written and the write offset inside it, and persists them
// into the data buffer of the manifest file `save_`.
class Version {
 public:
  Version(slash::RWFile *save);
  ~Version();
  // Load pro_offset_ and pro_num_ from the manifest buffer.
  Status Init();
  // RWLock should be held when access members.
  // Write pro_offset_ and pro_num_ back into the manifest buffer.
  Status StableSave();
  // Write offset inside the binlog file currently being written.
  uint64_t pro_offset_;
  // Number of the binlog file currently being written.
  uint32_t pro_num_;
  // Guards the fields above. Binlog::Put() acquires it in write mode
  // around its updates.
  pthread_rwlock_t rwlock_;
 private:
  // Manifest file; its GetData() buffer stores the serialized fields.
  // NOTE(review): ownership (who deletes it) is not visible here; confirm.
  slash::RWFile *save_;
  // No copying allowed;
  Version(const Version&);
  void operator=(const Version&);
};

// Serializes pro_offset_ and pro_num_ into the manifest buffer
// (save_->GetData()), in host byte order.
// Layout (must stay in sync with Version::Init()):
//   bytes [0, 8)   : pro_offset_ (uint64_t)
//   bytes [20, 24) : pro_num_    (uint32_t)
// NOTE(review): bytes [8, 20) are not written here; they are presumably
// reserved by an older layout; confirm before reusing them.
// NOTE(review): this only copies into the buffer. Whether and when the data
// reaches disk depends on slash::RWFile; confirm.
// The caller should hold rwlock_ while the fields may change concurrently.
//
// @return OK, or Corruption if the manifest has no data buffer (the same
//         check Init() performs) instead of dereferencing NULL.
Status Version::StableSave() {
  static const size_t kProOffsetPos = 0;
  static const size_t kProNumPos = 20;
  char *p = save_->GetData();
  if (p == NULL) {
    return Status::Corruption("version save error");
  }
  memcpy(p + kProOffsetPos, &pro_offset_, sizeof(uint64_t));
  memcpy(p + kProNumPos, &pro_num_, sizeof(uint32_t));
  return Status::OK();
}

// Loads pro_offset_ (from byte 0) and pro_num_ (from byte 20) out of the
// manifest buffer. This is the same layout that Version::StableSave() writes.
//
// @return OK, or Corruption("version init error") when the manifest has no
//         data buffer to read from.
Status Version::Init() {
  const char *data = save_->GetData();
  if (data == NULL) {
    return Status::Corruption("version init error");
  }
  memcpy(&pro_offset_, data, sizeof(uint64_t));
  memcpy(&pro_num_, data + 20, sizeof(uint32_t));
  return Status::OK();
}
