diff --git a/storage/interface.go b/storage/interface.go new file mode 100644 index 000000000..0c3c722cc --- /dev/null +++ b/storage/interface.go @@ -0,0 +1,63 @@ +// Copyright 2012 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +// RecordDecoder decodes each key-value pair in the database. The protocol +// around it makes the assumption that the underlying implementation is +// concurrency safe. +type RecordDecoder interface { + DecodeKey(in interface{}) (out interface{}, err error) + DecodeValue(in interface{}) (out interface{}, err error) +} + +// FilterResult describes the record matching and scanning behavior for the +// database. +type FilterResult int + +const ( + // Stop scanning the database. + STOP FilterResult = iota + // Skip this record but continue scanning. + SKIP + // Accept this record for the Operator. + ACCEPT +) + +type OperatorErrorType int + +type OperatorError struct { + error + Continuable bool +} + +// Filter is responsible for controlling the behavior of the database scan +// process and determines the disposition of various records. +// +// The protocol around it makes the assumption that the underlying +// implementation is concurrency safe. +type RecordFilter interface { + // Filter receives the key and value as decoded from the RecordDecoder type. + Filter(key, value interface{}) (filterResult FilterResult) +} + +// RecordOperator is responsible for taking action upon each entity that is +// passed to it. +// +// The protocol around it makes the assumption that the underlying +// implementation is concurrency safe. +type RecordOperator interface { + // Take action on a given record. If the action returns an error, the entire + // scan process stops. + Operate(key, value interface{}) (err *OperatorError) +} diff --git a/storage/raw/interface.go b/storage/raw/interface.go index 5f2539bcc..67103ba2b 100644 --- a/storage/raw/interface.go +++ b/storage/raw/interface.go @@ -15,6 +15,7 @@ package raw import ( "github.com/prometheus/prometheus/coding" + "github.com/prometheus/prometheus/storage" ) type Pair struct { @@ -22,11 +23,25 @@ type Pair struct { Right []byte } +type EachFunc func(pair *Pair) + type Persistence interface { Has(key coding.Encoder) (bool, error) Get(key coding.Encoder) ([]byte, error) - GetAll() ([]Pair, error) Drop(key coding.Encoder) error Put(key, value coding.Encoder) error Close() error + + // ForEach is responsible for iterating through all records in the database + // until one of the following conditions are met: + // + // 1.) A system anomaly in the database scan. + // 2.) The last record in the database is reached. + // 3.) A FilterResult of STOP is emitted by the Filter. + // + // Decoding errors for an entity cause that entity to be skipped. + ForEach(decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) (scannedEntireCorpus bool, err error) + + // Pending removal. + GetAll() ([]Pair, error) } diff --git a/storage/raw/leveldb/leveldb.go b/storage/raw/leveldb/leveldb.go index 31dfaf84f..fffb2d4a3 100644 --- a/storage/raw/leveldb/leveldb.go +++ b/storage/raw/leveldb/leveldb.go @@ -17,6 +17,7 @@ import ( "flag" "github.com/jmhodges/levigo" "github.com/prometheus/prometheus/coding" + "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/raw" "io" ) @@ -230,3 +231,44 @@ func (l *LevelDBPersistence) GetIterator() (i *levigo.Iterator, c io.Closer, err return } + +func (l *LevelDBPersistence) ForEach(decoder storage.RecordDecoder, filter storage.RecordFilter, operator storage.RecordOperator) (scannedEntireCorpus bool, err error) { + iterator, closer, err := l.GetIterator() + if err != nil { + return + } + defer closer.Close() + + for iterator.SeekToFirst(); iterator.Valid(); iterator.Next() { + err = iterator.GetError() + if err != nil { + return + } + + decodedKey, decodeErr := decoder.DecodeKey(iterator.Key()) + if decodeErr != nil { + continue + } + decodedValue, decodeErr := decoder.DecodeValue(iterator.Value()) + if decodeErr != nil { + continue + } + + switch filter.Filter(decodedKey, decodedValue) { + case storage.STOP: + return + case storage.SKIP: + continue + case storage.ACCEPT: + opErr := operator.Operate(decodedKey, decodedValue) + if opErr != nil { + if opErr.Continuable { + continue + } + break + } + } + } + scannedEntireCorpus = true + return +}