mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-26 06:04:05 -08:00
storage: Fix offset returned by dropAndPersistChunks
This is another corner-case that was previously never exercised because the rewriting of a series file was never prevented by the shrink ratio. Scenario: There is an existing series on disk, which is archived. If a new sample comes in for that file, a new chunk in memory is created, and the chunkDescsOffset is set to -1. If series maintenance happens before the series has at least one chunk to persist _and_ an insufficient chunks on disk is old enough for purging (so that the shrink ratio kicks in), dropAndPersistChunks would return 0, but it should return the chunk length of the series file.
This commit is contained in:
parent
0c688ab339
commit
46a0837816
|
@ -845,12 +845,14 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in
|
|||
// dropAndPersistChunks deletes all chunks from a series file whose last sample
|
||||
// time is before beforeTime, and then appends the provided chunks, leaving out
|
||||
// those whose last sample time is before beforeTime. It returns the timestamp
|
||||
// of the first sample in the oldest chunk _not_ dropped, the offset within the
|
||||
// series file of the first chunk persisted (out of the provided chunks), the
|
||||
// number of deleted chunks, and true if all chunks of the series have been
|
||||
// deleted (in which case the returned timestamp will be 0 and must be ignored).
|
||||
// It is the caller's responsibility to make sure nothing is persisted or loaded
|
||||
// for the same fingerprint concurrently.
|
||||
// of the first sample in the oldest chunk _not_ dropped, the chunk offset
|
||||
// within the series file of the first chunk persisted (out of the provided
|
||||
// chunks, or - if no chunks were provided - the chunk offset where chunks would
|
||||
// have been persisted, i.e. the end of the file), the number of deleted chunks,
|
||||
// and true if all chunks of the series have been deleted (in which case the
|
||||
// returned timestamp will be 0 and must be ignored). It is the caller's
|
||||
// responsibility to make sure nothing is persisted or loaded for the same
|
||||
// fingerprint concurrently.
|
||||
//
|
||||
// Returning an error signals problems with the series file. In this case, the
|
||||
// caller should quarantine the series.
|
||||
|
@ -966,13 +968,16 @@ func (p *persistence) dropAndPersistChunks(
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
totalChunks := int(fi.Size())/chunkLenWithHeader + len(chunks)
|
||||
chunksInFile := int(fi.Size()) / chunkLenWithHeader
|
||||
totalChunks := chunksInFile + len(chunks)
|
||||
if numDropped == 0 || float64(numDropped)/float64(totalChunks) < p.minShrinkRatio {
|
||||
// Nothing to drop. Just adjust the return values and append the chunks (if any).
|
||||
numDropped = 0
|
||||
firstTimeNotDropped = firstTimeInFile
|
||||
if len(chunks) > 0 {
|
||||
offset, err = p.persistChunks(fp, chunks)
|
||||
} else {
|
||||
offset = chunksInFile
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
|
@ -173,14 +173,16 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunk.Encoding) {
|
|||
|
||||
}
|
||||
}
|
||||
// Try to drop one chunk, which must be prevented by the shrink ratio.
|
||||
// Try to drop one chunk, which must be prevented by the shrink
|
||||
// ratio. Since we do not pass in any chunks to persist, the offset
|
||||
// should be the number of chunks in the file.
|
||||
for fp, _ := range fpToChunks {
|
||||
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 1, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if offset != 0 {
|
||||
t.Errorf("want offset 0, got %d", offset)
|
||||
if offset != 10 {
|
||||
t.Errorf("want offset 10, got %d", offset)
|
||||
}
|
||||
if firstTime != 0 {
|
||||
t.Errorf("want first time 0, got %d", firstTime)
|
||||
|
@ -422,14 +424,14 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunk.Encoding) {
|
|||
t.Error("all chunks dropped")
|
||||
}
|
||||
}
|
||||
// Drop only the first two chunks should not happen, either.
|
||||
// Drop only the first two chunks should not happen, either. Chunks in file is now 9.
|
||||
for fp := range fpToChunks {
|
||||
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 2, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if offset != 0 {
|
||||
t.Errorf("want offset 0, got %d", offset)
|
||||
if offset != 9 {
|
||||
t.Errorf("want offset 9, got %d", offset)
|
||||
}
|
||||
if firstTime != 0 {
|
||||
t.Errorf("want first time 0, got %d", firstTime)
|
||||
|
|
Loading…
Reference in a new issue