Add context argument to DB.Delete (#12834)

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
This commit is contained in:
Arve Knudsen 2023-09-13 15:43:06 +02:00 committed by GitHub
parent 4419399e4e
commit 6ef9ed0bc3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 45 additions and 34 deletions

View file

@ -1461,11 +1461,11 @@ func (s *readyStorage) CleanTombstones() error {
} }
// Delete implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces. // Delete implements the api_v1.TSDBAdminStats and api_v2.TSDBAdmin interfaces.
func (s *readyStorage) Delete(mint, maxt int64, ms ...*labels.Matcher) error { func (s *readyStorage) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error {
if x := s.get(); x != nil { if x := s.get(); x != nil {
switch db := x.(type) { switch db := x.(type) {
case *tsdb.DB: case *tsdb.DB:
return db.Delete(mint, maxt, ms...) return db.Delete(ctx, mint, maxt, ms...)
case *agent.DB: case *agent.DB:
return agent.ErrUnsupported return agent.ErrUnsupported
default: default:

View file

@ -15,6 +15,7 @@
package tsdb package tsdb
import ( import (
"context"
"encoding/json" "encoding/json"
"io" "io"
"os" "os"
@ -543,7 +544,7 @@ func (r blockChunkReader) Close() error {
} }
// Delete matching series between mint and maxt in the block. // Delete matching series between mint and maxt in the block.
func (pb *Block) Delete(mint, maxt int64, ms ...*labels.Matcher) error { func (pb *Block) Delete(_ context.Context, mint, maxt int64, ms ...*labels.Matcher) error {
pb.mtx.Lock() pb.mtx.Lock()
defer pb.mtx.Unlock() defer pb.mtx.Unlock()

View file

@ -304,7 +304,7 @@ func TestBlockSize(t *testing.T) {
// Delete some series and check the sizes again. // Delete some series and check the sizes again.
{ {
require.NoError(t, blockInit.Delete(1, 10, labels.MustNewMatcher(labels.MatchRegexp, "", ".*"))) require.NoError(t, blockInit.Delete(context.Background(), 1, 10, labels.MustNewMatcher(labels.MatchRegexp, "", ".*")))
expAfterDelete := blockInit.Size() expAfterDelete := blockInit.Size()
require.Greater(t, expAfterDelete, expSizeInit, "after a delete the block size should be bigger as the tombstone file should grow %v > %v", expAfterDelete, expSizeInit) require.Greater(t, expAfterDelete, expSizeInit, "after a delete the block size should be bigger as the tombstone file should grow %v > %v", expAfterDelete, expSizeInit)
actAfterDelete, err := fileutil.DirSize(blockDirInit) actAfterDelete, err := fileutil.DirSize(blockDirInit)

View file

@ -2006,7 +2006,7 @@ func rangeForTimestamp(t, width int64) (maxt int64) {
} }
// Delete implements deletion of metrics. It only has atomicity guarantees on a per-block basis. // Delete implements deletion of metrics. It only has atomicity guarantees on a per-block basis.
func (db *DB) Delete(mint, maxt int64, ms ...*labels.Matcher) error { func (db *DB) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error {
db.cmtx.Lock() db.cmtx.Lock()
defer db.cmtx.Unlock() defer db.cmtx.Unlock()
@ -2018,13 +2018,13 @@ func (db *DB) Delete(mint, maxt int64, ms ...*labels.Matcher) error {
for _, b := range db.blocks { for _, b := range db.blocks {
if b.OverlapsClosedInterval(mint, maxt) { if b.OverlapsClosedInterval(mint, maxt) {
g.Go(func(b *Block) func() error { g.Go(func(b *Block) func() error {
return func() error { return b.Delete(mint, maxt, ms...) } return func() error { return b.Delete(ctx, mint, maxt, ms...) }
}(b)) }(b))
} }
} }
if db.head.OverlapsClosedInterval(mint, maxt) { if db.head.OverlapsClosedInterval(mint, maxt) {
g.Go(func() error { g.Go(func() error {
return db.head.Delete(mint, maxt, ms...) return db.head.Delete(ctx, mint, maxt, ms...)
}) })
} }

View file

@ -397,7 +397,7 @@ func TestAppendEmptyLabelsIgnored(t *testing.T) {
} }
func TestDeleteSimple(t *testing.T) { func TestDeleteSimple(t *testing.T) {
numSamples := int64(10) const numSamples int64 = 10
cases := []struct { cases := []struct {
Intervals tombstones.Intervals Intervals tombstones.Intervals
@ -446,7 +446,7 @@ Outer:
// TODO(gouthamve): Reset the tombstones somehow. // TODO(gouthamve): Reset the tombstones somehow.
// Delete the ranges. // Delete the ranges.
for _, r := range c.Intervals { for _, r := range c.Intervals {
require.NoError(t, db.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) require.NoError(t, db.Delete(ctx, r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b")))
} }
// Compare the result. // Compare the result.
@ -733,7 +733,7 @@ func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) {
} }
func TestDB_SnapshotWithDelete(t *testing.T) { func TestDB_SnapshotWithDelete(t *testing.T) {
numSamples := int64(10) const numSamples int64 = 10
db := openTestDB(t, nil, nil) db := openTestDB(t, nil, nil)
defer func() { require.NoError(t, db.Close()) }() defer func() { require.NoError(t, db.Close()) }()
@ -763,7 +763,7 @@ Outer:
// TODO(gouthamve): Reset the tombstones somehow. // TODO(gouthamve): Reset the tombstones somehow.
// Delete the ranges. // Delete the ranges.
for _, r := range c.intervals { for _, r := range c.intervals {
require.NoError(t, db.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) require.NoError(t, db.Delete(ctx, r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b")))
} }
// create snapshot // create snapshot
@ -1169,7 +1169,7 @@ func testWALReplayRaceOnSamplesLoggedBeforeSeries(t *testing.T, numSamplesBefore
} }
func TestTombstoneClean(t *testing.T) { func TestTombstoneClean(t *testing.T) {
numSamples := int64(10) const numSamples int64 = 10
db := openTestDB(t, nil, nil) db := openTestDB(t, nil, nil)
@ -1207,7 +1207,7 @@ func TestTombstoneClean(t *testing.T) {
defer db.Close() defer db.Close()
for _, r := range c.intervals { for _, r := range c.intervals {
require.NoError(t, db.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) require.NoError(t, db.Delete(context.Background(), r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b")))
} }
// All of the setup for THIS line. // All of the setup for THIS line.
@ -1292,7 +1292,7 @@ func TestTombstoneCleanResultEmptyBlock(t *testing.T) {
// Create tombstones by deleting all samples. // Create tombstones by deleting all samples.
for _, r := range intervals { for _, r := range intervals {
require.NoError(t, db.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) require.NoError(t, db.Delete(ctx, r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, "a", "b")))
} }
require.NoError(t, db.CleanTombstones()) require.NoError(t, db.CleanTombstones())
@ -2068,7 +2068,7 @@ func TestNoEmptyBlocks(t *testing.T) {
_, err = app.Append(0, defaultLabel, 3+rangeToTriggerCompaction, 0) _, err = app.Append(0, defaultLabel, 3+rangeToTriggerCompaction, 0)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
require.NoError(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) require.NoError(t, db.Delete(ctx, math.MinInt64, math.MaxInt64, defaultMatcher))
require.NoError(t, db.Compact()) require.NoError(t, db.Compact())
require.Equal(t, 1, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran)), "compaction should have been triggered here") require.Equal(t, 1, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran)), "compaction should have been triggered here")
@ -2111,7 +2111,7 @@ func TestNoEmptyBlocks(t *testing.T) {
_, err = app.Append(0, defaultLabel, currentTime+rangeToTriggerCompaction, 0) _, err = app.Append(0, defaultLabel, currentTime+rangeToTriggerCompaction, 0)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
require.NoError(t, db.head.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) require.NoError(t, db.head.Delete(ctx, math.MinInt64, math.MaxInt64, defaultMatcher))
require.NoError(t, db.Compact()) require.NoError(t, db.Compact())
require.Equal(t, 3, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran)), "compaction should have been triggered here") require.Equal(t, 3, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran)), "compaction should have been triggered here")
require.Equal(t, oldBlocks, db.Blocks()) require.Equal(t, oldBlocks, db.Blocks())
@ -2130,7 +2130,7 @@ func TestNoEmptyBlocks(t *testing.T) {
oldBlocks := db.Blocks() oldBlocks := db.Blocks()
require.NoError(t, db.reloadBlocks()) // Reload the db to register the new blocks. require.NoError(t, db.reloadBlocks()) // Reload the db to register the new blocks.
require.Equal(t, len(blocks)+len(oldBlocks), len(db.Blocks())) // Ensure all blocks are registered. require.Equal(t, len(blocks)+len(oldBlocks), len(db.Blocks())) // Ensure all blocks are registered.
require.NoError(t, db.Delete(math.MinInt64, math.MaxInt64, defaultMatcher)) require.NoError(t, db.Delete(ctx, math.MinInt64, math.MaxInt64, defaultMatcher))
require.NoError(t, db.Compact()) require.NoError(t, db.Compact())
require.Equal(t, 5, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran)), "compaction should have been triggered here once for each block that have tombstones") require.Equal(t, 5, int(prom_testutil.ToFloat64(db.compactor.(*LeveledCompactor).metrics.Ran)), "compaction should have been triggered here once for each block that have tombstones")
@ -2268,17 +2268,17 @@ func TestCorrectNumTombstones(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 1, len(db.blocks)) require.Equal(t, 1, len(db.blocks))
require.NoError(t, db.Delete(0, 1, defaultMatcher)) require.NoError(t, db.Delete(ctx, 0, 1, defaultMatcher))
require.Equal(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones) require.Equal(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones)
// {0, 1} and {2, 3} are merged to form 1 tombstone. // {0, 1} and {2, 3} are merged to form 1 tombstone.
require.NoError(t, db.Delete(2, 3, defaultMatcher)) require.NoError(t, db.Delete(ctx, 2, 3, defaultMatcher))
require.Equal(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones) require.Equal(t, uint64(1), db.blocks[0].meta.Stats.NumTombstones)
require.NoError(t, db.Delete(5, 6, defaultMatcher)) require.NoError(t, db.Delete(ctx, 5, 6, defaultMatcher))
require.Equal(t, uint64(2), db.blocks[0].meta.Stats.NumTombstones) require.Equal(t, uint64(2), db.blocks[0].meta.Stats.NumTombstones)
require.NoError(t, db.Delete(9, 11, defaultMatcher)) require.NoError(t, db.Delete(ctx, 9, 11, defaultMatcher))
require.Equal(t, uint64(3), db.blocks[0].meta.Stats.NumTombstones) require.Equal(t, uint64(3), db.blocks[0].meta.Stats.NumTombstones)
} }
@ -3038,7 +3038,7 @@ func TestCompactHeadWithDeletion(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
err = db.Delete(0, 100, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) err = db.Delete(context.Background(), 0, 100, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
require.NoError(t, err) require.NoError(t, err)
// This recreates the bug. // This recreates the bug.

View file

@ -14,6 +14,7 @@
package tsdb package tsdb
import ( import (
"context"
"fmt" "fmt"
"io" "io"
"math" "math"
@ -1426,7 +1427,7 @@ func (h *RangeHead) String() string {
// Delete all samples in the range of [mint, maxt] for series that satisfy the given // Delete all samples in the range of [mint, maxt] for series that satisfy the given
// label matchers. // label matchers.
func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error { func (h *Head) Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error {
// Do not delete anything beyond the currently valid range. // Do not delete anything beyond the currently valid range.
mint, maxt = clampInterval(mint, maxt, h.MinTime(), h.MaxTime()) mint, maxt = clampInterval(mint, maxt, h.MinTime(), h.MaxTime())
@ -1439,6 +1440,10 @@ func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error {
var stones []tombstones.Stone var stones []tombstones.Stone
for p.Next() { for p.Next() {
if err := ctx.Err(); err != nil {
return errors.Wrap(err, "select series")
}
series := h.series.getByID(chunks.HeadSeriesRef(p.At())) series := h.series.getByID(chunks.HeadSeriesRef(p.At()))
if series == nil { if series == nil {
level.Debug(h.logger).Log("msg", "Series not found in Head.Delete") level.Debug(h.logger).Log("msg", "Series not found in Head.Delete")
@ -1458,6 +1463,10 @@ func (h *Head) Delete(mint, maxt int64, ms ...*labels.Matcher) error {
if p.Err() != nil { if p.Err() != nil {
return p.Err() return p.Err()
} }
if ctx.Err() != nil {
return errors.Wrap(err, "select series")
}
if h.wal != nil { if h.wal != nil {
var enc record.Encoder var enc record.Encoder
if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil { if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil {

View file

@ -1131,7 +1131,7 @@ func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
require.NoError(t, head.Init(math.MinInt64)) require.NoError(t, head.Init(math.MinInt64))
require.NoError(t, head.Delete(0, 100, labels.MustNewMatcher(labels.MatchEqual, "a", "1"))) require.NoError(t, head.Delete(context.Background(), 0, 100, labels.MustNewMatcher(labels.MatchEqual, "a", "1")))
}) })
} }
} }
@ -1203,7 +1203,7 @@ func TestHeadDeleteSimple(t *testing.T) {
// Delete the ranges. // Delete the ranges.
for _, r := range c.dranges { for _, r := range c.dranges {
require.NoError(t, head.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value))) require.NoError(t, head.Delete(context.Background(), r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value)))
} }
// Add more samples. // Add more samples.
@ -1285,7 +1285,7 @@ func TestDeleteUntilCurMax(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
require.NoError(t, hb.Delete(0, 10000, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) require.NoError(t, hb.Delete(context.Background(), 0, 10000, labels.MustNewMatcher(labels.MatchEqual, "a", "b")))
// Test the series returns no samples. The series is cleared only after compaction. // Test the series returns no samples. The series is cleared only after compaction.
q, err := NewBlockQuerier(hb, 0, 100000) q, err := NewBlockQuerier(hb, 0, 100000)
@ -1332,7 +1332,7 @@ func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, app.Commit()) require.NoError(t, app.Commit())
} }
require.NoError(t, hb.Delete(0, int64(numSamples), labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) require.NoError(t, hb.Delete(context.Background(), 0, int64(numSamples), labels.MustNewMatcher(labels.MatchEqual, "a", "b")))
require.NoError(t, hb.Truncate(1)) require.NoError(t, hb.Truncate(1))
require.NoError(t, hb.Close()) require.NoError(t, hb.Close())
@ -1464,7 +1464,7 @@ func TestDelete_e2e(t *testing.T) {
} }
for _, del := range dels { for _, del := range dels {
for _, r := range del.drange { for _, r := range del.drange {
require.NoError(t, hb.Delete(r.Mint, r.Maxt, del.ms...)) require.NoError(t, hb.Delete(context.Background(), r.Mint, r.Maxt, del.ms...))
} }
matched := labels.Slice{} matched := labels.Slice{}
for _, l := range lbls { for _, l := range lbls {

View file

@ -2699,6 +2699,7 @@ func BenchmarkHeadQuerier(b *testing.B) {
// This is a regression test for the case where gauge histograms were not handled by // This is a regression test for the case where gauge histograms were not handled by
// populateWithDelChunkSeriesIterator correctly. // populateWithDelChunkSeriesIterator correctly.
func TestQueryWithDeletedHistograms(t *testing.T) { func TestQueryWithDeletedHistograms(t *testing.T) {
ctx := context.Background()
testcases := map[string]func(int) (*histogram.Histogram, *histogram.FloatHistogram){ testcases := map[string]func(int) (*histogram.Histogram, *histogram.FloatHistogram){
"intCounter": func(i int) (*histogram.Histogram, *histogram.FloatHistogram) { "intCounter": func(i int) (*histogram.Histogram, *histogram.FloatHistogram) {
return tsdbutil.GenerateTestHistogram(i), nil return tsdbutil.GenerateTestHistogram(i), nil
@ -2743,7 +2744,7 @@ func TestQueryWithDeletedHistograms(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// Delete the last 20. // Delete the last 20.
err = db.Delete(80, 100, matcher) err = db.Delete(ctx, 80, 100, matcher)
require.NoError(t, err) require.NoError(t, err)
chunkQuerier, err := db.ChunkQuerier(0, 100) chunkQuerier, err := db.ChunkQuerier(0, 100)

View file

@ -170,7 +170,7 @@ type apiFunc func(r *http.Request) apiFuncResult
// TSDBAdminStats defines the tsdb interfaces used by the v1 API for admin operations as well as statistics. // TSDBAdminStats defines the tsdb interfaces used by the v1 API for admin operations as well as statistics.
type TSDBAdminStats interface { type TSDBAdminStats interface {
CleanTombstones() error CleanTombstones() error
Delete(mint, maxt int64, ms ...*labels.Matcher) error Delete(ctx context.Context, mint, maxt int64, ms ...*labels.Matcher) error
Snapshot(dir string, withHead bool) error Snapshot(dir string, withHead bool) error
Stats(statsByLabelName string, limit int) (*tsdb.Stats, error) Stats(statsByLabelName string, limit int) (*tsdb.Stats, error)
WALReplayStatus() (tsdb.WALReplayStatus, error) WALReplayStatus() (tsdb.WALReplayStatus, error)
@ -1632,7 +1632,7 @@ func (api *API) deleteSeries(r *http.Request) apiFuncResult {
if err != nil { if err != nil {
return invalidParamError(err, "match[]") return invalidParamError(err, "match[]")
} }
if err := api.db.Delete(timestamp.FromTime(start), timestamp.FromTime(end), matchers...); err != nil { if err := api.db.Delete(r.Context(), timestamp.FromTime(start), timestamp.FromTime(end), matchers...); err != nil {
return apiFuncResult{nil, &apiError{errorInternal, err}, nil, nil} return apiFuncResult{nil, &apiError{errorInternal, err}, nil, nil}
} }
} }

View file

@ -2767,9 +2767,9 @@ type fakeDB struct {
err error err error
} }
func (f *fakeDB) CleanTombstones() error { return f.err } func (f *fakeDB) CleanTombstones() error { return f.err }
func (f *fakeDB) Delete(int64, int64, ...*labels.Matcher) error { return f.err } func (f *fakeDB) Delete(context.Context, int64, int64, ...*labels.Matcher) error { return f.err }
func (f *fakeDB) Snapshot(string, bool) error { return f.err } func (f *fakeDB) Snapshot(string, bool) error { return f.err }
func (f *fakeDB) Stats(statsByLabelName string, limit int) (_ *tsdb.Stats, retErr error) { func (f *fakeDB) Stats(statsByLabelName string, limit int) (_ *tsdb.Stats, retErr error) {
dbDir, err := os.MkdirTemp("", "tsdb-api-ready") dbDir, err := os.MkdirTemp("", "tsdb-api-ready")
if err != nil { if err != nil {