diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2451fb248..94f61d38c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+## 2.22.1 / 2020-11-03
+
+* [BUGFIX] Fix potential "mmap: invalid argument" errors in loading the head chunks, after an unclean shutdown, by performing read repairs. #8061
+* [BUGFIX] Fix serving metrics and API when reloading scrape config. #8104
+* [BUGFIX] Fix head chunk size calculation for size based retention. #8139
+
 ## 2.22.0 / 2020-10-07
 
 As announced in the 2.21.0 release notes, the experimental gRPC API v2 has been
diff --git a/VERSION b/VERSION
index f48f82fa2..d93847fab 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.22.0
+2.22.1
diff --git a/scrape/scrape.go b/scrape/scrape.go
index d52b0ac7f..f6aa49810 100644
--- a/scrape/scrape.go
+++ b/scrape/scrape.go
@@ -192,7 +192,13 @@ type scrapePool struct {
 	appendable storage.Appendable
 	logger     log.Logger
 
-	mtx    sync.Mutex
+	// targetMtx protects activeTargets and droppedTargets from concurrent reads
+	// and writes. Only one of Sync/stop/reload may be called at once due to
+	// manager.mtxScrape, so we only need to protect against concurrent reads from
+	// the ActiveTargets and DroppedTargets methods. This allows those two
+	// methods to always complete without having to wait on scrape loops to gracefully stop.
+	targetMtx sync.Mutex
+
 	config *config.ScrapeConfig
 	client *http.Client
 	// Targets and loops must always be synchronized to have the same
@@ -273,8 +279,8 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed
 }
 
 func (sp *scrapePool) ActiveTargets() []*Target {
-	sp.mtx.Lock()
-	defer sp.mtx.Unlock()
+	sp.targetMtx.Lock()
+	defer sp.targetMtx.Unlock()
 
 	var tActive []*Target
 	for _, t := range sp.activeTargets {
@@ -284,8 +290,8 @@ func (sp *scrapePool) ActiveTargets() []*Target {
 }
 
 func (sp *scrapePool) DroppedTargets() []*Target {
-	sp.mtx.Lock()
-	defer sp.mtx.Unlock()
+	sp.targetMtx.Lock()
+	defer sp.targetMtx.Unlock()
 	return sp.droppedTargets
 }
 
@@ -294,8 +300,7 @@ func (sp *scrapePool) stop() {
 	sp.cancel()
 	var wg sync.WaitGroup
 
-	sp.mtx.Lock()
-	defer sp.mtx.Unlock()
+	sp.targetMtx.Lock()
 
 	for fp, l := range sp.loops {
 		wg.Add(1)
@@ -308,6 +313,9 @@ func (sp *scrapePool) stop() {
 		delete(sp.loops, fp)
 		delete(sp.activeTargets, fp)
 	}
+
+	sp.targetMtx.Unlock()
+
 	wg.Wait()
 	sp.client.CloseIdleConnections()
 
@@ -326,9 +334,6 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
 	targetScrapePoolReloads.Inc()
 	start := time.Now()
 
-	sp.mtx.Lock()
-	defer sp.mtx.Unlock()
-
 	client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName, false, false)
 	if err != nil {
 		targetScrapePoolReloadsFailed.Inc()
@@ -352,6 +357,8 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
 		mrc           = sp.config.MetricRelabelConfigs
 	)
 
+	sp.targetMtx.Lock()
+
 	forcedErr := sp.refreshTargetLimitErr()
 	for fp, oldLoop := range sp.loops {
 		var cache *scrapeCache
@@ -387,6 +394,8 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
 		sp.loops[fp] = newLoop
 	}
 
+	sp.targetMtx.Unlock()
+
 	wg.Wait()
 	oldClient.CloseIdleConnections()
 	targetReloadIntervalLength.WithLabelValues(interval.String()).Observe(
@@ -398,11 +407,9 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
 // Sync converts target groups into actual scrape targets and synchronizes
 // the currently running scraper with the resulting set and returns all scraped and dropped targets.
 func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
-	sp.mtx.Lock()
-	defer sp.mtx.Unlock()
-
 	start := time.Now()
 
+	sp.targetMtx.Lock()
 	var all []*Target
 	sp.droppedTargets = []*Target{}
 	for _, tg := range tgs {
@@ -419,6 +426,7 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
 			}
 		}
 	}
+	sp.targetMtx.Unlock()
 	sp.sync(all)
 
 	targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
@@ -431,7 +439,6 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
 // scrape loops for new targets, and stops scrape loops for disappeared targets.
 // It returns after all stopped scrape loops terminated.
 func (sp *scrapePool) sync(targets []*Target) {
-	// This function expects that you have acquired the sp.mtx lock.
 	var (
 		uniqueLoops = make(map[uint64]loop)
 		interval    = time.Duration(sp.config.ScrapeInterval)
@@ -442,6 +449,7 @@ func (sp *scrapePool) sync(targets []*Target) {
 		mrc           = sp.config.MetricRelabelConfigs
 	)
 
+	sp.targetMtx.Lock()
 	for _, t := range targets {
 		hash := t.hash()
 
@@ -487,6 +495,8 @@ func (sp *scrapePool) sync(targets []*Target) {
 		}
 	}
 
+	sp.targetMtx.Unlock()
+
 	targetScrapePoolTargetsAdded.WithLabelValues(sp.config.JobName).Set(float64(len(uniqueLoops)))
 	forcedErr := sp.refreshTargetLimitErr()
 	for _, l := range sp.loops {
@@ -507,7 +517,6 @@ func (sp *scrapePool) sync(targets []*Target) {
 // refreshTargetLimitErr returns an error that can be passed to the scrape loops
 // if the number of targets exceeds the configured limit.
 func (sp *scrapePool) refreshTargetLimitErr() error {
-	// This function expects that you have acquired the sp.mtx lock.
 	if sp.config == nil || sp.config.TargetLimit == 0 && !sp.targetLimitHit {
 		return nil
 	}
diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go
index 08c6f1b3c..fbccd28f3 100644
--- a/tsdb/chunks/head_chunks.go
+++ b/tsdb/chunks/head_chunks.go
@@ -163,6 +163,11 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) {
 		return err
 	}
 
+	files, err = repairLastChunkFile(files)
+	if err != nil {
+		return err
+	}
+
 	chkFileIndices := make([]int, 0, len(files))
 	for seq, fn := range files {
 		f, err := fileutil.OpenMmapFile(fn)
@@ -218,9 +223,40 @@ func listChunkFiles(dir string) (map[int]string, error) {
 		}
 		res[int(seq)] = filepath.Join(dir, fi.Name())
 	}
+
 	return res, nil
 }
 
+// repairLastChunkFile deletes the last file if it's empty.
+// Because we don't fsync when creating these files, we could end
+// up with an empty file at the end during an abrupt shutdown.
+func repairLastChunkFile(files map[int]string) (_ map[int]string, returnErr error) {
+	lastFile := -1
+	for seq := range files {
+		if seq > lastFile {
+			lastFile = seq
+		}
+	}
+
+	if lastFile <= 0 {
+		return files, nil
+	}
+
+	info, err := os.Stat(files[lastFile])
+	if err != nil {
+		return files, errors.Wrap(err, "file stat during last head chunk file repair")
+	}
+	if info.Size() == 0 {
+		// Corrupt file, hence remove it.
+		if err := os.RemoveAll(files[lastFile]); err != nil {
+			return files, errors.Wrap(err, "delete corrupted, empty head chunk file during last file repair")
+		}
+		delete(files, lastFile)
+	}
+
+	return files, nil
+}
+
 // WriteChunk writes the chunk to the disk.
 // The returned chunk ref is the reference from where the chunk encoding starts for the chunk.
 func (cdm *ChunkDiskMapper) WriteChunk(seriesRef uint64, mint, maxt int64, chk chunkenc.Chunk) (chkRef uint64, err error) {
diff --git a/tsdb/chunks/head_chunks_test.go b/tsdb/chunks/head_chunks_test.go
index bfb4262ad..f8ed26cf1 100644
--- a/tsdb/chunks/head_chunks_test.go
+++ b/tsdb/chunks/head_chunks_test.go
@@ -19,6 +19,7 @@ import (
 	"io/ioutil"
 	"math/rand"
 	"os"
+	"strconv"
 	"testing"
 
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
@@ -363,6 +364,75 @@ func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) {
 	testutil.Ok(t, hrw.Truncate(2000))
 }
 
+func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) {
+	hrw := testHeadReadWriter(t)
+	defer func() {
+		testutil.Ok(t, hrw.Close())
+	}()
+
+	timeRange := 0
+	addChunk := func() {
+		step := 100
+		mint, maxt := timeRange+1, timeRange+step-1
+		_, err := hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t))
+		testutil.Ok(t, err)
+		timeRange += step
+	}
+	nonEmptyFile := func() {
+		testutil.Ok(t, hrw.CutNewFile())
+		addChunk()
+	}
+
+	addChunk()     // 1. Created with the first chunk.
+	nonEmptyFile() // 2.
+	nonEmptyFile() // 3.
+
+	testutil.Equals(t, 3, len(hrw.mmappedChunkFiles))
+	lastFile := 0
+	for idx := range hrw.mmappedChunkFiles {
+		if idx > lastFile {
+			lastFile = idx
+		}
+	}
+	testutil.Equals(t, 3, lastFile)
+	dir := hrw.dir.Name()
+	testutil.Ok(t, hrw.Close())
+
+	// Write an empty last file mimicking an abrupt shutdown on file creation.
+	emptyFileName := segmentFile(dir, lastFile+1)
+	f, err := os.OpenFile(emptyFileName, os.O_WRONLY|os.O_CREATE, 0666)
+	testutil.Ok(t, err)
+	testutil.Ok(t, f.Sync())
+	stat, err := f.Stat()
+	testutil.Ok(t, err)
+	testutil.Equals(t, int64(0), stat.Size())
+	testutil.Ok(t, f.Close())
+
+	// Open chunk disk mapper again, corrupt file should be removed.
+	hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool())
+	testutil.Ok(t, err)
+	testutil.Assert(t, !hrw.fileMaxtSet, "")
+	testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
+	testutil.Assert(t, hrw.fileMaxtSet, "")
+
+	// Removed from memory.
+	testutil.Equals(t, 3, len(hrw.mmappedChunkFiles))
+	for idx := range hrw.mmappedChunkFiles {
+		testutil.Assert(t, idx <= lastFile, "file index is bigger than previous last file")
+	}
+
+	// Removed even from disk.
+	files, err := ioutil.ReadDir(dir)
+	testutil.Ok(t, err)
+	testutil.Equals(t, 3, len(files))
+	for _, fi := range files {
+		seq, err := strconv.ParseUint(fi.Name(), 10, 64)
+		testutil.Ok(t, err)
+		testutil.Assert(t, seq <= uint64(lastFile), "file index on disk is bigger than previous last file")
+	}
+
+}
+
 func testHeadReadWriter(t *testing.T) *ChunkDiskMapper {
 	tmpdir, err := ioutil.TempDir("", "data")
 	testutil.Ok(t, err)
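Reviewer note, not part of the patch: below is a minimal, self-contained Go sketch of the lock scoping the scrape.go changes adopt. The pool and loop types are simplified stand-ins (assumed names, not the real scrapePool); only the targetMtx handling mirrors the change. Holding the mutex just around the map mutations and releasing it before wg.Wait is what lets ActiveTargets and DroppedTargets return while scrape loops are still stopping, instead of blocking behind the old pool-wide mtx. This relies on manager.mtxScrape serializing Sync/stop/reload, as the new comment on targetMtx spells out.

package main

import (
	"fmt"
	"sync"
	"time"
)

// loop stands in for a scrape loop whose graceful stop takes a while.
type loop struct{}

func (l *loop) stop() { time.Sleep(50 * time.Millisecond) }

// pool stands in for scrapePool: targetMtx guards only the two maps.
type pool struct {
	targetMtx sync.Mutex
	loops     map[uint64]*loop
	targets   map[uint64]string
}

// stop mirrors scrapePool.stop: hold targetMtx only while mutating the maps,
// then release it before waiting for the loops to finish.
func (p *pool) stop() {
	var wg sync.WaitGroup

	p.targetMtx.Lock()
	for fp, l := range p.loops {
		wg.Add(1)
		go func(l *loop) {
			defer wg.Done()
			l.stop()
		}(l)
		delete(p.loops, fp)
		delete(p.targets, fp)
	}
	p.targetMtx.Unlock()

	wg.Wait() // the slow part happens outside the critical section
}

// activeTargets mirrors scrapePool.ActiveTargets: a short read-only critical section.
func (p *pool) activeTargets() []string {
	p.targetMtx.Lock()
	defer p.targetMtx.Unlock()
	out := make([]string, 0, len(p.targets))
	for _, t := range p.targets {
		out = append(out, t)
	}
	return out
}

func main() {
	p := &pool{
		loops:   map[uint64]*loop{1: {}},
		targets: map[uint64]string{1: "job/instance"},
	}
	go p.stop()
	// With the old pool-wide mutex held across wg.Wait, this call could block
	// until every loop had stopped; with targetMtx it returns right away.
	fmt.Println("active targets:", p.activeTargets())
	time.Sleep(100 * time.Millisecond)
}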