Add tests for range-limited label matching

While doing so, improve getSeriesForRange.
This commit is contained in:
beorn7 2016-03-09 20:27:50 +01:00
parent 47e3c90f9b
commit 9445c7053d
2 changed files with 89 additions and 7 deletions

View file

@ -520,15 +520,12 @@ func (s *memorySeriesStorage) metricForFingerprint(
fp model.Fingerprint,
from, through model.Time,
) (metric.Metric, bool) {
// Lock FP so that no (un-)archiving will happen during lookup.
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark)))
series, ok := s.fpToSeries.get(fp)
if ok {
if series.lastTime.Before(from) || series.savedFirstTime.After(through) {
if series.lastTime.Before(from) || series.firstTime().After(through) {
return metric.Metric{}, false
}
// Wrap the returned metric in a copy-on-write (COW) metric here because
@ -539,13 +536,15 @@ func (s *memorySeriesStorage) metricForFingerprint(
}
// From here on, we are only concerned with archived metrics.
// If the high watermark of archived series is before 'from', we are done.
watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark)))
if watermark < from {
return metric.Metric{}, false
}
if from.After(model.Earliest) || through.Before(model.Latest) {
// The range lookup is relatively cheap, so let's do it first.
ok, first, last := s.persistence.hasArchivedMetric(fp)
if !ok || first.After(through) || last.Before(from) {
// The range lookup is relatively cheap, so let's do it first if
// we have a chance the archived metric is not in the range.
has, first, last := s.persistence.hasArchivedMetric(fp)
if !has || first.After(through) || last.Before(from) {
return metric.Metric{}, false
}
}
@ -725,14 +724,25 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me
}
// getSeriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant.
//
// The caller must have locked the fp.
func (s *memorySeriesStorage) getSeriesForRange(
fp model.Fingerprint,
from model.Time, through model.Time,
) *memorySeries {
series, ok := s.fpToSeries.get(fp)
if ok {
if series.lastTime.Before(from) || series.firstTime().After(through) {
return nil
}
return series
}
watermark := model.Time(atomic.LoadInt64((*int64)(&s.archiveHighWatermark)))
if watermark < from {
return nil
}
has, first, last := s.persistence.hasArchivedMetric(fp)
if !has {
s.invalidPreloadRequestsCount.Inc()

View file

@ -34,6 +34,7 @@ func TestMatches(t *testing.T) {
storage, closer := NewTestStorage(t, 1)
defer closer.Close()
storage.archiveHighWatermark = 90
samples := make([]*model.Sample, 100)
fingerprints := make(model.Fingerprints, 100)
@ -56,6 +57,20 @@ func TestMatches(t *testing.T) {
}
storage.WaitForIndexing()
// Archive every tenth metric.
for i, fp := range fingerprints {
if i%10 != 0 {
continue
}
s, ok := storage.fpToSeries.get(fp)
if !ok {
t.Fatal("could not retrieve series for fp", fp)
}
storage.fpLocker.Lock(fp)
storage.persistence.archiveMetric(fp, s.metric, s.firstTime(), s.lastTime)
storage.fpLocker.Unlock(fp)
}
newMatcher := func(matchType metric.MatchType, name model.LabelName, value model.LabelValue) *metric.LabelMatcher {
lm, err := metric.NewLabelMatcher(matchType, name, value)
if err != nil {
@ -197,6 +212,56 @@ func TestMatches(t *testing.T) {
t.Errorf("expected fingerprint %s for %q not in result", fp1, mt.matchers)
}
}
// Smoketest for from/through.
if len(storage.MetricsForLabelMatchers(
model.Earliest, -10000,
mt.matchers...,
)) > 0 {
t.Error("expected no matches with 'through' older than any sample")
}
if len(storage.MetricsForLabelMatchers(
10000, model.Latest,
mt.matchers...,
)) > 0 {
t.Error("expected no matches with 'from' newer than any sample")
}
// Now the tricky one, cut out something from the middle.
var (
from model.Time = 25
through model.Time = 75
)
res = storage.MetricsForLabelMatchers(
from, through,
mt.matchers...,
)
expected := model.Fingerprints{}
for _, fp := range mt.expected {
i := 0
for ; fingerprints[i] != fp && i < len(fingerprints); i++ {
}
if i == len(fingerprints) {
t.Fatal("expected fingerprint does not exist")
}
if !model.Time(i).Before(from) && !model.Time(i).After(through) {
expected = append(expected, fp)
}
}
if len(expected) != len(res) {
t.Errorf("expected %d range-limited matches for %q, found %d", len(expected), mt.matchers, len(res))
}
for fp1 := range res {
found := false
for _, fp2 := range expected {
if fp1 == fp2 {
found = true
break
}
}
if !found {
t.Errorf("expected fingerprint %s for %q not in range-limited result", fp1, mt.matchers)
}
}
}
}
@ -1195,6 +1260,9 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
t.Fatal("archived")
}
// Set archiveHighWatermark to a low value so that we can see it increase.
s.archiveHighWatermark = 42
// This will archive again, but must not drop it completely, despite the
// memorySeries being empty.
s.maintainMemorySeries(fp, 10000)
@ -1202,6 +1270,10 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
if !archived {
t.Fatal("series purged completely")
}
// archiveHighWatermark must have been set by maintainMemorySeries.
if want, got := model.Time(19998), s.archiveHighWatermark; want != got {
t.Errorf("want archiveHighWatermark %v, got %v", want, got)
}
}
func TestEvictAndPurgeSeriesChunkType0(t *testing.T) {