Handle fingerprint collisions.

This commit is contained in:
beorn7 2015-05-06 16:53:12 +02:00
parent b404ad5c91
commit 2235cec175
9 changed files with 861 additions and 38 deletions

View file

@ -84,6 +84,19 @@ func EncodeVarint(w io.Writer, i int64) (int, error) {
return bytesWritten, err return bytesWritten, err
} }
// EncodeUvarint encodes an uint64 as a varint and writes it to an io.Writer.
// It returns the number of bytes written.
// This is a GC-friendly implementation that takes the required staging buffer
// from a buffer pool.
func EncodeUvarint(w io.Writer, i uint64) (int, error) {
buf := getBuf(binary.MaxVarintLen64)
defer putBuf(buf)
bytesWritten := binary.PutUvarint(buf, i)
_, err := w.Write(buf[:bytesWritten])
return bytesWritten, err
}
// EncodeUint64 writes an uint64 to an io.Writer in big-endian byte-order. // EncodeUint64 writes an uint64 to an io.Writer in big-endian byte-order.
// This is a GC-friendly implementation that takes the required staging buffer // This is a GC-friendly implementation that takes the required staging buffer
// from a buffer pool. // from a buffer pool.

179
storage/local/mapper.go Normal file
View file

@ -0,0 +1,179 @@
package local
import (
"fmt"
"sort"
"strings"
"sync"
"sync/atomic"
"github.com/golang/glog"
clientmodel "github.com/prometheus/client_golang/model"
)
const maxMappedFP = 1 << 30 // About 1B fingerprints reserved for mapping.
var separatorString = string([]byte{clientmodel.SeparatorByte})
// fpMappings maps original fingerprints to a map of string representations of
// metrics to the truly unique fingerprint.
type fpMappings map[clientmodel.Fingerprint]map[string]clientmodel.Fingerprint
// fpMapper is used to map fingerprints in order to work around fingerprint
// collisions.
type fpMapper struct {
mtx sync.RWMutex // Protects collisions.
mappings fpMappings
fpToSeries *seriesMap
p *persistence
highestMappedFP clientmodel.Fingerprint
}
// newFPMapper loads the collision map from the persistence and
// returns an fpCollisionResolver ready to use.
func newFPMapper(fpToSeries *seriesMap, p *persistence) (*fpMapper, error) {
r := &fpMapper{
fpToSeries: fpToSeries,
p: p,
}
mappings, nextFP, err := p.loadFPMappings()
if err != nil {
return nil, err
}
r.mappings = mappings
r.highestMappedFP = nextFP
return r, nil
}
// mapFP takes a raw fingerprint (as returned by Metrics.FastFingerprint) and
// returns a truly unique fingerprint. The caller must have locked the raw
// fingerprint.
//
// If an error is encountered, it is returned together with the unchanged raw
// fingerprint.
func (r *fpMapper) mapFP(fp clientmodel.Fingerprint, m clientmodel.Metric) (clientmodel.Fingerprint, error) {
// First check if we are in the reserved FP space, in which case this is
// automatically a collision that has to be mapped.
if fp <= maxMappedFP {
return r.maybeAddMapping(fp, m)
}
// Then check the most likely case: This fp belongs to a series that is
// already in memory.
s, ok := r.fpToSeries.get(fp)
if ok {
// FP exists in memory, but is it for the same metric?
if m.Equal(s.metric) {
// Yupp. We are done.
return fp, nil
}
// Collision detected!
return r.maybeAddMapping(fp, m)
}
// Metric is not in memory. Check the archive next.
archivedMetric, err := r.p.getArchivedMetric(fp)
if err != nil {
return fp, err
}
if archivedMetric != nil {
// FP exists in archive, but is it for the same metric?
if m.Equal(archivedMetric) {
// Yupp. We are done.
return fp, nil
}
// Collision detected!
return r.maybeAddMapping(fp, m)
}
// The fingerprint is genuinely new. We might have mapped it
// historically, though. so we need to check the collisions map.
r.mtx.RLock()
mappedFPs, ok := r.mappings[fp]
r.mtx.RUnlock()
if !ok {
// No historical mapping, we are good.
return fp, nil
}
// We indeed have mapped fp historically.
ms := metricToUniqueString(m)
// fp is locked by the caller, so no further locking of 'collisions'
// required (it is specific to fp).
mappedFP, ok := mappedFPs[ms]
if ok {
// Historical mapping found, return the mapped FP.
return mappedFP, nil
}
// As fp does not exist, neither in memory nor in archive, we can safely
// keep it unmapped.
return fp, nil
}
// maybeAddMapping is only used internally. It takes a detected collision and
// adds it to the collisions map if not yet there. In any case, it returns the
// truly unique fingerprint for the colliding metric.
func (r *fpMapper) maybeAddMapping(
fp clientmodel.Fingerprint,
collidingMetric clientmodel.Metric,
) (clientmodel.Fingerprint, error) {
ms := metricToUniqueString(collidingMetric)
r.mtx.RLock()
mappedFPs, ok := r.mappings[fp]
r.mtx.RUnlock()
if ok {
// fp is locked by the caller, so no further locking required.
mappedFP, ok := mappedFPs[ms]
if ok {
return mappedFP, nil // Existing mapping.
}
// A new mapping has to be created.
mappedFP = r.nextMappedFP()
mappedFPs[ms] = mappedFP
r.mtx.RLock()
// Checkpoint mappings after each change.
err := r.p.checkpointFPMappings(r.mappings)
r.mtx.RUnlock()
glog.Infof(
"Collision detected for fingerprint %v, metric %v, mapping to new fingerprint %v.",
fp, collidingMetric, mappedFP,
)
return mappedFP, err
}
// This is the first collision for fp.
mappedFP := r.nextMappedFP()
mappedFPs = map[string]clientmodel.Fingerprint{ms: mappedFP}
r.mtx.Lock()
r.mappings[fp] = mappedFPs
// Checkpoint mappings after each change.
err := r.p.checkpointFPMappings(r.mappings)
r.mtx.Unlock()
glog.Infof(
"Collision detected for fingerprint %v, metric %v, mapping to new fingerprint %v.",
fp, collidingMetric, mappedFP,
)
return mappedFP, err
}
func (r *fpMapper) nextMappedFP() clientmodel.Fingerprint {
mappedFP := clientmodel.Fingerprint(atomic.AddUint64((*uint64)(&r.highestMappedFP), 1))
if mappedFP > maxMappedFP {
panic(fmt.Errorf("more than %v fingerprints mapped in collision detection", maxMappedFP))
}
return mappedFP
}
// metricToUniqueString turns a metric into a string in a reproducible and
// unique way, i.e. the same metric will always create the same string, and
// different metrics will always create different strings. In a way, it is the
// "ideal" fingerprint function, only that it is more expensive than the
// FastFingerprint function, and its result is not suitable as a key for maps
// and indexes as it might become really large, causing a lot of hashing effort
// in maps and a lot of storage overhead in indexes.
func metricToUniqueString(m clientmodel.Metric) string {
parts := make([]string, 0, len(m))
for ln, lv := range m {
parts = append(parts, string(ln)+separatorString+string(lv))
}
sort.Strings(parts)
return strings.Join(parts, separatorString)
}

