Improve sharding tests and comments (#4)

Signed-off-by: Marco Pracucci <marco@pracucci.com>
This commit is contained in:
Marco Pracucci 2021-08-13 15:16:03 +02:00 committed by GitHub
parent 7b0d798332
commit a3ad212658
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 49 additions and 13 deletions

View file

@ -146,7 +146,7 @@ type SelectHints struct {
By bool // Indicate whether it is without or by.
Range int64 // Range vector selector range in milliseconds.
ShardIndex uint64 // Current shard index (starts from 0).
ShardIndex uint64 // Current shard index (starts from 0 and up to ShardCount-1).
ShardCount uint64 // Total number of shards (0 means sharding is disabled).
}

View file

@ -80,11 +80,12 @@ type IndexReader interface {
SortedPostings(index.Postings) index.Postings
// ShardedPostings returns a postings list filtered by the provided shardIndex
// out of shardCount.
// out of shardCount. For a given posting, its shard MUST be computed by hashing
// the series labels modulo shardCount (e.g. `labels.Hash() % shardCount == shardIndex`).
ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings
// Series populates the given labels and chunk metas for the series identified
// by the reference.
// by the reference. Chunks are skipped if chks is nil.
// Returns storage.ErrNotFound if the ref does not resolve to a known series.
Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error

View file

@ -174,6 +174,7 @@ func (h *headIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCou
}
// Series returns the series for the given reference.
// Chunks are skipped if chks is nil.
func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks.Meta) error {
s := h.head.series.getByID(ref)
@ -183,6 +184,10 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks
}
*lbls = append((*lbls)[:0], s.lset...)
if chks == nil {
return nil
}
s.Lock()
defer s.Unlock()

View file

@ -2094,22 +2094,37 @@ func TestHeadShardedPostings(t *testing.T) {
require.NoError(t, p.Err())
require.Greater(t, len(expected), 0)
// Shard the same postings and merge all of them together. We expect the postings
// merged out of the shards to be exactly the same as the non-sharded ones.
// Query the same postings for each shard.
const shardCount = uint64(4)
var actual []uint64
actualShards := make(map[uint64][]uint64)
actualPostings := make([]uint64, 0, len(expected))
for shardIndex := uint64(0); shardIndex < shardCount; shardIndex++ {
p, err = ir.Postings("const", "1")
require.NoError(t, err)
p = ir.ShardedPostings(p, shardIndex, shardCount)
for p.Next() {
actual = append(actual, p.At())
ref := p.At()
actualShards[shardIndex] = append(actualShards[shardIndex], ref)
actualPostings = append(actualPostings, ref)
}
require.NoError(t, p.Err())
}
require.ElementsMatch(t, expected, actual)
// We expect the postings merged out of the shards to be exactly the same as the non-sharded ones.
require.ElementsMatch(t, expected, actualPostings)
// We expect each shard to contain only the series belonging to it.
for shardIndex, ids := range actualShards {
for _, id := range ids {
var lbls labels.Labels
require.NoError(t, ir.Series(id, &lbls, nil))
require.Equal(t, shardIndex, lbls.Hash()%shardCount)
}
}
}
func TestErrReuseAppender(t *testing.T) {

View file

@ -258,22 +258,37 @@ func TestIndexRW_Postings(t *testing.T) {
require.NoError(t, p.Err())
require.Greater(t, len(expected), 0)
// Shard the same postings and merge all of them together. We expect the postings
// merged out of the shards to be exactly the same as the non-sharded ones.
// Query the same postings for each shard.
const shardCount = uint64(4)
var actual []uint64
actualShards := make(map[uint64][]uint64)
actualPostings := make([]uint64, 0, len(expected))
for shardIndex := uint64(0); shardIndex < shardCount; shardIndex++ {
p, err = ir.Postings("a", "1")
require.NoError(t, err)
p = ir.ShardedPostings(p, shardIndex, shardCount)
for p.Next() {
actual = append(actual, p.At())
ref := p.At()
actualShards[shardIndex] = append(actualShards[shardIndex], ref)
actualPostings = append(actualPostings, ref)
}
require.NoError(t, p.Err())
}
require.ElementsMatch(t, expected, actual)
// We expect the postings merged out of the shards to be exactly the same as the non-sharded ones.
require.ElementsMatch(t, expected, actualPostings)
// We expect each shard to contain only the series belonging to it.
for shardIndex, ids := range actualShards {
for _, id := range ids {
var lbls labels.Labels
require.NoError(t, ir.Series(id, &lbls, nil))
require.Equal(t, shardIndex, lbls.Hash()%shardCount)
}
}
}
require.NoError(t, ir.Close())