mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-06 19:37:27 -08:00
1072 lines
25 KiB
Go
1072 lines
25 KiB
Go
|
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
|
||
|
// All rights reserved.
|
||
|
//
|
||
|
// Use of this source code is governed by a BSD-style license that can be
|
||
|
// found in the LICENSE file.
|
||
|
|
||
|
package leveldb
|
||
|
|
||
|
import (
|
||
|
"container/list"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"os"
|
||
|
"runtime"
|
||
|
"strings"
|
||
|
"sync"
|
||
|
"sync/atomic"
|
||
|
"time"
|
||
|
|
||
|
"github.com/syndtr/goleveldb/leveldb/errors"
|
||
|
"github.com/syndtr/goleveldb/leveldb/iterator"
|
||
|
"github.com/syndtr/goleveldb/leveldb/journal"
|
||
|
"github.com/syndtr/goleveldb/leveldb/memdb"
|
||
|
"github.com/syndtr/goleveldb/leveldb/opt"
|
||
|
"github.com/syndtr/goleveldb/leveldb/storage"
|
||
|
"github.com/syndtr/goleveldb/leveldb/table"
|
||
|
"github.com/syndtr/goleveldb/leveldb/util"
|
||
|
)
|
||
|
|
||
|
// DB is a LevelDB database.
|
||
|
type DB struct {
|
||
|
// Need 64-bit alignment.
|
||
|
seq uint64
|
||
|
|
||
|
// Session.
|
||
|
s *session
|
||
|
|
||
|
// MemDB.
|
||
|
memMu sync.RWMutex
|
||
|
memPool chan *memdb.DB
|
||
|
mem, frozenMem *memDB
|
||
|
journal *journal.Writer
|
||
|
journalWriter storage.Writer
|
||
|
journalFile storage.File
|
||
|
frozenJournalFile storage.File
|
||
|
frozenSeq uint64
|
||
|
|
||
|
// Snapshot.
|
||
|
snapsMu sync.Mutex
|
||
|
snapsList *list.List
|
||
|
|
||
|
// Stats.
|
||
|
aliveSnaps, aliveIters int32
|
||
|
|
||
|
// Write.
|
||
|
writeC chan *Batch
|
||
|
writeMergedC chan bool
|
||
|
writeLockC chan struct{}
|
||
|
writeAckC chan error
|
||
|
writeDelay time.Duration
|
||
|
writeDelayN int
|
||
|
journalC chan *Batch
|
||
|
journalAckC chan error
|
||
|
|
||
|
// Compaction.
|
||
|
tcompCmdC chan cCmd
|
||
|
tcompPauseC chan chan<- struct{}
|
||
|
mcompCmdC chan cCmd
|
||
|
compErrC chan error
|
||
|
compPerErrC chan error
|
||
|
compErrSetC chan error
|
||
|
compWriteLocking bool
|
||
|
compStats []cStats
|
||
|
|
||
|
// Close.
|
||
|
closeW sync.WaitGroup
|
||
|
closeC chan struct{}
|
||
|
closed uint32
|
||
|
closer io.Closer
|
||
|
}
|
||
|
|
||
|
func openDB(s *session) (*DB, error) {
|
||
|
s.log("db@open opening")
|
||
|
start := time.Now()
|
||
|
db := &DB{
|
||
|
s: s,
|
||
|
// Initial sequence
|
||
|
seq: s.stSeqNum,
|
||
|
// MemDB
|
||
|
memPool: make(chan *memdb.DB, 1),
|
||
|
// Snapshot
|
||
|
snapsList: list.New(),
|
||
|
// Write
|
||
|
writeC: make(chan *Batch),
|
||
|
writeMergedC: make(chan bool),
|
||
|
writeLockC: make(chan struct{}, 1),
|
||
|
writeAckC: make(chan error),
|
||
|
journalC: make(chan *Batch),
|
||
|
journalAckC: make(chan error),
|
||
|
// Compaction
|
||
|
tcompCmdC: make(chan cCmd),
|
||
|
tcompPauseC: make(chan chan<- struct{}),
|
||
|
mcompCmdC: make(chan cCmd),
|
||
|
compErrC: make(chan error),
|
||
|
compPerErrC: make(chan error),
|
||
|
compErrSetC: make(chan error),
|
||
|
compStats: make([]cStats, s.o.GetNumLevel()),
|
||
|
// Close
|
||
|
closeC: make(chan struct{}),
|
||
|
}
|
||
|
|
||
|
// Read-only mode.
|
||
|
readOnly := s.o.GetReadOnly()
|
||
|
|
||
|
if readOnly {
|
||
|
// Recover journals (read-only mode).
|
||
|
if err := db.recoverJournalRO(); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
} else {
|
||
|
// Recover journals.
|
||
|
if err := db.recoverJournal(); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
// Remove any obsolete files.
|
||
|
if err := db.checkAndCleanFiles(); err != nil {
|
||
|
// Close journal.
|
||
|
if db.journal != nil {
|
||
|
db.journal.Close()
|
||
|
db.journalWriter.Close()
|
||
|
}
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
// Doesn't need to be included in the wait group.
|
||
|
go db.compactionError()
|
||
|
go db.mpoolDrain()
|
||
|
|
||
|
if readOnly {
|
||
|
db.SetReadOnly()
|
||
|
} else {
|
||
|
db.closeW.Add(3)
|
||
|
go db.tCompaction()
|
||
|
go db.mCompaction()
|
||
|
go db.jWriter()
|
||
|
}
|
||
|
|
||
|
s.logf("db@open done T·%v", time.Since(start))
|
||
|
|
||
|
runtime.SetFinalizer(db, (*DB).Close)
|
||
|
return db, nil
|
||
|
}
|
||
|
|
||
|
// Open opens or creates a DB for the given storage.
|
||
|
// The DB will be created if not exist, unless ErrorIfMissing is true.
|
||
|
// Also, if ErrorIfExist is true and the DB exist Open will returns
|
||
|
// os.ErrExist error.
|
||
|
//
|
||
|
// Open will return an error with type of ErrCorrupted if corruption
|
||
|
// detected in the DB. Corrupted DB can be recovered with Recover
|
||
|
// function.
|
||
|
//
|
||
|
// The returned DB instance is goroutine-safe.
|
||
|
// The DB must be closed after use, by calling Close method.
|
||
|
func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) {
|
||
|
s, err := newSession(stor, o)
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
defer func() {
|
||
|
if err != nil {
|
||
|
s.close()
|
||
|
s.release()
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
err = s.recover()
|
||
|
if err != nil {
|
||
|
if !os.IsNotExist(err) || s.o.GetErrorIfMissing() {
|
||
|
return
|
||
|
}
|
||
|
err = s.create()
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
} else if s.o.GetErrorIfExist() {
|
||
|
err = os.ErrExist
|
||
|
return
|
||
|
}
|
||
|
|
||
|
return openDB(s)
|
||
|
}
|
||
|
|
||
|
// OpenFile opens or creates a DB for the given path.
|
||
|
// The DB will be created if not exist, unless ErrorIfMissing is true.
|
||
|
// Also, if ErrorIfExist is true and the DB exist OpenFile will returns
|
||
|
// os.ErrExist error.
|
||
|
//
|
||
|
// OpenFile uses standard file-system backed storage implementation as
|
||
|
// desribed in the leveldb/storage package.
|
||
|
//
|
||
|
// OpenFile will return an error with type of ErrCorrupted if corruption
|
||
|
// detected in the DB. Corrupted DB can be recovered with Recover
|
||
|
// function.
|
||
|
//
|
||
|
// The returned DB instance is goroutine-safe.
|
||
|
// The DB must be closed after use, by calling Close method.
|
||
|
func OpenFile(path string, o *opt.Options) (db *DB, err error) {
|
||
|
stor, err := storage.OpenFile(path)
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
db, err = Open(stor, o)
|
||
|
if err != nil {
|
||
|
stor.Close()
|
||
|
} else {
|
||
|
db.closer = stor
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Recover recovers and opens a DB with missing or corrupted manifest files
|
||
|
// for the given storage. It will ignore any manifest files, valid or not.
|
||
|
// The DB must already exist or it will returns an error.
|
||
|
// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
|
||
|
//
|
||
|
// The returned DB instance is goroutine-safe.
|
||
|
// The DB must be closed after use, by calling Close method.
|
||
|
func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) {
|
||
|
s, err := newSession(stor, o)
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
defer func() {
|
||
|
if err != nil {
|
||
|
s.close()
|
||
|
s.release()
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
err = recoverTable(s, o)
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
return openDB(s)
|
||
|
}
|
||
|
|
||
|
// RecoverFile recovers and opens a DB with missing or corrupted manifest files
|
||
|
// for the given path. It will ignore any manifest files, valid or not.
|
||
|
// The DB must already exist or it will returns an error.
|
||
|
// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
|
||
|
//
|
||
|
// RecoverFile uses standard file-system backed storage implementation as desribed
|
||
|
// in the leveldb/storage package.
|
||
|
//
|
||
|
// The returned DB instance is goroutine-safe.
|
||
|
// The DB must be closed after use, by calling Close method.
|
||
|
func RecoverFile(path string, o *opt.Options) (db *DB, err error) {
|
||
|
stor, err := storage.OpenFile(path)
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
db, err = Recover(stor, o)
|
||
|
if err != nil {
|
||
|
stor.Close()
|
||
|
} else {
|
||
|
db.closer = stor
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
func recoverTable(s *session, o *opt.Options) error {
|
||
|
o = dupOptions(o)
|
||
|
// Mask StrictReader, lets StrictRecovery doing its job.
|
||
|
o.Strict &= ^opt.StrictReader
|
||
|
|
||
|
// Get all tables and sort it by file number.
|
||
|
tableFiles_, err := s.getFiles(storage.TypeTable)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
tableFiles := files(tableFiles_)
|
||
|
tableFiles.sort()
|
||
|
|
||
|
var (
|
||
|
maxSeq uint64
|
||
|
recoveredKey, goodKey, corruptedKey, corruptedBlock, droppedTable int
|
||
|
|
||
|
// We will drop corrupted table.
|
||
|
strict = o.GetStrict(opt.StrictRecovery)
|
||
|
noSync = o.GetNoSync()
|
||
|
|
||
|
rec = &sessionRecord{}
|
||
|
bpool = util.NewBufferPool(o.GetBlockSize() + 5)
|
||
|
)
|
||
|
buildTable := func(iter iterator.Iterator) (tmp storage.File, size int64, err error) {
|
||
|
tmp = s.newTemp()
|
||
|
writer, err := tmp.Create()
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
defer func() {
|
||
|
writer.Close()
|
||
|
if err != nil {
|
||
|
tmp.Remove()
|
||
|
tmp = nil
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
// Copy entries.
|
||
|
tw := table.NewWriter(writer, o)
|
||
|
for iter.Next() {
|
||
|
key := iter.Key()
|
||
|
if validIkey(key) {
|
||
|
err = tw.Append(key, iter.Value())
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
err = iter.Error()
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
err = tw.Close()
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
if !noSync {
|
||
|
err = writer.Sync()
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
size = int64(tw.BytesLen())
|
||
|
return
|
||
|
}
|
||
|
recoverTable := func(file storage.File) error {
|
||
|
s.logf("table@recovery recovering @%d", file.Num())
|
||
|
reader, err := file.Open()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
var closed bool
|
||
|
defer func() {
|
||
|
if !closed {
|
||
|
reader.Close()
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
// Get file size.
|
||
|
size, err := reader.Seek(0, 2)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
var (
|
||
|
tSeq uint64
|
||
|
tgoodKey, tcorruptedKey, tcorruptedBlock int
|
||
|
imin, imax []byte
|
||
|
)
|
||
|
tr, err := table.NewReader(reader, size, storage.NewFileInfo(file), nil, bpool, o)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
iter := tr.NewIterator(nil, nil)
|
||
|
if itererr, ok := iter.(iterator.ErrorCallbackSetter); ok {
|
||
|
itererr.SetErrorCallback(func(err error) {
|
||
|
if errors.IsCorrupted(err) {
|
||
|
s.logf("table@recovery block corruption @%d %q", file.Num(), err)
|
||
|
tcorruptedBlock++
|
||
|
}
|
||
|
})
|
||
|
}
|
||
|
|
||
|
// Scan the table.
|
||
|
for iter.Next() {
|
||
|
key := iter.Key()
|
||
|
_, seq, _, kerr := parseIkey(key)
|
||
|
if kerr != nil {
|
||
|
tcorruptedKey++
|
||
|
continue
|
||
|
}
|
||
|
tgoodKey++
|
||
|
if seq > tSeq {
|
||
|
tSeq = seq
|
||
|
}
|
||
|
if imin == nil {
|
||
|
imin = append([]byte{}, key...)
|
||
|
}
|
||
|
imax = append(imax[:0], key...)
|
||
|
}
|
||
|
if err := iter.Error(); err != nil {
|
||
|
iter.Release()
|
||
|
return err
|
||
|
}
|
||
|
iter.Release()
|
||
|
|
||
|
goodKey += tgoodKey
|
||
|
corruptedKey += tcorruptedKey
|
||
|
corruptedBlock += tcorruptedBlock
|
||
|
|
||
|
if strict && (tcorruptedKey > 0 || tcorruptedBlock > 0) {
|
||
|
droppedTable++
|
||
|
s.logf("table@recovery dropped @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", file.Num(), tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
if tgoodKey > 0 {
|
||
|
if tcorruptedKey > 0 || tcorruptedBlock > 0 {
|
||
|
// Rebuild the table.
|
||
|
s.logf("table@recovery rebuilding @%d", file.Num())
|
||
|
iter := tr.NewIterator(nil, nil)
|
||
|
tmp, newSize, err := buildTable(iter)
|
||
|
iter.Release()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
closed = true
|
||
|
reader.Close()
|
||
|
if err := file.Replace(tmp); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
size = newSize
|
||
|
}
|
||
|
if tSeq > maxSeq {
|
||
|
maxSeq = tSeq
|
||
|
}
|
||
|
recoveredKey += tgoodKey
|
||
|
// Add table to level 0.
|
||
|
rec.addTable(0, file.Num(), uint64(size), imin, imax)
|
||
|
s.logf("table@recovery recovered @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", file.Num(), tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
|
||
|
} else {
|
||
|
droppedTable++
|
||
|
s.logf("table@recovery unrecoverable @%d Ck·%d Cb·%d S·%d", file.Num(), tcorruptedKey, tcorruptedBlock, size)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Recover all tables.
|
||
|
if len(tableFiles) > 0 {
|
||
|
s.logf("table@recovery F·%d", len(tableFiles))
|
||
|
|
||
|
// Mark file number as used.
|
||
|
s.markFileNum(tableFiles[len(tableFiles)-1].Num())
|
||
|
|
||
|
for _, file := range tableFiles {
|
||
|
if err := recoverTable(file); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
s.logf("table@recovery recovered F·%d N·%d Gk·%d Ck·%d Q·%d", len(tableFiles), recoveredKey, goodKey, corruptedKey, maxSeq)
|
||
|
}
|
||
|
|
||
|
// Set sequence number.
|
||
|
rec.setSeqNum(maxSeq)
|
||
|
|
||
|
// Create new manifest.
|
||
|
if err := s.create(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// Commit.
|
||
|
return s.commit(rec)
|
||
|
}
|
||
|
|
||
|
func (db *DB) recoverJournal() error {
|
||
|
// Get all journals and sort it by file number.
|
||
|
allJournalFiles, err := db.s.getFiles(storage.TypeJournal)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
files(allJournalFiles).sort()
|
||
|
|
||
|
// Journals that will be recovered.
|
||
|
var recJournalFiles []storage.File
|
||
|
for _, jf := range allJournalFiles {
|
||
|
if jf.Num() >= db.s.stJournalNum || jf.Num() == db.s.stPrevJournalNum {
|
||
|
recJournalFiles = append(recJournalFiles, jf)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
var (
|
||
|
of storage.File // Obsolete file.
|
||
|
rec = &sessionRecord{}
|
||
|
)
|
||
|
|
||
|
// Recover journals.
|
||
|
if len(recJournalFiles) > 0 {
|
||
|
db.logf("journal@recovery F·%d", len(recJournalFiles))
|
||
|
|
||
|
// Mark file number as used.
|
||
|
db.s.markFileNum(recJournalFiles[len(recJournalFiles)-1].Num())
|
||
|
|
||
|
var (
|
||
|
// Options.
|
||
|
strict = db.s.o.GetStrict(opt.StrictJournal)
|
||
|
checksum = db.s.o.GetStrict(opt.StrictJournalChecksum)
|
||
|
writeBuffer = db.s.o.GetWriteBuffer()
|
||
|
|
||
|
jr *journal.Reader
|
||
|
mdb = memdb.New(db.s.icmp, writeBuffer)
|
||
|
buf = &util.Buffer{}
|
||
|
batch = &Batch{}
|
||
|
)
|
||
|
|
||
|
for _, jf := range recJournalFiles {
|
||
|
db.logf("journal@recovery recovering @%d", jf.Num())
|
||
|
|
||
|
fr, err := jf.Open()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// Create or reset journal reader instance.
|
||
|
if jr == nil {
|
||
|
jr = journal.NewReader(fr, dropper{db.s, jf}, strict, checksum)
|
||
|
} else {
|
||
|
jr.Reset(fr, dropper{db.s, jf}, strict, checksum)
|
||
|
}
|
||
|
|
||
|
// Flush memdb and remove obsolete journal file.
|
||
|
if of != nil {
|
||
|
if mdb.Len() > 0 {
|
||
|
if _, err := db.s.flushMemdb(rec, mdb, -1); err != nil {
|
||
|
fr.Close()
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
rec.setJournalNum(jf.Num())
|
||
|
rec.setSeqNum(db.seq)
|
||
|
if err := db.s.commit(rec); err != nil {
|
||
|
fr.Close()
|
||
|
return err
|
||
|
}
|
||
|
rec.resetAddedTables()
|
||
|
|
||
|
of.Remove()
|
||
|
of = nil
|
||
|
}
|
||
|
|
||
|
// Replay journal to memdb.
|
||
|
mdb.Reset()
|
||
|
for {
|
||
|
r, err := jr.Next()
|
||
|
if err != nil {
|
||
|
if err == io.EOF {
|
||
|
break
|
||
|
}
|
||
|
|
||
|
fr.Close()
|
||
|
return errors.SetFile(err, jf)
|
||
|
}
|
||
|
|
||
|
buf.Reset()
|
||
|
if _, err := buf.ReadFrom(r); err != nil {
|
||
|
if err == io.ErrUnexpectedEOF {
|
||
|
// This is error returned due to corruption, with strict == false.
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
fr.Close()
|
||
|
return errors.SetFile(err, jf)
|
||
|
}
|
||
|
if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil {
|
||
|
if !strict && errors.IsCorrupted(err) {
|
||
|
db.s.logf("journal error: %v (skipped)", err)
|
||
|
// We won't apply sequence number as it might be corrupted.
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
fr.Close()
|
||
|
return errors.SetFile(err, jf)
|
||
|
}
|
||
|
|
||
|
// Save sequence number.
|
||
|
db.seq = batch.seq + uint64(batch.Len())
|
||
|
|
||
|
// Flush it if large enough.
|
||
|
if mdb.Size() >= writeBuffer {
|
||
|
if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
|
||
|
fr.Close()
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
mdb.Reset()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
fr.Close()
|
||
|
of = jf
|
||
|
}
|
||
|
|
||
|
// Flush the last memdb.
|
||
|
if mdb.Len() > 0 {
|
||
|
if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Create a new journal.
|
||
|
if _, err := db.newMem(0); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// Commit.
|
||
|
rec.setJournalNum(db.journalFile.Num())
|
||
|
rec.setSeqNum(db.seq)
|
||
|
if err := db.s.commit(rec); err != nil {
|
||
|
// Close journal on error.
|
||
|
if db.journal != nil {
|
||
|
db.journal.Close()
|
||
|
db.journalWriter.Close()
|
||
|
}
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// Remove the last obsolete journal file.
|
||
|
if of != nil {
|
||
|
of.Remove()
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (db *DB) recoverJournalRO() error {
|
||
|
// Get all journals and sort it by file number.
|
||
|
allJournalFiles, err := db.s.getFiles(storage.TypeJournal)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
files(allJournalFiles).sort()
|
||
|
|
||
|
// Journals that will be recovered.
|
||
|
var recJournalFiles []storage.File
|
||
|
for _, jf := range allJournalFiles {
|
||
|
if jf.Num() >= db.s.stJournalNum || jf.Num() == db.s.stPrevJournalNum {
|
||
|
recJournalFiles = append(recJournalFiles, jf)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
var (
|
||
|
// Options.
|
||
|
strict = db.s.o.GetStrict(opt.StrictJournal)
|
||
|
checksum = db.s.o.GetStrict(opt.StrictJournalChecksum)
|
||
|
writeBuffer = db.s.o.GetWriteBuffer()
|
||
|
|
||
|
mdb = memdb.New(db.s.icmp, writeBuffer)
|
||
|
)
|
||
|
|
||
|
// Recover journals.
|
||
|
if len(recJournalFiles) > 0 {
|
||
|
db.logf("journal@recovery RO·Mode F·%d", len(recJournalFiles))
|
||
|
|
||
|
var (
|
||
|
jr *journal.Reader
|
||
|
buf = &util.Buffer{}
|
||
|
batch = &Batch{}
|
||
|
)
|
||
|
|
||
|
for _, jf := range recJournalFiles {
|
||
|
db.logf("journal@recovery recovering @%d", jf.Num())
|
||
|
|
||
|
fr, err := jf.Open()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// Create or reset journal reader instance.
|
||
|
if jr == nil {
|
||
|
jr = journal.NewReader(fr, dropper{db.s, jf}, strict, checksum)
|
||
|
} else {
|
||
|
jr.Reset(fr, dropper{db.s, jf}, strict, checksum)
|
||
|
}
|
||
|
|
||
|
// Replay journal to memdb.
|
||
|
for {
|
||
|
r, err := jr.Next()
|
||
|
if err != nil {
|
||
|
if err == io.EOF {
|
||
|
break
|
||
|
}
|
||
|
|
||
|
fr.Close()
|
||
|
return errors.SetFile(err, jf)
|
||
|
}
|
||
|
|
||
|
buf.Reset()
|
||
|
if _, err := buf.ReadFrom(r); err != nil {
|
||
|
if err == io.ErrUnexpectedEOF {
|
||
|
// This is error returned due to corruption, with strict == false.
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
fr.Close()
|
||
|
return errors.SetFile(err, jf)
|
||
|
}
|
||
|
if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil {
|
||
|
if !strict && errors.IsCorrupted(err) {
|
||
|
db.s.logf("journal error: %v (skipped)", err)
|
||
|
// We won't apply sequence number as it might be corrupted.
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
fr.Close()
|
||
|
return errors.SetFile(err, jf)
|
||
|
}
|
||
|
|
||
|
// Save sequence number.
|
||
|
db.seq = batch.seq + uint64(batch.Len())
|
||
|
}
|
||
|
|
||
|
fr.Close()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Set memDB.
|
||
|
db.mem = &memDB{db: db, DB: mdb, ref: 1}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (db *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) {
|
||
|
ikey := newIkey(key, seq, ktSeek)
|
||
|
|
||
|
em, fm := db.getMems()
|
||
|
for _, m := range [...]*memDB{em, fm} {
|
||
|
if m == nil {
|
||
|
continue
|
||
|
}
|
||
|
defer m.decref()
|
||
|
|
||
|
mk, mv, me := m.Find(ikey)
|
||
|
if me == nil {
|
||
|
ukey, _, kt, kerr := parseIkey(mk)
|
||
|
if kerr != nil {
|
||
|
// Shouldn't have had happen.
|
||
|
panic(kerr)
|
||
|
}
|
||
|
if db.s.icmp.uCompare(ukey, key) == 0 {
|
||
|
if kt == ktDel {
|
||
|
return nil, ErrNotFound
|
||
|
}
|
||
|
return append([]byte{}, mv...), nil
|
||
|
}
|
||
|
} else if me != ErrNotFound {
|
||
|
return nil, me
|
||
|
}
|
||
|
}
|
||
|
|
||
|
v := db.s.version()
|
||
|
value, cSched, err := v.get(ikey, ro, false)
|
||
|
v.release()
|
||
|
if cSched {
|
||
|
// Trigger table compaction.
|
||
|
db.compSendTrigger(db.tcompCmdC)
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
func (db *DB) has(key []byte, seq uint64, ro *opt.ReadOptions) (ret bool, err error) {
|
||
|
ikey := newIkey(key, seq, ktSeek)
|
||
|
|
||
|
em, fm := db.getMems()
|
||
|
for _, m := range [...]*memDB{em, fm} {
|
||
|
if m == nil {
|
||
|
continue
|
||
|
}
|
||
|
defer m.decref()
|
||
|
|
||
|
mk, _, me := m.Find(ikey)
|
||
|
if me == nil {
|
||
|
ukey, _, kt, kerr := parseIkey(mk)
|
||
|
if kerr != nil {
|
||
|
// Shouldn't have had happen.
|
||
|
panic(kerr)
|
||
|
}
|
||
|
if db.s.icmp.uCompare(ukey, key) == 0 {
|
||
|
if kt == ktDel {
|
||
|
return false, nil
|
||
|
}
|
||
|
return true, nil
|
||
|
}
|
||
|
} else if me != ErrNotFound {
|
||
|
return false, me
|
||
|
}
|
||
|
}
|
||
|
|
||
|
v := db.s.version()
|
||
|
_, cSched, err := v.get(ikey, ro, true)
|
||
|
v.release()
|
||
|
if cSched {
|
||
|
// Trigger table compaction.
|
||
|
db.compSendTrigger(db.tcompCmdC)
|
||
|
}
|
||
|
if err == nil {
|
||
|
ret = true
|
||
|
} else if err == ErrNotFound {
|
||
|
err = nil
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Get gets the value for the given key. It returns ErrNotFound if the
|
||
|
// DB does not contains the key.
|
||
|
//
|
||
|
// The returned slice is its own copy, it is safe to modify the contents
|
||
|
// of the returned slice.
|
||
|
// It is safe to modify the contents of the argument after Get returns.
|
||
|
func (db *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
|
||
|
err = db.ok()
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
se := db.acquireSnapshot()
|
||
|
defer db.releaseSnapshot(se)
|
||
|
return db.get(key, se.seq, ro)
|
||
|
}
|
||
|
|
||
|
// Has returns true if the DB does contains the given key.
|
||
|
//
|
||
|
// It is safe to modify the contents of the argument after Get returns.
|
||
|
func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) {
|
||
|
err = db.ok()
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
se := db.acquireSnapshot()
|
||
|
defer db.releaseSnapshot(se)
|
||
|
return db.has(key, se.seq, ro)
|
||
|
}
|
||
|
|
||
|
// NewIterator returns an iterator for the latest snapshot of the
|
||
|
// uderlying DB.
|
||
|
// The returned iterator is not goroutine-safe, but it is safe to use
|
||
|
// multiple iterators concurrently, with each in a dedicated goroutine.
|
||
|
// It is also safe to use an iterator concurrently with modifying its
|
||
|
// underlying DB. The resultant key/value pairs are guaranteed to be
|
||
|
// consistent.
|
||
|
//
|
||
|
// Slice allows slicing the iterator to only contains keys in the given
|
||
|
// range. A nil Range.Start is treated as a key before all keys in the
|
||
|
// DB. And a nil Range.Limit is treated as a key after all keys in
|
||
|
// the DB.
|
||
|
//
|
||
|
// The iterator must be released after use, by calling Release method.
|
||
|
//
|
||
|
// Also read Iterator documentation of the leveldb/iterator package.
|
||
|
func (db *DB) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
|
||
|
if err := db.ok(); err != nil {
|
||
|
return iterator.NewEmptyIterator(err)
|
||
|
}
|
||
|
|
||
|
se := db.acquireSnapshot()
|
||
|
defer db.releaseSnapshot(se)
|
||
|
// Iterator holds 'version' lock, 'version' is immutable so snapshot
|
||
|
// can be released after iterator created.
|
||
|
return db.newIterator(se.seq, slice, ro)
|
||
|
}
|
||
|
|
||
|
// GetSnapshot returns a latest snapshot of the underlying DB. A snapshot
|
||
|
// is a frozen snapshot of a DB state at a particular point in time. The
|
||
|
// content of snapshot are guaranteed to be consistent.
|
||
|
//
|
||
|
// The snapshot must be released after use, by calling Release method.
|
||
|
func (db *DB) GetSnapshot() (*Snapshot, error) {
|
||
|
if err := db.ok(); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
return db.newSnapshot(), nil
|
||
|
}
|
||
|
|
||
|
// GetProperty returns value of the given property name.
|
||
|
//
|
||
|
// Property names:
|
||
|
// leveldb.num-files-at-level{n}
|
||
|
// Returns the number of files at level 'n'.
|
||
|
// leveldb.stats
|
||
|
// Returns statistics of the underlying DB.
|
||
|
// leveldb.sstables
|
||
|
// Returns sstables list for each level.
|
||
|
// leveldb.blockpool
|
||
|
// Returns block pool stats.
|
||
|
// leveldb.cachedblock
|
||
|
// Returns size of cached block.
|
||
|
// leveldb.openedtables
|
||
|
// Returns number of opened tables.
|
||
|
// leveldb.alivesnaps
|
||
|
// Returns number of alive snapshots.
|
||
|
// leveldb.aliveiters
|
||
|
// Returns number of alive iterators.
|
||
|
func (db *DB) GetProperty(name string) (value string, err error) {
|
||
|
err = db.ok()
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
const prefix = "leveldb."
|
||
|
if !strings.HasPrefix(name, prefix) {
|
||
|
return "", ErrNotFound
|
||
|
}
|
||
|
p := name[len(prefix):]
|
||
|
|
||
|
v := db.s.version()
|
||
|
defer v.release()
|
||
|
|
||
|
numFilesPrefix := "num-files-at-level"
|
||
|
switch {
|
||
|
case strings.HasPrefix(p, numFilesPrefix):
|
||
|
var level uint
|
||
|
var rest string
|
||
|
n, _ := fmt.Sscanf(p[len(numFilesPrefix):], "%d%s", &level, &rest)
|
||
|
if n != 1 || int(level) >= db.s.o.GetNumLevel() {
|
||
|
err = ErrNotFound
|
||
|
} else {
|
||
|
value = fmt.Sprint(v.tLen(int(level)))
|
||
|
}
|
||
|
case p == "stats":
|
||
|
value = "Compactions\n" +
|
||
|
" Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)\n" +
|
||
|
"-------+------------+---------------+---------------+---------------+---------------\n"
|
||
|
for level, tables := range v.tables {
|
||
|
duration, read, write := db.compStats[level].get()
|
||
|
if len(tables) == 0 && duration == 0 {
|
||
|
continue
|
||
|
}
|
||
|
value += fmt.Sprintf(" %3d | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n",
|
||
|
level, len(tables), float64(tables.size())/1048576.0, duration.Seconds(),
|
||
|
float64(read)/1048576.0, float64(write)/1048576.0)
|
||
|
}
|
||
|
case p == "sstables":
|
||
|
for level, tables := range v.tables {
|
||
|
value += fmt.Sprintf("--- level %d ---\n", level)
|
||
|
for _, t := range tables {
|
||
|
value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.file.Num(), t.size, t.imin, t.imax)
|
||
|
}
|
||
|
}
|
||
|
case p == "blockpool":
|
||
|
value = fmt.Sprintf("%v", db.s.tops.bpool)
|
||
|
case p == "cachedblock":
|
||
|
if db.s.tops.bcache != nil {
|
||
|
value = fmt.Sprintf("%d", db.s.tops.bcache.Size())
|
||
|
} else {
|
||
|
value = "<nil>"
|
||
|
}
|
||
|
case p == "openedtables":
|
||
|
value = fmt.Sprintf("%d", db.s.tops.cache.Size())
|
||
|
case p == "alivesnaps":
|
||
|
value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveSnaps))
|
||
|
case p == "aliveiters":
|
||
|
value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveIters))
|
||
|
default:
|
||
|
err = ErrNotFound
|
||
|
}
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// SizeOf calculates approximate sizes of the given key ranges.
|
||
|
// The length of the returned sizes are equal with the length of the given
|
||
|
// ranges. The returned sizes measure storage space usage, so if the user
|
||
|
// data compresses by a factor of ten, the returned sizes will be one-tenth
|
||
|
// the size of the corresponding user data size.
|
||
|
// The results may not include the sizes of recently written data.
|
||
|
func (db *DB) SizeOf(ranges []util.Range) (Sizes, error) {
|
||
|
if err := db.ok(); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
v := db.s.version()
|
||
|
defer v.release()
|
||
|
|
||
|
sizes := make(Sizes, 0, len(ranges))
|
||
|
for _, r := range ranges {
|
||
|
imin := newIkey(r.Start, kMaxSeq, ktSeek)
|
||
|
imax := newIkey(r.Limit, kMaxSeq, ktSeek)
|
||
|
start, err := v.offsetOf(imin)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
limit, err := v.offsetOf(imax)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
var size uint64
|
||
|
if limit >= start {
|
||
|
size = limit - start
|
||
|
}
|
||
|
sizes = append(sizes, size)
|
||
|
}
|
||
|
|
||
|
return sizes, nil
|
||
|
}
|
||
|
|
||
|
// Close closes the DB. This will also releases any outstanding snapshot and
|
||
|
// abort any in-flight compaction.
|
||
|
//
|
||
|
// It is not safe to close a DB until all outstanding iterators are released.
|
||
|
// It is valid to call Close multiple times. Other methods should not be
|
||
|
// called after the DB has been closed.
|
||
|
func (db *DB) Close() error {
|
||
|
if !db.setClosed() {
|
||
|
return ErrClosed
|
||
|
}
|
||
|
|
||
|
start := time.Now()
|
||
|
db.log("db@close closing")
|
||
|
|
||
|
// Clear the finalizer.
|
||
|
runtime.SetFinalizer(db, nil)
|
||
|
|
||
|
// Get compaction error.
|
||
|
var err error
|
||
|
select {
|
||
|
case err = <-db.compErrC:
|
||
|
if err == ErrReadOnly {
|
||
|
err = nil
|
||
|
}
|
||
|
default:
|
||
|
}
|
||
|
|
||
|
// Signal all goroutines.
|
||
|
close(db.closeC)
|
||
|
|
||
|
// Wait for all gorotines to exit.
|
||
|
db.closeW.Wait()
|
||
|
|
||
|
// Lock writer and closes journal.
|
||
|
db.writeLockC <- struct{}{}
|
||
|
if db.journal != nil {
|
||
|
db.journal.Close()
|
||
|
db.journalWriter.Close()
|
||
|
}
|
||
|
|
||
|
if db.writeDelayN > 0 {
|
||
|
db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
|
||
|
}
|
||
|
|
||
|
// Close session.
|
||
|
db.s.close()
|
||
|
db.logf("db@close done T·%v", time.Since(start))
|
||
|
db.s.release()
|
||
|
|
||
|
if db.closer != nil {
|
||
|
if err1 := db.closer.Close(); err == nil {
|
||
|
err = err1
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// NIL'ing pointers.
|
||
|
db.s = nil
|
||
|
db.mem = nil
|
||
|
db.frozenMem = nil
|
||
|
db.journal = nil
|
||
|
db.journalWriter = nil
|
||
|
db.journalFile = nil
|
||
|
db.frozenJournalFile = nil
|
||
|
db.closer = nil
|
||
|
|
||
|
return err
|
||
|
}
|