package rosedb

import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"regexp"
"sync"
"time"
"github.com/bwmarrin/snowflake"
"github.com/gofrs/flock"
"github.com/robfig/cron/v3"
"github.com/rosedblabs/rosedb/v2/index"
"github.com/rosedblabs/rosedb/v2/utils"
"github.com/rosedblabs/wal"
)
const (
fileLockName = "FLOCK"
dataFileNameSuffix = ".SEG"
hintFileNameSuffix = ".HINT"
mergeFinNameSuffix = ".MERGEFIN"
)
// DB represents a ROSEDB database instance.
// It is built on the bitcask model, which is a log-structured storage.
// It uses WAL to write data, and an in-memory index to map each key
// to the position of its data in the WAL.
// The index is rebuilt when the database is opened.
//
// The main advantage of ROSEDB is that it is very fast to write, read, and delete data,
// because a single operation needs only one disk IO.
//
// But since all keys and their positions (the index) must be kept in memory,
// the total data size is limited by the memory size.
//
// So if your memory can hold almost all the keys, ROSEDB is the perfect storage engine for you.
type DB struct {
dataFiles *wal.WAL // data files are a set of segment files in the WAL.
hintFile *wal.WAL // hint file is used to store the key and the position for fast startup.
index index.Indexer
options Options
fileLock *flock.Flock
mu sync.RWMutex
closed bool
mergeRunning uint32 // indicate if the database is merging
batchPool sync.Pool
recordPool sync.Pool
encodeHeader []byte
watchCh chan *Event // channel from which users consume watch events
watcher *Watcher
expiredCursorKey []byte // the key at which the last DeleteExpiredKeys scan stopped.
cronScheduler *cron.Cron // cron scheduler for auto merge task
}
// Stat represents the statistics of the database.
type Stat struct {
// Total number of keys
KeysNum int
// Total disk size of database directory
DiskSize int64
}
// Open a database with the specified options.
// If the database directory does not exist, it will be created automatically.
//
// Multiple processes cannot use the same database directory at the same time,
// otherwise it will return ErrDatabaseIsUsing.
//
// It will open the wal files in the database directory and load the index from them.
// It returns the DB instance, or an error if any.
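//
// A minimal usage sketch (assuming the package's DefaultOptions from options.go):
//
//	options := DefaultOptions
//	options.DirPath = "/tmp/rosedb_example"
//	db, err := Open(options)
//	if err != nil {
//		// handle the error
//	}
//	defer func() { _ = db.Close() }()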
func Open(options Options) (*DB, error) {
// check options
if err := checkOptions(options); err != nil {
return nil, err
}
// create data directory if not exist
if _, err := os.Stat(options.DirPath); err != nil {
if err := os.MkdirAll(options.DirPath, os.ModePerm); err != nil {
return nil, err
}
}
// create file lock, prevent multiple processes from using the same database directory
fileLock := flock.New(filepath.Join(options.DirPath, fileLockName))
hold, err := fileLock.TryLock()
if err != nil {
return nil, err
}
if !hold {
return nil, ErrDatabaseIsUsing
}
// load merge files if they exist
if err = loadMergeFiles(options.DirPath); err != nil {
return nil, err
}
// init DB instance
db := &DB{
index: index.NewIndexer(),
options: options,
fileLock: fileLock,
batchPool: sync.Pool{New: newBatch},
recordPool: sync.Pool{New: newRecord},
encodeHeader: make([]byte, maxLogRecordHeaderSize),
}
// open data files
if db.dataFiles, err = db.openWalFiles(); err != nil {
return nil, err
}
// load index
if err = db.loadIndex(); err != nil {
return nil, err
}
// enable watch
if options.WatchQueueSize > 0 {
db.watchCh = make(chan *Event, 100)
db.watcher = NewWatcher(options.WatchQueueSize)
// run a goroutine to synchronize event information
go db.watcher.sendEvent(db.watchCh)
}
// enable auto merge task
if len(options.AutoMergeCronExpr) > 0 {
db.cronScheduler = cron.New(
cron.WithParser(
cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour |
cron.Dom | cron.Month | cron.Dow | cron.Descriptor),
),
)
_, err = db.cronScheduler.AddFunc(options.AutoMergeCronExpr, func() {
// maybe we should handle different errors with different logic, but a background task cannot report its error to the caller.
// pass true so that the db is closed and reopened after the auto merge completes.
_ = db.Merge(true)
})
if err != nil {
return nil, err
}
db.cronScheduler.Start()
}
return db, nil
}
func (db *DB) openWalFiles() (*wal.WAL, error) {
// open data files from WAL
walFiles, err := wal.Open(wal.Options{
DirPath: db.options.DirPath,
SegmentSize: db.options.SegmentSize,
SegmentFileExt: dataFileNameSuffix,
Sync: db.options.Sync,
BytesPerSync: db.options.BytesPerSync,
})
if err != nil {
return nil, err
}
return walFiles, nil
}
func (db *DB) loadIndex() error {
// load index from hint file
if err := db.loadIndexFromHintFile(); err != nil {
return err
}
// load index from data files
if err := db.loadIndexFromWAL(); err != nil {
return err
}
return nil
}
// Close the database, close all data files and release file lock.
// Set the closed flag to true.
// The DB instance cannot be used after closing.
func (db *DB) Close() error {
db.mu.Lock()
defer db.mu.Unlock()
if err := db.closeFiles(); err != nil {
return err
}
// release file lock
if err := db.fileLock.Unlock(); err != nil {
return err
}
// close watch channel
if db.options.WatchQueueSize > 0 {
close(db.watchCh)
}
// close auto merge cron scheduler
if db.cronScheduler != nil {
db.cronScheduler.Stop()
}
db.closed = true
return nil
}
// closeFiles closes all data files and the hint file
func (db *DB) closeFiles() error {
// close wal
if err := db.dataFiles.Close(); err != nil {
return err
}
// close hint file if exists
if db.hintFile != nil {
if err := db.hintFile.Close(); err != nil {
return err
}
}
return nil
}
// Sync all data files to the underlying storage.
func (db *DB) Sync() error {
db.mu.Lock()
defer db.mu.Unlock()
return db.dataFiles.Sync()
}
// Stat returns the statistics of the database.
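//
// Example (sketch):
//
//	stat := db.Stat()
//	fmt.Printf("keys: %d, disk size: %d bytes\n", stat.KeysNum, stat.DiskSize)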
func (db *DB) Stat() *Stat {
db.mu.Lock()
defer db.mu.Unlock()
diskSize, err := utils.DirSize(db.options.DirPath)
if err != nil {
panic(fmt.Sprintf("rosedb: get database directory size error: %v", err))
}
return &Stat{
KeysNum: db.index.Size(),
DiskSize: diskSize,
}
}
// Put a key-value pair into the database.
// Actually, it will open a new batch and commit it.
// You can think of it as a batch with only one Put operation.
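//
// Example (sketch):
//
//	if err := db.Put([]byte("name"), []byte("rosedb")); err != nil {
//		// handle the error
//	}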
func (db *DB) Put(key []byte, value []byte) error {
batch := db.batchPool.Get().(*Batch)
defer func() {
batch.reset()
db.batchPool.Put(batch)
}()
// This is a single put operation, so we can set Sync to false,
// because the data will be written to the WAL,
// and the WAL file will be synced to disk according to the DB options.
batch.init(false, false, db)
if err := batch.Put(key, value); err != nil {
_ = batch.Rollback()
return err
}
return batch.Commit()
}
// PutWithTTL puts a key-value pair into the database with a ttl.
// Actually, it will open a new batch and commit it.
// You can think of it as a batch with only one PutWithTTL operation.
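//
// Example (sketch): the entry expires one minute after being written:
//
//	err := db.PutWithTTL([]byte("session"), []byte("token"), time.Minute)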
func (db *DB) PutWithTTL(key []byte, value []byte, ttl time.Duration) error {
batch := db.batchPool.Get().(*Batch)
defer func() {
batch.reset()
db.batchPool.Put(batch)
}()
// This is a single put operation, so we can set Sync to false,
// because the data will be written to the WAL,
// and the WAL file will be synced to disk according to the DB options.
batch.init(false, false, db)
if err := batch.PutWithTTL(key, value, ttl); err != nil {
_ = batch.Rollback()
return err
}
return batch.Commit()
}
// Get the value of the specified key from the database.
// Actually, it will open a new batch and commit it.
// You can think of it as a batch with only one Get operation.
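//
// Example (sketch; per this package's errors, a missing or expired key yields ErrKeyNotFound):
//
//	value, err := db.Get([]byte("name"))
//	if errors.Is(err, ErrKeyNotFound) {
//		// the key does not exist or has expired
//	}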
func (db *DB) Get(key []byte) ([]byte, error) {
batch := db.batchPool.Get().(*Batch)
batch.init(true, false, db)
defer func() {
_ = batch.Commit()
batch.reset()
db.batchPool.Put(batch)
}()
return batch.Get(key)
}
// Delete the specified key from the database.
// Actually, it will open a new batch and commit it.
// You can think of it as a batch with only one Delete operation.
func (db *DB) Delete(key []byte) error {
batch := db.batchPool.Get().(*Batch)
defer func() {
batch.reset()
db.batchPool.Put(batch)
}()
// This is a single delete operation, so we can set Sync to false,
// because the data will be written to the WAL,
// and the WAL file will be synced to disk according to the DB options.
batch.init(false, false, db)
if err := batch.Delete(key); err != nil {
_ = batch.Rollback()
return err
}
return batch.Commit()
}
// Exist checks if the specified key exists in the database.
// Actually, it will open a new batch and commit it.
// You can think of it as a batch with only one Exist operation.
func (db *DB) Exist(key []byte) (bool, error) {
batch := db.batchPool.Get().(*Batch)
batch.init(true, false, db)
defer func() {
_ = batch.Commit()
batch.reset()
db.batchPool.Put(batch)
}()
return batch.Exist(key)
}
// Expire sets the ttl of the key.
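//
// Example (sketch):
//
//	err := db.Expire([]byte("session"), 10*time.Second)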
func (db *DB) Expire(key []byte, ttl time.Duration) error {
batch := db.batchPool.Get().(*Batch)
defer func() {
batch.reset()
db.batchPool.Put(batch)
}()
// This is a single expire operation, so we can set Sync to false,
// because the data will be written to the WAL,
// and the WAL file will be synced to disk according to the DB options.
batch.init(false, false, db)
if err := batch.Expire(key, ttl); err != nil {
_ = batch.Rollback()
return err
}
return batch.Commit()
}
// TTL gets the ttl of the key.
func (db *DB) TTL(key []byte) (time.Duration, error) {
batch := db.batchPool.Get().(*Batch)
batch.init(true, false, db)
defer func() {
_ = batch.Commit()
batch.reset()
db.batchPool.Put(batch)
}()
return batch.TTL(key)
}
// Persist removes the ttl of the key.
// If the key does not exist or is expired, it will return ErrKeyNotFound.
func (db *DB) Persist(key []byte) error {
batch := db.batchPool.Get().(*Batch)
defer func() {
batch.reset()
db.batchPool.Put(batch)
}()
// This is a single persist operation, so we can set Sync to false,
// because the data will be written to the WAL,
// and the WAL file will be synced to disk according to the DB options.
batch.init(false, false, db)
if err := batch.Persist(key); err != nil {
_ = batch.Rollback()
return err
}
return batch.Commit()
}
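// Watch returns the channel from which the user can consume watch events.
// It returns ErrWatchDisabled if Options.WatchQueueSize is not greater than 0.
//
// A consumption sketch (the channel is closed when the db is closed):
//
//	eventCh, err := db.Watch()
//	if err != nil {
//		// watch is disabled
//	}
//	for event := range eventCh {
//		fmt.Printf("%v\n", event)
//	}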
func (db *DB) Watch() (<-chan *Event, error) {
if db.options.WatchQueueSize <= 0 {
return nil, ErrWatchDisabled
}
return db.watchCh, nil
}
// Ascend calls handleFn for each key/value pair in the db in ascending order.
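//
// Iteration sketch (return false from handleFn to stop early):
//
//	db.Ascend(func(k []byte, v []byte) (bool, error) {
//		fmt.Printf("%s = %s\n", k, v)
//		return true, nil
//	})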
func (db *DB) Ascend(handleFn func(k []byte, v []byte) (bool, error)) {
db.mu.RLock()
defer db.mu.RUnlock()
db.index.Ascend(func(key []byte, pos *wal.ChunkPosition) (bool, error) {
chunk, err := db.dataFiles.Read(pos)
if err != nil {
return false, err
}
if value := db.checkValue(chunk); value != nil {
return handleFn(key, value)
}
return true, nil
})
}
// AscendRange calls handleFn for each key/value pair in the db within the range [startKey, endKey] in ascending order.
func (db *DB) AscendRange(startKey, endKey []byte, handleFn func(k []byte, v []byte) (bool, error)) {
db.mu.RLock()
defer db.mu.RUnlock()
db.index.AscendRange(startKey, endKey, func(key []byte, pos *wal.ChunkPosition) (bool, error) {
chunk, err := db.dataFiles.Read(pos)
if err != nil {
return false, err
}
if value := db.checkValue(chunk); value != nil {
return handleFn(key, value)
}
return true, nil
})
}
// AscendGreaterOrEqual calls handleFn for each key/value pair in the db with keys greater than or equal to the given key.
func (db *DB) AscendGreaterOrEqual(key []byte, handleFn func(k []byte, v []byte) (bool, error)) {
db.mu.RLock()
defer db.mu.RUnlock()
db.index.AscendGreaterOrEqual(key, func(key []byte, pos *wal.ChunkPosition) (bool, error) {
chunk, err := db.dataFiles.Read(pos)
if err != nil {
return false, err
}
if value := db.checkValue(chunk); value != nil {
return handleFn(key, value)
}
return true, nil
})
}
// AscendKeys calls handleFn for each key in the db in ascending order.
// Since the expiry time is stored in the value, set the filterExpired parameter
// to true if you want to filter out expired keys. Note that this affects performance,
// because the value of each key must be read to determine whether it has expired.
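//
// Example (sketch; pattern is compiled as a regular expression, and an empty pattern matches all keys):
//
//	db.AscendKeys([]byte("^user:"), true, func(k []byte) (bool, error) {
//		fmt.Println(string(k))
//		return true, nil
//	})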
func (db *DB) AscendKeys(pattern []byte, filterExpired bool, handleFn func(k []byte) (bool, error)) {
db.mu.RLock()
defer db.mu.RUnlock()
var reg *regexp.Regexp
if len(pattern) > 0 {
reg = regexp.MustCompile(string(pattern))
}
db.index.Ascend(func(key []byte, pos *wal.ChunkPosition) (bool, error) {
if reg == nil || reg.Match(key) {
var invalid bool
if filterExpired {
chunk, err := db.dataFiles.Read(pos)
if err != nil {
return false, err
}
if value := db.checkValue(chunk); value == nil {
invalid = true
}
}
if invalid {
return true, nil
}
return handleFn(key)
}
return true, nil
})
}
// Descend calls handleFn for each key/value pair in the db in descending order.
func (db *DB) Descend(handleFn func(k []byte, v []byte) (bool, error)) {
db.mu.RLock()
defer db.mu.RUnlock()
db.index.Descend(func(key []byte, pos *wal.ChunkPosition) (bool, error) {
chunk, err := db.dataFiles.Read(pos)
if err != nil {
return false, err
}
if value := db.checkValue(chunk); value != nil {
return handleFn(key, value)
}
return true, nil
})
}
// DescendRange calls handleFn for each key/value pair in the db within the range [startKey, endKey] in descending order.
func (db *DB) DescendRange(startKey, endKey []byte, handleFn func(k []byte, v []byte) (bool, error)) {
db.mu.RLock()
defer db.mu.RUnlock()
db.index.DescendRange(startKey, endKey, func(key []byte, pos *wal.ChunkPosition) (bool, error) {
chunk, err := db.dataFiles.Read(pos)
if err != nil {
return false, err
}
if value := db.checkValue(chunk); value != nil {
return handleFn(key, value)
}
return true, nil
})
}
// DescendLessOrEqual calls handleFn for each key/value pair in the db with keys less than or equal to the given key.
func (db *DB) DescendLessOrEqual(key []byte, handleFn func(k []byte, v []byte) (bool, error)) {
db.mu.RLock()
defer db.mu.RUnlock()
db.index.DescendLessOrEqual(key, func(key []byte, pos *wal.ChunkPosition) (bool, error) {
chunk, err := db.dataFiles.Read(pos)
if err != nil {
return false, err
}
if value := db.checkValue(chunk); value != nil {
return handleFn(key, value)
}
return true, nil
})
}
// DescendKeys calls handleFn for each key in the db in descending order.
// Since the expiry time is stored in the value, set the filterExpired parameter
// to true if you want to filter out expired keys. Note that this affects performance,
// because the value of each key must be read to determine whether it has expired.
func (db *DB) DescendKeys(pattern []byte, filterExpired bool, handleFn func(k []byte) (bool, error)) {
db.mu.RLock()
defer db.mu.RUnlock()
var reg *regexp.Regexp
if len(pattern) > 0 {
reg = regexp.MustCompile(string(pattern))
}
db.index.Descend(func(key []byte, pos *wal.ChunkPosition) (bool, error) {
if reg == nil || reg.Match(key) {
var invalid bool
if filterExpired {
chunk, err := db.dataFiles.Read(pos)
if err != nil {
return false, err
}
if value := db.checkValue(chunk); value == nil {
invalid = true
}
}
if invalid {
return true, nil
}
return handleFn(key)
}
return true, nil
})
}
func (db *DB) checkValue(chunk []byte) []byte {
record := decodeLogRecord(chunk)
now := time.Now().UnixNano()
if record.Type != LogRecordDeleted && !record.IsExpired(now) {
return record.Value
}
return nil
}
func checkOptions(options Options) error {
if options.DirPath == "" {
return errors.New("database dir path is empty")
}
if options.SegmentSize <= 0 {
return errors.New("database data file size must be greater than 0")
}
if len(options.AutoMergeCronExpr) > 0 {
if _, err := cron.NewParser(cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor).
Parse(options.AutoMergeCronExpr); err != nil {
return fmt.Errorf("database auto merge cron expression is invalid, err: %s", err)
}
}
return nil
}
// loadIndexFromWAL loads index from WAL.
// It will iterate over all the WAL files and read data
// from them to rebuild the index.
func (db *DB) loadIndexFromWAL() error {
mergeFinSegmentId, err := getMergeFinSegmentId(db.options.DirPath)
if err != nil {
return err
}
indexRecords := make(map[uint64][]*IndexRecord)
now := time.Now().UnixNano()
// get a reader for WAL
reader := db.dataFiles.NewReader()
db.dataFiles.SetIsStartupTraversal(true)
for {
// if the current segment id is less than or equal to the mergeFinSegmentId,
// we can skip this segment because it has been merged,
// and its index has already been loaded from the hint file.
if reader.CurrentSegmentId() <= mergeFinSegmentId {
reader.SkipCurrentSegment()
continue
}
chunk, position, err := reader.Next()
if err != nil {
if err == io.EOF {
break
}
return err
}
// decode and get log record
record := decodeLogRecord(chunk)
// if we get the end of a batch,
// all records in this batch are ready to be indexed.
if record.Type == LogRecordBatchFinished {
batchId, err := snowflake.ParseBytes(record.Key)
if err != nil {
return err
}
for _, idxRecord := range indexRecords[uint64(batchId)] {
if idxRecord.recordType == LogRecordNormal {
db.index.Put(idxRecord.key, idxRecord.position)
}
if idxRecord.recordType == LogRecordDeleted {
db.index.Delete(idxRecord.key)
}
}
// delete indexRecords according to batchId after indexing
delete(indexRecords, uint64(batchId))
} else if record.Type == LogRecordNormal && record.BatchId == mergeFinishedBatchID {
// if the record is a normal record and its batch id is mergeFinishedBatchID,
// it means that the record was rewritten by the merge operation,
// so put the record into the index directly.
db.index.Put(record.Key, position)
} else {
// expired records should not be indexed
if record.IsExpired(now) {
db.index.Delete(record.Key)
continue
}
// put the record into the temporary indexRecords
indexRecords[record.BatchId] = append(indexRecords[record.BatchId],
&IndexRecord{
key: record.Key,
recordType: record.Type,
position: position,
})
}
}
db.dataFiles.SetIsStartupTraversal(false)
return nil
}
// DeleteExpiredKeys scans the entire index in ascending order to delete expired keys.
// It is a time-consuming operation, so we need to specify a timeout
// to prevent the DB from being unavailable for a long time.
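//
// Example (sketch; a later call resumes from the stored cursor key):
//
//	if err := db.DeleteExpiredKeys(5 * time.Second); err != nil {
//		// a read from the data files failed during the scan
//	}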
func (db *DB) DeleteExpiredKeys(timeout time.Duration) error {
// set timeout
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
done := make(chan struct{}, 1)
var innerErr error
now := time.Now().UnixNano()
go func(ctx context.Context) {
db.mu.Lock()
defer db.mu.Unlock()
for {
// stop scanning once the timeout has been reached;
// the caller has already returned via ctx.Done().
select {
case <-ctx.Done():
return
default:
}
// select 100 keys from the db.index
positions := make([]*wal.ChunkPosition, 0, 100)
db.index.AscendGreaterOrEqual(db.expiredCursorKey, func(k []byte, pos *wal.ChunkPosition) (bool, error) {
positions = append(positions, pos)
if len(positions) >= 100 {
return false, nil
}
return true, nil
})
// If all keys in db.index have been traversed, len(positions) will be 0.
if len(positions) == 0 {
db.expiredCursorKey = nil
done <- struct{}{}
return
}
// delete from index if the key is expired.
for _, pos := range positions {
chunk, err := db.dataFiles.Read(pos)
if err != nil {
innerErr = err
done <- struct{}{}
return
}
record := decodeLogRecord(chunk)
if record.IsExpired(now) {
db.index.Delete(record.Key)
}
db.expiredCursorKey = record.Key
}
}
}(ctx)
select {
case <-ctx.Done():
return innerErr
case <-done:
return nil
}
}