Binlog分析
pika_binlog.h/cc是Binlog实现的地方,文件中包含两个类:Binlog和Version。其中Binlog负责将写命令持久化到本地磁盘中,类似于redis的aof文件,里面存储的就是redis的原生命令;Version文件(manifest)实际上记录的是当前Binlog正在写的文件编号以及写入的offset。有了Binlog为什么还要有Version?因为有了Version这个meta文件,主从同步就可以实现得非常简单:在主从同步的时候,slave端将本地的Binlog编号和offset发送过来,master端只要和本机的Binlog编号以及offset比较一下,就可以判断是否需要同步DB文件,以及应该从哪个Binlog编号对应的哪个offset开始发送相应的数据。
Binlog数据文件
Binlog文件的物理格式和leveldb中的log文件存储格式一样:一个log文件会被切割成以64KB为单位的物理Block,每次读取都以一个Block作为基本读取单位。所以从物理布局来讲,一个log文件就是由连续的64KB大小的Block构成的。
在应用的视野里是看不到这些Block的,应用看到的是一系列的Key/Value对。在Binlog内部,会将一个Key/Value对看做一条记录的数据,另外在这个数据前增加一个记录头,用来记载一些管理信息(记录长度和记录类型),以方便内部处理。下面两个图分别是逻辑视图和物理视图。
Binlog逻辑视图
Binlog物理视图

Binlog初始化流程:
- 在PikaServer构造函数中初始化Binlog句柄,这个句柄作为PikaServer的成员变量,直接用来写Binlog
- 判断${log_path}路径下面是否有manifest文件,manifest文件保存的实际上是Version类序列化的结果
  - 如果有manifest文件,读取manifest文件,得到Binlog文件编号以及offset
  - 如果没有manifest文件,新建manifest文件,并保存
- 初始化Binlog文件句柄PosixMmapFile,和Version句柄MmapRWFile类型的文件。
Binlog写流程:
- 外部调用Lock加锁
- 外部调用Put写Binlog
  - 判断当前Binlog文件大小是否已经超过最大值,如果超过,需要切换Binlog文件
    - Binlog文件编号递增,然后初始化句柄,设置Version句柄
    - 调用InitLogFile初始化
  - 调用Produce写入文件
    - 根据Block大小分别写入kFullType/kFirstType/kMiddleType/kLastType类型的记录
    - 调用EmitPhysicalRecord最终写入到磁盘文件
  - Version句柄保存当前文件编号,以及offset
- 外部调用Unlock解锁
Binlog实现类
// Binlog: append-only log of write commands, stored in the leveldb log
// format (fixed-size kBlockSize blocks of physical records), together with a
// manifest (Version) recording the number of the binlog file currently being
// written and the write offset inside it.
class Binlog
{
public:
// Constructor. Binlog_path is the directory holding the binlog data files and
// the manifest; file_size is the size threshold beyond which Put() rolls over
// to a new binlog file (default 100 MiB).
Binlog(const std::string& Binlog_path, const int file_size = 100 * 1024 * 1024);
~Binlog();
// Lock()/Unlock() wrap mutex_. With multiple writer threads, callers must
// hold the lock around Put().
void Lock() { mutex_.Lock(); }
void Unlock() { mutex_.Unlock(); }
// Append one entry to the binlog.
Status Put(const std::string &item);
Status Put(const char* item, int len);
// Get the file number and offset of the binlog file currently being written.
Status GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset);
// Set the binlog meta info (file number and offset) currently being written.
Status SetProducerStatus(uint32_t filenum, uint64_t pro_offset);
// NOTE(review): definition not shown here; presumably appends len bytes of
// blank padding to file — confirm against the implementation.
static Status AppendBlank(slash::WritableFile *file, uint64_t len);
// The binlog data file currently being appended to.
slash::WritableFile *queue() { return queue_; }
// Size threshold of a single binlog file; Put() rolls to a new file once the
// current file's size exceeds it.
uint64_t file_size() {
return file_size_;
}
// Path prefix of the binlog data files (binlog_path + kBinlogPrefix).
std::string filename;
private:
void InitLogFile();
// Write one physical record (header + payload) to queue_.
Status EmitPhysicalRecord(RecordType t, const char *ptr, size_t n, int *temp_pro_offset);
// Split one logical record into block-sized physical records.
Status Produce(const Slice &item, int *pro_offset);
// Producer position (file number / offset) mirrored into the manifest.
Version* version_;
// Binlog data file currently being written.
slash::WritableFile *queue_;
// Manifest file backing version_.
slash::RWFile *versionfile_;
// Taken by Lock()/Unlock().
slash::Mutex mutex_;
// Number of the binlog file currently being written.
uint32_t pro_num_;
// Write position inside the current kBlockSize block.
int block_offset_;
// Directory holding the binlog files and the manifest.
const std::string binlog_path_;
// Roll-over threshold for a single binlog file.
uint64_t file_size_;
// No copying allowed
Binlog(const Binlog&);
void operator=(const Binlog&);
};
初始化
/*
 * Binlog
 *
 * Opens (or creates) the binlog directory, the binlog data file currently
 * being written, and the "manifest" meta file that persists the Version
 * (current file number + write offset).
 *
 * - If the manifest does not exist, a fresh binlog file numbered pro_num_
 *   (0) is created together with a new manifest, and the initial Version
 *   is saved.
 * - Otherwise the manifest is loaded, pro_num_ is restored from it, and the
 *   binlog file with that number is reopened for appending at the saved
 *   offset.
 *
 * A constructor cannot return a Status, and the code below and Put()
 * dereference queue_ / version_ unconditionally. Any failure that would leave
 * them NULL is therefore reported with LOG(FATAL) instead of a warning
 * followed by a NULL-pointer crash.
 */
