diff --git a/storage/local/codec/codec.go b/storage/local/codable/codable.go similarity index 65% rename from storage/local/codec/codec.go rename to storage/local/codable/codable.go index c75b7a1401..27f9a979c8 100644 --- a/storage/local/codec/codec.go +++ b/storage/local/codable/codable.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Package codec provides types that implement encoding.BinaryMarshaler and +// Package codable provides types that implement encoding.BinaryMarshaler and // encoding.BinaryUnmarshaler and functions that help to encode and decode // primitives. The Prometheus storage backend uses them to persist objects to // files and to save objects in LevelDB. @@ -28,11 +28,10 @@ // // Maps are encoded as the number of mappings as a varint, followed by the // mappings, each of which consists of the key followed by the value. -package codec +package codable import ( "bytes" - "encoding" "encoding/binary" "fmt" "io" @@ -43,14 +42,6 @@ import ( "github.com/prometheus/prometheus/storage/metric" ) -// codable implements both, encoding.BinaryMarshaler and -// encoding.BinaryUnmarshaler, which is only needed internally and therefore not -// exported for now. -type codable interface { - encoding.BinaryMarshaler - encoding.BinaryUnmarshaler -} - // A byteReader is an io.ByteReader that also implements the vanilla io.Reader // interface. type byteReader interface { @@ -145,12 +136,12 @@ func decodeString(b byteReader) (string, error) { return string(buf), nil } -// A CodableMetric is a clientmodel.Metric that implements +// A Metric is a clientmodel.Metric that implements // encoding.BinaryMarshaler and encoding.BinaryUnmarshaler. -type CodableMetric clientmodel.Metric +type Metric clientmodel.Metric // MarshalBinary implements encoding.BinaryMarshaler. -func (m CodableMetric) MarshalBinary() ([]byte, error) { +func (m Metric) MarshalBinary() ([]byte, error) { buf := &bytes.Buffer{} if err := EncodeVarint(buf, int64(len(m))); err != nil { return nil, err @@ -167,20 +158,20 @@ func (m CodableMetric) MarshalBinary() ([]byte, error) { } // UnmarshalBinary implements encoding.BinaryUnmarshaler. It can be used with the -// zero value of CodableMetric. -func (m *CodableMetric) UnmarshalBinary(buf []byte) error { +// zero value of Metric. +func (m *Metric) UnmarshalBinary(buf []byte) error { return m.UnmarshalFromReader(bytes.NewReader(buf)) } -// UnmarshalFromReader unmarshals a CodableMetric from a reader that implements +// UnmarshalFromReader unmarshals a Metric from a reader that implements // both, io.Reader and io.ByteReader. It can be used with the zero value of -// CodableMetric. -func (m *CodableMetric) UnmarshalFromReader(r byteReader) error { +// Metric. +func (m *Metric) UnmarshalFromReader(r byteReader) error { numLabelPairs, err := binary.ReadVarint(r) if err != nil { return err } - *m = make(CodableMetric, numLabelPairs) + *m = make(Metric, numLabelPairs) for ; numLabelPairs > 0; numLabelPairs-- { ln, err := decodeString(r) @@ -196,31 +187,64 @@ func (m *CodableMetric) UnmarshalFromReader(r byteReader) error { return nil } -// A CodableFingerprint is a clientmodel.Fingerprint that implements +// A Fingerprint is a clientmodel.Fingerprint that implements // encoding.BinaryMarshaler and encoding.BinaryUnmarshaler. The implementation // depends on clientmodel.Fingerprint to be convertible to uint64. It encodes // the fingerprint as a big-endian uint64. -type CodableFingerprint clientmodel.Fingerprint +type Fingerprint clientmodel.Fingerprint // MarshalBinary implements encoding.BinaryMarshaler. -func (fp CodableFingerprint) MarshalBinary() ([]byte, error) { +func (fp Fingerprint) MarshalBinary() ([]byte, error) { b := make([]byte, 8) binary.BigEndian.PutUint64(b, uint64(fp)) return b, nil } // UnmarshalBinary implements encoding.BinaryUnmarshaler. -func (fp *CodableFingerprint) UnmarshalBinary(buf []byte) error { - *fp = CodableFingerprint(binary.BigEndian.Uint64(buf)) +func (fp *Fingerprint) UnmarshalBinary(buf []byte) error { + *fp = Fingerprint(binary.BigEndian.Uint64(buf)) return nil } -// CodableFingerprints is a clientmodel.Fingerprints that implements -// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler. -type CodableFingerprints clientmodel.Fingerprints +// FingerprintSet is a map[clientmodel.Fingerprint]struct{} that +// implements encoding.BinaryMarshaler and encoding.BinaryUnmarshaler. Its +// binary form is identical to that of Fingerprints. +type FingerprintSet map[clientmodel.Fingerprint]struct{} // MarshalBinary implements encoding.BinaryMarshaler. -func (fps CodableFingerprints) MarshalBinary() ([]byte, error) { +func (fps FingerprintSet) MarshalBinary() ([]byte, error) { + b := make([]byte, binary.MaxVarintLen64+len(fps)*8) + lenBytes := binary.PutVarint(b, int64(len(fps))) + offset := lenBytes + + for fp := range fps { + binary.BigEndian.PutUint64(b[offset:], uint64(fp)) + offset += 8 + } + return b[:len(fps)*8+lenBytes], nil +} + +// UnmarshalBinary implements encoding.BinaryUnmarshaler. +func (fps *FingerprintSet) UnmarshalBinary(buf []byte) error { + numFPs, offset := binary.Varint(buf) + if offset <= 0 { + return fmt.Errorf("could not decode length of Fingerprints, varint decoding returned %d", offset) + } + *fps = make(FingerprintSet, numFPs) + + for i := 0; i < int(numFPs); i++ { + (*fps)[clientmodel.Fingerprint(binary.BigEndian.Uint64(buf[offset+i*8:]))] = struct{}{} + } + return nil +} + +// Fingerprints is a clientmodel.Fingerprints that implements +// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler. Its binary form is +// identical to that of FingerprintSet. +type Fingerprints clientmodel.Fingerprints + +// MarshalBinary implements encoding.BinaryMarshaler. +func (fps Fingerprints) MarshalBinary() ([]byte, error) { b := make([]byte, binary.MaxVarintLen64+len(fps)*8) lenBytes := binary.PutVarint(b, int64(len(fps))) @@ -231,12 +255,12 @@ func (fps CodableFingerprints) MarshalBinary() ([]byte, error) { } // UnmarshalBinary implements encoding.BinaryUnmarshaler. -func (fps *CodableFingerprints) UnmarshalBinary(buf []byte) error { +func (fps *Fingerprints) UnmarshalBinary(buf []byte) error { numFPs, offset := binary.Varint(buf) if offset <= 0 { - return fmt.Errorf("could not decode length of CodableFingerprints, varint decoding returned %d", offset) + return fmt.Errorf("could not decode length of Fingerprints, varint decoding returned %d", offset) } - *fps = make(CodableFingerprints, numFPs) + *fps = make(Fingerprints, numFPs) for i := range *fps { (*fps)[i] = clientmodel.Fingerprint(binary.BigEndian.Uint64(buf[offset+i*8:])) @@ -244,12 +268,12 @@ func (fps *CodableFingerprints) UnmarshalBinary(buf []byte) error { return nil } -// CodableLabelPair is a metric.LabelPair that implements +// LabelPair is a metric.LabelPair that implements // encoding.BinaryMarshaler and encoding.BinaryUnmarshaler. -type CodableLabelPair metric.LabelPair +type LabelPair metric.LabelPair // MarshalBinary implements encoding.BinaryMarshaler. -func (lp CodableLabelPair) MarshalBinary() ([]byte, error) { +func (lp LabelPair) MarshalBinary() ([]byte, error) { buf := &bytes.Buffer{} if err := encodeString(buf, string(lp.Name)); err != nil { return nil, err @@ -261,7 +285,7 @@ func (lp CodableLabelPair) MarshalBinary() ([]byte, error) { } // UnmarshalBinary implements encoding.BinaryUnmarshaler. -func (lp *CodableLabelPair) UnmarshalBinary(buf []byte) error { +func (lp *LabelPair) UnmarshalBinary(buf []byte) error { r := bytes.NewReader(buf) n, err := decodeString(r) if err != nil { @@ -276,12 +300,12 @@ func (lp *CodableLabelPair) UnmarshalBinary(buf []byte) error { return nil } -// CodableLabelName is a clientmodel.LabelName that implements +// LabelName is a clientmodel.LabelName that implements // encoding.BinaryMarshaler and encoding.BinaryUnmarshaler. -type CodableLabelName clientmodel.LabelName +type LabelName clientmodel.LabelName // MarshalBinary implements encoding.BinaryMarshaler. -func (l CodableLabelName) MarshalBinary() ([]byte, error) { +func (l LabelName) MarshalBinary() ([]byte, error) { buf := &bytes.Buffer{} if err := encodeString(buf, string(l)); err != nil { return nil, err @@ -290,22 +314,61 @@ func (l CodableLabelName) MarshalBinary() ([]byte, error) { } // UnmarshalBinary implements encoding.BinaryUnmarshaler. -func (l *CodableLabelName) UnmarshalBinary(buf []byte) error { +func (l *LabelName) UnmarshalBinary(buf []byte) error { r := bytes.NewReader(buf) n, err := decodeString(r) if err != nil { return err } - *l = CodableLabelName(n) + *l = LabelName(n) return nil } -// CodableLabelValues is a clientmodel.LabelValues that implements -// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler. -type CodableLabelValues clientmodel.LabelValues +// LabelValueSet is a map[clientmodel.LabelValue]struct{} that implements +// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler. Its binary form is +// identical to that of LabelValues. +type LabelValueSet map[clientmodel.LabelValue]struct{} // MarshalBinary implements encoding.BinaryMarshaler. -func (vs CodableLabelValues) MarshalBinary() ([]byte, error) { +func (vs LabelValueSet) MarshalBinary() ([]byte, error) { + buf := &bytes.Buffer{} + if err := EncodeVarint(buf, int64(len(vs))); err != nil { + return nil, err + } + for v := range vs { + if err := encodeString(buf, string(v)); err != nil { + return nil, err + } + } + return buf.Bytes(), nil +} + +// UnmarshalBinary implements encoding.BinaryUnmarshaler. +func (vs *LabelValueSet) UnmarshalBinary(buf []byte) error { + r := bytes.NewReader(buf) + numValues, err := binary.ReadVarint(r) + if err != nil { + return err + } + *vs = make(LabelValueSet, numValues) + + for i := int64(0); i < numValues; i++ { + v, err := decodeString(r) + if err != nil { + return err + } + (*vs)[clientmodel.LabelValue(v)] = struct{}{} + } + return nil +} + +// LabelValues is a clientmodel.LabelValues that implements +// encoding.BinaryMarshaler and encoding.BinaryUnmarshaler. Its binary form is +// identical to that of LabelValueSet. +type LabelValues clientmodel.LabelValues + +// MarshalBinary implements encoding.BinaryMarshaler. +func (vs LabelValues) MarshalBinary() ([]byte, error) { buf := &bytes.Buffer{} if err := EncodeVarint(buf, int64(len(vs))); err != nil { return nil, err @@ -319,13 +382,13 @@ func (vs CodableLabelValues) MarshalBinary() ([]byte, error) { } // UnmarshalBinary implements encoding.BinaryUnmarshaler. -func (vs *CodableLabelValues) UnmarshalBinary(buf []byte) error { +func (vs *LabelValues) UnmarshalBinary(buf []byte) error { r := bytes.NewReader(buf) numValues, err := binary.ReadVarint(r) if err != nil { return err } - *vs = make(CodableLabelValues, numValues) + *vs = make(LabelValues, numValues) for i := range *vs { v, err := decodeString(r) @@ -337,14 +400,14 @@ func (vs *CodableLabelValues) UnmarshalBinary(buf []byte) error { return nil } -// CodableTimeRange is used to define a time range and implements +// TimeRange is used to define a time range and implements // encoding.BinaryMarshaler and encoding.BinaryUnmarshaler. -type CodableTimeRange struct { +type TimeRange struct { First, Last clientmodel.Timestamp } // MarshalBinary implements encoding.BinaryMarshaler. -func (tr CodableTimeRange) MarshalBinary() ([]byte, error) { +func (tr TimeRange) MarshalBinary() ([]byte, error) { buf := &bytes.Buffer{} if err := EncodeVarint(buf, int64(tr.First)); err != nil { return nil, err @@ -356,7 +419,7 @@ func (tr CodableTimeRange) MarshalBinary() ([]byte, error) { } // UnmarshalBinary implements encoding.BinaryUnmarshaler. -func (tr *CodableTimeRange) UnmarshalBinary(buf []byte) error { +func (tr *TimeRange) UnmarshalBinary(buf []byte) error { r := bytes.NewReader(buf) first, err := binary.ReadVarint(r) if err != nil { diff --git a/storage/local/codable/codable_test.go b/storage/local/codable/codable_test.go new file mode 100644 index 0000000000..710cbf9be8 --- /dev/null +++ b/storage/local/codable/codable_test.go @@ -0,0 +1,149 @@ +// Copyright 2014 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package codable + +import ( + "encoding" + "reflect" + "testing" +) + +func newFingerprint(fp int64) *Fingerprint { + cfp := Fingerprint(fp) + return &cfp +} + +func newLabelName(ln string) *LabelName { + cln := LabelName(ln) + return &cln +} + +func TestCodec(t *testing.T) { + scenarios := []struct { + in encoding.BinaryMarshaler + out encoding.BinaryUnmarshaler + equal func(in, out interface{}) bool + }{ + { + in: &Metric{ + "label_1": "value_2", + "label_2": "value_2", + "label_3": "value_3", + }, + out: &Metric{}, + }, { + in: newFingerprint(12345), + out: newFingerprint(0), + }, { + in: &Fingerprints{1, 2, 56, 1234}, + out: &Fingerprints{}, + }, { + in: &Fingerprints{1, 2, 56, 1234}, + out: &FingerprintSet{}, + equal: func(in, out interface{}) bool { + inSet := FingerprintSet{} + for _, fp := range *(in.(*Fingerprints)) { + inSet[fp] = struct{}{} + } + return reflect.DeepEqual(inSet, *(out.(*FingerprintSet))) + }, + }, { + in: &FingerprintSet{ + 1: struct{}{}, + 2: struct{}{}, + 56: struct{}{}, + 1234: struct{}{}, + }, + out: &FingerprintSet{}, + }, { + in: &FingerprintSet{ + 1: struct{}{}, + 2: struct{}{}, + 56: struct{}{}, + 1234: struct{}{}, + }, + out: &Fingerprints{}, + equal: func(in, out interface{}) bool { + outSet := FingerprintSet{} + for _, fp := range *(out.(*Fingerprints)) { + outSet[fp] = struct{}{} + } + return reflect.DeepEqual(outSet, *(in.(*FingerprintSet))) + }, + }, { + in: &LabelPair{ + Name: "label_name", + Value: "label_value", + }, + out: &LabelPair{}, + }, { + in: newLabelName("label_name"), + out: newLabelName(""), + }, { + in: &LabelValues{"value_1", "value_2", "value_3"}, + out: &LabelValues{}, + }, { + in: &LabelValues{"value_1", "value_2", "value_3"}, + out: &LabelValueSet{}, + equal: func(in, out interface{}) bool { + inSet := LabelValueSet{} + for _, lv := range *(in.(*LabelValues)) { + inSet[lv] = struct{}{} + } + return reflect.DeepEqual(inSet, *(out.(*LabelValueSet))) + }, + }, { + in: &LabelValueSet{ + "value_1": struct{}{}, + "value_2": struct{}{}, + "value_3": struct{}{}, + }, + out: &LabelValueSet{}, + }, { + in: &LabelValueSet{ + "value_1": struct{}{}, + "value_2": struct{}{}, + "value_3": struct{}{}, + }, + out: &LabelValues{}, + equal: func(in, out interface{}) bool { + outSet := LabelValueSet{} + for _, lv := range *(out.(*LabelValues)) { + outSet[lv] = struct{}{} + } + return reflect.DeepEqual(outSet, *(in.(*LabelValueSet))) + }, + }, { + in: &TimeRange{42, 2001}, + out: &TimeRange{}, + }, + } + + for i, s := range scenarios { + encoded, err := s.in.MarshalBinary() + if err != nil { + t.Fatal(err) + } + if err := s.out.UnmarshalBinary(encoded); err != nil { + t.Fatal(err) + } + equal := s.equal + if equal == nil { + equal = reflect.DeepEqual + } + if !equal(s.in, s.out) { + t.Errorf("%d. Got: %v; want %v; encoded bytes are: %v", i, s.out, s.in, encoded) + } + } +} diff --git a/storage/local/codec/codec_test.go b/storage/local/codec/codec_test.go deleted file mode 100644 index 43c27f8800..0000000000 --- a/storage/local/codec/codec_test.go +++ /dev/null @@ -1,130 +0,0 @@ -// Copyright 2014 Prometheus Team -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package codec - -import ( - "testing" - - clientmodel "github.com/prometheus/client_golang/model" -) - -func newCodableFingerprint(fp int64) *CodableFingerprint { - cfp := CodableFingerprint(fp) - return &cfp -} - -func newCodableLabelName(ln string) *CodableLabelName { - cln := CodableLabelName(ln) - return &cln -} - -func TestCodec(t *testing.T) { - scenarios := []struct { - in codable - out codable - equal func(in, out codable) bool - }{ - { - in: &CodableMetric{ - "label_1": "value_2", - "label_2": "value_2", - "label_3": "value_3", - }, - out: &CodableMetric{}, - equal: func(in, out codable) bool { - m1 := (*clientmodel.Metric)(in.(*CodableMetric)) - m2 := (*clientmodel.Metric)(out.(*CodableMetric)) - return m1.Equal(*m2) - }, - }, { - in: newCodableFingerprint(12345), - out: newCodableFingerprint(0), - equal: func(in, out codable) bool { - return *in.(*CodableFingerprint) == *out.(*CodableFingerprint) - }, - }, { - in: &CodableFingerprints{1, 2, 56, 1234}, - out: &CodableFingerprints{}, - equal: func(in, out codable) bool { - fps1 := *in.(*CodableFingerprints) - fps2 := *out.(*CodableFingerprints) - if len(fps1) != len(fps2) { - return false - } - for i := range fps1 { - if fps1[i] != fps2[i] { - return false - } - } - return true - }, - }, { - in: &CodableLabelPair{ - Name: "label_name", - Value: "label_value", - }, - out: &CodableLabelPair{}, - equal: func(in, out codable) bool { - lp1 := *in.(*CodableLabelPair) - lp2 := *out.(*CodableLabelPair) - return lp1 == lp2 - }, - }, { - in: newCodableLabelName("label_name"), - out: newCodableLabelName(""), - equal: func(in, out codable) bool { - ln1 := *in.(*CodableLabelName) - ln2 := *out.(*CodableLabelName) - return ln1 == ln2 - }, - }, { - in: &CodableLabelValues{"value_1", "value_2", "value_3"}, - out: &CodableLabelValues{}, - equal: func(in, out codable) bool { - lvs1 := *in.(*CodableLabelValues) - lvs2 := *out.(*CodableLabelValues) - if len(lvs1) != len(lvs2) { - return false - } - for i := range lvs1 { - if lvs1[i] != lvs2[i] { - return false - } - } - return true - }, - }, { - in: &CodableTimeRange{42, 2001}, - out: &CodableTimeRange{}, - equal: func(in, out codable) bool { - ln1 := *in.(*CodableTimeRange) - ln2 := *out.(*CodableTimeRange) - return ln1 == ln2 - }, - }, - } - - for i, s := range scenarios { - encoded, err := s.in.MarshalBinary() - if err != nil { - t.Fatal(err) - } - if err := s.out.UnmarshalBinary(encoded); err != nil { - t.Fatal(err) - } - if !s.equal(s.in, s.out) { - t.Errorf("%d. Got: %v; want %v; encoded bytes are: %v", i, s.out, s.in, encoded) - } - } -} diff --git a/storage/local/index/index.go b/storage/local/index/index.go index 7a3de7093f..fe272fbe70 100644 --- a/storage/local/index/index.go +++ b/storage/local/index/index.go @@ -22,9 +22,8 @@ import ( clientmodel "github.com/prometheus/client_golang/model" - "github.com/prometheus/prometheus/storage/local/codec" + "github.com/prometheus/prometheus/storage/local/codable" "github.com/prometheus/prometheus/storage/metric" - "github.com/prometheus/prometheus/utility" ) const ( @@ -58,7 +57,7 @@ func (i *FingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapping) er b := i.NewBatch() for fp, m := range mapping { - b.Put(codec.CodableFingerprint(fp), codec.CodableMetric(m)) + b.Put(codable.Fingerprint(fp), codable.Metric(m)) } return i.Commit(b) @@ -73,7 +72,7 @@ func (i *FingerprintMetricIndex) UnindexBatch(mapping FingerprintMetricMapping) b := i.NewBatch() for fp := range mapping { - b.Delete(codec.CodableFingerprint(fp)) + b.Delete(codable.Fingerprint(fp)) } return i.Commit(b) @@ -84,7 +83,7 @@ func (i *FingerprintMetricIndex) UnindexBatch(mapping FingerprintMetricMapping) // // This method is goroutine-safe. func (i *FingerprintMetricIndex) Lookup(fp clientmodel.Fingerprint) (metric clientmodel.Metric, ok bool, err error) { - ok, err = i.Get(codec.CodableFingerprint(fp), (*codec.CodableMetric)(&metric)) + ok, err = i.Get(codable.Fingerprint(fp), (*codable.Metric)(&metric)) return } @@ -105,7 +104,7 @@ func NewFingerprintMetricIndex(basePath string) (*FingerprintMetricIndex, error) // LabelNameLabelValuesMapping is an in-memory map of label names to // label values. -type LabelNameLabelValuesMapping map[clientmodel.LabelName]clientmodel.LabelValues +type LabelNameLabelValuesMapping map[clientmodel.LabelName]codable.LabelValueSet // LabelNameLabelValuesIndex is a KeyValueStore that maps existing label names // to all label values stored for that label name. @@ -118,19 +117,17 @@ type LabelNameLabelValuesIndex struct { // a deletion of that mapping from the index. // // While this method is fundamentally goroutine-safe, note that the order of -// execution for multiple batches executed concurrently is undefined. Also, it -// is in general not safe to mutate the index while Extend or Reduce are -// running. +// execution for multiple batches executed concurrently is undefined. func (i *LabelNameLabelValuesIndex) IndexBatch(b LabelNameLabelValuesMapping) error { batch := i.NewBatch() for name, values := range b { if len(values) == 0 { - if err := batch.Delete(codec.CodableLabelName(name)); err != nil { + if err := batch.Delete(codable.LabelName(name)); err != nil { return err } } else { - if err := batch.Put(codec.CodableLabelName(name), codec.CodableLabelValues(values)); err != nil { + if err := batch.Put(codable.LabelName(name), values); err != nil { return err } } @@ -145,80 +142,21 @@ func (i *LabelNameLabelValuesIndex) IndexBatch(b LabelNameLabelValuesMapping) er // // This method is goroutine-safe. func (i *LabelNameLabelValuesIndex) Lookup(l clientmodel.LabelName) (values clientmodel.LabelValues, ok bool, err error) { - ok, err = i.Get(codec.CodableLabelName(l), (*codec.CodableLabelValues)(&values)) + ok, err = i.Get(codable.LabelName(l), (*codable.LabelValues)(&values)) return } -// Extend incorporates the given metric into the index, i.e. it creates new -// label name to label values mappings for new label names, and it extends the -// label values list mapped from already existing label names appropriately. +// LookupSet looks up all label values for a given label name. Looking up a +// non-existing label name is not an error. In that case, (nil, false, nil) is +// returned. // -// This method is not goroutine-safe. -func (i *LabelNameLabelValuesIndex) Extend(m clientmodel.Metric) error { - b := make(LabelNameLabelValuesMapping, len(m)) - for ln, lv := range m { - baseLVs, _, err := i.Lookup(ln) - if err != nil { - return err - } - lvSet := utility.Set{} - for _, baseLV := range baseLVs { - lvSet.Add(baseLV) - } - lvSet.Add(lv) - if len(lvSet) == len(baseLVs) { - continue - } - lvs := make(clientmodel.LabelValues, 0, len(lvSet)) - for v := range lvSet { - lvs = append(lvs, v.(clientmodel.LabelValue)) - } - b[ln] = lvs +// This method is goroutine-safe. +func (i *LabelNameLabelValuesIndex) LookupSet(l clientmodel.LabelName) (values map[clientmodel.LabelValue]struct{}, ok bool, err error) { + ok, err = i.Get(codable.LabelName(l), (*codable.LabelValueSet)(&values)) + if values == nil { + values = map[clientmodel.LabelValue]struct{}{} } - return i.IndexBatch(b) -} - -// Reduce removes label values from the index based on the given label pair to -// fingerprints mapping. The mapping to be passed in here is returned by -// LabelPairFingerprintIndex.Reduce. It contains all the label pairs that have -// now fewer fingerprints mapped to it. This method checks if any label pair has -// arrived at zero mapped fingerprints. In that case, the value of that label -// pair is removed from the list of label values mapped to the name of that -// label pair. Label names that are then mapped to zero label values are removed -// entirely from the index. -// -// This method is not goroutine-safe. -func (i *LabelNameLabelValuesIndex) Reduce(m LabelPairFingerprintsMapping) error { - b := make(LabelNameLabelValuesMapping, len(m)) - for lp, fps := range m { - if len(fps) != 0 { - continue - } - ln := lp.Name - lv := lp.Value - baseValues, ok := b[ln] - if !ok { - var err error - baseValues, _, err = i.Lookup(ln) - if err != nil { - return err - } - } - lvSet := utility.Set{} - for _, baseValue := range baseValues { - lvSet.Add(baseValue) - } - lvSet.Remove(lv) - if len(lvSet) == len(baseValues) { - continue - } - lvs := make(clientmodel.LabelValues, 0, len(lvSet)) - for v := range lvSet { - lvs = append(lvs, v.(clientmodel.LabelValue)) - } - b[ln] = lvs - } - return i.IndexBatch(b) + return } // NewLabelNameLabelValuesIndex returns a LevelDB-backed @@ -238,7 +176,7 @@ func NewLabelNameLabelValuesIndex(basePath string) (*LabelNameLabelValuesIndex, // LabelPairFingerprintsMapping is an in-memory map of label pairs to // fingerprints. -type LabelPairFingerprintsMapping map[metric.LabelPair]clientmodel.Fingerprints +type LabelPairFingerprintsMapping map[metric.LabelPair]codable.FingerprintSet // LabelPairFingerprintIndex is a KeyValueStore that maps existing label pairs // to the fingerprints of all metrics containing those label pairs. @@ -251,17 +189,15 @@ type LabelPairFingerprintIndex struct { // from the index. // // While this method is fundamentally goroutine-safe, note that the order of -// execution for multiple batches executed concurrently is undefined. Also, it -// is in general not safe to mutate the index while Extend or Reduce are -// running. +// execution for multiple batches executed concurrently is undefined. func (i *LabelPairFingerprintIndex) IndexBatch(m LabelPairFingerprintsMapping) error { batch := i.NewBatch() for pair, fps := range m { if len(fps) == 0 { - batch.Delete(codec.CodableLabelPair(pair)) + batch.Delete(codable.LabelPair(pair)) } else { - batch.Put(codec.CodableLabelPair(pair), codec.CodableFingerprints(fps)) + batch.Put(codable.LabelPair(pair), fps) } } @@ -274,70 +210,21 @@ func (i *LabelPairFingerprintIndex) IndexBatch(m LabelPairFingerprintsMapping) e // // This method is goroutine-safe. func (i *LabelPairFingerprintIndex) Lookup(p metric.LabelPair) (fps clientmodel.Fingerprints, ok bool, err error) { - ok, err = i.Get((codec.CodableLabelPair)(p), (*codec.CodableFingerprints)(&fps)) + ok, err = i.Get((codable.LabelPair)(p), (*codable.Fingerprints)(&fps)) return } -// Extend incorporates the given metric into the index, i.e. it creates new -// label pair to fingerprint mappings for new label pairs, and it extends the -// fingerprint list mapped from already existing label pairs appropriately. +// LookupSet looks up all fingerprints for a given label pair. Looking up a +// non-existing label pair is not an error. In that case, (nil, false, nil) is +// returned. // -// This method is not goroutine-safe. -func (i *LabelPairFingerprintIndex) Extend(m clientmodel.Metric, fp clientmodel.Fingerprint) error { - b := make(LabelPairFingerprintsMapping, len(m)) - for ln, lv := range m { - lp := metric.LabelPair{Name: ln, Value: lv} - baseFPs, _, err := i.Lookup(lp) - if err != nil { - return err - } - fpSet := utility.Set{} - for _, baseFP := range baseFPs { - fpSet.Add(baseFP) - } - fpSet.Add(fp) - if len(fpSet) == len(baseFPs) { - continue - } - fps := make(clientmodel.Fingerprints, 0, len(fpSet)) - for f := range fpSet { - fps = append(fps, f.(clientmodel.Fingerprint)) - } - b[lp] = fps - +// This method is goroutine-safe. +func (i *LabelPairFingerprintIndex) LookupSet(p metric.LabelPair) (fps map[clientmodel.Fingerprint]struct{}, ok bool, err error) { + ok, err = i.Get((codable.LabelPair)(p), (*codable.FingerprintSet)(&fps)) + if fps == nil { + fps = map[clientmodel.Fingerprint]struct{}{} } - return i.IndexBatch(b) -} - -// Reduce removes the given fingerprint from the fingerprint lists mapped from -// the label pairs contained in the given metric. All the updated mappings are -// returned (for consumption by LabelNameLabelValuesIndex.Reduce). -// -// This method is not goroutine-safe. -func (i *LabelPairFingerprintIndex) Reduce(m clientmodel.Metric, fp clientmodel.Fingerprint) (LabelPairFingerprintsMapping, error) { - b := make(LabelPairFingerprintsMapping, len(m)) - for ln, lv := range m { - lp := metric.LabelPair{Name: ln, Value: lv} - baseFPs, _, err := i.Lookup(lp) - if err != nil { - return nil, err - } - fpSet := utility.Set{} - for _, baseFP := range baseFPs { - fpSet.Add(baseFP) - } - fpSet.Remove(fp) - if len(fpSet) == len(baseFPs) { - continue - } - fps := make(clientmodel.Fingerprints, 0, len(fpSet)) - for f := range fpSet { - fps = append(fps, f.(clientmodel.Fingerprint)) - } - b[lp] = fps - - } - return b, i.IndexBatch(b) + return } // NewLabelPairFingerprintIndex returns a LevelDB-backed @@ -367,8 +254,8 @@ type FingerprintTimeRangeIndex struct { // // This method is goroutine-safe. func (i *FingerprintTimeRangeIndex) Lookup(fp clientmodel.Fingerprint) (firstTime, lastTime clientmodel.Timestamp, ok bool, err error) { - var tr codec.CodableTimeRange - ok, err = i.Get(codec.CodableFingerprint(fp), &tr) + var tr codable.TimeRange + ok, err = i.Get(codable.Fingerprint(fp), &tr) return tr.First, tr.Last, ok, err } @@ -376,7 +263,7 @@ func (i *FingerprintTimeRangeIndex) Lookup(fp clientmodel.Fingerprint) (firstTim // // This method is goroutine-safe. func (i *FingerprintTimeRangeIndex) Has(fp clientmodel.Fingerprint) (ok bool, err error) { - return i.KeyValueStore.Has(codec.CodableFingerprint(fp)) + return i.KeyValueStore.Has(codable.Fingerprint(fp)) } // NewFingerprintTimeRangeIndex returns a LevelDB-backed diff --git a/storage/local/interface.go b/storage/local/interface.go index cb32bd16e9..b4e216b1ce 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -90,20 +90,20 @@ type Persistence interface { // GetLabelValuesForLabelName returns the label values for the given // label name. GetLabelValuesForLabelName(clientmodel.LabelName) (clientmodel.LabelValues, error) - // GetFingerprintsModifiedBefore returns the fingerprints whose timeseries - // have live samples before the provided timestamp. - GetFingerprintsModifiedBefore(clientmodel.Timestamp) ([]clientmodel.Fingerprint, error) - // IndexMetric indexes the given metric for the needs of - // GetFingerprintsForLabelPair and GetLabelValuesForLabelName. - IndexMetric(clientmodel.Metric, clientmodel.Fingerprint) error - // UnindexMetric removes references to the given metric from the indexes - // used for GetFingerprintsForLabelPair and + // IndexMetric queues the given metric for addition to the indexes + // needed by GetFingerprintsForLabelPair and GetLabelValuesForLabelName. + // If the queue is full, this method blocks until the metric can be queued. + // This method is goroutine-safe. + IndexMetric(clientmodel.Metric, clientmodel.Fingerprint) + // UnindexMetric queues references to the given metric for removal from + // the indexes used for GetFingerprintsForLabelPair and // GetLabelValuesForLabelName. The index of fingerprints to archived - // metrics is not affected by this method. (In fact, never call this + // metrics is not affected by this removal. (In fact, never call this // method for an archived metric. To drop an archived metric, call - // DropArchivedFingerprint.) - UnindexMetric(clientmodel.Metric, clientmodel.Fingerprint) error + // DropArchivedFingerprint.) If the queue is full, this method blocks + // until the metric can be queued. This method is goroutine-safe. + UnindexMetric(clientmodel.Metric, clientmodel.Fingerprint) // ArchiveMetric persists the mapping of the given fingerprint to the // given metric, together with the first and last timestamp of the @@ -118,12 +118,15 @@ type Persistence interface { HasArchivedMetric(clientmodel.Fingerprint) ( hasMetric bool, firstTime, lastTime clientmodel.Timestamp, err error, ) + // GetFingerprintsModifiedBefore returns the fingerprints of archived + // timeseries that have live samples before the provided timestamp. + GetFingerprintsModifiedBefore(clientmodel.Timestamp) ([]clientmodel.Fingerprint, error) // GetArchivedMetric retrieves the archived metric with the given // fingerprint. GetArchivedMetric(clientmodel.Fingerprint) (clientmodel.Metric, error) // DropArchivedMetric deletes an archived fingerprint and its - // corresponding metric entirely. It also un-indexes the metric (no need - // to call UnindexMetric for the deleted metric.) + // corresponding metric entirely. It also queues the metric for + // un-indexing (no need to call UnindexMetric for the deleted metric.) DropArchivedMetric(clientmodel.Fingerprint) error // UnarchiveMetric deletes an archived fingerprint and its metric, but // (in contrast to DropArchivedMetric) does not un-index the metric. diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 4e6ac2ce96..d105cf081b 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -20,12 +20,13 @@ import ( "io" "os" "path" + "time" "github.com/golang/glog" clientmodel "github.com/prometheus/client_golang/model" - "github.com/prometheus/prometheus/storage/local/codec" + "github.com/prometheus/prometheus/storage/local/codable" "github.com/prometheus/prometheus/storage/local/index" "github.com/prometheus/prometheus/storage/metric" ) @@ -44,6 +45,10 @@ const ( chunkHeaderTypeOffset = 0 chunkHeaderFirstTimeOffset = 1 chunkHeaderLastTimeOffset = 9 + + indexingMaxBatchSize = 1024 + indexingBatchTimeout = 500 * time.Millisecond // Commit batch when idle for that long. + indexingQueueCapacity = 10 * indexingMaxBatchSize // TODO: Export as metric. ) const ( @@ -52,6 +57,19 @@ const ( flagHeadChunkPersisted ) +type indexingOpType byte + +const ( + add indexingOpType = iota + remove +) + +type indexingOp struct { + fingerprint clientmodel.Fingerprint + metric clientmodel.Metric + opType indexingOpType +} + type diskPersistence struct { basePath string chunkLen int @@ -60,6 +78,9 @@ type diskPersistence struct { archivedFingerprintToTimeRange *index.FingerprintTimeRangeIndex labelPairToFingerprints *index.LabelPairFingerprintIndex labelNameToLabelValues *index.LabelNameLabelValuesIndex + + indexingQueue chan indexingOp + indexingStopped chan struct{} } // NewDiskPersistence returns a newly allocated Persistence backed by local disk storage, ready to use. @@ -67,29 +88,38 @@ func NewDiskPersistence(basePath string, chunkLen int) (Persistence, error) { if err := os.MkdirAll(basePath, 0700); err != nil { return nil, err } - dp := &diskPersistence{ - basePath: basePath, - chunkLen: chunkLen, - } var err error - dp.archivedFingerprintToMetrics, err = index.NewFingerprintMetricIndex(basePath) + archivedFingerprintToMetrics, err := index.NewFingerprintMetricIndex(basePath) if err != nil { return nil, err } - dp.archivedFingerprintToTimeRange, err = index.NewFingerprintTimeRangeIndex(basePath) + archivedFingerprintToTimeRange, err := index.NewFingerprintTimeRangeIndex(basePath) if err != nil { return nil, err } - dp.labelPairToFingerprints, err = index.NewLabelPairFingerprintIndex(basePath) + labelPairToFingerprints, err := index.NewLabelPairFingerprintIndex(basePath) if err != nil { return nil, err } - dp.labelNameToLabelValues, err = index.NewLabelNameLabelValuesIndex(basePath) + labelNameToLabelValues, err := index.NewLabelNameLabelValuesIndex(basePath) if err != nil { return nil, err } - return dp, nil + p := &diskPersistence{ + basePath: basePath, + chunkLen: chunkLen, + + archivedFingerprintToMetrics: archivedFingerprintToMetrics, + archivedFingerprintToTimeRange: archivedFingerprintToTimeRange, + labelPairToFingerprints: labelPairToFingerprints, + labelNameToLabelValues: labelNameToLabelValues, + + indexingQueue: make(chan indexingOp, indexingQueueCapacity), + indexingStopped: make(chan struct{}), + } + go p.processIndexingQueue() + return p, nil } func (p *diskPersistence) GetFingerprintsForLabelPair(lp metric.LabelPair) (clientmodel.Fingerprints, error) { @@ -109,8 +139,8 @@ func (p *diskPersistence) GetLabelValuesForLabelName(ln clientmodel.LabelName) ( } func (p *diskPersistence) GetFingerprintsModifiedBefore(beforeTime clientmodel.Timestamp) ([]clientmodel.Fingerprint, error) { - var fp codec.CodableFingerprint - var tr codec.CodableTimeRange + var fp codable.Fingerprint + var tr codable.TimeRange fps := []clientmodel.Fingerprint{} p.archivedFingerprintToTimeRange.ForEach(func(kv index.KeyValueAccessor) error { if err := kv.Value(&tr); err != nil { @@ -257,10 +287,10 @@ func (p *diskPersistence) PersistSeriesMapAndHeads(fingerprintToSeries SeriesMap if _, err := w.WriteString(headsMagicString); err != nil { return err } - if err := codec.EncodeVarint(w, headsFormatVersion); err != nil { + if err := codable.EncodeVarint(w, headsFormatVersion); err != nil { return err } - if err := codec.EncodeVarint(w, int64(len(fingerprintToSeries))); err != nil { + if err := codable.EncodeVarint(w, int64(len(fingerprintToSeries))); err != nil { return err } @@ -275,23 +305,23 @@ func (p *diskPersistence) PersistSeriesMapAndHeads(fingerprintToSeries SeriesMap if err := w.WriteByte(seriesFlags); err != nil { return err } - if err := codec.EncodeUint64(w, uint64(fp)); err != nil { + if err := codable.EncodeUint64(w, uint64(fp)); err != nil { return err } - buf, err := codec.CodableMetric(series.metric).MarshalBinary() + buf, err := codable.Metric(series.metric).MarshalBinary() if err != nil { return err } w.Write(buf) - if err := codec.EncodeVarint(w, int64(len(series.chunkDescs))); err != nil { + if err := codable.EncodeVarint(w, int64(len(series.chunkDescs))); err != nil { return err } for i, chunkDesc := range series.chunkDescs { if series.headChunkPersisted || i < len(series.chunkDescs)-1 { - if err := codec.EncodeVarint(w, int64(chunkDesc.firstTime())); err != nil { + if err := codable.EncodeVarint(w, int64(chunkDesc.firstTime())); err != nil { return err } - if err := codec.EncodeVarint(w, int64(chunkDesc.lastTime())); err != nil { + if err := codable.EncodeVarint(w, int64(chunkDesc.lastTime())); err != nil { return err } } else { @@ -345,11 +375,11 @@ func (p *diskPersistence) LoadSeriesMapAndHeads() (SeriesMap, error) { return nil, err } headChunkPersisted := seriesFlags&flagHeadChunkPersisted != 0 - fp, err := codec.DecodeUint64(r) + fp, err := codable.DecodeUint64(r) if err != nil { return nil, err } - var metric codec.CodableMetric + var metric codable.Metric if err := metric.UnmarshalFromReader(r); err != nil { return nil, err } @@ -458,35 +488,24 @@ func (p *diskPersistence) DropChunks(fp clientmodel.Fingerprint, beforeTime clie return false, nil } -func (p *diskPersistence) IndexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) error { - // TODO: Don't do it directly, but add it to a queue (which needs to be - // drained before shutdown). Queuing would make this asynchronously, and - // then batches could be created easily. - if err := p.labelNameToLabelValues.Extend(m); err != nil { - return err - } - return p.labelPairToFingerprints.Extend(m, fp) +// IndexMetric implements Persistence. +func (p *diskPersistence) IndexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) { + p.indexingQueue <- indexingOp{fp, m, add} } -func (p *diskPersistence) UnindexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) error { - // TODO: Don't do it directly, but add it to a queue (which needs to be - // drained before shutdown). Queuing would make this asynchronously, and - // then batches could be created easily. - labelPairs, err := p.labelPairToFingerprints.Reduce(m, fp) - if err != nil { - return err - } - return p.labelNameToLabelValues.Reduce(labelPairs) +// UnindexMetric implements Persistence. +func (p *diskPersistence) UnindexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) { + p.indexingQueue <- indexingOp{fp, m, remove} } func (p *diskPersistence) ArchiveMetric( // TODO: Two step process, make sure this happens atomically. fp clientmodel.Fingerprint, m clientmodel.Metric, first, last clientmodel.Timestamp, ) error { - if err := p.archivedFingerprintToMetrics.Put(codec.CodableFingerprint(fp), codec.CodableMetric(m)); err != nil { + if err := p.archivedFingerprintToMetrics.Put(codable.Fingerprint(fp), codable.Metric(m)); err != nil { return err } - if err := p.archivedFingerprintToTimeRange.Put(codec.CodableFingerprint(fp), codec.CodableTimeRange{First: first, Last: last}); err != nil { + if err := p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last}); err != nil { return err } return nil @@ -510,13 +529,14 @@ func (p *diskPersistence) DropArchivedMetric(fp clientmodel.Fingerprint) error { if err != nil || metric == nil { return err } - if err := p.archivedFingerprintToMetrics.Delete(codec.CodableFingerprint(fp)); err != nil { + if err := p.archivedFingerprintToMetrics.Delete(codable.Fingerprint(fp)); err != nil { return err } - if err := p.archivedFingerprintToTimeRange.Delete(codec.CodableFingerprint(fp)); err != nil { + if err := p.archivedFingerprintToTimeRange.Delete(codable.Fingerprint(fp)); err != nil { return err } - return p.UnindexMetric(metric, fp) + p.UnindexMetric(metric, fp) + return nil } func (p *diskPersistence) UnarchiveMetric(fp clientmodel.Fingerprint) (bool, error) { @@ -525,16 +545,19 @@ func (p *diskPersistence) UnarchiveMetric(fp clientmodel.Fingerprint) (bool, err if err != nil || !has { return false, err } - if err := p.archivedFingerprintToMetrics.Delete(codec.CodableFingerprint(fp)); err != nil { + if err := p.archivedFingerprintToMetrics.Delete(codable.Fingerprint(fp)); err != nil { return false, err } - if err := p.archivedFingerprintToTimeRange.Delete(codec.CodableFingerprint(fp)); err != nil { + if err := p.archivedFingerprintToTimeRange.Delete(codable.Fingerprint(fp)); err != nil { return false, err } return true, nil } func (p *diskPersistence) Close() error { + close(p.indexingQueue) + <-p.indexingStopped + var lastError error if err := p.archivedFingerprintToMetrics.Close(); err != nil { lastError = err @@ -596,6 +619,89 @@ func (p *diskPersistence) headsPath() string { return path.Join(p.basePath, headsFileName) } +func (p *diskPersistence) processIndexingQueue() { + batchSize := 0 + nameToValues := index.LabelNameLabelValuesMapping{} + pairToFPs := index.LabelPairFingerprintsMapping{} + batchTimeout := time.NewTimer(indexingBatchTimeout) + defer batchTimeout.Stop() + + commitBatch := func() { + if err := p.labelPairToFingerprints.IndexBatch(pairToFPs); err != nil { + glog.Error("Error indexing label pair to fingerprints batch: ", err) + } + if err := p.labelNameToLabelValues.IndexBatch(nameToValues); err != nil { + glog.Error("Error indexing label name to label values batch: ", err) + } + batchSize = 0 + nameToValues = index.LabelNameLabelValuesMapping{} + pairToFPs = index.LabelPairFingerprintsMapping{} + } + +loop: + for { + select { + case <-batchTimeout.C: + if batchSize > 0 { + commitBatch() + } + batchTimeout.Reset(indexingBatchTimeout) + case op, ok := <-p.indexingQueue: + batchTimeout.Stop() + + if !ok { + if batchSize > 0 { + commitBatch() + } + break loop + } + + batchSize++ + for ln, lv := range op.metric { + lp := metric.LabelPair{Name: ln, Value: lv} + baseFPs, ok := pairToFPs[lp] + if !ok { + var err error + baseFPs, _, err = p.labelPairToFingerprints.LookupSet(lp) + if err != nil { + glog.Errorf("Error looking up label pair %v: %s", lp, err) + continue + } + pairToFPs[lp] = baseFPs + } + baseValues, ok := nameToValues[ln] + if !ok { + var err error + baseValues, _, err = p.labelNameToLabelValues.LookupSet(ln) + if err != nil { + glog.Errorf("Error looking up label name %v: %s", ln, err) + continue + } + nameToValues[ln] = baseValues + } + switch op.opType { + case add: + baseFPs[op.fingerprint] = struct{}{} + baseValues[lv] = struct{}{} + case remove: + delete(baseFPs, op.fingerprint) + if len(baseFPs) == 0 { + delete(baseValues, lv) + } + default: + panic("unknown op type") + } + } + + if batchSize >= indexingMaxBatchSize { + commitBatch() + } + batchTimeout.Reset(indexingBatchTimeout) + } + } + close(p.indexingStopped) +} + // exists returns true when the given file or directory exists. func exists(path string) (bool, error) { _, err := os.Stat(path) diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 3ef92e949e..525c74bf38 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -14,11 +14,13 @@ package local import ( - "sort" + "reflect" "testing" + "time" clientmodel "github.com/prometheus/client_golang/model" + "github.com/prometheus/prometheus/storage/local/codable" "github.com/prometheus/prometheus/storage/local/index" "github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/utility/test" @@ -130,36 +132,46 @@ func TestIndexing(t *testing.T) { }, }, expectedLnToLvs: index.LabelNameLabelValuesMapping{ - clientmodel.MetricNameLabel: clientmodel.LabelValues{"metric_0", "metric_1"}, - "label_1": clientmodel.LabelValues{"value_1", "value_2"}, - "label_2": clientmodel.LabelValues{"value_2"}, - "label_3": clientmodel.LabelValues{"value_3"}, + clientmodel.MetricNameLabel: codable.LabelValueSet{ + "metric_0": struct{}{}, + "metric_1": struct{}{}, + }, + "label_1": codable.LabelValueSet{ + "value_1": struct{}{}, + "value_2": struct{}{}, + }, + "label_2": codable.LabelValueSet{ + "value_2": struct{}{}, + }, + "label_3": codable.LabelValueSet{ + "value_3": struct{}{}, + }, }, expectedLpToFps: index.LabelPairFingerprintsMapping{ metric.LabelPair{ Name: clientmodel.MetricNameLabel, Value: "metric_0", - }: {0, 1}, + }: codable.FingerprintSet{0: struct{}{}, 1: struct{}{}}, metric.LabelPair{ Name: clientmodel.MetricNameLabel, Value: "metric_1", - }: {2}, + }: codable.FingerprintSet{2: struct{}{}}, metric.LabelPair{ Name: "label_1", Value: "value_1", - }: {0}, + }: codable.FingerprintSet{0: struct{}{}}, metric.LabelPair{ Name: "label_1", Value: "value_2", - }: {2}, + }: codable.FingerprintSet{2: struct{}{}}, metric.LabelPair{ Name: "label_2", Value: "value_2", - }: {1}, + }: codable.FingerprintSet{1: struct{}{}}, metric.LabelPair{ Name: "label_3", Value: "value_3", - }: {1}, + }: codable.FingerprintSet{1: struct{}{}}, }, }, { fpToMetric: index.FingerprintMetricMapping{ @@ -178,48 +190,61 @@ func TestIndexing(t *testing.T) { }, }, expectedLnToLvs: index.LabelNameLabelValuesMapping{ - clientmodel.MetricNameLabel: clientmodel.LabelValues{"metric_0", "metric_1", "metric_2"}, - "label_1": clientmodel.LabelValues{"value_1", "value_2", "value_3"}, - "label_2": clientmodel.LabelValues{"value_2"}, - "label_3": clientmodel.LabelValues{"value_1", "value_3"}, + clientmodel.MetricNameLabel: codable.LabelValueSet{ + "metric_0": struct{}{}, + "metric_1": struct{}{}, + "metric_2": struct{}{}, + }, + "label_1": codable.LabelValueSet{ + "value_1": struct{}{}, + "value_2": struct{}{}, + "value_3": struct{}{}, + }, + "label_2": codable.LabelValueSet{ + "value_2": struct{}{}, + }, + "label_3": codable.LabelValueSet{ + "value_1": struct{}{}, + "value_3": struct{}{}, + }, }, expectedLpToFps: index.LabelPairFingerprintsMapping{ metric.LabelPair{ Name: clientmodel.MetricNameLabel, Value: "metric_0", - }: {0, 1, 3}, + }: codable.FingerprintSet{0: struct{}{}, 1: struct{}{}, 3: struct{}{}}, metric.LabelPair{ Name: clientmodel.MetricNameLabel, Value: "metric_1", - }: {2, 5}, + }: codable.FingerprintSet{2: struct{}{}, 5: struct{}{}}, metric.LabelPair{ Name: clientmodel.MetricNameLabel, Value: "metric_2", - }: {4}, + }: codable.FingerprintSet{4: struct{}{}}, metric.LabelPair{ Name: "label_1", Value: "value_1", - }: {0}, + }: codable.FingerprintSet{0: struct{}{}}, metric.LabelPair{ Name: "label_1", Value: "value_2", - }: {2}, + }: codable.FingerprintSet{2: struct{}{}}, metric.LabelPair{ Name: "label_1", Value: "value_3", - }: {3, 5}, + }: codable.FingerprintSet{3: struct{}{}, 5: struct{}{}}, metric.LabelPair{ Name: "label_2", Value: "value_2", - }: {1, 4}, + }: codable.FingerprintSet{1: struct{}{}, 4: struct{}{}}, metric.LabelPair{ Name: "label_3", Value: "value_1", - }: {4}, + }: codable.FingerprintSet{4: struct{}{}}, metric.LabelPair{ Name: "label_3", Value: "value_3", - }: {1}, + }: codable.FingerprintSet{1: struct{}{}}, }, }, } @@ -230,24 +255,24 @@ func TestIndexing(t *testing.T) { indexedFpsToMetrics := index.FingerprintMetricMapping{} for i, b := range batches { for fp, m := range b.fpToMetric { - if err := p.IndexMetric(m, fp); err != nil { - t.Fatal(err) - } + p.IndexMetric(m, fp) if err := p.ArchiveMetric(fp, m, 1, 2); err != nil { t.Fatal(err) } indexedFpsToMetrics[fp] = m } + // TODO: Find a better solution than sleeping. + time.Sleep(2 * indexingBatchTimeout) verifyIndexedState(i, t, b, indexedFpsToMetrics, p.(*diskPersistence)) } for i := len(batches) - 1; i >= 0; i-- { b := batches[i] + // TODO: Find a better solution than sleeping. + time.Sleep(2 * indexingBatchTimeout) verifyIndexedState(i, t, batches[i], indexedFpsToMetrics, p.(*diskPersistence)) for fp, m := range b.fpToMetric { - if err := p.UnindexMetric(m, fp); err != nil { - t.Fatal(err) - } + p.UnindexMetric(m, fp) unarchived, err := p.UnarchiveMetric(fp) if err != nil { t.Fatal(err) @@ -294,16 +319,13 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet t.Fatal(err) } - sort.Sort(lvs) - sort.Sort(outLvs) - - if len(lvs) != len(outLvs) { - t.Errorf("%d. different number of label values. Got: %d; want %d", i, len(outLvs), len(lvs)) + outSet := codable.LabelValueSet{} + for _, lv := range outLvs { + outSet[lv] = struct{}{} } - for j := range lvs { - if lvs[j] != outLvs[j] { - t.Errorf("%d.%d. label values don't match. Got: %s; want %s", i, j, outLvs[j], lvs[j]) - } + + if !reflect.DeepEqual(lvs, outSet) { + t.Errorf("%d. label values don't match. Got: %v; want %v", i, outSet, lvs) } } @@ -314,16 +336,13 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet t.Fatal(err) } - sort.Sort(fps) - sort.Sort(outFps) - - if len(fps) != len(outFps) { - t.Errorf("%d. %v: different number of fingerprints. Got: %d; want %d", i, lp, len(outFps), len(fps)) + outSet := codable.FingerprintSet{} + for _, fp := range outFps { + outSet[fp] = struct{}{} } - for j := range fps { - if fps[j] != outFps[j] { - t.Errorf("%d.%d. %v: fingerprints don't match. Got: %d; want %d", i, j, lp, outFps[j], fps[j]) - } + + if !reflect.DeepEqual(fps, outSet) { + t.Errorf("%d. %v: fingerprints don't match. Got: %v; want %v", i, lp, outSet, fps) } } } diff --git a/storage/local/storage.go b/storage/local/storage.go index 115b50a4d2..a803aa1bd7 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -149,9 +149,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(m clientmodel.Metric) *memorySer series.headChunkPersisted = true } else { // This was a genuinely new series, so index the metric. - if err := s.persistence.IndexMetric(m, fp); err != nil { - glog.Errorf("Error indexing metric %v: %v", m, err) - } + s.persistence.IndexMetric(m, fp) } } return series @@ -362,9 +360,7 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime if series, ok := s.fingerprintToSeries[fp]; ok { if series.purgeOlderThan(beforeTime) && allDropped { delete(s.fingerprintToSeries, fp) - if err := s.persistence.UnindexMetric(series.metric, fp); err != nil { - glog.Errorf("Error unindexing metric %v: %v", series.metric, err) - } + s.persistence.UnindexMetric(series.metric, fp) } return }