Merge branch 'master' of github.com:prometheus/prometheus

This commit is contained in:
Fabian Reinartz 2015-09-11 21:11:43 +02:00
commit bae9816834
6 changed files with 66 additions and 26 deletions

View file

@ -206,7 +206,6 @@ func reloadConfig(filename string, rls ...Reloadable) (success bool) {
conf, err := config.LoadFile(filename) conf, err := config.LoadFile(filename)
if err != nil { if err != nil {
log.Errorf("Couldn't load configuration (-config.file=%s): %v", filename, err) log.Errorf("Couldn't load configuration (-config.file=%s): %v", filename, err)
log.Errorf("Note: The configuration format has changed with version 0.14. Please see the documentation (http://prometheus.io/docs/operating/configuration/) and the provided configuration migration tool (https://github.com/prometheus/migrate).")
return false return false
} }
success = true success = true

View file

@ -115,6 +115,7 @@ func (kd *Discovery) Sources() []string {
log.Errorf("Unable to list Kubernetes nodes: %s", err) log.Errorf("Unable to list Kubernetes nodes: %s", err)
return []string{} return []string{}
} }
defer res.Body.Close()
if res.StatusCode != http.StatusOK { if res.StatusCode != http.StatusOK {
log.Errorf("Unable to list Kubernetes nodes. Unexpected response: %d %s", res.StatusCode, res.Status) log.Errorf("Unable to list Kubernetes nodes. Unexpected response: %d %s", res.StatusCode, res.Status)
return []string{} return []string{}
@ -143,6 +144,7 @@ func (kd *Discovery) Sources() []string {
log.Errorf("Unable to list Kubernetes services: %s", err) log.Errorf("Unable to list Kubernetes services: %s", err)
return []string{} return []string{}
} }
defer res.Body.Close()
if res.StatusCode != http.StatusOK { if res.StatusCode != http.StatusOK {
log.Errorf("Unable to list Kubernetes services. Unexpected response: %d %s", res.StatusCode, res.Status) log.Errorf("Unable to list Kubernetes services. Unexpected response: %d %s", res.StatusCode, res.Status)
return []string{} return []string{}
@ -351,6 +353,7 @@ func (kd *Discovery) watchNodes(events chan interface{}, done <-chan struct{}, r
log.Errorf("Failed to watch nodes: %s", err) log.Errorf("Failed to watch nodes: %s", err)
return return
} }
defer res.Body.Close()
if res.StatusCode != http.StatusOK { if res.StatusCode != http.StatusOK {
log.Errorf("Failed to watch nodes: %d", res.StatusCode) log.Errorf("Failed to watch nodes: %d", res.StatusCode)
return return
@ -392,6 +395,7 @@ func (kd *Discovery) watchServices(events chan interface{}, done <-chan struct{}
log.Errorf("Failed to watch services: %s", err) log.Errorf("Failed to watch services: %s", err)
return return
} }
defer res.Body.Close()
if res.StatusCode != http.StatusOK { if res.StatusCode != http.StatusOK {
log.Errorf("Failed to watch services: %d", res.StatusCode) log.Errorf("Failed to watch services: %d", res.StatusCode)
return return
@ -462,6 +466,7 @@ func (kd *Discovery) addService(service *Service) *config.TargetGroup {
log.Errorf("Error getting service endpoints: %s", err) log.Errorf("Error getting service endpoints: %s", err)
return nil return nil
} }
defer res.Body.Close()
if res.StatusCode != http.StatusOK { if res.StatusCode != http.StatusOK {
log.Errorf("Failed to get service endpoints: %d", res.StatusCode) log.Errorf("Failed to get service endpoints: %d", res.StatusCode)
return nil return nil
@ -534,6 +539,7 @@ func (kd *Discovery) watchServiceEndpoints(events chan interface{}, done <-chan
log.Errorf("Failed to watch service endpoints: %s", err) log.Errorf("Failed to watch service endpoints: %s", err)
return return
} }
defer res.Body.Close()
if res.StatusCode != http.StatusOK { if res.StatusCode != http.StatusOK {
log.Errorf("Failed to watch service endpoints: %d", res.StatusCode) log.Errorf("Failed to watch service endpoints: %d", res.StatusCode)
return return

View file

@ -437,6 +437,8 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) {
if err != nil { if err != nil {
return err return err
} }
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
return fmt.Errorf("server returned HTTP status %s", resp.Status) return fmt.Errorf("server returned HTTP status %s", resp.Status)
} }
@ -445,7 +447,6 @@ func (t *Target) scrape(appender storage.SampleAppender) (err error) {
if err != nil { if err != nil {
return err return err
} }
defer resp.Body.Close()
sdec := expfmt.SampleDecoder{ sdec := expfmt.SampleDecoder{
Dec: dec, Dec: dec,

View file

@ -1059,7 +1059,7 @@ func (p *persistence) indexMetric(fp model.Fingerprint, m model.Metric) {
// indexes used for fingerprintsForLabelPair, labelValuesForLabelName, and // indexes used for fingerprintsForLabelPair, labelValuesForLabelName, and
// fingerprintsModifiedBefore. The index of fingerprints to archived metrics is // fingerprintsModifiedBefore. The index of fingerprints to archived metrics is
// not affected by this removal. (In fact, never call this method for an // not affected by this removal. (In fact, never call this method for an
// archived metric. To purge an archived metric, call purgeArchivedFingerprint.) // archived metric. To purge an archived metric, call purgeArchivedMetric.)
// If the queue is full, this method blocks until the metric can be queued. This // If the queue is full, this method blocks until the metric can be queued. This
// method is goroutine-safe. // method is goroutine-safe.
func (p *persistence) unindexMetric(fp model.Fingerprint, m model.Metric) { func (p *persistence) unindexMetric(fp model.Fingerprint, m model.Metric) {

View file

@ -518,12 +518,13 @@ func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprin
s.fpToSeries.del(fp) s.fpToSeries.del(fp)
s.numSeries.Dec() s.numSeries.Dec()
s.persistence.unindexMetric(fp, series.metric) s.persistence.unindexMetric(fp, series.metric)
if _, err := s.persistence.deleteSeriesFile(fp); err != nil {
log.Errorf("Error deleting series file for %v: %v", fp, err)
}
} else if err := s.persistence.purgeArchivedMetric(fp); err != nil { } else if err := s.persistence.purgeArchivedMetric(fp); err != nil {
log.Errorf("Error purging metric with fingerprint %v: %v", fp, err) log.Errorf("Error purging metric with fingerprint %v: %v", fp, err)
} }
// Attempt to delete series file in any case.
if _, err := s.persistence.deleteSeriesFile(fp); err != nil {
log.Errorf("Error deleting series file for %v: %v", fp, err)
}
s.fpLocker.Unlock(fp) s.fpLocker.Unlock(fp)
} }

View file

@ -17,6 +17,7 @@ import (
"fmt" "fmt"
"hash/fnv" "hash/fnv"
"math/rand" "math/rand"
"os"
"reflect" "reflect"
"testing" "testing"
"testing/quick" "testing/quick"
@ -438,16 +439,29 @@ func TestDropMetrics(t *testing.T) {
s, closer := NewTestStorage(t, 1) s, closer := NewTestStorage(t, 1)
defer closer.Close() defer closer.Close()
chunkFileExists := func(fp model.Fingerprint) (bool, error) {
f, err := s.persistence.openChunkFileForReading(fp)
if err == nil {
f.Close()
return true, nil
}
if os.IsNotExist(err) {
return false, nil
}
return false, err
}
m1 := model.Metric{model.MetricNameLabel: "test", "n1": "v1"} m1 := model.Metric{model.MetricNameLabel: "test", "n1": "v1"}
m2 := model.Metric{model.MetricNameLabel: "test", "n1": "v2"} m2 := model.Metric{model.MetricNameLabel: "test", "n1": "v2"}
m3 := model.Metric{model.MetricNameLabel: "test", "n1": "v3"}
N := 120000 N := 120000
for j, m := range []model.Metric{m1, m2} { for j, m := range []model.Metric{m1, m2, m3} {
for i := 0; i < N; i++ { for i := 0; i < N; i++ {
smpl := &model.Sample{ smpl := &model.Sample{
Metric: m, Metric: m,
Timestamp: insertStart.Add(time.Duration(i) * time.Millisecond), // 1 minute intervals. Timestamp: insertStart.Add(time.Duration(i) * time.Millisecond), // 1 millisecond intervals.
Value: model.SampleValue(j), Value: model.SampleValue(j),
} }
s.Append(smpl) s.Append(smpl)
@ -455,19 +469,24 @@ func TestDropMetrics(t *testing.T) {
} }
s.WaitForIndexing() s.WaitForIndexing()
// Archive m3, but first maintain it so that at least something is written to disk.
fpToBeArchived := m3.FastFingerprint()
s.maintainMemorySeries(fpToBeArchived, 0)
s.fpLocker.Lock(fpToBeArchived)
s.fpToSeries.del(fpToBeArchived)
if err := s.persistence.archiveMetric(
fpToBeArchived, m3, 0, insertStart.Add(time.Duration(N-1)*time.Millisecond),
); err != nil {
t.Error(err)
}
s.fpLocker.Unlock(fpToBeArchived)
fps := s.fingerprintsForLabelPairs(model.LabelPair{Name: model.MetricNameLabel, Value: "test"}) fps := s.fingerprintsForLabelPairs(model.LabelPair{Name: model.MetricNameLabel, Value: "test"})
if len(fps) != 2 { if len(fps) != 3 {
t.Fatalf("unexpected number of fingerprints: %d", len(fps)) t.Errorf("unexpected number of fingerprints: %d", len(fps))
} }
var fpList model.Fingerprints fpList := model.Fingerprints{m1.FastFingerprint(), m2.FastFingerprint(), fpToBeArchived}
for fp := range fps {
it := s.NewIterator(fp)
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N {
t.Fatalf("unexpected number of samples: %d", len(vals))
}
fpList = append(fpList, fp)
}
s.DropMetricsForFingerprints(fpList[0]) s.DropMetricsForFingerprints(fpList[0])
s.WaitForIndexing() s.WaitForIndexing()
@ -475,17 +494,24 @@ func TestDropMetrics(t *testing.T) {
fps2 := s.fingerprintsForLabelPairs(model.LabelPair{ fps2 := s.fingerprintsForLabelPairs(model.LabelPair{
Name: model.MetricNameLabel, Value: "test", Name: model.MetricNameLabel, Value: "test",
}) })
if len(fps2) != 1 { if len(fps2) != 2 {
t.Fatalf("unexpected number of fingerprints: %d", len(fps2)) t.Errorf("unexpected number of fingerprints: %d", len(fps2))
} }
it := s.NewIterator(fpList[0]) it := s.NewIterator(fpList[0])
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
t.Fatalf("unexpected number of samples: %d", len(vals)) t.Errorf("unexpected number of samples: %d", len(vals))
} }
it = s.NewIterator(fpList[1]) it = s.NewIterator(fpList[1])
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N { if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != N {
t.Fatalf("unexpected number of samples: %d", len(vals)) t.Errorf("unexpected number of samples: %d", len(vals))
}
exists, err := chunkFileExists(fpList[2])
if err != nil {
t.Fatal(err)
}
if !exists {
t.Errorf("chunk file does not exist for fp=%v", fpList[2])
} }
s.DropMetricsForFingerprints(fpList...) s.DropMetricsForFingerprints(fpList...)
@ -495,16 +521,23 @@ func TestDropMetrics(t *testing.T) {
Name: model.MetricNameLabel, Value: "test", Name: model.MetricNameLabel, Value: "test",
}) })
if len(fps3) != 0 { if len(fps3) != 0 {
t.Fatalf("unexpected number of fingerprints: %d", len(fps3)) t.Errorf("unexpected number of fingerprints: %d", len(fps3))
} }
it = s.NewIterator(fpList[0]) it = s.NewIterator(fpList[0])
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
t.Fatalf("unexpected number of samples: %d", len(vals)) t.Errorf("unexpected number of samples: %d", len(vals))
} }
it = s.NewIterator(fpList[1]) it = s.NewIterator(fpList[1])
if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 { if vals := it.RangeValues(metric.Interval{OldestInclusive: insertStart, NewestInclusive: now}); len(vals) != 0 {
t.Fatalf("unexpected number of samples: %d", len(vals)) t.Errorf("unexpected number of samples: %d", len(vals))
}
exists, err = chunkFileExists(fpList[2])
if err != nil {
t.Fatal(err)
}
if exists {
t.Errorf("chunk file still exists for fp=%v", fpList[2])
} }
} }
@ -533,7 +566,7 @@ func TestLoop(t *testing.T) {
} }
storage := NewMemorySeriesStorage(o) storage := NewMemorySeriesStorage(o)
if err := storage.Start(); err != nil { if err := storage.Start(); err != nil {
t.Fatalf("Error starting storage: %s", err) t.Errorf("Error starting storage: %s", err)
} }
for _, s := range samples { for _, s := range samples {
storage.Append(s) storage.Append(s)