Binlog::Binlog(const std::string& binlog_path, const int file_size) :
  version_(NULL),
  queue_(NULL),
  versionfile_(NULL),
  pro_num_(0),
  binlog_path_(binlog_path),
  file_size_(file_size) {
  Status s;
  // Create the binlog directory.
  slash::CreateDir(binlog_path_);

  // Binlog data file name prefix (kBinlogPrefix = "write2file").
  filename = binlog_path_ + kBinlogPrefix;
  // Binlog meta file (kManifest = "manifest").
  const std::string manifest = binlog_path_ + kManifest;
  std::string profile;

  if (!slash::FileExists(manifest)) {
    LOG(INFO) << "Binlog: Manifest file not exist, we create a new one.";
    profile = NewFileName(filename, pro_num_);
    s = slash::NewWritableFile(profile, &queue_);
    if (!s.ok()) {
      LOG(FATAL) << "Binlog: new " << filename << " " << s.ToString();
    }
    s = slash::NewRWFile(manifest, &versionfile_);
    if (!s.ok()) {
      LOG(FATAL) << "Binlog: new versionfile error " << s.ToString();
    }
    version_ = new Version(versionfile_);
    version_->StableSave();
  } else {
    LOG(INFO) << "Binlog: Find the exist file.";
    s = slash::NewRWFile(manifest, &versionfile_);
    if (!s.ok()) {
      // version_ would stay NULL and be dereferenced below.
      LOG(FATAL) << "Binlog: open versionfile error " << s.ToString();
    }
    version_ = new Version(versionfile_);
    s = version_->Init();
    if (!s.ok()) {
      // pro_num_ / pro_offset_ were not loaded; continuing would append to
      // an arbitrary file position.
      LOG(FATAL) << "Binlog: init version error " << s.ToString();
    }
    pro_num_ = version_->pro_num_;

    profile = NewFileName(filename, pro_num_);
    DLOG(INFO) << "Binlog: open profile " << profile;
    s = slash::AppendWritableFile(profile, &queue_, version_->pro_offset_);
    if (!s.ok()) {
      // queue_ would stay NULL and be dereferenced below.
      LOG(FATAL) << "Binlog: open " << profile << " error " << s.ToString();
    }
    uint64_t filesize = queue_->Filesize();
    DLOG(INFO) << "Binlog: filesize is " << filesize;
  }
  InitLogFile();
}
Binlog写入
// Appends one binlog entry and persists the new producer offset into the
// manifest (Version).
//
// Before writing, if the current binlog file has grown beyond file_size_,
// the writer rolls over to the file numbered pro_num_ + 1 and resets the
// saved offset to 0. If the next file cannot be created, the error is
// returned and the writer stays on the current file, so queue_ never becomes
// NULL; the roll-over is retried on the next Put().
//
// Note: mutex lock should be held (Lock()/Unlock()). Multi-threaded writers
// share queue_, block_offset_ and pro_num_.
Status Binlog::Put(const std::string &item) {
  Status s;
  /* Check to roll log file */
  uint64_t filesize = queue_->Filesize();
  if (filesize > file_size_) {
    // Open the next file before closing the current one.
    slash::WritableFile* next_queue = NULL;
    std::string profile = NewFileName(filename, pro_num_ + 1);
    s = slash::NewWritableFile(profile, &next_queue);
    if (!s.ok()) {
      LOG(WARNING) << "Binlog: new " << profile << " " << s.ToString();
      return s;
    }
    delete queue_;
    queue_ = next_queue;
    pro_num_++;
    {
      // The guard must be a named object. An unnamed slash::RWLock temporary
      // is destroyed at the end of its own statement, so it would release the
      // lock before the members below are written.
      slash::RWLock l(&(version_->rwlock_), true);
      version_->pro_offset_ = 0;
      version_->pro_num_ = pro_num_;
      version_->StableSave();
    }
    InitLogFile();
  }
  int pro_offset;
  s = Produce(Slice(item.data(), item.size()), &pro_offset);
  if (s.ok()) {
    // Named guard, held while the new offset is published and saved.
    slash::RWLock l(&(version_->rwlock_), true);
    version_->pro_offset_ = pro_offset;
    version_->StableSave();
  }
  return s;
}
// Appends one binlog entry (len bytes at item) and persists the new producer
// offset into the manifest (Version).
//
// Before writing, if the current binlog file has grown beyond file_size_,
// the writer rolls over to the file numbered pro_num_ + 1 and resets the
// saved offset to 0. If the next file cannot be created, the error is
// returned and the writer stays on the current file, so queue_ never becomes
// NULL; the roll-over is retried on the next Put().
//
// Note: mutex lock should be held (Lock()/Unlock()).
Status Binlog::Put(const char* item, int len) {
  Status s;
  /* Check to roll log file */
  uint64_t filesize = queue_->Filesize();
  if (filesize > file_size_) {
    // Open the next file before closing the current one.
    slash::WritableFile* next_queue = NULL;
    std::string profile = NewFileName(filename, pro_num_ + 1);
    s = slash::NewWritableFile(profile, &next_queue);
    if (!s.ok()) {
      LOG(WARNING) << "Binlog: new " << profile << " " << s.ToString();
      return s;
    }
    delete queue_;
    queue_ = next_queue;
    pro_num_++;
    {
      // The guard must be a named object. An unnamed slash::RWLock temporary
      // is destroyed at the end of its own statement, so it would release the
      // lock before the members below are written.
      slash::RWLock l(&(version_->rwlock_), true);
      version_->pro_offset_ = 0;
      version_->pro_num_ = pro_num_;
      version_->StableSave();
    }
    InitLogFile();
  }
  int pro_offset;
  s = Produce(Slice(item, len), &pro_offset);
  if (s.ok()) {
    // Named guard, held while the new offset is published and saved.
    slash::RWLock l(&(version_->rwlock_), true);
    version_->pro_offset_ = pro_offset;
    version_->StableSave();
  }
  return s;
}
// Writes one physical record to queue_ and flushes it. The record consists of
// a kHeaderSize-byte header followed by the n payload bytes at ptr.
//
// Header layout: bytes 0..2 hold the payload length as a little-endian 3-byte
// integer, and byte 3 holds the record type t.
//
// block_offset_ and *temp_pro_offset are advanced by kHeaderSize + n whether
// or not the writes succeed. The returned Status is the first failure among
// header append, payload append and flush.
Status Binlog::EmitPhysicalRecord(RecordType t, const char *ptr, size_t n, int *temp_pro_offset) {
  // The length field is 3 bytes wide, and a record never crosses a block.
  assert(n <= 0xffffff);
  assert(block_offset_ + kHeaderSize + n <= kBlockSize);

  char header[kHeaderSize];
  for (int byte = 0; byte < 3; ++byte) {
    header[byte] = static_cast<char>((n >> (8 * byte)) & 0xff);
  }
  header[3] = static_cast<char>(t);

  Status status = queue_->Append(Slice(header, kHeaderSize));
  if (status.ok()) {
    status = queue_->Append(Slice(ptr, n));
  }
  if (status.ok()) {
    status = queue_->Flush();
  }

  const size_t record_size = kHeaderSize + n;
  block_offset_ += static_cast<int>(record_size);
  *temp_pro_offset += record_size;
  return status;
}
// Splits one logical record (item) into physical records so that none of
// them crosses a kBlockSize boundary, writing each with EmitPhysicalRecord().
//
// Fragment types: kFullType when the whole item fits in the current block;
// otherwise kFirstType, zero or more kMiddleType, then kLastType.
//
// *temp_pro_offset is seeded from version_->pro_offset_. On return it holds
// the file offset just past the bytes written, including any zero padding at
// the end of a block. The caller stores that value into the manifest on
// success.
Status Binlog::Produce(const Slice &item, int *temp_pro_offset) {
  Status s;
  const char *ptr = item.data();
  size_t left = item.size();
  bool begin = true;
  // Offset within the current binlog file, continuing from the saved one.
  *temp_pro_offset = version_->pro_offset_;
  do {
    const int leftover = static_cast<int>(kBlockSize) - block_offset_;
    assert(leftover >= 0);
    // Fewer than kHeaderSize bytes left in this block, so no header fits.
    // Zero-fill the block trailer and start a new block.
    if (static_cast<size_t>(leftover) < kHeaderSize) {
      if (leftover > 0) {
        // If the padding write fails, every following record would be
        // misaligned with the block boundaries, so stop and report it.
        s = queue_->Append(Slice("\x00\x00\x00\x00\x00\x00\x00", leftover));
        if (!s.ok()) {
          return s;
        }
        *temp_pro_offset += leftover;
      }
      // Offset within the current block.
      block_offset_ = 0;
    }
    // Payload bytes the current block can still hold after a header.
    const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
    // Payload bytes written by this fragment. If everything left fits, this
    // is the last (or only) fragment.
    const size_t fragment_length = (left < avail) ? left : avail;
    RecordType type;
    const bool end = (left == fragment_length);
    if (begin && end) {
      type = kFullType;
    } else if (begin) {
      type = kFirstType;
    } else if (end) {
      type = kLastType;
    } else {
      type = kMiddleType;
    }
    // Write header + payload for this fragment.
    s = EmitPhysicalRecord(type, ptr, fragment_length, temp_pro_offset);
    ptr += fragment_length;
    left -= fragment_length;
    begin = false;
  } while (s.ok() && left > 0);
  return s;
}
Binlog meta文件
// Version: in-memory copy of the binlog manifest (meta file). It holds the
// producer position, i.e. the number of the binlog file being written and the
// write offset inside it, and persists that position through an RWFile.
// StableSave() stores pro_offset_ at byte 0 and pro_num_ at byte 20 of the
// file data; Init() reads them back from the same positions.
class Version {
public:
Version(slash::RWFile *save);
~Version();
// Load pro_offset_ / pro_num_ from the manifest data. Returns Corruption if
// the manifest exposes no data.
Status Init();
// Write pro_offset_ / pro_num_ into the manifest data.
// The RWLock (rwlock_) should be held when accessing members.
Status StableSave();
// Write offset within the binlog file currently being written.
uint64_t pro_offset_;
// Number of the binlog file currently being written.
uint32_t pro_num_;
// Guards pro_offset_ / pro_num_; Binlog takes it via slash::RWLock.
pthread_rwlock_t rwlock_;
private:
// Manifest file the producer position is read from / written to.
slash::RWFile *save_;
// No copying allowed;
Version(const Version&);
void operator=(const Version&);
};
// Serializes the producer position into the manifest's data region:
// pro_offset_ (8 bytes) at byte 0 and pro_num_ (4 bytes) at byte
// kProNumOffset. This layout must match Version::Init().
//
// Returns Corruption if there is no manifest or it exposes no data buffer,
// mirroring the check in Init(), instead of dereferencing NULL. A Version can
// be built with a NULL RWFile when opening the manifest fails.
// The RWLock (rwlock_) should be held by the caller when members are shared.
Status Version::StableSave() {
  // Byte position of pro_num_ inside the manifest data (on-disk format).
  const size_t kProNumOffset = 20;
  if (save_ == NULL || save_->GetData() == NULL) {
    return Status::Corruption("version save error");
  }
  char *p = save_->GetData();
  memcpy(p, &pro_offset_, sizeof(uint64_t));
  memcpy(p + kProNumOffset, &pro_num_, sizeof(uint32_t));
  return Status::OK();
}
// Loads the producer position from the manifest data, reading the layout
// written by StableSave(): pro_offset_ from byte 0 and pro_num_ from byte 20.
// Returns Corruption("version init error") when the manifest exposes no data.
Status Version::Init() {
  const char *data = save_->GetData();
  if (data == NULL) {
    return Status::Corruption("version init error");
  }
  memcpy(&pro_offset_, data, sizeof(uint64_t));
  memcpy(&pro_num_, data + 20, sizeof(uint32_t));
  return Status::OK();
}