storage: Split chunks if more than 120 samples (#8582)
* storage: Split chunks if more than 120 samples
* storage: Don't set maxt which is overwritten right away
* storage: Improve comments on merge_test
* storage: Improve comments and move code closer to usage
* tsdb/tsdbutil: Add comment for GenerateSamples

Signed-off-by: Matthias Loibl <mail@matthiasloibl.com>
This commit is contained in:
parent 4bf7de3b56
commit 7e7efaba32
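
The core behavior change: seriesToChunkEncoder used to pack an entire series into a single XOR chunk, and it now starts a new chunk after every 120 samples. A minimal, self-contained sketch of that sizing rule (plain Go; chunkSizes is an illustrative helper, not part of this commit):

package main

import "fmt"

// seriesToChunkEncoderSplit mirrors the constant introduced in this commit.
const seriesToChunkEncoderSplit = 120

// chunkSizes shows how n samples are distributed across chunks when each
// chunk holds at most seriesToChunkEncoderSplit samples. Illustration only.
func chunkSizes(n int) []int {
    var sizes []int
    for n > 0 {
        size := n
        if size > seriesToChunkEncoderSplit {
            size = seriesToChunkEncoderSplit
        }
        sizes = append(sizes, size)
        n -= size
    }
    return sizes
}

func main() {
    fmt.Println(chunkSizes(110)) // [110]    -> one chunk, as before
    fmt.Println(chunkSizes(150)) // [120 30] -> now split into two chunks
}

Running it prints [110] and [120 30], which matches the two new test cases in the diff below.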

storage/merge_test.go
@@ -465,6 +465,27 @@ func TestCompactingChunkSeriesMerger(t *testing.T) {
                 []tsdbutil.Sample{sample{31, 31}, sample{35, 35}},
             ),
         },
+        {
+            name: "110 overlapping",
+            input: []ChunkSeries{
+                NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(0, 110)), // [0 - 110)
+                NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(60, 50)), // [60 - 110)
+            },
+            expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
+                tsdbutil.GenerateSamples(0, 110),
+            ),
+        },
+        {
+            name: "150 overlapping samples, split chunk",
+            input: []ChunkSeries{
+                NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(0, 90)),  // [0 - 90)
+                NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(60, 90)), // [90 - 150)
+            },
+            expected: NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"),
+                tsdbutil.GenerateSamples(0, 120),
+                tsdbutil.GenerateSamples(120, 30),
+            ),
+        },
     } {
         t.Run(tc.name, func(t *testing.T) {
             merged := m(tc.input...)
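
The two new cases differ only in how many unique samples survive deduplication: [0, 110) ∪ [60, 110) is still 110 samples and stays in one chunk, while [0, 90) ∪ [60, 150) is 150 samples and must be split. A rough sketch of asserting the per-chunk sample counts directly, written as if it sat next to the test above in package storage (the test name is hypothetical, and it assumes the testify require helpers and imports already used by that test file):

func TestCompactingChunkSeriesMergerSplitsChunks(t *testing.T) { // hypothetical companion test
    m := NewCompactingChunkSeriesMerger(ChainedSeriesMerge)
    merged := m(
        NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(0, 90)),
        NewListChunkSeriesFromSamples(labels.FromStrings("bar", "baz"), tsdbutil.GenerateSamples(60, 90)),
    )

    var counts []int
    it := merged.Iterator()
    for it.Next() {
        counts = append(counts, it.At().Chunk.NumSamples())
    }
    require.NoError(t, it.Err())
    require.Equal(t, []int{120, 30}, counts) // 150 unique samples -> 120 + 30
}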

storage/series.go
@@ -217,7 +217,8 @@ type seriesToChunkEncoder struct {
     Series
 }
 
-// TODO(bwplotka): Currently encoder will just naively build one chunk, without limit. Split it: https://github.com/prometheus/tsdb/issues/670
+const seriesToChunkEncoderSplit = 120
+
 func (s *seriesToChunkEncoder) Iterator() chunks.Iterator {
     chk := chunkenc.NewXORChunk()
     app, err := chk.Appender()
@@ -227,8 +228,28 @@ func (s *seriesToChunkEncoder) Iterator() chunks.Iterator {
     mint := int64(math.MaxInt64)
     maxt := int64(math.MinInt64)
 
+    chks := []chunks.Meta{}
+
+    i := 0
     seriesIter := s.Series.Iterator()
     for seriesIter.Next() {
+        // Create a new chunk if too many samples in the current one.
+        if i >= seriesToChunkEncoderSplit {
+            chks = append(chks, chunks.Meta{
+                MinTime: mint,
+                MaxTime: maxt,
+                Chunk:   chk,
+            })
+            chk = chunkenc.NewXORChunk()
+            app, err = chk.Appender()
+            if err != nil {
+                return errChunksIterator{err: err}
+            }
+            mint = int64(math.MaxInt64)
+            // maxt is immediately overwritten below which is why setting it here won't make a difference.
+            i = 0
+        }
+
         t, v := seriesIter.At()
         app.Append(t, v)
 
@@ -236,16 +257,19 @@ func (s *seriesToChunkEncoder) Iterator() chunks.Iterator {
         if mint == math.MaxInt64 {
             mint = t
         }
+        i++
     }
     if err := seriesIter.Err(); err != nil {
         return errChunksIterator{err: err}
     }
 
-    return NewListChunkSeriesIterator(chunks.Meta{
+    chks = append(chks, chunks.Meta{
         MinTime: mint,
         MaxTime: maxt,
         Chunk:   chk,
     })
+
+    return NewListChunkSeriesIterator(chks...)
 }
 
 type errChunksIterator struct {
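
Put together, the new Iterator() follows a simple pattern: append samples to an XOR chunk, seal the chunk into a chunks.Meta once seriesToChunkEncoderSplit samples have been written, start a fresh chunk, and flush whatever remains at the end. A stand-alone sketch of that pattern against the chunkenc/chunks packages used in the diff (the encodeSplit function and its plain slice input are illustrative, not from this commit):

package main

import (
    "fmt"
    "math"

    "github.com/prometheus/prometheus/tsdb/chunkenc"
    "github.com/prometheus/prometheus/tsdb/chunks"
)

const seriesToChunkEncoderSplit = 120

// encodeSplit is an illustrative stand-in for seriesToChunkEncoder.Iterator():
// it packs (t, v) pairs into XOR chunks of at most 120 samples each.
func encodeSplit(ts []int64, vs []float64) ([]chunks.Meta, error) {
    chk := chunkenc.NewXORChunk()
    app, err := chk.Appender()
    if err != nil {
        return nil, err
    }

    mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
    chks := []chunks.Meta{}
    i := 0

    for idx, t := range ts {
        if i >= seriesToChunkEncoderSplit {
            // Seal the full chunk and start a new one, as the encoder now does.
            chks = append(chks, chunks.Meta{MinTime: mint, MaxTime: maxt, Chunk: chk})
            chk = chunkenc.NewXORChunk()
            if app, err = chk.Appender(); err != nil {
                return nil, err
            }
            mint = int64(math.MaxInt64)
            i = 0
        }
        app.Append(t, vs[idx])
        maxt = t
        if mint == math.MaxInt64 {
            mint = t
        }
        i++
    }
    // Flush the last, possibly partial, chunk.
    chks = append(chks, chunks.Meta{MinTime: mint, MaxTime: maxt, Chunk: chk})
    return chks, nil
}

func main() {
    ts := make([]int64, 150)
    vs := make([]float64, 150)
    for i := range ts {
        ts[i], vs[i] = int64(i), float64(i)
    }
    metas, _ := encodeSplit(ts, vs)
    for _, m := range metas {
        fmt.Println(m.MinTime, m.MaxTime, m.Chunk.NumSamples()) // 0 119 120, then 120 149 30
    }
}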

tsdb/tsdbutil/chunks.go
@@ -65,3 +65,15 @@ func PopulatedChunk(numSamples int, minTime int64) chunks.Meta {
     }
     return ChunkFromSamples(samples)
 }
+
+// GenerateSamples starting at start and counting up numSamples.
+func GenerateSamples(start int, numSamples int) []Sample {
+    samples := make([]Sample, 0, numSamples)
+    for i := start; i < start+numSamples; i++ {
+        samples = append(samples, sample{
+            t: int64(i),
+            v: float64(i),
+        })
+    }
+    return samples
+}
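
GenerateSamples produces samples whose timestamp and value both equal the index, which is what makes the half-open interval comments in the test above easy to read. A small usage sketch (assuming the T()/V() accessors of the tsdbutil.Sample interface):

package main

import (
    "fmt"

    "github.com/prometheus/prometheus/tsdb/tsdbutil"
)

func main() {
    // Samples at t = 60 .. 109 with v == t, i.e. the half-open interval [60, 110).
    samples := tsdbutil.GenerateSamples(60, 50)
    first, last := samples[0], samples[len(samples)-1]
    fmt.Println(len(samples), first.T(), first.V(), last.T(), last.V()) // 50 60 60 109 109
}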