mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-26 22:19:40 -08:00
Only checkpoint chunkdescs and series that need persisting. (#2340)
This decreases checkpoint size by not checkpointing things that don't actually need checkpointing. This is fully compatible with the v2 checkpoint format, as it makes series appear as though the only chunksdescs in memory are those that need persisting.
This commit is contained in:
parent
5418a42965
commit
c1b547a90e
|
@ -668,8 +668,10 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
|
||||||
fpLocker.Lock(m.fp)
|
fpLocker.Lock(m.fp)
|
||||||
defer fpLocker.Unlock(m.fp)
|
defer fpLocker.Unlock(m.fp)
|
||||||
|
|
||||||
if len(m.series.chunkDescs) == 0 {
|
chunksToPersist := len(m.series.chunkDescs) - m.series.persistWatermark
|
||||||
// This series was completely purged or archived in the meantime. Ignore.
|
if len(m.series.chunkDescs) == 0 || chunksToPersist == 0 {
|
||||||
|
// This series was completely purged or archived in the meantime or has
|
||||||
|
// no chunks that need persisting. Ignore.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
realNumberOfSeries++
|
realNumberOfSeries++
|
||||||
|
@ -688,7 +690,9 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
|
||||||
if _, err = w.Write(buf); err != nil {
|
if _, err = w.Write(buf); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if _, err = codable.EncodeVarint(w, int64(m.series.persistWatermark)); err != nil {
|
// persistWatermark. We only checkpoint chunks that need persisting, so
|
||||||
|
// this is always 0.
|
||||||
|
if _, err = codable.EncodeVarint(w, int64(0)); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if m.series.modTime.IsZero() {
|
if m.series.modTime.IsZero() {
|
||||||
|
@ -700,37 +704,25 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if _, err = codable.EncodeVarint(w, int64(m.series.chunkDescsOffset)); err != nil {
|
// chunkDescsOffset.
|
||||||
|
if _, err = codable.EncodeVarint(w, int64(m.series.chunkDescsOffset+m.series.persistWatermark)); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if _, err = codable.EncodeVarint(w, int64(m.series.savedFirstTime)); err != nil {
|
if _, err = codable.EncodeVarint(w, int64(m.series.savedFirstTime)); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if _, err = codable.EncodeVarint(w, int64(len(m.series.chunkDescs))); err != nil {
|
// Number of chunkDescs.
|
||||||
|
if _, err = codable.EncodeVarint(w, int64(chunksToPersist)); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for i, chunkDesc := range m.series.chunkDescs {
|
for _, chunkDesc := range m.series.chunkDescs[m.series.persistWatermark:] {
|
||||||
if i < m.series.persistWatermark {
|
if err = w.WriteByte(byte(chunkDesc.C.Encoding())); err != nil {
|
||||||
if _, err = codable.EncodeVarint(w, int64(chunkDesc.FirstTime())); err != nil {
|
return
|
||||||
return
|
|
||||||
}
|
|
||||||
lt, err := chunkDesc.LastTime()
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if _, err = codable.EncodeVarint(w, int64(lt)); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// This is a non-persisted chunk. Fully marshal it.
|
|
||||||
if err = w.WriteByte(byte(chunkDesc.C.Encoding())); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err = chunkDesc.C.Marshal(w); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
p.checkpointChunksWritten.Observe(float64(len(m.series.chunkDescs) - m.series.persistWatermark))
|
if err = chunkDesc.C.Marshal(w); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
p.checkpointChunksWritten.Observe(float64(chunksToPersist))
|
||||||
}
|
}
|
||||||
// Series is checkpointed now, so declare it clean. In case the entire
|
// Series is checkpointed now, so declare it clean. In case the entire
|
||||||
// checkpoint fails later on, this is fine, as the storage's series
|
// checkpoint fails later on, this is fine, as the storage's series
|
||||||
|
|
|
@ -493,8 +493,8 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if loadedSM.length() != 4 {
|
if loadedSM.length() != 3 {
|
||||||
t.Errorf("want 4 series in map, got %d", loadedSM.length())
|
t.Errorf("want 3 series in map, got %d", loadedSM.length())
|
||||||
}
|
}
|
||||||
if loadedS1, ok := loadedSM.get(m1.FastFingerprint()); ok {
|
if loadedS1, ok := loadedSM.get(m1.FastFingerprint()); ok {
|
||||||
if !reflect.DeepEqual(loadedS1.metric, m1) {
|
if !reflect.DeepEqual(loadedS1.metric, m1) {
|
||||||
|
@ -518,28 +518,6 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin
|
||||||
} else {
|
} else {
|
||||||
t.Errorf("couldn't find %v in loaded map", m1)
|
t.Errorf("couldn't find %v in loaded map", m1)
|
||||||
}
|
}
|
||||||
if loadedS3, ok := loadedSM.get(m3.FastFingerprint()); ok {
|
|
||||||
if !reflect.DeepEqual(loadedS3.metric, m3) {
|
|
||||||
t.Errorf("want metric %v, got %v", m3, loadedS3.metric)
|
|
||||||
}
|
|
||||||
if loadedS3.head().C != nil {
|
|
||||||
t.Error("head chunk not evicted")
|
|
||||||
}
|
|
||||||
if loadedS3.chunkDescsOffset != 0 {
|
|
||||||
t.Errorf("want chunkDescsOffset 0, got %d", loadedS3.chunkDescsOffset)
|
|
||||||
}
|
|
||||||
if !loadedS3.headChunkClosed {
|
|
||||||
t.Error("headChunkClosed is false")
|
|
||||||
}
|
|
||||||
if loadedS3.head().ChunkFirstTime != 2 {
|
|
||||||
t.Errorf("want ChunkFirstTime in head chunk to be 2, got %d", loadedS3.head().ChunkFirstTime)
|
|
||||||
}
|
|
||||||
if loadedS3.head().ChunkLastTime != 2 {
|
|
||||||
t.Errorf("want ChunkLastTime in head chunk to be 2, got %d", loadedS3.head().ChunkLastTime)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
t.Errorf("couldn't find %v in loaded map", m3)
|
|
||||||
}
|
|
||||||
if loadedS4, ok := loadedSM.get(m4.FastFingerprint()); ok {
|
if loadedS4, ok := loadedSM.get(m4.FastFingerprint()); ok {
|
||||||
if !reflect.DeepEqual(loadedS4.metric, m4) {
|
if !reflect.DeepEqual(loadedS4.metric, m4) {
|
||||||
t.Errorf("want metric %v, got %v", m4, loadedS4.metric)
|
t.Errorf("want metric %v, got %v", m4, loadedS4.metric)
|
||||||
|
@ -594,20 +572,17 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunk.Encodin
|
||||||
if !reflect.DeepEqual(loadedS5.metric, m5) {
|
if !reflect.DeepEqual(loadedS5.metric, m5) {
|
||||||
t.Errorf("want metric %v, got %v", m5, loadedS5.metric)
|
t.Errorf("want metric %v, got %v", m5, loadedS5.metric)
|
||||||
}
|
}
|
||||||
if got, want := len(loadedS5.chunkDescs), chunkCountS5; got != want {
|
if got, want := len(loadedS5.chunkDescs), chunkCountS5-3; got != want {
|
||||||
t.Errorf("got %d chunkDescs, want %d", got, want)
|
t.Errorf("got %d chunkDescs, want %d", got, want)
|
||||||
}
|
}
|
||||||
if got, want := loadedS5.persistWatermark, 3; got != want {
|
if got, want := loadedS5.persistWatermark, 0; got != want {
|
||||||
t.Errorf("got persistWatermark %d, want %d", got, want)
|
t.Errorf("got persistWatermark %d, want %d", got, want)
|
||||||
}
|
}
|
||||||
if !loadedS5.chunkDescs[2].IsEvicted() {
|
if loadedS5.chunkDescs[0].IsEvicted() {
|
||||||
t.Error("3rd chunk not evicted")
|
t.Error("1st chunk evicted")
|
||||||
}
|
}
|
||||||
if loadedS5.chunkDescs[3].IsEvicted() {
|
if loadedS5.chunkDescsOffset != 3 {
|
||||||
t.Error("4th chunk evicted")
|
t.Errorf("want chunkDescsOffset 3, got %d", loadedS5.chunkDescsOffset)
|
||||||
}
|
|
||||||
if loadedS5.chunkDescsOffset != 0 {
|
|
||||||
t.Errorf("want chunkDescsOffset 0, got %d", loadedS5.chunkDescsOffset)
|
|
||||||
}
|
}
|
||||||
if loadedS5.headChunkClosed {
|
if loadedS5.headChunkClosed {
|
||||||
t.Error("headChunkClosed is true")
|
t.Error("headChunkClosed is true")
|
||||||
|
|
Loading…
Reference in a new issue