View file

@ -0,0 +1,400 @@
package local
import (
"testing"
clientmodel "github.com/prometheus/client_golang/model"
)
var (
// cm11, cm12, cm13 are colliding with fp1.
// cm21, cm22 are colliding with fp2.
// cm31, cm32 are colliding with fp3, which is below maxMappedFP.
// Note that fingerprints are set and not actually calculated.
// The collision detection is independent from the actually used
// fingerprinting algorithm.
fp1 = clientmodel.Fingerprint(maxMappedFP + 1)
fp2 = clientmodel.Fingerprint(maxMappedFP + 2)
fp3 = clientmodel.Fingerprint(1)
cm11 = clientmodel.Metric{
"foo": "bar",
"dings": "bumms",
}
cm12 = clientmodel.Metric{
"bar": "foo",
}
cm13 = clientmodel.Metric{
"foo": "bar",
}
cm21 = clientmodel.Metric{
"foo": "bumms",
"dings": "bar",
}
cm22 = clientmodel.Metric{
"dings": "foo",
"bar": "bumms",
}
cm31 = clientmodel.Metric{
"bumms": "dings",
}
cm32 = clientmodel.Metric{
"bumms": "dings",
"bar": "foo",
}
)
func TestFPMapper(t *testing.T) {
sm := newSeriesMap()
p, closer := newTestPersistence(t, 1)
defer closer.Close()
mapper, err := newFPMapper(sm, p)
if err != nil {
t.Fatal(err)
}
// Everything is empty, resolving a FP should do nothing.
gotFP, err := mapper.mapFP(fp1, cm11)
if err != nil {
t.Fatal(err)
}
if wantFP := fp1; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp1, cm12)
if err != nil {
t.Fatal(err)
}
if wantFP := fp1; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
// cm11 is in sm. Adding cm11 should do nothing. Mapping cm12 should resolve
// the collision.
sm.put(fp1, &memorySeries{metric: cm11})
gotFP, err = mapper.mapFP(fp1, cm11)
if err != nil {
t.Fatal(err)
}
if wantFP := fp1; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp1, cm12)
if err != nil {
t.Fatal(err)
}
if wantFP := clientmodel.Fingerprint(1); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
// The mapped cm12 is added to sm, too. That should not change the outcome.
sm.put(clientmodel.Fingerprint(1), &memorySeries{metric: cm12})
gotFP, err = mapper.mapFP(fp1, cm11)
if err != nil {
t.Fatal(err)
}
if wantFP := fp1; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp1, cm12)
if err != nil {
t.Fatal(err)
}
if wantFP := clientmodel.Fingerprint(1); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
// Now map cm13, should reproducibly result in the next mapped FP.
gotFP, err = mapper.mapFP(fp1, cm13)
if err != nil {
t.Fatal(err)
}
if wantFP := clientmodel.Fingerprint(2); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp1, cm13)
if err != nil {
t.Fatal(err)
}
if wantFP := clientmodel.Fingerprint(2); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
// Add cm13 to sm. Should not change anything.
sm.put(clientmodel.Fingerprint(2), &memorySeries{metric: cm13})
gotFP, err = mapper.mapFP(fp1, cm11)
if err != nil {
t.Fatal(err)
}
if wantFP := fp1; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp1, cm12)
if err != nil {
t.Fatal(err)
}
if wantFP := clientmodel.Fingerprint(1); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp1, cm13)
if err != nil {
t.Fatal(err)
}
if wantFP := clientmodel.Fingerprint(2); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
// Now add cm21 and cm22 in the same way, checking the mapped FPs.
gotFP, err = mapper.mapFP(fp2, cm21)
if err != nil {
t.Fatal(err)
}
if wantFP := fp2; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
sm.put(fp2, &memorySeries{metric: cm21})
gotFP, err = mapper.mapFP(fp2, cm21)
if err != nil {
t.Fatal(err)
}
if wantFP := fp2; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp2, cm22)
if err != nil {
t.Fatal(err)
}
if wantFP := clientmodel.Fingerprint(3); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
sm.put(clientmodel.Fingerprint(3), &memorySeries{metric: cm22})
gotFP, err = mapper.mapFP(fp2, cm21)
if err != nil {
t.Fatal(err)
}
if wantFP := fp2; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp2, cm22)
if err != nil {
t.Fatal(err)
}
if wantFP := clientmodel.Fingerprint(3); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
// Map cm31, resulting in a mapping straight away.
gotFP, err = mapper.mapFP(fp3, cm31)
if err != nil {
t.Fatal(err)
}
if wantFP := clientmodel.Fingerprint(4); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
sm.put(clientmodel.Fingerprint(4), &memorySeries{metric: cm31})
// Map cm32, which is now mapped for two reasons...
gotFP, err = mapper.mapFP(fp3, cm32)
if err != nil {
t.Fatal(err)
}
if wantFP := clientmodel.Fingerprint(5); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
sm.put(clientmodel.Fingerprint(5), &memorySeries{metric: cm32})
// Now check ALL the mappings, just to be sure.
gotFP, err = mapper.mapFP(fp1, cm11)
if err != nil {
t.Fatal(err)
}
if wantFP := fp1; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp1, cm12)
if err != nil {
t.Fatal(err)
}
if wantFP := clientmodel.Fingerprint(1); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp1, cm13)
if err != nil {
t.Fatal(err)
}
if wantFP := clientmodel.Fingerprint(2); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp2, cm21)
if err != nil {
t.Fatal(err)
}
if wantFP := fp2; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp2, cm22)
if err != nil {
t.Fatal(err)
}
if wantFP := clientmodel.Fingerprint(3); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp3, cm31)
if err != nil {
t.Fatal(err)
}
if wantFP := clientmodel.Fingerprint(4); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp3, cm32)
if err != nil {
t.Fatal(err)
}
if wantFP := clientmodel.Fingerprint(5); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
// Remove all the fingerprints from sm, which should change nothing, as
// the existing mappings stay and should be detected.
sm.del(fp1)
sm.del(fp2)
sm.del(fp3)
gotFP, err = mapper.mapFP(fp1, cm11)
if err != nil {
t.Fatal(err)
}
if wantFP := fp1; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp1, cm12)
if err != nil {
t.Fatal(err)
}
if wantFP := clientmodel.Fingerprint(1); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp1, cm13)
if err != nil {
t.Fatal(err)
}
if wantFP := clientmodel.Fingerprint(2); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp2, cm21)
if err != nil {
t.Fatal(err)
}
if wantFP := fp2; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp2, cm22)
if err != nil {
t.Fatal(err)
}
if wantFP := clientmodel.Fingerprint(3); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp3, cm31)
if err != nil {
t.Fatal(err)
}
if wantFP := clientmodel.Fingerprint(4); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp3, cm32)
if err != nil {
t.Fatal(err)
}
if wantFP := clientmodel.Fingerprint(5); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
// Load the mapper anew from disk and then check all the mappings again
// to make sure all changes have made it to disk.
mapper, err = newFPMapper(sm, p)
if err != nil {
t.Fatal(err)
}
gotFP, err = mapper.mapFP(fp1, cm11)
if err != nil {
t.Fatal(err)
}
if wantFP := fp1; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp1, cm12)
if err != nil {
t.Fatal(err)
}
if wantFP := clientmodel.Fingerprint(1); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp1, cm13)
if err != nil {
t.Fatal(err)
}
if wantFP := clientmodel.Fingerprint(2); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp2, cm21)
if err != nil {
t.Fatal(err)
}
if wantFP := fp2; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp2, cm22)
if err != nil {
t.Fatal(err)
}
if wantFP := clientmodel.Fingerprint(3); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp3, cm31)
if err != nil {
t.Fatal(err)
}
if wantFP := clientmodel.Fingerprint(4); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp3, cm32)
if err != nil {
t.Fatal(err)
}
if wantFP := clientmodel.Fingerprint(5); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
// To make sure that the mapping layer is not queried if the FP is found
// either in sm or in the archive, now put fp1 with cm12 in sm and fp2
// with cm22 into archive (which will never happen in practice as only
// mapped FPs are put into sm and the archive.
sm.put(fp1, &memorySeries{metric: cm12})
p.archiveMetric(fp2, cm22, 0, 0)
gotFP, err = mapper.mapFP(fp1, cm12)
if err != nil {
t.Fatal(err)
}
if wantFP := fp1; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
gotFP, err = mapper.mapFP(fp2, cm22)
if err != nil {
t.Fatal(err)
}
if wantFP := fp2; gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
// If we now map cm21, we should get a mapping as the collision with the
// archived metric is detected.
gotFP, err = mapper.mapFP(fp2, cm21)
if err != nil {
t.Fatal(err)
}
if wantFP := clientmodel.Fingerprint(6); gotFP != wantFP {
t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP)
}
}

