mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 22:07:27 -08:00
Added sharding support to Querier.Select() (#1)
* Added sharding support to Querier.Select() Signed-off-by: Marco Pracucci <marco@pracucci.com> * Addressed review comments Signed-off-by: Marco Pracucci <marco@pracucci.com>
This commit is contained in:
parent
9691126b3d
commit
7b0d798332
|
@ -145,6 +145,9 @@ type SelectHints struct {
|
|||
Grouping []string // List of label names used in aggregation.
|
||||
By bool // Indicate whether it is without or by.
|
||||
Range int64 // Range vector selector range in milliseconds.
|
||||
|
||||
ShardIndex uint64 // Current shard index (starts from 0).
|
||||
ShardCount uint64 // Total number of shards (0 means sharding is disabled).
|
||||
}
|
||||
|
||||
// TODO(bwplotka): Move to promql/engine_test.go?
|
||||
|
|
|
@ -79,6 +79,10 @@ type IndexReader interface {
|
|||
// by the label set of the underlying series.
|
||||
SortedPostings(index.Postings) index.Postings
|
||||
|
||||
// ShardedPostings returns a postings list filtered by the provided shardIndex
|
||||
// out of shardCount.
|
||||
ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings
|
||||
|
||||
// Series populates the given labels and chunk metas for the series identified
|
||||
// by the reference.
|
||||
// Returns storage.ErrNotFound if the ref does not resolve to a known series.
|
||||
|
@ -470,6 +474,10 @@ func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings {
|
|||
return r.ir.SortedPostings(p)
|
||||
}
|
||||
|
||||
func (r blockIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings {
|
||||
return r.ir.ShardedPostings(p, shardIndex, shardCount)
|
||||
}
|
||||
|
||||
func (r blockIndexReader) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
|
||||
if err := r.ir.Series(ref, lset, chks); err != nil {
|
||||
return errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
|
||||
|
|
|
@ -1130,7 +1130,7 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e
|
|||
|
||||
func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSeries, bool, error) {
|
||||
s, created, err := h.series.getOrSet(hash, lset, func() *memSeries {
|
||||
return newMemSeries(lset, id, h.chunkRange.Load(), &h.memChunkPool)
|
||||
return newMemSeries(lset, id, hash, h.chunkRange.Load(), &h.memChunkPool)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
|
@ -1378,6 +1378,7 @@ type memSeries struct {
|
|||
|
||||
ref uint64
|
||||
lset labels.Labels
|
||||
hash uint64
|
||||
mmappedChunks []*mmappedChunk
|
||||
headChunk *memChunk
|
||||
chunkRange int64
|
||||
|
@ -1394,9 +1395,10 @@ type memSeries struct {
|
|||
txs *txRing
|
||||
}
|
||||
|
||||
func newMemSeries(lset labels.Labels, id uint64, chunkRange int64, memChunkPool *sync.Pool) *memSeries {
|
||||
func newMemSeries(lset labels.Labels, id, hash uint64, chunkRange int64, memChunkPool *sync.Pool) *memSeries {
|
||||
s := &memSeries{
|
||||
lset: lset,
|
||||
hash: hash,
|
||||
ref: id,
|
||||
chunkRange: chunkRange,
|
||||
nextAt: math.MinInt64,
|
||||
|
|
|
@ -152,6 +152,27 @@ func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
|
|||
return index.NewListPostings(ep)
|
||||
}
|
||||
|
||||
func (h *headIndexReader) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings {
|
||||
out := make([]uint64, 0, 128)
|
||||
|
||||
for p.Next() {
|
||||
s := h.head.series.getByID(p.At())
|
||||
if s == nil {
|
||||
level.Debug(h.head.logger).Log("msg", "Looked up series not found")
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if the series belong to the shard.
|
||||
if s.hash%shardCount != shardIndex {
|
||||
continue
|
||||
}
|
||||
|
||||
out = append(out, s.ref)
|
||||
}
|
||||
|
||||
return index.NewListPostings(out)
|
||||
}
|
||||
|
||||
// Series returns the series for the given reference.
|
||||
func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks.Meta) error {
|
||||
s := h.head.series.getByID(ref)
|
||||
|
|
|
@ -492,7 +492,8 @@ func TestMemSeries_truncateChunks(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
s := newMemSeries(labels.FromStrings("a", "b"), 1, 2000, &memChunkPool)
|
||||
lbls := labels.FromStrings("a", "b")
|
||||
s := newMemSeries(lbls, 1, lbls.Hash(), 2000, &memChunkPool)
|
||||
|
||||
for i := 0; i < 4000; i += 5 {
|
||||
ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper)
|
||||
|
@ -1031,7 +1032,8 @@ func TestMemSeries_append(t *testing.T) {
|
|||
require.NoError(t, chunkDiskMapper.Close())
|
||||
}()
|
||||
|
||||
s := newMemSeries(labels.Labels{}, 1, 500, nil)
|
||||
lbls := labels.Labels{}
|
||||
s := newMemSeries(lbls, 1, lbls.Hash(), 500, nil)
|
||||
|
||||
// Add first two samples at the very end of a chunk range and the next two
|
||||
// on and after it.
|
||||
|
@ -2061,6 +2063,55 @@ func TestHeadLabelNamesWithMatchers(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestHeadShardedPostings(t *testing.T) {
|
||||
head, _ := newTestHead(t, 1000, false)
|
||||
defer func() {
|
||||
require.NoError(t, head.Close())
|
||||
}()
|
||||
|
||||
// Append some series.
|
||||
app := head.Appender(context.Background())
|
||||
for i := 0; i < 100; i++ {
|
||||
_, err := app.Append(0, labels.Labels{
|
||||
{Name: "unique", Value: fmt.Sprintf("value%d", i)},
|
||||
{Name: "const", Value: "1"},
|
||||
}, 100, 0)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
ir := head.indexRange(0, 200)
|
||||
|
||||
// List all postings for a given label value. This is what we expect to get
|
||||
// in output from all shards.
|
||||
p, err := ir.Postings("const", "1")
|
||||
require.NoError(t, err)
|
||||
|
||||
var expected []uint64
|
||||
for p.Next() {
|
||||
expected = append(expected, p.At())
|
||||
}
|
||||
require.NoError(t, p.Err())
|
||||
require.Greater(t, len(expected), 0)
|
||||
|
||||
// Shard the same postings and merge of all them together. We expect the postings
|
||||
// merged out of shards is the exact same of the non sharded ones.
|
||||
const shardCount = uint64(4)
|
||||
var actual []uint64
|
||||
for shardIndex := uint64(0); shardIndex < shardCount; shardIndex++ {
|
||||
p, err = ir.Postings("const", "1")
|
||||
require.NoError(t, err)
|
||||
|
||||
p = ir.ShardedPostings(p, shardIndex, shardCount)
|
||||
for p.Next() {
|
||||
actual = append(actual, p.At())
|
||||
}
|
||||
require.NoError(t, p.Err())
|
||||
}
|
||||
|
||||
require.ElementsMatch(t, expected, actual)
|
||||
}
|
||||
|
||||
func TestErrReuseAppender(t *testing.T) {
|
||||
head, _ := newTestHead(t, 1000, false)
|
||||
defer func() {
|
||||
|
@ -2196,7 +2247,8 @@ func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) {
|
|||
require.NoError(t, chunkDiskMapper.Close())
|
||||
}()
|
||||
|
||||
s := newMemSeries(labels.Labels{}, 1, 500, nil)
|
||||
lbls := labels.Labels{}
|
||||
s := newMemSeries(lbls, 1, lbls.Hash(), 500, nil)
|
||||
|
||||
for i := 0; i < 7; i++ {
|
||||
ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper)
|
||||
|
|
|
@ -1587,6 +1587,7 @@ func (r *Reader) LabelValueFor(id uint64, label string) (string, error) {
|
|||
}
|
||||
|
||||
// Series reads the series with the given ID and writes its labels and chunks into lbls and chks.
|
||||
// Chunks will be skipped if chks is nil.
|
||||
func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) error {
|
||||
offset := id
|
||||
// In version 2 series IDs are no longer exact references but series are 16-byte padded
|
||||
|
@ -1707,6 +1708,33 @@ func (r *Reader) SortedPostings(p Postings) Postings {
|
|||
return p
|
||||
}
|
||||
|
||||
// ShardedPostings returns a postings list filtered by the provided shardIndex out of shardCount.
|
||||
func (r *Reader) ShardedPostings(p Postings, shardIndex, shardCount uint64) Postings {
|
||||
var (
|
||||
out = make([]uint64, 0, 128)
|
||||
bufLbls = make(labels.Labels, 0, 10)
|
||||
)
|
||||
|
||||
for p.Next() {
|
||||
id := p.At()
|
||||
|
||||
// Get the series labels (no chunks).
|
||||
err := r.Series(id, &bufLbls, nil)
|
||||
if err != nil {
|
||||
return ErrPostings(errors.Errorf("series %d not found", id))
|
||||
}
|
||||
|
||||
// Check if the series belong to the shard.
|
||||
if bufLbls.Hash()%shardCount != shardIndex {
|
||||
continue
|
||||
}
|
||||
|
||||
out = append(out, id)
|
||||
}
|
||||
|
||||
return NewListPostings(out)
|
||||
}
|
||||
|
||||
// Size returns the size of an index file.
|
||||
func (r *Reader) Size() int64 {
|
||||
return int64(r.b.Len())
|
||||
|
@ -1820,9 +1848,12 @@ func (dec *Decoder) LabelValueFor(b []byte, label string) (string, error) {
|
|||
}
|
||||
|
||||
// Series decodes a series entry from the given byte slice into lset and chks.
|
||||
// Skips reading chunks metadata if chks is nil.
|
||||
func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) error {
|
||||
*lbls = (*lbls)[:0]
|
||||
*chks = (*chks)[:0]
|
||||
if chks != nil {
|
||||
*chks = (*chks)[:0]
|
||||
}
|
||||
|
||||
d := encoding.Decbuf{B: b}
|
||||
|
||||
|
@ -1848,11 +1879,16 @@ func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) e
|
|||
*lbls = append(*lbls, labels.Label{Name: ln, Value: lv})
|
||||
}
|
||||
|
||||
// Skip reading chunks metadata if chks is nil.
|
||||
if chks == nil {
|
||||
return d.Err()
|
||||
}
|
||||
|
||||
// Read the chunks meta data.
|
||||
k = d.Uvarint()
|
||||
|
||||
if k == 0 {
|
||||
return nil
|
||||
return d.Err()
|
||||
}
|
||||
|
||||
t0 := d.Varint64()
|
||||
|
|
|
@ -245,6 +245,37 @@ func TestIndexRW_Postings(t *testing.T) {
|
|||
"b": {"1", "2", "3", "4"},
|
||||
}, labelIndices)
|
||||
|
||||
{
|
||||
// List all postings for a given label value. This is what we expect to get
|
||||
// in output from all shards.
|
||||
p, err = ir.Postings("a", "1")
|
||||
require.NoError(t, err)
|
||||
|
||||
var expected []uint64
|
||||
for p.Next() {
|
||||
expected = append(expected, p.At())
|
||||
}
|
||||
require.NoError(t, p.Err())
|
||||
require.Greater(t, len(expected), 0)
|
||||
|
||||
// Shard the same postings and merge of all them together. We expect the postings
|
||||
// merged out of shards is the exact same of the non sharded ones.
|
||||
const shardCount = uint64(4)
|
||||
var actual []uint64
|
||||
for shardIndex := uint64(0); shardIndex < shardCount; shardIndex++ {
|
||||
p, err = ir.Postings("a", "1")
|
||||
require.NoError(t, err)
|
||||
|
||||
p = ir.ShardedPostings(p, shardIndex, shardCount)
|
||||
for p.Next() {
|
||||
actual = append(actual, p.At())
|
||||
}
|
||||
require.NoError(t, p.Err())
|
||||
}
|
||||
|
||||
require.ElementsMatch(t, expected, actual)
|
||||
}
|
||||
|
||||
require.NoError(t, ir.Close())
|
||||
}
|
||||
|
||||
|
|
|
@ -127,6 +127,9 @@ func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ..
|
|||
if err != nil {
|
||||
return storage.ErrSeriesSet(err)
|
||||
}
|
||||
if hints != nil && hints.ShardCount > 0 {
|
||||
p = q.index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount)
|
||||
}
|
||||
if sortSeries {
|
||||
p = q.index.SortedPostings(p)
|
||||
}
|
||||
|
@ -168,6 +171,9 @@ func (q *blockChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints,
|
|||
if err != nil {
|
||||
return storage.ErrChunkSeriesSet(err)
|
||||
}
|
||||
if hints != nil && hints.ShardCount > 0 {
|
||||
p = q.index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount)
|
||||
}
|
||||
if sortSeries {
|
||||
p = q.index.SortedPostings(p)
|
||||
}
|
||||
|
|
|
@ -1216,6 +1216,27 @@ func (m mockIndex) SortedPostings(p index.Postings) index.Postings {
|
|||
return index.NewListPostings(ep)
|
||||
}
|
||||
|
||||
func (m mockIndex) ShardedPostings(p index.Postings, shardIndex, shardCount uint64) index.Postings {
|
||||
out := make([]uint64, 0, 128)
|
||||
|
||||
for p.Next() {
|
||||
ref := p.At()
|
||||
s, ok := m.series[ref]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if the series belong to the shard.
|
||||
if s.l.Hash()%shardCount != shardIndex {
|
||||
continue
|
||||
}
|
||||
|
||||
out = append(out, ref)
|
||||
}
|
||||
|
||||
return index.NewListPostings(out)
|
||||
}
|
||||
|
||||
func (m mockIndex) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
|
||||
s, ok := m.series[ref]
|
||||
if !ok {
|
||||
|
@ -2051,6 +2072,10 @@ func (m mockMatcherIndex) SortedPostings(p index.Postings) index.Postings {
|
|||
return index.EmptyPostings()
|
||||
}
|
||||
|
||||
func (m mockMatcherIndex) ShardedPostings(ps index.Postings, shardIndex, shardCount uint64) index.Postings {
|
||||
return ps
|
||||
}
|
||||
|
||||
func (m mockMatcherIndex) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue