From 4592b77035ce33bd08a290586139f2ba102850f6 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Fri, 11 Jan 2019 22:04:09 +0530 Subject: [PATCH] Patch by Krasi (https://github.com/codesome/tsdb/pull/3) Signed-off-by: Ganesh Vernekar --- head_test.go | 116 ++++++++++++++++++++++++--------------------------- 1 file changed, 54 insertions(+), 62 deletions(-) diff --git a/head_test.go b/head_test.go index cfc307f25..2f57cebbf 100644 --- a/head_test.go +++ b/head_test.go @@ -296,37 +296,40 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { } func TestHeadDeleteSimple(t *testing.T) { - numSamples := int64(10) + + buildSmpls := func(s []int64) []sample { + ss := make([]sample, 0, len(s)) + for _, t := range s { + ss = append(ss, sample{t: t, v: float64(t)}) + } + return ss + } + smplsAll := buildSmpls([]int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) + lblDefault := labels.Label{"a", "b"} cases := []struct { - dranges Intervals - remaint []int64 - remainSampbuf []int64 // Sample buffer that should remain after deletion. + dranges Intervals + smplsExp []sample }{ { - dranges: Intervals{{0, 3}}, - remaint: []int64{4, 5, 6, 7, 8, 9}, - remainSampbuf: []int64{6, 7, 8, 9}, + dranges: Intervals{{0, 3}}, + smplsExp: buildSmpls([]int64{4, 5, 6, 7, 8, 9}), }, { - dranges: Intervals{{1, 3}}, - remaint: []int64{0, 4, 5, 6, 7, 8, 9}, - remainSampbuf: []int64{6, 7, 8, 9}, + dranges: Intervals{{1, 3}}, + smplsExp: buildSmpls([]int64{0, 4, 5, 6, 7, 8, 9}), }, { - dranges: Intervals{{1, 3}, {4, 7}}, - remaint: []int64{0, 8, 9}, - remainSampbuf: []int64{0, 8, 9}, + dranges: Intervals{{1, 3}, {4, 7}}, + smplsExp: buildSmpls([]int64{0, 8, 9}), }, { - dranges: Intervals{{1, 3}, {4, 700}}, - remaint: []int64{0}, - remainSampbuf: []int64{0}, + dranges: Intervals{{1, 3}, {4, 700}}, + smplsExp: buildSmpls([]int64{0}), }, { // This case is to ensure that labels and symbols are deleted. - dranges: Intervals{{0, 9}}, - remaint: []int64{}, - remainSampbuf: []int64{}, + dranges: Intervals{{0, 9}}, + smplsExp: buildSmpls([]int64{}), }, } @@ -339,53 +342,40 @@ Outer: w, err := wal.New(nil, nil, path.Join(dir, "wal")) testutil.Ok(t, err) - // Samples are deleted from head after calling head.Delete() - // and not just creating tombstones. - // Hence creating new Head for every case. head, err := NewHead(nil, nil, w, 1000) testutil.Ok(t, err) app := head.Appender() - smpls := make([]float64, numSamples) - for i := int64(0); i < numSamples; i++ { - smpls[i] = rand.Float64() - _, err = app.Add(labels.Labels{{"a", "b"}}, i, smpls[i]) + for _, smpl := range smplsAll { + _, err = app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v) testutil.Ok(t, err) + } testutil.Ok(t, app.Commit()) // Delete the ranges. for _, r := range c.dranges { - testutil.Ok(t, head.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher("a", "b"))) - } - - reloadedW, err := wal.New(nil, nil, w.Dir()) - testutil.Ok(t, err) - reloadedHead, err := NewHead(nil, nil, reloadedW, 1000) - // Test the head reloaded from the WAL to ensure deleted samples - // are gone even after reloading the wal file. - testutil.Ok(t, err) - testutil.Ok(t, reloadedHead.Init(0)) - - expSamples := make([]sample, 0, len(c.remaint)) - for _, ts := range c.remaint { - expSamples = append(expSamples, sample{ts, smpls[ts]}) + testutil.Ok(t, head.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value))) } // Compare the samples for both heads - before and after the reload. + reloadedW, err := wal.New(nil, nil, w.Dir()) // Use a new wal to ensure deleted samples are gone even after a reload. + testutil.Ok(t, err) + reloadedHead, err := NewHead(nil, nil, reloadedW, 1000) + testutil.Ok(t, err) + testutil.Ok(t, reloadedHead.Init(0)) for _, h := range []*Head{head, reloadedHead} { indexr, err := h.Index() testutil.Ok(t, err) - // We use emptyTombstoneReader explicitly to get all the samples. - css, err := LookupChunkSeries(indexr, emptyTombstoneReader, labels.NewEqualMatcher("a", "b")) + // Use an emptyTombstoneReader explicitly to get all the samples. + css, err := LookupChunkSeries(indexr, emptyTombstoneReader, labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value)) testutil.Ok(t, err) // Getting the actual samples. - actSamples := make([]sample, 0, len(c.remaint)) - if len(expSamples) > 0 { - testutil.Assert(t, css.Next() == true, "") - lbls, chkMetas, intv := css.At() - testutil.Equals(t, labels.Labels{{"a", "b"}}, lbls) + actSamples := make([]sample, 0) + for css.Next() { + lblsAct, chkMetas, intv := css.At() + testutil.Equals(t, labels.Labels{lblDefault}, lblsAct) testutil.Equals(t, 0, len(intv)) chunkr, err := h.Chunks() @@ -401,31 +391,33 @@ Outer: } } - testutil.Assert(t, css.Next() == false, "") testutil.Ok(t, css.Err()) - testutil.Equals(t, expSamples, actSamples) + testutil.Equals(t, c.smplsExp, actSamples) } - expSamplesTemp := make([]Sample, 0, len(c.remaint)) - for _, ts := range c.remaint { - expSamplesTemp = append(expSamplesTemp, sample{ts, smpls[ts]}) - } - expSeriesSet := newMockSeriesSet([]Series{ - newSeries(map[string]string{"a": "b"}, expSamplesTemp), - }) - // Compare the query results for both heads - before and after the reload. + expSeriesSet := newMockSeriesSet([]Series{ + newSeries(map[string]string{lblDefault.Name: lblDefault.Value}, func() []Sample { + ss := make([]Sample, 0, len(c.smplsExp)) + for _, s := range c.smplsExp { + ss = append(ss, s) + } + return ss + }(), + ), + }) for _, h := range []*Head{head, reloadedHead} { q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime()) testutil.Ok(t, err) - actSeriesSet, err := q.Select(labels.NewEqualMatcher("a", "b")) + actSeriesSet, err := q.Select(labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value)) testutil.Ok(t, err) lns, err := q.LabelNames() testutil.Ok(t, err) - lvs, err := q.LabelValues("a") + lvs, err := q.LabelValues(lblDefault.Name) testutil.Ok(t, err) - if len(expSamples) == 0 { + // When all samples are deleted we expect that no labels should exist either. + if len(c.smplsExp) == 0 { testutil.Equals(t, 0, len(lns)) testutil.Equals(t, 0, len(lvs)) testutil.Assert(t, actSeriesSet.Next() == false, "") @@ -434,8 +426,8 @@ Outer: } else { testutil.Equals(t, 1, len(lns)) testutil.Equals(t, 1, len(lvs)) - testutil.Equals(t, "a", lns[0]) - testutil.Equals(t, "b", lvs[0]) + testutil.Equals(t, lblDefault.Name, lns[0]) + testutil.Equals(t, lblDefault.Value, lvs[0]) } for {