View file

@ -55,6 +55,11 @@ const (
headsFormatLegacyVersion = 1 // Can read, but will never write. headsFormatLegacyVersion = 1 // Can read, but will never write.
headsMagicString = "PrometheusHeads" headsMagicString = "PrometheusHeads"
mappingsFileName = "mappings.db"
mappingsTempFileName = "mappings.db.tmp"
mappingsFormatVersion = 1
mappingsMagicString = "PrometheusMappings"
dirtyFileName = "DIRTY" dirtyFileName = "DIRTY"
fileBufSize = 1 << 16 // 64kiB. fileBufSize = 1 << 16 // 64kiB.
@ -1278,6 +1283,14 @@ func (p *persistence) headsTempFileName() string {
return path.Join(p.basePath, headsTempFileName) return path.Join(p.basePath, headsTempFileName)
} }
func (p *persistence) mappingsFileName() string {
return path.Join(p.basePath, mappingsFileName)
}
func (p *persistence) mappingsTempFileName() string {
return path.Join(p.basePath, mappingsTempFileName)
}
func (p *persistence) processIndexingQueue() { func (p *persistence) processIndexingQueue() {
batchSize := 0 batchSize := 0
nameToValues := index.LabelNameLabelValuesMapping{} nameToValues := index.LabelNameLabelValuesMapping{}
@ -1383,6 +1396,157 @@ loop:
close(p.indexingStopped) close(p.indexingStopped)
} }
// checkpointFPMappings persists the fingerprint mappings. This method is not
// goroutine-safe.
//
// Description of the file format, v1:
//
// (1) Magic string (const mappingsMagicString).
//
// (2) Uvarint-encoded format version (const mappingsFormatVersion).
//
// (3) Uvarint-encoded number of mappings in fpMappings.
//
// (4) Repeated once per mapping:
//
// (4.1) The raw fingerprint as big-endian uint64.
//
// (4.2) The uvarint-encoded number of sub-mappings for the raw fingerprint.
//
// (4.3) Repeated once per sub-mapping:
//
// (4.3.1) The uvarint-encoded length of the unique metric string.
// (4.3.2) The unique metric string.
// (4.3.3) The mapped fingerprint as big-endian uint64.
func (p *persistence) checkpointFPMappings(c fpMappings) (err error) {
glog.Info("Checkpointing fingerprint mappings...")
begin := time.Now()
f, err := os.OpenFile(p.mappingsTempFileName(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
if err != nil {
return
}
defer func() {
f.Sync()
closeErr := f.Close()
if err != nil {
return
}
err = closeErr
if err != nil {
return
}
err = os.Rename(p.mappingsTempFileName(), p.mappingsFileName())
duration := time.Since(begin)
glog.Infof("Done checkpointing fingerprint mappings in %v.", duration)
}()
w := bufio.NewWriterSize(f, fileBufSize)
if _, err = w.WriteString(mappingsMagicString); err != nil {
return
}
if _, err = codable.EncodeUvarint(w, mappingsFormatVersion); err != nil {
return
}
if _, err = codable.EncodeUvarint(w, uint64(len(c))); err != nil {
return
}
for fp, mappings := range c {
if err = codable.EncodeUint64(w, uint64(fp)); err != nil {
return
}
if _, err = codable.EncodeUvarint(w, uint64(len(mappings))); err != nil {
return
}
for ms, mappedFP := range mappings {
if _, err = codable.EncodeUvarint(w, uint64(len(ms))); err != nil {
return
}
if _, err = w.WriteString(ms); err != nil {
return
}
if err = codable.EncodeUint64(w, uint64(mappedFP)); err != nil {
return
}
}
}
err = w.Flush()
return
}
// loadFPMappings loads the fingerprint mappings. It also returns the highest
// mapped fingerprint and any error encountered. If p.mappingsFileName is not
// found, the method returns (fpMappings{}, 0, nil). Do not call concurrently
// with checkpointFPMappings.
func (p *persistence) loadFPMappings() (fpMappings, clientmodel.Fingerprint, error) {
fpm := fpMappings{}
var highestMappedFP clientmodel.Fingerprint
f, err := os.Open(p.mappingsFileName())
if os.IsNotExist(err) {
return fpm, 0, nil
}
if err != nil {
return nil, 0, err
}
defer f.Close()
r := bufio.NewReaderSize(f, fileBufSize)
buf := make([]byte, len(mappingsMagicString))
if _, err := io.ReadFull(r, buf); err != nil {
return nil, 0, err
}
magic := string(buf)
if magic != mappingsMagicString {
return nil, 0, fmt.Errorf(
"unexpected magic string, want %q, got %q",
mappingsMagicString, magic,
)
}
version, err := binary.ReadUvarint(r)
if version != mappingsFormatVersion || err != nil {
return nil, 0, fmt.Errorf("unknown fingerprint mappings format version, want %d", mappingsFormatVersion)
}
numRawFPs, err := binary.ReadUvarint(r)
if err != nil {
return nil, 0, err
}
for ; numRawFPs > 0; numRawFPs-- {
rawFP, err := codable.DecodeUint64(r)
if err != nil {
return nil, 0, err
}
numMappings, err := binary.ReadUvarint(r)
if err != nil {
return nil, 0, err
}
mappings := make(map[string]clientmodel.Fingerprint, numMappings)
for ; numMappings > 0; numMappings-- {
lenMS, err := binary.ReadUvarint(r)
if err != nil {
return nil, 0, err
}
buf := make([]byte, lenMS)
if _, err := io.ReadFull(r, buf); err != nil {
return nil, 0, err
}
fp, err := codable.DecodeUint64(r)
if err != nil {
return nil, 0, err
}
mappedFP := clientmodel.Fingerprint(fp)
if mappedFP > highestMappedFP {
highestMappedFP = mappedFP
}
mappings[string(buf)] = mappedFP
}
fpm[clientmodel.Fingerprint(rawFP)] = mappings
}
return fpm, highestMappedFP, nil
}
func offsetForChunkIndex(i int) int64 { func offsetForChunkIndex(i int) int64 {
return int64(i * chunkLenWithHeader) return int64(i * chunkLenWithHeader)
} }

View file

@ -484,6 +484,36 @@ func TestCheckpointAndLoadSeriesMapAndHeadsChunkType1(t *testing.T) {
testCheckpointAndLoadSeriesMapAndHeads(t, 1) testCheckpointAndLoadSeriesMapAndHeads(t, 1)
} }
func TestCheckpointAndLoadFPMappings(t *testing.T) {
p, closer := newTestPersistence(t, 1)
defer closer.Close()
in := fpMappings{
1: map[string]clientmodel.Fingerprint{
"foo": 1,
"bar": 2,
},
3: map[string]clientmodel.Fingerprint{
"baz": 4,
},
}
if err := p.checkpointFPMappings(in); err != nil {
t.Fatal(err)
}
out, fp, err := p.loadFPMappings()
if err != nil {
t.Fatal(err)
}
if got, want := fp, clientmodel.Fingerprint(4); got != want {
t.Errorf("got highest FP %v, want %v", got, want)
}
if !reflect.DeepEqual(in, out) {
t.Errorf("got collision map %v, want %v", out, in)
}
}
func testGetFingerprintsModifiedBefore(t *testing.T, encoding chunkEncoding) { func testGetFingerprintsModifiedBefore(t *testing.T, encoding chunkEncoding) {
p, closer := newTestPersistence(t, encoding) p, closer := newTestPersistence(t, encoding)
defer closer.Close() defer closer.Close()

View file

@ -301,7 +301,9 @@ func (s *memorySeries) dropChunks(t clientmodel.Timestamp) {
} }
// preloadChunks is an internal helper method. // preloadChunks is an internal helper method.
func (s *memorySeries) preloadChunks(indexes []int, mss *memorySeriesStorage) ([]*chunkDesc, error) { func (s *memorySeries) preloadChunks(
indexes []int, fp clientmodel.Fingerprint, mss *memorySeriesStorage,
) ([]*chunkDesc, error) {
loadIndexes := []int{} loadIndexes := []int{}
pinnedChunkDescs := make([]*chunkDesc, 0, len(indexes)) pinnedChunkDescs := make([]*chunkDesc, 0, len(indexes))
for _, idx := range indexes { for _, idx := range indexes {
@ -318,7 +320,6 @@ func (s *memorySeries) preloadChunks(indexes []int, mss *memorySeriesStorage) ([
if s.chunkDescsOffset == -1 { if s.chunkDescsOffset == -1 {
panic("requested loading chunks from persistence in a situation where we must not have persisted data for chunk descriptors in memory") panic("requested loading chunks from persistence in a situation where we must not have persisted data for chunk descriptors in memory")
} }
fp := s.metric.FastFingerprint() // TODO(beorn): Handle collisions.
chunks, err := mss.loadChunks(fp, loadIndexes, s.chunkDescsOffset) chunks, err := mss.loadChunks(fp, loadIndexes, s.chunkDescsOffset)
if err != nil { if err != nil {
// Unpin the chunks since we won't return them as pinned chunks now. // Unpin the chunks since we won't return them as pinned chunks now.
@ -409,7 +410,7 @@ func (s *memorySeries) preloadChunksForRange(
for i := fromIdx; i <= throughIdx; i++ { for i := fromIdx; i <= throughIdx; i++ {
pinIndexes = append(pinIndexes, i) pinIndexes = append(pinIndexes, i)
} }
return s.preloadChunks(pinIndexes, mss) return s.preloadChunks(pinIndexes, fp, mss)
} }
// newIterator returns a new SeriesIterator. The caller must have locked the // newIterator returns a new SeriesIterator. The caller must have locked the

View file

@ -93,6 +93,7 @@ type memorySeriesStorage struct {
degraded bool degraded bool
persistence *persistence persistence *persistence
mapper *fpMapper
evictList *list.List evictList *list.List
evictRequests chan evictRequest evictRequests chan evictRequest
@ -211,6 +212,12 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
glog.Infof("%d series loaded.", s.fpToSeries.length()) glog.Infof("%d series loaded.", s.fpToSeries.length())
s.numSeries.Set(float64(s.fpToSeries.length())) s.numSeries.Set(float64(s.fpToSeries.length()))
mapper, err := newFPMapper(s.fpToSeries, p)
if err != nil {
return nil, err
}
s.mapper = mapper
return s, nil return s, nil
} }
@ -382,8 +389,18 @@ func (s *memorySeriesStorage) Append(sample *clientmodel.Sample) {
} }
glog.Warning("Sample ingestion resumed.") glog.Warning("Sample ingestion resumed.")
} }
fp := sample.Metric.FastFingerprint() // TODO(beorn): Handle collisions. rawFP := sample.Metric.FastFingerprint()
s.fpLocker.Lock(fp) s.fpLocker.Lock(rawFP)
fp, err := s.mapper.mapFP(rawFP, sample.Metric)
if err != nil {
glog.Errorf("Error while mapping fingerprint %v: %v", rawFP, err)
s.persistence.setDirty(true)
}
if fp != rawFP {
// Switch locks.
s.fpLocker.Unlock(rawFP)
s.fpLocker.Lock(fp)
}
series := s.getOrCreateSeries(fp, sample.Metric) series := s.getOrCreateSeries(fp, sample.Metric)
completedChunksCount := series.add(&metric.SamplePair{ completedChunksCount := series.add(&metric.SamplePair{
Value: sample.Value, Value: sample.Value,

View file

@ -201,8 +201,8 @@ func testChunk(t *testing.T, encoding chunkEncoding) {
} }
s.WaitForIndexing() s.WaitForIndexing()
for m := range s.(*memorySeriesStorage).fpToSeries.iter() { for m := range s.fpToSeries.iter() {
s.(*memorySeriesStorage).fpLocker.Lock(m.fp) s.fpLocker.Lock(m.fp)
var values metric.Values var values metric.Values
for _, cd := range m.series.chunkDescs { for _, cd := range m.series.chunkDescs {
@ -222,7 +222,7 @@ func testChunk(t *testing.T, encoding chunkEncoding) {
t.Errorf("%d. Got %v; want %v", i, v.Value, samples[i].Value) t.Errorf("%d. Got %v; want %v", i, v.Value, samples[i].Value)
} }
} }
s.(*memorySeriesStorage).fpLocker.Unlock(m.fp) s.fpLocker.Unlock(m.fp)
} }
glog.Info("test done, closing") glog.Info("test done, closing")
} }
@ -491,8 +491,6 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
s, closer := NewTestStorage(t, encoding) s, closer := NewTestStorage(t, encoding)
defer closer.Close() defer closer.Close()
ms := s.(*memorySeriesStorage) // Going to test the internal maintain.*Series methods.
for _, sample := range samples { for _, sample := range samples {
s.Append(sample) s.Append(sample)
} }
@ -501,7 +499,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
fp := clientmodel.Metric{}.FastFingerprint() fp := clientmodel.Metric{}.FastFingerprint()
// Drop ~half of the chunks. // Drop ~half of the chunks.
ms.maintainMemorySeries(fp, 1000) s.maintainMemorySeries(fp, 1000)
it := s.NewIterator(fp) it := s.NewIterator(fp)
actual := it.GetBoundaryValues(metric.Interval{ actual := it.GetBoundaryValues(metric.Interval{
OldestInclusive: 0, OldestInclusive: 0,
@ -519,7 +517,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
} }
// Drop everything. // Drop everything.
ms.maintainMemorySeries(fp, 10000) s.maintainMemorySeries(fp, 10000)
it = s.NewIterator(fp) it = s.NewIterator(fp)
actual = it.GetBoundaryValues(metric.Interval{ actual = it.GetBoundaryValues(metric.Interval{
OldestInclusive: 0, OldestInclusive: 0,
@ -535,24 +533,24 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
} }
s.WaitForIndexing() s.WaitForIndexing()
series, ok := ms.fpToSeries.get(fp) series, ok := s.fpToSeries.get(fp)
if !ok { if !ok {
t.Fatal("could not find series") t.Fatal("could not find series")
} }
// Persist head chunk so we can safely archive. // Persist head chunk so we can safely archive.
series.headChunkClosed = true series.headChunkClosed = true
ms.maintainMemorySeries(fp, clientmodel.Earliest) s.maintainMemorySeries(fp, clientmodel.Earliest)
// Archive metrics. // Archive metrics.
ms.fpToSeries.del(fp) s.fpToSeries.del(fp)
if err := ms.persistence.archiveMetric( if err := s.persistence.archiveMetric(
fp, series.metric, series.firstTime(), series.head().lastTime(), fp, series.metric, series.firstTime(), series.head().lastTime(),
); err != nil { ); err != nil {
t.Fatal(err) t.Fatal(err)
} }
archived, _, _, err := ms.persistence.hasArchivedMetric(fp) archived, _, _, err := s.persistence.hasArchivedMetric(fp)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -561,8 +559,8 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
} }
// Drop ~half of the chunks of an archived series. // Drop ~half of the chunks of an archived series.
ms.maintainArchivedSeries(fp, 1000) s.maintainArchivedSeries(fp, 1000)
archived, _, _, err = ms.persistence.hasArchivedMetric(fp) archived, _, _, err = s.persistence.hasArchivedMetric(fp)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -571,8 +569,8 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
} }
// Drop everything. // Drop everything.
ms.maintainArchivedSeries(fp, 10000) s.maintainArchivedSeries(fp, 10000)
archived, _, _, err = ms.persistence.hasArchivedMetric(fp) archived, _, _, err = s.persistence.hasArchivedMetric(fp)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -586,24 +584,24 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
} }
s.WaitForIndexing() s.WaitForIndexing()
series, ok = ms.fpToSeries.get(fp) series, ok = s.fpToSeries.get(fp)
if !ok { if !ok {
t.Fatal("could not find series") t.Fatal("could not find series")
} }
// Persist head chunk so we can safely archive. // Persist head chunk so we can safely archive.
series.headChunkClosed = true series.headChunkClosed = true
ms.maintainMemorySeries(fp, clientmodel.Earliest) s.maintainMemorySeries(fp, clientmodel.Earliest)
// Archive metrics. // Archive metrics.
ms.fpToSeries.del(fp) s.fpToSeries.del(fp)
if err := ms.persistence.archiveMetric( if err := s.persistence.archiveMetric(
fp, series.metric, series.firstTime(), series.head().lastTime(), fp, series.metric, series.firstTime(), series.head().lastTime(),
); err != nil { ); err != nil {
t.Fatal(err) t.Fatal(err)
} }
archived, _, _, err = ms.persistence.hasArchivedMetric(fp) archived, _, _, err = s.persistence.hasArchivedMetric(fp)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -612,13 +610,13 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
} }
// Unarchive metrics. // Unarchive metrics.
ms.getOrCreateSeries(fp, clientmodel.Metric{}) s.getOrCreateSeries(fp, clientmodel.Metric{})
series, ok = ms.fpToSeries.get(fp) series, ok = s.fpToSeries.get(fp)
if !ok { if !ok {
t.Fatal("could not find series") t.Fatal("could not find series")
} }
archived, _, _, err = ms.persistence.hasArchivedMetric(fp) archived, _, _, err = s.persistence.hasArchivedMetric(fp)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -628,8 +626,8 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
// This will archive again, but must not drop it completely, despite the // This will archive again, but must not drop it completely, despite the
// memorySeries being empty. // memorySeries being empty.
ms.maintainMemorySeries(fp, 1000) s.maintainMemorySeries(fp, 1000)
archived, _, _, err = ms.persistence.hasArchivedMetric(fp) archived, _, _, err = s.persistence.hasArchivedMetric(fp)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -751,11 +749,11 @@ func benchmarkFuzz(b *testing.B, encoding chunkEncoding) {
for _, sample := range samples[start:middle] { for _, sample := range samples[start:middle] {
s.Append(sample) s.Append(sample)
} }
verifyStorage(b, s, samples[:middle], o.PersistenceRetentionPeriod) verifyStorage(b, s.(*memorySeriesStorage), samples[:middle], o.PersistenceRetentionPeriod)
for _, sample := range samples[middle:end] { for _, sample := range samples[middle:end] {
s.Append(sample) s.Append(sample)
} }
verifyStorage(b, s, samples[:end], o.PersistenceRetentionPeriod) verifyStorage(b, s.(*memorySeriesStorage), samples[:end], o.PersistenceRetentionPeriod)
} }
} }
@ -823,7 +821,25 @@ func createRandomSamples(metricName string, minLen int) clientmodel.Samples {
} }
) )
result := clientmodel.Samples{} // Prefill resust with two samples with colliding metrics (to test fingerprint mapping).
result := clientmodel.Samples{
&clientmodel.Sample{
Metric: clientmodel.Metric{
"instance": "ip-10-33-84-73.l05.ams5.s-cloud.net:24483",
"status": "503",
},
Value: 42,
Timestamp: timestamp,
},
&clientmodel.Sample{
Metric: clientmodel.Metric{
"instance": "ip-10-33-84-73.l05.ams5.s-cloud.net:24480",
"status": "500",
},
Value: 2010,
Timestamp: timestamp + 1,
},
}
metrics := []clientmodel.Metric{} metrics := []clientmodel.Metric{}
for n := rand.Intn(maxMetrics); n >= 0; n-- { for n := rand.Intn(maxMetrics); n >= 0; n-- {
@ -885,7 +901,7 @@ func createRandomSamples(metricName string, minLen int) clientmodel.Samples {
return result return result
} }
func verifyStorage(t testing.TB, s Storage, samples clientmodel.Samples, maxAge time.Duration) bool { func verifyStorage(t testing.TB, s *memorySeriesStorage, samples clientmodel.Samples, maxAge time.Duration) bool {
s.WaitForIndexing() s.WaitForIndexing()
result := true result := true
for _, i := range rand.Perm(len(samples)) { for _, i := range rand.Perm(len(samples)) {
@ -896,7 +912,10 @@ func verifyStorage(t testing.TB, s Storage, samples clientmodel.Samples, maxAge
// retention period, we can verify here that no results // retention period, we can verify here that no results
// are returned. // are returned.
} }
fp := sample.Metric.FastFingerprint() fp, err := s.mapper.mapFP(sample.Metric.FastFingerprint(), sample.Metric)
if err != nil {
t.Fatal(err)
}
p := s.NewPreloader() p := s.NewPreloader()
p.PreloadRange(fp, sample.Timestamp, sample.Timestamp, time.Hour) p.PreloadRange(fp, sample.Timestamp, sample.Timestamp, time.Hour)
found := s.NewIterator(fp).GetValueAtTime(sample.Timestamp) found := s.NewIterator(fp).GetValueAtTime(sample.Timestamp)

View file

@ -37,7 +37,7 @@ func (t *testStorageCloser) Close() {
// NewTestStorage creates a storage instance backed by files in a temporary // NewTestStorage creates a storage instance backed by files in a temporary
// directory. The returned storage is already in serving state. Upon closing the // directory. The returned storage is already in serving state. Upon closing the
// returned test.Closer, the temporary directory is cleaned up. // returned test.Closer, the temporary directory is cleaned up.
func NewTestStorage(t test.T, encoding chunkEncoding) (Storage, test.Closer) { func NewTestStorage(t test.T, encoding chunkEncoding) (*memorySeriesStorage, test.Closer) {
*defaultChunkEncoding = int(encoding) *defaultChunkEncoding = int(encoding)
directory := test.NewTemporaryDirectory("test_storage", t) directory := test.NewTemporaryDirectory("test_storage", t)
o := &MemorySeriesStorageOptions{ o := &MemorySeriesStorageOptions{
@ -61,5 +61,5 @@ func NewTestStorage(t test.T, encoding chunkEncoding) (Storage, test.Closer) {
directory: directory, directory: directory,
} }
return storage, closer return storage.(*memorySeriesStorage), closer
} }