tsdb: remove chunkRange and oooCapMax from memSeries (#11288)

* tsdb: remove chunkRange from memSeries

chunkRange is the (oddly-named) configured duration for the head block.

We don't need a copy of this value per series. Pass it down where
required, and remove the copy.

The value in `Head` is only updated in `resetInMemoryState()`, which
also discards all `memSeries`.

* tsdb: remove oooCapMax from memSeries

oooCapMax is the configured maximum capacity for an out-of-order chunk.

Storing it per-series uses extra memory, and has surprising behaviour
if users change the value in config - series created before the change
will keep their old value.

Instead, pass it down where required, and remove the per-series value.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>
Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com>
This commit is contained in:
Bryan Boreham 2022-09-27 09:22:22 +01:00 committed by GitHub
parent f2ee959354
commit d0607435a2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 63 additions and 59 deletions

View file

@ -1489,7 +1489,7 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e
func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels) (*memSeries, bool, error) {
s, created, err := h.series.getOrSet(hash, lset, func() *memSeries {
return newMemSeries(lset, id, h.chunkRange.Load(), h.opts.OutOfOrderCapMax.Load(), h.opts.IsolationDisabled)
return newMemSeries(lset, id, h.opts.IsolationDisabled)
})
if err != nil {
return nil, false, err
@ -1779,9 +1779,7 @@ type memSeries struct {
oooHeadChunk *oooHeadChunk // Most recent chunk for ooo samples in memory that's still being built.
firstOOOChunkID chunks.HeadChunkID // HeadOOOChunkID for oooMmappedChunks[0]
mmMaxTime int64 // Max time of any mmapped chunk, only used during WAL replay.
chunkRange int64
oooCapMax uint8
mmMaxTime int64 // Max time of any mmapped chunk, only used during WAL replay.
nextAt int64 // Timestamp at which to cut the next chunk.
@ -1800,13 +1798,11 @@ type memSeries struct {
pendingCommit bool // Whether there are samples waiting to be committed to this series.
}
func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, chunkRange, oooCapMax int64, isolationDisabled bool) *memSeries {
func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, isolationDisabled bool) *memSeries {
s := &memSeries{
lset: lset,
ref: id,
chunkRange: chunkRange,
nextAt: math.MinInt64,
oooCapMax: uint8(oooCapMax),
lset: lset,
ref: id,
nextAt: math.MinInt64,
}
if !isolationDisabled {
s.txs = newTxRing(4)

View file

@ -568,6 +568,8 @@ func (a *headAppender) Commit() (err error) {
wblSamples []record.RefSample
oooMmapMarkers map[chunks.HeadSeriesRef]chunks.ChunkDiskMapperRef
oooRecords [][]byte
oooCapMax = a.head.opts.OutOfOrderCapMax.Load()
chunkRange = a.head.chunkRange.Load()
series *memSeries
enc record.Encoder
)
@ -635,7 +637,7 @@ func (a *headAppender) Commit() (err error) {
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
var mmapRef chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRef = series.insert(s.T, s.V, a.head.chunkDiskMapper)
ok, chunkCreated, mmapRef = series.insert(s.T, s.V, a.head.chunkDiskMapper, oooCapMax)
if chunkCreated {
r, ok := oooMmapMarkers[series.ref]
if !ok || r != 0 {
@ -670,7 +672,7 @@ func (a *headAppender) Commit() (err error) {
samplesAppended--
}
} else if err == nil {
ok, chunkCreated = series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper)
ok, chunkCreated = series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper, chunkRange)
if ok {
if s.T < inOrderMint {
inOrderMint = s.T
@ -723,9 +725,9 @@ func (a *headAppender) Commit() (err error) {
}
// insert is like append, except it inserts. Used for OOO samples.
func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDiskMapper) (inserted, chunkCreated bool, mmapRef chunks.ChunkDiskMapperRef) {
func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRef chunks.ChunkDiskMapperRef) {
c := s.oooHeadChunk
if c == nil || c.chunk.NumSamples() == int(s.oooCapMax) {
if c == nil || c.chunk.NumSamples() == int(oooCapMax) {
// Note: If no new samples come in then we rely on compaction to clean up stale in-memory OOO chunks.
c, mmapRef = s.cutNewOOOHeadChunk(t, chunkDiskMapper)
chunkCreated = true
@ -747,7 +749,7 @@ func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDisk
// the appendID for isolation. (The appendID can be zero, which results in no
// isolation for this append.)
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {
func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) {
// Based on Gorilla white papers this offers near-optimal compression ratio
// so anything bigger that this has diminishing returns and increases
// the time range within which we have to decompress all samples.
@ -761,7 +763,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
return false, false
}
// There is no head chunk in this series yet, create the first chunk for the sample.
c = s.cutNewHeadChunk(t, chunkDiskMapper)
c = s.cutNewHeadChunk(t, chunkDiskMapper, chunkRange)
chunkCreated = true
}
@ -775,7 +777,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
// It could be the new chunk created after reading the chunk snapshot,
// hence we fix the minTime of the chunk here.
c.minTime = t
s.nextAt = rangeForTimestamp(c.minTime, s.chunkRange)
s.nextAt = rangeForTimestamp(c.minTime, chunkRange)
}
// If we reach 25% of a chunk's desired sample count, predict an end time
@ -791,7 +793,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
// as we expect more chunks to come.
// Note that next chunk will have its nextAt recalculated for the new rate.
if t >= s.nextAt || numSamples >= samplesPerChunk*2 {
c = s.cutNewHeadChunk(t, chunkDiskMapper)
c = s.cutNewHeadChunk(t, chunkDiskMapper, chunkRange)
chunkCreated = true
}
s.app.Append(t, v)
@ -823,7 +825,7 @@ func computeChunkEndTime(start, cur, max int64) int64 {
return start + (max-start)/n
}
func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk {
func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64) *memChunk {
s.mmapCurrentHeadChunk(chunkDiskMapper)
s.headChunk = &memChunk{
@ -834,7 +836,7 @@ func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDis
// Set upper bound on when the next chunk must be started. An earlier timestamp
// may be chosen dynamically at a later point.
s.nextAt = rangeForTimestamp(mint, s.chunkRange)
s.nextAt = rangeForTimestamp(mint, chunkRange)
app, err := s.headChunk.chunk.Appender()
if err != nil {

View file

@ -229,8 +229,8 @@ func BenchmarkLoadWAL(b *testing.B) {
require.NoError(b, err)
for k := 0; k < c.batches*c.seriesPerBatch; k++ {
// Create one mmapped chunk per series, with one sample at the given time.
s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, c.mmappedChunkT, 1, defaultIsolationDisabled)
s.append(c.mmappedChunkT, 42, 0, chunkDiskMapper)
s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, defaultIsolationDisabled)
s.append(c.mmappedChunkT, 42, 0, chunkDiskMapper, c.mmappedChunkT)
s.mmapCurrentHeadChunk(chunkDiskMapper)
}
require.NoError(b, chunkDiskMapper.Close())
@ -731,6 +731,7 @@ func TestMemSeries_truncateChunks(t *testing.T) {
defer func() {
require.NoError(t, chunkDiskMapper.Close())
}()
const chunkRange = 2000
memChunkPool := sync.Pool{
New: func() interface{} {
@ -738,10 +739,10 @@ func TestMemSeries_truncateChunks(t *testing.T) {
},
}
s := newMemSeries(labels.FromStrings("a", "b"), 1, 2000, 1, defaultIsolationDisabled)
s := newMemSeries(labels.FromStrings("a", "b"), 1, defaultIsolationDisabled)
for i := 0; i < 4000; i += 5 {
ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper)
ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange)
require.True(t, ok, "sample append failed")
}
@ -1276,25 +1277,26 @@ func TestMemSeries_append(t *testing.T) {
defer func() {
require.NoError(t, chunkDiskMapper.Close())
}()
const chunkRange = 500
s := newMemSeries(labels.Labels{}, 1, 500, 1, defaultIsolationDisabled)
s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled)
// Add first two samples at the very end of a chunk range and the next two
// on and after it.
// New chunk must correctly be cut at 1000.
ok, chunkCreated := s.append(998, 1, 0, chunkDiskMapper)
ok, chunkCreated := s.append(998, 1, 0, chunkDiskMapper, chunkRange)
require.True(t, ok, "append failed")
require.True(t, chunkCreated, "first sample created chunk")
ok, chunkCreated = s.append(999, 2, 0, chunkDiskMapper)
ok, chunkCreated = s.append(999, 2, 0, chunkDiskMapper, chunkRange)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "second sample should use same chunk")
ok, chunkCreated = s.append(1000, 3, 0, chunkDiskMapper)
ok, chunkCreated = s.append(1000, 3, 0, chunkDiskMapper, chunkRange)
require.True(t, ok, "append failed")
require.True(t, chunkCreated, "expected new chunk on boundary")
ok, chunkCreated = s.append(1001, 4, 0, chunkDiskMapper)
ok, chunkCreated = s.append(1001, 4, 0, chunkDiskMapper, chunkRange)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "second sample should use same chunk")
@ -1307,7 +1309,7 @@ func TestMemSeries_append(t *testing.T) {
// Fill the range [1000,2000) with many samples. Intermediate chunks should be cut
// at approximately 120 samples per chunk.
for i := 1; i < 1000; i++ {
ok, _ := s.append(1001+int64(i), float64(i), 0, chunkDiskMapper)
ok, _ := s.append(1001+int64(i), float64(i), 0, chunkDiskMapper, chunkRange)
require.True(t, ok, "append failed")
}
@ -1330,8 +1332,9 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
t.Cleanup(func() {
require.NoError(t, chunkDiskMapper.Close())
})
chunkRange := DefaultBlockDuration
s := newMemSeries(labels.Labels{}, 1, DefaultBlockDuration, 0, defaultIsolationDisabled)
s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled)
// At this slow rate, we will fill the chunk in two block durations.
slowRate := (DefaultBlockDuration * 2) / samplesPerChunk
@ -1339,7 +1342,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
var nextTs int64
var totalAppendedSamples int
for i := 0; i < samplesPerChunk/4; i++ {
ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper)
ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper, chunkRange)
require.Truef(t, ok, "slow sample %d was not appended", i)
nextTs += slowRate
totalAppendedSamples++
@ -1348,12 +1351,12 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
// Suddenly, the rate increases and we receive a sample every millisecond.
for i := 0; i < math.MaxUint16; i++ {
ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper)
ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper, chunkRange)
require.Truef(t, ok, "quick sample %d was not appended", i)
nextTs++
totalAppendedSamples++
}
ok, chunkCreated := s.append(DefaultBlockDuration, float64(0), 0, chunkDiskMapper)
ok, chunkCreated := s.append(DefaultBlockDuration, float64(0), 0, chunkDiskMapper, chunkRange)
require.True(t, ok, "new chunk sample was not appended")
require.True(t, chunkCreated, "sample at block duration timestamp should create a new chunk")
@ -1367,7 +1370,8 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
func TestGCChunkAccess(t *testing.T) {
// Put a chunk, select it. GC it and then access it.
h, _ := newTestHead(t, 1000, false, false)
const chunkRange = 1000
h, _ := newTestHead(t, chunkRange, false, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -1377,18 +1381,18 @@ func TestGCChunkAccess(t *testing.T) {
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
// Appending 2 samples for the first chunk.
ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper)
ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper, chunkRange)
require.True(t, ok, "series append failed")
require.True(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper)
ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper, chunkRange)
require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunks was created")
// A new chunks should be created here as it's beyond the chunk range.
ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper)
ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper, chunkRange)
require.True(t, ok, "series append failed")
require.True(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper)
ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper, chunkRange)
require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunks was created")
@ -1419,7 +1423,8 @@ func TestGCChunkAccess(t *testing.T) {
func TestGCSeriesAccess(t *testing.T) {
// Put a series, select it. GC it and then access it.
h, _ := newTestHead(t, 1000, false, false)
const chunkRange = 1000
h, _ := newTestHead(t, chunkRange, false, false)
defer func() {
require.NoError(t, h.Close())
}()
@ -1429,18 +1434,18 @@ func TestGCSeriesAccess(t *testing.T) {
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
// Appending 2 samples for the first chunk.
ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper)
ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper, chunkRange)
require.True(t, ok, "series append failed")
require.True(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper)
ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper, chunkRange)
require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunks was created")
// A new chunks should be created here as it's beyond the chunk range.
ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper)
ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper, chunkRange)
require.True(t, ok, "series append failed")
require.True(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper)
ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper, chunkRange)
require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunks was created")
@ -1676,10 +1681,10 @@ func TestHeadReadWriterRepair(t *testing.T) {
require.True(t, created, "series was not created")
for i := 0; i < 7; i++ {
ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, h.chunkDiskMapper)
ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, h.chunkDiskMapper, chunkRange)
require.True(t, ok, "series append failed")
require.True(t, chunkCreated, "chunk was not created")
ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, h.chunkDiskMapper)
ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, h.chunkDiskMapper, chunkRange)
require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunk was created")
h.chunkDiskMapper.CutNewFile()
@ -2027,7 +2032,7 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) {
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
ok, _ := s.append(0, 0, 0, h.chunkDiskMapper)
ok, _ := s.append(0, 0, 0, h.chunkDiskMapper, h.chunkRange.Load())
require.True(t, ok, "Series append failed.")
require.Equal(t, 0, s.txs.txIDCount, "Series should not have an appendID after append with appendID=0.")
}
@ -2489,11 +2494,12 @@ func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) {
defer func() {
require.NoError(t, chunkDiskMapper.Close())
}()
const chunkRange = 500
s := newMemSeries(labels.Labels{}, 1, 500, 1, defaultIsolationDisabled)
s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled)
for i := 0; i < 7; i++ {
ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper)
ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange)
require.True(t, ok, "sample append failed")
}

