diff --git a/storage/local/codec/codec.go b/storage/local/codec/codec.go index c8a30fe203..d4e5dc5944 100644 --- a/storage/local/codec/codec.go +++ b/storage/local/codec/codec.go @@ -266,15 +266,15 @@ func (vs *CodableLabelValues) UnmarshalBinary(buf []byte) error { } type CodableTimeRange struct { - first, last clientmodel.Timestamp + First, Last clientmodel.Timestamp } func (tr CodableTimeRange) MarshalBinary() ([]byte, error) { buf := &bytes.Buffer{} - if err := EncodeVarint(buf, int64(tr.first)); err != nil { + if err := EncodeVarint(buf, int64(tr.First)); err != nil { return nil, err } - if err := EncodeVarint(buf, int64(tr.last)); err != nil { + if err := EncodeVarint(buf, int64(tr.Last)); err != nil { return nil, err } return buf.Bytes(), nil @@ -290,7 +290,7 @@ func (tr *CodableTimeRange) UnmarshalBinary(buf []byte) error { if err != nil { return err } - tr.first = clientmodel.Timestamp(first) - tr.last = clientmodel.Timestamp(last) + tr.First = clientmodel.Timestamp(first) + tr.Last = clientmodel.Timestamp(last) return nil } diff --git a/storage/local/codec/codec_test.go b/storage/local/codec/codec_test.go index 715333a5c2..062ae457a3 100644 --- a/storage/local/codec/codec_test.go +++ b/storage/local/codec/codec_test.go @@ -23,16 +23,16 @@ func TestCodec(t *testing.T) { equal func(in, out codable) bool }{ { - in: CodableMetric{ + in: &CodableMetric{ "label_1": "value_2", "label_2": "value_2", "label_3": "value_3", }, - out: CodableMetric{}, + out: &CodableMetric{}, equal: func(in, out codable) bool { - m1 := clientmodel.Metric(in.(CodableMetric)) - m2 := clientmodel.Metric(out.(CodableMetric)) - return m1.Equal(m2) + m1 := (*clientmodel.Metric)(in.(*CodableMetric)) + m2 := (*clientmodel.Metric)(out.(*CodableMetric)) + return m1.Equal(*m2) }, }, { in: newCodableFingerprint(12345), diff --git a/storage/local/index/index.go b/storage/local/index/index.go index 9d54cfb1f9..4e9b2758ad 100644 --- a/storage/local/index/index.go +++ b/storage/local/index/index.go @@ -56,15 +56,9 @@ func (i *FingerprintMetricIndex) UnindexBatch(mapping FingerprintMetricMapping) } // Lookup looks up a metric by fingerprint. -func (i *FingerprintMetricIndex) Lookup(fp clientmodel.Fingerprint) (clientmodel.Metric, bool, error) { - m := codec.CodableMetric{} - if ok, err := i.Get(codec.CodableFingerprint(fp), &m); !ok { - return nil, false, nil - } else if err != nil { - return nil, false, err - } - - return clientmodel.Metric(m), true, nil +func (i *FingerprintMetricIndex) Lookup(fp clientmodel.Fingerprint) (metric clientmodel.Metric, ok bool, err error) { + ok, err = i.Get(codec.CodableFingerprint(fp), (*codec.CodableMetric)(&metric)) + return } // NewFingerprintMetricIndex returns a FingerprintMetricIndex @@ -110,14 +104,64 @@ func (i *LabelNameLabelValuesIndex) IndexBatch(b LabelNameLabelValuesMapping) er // Lookup looks up all label values for a given label name. func (i *LabelNameLabelValuesIndex) Lookup(l clientmodel.LabelName) (values clientmodel.LabelValues, ok bool, err error) { ok, err = i.Get(codec.CodableLabelName(l), (*codec.CodableLabelValues)(&values)) - if err != nil { - return nil, false, err - } - if !ok { - return nil, false, nil - } + return +} - return values, true, nil +func (i *LabelNameLabelValuesIndex) Extend(m clientmodel.Metric) error { + b := make(LabelNameLabelValuesMapping, len(m)) + for ln, lv := range m { + baseLVs, _, err := i.Lookup(ln) + if err != nil { + return err + } + lvSet := utility.Set{} + for _, baseLV := range baseLVs { + lvSet.Add(baseLV) + } + lvSet.Add(lv) + if len(lvSet) == len(baseLVs) { + continue + } + lvs := make(clientmodel.LabelValues, 0, len(lvSet)) + for v := range lvSet { + lvs = append(lvs, v.(clientmodel.LabelValue)) + } + b[ln] = lvs + } + return i.IndexBatch(b) +} + +func (i *LabelNameLabelValuesIndex) Reduce(m LabelPairFingerprintsMapping) error { + b := make(LabelNameLabelValuesMapping, len(m)) + for lp, fps := range m { + if len(fps) != 0 { + continue + } + ln := lp.Name + lv := lp.Value + baseValues, ok := b[ln] + if !ok { + var err error + baseValues, _, err = i.Lookup(ln) + if err != nil { + return err + } + } + lvSet := utility.Set{} + for _, baseValue := range baseValues { + lvSet.Add(baseValue) + } + lvSet.Remove(lv) + if len(lvSet) == len(baseValues) { + continue + } + lvs := make(clientmodel.LabelValues, 0, len(lvSet)) + for v := range lvSet { + lvs = append(lvs, v.(clientmodel.LabelValue)) + } + b[ln] = lvs + } + return i.IndexBatch(b) } // NewLabelNameLabelValuesIndex returns a LabelNameLabelValuesIndex @@ -163,14 +207,59 @@ func (i *LabelPairFingerprintIndex) IndexBatch(m LabelPairFingerprintsMapping) e // Lookup looks up all fingerprints for a given label pair. func (i *LabelPairFingerprintIndex) Lookup(p metric.LabelPair) (fps clientmodel.Fingerprints, ok bool, err error) { ok, err = i.Get((codec.CodableLabelPair)(p), (*codec.CodableFingerprints)(&fps)) - if !ok { - return nil, false, nil - } - if err != nil { - return nil, false, err - } + return +} - return fps, true, nil +func (i *LabelPairFingerprintIndex) Extend(m clientmodel.Metric, fp clientmodel.Fingerprint) error { + b := make(LabelPairFingerprintsMapping, len(m)) + for ln, lv := range m { + lp := metric.LabelPair{Name: ln, Value: lv} + baseFPs, _, err := i.Lookup(lp) + if err != nil { + return err + } + fpSet := utility.Set{} + for _, baseFP := range baseFPs { + fpSet.Add(baseFP) + } + fpSet.Add(fp) + if len(fpSet) == len(baseFPs) { + continue + } + fps := make(clientmodel.Fingerprints, 0, len(fpSet)) + for f := range fpSet { + fps = append(fps, f.(clientmodel.Fingerprint)) + } + b[lp] = fps + + } + return i.IndexBatch(b) +} + +func (i *LabelPairFingerprintIndex) Reduce(m clientmodel.Metric, fp clientmodel.Fingerprint) (LabelPairFingerprintsMapping, error) { + b := make(LabelPairFingerprintsMapping, len(m)) + for ln, lv := range m { + lp := metric.LabelPair{Name: ln, Value: lv} + baseFPs, _, err := i.Lookup(lp) + if err != nil { + return nil, err + } + fpSet := utility.Set{} + for _, baseFP := range baseFPs { + fpSet.Add(baseFP) + } + fpSet.Remove(fp) + if len(fpSet) == len(baseFPs) { + continue + } + fps := make(clientmodel.Fingerprints, 0, len(fpSet)) + for f := range fpSet { + fps = append(fps, f.(clientmodel.Fingerprint)) + } + b[lp] = fps + + } + return b, i.IndexBatch(b) } // NewLabelPairFingerprintIndex returns a LabelPairFingerprintIndex @@ -194,15 +283,11 @@ type FingerprintTimeRangeIndex struct { KeyValueStore } -// UnindexBatch unindexes a batch of fingerprints. -func (i *FingerprintTimeRangeIndex) UnindexBatch(b FingerprintMetricMapping) error { - batch := i.NewBatch() - - for fp, _ := range b { - batch.Delete(codec.CodableFingerprint(fp)) - } - - return i.Commit(batch) +// Lookup returns the time range for the given fingerprint. +func (i *FingerprintTimeRangeIndex) Lookup(fp clientmodel.Fingerprint) (firstTime, lastTime clientmodel.Timestamp, ok bool, err error) { + var tr codec.CodableTimeRange + ok, err = i.Get(codec.CodableFingerprint(fp), &tr) + return tr.First, tr.Last, ok, err } // Has returns true if the given fingerprint is present. @@ -224,227 +309,3 @@ func NewFingerprintTimeRangeIndex(basePath string) (*FingerprintTimeRangeIndex, KeyValueStore: fingerprintTimeRangeDB, }, nil } - -func findUnindexed(i *FingerprintTimeRangeIndex, b FingerprintMetricMapping) (FingerprintMetricMapping, error) { - // TODO: Move up? Need to include fp->ts map? - out := FingerprintMetricMapping{} - - for fp, m := range b { - has, err := i.Has(fp) - if err != nil { - return nil, err - } - if !has { - out[fp] = m - } - } - - return out, nil -} - -func findIndexed(i *FingerprintTimeRangeIndex, b FingerprintMetricMapping) (FingerprintMetricMapping, error) { - // TODO: Move up? Need to include fp->ts map? - out := FingerprintMetricMapping{} - - for fp, m := range b { - has, err := i.Has(fp) - if err != nil { - return nil, err - } - if has { - out[fp] = m - } - } - - return out, nil -} - -func extendLabelNameToLabelValuesIndex(i *LabelNameLabelValuesIndex, b FingerprintMetricMapping) (LabelNameLabelValuesMapping, error) { - // TODO: Move up? Need to include fp->ts map? - collection := map[clientmodel.LabelName]utility.Set{} - - for _, m := range b { - for l, v := range m { - set, ok := collection[l] - if !ok { - baseValues, _, err := i.Lookup(l) - if err != nil { - return nil, err - } - - set = utility.Set{} - - for _, baseValue := range baseValues { - set.Add(baseValue) - } - - collection[l] = set - } - - set.Add(v) - } - } - - batch := LabelNameLabelValuesMapping{} - for l, set := range collection { - values := make(clientmodel.LabelValues, 0, len(set)) - for e := range set { - val := e.(clientmodel.LabelValue) - values = append(values, val) - } - - batch[l] = values - } - - return batch, nil -} - -func reduceLabelNameToLabelValuesIndex(i *LabelNameLabelValuesIndex, m LabelPairFingerprintsMapping) (LabelNameLabelValuesMapping, error) { - // TODO: Move up? Need to include fp->ts map? - collection := map[clientmodel.LabelName]utility.Set{} - - for lp, fps := range m { - if len(fps) != 0 { - continue - } - - set, ok := collection[lp.Name] - if !ok { - baseValues, _, err := i.Lookup(lp.Name) - if err != nil { - return nil, err - } - - set = utility.Set{} - - for _, baseValue := range baseValues { - set.Add(baseValue) - } - - collection[lp.Name] = set - } - - set.Remove(lp.Value) - } - - batch := LabelNameLabelValuesMapping{} - for l, set := range collection { - values := make(clientmodel.LabelValues, 0, len(set)) - for e := range set { - val := e.(clientmodel.LabelValue) - values = append(values, val) - } - - batch[l] = values - } - return batch, nil -} - -func extendLabelPairIndex(i *LabelPairFingerprintIndex, b FingerprintMetricMapping, remove bool) (LabelPairFingerprintsMapping, error) { - // TODO: Move up? Need to include fp->ts map? - collection := map[metric.LabelPair]utility.Set{} - - for fp, m := range b { - for n, v := range m { - pair := metric.LabelPair{ - Name: n, - Value: v, - } - set, ok := collection[pair] - if !ok { - baseFps, _, err := i.Lookup(pair) - if err != nil { - return nil, err - } - - set = utility.Set{} - for _, baseFp := range baseFps { - set.Add(baseFp) - } - - collection[pair] = set - } - - if remove { - set.Remove(fp) - } else { - set.Add(fp) - } - } - } - - batch := LabelPairFingerprintsMapping{} - - for pair, set := range collection { - fps := batch[pair] - for element := range set { - fp := element.(clientmodel.Fingerprint) - fps = append(fps, fp) - } - batch[pair] = fps - } - - return batch, nil -} - -// TODO: Move IndexMetrics and UndindexMetrics into storage.go. - -/* -// IndexMetrics adds the facets of all unindexed metrics found in the given -// FingerprintMetricMapping to the corresponding indices. -func (i *diskIndexer) IndexMetrics(b FingerprintMetricMapping) error { - unindexed, err := findUnindexed(i.FingerprintTimeRange, b) - if err != nil { - return err - } - - labelNames, err := extendLabelNameToLabelValuesIndex(i.LabelNameToLabelValues, unindexed) - if err != nil { - return err - } - if err := i.LabelNameToLabelValues.IndexBatch(labelNames); err != nil { - return err - } - - labelPairs, err := extendLabelPairIndex(i.LabelPairToFingerprints, unindexed, false) - if err != nil { - return err - } - if err := i.LabelPairToFingerprints.IndexBatch(labelPairs); err != nil { - return err - } - - if err := i.FingerprintToMetric.IndexBatch(unindexed); err != nil { - return err - } - - return i.FingerprintTimeRange.IndexBatch(unindexed) -} - -// UnindexMetrics implements MetricIndexer. -func (i *diskIndexer) UnindexMetrics(b FingerprintMetricMapping) error { - indexed, err := findIndexed(i.FingerprintTimeRange, b) - if err != nil { - return err - } - - labelPairs, err := extendLabelPairIndex(i.LabelPairToFingerprints, indexed, true) - if err != nil { - return err - } - if err := i.LabelPairToFingerprints.IndexBatch(labelPairs); err != nil { - return err - } - - labelNames, err := reduceLabelNameToLabelValuesIndex(i.LabelNameToLabelValues, labelPairs) - if err := i.LabelNameToLabelValues.IndexBatch(labelNames); err != nil { - return err - } - - if err := i.FingerprintToMetric.UnindexBatch(indexed); err != nil { - return err - } - - return i.FingerprintTimeRange.UnindexBatch(indexed) -} -*/ diff --git a/storage/local/index/index_test.go b/storage/local/index/index_test.go deleted file mode 100644 index cd197383a2..0000000000 --- a/storage/local/index/index_test.go +++ /dev/null @@ -1,252 +0,0 @@ -package index - -import ( - "sort" - "testing" - - clientmodel "github.com/prometheus/client_golang/model" - - "github.com/prometheus/prometheus/storage/metric" - "github.com/prometheus/prometheus/utility/test" -) - -type incrementalBatch struct { - fpToMetric FingerprintMetricMapping - expectedLnToLvs LabelNameLabelValuesMapping - expectedLpToFps LabelPairFingerprintsMapping -} - -func newTestDB(t *testing.T) (KeyValueStore, test.Closer) { - dir := test.NewTemporaryDirectory("test_db", t) - db, err := NewLevelDB(LevelDBOptions{ - Path: dir.Path(), - }) - if err != nil { - dir.Close() - t.Fatal("failed to create test DB: ", err) - } - return db, test.NewCallbackCloser(func() { - db.Close() - dir.Close() - }) -} - -func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMetrics FingerprintMetricMapping, indexer *diskIndexer) { - for fp, m := range indexedFpsToMetrics { - // Compare indexed metrics with input metrics. - mOut, ok, err := indexer.FingerprintToMetric.Lookup(fp) - if err != nil { - t.Fatal(err) - } - if !ok { - t.Fatalf("%d. fingerprint %v not found", i, fp) - } - if !mOut.Equal(m) { - t.Fatalf("%d. %v: Got: %s; want %s", i, fp, mOut, m) - } - - // Check that indexed metrics are in membership index. - ok, err = indexer.FingerprintMembership.Has(fp) - if err != nil { - t.Fatal(err) - } - if !ok { - t.Fatalf("%d. fingerprint %v not found", i, fp) - } - } - - // Compare label name -> label values mappings. - for ln, lvs := range b.expectedLnToLvs { - outLvs, ok, err := indexer.LabelNameToLabelValues.Lookup(ln) - if err != nil { - t.Fatal(err) - } - if !ok { - t.Fatalf("%d. label name %s not found", i, ln) - } - - sort.Sort(lvs) - sort.Sort(outLvs) - - if len(lvs) != len(outLvs) { - t.Fatalf("%d. different number of label values. Got: %d; want %d", i, len(outLvs), len(lvs)) - } - for j, _ := range lvs { - if lvs[j] != outLvs[j] { - t.Fatalf("%d.%d. label values don't match. Got: %s; want %s", i, j, outLvs[j], lvs[j]) - } - } - } - - // Compare label pair -> fingerprints mappings. - for lp, fps := range b.expectedLpToFps { - outFps, ok, err := indexer.LabelPairToFingerprints.Lookup(&lp) - if err != nil { - t.Fatal(err) - } - if !ok { - t.Fatalf("%d. label pair %v not found", i, lp) - } - - sort.Sort(fps) - sort.Sort(outFps) - - if len(fps) != len(outFps) { - t.Fatalf("%d. %v: different number of fingerprints. Got: %d; want %d", i, lp, len(outFps), len(fps)) - } - for j, _ := range fps { - if fps[j] != outFps[j] { - t.Fatalf("%d.%d. %v: fingerprints don't match. Got: %d; want %d", i, j, lp, outFps[j], fps[j]) - } - } - } -} - -func TestIndexing(t *testing.T) { - batches := []incrementalBatch{ - { - fpToMetric: FingerprintMetricMapping{ - 0: { - clientmodel.MetricNameLabel: "metric_0", - "label_1": "value_1", - }, - 1: { - clientmodel.MetricNameLabel: "metric_0", - "label_2": "value_2", - "label_3": "value_3", - }, - 2: { - clientmodel.MetricNameLabel: "metric_1", - "label_1": "value_2", - }, - }, - expectedLnToLvs: LabelNameLabelValuesMapping{ - clientmodel.MetricNameLabel: clientmodel.LabelValues{"metric_0", "metric_1"}, - "label_1": clientmodel.LabelValues{"value_1", "value_2"}, - "label_2": clientmodel.LabelValues{"value_2"}, - "label_3": clientmodel.LabelValues{"value_3"}, - }, - expectedLpToFps: LabelPairFingerprintsMapping{ - metric.LabelPair{ - Name: clientmodel.MetricNameLabel, - Value: "metric_0", - }: {0, 1}, - metric.LabelPair{ - Name: clientmodel.MetricNameLabel, - Value: "metric_1", - }: {2}, - metric.LabelPair{ - Name: "label_1", - Value: "value_1", - }: {0}, - metric.LabelPair{ - Name: "label_1", - Value: "value_2", - }: {2}, - metric.LabelPair{ - Name: "label_2", - Value: "value_2", - }: {1}, - metric.LabelPair{ - Name: "label_3", - Value: "value_3", - }: {1}, - }, - }, { - fpToMetric: FingerprintMetricMapping{ - 3: { - clientmodel.MetricNameLabel: "metric_0", - "label_1": "value_3", - }, - 4: { - clientmodel.MetricNameLabel: "metric_2", - "label_2": "value_2", - "label_3": "value_1", - }, - 5: { - clientmodel.MetricNameLabel: "metric_1", - "label_1": "value_3", - }, - }, - expectedLnToLvs: LabelNameLabelValuesMapping{ - clientmodel.MetricNameLabel: clientmodel.LabelValues{"metric_0", "metric_1", "metric_2"}, - "label_1": clientmodel.LabelValues{"value_1", "value_2", "value_3"}, - "label_2": clientmodel.LabelValues{"value_2"}, - "label_3": clientmodel.LabelValues{"value_1", "value_3"}, - }, - expectedLpToFps: LabelPairFingerprintsMapping{ - metric.LabelPair{ - Name: clientmodel.MetricNameLabel, - Value: "metric_0", - }: {0, 1, 3}, - metric.LabelPair{ - Name: clientmodel.MetricNameLabel, - Value: "metric_1", - }: {2, 5}, - metric.LabelPair{ - Name: clientmodel.MetricNameLabel, - Value: "metric_2", - }: {4}, - metric.LabelPair{ - Name: "label_1", - Value: "value_1", - }: {0}, - metric.LabelPair{ - Name: "label_1", - Value: "value_2", - }: {2}, - metric.LabelPair{ - Name: "label_1", - Value: "value_3", - }: {3, 5}, - metric.LabelPair{ - Name: "label_2", - Value: "value_2", - }: {1, 4}, - metric.LabelPair{ - Name: "label_3", - Value: "value_1", - }: {4}, - metric.LabelPair{ - Name: "label_3", - Value: "value_3", - }: {1}, - }, - }, - } - - fpToMetricDB, fpToMetricCloser := newTestDB(t) - defer fpToMetricCloser.Close() - lnToLvsDB, lnToLvsCloser := newTestDB(t) - defer lnToLvsCloser.Close() - lpToFpDB, lpToFpCloser := newTestDB(t) - defer lpToFpCloser.Close() - fpMsDB, fpMsCloser := newTestDB(t) - defer fpMsCloser.Close() - - indexer := diskIndexer{ - FingerprintToMetric: NewFingerprintMetricIndex(fpToMetricDB), - LabelNameToLabelValues: NewLabelNameLabelValuesIndex(lnToLvsDB), - LabelPairToFingerprints: NewLabelPairFingerprintIndex(lpToFpDB), - FingerprintMembership: NewFingerprintMembershipIndex(fpMsDB), - } - - indexedFpsToMetrics := FingerprintMetricMapping{} - for i, b := range batches { - indexer.IndexMetrics(b.fpToMetric) - for fp, m := range b.fpToMetric { - indexedFpsToMetrics[fp] = m - } - - verifyIndexedState(i, t, b, indexedFpsToMetrics, &indexer) - } - - for i := len(batches) - 1; i >= 0; i-- { - b := batches[i] - verifyIndexedState(i, t, batches[i], indexedFpsToMetrics, &indexer) - indexer.UnindexMetrics(b.fpToMetric) - for fp, _ := range b.fpToMetric { - delete(indexedFpsToMetrics, fp) - } - } -} diff --git a/storage/local/index/leveldb.go b/storage/local/index/leveldb.go index e8b6429f57..2133e9e433 100644 --- a/storage/local/index/leveldb.go +++ b/storage/local/index/leveldb.go @@ -53,7 +53,7 @@ func (l *LevelDB) Close() error { func (l *LevelDB) Get(key encoding.BinaryMarshaler, value encoding.BinaryUnmarshaler) (bool, error) { k, err := key.MarshalBinary() if err != nil { - return false, nil + return false, err } raw, err := l.storage.Get(k, l.readOpts) if err == leveldb.ErrNotFound { diff --git a/storage/local/interface.go b/storage/local/interface.go index 5b477f8a78..0a4dd04ab7 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -75,14 +75,14 @@ type Persistence interface { // IndexMetric indexes the given metric for the needs of // GetFingerprintsForLabelPair and GetLabelValuesForLabelName. - IndexMetric(clientmodel.Metric) error + IndexMetric(clientmodel.Metric, clientmodel.Fingerprint) error // UnindexMetric removes references to the given metric from the indexes // used for GetFingerprintsForLabelPair and // GetLabelValuesForLabelName. The index of fingerprints to archived // metrics is not affected by this method. (In fact, never call this // method for an archived metric. To drop an archived metric, call // DropArchivedFingerprint.) - UnindexMetric(clientmodel.Metric) error + UnindexMetric(clientmodel.Metric, clientmodel.Fingerprint) error // ArchiveMetric persists the mapping of the given fingerprint to the // given metric, together with the first and last timestamp of the diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 0a7f0358b5..c69b78f344 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -401,44 +401,80 @@ func (p *diskPersistence) DropChunks(fp clientmodel.Fingerprint, beforeTime clie return false, nil } -func (d *diskPersistence) IndexMetric(m clientmodel.Metric) error { - // TODO: Implement. Possibly in a queue (which needs to be drained before shutdown). - return nil +func (d *diskPersistence) IndexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) error { + // TODO: Don't do it directly, but add it to a queue (which needs to be + // drained before shutdown). Queuing would make this asynchronously, and + // then batches could be created easily. + if err := d.labelNameToLabelValues.Extend(m); err != nil { + return err + } + return d.labelPairToFingerprints.Extend(m, fp) } -func (d *diskPersistence) UnindexMetric(m clientmodel.Metric) error { - // TODO: Implement. Possibly in a queue (which needs to be drained before shutdown). - return nil +func (d *diskPersistence) UnindexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) error { + // TODO: Don't do it directly, but add it to a queue (which needs to be + // drained before shutdown). Queuing would make this asynchronously, and + // then batches could be created easily. + labelPairs, err := d.labelPairToFingerprints.Reduce(m, fp) + if err != nil { + return err + } + return d.labelNameToLabelValues.Reduce(labelPairs) } func (d *diskPersistence) ArchiveMetric( - fingerprint clientmodel.Fingerprint, metric clientmodel.Metric, - firstTime, lastTime clientmodel.Timestamp, + // TODO: Two step process, make sure this happens atomically. + fp clientmodel.Fingerprint, m clientmodel.Metric, first, last clientmodel.Timestamp, ) error { - // TODO: Implement. + if err := d.archivedFingerprintToMetrics.Put(codec.CodableFingerprint(fp), codec.CodableMetric(m)); err != nil { + return err + } + if err := d.archivedFingerprintToTimeRange.Put(codec.CodableFingerprint(fp), codec.CodableTimeRange{First: first, Last: last}); err != nil { + return err + } return nil } -func (d *diskPersistence) HasArchivedMetric(clientmodel.Fingerprint) ( +func (d *diskPersistence) HasArchivedMetric(fp clientmodel.Fingerprint) ( hasMetric bool, firstTime, lastTime clientmodel.Timestamp, err error, ) { - // TODO: Implement. + firstTime, lastTime, hasMetric, err = d.archivedFingerprintToTimeRange.Lookup(fp) return } -func (d *diskPersistence) GetArchivedMetric(clientmodel.Fingerprint) (clientmodel.Metric, error) { - // TODO: Implement. - return nil, nil +func (d *diskPersistence) GetArchivedMetric(fp clientmodel.Fingerprint) (clientmodel.Metric, error) { + metric, _, err := d.archivedFingerprintToMetrics.Lookup(fp) + return metric, err } -func (d *diskPersistence) DropArchivedMetric(clientmodel.Fingerprint) error { - // TODO: Implement. Unindex after drop! - return nil +func (d *diskPersistence) DropArchivedMetric(fp clientmodel.Fingerprint) error { + // TODO: Multi-step process, make sure this happens atomically. + metric, err := d.GetArchivedMetric(fp) + if err != nil || metric == nil { + return err + } + if err := d.archivedFingerprintToMetrics.Delete(codec.CodableFingerprint(fp)); err != nil { + return err + } + if err := d.archivedFingerprintToTimeRange.Delete(codec.CodableFingerprint(fp)); err != nil { + return err + } + return d.UnindexMetric(metric, fp) } -func (d *diskPersistence) UnarchiveMetric(clientmodel.Fingerprint) (bool, error) { - // TODO: Implement. - return false, nil +func (d *diskPersistence) UnarchiveMetric(fp clientmodel.Fingerprint) (bool, error) { + // TODO: Multi-step process, make sure this happens atomically. + has, err := d.archivedFingerprintToTimeRange.Has(fp) + if err != nil || !has { + return false, err + } + if err := d.archivedFingerprintToMetrics.Delete(codec.CodableFingerprint(fp)); err != nil { + return false, err + } + if err := d.archivedFingerprintToTimeRange.Delete(codec.CodableFingerprint(fp)); err != nil { + return false, err + } + return true, nil } func (d *diskPersistence) Close() error { diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 2fe88e5983..4ff325b4e2 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -1,10 +1,12 @@ package storage_ng import ( + "sort" "testing" clientmodel "github.com/prometheus/client_golang/model" + "github.com/prometheus/prometheus/storage/local/index" "github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/utility/test" ) @@ -89,3 +91,226 @@ func TestPersistChunk(t *testing.T) { } } } + +type incrementalBatch struct { + fpToMetric index.FingerprintMetricMapping + expectedLnToLvs index.LabelNameLabelValuesMapping + expectedLpToFps index.LabelPairFingerprintsMapping +} + +func TestIndexing(t *testing.T) { + batches := []incrementalBatch{ + { + fpToMetric: index.FingerprintMetricMapping{ + 0: { + clientmodel.MetricNameLabel: "metric_0", + "label_1": "value_1", + }, + 1: { + clientmodel.MetricNameLabel: "metric_0", + "label_2": "value_2", + "label_3": "value_3", + }, + 2: { + clientmodel.MetricNameLabel: "metric_1", + "label_1": "value_2", + }, + }, + expectedLnToLvs: index.LabelNameLabelValuesMapping{ + clientmodel.MetricNameLabel: clientmodel.LabelValues{"metric_0", "metric_1"}, + "label_1": clientmodel.LabelValues{"value_1", "value_2"}, + "label_2": clientmodel.LabelValues{"value_2"}, + "label_3": clientmodel.LabelValues{"value_3"}, + }, + expectedLpToFps: index.LabelPairFingerprintsMapping{ + metric.LabelPair{ + Name: clientmodel.MetricNameLabel, + Value: "metric_0", + }: {0, 1}, + metric.LabelPair{ + Name: clientmodel.MetricNameLabel, + Value: "metric_1", + }: {2}, + metric.LabelPair{ + Name: "label_1", + Value: "value_1", + }: {0}, + metric.LabelPair{ + Name: "label_1", + Value: "value_2", + }: {2}, + metric.LabelPair{ + Name: "label_2", + Value: "value_2", + }: {1}, + metric.LabelPair{ + Name: "label_3", + Value: "value_3", + }: {1}, + }, + }, { + fpToMetric: index.FingerprintMetricMapping{ + 3: { + clientmodel.MetricNameLabel: "metric_0", + "label_1": "value_3", + }, + 4: { + clientmodel.MetricNameLabel: "metric_2", + "label_2": "value_2", + "label_3": "value_1", + }, + 5: { + clientmodel.MetricNameLabel: "metric_1", + "label_1": "value_3", + }, + }, + expectedLnToLvs: index.LabelNameLabelValuesMapping{ + clientmodel.MetricNameLabel: clientmodel.LabelValues{"metric_0", "metric_1", "metric_2"}, + "label_1": clientmodel.LabelValues{"value_1", "value_2", "value_3"}, + "label_2": clientmodel.LabelValues{"value_2"}, + "label_3": clientmodel.LabelValues{"value_1", "value_3"}, + }, + expectedLpToFps: index.LabelPairFingerprintsMapping{ + metric.LabelPair{ + Name: clientmodel.MetricNameLabel, + Value: "metric_0", + }: {0, 1, 3}, + metric.LabelPair{ + Name: clientmodel.MetricNameLabel, + Value: "metric_1", + }: {2, 5}, + metric.LabelPair{ + Name: clientmodel.MetricNameLabel, + Value: "metric_2", + }: {4}, + metric.LabelPair{ + Name: "label_1", + Value: "value_1", + }: {0}, + metric.LabelPair{ + Name: "label_1", + Value: "value_2", + }: {2}, + metric.LabelPair{ + Name: "label_1", + Value: "value_3", + }: {3, 5}, + metric.LabelPair{ + Name: "label_2", + Value: "value_2", + }: {1, 4}, + metric.LabelPair{ + Name: "label_3", + Value: "value_1", + }: {4}, + metric.LabelPair{ + Name: "label_3", + Value: "value_3", + }: {1}, + }, + }, + } + + p, closer := newTestPersistence(t) + defer closer.Close() + + indexedFpsToMetrics := index.FingerprintMetricMapping{} + for i, b := range batches { + for fp, m := range b.fpToMetric { + if err := p.IndexMetric(m, fp); err != nil { + t.Fatal(err) + } + if err := p.ArchiveMetric(fp, m, 1, 2); err != nil { + t.Fatal(err) + } + indexedFpsToMetrics[fp] = m + } + verifyIndexedState(i, t, b, indexedFpsToMetrics, p.(*diskPersistence)) + } + + for i := len(batches) - 1; i >= 0; i-- { + b := batches[i] + verifyIndexedState(i, t, batches[i], indexedFpsToMetrics, p.(*diskPersistence)) + for fp, m := range b.fpToMetric { + if err := p.UnindexMetric(m, fp); err != nil { + t.Fatal(err) + } + unarchived, err := p.UnarchiveMetric(fp) + if err != nil { + t.Fatal(err) + } + if !unarchived { + t.Errorf("%d. metric not unarchived", i) + } + delete(indexedFpsToMetrics, fp) + } + } +} + +func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMetrics index.FingerprintMetricMapping, p *diskPersistence) { + for fp, m := range indexedFpsToMetrics { + // Compare archived metrics with input metrics. + mOut, err := p.GetArchivedMetric(fp) + if err != nil { + t.Fatal(err) + } + if !mOut.Equal(m) { + t.Errorf("%d. %v: Got: %s; want %s", i, fp, mOut, m) + } + + // Check that archived metrics are in membership index. + has, first, last, err := p.HasArchivedMetric(fp) + if err != nil { + t.Fatal(err) + } + if !has { + t.Errorf("%d. fingerprint %v not found", i, fp) + } + if first != 1 || last != 2 { + t.Errorf( + "%d. %v: Got first: %d, last %d; want first: %d, last %d", + i, fp, first, last, 1, 2, + ) + } + } + + // Compare label name -> label values mappings. + for ln, lvs := range b.expectedLnToLvs { + outLvs, err := p.GetLabelValuesForLabelName(ln) + if err != nil { + t.Fatal(err) + } + + sort.Sort(lvs) + sort.Sort(outLvs) + + if len(lvs) != len(outLvs) { + t.Errorf("%d. different number of label values. Got: %d; want %d", i, len(outLvs), len(lvs)) + } + for j, _ := range lvs { + if lvs[j] != outLvs[j] { + t.Errorf("%d.%d. label values don't match. Got: %s; want %s", i, j, outLvs[j], lvs[j]) + } + } + } + + // Compare label pair -> fingerprints mappings. + for lp, fps := range b.expectedLpToFps { + outFps, err := p.GetFingerprintsForLabelPair(lp) + if err != nil { + t.Fatal(err) + } + + sort.Sort(fps) + sort.Sort(outFps) + + if len(fps) != len(outFps) { + t.Errorf("%d. %v: different number of fingerprints. Got: %d; want %d", i, lp, len(outFps), len(fps)) + } + for j, _ := range fps { + if fps[j] != outFps[j] { + t.Errorf("%d.%d. %v: fingerprints don't match. Got: %d; want %d", i, j, lp, outFps[j], fps[j]) + } + } + } +} diff --git a/storage/local/storage.go b/storage/local/storage.go index 69761cf1b7..093b642756 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -126,7 +126,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(m clientmodel.Metric) *memorySer series.chunkDescsLoaded = false } else { // This was a genuinely new series, so index the metric. - if err := s.persistence.IndexMetric(m); err != nil { + if err := s.persistence.IndexMetric(m, fp); err != nil { glog.Errorf("Error indexing metric %v: %v", m, err) } } @@ -286,7 +286,9 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint) { // persistence.PersistChunck needs to be locked on fp level, or // something. And even then, what happens if everything is dropped, but // there are still chunks hung in the persist queue? They would later - // re-create a file for a series that doesn't exist anymore... + // re-create a file for a series that doesn't exist anymore... But + // there is the ref count, which is one higher if you have not yet + // persisted the chunk. defer s.mtx.Unlock() // First purge persisted chunks. We need to do that anyway. @@ -299,15 +301,16 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint) { if series, ok := s.fingerprintToSeries[fp]; ok { if series.purgeOlderThan(ts) { delete(s.fingerprintToSeries, fp) - if err := s.persistence.UnindexMetric(series.metric); err != nil { + if err := s.persistence.UnindexMetric(series.metric, fp); err != nil { glog.Errorf("Error unindexing metric %v: %v", series.metric, err) } } return } - // If nothing was in memory, the metric must have been archived. Drop - // the archived metric if there are no persisted chunks left. + // If we arrive here, nothing was in memory, so the metric must have + // been archived. Drop the archived metric if there are no persisted + // chunks left. if !allDropped { return }