mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-13 06:47:28 -08:00
commit
00f16d1ac3
|
@ -1,3 +1,9 @@
|
||||||
|
## 2.22.1 / 2020-11-03
|
||||||
|
|
||||||
|
* [BUGFIX] Fix potential "mmap: invalid argument" errors in loading the head chunks, after an unclean shutdown, by performing read repairs. #8061
|
||||||
|
* [BUGFIX] Fix serving metrics and API when reloading scrape config. #8104
|
||||||
|
* [BUGFIX] Fix head chunk size calculation for size based retention. #8139
|
||||||
|
|
||||||
## 2.22.0 / 2020-10-07
|
## 2.22.0 / 2020-10-07
|
||||||
|
|
||||||
As announced in the 2.21.0 release notes, the experimental gRPC API v2 has been
|
As announced in the 2.21.0 release notes, the experimental gRPC API v2 has been
|
||||||
|
|
|
@ -192,7 +192,13 @@ type scrapePool struct {
|
||||||
appendable storage.Appendable
|
appendable storage.Appendable
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
|
|
||||||
mtx sync.Mutex
|
// targetMtx protects activeTargets and droppedTargets from concurrent reads
|
||||||
|
// and writes. Only one of Sync/stop/reload may be called at once due to
|
||||||
|
// manager.mtxScrape so we only need to protect from concurrent reads from
|
||||||
|
// the ActiveTargets and DroppedTargets methods. This allows those two
|
||||||
|
// methods to always complete without having to wait on scrape loops to gracefully stop.
|
||||||
|
targetMtx sync.Mutex
|
||||||
|
|
||||||
config *config.ScrapeConfig
|
config *config.ScrapeConfig
|
||||||
client *http.Client
|
client *http.Client
|
||||||
// Targets and loops must always be synchronized to have the same
|
// Targets and loops must always be synchronized to have the same
|
||||||
|
@ -273,8 +279,8 @@ func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, jitterSeed
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sp *scrapePool) ActiveTargets() []*Target {
|
func (sp *scrapePool) ActiveTargets() []*Target {
|
||||||
sp.mtx.Lock()
|
sp.targetMtx.Lock()
|
||||||
defer sp.mtx.Unlock()
|
defer sp.targetMtx.Unlock()
|
||||||
|
|
||||||
var tActive []*Target
|
var tActive []*Target
|
||||||
for _, t := range sp.activeTargets {
|
for _, t := range sp.activeTargets {
|
||||||
|
@ -284,8 +290,8 @@ func (sp *scrapePool) ActiveTargets() []*Target {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sp *scrapePool) DroppedTargets() []*Target {
|
func (sp *scrapePool) DroppedTargets() []*Target {
|
||||||
sp.mtx.Lock()
|
sp.targetMtx.Lock()
|
||||||
defer sp.mtx.Unlock()
|
defer sp.targetMtx.Unlock()
|
||||||
return sp.droppedTargets
|
return sp.droppedTargets
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -294,8 +300,7 @@ func (sp *scrapePool) stop() {
|
||||||
sp.cancel()
|
sp.cancel()
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
sp.mtx.Lock()
|
sp.targetMtx.Lock()
|
||||||
defer sp.mtx.Unlock()
|
|
||||||
|
|
||||||
for fp, l := range sp.loops {
|
for fp, l := range sp.loops {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
@ -308,6 +313,9 @@ func (sp *scrapePool) stop() {
|
||||||
delete(sp.loops, fp)
|
delete(sp.loops, fp)
|
||||||
delete(sp.activeTargets, fp)
|
delete(sp.activeTargets, fp)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sp.targetMtx.Unlock()
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
sp.client.CloseIdleConnections()
|
sp.client.CloseIdleConnections()
|
||||||
|
|
||||||
|
@ -326,9 +334,6 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
|
||||||
targetScrapePoolReloads.Inc()
|
targetScrapePoolReloads.Inc()
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
sp.mtx.Lock()
|
|
||||||
defer sp.mtx.Unlock()
|
|
||||||
|
|
||||||
client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName, false, false)
|
client, err := config_util.NewClientFromConfig(cfg.HTTPClientConfig, cfg.JobName, false, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
targetScrapePoolReloadsFailed.Inc()
|
targetScrapePoolReloadsFailed.Inc()
|
||||||
|
@ -352,6 +357,8 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
|
||||||
mrc = sp.config.MetricRelabelConfigs
|
mrc = sp.config.MetricRelabelConfigs
|
||||||
)
|
)
|
||||||
|
|
||||||
|
sp.targetMtx.Lock()
|
||||||
|
|
||||||
forcedErr := sp.refreshTargetLimitErr()
|
forcedErr := sp.refreshTargetLimitErr()
|
||||||
for fp, oldLoop := range sp.loops {
|
for fp, oldLoop := range sp.loops {
|
||||||
var cache *scrapeCache
|
var cache *scrapeCache
|
||||||
|
@ -387,6 +394,8 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
|
||||||
sp.loops[fp] = newLoop
|
sp.loops[fp] = newLoop
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sp.targetMtx.Unlock()
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
oldClient.CloseIdleConnections()
|
oldClient.CloseIdleConnections()
|
||||||
targetReloadIntervalLength.WithLabelValues(interval.String()).Observe(
|
targetReloadIntervalLength.WithLabelValues(interval.String()).Observe(
|
||||||
|
@ -398,11 +407,9 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
|
||||||
// Sync converts target groups into actual scrape targets and synchronizes
|
// Sync converts target groups into actual scrape targets and synchronizes
|
||||||
// the currently running scraper with the resulting set and returns all scraped and dropped targets.
|
// the currently running scraper with the resulting set and returns all scraped and dropped targets.
|
||||||
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
|
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
|
||||||
sp.mtx.Lock()
|
|
||||||
defer sp.mtx.Unlock()
|
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
|
sp.targetMtx.Lock()
|
||||||
var all []*Target
|
var all []*Target
|
||||||
sp.droppedTargets = []*Target{}
|
sp.droppedTargets = []*Target{}
|
||||||
for _, tg := range tgs {
|
for _, tg := range tgs {
|
||||||
|
@ -419,6 +426,7 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
sp.targetMtx.Unlock()
|
||||||
sp.sync(all)
|
sp.sync(all)
|
||||||
|
|
||||||
targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
|
targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
|
||||||
|
@ -431,7 +439,6 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
|
||||||
// scrape loops for new targets, and stops scrape loops for disappeared targets.
|
// scrape loops for new targets, and stops scrape loops for disappeared targets.
|
||||||
// It returns after all stopped scrape loops terminated.
|
// It returns after all stopped scrape loops terminated.
|
||||||
func (sp *scrapePool) sync(targets []*Target) {
|
func (sp *scrapePool) sync(targets []*Target) {
|
||||||
// This function expects that you have acquired the sp.mtx lock.
|
|
||||||
var (
|
var (
|
||||||
uniqueLoops = make(map[uint64]loop)
|
uniqueLoops = make(map[uint64]loop)
|
||||||
interval = time.Duration(sp.config.ScrapeInterval)
|
interval = time.Duration(sp.config.ScrapeInterval)
|
||||||
|
@ -442,6 +449,7 @@ func (sp *scrapePool) sync(targets []*Target) {
|
||||||
mrc = sp.config.MetricRelabelConfigs
|
mrc = sp.config.MetricRelabelConfigs
|
||||||
)
|
)
|
||||||
|
|
||||||
|
sp.targetMtx.Lock()
|
||||||
for _, t := range targets {
|
for _, t := range targets {
|
||||||
hash := t.hash()
|
hash := t.hash()
|
||||||
|
|
||||||
|
@ -487,6 +495,8 @@ func (sp *scrapePool) sync(targets []*Target) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sp.targetMtx.Unlock()
|
||||||
|
|
||||||
targetScrapePoolTargetsAdded.WithLabelValues(sp.config.JobName).Set(float64(len(uniqueLoops)))
|
targetScrapePoolTargetsAdded.WithLabelValues(sp.config.JobName).Set(float64(len(uniqueLoops)))
|
||||||
forcedErr := sp.refreshTargetLimitErr()
|
forcedErr := sp.refreshTargetLimitErr()
|
||||||
for _, l := range sp.loops {
|
for _, l := range sp.loops {
|
||||||
|
@ -507,7 +517,6 @@ func (sp *scrapePool) sync(targets []*Target) {
|
||||||
// refreshTargetLimitErr returns an error that can be passed to the scrape loops
|
// refreshTargetLimitErr returns an error that can be passed to the scrape loops
|
||||||
// if the number of targets exceeds the configured limit.
|
// if the number of targets exceeds the configured limit.
|
||||||
func (sp *scrapePool) refreshTargetLimitErr() error {
|
func (sp *scrapePool) refreshTargetLimitErr() error {
|
||||||
// This function expects that you have acquired the sp.mtx lock.
|
|
||||||
if sp.config == nil || sp.config.TargetLimit == 0 && !sp.targetLimitHit {
|
if sp.config == nil || sp.config.TargetLimit == 0 && !sp.targetLimitHit {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -163,6 +163,11 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
files, err = repairLastChunkFile(files)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
chkFileIndices := make([]int, 0, len(files))
|
chkFileIndices := make([]int, 0, len(files))
|
||||||
for seq, fn := range files {
|
for seq, fn := range files {
|
||||||
f, err := fileutil.OpenMmapFile(fn)
|
f, err := fileutil.OpenMmapFile(fn)
|
||||||
|
@ -218,9 +223,40 @@ func listChunkFiles(dir string) (map[int]string, error) {
|
||||||
}
|
}
|
||||||
res[int(seq)] = filepath.Join(dir, fi.Name())
|
res[int(seq)] = filepath.Join(dir, fi.Name())
|
||||||
}
|
}
|
||||||
|
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// repairLastChunkFile deletes the last file if it's empty.
|
||||||
|
// Because we don't fsync when creating these files, we could end
|
||||||
|
// up with an empty file at the end during an abrupt shutdown.
|
||||||
|
func repairLastChunkFile(files map[int]string) (_ map[int]string, returnErr error) {
|
||||||
|
lastFile := -1
|
||||||
|
for seq := range files {
|
||||||
|
if seq > lastFile {
|
||||||
|
lastFile = seq
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if lastFile <= 0 {
|
||||||
|
return files, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
info, err := os.Stat(files[lastFile])
|
||||||
|
if err != nil {
|
||||||
|
return files, errors.Wrap(err, "file stat during last head chunk file repair")
|
||||||
|
}
|
||||||
|
if info.Size() == 0 {
|
||||||
|
// Corrupt file, hence remove it.
|
||||||
|
if err := os.RemoveAll(files[lastFile]); err != nil {
|
||||||
|
return files, errors.Wrap(err, "delete corrupted, empty head chunk file during last file repair")
|
||||||
|
}
|
||||||
|
delete(files, lastFile)
|
||||||
|
}
|
||||||
|
|
||||||
|
return files, nil
|
||||||
|
}
|
||||||
|
|
||||||
// WriteChunk writes the chunk to the disk.
|
// WriteChunk writes the chunk to the disk.
|
||||||
// The returned chunk ref is the reference from where the chunk encoding starts for the chunk.
|
// The returned chunk ref is the reference from where the chunk encoding starts for the chunk.
|
||||||
func (cdm *ChunkDiskMapper) WriteChunk(seriesRef uint64, mint, maxt int64, chk chunkenc.Chunk) (chkRef uint64, err error) {
|
func (cdm *ChunkDiskMapper) WriteChunk(seriesRef uint64, mint, maxt int64, chk chunkenc.Chunk) (chkRef uint64, err error) {
|
||||||
|
|
|
@ -19,6 +19,7 @@ import (
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
|
"strconv"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
||||||
|
@ -363,6 +364,75 @@ func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) {
|
||||||
testutil.Ok(t, hrw.Truncate(2000))
|
testutil.Ok(t, hrw.Truncate(2000))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) {
|
||||||
|
hrw := testHeadReadWriter(t)
|
||||||
|
defer func() {
|
||||||
|
testutil.Ok(t, hrw.Close())
|
||||||
|
}()
|
||||||
|
|
||||||
|
timeRange := 0
|
||||||
|
addChunk := func() {
|
||||||
|
step := 100
|
||||||
|
mint, maxt := timeRange+1, timeRange+step-1
|
||||||
|
_, err := hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t))
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
timeRange += step
|
||||||
|
}
|
||||||
|
nonEmptyFile := func() {
|
||||||
|
testutil.Ok(t, hrw.CutNewFile())
|
||||||
|
addChunk()
|
||||||
|
}
|
||||||
|
|
||||||
|
addChunk() // 1. Created with the first chunk.
|
||||||
|
nonEmptyFile() // 2.
|
||||||
|
nonEmptyFile() // 3.
|
||||||
|
|
||||||
|
testutil.Equals(t, 3, len(hrw.mmappedChunkFiles))
|
||||||
|
lastFile := 0
|
||||||
|
for idx := range hrw.mmappedChunkFiles {
|
||||||
|
if idx > lastFile {
|
||||||
|
lastFile = idx
|
||||||
|
}
|
||||||
|
}
|
||||||
|
testutil.Equals(t, 3, lastFile)
|
||||||
|
dir := hrw.dir.Name()
|
||||||
|
testutil.Ok(t, hrw.Close())
|
||||||
|
|
||||||
|
// Write an empty last file mimicking an abrupt shutdown on file creation.
|
||||||
|
emptyFileName := segmentFile(dir, lastFile+1)
|
||||||
|
f, err := os.OpenFile(emptyFileName, os.O_WRONLY|os.O_CREATE, 0666)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
testutil.Ok(t, f.Sync())
|
||||||
|
stat, err := f.Stat()
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
testutil.Equals(t, int64(0), stat.Size())
|
||||||
|
testutil.Ok(t, f.Close())
|
||||||
|
|
||||||
|
// Open chunk disk mapper again, corrupt file should be removed.
|
||||||
|
hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool())
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
testutil.Assert(t, !hrw.fileMaxtSet, "")
|
||||||
|
testutil.Ok(t, hrw.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil }))
|
||||||
|
testutil.Assert(t, hrw.fileMaxtSet, "")
|
||||||
|
|
||||||
|
// Removed from memory.
|
||||||
|
testutil.Equals(t, 3, len(hrw.mmappedChunkFiles))
|
||||||
|
for idx := range hrw.mmappedChunkFiles {
|
||||||
|
testutil.Assert(t, idx <= lastFile, "file index is bigger than previous last file")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Removed even from disk.
|
||||||
|
files, err := ioutil.ReadDir(dir)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
testutil.Equals(t, 3, len(files))
|
||||||
|
for _, fi := range files {
|
||||||
|
seq, err := strconv.ParseUint(fi.Name(), 10, 64)
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
testutil.Assert(t, seq <= uint64(lastFile), "file index on disk is bigger than previous last file")
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func testHeadReadWriter(t *testing.T) *ChunkDiskMapper {
|
func testHeadReadWriter(t *testing.T) *ChunkDiskMapper {
|
||||||
tmpdir, err := ioutil.TempDir("", "data")
|
tmpdir, err := ioutil.TempDir("", "data")
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
Loading…
Reference in a new issue