discovery/file: fix flaky tests (#5948)

* discovery/file: fix flaky tests

Signed-off-by: Simon Pasquier <spasquie@redhat.com>

* Fix typos

Signed-off-by: Simon Pasquier <spasquie@redhat.com>

* Add copyFileTo() method

Signed-off-by: Simon Pasquier <spasquie@redhat.com>
This commit is contained in:
Simon Pasquier 2019-09-24 14:54:50 +02:00 committed by GitHub
parent 52e0504f83
commit 80bc8553be
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 433 additions and 114 deletions

View file

@ -15,158 +15,462 @@ package file
import (
"context"
"encoding/json"
"io"
"io/ioutil"
"os"
"path/filepath"
"sort"
"sync"
"testing"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/util/testutil"
)
const testDir = "fixtures"
const defaultWait = time.Second
func TestFileSD(t *testing.T) {
defer os.Remove(filepath.Join(testDir, "_test_valid.yml"))
defer os.Remove(filepath.Join(testDir, "_test_valid.json"))
defer os.Remove(filepath.Join(testDir, "_test_invalid_nil.json"))
defer os.Remove(filepath.Join(testDir, "_test_invalid_nil.yml"))
testFileSD(t, "valid", ".yml", true)
testFileSD(t, "valid", ".json", true)
testFileSD(t, "invalid_nil", ".json", false)
testFileSD(t, "invalid_nil", ".yml", false)
type testRunner struct {
*testing.T
dir string
ch chan []*targetgroup.Group
done, stopped chan struct{}
cancelSD context.CancelFunc
mtx sync.Mutex
tgs map[string]*targetgroup.Group
receivedAt time.Time
}
func testFileSD(t *testing.T, prefix, ext string, expect bool) {
// As interval refreshing is more of a fallback, we only want to test
// whether file watches work as expected.
var conf SDConfig
conf.Files = []string{filepath.Join(testDir, "_*"+ext)}
conf.RefreshInterval = model.Duration(1 * time.Hour)
func newTestRunner(t *testing.T) *testRunner {
t.Helper()
var (
fsd = NewDiscovery(&conf, nil)
ch = make(chan []*targetgroup.Group)
ctx, cancel = context.WithCancel(context.Background())
)
go fsd.Run(ctx, ch)
tmpDir, err := ioutil.TempDir("", "prometheus-file-sd")
testutil.Ok(t, err)
select {
case <-time.After(25 * time.Millisecond):
// Expected.
case tgs := <-ch:
t.Fatalf("Unexpected target groups in file discovery: %s", tgs)
return &testRunner{
T: t,
dir: tmpDir,
ch: make(chan []*targetgroup.Group),
done: make(chan struct{}),
stopped: make(chan struct{}),
tgs: make(map[string]*targetgroup.Group),
}
}
// To avoid empty group struct sent from the discovery caused by invalid fsnotify updates,
// drain the channel until we are ready with the test files.
fileReady := make(chan struct{})
drainReady := make(chan struct{})
// copyFile atomically copies a file to the runner's directory.
func (t *testRunner) copyFile(src string) string {
t.Helper()
return t.copyFileTo(src, filepath.Base(src))
}
// copyFileTo atomically copies a file with a different name to the runner's directory.
func (t *testRunner) copyFileTo(src string, name string) string {
t.Helper()
newf, err := ioutil.TempFile(t.dir, "")
testutil.Ok(t, err)
f, err := os.Open(src)
testutil.Ok(t, err)
_, err = io.Copy(newf, f)
testutil.Ok(t, err)
testutil.Ok(t, f.Close())
dst := filepath.Join(t.dir, name)
err = os.Rename(newf.Name(), dst)
testutil.Ok(t, err)
return dst
}
// writeString writes atomically a string to a file.
func (t *testRunner) writeString(file string, data string) {
t.Helper()
newf, err := ioutil.TempFile(t.dir, "")
testutil.Ok(t, err)
_, err = newf.WriteString(data)
testutil.Ok(t, err)
testutil.Ok(t, newf.Close())
err = os.Rename(newf.Name(), file)
testutil.Ok(t, err)
}
// appendString appends a string to a file.
func (t *testRunner) appendString(file, data string) {
t.Helper()
f, err := os.OpenFile(file, os.O_WRONLY|os.O_APPEND, 0)
testutil.Ok(t, err)
defer f.Close()
_, err = f.WriteString(data)
testutil.Ok(t, err)
}
// run starts the file SD and the loop receiving target groups updates.
func (t *testRunner) run(files ...string) {
go func() {
defer close(t.stopped)
for {
select {
case <-ch:
case <-fileReady:
close(drainReady)
case <-t.done:
os.RemoveAll(t.dir)
return
case tgs := <-t.ch:
t.mtx.Lock()
t.receivedAt = time.Now()
for _, tg := range tgs {
t.tgs[tg.Source] = tg
}
t.mtx.Unlock()
}
}
}()
newf, err := os.Create(filepath.Join(testDir, "_test_"+prefix+ext))
if err != nil {
t.Fatal(err)
for i := range files {
files[i] = filepath.Join(t.dir, files[i])
}
defer newf.Close()
ctx, cancel := context.WithCancel(context.Background())
t.cancelSD = cancel
go func() {
NewDiscovery(
&SDConfig{
Files: files,
// Setting a high refresh interval to make sure that the tests only
// rely on file watches.
RefreshInterval: model.Duration(1 * time.Hour),
},
nil,
).Run(ctx, t.ch)
}()
}
f, err := os.Open(filepath.Join(testDir, prefix+ext))
if err != nil {
t.Fatal(err)
func (t *testRunner) stop() {
t.cancelSD()
close(t.done)
<-t.stopped
}
func (t *testRunner) lastReceive() time.Time {
t.mtx.Lock()
defer t.mtx.Unlock()
return t.receivedAt
}
func (t *testRunner) targets() []*targetgroup.Group {
t.mtx.Lock()
defer t.mtx.Unlock()
var (
keys []string
tgs []*targetgroup.Group
)
for k := range t.tgs {
keys = append(keys, k)
}
defer f.Close()
_, err = io.Copy(newf, f)
if err != nil {
t.Fatal(err)
sort.Strings(keys)
for _, k := range keys {
tgs = append(tgs, t.tgs[k])
}
return tgs
}
// Test file is ready so stop draining the discovery channel.
// It contains two target groups.
close(fileReady)
<-drainReady
newf.WriteString(" ") // One last meaningless write to trigger fsnotify and a new loop of the discovery service.
func (t *testRunner) requireUpdate(ref time.Time, expected []*targetgroup.Group) {
t.Helper()
timeout := time.After(15 * time.Second)
retry:
for {
select {
case <-timeout:
if expect {
t.Fatalf("Expected new target group but got none")
} else {
// Invalid type fsd should always break down.
break retry
}
case tgs := <-ch:
if !expect {
t.Fatalf("Unexpected target groups %s, we expected a failure here.", tgs)
case <-time.After(defaultWait):
t.Fatalf("Expected update but got none")
return
case <-time.After(defaultWait / 10):
if ref.Equal(t.lastReceive()) {
// No update received.
break
}
if len(tgs) != 2 {
continue retry // Potentially a partial write, just retry.
// We can receive partial updates so only check the result when the
// expected number of groups is reached.
tgs := t.targets()
if len(tgs) != len(expected) {
t.Logf("skipping update: expected %d targets, got %d", len(expected), len(tgs))
break
}
tg := tgs[0]
if _, ok := tg.Labels["foo"]; !ok {
t.Fatalf("Label not parsed")
t.requireTargetGroups(expected, tgs)
if ref.After(time.Time{}) {
t.Logf("update received after %v", t.lastReceive().Sub(ref))
}
if tg.String() != filepath.Join(testDir, "_test_"+prefix+ext+":0") {
t.Fatalf("Unexpected target group %s", tg)
}
tg = tgs[1]
if tg.String() != filepath.Join(testDir, "_test_"+prefix+ext+":1") {
t.Fatalf("Unexpected target groups %s", tg)
}
break retry
return
}
}
// Based on unknown circumstances, sometimes fsnotify will trigger more events in
// some runs (which might be empty, chains of different operations etc.).
// We have to drain those (as the target manager would) to avoid deadlocking and must
// not try to make sense of it all...
drained := make(chan struct{})
go func() {
for {
select {
case tgs := <-ch:
// Below we will change the file to a bad syntax. Previously extracted target
// groups must not be deleted via sending an empty target group.
if len(tgs[0].Targets) == 0 {
t.Errorf("Unexpected empty target groups received: %s", tgs)
}
case <-time.After(500 * time.Millisecond):
close(drained)
return
}
}
}()
newf, err = os.Create(filepath.Join(testDir, "_test.new"))
if err != nil {
t.Fatal(err)
}
defer os.Remove(newf.Name())
if _, err := newf.Write([]byte("]gibberish\n][")); err != nil {
t.Fatal(err)
}
newf.Close()
os.Rename(newf.Name(), filepath.Join(testDir, "_test_"+prefix+ext))
cancel()
<-drained
}
func (t *testRunner) requireTargetGroups(expected, got []*targetgroup.Group) {
t.Helper()
b1, err := json.Marshal(expected)
if err != nil {
panic(err)
}
b2, err := json.Marshal(got)
if err != nil {
panic(err)
}
testutil.Equals(t, string(b1), string(b2))
}
// validTg() maps to fixtures/valid.{json,yml}.
func validTg(file string) []*targetgroup.Group {
return []*targetgroup.Group{
&targetgroup.Group{
Targets: []model.LabelSet{
{
model.AddressLabel: model.LabelValue("localhost:9090"),
},
{
model.AddressLabel: model.LabelValue("example.org:443"),
},
},
Labels: model.LabelSet{
model.LabelName("foo"): model.LabelValue("bar"),
fileSDFilepathLabel: model.LabelValue(file),
},
Source: fileSource(file, 0),
},
&targetgroup.Group{
Targets: []model.LabelSet{
{
model.AddressLabel: model.LabelValue("my.domain"),
},
},
Labels: model.LabelSet{
fileSDFilepathLabel: model.LabelValue(file),
},
Source: fileSource(file, 1),
},
}
}
// valid2Tg() maps to fixtures/valid2.{json,yml}.
func valid2Tg(file string) []*targetgroup.Group {
return []*targetgroup.Group{
&targetgroup.Group{
Targets: []model.LabelSet{
{
model.AddressLabel: model.LabelValue("my.domain"),
},
},
Labels: model.LabelSet{
fileSDFilepathLabel: model.LabelValue(file),
},
Source: fileSource(file, 0),
},
&targetgroup.Group{
Targets: []model.LabelSet{
{
model.AddressLabel: model.LabelValue("localhost:9090"),
},
},
Labels: model.LabelSet{
model.LabelName("foo"): model.LabelValue("bar"),
model.LabelName("fred"): model.LabelValue("baz"),
fileSDFilepathLabel: model.LabelValue(file),
},
Source: fileSource(file, 1),
},
&targetgroup.Group{
Targets: []model.LabelSet{
{
model.AddressLabel: model.LabelValue("example.org:443"),
},
},
Labels: model.LabelSet{
model.LabelName("scheme"): model.LabelValue("https"),
fileSDFilepathLabel: model.LabelValue(file),
},
Source: fileSource(file, 2),
},
}
}
func TestInitialUpdate(t *testing.T) {
for _, tc := range []string{
"fixtures/valid.yml",
"fixtures/valid.json",
} {
t.Run(tc, func(t *testing.T) {
t.Parallel()
runner := newTestRunner(t)
sdFile := runner.copyFile(tc)
runner.run("*" + filepath.Ext(tc))
defer runner.stop()
// Verify that we receive the initial target groups.
runner.requireUpdate(time.Time{}, validTg(sdFile))
})
}
}
func TestInvalidFile(t *testing.T) {
for _, tc := range []string{
"fixtures/invalid_nil.yml",
"fixtures/invalid_nil.json",
} {
tc := tc
t.Run(tc, func(t *testing.T) {
t.Parallel()
now := time.Now()
runner := newTestRunner(t)
runner.copyFile(tc)
runner.run("*" + filepath.Ext(tc))
defer runner.stop()
// Verify that we've received nothing.
time.Sleep(defaultWait)
if runner.lastReceive().After(now) {
t.Fatalf("unexpected targets received: %v", runner.targets())
}
})
}
}
func TestNoopFileUpdate(t *testing.T) {
t.Parallel()
runner := newTestRunner(t)
sdFile := runner.copyFile("fixtures/valid.yml")
runner.run("*.yml")
defer runner.stop()
// Verify that we receive the initial target groups.
runner.requireUpdate(time.Time{}, validTg(sdFile))
// Verify that we receive an update with the same target groups.
ref := runner.lastReceive()
runner.copyFileTo("fixtures/valid3.yml", "valid.yml")
runner.requireUpdate(ref, validTg(sdFile))
}
func TestFileUpdate(t *testing.T) {
t.Parallel()
runner := newTestRunner(t)
sdFile := runner.copyFile("fixtures/valid.yml")
runner.run("*.yml")
defer runner.stop()
// Verify that we receive the initial target groups.
runner.requireUpdate(time.Time{}, validTg(sdFile))
// Verify that we receive an update with the new target groups.
ref := runner.lastReceive()
runner.copyFileTo("fixtures/valid2.yml", "valid.yml")
runner.requireUpdate(ref, valid2Tg(sdFile))
}
func TestInvalidFileUpdate(t *testing.T) {
t.Parallel()
runner := newTestRunner(t)
sdFile := runner.copyFile("fixtures/valid.yml")
runner.run("*.yml")
defer runner.stop()
// Verify that we receive the initial target groups.
runner.requireUpdate(time.Time{}, validTg(sdFile))
ref := runner.lastReceive()
runner.writeString(sdFile, "]gibberish\n][")
// Verify that we receive nothing or the same targets as before.
time.Sleep(defaultWait)
if runner.lastReceive().After(ref) {
runner.requireTargetGroups(validTg(sdFile), runner.targets())
}
}
func TestUpdateFileWithPartialWrites(t *testing.T) {
t.Parallel()
runner := newTestRunner(t)
sdFile := runner.copyFile("fixtures/valid.yml")
runner.run("*.yml")
defer runner.stop()
// Verify that we receive the initial target groups.
runner.requireUpdate(time.Time{}, validTg(sdFile))
// Do a partial write operation.
ref := runner.lastReceive()
runner.writeString(sdFile, "- targets")
time.Sleep(defaultWait)
// Verify that we receive nothing or the same target groups as before.
if runner.lastReceive().After(ref) {
runner.requireTargetGroups(validTg(sdFile), runner.targets())
}
// Verify that we receive the update target groups once the file is a valid YAML payload.
ref = runner.lastReceive()
runner.appendString(sdFile, `: ["localhost:9091"]`)
runner.requireUpdate(ref,
[]*targetgroup.Group{
&targetgroup.Group{
Targets: []model.LabelSet{
{
model.AddressLabel: model.LabelValue("localhost:9091"),
},
},
Labels: model.LabelSet{
fileSDFilepathLabel: model.LabelValue(sdFile),
},
Source: fileSource(sdFile, 0),
},
&targetgroup.Group{
Source: fileSource(sdFile, 1),
},
},
)
}
func TestRemoveFile(t *testing.T) {
t.Parallel()
runner := newTestRunner(t)
sdFile := runner.copyFile("fixtures/valid.yml")
runner.run("*.yml")
defer runner.stop()
// Verify that we receive the initial target groups.
runner.requireUpdate(time.Time{}, validTg(sdFile))
// Verify that we receive the update about the target groups being removed.
ref := runner.lastReceive()
testutil.Ok(t, os.Remove(sdFile))
runner.requireUpdate(
ref,
[]*targetgroup.Group{
&targetgroup.Group{
Source: fileSource(sdFile, 0),
},
&targetgroup.Group{
Source: fileSource(sdFile, 1),
}},
)
}

View file

@ -0,0 +1,8 @@
- targets: ['my.domain']
- targets: ['localhost:9090']
labels:
foo: bar
fred: baz
- targets: ['example.org:443']
labels:
scheme: https

View file

@ -0,0 +1,7 @@
# the YAML structure is identical to valid.yml but the raw data is different.
- targets: ['localhost:9090', 'example.org:443']
labels:
foo: bar
- targets: ['my.domain']