diff --git a/tsdb/head.go b/tsdb/head.go
index b17c8f7e75..1268470345 100644
--- a/tsdb/head.go
+++ b/tsdb/head.go
@@ -282,6 +282,7 @@ type headMetrics struct {
 	checkpointCreationTotal  prometheus.Counter
 	mmapChunkCorruptionTotal prometheus.Counter
 	snapshotReplayErrorTotal prometheus.Counter // Will be either 0 or 1.
+	oooHistogram             prometheus.Histogram
 }
 
 func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
@@ -380,6 +381,20 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
 			Name: "prometheus_tsdb_snapshot_replay_error_total",
 			Help: "Total number snapshot replays that failed.",
 		}),
+		oooHistogram: prometheus.NewHistogram(prometheus.HistogramOpts{
+			Name: "prometheus_tsdb_sample_ooo_delta",
+			Help: "Delta in seconds by which a sample is considered out of order.",
+			Buckets: []float64{
+				// Note that mimir distributor only gives us a range of wallclock-12h to wallclock+15min
+				60 * 10,      // 10 min
+				60 * 30,      // 30 min
+				60 * 60,      // 60 min
+				60 * 60 * 2,  // 2h
+				60 * 60 * 3,  // 3h
+				60 * 60 * 6,  // 6h
+				60 * 60 * 12, // 12h
+			},
+		}),
 	}
 
 	if r != nil {
diff --git a/tsdb/head_append.go b/tsdb/head_append.go
index 4616e55b1c..9cf7908876 100644
--- a/tsdb/head_append.go
+++ b/tsdb/head_append.go
@@ -273,10 +273,11 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
 	}
 
 	s.Lock()
-	if err := s.appendable(t, v); err != nil {
+	if delta, err := s.appendable(t, v); err != nil {
 		s.Unlock()
 		if err == storage.ErrOutOfOrderSample {
 			a.head.metrics.outOfOrderSamples.Inc()
+			a.head.metrics.oooHistogram.Observe(float64(delta) / 1000)
 		}
 		return 0, err
 	}
@@ -300,24 +301,23 @@
 }
 
 // appendable checks whether the given sample is valid for appending to the series.
-func (s *memSeries) appendable(t int64, v float64) error {
+func (s *memSeries) appendable(t int64, v float64) (int64, error) {
 	c := s.head()
 	if c == nil {
-		return nil
+		return 0, nil
 	}
-
 	if t > c.maxTime {
-		return nil
+		return 0, nil
 	}
 	if t < c.maxTime {
-		return storage.ErrOutOfOrderSample
+		return c.maxTime - t, storage.ErrOutOfOrderSample
 	}
 	// We are allowing exact duplicates as we can encounter them in valid cases
 	// like federation and erroring out at that time would be extremely noisy.
 	if math.Float64bits(s.sampleBuf[3].v) != math.Float64bits(v) {
-		return storage.ErrDuplicateSampleForTimestamp
+		return 0, storage.ErrDuplicateSampleForTimestamp
 	}
-	return nil
+	return 0, nil
 }
 
 // AppendExemplar for headAppender assumes the series ref already exists, and so it doesn't
@@ -455,13 +455,14 @@ func (a *headAppender) Commit() (err error) {
 	for i, s := range a.samples {
 		series = a.sampleSeries[i]
 		series.Lock()
-		ok, chunkCreated := series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper)
+		delta, ok, chunkCreated := series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper)
 		series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
 		series.pendingCommit = false
 		series.Unlock()
 
 		if !ok {
 			total--
+			a.head.metrics.oooHistogram.Observe(float64(delta) / 1000)
 			a.head.metrics.outOfOrderSamples.Inc()
 		}
 		if chunkCreated {
@@ -480,7 +481,7 @@ func (a *headAppender) Commit() (err error) {
 // the appendID for isolation. (The appendID can be zero, which results in no
 // isolation for this append.)
 // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
-func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {
+func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (delta int64, sampleInOrder, chunkCreated bool) {
 	// Based on Gorilla white papers this offers near-optimal compression ratio
 	// so anything bigger that this has diminishing returns and increases
 	// the time range within which we have to decompress all samples.
@@ -491,7 +492,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
 	if c == nil {
 		if len(s.mmappedChunks) > 0 && s.mmappedChunks[len(s.mmappedChunks)-1].maxTime >= t {
 			// Out of order sample. Sample timestamp is already in the mmapped chunks, so ignore it.
-			return false, false
+			return s.mmappedChunks[len(s.mmappedChunks)-1].maxTime - t, false, false
 		}
 		// There is no chunk in this series yet, create the first chunk for the sample.
 		c = s.cutNewHeadChunk(t, chunkDiskMapper)
@@ -500,7 +501,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
 
 	// Out of order sample.
 	if c.maxTime >= t {
-		return false, chunkCreated
+		return c.maxTime - t, false, chunkCreated
 	}
 
 	numSamples := c.chunk.NumSamples()
@@ -538,7 +539,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
 		s.txs.add(appendID)
 	}
 
-	return true, chunkCreated
+	return 0, true, chunkCreated
 }
 
 // computeChunkEndTime estimates the end timestamp based the beginning of a
diff --git a/tsdb/head_test.go b/tsdb/head_test.go
index 043b5ce6e2..505babfe07 100644
--- a/tsdb/head_test.go
+++ b/tsdb/head_test.go
@@ -557,7 +557,7 @@ func TestMemSeries_truncateChunks(t *testing.T) {
 	s := newMemSeries(lbls, 1, lbls.Hash(), 2000, 0, &memChunkPool, defaultIsolationDisabled)
 
 	for i := 0; i < 4000; i += 5 {
-		ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper)
+		_, ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper)
 		require.True(t, ok, "sample append failed")
 	}
 
@@ -1098,19 +1098,19 @@ func TestMemSeries_append(t *testing.T) {
 	// Add first two samples at the very end of a chunk range and the next two
 	// on and after it.
 	// New chunk must correctly be cut at 1000.
-	ok, chunkCreated := s.append(998, 1, 0, chunkDiskMapper)
+	_, ok, chunkCreated := s.append(998, 1, 0, chunkDiskMapper)
 	require.True(t, ok, "append failed")
 	require.True(t, chunkCreated, "first sample created chunk")
 
-	ok, chunkCreated = s.append(999, 2, 0, chunkDiskMapper)
+	_, ok, chunkCreated = s.append(999, 2, 0, chunkDiskMapper)
 	require.True(t, ok, "append failed")
 	require.False(t, chunkCreated, "second sample should use same chunk")
 
-	ok, chunkCreated = s.append(1000, 3, 0, chunkDiskMapper)
+	_, ok, chunkCreated = s.append(1000, 3, 0, chunkDiskMapper)
 	require.True(t, ok, "append failed")
 	require.True(t, chunkCreated, "expected new chunk on boundary")
 
-	ok, chunkCreated = s.append(1001, 4, 0, chunkDiskMapper)
+	_, ok, chunkCreated = s.append(1001, 4, 0, chunkDiskMapper)
 	require.True(t, ok, "append failed")
 	require.False(t, chunkCreated, "second sample should use same chunk")
 
@@ -1123,7 +1123,7 @@ func TestMemSeries_append(t *testing.T) {
 	// Fill the range [1000,2000) with many samples. Intermediate chunks should be cut
 	// at approximately 120 samples per chunk.
 	for i := 1; i < 1000; i++ {
-		ok, _ := s.append(1001+int64(i), float64(i), 0, chunkDiskMapper)
+		_, ok, _ := s.append(1001+int64(i), float64(i), 0, chunkDiskMapper)
 		require.True(t, ok, "append failed")
 	}
 
@@ -1149,18 +1149,18 @@ func TestGCChunkAccess(t *testing.T) {
 	s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
 
 	// Appending 2 samples for the first chunk.
-	ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper)
+	_, ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper)
 	require.True(t, ok, "series append failed")
 	require.True(t, chunkCreated, "chunks was not created")
-	ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper)
+	_, ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper)
 	require.True(t, ok, "series append failed")
 	require.False(t, chunkCreated, "chunks was created")
 
 	// A new chunks should be created here as it's beyond the chunk range.
-	ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper)
+	_, ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper)
 	require.True(t, ok, "series append failed")
 	require.True(t, chunkCreated, "chunks was not created")
