track OOO-ness in tsdb into a histogram (#56)

* track OOO-ness in tsdb into a histogram

This should help us build an understanding of customer requirements
regarding how far back in time samples tend to go.
Note: this has already been discussed, reviewed and approved on:
https://github.com/grafana/mimir/pull/488

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

* update the test files which were not in mimir/vendor
Dieter Plaetinck 2021-11-29 06:14:35 -05:00 committed by GitHub
parent 848f3df106
commit 757f57e509
4 changed files with 48 additions and 32 deletions

tsdb/head.go

@@ -282,6 +282,7 @@ type headMetrics struct {
 	checkpointCreationTotal  prometheus.Counter
 	mmapChunkCorruptionTotal prometheus.Counter
 	snapshotReplayErrorTotal prometheus.Counter // Will be either 0 or 1.
+	oooHistogram             prometheus.Histogram
 }
 
 func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
@@ -380,6 +381,20 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
 			Name: "prometheus_tsdb_snapshot_replay_error_total",
 			Help: "Total number snapshot replays that failed.",
 		}),
+		oooHistogram: prometheus.NewHistogram(prometheus.HistogramOpts{
+			Name: "prometheus_tsdb_sample_ooo_delta",
+			Help: "Delta in seconds by which a sample is considered out of order.",
+			Buckets: []float64{
+				// Note that mimir distributor only gives us a range of wallclock-12h to wallclock+15min
+				60 * 10,      // 10 min
+				60 * 30,      // 30 min
+				60 * 60,      // 60 min
+				60 * 60 * 2,  // 2h
+				60 * 60 * 3,  // 3h
+				60 * 60 * 6,  // 6h
+				60 * 60 * 12, // 12h
+			},
+		}),
 	}
 	if r != nil {
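
A note on units: TSDB timestamps are Unix milliseconds, which is why the two Observe call sites added in head_append.go below divide the delta by 1000 before recording it into these seconds-based buckets. The following minimal, self-contained sketch shows where such an observation lands; the main harness and the 45-minute example delta are invented for illustration, and only the metric name, help text, and buckets are taken from the commit:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	// Same definition as in newHeadMetrics above.
	oooHistogram := prometheus.NewHistogram(prometheus.HistogramOpts{
		Name: "prometheus_tsdb_sample_ooo_delta",
		Help: "Delta in seconds by which a sample is considered out of order.",
		Buckets: []float64{
			60 * 10, 60 * 30, 60 * 60,
			60 * 60 * 2, 60 * 60 * 3, 60 * 60 * 6, 60 * 60 * 12,
		},
	})

	// deltaMs plays the role of c.maxTime - t: how far the rejected sample's
	// timestamp lags behind the newest sample in the series, in milliseconds.
	deltaMs := int64(45 * 60 * 1000) // hypothetical sample arriving 45 minutes late

	// ms -> s, exactly as in the Append/Commit call sites.
	oooHistogram.Observe(float64(deltaMs) / 1000)

	// Dump the cumulative bucket counts; a 2700s delta falls into the
	// le=3600 (60 min) bucket and everything above it.
	var m dto.Metric
	if err := oooHistogram.Write(&m); err != nil {
		panic(err)
	}
	for _, b := range m.GetHistogram().GetBucket() {
		fmt.Printf("le=%-6g count=%d\n", b.GetUpperBound(), b.GetCumulativeCount())
	}
}

Once registered with the head's registerer, the same distribution can be read back in PromQL, e.g. histogram_quantile(0.99, rate(prometheus_tsdb_sample_ooo_delta_bucket[5m])) for the 99th-percentile out-of-order delta.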

tsdb/head_append.go

@@ -273,10 +273,11 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
 	}
 
 	s.Lock()
-	if err := s.appendable(t, v); err != nil {
+	if delta, err := s.appendable(t, v); err != nil {
 		s.Unlock()
 		if err == storage.ErrOutOfOrderSample {
 			a.head.metrics.outOfOrderSamples.Inc()
+			a.head.metrics.oooHistogram.Observe(float64(delta) / 1000)
 		}
 		return 0, err
 	}
@@ -300,24 +301,23 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
 }
 
 // appendable checks whether the given sample is valid for appending to the series.
-func (s *memSeries) appendable(t int64, v float64) error {
+func (s *memSeries) appendable(t int64, v float64) (int64, error) {
 	c := s.head()
 	if c == nil {
-		return nil
+		return 0, nil
 	}
 
 	if t > c.maxTime {
-		return nil
+		return 0, nil
 	}
-
 	if t < c.maxTime {
-		return storage.ErrOutOfOrderSample
+		return c.maxTime - t, storage.ErrOutOfOrderSample
 	}
 
 	// We are allowing exact duplicates as we can encounter them in valid cases
 	// like federation and erroring out at that time would be extremely noisy.
 	if math.Float64bits(s.sampleBuf[3].v) != math.Float64bits(v) {
-		return storage.ErrDuplicateSampleForTimestamp
+		return 0, storage.ErrDuplicateSampleForTimestamp
 	}
-	return nil
+	return 0, nil
 }
 
 // AppendExemplar for headAppender assumes the series ref already exists, and so it doesn't
@@ -455,13 +455,14 @@ func (a *headAppender) Commit() (err error) {
 	for i, s := range a.samples {
 		series = a.sampleSeries[i]
 		series.Lock()
-		ok, chunkCreated := series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper)
+		delta, ok, chunkCreated := series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper)
 		series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
 		series.pendingCommit = false
 		series.Unlock()
 
 		if !ok {
 			total--
+			a.head.metrics.oooHistogram.Observe(float64(delta) / 1000)
 			a.head.metrics.outOfOrderSamples.Inc()
 		}
 		if chunkCreated {
@@ -480,7 +481,7 @@ func (a *headAppender) Commit() (err error) {
 // the appendID for isolation. (The appendID can be zero, which results in no
 // isolation for this append.)
 // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
-func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {
+func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (delta int64, sampleInOrder, chunkCreated bool) {
 	// Based on Gorilla white papers this offers near-optimal compression ratio
 	// so anything bigger that this has diminishing returns and increases
 	// the time range within which we have to decompress all samples.
@@ -491,7 +492,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
 	if c == nil {
 		if len(s.mmappedChunks) > 0 && s.mmappedChunks[len(s.mmappedChunks)-1].maxTime >= t {
 			// Out of order sample. Sample timestamp is already in the mmapped chunks, so ignore it.
-			return false, false
+			return s.mmappedChunks[len(s.mmappedChunks)-1].maxTime - t, false, false
 		}
 		// There is no chunk in this series yet, create the first chunk for the sample.
 		c = s.cutNewHeadChunk(t, chunkDiskMapper)
@@ -500,7 +501,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
 	// Out of order sample.
 	if c.maxTime >= t {
-		return false, chunkCreated
+		return c.maxTime - t, false, chunkCreated
 	}
 
 	numSamples := c.chunk.NumSamples()
@@ -538,7 +539,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
 		s.txs.add(appendID)
 	}
 
-	return true, chunkCreated
+	return 0, true, chunkCreated
 }
 
 // computeChunkEndTime estimates the end timestamp based the beginning of a
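
Stripped of locking, chunk management, and the duplicate-sample check, the new contract is simple: a non-zero delta comes back only on the out-of-order path, and every accepting path returns 0. Keeping the Observe calls in headAppender rather than inside memSeries presumably avoids giving the series type a dependency on the head's metrics. Below is a toy sketch of that contract using hypothetical, reduced types (the real memSeries derives maxTime from its head chunk and mmapped chunks):

package main

import (
	"errors"
	"fmt"
)

// errOutOfOrderSample stands in for storage.ErrOutOfOrderSample.
var errOutOfOrderSample = errors.New("out of order sample")

// series is a toy stand-in for memSeries, reduced to the one field
// the out-of-order check needs.
type series struct {
	maxTime int64 // timestamp of the newest accepted sample, in ms
}

// appendable mirrors the new (delta, error) contract: a non-zero delta
// is returned only alongside the out-of-order error.
func (s *series) appendable(t int64) (int64, error) {
	if t < s.maxTime {
		return s.maxTime - t, errOutOfOrderSample
	}
	return 0, nil
}

func main() {
	s := &series{maxTime: 1_700_000_000_000}      // some wallclock instant, in ms
	delta, err := s.appendable(s.maxTime - 90_000) // a sample 90s behind the head
	if errors.Is(err, errOutOfOrderSample) {
		// Same conversion as the real call sites: ms -> s. Prints "90s".
		fmt.Printf("sample is %gs out of order\n", float64(delta)/1000)
	}
}
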

tsdb/head_test.go

@@ -557,7 +557,7 @@ func TestMemSeries_truncateChunks(t *testing.T) {
 	s := newMemSeries(lbls, 1, lbls.Hash(), 2000, 0, &memChunkPool, defaultIsolationDisabled)
 
 	for i := 0; i < 4000; i += 5 {
-		ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper)
+		_, ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper)
 		require.True(t, ok, "sample append failed")
 	}
@@ -1098,19 +1098,19 @@ func TestMemSeries_append(t *testing.T) {
 	// Add first two samples at the very end of a chunk range and the next two
 	// on and after it.
 	// New chunk must correctly be cut at 1000.
-	ok, chunkCreated := s.append(998, 1, 0, chunkDiskMapper)
+	_, ok, chunkCreated := s.append(998, 1, 0, chunkDiskMapper)
 	require.True(t, ok, "append failed")
 	require.True(t, chunkCreated, "first sample created chunk")
 
-	ok, chunkCreated = s.append(999, 2, 0, chunkDiskMapper)
+	_, ok, chunkCreated = s.append(999, 2, 0, chunkDiskMapper)
 	require.True(t, ok, "append failed")
 	require.False(t, chunkCreated, "second sample should use same chunk")
 
-	ok, chunkCreated = s.append(1000, 3, 0, chunkDiskMapper)
+	_, ok, chunkCreated = s.append(1000, 3, 0, chunkDiskMapper)
 	require.True(t, ok, "append failed")
 	require.True(t, chunkCreated, "expected new chunk on boundary")
 
-	ok, chunkCreated = s.append(1001, 4, 0, chunkDiskMapper)
+	_, ok, chunkCreated = s.append(1001, 4, 0, chunkDiskMapper)
 	require.True(t, ok, "append failed")
 	require.False(t, chunkCreated, "second sample should use same chunk")
@@ -1123,7 +1123,7 @@ func TestMemSeries_append(t *testing.T) {
 	// Fill the range [1000,2000) with many samples. Intermediate chunks should be cut
 	// at approximately 120 samples per chunk.
 	for i := 1; i < 1000; i++ {
-		ok, _ := s.append(1001+int64(i), float64(i), 0, chunkDiskMapper)
+		_, ok, _ := s.append(1001+int64(i), float64(i), 0, chunkDiskMapper)
 		require.True(t, ok, "append failed")
 	}
@@ -1149,18 +1149,18 @@ func TestGCChunkAccess(t *testing.T) {
 	s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
 
 	// Appending 2 samples for the first chunk.
-	ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper)
+	_, ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper)
 	require.True(t, ok, "series append failed")
 	require.True(t, chunkCreated, "chunks was not created")
 
-	ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper)
+	_, ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper)
 	require.True(t, ok, "series append failed")
 	require.False(t, chunkCreated, "chunks was created")
 
 	// A new chunks should be created here as it's beyond the chunk range.
-	ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper)
+	_, ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper)
 	require.True(t, ok, "series append failed")
 	require.True(t, chunkCreated, "chunks was not created")
 
-	ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper)
+	_, ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper)
 	require.True(t, ok, "series append failed")
 	require.False(t, chunkCreated, "chunks was created")
@@ -1203,18 +1203,18 @@ func TestGCSeriesAccess(t *testing.T) {
 	s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
 
 	// Appending 2 samples for the first chunk.
-	ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper)
+	_, ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper)
 	require.True(t, ok, "series append failed")
 	require.True(t, chunkCreated, "chunks was not created")
 
-	ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper)
+	_, ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper)
 	require.True(t, ok, "series append failed")
 	require.False(t, chunkCreated, "chunks was created")
 
 	// A new chunks should be created here as it's beyond the chunk range.
-	ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper)
+	_, ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper)
 	require.True(t, ok, "series append failed")
 	require.True(t, chunkCreated, "chunks was not created")
 
-	ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper)
+	_, ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper)
 	require.True(t, ok, "series append failed")
 	require.False(t, chunkCreated, "chunks was created")
@@ -1459,10 +1459,10 @@ func TestHeadReadWriterRepair(t *testing.T) {
 	require.True(t, created, "series was not created")
 
 	for i := 0; i < 7; i++ {
-		ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, h.chunkDiskMapper)
+		_, ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, h.chunkDiskMapper)
 		require.True(t, ok, "series append failed")
 		require.True(t, chunkCreated, "chunk was not created")
-		ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, h.chunkDiskMapper)
+		_, ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, h.chunkDiskMapper)
 		require.True(t, ok, "series append failed")
 		require.False(t, chunkCreated, "chunk was created")
 		require.NoError(t, h.chunkDiskMapper.CutNewFile())
@@ -1808,7 +1808,7 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) {
 	s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
 
-	ok, _ := s.append(0, 0, 0, h.chunkDiskMapper)
+	_, ok, _ := s.append(0, 0, 0, h.chunkDiskMapper)
 	require.True(t, ok, "Series append failed.")
 	require.Equal(t, 0, s.txs.txIDCount, "Series should not have an appendID after append with appendID=0.")
 }
@@ -2347,7 +2347,7 @@ func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) {
 	s := newMemSeries(lbls, 1, lbls.Hash(), 500, 0, nil, defaultIsolationDisabled)
 
 	for i := 0; i < 7; i++ {
-		ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper)
+		_, ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper)
 		require.True(t, ok, "sample append failed")
 	}

tsdb/head_wal.go

@@ -412,7 +412,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head) (unknownRefs uint64) {
 		if s.T <= ms.mmMaxTime {
 			continue
 		}
-		if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper); chunkCreated {
+		if _, _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper); chunkCreated {
 			h.metrics.chunksCreated.Inc()
 			h.metrics.chunks.Inc()
 		}