Merge branch 'beorn7/storage' into beorn7/storage2

This commit is contained in:
beorn7 2016-02-24 14:00:56 +01:00
commit 53005c3085
2 changed files with 73 additions and 8 deletions

View file

@ -823,6 +823,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in
} }
} }
headChunkClosed := true // Initial assumption.
for i := int64(0); i < numChunkDescs; i++ { for i := int64(0); i < numChunkDescs; i++ {
if i < persistWatermark { if i < persistWatermark {
firstTime, err := binary.ReadVarint(r) firstTime, err := binary.ReadVarint(r)
@ -844,6 +845,9 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in
chunkDescsTotal++ chunkDescsTotal++
} else { } else {
// Non-persisted chunk. // Non-persisted chunk.
// If there are non-persisted chunks at all, we consider
// the head chunk not to be closed yet.
headChunkClosed = false
encoding, err := r.ReadByte() encoding, err := r.ReadByte()
if err != nil { if err != nil {
log.Warn("Could not decode chunk type:", err) log.Warn("Could not decode chunk type:", err)
@ -856,15 +860,15 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in
p.dirty = true p.dirty = true
return sm, chunksToPersist, nil return sm, chunksToPersist, nil
} }
chunkDescs[i] = newChunkDesc(chunk, chunk.firstTime()) cd := newChunkDesc(chunk, chunk.firstTime())
if i < numChunkDescs-1 {
// This is NOT the head chunk. So it's a chunk
// to be persisted, and we need to populate lastTime.
chunksToPersist++ chunksToPersist++
cd.maybePopulateLastTime()
} }
chunkDescs[i] = cd
} }
headChunkClosed := persistWatermark >= numChunkDescs
if !headChunkClosed {
// Head chunk is not ready for persisting yet.
chunksToPersist--
} }
fingerprintToSeries[model.Fingerprint(fp)] = &memorySeries{ fingerprintToSeries[model.Fingerprint(fp)] = &memorySeries{

View file

@ -485,6 +485,12 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
if loadedS1.headChunkClosed { if loadedS1.headChunkClosed {
t.Error("headChunkClosed is true") t.Error("headChunkClosed is true")
} }
if loadedS1.head().chunkFirstTime != 1 {
t.Errorf("want chunkFirstTime in head chunk to be 1, got %d", loadedS1.head().chunkFirstTime)
}
if loadedS1.head().chunkLastTime != model.Earliest {
t.Error("want chunkLastTime in head chunk to be unset")
}
} else { } else {
t.Errorf("couldn't find %v in loaded map", m1) t.Errorf("couldn't find %v in loaded map", m1)
} }
@ -501,6 +507,12 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
if !loadedS3.headChunkClosed { if !loadedS3.headChunkClosed {
t.Error("headChunkClosed is false") t.Error("headChunkClosed is false")
} }
if loadedS3.head().chunkFirstTime != 2 {
t.Errorf("want chunkFirstTime in head chunk to be 2, got %d", loadedS3.head().chunkFirstTime)
}
if loadedS3.head().chunkLastTime != 2 {
t.Errorf("want chunkLastTime in head chunk to be 2, got %d", loadedS3.head().chunkLastTime)
}
} else { } else {
t.Errorf("couldn't find %v in loaded map", m3) t.Errorf("couldn't find %v in loaded map", m3)
} }
@ -526,6 +538,27 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
if loadedS4.headChunkClosed { if loadedS4.headChunkClosed {
t.Error("headChunkClosed is true") t.Error("headChunkClosed is true")
} }
for i, cd := range loadedS4.chunkDescs {
if cd.chunkFirstTime != cd.c.firstTime() {
t.Errorf(
"chunkDesc[%d]: chunkFirstTime not consistent with chunk, want %d, got %d",
i, cd.c.firstTime(), cd.chunkFirstTime,
)
}
if i == len(loadedS4.chunkDescs)-1 {
// Head chunk.
if cd.chunkLastTime != model.Earliest {
t.Error("want chunkLastTime in head chunk to be unset")
}
continue
}
if cd.chunkLastTime != cd.c.newIterator().lastTimestamp() {
t.Errorf(
"chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d",
i, cd.c.newIterator().lastTimestamp(), cd.chunkLastTime,
)
}
}
} else { } else {
t.Errorf("couldn't find %v in loaded map", m4) t.Errorf("couldn't find %v in loaded map", m4)
} }
@ -551,6 +584,34 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
if loadedS5.headChunkClosed { if loadedS5.headChunkClosed {
t.Error("headChunkClosed is true") t.Error("headChunkClosed is true")
} }
for i, cd := range loadedS5.chunkDescs {
if i < 3 {
// Evicted chunks.
if cd.chunkFirstTime == model.Earliest {
t.Errorf("chunkDesc[%d]: chunkLastTime not set", i)
}
continue
}
if cd.chunkFirstTime != cd.c.firstTime() {
t.Errorf(
"chunkDesc[%d]: chunkFirstTime not consistent with chunk, want %d, got %d",
i, cd.c.firstTime(), cd.chunkFirstTime,
)
}
if i == len(loadedS5.chunkDescs)-1 {
// Head chunk.
if cd.chunkLastTime != model.Earliest {
t.Error("want chunkLastTime in head chunk to be unset")
}
continue
}
if cd.chunkLastTime != cd.c.newIterator().lastTimestamp() {
t.Errorf(
"chunkDesc[%d]: chunkLastTime not consistent with chunk, want %d, got %d",
i, cd.c.newIterator().lastTimestamp(), cd.chunkLastTime,
)
}
}
} else { } else {
t.Errorf("couldn't find %v in loaded map", m5) t.Errorf("couldn't find %v in loaded map", m5)
} }