From 4deb1a90d2f1300bb92938aa3c5949fffc7b7ce4 Mon Sep 17 00:00:00 2001 From: Paschalis Tsilias Date: Fri, 15 Apr 2022 12:12:43 +0300 Subject: [PATCH 1/3] Amend Protobuf docstrings referring to model/ packages Signed-off-by: Paschalis Tsilias --- prompb/types.pb.go | 6 +++--- prompb/types.proto | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/prompb/types.pb.go b/prompb/types.pb.go index b29170e535..e91dd55c4d 100644 --- a/prompb/types.pb.go +++ b/prompb/types.pb.go @@ -127,7 +127,7 @@ func (Chunk_Encoding) EnumDescriptor() ([]byte, []int) { type MetricMetadata struct { // Represents the metric type, these match the set from Prometheus. - // Refer to pkg/textparse/interface.go for details. + // Refer to model/textparse/interface.go for details. Type MetricMetadata_MetricType `protobuf:"varint,1,opt,name=type,proto3,enum=prometheus.MetricMetadata_MetricType" json:"type,omitempty"` MetricFamilyName string `protobuf:"bytes,2,opt,name=metric_family_name,json=metricFamilyName,proto3" json:"metric_family_name,omitempty"` Help string `protobuf:"bytes,4,opt,name=help,proto3" json:"help,omitempty"` @@ -200,7 +200,7 @@ func (m *MetricMetadata) GetUnit() string { type Sample struct { Value float64 `protobuf:"fixed64,1,opt,name=value,proto3" json:"value,omitempty"` - // timestamp is in ms format, see pkg/timestamp/timestamp.go for + // timestamp is in ms format, see model/timestamp/timestamp.go for // conversion from time.Time to Prometheus timestamp. Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -259,7 +259,7 @@ type Exemplar struct { // Optional, can be empty. Labels []Label `protobuf:"bytes,1,rep,name=labels,proto3" json:"labels"` Value float64 `protobuf:"fixed64,2,opt,name=value,proto3" json:"value,omitempty"` - // timestamp is in ms format, see pkg/timestamp/timestamp.go for + // timestamp is in ms format, see model/timestamp/timestamp.go for // conversion from time.Time to Prometheus timestamp. Timestamp int64 `protobuf:"varint,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` diff --git a/prompb/types.proto b/prompb/types.proto index ee11f3a001..6ba7074a5d 100644 --- a/prompb/types.proto +++ b/prompb/types.proto @@ -31,7 +31,7 @@ message MetricMetadata { } // Represents the metric type, these match the set from Prometheus. - // Refer to pkg/textparse/interface.go for details. + // Refer to model/textparse/interface.go for details. MetricType type = 1; string metric_family_name = 2; string help = 4; @@ -40,7 +40,7 @@ message MetricMetadata { message Sample { double value = 1; - // timestamp is in ms format, see pkg/timestamp/timestamp.go for + // timestamp is in ms format, see model/timestamp/timestamp.go for // conversion from time.Time to Prometheus timestamp. int64 timestamp = 2; } @@ -49,7 +49,7 @@ message Exemplar { // Optional, can be empty. repeated Label labels = 1 [(gogoproto.nullable) = false]; double value = 2; - // timestamp is in ms format, see pkg/timestamp/timestamp.go for + // timestamp is in ms format, see model/timestamp/timestamp.go for // conversion from time.Time to Prometheus timestamp. 
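Since these docstrings keep pointing at the millisecond convention, a tiny standalone sketch of the time.Time-to-milliseconds conversion may help; toPromMillis is a hypothetical helper written for this illustration, not the actual code in model/timestamp/timestamp.go:

package main

import (
	"fmt"
	"time"
)

// toPromMillis converts a time.Time into the millisecond timestamps carried by
// the Sample and Exemplar messages in this file. Illustrative only; the
// canonical conversion lives in model/timestamp/timestamp.go.
func toPromMillis(t time.Time) int64 {
	return t.Unix()*1000 + int64(t.Nanosecond())/int64(time.Millisecond)
}

func main() {
	fmt.Println(toPromMillis(time.Unix(1650000000, 123_000_000))) // 1650000000123
}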
int64 timestamp = 3; } From 40c1efe8bc68f74bd857edd857e1fc442275b56e Mon Sep 17 00:00:00 2001 From: Paschalis Tsilias Date: Mon, 18 Apr 2022 18:41:04 +0300 Subject: [PATCH 2/3] tsdb/agent: Ignore duplicate exemplars (#10595) * tsdb/agent: Ignore duplicate exemplars Signed-off-by: Paschalis Tsilias * Make each exemplar unique in TestCommit Signed-off-by: Paschalis Tsilias * Re-Trigger CI for Windows and UI-related steps Signed-off-by: Paschalis Tsilias * Change test comment to properly re-trigger pipeline Signed-off-by: Paschalis Tsilias * Defer Close() calls for test agent and segment reader Signed-off-by: Paschalis Tsilias --- tsdb/agent/db.go | 13 ++++++++++ tsdb/agent/db_test.go | 55 ++++++++++++++++++++++++++++++++++++++++++- tsdb/agent/series.go | 47 +++++++++++++++++++++++++++++------- 3 files changed, 106 insertions(+), 9 deletions(-) diff --git a/tsdb/agent/db.go b/tsdb/agent/db.go index 3c55283571..0f94467851 100644 --- a/tsdb/agent/db.go +++ b/tsdb/agent/db.go @@ -440,6 +440,8 @@ func (db *DB) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.He decoded <- samples case record.Tombstones, record.Exemplars: // We don't care about tombstones or exemplars during replay. + // TODO: If decide to decode exemplars, we should make sure to prepopulate + // stripeSeries.exemplars in the next block by using setLatestExemplar. continue default: errCh <- &wal.CorruptionErr{ @@ -789,6 +791,16 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exem } } + // Check for duplicate vs last stored exemplar for this series, and discard those. + // Otherwise, record the current exemplar as the latest. + // Prometheus' TSDB returns 0 when encountering duplicates, so we do the same here. + prevExemplar := a.series.GetLatestExemplar(s.ref) + if prevExemplar != nil && prevExemplar.Equals(e) { + // Duplicate, don't return an error but don't accept the exemplar. + return 0, nil + } + a.series.SetLatestExemplar(s.ref, &e) + a.pendingExamplars = append(a.pendingExamplars, record.RefExemplar{ Ref: s.ref, T: e.Ts, @@ -796,6 +808,7 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exem Labels: e.Labels, }) + a.metrics.totalAppendedExemplars.Inc() return storage.SeriesRef(s.ref), nil } diff --git a/tsdb/agent/db_test.go b/tsdb/agent/db_test.go index a44b6c7286..a8acb64693 100644 --- a/tsdb/agent/db_test.go +++ b/tsdb/agent/db_test.go @@ -129,7 +129,7 @@ func TestCommit(t *testing.T) { e := exemplar.Exemplar{ Labels: lset, - Ts: sample[0].T(), + Ts: sample[0].T() + int64(i), Value: sample[0].V(), HasTs: true, } @@ -482,3 +482,56 @@ func gatherFamily(t *testing.T, reg prometheus.Gatherer, familyName string) *dto return nil } + +func TestStorage_DuplicateExemplarsIgnored(t *testing.T) { + s := createTestAgentDB(t, nil, DefaultOptions()) + app := s.Appender(context.Background()) + defer s.Close() + + sRef, err := app.Append(0, labels.Labels{{Name: "a", Value: "1"}}, 0, 0) + require.NoError(t, err, "should not reject valid series") + + // Write a few exemplars to our appender and call Commit(). + // If the Labels, Value or Timestamp are different than the last exemplar, + // then a new one should be appended; Otherwise, it should be skipped. 
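Stepping outside the test for a moment, the duplicate check this patch adds to AppendExemplar boils down to a compare-against-the-last-stored-exemplar pattern. The sketch below is a minimal, self-contained illustration of that idea; exemplarKey and dedupAppender are simplified stand-ins invented for this example, not the agent's actual types, which use exemplar.Exemplar and its Equals method as shown in the hunk above.

package main

import "fmt"

// exemplarKey is a simplified stand-in for exemplar.Exemplar: the real duplicate
// check compares labels, value and timestamp via exemplar.Equals.
type exemplarKey struct {
	labels string // flattened label set, e.g. `a="1"`
	value  float64
	ts     int64
}

// dedupAppender mirrors the idea behind the appender change above: remember the
// latest exemplar per series ref and silently drop exact repeats.
type dedupAppender struct {
	latest map[uint64]exemplarKey
}

// appendExemplar reports whether the exemplar was accepted. A duplicate of the
// last stored exemplar is dropped without an error, matching the
// "return 0, nil" behaviour in the patch.
func (a *dedupAppender) appendExemplar(ref uint64, e exemplarKey) bool {
	if prev, ok := a.latest[ref]; ok && prev == e {
		return false
	}
	a.latest[ref] = e
	return true
}

func main() {
	app := &dedupAppender{latest: map[uint64]exemplarKey{}}
	e := exemplarKey{labels: `a="1"`, value: 20, ts: 10}
	fmt.Println(app.appendExemplar(1, e)) // true:  first occurrence is stored
	fmt.Println(app.appendExemplar(1, e)) // false: exact duplicate is skipped
	e.ts = 25
	fmt.Println(app.appendExemplar(1, e)) // true:  timestamp changed, counts as new
}

The remainder of the test below drives the same behaviour through the real appender and then counts what actually reached the WAL.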
+ e := exemplar.Exemplar{Labels: labels.Labels{{Name: "a", Value: "1"}}, Value: 20, Ts: 10, HasTs: true} + _, _ = app.AppendExemplar(sRef, nil, e) + _, _ = app.AppendExemplar(sRef, nil, e) + + e.Labels = labels.Labels{{Name: "b", Value: "2"}} + _, _ = app.AppendExemplar(sRef, nil, e) + _, _ = app.AppendExemplar(sRef, nil, e) + _, _ = app.AppendExemplar(sRef, nil, e) + + e.Value = 42 + _, _ = app.AppendExemplar(sRef, nil, e) + _, _ = app.AppendExemplar(sRef, nil, e) + + e.Ts = 25 + _, _ = app.AppendExemplar(sRef, nil, e) + _, _ = app.AppendExemplar(sRef, nil, e) + + require.NoError(t, app.Commit()) + + // Read back what was written to the WAL. + var walExemplarsCount int + sr, err := wal.NewSegmentsReader(s.wal.Dir()) + require.NoError(t, err) + defer sr.Close() + r := wal.NewReader(sr) + + var dec record.Decoder + for r.Next() { + rec := r.Record() + switch dec.Type(rec) { + case record.Exemplars: + var exemplars []record.RefExemplar + exemplars, err = dec.Exemplars(rec, exemplars) + require.NoError(t, err) + walExemplarsCount += len(exemplars) + } + } + + // We had 9 calls to AppendExemplar but only 4 of those should have gotten through. + require.Equal(t, 4, walExemplarsCount) +} diff --git a/tsdb/agent/series.go b/tsdb/agent/series.go index ddb64fc55f..d38518f71e 100644 --- a/tsdb/agent/series.go +++ b/tsdb/agent/series.go @@ -16,6 +16,7 @@ package agent import ( "sync" + "github.com/prometheus/prometheus/model/exemplar" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/tsdb/chunks" ) @@ -89,10 +90,11 @@ func (m seriesHashmap) Delete(hash uint64, ref chunks.HeadSeriesRef) { // Filling the padded space with the maps was profiled to be slower - // likely due to the additional pointer dereferences. type stripeSeries struct { - size int - series []map[chunks.HeadSeriesRef]*memSeries - hashes []seriesHashmap - locks []stripeLock + size int + series []map[chunks.HeadSeriesRef]*memSeries + hashes []seriesHashmap + exemplars []map[chunks.HeadSeriesRef]*exemplar.Exemplar + locks []stripeLock gcMut sync.Mutex } @@ -105,10 +107,11 @@ type stripeLock struct { func newStripeSeries(stripeSize int) *stripeSeries { s := &stripeSeries{ - size: stripeSize, - series: make([]map[chunks.HeadSeriesRef]*memSeries, stripeSize), - hashes: make([]seriesHashmap, stripeSize), - locks: make([]stripeLock, stripeSize), + size: stripeSize, + series: make([]map[chunks.HeadSeriesRef]*memSeries, stripeSize), + hashes: make([]seriesHashmap, stripeSize), + exemplars: make([]map[chunks.HeadSeriesRef]*exemplar.Exemplar, stripeSize), + locks: make([]stripeLock, stripeSize), } for i := range s.series { s.series[i] = map[chunks.HeadSeriesRef]*memSeries{} @@ -116,6 +119,9 @@ func newStripeSeries(stripeSize int) *stripeSeries { for i := range s.hashes { s.hashes[i] = seriesHashmap{} } + for i := range s.exemplars { + s.exemplars[i] = map[chunks.HeadSeriesRef]*exemplar.Exemplar{} + } return s } @@ -154,6 +160,10 @@ func (s *stripeSeries) GC(mint int64) map[chunks.HeadSeriesRef]struct{} { delete(s.series[refLock], series.ref) s.hashes[hashLock].Delete(hash, series.ref) + // Since the series is gone, we'll also delete + // the latest stored exemplar. 
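As an aside on the data structure itself: the per-stripe exemplars map added above reuses the striped-lock layout of stripeSeries, where a series ref is mapped to a shard with a bit mask, which assumes the stripe count is a power of two. The following is a stripped-down, generic sketch of that pattern; the stripedMap type and its methods are invented for this illustration and are not Prometheus code.

package main

import (
	"fmt"
	"sync"
)

// stripedMap spreads values over `size` shards, each guarded by its own lock,
// so writers touching different series refs rarely contend - the same idea as
// the exemplars slice added to stripeSeries above.
type stripedMap struct {
	size  uint64
	locks []sync.RWMutex
	vals  []map[uint64]string
}

func newStripedMap(size uint64) *stripedMap {
	// size must be a power of two for the mask below to behave like a modulo.
	s := &stripedMap{
		size:  size,
		locks: make([]sync.RWMutex, size),
		vals:  make([]map[uint64]string, size),
	}
	for i := range s.vals {
		s.vals[i] = map[uint64]string{}
	}
	return s
}

func (s *stripedMap) set(ref uint64, v string) {
	i := ref & (s.size - 1) // same trick as uint64(ref) & uint64(s.size-1) in series.go
	s.locks[i].Lock()
	s.vals[i][ref] = v
	s.locks[i].Unlock()
}

func (s *stripedMap) get(ref uint64) (string, bool) {
	i := ref & (s.size - 1)
	s.locks[i].RLock()
	defer s.locks[i].RUnlock()
	v, ok := s.vals[i][ref]
	return v, ok
}

func main() {
	m := newStripedMap(16)
	m.set(42, "latest exemplar for series 42")
	v, ok := m.get(42)
	fmt.Println(v, ok)
}

The delete call that follows in GC is what keeps the real per-stripe map from leaking entries for series that have been garbage-collected.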
+ delete(s.exemplars[refLock], series.ref) + if hashLock != refLock { s.locks[refLock].Unlock() } @@ -201,3 +211,24 @@ func (s *stripeSeries) Set(hash uint64, series *memSeries) { s.hashes[hashLock].Set(hash, series) s.locks[hashLock].Unlock() } + +func (s *stripeSeries) GetLatestExemplar(ref chunks.HeadSeriesRef) *exemplar.Exemplar { + i := uint64(ref) & uint64(s.size-1) + + s.locks[i].RLock() + exemplar := s.exemplars[i][ref] + s.locks[i].RUnlock() + + return exemplar +} + +func (s *stripeSeries) SetLatestExemplar(ref chunks.HeadSeriesRef, exemplar *exemplar.Exemplar) { + i := uint64(ref) & uint64(s.size-1) + + // Make sure that's a valid series id and record its latest exemplar + s.locks[i].Lock() + if s.series[i][ref] != nil { + s.exemplars[i][ref] = exemplar + } + s.locks[i].Unlock() +} From af0f6da5cb5bb9736abcfc9e8c7633ee01000ce2 Mon Sep 17 00:00:00 2001 From: Oleg Zaytsev Date: Wed, 20 Apr 2022 14:54:20 +0200 Subject: [PATCH 3/3] Fix chunk overflow appending samples at a variable rate (#10607) * Add a test with variable samples rate append This test overflows the chunk created in memseries, and the total amount of samples in the (only) mmapped chunk is 29, instead of the 65565 appended ones. Signed-off-by: Oleg Zaytsev * Cut new chunk when rate prediction was wrong When appending samples at a slow rate, and then appending at a higher rate, the prediction we made to cut a new chunk is no longer valid. Sometimes this can even cause an overflow in the chunk, if more samples than uint16 can hold are appended. Signed-off-by: Oleg Zaytsev * Improve comment on 2*samplesPerChunk Signed-off-by: Oleg Zaytsev * Assert that all chunks have less than 240 samples Also, trigger new chunk at 240, not at more than 240 Signed-off-by: Oleg Zaytsev --- tsdb/head_append.go | 7 ++++++- tsdb/head_test.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 8f1e9bc54a..13d56de7bf 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -519,7 +519,12 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper if numSamples == samplesPerChunk/4 { s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt) } - if t >= s.nextAt { + // If numSamples > samplesPerChunk*2 then our previous prediction was invalid, + // most likely because samples rate has changed and now they are arriving more frequently. + // Since we assume that the rate is higher, we're being conservative and cutting at 2*samplesPerChunk + // as we expect more chunks to come. + // Note that next chunk will have its nextAt recalculated for the new rate. + if t >= s.nextAt || numSamples >= samplesPerChunk*2 { c = s.cutNewHeadChunk(t, chunkDiskMapper) chunkCreated = true } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 54dc82cf94..7cd072a192 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1308,6 +1308,50 @@ func TestMemSeries_append(t *testing.T) { } } +func TestMemSeries_append_atVariableRate(t *testing.T) { + const samplesPerChunk = 120 + dir := t.TempDir() + // This is usually taken from the Head, but passing manually here. 
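It is worth spelling out where the commit message's "29" comes from: the mmapped chunk's numSamples counter is a uint16 (the test's c.numSamples checks below compare against uint16 values), so the 65565 samples appended before the fix, 30 slow ones plus 65535 fast ones, wrap around modulo 65536, and 65565 - 65536 = 29. A quick standalone check, plain Go and not part of the patch:

package main

import "fmt"

func main() {
	appended := 65565             // 30 slow samples + 65535 fast ones, as in the test below
	fmt.Println(uint16(appended)) // 29: the wrapped-around count the commit message reports
}

Back to the test setup: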
+ chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, chunkDiskMapper.Close()) + }) + + s := newMemSeries(labels.Labels{}, 1, DefaultBlockDuration, nil, defaultIsolationDisabled) + + // At this slow rate, we will fill the chunk in two block durations. + slowRate := (DefaultBlockDuration * 2) / samplesPerChunk + + var nextTs int64 + var totalAppendedSamples int + for i := 0; i < samplesPerChunk/4; i++ { + ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper) + require.Truef(t, ok, "slow sample %d was not appended", i) + nextTs += slowRate + totalAppendedSamples++ + } + require.Equal(t, DefaultBlockDuration, s.nextAt, "after appending a samplesPerChunk/4 samples at a slow rate, we should aim to cut a new block at the default block duration %d, but it's set to %d", DefaultBlockDuration, s.nextAt) + + // Suddenly, the rate increases and we receive a sample every millisecond. + for i := 0; i < math.MaxUint16; i++ { + ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper) + require.Truef(t, ok, "quick sample %d was not appended", i) + nextTs++ + totalAppendedSamples++ + } + ok, chunkCreated := s.append(DefaultBlockDuration, float64(0), 0, chunkDiskMapper) + require.True(t, ok, "new chunk sample was not appended") + require.True(t, chunkCreated, "sample at block duration timestamp should create a new chunk") + + var totalSamplesInChunks int + for i, c := range s.mmappedChunks { + totalSamplesInChunks += int(c.numSamples) + require.LessOrEqualf(t, c.numSamples, uint16(2*samplesPerChunk), "mmapped chunk %d has more than %d samples", i, 2*samplesPerChunk) + } + require.Equal(t, totalAppendedSamples, totalSamplesInChunks, "wrong number of samples in %d mmapped chunks", len(s.mmappedChunks)) +} + func TestGCChunkAccess(t *testing.T) { // Put a chunk, select it. GC it and then access it. h, _ := newTestHead(t, 1000, false)
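To round off this patch, here is a compact model of the cut decision it changes. The names predictChunkEnd, shouldCut and appendSample are invented for this sketch; predictChunkEnd is a simplified stand-in for computeChunkEndTime (linear 4x extrapolation once the chunk is a quarter full, capped at the chunk range boundary), and shouldCut mirrors the amended check in tsdb/head_append.go: cut when the predicted end is reached, or, after this patch, as soon as a chunk already holds twice the target sample count so a wrong prediction can no longer overflow the uint16 counter.

package main

import "fmt"

const samplesPerChunk = 120

// predictChunkEnd is a simplified stand-in for computeChunkEndTime: once a
// quarter of the target samples have arrived, extrapolate the observed span by
// 4x, but never predict past the chunk range boundary.
func predictChunkEnd(start, cur, max int64) int64 {
	predicted := start + 4*(cur-start)
	if predicted <= start || predicted > max {
		return max
	}
	return predicted
}

// shouldCut mirrors the amended condition: cut at the predicted end, or once a
// chunk already holds twice the samples it was sized for.
func shouldCut(t, nextAt int64, numSamples int) bool {
	return t >= nextAt || numSamples >= 2*samplesPerChunk
}

func main() {
	const blockDuration = int64(2 * 60 * 60 * 1000) // 2h in ms, like DefaultBlockDuration
	slowRate := (blockDuration * 2) / samplesPerChunk

	var (
		ts, chunkStart int64
		nextAt         = blockDuration // boundary of the first chunk range (first sample at t=0)
		numSamples     int
		largestClosed  int
	)
	appendSample := func(t int64) {
		if numSamples > 0 && shouldCut(t, nextAt, numSamples) {
			if numSamples > largestClosed {
				largestClosed = numSamples
			}
			chunkStart, numSamples = t, 0
			nextAt = blockDuration * (t/blockDuration + 1) // next chunk range boundary
		}
		numSamples++
		if numSamples == samplesPerChunk/4 {
			nextAt = predictChunkEnd(chunkStart, t, nextAt)
		}
	}

	for i := 0; i < samplesPerChunk/4; i++ { // slow phase: one sample every two minutes
		appendSample(ts)
		ts += slowRate
	}
	for i := 0; i < 65535; i++ { // fast phase: one sample every millisecond
		appendSample(ts)
		ts++
	}
	fmt.Println("largest closed chunk:", largestClosed) // prints 240, never more than 2*samplesPerChunk
}

Run against the same two-phase pattern as the test above, no closed chunk ever exceeds 240 samples, which is exactly what the LessOrEqualf assertion checks.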