Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
This commit is contained in:
Ganesh Vernekar 2019-01-11 22:04:09 +05:30
parent d7e505db34
commit 4592b77035
No known key found for this signature in database
GPG key ID: 0241A11211763456

View file

@ -296,37 +296,40 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
}
func TestHeadDeleteSimple(t *testing.T) {
numSamples := int64(10)
buildSmpls := func(s []int64) []sample {
ss := make([]sample, 0, len(s))
for _, t := range s {
ss = append(ss, sample{t: t, v: float64(t)})
}
return ss
}
smplsAll := buildSmpls([]int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
lblDefault := labels.Label{"a", "b"}
cases := []struct {
dranges Intervals
remaint []int64
remainSampbuf []int64 // Sample buffer that should remain after deletion.
dranges Intervals
smplsExp []sample
}{
{
dranges: Intervals{{0, 3}},
remaint: []int64{4, 5, 6, 7, 8, 9},
remainSampbuf: []int64{6, 7, 8, 9},
dranges: Intervals{{0, 3}},
smplsExp: buildSmpls([]int64{4, 5, 6, 7, 8, 9}),
},
{
dranges: Intervals{{1, 3}},
remaint: []int64{0, 4, 5, 6, 7, 8, 9},
remainSampbuf: []int64{6, 7, 8, 9},
dranges: Intervals{{1, 3}},
smplsExp: buildSmpls([]int64{0, 4, 5, 6, 7, 8, 9}),
},
{
dranges: Intervals{{1, 3}, {4, 7}},
remaint: []int64{0, 8, 9},
remainSampbuf: []int64{0, 8, 9},
dranges: Intervals{{1, 3}, {4, 7}},
smplsExp: buildSmpls([]int64{0, 8, 9}),
},
{
dranges: Intervals{{1, 3}, {4, 700}},
remaint: []int64{0},
remainSampbuf: []int64{0},
dranges: Intervals{{1, 3}, {4, 700}},
smplsExp: buildSmpls([]int64{0}),
},
{ // This case is to ensure that labels and symbols are deleted.
dranges: Intervals{{0, 9}},
remaint: []int64{},
remainSampbuf: []int64{},
dranges: Intervals{{0, 9}},
smplsExp: buildSmpls([]int64{}),
},
}
@ -339,53 +342,40 @@ Outer:
w, err := wal.New(nil, nil, path.Join(dir, "wal"))
testutil.Ok(t, err)
// Samples are deleted from head after calling head.Delete()
// and not just creating tombstones.
// Hence creating new Head for every case.
head, err := NewHead(nil, nil, w, 1000)
testutil.Ok(t, err)
app := head.Appender()
smpls := make([]float64, numSamples)
for i := int64(0); i < numSamples; i++ {
smpls[i] = rand.Float64()
_, err = app.Add(labels.Labels{{"a", "b"}}, i, smpls[i])
for _, smpl := range smplsAll {
_, err = app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v)
testutil.Ok(t, err)
}
testutil.Ok(t, app.Commit())
// Delete the ranges.
for _, r := range c.dranges {
testutil.Ok(t, head.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher("a", "b")))
}
reloadedW, err := wal.New(nil, nil, w.Dir())
testutil.Ok(t, err)
reloadedHead, err := NewHead(nil, nil, reloadedW, 1000)
// Test the head reloaded from the WAL to ensure deleted samples
// are gone even after reloading the wal file.
testutil.Ok(t, err)
testutil.Ok(t, reloadedHead.Init(0))
expSamples := make([]sample, 0, len(c.remaint))
for _, ts := range c.remaint {
expSamples = append(expSamples, sample{ts, smpls[ts]})
testutil.Ok(t, head.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value)))
}
// Compare the samples for both heads - before and after the reload.
reloadedW, err := wal.New(nil, nil, w.Dir()) // Use a new wal to ensure deleted samples are gone even after a reload.
testutil.Ok(t, err)
reloadedHead, err := NewHead(nil, nil, reloadedW, 1000)
testutil.Ok(t, err)
testutil.Ok(t, reloadedHead.Init(0))
for _, h := range []*Head{head, reloadedHead} {
indexr, err := h.Index()
testutil.Ok(t, err)
// We use emptyTombstoneReader explicitly to get all the samples.
css, err := LookupChunkSeries(indexr, emptyTombstoneReader, labels.NewEqualMatcher("a", "b"))
// Use an emptyTombstoneReader explicitly to get all the samples.
css, err := LookupChunkSeries(indexr, emptyTombstoneReader, labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value))
testutil.Ok(t, err)
// Getting the actual samples.
actSamples := make([]sample, 0, len(c.remaint))
if len(expSamples) > 0 {
testutil.Assert(t, css.Next() == true, "")
lbls, chkMetas, intv := css.At()
testutil.Equals(t, labels.Labels{{"a", "b"}}, lbls)
actSamples := make([]sample, 0)
for css.Next() {
lblsAct, chkMetas, intv := css.At()
testutil.Equals(t, labels.Labels{lblDefault}, lblsAct)
testutil.Equals(t, 0, len(intv))
chunkr, err := h.Chunks()
@ -401,31 +391,33 @@ Outer:
}
}
testutil.Assert(t, css.Next() == false, "")
testutil.Ok(t, css.Err())
testutil.Equals(t, expSamples, actSamples)
testutil.Equals(t, c.smplsExp, actSamples)
}
expSamplesTemp := make([]Sample, 0, len(c.remaint))
for _, ts := range c.remaint {
expSamplesTemp = append(expSamplesTemp, sample{ts, smpls[ts]})
}
expSeriesSet := newMockSeriesSet([]Series{
newSeries(map[string]string{"a": "b"}, expSamplesTemp),
})
// Compare the query results for both heads - before and after the reload.
expSeriesSet := newMockSeriesSet([]Series{
newSeries(map[string]string{lblDefault.Name: lblDefault.Value}, func() []Sample {
ss := make([]Sample, 0, len(c.smplsExp))
for _, s := range c.smplsExp {
ss = append(ss, s)
}
return ss
}(),
),
})
for _, h := range []*Head{head, reloadedHead} {
q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime())
testutil.Ok(t, err)
actSeriesSet, err := q.Select(labels.NewEqualMatcher("a", "b"))
actSeriesSet, err := q.Select(labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value))
testutil.Ok(t, err)
lns, err := q.LabelNames()
testutil.Ok(t, err)
lvs, err := q.LabelValues("a")
lvs, err := q.LabelValues(lblDefault.Name)
testutil.Ok(t, err)
if len(expSamples) == 0 {
// When all samples are deleted we expect that no labels should exist either.
if len(c.smplsExp) == 0 {
testutil.Equals(t, 0, len(lns))
testutil.Equals(t, 0, len(lvs))
testutil.Assert(t, actSeriesSet.Next() == false, "")
@ -434,8 +426,8 @@ Outer:
} else {
testutil.Equals(t, 1, len(lns))
testutil.Equals(t, 1, len(lvs))
testutil.Equals(t, "a", lns[0])
testutil.Equals(t, "b", lvs[0])
testutil.Equals(t, lblDefault.Name, lns[0])
testutil.Equals(t, lblDefault.Value, lvs[0])
}
for {