mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 14:27:27 -08:00
Improve handling of series file truncation
If only very few chunks are to be truncated from a very large series file, the rewrite of the file is a lorge overhead. With this change, a certain ratio of the file has to be dropped to make it happen. While only causing disk overhead at about the same ratio (by default 10%), it will cut down I/O by a lot in above scenario.
This commit is contained in:
parent
37d28bf91a
commit
4221c7de5c
|
@ -128,6 +128,10 @@ func init() {
|
|||
&cfg.storage.SyncStrategy, "storage.local.series-sync-strategy",
|
||||
"When to sync series files after modification. Possible values: 'never', 'always', 'adaptive'. Sync'ing slows down storage performance but reduces the risk of data loss in case of an OS crash. With the 'adaptive' strategy, series files are sync'd for as long as the storage is not too much behind on chunk persistence.",
|
||||
)
|
||||
cfg.fs.Float64Var(
|
||||
&cfg.storage.MinShrinkRatio, "storage.local.series-file-shrink-ratio", 0.1,
|
||||
"A series file is only truncated (to delete samples that have exceeded the retention period) if it shrinks by at least the provided ratio. This saves I/O operations while causing only a limited storage space overhead. If 0 or smaller, truncation will be performed even for a single dropped chunk, while 1 or larger will effectively prevent any truncation.",
|
||||
)
|
||||
cfg.fs.BoolVar(
|
||||
&cfg.storage.Dirty, "storage.local.dirty", false,
|
||||
"If set, the local storage layer will perform crash recovery even if the last shutdown appears to be clean.",
|
||||
|
|
|
@ -129,11 +129,18 @@ type persistence struct {
|
|||
|
||||
shouldSync syncStrategy
|
||||
|
||||
minShrinkRatio float64 // How much a series file has to shrink to justify dropping chunks.
|
||||
|
||||
bufPool sync.Pool
|
||||
}
|
||||
|
||||
// newPersistence returns a newly allocated persistence backed by local disk storage, ready to use.
|
||||
func newPersistence(basePath string, dirty, pedanticChecks bool, shouldSync syncStrategy) (*persistence, error) {
|
||||
func newPersistence(
|
||||
basePath string,
|
||||
dirty, pedanticChecks bool,
|
||||
shouldSync syncStrategy,
|
||||
minShrinkRatio float64,
|
||||
) (*persistence, error) {
|
||||
dirtyPath := filepath.Join(basePath, dirtyFileName)
|
||||
versionPath := filepath.Join(basePath, versionFileName)
|
||||
|
||||
|
@ -938,13 +945,14 @@ func (p *persistence) dropAndPersistChunks(
|
|||
}
|
||||
defer f.Close()
|
||||
|
||||
headerBuf := make([]byte, chunkHeaderLen)
|
||||
var firstTimeInFile model.Time
|
||||
// Find the first chunk in the file that should be kept.
|
||||
for ; ; numDropped++ {
|
||||
_, err = f.Seek(offsetForChunkIndex(numDropped), os.SEEK_SET)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
headerBuf := make([]byte, chunkHeaderLen)
|
||||
_, err = io.ReadFull(f, headerBuf)
|
||||
if err == io.EOF {
|
||||
// We ran into the end of the file without finding any chunks that should
|
||||
|
@ -962,29 +970,44 @@ func (p *persistence) dropAndPersistChunks(
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
if numDropped == 0 {
|
||||
firstTimeInFile = model.Time(
|
||||
binary.LittleEndian.Uint64(headerBuf[chunkHeaderFirstTimeOffset:]),
|
||||
)
|
||||
}
|
||||
lastTime := model.Time(
|
||||
binary.LittleEndian.Uint64(headerBuf[chunkHeaderLastTimeOffset:]),
|
||||
)
|
||||
if !lastTime.Before(beforeTime) {
|
||||
firstTimeNotDropped = model.Time(
|
||||
binary.LittleEndian.Uint64(headerBuf[chunkHeaderFirstTimeOffset:]),
|
||||
)
|
||||
chunkOps.WithLabelValues(drop).Add(float64(numDropped))
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// We've found the first chunk that should be kept. If it is the first
|
||||
// one, just append the chunks.
|
||||
if numDropped == 0 {
|
||||
// We've found the first chunk that should be kept.
|
||||
// First check if the shrink ratio is good enough to perform the the
|
||||
// actual drop or leave it for next time if it is not worth the effort.
|
||||
fi, err := f.Stat()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
totalChunks := int(fi.Size())/chunkLenWithHeader + len(chunks)
|
||||
if numDropped == 0 || float64(numDropped)/float64(totalChunks) < p.minShrinkRatio {
|
||||
// Nothing to drop. Just adjust the return values and append the chunks (if any).
|
||||
numDropped = 0
|
||||
firstTimeNotDropped = firstTimeInFile
|
||||
if len(chunks) > 0 {
|
||||
offset, err = p.persistChunks(fp, chunks)
|
||||
}
|
||||
return
|
||||
}
|
||||
// Otherwise, seek backwards to the beginning of its header and start
|
||||
// copying everything from there into a new file. Then append the chunks
|
||||
// to the new file.
|
||||
// If we are here, we have to drop some chunks for real. So we need to
|
||||
// record firstTimeNotDropped from the last read header, seek backwards
|
||||
// to the beginning of its header, and start copying everything from
|
||||
// there into a new file. Then append the chunks to the new file.
|
||||
firstTimeNotDropped = model.Time(
|
||||
binary.LittleEndian.Uint64(headerBuf[chunkHeaderFirstTimeOffset:]),
|
||||
)
|
||||
chunkOps.WithLabelValues(drop).Add(float64(numDropped))
|
||||
_, err = f.Seek(-chunkHeaderLen, os.SEEK_CUR)
|
||||
if err != nil {
|
||||
return
|
||||
|
|
|
@ -37,7 +37,7 @@ var (
|
|||
func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, testutil.Closer) {
|
||||
DefaultChunkEncoding = encoding
|
||||
dir := testutil.NewTemporaryDirectory("test_persistence", t)
|
||||
p, err := newPersistence(dir.Path(), false, false, func() bool { return false })
|
||||
p, err := newPersistence(dir.Path(), false, false, func() bool { return false }, 0.1)
|
||||
if err != nil {
|
||||
dir.Close()
|
||||
t.Fatal(err)
|
||||
|
@ -338,6 +338,85 @@ func testPersistLoadDropChunks(t *testing.T, encoding chunkEncoding) {
|
|||
t.Error("not all chunks dropped")
|
||||
}
|
||||
}
|
||||
// Now set minShrinkRatio to 0.25 and play with it.
|
||||
p.minShrinkRatio = 0.25
|
||||
// Re-add 8 chunks.
|
||||
for fp, chunks := range fpToChunks {
|
||||
firstTimeNotDropped, offset, numDropped, allDropped, err :=
|
||||
p.dropAndPersistChunks(fp, model.Earliest, chunks[:8])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if got, want := firstTimeNotDropped, model.Time(0); got != want {
|
||||
t.Errorf("Want firstTimeNotDropped %v, got %v.", got, want)
|
||||
}
|
||||
if got, want := offset, 0; got != want {
|
||||
t.Errorf("Want offset %v, got %v.", got, want)
|
||||
}
|
||||
if got, want := numDropped, 0; got != want {
|
||||
t.Errorf("Want numDropped %v, got %v.", got, want)
|
||||
}
|
||||
if allDropped {
|
||||
t.Error("All dropped.")
|
||||
}
|
||||
}
|
||||
// Drop only the first chunk should not happen, but persistence should still work.
|
||||
for fp, chunks := range fpToChunks {
|
||||
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 1, chunks[8:9])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if offset != 8 {
|
||||
t.Errorf("want offset 8, got %d", offset)
|
||||
}
|
||||
if firstTime != 0 {
|
||||
t.Errorf("want first time 0, got %d", firstTime)
|
||||
}
|
||||
if numDropped != 0 {
|
||||
t.Errorf("want 0 dropped chunk, got %v", numDropped)
|
||||
}
|
||||
if allDropped {
|
||||
t.Error("all chunks dropped")
|
||||
}
|
||||
}
|
||||
// Drop only the first two chunks should not happen, either.
|
||||
for fp := range fpToChunks {
|
||||
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 2, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if offset != 0 {
|
||||
t.Errorf("want offset 0, got %d", offset)
|
||||
}
|
||||
if firstTime != 0 {
|
||||
t.Errorf("want first time 0, got %d", firstTime)
|
||||
}
|
||||
if numDropped != 0 {
|
||||
t.Errorf("want 0 dropped chunk, got %v", numDropped)
|
||||
}
|
||||
if allDropped {
|
||||
t.Error("all chunks dropped")
|
||||
}
|
||||
}
|
||||
// Drop the first three chunks should finally work.
|
||||
for fp, chunks := range fpToChunks {
|
||||
firstTime, offset, numDropped, allDropped, err := p.dropAndPersistChunks(fp, 3, chunks[9:])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if offset != 6 {
|
||||
t.Errorf("want offset 6, got %d", offset)
|
||||
}
|
||||
if firstTime != 3 {
|
||||
t.Errorf("want first time 3, got %d", firstTime)
|
||||
}
|
||||
if numDropped != 3 {
|
||||
t.Errorf("want 3 dropped chunk, got %v", numDropped)
|
||||
}
|
||||
if allDropped {
|
||||
t.Error("all chunks dropped")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPersistLoadDropChunksType0(t *testing.T) {
|
||||
|
|
|
@ -152,6 +152,7 @@ type MemorySeriesStorageOptions struct {
|
|||
Dirty bool // Force the storage to consider itself dirty on startup.
|
||||
PedanticChecks bool // If dirty, perform crash-recovery checks on each series file.
|
||||
SyncStrategy SyncStrategy // Which sync strategy to apply to series files.
|
||||
MinShrinkRatio float64 // Minimum ratio a series file has to shrink during truncation.
|
||||
}
|
||||
|
||||
// NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still
|
||||
|
@ -243,7 +244,12 @@ func (s *memorySeriesStorage) Start() (err error) {
|
|||
}
|
||||
|
||||
var p *persistence
|
||||
p, err = newPersistence(s.options.PersistenceStoragePath, s.options.Dirty, s.options.PedanticChecks, syncStrategy)
|
||||
p, err = newPersistence(
|
||||
s.options.PersistenceStoragePath,
|
||||
s.options.Dirty, s.options.PedanticChecks,
|
||||
syncStrategy,
|
||||
s.options.MinShrinkRatio,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -563,6 +563,7 @@ func TestLoop(t *testing.T) {
|
|||
PersistenceStoragePath: directory.Path(),
|
||||
CheckpointInterval: 250 * time.Millisecond,
|
||||
SyncStrategy: Adaptive,
|
||||
MinShrinkRatio: 0.1,
|
||||
}
|
||||
storage := NewMemorySeriesStorage(o)
|
||||
if err := storage.Start(); err != nil {
|
||||
|
@ -1320,6 +1321,7 @@ func benchmarkFuzz(b *testing.B, encoding chunkEncoding) {
|
|||
PersistenceStoragePath: directory.Path(),
|
||||
CheckpointInterval: time.Second,
|
||||
SyncStrategy: Adaptive,
|
||||
MinShrinkRatio: 0.1,
|
||||
}
|
||||
s := NewMemorySeriesStorage(o)
|
||||
if err := s.Start(); err != nil {
|
||||
|
|
Loading…
Reference in a new issue