From 97a5fc8cbb28b8db98cbfa81eaebaff08c44cdc9 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Fri, 9 Mar 2018 12:00:26 +0000 Subject: [PATCH 01/14] Correctly stop the timer used in the remote write path. --- storage/remote/queue_manager.go | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 54495ca0d..01537aa3d 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -430,6 +430,14 @@ func (s *shards) runShard(i int) { pendingSamples := model.Samples{} timer := time.NewTimer(s.qm.cfg.BatchSendDeadline) + defer func() { + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + }() for { select { @@ -449,11 +457,16 @@ func (s *shards) runShard(i int) { for len(pendingSamples) >= s.qm.cfg.MaxSamplesPerSend { s.sendSamples(pendingSamples[:s.qm.cfg.MaxSamplesPerSend]) pendingSamples = pendingSamples[s.qm.cfg.MaxSamplesPerSend:] + + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + timer.Reset(s.qm.cfg.BatchSendDeadline) } - if !timer.Stop() { - <-timer.C - } - timer.Reset(s.qm.cfg.BatchSendDeadline) + case <-timer.C: if len(pendingSamples) > 0 { s.sendSamples(pendingSamples) From fdb574b608edbf558d73887f09c47ea07346a107 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 12 Mar 2018 14:27:48 +0000 Subject: [PATCH 02/14] Review feedback. 
--- storage/remote/queue_manager.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 01537aa3d..7d52ed1ef 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -430,14 +430,15 @@ func (s *shards) runShard(i int) { pendingSamples := model.Samples{} timer := time.NewTimer(s.qm.cfg.BatchSendDeadline) - defer func() { + stop := func() { if !timer.Stop() { select { case <-timer.C: default: } } - }() + } + defer stop() for { select { @@ -454,16 +455,11 @@ func (s *shards) runShard(i int) { queueLength.WithLabelValues(s.qm.queueName).Dec() pendingSamples = append(pendingSamples, sample) - for len(pendingSamples) >= s.qm.cfg.MaxSamplesPerSend { + if len(pendingSamples) >= s.qm.cfg.MaxSamplesPerSend { s.sendSamples(pendingSamples[:s.qm.cfg.MaxSamplesPerSend]) pendingSamples = pendingSamples[s.qm.cfg.MaxSamplesPerSend:] - if !timer.Stop() { - select { - case <-timer.C: - default: - } - } + stop() timer.Reset(s.qm.cfg.BatchSendDeadline) } @@ -472,6 +468,7 @@ func (s *shards) runShard(i int) { s.sendSamples(pendingSamples) pendingSamples = pendingSamples[:0] } + timer.Reset(s.qm.cfg.BatchSendDeadline) } } } From 731259afd0a00707192fbe3df530a57a7932b990 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 12 Mar 2018 15:35:43 +0000 Subject: [PATCH 03/14] Test sample timeout delivery. 
--- storage/remote/queue_manager_test.go | 41 ++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index 9d2e7a34b..d81eaf821 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -44,6 +44,9 @@ func (c *TestStorageClient) expectSamples(ss model.Samples) { c.mtx.Lock() defer c.mtx.Unlock() + c.expectedSamples = map[string][]*prompb.Sample{} + c.receivedSamples = map[string][]*prompb.Sample{} + for _, s := range ss { ts := labelProtosToLabels(MetricToLabelProtos(s.Metric)).String() c.expectedSamples[ts] = append(c.expectedSamples[ts], &prompb.Sample{ @@ -122,6 +125,44 @@ func TestSampleDelivery(t *testing.T) { c.waitForExpectedSamples(t) } +func TestSampleDeliveryTimeout(t *testing.T) { + // Let's send on less sample than batch size, and wait the timeout duration + n := config.DefaultQueueConfig.Capacity - 1 + + samples := make(model.Samples, 0, n) + for i := 0; i < n; i++ { + name := model.LabelValue(fmt.Sprintf("test_metric_%d", i)) + samples = append(samples, &model.Sample{ + Metric: model.Metric{ + model.MetricNameLabel: name, + }, + Value: model.SampleValue(i), + }) + } + + c := NewTestStorageClient() + + cfg := config.DefaultQueueConfig + cfg.MaxShards = 1 + cfg.BatchSendDeadline = 100 * time.Millisecond + m := NewQueueManager(nil, cfg, nil, nil, c) + m.Start() + defer m.Stop() + + // Send the samples twice, waiting for the samples in the meantime. + c.expectSamples(samples) + for _, s := range samples { + m.Append(s) + } + c.waitForExpectedSamples(t) + + c.expectSamples(samples) + for _, s := range samples { + m.Append(s) + } + c.waitForExpectedSamples(t) +} + func TestSampleDeliveryOrder(t *testing.T) { ts := 10 n := config.DefaultQueueConfig.MaxSamplesPerSend * ts From 597c17d3e9773a1e811bd3b38333fcd8078f1fe8 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Mon, 12 Mar 2018 16:48:51 +0000 Subject: [PATCH 04/14] Fix nit. 
--- storage/remote/queue_manager_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index d81eaf821..e2a5b19cb 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -126,7 +126,7 @@ func TestSampleDelivery(t *testing.T) { } func TestSampleDeliveryTimeout(t *testing.T) { - // Let's send on less sample than batch size, and wait the timeout duration + // Let's send one less sample than batch size, and wait the timeout duration n := config.DefaultQueueConfig.Capacity - 1 samples := make(model.Samples, 0, n) From 5fb1e27b43c1a236be1390b20a0e1f34b1741a39 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 13 Mar 2018 16:24:37 -0400 Subject: [PATCH 05/14] vendor: update prometheus/tsdb --- vendor/github.com/prometheus/tsdb/compact.go | 9 +++--- .../github.com/prometheus/tsdb/index/index.go | 5 ++++ vendor/vendor.json | 28 +++++++++---------- 3 files changed, 23 insertions(+), 19 deletions(-) diff --git a/vendor/github.com/prometheus/tsdb/compact.go b/vendor/github.com/prometheus/tsdb/compact.go index 3668f0831..0c42bda62 100644 --- a/vendor/github.com/prometheus/tsdb/compact.go +++ b/vendor/github.com/prometheus/tsdb/compact.go @@ -151,16 +151,11 @@ func (c *LeveledCompactor) Plan(dir string) ([]string, error) { if err != nil { return nil, err } - // We do not include the most recently created block. This gives users a window - // of a full block size to piece-wise backup new data without having to care - // about data overlap. 
if len(dirs) < 1 { return nil, nil } - dirs = dirs[:len(dirs)-1] var dms []dirMeta - for _, dir := range dirs { meta, err := readMetaFile(dir) if err != nil { @@ -176,6 +171,10 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) { return dms[i].meta.MinTime < dms[j].meta.MinTime }) + // We do not include a recently created block with max(minTime), so the block which was just created from WAL. + // This gives users a window of a full block size to piece-wise backup new data without having to care about data overlap. + dms = dms[:len(dms)-1] + var res []string for _, dm := range c.selectDirs(dms) { res = append(res, dm.dir) diff --git a/vendor/github.com/prometheus/tsdb/index/index.go b/vendor/github.com/prometheus/tsdb/index/index.go index 2689b5691..3cad716c0 100644 --- a/vendor/github.com/prometheus/tsdb/index/index.go +++ b/vendor/github.com/prometheus/tsdb/index/index.go @@ -653,6 +653,11 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) { return r, nil } +// Version returns the file format version of the underlying index. +func (r *Reader) Version() int { + return r.version +} + // Range marks a byte range. 
type Range struct { Start, End int64 diff --git a/vendor/vendor.json b/vendor/vendor.json index 14fa760dc..d756ff6a9 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -800,40 +800,40 @@ "revisionTime": "2016-04-11T19:08:41Z" }, { - "checksumSHA1": "zVgXlbZ1J8GhBN7tZji7M/SuiAU=", + "checksumSHA1": "JfxP001vgt0P+hEDk/if0bxa8xU=", "path": "github.com/prometheus/tsdb", - "revision": "16b2bf1b45ce3e3536c78ebec5116ea09a69786e", - "revisionTime": "2018-03-02T11:51:49Z" + "revision": "00404ae5ab578bb550377a17aab5511deb0592c5", + "revisionTime": "2018-03-13T20:20:03Z" }, { "checksumSHA1": "S7F4yWxVLhxQNHMdgoOo6plmOOs=", "path": "github.com/prometheus/tsdb/chunkenc", - "revision": "494acd307058387ced7646f9996b0f7372eaa558", - "revisionTime": "2018-02-15T11:29:47Z" + "revision": "00404ae5ab578bb550377a17aab5511deb0592c5", + "revisionTime": "2018-03-13T20:20:03Z" }, { "checksumSHA1": "+zsn1i8cqwgZXL8Bg6jDy32xjAo=", "path": "github.com/prometheus/tsdb/chunks", - "revision": "494acd307058387ced7646f9996b0f7372eaa558", - "revisionTime": "2018-02-15T11:29:47Z" + "revision": "00404ae5ab578bb550377a17aab5511deb0592c5", + "revisionTime": "2018-03-13T20:20:03Z" }, { "checksumSHA1": "h49AAcJ5+iRBwCgbfQf+2T1E1ZE=", "path": "github.com/prometheus/tsdb/fileutil", - "revision": "494acd307058387ced7646f9996b0f7372eaa558", - "revisionTime": "2018-02-15T11:29:47Z" + "revision": "00404ae5ab578bb550377a17aab5511deb0592c5", + "revisionTime": "2018-03-13T20:20:03Z" }, { - "checksumSHA1": "UlvN+ZhTu52S8f9niySQpPC+dvQ=", + "checksumSHA1": "4ebzIE2Jvj6+SG6yGFSXN8scgfo=", "path": "github.com/prometheus/tsdb/index", - "revision": "494acd307058387ced7646f9996b0f7372eaa558", - "revisionTime": "2018-02-15T11:29:47Z" + "revision": "00404ae5ab578bb550377a17aab5511deb0592c5", + "revisionTime": "2018-03-13T20:20:03Z" }, { "checksumSHA1": "Va8HWvOFTwFeewZFadMAOzNGDps=", "path": "github.com/prometheus/tsdb/labels", - "revision": "494acd307058387ced7646f9996b0f7372eaa558", - "revisionTime": 
"2018-02-15T11:29:47Z" + "revision": "00404ae5ab578bb550377a17aab5511deb0592c5", + "revisionTime": "2018-03-13T20:20:03Z" }, { "checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=", From a8e3d0fc4b428deada523800bbc4c8905cd88240 Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Mon, 12 Mar 2018 13:16:59 +0000 Subject: [PATCH 06/14] Correctly handle pruning wraparound after ring expansion (#3942) Fixes #3939 --- storage/buffer.go | 1 + storage/buffer_test.go | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/storage/buffer.go b/storage/buffer.go index 0159cfa3f..77476c614 100644 --- a/storage/buffer.go +++ b/storage/buffer.go @@ -178,6 +178,7 @@ func (r *sampleRing) add(t int64, v float64) { r.buf = buf r.i = r.f r.f += l + l = 2 * l } else { r.i++ if r.i >= l { diff --git a/storage/buffer_test.go b/storage/buffer_test.go index 2a1b87248..5b752dec2 100644 --- a/storage/buffer_test.go +++ b/storage/buffer_test.go @@ -48,6 +48,11 @@ func TestSampleRing(t *testing.T) { delta: 7, size: 1, }, + { + input: []int64{1, 2, 3, 4, 6}, + delta: 4, + size: 4, + }, } for _, c := range cases { r := newSampleRing(c.delta, c.size) From a947750dd61b1f57a09a4fe39a10519f878712cb Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Wed, 14 Mar 2018 10:01:27 -0400 Subject: [PATCH 07/14] vendor: update tsdb --- .../prometheus/tsdb/fileutil/mmap_386.go | 5 ++++ .../prometheus/tsdb/fileutil/mmap_amd64.go | 5 ++++ .../prometheus/tsdb/fileutil/mmap_windows.go | 8 +++--- vendor/github.com/prometheus/tsdb/repair.go | 3 ++ vendor/vendor.json | 28 +++++++++---------- 5 files changed, 31 insertions(+), 18 deletions(-) create mode 100644 vendor/github.com/prometheus/tsdb/fileutil/mmap_386.go create mode 100644 vendor/github.com/prometheus/tsdb/fileutil/mmap_amd64.go diff --git a/vendor/github.com/prometheus/tsdb/fileutil/mmap_386.go b/vendor/github.com/prometheus/tsdb/fileutil/mmap_386.go new file mode 100644 index 000000000..156f81b63 --- /dev/null +++ 
b/vendor/github.com/prometheus/tsdb/fileutil/mmap_386.go @@ -0,0 +1,5 @@ +// +build windows + +package fileutil + +const maxMapSize = 0x7FFFFFFF // 2GB diff --git a/vendor/github.com/prometheus/tsdb/fileutil/mmap_amd64.go b/vendor/github.com/prometheus/tsdb/fileutil/mmap_amd64.go new file mode 100644 index 000000000..4025dbfcb --- /dev/null +++ b/vendor/github.com/prometheus/tsdb/fileutil/mmap_amd64.go @@ -0,0 +1,5 @@ +// +build windows + +package fileutil + +const maxMapSize = 0xFFFFFFFFFFFF // 256TB diff --git a/vendor/github.com/prometheus/tsdb/fileutil/mmap_windows.go b/vendor/github.com/prometheus/tsdb/fileutil/mmap_windows.go index 3bee807c2..b94226412 100644 --- a/vendor/github.com/prometheus/tsdb/fileutil/mmap_windows.go +++ b/vendor/github.com/prometheus/tsdb/fileutil/mmap_windows.go @@ -19,14 +19,14 @@ import ( "unsafe" ) -func mmap(f *os.File, sz int) ([]byte, error) { - low, high := uint32(sz), uint32(sz>>32) +func mmap(f *os.File, size int) ([]byte, error) { + low, high := uint32(size), uint32(size>>32) h, errno := syscall.CreateFileMapping(syscall.Handle(f.Fd()), nil, syscall.PAGE_READONLY, high, low, nil) if h == 0 { return nil, os.NewSyscallError("CreateFileMapping", errno) } - addr, errno := syscall.MapViewOfFile(h, syscall.FILE_MAP_READ, 0, 0, uintptr(sz)) + addr, errno := syscall.MapViewOfFile(h, syscall.FILE_MAP_READ, 0, 0, uintptr(size)) if addr == 0 { return nil, os.NewSyscallError("MapViewOfFile", errno) } @@ -35,7 +35,7 @@ func mmap(f *os.File, sz int) ([]byte, error) { return nil, os.NewSyscallError("CloseHandle", err) } - return (*[1 << 30]byte)(unsafe.Pointer(addr))[:sz], nil + return (*[maxMapSize]byte)(unsafe.Pointer(addr))[:size], nil } func munmap(b []byte) error { diff --git a/vendor/github.com/prometheus/tsdb/repair.go b/vendor/github.com/prometheus/tsdb/repair.go index e9f2a9643..cc0f6e4a9 100644 --- a/vendor/github.com/prometheus/tsdb/repair.go +++ b/vendor/github.com/prometheus/tsdb/repair.go @@ -61,6 +61,9 @@ func 
repairBadIndexVersion(logger log.Logger, dir string) error { if err := repl.Close(); err != nil { return err } + if err := broken.Close(); err != nil { + return err + } if err := renameFile(repl.Name(), broken.Name()); err != nil { return err } diff --git a/vendor/vendor.json b/vendor/vendor.json index d756ff6a9..d337ead6a 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -800,40 +800,40 @@ "revisionTime": "2016-04-11T19:08:41Z" }, { - "checksumSHA1": "JfxP001vgt0P+hEDk/if0bxa8xU=", + "checksumSHA1": "vNslgGjRBqauFmVIBTkvEWwvURg=", "path": "github.com/prometheus/tsdb", - "revision": "00404ae5ab578bb550377a17aab5511deb0592c5", - "revisionTime": "2018-03-13T20:20:03Z" + "revision": "659ed644294eec6310cef0685b002a3aed8c8f85", + "revisionTime": "2018-03-14T13:49:50Z" }, { "checksumSHA1": "S7F4yWxVLhxQNHMdgoOo6plmOOs=", "path": "github.com/prometheus/tsdb/chunkenc", - "revision": "00404ae5ab578bb550377a17aab5511deb0592c5", - "revisionTime": "2018-03-13T20:20:03Z" + "revision": "659ed644294eec6310cef0685b002a3aed8c8f85", + "revisionTime": "2018-03-14T13:49:50Z" }, { "checksumSHA1": "+zsn1i8cqwgZXL8Bg6jDy32xjAo=", "path": "github.com/prometheus/tsdb/chunks", - "revision": "00404ae5ab578bb550377a17aab5511deb0592c5", - "revisionTime": "2018-03-13T20:20:03Z" + "revision": "659ed644294eec6310cef0685b002a3aed8c8f85", + "revisionTime": "2018-03-14T13:49:50Z" }, { - "checksumSHA1": "h49AAcJ5+iRBwCgbfQf+2T1E1ZE=", + "checksumSHA1": "T7qvg4VhFLklT3g+qPkUWxBo0yw=", "path": "github.com/prometheus/tsdb/fileutil", - "revision": "00404ae5ab578bb550377a17aab5511deb0592c5", - "revisionTime": "2018-03-13T20:20:03Z" + "revision": "659ed644294eec6310cef0685b002a3aed8c8f85", + "revisionTime": "2018-03-14T13:49:50Z" }, { "checksumSHA1": "4ebzIE2Jvj6+SG6yGFSXN8scgfo=", "path": "github.com/prometheus/tsdb/index", - "revision": "00404ae5ab578bb550377a17aab5511deb0592c5", - "revisionTime": "2018-03-13T20:20:03Z" + "revision": "659ed644294eec6310cef0685b002a3aed8c8f85", + "revisionTime": 
"2018-03-14T13:49:50Z" }, { "checksumSHA1": "Va8HWvOFTwFeewZFadMAOzNGDps=", "path": "github.com/prometheus/tsdb/labels", - "revision": "00404ae5ab578bb550377a17aab5511deb0592c5", - "revisionTime": "2018-03-13T20:20:03Z" + "revision": "659ed644294eec6310cef0685b002a3aed8c8f85", + "revisionTime": "2018-03-14T13:49:50Z" }, { "checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=", From f22e5dce1a63e6e5c4c9b7c854c0bf199a2359ec Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 13 Mar 2018 16:51:57 -0400 Subject: [PATCH 08/14] *: cut 2.2.1 --- CHANGELOG.md | 8 ++++++++ VERSION | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7bd9ac485..fbabae49f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +## 2.2.1 / 2018-03-13 + +* [BUGFIX] Fix data loss in TSDB on compaction +* [BUGFIX] Correctly stop timer in remote-write path +* [BUGFIX] Fix deadlock triggerd by loading targets page +* [BUGFIX] Fix incorrect buffering of samples on range selection queries +* [BUGFIX] Handle large index files on windows properly + ## 2.2.0 / 2018-03-08 * [CHANGE] Rename file SD mtime metric. 
diff --git a/VERSION b/VERSION index ccbccc3dc..c043eea77 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.2.0 +2.2.1 From f30b37e00b5f5da58487152600a34101c5239be9 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Thu, 29 Mar 2018 18:02:25 +0530 Subject: [PATCH 09/14] Fixed pathPrefix for web pages --- web/web.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web/web.go b/web/web.go index 82f343df1..eae7fa94d 100644 --- a/web/web.go +++ b/web/web.go @@ -666,7 +666,7 @@ func tmplFuncs(consolesPath string, opts *Options) template_text.FuncMap { return time.Since(t) / time.Millisecond * time.Millisecond }, "consolesPath": func() string { return consolesPath }, - "pathPrefix": func() string { return opts.ExternalURL.Path }, + "pathPrefix": func() string { return opts.RoutePrefix }, "buildVersion": func() string { return opts.Version.Revision }, "stripLabels": func(lset map[string]string, labels ...string) map[string]string { for _, ln := range labels { From cd2820e165483609fbe0af99455bf909e6a8cd30 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Fri, 30 Mar 2018 11:00:22 +0530 Subject: [PATCH 10/14] Fix pathPrefix bug from PR-4025 --- web/web.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/web/web.go b/web/web.go index eae7fa94d..731850cd9 100644 --- a/web/web.go +++ b/web/web.go @@ -666,7 +666,13 @@ func tmplFuncs(consolesPath string, opts *Options) template_text.FuncMap { return time.Since(t) / time.Millisecond * time.Millisecond }, "consolesPath": func() string { return consolesPath }, - "pathPrefix": func() string { return opts.RoutePrefix }, + "pathPrefix": func() string { + if opts.RoutePrefix == "/" { + return "" + } else { + return opts.RoutePrefix + } + }, "buildVersion": func() string { return opts.Version.Revision }, "stripLabels": func(lset map[string]string, labels ...string) map[string]string { for _, ln := range labels { From b44ce11d1b8b7011c1019dbede6b91905f92e27b Mon Sep 17 00:00:00 2001 From: 
Ganesh Vernekar Date: Fri, 30 Mar 2018 11:35:12 +0530 Subject: [PATCH 11/14] Added test to check pathPrefix --- web/web.go | 2 +- web/web_test.go | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/web/web.go b/web/web.go index 731850cd9..fe68e62eb 100644 --- a/web/web.go +++ b/web/web.go @@ -666,7 +666,7 @@ func tmplFuncs(consolesPath string, opts *Options) template_text.FuncMap { return time.Since(t) / time.Millisecond * time.Millisecond }, "consolesPath": func() string { return consolesPath }, - "pathPrefix": func() string { + "pathPrefix": func() string { if opts.RoutePrefix == "/" { return "" } else { diff --git a/web/web_test.go b/web/web_test.go index a9cd7bb02..e0c209967 100644 --- a/web/web_test.go +++ b/web/web_test.go @@ -274,6 +274,41 @@ func TestRoutePrefix(t *testing.T) { testutil.Equals(t, http.StatusOK, resp.StatusCode) } +func TestPathPrefix(t *testing.T) { + + tests := []struct { + routePrefix string + pathPrefix string + }{ + { + routePrefix: "/", + // If we have pathPrefix as "/", URL in UI gets "//" as prefix, + // hence it is no longer a relative path. + pathPrefix: "", + }, + { + routePrefix: "/prometheus", + pathPrefix: "/prometheus", + }, + { + routePrefix: "/p1/p2/p3/p4", + pathPrefix: "/p1/p2/p3/p4", + }, + } + + for _, test := range tests { + opts := &Options{ + RoutePrefix: test.routePrefix, + } + + pathPrefix := tmplFuncs("", opts)["pathPrefix"].(func() string) + pp := pathPrefix() + + testutil.Equals(t, test.pathPrefix, pp) + } + +} + func TestDebugHandler(t *testing.T) { for _, tc := range []struct { prefix, url string From ddd46de6f48fef82ade7d2a6f0a3cf35118c5383 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 9 Apr 2018 17:18:25 +0300 Subject: [PATCH 12/14] Races/3994 (#4005) Fix race by properly locking access to scrape pools. Use separate mutex for information needed by UI so that UI isn't blocked when targets are being updated. 
--- scrape/manager.go | 126 +++++++++++++++++++++-------------------- scrape/scrape.go | 13 ++++- web/api/v1/api.go | 16 +++--- web/api/v1/api_test.go | 4 +- web/web.go | 6 +- 5 files changed, 90 insertions(+), 75 deletions(-) diff --git a/scrape/manager.go b/scrape/manager.go index 64fc100b0..0f2c0d497 100644 --- a/scrape/manager.go +++ b/scrape/manager.go @@ -40,18 +40,25 @@ func NewManager(logger log.Logger, app Appendable) *Manager { scrapeConfigs: make(map[string]*config.ScrapeConfig), scrapePools: make(map[string]*scrapePool), graceShut: make(chan struct{}), + targetsAll: make(map[string][]*Target), } } // Manager maintains a set of scrape pools and manages start/stop cycles // when receiving new target groups form the discovery manager. type Manager struct { - logger log.Logger - append Appendable + logger log.Logger + append Appendable + graceShut chan struct{} + + mtxTargets sync.Mutex // Guards the fields below. + targetsActive []*Target + targetsDropped []*Target + targetsAll map[string][]*Target + + mtxScrape sync.Mutex // Guards the fields below. scrapeConfigs map[string]*config.ScrapeConfig scrapePools map[string]*scrapePool - mtx sync.RWMutex - graceShut chan struct{} } // Run starts background processing to handle target updates and reload the scraping loops. @@ -68,6 +75,9 @@ func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error { // Stop cancels all running scrape pools and blocks until all have exited. func (m *Manager) Stop() { + m.mtxScrape.Lock() + defer m.mtxScrape.Unlock() + for _, sp := range m.scrapePools { sp.stop() } @@ -76,8 +86,9 @@ func (m *Manager) Stop() { // ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg. 
func (m *Manager) ApplyConfig(cfg *config.Config) error { - m.mtx.Lock() - defer m.mtx.Unlock() + m.mtxScrape.Lock() + defer m.mtxScrape.Unlock() + c := make(map[string]*config.ScrapeConfig) for _, scfg := range cfg.ScrapeConfigs { c[scfg.JobName] = scfg @@ -97,71 +108,66 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error { return nil } -// TargetMap returns map of active and dropped targets and their corresponding scrape config job name. -func (m *Manager) TargetMap() map[string][]*Target { - m.mtx.Lock() - defer m.mtx.Unlock() - - targets := make(map[string][]*Target) - for jobName, sp := range m.scrapePools { - sp.mtx.RLock() - for _, t := range sp.targets { - targets[jobName] = append(targets[jobName], t) - } - targets[jobName] = append(targets[jobName], sp.droppedTargets...) - sp.mtx.RUnlock() - } - - return targets +// TargetsAll returns active and dropped targets grouped by job_name. +func (m *Manager) TargetsAll() map[string][]*Target { + m.mtxTargets.Lock() + defer m.mtxTargets.Unlock() + return m.targetsAll } -// Targets returns the targets currently being scraped. -func (m *Manager) Targets() []*Target { - m.mtx.Lock() - defer m.mtx.Unlock() - - var targets []*Target - for _, p := range m.scrapePools { - p.mtx.RLock() - for _, tt := range p.targets { - targets = append(targets, tt) - } - p.mtx.RUnlock() - } - - return targets +// TargetsActive returns the active targets currently being scraped. +func (m *Manager) TargetsActive() []*Target { + m.mtxTargets.Lock() + defer m.mtxTargets.Unlock() + return m.targetsActive } -// DroppedTargets returns the targets dropped during relabelling. -func (m *Manager) DroppedTargets() []*Target { - m.mtx.Lock() - defer m.mtx.Unlock() - var droppedTargets []*Target - for _, p := range m.scrapePools { - p.mtx.RLock() - droppedTargets = append(droppedTargets, p.droppedTargets...) - p.mtx.RUnlock() +// TargetsDropped returns the dropped targets during relabelling. 
+func (m *Manager) TargetsDropped() []*Target { + m.mtxTargets.Lock() + defer m.mtxTargets.Unlock() + return m.targetsDropped +} + +func (m *Manager) targetsUpdate(active, dropped map[string][]*Target) { + m.mtxTargets.Lock() + defer m.mtxTargets.Unlock() + + m.targetsAll = make(map[string][]*Target) + m.targetsActive = nil + m.targetsDropped = nil + for jobName, targets := range active { + m.targetsAll[jobName] = append(m.targetsAll[jobName], targets...) + m.targetsActive = append(m.targetsActive, targets...) + + } + for jobName, targets := range dropped { + m.targetsAll[jobName] = append(m.targetsAll[jobName], targets...) + m.targetsDropped = append(m.targetsDropped, targets...) } - return droppedTargets } func (m *Manager) reload(t map[string][]*targetgroup.Group) { + m.mtxScrape.Lock() + defer m.mtxScrape.Unlock() + + tDropped := make(map[string][]*Target) + tActive := make(map[string][]*Target) + for tsetName, tgroup := range t { - scrapeConfig, ok := m.scrapeConfigs[tsetName] - if !ok { - level.Error(m.logger).Log("msg", "error reloading target set", "err", fmt.Sprintf("invalid config id:%v", tsetName)) - continue - } - - // Scrape pool doesn't exist so start a new one. 
- existing, ok := m.scrapePools[tsetName] - if !ok { - sp := newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", tsetName)) + var sp *scrapePool + if existing, ok := m.scrapePools[tsetName]; !ok { + scrapeConfig, ok := m.scrapeConfigs[tsetName] + if !ok { + level.Error(m.logger).Log("msg", "error reloading target set", "err", fmt.Sprintf("invalid config id:%v", tsetName)) + continue + } + sp = newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", tsetName)) m.scrapePools[tsetName] = sp - sp.Sync(tgroup) - } else { - existing.Sync(tgroup) + sp = existing } + tActive[tsetName], tDropped[tsetName] = sp.Sync(tgroup) } + m.targetsUpdate(tActive, tDropped) } diff --git a/scrape/scrape.go b/scrape/scrape.go index 1c3adfb72..609ff058e 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -245,8 +245,8 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) { } // Sync converts target groups into actual scrape targets and synchronizes -// the currently running scraper with the resulting set. -func (sp *scrapePool) Sync(tgs []*targetgroup.Group) { +// the currently running scraper with the resulting set and returns all scraped and dropped targets. 
+func (sp *scrapePool) Sync(tgs []*targetgroup.Group) (tActive []*Target, tDropped []*Target) { start := time.Now() var all []*Target @@ -273,6 +273,15 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) { time.Since(start).Seconds(), ) targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc() + + sp.mtx.RLock() + for _, t := range sp.targets { + tActive = append(tActive, t) + } + tDropped = sp.droppedTargets + sp.mtx.RUnlock() + + return tActive, tDropped } // sync takes a list of potentially duplicated targets, deduplicates them, starts diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 50f4ed505..db2da462f 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -82,8 +82,8 @@ func (e *apiError) Error() string { } type targetRetriever interface { - Targets() []*scrape.Target - DroppedTargets() []*scrape.Target + TargetsActive() []*scrape.Target + TargetsDropped() []*scrape.Target } type alertmanagerRetriever interface { @@ -452,11 +452,12 @@ type TargetDiscovery struct { } func (api *API) targets(r *http.Request) (interface{}, *apiError) { - targets := api.targetRetriever.Targets() - droppedTargets := api.targetRetriever.DroppedTargets() - res := &TargetDiscovery{ActiveTargets: make([]*Target, len(targets)), DroppedTargets: make([]*DroppedTarget, len(droppedTargets))} + tActive := api.targetRetriever.TargetsActive() + tDropped := api.targetRetriever.TargetsDropped() + res := &TargetDiscovery{ActiveTargets: make([]*Target, len(tActive)), DroppedTargets: make([]*DroppedTarget, len(tDropped))} + + for i, t := range tActive { - for i, t := range targets { lastErrStr := "" lastErr := t.LastError() if lastErr != nil { @@ -473,12 +474,11 @@ func (api *API) targets(r *http.Request) (interface{}, *apiError) { } } - for i, t := range droppedTargets { + for i, t := range tDropped { res.DroppedTargets[i] = &DroppedTarget{ DiscoveredLabels: t.DiscoveredLabels().Map(), } } - return res, nil } diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go 
index e4128b1b4..921945b34 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -44,7 +44,7 @@ import ( type testTargetRetriever struct{} -func (t testTargetRetriever) Targets() []*scrape.Target { +func (t testTargetRetriever) TargetsActive() []*scrape.Target { return []*scrape.Target{ scrape.NewTarget( labels.FromMap(map[string]string{ @@ -57,7 +57,7 @@ func (t testTargetRetriever) Targets() []*scrape.Target { ), } } -func (t testTargetRetriever) DroppedTargets() []*scrape.Target { +func (t testTargetRetriever) TargetsDropped() []*scrape.Target { return []*scrape.Target{ scrape.NewTarget( nil, diff --git a/web/web.go b/web/web.go index fe68e62eb..d7e00441b 100644 --- a/web/web.go +++ b/web/web.go @@ -404,7 +404,7 @@ func (h *Handler) Run(ctx context.Context) error { h.options.QueryEngine, h.options.Storage.Querier, func() []*scrape.Target { - return h.options.ScrapeManager.Targets() + return h.options.ScrapeManager.TargetsActive() }, func() []*url.URL { return h.options.Notifier.Alertmanagers() @@ -592,7 +592,7 @@ func (h *Handler) rules(w http.ResponseWriter, r *http.Request) { func (h *Handler) serviceDiscovery(w http.ResponseWriter, r *http.Request) { var index []string - targets := h.scrapeManager.TargetMap() + targets := h.scrapeManager.TargetsAll() for job := range targets { index = append(index, job) } @@ -610,7 +610,7 @@ func (h *Handler) serviceDiscovery(w http.ResponseWriter, r *http.Request) { func (h *Handler) targets(w http.ResponseWriter, r *http.Request) { // Bucket targets by job label tps := map[string][]*scrape.Target{} - for _, t := range h.scrapeManager.Targets() { + for _, t := range h.scrapeManager.TargetsActive() { job := t.Labels().Get(model.JobLabel) tps[job] = append(tps[job], t) } From bd44e7fe9823f1e5c69e8cda80a1186edbf57cf6 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Mon, 9 Apr 2018 17:44:53 +0200 Subject: [PATCH 13/14] Update vendoring of prometheus/common/route to include data race fix See 
https://github.com/prometheus/common/pull/125 Signed-off-by: beorn7 --- .../prometheus/common/route/route.go | 28 +++++++++++++------ vendor/vendor.json | 6 ++-- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/vendor/github.com/prometheus/common/route/route.go b/vendor/github.com/prometheus/common/route/route.go index bb4688173..742e57547 100644 --- a/vendor/github.com/prometheus/common/route/route.go +++ b/vendor/github.com/prometheus/common/route/route.go @@ -19,11 +19,12 @@ func WithParam(ctx context.Context, p, v string) context.Context { return context.WithValue(ctx, param(p), v) } -// Router wraps httprouter.Router and adds support for prefixed sub-routers -// and per-request context injections. +// Router wraps httprouter.Router and adds support for prefixed sub-routers, +// per-request context injections and instrumentation. type Router struct { rtr *httprouter.Router prefix string + instrh func(handlerName string, handler http.HandlerFunc) http.HandlerFunc } // New returns a new Router. @@ -33,13 +34,22 @@ func New() *Router { } } +// WithInstrumentation returns a router with instrumentation support. +func (r *Router) WithInstrumentation(instrh func(handlerName string, handler http.HandlerFunc) http.HandlerFunc) *Router { + return &Router{rtr: r.rtr, prefix: r.prefix, instrh: instrh} +} + // WithPrefix returns a router that prefixes all registered routes with prefix. func (r *Router) WithPrefix(prefix string) *Router { - return &Router{rtr: r.rtr, prefix: r.prefix + prefix} + return &Router{rtr: r.rtr, prefix: r.prefix + prefix, instrh: r.instrh} } // handle turns a HandlerFunc into an httprouter.Handle. -func (r *Router) handle(h http.HandlerFunc) httprouter.Handle { +func (r *Router) handle(handlerName string, h http.HandlerFunc) httprouter.Handle { + if r.instrh != nil { + // This needs to be outside the closure to avoid data race when reading and writing to 'h'. 
+ h = r.instrh(handlerName, h) + } return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() @@ -53,27 +63,27 @@ func (r *Router) handle(h http.HandlerFunc) httprouter.Handle { // Get registers a new GET route. func (r *Router) Get(path string, h http.HandlerFunc) { - r.rtr.GET(r.prefix+path, r.handle(h)) + r.rtr.GET(r.prefix+path, r.handle(path, h)) } // Options registers a new OPTIONS route. func (r *Router) Options(path string, h http.HandlerFunc) { - r.rtr.OPTIONS(r.prefix+path, r.handle(h)) + r.rtr.OPTIONS(r.prefix+path, r.handle(path, h)) } // Del registers a new DELETE route. func (r *Router) Del(path string, h http.HandlerFunc) { - r.rtr.DELETE(r.prefix+path, r.handle(h)) + r.rtr.DELETE(r.prefix+path, r.handle(path, h)) } // Put registers a new PUT route. func (r *Router) Put(path string, h http.HandlerFunc) { - r.rtr.PUT(r.prefix+path, r.handle(h)) + r.rtr.PUT(r.prefix+path, r.handle(path, h)) } // Post registers a new POST route. 
func (r *Router) Post(path string, h http.HandlerFunc) { - r.rtr.POST(r.prefix+path, r.handle(h)) + r.rtr.POST(r.prefix+path, r.handle(path, h)) } // Redirect takes an absolute path and sends an internal HTTP redirect for it, diff --git a/vendor/vendor.json b/vendor/vendor.json index d337ead6a..d3e4d1913 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -782,10 +782,10 @@ "revisionTime": "2017-11-04T09:59:07Z" }, { - "checksumSHA1": "9aDxDuzZt1l7FQJ9qpn2kPcF7NU=", + "checksumSHA1": "9doPk0x0LONG/idxK61JnZYcxBs=", "path": "github.com/prometheus/common/route", - "revision": "e3fb1a1acd7605367a2b378bc2e2f893c05174b7", - "revisionTime": "2017-11-04T09:59:07Z" + "revision": "38c53a9f4bfcd932d1b00bfc65e256a7fba6b37a", + "revisionTime": "2018-03-26T16:04:09Z" }, { "checksumSHA1": "91KYK0SpvkaMJJA2+BcxbVnyRO0=", From dc29dd1c6fcb6fe919dffb0ce158b51421ca0c52 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Tue, 10 Apr 2018 00:08:26 +0300 Subject: [PATCH 14/14] add mutex for DiscoveredLabels Signed-off-by: Krasi Georgiev --- scrape/target.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/scrape/target.go b/scrape/target.go index 6241d5228..d7a971551 100644 --- a/scrape/target.go +++ b/scrape/target.go @@ -110,6 +110,8 @@ func (t *Target) Labels() labels.Labels { // DiscoveredLabels returns a copy of the target's labels before any processing. func (t *Target) DiscoveredLabels() labels.Labels { + t.mtx.Lock() + defer t.mtx.Unlock() lset := make(labels.Labels, len(t.discoveredLabels)) copy(lset, t.discoveredLabels) return lset @@ -117,6 +119,8 @@ func (t *Target) DiscoveredLabels() labels.Labels { // SetDiscoveredLabels sets new DiscoveredLabels func (t *Target) SetDiscoveredLabels(l labels.Labels) { + t.mtx.Lock() + defer t.mtx.Unlock() t.discoveredLabels = l }