From d0607435a29b542a03d5c83f89cf17f0447ecacf Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 27 Sep 2022 09:22:22 +0100 Subject: [PATCH] tsdb: remove chunkRange and oooCapMax from memSeries (#11288) * tsdb: remove chunkRange from memSeries chunkRange is the (oddly-named) configured duration for the head block. We don't need a copy of this value per series. Pass it down where required, and remove the copy. The value in `Head` is only updated in `resetInMemoryState()`, which also discards all `memSeries`. * tsdb: remove oooCapMax from memSeries oooCapMax is the configured maximum capacity for an out-of-order chunk. Storing it per-series uses extra memory, and has surprising behaviour if users change the value in config - series created before the change will keep their old value. Instead, pass it down where required, and remove the per-series value. Signed-off-by: Bryan Boreham Signed-off-by: Ganesh Vernekar Co-authored-by: Ganesh Vernekar --- tsdb/head.go | 16 +++++------- tsdb/head_append.go | 22 +++++++++------- tsdb/head_test.go | 64 +++++++++++++++++++++++++-------------------- tsdb/head_wal.go | 20 +++++++------- 4 files changed, 63 insertions(+), 59 deletions(-) diff --git a/tsdb/head.go b/tsdb/head.go index 8aa5aa2c8..79232a0a8 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1489,7 +1489,7 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels) (*memSeries, bool, error) { s, created, err := h.series.getOrSet(hash, lset, func() *memSeries { - return newMemSeries(lset, id, h.chunkRange.Load(), h.opts.OutOfOrderCapMax.Load(), h.opts.IsolationDisabled) + return newMemSeries(lset, id, h.opts.IsolationDisabled) }) if err != nil { return nil, false, err @@ -1779,9 +1779,7 @@ type memSeries struct { oooHeadChunk *oooHeadChunk // Most recent chunk for ooo samples in memory that's still being built. firstOOOChunkID chunks.HeadChunkID // HeadOOOChunkID for oooMmappedChunks[0] - mmMaxTime int64 // Max time of any mmapped chunk, only used during WAL replay. - chunkRange int64 - oooCapMax uint8 + mmMaxTime int64 // Max time of any mmapped chunk, only used during WAL replay. nextAt int64 // Timestamp at which to cut the next chunk. @@ -1800,13 +1798,11 @@ type memSeries struct { pendingCommit bool // Whether there are samples waiting to be committed to this series. } -func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, chunkRange, oooCapMax int64, isolationDisabled bool) *memSeries { +func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, isolationDisabled bool) *memSeries { s := &memSeries{ - lset: lset, - ref: id, - chunkRange: chunkRange, - nextAt: math.MinInt64, - oooCapMax: uint8(oooCapMax), + lset: lset, + ref: id, + nextAt: math.MinInt64, } if !isolationDisabled { s.txs = newTxRing(4) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index cbd6ad8e2..48682ea28 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -568,6 +568,8 @@ func (a *headAppender) Commit() (err error) { wblSamples []record.RefSample oooMmapMarkers map[chunks.HeadSeriesRef]chunks.ChunkDiskMapperRef oooRecords [][]byte + oooCapMax = a.head.opts.OutOfOrderCapMax.Load() + chunkRange = a.head.chunkRange.Load() series *memSeries enc record.Encoder ) @@ -635,7 +637,7 @@ func (a *headAppender) Commit() (err error) { // Sample is OOO and OOO handling is enabled // and the delta is within the OOO tolerance. var mmapRef chunks.ChunkDiskMapperRef - ok, chunkCreated, mmapRef = series.insert(s.T, s.V, a.head.chunkDiskMapper) + ok, chunkCreated, mmapRef = series.insert(s.T, s.V, a.head.chunkDiskMapper, oooCapMax) if chunkCreated { r, ok := oooMmapMarkers[series.ref] if !ok || r != 0 { @@ -670,7 +672,7 @@ func (a *headAppender) Commit() (err error) { samplesAppended-- } } else if err == nil { - ok, chunkCreated = series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper) + ok, chunkCreated = series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper, chunkRange) if ok { if s.T < inOrderMint { inOrderMint = s.T @@ -723,9 +725,9 @@ func (a *headAppender) Commit() (err error) { } // insert is like append, except it inserts. Used for OOO samples. -func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDiskMapper) (inserted, chunkCreated bool, mmapRef chunks.ChunkDiskMapperRef) { +func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRef chunks.ChunkDiskMapperRef) { c := s.oooHeadChunk - if c == nil || c.chunk.NumSamples() == int(s.oooCapMax) { + if c == nil || c.chunk.NumSamples() == int(oooCapMax) { // Note: If no new samples come in then we rely on compaction to clean up stale in-memory OOO chunks. c, mmapRef = s.cutNewOOOHeadChunk(t, chunkDiskMapper) chunkCreated = true @@ -747,7 +749,7 @@ func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDisk // the appendID for isolation. (The appendID can be zero, which results in no // isolation for this append.) // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. -func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) { +func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) { // Based on Gorilla white papers this offers near-optimal compression ratio // so anything bigger that this has diminishing returns and increases // the time range within which we have to decompress all samples. @@ -761,7 +763,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper return false, false } // There is no head chunk in this series yet, create the first chunk for the sample. - c = s.cutNewHeadChunk(t, chunkDiskMapper) + c = s.cutNewHeadChunk(t, chunkDiskMapper, chunkRange) chunkCreated = true } @@ -775,7 +777,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper // It could be the new chunk created after reading the chunk snapshot, // hence we fix the minTime of the chunk here. c.minTime = t - s.nextAt = rangeForTimestamp(c.minTime, s.chunkRange) + s.nextAt = rangeForTimestamp(c.minTime, chunkRange) } // If we reach 25% of a chunk's desired sample count, predict an end time @@ -791,7 +793,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper // as we expect more chunks to come. // Note that next chunk will have its nextAt recalculated for the new rate. if t >= s.nextAt || numSamples >= samplesPerChunk*2 { - c = s.cutNewHeadChunk(t, chunkDiskMapper) + c = s.cutNewHeadChunk(t, chunkDiskMapper, chunkRange) chunkCreated = true } s.app.Append(t, v) @@ -823,7 +825,7 @@ func computeChunkEndTime(start, cur, max int64) int64 { return start + (max-start)/n } -func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk { +func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64) *memChunk { s.mmapCurrentHeadChunk(chunkDiskMapper) s.headChunk = &memChunk{ @@ -834,7 +836,7 @@ func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDis // Set upper bound on when the next chunk must be started. An earlier timestamp // may be chosen dynamically at a later point. - s.nextAt = rangeForTimestamp(mint, s.chunkRange) + s.nextAt = rangeForTimestamp(mint, chunkRange) app, err := s.headChunk.chunk.Appender() if err != nil { diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 489dad65c..3a6266e22 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -229,8 +229,8 @@ func BenchmarkLoadWAL(b *testing.B) { require.NoError(b, err) for k := 0; k < c.batches*c.seriesPerBatch; k++ { // Create one mmapped chunk per series, with one sample at the given time. - s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, c.mmappedChunkT, 1, defaultIsolationDisabled) - s.append(c.mmappedChunkT, 42, 0, chunkDiskMapper) + s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, defaultIsolationDisabled) + s.append(c.mmappedChunkT, 42, 0, chunkDiskMapper, c.mmappedChunkT) s.mmapCurrentHeadChunk(chunkDiskMapper) } require.NoError(b, chunkDiskMapper.Close()) @@ -731,6 +731,7 @@ func TestMemSeries_truncateChunks(t *testing.T) { defer func() { require.NoError(t, chunkDiskMapper.Close()) }() + const chunkRange = 2000 memChunkPool := sync.Pool{ New: func() interface{} { @@ -738,10 +739,10 @@ func TestMemSeries_truncateChunks(t *testing.T) { }, } - s := newMemSeries(labels.FromStrings("a", "b"), 1, 2000, 1, defaultIsolationDisabled) + s := newMemSeries(labels.FromStrings("a", "b"), 1, defaultIsolationDisabled) for i := 0; i < 4000; i += 5 { - ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper) + ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange) require.True(t, ok, "sample append failed") } @@ -1276,25 +1277,26 @@ func TestMemSeries_append(t *testing.T) { defer func() { require.NoError(t, chunkDiskMapper.Close()) }() + const chunkRange = 500 - s := newMemSeries(labels.Labels{}, 1, 500, 1, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled) // Add first two samples at the very end of a chunk range and the next two // on and after it. // New chunk must correctly be cut at 1000. - ok, chunkCreated := s.append(998, 1, 0, chunkDiskMapper) + ok, chunkCreated := s.append(998, 1, 0, chunkDiskMapper, chunkRange) require.True(t, ok, "append failed") require.True(t, chunkCreated, "first sample created chunk") - ok, chunkCreated = s.append(999, 2, 0, chunkDiskMapper) + ok, chunkCreated = s.append(999, 2, 0, chunkDiskMapper, chunkRange) require.True(t, ok, "append failed") require.False(t, chunkCreated, "second sample should use same chunk") - ok, chunkCreated = s.append(1000, 3, 0, chunkDiskMapper) + ok, chunkCreated = s.append(1000, 3, 0, chunkDiskMapper, chunkRange) require.True(t, ok, "append failed") require.True(t, chunkCreated, "expected new chunk on boundary") - ok, chunkCreated = s.append(1001, 4, 0, chunkDiskMapper) + ok, chunkCreated = s.append(1001, 4, 0, chunkDiskMapper, chunkRange) require.True(t, ok, "append failed") require.False(t, chunkCreated, "second sample should use same chunk") @@ -1307,7 +1309,7 @@ func TestMemSeries_append(t *testing.T) { // Fill the range [1000,2000) with many samples. Intermediate chunks should be cut // at approximately 120 samples per chunk. for i := 1; i < 1000; i++ { - ok, _ := s.append(1001+int64(i), float64(i), 0, chunkDiskMapper) + ok, _ := s.append(1001+int64(i), float64(i), 0, chunkDiskMapper, chunkRange) require.True(t, ok, "append failed") } @@ -1330,8 +1332,9 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { t.Cleanup(func() { require.NoError(t, chunkDiskMapper.Close()) }) + chunkRange := DefaultBlockDuration - s := newMemSeries(labels.Labels{}, 1, DefaultBlockDuration, 0, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled) // At this slow rate, we will fill the chunk in two block durations. slowRate := (DefaultBlockDuration * 2) / samplesPerChunk @@ -1339,7 +1342,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { var nextTs int64 var totalAppendedSamples int for i := 0; i < samplesPerChunk/4; i++ { - ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper) + ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper, chunkRange) require.Truef(t, ok, "slow sample %d was not appended", i) nextTs += slowRate totalAppendedSamples++ @@ -1348,12 +1351,12 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { // Suddenly, the rate increases and we receive a sample every millisecond. for i := 0; i < math.MaxUint16; i++ { - ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper) + ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper, chunkRange) require.Truef(t, ok, "quick sample %d was not appended", i) nextTs++ totalAppendedSamples++ } - ok, chunkCreated := s.append(DefaultBlockDuration, float64(0), 0, chunkDiskMapper) + ok, chunkCreated := s.append(DefaultBlockDuration, float64(0), 0, chunkDiskMapper, chunkRange) require.True(t, ok, "new chunk sample was not appended") require.True(t, chunkCreated, "sample at block duration timestamp should create a new chunk") @@ -1367,7 +1370,8 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { func TestGCChunkAccess(t *testing.T) { // Put a chunk, select it. GC it and then access it. - h, _ := newTestHead(t, 1000, false, false) + const chunkRange = 1000 + h, _ := newTestHead(t, chunkRange, false, false) defer func() { require.NoError(t, h.Close()) }() @@ -1377,18 +1381,18 @@ func TestGCChunkAccess(t *testing.T) { s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) // Appending 2 samples for the first chunk. - ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper) + ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper, chunkRange) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper) + ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper, chunkRange) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunks was created") // A new chunks should be created here as it's beyond the chunk range. - ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper) + ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper, chunkRange) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper) + ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper, chunkRange) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunks was created") @@ -1419,7 +1423,8 @@ func TestGCChunkAccess(t *testing.T) { func TestGCSeriesAccess(t *testing.T) { // Put a series, select it. GC it and then access it. - h, _ := newTestHead(t, 1000, false, false) + const chunkRange = 1000 + h, _ := newTestHead(t, chunkRange, false, false) defer func() { require.NoError(t, h.Close()) }() @@ -1429,18 +1434,18 @@ func TestGCSeriesAccess(t *testing.T) { s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) // Appending 2 samples for the first chunk. - ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper) + ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper, chunkRange) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper) + ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper, chunkRange) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunks was created") // A new chunks should be created here as it's beyond the chunk range. - ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper) + ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper, chunkRange) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper) + ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper, chunkRange) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunks was created") @@ -1676,10 +1681,10 @@ func TestHeadReadWriterRepair(t *testing.T) { require.True(t, created, "series was not created") for i := 0; i < 7; i++ { - ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, h.chunkDiskMapper) + ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, h.chunkDiskMapper, chunkRange) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunk was not created") - ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, h.chunkDiskMapper) + ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, h.chunkDiskMapper, chunkRange) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunk was created") h.chunkDiskMapper.CutNewFile() @@ -2027,7 +2032,7 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) { s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) - ok, _ := s.append(0, 0, 0, h.chunkDiskMapper) + ok, _ := s.append(0, 0, 0, h.chunkDiskMapper, h.chunkRange.Load()) require.True(t, ok, "Series append failed.") require.Equal(t, 0, s.txs.txIDCount, "Series should not have an appendID after append with appendID=0.") } @@ -2489,11 +2494,12 @@ func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) { defer func() { require.NoError(t, chunkDiskMapper.Close()) }() + const chunkRange = 500 - s := newMemSeries(labels.Labels{}, 1, 500, 1, defaultIsolationDisabled) + s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled) for i := 0; i < 7; i++ { - ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper) + ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange) require.True(t, ok, "sample append failed") } diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 8bbe33cc4..a69cd8019 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -439,6 +439,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp defer close(wp.output) mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) + chunkRange := h.chunkRange.Load() for in := range wp.input { if in.existingSeries != nil { @@ -459,7 +460,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp if s.T <= ms.mmMaxTime { continue } - if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper); chunkCreated { + if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper, chunkRange); chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() } @@ -718,6 +719,7 @@ func (wp *wblSubsetProcessor) reuseBuf() []record.RefSample { func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) { defer close(wp.output) + oooCapMax := h.opts.OutOfOrderCapMax.Load() // We don't check for minValidTime for ooo samples. mint, maxt := int64(math.MaxInt64), int64(math.MinInt64) for samples := range wp.input { @@ -728,7 +730,7 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) { unknownRefs++ continue } - ok, chunkCreated, _ := ms.insert(s.T, s.V, h.chunkDiskMapper) + ok, chunkCreated, _ := ms.insert(s.T, s.V, h.chunkDiskMapper, oooCapMax) if chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() @@ -773,11 +775,10 @@ const ( ) type chunkSnapshotRecord struct { - ref chunks.HeadSeriesRef - lset labels.Labels - chunkRange int64 - mc *memChunk - sampleBuf [4]sample + ref chunks.HeadSeriesRef + lset labels.Labels + mc *memChunk + sampleBuf [4]sample } func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte { @@ -786,7 +787,7 @@ func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte { buf.PutByte(chunkSnapshotRecordTypeSeries) buf.PutBE64(uint64(s.ref)) record.EncodeLabels(&buf, s.lset) - buf.PutBE64int64(s.chunkRange) + buf.PutBE64int64(0) // Backwards-compatibility; was chunkRange but now unused. s.Lock() if s.headChunk == nil { @@ -820,7 +821,7 @@ func decodeSeriesFromChunkSnapshot(d *record.Decoder, b []byte) (csr chunkSnapsh // TODO: figure out why DecodeLabels calls Sort(), and perhaps remove it. csr.lset = d.DecodeLabels(&dec) - csr.chunkRange = dec.Be64int64() + _ = dec.Be64int64() // Was chunkRange but now unused. if dec.Uvarint() == 0 { return } @@ -1216,7 +1217,6 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie } } - series.chunkRange = csr.chunkRange if csr.mc == nil { continue }