diff --git a/prompb/types.pb.go b/prompb/types.pb.go index b29170e535..e91dd55c4d 100644 --- a/prompb/types.pb.go +++ b/prompb/types.pb.go @@ -127,7 +127,7 @@ func (Chunk_Encoding) EnumDescriptor() ([]byte, []int) { type MetricMetadata struct { // Represents the metric type, these match the set from Prometheus. - // Refer to pkg/textparse/interface.go for details. + // Refer to model/textparse/interface.go for details. Type MetricMetadata_MetricType `protobuf:"varint,1,opt,name=type,proto3,enum=prometheus.MetricMetadata_MetricType" json:"type,omitempty"` MetricFamilyName string `protobuf:"bytes,2,opt,name=metric_family_name,json=metricFamilyName,proto3" json:"metric_family_name,omitempty"` Help string `protobuf:"bytes,4,opt,name=help,proto3" json:"help,omitempty"` @@ -200,7 +200,7 @@ func (m *MetricMetadata) GetUnit() string { type Sample struct { Value float64 `protobuf:"fixed64,1,opt,name=value,proto3" json:"value,omitempty"` - // timestamp is in ms format, see pkg/timestamp/timestamp.go for + // timestamp is in ms format, see model/timestamp/timestamp.go for // conversion from time.Time to Prometheus timestamp. Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -259,7 +259,7 @@ type Exemplar struct { // Optional, can be empty. Labels []Label `protobuf:"bytes,1,rep,name=labels,proto3" json:"labels"` Value float64 `protobuf:"fixed64,2,opt,name=value,proto3" json:"value,omitempty"` - // timestamp is in ms format, see pkg/timestamp/timestamp.go for + // timestamp is in ms format, see model/timestamp/timestamp.go for // conversion from time.Time to Prometheus timestamp. 
Timestamp int64 `protobuf:"varint,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` diff --git a/prompb/types.proto b/prompb/types.proto index ee11f3a001..6ba7074a5d 100644 --- a/prompb/types.proto +++ b/prompb/types.proto @@ -31,7 +31,7 @@ message MetricMetadata { } // Represents the metric type, these match the set from Prometheus. - // Refer to pkg/textparse/interface.go for details. + // Refer to model/textparse/interface.go for details. MetricType type = 1; string metric_family_name = 2; string help = 4; @@ -40,7 +40,7 @@ message MetricMetadata { message Sample { double value = 1; - // timestamp is in ms format, see pkg/timestamp/timestamp.go for + // timestamp is in ms format, see model/timestamp/timestamp.go for // conversion from time.Time to Prometheus timestamp. int64 timestamp = 2; } @@ -49,7 +49,7 @@ message Exemplar { // Optional, can be empty. repeated Label labels = 1 [(gogoproto.nullable) = false]; double value = 2; - // timestamp is in ms format, see pkg/timestamp/timestamp.go for + // timestamp is in ms format, see model/timestamp/timestamp.go for // conversion from time.Time to Prometheus timestamp. int64 timestamp = 3; } diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index 3c55283571..0f94467851 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -440,6 +440,8 @@ func (db *DB) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.He decoded <- samples case record.Tombstones, record.Exemplars: // We don't care about tombstones or exemplars during replay. + // TODO: If we decide to decode exemplars, we should make sure to prepopulate + // stripeSeries.exemplars in the next block by using SetLatestExemplar. continue default: errCh <- &wal.CorruptionErr{ @@ -789,6 +791,16 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exem } } + // Check for duplicate vs last stored exemplar for this series, and discard those. 
+ // Otherwise, record the current exemplar as the latest. + // Prometheus' TSDB returns 0 when encountering duplicates, so we do the same here. + prevExemplar := a.series.GetLatestExemplar(s.ref) + if prevExemplar != nil && prevExemplar.Equals(e) { + // Duplicate, don't return an error but don't accept the exemplar. + return 0, nil + } + a.series.SetLatestExemplar(s.ref, &e) + a.pendingExamplars = append(a.pendingExamplars, record.RefExemplar{ Ref: s.ref, T: e.Ts, @@ -796,6 +808,7 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exem Labels: e.Labels, }) + a.metrics.totalAppendedExemplars.Inc() return storage.SeriesRef(s.ref), nil } diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go index a44b6c7286..a8acb64693 100644 --- a/tsdb/agent/db_test.go +++ b/tsdb/agent/db_test.go @@ -129,7 +129,7 @@ func TestCommit(t *testing.T) { e := exemplar.Exemplar{ Labels: lset, - Ts: sample[0].T(), + Ts: sample[0].T() + int64(i), Value: sample[0].V(), HasTs: true, } @@ -482,3 +482,56 @@ func gatherFamily(t *testing.T, reg prometheus.Gatherer, familyName string) *dto return nil } + +func TestStorage_DuplicateExemplarsIgnored(t *testing.T) { + s := createTestAgentDB(t, nil, DefaultOptions()) + app := s.Appender(context.Background()) + defer s.Close() + + sRef, err := app.Append(0, labels.Labels{{Name: "a", Value: "1"}}, 0, 0) + require.NoError(t, err, "should not reject valid series") + + // Write a few exemplars to our appender and call Commit(). + // If the Labels, Value or Timestamp are different from the last exemplar, + // then a new one should be appended; otherwise, it should be skipped. 
+ e := exemplar.Exemplar{Labels: labels.Labels{{Name: "a", Value: "1"}}, Value: 20, Ts: 10, HasTs: true} + _, _ = app.AppendExemplar(sRef, nil, e) + _, _ = app.AppendExemplar(sRef, nil, e) + + e.Labels = labels.Labels{{Name: "b", Value: "2"}} + _, _ = app.AppendExemplar(sRef, nil, e) + _, _ = app.AppendExemplar(sRef, nil, e) + _, _ = app.AppendExemplar(sRef, nil, e) + + e.Value = 42 + _, _ = app.AppendExemplar(sRef, nil, e) + _, _ = app.AppendExemplar(sRef, nil, e) + + e.Ts = 25 + _, _ = app.AppendExemplar(sRef, nil, e) + _, _ = app.AppendExemplar(sRef, nil, e) + + require.NoError(t, app.Commit()) + + // Read back what was written to the WAL. + var walExemplarsCount int + sr, err := wal.NewSegmentsReader(s.wal.Dir()) + require.NoError(t, err) + defer sr.Close() + r := wal.NewReader(sr) + + var dec record.Decoder + for r.Next() { + rec := r.Record() + switch dec.Type(rec) { + case record.Exemplars: + var exemplars []record.RefExemplar + exemplars, err = dec.Exemplars(rec, exemplars) + require.NoError(t, err) + walExemplarsCount += len(exemplars) + } + } + + // We had 9 calls to AppendExemplar but only 4 of those should have gotten through. + require.Equal(t, 4, walExemplarsCount) +} diff --git a/tsdb/agent/series.go b/tsdb/agent/series.go index ddb64fc55f..d38518f71e 100644 --- a/tsdb/agent/series.go +++ b/tsdb/agent/series.go @@ -16,6 +16,7 @@ package agent import ( "sync" + "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb/chunks" ) @@ -89,10 +90,11 @@ func (m seriesHashmap) Delete(hash uint64, ref chunks.HeadSeriesRef) { // Filling the padded space with the maps was profiled to be slower - // likely due to the additional pointer dereferences. 
type stripeSeries struct { - size int - series []map[chunks.HeadSeriesRef]*memSeries - hashes []seriesHashmap - locks []stripeLock + size int + series []map[chunks.HeadSeriesRef]*memSeries + hashes []seriesHashmap + exemplars []map[chunks.HeadSeriesRef]*exemplar.Exemplar + locks []stripeLock gcMut sync.Mutex } @@ -105,10 +107,11 @@ type stripeLock struct { func newStripeSeries(stripeSize int) *stripeSeries { s := &stripeSeries{ - size: stripeSize, - series: make([]map[chunks.HeadSeriesRef]*memSeries, stripeSize), - hashes: make([]seriesHashmap, stripeSize), - locks: make([]stripeLock, stripeSize), + size: stripeSize, + series: make([]map[chunks.HeadSeriesRef]*memSeries, stripeSize), + hashes: make([]seriesHashmap, stripeSize), + exemplars: make([]map[chunks.HeadSeriesRef]*exemplar.Exemplar, stripeSize), + locks: make([]stripeLock, stripeSize), } for i := range s.series { s.series[i] = map[chunks.HeadSeriesRef]*memSeries{} @@ -116,6 +119,9 @@ func newStripeSeries(stripeSize int) *stripeSeries { for i := range s.hashes { s.hashes[i] = seriesHashmap{} } + for i := range s.exemplars { + s.exemplars[i] = map[chunks.HeadSeriesRef]*exemplar.Exemplar{} + } return s } @@ -154,6 +160,10 @@ func (s *stripeSeries) GC(mint int64) map[chunks.HeadSeriesRef]struct{} { delete(s.series[refLock], series.ref) s.hashes[hashLock].Delete(hash, series.ref) + // Since the series is gone, we'll also delete + // the latest stored exemplar. 
+ delete(s.exemplars[refLock], series.ref) + if hashLock != refLock { s.locks[refLock].Unlock() } @@ -201,3 +211,24 @@ func (s *stripeSeries) Set(hash uint64, series *memSeries) { s.hashes[hashLock].Set(hash, series) s.locks[hashLock].Unlock() } + +func (s *stripeSeries) GetLatestExemplar(ref chunks.HeadSeriesRef) *exemplar.Exemplar { + i := uint64(ref) & uint64(s.size-1) + + s.locks[i].RLock() + exemplar := s.exemplars[i][ref] + s.locks[i].RUnlock() + + return exemplar +} + +func (s *stripeSeries) SetLatestExemplar(ref chunks.HeadSeriesRef, exemplar *exemplar.Exemplar) { + i := uint64(ref) & uint64(s.size-1) + + // Make sure it is a valid series ID, then record its latest exemplar. + s.locks[i].Lock() + if s.series[i][ref] != nil { + s.exemplars[i][ref] = exemplar + } + s.locks[i].Unlock() +} diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 3232ed61e5..0039896329 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -530,7 +530,12 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxNextAt) s.nextAt = addJitterToChunkEndTime(s.hash, c.minTime, s.nextAt, maxNextAt, s.chunkEndTimeVariance) } - if t >= s.nextAt { + // If numSamples >= samplesPerChunk*2 then our previous prediction was invalid, + // most likely because the sample rate has changed and now they are arriving more frequently. + // Since we assume that the rate is higher, we're being conservative and cutting at 2*samplesPerChunk + // as we expect more chunks to come. + // Note that the next chunk will have its nextAt recalculated for the new rate. 
+ if t >= s.nextAt || numSamples >= samplesPerChunk*2 { c = s.cutNewHeadChunk(t, chunkDiskMapper) chunkCreated = true } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 997eec2a67..c140c40e3b 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1313,6 +1313,50 @@ func TestMemSeries_append(t *testing.T) { } } +func TestMemSeries_append_atVariableRate(t *testing.T) { + const samplesPerChunk = 120 + dir := t.TempDir() + // This is usually taken from the Head, but passing manually here. + chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, chunkDiskMapper.Close()) + }) + + s := newMemSeries(labels.Labels{}, 1, DefaultBlockDuration, nil, defaultIsolationDisabled) + + // At this slow rate, we will fill the chunk in two block durations. + slowRate := (DefaultBlockDuration * 2) / samplesPerChunk + + var nextTs int64 + var totalAppendedSamples int + for i := 0; i < samplesPerChunk/4; i++ { + ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper) + require.Truef(t, ok, "slow sample %d was not appended", i) + nextTs += slowRate + totalAppendedSamples++ + } + require.Equal(t, DefaultBlockDuration, s.nextAt, "after appending a samplesPerChunk/4 samples at a slow rate, we should aim to cut a new block at the default block duration %d, but it's set to %d", DefaultBlockDuration, s.nextAt) + + // Suddenly, the rate increases and we receive a sample every millisecond. 
+ for i := 0; i < math.MaxUint16; i++ { + ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper) + require.Truef(t, ok, "quick sample %d was not appended", i) + nextTs++ + totalAppendedSamples++ + } + ok, chunkCreated := s.append(DefaultBlockDuration, float64(0), 0, chunkDiskMapper) + require.True(t, ok, "new chunk sample was not appended") + require.True(t, chunkCreated, "sample at block duration timestamp should create a new chunk") + + var totalSamplesInChunks int + for i, c := range s.mmappedChunks { + totalSamplesInChunks += int(c.numSamples) + require.LessOrEqualf(t, c.numSamples, uint16(2*samplesPerChunk), "mmapped chunk %d has more than %d samples", i, 2*samplesPerChunk) + } + require.Equal(t, totalAppendedSamples, totalSamplesInChunks, "wrong number of samples in %d mmapped chunks", len(s.mmappedChunks)) +} + func TestGCChunkAccess(t *testing.T) { // Put a chunk, select it. GC it and then access it. h, _ := newTestHead(t, 1000, false)