Add Chunks* Iterator/Querier/Selector to storage interface.

* Added Chunk versions of all iterating methods. It all starts in Querier/ChunkQuerier. The plan is that
Storage will implement both the chunked and the sample-based interfaces.
* Added Seek to the chunks.Iterator interface for iterating over chunks.
* Mock, NewTestSeries, SampleSeriesIterator and ChunkSeriesIterator are now available from the storage package and reused instead of
being recreated in many places. NewConcreteSeries was created to replace concreteSeries.
* NewMergeChunkQuerier was added; both this and NewMergeQuerier now use mergeGenericQuerier to share the code. Generic adapter code was added.
* Both Compactor and block Querier use *exactly* the same iterators (blockChunkSeriesSet).
* Added some TODOs for further simplifications in next PRs.
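
For illustration, a minimal sketch of how a caller now wires things up (hypothetical helper; primary and secondaries are placeholders for real queriers):

import "github.com/prometheus/prometheus/storage"

// mergedSelect is a hypothetical helper showing the shape of the new API.
func mergedSelect(primary storage.Querier, secondaries []storage.Querier) (storage.SeriesSet, storage.Warnings, error) {
	q := storage.NewMergeQuerier(primary, append([]storage.Querier{primary}, secondaries...),
		storage.VerticalSeriesMergeFunc(storage.ChainedSeriesMerge))
	// The chunked path mirrors this via storage.NewMergeChunkQuerier.
	return q.Select(true, nil) // sorted select, no hints, no matchers
}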

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
Bartlomiej Plotka 2020-02-07 16:10:32 +00:00
parent 350f25eed3
commit af5a7d1078
23 changed files with 2268 additions and 1557 deletions


@ -136,6 +136,14 @@ type sample struct {
v float64
}
func (s sample) T() int64 {
return s.t
}
func (s sample) V() float64 {
return s.v
}
type sampleRing struct {
delta int64


@ -15,11 +15,9 @@ package storage
import (
"math/rand"
"sort"
"testing"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/util/testutil"
)
@ -107,15 +105,15 @@ func TestBufferedSeriesIterator(t *testing.T) {
testutil.Equals(t, ev, v, "value mismatch")
}
it = NewBufferIterator(newListSeriesIterator([]sample{
{t: 1, v: 2},
{t: 2, v: 3},
{t: 3, v: 4},
{t: 4, v: 5},
{t: 5, v: 6},
{t: 99, v: 8},
{t: 100, v: 9},
{t: 101, v: 10},
it = NewBufferIterator(NewSampleIterator(tsdbutil.SampleSlice{
sample{t: 1, v: 2},
sample{t: 2, v: 3},
sample{t: 3, v: 4},
sample{t: 4, v: 5},
sample{t: 5, v: 6},
sample{t: 99, v: 8},
sample{t: 100, v: 9},
sample{t: 101, v: 10},
}), 2)
testutil.Assert(t, it.Seek(-123), "seek failed")
@ -147,7 +145,7 @@ func TestBufferedSeriesIterator(t *testing.T) {
func TestBufferedSeriesIteratorNoBadAt(t *testing.T) {
done := false
m := &mockSeriesIterator{
m := &mockSampleIterator{
seek: func(int64) bool { return false },
at: func() (int64, float64) {
testutil.Assert(t, !done, "unexpectedly done")
@ -177,97 +175,40 @@ func BenchmarkBufferedSeriesIterator(b *testing.B) {
testutil.Ok(b, it.Err())
}
type mockSeriesIterator struct {
type mockSampleIterator struct {
seek func(int64) bool
at func() (int64, float64)
next func() bool
err func() error
}
func (m *mockSeriesIterator) Seek(t int64) bool { return m.seek(t) }
func (m *mockSeriesIterator) At() (int64, float64) { return m.at() }
func (m *mockSeriesIterator) Next() bool { return m.next() }
func (m *mockSeriesIterator) Err() error { return m.err() }
func (m *mockSampleIterator) Seek(t int64) bool { return m.seek(t) }
func (m *mockSampleIterator) At() (int64, float64) { return m.at() }
func (m *mockSampleIterator) Next() bool { return m.next() }
func (m *mockSampleIterator) Err() error { return m.err() }
type mockSeries struct {
labels func() labels.Labels
iterator func() chunkenc.Iterator
}
func newMockSeries(lset labels.Labels, samples []sample) Series {
return &mockSeries{
labels: func() labels.Labels {
return lset
},
iterator: func() chunkenc.Iterator {
return newListSeriesIterator(samples)
},
}
}
func (m *mockSeries) Labels() labels.Labels { return m.labels() }
func (m *mockSeries) Iterator() chunkenc.Iterator { return m.iterator() }
type listSeriesIterator struct {
list []sample
idx int
}
func newListSeriesIterator(list []sample) *listSeriesIterator {
return &listSeriesIterator{list: list, idx: -1}
}
func (it *listSeriesIterator) At() (int64, float64) {
s := it.list[it.idx]
return s.t, s.v
}
func (it *listSeriesIterator) Next() bool {
it.idx++
return it.idx < len(it.list)
}
func (it *listSeriesIterator) Seek(t int64) bool {
if it.idx == -1 {
it.idx = 0
}
// Do binary search between current position and end.
it.idx = sort.Search(len(it.list)-it.idx, func(i int) bool {
s := it.list[i+it.idx]
return s.t >= t
})
return it.idx < len(it.list)
}
func (it *listSeriesIterator) Err() error {
return nil
}
type fakeSeriesIterator struct {
type fakeSampleIterator struct {
nsamples int64
step int64
idx int64
}
func newFakeSeriesIterator(nsamples, step int64) *fakeSeriesIterator {
return &fakeSeriesIterator{nsamples: nsamples, step: step, idx: -1}
func newFakeSeriesIterator(nsamples, step int64) *fakeSampleIterator {
return &fakeSampleIterator{nsamples: nsamples, step: step, idx: -1}
}
func (it *fakeSeriesIterator) At() (int64, float64) {
func (it *fakeSampleIterator) At() (int64, float64) {
return it.idx * it.step, 123 // value doesn't matter
}
func (it *fakeSeriesIterator) Next() bool {
func (it *fakeSampleIterator) Next() bool {
it.idx++
return it.idx < it.nsamples
}
func (it *fakeSeriesIterator) Seek(t int64) bool {
func (it *fakeSampleIterator) Seek(t int64) bool {
it.idx = t / it.step
return it.idx < it.nsamples
}
func (it *fakeSeriesIterator) Err() error {
return nil
}
func (it *fakeSampleIterator) Err() error { return nil }


@ -16,6 +16,7 @@ package storage
import (
"container/heap"
"context"
"math"
"sort"
"strings"
@ -25,6 +26,8 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
)
type fanout struct {
@ -47,7 +50,7 @@ func NewFanout(logger log.Logger, primary Storage, secondaries ...Storage) Stora
// StartTime implements the Storage interface.
func (f *fanout) StartTime() (int64, error) {
// StartTime of a fanout should be the earliest StartTime of all its storages,
// both primary and secondaries.
// both the primary querier and the secondaries.
firstTime, err := f.primary.StartTime()
if err != nil {
return int64(model.Latest), err
@ -68,24 +71,24 @@ func (f *fanout) StartTime() (int64, error) {
func (f *fanout) Querier(ctx context.Context, mint, maxt int64) (Querier, error) {
queriers := make([]Querier, 0, 1+len(f.secondaries))
// Add primary querier
// Add primary querier.
primaryQuerier, err := f.primary.Querier(ctx, mint, maxt)
if err != nil {
return nil, err
}
queriers = append(queriers, primaryQuerier)
// Add secondary queriers
// Add secondary queriers.
for _, storage := range f.secondaries {
querier, err := storage.Querier(ctx, mint, maxt)
if err != nil {
NewMergeQuerier(primaryQuerier, queriers).Close()
NewMergeQuerier(primaryQuerier, queriers, VerticalSeriesMergeFunc(ChainedSeriesMerge)).Close()
return nil, err
}
queriers = append(queriers, querier)
}
return NewMergeQuerier(primaryQuerier, queriers), nil
return NewMergeQuerier(primaryQuerier, queriers, VerticalSeriesMergeFunc(ChainedSeriesMerge)), nil
}
func (f *fanout) Appender() Appender {
@ -181,66 +184,101 @@ func (f *fanoutAppender) Rollback() (err error) {
return nil
}
// mergeQuerier implements Querier.
type mergeQuerier struct {
primaryQuerier Querier
queriers []Querier
failedQueriers map[Querier]struct{}
setQuerierMap map[SeriesSet]Querier
type genericQuerier interface {
baseQuerier
Select(bool, *SelectHints, ...*labels.Matcher) (genericSeriesSet, Warnings, error)
}
// NewMergeQuerier returns a new Querier that merges results of input queriers.
type genericSeriesSet interface {
Next() bool
At() Labeled
Err() error
}
type genericSeriesMerger interface {
Merge(...Labeled) Labeled
}
type mergeGenericQuerier struct {
merger genericSeriesMerger
primaryQuerier genericQuerier
queriers []genericQuerier
failedQueriers map[genericQuerier]struct{}
setQuerierMap map[genericSeriesSet]genericQuerier
}
// NewMergeQuerier returns a new Querier that merges results of input queriers.
// NB NewMergeQuerier will filter NoopQueriers from its arguments, in order to
// reduce overhead when only one querier is passed, and will return the primary
// querier if no other queriers remain.
func NewMergeQuerier(primaryQuerier Querier, queriers []Querier) Querier {
filtered := make([]Querier, 0, len(queriers))
func NewMergeQuerier(primaryQuerier Querier, queriers []Querier, merger VerticalSeriesMerger) Querier {
filtered := make([]genericQuerier, 0, len(queriers))
for _, querier := range queriers {
if querier != NoopQuerier() {
filtered = append(filtered, querier)
if _, ok := querier.(noopQuerier); !ok {
filtered = append(filtered, newGenericQuerierFrom(querier))
}
}
setQuerierMap := make(map[SeriesSet]Querier)
failedQueriers := make(map[Querier]struct{})
if len(filtered) == 0 {
return primaryQuerier
}
switch len(filtered) {
case 0:
return NoopQuerier()
case 1:
return filtered[0]
default:
return &mergeQuerier{
primaryQuerier: primaryQuerier,
queriers: filtered,
failedQueriers: failedQueriers,
setQuerierMap: setQuerierMap,
return &querierAdapter{&mergeGenericQuerier{
merger: &seriesMergerAdapter{VerticalSeriesMerger: merger},
primaryQuerier: newGenericQuerierFrom(primaryQuerier),
queriers: filtered,
failedQueriers: make(map[genericQuerier]struct{}),
setQuerierMap: make(map[genericSeriesSet]genericQuerier),
}}
}
// NewMergeChunkQuerier returns a new ChunkQuerier that merges results of input queriers.
// NB NewMergeChunkQuerier will filter NoopChunkedQueriers from its arguments, in order to
// reduce overhead when only one querier is passed, and will return the primary
// querier if no other queriers remain.
func NewMergeChunkQuerier(primaryQuerier ChunkQuerier, queriers []ChunkQuerier, merger VerticalChunkSeriesMerger) ChunkQuerier {
filtered := make([]genericQuerier, 0, len(queriers))
for _, querier := range queriers {
if _, ok := querier.(noopChunkedQuerier); !ok {
filtered = append(filtered, newGenericQuerierFromChunk(querier))
}
}
if len(filtered) == 0 {
return primaryQuerier
}
return &chunkQuerierAdapter{&mergeGenericQuerier{
merger: &chunkSeriesMergerAdapter{VerticalChunkSeriesMerger: merger},
primaryQuerier: newGenericQuerierFromChunk(primaryQuerier),
queriers: filtered,
failedQueriers: make(map[genericQuerier]struct{}),
setQuerierMap: make(map[genericSeriesSet]genericQuerier),
}}
}
// Select returns a set of series that match the given label matchers.
func (q *mergeQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) {
func (q *mergeGenericQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (genericSeriesSet, Warnings, error) {
if len(q.queriers) == 1 {
return q.queriers[0].Select(sortSeries, hints, matchers...)
}
var (
seriesSets = make([]SeriesSet, 0, len(q.queriers))
seriesSets = make([]genericSeriesSet, 0, len(q.queriers))
warnings Warnings
priErr error
)
type queryResult struct {
qr Querier
set SeriesSet
qr genericQuerier
set genericSeriesSet
wrn Warnings
selectError error
}
queryResultChan := make(chan *queryResult)
for _, querier := range q.queriers {
go func(qr Querier) {
go func(qr genericQuerier) {
// We need to sort for newGenericMergeSeriesSet to work.
set, wrn, err := qr.Select(true, hints, matchers...)
queryResultChan <- &queryResult{qr: qr, set: set, wrn: wrn, selectError: err}
@ -266,16 +304,15 @@ func (q *mergeQuerier) Select(sortSeries bool, hints *SelectHints, matchers ...*
if priErr != nil {
return nil, nil, priErr
}
return NewMergeSeriesSet(seriesSets, q), warnings, nil
return newGenericMergeSeriesSet(seriesSets, q, q.merger), warnings, nil
}
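
Select above fans out to every wrapped querier concurrently and collects the results over a channel. Reduced to a sketch (reusing the queryResult type defined inside Select; the collection loop itself is elided by the hunk above):

queryResultChan := make(chan *queryResult)
for _, querier := range q.queriers {
	go func(qr genericQuerier) {
		set, wrn, err := qr.Select(true, hints, matchers...)
		queryResultChan <- &queryResult{qr: qr, set: set, wrn: wrn, selectError: err}
	}(querier)
}
for range q.queriers {
	res := <-queryResultChan
	// Collect res.set and res.wrn; on error, remember res.qr in failedQueriers
	// and fail hard only if it is the primary querier.
}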
// LabelValues returns all potential values for a label name.
func (q *mergeQuerier) LabelValues(name string) ([]string, Warnings, error) {
func (q *mergeGenericQuerier) LabelValues(name string) ([]string, Warnings, error) {
var results [][]string
var warnings Warnings
for _, querier := range q.queriers {
values, wrn, err := querier.LabelValues(name)
if wrn != nil {
warnings = append(warnings, wrn...)
}
@ -294,11 +331,21 @@ func (q *mergeQuerier) LabelValues(name string) ([]string, Warnings, error) {
return mergeStringSlices(results), warnings, nil
}
func (q *mergeQuerier) IsFailedSet(set SeriesSet) bool {
func (q *mergeGenericQuerier) IsFailedSet(set genericSeriesSet) bool {
_, isFailedQuerier := q.failedQueriers[q.setQuerierMap[set]]
return isFailedQuerier
}
// NewMergeChunkSeriesSet returns a new ChunkSeriesSet that merges results of input ChunkSeriesSets.
func NewMergeChunkSeriesSet(sets []ChunkSeriesSet, merger VerticalChunkSeriesMerger) ChunkSeriesSet {
genericSets := make([]genericSeriesSet, 0, len(sets))
for _, s := range sets {
genericSets = append(genericSets, &genericChunkSeriesSetAdapter{s})
}
return &chunkSeriesSetAdapter{newGenericMergeSeriesSet(genericSets, nil, &chunkSeriesMergerAdapter{VerticalChunkSeriesMerger: merger})}
}
func mergeStringSlices(ss [][]string) []string {
switch len(ss) {
case 0:
@ -339,18 +386,19 @@ func mergeTwoStringSlices(a, b []string) []string {
}
// LabelNames returns all the unique label names present in the block in sorted order.
func (q *mergeQuerier) LabelNames() ([]string, Warnings, error) {
func (q *mergeGenericQuerier) LabelNames() ([]string, Warnings, error) {
labelNamesMap := make(map[string]struct{})
var warnings Warnings
for _, b := range q.queriers {
names, wrn, err := b.LabelNames()
for _, querier := range q.queriers {
names, wrn, err := querier.LabelNames()
if wrn != nil {
warnings = append(warnings, wrn...)
}
if err != nil {
// If the error source isn't the primary querier, return the error as a warning and continue.
if b != q.primaryQuerier {
q.failedQueriers[querier] = struct{}{}
// If the error source isn't the primary querier, return the error as a warning and continue.
if querier != q.primaryQuerier {
warnings = append(warnings, err)
continue
} else {
@ -373,39 +421,41 @@ func (q *mergeQuerier) LabelNames() ([]string, Warnings, error) {
}
// Close releases the resources of the Querier.
func (q *mergeQuerier) Close() error {
// TODO return multiple errors?
var lastErr error
func (q *mergeGenericQuerier) Close() error {
var errs tsdb_errors.MultiError
for _, querier := range q.queriers {
if err := querier.Close(); err != nil {
lastErr = err
errs.Add(err)
}
}
return lastErr
return errs.Err()
}
// mergeSeriesSet implements SeriesSet
type mergeSeriesSet struct {
// genericMergeSeriesSet implements genericSeriesSet
type genericMergeSeriesSet struct {
currentLabels labels.Labels
currentSets []SeriesSet
heap seriesSetHeap
sets []SeriesSet
merger genericSeriesMerger
querier *mergeQuerier
heap genericSeriesSetHeap
sets []genericSeriesSet
currentSets []genericSeriesSet
querier *mergeGenericQuerier
}
// NewMergeSeriesSet returns a new series set that merges (deduplicates)
// series returned by the input series sets when iterating.
// Each input series set must return its series in labels order, otherwise
// newGenericMergeSeriesSet returns a new series set that merges (deduplicates)
// series returned by the input series sets when iterating.
// Each input series set must return its series in labels order, otherwise
// merged series set will be incorrect.
func NewMergeSeriesSet(sets []SeriesSet, querier *mergeQuerier) SeriesSet {
// Overlapped samples/chunks will be dropped.
func newGenericMergeSeriesSet(sets []genericSeriesSet, querier *mergeGenericQuerier, merger genericSeriesMerger) genericSeriesSet {
if len(sets) == 1 {
return sets[0]
}
// Sets need to be pre-advanced, so we can introspect the label of the
// series under the cursor.
var h seriesSetHeap
var h genericSeriesSetHeap
for _, set := range sets {
if set == nil {
continue
@ -414,14 +464,37 @@ func NewMergeSeriesSet(sets []SeriesSet, querier *mergeQuerier) SeriesSet {
heap.Push(&h, set)
}
}
return &mergeSeriesSet{
return &genericMergeSeriesSet{
merger: merger,
heap: h,
sets: sets,
querier: querier,
}
}
func (c *mergeSeriesSet) Next() bool {
type VerticalSeriesMerger interface {
// Merge returns a merged series implementation that merges series with the same labels together.
// It has to handle time-overlapping series as well.
Merge(...Series) Series
}
type VerticalSeriesMergeFunc func(...Series) Series
func (f VerticalSeriesMergeFunc) Merge(s ...Series) Series {
return (f)(s...)
}
type VerticalChunkSeriesMerger interface {
Merge(...ChunkSeries) ChunkSeries
}
type VerticalChunkSeriesMergerFunc func(...ChunkSeries) ChunkSeries
func (f VerticalChunkSeriesMergerFunc) Merge(s ...ChunkSeries) ChunkSeries {
return (f)(s...)
}
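
These Func types follow the usual Go adapter pattern (as with http.HandlerFunc): any plain function with the matching signature satisfies the one-method interface. For example:

// A plain function becomes a VerticalSeriesMerger via the Func adapter:
var merger VerticalSeriesMerger = VerticalSeriesMergeFunc(ChainedSeriesMerge)
// Likewise for chunks, given some mergeFn func(...ChunkSeries) ChunkSeries:
// var chunkMerger VerticalChunkSeriesMerger = VerticalChunkSeriesMergerFunc(mergeFn)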
func (c *genericMergeSeriesSet) Next() bool {
// Run in a loop because the "next" series sets may not be valid anymore.
// If a remote querier fails, we discard all series sets from that querier.
// If, for the current label set, all the next series sets come from
@ -442,7 +515,7 @@ func (c *mergeSeriesSet) Next() bool {
c.currentSets = nil
c.currentLabels = c.heap[0].At().Labels()
for len(c.heap) > 0 && labels.Equal(c.currentLabels, c.heap[0].At().Labels()) {
set := heap.Pop(&c.heap).(SeriesSet)
set := heap.Pop(&c.heap).(genericSeriesSet)
if c.querier != nil && c.querier.IsFailedSet(set) {
continue
}
@ -458,21 +531,18 @@ func (c *mergeSeriesSet) Next() bool {
return true
}
func (c *mergeSeriesSet) At() Series {
func (c *genericMergeSeriesSet) At() Labeled {
if len(c.currentSets) == 1 {
return c.currentSets[0].At()
}
series := []Series{}
series := make([]Labeled, 0, len(c.currentSets))
for _, seriesSet := range c.currentSets {
series = append(series, seriesSet.At())
}
return &mergeSeries{
labels: c.currentLabels,
series: series,
}
return c.merger.Merge(series...)
}
func (c *mergeSeriesSet) Err() error {
func (c *genericMergeSeriesSet) Err() error {
for _, set := range c.sets {
if err := set.Err(); err != nil {
return err
@ -481,21 +551,34 @@ func (c *mergeSeriesSet) Err() error {
return nil
}
type seriesSetHeap []SeriesSet
// ChainedSeriesMerge returns a single series built by chaining the given series.
// In case of overlap, the first overlapping sample is kept and the rest are dropped.
// We expect the same labels for each given series.
func ChainedSeriesMerge(s ...Series) Series {
if len(s) == 0 {
return nil
}
return &chainSeries{
labels: s[0].Labels(),
series: s,
}
}
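
For example, a sketch reusing the NewTestSeries helper from this PR's tests; chaining two series with the same labels interleaves their samples by timestamp:

a := NewTestSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{2, 2}})
b := NewTestSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{1, 1}, sample{2, 20}})
merged := ChainedSeriesMerge(a, b)
// Iterating merged yields t=0, 1, 2; only one of the two t=2 samples
// survives, per the overlap rule above.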
func (h seriesSetHeap) Len() int { return len(h) }
func (h seriesSetHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
type genericSeriesSetHeap []genericSeriesSet
func (h seriesSetHeap) Less(i, j int) bool {
func (h genericSeriesSetHeap) Len() int { return len(h) }
func (h genericSeriesSetHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h genericSeriesSetHeap) Less(i, j int) bool {
a, b := h[i].At().Labels(), h[j].At().Labels()
return labels.Compare(a, b) < 0
}
func (h *seriesSetHeap) Push(x interface{}) {
*h = append(*h, x.(SeriesSet))
func (h *genericSeriesSetHeap) Push(x interface{}) {
*h = append(*h, x.(genericSeriesSet))
}
func (h *seriesSetHeap) Pop() interface{} {
func (h *genericSeriesSetHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
@ -503,37 +586,39 @@ func (h *seriesSetHeap) Pop() interface{} {
return x
}
type mergeSeries struct {
type chainSeries struct {
labels labels.Labels
series []Series
}
func (m *mergeSeries) Labels() labels.Labels {
func (m *chainSeries) Labels() labels.Labels {
return m.labels
}
func (m *mergeSeries) Iterator() chunkenc.Iterator {
func (m *chainSeries) Iterator() chunkenc.Iterator {
iterators := make([]chunkenc.Iterator, 0, len(m.series))
for _, s := range m.series {
iterators = append(iterators, s.Iterator())
}
return newMergeIterator(iterators)
return newChainSampleIterator(iterators)
}
type mergeIterator struct {
// sampleIterator is responsible for iterating over non-overlapping samples from different iterators of the same time series.
// If samples overlap, all but one will be dropped.
type sampleIterator struct {
iterators []chunkenc.Iterator
h seriesIteratorHeap
h samplesIteratorHeap
}
func newMergeIterator(iterators []chunkenc.Iterator) chunkenc.Iterator {
return &mergeIterator{
func newChainSampleIterator(iterators []chunkenc.Iterator) chunkenc.Iterator {
return &sampleIterator{
iterators: iterators,
h: nil,
}
}
func (c *mergeIterator) Seek(t int64) bool {
c.h = seriesIteratorHeap{}
func (c *sampleIterator) Seek(t int64) bool {
c.h = samplesIteratorHeap{}
for _, iter := range c.iterators {
if iter.Seek(t) {
heap.Push(&c.h, iter)
@ -542,15 +627,15 @@ func (c *mergeIterator) Seek(t int64) bool {
return len(c.h) > 0
}
func (c *mergeIterator) At() (t int64, v float64) {
func (c *sampleIterator) At() (t int64, v float64) {
if len(c.h) == 0 {
panic("mergeIterator.At() called after .Next() returned false.")
return math.MaxInt64, 0
}
return c.h[0].At()
}
func (c *mergeIterator) Next() bool {
func (c *sampleIterator) Next() bool {
if c.h == nil {
for _, iter := range c.iterators {
if iter.Next() {
@ -568,6 +653,7 @@ func (c *mergeIterator) Next() bool {
currt, _ := c.At()
for len(c.h) > 0 {
nextt, _ := c.h[0].At()
// All but one of the overlapping samples will be dropped.
if nextt != currt {
break
}
@ -581,7 +667,7 @@ func (c *mergeIterator) Next() bool {
return len(c.h) > 0
}
func (c *mergeIterator) Err() error {
func (c *sampleIterator) Err() error {
for _, iter := range c.iterators {
if err := iter.Err(); err != nil {
return err
@ -590,22 +676,164 @@ func (c *mergeIterator) Err() error {
return nil
}
type seriesIteratorHeap []chunkenc.Iterator
type samplesIteratorHeap []chunkenc.Iterator
func (h seriesIteratorHeap) Len() int { return len(h) }
func (h seriesIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h samplesIteratorHeap) Len() int { return len(h) }
func (h samplesIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h seriesIteratorHeap) Less(i, j int) bool {
func (h samplesIteratorHeap) Less(i, j int) bool {
at, _ := h[i].At()
bt, _ := h[j].At()
return at < bt
}
func (h *seriesIteratorHeap) Push(x interface{}) {
func (h *samplesIteratorHeap) Push(x interface{}) {
*h = append(*h, x.(chunkenc.Iterator))
}
func (h *seriesIteratorHeap) Pop() interface{} {
func (h *samplesIteratorHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
// VerticalChunksMergeFunc represents a function that merges multiple time-overlapping chunks.
// Passed chunks:
// * have to be sorted by MinTime.
// * have to be part of exactly the same timeseries.
// * have to be populated.
// Merge processing can result in more than one chunk.
type VerticalChunksMergeFunc func(chks ...chunks.Meta) chunks.Iterator
type verticalChunkSeriesMerger struct {
verticalChunksMerger VerticalChunksMergeFunc
labels labels.Labels
series []ChunkSeries
}
// NewVerticalChunkSeriesMerger returns a VerticalChunkSeriesMerger that merges the same chunk series into one chunk series iterator.
// In case of chunk overlap, the given VerticalChunksMergeFunc will decide how to merge.
// We expect the same labels for each given series.
func NewVerticalChunkSeriesMerger(chunkMerger VerticalChunksMergeFunc) VerticalChunkSeriesMerger {
return VerticalChunkSeriesMergerFunc(func(s ...ChunkSeries) ChunkSeries {
if len(s) == 0 {
return nil
}
return &verticalChunkSeriesMerger{
verticalChunksMerger: chunkMerger,
labels: s[0].Labels(),
series: s,
}
})
}
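
A sketch of wiring this up (mirroring the chunk-querier test in this PR, which uses NewListChunkSeriesIterator as the merge function; the querier variables are placeholders):

merger := NewVerticalChunkSeriesMerger(NewListChunkSeriesIterator)
q := NewMergeChunkQuerier(primaryChunkQuerier, chunkQueriers, merger)
set, warnings, err := q.Select(true, nil)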
func (s *verticalChunkSeriesMerger) Labels() labels.Labels {
return s.labels
}
func (s *verticalChunkSeriesMerger) Iterator() chunks.Iterator {
iterators := make([]chunks.Iterator, 0, len(s.series))
for _, series := range s.series {
iterators = append(iterators, series.Iterator())
}
return &chainChunkIterator{
overlappedChunksMerger: s.verticalChunksMerger,
iterators: iterators,
h: nil,
}
}
// chainChunkIterator is responsible for chaining chunks from different iterators of the same time series.
// If they overlap in time, overlappedChunksMerger will be used.
type chainChunkIterator struct {
overlappedChunksMerger VerticalChunksMergeFunc
iterators []chunks.Iterator
h chunkIteratorHeap
}
func (c *chainChunkIterator) At() chunks.Meta {
if len(c.h) == 0 {
return chunks.Meta{}
}
return c.h[0].At()
}
func (c *chainChunkIterator) Next() bool {
if c.h == nil {
for _, iter := range c.iterators {
if iter.Next() {
heap.Push(&c.h, iter)
}
}
return len(c.h) > 0
}
if len(c.h) == 0 {
return false
}
// Detect the shortest chain of time-overlapped chunks.
last := c.At()
var overlapped []chunks.Meta
for {
iter := heap.Pop(&c.h).(chunks.Iterator)
if iter.Next() {
heap.Push(&c.h, iter)
}
if len(c.h) == 0 {
break
}
next := c.At()
if next.MinTime > last.MaxTime {
// No overlap with last one.
break
}
overlapped = append(overlapped, last)
last = next
}
if len(overlapped) > 0 {
heap.Push(&c.h, c.overlappedChunksMerger(append(overlapped, c.h[0].At())...))
return true
}
return len(c.h) > 0
}
func (c *chainChunkIterator) Err() error {
for _, iter := range c.iterators {
if err := iter.Err(); err != nil {
return err
}
}
return nil
}
type chunkIteratorHeap []chunks.Iterator
func (h chunkIteratorHeap) Len() int { return len(h) }
func (h chunkIteratorHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h chunkIteratorHeap) Less(i, j int) bool {
at := h[i].At()
bt := h[j].At()
if at.MinTime == bt.MinTime {
return at.MaxTime < bt.MaxTime
}
return at.MinTime < bt.MinTime
}
func (h *chunkIteratorHeap) Push(x interface{}) {
*h = append(*h, x.(chunks.Iterator))
}
func (h *chunkIteratorHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}


@ -16,10 +16,12 @@ package storage
import (
"fmt"
"math"
"sort"
"testing"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/util/testutil"
)
@ -52,169 +54,281 @@ func TestMergeTwoStringSlices(t *testing.T) {
}
}
func TestMergeSeriesSet(t *testing.T) {
func TestChainedMergeQuerier(t *testing.T) {
for _, tc := range []struct {
input []SeriesSet
inputs [][]Series
extraQueriers []Querier
expected SeriesSet
}{
{
input: []SeriesSet{newMockSeriesSet()},
expected: newMockSeriesSet(),
inputs: [][]Series{{}},
expected: NewMockSeriesSet(),
},
{
input: []SeriesSet{newMockSeriesSet(
newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}}),
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}}),
)},
expected: newMockSeriesSet(
newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}}),
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}}),
),
},
{
input: []SeriesSet{newMockSeriesSet(
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}}),
), newMockSeriesSet(
newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}}),
)},
expected: newMockSeriesSet(
newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}}),
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}}),
),
},
{
input: []SeriesSet{newMockSeriesSet(
newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}}),
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}}),
), newMockSeriesSet(
newMockSeries(labels.FromStrings("bar", "baz"), []sample{{3, 3}, {4, 4}}),
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{2, 2}, {3, 3}}),
)},
expected: newMockSeriesSet(
newMockSeries(labels.FromStrings("bar", "baz"), []sample{{1, 1}, {2, 2}, {3, 3}, {4, 4}}),
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, 0}, {1, 1}, {2, 2}, {3, 3}}),
inputs: [][]Series{{
NewTestSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
NewTestSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
}},
expected: NewMockSeriesSet(
NewTestSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
NewTestSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
),
},
{
input: []SeriesSet{newMockSeriesSet(
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, math.NaN()}}),
), newMockSeriesSet(
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, math.NaN()}}),
)},
expected: newMockSeriesSet(
newMockSeries(labels.FromStrings("foo", "bar"), []sample{{0, math.NaN()}}),
inputs: [][]Series{{
NewTestSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
}, {
NewTestSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
}},
expected: NewMockSeriesSet(
NewTestSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
NewTestSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
),
},
{
inputs: [][]Series{{
NewTestSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}}, []tsdbutil.Sample{sample{6, 6}}),
NewTestSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
}, {
NewTestSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
NewTestSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{3, 3}}, []tsdbutil.Sample{sample{4, 4}}),
}},
expected: NewMockSeriesSet(
NewTestSeries(labels.FromStrings("bar", "baz"),
[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}},
[]tsdbutil.Sample{sample{3, 3}},
[]tsdbutil.Sample{sample{5, 5}},
[]tsdbutil.Sample{sample{6, 6}},
),
NewTestSeries(labels.FromStrings("foo", "bar"),
[]tsdbutil.Sample{sample{0, 0}, sample{1, 1}},
[]tsdbutil.Sample{sample{2, 2}},
[]tsdbutil.Sample{sample{3, 3}},
[]tsdbutil.Sample{sample{4, 4}},
),
),
},
{
inputs: [][]Series{{
NewTestSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}}, []tsdbutil.Sample{sample{6, 6}}),
NewTestSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
}, {
NewTestSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
NewTestSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{3, 3}}, []tsdbutil.Sample{sample{4, 4}}),
}},
extraQueriers: []Querier{NoopQuerier(), NoopQuerier(), NoopQuerier()},
expected: NewMockSeriesSet(
NewTestSeries(labels.FromStrings("bar", "baz"),
[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}},
[]tsdbutil.Sample{sample{3, 3}},
[]tsdbutil.Sample{sample{5, 5}},
[]tsdbutil.Sample{sample{6, 6}},
),
NewTestSeries(labels.FromStrings("foo", "bar"),
[]tsdbutil.Sample{sample{0, 0}, sample{1, 1}},
[]tsdbutil.Sample{sample{2, 2}},
[]tsdbutil.Sample{sample{3, 3}},
[]tsdbutil.Sample{sample{4, 4}},
),
),
},
{
inputs: [][]Series{{
NewTestSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, math.NaN()}}),
}, {
NewTestSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{1, 1}}),
}},
expected: NewMockSeriesSet(
NewTestSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, math.NaN()}}, []tsdbutil.Sample{sample{1, 1}}),
),
},
} {
merged := NewMergeSeriesSet(tc.input, nil)
for merged.Next() {
testutil.Assert(t, tc.expected.Next(), "Expected Next() to be true")
actualSeries := merged.At()
expectedSeries := tc.expected.At()
testutil.Equals(t, expectedSeries.Labels(), actualSeries.Labels())
testutil.Equals(t, drainSamples(expectedSeries.Iterator()), drainSamples(actualSeries.Iterator()))
}
testutil.Assert(t, !tc.expected.Next(), "Expected Next() to be false")
t.Run("", func(t *testing.T) {
var qs []Querier
for _, in := range tc.inputs {
qs = append(qs, &mockQuerier{toReturn: in})
}
qs = append(qs, tc.extraQueriers...)
merged, _, _ := NewMergeQuerier(qs[0], qs, VerticalSeriesMergeFunc(ChainedSeriesMerge)).Select(false, nil)
for merged.Next() {
testutil.Assert(t, tc.expected.Next(), "Expected Next() to be true")
actualSeries := merged.At()
expectedSeries := tc.expected.At()
testutil.Equals(t, expectedSeries.Labels(), actualSeries.Labels())
expSmpl, expErr := ExpandSamples(expectedSeries.Iterator())
actSmpl, actErr := ExpandSamples(actualSeries.Iterator())
testutil.Equals(t, expErr, actErr)
testutil.Equals(t, expSmpl, actSmpl)
}
testutil.Ok(t, merged.Err())
testutil.Assert(t, !tc.expected.Next(), "Expected Next() to be false")
})
}
}
func TestMergeIterator(t *testing.T) {
func TestChainedMergeChunkQuerier(t *testing.T) {
for _, tc := range []struct {
input []chunkenc.Iterator
expected []sample
inputs [][]ChunkSeries
extraQueriers []ChunkQuerier
expected ChunkSeriesSet
}{
{
input: []chunkenc.Iterator{
newListSeriesIterator([]sample{{0, 0}, {1, 1}}),
},
expected: []sample{{0, 0}, {1, 1}},
inputs: [][]ChunkSeries{{}},
expected: NewMockChunkSeriesSet(),
},
{
input: []chunkenc.Iterator{
newListSeriesIterator([]sample{{0, 0}, {1, 1}}),
newListSeriesIterator([]sample{{2, 2}, {3, 3}}),
},
expected: []sample{{0, 0}, {1, 1}, {2, 2}, {3, 3}},
inputs: [][]ChunkSeries{{
NewTestChunkSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
NewTestChunkSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
}},
expected: NewMockChunkSeriesSet(
NewTestChunkSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
NewTestChunkSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
),
},
{
input: []chunkenc.Iterator{
newListSeriesIterator([]sample{{0, 0}, {3, 3}}),
newListSeriesIterator([]sample{{1, 1}, {4, 4}}),
newListSeriesIterator([]sample{{2, 2}, {5, 5}}),
},
expected: []sample{{0, 0}, {1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}},
inputs: [][]ChunkSeries{{
NewTestChunkSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
}, {
NewTestChunkSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
}},
expected: NewMockChunkSeriesSet(
NewTestChunkSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
NewTestChunkSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
),
},
{
input: []chunkenc.Iterator{
newListSeriesIterator([]sample{{0, 0}, {1, 1}}),
newListSeriesIterator([]sample{{0, 0}, {2, 2}}),
newListSeriesIterator([]sample{{2, 2}, {3, 3}}),
},
expected: []sample{{0, 0}, {1, 1}, {2, 2}, {3, 3}},
inputs: [][]ChunkSeries{{
NewTestChunkSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}}, []tsdbutil.Sample{sample{6, 6}}),
NewTestChunkSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
}, {
NewTestChunkSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
NewTestChunkSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{3, 3}}, []tsdbutil.Sample{sample{4, 4}}),
}},
expected: NewMockChunkSeriesSet(
NewTestChunkSeries(labels.FromStrings("bar", "baz"),
[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}},
[]tsdbutil.Sample{sample{3, 3}},
[]tsdbutil.Sample{sample{5, 5}},
[]tsdbutil.Sample{sample{6, 6}},
),
NewTestChunkSeries(labels.FromStrings("foo", "bar"),
[]tsdbutil.Sample{sample{0, 0}, sample{1, 1}},
[]tsdbutil.Sample{sample{2, 2}},
[]tsdbutil.Sample{sample{3, 3}},
[]tsdbutil.Sample{sample{4, 4}},
),
),
},
{
inputs: [][]ChunkSeries{{
NewTestChunkSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{5, 5}}, []tsdbutil.Sample{sample{6, 6}}),
NewTestChunkSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, 0}, sample{1, 1}}, []tsdbutil.Sample{sample{2, 2}}),
}, {
NewTestChunkSeries(labels.FromStrings("bar", "baz"), []tsdbutil.Sample{sample{1, 1}, sample{2, 2}}, []tsdbutil.Sample{sample{3, 3}}),
NewTestChunkSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{3, 3}}, []tsdbutil.Sample{sample{4, 4}}),
}},
extraQueriers: []ChunkQuerier{NoopChunkedQuerier(), NoopChunkedQuerier(), NoopChunkedQuerier()},
expected: NewMockChunkSeriesSet(
NewTestChunkSeries(labels.FromStrings("bar", "baz"),
[]tsdbutil.Sample{sample{1, 1}, sample{2, 2}},
[]tsdbutil.Sample{sample{3, 3}},
[]tsdbutil.Sample{sample{5, 5}},
[]tsdbutil.Sample{sample{6, 6}},
),
NewTestChunkSeries(labels.FromStrings("foo", "bar"),
[]tsdbutil.Sample{sample{0, 0}, sample{1, 1}},
[]tsdbutil.Sample{sample{2, 2}},
[]tsdbutil.Sample{sample{3, 3}},
[]tsdbutil.Sample{sample{4, 4}},
),
),
},
{
inputs: [][]ChunkSeries{{
NewTestChunkSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, math.NaN()}}),
}, {
NewTestChunkSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{1, 1}}),
}},
expected: NewMockChunkSeriesSet(
NewTestChunkSeries(labels.FromStrings("foo", "bar"), []tsdbutil.Sample{sample{0, math.NaN()}}, []tsdbutil.Sample{sample{1, 1}}),
),
},
} {
merged := newMergeIterator(tc.input)
actual := drainSamples(merged)
testutil.Equals(t, tc.expected, actual)
t.Run("", func(t *testing.T) {
var qs []ChunkQuerier
for _, in := range tc.inputs {
qs = append(qs, &mockChunkQurier{toReturn: in})
}
qs = append(qs, tc.extraQueriers...)
merged, _, _ := NewMergeChunkQuerier(qs[0], qs, NewVerticalChunkSeriesMerger(NewListChunkSeriesIterator)).Select(false, nil)
for merged.Next() {
testutil.Assert(t, tc.expected.Next(), "Expected Next() to be true")
actualSeries := merged.At()
expectedSeries := tc.expected.At()
testutil.Equals(t, expectedSeries.Labels(), actualSeries.Labels())
expChks, expErr := ExpandChunks(expectedSeries.Iterator())
actChks, actErr := ExpandChunks(actualSeries.Iterator())
testutil.Equals(t, expErr, actErr)
testutil.Equals(t, expChks, actChks)
}
testutil.Ok(t, merged.Err())
testutil.Assert(t, !tc.expected.Next(), "Expected Next() to be false")
})
}
}
func TestMergeIteratorSeek(t *testing.T) {
for _, tc := range []struct {
input []chunkenc.Iterator
seek int64
expected []sample
}{
{
input: []chunkenc.Iterator{
newListSeriesIterator([]sample{{0, 0}, {1, 1}, {2, 2}}),
},
seek: 1,
expected: []sample{{1, 1}, {2, 2}},
},
{
input: []chunkenc.Iterator{
newListSeriesIterator([]sample{{0, 0}, {1, 1}}),
newListSeriesIterator([]sample{{2, 2}, {3, 3}}),
},
seek: 2,
expected: []sample{{2, 2}, {3, 3}},
},
{
input: []chunkenc.Iterator{
newListSeriesIterator([]sample{{0, 0}, {3, 3}}),
newListSeriesIterator([]sample{{1, 1}, {4, 4}}),
newListSeriesIterator([]sample{{2, 2}, {5, 5}}),
},
seek: 2,
expected: []sample{{2, 2}, {3, 3}, {4, 4}, {5, 5}},
},
} {
merged := newMergeIterator(tc.input)
actual := []sample{}
if merged.Seek(tc.seek) {
t, v := merged.At()
actual = append(actual, sample{t, v})
}
actual = append(actual, drainSamples(merged)...)
testutil.Equals(t, tc.expected, actual)
}
type mockQuerier struct {
baseQuerier
toReturn []Series
}
func drainSamples(iter chunkenc.Iterator) []sample {
result := []sample{}
for iter.Next() {
t, v := iter.At()
// NaNs can't be compared normally, so substitute for another value.
if math.IsNaN(v) {
v = -42
}
result = append(result, sample{t, v})
}
return result
}
type seriesByLabel []Series
func (a seriesByLabel) Len() int { return len(a) }
func (a seriesByLabel) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a seriesByLabel) Less(i, j int) bool { return labels.Compare(a[i].Labels(), a[j].Labels()) < 0 }
func (m *mockQuerier) Select(sortSeries bool, _ *SelectHints, _ ...*labels.Matcher) (SeriesSet, Warnings, error) {
cpy := make([]Series, len(m.toReturn))
copy(cpy, m.toReturn)
if sortSeries {
sort.Sort(seriesByLabel(cpy))
}
return NewMockSeriesSet(cpy...), nil, nil
}
type mockChunkQurier struct {
baseQuerier
toReturn []ChunkSeries
}
type chunkSeriesByLabel []ChunkSeries
func (a chunkSeriesByLabel) Len() int { return len(a) }
func (a chunkSeriesByLabel) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a chunkSeriesByLabel) Less(i, j int) bool {
return labels.Compare(a[i].Labels(), a[j].Labels()) < 0
}
func (m *mockChunkQurier) Select(sortSeries bool, _ *SelectHints, _ ...*labels.Matcher) (ChunkSeriesSet, Warnings, error) {
cpy := make([]ChunkSeries, len(m.toReturn))
copy(cpy, m.toReturn)
if sortSeries {
sort.Sort(chunkSeriesByLabel(cpy))
}
return NewMockChunkSeriesSet(cpy...), nil, nil
}
type mockSeriesSet struct {
@ -222,7 +336,7 @@ type mockSeriesSet struct {
series []Series
}
func newMockSeriesSet(series ...Series) SeriesSet {
func NewMockSeriesSet(series ...Series) SeriesSet {
return &mockSeriesSet{
idx: -1,
series: series,
@ -242,33 +356,151 @@ func (m *mockSeriesSet) Err() error {
return nil
}
var result []sample
type mockChunkSeriesSet struct {
idx int
series []ChunkSeries
}
func NewMockChunkSeriesSet(series ...ChunkSeries) ChunkSeriesSet {
return &mockChunkSeriesSet{
idx: -1,
series: series,
}
}
func (m *mockChunkSeriesSet) Next() bool {
m.idx++
return m.idx < len(m.series)
}
func (m *mockChunkSeriesSet) At() ChunkSeries {
return m.series[m.idx]
}
func (m *mockChunkSeriesSet) Err() error {
return nil
}
func TestChainSampleIterator(t *testing.T) {
for _, tc := range []struct {
input []chunkenc.Iterator
expected []tsdbutil.Sample
}{
{
input: []chunkenc.Iterator{
NewSampleIterator(tsdbutil.SampleSlice{sample{0, 0}, sample{1, 1}}),
},
expected: []tsdbutil.Sample{sample{0, 0}, sample{1, 1}},
},
{
input: []chunkenc.Iterator{
NewSampleIterator(tsdbutil.SampleSlice{sample{0, 0}, sample{1, 1}}),
NewSampleIterator(tsdbutil.SampleSlice{sample{2, 2}, sample{3, 3}}),
},
expected: []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}},
},
{
input: []chunkenc.Iterator{
NewSampleIterator(tsdbutil.SampleSlice{sample{0, 0}, sample{3, 3}}),
NewSampleIterator(tsdbutil.SampleSlice{sample{1, 1}, sample{4, 4}}),
NewSampleIterator(tsdbutil.SampleSlice{sample{2, 2}, sample{5, 5}}),
},
expected: []tsdbutil.Sample{
sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}, sample{4, 4}, sample{5, 5}},
},
// Overlap.
{
input: []chunkenc.Iterator{
NewSampleIterator(tsdbutil.SampleSlice{sample{0, 0}, sample{1, 1}}),
NewSampleIterator(tsdbutil.SampleSlice{sample{0, 0}, sample{2, 2}}),
NewSampleIterator(tsdbutil.SampleSlice{sample{2, 2}, sample{3, 3}}),
NewSampleIterator(tsdbutil.SampleSlice{}),
NewSampleIterator(tsdbutil.SampleSlice{}),
NewSampleIterator(tsdbutil.SampleSlice{}),
},
expected: []tsdbutil.Sample{sample{0, 0}, sample{1, 1}, sample{2, 2}, sample{3, 3}},
},
} {
merged := newChainSampleIterator(tc.input)
actual, err := ExpandSamples(merged)
testutil.Ok(t, err)
testutil.Equals(t, tc.expected, actual)
}
}
func TestChainSampleIteratorSeek(t *testing.T) {
for _, tc := range []struct {
input []chunkenc.Iterator
seek int64
expected []tsdbutil.Sample
}{
{
input: []chunkenc.Iterator{
NewSampleIterator(tsdbutil.SampleSlice{sample{0, 0}, sample{1, 1}, sample{2, 2}}),
},
seek: 1,
expected: []tsdbutil.Sample{sample{1, 1}, sample{2, 2}},
},
{
input: []chunkenc.Iterator{
NewSampleIterator(tsdbutil.SampleSlice{sample{0, 0}, sample{1, 1}}),
NewSampleIterator(tsdbutil.SampleSlice{sample{2, 2}, sample{3, 3}}),
},
seek: 2,
expected: []tsdbutil.Sample{sample{2, 2}, sample{3, 3}},
},
{
input: []chunkenc.Iterator{
NewSampleIterator(tsdbutil.SampleSlice{sample{0, 0}, sample{3, 3}}),
NewSampleIterator(tsdbutil.SampleSlice{sample{1, 1}, sample{4, 4}}),
NewSampleIterator(tsdbutil.SampleSlice{sample{2, 2}, sample{5, 5}}),
},
seek: 2,
expected: []tsdbutil.Sample{sample{2, 2}, sample{3, 3}, sample{4, 4}, sample{5, 5}},
},
} {
merged := newChainSampleIterator(tc.input)
actual := []tsdbutil.Sample{}
if merged.Seek(tc.seek) {
t, v := merged.At()
actual = append(actual, sample{t, v})
}
s, err := ExpandSamples(merged)
testutil.Ok(t, err)
actual = append(actual, s...)
testutil.Equals(t, tc.expected, actual)
}
}
var result []tsdbutil.Sample
func makeSeriesSet(numSeries, numSamples int) SeriesSet {
series := []Series{}
for j := 0; j < numSeries; j++ {
labels := labels.Labels{{Name: "foo", Value: fmt.Sprintf("bar%d", j)}}
samples := []sample{}
samples := []tsdbutil.Sample{}
for k := 0; k < numSamples; k++ {
samples = append(samples, sample{t: int64(k), v: float64(k)})
}
series = append(series, newMockSeries(labels, samples))
series = append(series, NewTestSeries(labels, samples))
}
return newMockSeriesSet(series...)
return NewMockSeriesSet(series...)
}
func makeMergeSeriesSet(numSeriesSets, numSeries, numSamples int) SeriesSet {
seriesSets := []SeriesSet{}
seriesSets := []genericSeriesSet{}
for i := 0; i < numSeriesSets; i++ {
seriesSets = append(seriesSets, makeSeriesSet(numSeries, numSamples))
seriesSets = append(seriesSets, &genericSeriesSetAdapter{makeSeriesSet(numSeries, numSamples)})
}
return NewMergeSeriesSet(seriesSets, nil)
return &seriesSetAdapter{newGenericMergeSeriesSet(seriesSets, nil, &seriesMergerAdapter{VerticalSeriesMerger: VerticalSeriesMergeFunc(ChainedSeriesMerge)})}
}
func benchmarkDrain(seriesSet SeriesSet, b *testing.B) {
var err error
for n := 0; n < b.N; n++ {
for seriesSet.Next() {
result = drainSamples(seriesSet.At().Iterator())
result, err = ExpandSamples(seriesSet.At().Iterator())
testutil.Ok(b, err)
}
}
}

storage/generic.go Normal file

@ -0,0 +1,105 @@
package storage
import "github.com/prometheus/prometheus/pkg/labels"
// Boilerplate on purpose. Generics some day...
type genericQuerierAdapter struct {
baseQuerier
// One-of. If both are set, Querier will be used.
q Querier
cq ChunkQuerier
}
type genericSeriesSetAdapter struct {
SeriesSet
}
func (a *genericSeriesSetAdapter) At() Labeled {
return a.SeriesSet.At().(Labeled)
}
type genericChunkSeriesSetAdapter struct {
ChunkSeriesSet
}
func (a *genericChunkSeriesSetAdapter) At() Labeled {
return a.ChunkSeriesSet.At().(Labeled)
}
func (q *genericQuerierAdapter) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (genericSeriesSet, Warnings, error) {
if q.q != nil {
s, w, err := q.q.Select(sortSeries, hints, matchers...)
return &genericSeriesSetAdapter{s}, w, err
}
s, w, err := q.cq.Select(sortSeries, hints, matchers...)
return &genericChunkSeriesSetAdapter{s}, w, err
}
func newGenericQuerierFrom(q Querier) genericQuerier {
return &genericQuerierAdapter{baseQuerier: q, q: q}
}
func newGenericQuerierFromChunk(cq ChunkQuerier) genericQuerier {
return &genericQuerierAdapter{baseQuerier: cq, cq: cq}
}
type querierAdapter struct {
genericQuerier
}
type seriesSetAdapter struct {
genericSeriesSet
}
func (a *seriesSetAdapter) At() Series {
return a.genericSeriesSet.At().(Series)
}
func (q *querierAdapter) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) {
s, w, err := q.genericQuerier.Select(sortSeries, hints, matchers...)
return &seriesSetAdapter{s}, w, err
}
type chunkQuerierAdapter struct {
genericQuerier
}
type chunkSeriesSetAdapter struct {
genericSeriesSet
}
func (a *chunkSeriesSetAdapter) At() ChunkSeries {
return a.genericSeriesSet.At().(ChunkSeries)
}
func (q *chunkQuerierAdapter) Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (ChunkSeriesSet, Warnings, error) {
s, w, err := q.genericQuerier.Select(sortSeries, hints, matchers...)
return &chunkSeriesSetAdapter{s}, w, err
}
type seriesMergerAdapter struct {
VerticalSeriesMerger
buf []Series
}
func (a *seriesMergerAdapter) Merge(s ...Labeled) Labeled {
a.buf = a.buf[:0]
for _, ser := range s {
a.buf = append(a.buf, ser.(Series))
}
return a.VerticalSeriesMerger.Merge(a.buf...)
}
type chunkSeriesMergerAdapter struct {
VerticalChunkSeriesMerger
buf []ChunkSeries
}
func (a *chunkSeriesMergerAdapter) Merge(s ...Labeled) Labeled {
a.buf = a.buf[:0]
for _, ser := range s {
a.buf = append(a.buf, ser.(ChunkSeries))
}
return a.VerticalChunkSeriesMerger.Merge(a.buf...)
}
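
The adapters compose in both directions; given some Querier q, the round trip that mergeGenericQuerier relies on looks like this (sketch):

gq := newGenericQuerierFrom(q) // Querier -> genericQuerier
back := &querierAdapter{gq}    // genericQuerier -> Querier again
// Series sets take the same trip: genericSeriesSetAdapter on the way up,
// seriesSetAdapter (an At() type assertion) on the way back down.
_ = back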


@ -20,7 +20,6 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/tombstones"
)
// The errors exposed.
@ -39,6 +38,7 @@ type Appendable interface {
// Storage ingests and manages samples, along with various indexes. All methods
// are goroutine-safe. Storage implements storage.SampleAppender.
// TODO(bwplotka): Add ChunkQueryable to Storage in next PR.
type Storage interface {
Queryable
Appendable
@ -51,19 +51,41 @@ type Storage interface {
}
// A Queryable handles queries against a storage.
// Use it when you need to have access to all samples without the chunk encoding abstraction, e.g. PromQL.
type Queryable interface {
// Querier returns a new Querier on the storage.
Querier(ctx context.Context, mint, maxt int64) (Querier, error)
}
// Querier provides querying access over time series data of a fixed
// time range.
// A ChunkQueryable handles queries against a storage.
// Use it when you need to have access to samples in their encoded format. This is usually more efficient when asking
// for data older than 2h, e.g. remote read.
type ChunkQueryable interface {
// ChunkQuerier returns a new ChunkQuerier on the storage.
ChunkQuerier(ctx context.Context, mint, maxt int64) (ChunkQuerier, Warnings, error)
}
// Querier provides querying access over time series data of a fixed time range.
type Querier interface {
baseQuerier
// Select returns a set of series that match the given label matchers.
// The caller can specify whether it requires the returned series to be sorted. Prefer not requiring sorting for better performance.
// It allows passing hints that can help optimise the select, but it's up to the implementation how these are used, if at all.
Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (SeriesSet, Warnings, error)
}
// ChunkQuerier provides querying access over time series data of a fixed time range.
type ChunkQuerier interface {
baseQuerier
// Select returns a set of series that match the given label matchers.
// The caller can specify whether it requires the returned series to be sorted. Prefer not requiring sorting for better performance.
// It allows passing hints that can help optimise the select, but it's up to the implementation how these are used, if at all.
Select(sortSeries bool, hints *SelectHints, matchers ...*labels.Matcher) (ChunkSeriesSet, Warnings, error)
}
type baseQuerier interface {
// LabelValues returns all potential values for a label name.
// It is not safe to use the strings beyond the lifetime of the querier.
LabelValues(name string) ([]string, Warnings, error)
@ -127,13 +149,6 @@ type Appender interface {
Rollback() error
}
// SeriesSet contains a set of series.
type SeriesSet interface {
Next() bool
At() Series
Err() error
}
var emptySeriesSet = errSeriesSet{}
// EmptySeriesSet returns a series set that's always empty.
@ -149,22 +164,46 @@ func (s errSeriesSet) Next() bool { return false }
func (s errSeriesSet) At() Series { return nil }
func (s errSeriesSet) Err() error { return s.err }
// Series represents a single time series.
type Series interface {
// Labels returns the complete set of labels identifying the series.
Labels() labels.Labels
// SeriesSet contains a set of series.
type SeriesSet interface {
Next() bool
At() Series
Err() error
}
// ChunkSeriesSet contains a set of chunked series.
type ChunkSeriesSet interface {
Next() bool
At() ChunkSeries
Err() error
}
// Labeled represents an item that has labels, e.g. a time series.
type Labeled interface {
// Labels returns the complete set of labels. For series it means all labels identifying the series.
Labels() labels.Labels
}
// Series exposes a single time series and allows iterating over samples.
type Series interface {
Labeled
SampleIteratable
}
// ChunkSeries exposes a single time series and allows iterating over chunks.
type ChunkSeries interface {
Labeled
ChunkIteratable
}
type SampleIteratable interface {
// Iterator returns a new iterator of the data of the series.
Iterator() chunkenc.Iterator
}
// ChunkSeriesSet exposes the chunks and intervals of a series instead of the
// actual series itself.
// TODO(bwplotka): Move it to Series like Iterator that iterates over chunks and avoiding loading all of them at once.
type ChunkSeriesSet interface {
Next() bool
At() (labels.Labels, []chunks.Meta, tombstones.Intervals)
Err() error
type ChunkIteratable interface {
// Iterator returns a new iterator that iterates over non-overlapping chunks of the series.
Iterator() chunks.Iterator
}
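
Under these reshaped interfaces, a series implementation only has to provide Labels plus the matching Iterator. A minimal sketch (hypothetical type, built on the helpers this PR adds to the storage package):

// listSeries is a hypothetical minimal Series implementation.
type listSeries struct {
	lset    labels.Labels
	samples tsdbutil.SampleSlice
}

func (s listSeries) Labels() labels.Labels       { return s.lset }
func (s listSeries) Iterator() chunkenc.Iterator { return NewSampleIterator(s.samples) }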
type Warnings []error


@ -28,7 +28,7 @@ func (noopQuerier) Select(bool, *SelectHints, ...*labels.Matcher) (SeriesSet, Wa
return NoopSeriesSet(), nil, nil
}
func (noopQuerier) LabelValues(name string) ([]string, Warnings, error) {
func (noopQuerier) LabelValues(string) ([]string, Warnings, error) {
return nil, nil, nil
}
@ -40,6 +40,29 @@ func (noopQuerier) Close() error {
return nil
}
type noopChunkedQuerier struct{}
// NoopChunkedQuerier is a ChunkQuerier that does nothing.
func NoopChunkedQuerier() ChunkQuerier {
return noopChunkedQuerier{}
}
func (noopChunkedQuerier) Select(bool, *SelectHints, ...*labels.Matcher) (ChunkSeriesSet, Warnings, error) {
return NoopChunkedSeriesSet(), nil, nil
}
func (noopChunkedQuerier) LabelValues(string) ([]string, Warnings, error) {
return nil, nil, nil
}
func (noopChunkedQuerier) LabelNames() ([]string, Warnings, error) {
return nil, nil, nil
}
func (noopChunkedQuerier) Close() error {
return nil
}
type noopSeriesSet struct{}
// NoopSeriesSet is a SeriesSet that does nothing.
@ -52,3 +75,16 @@ func (noopSeriesSet) Next() bool { return false }
func (noopSeriesSet) At() Series { return nil }
func (noopSeriesSet) Err() error { return nil }
type noopChunkedSeriesSet struct{}
// NoopChunkedSeriesSet is a ChunkSeriesSet that does nothing.
func NoopChunkedSeriesSet() ChunkSeriesSet {
return noopChunkedSeriesSet{}
}
func (noopChunkedSeriesSet) Next() bool { return false }
func (noopChunkedSeriesSet) At() ChunkSeries { return nil }
func (noopChunkedSeriesSet) Err() error { return nil }


@ -28,6 +28,7 @@ import (
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
)
// decodeReadLimit is the maximum size of a read request body in bytes.
@ -144,6 +145,12 @@ func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult,
return resp, nil
}
type Samples []prompb.Sample
func (s Samples) Get(i int) tsdbutil.Sample { return s[i] }
func (s Samples) Len() int { return len(s) }
// FromQueryResult unpacks and sorts a QueryResult proto.
func FromQueryResult(sortSeries bool, res *prompb.QueryResult) storage.SeriesSet {
series := make([]storage.Series, 0, len(res.Timeseries))
@ -153,10 +160,7 @@ func FromQueryResult(sortSeries bool, res *prompb.QueryResult) storage.SeriesSet
return errSeriesSet{err: err}
}
series = append(series, &concreteSeries{
labels: labels,
samples: ts.Samples,
})
series = append(series, storage.NewSeriesFromSamples(labels, Samples(ts.Samples)))
}
if sortSeries {
@ -188,8 +192,83 @@ func NegotiateResponseType(accepted []prompb.ReadRequest_ResponseType) (prompb.R
}
// StreamChunkedReadResponses iterates over series, builds chunks and streams those to the caller.
// TODO(bwplotka): Encode only what's needed. Fetch the encoded series from blocks instead of re-encoding everything.
// It expects a ChunkSeriesSet with populated chunks.
func StreamChunkedReadResponses(
stream io.Writer,
queryIndex int64,
ss storage.ChunkSeriesSet,
sortedExternalLabels []prompb.Label,
maxBytesInFrame int,
) error {
var (
chks []prompb.Chunk
lbls []prompb.Label
)
for ss.Next() {
series := ss.At()
iter := series.Iterator()
lbls = MergeLabels(labelsToLabelsProto(series.Labels(), lbls), sortedExternalLabels)
frameBytesLeft := maxBytesInFrame
for _, lbl := range lbls {
frameBytesLeft -= lbl.Size()
}
isNext := iter.Next()
// Send at most one series per frame; series may be split over multiple frames according to maxBytesInFrame.
for isNext {
chk := iter.At()
if chk.Chunk == nil {
return errors.Errorf("StreamChunkedReadResponses: found not populated chunk returned by SeriesSet at ref: %v", chk.Ref)
}
// Cut the chunk.
chks = append(chks, prompb.Chunk{
MinTimeMs: chk.MinTime,
MaxTimeMs: chk.MaxTime,
Type: prompb.Chunk_Encoding(chk.Chunk.Encoding()),
Data: chk.Chunk.Bytes(),
})
frameBytesLeft -= chks[len(chks)-1].Size()
// We are fine with minor inaccuracy of max bytes per frame. The overshoot will be at most the size of one full chunk.
isNext = iter.Next()
if frameBytesLeft > 0 && isNext {
continue
}
b, err := proto.Marshal(&prompb.ChunkedReadResponse{
ChunkedSeries: []*prompb.ChunkedSeries{
{Labels: lbls, Chunks: chks},
},
QueryIndex: queryIndex,
})
if err != nil {
return errors.Wrap(err, "marshal ChunkedReadResponse")
}
if _, err := stream.Write(b); err != nil {
return errors.Wrap(err, "write to stream")
}
chks = chks[:0]
}
if err := iter.Err(); err != nil {
return err
}
}
if err := ss.Err(); err != nil {
return err
}
return nil
}
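For illustration, a minimal caller sketch (not part of this commit; it assumes the function above is exported from the storage/remote package) that streams into an in-memory buffer:

package main

import (
	"bytes"
	"fmt"

	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/storage/remote"
)

func main() {
	var buf bytes.Buffer
	// NoopChunkedSeriesSet yields no series; a real caller would pass the
	// populated ChunkSeriesSet returned by a chunked querier instead.
	err := remote.StreamChunkedReadResponses(
		&buf,
		0,                              // queryIndex of the read request
		storage.NoopChunkedSeriesSet(), // ss
		nil,                            // sortedExternalLabels
		1<<20,                          // maxBytesInFrame: soft ~1MiB frame limit
	)
	fmt.Println(err, buf.Len()) // <nil> 0: no series, nothing streamed
}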
// TODO(bwplotka): Delete. For benchmarking purposes only.
func StreamChunkedReadResponsesOLD(
stream io.Writer,
queryIndex int64,
ss storage.SeriesSet,
@ -384,58 +463,6 @@ func (c *concreteSeriesSet) Err() error {
return nil
}
// concreteSeries implements storage.Series.
type concreteSeries struct {
labels labels.Labels
samples []prompb.Sample
}
func (c *concreteSeries) Labels() labels.Labels {
return labels.New(c.labels...)
}
func (c *concreteSeries) Iterator() chunkenc.Iterator {
return newConcreteSeriersIterator(c)
}
// concreteSeriesIterator implements storage.SeriesIterator.
type concreteSeriesIterator struct {
cur int
series *concreteSeries
}
func newConcreteSeriersIterator(series *concreteSeries) chunkenc.Iterator {
return &concreteSeriesIterator{
cur: -1,
series: series,
}
}
// Seek implements storage.SeriesIterator.
func (c *concreteSeriesIterator) Seek(t int64) bool {
c.cur = sort.Search(len(c.series.samples), func(n int) bool {
return c.series.samples[n].Timestamp >= t
})
return c.cur < len(c.series.samples)
}
// At implements storage.SeriesIterator.
func (c *concreteSeriesIterator) At() (t int64, v float64) {
s := c.series.samples[c.cur]
return s.Timestamp, s.Value
}
// Next implements storage.SeriesIterator.
func (c *concreteSeriesIterator) Next() bool {
c.cur++
return c.cur < len(c.series.samples)
}
// Err implements storage.SeriesIterator.
func (c *concreteSeriesIterator) Err() error {
return nil
}
// validateLabelsAndMetricName validates the label names/values and metric names returned from remote read,
// also making sure that there are no labels with duplicate names
func validateLabelsAndMetricName(ls labels.Labels) error {


@ -19,7 +19,6 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/testutil"
)
@ -123,44 +122,6 @@ func TestValidateLabelsAndMetricName(t *testing.T) {
}
}
func TestConcreteSeriesSet(t *testing.T) {
series1 := &concreteSeries{
labels: labels.FromStrings("foo", "bar"),
samples: []prompb.Sample{{Value: 1, Timestamp: 2}},
}
series2 := &concreteSeries{
labels: labels.FromStrings("foo", "baz"),
samples: []prompb.Sample{{Value: 3, Timestamp: 4}},
}
c := &concreteSeriesSet{
series: []storage.Series{series1, series2},
}
testutil.Assert(t, c.Next(), "Expected Next() to be true.")
testutil.Equals(t, series1, c.At(), "Unexpected series returned.")
testutil.Assert(t, c.Next(), "Expected Next() to be true.")
testutil.Equals(t, series2, c.At(), "Unexpected series returned.")
testutil.Assert(t, !c.Next(), "Expected Next() to be false.")
}
func TestConcreteSeriesClonesLabels(t *testing.T) {
lbls := labels.Labels{
labels.Label{Name: "a", Value: "b"},
labels.Label{Name: "c", Value: "d"},
}
cs := concreteSeries{
labels: labels.New(lbls...),
}
gotLabels := cs.Labels()
testutil.Equals(t, lbls, gotLabels)
gotLabels[0].Value = "foo"
gotLabels[1].Value = "bar"
gotLabels = cs.Labels()
testutil.Equals(t, lbls, gotLabels)
}
func TestFromQueryResultWithDuplicates(t *testing.T) {
ts1 := prompb.TimeSeries{
Labels: []prompb.Label{

storage/series.go (new file, 167 lines)

@ -0,0 +1,167 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"math"
"sort"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
)
// ConcreteSeries implements storage.Series given a function that returns its sample iterator.
type ConcreteSeries struct {
labels labels.Labels
SampleIteratorFn func() chunkenc.Iterator
}
// NewTestSeries returns a Series over the concatenation of the given sample groups; it is meant for tests.
func NewTestSeries(lset labels.Labels, samples ...[]tsdbutil.Sample) *ConcreteSeries {
return &ConcreteSeries{
labels: lset,
SampleIteratorFn: func() chunkenc.Iterator {
var list tsdbutil.SampleSlice
for _, l := range samples {
list = append(list, l...)
}
return NewSampleIterator(list)
},
}
}
// NewSeriesFromSamples returns a Series iterating over the given samples.
func NewSeriesFromSamples(lset labels.Labels, samples tsdbutil.Samples) Series {
return &ConcreteSeries{
labels: lset,
SampleIteratorFn: func() chunkenc.Iterator {
return NewSampleIterator(samples)
},
}
}
func (s *ConcreteSeries) Labels() labels.Labels { return s.labels }
func (s *ConcreteSeries) Iterator() chunkenc.Iterator { return s.SampleIteratorFn() }
// SampleSeriesIterator is a chunkenc.Iterator over in-memory samples.
type SampleSeriesIterator struct {
samples tsdbutil.Samples
idx int
}
// NewSampleIterator returns a chunkenc.Iterator over the given samples.
func NewSampleIterator(samples tsdbutil.Samples) chunkenc.Iterator {
return &SampleSeriesIterator{samples: samples, idx: -1}
}
func (it *SampleSeriesIterator) At() (int64, float64) {
s := it.samples.Get(it.idx)
return s.T(), s.V()
}
func (it *SampleSeriesIterator) Next() bool {
it.idx++
return it.idx < it.samples.Len()
}
func (it *SampleSeriesIterator) Seek(t int64) bool {
if it.idx == -1 {
it.idx = 0
}
	// Do binary search between the current position and the end.
	// sort.Search returns an offset relative to it.idx, so add it back in.
	it.idx += sort.Search(it.samples.Len()-it.idx, func(i int) bool {
		s := it.samples.Get(i + it.idx)
		return s.T() >= t
	})
return it.idx < it.samples.Len()
}
func (it *SampleSeriesIterator) Err() error { return nil }
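A self-contained sketch of the Seek contract above (position at the first sample with timestamp at or after t); the pair type is hypothetical:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/tsdbutil"
)

// pair is a minimal hypothetical tsdbutil.Sample implementation.
type pair struct {
	t int64
	v float64
}

func (p pair) T() int64   { return p.t }
func (p pair) V() float64 { return p.v }

func main() {
	it := storage.NewSampleIterator(tsdbutil.SampleSlice{pair{1, 10}, pair{5, 50}, pair{9, 90}})
	fmt.Println(it.Seek(4)) // true: positioned at t=5, the first sample at or after 4
	t, v := it.At()
	fmt.Println(t, v)      // 5 50
	fmt.Println(it.Next()) // true: advances to t=9
}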
// ChunkConcreteSeries implements ChunkSeries given a function that returns its chunk iterator.
type ChunkConcreteSeries struct {
labels labels.Labels
ChunkIteratorFn func() chunks.Iterator
}
// NewTestChunkSeries returns a ChunkSeries that encodes each given sample group as one chunk; it is meant for tests.
func NewTestChunkSeries(lset labels.Labels, samples ...[]tsdbutil.Sample) *ChunkConcreteSeries {
	return &ChunkConcreteSeries{
		labels: lset,
		ChunkIteratorFn: func() chunks.Iterator {
			// Inefficient chunk encoding that ignores chunk size limits.
			// Build the chunks inside the closure so that repeated Iterator()
			// calls do not append to a shared slice.
			var chks []chunks.Meta
			for _, s := range samples {
				chks = append(chks, tsdbutil.ChunkFromSamples(s))
			}
			return NewListChunkSeriesIterator(chks...)
		},
	}
}
func (s *ChunkConcreteSeries) Labels() labels.Labels { return s.labels }
func (s *ChunkConcreteSeries) Iterator() chunks.Iterator { return s.ChunkIteratorFn() }
// ChunkReader resolves a chunk reference to the encoded chunk data.
type ChunkReader interface {
Chunk(ref uint64) (chunkenc.Chunk, error)
}
// ChunkSeriesIterator is a chunks.Iterator over a slice of chunk metas.
type ChunkSeriesIterator struct {
chks []chunks.Meta
idx int
}
// NewListChunkSeriesIterator returns a chunks.Iterator over the given chunk metas.
func NewListChunkSeriesIterator(chks ...chunks.Meta) chunks.Iterator {
return &ChunkSeriesIterator{chks: chks, idx: -1}
}
func (it *ChunkSeriesIterator) At() chunks.Meta {
return it.chks[it.idx]
}
func (it *ChunkSeriesIterator) Next() bool {
it.idx++
return it.idx < len(it.chks)
}
func (it *ChunkSeriesIterator) Err() error { return nil }
// NewListChunkSeries returns a ChunkSeries iterating over the given, already encoded chunks.
func NewListChunkSeries(lset labels.Labels, chks ...chunks.Meta) ChunkSeries {
	return &ChunkConcreteSeries{
		labels: lset,
		ChunkIteratorFn: func() chunks.Iterator {
			return NewListChunkSeriesIterator(chks...)
		},
	}
}
// ExpandSamples drains a sample iterator into a slice; useful for tests.
func ExpandSamples(iter chunkenc.Iterator) ([]tsdbutil.Sample, error) {
var result []tsdbutil.Sample
for iter.Next() {
t, v := iter.At()
// NaNs can't be compared normally, so substitute for another value.
if math.IsNaN(v) {
v = -42
}
result = append(result, sample{t, v})
}
return result, iter.Err()
}
// ExpandChunks drains a chunk iterator into a slice; useful for tests.
func ExpandChunks(iter chunks.Iterator) ([]chunks.Meta, error) {
var result []chunks.Meta
for iter.Next() {
result = append(result, iter.At())
}
return result, iter.Err()
}
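A hedged round-trip sketch for the helpers in this file: each sample group given to NewTestChunkSeries becomes one chunk, and ExpandChunks drains the resulting iterator (pair is hypothetical, as before):

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/tsdbutil"
)

type pair struct {
	t int64
	v float64
}

func (p pair) T() int64   { return p.t }
func (p pair) V() float64 { return p.v }

func main() {
	s := storage.NewTestChunkSeries(
		labels.FromStrings("foo", "bar"),
		[]tsdbutil.Sample{pair{1, 1}, pair{2, 2}}, // first chunk
		[]tsdbutil.Sample{pair{10, 10}},           // second chunk
	)
	chks, err := storage.ExpandChunks(s.Iterator())
	fmt.Println(len(chks), err) // 2 <nil>
}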

storage/series_set.go (new file, 64 lines)

@ -0,0 +1,64 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import "github.com/prometheus/prometheus/tsdb/chunkenc"
// chunkSetToSeriesSet adapts a ChunkSeriesSet into a SeriesSet by decoding each series' chunks.
type chunkSetToSeriesSet struct {
	ChunkSeriesSet
	chkIterErr       error
	sameSeriesChunks []Series
}
// NewSeriesSetFromChunkSeriesSet returns a SeriesSet built from the chunks of the given ChunkSeriesSet.
func NewSeriesSetFromChunkSeriesSet(chk ChunkSeriesSet) SeriesSet {
return &chunkSetToSeriesSet{ChunkSeriesSet: chk}
}
func (c *chunkSetToSeriesSet) Next() bool {
if c.Err() != nil || !c.ChunkSeriesSet.Next() {
return false
}
iter := c.ChunkSeriesSet.At().Iterator()
c.sameSeriesChunks = c.sameSeriesChunks[:0]
	for iter.Next() {
		// Capture the current chunk meta here; the closure below must not read
		// iter.At() lazily, as iter will have advanced by the time it runs.
		chk := iter.At()
		c.sameSeriesChunks = append(c.sameSeriesChunks, &ConcreteSeries{
			labels: c.ChunkSeriesSet.At().Labels(),
			SampleIteratorFn: func() chunkenc.Iterator {
				return chk.Chunk.Iterator(nil)
			},
		})
	}
if iter.Err() != nil {
c.chkIterErr = iter.Err()
return false
}
return true
}
func (c *chunkSetToSeriesSet) At() Series {
return ChainedSeriesMerge(c.sameSeriesChunks...)
}
func (c *chunkSetToSeriesSet) Err() error {
if c.chkIterErr != nil {
return c.chkIterErr
}
return c.ChunkSeriesSet.Err()
}
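A hedged usage sketch for the adapter; the noop set stands in for the populated ChunkSeriesSet a real caller would get from a chunked querier:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/storage"
)

func main() {
	ss := storage.NewSeriesSetFromChunkSeriesSet(storage.NoopChunkedSeriesSet())
	for ss.Next() {
		s := ss.At()
		it := s.Iterator()
		for it.Next() {
			t, v := it.At()
			fmt.Println(s.Labels(), t, v)
		}
	}
	// Err surfaces both set-level and per-chunk iteration errors.
	fmt.Println(ss.Err()) // <nil>
}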


@ -111,10 +111,10 @@ func testChunk(t *testing.T, c Chunk) {
func benchmarkIterator(b *testing.B, newChunk func() Chunk) {
var (
t = int64(1234123324)
v = 1243535.123
t = int64(1234123324)
v = 1243535.123
exp []pair
)
var exp []pair
for i := 0; i < b.N; i++ {
// t += int64(rand.Intn(10000) + 1)
t += int64(1000)
@ -146,7 +146,7 @@ func benchmarkIterator(b *testing.B, newChunk func() Chunk) {
b.ReportAllocs()
b.ResetTimer()
fmt.Println("num", b.N, "created chunks", len(chunks))
b.Log("num", b.N, "created chunks", len(chunks))
res := make([]float64, 0, 1024)


@ -127,6 +127,7 @@ func (c *XORChunk) iterator(it Iterator) *xorIterator {
// We skip that for actual samples.
br: newBReader(c.b.bytes()[2:]),
numTotal: binary.BigEndian.Uint16(c.b.bytes()),
t: math.MinInt64,
}
}


@ -67,6 +67,20 @@ type Meta struct {
MinTime, MaxTime int64
}
// Iterator iterates over the chunks of a time series.
type Iterator interface {
	//// Seek advances the iterator forward to the given timestamp.
	//// It advances to the chunk with min time at t or first chunk with min time after t.
	//Seek(t int64) bool
	// At returns the current meta.
	// Whether the chunk inside the meta is populated depends on the implementation.
	At() Meta
	// Next advances the iterator by one.
	Next() bool
	// Err returns an optional error if Next returned false.
	Err() error
}
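A minimal sketch of the consumption contract (collectMetas is hypothetical): At is valid only after Next returned true, and Err is checked once Next returns false.

// collectMetas drains a chunks.Iterator into a slice.
func collectMetas(it Iterator) ([]Meta, error) {
	var metas []Meta
	for it.Next() {
		metas = append(metas, it.At())
	}
	return metas, it.Err()
}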
// writeHash writes the chunk encoding and raw data into the provided hash.
func (cm *Meta) writeHash(h hash.Hash, buf []byte) error {
buf = append(buf[:0], byte(cm.Chunk.Encoding()))
@ -242,84 +256,85 @@ func (w *Writer) write(b []byte) error {
return err
}
// MergeOverlappingChunks removes the samples whose timestamp is overlapping.
// The last appearing sample is retained in case there is overlapping.
// This assumes that `chks []Meta` is sorted w.r.t. MinTime.
func MergeOverlappingChunks(chks []Meta) ([]Meta, error) {
if len(chks) < 2 {
return chks, nil
}
newChks := make([]Meta, 0, len(chks)) // Will contain the merged chunks.
newChks = append(newChks, chks[0])
last := 0
for _, c := range chks[1:] {
// We need to check only the last chunk in newChks.
// Reason: (1) newChks[last-1].MaxTime < newChks[last].MinTime (non overlapping)
// (2) As chks are sorted w.r.t. MinTime, newChks[last].MinTime < c.MinTime.
// So never overlaps with newChks[last-1] or anything before that.
if c.MinTime > newChks[last].MaxTime {
newChks = append(newChks, c)
last++
continue
}
nc := &newChks[last]
if c.MaxTime > nc.MaxTime {
nc.MaxTime = c.MaxTime
}
chk, err := MergeChunks(nc.Chunk, c.Chunk)
if err != nil {
return nil, err
}
nc.Chunk = chk
}
return newChks, nil
}
// MergeChunks vertically merges a and b, i.e., if there is any sample
// with same timestamp in both a and b, the sample in a is discarded.
func MergeChunks(a, b chunkenc.Chunk) (*chunkenc.XORChunk, error) {
newChunk := chunkenc.NewXORChunk()
app, err := newChunk.Appender()
if err != nil {
return nil, err
}
ait := a.Iterator(nil)
bit := b.Iterator(nil)
aok, bok := ait.Next(), bit.Next()
for aok && bok {
at, av := ait.At()
bt, bv := bit.At()
if at < bt {
app.Append(at, av)
aok = ait.Next()
} else if bt < at {
app.Append(bt, bv)
bok = bit.Next()
} else {
app.Append(bt, bv)
aok = ait.Next()
bok = bit.Next()
}
}
for aok {
at, av := ait.At()
app.Append(at, av)
aok = ait.Next()
}
for bok {
bt, bv := bit.At()
app.Append(bt, bv)
bok = bit.Next()
}
if ait.Err() != nil {
return nil, ait.Err()
}
if bit.Err() != nil {
return nil, bit.Err()
}
return newChunk, nil
}
//
//// MergeOverlappingChunks removes the samples whose timestamp is overlapping.
//// The last appearing sample is retained in case there is overlapping.
//// This assumes that `chks []Meta` is sorted w.r.t. MinTime.
//func MergeOverlappingChunks(chks []Meta) ([]Meta, error) {
// if len(chks) < 2 {
// return chks, nil
// }
// newChks := make([]Meta, 0, len(chks)) // Will contain the merged chunks.
// newChks = append(newChks, chks[0])
// last := 0
// for _, c := range chks[1:] {
// // We need to check only the last chunk in newChks.
// // Reason: (1) newChks[last-1].MaxTime < newChks[last].MinTime (non overlapping)
// // (2) As chks are sorted w.r.t. MinTime, newChks[last].MinTime < c.MinTime.
// // So never overlaps with newChks[last-1] or anything before that.
// if c.MinTime > newChks[last].MaxTime {
// newChks = append(newChks, c)
// last++
// continue
// }
// nc := &newChks[last]
// if c.MaxTime > nc.MaxTime {
// nc.MaxTime = c.MaxTime
// }
// chk, err := MergeChunks(nc.Chunk, c.Chunk)
// if err != nil {
// return nil, err
// }
// nc.Chunk = chk
// }
//
// return newChks, nil
//}
//
//// MergeChunks vertically merges a and b, i.e., if there is any sample
//// with same timestamp in both a and b, the sample in a is discarded.
//func MergeChunks(a, b chunkenc.Chunk) (*chunkenc.XORChunk, error) {
// newChunk := chunkenc.NewXORChunk()
// app, err := newChunk.Appender()
// if err != nil {
// return nil, err
// }
// ait := a.Iterator(nil)
// bit := b.Iterator(nil)
// aok, bok := ait.Next(), bit.Next()
// for aok && bok {
// at, av := ait.At()
// bt, bv := bit.At()
// if at < bt {
// app.Append(at, av)
// aok = ait.Next()
// } else if bt < at {
// app.Append(bt, bv)
// bok = bit.Next()
// } else {
// app.Append(bt, bv)
// aok = ait.Next()
// bok = bit.Next()
// }
// }
// for aok {
// at, av := ait.At()
// app.Append(at, av)
// aok = ait.Next()
// }
// for bok {
// bt, bv := bit.At()
// app.Append(bt, bv)
// bok = bit.Next()
// }
// if ait.Err() != nil {
// return nil, ait.Err()
// }
// if bit.Err() != nil {
// return nil, bit.Err()
// }
// return newChunk, nil
//}
// WriteChunks writes as many chunks as possible to the current segment,
// cuts a new segment when the current segment is full and


@ -648,7 +648,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
}
var (
set storage.ChunkSeriesSet
sets []storage.ChunkSeriesSet
symbols index.StringIter
closers = []io.Closer{}
overlapping bool
@ -706,18 +706,14 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
}
all = indexr.SortedPostings(all)
s := newCompactionSeriesSet(indexr, chunkr, tombsr, all)
s := newBlockChunkSeriesSet(indexr, chunkr, tombsr, all, meta.MinTime, meta.MaxTime)
syms := indexr.Symbols()
if i == 0 {
set = s
symbols = syms
continue
}
set, err = newCompactionMerger(set, s)
if err != nil {
return err
}
sets = append(sets, s)
symbols = newMergedStringIter(symbols, syms)
}
@ -730,8 +726,12 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
return errors.Wrap(symbols.Err(), "next symbol")
}
delIter := &deletedIterator{}
ref := uint64(0)
var (
ref = uint64(0)
chks []chunks.Meta
)
set := storage.NewMergeChunkSeriesSet(sets, storage.NewVerticalChunkSeriesMerger(MergeOverlappingChunksVertically))
for set.Next() {
select {
case <-c.ctx.Done():
@ -739,93 +739,30 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
default:
}
lset, chks, dranges := set.At() // The chunks here are not fully deleted.
if overlapping {
// If blocks are overlapping, it is possible to have unsorted chunks.
sort.Slice(chks, func(i, j int) bool {
return chks[i].MinTime < chks[j].MinTime
})
chksIter := set.At().Iterator()
chks = chks[:0]
for chksIter.Next() {
chks = append(chks, chksIter.At())
}
if chksIter.Err() != nil {
return errors.Wrap(chksIter.Err(), "chunk iter")
}
// Skip the series with all deleted chunks.
if len(chks) == 0 {
continue
}
for i, chk := range chks {
// Re-encode head chunks that are still open (being appended to) or
// outside the compacted MaxTime range.
// The chunk.Bytes() method is not safe for open chunks hence the re-encoding.
// This happens when snapshotting the head block.
//
// Block time range is half-open: [meta.MinTime, meta.MaxTime) and
// chunks are closed hence the chk.MaxTime >= meta.MaxTime check.
//
// TODO think how to avoid the typecasting to verify when it is head block.
if _, isHeadChunk := chk.Chunk.(*safeChunk); isHeadChunk && chk.MaxTime >= meta.MaxTime {
dranges = append(dranges, tombstones.Interval{Mint: meta.MaxTime, Maxt: math.MaxInt64})
} else
// Sanity check for disk blocks.
// chk.MaxTime == meta.MaxTime shouldn't happen either, but would break many users, so we don't check for it.
if chk.MinTime < meta.MinTime || chk.MaxTime > meta.MaxTime {
return errors.Errorf("found chunk with minTime: %d maxTime: %d outside of compacted minTime: %d maxTime: %d",
chk.MinTime, chk.MaxTime, meta.MinTime, meta.MaxTime)
}
if len(dranges) > 0 {
// Re-encode the chunk to not have deleted values.
if !chk.OverlapsClosedInterval(dranges[0].Mint, dranges[len(dranges)-1].Maxt) {
continue
}
newChunk := chunkenc.NewXORChunk()
app, err := newChunk.Appender()
if err != nil {
return err
}
delIter.it = chk.Chunk.Iterator(delIter.it)
delIter.intervals = dranges
var (
t int64
v float64
)
for delIter.Next() {
t, v = delIter.At()
app.Append(t, v)
}
if err := delIter.Err(); err != nil {
return errors.Wrap(err, "iterate chunk while re-encoding")
}
chks[i].Chunk = newChunk
chks[i].MaxTime = t
}
}
mergedChks := chks
if overlapping {
mergedChks, err = chunks.MergeOverlappingChunks(chks)
if err != nil {
return errors.Wrap(err, "merge overlapping chunks")
}
}
if err := chunkw.WriteChunks(mergedChks...); err != nil {
if err := chunkw.WriteChunks(chks...); err != nil {
return errors.Wrap(err, "write chunks")
}
if err := indexw.AddSeries(ref, lset, mergedChks...); err != nil {
if err := indexw.AddSeries(ref, set.At().Labels(), chks...); err != nil {
return errors.Wrap(err, "add series")
}
meta.Stats.NumChunks += uint64(len(mergedChks))
meta.Stats.NumChunks += uint64(len(chks))
meta.Stats.NumSeries++
for _, chk := range mergedChks {
for _, chk := range chks {
meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples())
}
for _, chk := range mergedChks {
for _, chk := range chks {
if err := c.chunkPool.Put(chk.Chunk); err != nil {
return errors.Wrap(err, "put chunk")
}
@ -840,165 +777,169 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
return nil
}
type compactionSeriesSet struct {
p index.Postings
index IndexReader
chunks ChunkReader
tombstones tombstones.Reader
var _ storage.ChunkSeriesSet = &blockChunkSeriesSet{}
l labels.Labels
c []chunks.Meta
// blockChunkSeriesSet allows iterating over sorted series whose chunks are populated and have tombstones applied.
// Series whose chunks are all deleted are still present, as a labelled series with an empty chunk iterator.
// Chunks are also trimmed to the requested min and max time.
type blockChunkSeriesSet struct {
p index.Postings
index IndexReader
chunks ChunkReader
tombstones tombstones.Reader
minTime, maxTime int64
labels labels.Labels
chks []chunks.Meta
intervals tombstones.Intervals
err error
err error
}
func newCompactionSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings) *compactionSeriesSet {
return &compactionSeriesSet{
func newBlockChunkSeriesSet(i IndexReader, c ChunkReader, t tombstones.Reader, p index.Postings, minTime, maxTime int64) *blockChunkSeriesSet {
return &blockChunkSeriesSet{
index: i,
chunks: c,
tombstones: t,
p: p,
minTime: minTime,
maxTime: maxTime,
}
}
func (c *compactionSeriesSet) Next() bool {
if !c.p.Next() {
return false
}
var err error
c.intervals, err = c.tombstones.Get(c.p.At())
if err != nil {
c.err = errors.Wrap(err, "get tombstones")
return false
}
if err = c.index.Series(c.p.At(), &c.l, &c.c); err != nil {
c.err = errors.Wrapf(err, "get series %d", c.p.At())
return false
}
// Remove completely deleted chunks.
if len(c.intervals) > 0 {
chks := make([]chunks.Meta, 0, len(c.c))
for _, chk := range c.c {
if !(tombstones.Interval{Mint: chk.MinTime, Maxt: chk.MaxTime}.IsSubrange(c.intervals)) {
chks = append(chks, chk)
}
func (b *blockChunkSeriesSet) Next() bool {
for b.p.Next() {
b.intervals, b.err = b.tombstones.Get(b.p.At())
if b.err != nil {
b.err = errors.Wrap(b.err, "get tombstones")
return false
}
c.c = chks
}
for i := range c.c {
chk := &c.c[i]
chk.Chunk, err = c.chunks.Chunk(chk.Ref)
if err != nil {
c.err = errors.Wrapf(err, "chunk %d not found", chk.Ref)
if err := b.index.Series(b.p.At(), &b.labels, &b.chks); err != nil {
b.err = errors.Wrapf(err, "get series %d", b.p.At())
return false
}
}
return true
return false
}
func (c *compactionSeriesSet) Err() error {
if c.err != nil {
return c.err
func (b *blockChunkSeriesSet) Err() error {
if b.err != nil {
return b.err
}
return c.p.Err()
return b.p.Err()
}
func (c *compactionSeriesSet) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) {
return c.l, c.c, c.intervals
func (b *blockChunkSeriesSet) At() storage.ChunkSeries {
return b
}
type compactionMerger struct {
a, b storage.ChunkSeriesSet
func (b *blockChunkSeriesSet) Labels() labels.Labels {
return b.labels
}
aok, bok bool
l labels.Labels
c []chunks.Meta
func (b *blockChunkSeriesSet) Iterator() chunks.Iterator {
return &populateWithTombstonesChunkIterator{chunks: b.chunks, minTime: b.minTime, maxTime: b.maxTime, chks: b.chks, intervals: b.intervals, i: -1, bufDelIter: &deletedIterator{}}
}
type populateWithTombstonesChunkIterator struct {
chunks ChunkReader
	// Requested min and max time range that the iterator should scope its chunks down to.
	minTime, maxTime int64
	// chks are expected to be sorted by MinTime and must all belong to the same, single series.
	chks []chunks.Meta
intervals tombstones.Intervals
i int
curr chunks.Meta
err error
bufDelIter *deletedIterator
}
func newCompactionMerger(a, b storage.ChunkSeriesSet) (*compactionMerger, error) {
c := &compactionMerger{
a: a,
b: b,
}
// Initialize first elements of both sets as Next() needs
// one element look-ahead.
c.aok = c.a.Next()
c.bok = c.b.Next()
func (p *populateWithTombstonesChunkIterator) Next() bool {
for p.err == nil && p.i < len(p.chks)-1 {
p.i++
return c, c.Err()
}
p.curr = p.chks[p.i]
func (c *compactionMerger) compare() int {
if !c.aok {
return 1
}
if !c.bok {
return -1
}
a, _, _ := c.a.At()
b, _, _ := c.b.At()
return labels.Compare(a, b)
}
// NOTE: block time range is half-open: [meta.MinTime, meta.MaxTime) and
// chunks are closed: [chk.MinTime, chk.MaxTime].
func (c *compactionMerger) Next() bool {
if !c.aok && !c.bok || c.Err() != nil {
return false
}
// While advancing child iterators the memory used for labels and chunks
// may be reused. When picking a series we have to store the result.
var lset labels.Labels
var chks []chunks.Meta
d := c.compare()
if d > 0 {
lset, chks, c.intervals = c.b.At()
c.l = append(c.l[:0], lset...)
c.c = append(c.c[:0], chks...)
c.bok = c.b.Next()
} else if d < 0 {
lset, chks, c.intervals = c.a.At()
c.l = append(c.l[:0], lset...)
c.c = append(c.c[:0], chks...)
c.aok = c.a.Next()
} else {
// Both sets contain the current series. Chain them into a single one.
l, ca, ra := c.a.At()
_, cb, rb := c.b.At()
for _, r := range rb {
ra = ra.Add(r)
if p.curr.MaxTime < p.minTime {
// Fully before the requested range.
continue
}
c.l = append(c.l[:0], l...)
c.c = append(append(c.c[:0], ca...), cb...)
c.intervals = ra
if p.curr.MinTime >= p.maxTime {
// Fully after the requested range; chunks are sorted by MinTime, so no later chunk can overlap either.
p.i = len(p.chks)
return false
}
c.aok = c.a.Next()
c.bok = c.b.Next()
// We cannot use p.intervals directly, as we don't want to modify it for next iterations.
p.bufDelIter.intervals = append(p.bufDelIter.intervals[:0], p.intervals...)
	// Re-encode head chunks that are partially outside the requested min and max time.
	//
	// This is mostly for blockQuerier, but also for the compactor. During (cut) compaction,
	// the head block's chunks can still be open (being appended to) and thus
	// reach outside the compacted MaxTime range.
	// The chunk.Bytes() method is not safe for open chunks, hence the re-encoding. This happens
	// often when snapshotting the head block.
if p.curr.MaxTime >= p.maxTime {
p.bufDelIter.intervals = append(p.bufDelIter.intervals, tombstones.Interval{Mint: p.maxTime - 1, Maxt: math.MaxInt64})
}
if p.curr.MinTime < p.minTime {
p.bufDelIter.intervals = append(p.bufDelIter.intervals, tombstones.Interval{Mint: math.MinInt64, Maxt: p.minTime})
}
p.curr.Chunk, p.err = p.chunks.Chunk(p.curr.Ref)
if p.err != nil {
p.err = errors.Wrapf(p.err, "cannot populate chunk %d", p.curr.Ref)
return false
}
// Re-encode the chunk to drop deleted values, or because it is still open as per the case above.
if len(p.bufDelIter.intervals) > 0 && p.curr.OverlapsClosedInterval(p.bufDelIter.intervals[0].Mint, p.bufDelIter.intervals[len(p.bufDelIter.intervals)-1].Maxt) {
newChunk := chunkenc.NewXORChunk()
app, err := newChunk.Appender()
if err != nil {
p.err = err
return false
}
p.bufDelIter.it = p.curr.Chunk.Iterator(p.bufDelIter.it)
var (
t int64
v float64
)
for p.bufDelIter.Next() {
t, v = p.bufDelIter.At()
app.Append(t, v)
}
if err := p.bufDelIter.Err(); err != nil {
p.err = errors.Wrap(err, "iterate chunk while re-encoding")
return false
}
p.curr.Chunk = newChunk
p.curr.MaxTime = t
}
return true
}
return true
return false
}
func (c *compactionMerger) Err() error {
if c.a.Err() != nil {
return c.a.Err()
}
return c.b.Err()
func (p *populateWithTombstonesChunkIterator) At() chunks.Meta {
return p.curr
}
func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) {
return c.l, c.c, c.intervals
func (p *populateWithTombstonesChunkIterator) Err() error {
return p.err
}
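The same trimming idea in isolation, as a hedged sketch (trimChunk is hypothetical, not part of this diff): re-encode a chunk, keeping only samples inside the half-open range [mint, maxt).

// trimChunk re-encodes c, dropping samples outside [mint, maxt).
func trimChunk(c chunkenc.Chunk, mint, maxt int64) (chunkenc.Chunk, error) {
	out := chunkenc.NewXORChunk()
	app, err := out.Appender()
	if err != nil {
		return nil, err
	}
	it := c.Iterator(nil)
	for it.Next() {
		t, v := it.At()
		if t < mint {
			continue // before the requested range
		}
		if t >= maxt {
			break // samples are time-ordered; nothing further can match
		}
		app.Append(t, v)
	}
	return out, it.Err()
}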
func newMergedStringIter(a index.StringIter, b index.StringIter) index.StringIter {


@ -30,6 +30,7 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/prometheus/prometheus/util/testutil"
)
@ -1087,3 +1088,100 @@ func TestDeleteCompactionBlockAfterFailedReload(t *testing.T) {
})
}
}
func TestBlockChunkSeriesSet(t *testing.T) {
type refdSeries struct {
lset labels.Labels
chunks []chunks.Meta
ref uint64
}
cases := []struct {
series []refdSeries
// Postings should be in the sorted order of the series
postings []uint64
expIdxs []int
}{
{
series: []refdSeries{
{
lset: labels.New([]labels.Label{{Name: "a", Value: "a"}}...),
chunks: []chunks.Meta{
{Ref: 29}, {Ref: 45}, {Ref: 245}, {Ref: 123}, {Ref: 4232}, {Ref: 5344},
{Ref: 121},
},
ref: 12,
},
{
lset: labels.New([]labels.Label{{Name: "a", Value: "a"}, {Name: "b", Value: "b"}}...),
chunks: []chunks.Meta{
{Ref: 82}, {Ref: 23}, {Ref: 234}, {Ref: 65}, {Ref: 26},
},
ref: 10,
},
{
lset: labels.New([]labels.Label{{Name: "b", Value: "c"}}...),
chunks: []chunks.Meta{{Ref: 8282}},
ref: 1,
},
{
lset: labels.New([]labels.Label{{Name: "b", Value: "b"}}...),
chunks: []chunks.Meta{
{Ref: 829}, {Ref: 239}, {Ref: 2349}, {Ref: 659}, {Ref: 269},
},
ref: 108,
},
},
postings: []uint64{12, 13, 10, 108}, // 13 doesn't exist and should just be skipped over.
expIdxs: []int{0, 1, 3},
},
{
series: []refdSeries{
{
lset: labels.New([]labels.Label{{Name: "a", Value: "a"}, {Name: "b", Value: "b"}}...),
chunks: []chunks.Meta{
{Ref: 82}, {Ref: 23}, {Ref: 234}, {Ref: 65}, {Ref: 26},
},
ref: 10,
},
{
lset: labels.New([]labels.Label{{Name: "b", Value: "c"}}...),
chunks: []chunks.Meta{{Ref: 8282}},
ref: 3,
},
},
postings: []uint64{},
expIdxs: []int{},
},
}
for _, tc := range cases {
mi := newMockIndex()
for _, s := range tc.series {
testutil.Ok(t, mi.AddSeries(s.ref, s.lset, s.chunks...))
}
bcs := &blockChunkSeriesSet{
p: index.NewListPostings(tc.postings),
index: mi,
tombstones: tombstones.NewMemTombstones(),
}
i := 0
for bcs.Next() {
// TODO
//// lset, chks, _ := bcs.At()
//
//idx := tc.expIdxs[i]
//
//testutil.Equals(t, tc.series[idx].lset, lset)
//testutil.Equals(t, tc.series[idx].chunks, chks)
//
//i++
}
testutil.Equals(t, len(tc.expIdxs), i)
testutil.Ok(t, bcs.Err())
}
}


@ -813,7 +813,7 @@ func TestDelete_e2e(t *testing.T) {
smpls = deletedSamples(smpls, del.drange)
// Only append those series for which samples exist as mockSeriesSet
// doesn't skip series with no samples.
// TODO: But sometimes SeriesSet returns an empty SeriesIterator
// TODO: But sometimes SeriesSet returns an empty chunkenc.Iterator
if len(smpls) > 0 {
matchedSeries = append(matchedSeries, newSeries(
m.Map(),


@ -14,6 +14,7 @@
package tsdb
import (
"math"
"sort"
"strings"
"unicode/utf8"
@ -121,6 +122,7 @@ func (q *querier) Close() error {
// verticalQuerier aggregates querying results from time blocks within
// a single partition. The block time ranges can be overlapping.
// TODO(bwplotka): Remove this once we move to storage.MergeSeriesSet function as they handle overlaps.
type verticalQuerier struct {
querier
}
@ -190,35 +192,19 @@ type blockQuerier struct {
}
func (q *blockQuerier) Select(sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
var base storage.ChunkSeriesSet
var err error
if sortSeries {
base, err = LookupChunkSeriesSorted(q.index, q.tombstones, ms...)
} else {
base, err = LookupChunkSeries(q.index, q.tombstones, ms...)
}
if err != nil {
return nil, nil, err
}
mint := q.mint
maxt := q.maxt
if hints != nil {
mint = hints.Start
maxt = hints.End
}
return &blockSeriesSet{
set: &populatedChunkSeries{
set: base,
chunks: q.chunks,
mint: mint,
maxt: maxt,
},
mint: mint,
maxt: maxt,
}, nil, nil
base, err := LookupChunkSeries(sortSeries, q.index, q.tombstones, q.chunks, mint, maxt, ms...)
if err != nil {
return nil, nil, err
}
return storage.NewSeriesSetFromChunkSeriesSet(base), nil, nil
}
func (q *blockQuerier) LabelValues(name string) ([]string, storage.Warnings, error) {
@ -467,7 +453,7 @@ func mergeStrings(a, b []string) []string {
// mergedSeriesSet combines a slice of series sets into a single series set. The input series sets
// must be sorted and sequential in time.
// TODO(bwplotka): Merge this with merge SeriesSet available in storage package.
// TODO(bwplotka): Merge this with merge SeriesSet available in storage package (to limit size of PR).
type mergedSeriesSet struct {
all []storage.SeriesSet
buf []storage.SeriesSet // A buffer for keeping the order of SeriesSet slice during forwarding the SeriesSet.
@ -477,7 +463,7 @@ type mergedSeriesSet struct {
cur storage.Series
}
// TODO(bwplotka): Merge this with merge SeriesSet available in storage package.
// TODO(bwplotka): Merge this with merge SeriesSet available in storage package (to limit size of PR).
func NewMergedSeriesSet(all []storage.SeriesSet) storage.SeriesSet {
if len(all) == 1 {
return all[0]
@ -582,7 +568,7 @@ func (s *mergedSeriesSet) Next() bool {
for i, idx := range s.ids {
series[i] = s.all[idx].At()
}
s.cur = &chainedSeries{series: series}
s.cur = storage.ChainedSeriesMerge(series...)
} else {
s.cur = s.all[s.ids[0]].At()
}
@ -648,39 +634,16 @@ func (s *mergedVerticalSeriesSet) Next() bool {
s.cur = s.a.At()
s.adone = !s.a.Next()
} else {
s.cur = &verticalChainedSeries{series: []storage.Series{s.a.At(), s.b.At()}}
s.cur = storage.ChainedSeriesMerge([]storage.Series{s.a.At(), s.b.At()}...)
s.adone = !s.a.Next()
s.bdone = !s.b.Next()
}
return true
}
// baseChunkSeries loads the label set and chunk references for a postings
// list from an index. It filters out series that have labels set that should be unset.
type baseChunkSeries struct {
p index.Postings
index IndexReader
tombstones tombstones.Reader
lset labels.Labels
chks []chunks.Meta
intervals tombstones.Intervals
err error
}
// LookupChunkSeries retrieves all series for the given matchers and returns a ChunkSeriesSet
// over them. It drops chunks based on tombstones in the given reader.
func LookupChunkSeries(ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (storage.ChunkSeriesSet, error) {
return lookupChunkSeries(false, ir, tr, ms...)
}
// LookupChunkSeries retrieves all series for the given matchers and returns a ChunkSeriesSet
// over them. It drops chunks based on tombstones in the given reader. Series will be in order.
func LookupChunkSeriesSorted(ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (storage.ChunkSeriesSet, error) {
return lookupChunkSeries(true, ir, tr, ms...)
}
func lookupChunkSeries(sorted bool, ir IndexReader, tr tombstones.Reader, ms ...*labels.Matcher) (storage.ChunkSeriesSet, error) {
func LookupChunkSeries(sorted bool, ir IndexReader, tr tombstones.Reader, c ChunkReader, minTime, maxTime int64, ms ...*labels.Matcher) (storage.ChunkSeriesSet, error) {
if tr == nil {
tr = tombstones.NewMemTombstones()
}
@ -691,269 +654,49 @@ func lookupChunkSeries(sorted bool, ir IndexReader, tr tombstones.Reader, ms ...
if sorted {
p = ir.SortedPostings(p)
}
return &baseChunkSeries{
p: p,
index: ir,
tombstones: tr,
}, nil
return newBlockChunkSeriesSet(ir, c, tr, p, minTime, maxTime), nil
}
func (s *baseChunkSeries) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) {
return s.lset, s.chks, s.intervals
}
func (s *baseChunkSeries) Err() error { return s.err }
func (s *baseChunkSeries) Next() bool {
var (
lset = make(labels.Labels, len(s.lset))
chkMetas = make([]chunks.Meta, len(s.chks))
err error
)
for s.p.Next() {
ref := s.p.At()
if err := s.index.Series(ref, &lset, &chkMetas); err != nil {
// Postings may be stale. Skip if no underlying series exists.
if errors.Cause(err) == ErrNotFound {
continue
}
s.err = err
return false
}
s.lset = lset
s.chks = chkMetas
s.intervals, err = s.tombstones.Get(s.p.At())
if err != nil {
s.err = errors.Wrap(err, "get tombstones")
return false
}
if len(s.intervals) > 0 {
// Only those chunks that are not entirely deleted.
chks := make([]chunks.Meta, 0, len(s.chks))
for _, chk := range s.chks {
if !(tombstones.Interval{Mint: chk.MinTime, Maxt: chk.MaxTime}.IsSubrange(s.intervals)) {
chks = append(chks, chk)
}
}
s.chks = chks
}
return true
// MergeOverlappingChunksVertically merges multiple time-overlapping chunks into a single chunk.
// TODO(bwplotka): https://github.com/prometheus/tsdb/issues/670
func MergeOverlappingChunksVertically(chks ...chunks.Meta) chunks.Iterator {
chk := chunkenc.NewXORChunk()
app, err := chk.Appender()
if err != nil {
return errChunksIterator{err: err}
}
if err := s.p.Err(); err != nil {
s.err = err
}
return false
}
seriesIter := newVerticalMergeSeriesIterator(chks...)
// populatedChunkSeries loads chunk data from a store for a set of series
// with known chunk references. It filters out chunks that do not fit the
// given time range.
type populatedChunkSeries struct {
set storage.ChunkSeriesSet
chunks ChunkReader
mint, maxt int64
mint := int64(math.MaxInt64)
maxt := int64(math.MinInt64)
err error
chks []chunks.Meta
lset labels.Labels
intervals tombstones.Intervals
}
for seriesIter.Next() {
t, v := seriesIter.At()
app.Append(t, v)
func (s *populatedChunkSeries) At() (labels.Labels, []chunks.Meta, tombstones.Intervals) {
return s.lset, s.chks, s.intervals
}
func (s *populatedChunkSeries) Err() error { return s.err }
func (s *populatedChunkSeries) Next() bool {
for s.set.Next() {
lset, chks, dranges := s.set.At()
for len(chks) > 0 {
if chks[0].MaxTime >= s.mint {
break
}
chks = chks[1:]
maxt = t
if mint == math.MaxInt64 {
mint = t
}
// This is to delete in place while iterating.
for i, rlen := 0, len(chks); i < rlen; i++ {
j := i - (rlen - len(chks))
c := &chks[j]
// Break out at the first chunk that has no overlap with mint, maxt.
if c.MinTime > s.maxt {
chks = chks[:j]
break
}
c.Chunk, s.err = s.chunks.Chunk(c.Ref)
if s.err != nil {
// This means that the chunk has been garbage collected. Remove it from the list.
if s.err == ErrNotFound {
s.err = nil
// Delete in-place.
s.chks = append(chks[:j], chks[j+1:]...)
}
return false
}
}
if len(chks) == 0 {
continue
}
s.lset = lset
s.chks = chks
s.intervals = dranges
return true
}
if err := s.set.Err(); err != nil {
s.err = err
if err := seriesIter.Err(); err != nil {
return errChunksIterator{err: err}
}
return false
return storage.NewListChunkSeriesIterator(chunks.Meta{
MinTime: mint,
MaxTime: maxt,
Chunk: chk,
})
}
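A hedged usage sketch for the merger, building two time-overlapping chunks with tsdbutil.ChunkFromSamples (pair is a hypothetical tsdbutil.Sample implementation):

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/tsdbutil"
)

type pair struct {
	t int64
	v float64
}

func (p pair) T() int64   { return p.t }
func (p pair) V() float64 { return p.v }

func main() {
	a := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{pair{1, 1}, pair{3, 3}})
	b := tsdbutil.ChunkFromSamples([]tsdbutil.Sample{pair{2, 2}, pair{3, 30}})
	it := tsdb.MergeOverlappingChunksVertically(a, b)
	for it.Next() {
		m := it.At()
		// Expect a single merged chunk covering t=1..3, duplicate timestamps deduplicated.
		fmt.Println(m.MinTime, m.MaxTime, m.Chunk.NumSamples())
	}
	fmt.Println(it.Err()) // <nil>
}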
// blockSeriesSet is a set of series from an inverted index query.
type blockSeriesSet struct {
set storage.ChunkSeriesSet
type errChunksIterator struct {
err error
cur storage.Series
mint, maxt int64
}
func (s *blockSeriesSet) Next() bool {
for s.set.Next() {
lset, chunks, dranges := s.set.At()
s.cur = &chunkSeries{
labels: lset,
chunks: chunks,
mint: s.mint,
maxt: s.maxt,
intervals: dranges,
}
return true
}
if s.set.Err() != nil {
s.err = s.set.Err()
}
return false
}
func (s *blockSeriesSet) At() storage.Series { return s.cur }
func (s *blockSeriesSet) Err() error { return s.err }
// chunkSeries is a series that is backed by a sequence of chunks holding
// time series data.
type chunkSeries struct {
labels labels.Labels
chunks []chunks.Meta // in-order chunk refs
mint, maxt int64
intervals tombstones.Intervals
}
func (s *chunkSeries) Labels() labels.Labels {
return s.labels
}
func (s *chunkSeries) Iterator() chunkenc.Iterator {
return newChunkSeriesIterator(s.chunks, s.intervals, s.mint, s.maxt)
}
// chainedSeries implements a series for a list of time-sorted series.
// They all must have the same labels.
type chainedSeries struct {
series []storage.Series
}
func (s *chainedSeries) Labels() labels.Labels {
return s.series[0].Labels()
}
func (s *chainedSeries) Iterator() chunkenc.Iterator {
return newChainedSeriesIterator(s.series...)
}
// chainedSeriesIterator implements a series iterator over a list
// of time-sorted, non-overlapping iterators.
type chainedSeriesIterator struct {
series []storage.Series // series in time order
i int
cur chunkenc.Iterator
}
func newChainedSeriesIterator(s ...storage.Series) *chainedSeriesIterator {
return &chainedSeriesIterator{
series: s,
i: 0,
cur: s[0].Iterator(),
}
}
func (it *chainedSeriesIterator) Seek(t int64) bool {
// We just scan the chained series sequentially as they are already
// pre-selected by relevant time and should be accessed sequentially anyway.
for i, s := range it.series[it.i:] {
cur := s.Iterator()
if !cur.Seek(t) {
continue
}
it.cur = cur
it.i += i
return true
}
return false
}
func (it *chainedSeriesIterator) Next() bool {
if it.cur.Next() {
return true
}
if err := it.cur.Err(); err != nil {
return false
}
if it.i == len(it.series)-1 {
return false
}
it.i++
it.cur = it.series[it.i].Iterator()
return it.Next()
}
func (it *chainedSeriesIterator) At() (t int64, v float64) {
return it.cur.At()
}
func (it *chainedSeriesIterator) Err() error {
return it.cur.Err()
}
// verticalChainedSeries implements a series for a list of time-sorted, time-overlapping series.
// They all must have the same labels.
type verticalChainedSeries struct {
series []storage.Series
}
func (s *verticalChainedSeries) Labels() labels.Labels {
return s.series[0].Labels()
}
func (s *verticalChainedSeries) Iterator() chunkenc.Iterator {
return newVerticalMergeSeriesIterator(s.series...)
}
func (e errChunksIterator) At() chunks.Meta { return chunks.Meta{} }
func (e errChunksIterator) Next() bool { return false }
func (e errChunksIterator) Err() error { return e.err }
// verticalMergeSeriesIterator implements a series iterator over a list
// of time-sorted, time-overlapping iterators.
@ -965,18 +708,23 @@ type verticalMergeSeriesIterator struct {
curV float64
}
func newVerticalMergeSeriesIterator(s ...storage.Series) chunkenc.Iterator {
func newVerticalMergeSeriesIterator(s ...chunks.Meta) chunkenc.Iterator {
if len(s) == 1 {
return s[0].Iterator()
} else if len(s) == 2 {
return s[0].Chunk.Iterator(nil)
}
if len(s) == 2 {
return &verticalMergeSeriesIterator{
a: s[0].Iterator(),
b: s[1].Iterator(),
a: s[0].Chunk.Iterator(nil),
b: s[1].Chunk.Iterator(nil),
curT: math.MinInt64,
}
}
return &verticalMergeSeriesIterator{
a: s[0].Iterator(),
b: newVerticalMergeSeriesIterator(s[1:]...),
a: s[0].Chunk.Iterator(nil),
b: newVerticalMergeSeriesIterator(s[1:]...),
curT: math.MinInt64,
}
}

(Diff for one file suppressed because it is too large.)


@ -15,25 +15,13 @@ package tsdbutil
import (
"math"
)
// SeriesIterator iterates over the data of a time series.
type SeriesIterator interface {
// Seek advances the iterator forward to the given timestamp.
// If there's no value exactly at t, it advances to the first value
// after t.
Seek(t int64) bool
// At returns the current timestamp/value pair.
At() (t int64, v float64)
// Next advances the iterator by one.
Next() bool
// Err returns the current error.
Err() error
}
"github.com/prometheus/prometheus/tsdb/chunkenc"
)
// BufferedSeriesIterator wraps an iterator with a look-back buffer.
type BufferedSeriesIterator struct {
it SeriesIterator
it chunkenc.Iterator
buf *sampleRing
lastTime int64
@ -41,7 +29,7 @@ type BufferedSeriesIterator struct {
// NewBuffer returns a new iterator that buffers the values within the time range
// of the current element and the duration of delta before.
func NewBuffer(it SeriesIterator, delta int64) *BufferedSeriesIterator {
func NewBuffer(it chunkenc.Iterator, delta int64) *BufferedSeriesIterator {
return &BufferedSeriesIterator{
it: it,
buf: newSampleRing(delta, 16),
@ -56,7 +44,7 @@ func (b *BufferedSeriesIterator) PeekBack() (t int64, v float64, ok bool) {
}
// Buffer returns an iterator over the buffered data.
func (b *BufferedSeriesIterator) Buffer() SeriesIterator {
func (b *BufferedSeriesIterator) Buffer() chunkenc.Iterator {
return b.buf.iterator()
}
@ -145,7 +133,7 @@ func (r *sampleRing) reset() {
r.f = 0
}
func (r *sampleRing) iterator() SeriesIterator {
func (r *sampleRing) iterator() chunkenc.Iterator {
return &sampleRingIterator{r: r, i: -1}
}


@ -18,23 +18,37 @@ import (
"github.com/prometheus/prometheus/tsdb/chunks"
)
// Samples is an ordered collection of samples accessible by index.
type Samples interface {
Get(i int) Sample
Len() int
}
// Sample is a single timestamp/value pair.
type Sample interface {
T() int64
V() float64
}
// SampleSlice implements Samples over a slice of Sample.
type SampleSlice []Sample
func (s SampleSlice) Get(i int) Sample { return s[i] }
func (s SampleSlice) Len() int { return len(s) }
// ChunkFromSamples encodes the given samples into a single XOR chunk.
func ChunkFromSamples(s []Sample) chunks.Meta {
return ChunkFromSamplesGeneric(SampleSlice(s))
}
// ChunkFromSamplesGeneric is ChunkFromSamples for any Samples implementation.
func ChunkFromSamplesGeneric(s Samples) chunks.Meta {
mint, maxt := int64(0), int64(0)
if len(s) > 0 {
mint, maxt = s[0].T(), s[len(s)-1].T()
if s.Len() > 0 {
mint, maxt = s.Get(0).T(), s.Get(s.Len()-1).T()
}
c := chunkenc.NewXORChunk()
ca, _ := c.Appender()
for _, s := range s {
ca.Append(s.T(), s.V())
for i := 0; i < s.Len(); i++ {
ca.Append(s.Get(i).T(), s.Get(i).V())
}
return chunks.Meta{
MinTime: mint,

View file

@ -539,7 +539,7 @@ func (api *API) series(r *http.Request) apiFuncResult {
sets = append(sets, s)
}
set := storage.NewMergeSeriesSet(sets, nil)
set := storage.NewChainedMergeSeriesSet(sets, nil)
metrics := []labels.Labels{}
for set.Next() {
metrics = append(metrics, set.At().Labels())


@ -98,14 +98,14 @@ func (h *Handler) federation(w http.ResponseWriter, req *http.Request) {
sets = append(sets, s)
}
set := storage.NewMergeSeriesSet(sets, nil)
set := storage.NewChainedMergeSeriesSet(sets, nil)
it := storage.NewBuffer(int64(h.lookbackDelta / 1e6))
for set.Next() {
s := set.At()
// TODO(fabxc): allow fast path for most recent sample either
// in the storage itself or caching layer in Prometheus.
it.Reset(s.Iterator())
it.Reset(s.SampleIterator())
var t int64
var v float64