Also consider on-disk fingerprints during purge.

This reintroduces LevelDB iterators so that we can iterate through all
the on-disk fingerprints.

Change-Id: I007ee4638d038d2a4461bbda27f30fcaad411474
This commit is contained in:
Julius Volz 2014-09-18 17:14:14 +02:00 committed by Bjoern Rabenstein
parent f5f9f3514a
commit 630b5a087a
5 changed files with 165 additions and 15 deletions

View file

@ -21,6 +21,7 @@ type KeyValueStore interface {
NewBatch() Batch
Commit(b Batch) error
ForEach(func(kv KeyValueAccessor) error) error
Close() error
}

View file

@ -4,16 +4,29 @@ import (
"encoding"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/cache"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/opt"
leveldb_cache "github.com/syndtr/goleveldb/leveldb/cache"
leveldb_filter "github.com/syndtr/goleveldb/leveldb/filter"
leveldb_iterator "github.com/syndtr/goleveldb/leveldb/iterator"
leveldb_opt "github.com/syndtr/goleveldb/leveldb/opt"
leveldb_util "github.com/syndtr/goleveldb/leveldb/util"
)
var (
keyspace = &leveldb_util.Range{
Start: nil,
Limit: nil,
}
iteratorOpts = &leveldb_opt.ReadOptions{
DontFillCache: true,
}
)
// LevelDB is a LevelDB-backed sorted KeyValueStore.
type LevelDB struct {
storage *leveldb.DB
readOpts *opt.ReadOptions
writeOpts *opt.WriteOptions
readOpts *leveldb_opt.ReadOptions
writeOpts *leveldb_opt.WriteOptions
}
// LevelDBOptions provides options for a LevelDB.
@ -25,10 +38,10 @@ type LevelDBOptions struct {
// NewLevelDB returns a newly allocated LevelDB-backed KeyValueStore ready to
// use.
func NewLevelDB(o LevelDBOptions) (KeyValueStore, error) {
options := &opt.Options{
Compression: opt.SnappyCompression,
BlockCache: cache.NewLRUCache(o.CacheSizeBytes),
Filter: filter.NewBloomFilter(10),
options := &leveldb_opt.Options{
Compression: leveldb_opt.SnappyCompression,
BlockCache: leveldb_cache.NewLRUCache(o.CacheSizeBytes),
Filter: leveldb_filter.NewBloomFilter(10),
}
storage, err := leveldb.OpenFile(o.Path, options)
@ -38,8 +51,8 @@ func NewLevelDB(o LevelDBOptions) (KeyValueStore, error) {
return &LevelDB{
storage: storage,
readOpts: &opt.ReadOptions{},
writeOpts: &opt.WriteOptions{},
readOpts: &leveldb_opt.ReadOptions{},
writeOpts: &leveldb_opt.WriteOptions{},
}, nil
}
@ -106,6 +119,26 @@ func (l *LevelDB) Commit(b Batch) error {
return l.storage.Write(b.(*LevelDBBatch).batch, l.writeOpts)
}
func (l *LevelDB) ForEach(cb func(kv KeyValueAccessor) error) error {
it, err := l.NewIterator(true)
if err != nil {
return err
}
defer it.Close()
for valid := it.SeekToFirst(); valid; valid = it.Next() {
if err = it.Error(); err != nil {
return err
}
if err := cb(it); err != nil {
return err
}
}
return nil
}
// LevelDBBatch is a Batch implementation for LevelDB.
type LevelDBBatch struct {
batch *leveldb.Batch
@ -139,3 +172,89 @@ func (b *LevelDBBatch) Delete(key encoding.BinaryMarshaler) error {
func (b *LevelDBBatch) Reset() {
b.batch.Reset()
}
// levelDBIterator implements Iterator.
type levelDBIterator struct {
it leveldb_iterator.Iterator
}
func (i *levelDBIterator) Error() error {
return i.it.Error()
}
func (i *levelDBIterator) Valid() bool {
return i.it.Valid()
}
func (i *levelDBIterator) SeekToFirst() bool {
return i.it.First()
}
func (i *levelDBIterator) SeekToLast() bool {
return i.it.Last()
}
func (i *levelDBIterator) Seek(k encoding.BinaryMarshaler) bool {
key, err := k.MarshalBinary()
if err != nil {
panic(err)
}
return i.it.Seek(key)
}
func (i *levelDBIterator) Next() bool {
return i.it.Next()
}
func (i *levelDBIterator) Previous() bool {
return i.it.Prev()
}
func (i *levelDBIterator) Key(key encoding.BinaryUnmarshaler) error {
return key.UnmarshalBinary(i.it.Key())
}
func (i *levelDBIterator) Value(value encoding.BinaryUnmarshaler) error {
return value.UnmarshalBinary(i.it.Value())
}
func (*levelDBIterator) Close() error {
return nil
}
type snapshottedIterator struct {
levelDBIterator
snap *leveldb.Snapshot
}
func (i *snapshottedIterator) Close() error {
i.snap.Release()
return nil
}
// newIterator creates a new LevelDB iterator which is optionally based on a
// snapshot of the current DB state.
//
// For each of the iterator methods that have a return signature of (ok bool),
// if ok == false, the iterator may not be used any further and must be closed.
// Further work with the database requires the creation of a new iterator.
func (l *LevelDB) NewIterator(snapshotted bool) (Iterator, error) {
if !snapshotted {
return &levelDBIterator{
it: l.storage.NewIterator(keyspace, iteratorOpts),
}, nil
}
snap, err := l.storage.GetSnapshot()
if err != nil {
return nil, err
}
return &snapshottedIterator{
levelDBIterator: levelDBIterator{
it: snap.NewIterator(keyspace, iteratorOpts),
},
snap: snap,
}, nil
}

View file

@ -78,6 +78,9 @@ type Persistence interface {
// GetLabelValuesForLabelName returns the label values for the given
// label name.
GetLabelValuesForLabelName(clientmodel.LabelName) (clientmodel.LabelValues, error)
// GetFingerprintsModifiedBefore returns the fingerprints whose timeseries
// have live samples before the provided timestamp.
GetFingerprintsModifiedBefore(clientmodel.Timestamp) ([]clientmodel.Fingerprint, error)
// IndexMetric indexes the given metric for the needs of
// GetFingerprintsForLabelPair and GetLabelValuesForLabelName.

View file

@ -95,6 +95,25 @@ func (p *diskPersistence) GetLabelValuesForLabelName(ln clientmodel.LabelName) (
return lvs, nil
}
func (p *diskPersistence) GetFingerprintsModifiedBefore(beforeTime clientmodel.Timestamp) ([]clientmodel.Fingerprint, error) {
var fp codec.CodableFingerprint
var tr codec.CodableTimeRange
fps := []clientmodel.Fingerprint{}
p.archivedFingerprintToTimeRange.ForEach(func(kv index.KeyValueAccessor) error {
if err := kv.Value(&tr); err != nil {
return err
}
if tr.First.Before(beforeTime) {
if err := kv.Key(&fp); err != nil {
return err
}
fps = append(fps, clientmodel.Fingerprint(fp))
}
return nil
})
return fps, nil
}
func (p *diskPersistence) PersistChunk(fp clientmodel.Fingerprint, c chunk) error {
// 1. Open chunk file.
f, err := p.openChunkFileForWriting(fp)

View file

@ -291,10 +291,15 @@ func (s *memorySeriesStorage) purgePeriodically(stop <-chan bool) {
ts := clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.persistenceRetentionPeriod)
// TODO: Add archived fps:
// - Add iterator interface for KeyValueStore.
// - Iterate over s.persistence.archivedFingerprintToTimeRange.
// - If timeRange extends before ts, add fp to fps.
// TODO: If we decide not to remove entries from the timerange disk index
// upon unarchival, we could remove the memory copy above and only use
// the fingerprints from the disk index.
persistedFPs, err := s.persistence.GetFingerprintsModifiedBefore(ts)
if err != nil {
glog.Error("Failed to lookup persisted fingerprint ranges: ", err)
break
}
fps = append(fps, persistedFPs...)
for _, fp := range fps {
select {
@ -302,6 +307,9 @@ func (s *memorySeriesStorage) purgePeriodically(stop <-chan bool) {
glog.Info("Interrupted running series purge.")
return
default:
// TODO: Decide whether we also want to adjust the timerange index
// entries here. Not updating them shouldn't break anything, but will
// make some scenarios less efficient.
s.purgeSeries(fp, ts)
}
}