Remove samplesPerChunk from memSeries (#12390)

Signed-off-by: Justin Lei <justin.lei@grafana.com>
This commit is contained in:
Justin Lei 2023-05-25 02:18:41 -07:00 committed by Bryan Boreham
parent 31164a7373
commit d0fea47a9c
4 changed files with 51 additions and 53 deletions

View file

@ -1641,7 +1641,7 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e
func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels) (*memSeries, bool, error) {
s, created, err := h.series.getOrSet(hash, lset, func() *memSeries {
return newMemSeries(lset, id, labels.StableHash(lset), h.opts.ChunkEndTimeVariance, h.opts.IsolationDisabled, h.opts.SamplesPerChunk)
return newMemSeries(lset, id, labels.StableHash(lset), h.opts.ChunkEndTimeVariance, h.opts.IsolationDisabled)
})
if err != nil {
return nil, false, err
@ -1956,8 +1956,7 @@ type memSeries struct {
// to spread chunks writing across time. Doesn't apply to the last chunk of the chunk range. 0 to disable variance.
chunkEndTimeVariance float64
samplesPerChunk int // Target number of samples per chunk.
nextAt int64 // Timestamp at which to cut the next chunk.
nextAt int64 // Timestamp at which to cut the next chunk.
// We keep the last value here (in addition to appending it to the chunk) so we can check for duplicates.
lastValue float64
@ -1985,14 +1984,13 @@ type memSeriesOOOFields struct {
firstOOOChunkID chunks.HeadChunkID // HeadOOOChunkID for oooMmappedChunks[0].
}
func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64, chunkEndTimeVariance float64, isolationDisabled bool, samplesPerChunk int) *memSeries {
func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, shardHash uint64, chunkEndTimeVariance float64, isolationDisabled bool) *memSeries {
s := &memSeries{
lset: lset,
ref: id,
nextAt: math.MinInt64,
chunkEndTimeVariance: chunkEndTimeVariance,
shardHash: shardHash,
samplesPerChunk: samplesPerChunk,
}
if !isolationDisabled {
s.txs = newTxRing(4)

View file

@ -994,7 +994,7 @@ func (a *headAppender) Commit() (err error) {
samplesAppended--
}
default:
ok, chunkCreated = series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper, chunkRange)
ok, chunkCreated = series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper, chunkRange, a.head.opts.SamplesPerChunk)
if ok {
if s.T < inOrderMint {
inOrderMint = s.T
@ -1023,7 +1023,7 @@ func (a *headAppender) Commit() (err error) {
for i, s := range a.histograms {
series = a.histogramSeries[i]
series.Lock()
ok, chunkCreated := series.appendHistogram(s.T, s.H, a.appendID, a.head.chunkDiskMapper, chunkRange)
ok, chunkCreated := series.appendHistogram(s.T, s.H, a.appendID, a.head.chunkDiskMapper, chunkRange, a.head.opts.SamplesPerChunk)
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false
series.Unlock()
@ -1049,7 +1049,7 @@ func (a *headAppender) Commit() (err error) {
for i, s := range a.floatHistograms {
series = a.floatHistogramSeries[i]
series.Lock()
ok, chunkCreated := series.appendFloatHistogram(s.T, s.FH, a.appendID, a.head.chunkDiskMapper, chunkRange)
ok, chunkCreated := series.appendFloatHistogram(s.T, s.FH, a.appendID, a.head.chunkDiskMapper, chunkRange, a.head.opts.SamplesPerChunk)
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
series.pendingCommit = false
series.Unlock()
@ -1129,8 +1129,8 @@ func (s *memSeries) insert(t int64, v float64, chunkDiskMapper chunkDiskMapper,
// the appendID for isolation. (The appendID can be zero, which results in no
// isolation for this append.)
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper chunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) {
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, chunkDiskMapper, chunkRange)
func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper chunkDiskMapper, chunkRange int64, samplesPerChunk int) (sampleInOrder, chunkCreated bool) {
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, chunkDiskMapper, chunkRange, samplesPerChunk)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
@ -1151,7 +1151,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
// appendHistogram adds the histogram.
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, chunkDiskMapper chunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) {
func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, chunkDiskMapper chunkDiskMapper, chunkRange int64, samplesPerChunk int) (sampleInOrder, chunkCreated bool) {
// Head controls the execution of recoding, so that we own the proper
// chunk reference afterwards. We check for Appendable from appender before
// appendPreprocessor because in case it ends up creating a new chunk,
@ -1164,7 +1164,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
pMergedSpans, nMergedSpans []histogram.Span
okToAppend, counterReset, gauge bool
)
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, chunkDiskMapper, chunkRange)
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, chunkDiskMapper, chunkRange, samplesPerChunk)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
@ -1245,7 +1245,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
// appendFloatHistogram adds the float histogram.
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, appendID uint64, chunkDiskMapper chunkDiskMapper, chunkRange int64) (sampleInOrder, chunkCreated bool) {
func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, appendID uint64, chunkDiskMapper chunkDiskMapper, chunkRange int64, samplesPerChunk int) (sampleInOrder, chunkCreated bool) {
// Head controls the execution of recoding, so that we own the proper
// chunk reference afterwards. We check for Appendable from appender before
// appendPreprocessor because in case it ends up creating a new chunk,
@ -1258,7 +1258,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram,
pMergedSpans, nMergedSpans []histogram.Span
okToAppend, counterReset, gauge bool
)
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncFloatHistogram, chunkDiskMapper, chunkRange)
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncFloatHistogram, chunkDiskMapper, chunkRange, samplesPerChunk)
if !sampleInOrder {
return sampleInOrder, chunkCreated
}
@ -1341,7 +1341,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram,
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
// This should be called only when appending data.
func (s *memSeries) appendPreprocessor(
t int64, e chunkenc.Encoding, chunkDiskMapper chunkDiskMapper, chunkRange int64,
t int64, e chunkenc.Encoding, chunkDiskMapper chunkDiskMapper, chunkRange int64, samplesPerChunk int,
) (c *memChunk, sampleInOrder, chunkCreated bool) {
c = s.head()
@ -1379,7 +1379,7 @@ func (s *memSeries) appendPreprocessor(
// for this chunk that will try to make samples equally distributed within
// the remaining chunks in the current chunk range.
// At latest it must happen at the timestamp set when the chunk was cut.
if numSamples == s.samplesPerChunk/4 {
if numSamples == samplesPerChunk/4 {
maxNextAt := s.nextAt
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxNextAt)
@ -1390,7 +1390,7 @@ func (s *memSeries) appendPreprocessor(
// Since we assume that the rate is higher, we're being conservative and cutting at 2*samplesPerChunk
// as we expect more chunks to come.
// Note that next chunk will have its nextAt recalculated for the new rate.
if t >= s.nextAt || numSamples >= s.samplesPerChunk*2 {
if t >= s.nextAt || numSamples >= samplesPerChunk*2 {
c = s.cutNewHeadChunk(t, e, chunkDiskMapper, chunkRange)
chunkCreated = true
}

View file

@ -286,8 +286,8 @@ func BenchmarkLoadWAL(b *testing.B) {
for k := 0; k < c.batches*c.seriesPerBatch; k++ {
// Create one mmapped chunk per series, with one sample at the given time.
lbls := labels.Labels{}
s := newMemSeries(lbls, chunks.HeadSeriesRef(k)*101, labels.StableHash(lbls), 0, defaultIsolationDisabled, DefaultSamplesPerChunk)
s.append(c.mmappedChunkT, 42, 0, chunkDiskMapper, c.mmappedChunkT)
s := newMemSeries(lbls, chunks.HeadSeriesRef(k)*101, labels.StableHash(lbls), 0, defaultIsolationDisabled)
s.append(c.mmappedChunkT, 42, 0, chunkDiskMapper, c.mmappedChunkT, DefaultSamplesPerChunk)
s.mmapCurrentHeadChunk(chunkDiskMapper)
}
require.NoError(b, chunkDiskMapper.Close())
@ -809,10 +809,10 @@ func TestMemSeries_truncateChunks(t *testing.T) {
}
lbls := labels.FromStrings("a", "b")
s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled, DefaultSamplesPerChunk)
s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled)
for i := 0; i < 4000; i += 5 {
ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange)
ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
require.True(t, ok, "sample append failed")
}
@ -1341,24 +1341,24 @@ func TestMemSeries_append(t *testing.T) {
const chunkRange = 500
lbls := labels.Labels{}
s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled, DefaultSamplesPerChunk)
s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled)
// Add first two samples at the very end of a chunk range and the next two
// on and after it.
// New chunk must correctly be cut at 1000.
ok, chunkCreated := s.append(998, 1, 0, chunkDiskMapper, chunkRange)
ok, chunkCreated := s.append(998, 1, 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
require.True(t, ok, "append failed")
require.True(t, chunkCreated, "first sample created chunk")
ok, chunkCreated = s.append(999, 2, 0, chunkDiskMapper, chunkRange)
ok, chunkCreated = s.append(999, 2, 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "second sample should use same chunk")
ok, chunkCreated = s.append(1000, 3, 0, chunkDiskMapper, chunkRange)
ok, chunkCreated = s.append(1000, 3, 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
require.True(t, ok, "append failed")
require.True(t, chunkCreated, "expected new chunk on boundary")
ok, chunkCreated = s.append(1001, 4, 0, chunkDiskMapper, chunkRange)
ok, chunkCreated = s.append(1001, 4, 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "second sample should use same chunk")
@ -1371,7 +1371,7 @@ func TestMemSeries_append(t *testing.T) {
// Fill the range [1000,2000) with many samples. Intermediate chunks should be cut
// at approximately 120 samples per chunk.
for i := 1; i < 1000; i++ {
ok, _ := s.append(1001+int64(i), float64(i), 0, chunkDiskMapper, chunkRange)
ok, _ := s.append(1001+int64(i), float64(i), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
require.True(t, ok, "append failed")
}
@ -1396,7 +1396,7 @@ func TestMemSeries_appendHistogram(t *testing.T) {
chunkRange := int64(1000)
lbls := labels.Labels{}
s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled, DefaultSamplesPerChunk)
s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled)
histograms := tsdbutil.GenerateTestHistograms(4)
histogramWithOneMoreBucket := histograms[3].Copy()
@ -1408,19 +1408,19 @@ func TestMemSeries_appendHistogram(t *testing.T) {
// Add first two samples at the very end of a chunk range and the next two
// on and after it.
// New chunk must correctly be cut at 1000.
ok, chunkCreated := s.appendHistogram(998, histograms[0], 0, chunkDiskMapper, chunkRange)
ok, chunkCreated := s.appendHistogram(998, histograms[0], 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
require.True(t, ok, "append failed")
require.True(t, chunkCreated, "first sample created chunk")
ok, chunkCreated = s.appendHistogram(999, histograms[1], 0, chunkDiskMapper, chunkRange)
ok, chunkCreated = s.appendHistogram(999, histograms[1], 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "second sample should use same chunk")
ok, chunkCreated = s.appendHistogram(1000, histograms[2], 0, chunkDiskMapper, chunkRange)
ok, chunkCreated = s.appendHistogram(1000, histograms[2], 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
require.True(t, ok, "append failed")
require.True(t, chunkCreated, "expected new chunk on boundary")
ok, chunkCreated = s.appendHistogram(1001, histograms[3], 0, chunkDiskMapper, chunkRange)
ok, chunkCreated = s.appendHistogram(1001, histograms[3], 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "second sample should use same chunk")
@ -1430,7 +1430,7 @@ func TestMemSeries_appendHistogram(t *testing.T) {
require.Equal(t, int64(1000), s.headChunk.minTime, "wrong chunk range")
require.Equal(t, int64(1001), s.headChunk.maxTime, "wrong chunk range")
ok, chunkCreated = s.appendHistogram(1002, histogramWithOneMoreBucket, 0, chunkDiskMapper, chunkRange)
ok, chunkCreated = s.appendHistogram(1002, histogramWithOneMoreBucket, 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
require.True(t, ok, "append failed")
require.False(t, chunkCreated, "third sample should trigger a re-encoded chunk")
@ -1453,7 +1453,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
chunkRange := DefaultBlockDuration
lbls := labels.Labels{}
s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled, DefaultSamplesPerChunk)
s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled)
// At this slow rate, we will fill the chunk in two block durations.
slowRate := (DefaultBlockDuration * 2) / samplesPerChunk
@ -1461,7 +1461,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
var nextTs int64
var totalAppendedSamples int
for i := 0; i < samplesPerChunk/4; i++ {
ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper, chunkRange)
ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
require.Truef(t, ok, "slow sample %d was not appended", i)
nextTs += slowRate
totalAppendedSamples++
@ -1470,12 +1470,12 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
// Suddenly, the rate increases and we receive a sample every millisecond.
for i := 0; i < math.MaxUint16; i++ {
ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper, chunkRange)
ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
require.Truef(t, ok, "quick sample %d was not appended", i)
nextTs++
totalAppendedSamples++
}
ok, chunkCreated := s.append(DefaultBlockDuration, float64(0), 0, chunkDiskMapper, chunkRange)
ok, chunkCreated := s.append(DefaultBlockDuration, float64(0), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
require.True(t, ok, "new chunk sample was not appended")
require.True(t, chunkCreated, "sample at block duration timestamp should create a new chunk")
@ -1500,18 +1500,18 @@ func TestGCChunkAccess(t *testing.T) {
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
// Appending 2 samples for the first chunk.
ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper, chunkRange)
ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
require.True(t, ok, "series append failed")
require.True(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper, chunkRange)
ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunks was created")
// A new chunks should be created here as it's beyond the chunk range.
ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper, chunkRange)
ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
require.True(t, ok, "series append failed")
require.True(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper, chunkRange)
ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunks was created")
@ -1553,18 +1553,18 @@ func TestGCSeriesAccess(t *testing.T) {
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
// Appending 2 samples for the first chunk.
ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper, chunkRange)
ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
require.True(t, ok, "series append failed")
require.True(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper, chunkRange)
ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunks was created")
// A new chunks should be created here as it's beyond the chunk range.
ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper, chunkRange)
ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
require.True(t, ok, "series append failed")
require.True(t, chunkCreated, "chunks was not created")
ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper, chunkRange)
ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunks was created")
@ -1800,10 +1800,10 @@ func TestHeadReadWriterRepair(t *testing.T) {
require.True(t, created, "series was not created")
for i := 0; i < 7; i++ {
ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, h.chunkDiskMapper, chunkRange)
ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
require.True(t, ok, "series append failed")
require.True(t, chunkCreated, "chunk was not created")
ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, h.chunkDiskMapper, chunkRange)
ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunk was created")
require.NoError(t, h.chunkDiskMapper.CutNewFile())
@ -2151,7 +2151,7 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) {
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
ok, _ := s.append(0, 0, 0, h.chunkDiskMapper, h.chunkRange.Load())
ok, _ := s.append(0, 0, 0, h.chunkDiskMapper, h.chunkRange.Load(), DefaultSamplesPerChunk)
require.True(t, ok, "Series append failed.")
require.Equal(t, 0, s.txs.txIDCount, "Series should not have an appendID after append with appendID=0.")
}
@ -2677,10 +2677,10 @@ func TestIteratorSeekIntoBuffer(t *testing.T) {
const chunkRange = 500
lbls := labels.Labels{}
s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled, DefaultSamplesPerChunk)
s := newMemSeries(lbls, 1, labels.StableHash(lbls), 0, defaultIsolationDisabled)
for i := 0; i < 7; i++ {
ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange)
ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
require.True(t, ok, "sample append failed")
}

View file

@ -590,7 +590,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
if s.T <= ms.mmMaxTime {
continue
}
if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper, chunkRange); chunkCreated {
if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper, chunkRange, h.opts.SamplesPerChunk); chunkCreated {
h.metrics.chunksCreated.Inc()
h.metrics.chunks.Inc()
}
@ -620,9 +620,9 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
}
var chunkCreated bool
if s.h != nil {
_, chunkCreated = ms.appendHistogram(s.t, s.h, 0, h.chunkDiskMapper, chunkRange)
_, chunkCreated = ms.appendHistogram(s.t, s.h, 0, h.chunkDiskMapper, chunkRange, h.opts.SamplesPerChunk)
} else {
_, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, h.chunkDiskMapper, chunkRange)
_, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, h.chunkDiskMapper, chunkRange, h.opts.SamplesPerChunk)
}
if chunkCreated {
h.metrics.chunksCreated.Inc()