Loading... ## 前置知识 TODO ## Bitcask TODO ## 代码实现 ### 项目结构 ``` |-bitcask |--data |---log_record.go |--index |---btree.go |---index.go ``` ### 数据存放 要把数据存放到磁盘中,就需要通过一个数据结构来记录这个位置信息。 * data/log_record.go ``` // LogRecordPos 数据内存索引,记录数据在磁盘中的位置 type LogRecordPos struct { Fid uint32 // 文件id,表示数据存放在哪个文件中 Offset int64 // 表示将数据存放在了文件中的哪个位置 } ``` ### 索引接口 实现索引接口是为了方便以后使用不同的数据结构作为索引,只要该数据结构实现此接口即可生效,不需要额外改动。 - index/index.go ``` // Indexer 抽象索引接口,如果要实现不同的索引结构,只需要实现该接口即可 type Indexer interface { // Put 往索引中 添加key对应的索引位置信息 Put(key []byte, pos *data.LogRecordPos) bool // Get 根据key获取索引位置信息 Get(key []byte) *data.LogRecordPos // Delete 根据key删除索引位置信息 Delete(key []byte) bool } ``` ### 索引实现 我们使用BTree作为索引结构,google提供了一个BTree库,直接导入即可:`github.com/google/btree`。 在实现索引接口之前,还需要对导入的BTree进行简单的封装: * data/btree.go ``` // BTree 索引数据结构,封装了google的btree库 type BTree struct { tree *btree.BTree lock *sync.RWMutex } // NewBTree 初始化一个BTree对象 func NewBTree() *BTree { return &BTree{ //这里的degree指的是BTree的度,也就是每个节点最多有多少个子节点 tree: btree.New(32), //lock: &sync.RWMutex{}, lock: new(sync.RWMutex), } } ``` 接口实现: * index/btree.go ``` func (bt *BTree) Put(key []byte, pos *data.LogRecordPos) bool { it := &Item{key: key, pos: pos} bt.lock.Lock() bt.tree.ReplaceOrInsert(it) bt.lock.Unlock() return true } func (bt *BTree) Get(key []byte) *data.LogRecordPos { it := &Item{key: key} bt.lock.RLock() value := bt.tree.Get(it) if value == nil { return nil } bt.lock.RUnlock() return value.(*Item).pos } func (bt *BTree) Delete(key []byte) bool { it := &Item{key: key} //会返回被删除的值,如果btree中没有这个值,则返回nil oldItem := bt.tree.Delete(it) if oldItem == nil { return false } return true } ``` 在Put方法中,我们调用了BTree中自带的`ReplaceOrInsert`方法: ``` // ReplaceOrInsert adds the given item to the tree. If an item in the tree // already equals the given one, it is removed from the tree and returned. // Otherwise, nil is returned. // // nil cannot be added to the tree (will panic). func (t *BTree) ReplaceOrInsert(item Item) Item { if item == nil { panic("nil item being added to BTree") } if t.root == nil { t.root = t.cow.newNode() t.root.items = append(t.root.items, item) t.length++ return nil } else { t.root = t.root.mutableFor(t.cow) if len(t.root.items) >= t.maxItems() { item2, second := t.root.split(t.maxItems() / 2) oldroot := t.root t.root = t.cow.newNode() t.root.items = append(t.root.items, item2) t.root.children = append(t.root.children, oldroot, second) } } out := t.root.insert(item, t.maxItems()) if out == nil { t.length++ } return out } ``` 从上面可以看出,这个方法要求入参和出参均是一个`Item`类型,所以我们还需要定义一个`Item`结构体,同时还要求这个结构体实现一个Less方法: ``` // Item represents a single object in the tree. type Item interface { // Less tests whether the current item is less than the given argument. // // This must provide a strict weak ordering. // If !a.Less(b) && !b.Less(a), we treat this to mean a == b (i.e. we can only // hold one of either a or b in the tree). Less(than Item) bool } ``` 添加`Item`结构体: * index/index.go ``` type Item struct { key []byte pos *data.LogRecordPos } // Less 实现btree.Item接口的Less方法,因为BTree自带的ReplaceOrInsert方法需要比较key的大小 func (itemA *Item) Less(itemB btree.Item) bool { //.(*Item) 是一个类型断言,它尝试将 itemB 断言为 *Item 类型。 //如果 itemB 确实是 *Item 类型,那么这个断言就会成功,我们就可以访问其 key 字段。 //如果 itemB 不是 *Item 类型,那么这个断言就会失败,程序会抛出一个运行时错误 return bytes.Compare(itemA.key, itemB.(*Item).key) == -1 } ``` ## 单元测试 * PUT ``` func TestBTree_Put(t *testing.T) { bTree := NewBTree() rep1 := bTree.Put(nil, &data.LogRecordPos{Fid: 1, Offset: 10}) assert.True(t, rep1) rep2 := bTree.Put([]byte("a"), &data.LogRecordPos{Fid: 1, Offset: 20}) assert.True(t, rep2) } ``` * GET ``` func TestBTree_Get(t *testing.T) { bTree := NewBTree() rep1 := bTree.Put(nil, &data.LogRecordPos{Fid: 1, Offset: 10}) assert.True(t, rep1) pos1 := bTree.Get(nil) assert.Equal(t, uint32(1),pos1.Fid) assert.Equal(t, int64(10),pos1.Offset) rep2 := bTree.Put([]byte("a"), &data.LogRecordPos{Fid: 1, Offset: 20}) assert.True(t, rep2) rep3 := bTree.Put([]byte("a"), &data.LogRecordPos{Fid: 1, Offset: 21}) assert.True(t, rep3) pos2 := bTree.Get([]byte("a")) assert.Equal(t, uint32(1), pos2.Fid) assert.Equal(t, int64(21), pos2.Offset) } ``` * DELETE ``` func TestBTree_Delete(t *testing.T) { bTree := NewBTree() //part1 rep1 := bTree.Put(nil, &data.LogRecordPos{Fid: 1, Offset: 10}) assert.True(t, rep1) pos1 := bTree.Get(nil) assert.Equal(t, uint32(1), pos1.Fid) assert.Equal(t, int64(10), pos1.Offset) del1 := bTree.Delete(nil) assert.True(t, del1) pos2 := bTree.Get(nil) assert.Nil(t,pos2) //part2 rep2 := bTree.Put([]byte("aa"), &data.LogRecordPos{Fid: 2, Offset: 20}) assert.True(t, rep2) pos3 := bTree.Get([]byte("aa")) assert.Equal(t, uint32(2), pos3.Fid) del2 := bTree.Delete([]byte("aa")) assert.True(t, del2) pos4 := bTree.Get([]byte("aa")) assert.Nil(t, pos4) } ``` ## 踩坑 如果完全按照上面的代码执行,会发现`delete`的单元测试无法通过,更奇怪的是,当单独执行`part1`或`part2`的测试时,又会发现没有问题。 解决方法: 在`Get`方法中,在执行完`btree.Get`方法后应该立刻释放读锁: ``` func (bt *BTree) Get(key []byte) *data.LogRecordPos { it := &Item{key: key} bt.lock.RLock() value := bt.tree.Get(it) bt.lock.RUnlock() if value == nil { return nil } return value.(*Item).pos } ``` 最后修改:2024 年 01 月 20 日 © 允许规范转载 打赏 赞赏作者 支付宝微信 赞 0 如果觉得我的文章对你有用,请随意赞赏
1 条评论
hello