mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 05:47:27 -08:00
A copy is being assigned newChunk not the original
Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
This commit is contained in:
parent
a00d700d48
commit
1627a47640
32
compact.go
32
compact.go
|
@ -525,22 +525,24 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
||||||
|
|
||||||
if len(dranges) > 0 {
|
if len(dranges) > 0 {
|
||||||
// Re-encode the chunk to not have deleted values.
|
// Re-encode the chunk to not have deleted values.
|
||||||
for _, chk := range chks {
|
for i, chk := range chks {
|
||||||
if intervalOverlap(dranges[0].Mint, dranges[len(dranges)-1].Maxt, chk.MinTime, chk.MaxTime) {
|
if !intervalOverlap(dranges[0].Mint, dranges[len(dranges)-1].Maxt, chk.MinTime, chk.MaxTime) {
|
||||||
newChunk := chunks.NewXORChunk()
|
continue
|
||||||
app, err := newChunk.Appender()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges}
|
|
||||||
for it.Next() {
|
|
||||||
ts, v := it.At()
|
|
||||||
app.Append(ts, v)
|
|
||||||
}
|
|
||||||
|
|
||||||
chk.Chunk = newChunk
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
newChunk := chunks.NewXORChunk()
|
||||||
|
app, err := newChunk.Appender()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges}
|
||||||
|
for it.Next() {
|
||||||
|
ts, v := it.At()
|
||||||
|
app.Append(ts, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
chks[i].Chunk = newChunk
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := chunkw.WriteChunks(chks...); err != nil {
|
if err := chunkw.WriteChunks(chks...); err != nil {
|
||||||
|
|
2
db.go
2
db.go
|
@ -612,7 +612,7 @@ func (db *DB) Snapshot(dir string) error {
|
||||||
level.Info(db.logger).Log("msg", "snapshotting block", "block", b)
|
level.Info(db.logger).Log("msg", "snapshotting block", "block", b)
|
||||||
|
|
||||||
if err := b.Snapshot(dir); err != nil {
|
if err := b.Snapshot(dir); err != nil {
|
||||||
return errors.Wrap(err, "error snapshotting headblock")
|
return errors.Wrapf(err, "error snapshotting block: %s", b.Dir())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime())
|
return db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime())
|
||||||
|
|
84
db_test.go
84
db_test.go
|
@ -366,6 +366,90 @@ func TestDB_Snapshot(t *testing.T) {
|
||||||
require.Equal(t, sum, 1000.0)
|
require.Equal(t, sum, 1000.0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDB_SnapshotWithDelete(t *testing.T) {
|
||||||
|
numSamples := int64(10)
|
||||||
|
|
||||||
|
db, close := openTestDB(t, nil)
|
||||||
|
defer close()
|
||||||
|
|
||||||
|
app := db.Appender()
|
||||||
|
|
||||||
|
smpls := make([]float64, numSamples)
|
||||||
|
for i := int64(0); i < numSamples; i++ {
|
||||||
|
smpls[i] = rand.Float64()
|
||||||
|
app.Add(labels.Labels{{"a", "b"}}, i, smpls[i])
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, app.Commit())
|
||||||
|
cases := []struct {
|
||||||
|
intervals Intervals
|
||||||
|
remaint []int64
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
intervals: Intervals{{1, 3}, {4, 7}},
|
||||||
|
remaint: []int64{0, 8, 9},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
Outer:
|
||||||
|
for _, c := range cases {
|
||||||
|
// TODO(gouthamve): Reset the tombstones somehow.
|
||||||
|
// Delete the ranges.
|
||||||
|
for _, r := range c.intervals {
|
||||||
|
require.NoError(t, db.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher("a", "b")))
|
||||||
|
}
|
||||||
|
|
||||||
|
// create snapshot
|
||||||
|
snap, err := ioutil.TempDir("", "snap")
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NoError(t, db.Snapshot(snap))
|
||||||
|
require.NoError(t, db.Close())
|
||||||
|
|
||||||
|
// reopen DB from snapshot
|
||||||
|
db, err = Open(snap, nil, nil, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Compare the result.
|
||||||
|
q, err := db.Querier(0, numSamples)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
res := q.Select(labels.NewEqualMatcher("a", "b"))
|
||||||
|
|
||||||
|
expSamples := make([]sample, 0, len(c.remaint))
|
||||||
|
for _, ts := range c.remaint {
|
||||||
|
expSamples = append(expSamples, sample{ts, smpls[ts]})
|
||||||
|
}
|
||||||
|
|
||||||
|
expss := newListSeriesSet([]Series{
|
||||||
|
newSeries(map[string]string{"a": "b"}, expSamples),
|
||||||
|
})
|
||||||
|
|
||||||
|
if len(expSamples) == 0 {
|
||||||
|
require.False(t, res.Next())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
eok, rok := expss.Next(), res.Next()
|
||||||
|
require.Equal(t, eok, rok, "next")
|
||||||
|
|
||||||
|
if !eok {
|
||||||
|
continue Outer
|
||||||
|
}
|
||||||
|
sexp := expss.At()
|
||||||
|
sres := res.At()
|
||||||
|
|
||||||
|
require.Equal(t, sexp.Labels(), sres.Labels(), "labels")
|
||||||
|
|
||||||
|
smplExp, errExp := expandSeriesIterator(sexp.Iterator())
|
||||||
|
smplRes, errRes := expandSeriesIterator(sres.Iterator())
|
||||||
|
|
||||||
|
require.Equal(t, errExp, errRes, "samples error")
|
||||||
|
require.Equal(t, smplExp, smplRes, "samples")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestDB_e2e(t *testing.T) {
|
func TestDB_e2e(t *testing.T) {
|
||||||
const (
|
const (
|
||||||
numDatapoints = 1000
|
numDatapoints = 1000
|
||||||
|
|
Loading…
Reference in a new issue