From 306d127d7b1120486c77bd3da0c7917e1677b3d7 Mon Sep 17 00:00:00 2001 From: Carrie Edwards Date: Wed, 31 Jul 2024 11:39:11 -0700 Subject: [PATCH] Add additional tests for OOO native histograms Signed-off-by: Carrie Edwards Co-authored by: Fiona Liao : --- tsdb/db_test.go | 1033 ++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 938 insertions(+), 95 deletions(-) diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 45c49825d3..75e2eef75c 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -3995,6 +3995,299 @@ func newTestDB(t *testing.T) *DB { } func TestOOOWALWrite(t *testing.T) { + minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() } + s1, s2 := labels.FromStrings("l", "v1"), labels.FromStrings("l", "v2") + scenarios := map[string]struct { + appendSample func(app storage.Appender, l labels.Labels, mins int64) (storage.SeriesRef, error) + expectedOOORecords []interface{} + expectedInORecords []interface{} + }{ + "float": { + appendSample: func(app storage.Appender, l labels.Labels, mins int64) (storage.SeriesRef, error) { + seriesRef, err := app.Append(0, l, minutes(mins), float64(mins)) + require.NoError(t, err) + return seriesRef, nil + }, + expectedOOORecords: []interface{}{ + // The MmapRef in this are not hand calculated, and instead taken from the test run. + // What is important here is the order of records, and that MmapRef increases for each record. + []record.RefMmapMarker{ + {Ref: 1}, + }, + []record.RefSample{ + {Ref: 1, T: minutes(40), V: 40}, + }, + + []record.RefMmapMarker{ + {Ref: 2}, + }, + []record.RefSample{ + {Ref: 2, T: minutes(42), V: 42}, + }, + + []record.RefSample{ + {Ref: 2, T: minutes(45), V: 45}, + {Ref: 1, T: minutes(35), V: 35}, + }, + []record.RefMmapMarker{ // 3rd sample, hence m-mapped. + {Ref: 1, MmapRef: 0x100000000 + 8}, + }, + []record.RefSample{ + {Ref: 1, T: minutes(36), V: 36}, + {Ref: 1, T: minutes(37), V: 37}, + }, + + []record.RefMmapMarker{ // 3rd sample, hence m-mapped. + {Ref: 1, MmapRef: 0x100000000 + 58}, + }, + []record.RefSample{ // Does not contain the in-order sample here. + {Ref: 1, T: minutes(50), V: 50}, + }, + + // Single commit but multiple OOO records. + []record.RefMmapMarker{ + {Ref: 2, MmapRef: 0x100000000 + 107}, + }, + []record.RefSample{ + {Ref: 2, T: minutes(50), V: 50}, + {Ref: 2, T: minutes(51), V: 51}, + }, + []record.RefMmapMarker{ + {Ref: 2, MmapRef: 0x100000000 + 156}, + }, + []record.RefSample{ + {Ref: 2, T: minutes(52), V: 52}, + {Ref: 2, T: minutes(53), V: 53}, + }, + }, + expectedInORecords: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: s1}, + {Ref: 2, Labels: s2}, + }, + []record.RefSample{ + {Ref: 1, T: minutes(60), V: 60}, + {Ref: 2, T: minutes(60), V: 60}, + }, + []record.RefSample{ + {Ref: 1, T: minutes(40), V: 40}, + }, + []record.RefSample{ + {Ref: 2, T: minutes(42), V: 42}, + }, + []record.RefSample{ + {Ref: 2, T: minutes(45), V: 45}, + {Ref: 1, T: minutes(35), V: 35}, + {Ref: 1, T: minutes(36), V: 36}, + {Ref: 1, T: minutes(37), V: 37}, + }, + []record.RefSample{ // Contains both in-order and ooo sample. + {Ref: 1, T: minutes(50), V: 50}, + {Ref: 2, T: minutes(65), V: 65}, + }, + []record.RefSample{ + {Ref: 2, T: minutes(50), V: 50}, + {Ref: 2, T: minutes(51), V: 51}, + {Ref: 2, T: minutes(52), V: 52}, + {Ref: 2, T: minutes(53), V: 53}, + }, + }, + }, + "integer histogram": { + appendSample: func(app storage.Appender, l labels.Labels, mins int64) (storage.SeriesRef, error) { + seriesRef, err := app.AppendHistogram(0, l, minutes(mins), tsdbutil.GenerateTestHistogram(int(mins)), nil) + require.NoError(t, err) + return seriesRef, nil + }, + expectedOOORecords: []interface{}{ + // The MmapRef in this are not hand calculated, and instead taken from the test run. + // What is important here is the order of records, and that MmapRef increases for each record. + []record.RefMmapMarker{ + {Ref: 1}, + }, + []record.RefHistogramSample{ + {Ref: 1, T: minutes(40), H: tsdbutil.GenerateTestHistogram(40)}, + }, + + []record.RefMmapMarker{ + {Ref: 2}, + }, + []record.RefHistogramSample{ + {Ref: 2, T: minutes(42), H: tsdbutil.GenerateTestHistogram(42)}, + }, + + []record.RefHistogramSample{ + {Ref: 2, T: minutes(45), H: tsdbutil.GenerateTestHistogram(45)}, + {Ref: 1, T: minutes(35), H: tsdbutil.GenerateTestHistogram(35)}, + }, + []record.RefMmapMarker{ // 3rd sample, hence m-mapped. + {Ref: 1, MmapRef: 0x100000000 + 8}, + }, + []record.RefHistogramSample{ + {Ref: 1, T: minutes(36), H: tsdbutil.GenerateTestHistogram(36)}, + {Ref: 1, T: minutes(37), H: tsdbutil.GenerateTestHistogram(37)}, + }, + + []record.RefMmapMarker{ // 3rd sample, hence m-mapped. + {Ref: 1, MmapRef: 0x100000000 + 89}, + }, + []record.RefHistogramSample{ // Does not contain the in-order sample here. + {Ref: 1, T: minutes(50), H: tsdbutil.GenerateTestHistogram(50)}, + }, + + // Single commit but multiple OOO records. + []record.RefMmapMarker{ + {Ref: 2, MmapRef: 0x100000000 + 172}, + }, + []record.RefHistogramSample{ + {Ref: 2, T: minutes(50), H: tsdbutil.GenerateTestHistogram(50)}, + {Ref: 2, T: minutes(51), H: tsdbutil.GenerateTestHistogram(51)}, + }, + []record.RefMmapMarker{ + {Ref: 2, MmapRef: 0x100000000 + 257}, + }, + []record.RefHistogramSample{ + {Ref: 2, T: minutes(52), H: tsdbutil.GenerateTestHistogram(52)}, + {Ref: 2, T: minutes(53), H: tsdbutil.GenerateTestHistogram(53)}, + }, + }, + expectedInORecords: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: s1}, + {Ref: 2, Labels: s2}, + }, + []record.RefHistogramSample{ + {Ref: 1, T: minutes(60), H: tsdbutil.GenerateTestHistogram(60)}, + {Ref: 2, T: minutes(60), H: tsdbutil.GenerateTestHistogram(60)}, + }, + []record.RefHistogramSample{ + {Ref: 1, T: minutes(40), H: tsdbutil.GenerateTestHistogram(40)}, + }, + []record.RefHistogramSample{ + {Ref: 2, T: minutes(42), H: tsdbutil.GenerateTestHistogram(42)}, + }, + []record.RefHistogramSample{ + {Ref: 2, T: minutes(45), H: tsdbutil.GenerateTestHistogram(45)}, + {Ref: 1, T: minutes(35), H: tsdbutil.GenerateTestHistogram(35)}, + {Ref: 1, T: minutes(36), H: tsdbutil.GenerateTestHistogram(36)}, + {Ref: 1, T: minutes(37), H: tsdbutil.GenerateTestHistogram(37)}, + }, + []record.RefHistogramSample{ // Contains both in-order and ooo sample. + {Ref: 1, T: minutes(50), H: tsdbutil.GenerateTestHistogram(50)}, + {Ref: 2, T: minutes(65), H: tsdbutil.GenerateTestHistogram(65)}, + }, + []record.RefHistogramSample{ + {Ref: 2, T: minutes(50), H: tsdbutil.GenerateTestHistogram(50)}, + {Ref: 2, T: minutes(51), H: tsdbutil.GenerateTestHistogram(51)}, + {Ref: 2, T: minutes(52), H: tsdbutil.GenerateTestHistogram(52)}, + {Ref: 2, T: minutes(53), H: tsdbutil.GenerateTestHistogram(53)}, + }, + }, + }, + "float histogram": { + appendSample: func(app storage.Appender, l labels.Labels, mins int64) (storage.SeriesRef, error) { + seriesRef, err := app.AppendHistogram(0, l, minutes(mins), nil, tsdbutil.GenerateTestFloatHistogram(int(mins))) + require.NoError(t, err) + return seriesRef, nil + }, + expectedOOORecords: []interface{}{ + // The MmapRef in this are not hand calculated, and instead taken from the test run. + // What is important here is the order of records, and that MmapRef increases for each record. + []record.RefMmapMarker{ + {Ref: 1}, + }, + []record.RefFloatHistogramSample{ + {Ref: 1, T: minutes(40), FH: tsdbutil.GenerateTestFloatHistogram(40)}, + }, + + []record.RefMmapMarker{ + {Ref: 2}, + }, + []record.RefFloatHistogramSample{ + {Ref: 2, T: minutes(42), FH: tsdbutil.GenerateTestFloatHistogram(42)}, + }, + + []record.RefFloatHistogramSample{ + {Ref: 2, T: minutes(45), FH: tsdbutil.GenerateTestFloatHistogram(45)}, + {Ref: 1, T: minutes(35), FH: tsdbutil.GenerateTestFloatHistogram(35)}, + }, + []record.RefMmapMarker{ // 3rd sample, hence m-mapped. + {Ref: 1, MmapRef: 0x100000000 + 8}, + }, + []record.RefFloatHistogramSample{ + {Ref: 1, T: minutes(36), FH: tsdbutil.GenerateTestFloatHistogram(36)}, + {Ref: 1, T: minutes(37), FH: tsdbutil.GenerateTestFloatHistogram(37)}, + }, + + []record.RefMmapMarker{ // 3rd sample, hence m-mapped. + {Ref: 1, MmapRef: 0x100000000 + 177}, + }, + []record.RefFloatHistogramSample{ // Does not contain the in-order sample here. + {Ref: 1, T: minutes(50), FH: tsdbutil.GenerateTestFloatHistogram(50)}, + }, + + // Single commit but multiple OOO records. + []record.RefMmapMarker{ + {Ref: 2, MmapRef: 0x100000000 + 348}, + }, + []record.RefFloatHistogramSample{ + {Ref: 2, T: minutes(50), FH: tsdbutil.GenerateTestFloatHistogram(50)}, + {Ref: 2, T: minutes(51), FH: tsdbutil.GenerateTestFloatHistogram(51)}, + }, + []record.RefMmapMarker{ + {Ref: 2, MmapRef: 0x100000000 + 521}, + }, + []record.RefFloatHistogramSample{ + {Ref: 2, T: minutes(52), FH: tsdbutil.GenerateTestFloatHistogram(52)}, + {Ref: 2, T: minutes(53), FH: tsdbutil.GenerateTestFloatHistogram(53)}, + }, + }, + expectedInORecords: []interface{}{ + []record.RefSeries{ + {Ref: 1, Labels: s1}, + {Ref: 2, Labels: s2}, + }, + []record.RefFloatHistogramSample{ + {Ref: 1, T: minutes(60), FH: tsdbutil.GenerateTestFloatHistogram(60)}, + {Ref: 2, T: minutes(60), FH: tsdbutil.GenerateTestFloatHistogram(60)}, + }, + []record.RefFloatHistogramSample{ + {Ref: 1, T: minutes(40), FH: tsdbutil.GenerateTestFloatHistogram(40)}, + }, + []record.RefFloatHistogramSample{ + {Ref: 2, T: minutes(42), FH: tsdbutil.GenerateTestFloatHistogram(42)}, + }, + []record.RefFloatHistogramSample{ + {Ref: 2, T: minutes(45), FH: tsdbutil.GenerateTestFloatHistogram(45)}, + {Ref: 1, T: minutes(35), FH: tsdbutil.GenerateTestFloatHistogram(35)}, + {Ref: 1, T: minutes(36), FH: tsdbutil.GenerateTestFloatHistogram(36)}, + {Ref: 1, T: minutes(37), FH: tsdbutil.GenerateTestFloatHistogram(37)}, + }, + []record.RefFloatHistogramSample{ // Contains both in-order and ooo sample. + {Ref: 1, T: minutes(50), FH: tsdbutil.GenerateTestFloatHistogram(50)}, + {Ref: 2, T: minutes(65), FH: tsdbutil.GenerateTestFloatHistogram(65)}, + }, + []record.RefFloatHistogramSample{ + {Ref: 2, T: minutes(50), FH: tsdbutil.GenerateTestFloatHistogram(50)}, + {Ref: 2, T: minutes(51), FH: tsdbutil.GenerateTestFloatHistogram(51)}, + {Ref: 2, T: minutes(52), FH: tsdbutil.GenerateTestFloatHistogram(52)}, + {Ref: 2, T: minutes(53), FH: tsdbutil.GenerateTestFloatHistogram(53)}, + }, + }, + }, + } + for name, scenario := range scenarios { + t.Run(name, func(t *testing.T) { + testOOOWALWrite(t, scenario.appendSample, scenario.expectedOOORecords, scenario.expectedInORecords) + }) + } +} + +func testOOOWALWrite(t *testing.T, + appendSample func(app storage.Appender, l labels.Labels, mins int64) (storage.SeriesRef, error), + expectedOOORecords []interface{}, + expectedInORecords []interface{}, +) { dir := t.TempDir() opts := DefaultOptions() @@ -4003,18 +4296,14 @@ func TestOOOWALWrite(t *testing.T) { db, err := Open(dir, nil, nil, opts, nil) require.NoError(t, err) + db.EnableNativeHistograms() + db.EnableOOONativeHistograms() t.Cleanup(func() { require.NoError(t, db.Close()) }) s1, s2 := labels.FromStrings("l", "v1"), labels.FromStrings("l", "v2") - minutes := func(m int64) int64 { return m * time.Minute.Milliseconds() } - - appendSample := func(app storage.Appender, l labels.Labels, mins int64) { - _, err = app.Append(0, l, minutes(mins), float64(mins)) - require.NoError(t, err) - } // Ingest sample at 1h. app := db.Appender(context.Background()) @@ -4054,92 +4343,6 @@ func TestOOOWALWrite(t *testing.T) { appendSample(app, s2, 53) require.NoError(t, app.Commit()) - // The MmapRef in this are not hand calculated, and instead taken from the test run. - // What is important here is the order of records, and that MmapRef increases for each record. - oooRecords := []interface{}{ - []record.RefMmapMarker{ - {Ref: 1}, - }, - []record.RefSample{ - {Ref: 1, T: minutes(40), V: 40}, - }, - - []record.RefMmapMarker{ - {Ref: 2}, - }, - []record.RefSample{ - {Ref: 2, T: minutes(42), V: 42}, - }, - - []record.RefSample{ - {Ref: 2, T: minutes(45), V: 45}, - {Ref: 1, T: minutes(35), V: 35}, - }, - []record.RefMmapMarker{ // 3rd sample, hence m-mapped. - {Ref: 1, MmapRef: 4294967304}, - }, - []record.RefSample{ - {Ref: 1, T: minutes(36), V: 36}, - {Ref: 1, T: minutes(37), V: 37}, - }, - - []record.RefMmapMarker{ // 3rd sample, hence m-mapped. - {Ref: 1, MmapRef: 4294967354}, - }, - []record.RefSample{ // Does not contain the in-order sample here. - {Ref: 1, T: minutes(50), V: 50}, - }, - - // Single commit but multiple OOO records. - []record.RefMmapMarker{ - {Ref: 2, MmapRef: 4294967403}, - }, - []record.RefSample{ - {Ref: 2, T: minutes(50), V: 50}, - {Ref: 2, T: minutes(51), V: 51}, - }, - []record.RefMmapMarker{ - {Ref: 2, MmapRef: 4294967452}, - }, - []record.RefSample{ - {Ref: 2, T: minutes(52), V: 52}, - {Ref: 2, T: minutes(53), V: 53}, - }, - } - - inOrderRecords := []interface{}{ - []record.RefSeries{ - {Ref: 1, Labels: s1}, - {Ref: 2, Labels: s2}, - }, - []record.RefSample{ - {Ref: 1, T: minutes(60), V: 60}, - {Ref: 2, T: minutes(60), V: 60}, - }, - []record.RefSample{ - {Ref: 1, T: minutes(40), V: 40}, - }, - []record.RefSample{ - {Ref: 2, T: minutes(42), V: 42}, - }, - []record.RefSample{ - {Ref: 2, T: minutes(45), V: 45}, - {Ref: 1, T: minutes(35), V: 35}, - {Ref: 1, T: minutes(36), V: 36}, - {Ref: 1, T: minutes(37), V: 37}, - }, - []record.RefSample{ // Contains both in-order and ooo sample. - {Ref: 1, T: minutes(50), V: 50}, - {Ref: 2, T: minutes(65), V: 65}, - }, - []record.RefSample{ - {Ref: 2, T: minutes(50), V: 50}, - {Ref: 2, T: minutes(51), V: 51}, - {Ref: 2, T: minutes(52), V: 52}, - {Ref: 2, T: minutes(53), V: 53}, - }, - } - getRecords := func(walDir string) []interface{} { sr, err := wlog.NewSegmentsReader(walDir) require.NoError(t, err) @@ -4150,7 +4353,7 @@ func TestOOOWALWrite(t *testing.T) { var ( records []interface{} - dec record.Decoder = record.NewDecoder(labels.NewSymbolTable()) + dec record.Decoder ) for r.Next() { rec := r.Record() @@ -4167,6 +4370,14 @@ func TestOOOWALWrite(t *testing.T) { markers, err := dec.MmapMarkers(rec, nil) require.NoError(t, err) records = append(records, markers) + case record.HistogramSamples: + histogramSamples, err := dec.HistogramSamples(rec, nil) + require.NoError(t, err) + records = append(records, histogramSamples) + case record.FloatHistogramSamples: + floatHistogramSamples, err := dec.FloatHistogramSamples(rec, nil) + require.NoError(t, err) + records = append(records, floatHistogramSamples) default: t.Fatalf("got a WAL record that is not series or samples: %v", typ) } @@ -4177,11 +4388,11 @@ func TestOOOWALWrite(t *testing.T) { // The normal WAL. actRecs := getRecords(path.Join(dir, "wal")) - testutil.RequireEqual(t, inOrderRecords, actRecs) + require.Equal(t, expectedInORecords, actRecs) // The WBL. actRecs = getRecords(path.Join(dir, wlog.WblDirName)) - testutil.RequireEqual(t, oooRecords, actRecs) + require.Equal(t, expectedOOORecords, actRecs) } // Tests https://github.com/prometheus/prometheus/issues/10291#issuecomment-1044373110. @@ -4494,6 +4705,160 @@ func TestMetadataAssertInMemoryData(t *testing.T) { require.Equal(t, *reopenDB.head.series.getByHash(s4.Hash(), s4).meta, m4) } +// TestMultipleEncodingsCommitOrder mainly serves to demonstrate when happens when committing a batch of samples for the +// same series when there are multiple encodings. Commit() will process all float samples before histogram samples. This +// means that if histograms are appended before floats, the histograms could be marked as OOO when they are committed. +// While possible, this shouldn't happen very often - you need the same series to be ingested as both a float and a +// histogram in a single write request. +func TestMultipleEncodingsCommitOrder(t *testing.T) { + opts := DefaultOptions() + opts.OutOfOrderCapMax = 30 + opts.OutOfOrderTimeWindow = 24 * time.Hour.Milliseconds() + + series1 := labels.FromStrings("foo", "bar1") + + db := openTestDB(t, opts, nil) + db.DisableCompactions() + db.EnableNativeHistograms() + db.EnableOOONativeHistograms() + defer func() { + require.NoError(t, db.Close()) + }() + + addSample := func(app storage.Appender, ts int64, valType chunkenc.ValueType) chunks.Sample { + if valType == chunkenc.ValFloat { + _, err := app.Append(0, labels.FromStrings("foo", "bar1"), ts, float64(ts)) + require.NoError(t, err) + return sample{t: ts, f: float64(ts)} + } + if valType == chunkenc.ValHistogram { + h := tsdbutil.GenerateTestHistogram(int(ts)) + _, err := app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, h, nil) + require.NoError(t, err) + return sample{t: ts, h: h} + } + fh := tsdbutil.GenerateTestFloatHistogram(int(ts)) + _, err := app.AppendHistogram(0, labels.FromStrings("foo", "bar1"), ts, nil, fh) + require.NoError(t, err) + return sample{t: ts, fh: fh} + } + + verifySamples := func(minT, maxT int64, expSamples []chunks.Sample, oooCount int) { + requireEqualOOOSamples(t, oooCount, db) + + // Verify samples querier. + querier, err := db.Querier(minT, maxT) + require.NoError(t, err) + defer querier.Close() + + seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")) + require.Len(t, seriesSet, 1) + gotSamples := seriesSet[series1.String()] + requireEqualSamples(t, series1.String(), expSamples, gotSamples, true) + + // Verify chunks querier. + chunkQuerier, err := db.ChunkQuerier(minT, maxT) + require.NoError(t, err) + defer chunkQuerier.Close() + + chks := queryChunks(t, chunkQuerier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar1")) + require.NotNil(t, chks[series1.String()]) + require.Len(t, chks, 1) + var gotChunkSamples []chunks.Sample + for _, chunk := range chks[series1.String()] { + it := chunk.Chunk.Iterator(nil) + smpls, err := storage.ExpandSamples(it, newSample) + require.NoError(t, err) + gotChunkSamples = append(gotChunkSamples, smpls...) + require.NoError(t, it.Err()) + } + requireEqualSamples(t, series1.String(), expSamples, gotChunkSamples, true) + } + + var expSamples []chunks.Sample + + // Append samples with different encoding types and then commit them at once. + app := db.Appender(context.Background()) + + for i := 100; i < 105; i++ { + s := addSample(app, int64(i), chunkenc.ValFloat) + expSamples = append(expSamples, s) + } + // These samples will be marked as OOO as their timestamps are less than the max timestamp for float samples in the + // same batch. + for i := 110; i < 120; i++ { + s := addSample(app, int64(i), chunkenc.ValHistogram) + expSamples = append(expSamples, s) + } + // These samples will be marked as OOO as their timestamps are less than the max timestamp for float samples in the + // same batch. + for i := 120; i < 130; i++ { + s := addSample(app, int64(i), chunkenc.ValFloatHistogram) + expSamples = append(expSamples, s) + } + // These samples will be marked as in-order as their timestamps are greater than the max timestamp for float + // samples in the same batch. + for i := 140; i < 150; i++ { + s := addSample(app, int64(i), chunkenc.ValFloatHistogram) + expSamples = append(expSamples, s) + } + // These samples will be marked as in-order, even though they're appended after the float histograms from ts 140-150 + // because float samples are processed first and these samples are in-order wrt to the float samples in the batch. + for i := 130; i < 135; i++ { + s := addSample(app, int64(i), chunkenc.ValFloat) + expSamples = append(expSamples, s) + } + + require.NoError(t, app.Commit()) + + sort.Slice(expSamples, func(i, j int) bool { + return expSamples[i].T() < expSamples[j].T() + }) + + // oooCount = 20 because the histograms from 120 - 130 and float histograms from 120 - 130 are detected as OOO. + verifySamples(100, 150, expSamples, 20) + + // Append and commit some in-order histograms by themselves. + app = db.Appender(context.Background()) + for i := 150; i < 160; i++ { + s := addSample(app, int64(i), chunkenc.ValHistogram) + expSamples = append(expSamples, s) + } + require.NoError(t, app.Commit()) + + // oooCount remains at 20 as no new OOO samples have been added. + verifySamples(100, 160, expSamples, 20) + + // Append and commit samples for all encoding types. This time all samples will be treated as OOO because samples + // with newer timestamps have already been committed. + app = db.Appender(context.Background()) + for i := 50; i < 55; i++ { + s := addSample(app, int64(i), chunkenc.ValFloat) + expSamples = append(expSamples, s) + } + for i := 60; i < 70; i++ { + s := addSample(app, int64(i), chunkenc.ValHistogram) + expSamples = append(expSamples, s) + } + for i := 70; i < 75; i++ { + s := addSample(app, int64(i), chunkenc.ValFloat) + expSamples = append(expSamples, s) + } + for i := 80; i < 90; i++ { + s := addSample(app, int64(i), chunkenc.ValFloatHistogram) + expSamples = append(expSamples, s) + } + require.NoError(t, app.Commit()) + + // Sort samples again because OOO samples have been added. + sort.Slice(expSamples, func(i, j int) bool { + return expSamples[i].T() < expSamples[j].T() + }) + + // oooCount = 50 as we've added 30 more OOO samples. + verifySamples(50, 160, expSamples, 50) +} + // TODO(codesome): test more samples incoming once compaction has started. To verify new samples after the start // // are not included in this compaction. @@ -6001,6 +6366,378 @@ func testWBLAndMmapReplay(t *testing.T, scenario sampleTypeScenario) { }) } +func TestOOOHistogramCompactionWithCounterResets(t *testing.T) { + for _, floatHistogram := range []bool{false, true} { + dir := t.TempDir() + ctx := context.Background() + + opts := DefaultOptions() + opts.OutOfOrderCapMax = 30 + opts.OutOfOrderTimeWindow = 500 * time.Minute.Milliseconds() + + db, err := Open(dir, nil, nil, opts, nil) + require.NoError(t, err) + db.DisableCompactions() // We want to manually call it. + db.EnableNativeHistograms() + db.EnableOOONativeHistograms() + t.Cleanup(func() { + require.NoError(t, db.Close()) + }) + + series1 := labels.FromStrings("foo", "bar1") + series2 := labels.FromStrings("foo", "bar2") + + var series1ExpSamplesPreCompact, series2ExpSamplesPreCompact, series1ExpSamplesPostCompact, series2ExpSamplesPostCompact []chunks.Sample + + addSample := func(ts int64, l labels.Labels, val int, hint histogram.CounterResetHint) sample { + app := db.Appender(context.Background()) + tsMs := ts * time.Minute.Milliseconds() + if floatHistogram { + h := tsdbutil.GenerateTestFloatHistogram(val) + h.CounterResetHint = hint + _, err = app.AppendHistogram(0, l, tsMs, nil, h) + require.NoError(t, err) + require.NoError(t, app.Commit()) + return sample{t: tsMs, fh: h.Copy()} + } + + h := tsdbutil.GenerateTestHistogram(val) + h.CounterResetHint = hint + _, err = app.AppendHistogram(0, l, tsMs, h, nil) + require.NoError(t, err) + require.NoError(t, app.Commit()) + return sample{t: tsMs, h: h.Copy()} + } + + // Add an in-order sample to each series. + s := addSample(520, series1, 1000000, histogram.UnknownCounterReset) + series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s) + series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s) + + s = addSample(520, series2, 1000000, histogram.UnknownCounterReset) + series2ExpSamplesPreCompact = append(series2ExpSamplesPreCompact, s) + series2ExpSamplesPostCompact = append(series2ExpSamplesPostCompact, s) + + // Verify that the in-memory ooo chunk is empty. + checkEmptyOOOChunk := func(lbls labels.Labels) { + ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls) + require.NoError(t, err) + require.False(t, created) + require.Nil(t, ms.ooo) + } + + checkEmptyOOOChunk(series1) + checkEmptyOOOChunk(series2) + + // Add samples for series1. There are three head chunks that will be created: + // Chunk 1 - Samples between 100 - 440. One explicit counter reset at ts 250. + // Chunk 2 - Samples between 105 - 395. Overlaps with Chunk 1. One detected counter reset at ts 165. + // Chunk 3 - Samples between 480 - 509. All within one block boundary. One detected counter reset at 490. + + // Chunk 1. + // First add 10 samples. + for i := 100; i < 200; i += 10 { + s = addSample(int64(i), series1, 100000+i, histogram.UnknownCounterReset) + // Before compaction, all the samples have UnknownCounterReset even though they've been added to the same + // chunk. This is because they overlap with the samples from chunk two and when merging two chunks on read, + // the header is set as unknown when the next sample is not in the same chunk as the previous one. + series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s) + // After compaction, samples from multiple mmapped chunks will be merged, so there won't be any overlapping + // chunks. Therefore, most samples will have the NotCounterReset header. + // 100 is the first sample in the first chunk in the blocks, so is still set to UnknownCounterReset. + // 120 is a block boundary - after compaction, 120 will be the first sample in a chunk, so is still set to + // UnknownCounterReset. + if i > 100 && i != 120 { + s = copyWithCounterReset(s, histogram.NotCounterReset) + } + series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s) + } + // Explicit counter reset - the counter reset header is set to CounterReset but the value is higher + // than for the previous timestamp. Explicit counter reset headers are actually ignored though, so when reading + // the sample back you actually get unknown/not counter reset. This is as the chainSampleIterator ignores + // existing headers and sets the header as UnknownCounterReset if the next sample is not in the same chunk as + // the previous one, and counter resets always create a new chunk. + // This case has been added to document what's happening, though it might not be the ideal behavior. + s = addSample(250, series1, 100000+250, histogram.CounterReset) + series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, copyWithCounterReset(s, histogram.UnknownCounterReset)) + series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, copyWithCounterReset(s, histogram.NotCounterReset)) + + // Add 19 more samples to complete a chunk. + for i := 260; i < 450; i += 10 { + s = addSample(int64(i), series1, 100000+i, histogram.UnknownCounterReset) + // The samples with timestamp less than 410 overlap with the samples from chunk 2, so before compaction, + // they're all UnknownCounterReset. Samples greater than or equal to 410 don't overlap with other chunks + // so they're always detected as NotCounterReset pre and post compaction/ + if i >= 410 { + s = copyWithCounterReset(s, histogram.NotCounterReset) + } + series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s) + // + // 360 is a block boundary, so after compaction its header is still UnknownCounterReset. + if i != 360 { + s = copyWithCounterReset(s, histogram.NotCounterReset) + } + series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s) + } + + // Chunk 2. + // Add six OOO samples. + for i := 105; i < 165; i += 10 { + s = addSample(int64(i), series1, 100000+i, histogram.UnknownCounterReset) + // Samples overlap with chunk 1 so before compaction all headers are UnknownCounterReset. + series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s) + series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, copyWithCounterReset(s, histogram.NotCounterReset)) + } + + // Add sample that will be detected as a counter reset. + s = addSample(165, series1, 100000, histogram.UnknownCounterReset) + // Before compaction, sample has an UnknownCounterReset header due to the chainSampleIterator. + series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s) + // After compaction, the sample's counter reset is properly detected. + series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, copyWithCounterReset(s, histogram.CounterReset)) + + // Add 23 more samples to complete a chunk. + for i := 175; i < 405; i += 10 { + s = addSample(int64(i), series1, 100000+i, histogram.UnknownCounterReset) + // Samples between 205-255 overlap with chunk 1 so before compaction those samples will have the + // UnknownCounterReset header. + if i >= 205 && i < 255 { + s = copyWithCounterReset(s, histogram.NotCounterReset) + } + series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s) + // 245 is the first sample >= the block boundary at 240, so it's still UnknownCounterReset after compaction. + if i != 245 { + s = copyWithCounterReset(s, histogram.NotCounterReset) + } else { + s = copyWithCounterReset(s, histogram.UnknownCounterReset) + } + series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s) + } + + // Chunk 3. + for i := 480; i < 490; i++ { + s = addSample(int64(i), series1, 100000+i, histogram.UnknownCounterReset) + // No overlapping samples in other chunks, so all other samples will already be detected as NotCounterReset + // before compaction. + if i > 480 { + s = copyWithCounterReset(s, histogram.NotCounterReset) + } + series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s) + // 480 is block boundary. + if i == 480 { + s = copyWithCounterReset(s, histogram.UnknownCounterReset) + } + series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s) + } + // Counter reset. + s = addSample(int64(490), series1, 100000, histogram.UnknownCounterReset) + series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s) + series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s) + // Add some more samples after the counter reset. + for i := 491; i < 510; i++ { + s = addSample(int64(i), series1, 100000+i, histogram.UnknownCounterReset) + s = copyWithCounterReset(s, histogram.NotCounterReset) + series1ExpSamplesPreCompact = append(series1ExpSamplesPreCompact, s) + series1ExpSamplesPostCompact = append(series1ExpSamplesPostCompact, s) + } + + // Add samples for series2 - one chunk with one detected counter reset at 300. + for i := 200; i < 300; i += 10 { + s = addSample(int64(i), series2, 100000+i, histogram.UnknownCounterReset) + if i > 200 { + s = copyWithCounterReset(s, histogram.NotCounterReset) + } + series2ExpSamplesPreCompact = append(series2ExpSamplesPreCompact, s) + if i == 240 { + s = copyWithCounterReset(s, histogram.UnknownCounterReset) + } + series2ExpSamplesPostCompact = append(series2ExpSamplesPostCompact, s) + } + // Counter reset. + s = addSample(int64(300), series2, 100000, histogram.UnknownCounterReset) + series2ExpSamplesPreCompact = append(series2ExpSamplesPreCompact, s) + series2ExpSamplesPostCompact = append(series2ExpSamplesPostCompact, s) + // Add some more samples after the counter reset. + for i := 310; i < 500; i += 10 { + s := addSample(int64(i), series2, 100000+i, histogram.UnknownCounterReset) + s = copyWithCounterReset(s, histogram.NotCounterReset) + series2ExpSamplesPreCompact = append(series2ExpSamplesPreCompact, s) + // 360 and 480 are block boundaries. + if i == 360 || i == 480 { + s = copyWithCounterReset(s, histogram.UnknownCounterReset) + } + series2ExpSamplesPostCompact = append(series2ExpSamplesPostCompact, s) + } + + // Sort samples (as OOO samples not added in time-order). + sort.Slice(series1ExpSamplesPreCompact, func(i, j int) bool { + return series1ExpSamplesPreCompact[i].T() < series1ExpSamplesPreCompact[j].T() + }) + sort.Slice(series1ExpSamplesPostCompact, func(i, j int) bool { + return series1ExpSamplesPostCompact[i].T() < series1ExpSamplesPostCompact[j].T() + }) + sort.Slice(series2ExpSamplesPreCompact, func(i, j int) bool { + return series2ExpSamplesPreCompact[i].T() < series2ExpSamplesPreCompact[j].T() + }) + sort.Slice(series2ExpSamplesPostCompact, func(i, j int) bool { + return series2ExpSamplesPostCompact[i].T() < series2ExpSamplesPostCompact[j].T() + }) + + verifyDBSamples := func(s1Samples, s2Samples []chunks.Sample) { + expRes := map[string][]chunks.Sample{ + series1.String(): s1Samples, + series2.String(): s2Samples, + } + + q, err := db.Querier(math.MinInt64, math.MaxInt64) + require.NoError(t, err) + actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) + requireEqualSeries(t, expRes, actRes, false) + } + + // Verify DB samples before compaction. + verifyDBSamples(series1ExpSamplesPreCompact, series2ExpSamplesPreCompact) + + // Verify that the in-memory ooo chunk is not empty. + checkNonEmptyOOOChunk := func(lbls labels.Labels) { + ms, created, err := db.head.getOrCreate(lbls.Hash(), lbls) + require.NoError(t, err) + require.False(t, created) + require.Greater(t, ms.ooo.oooHeadChunk.chunk.NumSamples(), 0) + } + + checkNonEmptyOOOChunk(series1) + checkNonEmptyOOOChunk(series2) + + // No blocks before compaction. + require.Empty(t, db.Blocks()) + + // There is a 0th WBL file. + require.NoError(t, db.head.wbl.Sync()) // syncing to make sure wbl is flushed in windows + files, err := os.ReadDir(db.head.wbl.Dir()) + require.NoError(t, err) + require.Len(t, files, 1) + require.Equal(t, "00000000", files[0].Name()) + f, err := files[0].Info() + require.NoError(t, err) + require.Greater(t, f.Size(), int64(100)) + + // OOO compaction happens here. + require.NoError(t, db.CompactOOOHead(ctx)) + + // Check that blocks are created after compaction. + require.Len(t, db.Blocks(), 5) + + // Check samples after compaction + verifyDBSamples(series1ExpSamplesPostCompact, series2ExpSamplesPostCompact) + + // 0th WBL file will be deleted and 1st will be the only present. + files, err = os.ReadDir(db.head.wbl.Dir()) + require.NoError(t, err) + require.Len(t, files, 1) + require.Equal(t, "00000001", files[0].Name()) + f, err = files[0].Info() + require.NoError(t, err) + require.Equal(t, int64(0), f.Size()) + + // OOO stuff should not be present in the Head now. + checkEmptyOOOChunk(series1) + checkEmptyOOOChunk(series2) + + verifyBlockSamples := func(block *Block, fromMins, toMins int64) { + var series1Samples, series2Samples []chunks.Sample + + for _, s := range series1ExpSamplesPostCompact { + if s.T() >= fromMins*time.Minute.Milliseconds() { + // samples should be sorted, so break out of loop when we reach a timestamp that's too big + if s.T() > toMins*time.Minute.Milliseconds() { + break + } + series1Samples = append(series1Samples, s) + } + } + for _, s := range series2ExpSamplesPostCompact { + if s.T() >= fromMins*time.Minute.Milliseconds() { + // Samples should be sorted, so break out of loop when we reach a timestamp that's too big. + if s.T() > toMins*time.Minute.Milliseconds() { + break + } + series2Samples = append(series2Samples, s) + } + } + + expRes := map[string][]chunks.Sample{} + if len(series1Samples) != 0 { + expRes[series1.String()] = series1Samples + } + if len(series2Samples) != 0 { + expRes[series2.String()] = series2Samples + } + + q, err := NewBlockQuerier(block, math.MinInt64, math.MaxInt64) + require.NoError(t, err) + + actRes := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", "bar.*")) + requireEqualSeries(t, expRes, actRes, false) + } + + // Checking for expected data in the blocks. + verifyBlockSamples(db.Blocks()[0], 100, 119) + verifyBlockSamples(db.Blocks()[1], 120, 239) + verifyBlockSamples(db.Blocks()[2], 240, 359) + verifyBlockSamples(db.Blocks()[3], 360, 479) + verifyBlockSamples(db.Blocks()[4], 480, 509) + + // There should be a single m-map file. + mmapDir := mmappedChunksDir(db.head.opts.ChunkDirRoot) + files, err = os.ReadDir(mmapDir) + require.NoError(t, err) + require.Len(t, files, 1) + + // Compact the in-order head and expect another block. + // Since this is a forced compaction, this block is not aligned with 2h. + err = db.CompactHead(NewRangeHead(db.head, 500*time.Minute.Milliseconds(), 550*time.Minute.Milliseconds())) + require.NoError(t, err) + require.Len(t, db.Blocks(), 6) + verifyBlockSamples(db.Blocks()[5], 520, 520) + + // Blocks created out of normal and OOO head now. But not merged. + verifyDBSamples(series1ExpSamplesPostCompact, series2ExpSamplesPostCompact) + + // The compaction also clears out the old m-map files. Including + // the file that has ooo chunks. + files, err = os.ReadDir(mmapDir) + require.NoError(t, err) + require.Len(t, files, 1) + require.Equal(t, "000001", files[0].Name()) + + // This will merge overlapping block. + require.NoError(t, db.Compact(ctx)) + + require.Len(t, db.Blocks(), 5) + verifyBlockSamples(db.Blocks()[0], 100, 119) + verifyBlockSamples(db.Blocks()[1], 120, 239) + verifyBlockSamples(db.Blocks()[2], 240, 359) + verifyBlockSamples(db.Blocks()[3], 360, 479) + verifyBlockSamples(db.Blocks()[4], 480, 520) // Merged block. + + // Final state. Blocks from normal and OOO head are merged. + verifyDBSamples(series1ExpSamplesPostCompact, series2ExpSamplesPostCompact) + } +} + +func copyWithCounterReset(s sample, hint histogram.CounterResetHint) sample { + if s.h != nil { + h := s.h.Copy() + h.CounterResetHint = hint + return sample{t: s.t, h: h} + } + + h := s.fh.Copy() + h.CounterResetHint = hint + return sample{t: s.t, fh: h} +} + func TestOOOCompactionFailure(t *testing.T) { for name, scenario := range sampleTypeScenarios { t.Run(name, func(t *testing.T) { @@ -7456,6 +8193,112 @@ func TestNativeHistogramFlag(t *testing.T) { }, act) } +func TestOOONativeHistogramFlag(t *testing.T) { + h := &histogram.Histogram{ + Count: 9, + ZeroCount: 4, + ZeroThreshold: 0.001, + Sum: 35.5, + Schema: 1, + PositiveSpans: []histogram.Span{ + {Offset: 0, Length: 2}, + {Offset: 2, Length: 2}, + }, + PositiveBuckets: []int64{1, 1, -1, 0}, + } + + l := labels.FromStrings("foo", "bar") + + t.Run("Test OOO native histograms if OOO is disabled", func(t *testing.T) { + opts := DefaultOptions() + opts.OutOfOrderTimeWindow = 0 + db := openTestDB(t, opts, []int64{100}) + defer func() { + require.NoError(t, db.Close()) + }() + + // Enable Native Histograms and OOO Native Histogram ingestion + db.EnableNativeHistograms() + db.EnableOOONativeHistograms() + + app := db.Appender(context.Background()) + _, err := app.AppendHistogram(0, l, 100, h, nil) + require.NoError(t, err) + + _, err = app.AppendHistogram(0, l, 50, h, nil) + require.NoError(t, err) // The OOO sample is not detected until it is committed, so no error is returned + + require.NoError(t, app.Commit()) + + q, err := db.Querier(math.MinInt, math.MaxInt64) + require.NoError(t, err) + act := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + require.Equal(t, map[string][]chunks.Sample{ + l.String(): {sample{t: 100, h: h}}, + }, act) + }) + t.Run("Test OOO Native Histograms if Native Histograms are disabled", func(t *testing.T) { + opts := DefaultOptions() + opts.OutOfOrderTimeWindow = 100 + db := openTestDB(t, opts, []int64{100}) + defer func() { + require.NoError(t, db.Close()) + }() + + // Disable Native Histograms and enable OOO Native Histogram ingestion + db.DisableNativeHistograms() + db.EnableOOONativeHistograms() + + // Attempt to add an in-order sample + app := db.Appender(context.Background()) + _, err := app.AppendHistogram(0, l, 200, h, nil) + require.Equal(t, storage.ErrNativeHistogramsDisabled, err) + + // Attempt to add an OOO sample + _, err = app.AppendHistogram(0, l, 100, h, nil) + require.Equal(t, storage.ErrNativeHistogramsDisabled, err) + + require.NoError(t, app.Commit()) + + q, err := db.Querier(math.MinInt, math.MaxInt64) + require.NoError(t, err) + act := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + require.Equal(t, map[string][]chunks.Sample{}, act) + }) + t.Run("Test OOO native histograms when flag is enabled", func(t *testing.T) { + opts := DefaultOptions() + opts.OutOfOrderTimeWindow = 100 + db := openTestDB(t, opts, []int64{100}) + defer func() { + require.NoError(t, db.Close()) + }() + + // Enable Native Histograms and OOO Native Histogram ingestion + db.EnableNativeHistograms() + db.EnableOOONativeHistograms() + + // Add in-order samples + app := db.Appender(context.Background()) + _, err := app.AppendHistogram(0, l, 200, h, nil) + require.NoError(t, err) + + // Add OOO samples + _, err = app.AppendHistogram(0, l, 100, h, nil) + require.NoError(t, err) + _, err = app.AppendHistogram(0, l, 150, h, nil) + require.NoError(t, err) + + require.NoError(t, app.Commit()) + + q, err := db.Querier(math.MinInt, math.MaxInt64) + require.NoError(t, err) + act := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + requireEqualSeries(t, map[string][]chunks.Sample{ + l.String(): {sample{t: 100, h: h}, sample{t: 150, h: h}, sample{t: 200, h: h}}, + }, act, true) + }) +} + // compareSeries essentially replaces `require.Equal(t, expected, actual) in // situations where the actual series might contain more counter reset hints // "unknown" than the expected series. This can easily happen for long series