-	ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper)
+	_, ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper)
 	require.True(t, ok, "series append failed")
 	require.False(t, chunkCreated, "chunks was created")
 
@@ -1203,18 +1203,18 @@ func TestGCSeriesAccess(t *testing.T) {
 	s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
 
 	// Appending 2 samples for the first chunk.
-	ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper)
+	_, ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper)
 	require.True(t, ok, "series append failed")
 	require.True(t, chunkCreated, "chunks was not created")
-	ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper)
+	_, ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper)
 	require.True(t, ok, "series append failed")
 	require.False(t, chunkCreated, "chunks was created")
 
 	// A new chunks should be created here as it's beyond the chunk range.
-	ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper)
+	_, ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper)
 	require.True(t, ok, "series append failed")
 	require.True(t, chunkCreated, "chunks was not created")
-	ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper)
+	_, ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper)
 	require.True(t, ok, "series append failed")
 	require.False(t, chunkCreated, "chunks was created")
 
@@ -1459,10 +1459,10 @@ func TestHeadReadWriterRepair(t *testing.T) {
 	require.True(t, created, "series was not created")
 
 	for i := 0; i < 7; i++ {
-		ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, h.chunkDiskMapper)
+		_, ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, h.chunkDiskMapper)
 		require.True(t, ok, "series append failed")
 		require.True(t, chunkCreated, "chunk was not created")
-		ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, h.chunkDiskMapper)
+		_, ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, h.chunkDiskMapper)
 		require.True(t, ok, "series append failed")
 		require.False(t, chunkCreated, "chunk was created")
 		require.NoError(t, h.chunkDiskMapper.CutNewFile())
@@ -1808,7 +1808,7 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) {
 
 	s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
 
-	ok, _ := s.append(0, 0, 0, h.chunkDiskMapper)
+	_, ok, _ := s.append(0, 0, 0, h.chunkDiskMapper)
 	require.True(t, ok, "Series append failed.")
 	require.Equal(t, 0, s.txs.txIDCount, "Series should not have an appendID after append with appendID=0.")
 }
@@ -2347,7 +2347,7 @@ func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) {
 	s := newMemSeries(lbls, 1, lbls.Hash(), 500, 0, nil, defaultIsolationDisabled)
 
 	for i := 0; i < 7; i++ {
-		ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper)
+		_, ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper)
 		require.True(t, ok, "sample append failed")
 	}
 
diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go
index 8191284932..da9c0e5652 100644
--- a/tsdb/head_wal.go
+++ b/tsdb/head_wal.go
@@ -412,7 +412,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head) (unknownRefs uint64) {
 			if s.T <= ms.mmMaxTime {
 				continue
 			}
-			if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper); chunkCreated {
+			if _, _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper); chunkCreated {
 				h.metrics.chunksCreated.Inc()
 				h.metrics.chunks.Inc()
 			}