View file

@ -439,6 +439,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
defer close(wp.output)
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
chunkRange := h.chunkRange.Load()
for in := range wp.input {
if in.existingSeries != nil {
@ -459,7 +460,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
if s.T <= ms.mmMaxTime {
continue
}
if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper); chunkCreated {
if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper, chunkRange); chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
}
@ -718,6 +719,7 @@ func (wp *wblSubsetProcessor) reuseBuf() []record.RefSample {
func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) {
defer close(wp.output)
oooCapMax := h.opts.OutOfOrderCapMax.Load()
// We don't check for minValidTime for ooo samples.
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
for samples := range wp.input {
@ -728,7 +730,7 @@ func (wp *wblSubsetProcessor) processWBLSamples(h *Head) (unknownRefs uint64) {
unknownRefs++
continue
}
ok, chunkCreated, _ := ms.insert(s.T, s.V, h.chunkDiskMapper)
ok, chunkCreated, _ := ms.insert(s.T, s.V, h.chunkDiskMapper, oooCapMax)
if chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
@ -773,11 +775,10 @@ const (
)
type chunkSnapshotRecord struct {
ref chunks.HeadSeriesRef
lset labels.Labels
chunkRange int64
mc *memChunk
sampleBuf [4]sample
ref chunks.HeadSeriesRef
lset labels.Labels
mc *memChunk
sampleBuf [4]sample
}
func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte {
@ -786,7 +787,7 @@ func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte {
buf.PutByte(chunkSnapshotRecordTypeSeries)
buf.PutBE64(uint64(s.ref))
record.EncodeLabels(&buf, s.lset)
buf.PutBE64int64(s.chunkRange)
buf.PutBE64int64(0) // Backwards-compatibility; was chunkRange but now unused.
s.Lock()
if s.headChunk == nil {
@ -820,7 +821,7 @@ func decodeSeriesFromChunkSnapshot(d *record.Decoder, b []byte) (csr chunkSnapsh
// TODO: figure out why DecodeLabels calls Sort(), and perhaps remove it.
csr.lset = d.DecodeLabels(&dec)
csr.chunkRange = dec.Be64int64()
_ = dec.Be64int64() // Was chunkRange but now unused.
if dec.Uvarint() == 0 {
return
}
@ -1216,7 +1217,6 @@ func (h *Head) loadChunkSnapshot() (int, int, map[chunks.HeadSeriesRef]*memSerie
}
}
series.chunkRange = csr.chunkRange
if csr.mc == nil {
continue
}