diff --git a/storage/interface.go b/storage/interface.go index 50651022a9..715a2a94f1 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -146,7 +146,7 @@ type SelectHints struct { By bool // Indicate whether it is without or by. Range int64 // Range vector selector range in milliseconds. - ShardIndex uint64 // Current shard index (starts from 0). + ShardIndex uint64 // Current shard index (starts from 0 and goes up to ShardCount-1). ShardCount uint64 // Total number of shards (0 means sharding is disabled). } diff --git a/tsdb/block.go b/tsdb/block.go index eb7e05d921..05fc7cbf40 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -80,11 +80,12 @@ type IndexReader interface { SortedPostings(index.Postings) index.Postings // ShardedPostings returns a postings list filtered by the provided shardIndex - // out of shardCount. + // out of shardCount. For a given posting, its shard MUST be computed by hashing + // the series labels mod shardCount (e.g. `labels.Hash() % shardCount == shardIndex`). ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings // Series populates the given labels and chunk metas for the series identified - // by the reference. + // by the reference. Chunks are skipped if chks is nil. // Returns storage.ErrNotFound if the ref does not resolve to a known series. Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error diff --git a/tsdb/head_read.go b/tsdb/head_read.go index b4d10a6353..fecb5b5fe5 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -174,6 +174,7 @@ func (h *headIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCou } // Series returns the series for the given reference. +// Chunks are skipped if chks is nil. func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks.Meta) error { s := h.head.series.getByID(ref) @@ -183,6 +184,10 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks } *lbls = append((*lbls)[:0], s.lset...) 
+ if chks == nil { + return nil + } + s.Lock() defer s.Unlock() diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 11872582ad..17b649d84d 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -2094,22 +2094,37 @@ func TestHeadShardedPostings(t *testing.T) { require.NoError(t, p.Err()) require.Greater(t, len(expected), 0) - // Shard the same postings and merge of all them together. We expect the postings - // merged out of shards is the exact same of the non sharded ones. + // Query the same postings for each shard. const shardCount = uint64(4) - var actual []uint64 + actualShards := make(map[uint64][]uint64) + actualPostings := make([]uint64, 0, len(expected)) + for shardIndex := uint64(0); shardIndex < shardCount; shardIndex++ { p, err = ir.Postings("const", "1") require.NoError(t, err) p = ir.ShardedPostings(p, shardIndex, shardCount) for p.Next() { - actual = append(actual, p.At()) + ref := p.At() + + actualShards[shardIndex] = append(actualShards[shardIndex], ref) + actualPostings = append(actualPostings, ref) } require.NoError(t, p.Err()) } - require.ElementsMatch(t, expected, actual) + // We expect the postings merged from all shards to be exactly the same as the non-sharded ones. + require.ElementsMatch(t, expected, actualPostings) + + // We expect the series in each shard to be the expected ones. + for shardIndex, ids := range actualShards { + for _, id := range ids { + var lbls labels.Labels + + require.NoError(t, ir.Series(id, &lbls, nil)) + require.Equal(t, shardIndex, lbls.Hash()%shardCount) + } + } } func TestErrReuseAppender(t *testing.T) { diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go index 68e967a73b..ee7a404b77 100644 --- a/tsdb/index/index_test.go +++ b/tsdb/index/index_test.go @@ -258,22 +258,37 @@ func TestIndexRW_Postings(t *testing.T) { require.NoError(t, p.Err()) require.Greater(t, len(expected), 0) - // Shard the same postings and merge of all them together. 
We expect the postings - // merged out of shards is the exact same of the non sharded ones. + // Query the same postings for each shard. const shardCount = uint64(4) - var actual []uint64 + actualShards := make(map[uint64][]uint64) + actualPostings := make([]uint64, 0, len(expected)) + for shardIndex := uint64(0); shardIndex < shardCount; shardIndex++ { p, err = ir.Postings("a", "1") require.NoError(t, err) p = ir.ShardedPostings(p, shardIndex, shardCount) for p.Next() { - actual = append(actual, p.At()) + ref := p.At() + + actualShards[shardIndex] = append(actualShards[shardIndex], ref) + actualPostings = append(actualPostings, ref) } require.NoError(t, p.Err()) } - require.ElementsMatch(t, expected, actual) + // We expect the postings merged from all shards to be exactly the same as the non-sharded ones. + require.ElementsMatch(t, expected, actualPostings) + + // We expect the series in each shard to be the expected ones. + for shardIndex, ids := range actualShards { + for _, id := range ids { + var lbls labels.Labels + + require.NoError(t, ir.Series(id, &lbls, nil)) + require.Equal(t, shardIndex, lbls.Hash()%shardCount) + } + } } require.NoError(t, ir.Close())