Merge pull request #682 from prometheus/beorn7/fingerprint

The desperately awaited collision detection.
This commit is contained in:
Björn Rabenstein 2015-05-11 17:15:43 +02:00
commit 2e8a50649b
10 changed files with 909 additions and 46 deletions

View file

@ -84,6 +84,19 @@ func EncodeVarint(w io.Writer, i int64) (int, error) {
return bytesWritten, err
}
// EncodeUvarint writes i to w in the unsigned varint encoding produced by
// binary.PutUvarint. It returns the length of the encoding together with any
// error reported by w.Write.
//
// The staging buffer is borrowed from the buffer pool and handed back on
// return to keep the garbage collector out of the picture.
func EncodeUvarint(w io.Writer, i uint64) (int, error) {
	staging := getBuf(binary.MaxVarintLen64)
	defer putBuf(staging)

	n := binary.PutUvarint(staging, i)
	if _, err := w.Write(staging[:n]); err != nil {
		return n, err
	}
	return n, nil
}
// EncodeUint64 writes an uint64 to an io.Writer in big-endian byte-order.
// This is a GC-friendly implementation that takes the required staging buffer
// from a buffer pool.

View file

@ -42,6 +42,12 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Finge
count := 0
seriesDirNameFmt := fmt.Sprintf("%%0%dx", seriesDirNameLen)
// Delete the fingerprint mapping file as it might be stale or
// corrupt. We'll rebuild the mappings as we go.
os.Remove(p.mappingsFileName())
// The mappings to rebuild.
fpm := fpMappings{}
glog.Info("Scanning files.")
for i := 0; i < 1<<(seriesDirNameLen*4); i++ {
dirname := path.Join(p.basePath, fmt.Sprintf(seriesDirNameFmt, i))
@ -58,7 +64,7 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Finge
return err
}
for _, fi := range fis {
fp, ok := p.sanitizeSeries(dirname, fi, fingerprintToSeries)
fp, ok := p.sanitizeSeries(dirname, fi, fingerprintToSeries, fpm)
if ok {
fpsSeen[fp] = struct{}{}
}
@ -75,7 +81,7 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Finge
for fp, s := range fingerprintToSeries {
if _, seen := fpsSeen[fp]; !seen {
// fp exists in fingerprintToSeries, but has no representation on disk.
if s.headChunkClosed {
if s.persistWatermark == len(s.chunkDescs) {
// Oops, everything including the head chunk was
// already persisted, but nothing on disk.
// Thus, we lost that series completely. Clean
@ -112,17 +118,24 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Finge
s.persistWatermark = 0
s.chunkDescsOffset = 0
}
maybeAddMapping(fp, s.metric, fpm)
fpsSeen[fp] = struct{}{} // Add so that fpsSeen is complete.
}
}
glog.Info("Check for series without series file complete.")
if err := p.cleanUpArchiveIndexes(fingerprintToSeries, fpsSeen); err != nil {
if err := p.cleanUpArchiveIndexes(fingerprintToSeries, fpsSeen, fpm); err != nil {
return err
}
if err := p.rebuildLabelIndexes(fingerprintToSeries); err != nil {
return err
}
// Finally rewrite the mappings file if there are any mappings.
if len(fpm) > 0 {
if err := p.checkpointFPMappings(fpm); err != nil {
return err
}
}
p.setDirty(false)
glog.Warning("Crash recovery complete.")
@ -156,7 +169,9 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[clientmodel.Finge
// is checked for its presence in the index of archived series. If it cannot
// be found there, it is moved into the orphaned directory.
func (p *persistence) sanitizeSeries(
dirname string, fi os.FileInfo, fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries,
dirname string, fi os.FileInfo,
fingerprintToSeries map[clientmodel.Fingerprint]*memorySeries,
fpm fpMappings,
) (clientmodel.Fingerprint, bool) {
filename := path.Join(dirname, fi.Name())
purge := func() {
@ -222,6 +237,7 @@ func (p *persistence) sanitizeSeries(
if s == nil {
panic("fingerprint mapped to nil pointer")
}
maybeAddMapping(fp, s.metric, fpm)
if !p.pedanticChecks &&
bytesToTrim == 0 &&
s.chunkDescsOffset != -1 &&
@ -320,12 +336,14 @@ func (p *persistence) sanitizeSeries(
return fp, false
}
// This series looks like a properly archived one.
maybeAddMapping(fp, metric, fpm)
return fp, true
}
func (p *persistence) cleanUpArchiveIndexes(
fpToSeries map[clientmodel.Fingerprint]*memorySeries,
fpsSeen map[clientmodel.Fingerprint]struct{},
fpm fpMappings,
) error {
glog.Info("Cleaning up archive indexes.")
var fp codable.Fingerprint
@ -359,7 +377,12 @@ func (p *persistence) cleanUpArchiveIndexes(
_, err := p.archivedFingerprintToTimeRange.Delete(fp)
return err
}
// fp is legitimately archived. Make sure it is in timerange index, too.
// fp is legitimately archived. Now we need the metric to check for a mapped fingerprint.
if err := kv.Value(&m); err != nil {
return err
}
maybeAddMapping(clientmodel.Fingerprint(fp), clientmodel.Metric(m), fpm)
// Make sure it is in timerange index, too.
has, err := p.archivedFingerprintToTimeRange.Has(fp)
if err != nil {
return err
@ -372,9 +395,6 @@ func (p *persistence) cleanUpArchiveIndexes(
if _, err := p.archivedFingerprintToMetrics.Delete(fp); err != nil {
return err
}
if err := kv.Value(&m); err != nil {
return err
}
series := newMemorySeries(clientmodel.Metric(m), false, clientmodel.Earliest)
cds, err := p.loadChunkDescs(clientmodel.Fingerprint(fp), clientmodel.Now())
if err != nil {
@ -455,3 +475,20 @@ func (p *persistence) rebuildLabelIndexes(
glog.Info("All requests for rebuilding the label indexes queued. (Actual processing may lag behind.)")
return nil
}
// maybeAddMapping records in fpm that metric m is stored under fingerprint fp
// whenever fp differs from m.FastFingerprint(). If both are equal, the metric
// is not mapped and fpm is left untouched.
func maybeAddMapping(fp clientmodel.Fingerprint, m clientmodel.Metric, fpm fpMappings) {
	rawFP := m.FastFingerprint()
	if rawFP == fp {
		// Not mapped, nothing to record.
		return
	}
	glog.Warningf(
		"Metric %v with fingerprint %v is mapped from raw fingerprint %v.",
		m, fp, rawFP,
	)
	// Lazily create the sub-map for the raw fingerprint.
	byMetric, found := fpm[rawFP]
	if !found {
		byMetric = map[string]clientmodel.Fingerprint{}
		fpm[rawFP] = byMetric
	}
	byMetric[metricToUniqueString(m)] = fp
}

179
storage/local/mapper.go Normal file
View file

@ -0,0 +1,179 @@
package local
import (
"fmt"
"sort"
"strings"
"sync"
"sync/atomic"
"github.com/golang/glog"
clientmodel "github.com/prometheus/client_golang/model"
)
// maxMappedFP is the upper bound of the fingerprint range reserved for mapped
// fingerprints. mapFP treats every raw fingerprint at or below this value as
// a collision that must be mapped, and nextMappedFP panics once it would hand
// out a fingerprint above it.
const maxMappedFP = 1 << 20 // About 1M fingerprints reserved for mapping.
// separatorString is clientmodel.SeparatorByte as a string. It is used by
// metricToUniqueString to separate label names, label values, and label pairs.
var separatorString = string([]byte{clientmodel.SeparatorByte})
// fpMappings maps original (raw) fingerprints to a map of string
// representations of metrics to the truly unique fingerprint. The string keys
// of the inner map are the ones created by metricToUniqueString.
type fpMappings map[clientmodel.Fingerprint]map[string]clientmodel.Fingerprint
// fpMapper is used to map fingerprints in order to work around fingerprint
// collisions. Instances are created with newFPMapper; mapFP is the entry
// point for resolving a raw fingerprint.
type fpMapper struct {
	mtx sync.RWMutex // Protects mappings.
	// mappings holds, per raw fingerprint, the mapped fingerprints of the
	// colliding metrics.
	mappings fpMappings
	// fpToSeries is consulted by mapFP to find out whether a fingerprint
	// already belongs to a series in memory.
	fpToSeries *seriesMap
	// p is used to look up archived metrics and to load and checkpoint the
	// mappings.
	p *persistence
	// highestMappedFP is the most recently assigned mapped fingerprint. It
	// is incremented atomically by nextMappedFP.
	highestMappedFP clientmodel.Fingerprint
}
// newFPMapper creates an fpMapper backed by the given series map and
// persistence. The existing mappings and the highest mapped fingerprint are
// loaded via p.loadFPMappings; if that fails, the error is returned and no
// mapper is created.
func newFPMapper(fpToSeries *seriesMap, p *persistence) (*fpMapper, error) {
	loaded, highestFP, err := p.loadFPMappings()
	if err != nil {
		return nil, err
	}
	return &fpMapper{
		mappings:        loaded,
		fpToSeries:      fpToSeries,
		p:               p,
		highestMappedFP: highestFP,
	}, nil
}
// mapFP resolves a raw fingerprint (as returned by Metric.FastFingerprint)
// for metric m into a truly unique fingerprint. The caller must hold the lock
// for the raw fingerprint.
//
// The lookup order is: reserved fingerprint range, series in memory, existing
// mappings, archived metrics. If the archive lookup fails, the error is
// returned together with the unchanged raw fingerprint.
func (r *fpMapper) mapFP(fp clientmodel.Fingerprint, m clientmodel.Metric) (clientmodel.Fingerprint, error) {
	// Raw fingerprints inside the reserved range always have to be mapped.
	if fp <= maxMappedFP {
		return r.maybeAddMapping(fp, m)
	}

	// Most likely case: fp belongs to a series in memory.
	if series, inMemory := r.fpToSeries.get(fp); inMemory {
		if !m.Equal(series.metric) {
			// Same fingerprint, different metric: a collision.
			return r.maybeAddMapping(fp, m)
		}
		return fp, nil
	}

	// Not in memory. Consult the existing mappings before paying for the
	// expensive archive lookup.
	r.mtx.RLock()
	known, hasMappings := r.mappings[fp]
	r.mtx.RUnlock()
	if hasMappings {
		// The sub-map is specific to fp, which the caller has locked, so it
		// is read here without holding r.mtx.
		if mapped, found := known[metricToUniqueString(m)]; found {
			return mapped, nil
		}
	}

	// Neither in memory nor mapped for m. The archive has the final say.
	archived, err := r.p.getArchivedMetric(fp)
	switch {
	case err != nil:
		return fp, err
	case archived == nil:
		// fp is unknown everywhere, so it can safely stay unmapped.
		return fp, nil
	case m.Equal(archived):
		// fp is archived for exactly this metric.
		return fp, nil
	default:
		// fp is archived for a different metric: a collision.
		return r.maybeAddMapping(fp, m)
	}
}
// maybeAddMapping is only used internally. It takes a detected collision and
// adds it to the mappings if not yet there. In any case, it returns the truly
// unique fingerprint for the colliding metric. A newly created mapping is
// checkpointed immediately; the checkpoint error (if any) is returned
// together with the new mapped fingerprint.
//
// The caller must hold the lock for fp. That is sufficient for looking up an
// existing entry in the sub-map of fp. Adding an entry and checkpointing,
// however, happen under the write lock of r.mtx: checkpointFPMappings is not
// goroutine-safe and iterates over all sub-maps, so it must neither run
// concurrently with itself nor with a modification of any sub-map.
func (r *fpMapper) maybeAddMapping(
	fp clientmodel.Fingerprint,
	collidingMetric clientmodel.Metric,
) (clientmodel.Fingerprint, error) {
	ms := metricToUniqueString(collidingMetric)
	r.mtx.RLock()
	mappedFPs, ok := r.mappings[fp]
	r.mtx.RUnlock()
	if ok {
		// fp is locked by the caller, so nobody else adds entries to this
		// sub-map while we read it.
		if mappedFP, ok := mappedFPs[ms]; ok {
			return mappedFP, nil // Existing mapping.
		}
	}

	// A new mapping has to be created.
	mappedFP := r.nextMappedFP()
	r.mtx.Lock()
	if ok {
		mappedFPs[ms] = mappedFP
	} else {
		// This is the first collision for fp.
		r.mappings[fp] = map[string]clientmodel.Fingerprint{ms: mappedFP}
	}
	// Checkpoint mappings after each change.
	err := r.p.checkpointFPMappings(r.mappings)
	r.mtx.Unlock()
	glog.Infof(
		"Collision detected for fingerprint %v, metric %v, mapping to new fingerprint %v.",
		fp, collidingMetric, mappedFP,
	)
	return mappedFP, err
}
// nextMappedFP atomically increments r.highestMappedFP and returns the new
// value. It panics if that value is above maxMappedFP.
func (r *fpMapper) nextMappedFP() clientmodel.Fingerprint {
	counter := (*uint64)(&r.highestMappedFP)
	next := clientmodel.Fingerprint(atomic.AddUint64(counter, 1))
	if next <= maxMappedFP {
		return next
	}
	panic(fmt.Errorf("more than %v fingerprints mapped in collision detection", maxMappedFP))
}
// metricToUniqueString renders a metric as a string in a reproducible way:
// each label becomes "name<sep>value", the resulting pairs are sorted, and
// the sorted pairs are joined with the same separator. The same metric thus
// always yields the same string, and the string is meant to differ for
// different metrics. It acts as an "ideal" fingerprint that is more expensive
// than FastFingerprint and may grow too large to be a good key for maps and
// indexes.
func metricToUniqueString(m clientmodel.Metric) string {
	pairs := make([]string, len(m))
	idx := 0
	for name, value := range m {
		pairs[idx] = string(name) + separatorString + string(value)
		idx++
	}
	// Map iteration order is random, sorting makes the result reproducible.
	sort.Strings(pairs)
	return strings.Join(pairs, separatorString)
}

View file

@ -0,0 +1,403 @@
package local
import (
"testing"
clientmodel "github.com/prometheus/client_golang/model"
)
var (
	// Fixtures for TestFPMapper.
	//
	// cm11, cm12, cm13 are colliding with fp1.
	// cm21, cm22 are colliding with fp2.
	// cm31, cm32 are colliding with fp3, which is below maxMappedFP.
	// Note that fingerprints are set and not actually calculated.
	// The collision detection is independent from the actually used
	// fingerprinting algorithm.

	// fp1 and fp2 are just above the reserved range, fp3 is inside it.
	fp1 = clientmodel.Fingerprint(maxMappedFP + 1)
	fp2 = clientmodel.Fingerprint(maxMappedFP + 2)
	fp3 = clientmodel.Fingerprint(1)
	// Metrics pretending to have raw fingerprint fp1.
	cm11 = clientmodel.Metric{
		"foo": "bar",
		"dings": "bumms",
	}
	cm12 = clientmodel.Metric{
		"bar": "foo",
	}
	cm13 = clientmodel.Metric{
		"foo": "bar",
	}
	// Metrics pretending to have raw fingerprint fp2.
	cm21 = clientmodel.Metric{
		"foo": "bumms",
		"dings": "bar",
	}
	cm22 = clientmodel.Metric{
		"dings": "foo",
		"bar": "bumms",
	}
	// Metrics pretending to have raw fingerprint fp3.
	cm31 = clientmodel.Metric{
		"bumms": "dings",
	}
	cm32 = clientmodel.Metric{
		"bumms": "dings",
		"bar": "foo",
	}
)
// TestFPMapper walks an fpMapper through a staged sequence of collisions and
// checks the fingerprint returned by mapFP after every step. The raw
// fingerprints are set by hand (see fp1, fp2, fp3), so the test does not
// depend on the fingerprinting algorithm.
func TestFPMapper(t *testing.T) {
	sm := newSeriesMap()

	p, closer := newTestPersistence(t, 1)
	defer closer.Close()

	mapper, err := newFPMapper(sm, p)
	if err != nil {
		t.Fatal(err)
	}

	// expectMapping maps m under rawFP with the current mapper and reports
	// a deviation from wantFP. step names the phase of the test in failure
	// messages.
	expectMapping := func(
		step string,
		rawFP clientmodel.Fingerprint,
		m clientmodel.Metric,
		wantFP clientmodel.Fingerprint,
	) {
		gotFP, err := mapper.mapFP(rawFP, m)
		if err != nil {
			t.Fatalf("%s: %v", step, err)
		}
		if gotFP != wantFP {
			t.Errorf("%s: got fingerprint %v, want fingerprint %v", step, gotFP, wantFP)
		}
	}

	// allMappings is the complete expected state once every fixture metric
	// has been mapped.
	allMappings := []struct {
		rawFP  clientmodel.Fingerprint
		m      clientmodel.Metric
		wantFP clientmodel.Fingerprint
	}{
		{fp1, cm11, fp1},
		{fp1, cm12, 1},
		{fp1, cm13, 2},
		{fp2, cm21, fp2},
		{fp2, cm22, 3},
		{fp3, cm31, 4},
		{fp3, cm32, 5},
	}
	expectAllMappings := func(step string) {
		for _, tc := range allMappings {
			expectMapping(step, tc.rawFP, tc.m, tc.wantFP)
		}
	}

	// Everything is empty, resolving a FP should do nothing.
	expectMapping("empty", fp1, cm11, fp1)
	expectMapping("empty", fp1, cm12, fp1)

	// cm11 is in sm. Adding cm11 should do nothing. Mapping cm12 should resolve
	// the collision.
	sm.put(fp1, &memorySeries{metric: cm11})
	expectMapping("cm11 in sm", fp1, cm11, fp1)
	expectMapping("cm11 in sm", fp1, cm12, 1)

	// The mapped cm12 is added to sm, too. That should not change the outcome.
	sm.put(clientmodel.Fingerprint(1), &memorySeries{metric: cm12})
	expectMapping("cm12 in sm", fp1, cm11, fp1)
	expectMapping("cm12 in sm", fp1, cm12, 1)

	// Now map cm13, should reproducibly result in the next mapped FP.
	expectMapping("map cm13", fp1, cm13, 2)
	expectMapping("map cm13 again", fp1, cm13, 2)

	// Add cm13 to sm. Should not change anything.
	sm.put(clientmodel.Fingerprint(2), &memorySeries{metric: cm13})
	expectMapping("cm13 in sm", fp1, cm11, fp1)
	expectMapping("cm13 in sm", fp1, cm12, 1)
	expectMapping("cm13 in sm", fp1, cm13, 2)

	// Now add cm21 and cm22 in the same way, checking the mapped FPs.
	expectMapping("cm21 not in sm", fp2, cm21, fp2)
	sm.put(fp2, &memorySeries{metric: cm21})
	expectMapping("cm21 in sm", fp2, cm21, fp2)
	expectMapping("cm21 in sm", fp2, cm22, 3)
	sm.put(clientmodel.Fingerprint(3), &memorySeries{metric: cm22})
	expectMapping("cm22 in sm", fp2, cm21, fp2)
	expectMapping("cm22 in sm", fp2, cm22, 3)

	// Map cm31, resulting in a mapping straight away.
	expectMapping("map cm31", fp3, cm31, 4)
	sm.put(clientmodel.Fingerprint(4), &memorySeries{metric: cm31})

	// Map cm32, which is now mapped for two reasons...
	expectMapping("map cm32", fp3, cm32, 5)
	sm.put(clientmodel.Fingerprint(5), &memorySeries{metric: cm32})

	// Now check ALL the mappings, just to be sure.
	expectAllMappings("all mapped")

	// Remove fp1, fp2, and fp3 from sm, which should change nothing, as
	// the existing mappings stay and should be detected.
	sm.del(fp1)
	sm.del(fp2)
	sm.del(fp3)
	expectAllMappings("after deleting from sm")

	// Load the mapper anew from disk and then check all the mappings again
	// to make sure all changes have made it to disk.
	mapper, err = newFPMapper(sm, p)
	if err != nil {
		t.Fatal(err)
	}
	expectAllMappings("after reloading from disk")

	// To make sure that the mapping layer is not queried if the FP is found
	// in sm but the mapping layer is queried before going to the archive,
	// now put fp1 with cm12 in sm and fp2 with cm22 into archive (which
	// will never happen in practice as only mapped FPs are put into sm and
	// the archive).
	sm.put(fp1, &memorySeries{metric: cm12})
	if err := p.archiveMetric(fp2, cm22, 0, 0); err != nil {
		t.Fatal(err)
	}
	expectMapping("raw FP in sm", fp1, cm12, fp1)       // No mapping happened.
	expectMapping("raw FP in archive", fp2, cm22, 3) // Old mapping still applied.

	// If we now map cm21, we should get a mapping as the collision with the
	// archived metric is detected. Again, this is a pathological situation
	// that must never happen in real operations. It's just staged here to
	// test the expected behavior.
	expectMapping("collision with archive", fp2, cm21, 6)
}

View file

@ -55,6 +55,11 @@ const (
headsFormatLegacyVersion = 1 // Can read, but will never write.
headsMagicString = "PrometheusHeads"
mappingsFileName = "mappings.db"
mappingsTempFileName = "mappings.db.tmp"
mappingsFormatVersion = 1
mappingsMagicString = "PrometheusMappings"
dirtyFileName = "DIRTY"
fileBufSize = 1 << 16 // 64kiB.
@ -1278,6 +1283,14 @@ func (p *persistence) headsTempFileName() string {
return path.Join(p.basePath, headsTempFileName)
}
// mappingsFileName returns the path of the fingerprint mappings file, i.e.
// the constant mappingsFileName joined onto p.basePath.
func (p *persistence) mappingsFileName() string {
	return path.Join(p.basePath, mappingsFileName)
}
// mappingsTempFileName returns the path of the temporary fingerprint mappings
// file, i.e. the constant mappingsTempFileName joined onto p.basePath.
// checkpointFPMappings writes to this file before renaming it to the path
// returned by mappingsFileName.
func (p *persistence) mappingsTempFileName() string {
	return path.Join(p.basePath, mappingsTempFileName)
}
func (p *persistence) processIndexingQueue() {
batchSize := 0
nameToValues := index.LabelNameLabelValuesMapping{}
@ -1383,6 +1396,157 @@ loop:
close(p.indexingStopped)
}
// checkpointFPMappings persists the fingerprint mappings. This method is not
// goroutine-safe.
//
// The mappings are written to a temporary file first. Only if writing,
// syncing, and closing that file all succeed, it is renamed to the final
// mappings file. On any error, the previous mappings file is left untouched
// and the error is returned.
//
// Description of the file format, v1:
//
// (1) Magic string (const mappingsMagicString).
//
// (2) Uvarint-encoded format version (const mappingsFormatVersion).
//
// (3) Uvarint-encoded number of mappings in fpMappings.
//
// (4) Repeated once per mapping:
//
// (4.1) The raw fingerprint as big-endian uint64.
//
// (4.2) The uvarint-encoded number of sub-mappings for the raw fingerprint.
//
// (4.3) Repeated once per sub-mapping:
//
// (4.3.1) The uvarint-encoded length of the unique metric string.
// (4.3.2) The unique metric string.
// (4.3.3) The mapped fingerprint as big-endian uint64.
func (p *persistence) checkpointFPMappings(fpm fpMappings) (err error) {
	glog.Info("Checkpointing fingerprint mappings...")
	begin := time.Now()
	f, err := os.OpenFile(p.mappingsTempFileName(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
	if err != nil {
		return err
	}

	defer func() {
		// Always sync and close the file, but let an earlier error win.
		syncErr := f.Sync()
		closeErr := f.Close()
		switch {
		case err != nil:
			return
		case syncErr != nil:
			// Never rename a file whose content might not have reached
			// the disk over the previous good mappings file.
			err = syncErr
			return
		case closeErr != nil:
			err = closeErr
			return
		}
		if err = os.Rename(p.mappingsTempFileName(), p.mappingsFileName()); err != nil {
			return
		}
		glog.Infof("Done checkpointing fingerprint mappings in %v.", time.Since(begin))
	}()

	w := bufio.NewWriterSize(f, fileBufSize)

	// (1) - (3): Header.
	if _, err = w.WriteString(mappingsMagicString); err != nil {
		return err
	}
	if _, err = codable.EncodeUvarint(w, mappingsFormatVersion); err != nil {
		return err
	}
	if _, err = codable.EncodeUvarint(w, uint64(len(fpm))); err != nil {
		return err
	}

	// (4): One record per raw fingerprint.
	for fp, mappings := range fpm {
		if err = codable.EncodeUint64(w, uint64(fp)); err != nil {
			return err
		}
		if _, err = codable.EncodeUvarint(w, uint64(len(mappings))); err != nil {
			return err
		}
		// (4.3): One record per colliding metric.
		for ms, mappedFP := range mappings {
			if _, err = codable.EncodeUvarint(w, uint64(len(ms))); err != nil {
				return err
			}
			if _, err = w.WriteString(ms); err != nil {
				return err
			}
			if err = codable.EncodeUint64(w, uint64(mappedFP)); err != nil {
				return err
			}
		}
	}
	// The deferred function syncs, closes, and renames after the flush.
	err = w.Flush()
	return err
}
// loadFPMappings loads the fingerprint mappings. It also returns the highest
// mapped fingerprint and any error encountered. If p.mappingsFileName is not
// found, the method returns (fpMappings{}, 0, nil). Do not call concurrently
// with checkpointFPMappings.
//
// The expected file format is the one written by checkpointFPMappings. A
// wrong magic string or format version results in an error, as does any read
// error (including a truncated file).
func (p *persistence) loadFPMappings() (fpMappings, clientmodel.Fingerprint, error) {
	fpm := fpMappings{}
	var highestMappedFP clientmodel.Fingerprint

	f, err := os.Open(p.mappingsFileName())
	if os.IsNotExist(err) {
		return fpm, 0, nil
	}
	if err != nil {
		return nil, 0, err
	}
	defer f.Close()
	r := bufio.NewReaderSize(f, fileBufSize)

	buf := make([]byte, len(mappingsMagicString))
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, 0, err
	}
	magic := string(buf)
	if magic != mappingsMagicString {
		return nil, 0, fmt.Errorf(
			"unexpected magic string, want %q, got %q",
			mappingsMagicString, magic,
		)
	}
	// Keep read errors and version mismatches apart so that an I/O problem
	// is not misreported as an unknown format version.
	version, err := binary.ReadUvarint(r)
	if err != nil {
		return nil, 0, err
	}
	if version != mappingsFormatVersion {
		return nil, 0, fmt.Errorf(
			"unknown fingerprint mappings format version, want %d, got %d",
			mappingsFormatVersion, version,
		)
	}
	numRawFPs, err := binary.ReadUvarint(r)
	if err != nil {
		return nil, 0, err
	}
	for ; numRawFPs > 0; numRawFPs-- {
		rawFP, err := codable.DecodeUint64(r)
		if err != nil {
			return nil, 0, err
		}
		numMappings, err := binary.ReadUvarint(r)
		if err != nil {
			return nil, 0, err
		}
		mappings := make(map[string]clientmodel.Fingerprint, numMappings)
		for ; numMappings > 0; numMappings-- {
			lenMS, err := binary.ReadUvarint(r)
			if err != nil {
				return nil, 0, err
			}
			buf := make([]byte, lenMS)
			if _, err := io.ReadFull(r, buf); err != nil {
				return nil, 0, err
			}
			fp, err := codable.DecodeUint64(r)
			if err != nil {
				return nil, 0, err
			}
			mappedFP := clientmodel.Fingerprint(fp)
			// Track the highest mapped fingerprint seen in the file.
			if mappedFP > highestMappedFP {
				highestMappedFP = mappedFP
			}
			mappings[string(buf)] = mappedFP
		}
		fpm[clientmodel.Fingerprint(rawFP)] = mappings
	}
	return fpm, highestMappedFP, nil
}
// offsetForChunkIndex returns i multiplied by chunkLenWithHeader as an int64.
// Both factors are widened to int64 before multiplying so that the product
// cannot overflow the platform-dependent int type on 32-bit systems.
func offsetForChunkIndex(i int) int64 {
	return int64(i) * int64(chunkLenWithHeader)
}

View file

@ -484,6 +484,36 @@ func TestCheckpointAndLoadSeriesMapAndHeadsChunkType1(t *testing.T) {
testCheckpointAndLoadSeriesMapAndHeads(t, 1)
}
// TestCheckpointAndLoadFPMappings checkpoints a small set of mappings, loads
// them back, and verifies that both the mappings and the reported highest
// mapped fingerprint survive the round trip.
func TestCheckpointAndLoadFPMappings(t *testing.T) {
	p, closer := newTestPersistence(t, 1)
	defer closer.Close()

	want := fpMappings{
		1: map[string]clientmodel.Fingerprint{
			"foo": 1,
			"bar": 2,
		},
		3: map[string]clientmodel.Fingerprint{
			"baz": 4,
		},
	}
	const wantHighestFP = clientmodel.Fingerprint(4)

	err := p.checkpointFPMappings(want)
	if err != nil {
		t.Fatal(err)
	}

	got, highestFP, err := p.loadFPMappings()
	if err != nil {
		t.Fatal(err)
	}
	if highestFP != wantHighestFP {
		t.Errorf("got highest FP %v, want %v", highestFP, wantHighestFP)
	}
	if !reflect.DeepEqual(want, got) {
		t.Errorf("got collision map %v, want %v", got, want)
	}
}
func testGetFingerprintsModifiedBefore(t *testing.T, encoding chunkEncoding) {
p, closer := newTestPersistence(t, encoding)
defer closer.Close()

View file

@ -301,7 +301,9 @@ func (s *memorySeries) dropChunks(t clientmodel.Timestamp) {
}
// preloadChunks is an internal helper method.
func (s *memorySeries) preloadChunks(indexes []int, mss *memorySeriesStorage) ([]*chunkDesc, error) {
func (s *memorySeries) preloadChunks(
indexes []int, fp clientmodel.Fingerprint, mss *memorySeriesStorage,
) ([]*chunkDesc, error) {
loadIndexes := []int{}
pinnedChunkDescs := make([]*chunkDesc, 0, len(indexes))
for _, idx := range indexes {
@ -318,7 +320,6 @@ func (s *memorySeries) preloadChunks(indexes []int, mss *memorySeriesStorage) ([
if s.chunkDescsOffset == -1 {
panic("requested loading chunks from persistence in a situation where we must not have persisted data for chunk descriptors in memory")
}
fp := s.metric.FastFingerprint() // TODO(beorn): Handle collisions.
chunks, err := mss.loadChunks(fp, loadIndexes, s.chunkDescsOffset)
if err != nil {
// Unpin the chunks since we won't return them as pinned chunks now.
@ -409,7 +410,7 @@ func (s *memorySeries) preloadChunksForRange(
for i := fromIdx; i <= throughIdx; i++ {
pinIndexes = append(pinIndexes, i)
}
return s.preloadChunks(pinIndexes, mss)
return s.preloadChunks(pinIndexes, fp, mss)
}
// newIterator returns a new SeriesIterator. The caller must have locked the

View file

@ -93,6 +93,7 @@ type memorySeriesStorage struct {
degraded bool
persistence *persistence
mapper *fpMapper
evictList *list.List
evictRequests chan evictRequest
@ -211,6 +212,12 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
glog.Infof("%d series loaded.", s.fpToSeries.length())
s.numSeries.Set(float64(s.fpToSeries.length()))
mapper, err := newFPMapper(s.fpToSeries, p)
if err != nil {
return nil, err
}
s.mapper = mapper
return s, nil
}
@ -382,8 +389,18 @@ func (s *memorySeriesStorage) Append(sample *clientmodel.Sample) {
}
glog.Warning("Sample ingestion resumed.")
}
fp := sample.Metric.FastFingerprint() // TODO(beorn): Handle collisions.
s.fpLocker.Lock(fp)
rawFP := sample.Metric.FastFingerprint()
s.fpLocker.Lock(rawFP)
fp, err := s.mapper.mapFP(rawFP, sample.Metric)
if err != nil {
glog.Errorf("Error while mapping fingerprint %v: %v", rawFP, err)
s.persistence.setDirty(true)
}
if fp != rawFP {
// Switch locks.
s.fpLocker.Unlock(rawFP)
s.fpLocker.Lock(fp)
}
series := s.getOrCreateSeries(fp, sample.Metric)
completedChunksCount := series.add(&metric.SamplePair{
Value: sample.Value,

View file

@ -201,8 +201,8 @@ func testChunk(t *testing.T, encoding chunkEncoding) {
}
s.WaitForIndexing()
for m := range s.(*memorySeriesStorage).fpToSeries.iter() {
s.(*memorySeriesStorage).fpLocker.Lock(m.fp)
for m := range s.fpToSeries.iter() {
s.fpLocker.Lock(m.fp)
var values metric.Values
for _, cd := range m.series.chunkDescs {
@ -222,7 +222,7 @@ func testChunk(t *testing.T, encoding chunkEncoding) {
t.Errorf("%d. Got %v; want %v", i, v.Value, samples[i].Value)
}
}
s.(*memorySeriesStorage).fpLocker.Unlock(m.fp)
s.fpLocker.Unlock(m.fp)
}
glog.Info("test done, closing")
}
@ -491,8 +491,6 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
s, closer := NewTestStorage(t, encoding)
defer closer.Close()
ms := s.(*memorySeriesStorage) // Going to test the internal maintain.*Series methods.
for _, sample := range samples {
s.Append(sample)
}
@ -501,7 +499,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
fp := clientmodel.Metric{}.FastFingerprint()
// Drop ~half of the chunks.
ms.maintainMemorySeries(fp, 1000)
s.maintainMemorySeries(fp, 1000)
it := s.NewIterator(fp)
actual := it.GetBoundaryValues(metric.Interval{
OldestInclusive: 0,
@ -519,7 +517,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
}
// Drop everything.
ms.maintainMemorySeries(fp, 10000)
s.maintainMemorySeries(fp, 10000)
it = s.NewIterator(fp)
actual = it.GetBoundaryValues(metric.Interval{
OldestInclusive: 0,
@ -535,24 +533,24 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
}
s.WaitForIndexing()
series, ok := ms.fpToSeries.get(fp)
series, ok := s.fpToSeries.get(fp)
if !ok {
t.Fatal("could not find series")
}
// Persist head chunk so we can safely archive.
series.headChunkClosed = true
ms.maintainMemorySeries(fp, clientmodel.Earliest)
s.maintainMemorySeries(fp, clientmodel.Earliest)
// Archive metrics.
ms.fpToSeries.del(fp)
if err := ms.persistence.archiveMetric(
s.fpToSeries.del(fp)
if err := s.persistence.archiveMetric(
fp, series.metric, series.firstTime(), series.head().lastTime(),
); err != nil {
t.Fatal(err)
}
archived, _, _, err := ms.persistence.hasArchivedMetric(fp)
archived, _, _, err := s.persistence.hasArchivedMetric(fp)
if err != nil {
t.Fatal(err)
}
@ -561,8 +559,8 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
}
// Drop ~half of the chunks of an archived series.
ms.maintainArchivedSeries(fp, 1000)
archived, _, _, err = ms.persistence.hasArchivedMetric(fp)
s.maintainArchivedSeries(fp, 1000)
archived, _, _, err = s.persistence.hasArchivedMetric(fp)
if err != nil {
t.Fatal(err)
}
@ -571,8 +569,8 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
}
// Drop everything.
ms.maintainArchivedSeries(fp, 10000)
archived, _, _, err = ms.persistence.hasArchivedMetric(fp)
s.maintainArchivedSeries(fp, 10000)
archived, _, _, err = s.persistence.hasArchivedMetric(fp)
if err != nil {
t.Fatal(err)
}
@ -586,24 +584,24 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
}
s.WaitForIndexing()
series, ok = ms.fpToSeries.get(fp)
series, ok = s.fpToSeries.get(fp)
if !ok {
t.Fatal("could not find series")
}
// Persist head chunk so we can safely archive.
series.headChunkClosed = true
ms.maintainMemorySeries(fp, clientmodel.Earliest)
s.maintainMemorySeries(fp, clientmodel.Earliest)
// Archive metrics.
ms.fpToSeries.del(fp)
if err := ms.persistence.archiveMetric(
s.fpToSeries.del(fp)
if err := s.persistence.archiveMetric(
fp, series.metric, series.firstTime(), series.head().lastTime(),
); err != nil {
t.Fatal(err)
}
archived, _, _, err = ms.persistence.hasArchivedMetric(fp)
archived, _, _, err = s.persistence.hasArchivedMetric(fp)
if err != nil {
t.Fatal(err)
}
@ -612,13 +610,13 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
}
// Unarchive metrics.
ms.getOrCreateSeries(fp, clientmodel.Metric{})
s.getOrCreateSeries(fp, clientmodel.Metric{})
series, ok = ms.fpToSeries.get(fp)
series, ok = s.fpToSeries.get(fp)
if !ok {
t.Fatal("could not find series")
}
archived, _, _, err = ms.persistence.hasArchivedMetric(fp)
archived, _, _, err = s.persistence.hasArchivedMetric(fp)
if err != nil {
t.Fatal(err)
}
@ -628,8 +626,8 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
// This will archive again, but must not drop it completely, despite the
// memorySeries being empty.
ms.maintainMemorySeries(fp, 1000)
archived, _, _, err = ms.persistence.hasArchivedMetric(fp)
s.maintainMemorySeries(fp, 1000)
archived, _, _, err = s.persistence.hasArchivedMetric(fp)
if err != nil {
t.Fatal(err)
}
@ -751,11 +749,11 @@ func benchmarkFuzz(b *testing.B, encoding chunkEncoding) {
for _, sample := range samples[start:middle] {
s.Append(sample)
}
verifyStorage(b, s, samples[:middle], o.PersistenceRetentionPeriod)
verifyStorage(b, s.(*memorySeriesStorage), samples[:middle], o.PersistenceRetentionPeriod)
for _, sample := range samples[middle:end] {
s.Append(sample)
}
verifyStorage(b, s, samples[:end], o.PersistenceRetentionPeriod)
verifyStorage(b, s.(*memorySeriesStorage), samples[:end], o.PersistenceRetentionPeriod)
}
}
@ -823,7 +821,25 @@ func createRandomSamples(metricName string, minLen int) clientmodel.Samples {
}
)
result := clientmodel.Samples{}
// Prefill result with two samples with colliding metrics (to test fingerprint mapping).
result := clientmodel.Samples{
&clientmodel.Sample{
Metric: clientmodel.Metric{
"instance": "ip-10-33-84-73.l05.ams5.s-cloud.net:24483",
"status": "503",
},
Value: 42,
Timestamp: timestamp,
},
&clientmodel.Sample{
Metric: clientmodel.Metric{
"instance": "ip-10-33-84-73.l05.ams5.s-cloud.net:24480",
"status": "500",
},
Value: 2010,
Timestamp: timestamp + 1,
},
}
metrics := []clientmodel.Metric{}
for n := rand.Intn(maxMetrics); n >= 0; n-- {
@ -885,7 +901,7 @@ func createRandomSamples(metricName string, minLen int) clientmodel.Samples {
return result
}
func verifyStorage(t testing.TB, s Storage, samples clientmodel.Samples, maxAge time.Duration) bool {
func verifyStorage(t testing.TB, s *memorySeriesStorage, samples clientmodel.Samples, maxAge time.Duration) bool {
s.WaitForIndexing()
result := true
for _, i := range rand.Perm(len(samples)) {
@ -896,7 +912,10 @@ func verifyStorage(t testing.TB, s Storage, samples clientmodel.Samples, maxAge
// retention period, we can verify here that no results
// are returned.
}
fp := sample.Metric.FastFingerprint()
fp, err := s.mapper.mapFP(sample.Metric.FastFingerprint(), sample.Metric)
if err != nil {
t.Fatal(err)
}
p := s.NewPreloader()
p.PreloadRange(fp, sample.Timestamp, sample.Timestamp, time.Hour)
found := s.NewIterator(fp).GetValueAtTime(sample.Timestamp)

View file

@ -37,7 +37,7 @@ func (t *testStorageCloser) Close() {
// NewTestStorage creates a storage instance backed by files in a temporary
// directory. The returned storage is already in serving state. Upon closing the
// returned test.Closer, the temporary directory is cleaned up.
func NewTestStorage(t test.T, encoding chunkEncoding) (Storage, test.Closer) {
func NewTestStorage(t test.T, encoding chunkEncoding) (*memorySeriesStorage, test.Closer) {
*defaultChunkEncoding = int(encoding)
directory := test.NewTemporaryDirectory("test_storage", t)
o := &MemorySeriesStorageOptions{
@ -61,5 +61,5 @@ func NewTestStorage(t test.T, encoding chunkEncoding) (Storage, test.Closer) {
directory: directory,
}
return storage, closer
return storage.(*memorySeriesStorage), closer
}