Enforce chunks ordering when writing index. (#8085)

Document conditions on chunks. Add check on chunk time ordering.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
This commit is contained in:
Peter Štibraný 2024-02-04 16:31:49 +01:00 committed by GitHub
parent 98c4889029
commit e2b9cfeeeb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 82 additions and 7 deletions

View file

@ -82,6 +82,10 @@ Each series section is aligned to 16 bytes. The ID for a series is the `offset/1
Every series entry first holds its number of labels, followed by tuples of symbol table references that contain the label name and value. The label pairs are lexicographically sorted. Every series entry first holds its number of labels, followed by tuples of symbol table references that contain the label name and value. The label pairs are lexicographically sorted.
After the labels, the number of indexed chunks is encoded, followed by a sequence of metadata entries containing the chunks minimum (`mint`) and maximum (`maxt`) timestamp and a reference to its position in the chunk file. The `mint` is the time of the first sample and `maxt` is the time of the last sample in the chunk. Holding the time range data in the index allows dropping chunks irrelevant to queried time ranges without accessing them directly. After the labels, the number of indexed chunks is encoded, followed by a sequence of metadata entries containing the chunks minimum (`mint`) and maximum (`maxt`) timestamp and a reference to its position in the chunk file. The `mint` is the time of the first sample and `maxt` is the time of the last sample in the chunk. Holding the time range data in the index allows dropping chunks irrelevant to queried time ranges without accessing them directly.
Chunk references within single series must be increasing, and chunk references for `series_(N+1)` must be higher than chunk references for `series_N`.
This property guarantees that chunks that belong to the same series are grouped together in the segment files.
Furthermore chunk `mint` must be less or equal than `maxt`, and subsequent chunks within single series must have increasing `mint` and `maxt` and not overlap.
`mint` of the first chunk is stored, it's `maxt` is stored as a delta and the `mint` and `maxt` are encoded as deltas to the previous time for subsequent chunks. Similarly, the reference of the first chunk is stored and the next ref is stored as a delta to the previous one. `mint` of the first chunk is stored, it's `maxt` is stored as a delta and the `mint` and `maxt` are encoded as deltas to the previous time for subsequent chunks. Similarly, the reference of the first chunk is stored and the next ref is stored as a delta to the previous one.
``` ```

View file

@ -146,8 +146,11 @@ type Writer struct {
labelNames map[string]uint64 // Label names, and their usage. labelNames map[string]uint64 // Label names, and their usage.
// Hold last series to validate that clients insert new series in order. // Hold last series to validate that clients insert new series in order.
lastSeries labels.Labels lastSeries labels.Labels
lastRef storage.SeriesRef lastSeriesRef storage.SeriesRef
// Hold last added chunk reference to make sure that chunks are ordered properly.
lastChunkRef chunks.ChunkRef
crc32 hash.Hash crc32 hash.Hash
@ -433,9 +436,27 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, chunks ...
return fmt.Errorf("out-of-order series added with label set %q", lset) return fmt.Errorf("out-of-order series added with label set %q", lset)
} }
if ref < w.lastRef && !w.lastSeries.IsEmpty() { if ref < w.lastSeriesRef && !w.lastSeries.IsEmpty() {
return fmt.Errorf("series with reference greater than %d already added", ref) return fmt.Errorf("series with reference greater than %d already added", ref)
} }
lastChunkRef := w.lastChunkRef
lastMaxT := int64(0)
for ix, c := range chunks {
if c.Ref < lastChunkRef {
return fmt.Errorf("unsorted chunk reference: %d, previous: %d", c.Ref, lastChunkRef)
}
lastChunkRef = c.Ref
if ix > 0 && c.MinTime <= lastMaxT {
return fmt.Errorf("chunk minT %d is not higher than previous chunk maxT %d", c.MinTime, lastMaxT)
}
if c.MaxTime < c.MinTime {
return fmt.Errorf("chunk maxT %d is less than minT %d", c.MaxTime, c.MinTime)
}
lastMaxT = c.MaxTime
}
// We add padding to 16 bytes to increase the addressable space we get through 4 byte // We add padding to 16 bytes to increase the addressable space we get through 4 byte
// series references. // series references.
if err := w.addPadding(seriesByteAlign); err != nil { if err := w.addPadding(seriesByteAlign); err != nil {
@ -510,7 +531,8 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, chunks ...
} }
w.lastSeries.CopyFrom(lset) w.lastSeries.CopyFrom(lset)
w.lastRef = ref w.lastSeriesRef = ref
w.lastChunkRef = lastChunkRef
return nil return nil
} }

View file

@ -18,7 +18,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"hash/crc32" "hash/crc32"
"math/rand"
"os" "os"
"path/filepath" "path/filepath"
"sort" "sort"
@ -407,15 +406,17 @@ func TestPersistence_index_e2e(t *testing.T) {
var input indexWriterSeriesSlice var input indexWriterSeriesSlice
ref := uint64(0)
// Generate ChunkMetas for every label set. // Generate ChunkMetas for every label set.
for i, lset := range lbls { for i, lset := range lbls {
var metas []chunks.Meta var metas []chunks.Meta
for j := 0; j <= (i % 20); j++ { for j := 0; j <= (i % 20); j++ {
ref++
metas = append(metas, chunks.Meta{ metas = append(metas, chunks.Meta{
MinTime: int64(j * 10000), MinTime: int64(j * 10000),
MaxTime: int64((j + 1) * 10000), MaxTime: int64((j+1)*10000) - 1,
Ref: chunks.ChunkRef(rand.Uint64()), Ref: chunks.ChunkRef(ref),
Chunk: chunkenc.NewXORChunk(), Chunk: chunkenc.NewXORChunk(),
}) })
} }
@ -670,3 +671,51 @@ func TestDecoder_Postings_WrongInput(t *testing.T) {
_, _, err := (&Decoder{}).Postings([]byte("the cake is a lie")) _, _, err := (&Decoder{}).Postings([]byte("the cake is a lie"))
require.Error(t, err) require.Error(t, err)
} }
func TestChunksRefOrdering(t *testing.T) {
dir := t.TempDir()
idx, err := NewWriter(context.Background(), filepath.Join(dir, "index"))
require.NoError(t, err)
require.NoError(t, idx.AddSymbol("1"))
require.NoError(t, idx.AddSymbol("2"))
require.NoError(t, idx.AddSymbol("__name__"))
c50 := chunks.Meta{Ref: 50}
c100 := chunks.Meta{Ref: 100}
c200 := chunks.Meta{Ref: 200}
require.NoError(t, idx.AddSeries(1, labels.FromStrings("__name__", "1"), c100))
require.EqualError(t, idx.AddSeries(2, labels.FromStrings("__name__", "2"), c50), "unsorted chunk reference: 50, previous: 100")
require.NoError(t, idx.AddSeries(2, labels.FromStrings("__name__", "2"), c200))
require.NoError(t, idx.Close())
}
func TestChunksTimeOrdering(t *testing.T) {
dir := t.TempDir()
idx, err := NewWriter(context.Background(), filepath.Join(dir, "index"))
require.NoError(t, err)
require.NoError(t, idx.AddSymbol("1"))
require.NoError(t, idx.AddSymbol("2"))
require.NoError(t, idx.AddSymbol("__name__"))
require.NoError(t, idx.AddSeries(1, labels.FromStrings("__name__", "1"),
chunks.Meta{Ref: 1, MinTime: 0, MaxTime: 10}, // Also checks that first chunk can have MinTime: 0.
chunks.Meta{Ref: 2, MinTime: 11, MaxTime: 20},
chunks.Meta{Ref: 3, MinTime: 21, MaxTime: 30},
))
require.EqualError(t, idx.AddSeries(1, labels.FromStrings("__name__", "2"),
chunks.Meta{Ref: 10, MinTime: 0, MaxTime: 10},
chunks.Meta{Ref: 20, MinTime: 10, MaxTime: 20},
), "chunk minT 10 is not higher than previous chunk maxT 10")
require.EqualError(t, idx.AddSeries(1, labels.FromStrings("__name__", "2"),
chunks.Meta{Ref: 10, MinTime: 100, MaxTime: 30},
), "chunk maxT 30 is less than minT 100")
require.NoError(t, idx.Close())
}