diff --git a/tsdb/head.go b/tsdb/head.go index 14a7915700..9697c70923 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1641,7 +1641,7 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels) (*memSeries, bool, error) { s, created, err := h.series.getOrSet(hash, lset, func() *memSeries { - return newMemSeries(lset, id, labels.StableHash(lset), h.opts.ChunkEndTimeVariance, h.opts.IsolationDisabled, h.opts.SamplesPerChunk) + return newMemSeries(lset, id, labels.StableHash(lset), h.opts.ChunkEndTimeVariance, h.opts.IsolationDisabled) }) if err != nil { return nil, false, err @@ -1956,8 +1956,7 @@ type memSeries struct { // to spread chunks writing across time. Doesn't apply to the last chunk of the chunk range. 0 to disable variance. chunkEndTimeVariance float64 - samplesPerChunk int // Target number of samples per chunk. - nextAt int64 // Timestamp at which to cut the next chunk. + nextAt int64 // Timestamp at which to cut the next chunk. // We keep the last value here (in addition to appending it to the chunk) so we can check for duplicates. lastValue float64 @@ -1985,14 +1984,13 @@ type memSeriesOOOFields struct { firstOOOChunkID chunks.HeadChunkID // HeadOOOChunkID for oooMmappedChunks[0]. } -func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64, chunkEndTimeVariance float64, isolationDisabled bool, samplesPerChunk int) *memSeries { +func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64, chunkEndTimeVariance float64, isolationDisabled bool) *memSeries { s := &memSeries{ lset: lset, ref: id, nextAt: math.MinInt64, chunkEndTimeVariance: chunkEndTimeVariance, shardHash: shardHash, - samplesPerChunk: samplesPerChunk, } if !isolationDisabled { s.txs = newTxRing(4) diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 0065d7bfa9..249739721d 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -994,7 +994,7 @@ func (a *headAppender) Commit() (err error) { samplesAppended-- } default: - ok, chunkCreated = series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper, chunkRange) + ok, chunkCreated = series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper, chunkRange, a.head.opts.SamplesPerChunk) if ok { if s.T < inOrderMint { inOrderMint = s.T @@ -1023,7 +1023,7 @@ func (a *headAppender) Commit() (err error) { for i, s := range a.histograms { series = a.histogramSeries[i] series.Lock() - ok, chunkCreated := series.appendHistogram(s.T, s.H, a.appendID, a.head.chunkDiskMapper, chunkRange) + ok, chunkCreated := series.appendHistogram(s.T, s.H, a.appendID, a.head.chunkDiskMapper, chunkRange, a.head.opts.SamplesPerChunk) series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) series.pendingCommit = false series.Unlock() @@ -1049,7 +1049,7 @@ func (a *headAppender) Commit() (err error) { for i, s := range a.floatHistograms { series = a.floatHistogramSeries[i] series.Lock() - ok, chunkCreated := series.appendFloatHistogram(s.T, s.FH, a.appendID, a.head.chunkDiskMapper, chunkRange) + ok, chunkCreated := series.appendFloatHistogram(s.T, s.FH, a.appendID, a.head.chunkDiskMapper, chunkRange, a.head.opts.SamplesPerChunk) series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow) series.pendingCommit = false series.Unlock() @@ -1129,8 +1129,8 @@ func (s *memSeries) insert(t int64, v float64, chunkDiskMapper chunkDiskMapper, // the appendID for isolation. (The appendID can be zero, which results in no // isolation for this append.) // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. -func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper chunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) { - c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, chunkDiskMapper, chunkRange) +func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper chunkDiskMapper, chunkRange int64, samplesPerChunk int) (sampleInOrder, chunkCreated bool) { + c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, chunkDiskMapper, chunkRange, samplesPerChunk) if !sampleInOrder { return sampleInOrder, chunkCreated } @@ -1151,7 +1151,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper // appendHistogram adds the histogram. // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. -func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, chunkDiskMapper chunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) { +func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, chunkDiskMapper chunkDiskMapper, chunkRange int64, samplesPerChunk int) (sampleInOrder, chunkCreated bool) { // Head controls the execution of recoding, so that we own the proper // chunk reference afterwards. We check for Appendable from appender before // appendPreprocessor because in case it ends up creating a new chunk, @@ -1164,7 +1164,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui pMergedSpans, nMergedSpans []histogram.Span okToAppend, counterReset, gauge bool ) - c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, chunkDiskMapper, chunkRange) + c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, chunkDiskMapper, chunkRange, samplesPerChunk) if !sampleInOrder { return sampleInOrder, chunkCreated } @@ -1245,7 +1245,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui // appendFloatHistogram adds the float histogram. // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. -func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, appendID uint64, chunkDiskMapper chunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) { +func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, appendID uint64, chunkDiskMapper chunkDiskMapper, chunkRange int64, samplesPerChunk int) (sampleInOrder, chunkCreated bool) { // Head controls the execution of recoding, so that we own the proper // chunk reference afterwards. We check for Appendable from appender before // appendPreprocessor because in case it ends up creating a new chunk, @@ -1258,7 +1258,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, pMergedSpans, nMergedSpans []histogram.Span okToAppend, counterReset, gauge bool ) - c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncFloatHistogram, chunkDiskMapper, chunkRange) + c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncFloatHistogram, chunkDiskMapper, chunkRange, samplesPerChunk) if !sampleInOrder { return sampleInOrder, chunkCreated } @@ -1341,7 +1341,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. // This should be called only when appending data. func (s *memSeries) appendPreprocessor( - t int64, e chunkenc.Encoding, chunkDiskMapper chunkDiskMapper, chunkRange int64, + t int64, e chunkenc.Encoding, chunkDiskMapper chunkDiskMapper, chunkRange int64, samplesPerChunk int, ) (c *memChunk, sampleInOrder, chunkCreated bool) { c = s.head() @@ -1379,7 +1379,7 @@ func (s *memSeries) appendPreprocessor( // for this chunk that will try to make samples equally distributed within // the remaining chunks in the current chunk range. // At latest it must happen at the timestamp set when the chunk was cut. - if numSamples == s.samplesPerChunk/4 { + if numSamples == samplesPerChunk/4 { maxNextAt := s.nextAt s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxNextAt) @@ -1390,7 +1390,7 @@ func (s *memSeries) appendPreprocessor( // Since we assume that the rate is higher, we're being conservative and cutting at 2*samplesPerChunk // as we expect more chunks to come. // Note that next chunk will have its nextAt recalculated for the new rate. - if t >= s.nextAt || numSamples >= s.samplesPerChunk*2 { + if t >= s.nextAt || numSamples >= samplesPerChunk*2 { c = s.cutNewHeadChunk(t, e, chunkDiskMapper, chunkRange) chunkCreated = true } diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 6122d28466..fd2ceb6c34 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -286,8 +286,8 @@ func BenchmarkLoadWAL(b *testing.B) { for k := 0; k < c.batches*c.seriesPerBatch; k++ { // Create one mmapped chunk per series, with one sample at the given time. lbls := labels.Labels{} - s := newMemSeries(lbls, chunks.HeadSeriesRef(k)*101, labels.StableHash(lbls), 0, defaultIsolationDisabled, DefaultSamplesPerChunk) - s.append(c.mmappedChunkT, 42, 0, chunkDiskMapper, c.mmappedChunkT) + s := newMemSeries(lbls, chunks.HeadSeriesRef(k)*101, labels.StableHash(lbls), 0, defaultIsolationDisabled) + s.append(c.mmappedChunkT, 42, 0, chunkDiskMapper, c.mmappedChunkT, DefaultSamplesPerChunk) s.mmapCurrentHeadChunk(chunkDiskMapper) } require.NoError(b, chunkDiskMapper.Close()) @@ -809,10 +809,10 @@ func TestMemSeries_truncateChunks(t *testing.T) { } lbls := labels.FromStrings("a", "b") - s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled, DefaultSamplesPerChunk) + s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled) for i := 0; i < 4000; i += 5 { - ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange) + ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) require.True(t, ok, "sample append failed") } @@ -1341,24 +1341,24 @@ func TestMemSeries_append(t *testing.T) { const chunkRange = 500 lbls := labels.Labels{} - s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled, DefaultSamplesPerChunk) + s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled) // Add first two samples at the very end of a chunk range and the next two // on and after it. // New chunk must correctly be cut at 1000. - ok, chunkCreated := s.append(998, 1, 0, chunkDiskMapper, chunkRange) + ok, chunkCreated := s.append(998, 1, 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) require.True(t, ok, "append failed") require.True(t, chunkCreated, "first sample created chunk") - ok, chunkCreated = s.append(999, 2, 0, chunkDiskMapper, chunkRange) + ok, chunkCreated = s.append(999, 2, 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) require.True(t, ok, "append failed") require.False(t, chunkCreated, "second sample should use same chunk") - ok, chunkCreated = s.append(1000, 3, 0, chunkDiskMapper, chunkRange) + ok, chunkCreated = s.append(1000, 3, 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) require.True(t, ok, "append failed") require.True(t, chunkCreated, "expected new chunk on boundary") - ok, chunkCreated = s.append(1001, 4, 0, chunkDiskMapper, chunkRange) + ok, chunkCreated = s.append(1001, 4, 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) require.True(t, ok, "append failed") require.False(t, chunkCreated, "second sample should use same chunk") @@ -1371,7 +1371,7 @@ func TestMemSeries_append(t *testing.T) { // Fill the range [1000,2000) with many samples. Intermediate chunks should be cut // at approximately 120 samples per chunk. for i := 1; i < 1000; i++ { - ok, _ := s.append(1001+int64(i), float64(i), 0, chunkDiskMapper, chunkRange) + ok, _ := s.append(1001+int64(i), float64(i), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) require.True(t, ok, "append failed") } @@ -1396,7 +1396,7 @@ func TestMemSeries_appendHistogram(t *testing.T) { chunkRange := int64(1000) lbls := labels.Labels{} - s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled, DefaultSamplesPerChunk) + s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled) histograms := tsdbutil.GenerateTestHistograms(4) histogramWithOneMoreBucket := histograms[3].Copy() @@ -1408,19 +1408,19 @@ func TestMemSeries_appendHistogram(t *testing.T) { // Add first two samples at the very end of a chunk range and the next two // on and after it. // New chunk must correctly be cut at 1000. - ok, chunkCreated := s.appendHistogram(998, histograms[0], 0, chunkDiskMapper, chunkRange) + ok, chunkCreated := s.appendHistogram(998, histograms[0], 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) require.True(t, ok, "append failed") require.True(t, chunkCreated, "first sample created chunk") - ok, chunkCreated = s.appendHistogram(999, histograms[1], 0, chunkDiskMapper, chunkRange) + ok, chunkCreated = s.appendHistogram(999, histograms[1], 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) require.True(t, ok, "append failed") require.False(t, chunkCreated, "second sample should use same chunk") - ok, chunkCreated = s.appendHistogram(1000, histograms[2], 0, chunkDiskMapper, chunkRange) + ok, chunkCreated = s.appendHistogram(1000, histograms[2], 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) require.True(t, ok, "append failed") require.True(t, chunkCreated, "expected new chunk on boundary") - ok, chunkCreated = s.appendHistogram(1001, histograms[3], 0, chunkDiskMapper, chunkRange) + ok, chunkCreated = s.appendHistogram(1001, histograms[3], 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) require.True(t, ok, "append failed") require.False(t, chunkCreated, "second sample should use same chunk") @@ -1430,7 +1430,7 @@ func TestMemSeries_appendHistogram(t *testing.T) { require.Equal(t, int64(1000), s.headChunk.minTime, "wrong chunk range") require.Equal(t, int64(1001), s.headChunk.maxTime, "wrong chunk range") - ok, chunkCreated = s.appendHistogram(1002, histogramWithOneMoreBucket, 0, chunkDiskMapper, chunkRange) + ok, chunkCreated = s.appendHistogram(1002, histogramWithOneMoreBucket, 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) require.True(t, ok, "append failed") require.False(t, chunkCreated, "third sample should trigger a re-encoded chunk") @@ -1453,7 +1453,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { chunkRange := DefaultBlockDuration lbls := labels.Labels{} - s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled, DefaultSamplesPerChunk) + s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled) // At this slow rate, we will fill the chunk in two block durations. slowRate := (DefaultBlockDuration * 2) / samplesPerChunk @@ -1461,7 +1461,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { var nextTs int64 var totalAppendedSamples int for i := 0; i < samplesPerChunk/4; i++ { - ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper, chunkRange) + ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) require.Truef(t, ok, "slow sample %d was not appended", i) nextTs += slowRate totalAppendedSamples++ @@ -1470,12 +1470,12 @@ func TestMemSeries_append_atVariableRate(t *testing.T) { // Suddenly, the rate increases and we receive a sample every millisecond. for i := 0; i < math.MaxUint16; i++ { - ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper, chunkRange) + ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) require.Truef(t, ok, "quick sample %d was not appended", i) nextTs++ totalAppendedSamples++ } - ok, chunkCreated := s.append(DefaultBlockDuration, float64(0), 0, chunkDiskMapper, chunkRange) + ok, chunkCreated := s.append(DefaultBlockDuration, float64(0), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) require.True(t, ok, "new chunk sample was not appended") require.True(t, chunkCreated, "sample at block duration timestamp should create a new chunk") @@ -1500,18 +1500,18 @@ func TestGCChunkAccess(t *testing.T) { s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) // Appending 2 samples for the first chunk. - ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper, chunkRange) + ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper, chunkRange) + ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunks was created") // A new chunks should be created here as it's beyond the chunk range. - ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper, chunkRange) + ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper, chunkRange) + ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunks was created") @@ -1553,18 +1553,18 @@ func TestGCSeriesAccess(t *testing.T) { s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) // Appending 2 samples for the first chunk. - ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper, chunkRange) + ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper, chunkRange) + ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunks was created") // A new chunks should be created here as it's beyond the chunk range. - ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper, chunkRange) + ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunks was not created") - ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper, chunkRange) + ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunks was created") @@ -1800,10 +1800,10 @@ func TestHeadReadWriterRepair(t *testing.T) { require.True(t, created, "series was not created") for i := 0; i < 7; i++ { - ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, h.chunkDiskMapper, chunkRange) + ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) require.True(t, ok, "series append failed") require.True(t, chunkCreated, "chunk was not created") - ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, h.chunkDiskMapper, chunkRange) + ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunk was created") require.NoError(t, h.chunkDiskMapper.CutNewFile()) @@ -2151,7 +2151,7 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) { s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) - ok, _ := s.append(0, 0, 0, h.chunkDiskMapper, h.chunkRange.Load()) + ok, _ := s.append(0, 0, 0, h.chunkDiskMapper, h.chunkRange.Load(), DefaultSamplesPerChunk) require.True(t, ok, "Series append failed.") require.Equal(t, 0, s.txs.txIDCount, "Series should not have an appendID after append with appendID=0.") } @@ -2677,10 +2677,10 @@ func TestIteratorSeekIntoBuffer(t *testing.T) { const chunkRange = 500 lbls := labels.Labels{} - s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled, DefaultSamplesPerChunk) + s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled) for i := 0; i < 7; i++ { - ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange) + ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk) require.True(t, ok, "sample append failed") } diff --git a/tsdb/head_wal.go b/tsdb/head_wal.go index 713fd5573a..560fd10b70 100644 --- a/tsdb/head_wal.go +++ b/tsdb/head_wal.go @@ -590,7 +590,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp if s.T <= ms.mmMaxTime { continue } - if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper, chunkRange); chunkCreated { + if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper, chunkRange, h.opts.SamplesPerChunk); chunkCreated { h.metrics.chunksCreated.Inc() h.metrics.chunks.Inc() } @@ -620,9 +620,9 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp } var chunkCreated bool if s.h != nil { - _, chunkCreated = ms.appendHistogram(s.t, s.h, 0, h.chunkDiskMapper, chunkRange) + _, chunkCreated = ms.appendHistogram(s.t, s.h, 0, h.chunkDiskMapper, chunkRange, h.opts.SamplesPerChunk) } else { - _, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, h.chunkDiskMapper, chunkRange) + _, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, h.chunkDiskMapper, chunkRange, h.opts.SamplesPerChunk) } if chunkCreated { h.metrics.chunksCreated.Inc()