diff --git a/storage/local/codable/codable.go b/storage/local/codable/codable.go index 364f7b45ee..a76c15780e 100644 --- a/storage/local/codable/codable.go +++ b/storage/local/codable/codable.go @@ -84,6 +84,19 @@ func EncodeVarint(w io.Writer, i int64) (int, error) { return bytesWritten, err } +// EncodeUvarint encodes an uint64 as a varint and writes it to an io.Writer. +// It returns the number of bytes written. +// This is a GC-friendly implementation that takes the required staging buffer +// from a buffer pool. +func EncodeUvarint(w io.Writer, i uint64) (int, error) { + buf := getBuf(binary.MaxVarintLen64) + defer putBuf(buf) + + bytesWritten := binary.PutUvarint(buf, i) + _, err := w.Write(buf[:bytesWritten]) + return bytesWritten, err +} + // EncodeUint64 writes an uint64 to an io.Writer in big-endian byte-order. // This is a GC-friendly implementation that takes the required staging buffer // from a buffer pool. diff --git a/storage/local/mapper.go b/storage/local/mapper.go new file mode 100644 index 0000000000..c86cebacbd --- /dev/null +++ b/storage/local/mapper.go @@ -0,0 +1,179 @@ +package local + +import ( + "fmt" + "sort" + "strings" + "sync" + "sync/atomic" + + "github.com/golang/glog" + + clientmodel "github.com/prometheus/client_golang/model" +) + +const maxMappedFP = 1 << 30 // About 1B fingerprints reserved for mapping. + +var separatorString = string([]byte{clientmodel.SeparatorByte}) + +// fpMappings maps original fingerprints to a map of string representations of +// metrics to the truly unique fingerprint. +type fpMappings map[clientmodel.Fingerprint]map[string]clientmodel.Fingerprint + +// fpMapper is used to map fingerprints in order to work around fingerprint +// collisions. +type fpMapper struct { + mtx sync.RWMutex // Protects collisions. + mappings fpMappings + + fpToSeries *seriesMap + p *persistence + highestMappedFP clientmodel.Fingerprint +} + +// newFPMapper loads the collision map from the persistence and +// returns an fpCollisionResolver ready to use. +func newFPMapper(fpToSeries *seriesMap, p *persistence) (*fpMapper, error) { + r := &fpMapper{ + fpToSeries: fpToSeries, + p: p, + } + mappings, nextFP, err := p.loadFPMappings() + if err != nil { + return nil, err + } + r.mappings = mappings + r.highestMappedFP = nextFP + return r, nil +} + +// mapFP takes a raw fingerprint (as returned by Metrics.FastFingerprint) and +// returns a truly unique fingerprint. The caller must have locked the raw +// fingerprint. +// +// If an error is encountered, it is returned together with the unchanged raw +// fingerprint. +func (r *fpMapper) mapFP(fp clientmodel.Fingerprint, m clientmodel.Metric) (clientmodel.Fingerprint, error) { + // First check if we are in the reserved FP space, in which case this is + // automatically a collision that has to be mapped. + if fp <= maxMappedFP { + return r.maybeAddMapping(fp, m) + } + + // Then check the most likely case: This fp belongs to a series that is + // already in memory. + s, ok := r.fpToSeries.get(fp) + if ok { + // FP exists in memory, but is it for the same metric? + if m.Equal(s.metric) { + // Yupp. We are done. + return fp, nil + } + // Collision detected! + return r.maybeAddMapping(fp, m) + } + // Metric is not in memory. Check the archive next. + archivedMetric, err := r.p.getArchivedMetric(fp) + if err != nil { + return fp, err + } + if archivedMetric != nil { + // FP exists in archive, but is it for the same metric? + if m.Equal(archivedMetric) { + // Yupp. We are done. + return fp, nil + } + // Collision detected! + return r.maybeAddMapping(fp, m) + } + // The fingerprint is genuinely new. We might have mapped it + // historically, though. so we need to check the collisions map. + r.mtx.RLock() + mappedFPs, ok := r.mappings[fp] + r.mtx.RUnlock() + if !ok { + // No historical mapping, we are good. + return fp, nil + } + // We indeed have mapped fp historically. + ms := metricToUniqueString(m) + // fp is locked by the caller, so no further locking of 'collisions' + // required (it is specific to fp). + mappedFP, ok := mappedFPs[ms] + if ok { + // Historical mapping found, return the mapped FP. + return mappedFP, nil + } + // As fp does not exist, neither in memory nor in archive, we can safely + // keep it unmapped. + return fp, nil +} + +// maybeAddMapping is only used internally. It takes a detected collision and +// adds it to the collisions map if not yet there. In any case, it returns the +// truly unique fingerprint for the colliding metric. +func (r *fpMapper) maybeAddMapping( + fp clientmodel.Fingerprint, + collidingMetric clientmodel.Metric, +) (clientmodel.Fingerprint, error) { + ms := metricToUniqueString(collidingMetric) + r.mtx.RLock() + mappedFPs, ok := r.mappings[fp] + r.mtx.RUnlock() + if ok { + // fp is locked by the caller, so no further locking required. + mappedFP, ok := mappedFPs[ms] + if ok { + return mappedFP, nil // Existing mapping. + } + // A new mapping has to be created. + mappedFP = r.nextMappedFP() + mappedFPs[ms] = mappedFP + r.mtx.RLock() + // Checkpoint mappings after each change. + err := r.p.checkpointFPMappings(r.mappings) + r.mtx.RUnlock() + glog.Infof( + "Collision detected for fingerprint %v, metric %v, mapping to new fingerprint %v.", + fp, collidingMetric, mappedFP, + ) + return mappedFP, err + } + // This is the first collision for fp. + mappedFP := r.nextMappedFP() + mappedFPs = map[string]clientmodel.Fingerprint{ms: mappedFP} + r.mtx.Lock() + r.mappings[fp] = mappedFPs + // Checkpoint mappings after each change. + err := r.p.checkpointFPMappings(r.mappings) + r.mtx.Unlock() + glog.Infof( + "Collision detected for fingerprint %v, metric %v, mapping to new fingerprint %v.", + fp, collidingMetric, mappedFP, + ) + return mappedFP, err +} + +func (r *fpMapper) nextMappedFP() clientmodel.Fingerprint { + mappedFP := clientmodel.Fingerprint(atomic.AddUint64((*uint64)(&r.highestMappedFP), 1)) + if mappedFP > maxMappedFP { + panic(fmt.Errorf("more than %v fingerprints mapped in collision detection", maxMappedFP)) + } + return mappedFP +} + +// metricToUniqueString turns a metric into a string in a reproducible and +// unique way, i.e. the same metric will always create the same string, and +// different metrics will always create different strings. In a way, it is the +// "ideal" fingerprint function, only that it is more expensive than the +// FastFingerprint function, and its result is not suitable as a key for maps +// and indexes as it might become really large, causing a lot of hashing effort +// in maps and a lot of storage overhead in indexes. +func metricToUniqueString(m clientmodel.Metric) string { + parts := make([]string, 0, len(m)) + for ln, lv := range m { + parts = append(parts, string(ln)+separatorString+string(lv)) + } + sort.Strings(parts) + return strings.Join(parts, separatorString) +} diff --git a/storage/local/mapper_test.go b/storage/local/mapper_test.go new file mode 100644 index 0000000000..a322f50e9c --- /dev/null +++ b/storage/local/mapper_test.go @@ -0,0 +1,400 @@ +package local + +import ( + "testing" + + clientmodel "github.com/prometheus/client_golang/model" +) + +var ( + // cm11, cm12, cm13 are colliding with fp1. + // cm21, cm22 are colliding with fp2. + // cm31, cm32 are colliding with fp3, which is below maxMappedFP. + // Note that fingerprints are set and not actually calculated. + // The collision detection is independent from the actually used + // fingerprinting algorithm. + fp1 = clientmodel.Fingerprint(maxMappedFP + 1) + fp2 = clientmodel.Fingerprint(maxMappedFP + 2) + fp3 = clientmodel.Fingerprint(1) + cm11 = clientmodel.Metric{ + "foo": "bar", + "dings": "bumms", + } + cm12 = clientmodel.Metric{ + "bar": "foo", + } + cm13 = clientmodel.Metric{ + "foo": "bar", + } + cm21 = clientmodel.Metric{ + "foo": "bumms", + "dings": "bar", + } + cm22 = clientmodel.Metric{ + "dings": "foo", + "bar": "bumms", + } + cm31 = clientmodel.Metric{ + "bumms": "dings", + } + cm32 = clientmodel.Metric{ + "bumms": "dings", + "bar": "foo", + } +) + +func TestFPMapper(t *testing.T) { + sm := newSeriesMap() + + p, closer := newTestPersistence(t, 1) + defer closer.Close() + + mapper, err := newFPMapper(sm, p) + if err != nil { + t.Fatal(err) + } + + // Everything is empty, resolving a FP should do nothing. + gotFP, err := mapper.mapFP(fp1, cm11) + if err != nil { + t.Fatal(err) + } + if wantFP := fp1; gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + gotFP, err = mapper.mapFP(fp1, cm12) + if err != nil { + t.Fatal(err) + } + if wantFP := fp1; gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + + // cm11 is in sm. Adding cm11 should do nothing. Mapping cm12 should resolve + // the collision. + sm.put(fp1, &memorySeries{metric: cm11}) + gotFP, err = mapper.mapFP(fp1, cm11) + if err != nil { + t.Fatal(err) + } + if wantFP := fp1; gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + gotFP, err = mapper.mapFP(fp1, cm12) + if err != nil { + t.Fatal(err) + } + if wantFP := clientmodel.Fingerprint(1); gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + + // The mapped cm12 is added to sm, too. That should not change the outcome. + sm.put(clientmodel.Fingerprint(1), &memorySeries{metric: cm12}) + gotFP, err = mapper.mapFP(fp1, cm11) + if err != nil { + t.Fatal(err) + } + if wantFP := fp1; gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + gotFP, err = mapper.mapFP(fp1, cm12) + if err != nil { + t.Fatal(err) + } + if wantFP := clientmodel.Fingerprint(1); gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + + // Now map cm13, should reproducibly result in the next mapped FP. + gotFP, err = mapper.mapFP(fp1, cm13) + if err != nil { + t.Fatal(err) + } + if wantFP := clientmodel.Fingerprint(2); gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + gotFP, err = mapper.mapFP(fp1, cm13) + if err != nil { + t.Fatal(err) + } + if wantFP := clientmodel.Fingerprint(2); gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + + // Add cm13 to sm. Should not change anything. + sm.put(clientmodel.Fingerprint(2), &memorySeries{metric: cm13}) + gotFP, err = mapper.mapFP(fp1, cm11) + if err != nil { + t.Fatal(err) + } + if wantFP := fp1; gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + gotFP, err = mapper.mapFP(fp1, cm12) + if err != nil { + t.Fatal(err) + } + if wantFP := clientmodel.Fingerprint(1); gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + gotFP, err = mapper.mapFP(fp1, cm13) + if err != nil { + t.Fatal(err) + } + if wantFP := clientmodel.Fingerprint(2); gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + + // Now add cm21 and cm22 in the same way, checking the mapped FPs. + gotFP, err = mapper.mapFP(fp2, cm21) + if err != nil { + t.Fatal(err) + } + if wantFP := fp2; gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + sm.put(fp2, &memorySeries{metric: cm21}) + gotFP, err = mapper.mapFP(fp2, cm21) + if err != nil { + t.Fatal(err) + } + if wantFP := fp2; gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + gotFP, err = mapper.mapFP(fp2, cm22) + if err != nil { + t.Fatal(err) + } + if wantFP := clientmodel.Fingerprint(3); gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + sm.put(clientmodel.Fingerprint(3), &memorySeries{metric: cm22}) + gotFP, err = mapper.mapFP(fp2, cm21) + if err != nil { + t.Fatal(err) + } + if wantFP := fp2; gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + gotFP, err = mapper.mapFP(fp2, cm22) + if err != nil { + t.Fatal(err) + } + if wantFP := clientmodel.Fingerprint(3); gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + + // Map cm31, resulting in a mapping straight away. + gotFP, err = mapper.mapFP(fp3, cm31) + if err != nil { + t.Fatal(err) + } + if wantFP := clientmodel.Fingerprint(4); gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + sm.put(clientmodel.Fingerprint(4), &memorySeries{metric: cm31}) + + // Map cm32, which is now mapped for two reasons... + gotFP, err = mapper.mapFP(fp3, cm32) + if err != nil { + t.Fatal(err) + } + if wantFP := clientmodel.Fingerprint(5); gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + sm.put(clientmodel.Fingerprint(5), &memorySeries{metric: cm32}) + + // Now check ALL the mappings, just to be sure. + gotFP, err = mapper.mapFP(fp1, cm11) + if err != nil { + t.Fatal(err) + } + if wantFP := fp1; gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + gotFP, err = mapper.mapFP(fp1, cm12) + if err != nil { + t.Fatal(err) + } + if wantFP := clientmodel.Fingerprint(1); gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + gotFP, err = mapper.mapFP(fp1, cm13) + if err != nil { + t.Fatal(err) + } + if wantFP := clientmodel.Fingerprint(2); gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + gotFP, err = mapper.mapFP(fp2, cm21) + if err != nil { + t.Fatal(err) + } + if wantFP := fp2; gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + gotFP, err = mapper.mapFP(fp2, cm22) + if err != nil { + t.Fatal(err) + } + if wantFP := clientmodel.Fingerprint(3); gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + gotFP, err = mapper.mapFP(fp3, cm31) + if err != nil { + t.Fatal(err) + } + if wantFP := clientmodel.Fingerprint(4); gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + gotFP, err = mapper.mapFP(fp3, cm32) + if err != nil { + t.Fatal(err) + } + if wantFP := clientmodel.Fingerprint(5); gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + + // Remove all the fingerprints from sm, which should change nothing, as + // the existing mappings stay and should be detected. + sm.del(fp1) + sm.del(fp2) + sm.del(fp3) + gotFP, err = mapper.mapFP(fp1, cm11) + if err != nil { + t.Fatal(err) + } + if wantFP := fp1; gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + gotFP, err = mapper.mapFP(fp1, cm12) + if err != nil { + t.Fatal(err) + } + if wantFP := clientmodel.Fingerprint(1); gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + gotFP, err = mapper.mapFP(fp1, cm13) + if err != nil { + t.Fatal(err) + } + if wantFP := clientmodel.Fingerprint(2); gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + gotFP, err = mapper.mapFP(fp2, cm21) + if err != nil { + t.Fatal(err) + } + if wantFP := fp2; gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + gotFP, err = mapper.mapFP(fp2, cm22) + if err != nil { + t.Fatal(err) + } + if wantFP := clientmodel.Fingerprint(3); gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + gotFP, err = mapper.mapFP(fp3, cm31) + if err != nil { + t.Fatal(err) + } + if wantFP := clientmodel.Fingerprint(4); gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + gotFP, err = mapper.mapFP(fp3, cm32) + if err != nil { + t.Fatal(err) + } + if wantFP := clientmodel.Fingerprint(5); gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + + // Load the mapper anew from disk and then check all the mappings again + // to make sure all changes have made it to disk. + mapper, err = newFPMapper(sm, p) + if err != nil { + t.Fatal(err) + } + gotFP, err = mapper.mapFP(fp1, cm11) + if err != nil { + t.Fatal(err) + } + if wantFP := fp1; gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + gotFP, err = mapper.mapFP(fp1, cm12) + if err != nil { + t.Fatal(err) + } + if wantFP := clientmodel.Fingerprint(1); gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + gotFP, err = mapper.mapFP(fp1, cm13) + if err != nil { + t.Fatal(err) + } + if wantFP := clientmodel.Fingerprint(2); gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + gotFP, err = mapper.mapFP(fp2, cm21) + if err != nil { + t.Fatal(err) + } + if wantFP := fp2; gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + gotFP, err = mapper.mapFP(fp2, cm22) + if err != nil { + t.Fatal(err) + } + if wantFP := clientmodel.Fingerprint(3); gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + gotFP, err = mapper.mapFP(fp3, cm31) + if err != nil { + t.Fatal(err) + } + if wantFP := clientmodel.Fingerprint(4); gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + gotFP, err = mapper.mapFP(fp3, cm32) + if err != nil { + t.Fatal(err) + } + if wantFP := clientmodel.Fingerprint(5); gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + + // To make sure that the mapping layer is not queried if the FP is found + // either in sm or in the archive, now put fp1 with cm12 in sm and fp2 + // with cm22 into archive (which will never happen in practice as only + // mapped FPs are put into sm and the archive. + sm.put(fp1, &memorySeries{metric: cm12}) + p.archiveMetric(fp2, cm22, 0, 0) + gotFP, err = mapper.mapFP(fp1, cm12) + if err != nil { + t.Fatal(err) + } + if wantFP := fp1; gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + gotFP, err = mapper.mapFP(fp2, cm22) + if err != nil { + t.Fatal(err) + } + if wantFP := fp2; gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + + // If we now map cm21, we should get a mapping as the collision with the + // archived metric is detected. + gotFP, err = mapper.mapFP(fp2, cm21) + if err != nil { + t.Fatal(err) + } + if wantFP := clientmodel.Fingerprint(6); gotFP != wantFP { + t.Errorf("got fingerprint %v, want fingerprint %v", gotFP, wantFP) + } + +} diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 2cce4cf73f..f26ac76fbc 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -55,6 +55,11 @@ const ( headsFormatLegacyVersion = 1 // Can read, but will never write. headsMagicString = "PrometheusHeads" + mappingsFileName = "mappings.db" + mappingsTempFileName = "mappings.db.tmp" + mappingsFormatVersion = 1 + mappingsMagicString = "PrometheusMappings" + dirtyFileName = "DIRTY" fileBufSize = 1 << 16 // 64kiB. @@ -1278,6 +1283,14 @@ func (p *persistence) headsTempFileName() string { return path.Join(p.basePath, headsTempFileName) } +func (p *persistence) mappingsFileName() string { + return path.Join(p.basePath, mappingsFileName) +} + +func (p *persistence) mappingsTempFileName() string { + return path.Join(p.basePath, mappingsTempFileName) +} + func (p *persistence) processIndexingQueue() { batchSize := 0 nameToValues := index.LabelNameLabelValuesMapping{} @@ -1383,6 +1396,157 @@ loop: close(p.indexingStopped) } +// checkpointFPMappings persists the fingerprint mappings. This method is not +// goroutine-safe. +// +// Description of the file format, v1: +// +// (1) Magic string (const mappingsMagicString). +// +// (2) Uvarint-encoded format version (const mappingsFormatVersion). +// +// (3) Uvarint-encoded number of mappings in fpMappings. +// +// (4) Repeated once per mapping: +// +// (4.1) The raw fingerprint as big-endian uint64. +// +// (4.2) The uvarint-encoded number of sub-mappings for the raw fingerprint. +// +// (4.3) Repeated once per sub-mapping: +// +// (4.3.1) The uvarint-encoded length of the unique metric string. +// (4.3.2) The unique metric string. +// (4.3.3) The mapped fingerprint as big-endian uint64. +func (p *persistence) checkpointFPMappings(c fpMappings) (err error) { + glog.Info("Checkpointing fingerprint mappings...") + begin := time.Now() + f, err := os.OpenFile(p.mappingsTempFileName(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640) + if err != nil { + return + } + + defer func() { + f.Sync() + closeErr := f.Close() + if err != nil { + return + } + err = closeErr + if err != nil { + return + } + err = os.Rename(p.mappingsTempFileName(), p.mappingsFileName()) + duration := time.Since(begin) + glog.Infof("Done checkpointing fingerprint mappings in %v.", duration) + }() + + w := bufio.NewWriterSize(f, fileBufSize) + + if _, err = w.WriteString(mappingsMagicString); err != nil { + return + } + if _, err = codable.EncodeUvarint(w, mappingsFormatVersion); err != nil { + return + } + if _, err = codable.EncodeUvarint(w, uint64(len(c))); err != nil { + return + } + + for fp, mappings := range c { + if err = codable.EncodeUint64(w, uint64(fp)); err != nil { + return + } + if _, err = codable.EncodeUvarint(w, uint64(len(mappings))); err != nil { + return + } + for ms, mappedFP := range mappings { + if _, err = codable.EncodeUvarint(w, uint64(len(ms))); err != nil { + return + } + if _, err = w.WriteString(ms); err != nil { + return + } + if err = codable.EncodeUint64(w, uint64(mappedFP)); err != nil { + return + } + } + } + err = w.Flush() + return +} + +// loadFPMappings loads the fingerprint mappings. It also returns the highest +// mapped fingerprint and any error encountered. If p.mappingsFileName is not +// found, the method returns (fpMappings{}, 0, nil). Do not call concurrently +// with checkpointFPMappings. +func (p *persistence) loadFPMappings() (fpMappings, clientmodel.Fingerprint, error) { + fpm := fpMappings{} + var highestMappedFP clientmodel.Fingerprint + + f, err := os.Open(p.mappingsFileName()) + if os.IsNotExist(err) { + return fpm, 0, nil + } + if err != nil { + return nil, 0, err + } + defer f.Close() + r := bufio.NewReaderSize(f, fileBufSize) + + buf := make([]byte, len(mappingsMagicString)) + if _, err := io.ReadFull(r, buf); err != nil { + return nil, 0, err + } + magic := string(buf) + if magic != mappingsMagicString { + return nil, 0, fmt.Errorf( + "unexpected magic string, want %q, got %q", + mappingsMagicString, magic, + ) + } + version, err := binary.ReadUvarint(r) + if version != mappingsFormatVersion || err != nil { + return nil, 0, fmt.Errorf("unknown fingerprint mappings format version, want %d", mappingsFormatVersion) + } + numRawFPs, err := binary.ReadUvarint(r) + if err != nil { + return nil, 0, err + } + for ; numRawFPs > 0; numRawFPs-- { + rawFP, err := codable.DecodeUint64(r) + if err != nil { + return nil, 0, err + } + numMappings, err := binary.ReadUvarint(r) + if err != nil { + return nil, 0, err + } + mappings := make(map[string]clientmodel.Fingerprint, numMappings) + for ; numMappings > 0; numMappings-- { + lenMS, err := binary.ReadUvarint(r) + if err != nil { + return nil, 0, err + } + buf := make([]byte, lenMS) + if _, err := io.ReadFull(r, buf); err != nil { + return nil, 0, err + } + fp, err := codable.DecodeUint64(r) + if err != nil { + return nil, 0, err + } + mappedFP := clientmodel.Fingerprint(fp) + if mappedFP > highestMappedFP { + highestMappedFP = mappedFP + } + mappings[string(buf)] = mappedFP + } + fpm[clientmodel.Fingerprint(rawFP)] = mappings + } + return fpm, highestMappedFP, nil +} + func offsetForChunkIndex(i int) int64 { return int64(i * chunkLenWithHeader) } diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 61abf1b192..20bfe62160 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -484,6 +484,36 @@ func TestCheckpointAndLoadSeriesMapAndHeadsChunkType1(t *testing.T) { testCheckpointAndLoadSeriesMapAndHeads(t, 1) } +func TestCheckpointAndLoadFPMappings(t *testing.T) { + p, closer := newTestPersistence(t, 1) + defer closer.Close() + + in := fpMappings{ + 1: map[string]clientmodel.Fingerprint{ + "foo": 1, + "bar": 2, + }, + 3: map[string]clientmodel.Fingerprint{ + "baz": 4, + }, + } + + if err := p.checkpointFPMappings(in); err != nil { + t.Fatal(err) + } + + out, fp, err := p.loadFPMappings() + if err != nil { + t.Fatal(err) + } + if got, want := fp, clientmodel.Fingerprint(4); got != want { + t.Errorf("got highest FP %v, want %v", got, want) + } + if !reflect.DeepEqual(in, out) { + t.Errorf("got collision map %v, want %v", out, in) + } +} + func testGetFingerprintsModifiedBefore(t *testing.T, encoding chunkEncoding) { p, closer := newTestPersistence(t, encoding) defer closer.Close() diff --git a/storage/local/series.go b/storage/local/series.go index 5e9c1e5e35..419ae78bcb 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -301,7 +301,9 @@ func (s *memorySeries) dropChunks(t clientmodel.Timestamp) { } // preloadChunks is an internal helper method. -func (s *memorySeries) preloadChunks(indexes []int, mss *memorySeriesStorage) ([]*chunkDesc, error) { +func (s *memorySeries) preloadChunks( + indexes []int, fp clientmodel.Fingerprint, mss *memorySeriesStorage, +) ([]*chunkDesc, error) { loadIndexes := []int{} pinnedChunkDescs := make([]*chunkDesc, 0, len(indexes)) for _, idx := range indexes { @@ -318,7 +320,6 @@ func (s *memorySeries) preloadChunks(indexes []int, mss *memorySeriesStorage) ([ if s.chunkDescsOffset == -1 { panic("requested loading chunks from persistence in a situation where we must not have persisted data for chunk descriptors in memory") } - fp := s.metric.FastFingerprint() // TODO(beorn): Handle collisions. chunks, err := mss.loadChunks(fp, loadIndexes, s.chunkDescsOffset) if err != nil { // Unpin the chunks since we won't return them as pinned chunks now. @@ -409,7 +410,7 @@ func (s *memorySeries) preloadChunksForRange( for i := fromIdx; i <= throughIdx; i++ { pinIndexes = append(pinIndexes, i) } - return s.preloadChunks(pinIndexes, mss) + return s.preloadChunks(pinIndexes, fp, mss) } // newIterator returns a new SeriesIterator. The caller must have locked the diff --git a/storage/local/storage.go b/storage/local/storage.go index 3782359b6b..c6db45b2b8 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -93,6 +93,7 @@ type memorySeriesStorage struct { degraded bool persistence *persistence + mapper *fpMapper evictList *list.List evictRequests chan evictRequest @@ -211,6 +212,12 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { glog.Infof("%d series loaded.", s.fpToSeries.length()) s.numSeries.Set(float64(s.fpToSeries.length())) + mapper, err := newFPMapper(s.fpToSeries, p) + if err != nil { + return nil, err + } + s.mapper = mapper + return s, nil } @@ -382,8 +389,18 @@ func (s *memorySeriesStorage) Append(sample *clientmodel.Sample) { } glog.Warning("Sample ingestion resumed.") } - fp := sample.Metric.FastFingerprint() // TODO(beorn): Handle collisions. - s.fpLocker.Lock(fp) + rawFP := sample.Metric.FastFingerprint() + s.fpLocker.Lock(rawFP) + fp, err := s.mapper.mapFP(rawFP, sample.Metric) + if err != nil { + glog.Errorf("Error while mapping fingerprint %v: %v", rawFP, err) + s.persistence.setDirty(true) + } + if fp != rawFP { + // Switch locks. + s.fpLocker.Unlock(rawFP) + s.fpLocker.Lock(fp) + } series := s.getOrCreateSeries(fp, sample.Metric) completedChunksCount := series.add(&metric.SamplePair{ Value: sample.Value, diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 330a4eafed..812bf353b0 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -201,8 +201,8 @@ func testChunk(t *testing.T, encoding chunkEncoding) { } s.WaitForIndexing() - for m := range s.(*memorySeriesStorage).fpToSeries.iter() { - s.(*memorySeriesStorage).fpLocker.Lock(m.fp) + for m := range s.fpToSeries.iter() { + s.fpLocker.Lock(m.fp) var values metric.Values for _, cd := range m.series.chunkDescs { @@ -222,7 +222,7 @@ func testChunk(t *testing.T, encoding chunkEncoding) { t.Errorf("%d. Got %v; want %v", i, v.Value, samples[i].Value) } } - s.(*memorySeriesStorage).fpLocker.Unlock(m.fp) + s.fpLocker.Unlock(m.fp) } glog.Info("test done, closing") } @@ -491,8 +491,6 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { s, closer := NewTestStorage(t, encoding) defer closer.Close() - ms := s.(*memorySeriesStorage) // Going to test the internal maintain.*Series methods. - for _, sample := range samples { s.Append(sample) } @@ -501,7 +499,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { fp := clientmodel.Metric{}.FastFingerprint() // Drop ~half of the chunks. - ms.maintainMemorySeries(fp, 1000) + s.maintainMemorySeries(fp, 1000) it := s.NewIterator(fp) actual := it.GetBoundaryValues(metric.Interval{ OldestInclusive: 0, @@ -519,7 +517,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { } // Drop everything. - ms.maintainMemorySeries(fp, 10000) + s.maintainMemorySeries(fp, 10000) it = s.NewIterator(fp) actual = it.GetBoundaryValues(metric.Interval{ OldestInclusive: 0, @@ -535,24 +533,24 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { } s.WaitForIndexing() - series, ok := ms.fpToSeries.get(fp) + series, ok := s.fpToSeries.get(fp) if !ok { t.Fatal("could not find series") } // Persist head chunk so we can safely archive. series.headChunkClosed = true - ms.maintainMemorySeries(fp, clientmodel.Earliest) + s.maintainMemorySeries(fp, clientmodel.Earliest) // Archive metrics. - ms.fpToSeries.del(fp) - if err := ms.persistence.archiveMetric( + s.fpToSeries.del(fp) + if err := s.persistence.archiveMetric( fp, series.metric, series.firstTime(), series.head().lastTime(), ); err != nil { t.Fatal(err) } - archived, _, _, err := ms.persistence.hasArchivedMetric(fp) + archived, _, _, err := s.persistence.hasArchivedMetric(fp) if err != nil { t.Fatal(err) } @@ -561,8 +559,8 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { } // Drop ~half of the chunks of an archived series. - ms.maintainArchivedSeries(fp, 1000) - archived, _, _, err = ms.persistence.hasArchivedMetric(fp) + s.maintainArchivedSeries(fp, 1000) + archived, _, _, err = s.persistence.hasArchivedMetric(fp) if err != nil { t.Fatal(err) } @@ -571,8 +569,8 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { } // Drop everything. - ms.maintainArchivedSeries(fp, 10000) - archived, _, _, err = ms.persistence.hasArchivedMetric(fp) + s.maintainArchivedSeries(fp, 10000) + archived, _, _, err = s.persistence.hasArchivedMetric(fp) if err != nil { t.Fatal(err) } @@ -586,24 +584,24 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { } s.WaitForIndexing() - series, ok = ms.fpToSeries.get(fp) + series, ok = s.fpToSeries.get(fp) if !ok { t.Fatal("could not find series") } // Persist head chunk so we can safely archive. series.headChunkClosed = true - ms.maintainMemorySeries(fp, clientmodel.Earliest) + s.maintainMemorySeries(fp, clientmodel.Earliest) // Archive metrics. - ms.fpToSeries.del(fp) - if err := ms.persistence.archiveMetric( + s.fpToSeries.del(fp) + if err := s.persistence.archiveMetric( fp, series.metric, series.firstTime(), series.head().lastTime(), ); err != nil { t.Fatal(err) } - archived, _, _, err = ms.persistence.hasArchivedMetric(fp) + archived, _, _, err = s.persistence.hasArchivedMetric(fp) if err != nil { t.Fatal(err) } @@ -612,13 +610,13 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { } // Unarchive metrics. - ms.getOrCreateSeries(fp, clientmodel.Metric{}) + s.getOrCreateSeries(fp, clientmodel.Metric{}) - series, ok = ms.fpToSeries.get(fp) + series, ok = s.fpToSeries.get(fp) if !ok { t.Fatal("could not find series") } - archived, _, _, err = ms.persistence.hasArchivedMetric(fp) + archived, _, _, err = s.persistence.hasArchivedMetric(fp) if err != nil { t.Fatal(err) } @@ -628,8 +626,8 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) { // This will archive again, but must not drop it completely, despite the // memorySeries being empty. - ms.maintainMemorySeries(fp, 1000) - archived, _, _, err = ms.persistence.hasArchivedMetric(fp) + s.maintainMemorySeries(fp, 1000) + archived, _, _, err = s.persistence.hasArchivedMetric(fp) if err != nil { t.Fatal(err) } @@ -751,11 +749,11 @@ func benchmarkFuzz(b *testing.B, encoding chunkEncoding) { for _, sample := range samples[start:middle] { s.Append(sample) } - verifyStorage(b, s, samples[:middle], o.PersistenceRetentionPeriod) + verifyStorage(b, s.(*memorySeriesStorage), samples[:middle], o.PersistenceRetentionPeriod) for _, sample := range samples[middle:end] { s.Append(sample) } - verifyStorage(b, s, samples[:end], o.PersistenceRetentionPeriod) + verifyStorage(b, s.(*memorySeriesStorage), samples[:end], o.PersistenceRetentionPeriod) } } @@ -823,7 +821,25 @@ func createRandomSamples(metricName string, minLen int) clientmodel.Samples { } ) - result := clientmodel.Samples{} + // Prefill resust with two samples with colliding metrics (to test fingerprint mapping). + result := clientmodel.Samples{ + &clientmodel.Sample{ + Metric: clientmodel.Metric{ + "instance": "ip-10-33-84-73.l05.ams5.s-cloud.net:24483", + "status": "503", + }, + Value: 42, + Timestamp: timestamp, + }, + &clientmodel.Sample{ + Metric: clientmodel.Metric{ + "instance": "ip-10-33-84-73.l05.ams5.s-cloud.net:24480", + "status": "500", + }, + Value: 2010, + Timestamp: timestamp + 1, + }, + } metrics := []clientmodel.Metric{} for n := rand.Intn(maxMetrics); n >= 0; n-- { @@ -885,7 +901,7 @@ func createRandomSamples(metricName string, minLen int) clientmodel.Samples { return result } -func verifyStorage(t testing.TB, s Storage, samples clientmodel.Samples, maxAge time.Duration) bool { +func verifyStorage(t testing.TB, s *memorySeriesStorage, samples clientmodel.Samples, maxAge time.Duration) bool { s.WaitForIndexing() result := true for _, i := range rand.Perm(len(samples)) { @@ -896,7 +912,10 @@ func verifyStorage(t testing.TB, s Storage, samples clientmodel.Samples, maxAge // retention period, we can verify here that no results // are returned. } - fp := sample.Metric.FastFingerprint() + fp, err := s.mapper.mapFP(sample.Metric.FastFingerprint(), sample.Metric) + if err != nil { + t.Fatal(err) + } p := s.NewPreloader() p.PreloadRange(fp, sample.Timestamp, sample.Timestamp, time.Hour) found := s.NewIterator(fp).GetValueAtTime(sample.Timestamp) diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go index 76c1f648d9..a645bd1f51 100644 --- a/storage/local/test_helpers.go +++ b/storage/local/test_helpers.go @@ -37,7 +37,7 @@ func (t *testStorageCloser) Close() { // NewTestStorage creates a storage instance backed by files in a temporary // directory. The returned storage is already in serving state. Upon closing the // returned test.Closer, the temporary directory is cleaned up. -func NewTestStorage(t test.T, encoding chunkEncoding) (Storage, test.Closer) { +func NewTestStorage(t test.T, encoding chunkEncoding) (*memorySeriesStorage, test.Closer) { *defaultChunkEncoding = int(encoding) directory := test.NewTemporaryDirectory("test_storage", t) o := &MemorySeriesStorageOptions{ @@ -61,5 +61,5 @@ func NewTestStorage(t test.T, encoding chunkEncoding) (Storage, test.Closer) { directory: directory, } - return storage, closer + return storage.(*memorySeriesStorage), closer }