Move index and chunk encoders to own packages

This commit is contained in:
Fabian Reinartz 2017-11-30 15:34:49 +01:00
parent fe67d2f16a
commit 67f0ca8f0e
23 changed files with 1109 additions and 625 deletions

139
block.go
View file

@ -23,10 +23,101 @@ import (
"github.com/oklog/ulid" "github.com/oklog/ulid"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
) )
// IndexWriter serializes the index for a block of series data.
// The methods must be called in the order they are specified in.
type IndexWriter interface {
// AddSymbols registers all string symbols that are encountered in series
// and other indices.
AddSymbols(sym map[string]struct{}) error
// AddSeries populates the index writer with a series and its offsets
// of chunks that the index can reference.
// Implementations may require series to be insert in increasing order by
// their labels.
// The reference numbers are used to resolve entries in postings lists that
// are added later.
AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) error
// WriteLabelIndex serializes an index from label names to values.
// The passed in values chained tuples of strings of the length of names.
WriteLabelIndex(names []string, values []string) error
// WritePostings writes a postings list for a single label pair.
// The Postings here contain refs to the series that were added.
WritePostings(name, value string, it index.Postings) error
// Close writes any finalization and closes the resources associated with
// the underlying writer.
Close() error
}
// IndexReader provides reading access of serialized index data.
type IndexReader interface {
// Symbols returns a set of string symbols that may occur in series' labels
// and indices.
Symbols() (map[string]struct{}, error)
// LabelValues returns the possible label values
LabelValues(names ...string) (index.StringTuples, error)
// Postings returns the postings list iterator for the label pair.
// The Postings here contain the offsets to the series inside the index.
// Found IDs are not strictly required to point to a valid Series, e.g. during
// background garbage collections.
Postings(name, value string) (index.Postings, error)
// SortedPostings returns a postings list that is reordered to be sorted
// by the label set of the underlying series.
SortedPostings(index.Postings) index.Postings
// Series populates the given labels and chunk metas for the series identified
// by the reference.
// Returns ErrNotFound if the ref does not resolve to a known series.
Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error
// LabelIndices returns the label pairs for which indices exist.
LabelIndices() ([][]string, error)
// Close released the underlying resources of the reader.
Close() error
}
// StringTuples provides access to a sorted list of string tuples.
type StringTuples interface {
// Total number of tuples in the list.
Len() int
// At returns the tuple at position i.
At(i int) ([]string, error)
}
// ChunkWriter serializes a time block of chunked series data.
type ChunkWriter interface {
// WriteChunks writes several chunks. The Chunk field of the ChunkMetas
// must be populated.
// After returning successfully, the Ref fields in the ChunkMetas
// are set and can be used to retrieve the chunks from the written data.
WriteChunks(chunks ...chunks.Meta) error
// Close writes any required finalization and closes the resources
// associated with the underlying writer.
Close() error
}
// ChunkReader provides reading access of serialized time series data.
type ChunkReader interface {
// Chunk returns the series data chunk with the given reference.
Chunk(ref uint64) (chunkenc.Chunk, error)
// Close releases all underlying resources of the reader.
Close() error
}
// BlockReader provides reading access to a data block. // BlockReader provides reading access to a data block.
type BlockReader interface { type BlockReader interface {
// Index returns an IndexReader over the block's data. // Index returns an IndexReader over the block's data.
@ -91,8 +182,12 @@ type blockMeta struct {
*BlockMeta *BlockMeta
} }
const indexFilename = "index"
const metaFilename = "meta.json" const metaFilename = "meta.json"
func chunkDir(dir string) string { return filepath.Join(dir, "chunks") }
func walDir(dir string) string { return filepath.Join(dir, "wal") }
func readMetaFile(dir string) (*BlockMeta, error) { func readMetaFile(dir string) (*BlockMeta, error) {
b, err := ioutil.ReadFile(filepath.Join(dir, metaFilename)) b, err := ioutil.ReadFile(filepath.Join(dir, metaFilename))
if err != nil { if err != nil {
@ -150,17 +245,17 @@ type Block struct {
// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
// to instantiate chunk structs. // to instantiate chunk structs.
func OpenBlock(dir string, pool chunks.Pool) (*Block, error) { func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) {
meta, err := readMetaFile(dir) meta, err := readMetaFile(dir)
if err != nil { if err != nil {
return nil, err return nil, err
} }
cr, err := NewDirChunkReader(chunkDir(dir), pool) cr, err := chunks.NewDirReader(chunkDir(dir), pool)
if err != nil { if err != nil {
return nil, err return nil, err
} }
ir, err := NewFileIndexReader(filepath.Join(dir, "index")) ir, err := index.NewFileReader(filepath.Join(dir, "index"))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -300,7 +395,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error {
stones := memTombstones{} stones := memTombstones{}
var lset labels.Labels var lset labels.Labels
var chks []ChunkMeta var chks []chunks.Meta
Outer: Outer:
for p.Next() { for p.Next() {
@ -405,9 +500,6 @@ func (pb *Block) Snapshot(dir string) error {
return nil return nil
} }
func chunkDir(dir string) string { return filepath.Join(dir, "chunks") }
func walDir(dir string) string { return filepath.Join(dir, "wal") }
func clampInterval(a, b, mint, maxt int64) (int64, int64) { func clampInterval(a, b, mint, maxt int64) (int64, int64) {
if a < mint { if a < mint {
a = mint a = mint
@ -417,36 +509,3 @@ func clampInterval(a, b, mint, maxt int64) (int64, int64) {
} }
return a, b return a, b
} }
type mmapFile struct {
f *os.File
b []byte
}
func openMmapFile(path string) (*mmapFile, error) {
f, err := os.Open(path)
if err != nil {
return nil, errors.Wrap(err, "try lock file")
}
info, err := f.Stat()
if err != nil {
return nil, errors.Wrap(err, "stat")
}
b, err := mmap(f, int(info.Size()))
if err != nil {
return nil, errors.Wrap(err, "mmap")
}
return &mmapFile{f: f, b: b}, nil
}
func (f *mmapFile) Close() error {
err0 := munmap(f.b)
err1 := f.f.Close()
if err0 != nil {
return err0
}
return err1
}

View file

@ -16,8 +16,10 @@ package tsdb
import ( import (
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath"
"testing" "testing"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/testutil" "github.com/prometheus/tsdb/testutil"
) )
@ -42,9 +44,9 @@ func createEmptyBlock(t *testing.T, dir string) *Block {
testutil.Ok(t, writeMetaFile(dir, &BlockMeta{})) testutil.Ok(t, writeMetaFile(dir, &BlockMeta{}))
ir, err := newIndexWriter(dir) ir, err := index.NewWriter(filepath.Join(dir, indexFilename))
testutil.Ok(t, err) Ok(t, err)
testutil.Ok(t, ir.Close()) Ok(t, ir.Close())
testutil.Ok(t, os.MkdirAll(chunkDir(dir), 0777)) testutil.Ok(t, os.MkdirAll(chunkDir(dir), 0777))

View file

@ -39,7 +39,7 @@
// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package chunks package chunkenc
import "io" import "io"

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package chunks package chunkenc
import ( import (
"fmt" "fmt"

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package chunks package chunkenc
import ( import (
"fmt" "fmt"

View file

@ -41,7 +41,7 @@
// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package chunks package chunkenc
import ( import (
"encoding/binary" "encoding/binary"

View file

@ -11,18 +11,22 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package tsdb package chunks
import ( import (
"bufio" "bufio"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"hash" "hash"
"hash/crc32"
"io" "io"
"io/ioutil"
"os" "os"
"path/filepath"
"strconv"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/fileutil"
) )
@ -31,19 +35,19 @@ const (
MagicChunks = 0x85BD40DD MagicChunks = 0x85BD40DD
) )
// ChunkMeta holds information about a chunk of data. // Meta holds information about a chunk of data.
type ChunkMeta struct { type Meta struct {
// Ref and Chunk hold either a reference that can be used to retrieve // Ref and Chunk hold either a reference that can be used to retrieve
// chunk data or the data itself. // chunk data or the data itself.
// Generally, only one of them is set. // Generally, only one of them is set.
Ref uint64 Ref uint64
Chunk chunks.Chunk Chunk chunkenc.Chunk
MinTime, MaxTime int64 // time range the data covers MinTime, MaxTime int64 // time range the data covers
} }
// writeHash writes the chunk encoding and raw data into the provided hash. // writeHash writes the chunk encoding and raw data into the provided hash.
func (cm *ChunkMeta) writeHash(h hash.Hash) error { func (cm *Meta) writeHash(h hash.Hash) error {
if _, err := h.Write([]byte{byte(cm.Chunk.Encoding())}); err != nil { if _, err := h.Write([]byte{byte(cm.Chunk.Encoding())}); err != nil {
return err return err
} }
@ -53,62 +57,30 @@ func (cm *ChunkMeta) writeHash(h hash.Hash) error {
return nil return nil
} }
// deletedIterator wraps an Iterator and makes sure any deleted metrics are not var (
// returned. errInvalidSize = fmt.Errorf("invalid size")
type deletedIterator struct { errInvalidFlag = fmt.Errorf("invalid flag")
it chunks.Iterator errInvalidChecksum = fmt.Errorf("invalid checksum")
)
intervals Intervals // The table gets initialized with sync.Once but may still cause a race
// with any other use of the crc32 package anywhere. Thus we initialize it
// before.
var castagnoliTable *crc32.Table
func init() {
castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
} }
func (it *deletedIterator) At() (int64, float64) { // newCRC32 initializes a CRC32 hash with a preconfigured polynomial, so the
return it.it.At() // polynomial may be easily changed in one location at a later time, if necessary.
func newCRC32() hash.Hash32 {
return crc32.New(castagnoliTable)
} }
func (it *deletedIterator) Next() bool { // Writer implements the ChunkWriter interface for the standard
Outer:
for it.it.Next() {
ts, _ := it.it.At()
for _, tr := range it.intervals {
if tr.inBounds(ts) {
continue Outer
}
if ts > tr.Maxt {
it.intervals = it.intervals[1:]
continue
}
return true
}
return true
}
return false
}
func (it *deletedIterator) Err() error {
return it.it.Err()
}
// ChunkWriter serializes a time block of chunked series data.
type ChunkWriter interface {
// WriteChunks writes several chunks. The Chunk field of the ChunkMetas
// must be populated.
// After returning successfully, the Ref fields in the ChunkMetas
// are set and can be used to retrieve the chunks from the written data.
WriteChunks(chunks ...ChunkMeta) error
// Close writes any required finalization and closes the resources
// associated with the underlying writer.
Close() error
}
// chunkWriter implements the ChunkWriter interface for the standard
// serialization format. // serialization format.
type chunkWriter struct { type Writer struct {
dirFile *os.File dirFile *os.File
files []*os.File files []*os.File
wbuf *bufio.Writer wbuf *bufio.Writer
@ -124,7 +96,8 @@ const (
chunksFormatV1 = 1 chunksFormatV1 = 1
) )
func newChunkWriter(dir string) (*chunkWriter, error) { // NewWriter returns a new writer against the given directory.
func NewWriter(dir string) (*Writer, error) {
if err := os.MkdirAll(dir, 0777); err != nil { if err := os.MkdirAll(dir, 0777); err != nil {
return nil, err return nil, err
} }
@ -132,7 +105,7 @@ func newChunkWriter(dir string) (*chunkWriter, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
cw := &chunkWriter{ cw := &Writer{
dirFile: dirFile, dirFile: dirFile,
n: 0, n: 0,
crc32: newCRC32(), crc32: newCRC32(),
@ -141,7 +114,7 @@ func newChunkWriter(dir string) (*chunkWriter, error) {
return cw, nil return cw, nil
} }
func (w *chunkWriter) tail() *os.File { func (w *Writer) tail() *os.File {
if len(w.files) == 0 { if len(w.files) == 0 {
return nil return nil
} }
@ -150,7 +123,7 @@ func (w *chunkWriter) tail() *os.File {
// finalizeTail writes all pending data to the current tail file, // finalizeTail writes all pending data to the current tail file,
// truncates its size, and closes it. // truncates its size, and closes it.
func (w *chunkWriter) finalizeTail() error { func (w *Writer) finalizeTail() error {
tf := w.tail() tf := w.tail()
if tf == nil { if tf == nil {
return nil return nil
@ -174,7 +147,7 @@ func (w *chunkWriter) finalizeTail() error {
return tf.Close() return tf.Close()
} }
func (w *chunkWriter) cut() error { func (w *Writer) cut() error {
// Sync current tail to disk and close. // Sync current tail to disk and close.
if err := w.finalizeTail(); err != nil { if err := w.finalizeTail(); err != nil {
return err return err
@ -216,13 +189,13 @@ func (w *chunkWriter) cut() error {
return nil return nil
} }
func (w *chunkWriter) write(b []byte) error { func (w *Writer) write(b []byte) error {
n, err := w.wbuf.Write(b) n, err := w.wbuf.Write(b)
w.n += int64(n) w.n += int64(n)
return err return err
} }
func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error { func (w *Writer) WriteChunks(chks ...Meta) error {
// Calculate maximum space we need and cut a new segment in case // Calculate maximum space we need and cut a new segment in case
// we don't fit into the current one. // we don't fit into the current one.
maxLen := int64(binary.MaxVarintLen32) // The number of chunks. maxLen := int64(binary.MaxVarintLen32) // The number of chunks.
@ -272,11 +245,11 @@ func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error {
return nil return nil
} }
func (w *chunkWriter) seq() int { func (w *Writer) seq() int {
return len(w.files) - 1 return len(w.files) - 1
} }
func (w *chunkWriter) Close() error { func (w *Writer) Close() error {
if err := w.finalizeTail(); err != nil { if err := w.finalizeTail(); err != nil {
return err return err
} }
@ -285,29 +258,40 @@ func (w *chunkWriter) Close() error {
return w.dirFile.Close() return w.dirFile.Close()
} }
// ChunkReader provides reading access of serialized time series data. // ByteSlice abstracts a byte slice.
type ChunkReader interface { type ByteSlice interface {
// Chunk returns the series data chunk with the given reference. Len() int
Chunk(ref uint64) (chunks.Chunk, error) Range(start, end int) []byte
// Close releases all underlying resources of the reader.
Close() error
} }
// chunkReader implements a SeriesReader for a serialized byte stream type realByteSlice []byte
func (b realByteSlice) Len() int {
return len(b)
}
func (b realByteSlice) Range(start, end int) []byte {
return b[start:end]
}
func (b realByteSlice) Sub(start, end int) ByteSlice {
return b[start:end]
}
// Reader implements a SeriesReader for a serialized byte stream
// of series data. // of series data.
type chunkReader struct { type Reader struct {
// The underlying bytes holding the encoded series data. // The underlying bytes holding the encoded series data.
bs []ByteSlice bs []ByteSlice
// Closers for resources behind the byte slices. // Closers for resources behind the byte slices.
cs []io.Closer cs []io.Closer
pool chunks.Pool pool chunkenc.Pool
} }
func newChunkReader(bs []ByteSlice, cs []io.Closer, pool chunks.Pool) (*chunkReader, error) { func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) {
cr := chunkReader{pool: pool, bs: bs, cs: cs} cr := Reader{pool: pool, bs: bs, cs: cs}
for i, b := range cr.bs { for i, b := range cr.bs {
if b.Len() < 4 { if b.Len() < 4 {
@ -321,44 +305,44 @@ func newChunkReader(bs []ByteSlice, cs []io.Closer, pool chunks.Pool) (*chunkRea
return &cr, nil return &cr, nil
} }
// NewChunkReader returns a new chunk reader against the given byte slices. // NewReader returns a new chunk reader against the given byte slices.
func NewChunkReader(bs []ByteSlice, pool chunks.Pool) (ChunkReader, error) { func NewReader(bs []ByteSlice, pool chunkenc.Pool) (*Reader, error) {
if pool == nil { if pool == nil {
pool = chunks.NewPool() pool = chunkenc.NewPool()
} }
return newChunkReader(bs, nil, pool) return newReader(bs, nil, pool)
} }
// NewDirChunkReader returns a new ChunkReader against sequentially numbered files in the // NewDirReader returns a new Reader against sequentially numbered files in the
// given directory. // given directory.
func NewDirChunkReader(dir string, pool chunks.Pool) (ChunkReader, error) { func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) {
files, err := sequenceFiles(dir) files, err := sequenceFiles(dir)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if pool == nil { if pool == nil {
pool = chunks.NewPool() pool = chunkenc.NewPool()
} }
var bs []ByteSlice var bs []ByteSlice
var cs []io.Closer var cs []io.Closer
for _, fn := range files { for _, fn := range files {
f, err := openMmapFile(fn) f, err := fileutil.OpenMmapFile(fn)
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "mmap files") return nil, errors.Wrapf(err, "mmap files")
} }
cs = append(cs, f) cs = append(cs, f)
bs = append(bs, realByteSlice(f.b)) bs = append(bs, realByteSlice(f.Bytes()))
} }
return newChunkReader(bs, cs, pool) return newReader(bs, cs, pool)
} }
func (s *chunkReader) Close() error { func (s *Reader) Close() error {
return closeAll(s.cs...) return closeAll(s.cs...)
} }
func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) { func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
var ( var (
seq = int(ref >> 32) seq = int(ref >> 32)
off = int((ref << 32) >> 32) off = int((ref << 32) >> 32)
@ -381,5 +365,47 @@ func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
} }
r = b.Range(off+n, off+n+int(l)) r = b.Range(off+n, off+n+int(l))
return s.pool.Get(chunks.Encoding(r[0]), r[1:1+l]) return s.pool.Get(chunkenc.Encoding(r[0]), r[1:1+l])
}
func nextSequenceFile(dir string) (string, int, error) {
names, err := fileutil.ReadDir(dir)
if err != nil {
return "", 0, err
}
i := uint64(0)
for _, n := range names {
j, err := strconv.ParseUint(n, 10, 64)
if err != nil {
continue
}
i = j
}
return filepath.Join(dir, fmt.Sprintf("%0.6d", i+1)), int(i + 1), nil
}
func sequenceFiles(dir string) ([]string, error) {
files, err := ioutil.ReadDir(dir)
if err != nil {
return nil, err
}
var res []string
for _, fi := range files {
if _, err := strconv.ParseUint(fi.Name(), 10, 64); err != nil {
continue
}
res = append(res, filepath.Join(dir, fi.Name()))
}
return res, nil
}
func closeAll(cs ...io.Closer) (err error) {
for _, c := range cs {
if e := c.Close(); e != nil {
err = e
}
}
return err
} }

View file

@ -1,98 +0,0 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"math/rand"
"testing"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/testutil"
)
type mockChunkReader map[uint64]chunks.Chunk
func (cr mockChunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
chk, ok := cr[ref]
if ok {
return chk, nil
}
return nil, errors.New("Chunk with ref not found")
}
func (cr mockChunkReader) Close() error {
return nil
}
func TestDeletedIterator(t *testing.T) {
chk := chunks.NewXORChunk()
app, err := chk.Appender()
testutil.Ok(t, err)
// Insert random stuff from (0, 1000).
act := make([]sample, 1000)
for i := 0; i < 1000; i++ {
act[i].t = int64(i)
act[i].v = rand.Float64()
app.Append(act[i].t, act[i].v)
}
cases := []struct {
r Intervals
}{
{r: Intervals{{1, 20}}},
{r: Intervals{{1, 10}, {12, 20}, {21, 23}, {25, 30}}},
{r: Intervals{{1, 10}, {12, 20}, {20, 30}}},
{r: Intervals{{1, 10}, {12, 23}, {25, 30}}},
{r: Intervals{{1, 23}, {12, 20}, {25, 30}}},
{r: Intervals{{1, 23}, {12, 20}, {25, 3000}}},
{r: Intervals{{0, 2000}}},
{r: Intervals{{500, 2000}}},
{r: Intervals{{0, 200}}},
{r: Intervals{{1000, 20000}}},
}
for _, c := range cases {
i := int64(-1)
it := &deletedIterator{it: chk.Iterator(), intervals: c.r[:]}
ranges := c.r[:]
for it.Next() {
i++
for _, tr := range ranges {
if tr.inBounds(i) {
i = tr.Maxt + 1
ranges = ranges[1:]
}
}
testutil.Assert(t, i < 1000 == true, "")
ts, v := it.At()
testutil.Equals(t, act[i].t, ts)
testutil.Equals(t, act[i].v, v)
}
// There has been an extra call to Next().
i++
for _, tr := range ranges {
if tr.inBounds(i) {
i = tr.Maxt + 1
ranges = ranges[1:]
}
}
testutil.Assert(t, i < 1000 == false, "")
testutil.Ok(t, it.Err())
}
}

View file

@ -26,8 +26,10 @@ import (
"github.com/oklog/ulid" "github.com/oklog/ulid"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
) )
@ -65,7 +67,7 @@ type LeveledCompactor struct {
metrics *compactorMetrics metrics *compactorMetrics
logger log.Logger logger log.Logger
ranges []int64 ranges []int64
chunkPool chunks.Pool chunkPool chunkenc.Pool
} }
type compactorMetrics struct { type compactorMetrics struct {
@ -123,12 +125,12 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
} }
// NewLeveledCompactor returns a LeveledCompactor. // NewLeveledCompactor returns a LeveledCompactor.
func NewLeveledCompactor(r prometheus.Registerer, l log.Logger, ranges []int64, pool chunks.Pool) (*LeveledCompactor, error) { func NewLeveledCompactor(r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool) (*LeveledCompactor, error) {
if len(ranges) == 0 { if len(ranges) == 0 {
return nil, errors.Errorf("at least one range must be provided") return nil, errors.Errorf("at least one range must be provided")
} }
if pool == nil { if pool == nil {
pool = chunks.NewPool() pool = chunkenc.NewPool()
} }
return &LeveledCompactor{ return &LeveledCompactor{
ranges: ranges, ranges: ranges,
@ -370,7 +372,7 @@ type instrumentedChunkWriter struct {
trange prometheus.Histogram trange prometheus.Histogram
} }
func (w *instrumentedChunkWriter) WriteChunks(chunks ...ChunkMeta) error { func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error {
for _, c := range chunks { for _, c := range chunks {
w.size.Observe(float64(len(c.Chunk.Bytes()))) w.size.Observe(float64(len(c.Chunk.Bytes())))
w.samples.Observe(float64(c.Chunk.NumSamples())) w.samples.Observe(float64(c.Chunk.NumSamples()))
@ -411,7 +413,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
// data of all blocks. // data of all blocks.
var chunkw ChunkWriter var chunkw ChunkWriter
chunkw, err = newChunkWriter(chunkDir(tmp)) chunkw, err = chunks.NewWriter(chunkDir(tmp))
if err != nil { if err != nil {
return errors.Wrap(err, "open chunk writer") return errors.Wrap(err, "open chunk writer")
} }
@ -425,7 +427,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
} }
} }
indexw, err := newIndexWriter(tmp) indexw, err := index.NewWriter(filepath.Join(tmp, indexFilename))
if err != nil { if err != nil {
return errors.Wrap(err, "open index writer") return errors.Wrap(err, "open index writer")
} }
@ -514,7 +516,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
allSymbols[s] = struct{}{} allSymbols[s] = struct{}{}
} }
all, err := indexr.Postings(allPostingsKey.Name, allPostingsKey.Value) all, err := indexr.Postings("", "")
if err != nil { if err != nil {
return err return err
} }
@ -534,7 +536,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
// We fully rebuild the postings list index from merged series. // We fully rebuild the postings list index from merged series.
var ( var (
postings = newMemPostings() postings = index.NewMemPostings()
values = map[string]stringset{} values = map[string]stringset{}
i = uint64(0) i = uint64(0)
) )
@ -558,7 +560,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
continue continue
} }
newChunk := chunks.NewXORChunk() newChunk := chunkenc.NewXORChunk()
app, err := newChunk.Appender() app, err := newChunk.Appender()
if err != nil { if err != nil {
return err return err
@ -599,7 +601,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
} }
valset.set(l.Value) valset.set(l.Value)
} }
postings.add(i, lset) postings.Add(i, lset)
i++ i++
} }
@ -619,8 +621,8 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
} }
} }
for _, l := range postings.sortedKeys() { for _, l := range postings.SortedKeys() {
if err := indexw.WritePostings(l.Name, l.Value, postings.get(l.Name, l.Value)); err != nil { if err := indexw.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value)); err != nil {
return errors.Wrap(err, "write postings") return errors.Wrap(err, "write postings")
} }
} }
@ -628,18 +630,18 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
} }
type compactionSeriesSet struct { type compactionSeriesSet struct {
p Postings p index.Postings
index IndexReader index IndexReader
chunks ChunkReader chunks ChunkReader
tombstones TombstoneReader tombstones TombstoneReader
l labels.Labels l labels.Labels
c []ChunkMeta c []chunks.Meta
intervals Intervals intervals Intervals
err error err error
} }
func newCompactionSeriesSet(i IndexReader, c ChunkReader, t TombstoneReader, p Postings) *compactionSeriesSet { func newCompactionSeriesSet(i IndexReader, c ChunkReader, t TombstoneReader, p index.Postings) *compactionSeriesSet {
return &compactionSeriesSet{ return &compactionSeriesSet{
index: i, index: i,
chunks: c, chunks: c,
@ -667,7 +669,7 @@ func (c *compactionSeriesSet) Next() bool {
// Remove completely deleted chunks. // Remove completely deleted chunks.
if len(c.intervals) > 0 { if len(c.intervals) > 0 {
chks := make([]ChunkMeta, 0, len(c.c)) chks := make([]chunks.Meta, 0, len(c.c))
for _, chk := range c.c { for _, chk := range c.c {
if !(Interval{chk.MinTime, chk.MaxTime}.isSubrange(c.intervals)) { if !(Interval{chk.MinTime, chk.MaxTime}.isSubrange(c.intervals)) {
chks = append(chks, chk) chks = append(chks, chk)
@ -697,7 +699,7 @@ func (c *compactionSeriesSet) Err() error {
return c.p.Err() return c.p.Err()
} }
func (c *compactionSeriesSet) At() (labels.Labels, []ChunkMeta, Intervals) { func (c *compactionSeriesSet) At() (labels.Labels, []chunks.Meta, Intervals) {
return c.l, c.c, c.intervals return c.l, c.c, c.intervals
} }
@ -706,13 +708,13 @@ type compactionMerger struct {
aok, bok bool aok, bok bool
l labels.Labels l labels.Labels
c []ChunkMeta c []chunks.Meta
intervals Intervals intervals Intervals
} }
type compactionSeries struct { type compactionSeries struct {
labels labels.Labels labels labels.Labels
chunks []*ChunkMeta chunks []*chunks.Meta
} }
func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) { func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) {
@ -747,7 +749,7 @@ func (c *compactionMerger) Next() bool {
// While advancing child iterators the memory used for labels and chunks // While advancing child iterators the memory used for labels and chunks
// may be reused. When picking a series we have to store the result. // may be reused. When picking a series we have to store the result.
var lset labels.Labels var lset labels.Labels
var chks []ChunkMeta var chks []chunks.Meta
d := c.compare() d := c.compare()
// Both sets contain the current series. Chain them into a single one. // Both sets contain the current series. Chain them into a single one.
@ -788,7 +790,7 @@ func (c *compactionMerger) Err() error {
return c.b.Err() return c.b.Err()
} }
func (c *compactionMerger) At() (labels.Labels, []ChunkMeta, Intervals) { func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, Intervals) {
return c.l, c.c, c.intervals return c.l, c.c, c.intervals
} }

6
db.go
View file

@ -36,7 +36,7 @@ import (
"github.com/oklog/ulid" "github.com/oklog/ulid"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
) )
@ -99,7 +99,7 @@ type DB struct {
logger log.Logger logger log.Logger
metrics *dbMetrics metrics *dbMetrics
opts *Options opts *Options
chunkPool chunks.Pool chunkPool chunkenc.Pool
compactor Compactor compactor Compactor
// Mutex for that must be held when modifying the general block layout. // Mutex for that must be held when modifying the general block layout.
@ -185,7 +185,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
donec: make(chan struct{}), donec: make(chan struct{}),
stopc: make(chan struct{}), stopc: make(chan struct{}),
compactionsEnabled: true, compactionsEnabled: true,
chunkPool: chunks.NewPool(), chunkPool: chunkenc.NewPool(),
} }
db.metrics = newDBMetrics(db, r) db.metrics = newDBMetrics(db, r)

View file

@ -5,8 +5,12 @@ import (
"hash" "hash"
"hash/crc32" "hash/crc32"
"unsafe" "unsafe"
"github.com/pkg/errors"
) )
var errInvalidSize = errors.New("invalid size")
// enbuf is a helper type to populate a byte slice with various types. // enbuf is a helper type to populate a byte slice with various types.
type encbuf struct { type encbuf struct {
b []byte b []byte

48
fileutil/mmap.go Normal file
View file

@ -0,0 +1,48 @@
package fileutil
import (
"os"
"github.com/pkg/errors"
)
type MmapFile struct {
f *os.File
b []byte
}
func OpenMmapFile(path string) (*MmapFile, error) {
f, err := os.Open(path)
if err != nil {
return nil, errors.Wrap(err, "try lock file")
}
info, err := f.Stat()
if err != nil {
return nil, errors.Wrap(err, "stat")
}
b, err := mmap(f, int(info.Size()))
if err != nil {
return nil, errors.Wrap(err, "mmap")
}
return &MmapFile{f: f, b: b}, nil
}
func (f *MmapFile) Close() error {
err0 := munmap(f.b)
err1 := f.f.Close()
if err0 != nil {
return err0
}
return err1
}
func (f *MmapFile) File() *os.File {
return f.f
}
func (f *MmapFile) Bytes() []byte {
return f.b
}

View file

@ -13,7 +13,7 @@
// +build !windows,!plan9 // +build !windows,!plan9
package tsdb package fileutil
import ( import (
"os" "os"

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package tsdb package fileutil
import ( import (
"os" "os"

132
head.go
View file

@ -17,6 +17,7 @@ import (
"math" "math"
"runtime" "runtime"
"sort" "sort"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -25,7 +26,9 @@ import (
"github.com/go-kit/kit/log/level" "github.com/go-kit/kit/log/level"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
) )
@ -64,7 +67,7 @@ type Head struct {
symbols map[string]struct{} symbols map[string]struct{}
values map[string]stringset // label names to possible values values map[string]stringset // label names to possible values
postings *memPostings // postings lists for terms postings *index.MemPostings // postings lists for terms
tombstones memTombstones tombstones memTombstones
} }
@ -185,7 +188,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
series: newStripeSeries(), series: newStripeSeries(),
values: map[string]stringset{}, values: map[string]stringset{},
symbols: map[string]struct{}{}, symbols: map[string]struct{}{},
postings: newUnorderedMemPostings(), postings: index.NewUnorderedMemPostings(),
tombstones: memTombstones{}, tombstones: memTombstones{},
} }
h.metrics = newHeadMetrics(h, r) h.metrics = newHeadMetrics(h, r)
@ -226,7 +229,7 @@ func (h *Head) processWALSamples(
// ReadWAL initializes the head by consuming the write ahead log. // ReadWAL initializes the head by consuming the write ahead log.
func (h *Head) ReadWAL() error { func (h *Head) ReadWAL() error {
defer h.postings.ensureOrder() defer h.postings.EnsureOrder()
r := h.wal.Reader() r := h.wal.Reader()
mint := h.MinTime() mint := h.MinTime()
@ -616,64 +619,14 @@ func (h *Head) gc() {
h.metrics.chunksRemoved.Add(float64(chunksRemoved)) h.metrics.chunksRemoved.Add(float64(chunksRemoved))
h.metrics.chunks.Sub(float64(chunksRemoved)) h.metrics.chunks.Sub(float64(chunksRemoved))
// Remove deleted series IDs from the postings lists. First do a collection // Remove deleted series IDs from the postings lists.
// run where we rebuild all postings that have something to delete h.postings.Delete(deleted)
h.postings.mtx.RLock()
type replEntry struct {
idx int
l []uint64
}
collected := map[labels.Label]replEntry{}
for t, p := range h.postings.m {
repl := replEntry{idx: len(p)}
for i, id := range p {
if _, ok := deleted[id]; ok {
// First ID that got deleted, initialize replacement with
// all remaining IDs so far.
if repl.l == nil {
repl.l = make([]uint64, 0, len(p))
repl.l = append(repl.l, p[:i]...)
}
continue
}
// Only add to the replacement once we know we have to do it.
if repl.l != nil {
repl.l = append(repl.l, id)
}
}
if repl.l != nil {
collected[t] = repl
}
}
h.postings.mtx.RUnlock()
// Replace all postings that have changed. Append all IDs that may have
// been added while we switched locks.
h.postings.mtx.Lock()
for t, repl := range collected {
l := append(repl.l, h.postings.m[t][repl.idx:]...)
if len(l) > 0 {
h.postings.m[t] = l
} else {
delete(h.postings.m, t)
}
}
h.postings.mtx.Unlock()
// Rebuild symbols and label value indices from what is left in the postings terms. // Rebuild symbols and label value indices from what is left in the postings terms.
h.postings.mtx.RLock()
symbols := make(map[string]struct{}) symbols := make(map[string]struct{})
values := make(map[string]stringset, len(h.values)) values := make(map[string]stringset, len(h.values))
for t := range h.postings.m { h.postings.Iter(func(t labels.Label, _ index.Postings) error {
symbols[t.Name] = struct{}{} symbols[t.Name] = struct{}{}
symbols[t.Value] = struct{}{} symbols[t.Value] = struct{}{}
@ -683,9 +636,8 @@ func (h *Head) gc() {
values[t.Name] = ss values[t.Name] = ss
} }
ss.set(t.Value) ss.set(t.Value)
} return nil
})
h.postings.mtx.RUnlock()
h.symMtx.Lock() h.symMtx.Lock()
@ -765,7 +717,7 @@ func unpackChunkID(id uint64) (seriesID, chunkID uint64) {
} }
// Chunk returns the chunk for the reference number. // Chunk returns the chunk for the reference number.
func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) { func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) {
sid, cid := unpackChunkID(ref) sid, cid := unpackChunkID(ref)
s := h.head.series.getByID(sid) s := h.head.series.getByID(sid)
@ -798,12 +750,12 @@ func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
} }
type safeChunk struct { type safeChunk struct {
chunks.Chunk chunkenc.Chunk
s *memSeries s *memSeries
cid int cid int
} }
func (c *safeChunk) Iterator() chunks.Iterator { func (c *safeChunk) Iterator() chunkenc.Iterator {
c.s.Lock() c.s.Lock()
it := c.s.iterator(c.cid) it := c.s.iterator(c.cid)
c.s.Unlock() c.s.Unlock()
@ -836,7 +788,7 @@ func (h *headIndexReader) Symbols() (map[string]struct{}, error) {
} }
// LabelValues returns the possible label values // LabelValues returns the possible label values
func (h *headIndexReader) LabelValues(names ...string) (StringTuples, error) { func (h *headIndexReader) LabelValues(names ...string) (index.StringTuples, error) {
if len(names) != 1 { if len(names) != 1 {
return nil, errInvalidSize return nil, errInvalidSize
} }
@ -850,22 +802,22 @@ func (h *headIndexReader) LabelValues(names ...string) (StringTuples, error) {
} }
sort.Strings(sl) sort.Strings(sl)
return &stringTuples{l: len(names), s: sl}, nil return index.NewStringTuples(sl, len(names))
} }
// Postings returns the postings list iterator for the label pair. // Postings returns the postings list iterator for the label pair.
func (h *headIndexReader) Postings(name, value string) (Postings, error) { func (h *headIndexReader) Postings(name, value string) (index.Postings, error) {
return h.head.postings.get(name, value), nil return h.head.postings.Get(name, value), nil
} }
func (h *headIndexReader) SortedPostings(p Postings) Postings { func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
ep := make([]uint64, 0, 128) ep := make([]uint64, 0, 128)
for p.Next() { for p.Next() {
ep = append(ep, p.At()) ep = append(ep, p.At())
} }
if err := p.Err(); err != nil { if err := p.Err(); err != nil {
return errPostings{err: errors.Wrap(err, "expand postings")} return index.ErrPostings(errors.Wrap(err, "expand postings"))
} }
sort.Slice(ep, func(i, j int) bool { sort.Slice(ep, func(i, j int) bool {
@ -878,11 +830,11 @@ func (h *headIndexReader) SortedPostings(p Postings) Postings {
} }
return labels.Compare(a.lset, b.lset) < 0 return labels.Compare(a.lset, b.lset) < 0
}) })
return newListPostings(ep) return index.NewListPostings(ep)
} }
// Series returns the series for the given reference. // Series returns the series for the given reference.
func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error { func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks.Meta) error {
s := h.head.series.getByID(ref) s := h.head.series.getByID(ref)
if s == nil { if s == nil {
@ -901,7 +853,7 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkM
if !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) { if !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) {
continue continue
} }
*chks = append(*chks, ChunkMeta{ *chks = append(*chks, chunks.Meta{
MinTime: c.minTime, MinTime: c.minTime,
MaxTime: c.maxTime, MaxTime: c.maxTime,
Ref: packChunkID(s.ref, uint64(s.chunkID(i))), Ref: packChunkID(s.ref, uint64(s.chunkID(i))),
@ -949,7 +901,7 @@ func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSerie
h.metrics.series.Inc() h.metrics.series.Inc()
h.metrics.seriesCreated.Inc() h.metrics.seriesCreated.Inc()
h.postings.add(id, lset) h.postings.Add(id, lset)
h.symMtx.Lock() h.symMtx.Lock()
defer h.symMtx.Unlock() defer h.symMtx.Unlock()
@ -1154,7 +1106,7 @@ type memSeries struct {
lastValue float64 lastValue float64
sampleBuf [4]sample sampleBuf [4]sample
app chunks.Appender // Current appender for the chunk. app chunkenc.Appender // Current appender for the chunk.
} }
func (s *memSeries) minTime() int64 { func (s *memSeries) minTime() int64 {
@ -1167,7 +1119,7 @@ func (s *memSeries) maxTime() int64 {
func (s *memSeries) cut(mint int64) *memChunk { func (s *memSeries) cut(mint int64) *memChunk {
c := &memChunk{ c := &memChunk{
chunk: chunks.NewXORChunk(), chunk: chunkenc.NewXORChunk(),
minTime: mint, minTime: mint,
maxTime: math.MinInt64, maxTime: math.MinInt64,
} }
@ -1295,13 +1247,13 @@ func computeChunkEndTime(start, cur, max int64) int64 {
return start + (max-start)/a return start + (max-start)/a
} }
func (s *memSeries) iterator(id int) chunks.Iterator { func (s *memSeries) iterator(id int) chunkenc.Iterator {
c := s.chunk(id) c := s.chunk(id)
// TODO(fabxc): Work around! A querier may have retrieved a pointer to a series' chunk, // TODO(fabxc): Work around! A querier may have retrieved a pointer to a series' chunk,
// which got then garbage collected before it got accessed. // which got then garbage collected before it got accessed.
// We must ensure to not garbage collect as long as any readers still hold a reference. // We must ensure to not garbage collect as long as any readers still hold a reference.
if c == nil { if c == nil {
return chunks.NewNopIterator() return chunkenc.NewNopIterator()
} }
if id-s.firstChunkID < len(s.chunks)-1 { if id-s.firstChunkID < len(s.chunks)-1 {
@ -1326,12 +1278,12 @@ func (s *memSeries) head() *memChunk {
} }
type memChunk struct { type memChunk struct {
chunk chunks.Chunk chunk chunkenc.Chunk
minTime, maxTime int64 minTime, maxTime int64
} }
type memSafeIterator struct { type memSafeIterator struct {
chunks.Iterator chunkenc.Iterator
i int i int
total int total int
@ -1356,3 +1308,27 @@ func (it *memSafeIterator) At() (int64, float64) {
s := it.buf[4-(it.total-it.i)] s := it.buf[4-(it.total-it.i)]
return s.t, s.v return s.t, s.v
} }
type stringset map[string]struct{}
func (ss stringset) set(s string) {
ss[s] = struct{}{}
}
func (ss stringset) has(s string) bool {
_, ok := ss[s]
return ok
}
func (ss stringset) String() string {
return strings.Join(ss.slice(), ",")
}
func (ss stringset) slice() []string {
slice := make([]string, 0, len(ss))
for k := range ss {
slice = append(slice, k)
}
sort.Strings(slice)
return slice
}

View file

@ -22,7 +22,8 @@ import (
"testing" "testing"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil" "github.com/prometheus/tsdb/testutil"
) )
@ -150,7 +151,7 @@ func TestHead_ReadWAL(t *testing.T) {
testutil.Equals(t, labels.FromStrings("a", "4"), s50.lset) testutil.Equals(t, labels.FromStrings("a", "4"), s50.lset)
testutil.Equals(t, labels.FromStrings("a", "3"), s100.lset) testutil.Equals(t, labels.FromStrings("a", "3"), s100.lset)
expandChunk := func(c chunks.Iterator) (x []sample) { expandChunk := func(c chunkenc.Iterator) (x []sample) {
for c.Next() { for c.Next() {
t, v := c.At() t, v := c.At()
x = append(x, sample{t: t, v: v}) x = append(x, sample{t: t, v: v})
@ -210,12 +211,12 @@ func TestHead_Truncate(t *testing.T) {
testutil.Assert(t, h.series.getByID(s3.ref) == nil, "") testutil.Assert(t, h.series.getByID(s3.ref) == nil, "")
testutil.Assert(t, h.series.getByID(s4.ref) == nil, "") testutil.Assert(t, h.series.getByID(s4.ref) == nil, "")
postingsA1, _ := expandPostings(h.postings.get("a", "1")) postingsA1, _ := index.ExpandPostings(h.postings.Get("a", "1"))
postingsA2, _ := expandPostings(h.postings.get("a", "2")) postingsA2, _ := index.ExpandPostings(h.postings.Get("a", "2"))
postingsB1, _ := expandPostings(h.postings.get("b", "1")) postingsB1, _ := index.ExpandPostings(h.postings.Get("b", "1"))
postingsB2, _ := expandPostings(h.postings.get("b", "2")) postingsB2, _ := index.ExpandPostings(h.postings.Get("b", "2"))
postingsC1, _ := expandPostings(h.postings.get("c", "1")) postingsC1, _ := index.ExpandPostings(h.postings.Get("c", "1"))
postingsAll, _ := expandPostings(h.postings.get("", "")) postingsAll, _ := index.ExpandPostings(h.postings.Get("", ""))
testutil.Equals(t, []uint64{s1.ref}, postingsA1) testutil.Equals(t, []uint64{s1.ref}, postingsA1)
testutil.Equals(t, []uint64{s2.ref}, postingsA2) testutil.Equals(t, []uint64{s2.ref}, postingsA2)

179
index/encoding_helpers.go Normal file
View file

@ -0,0 +1,179 @@
package index
import (
"encoding/binary"
"hash"
"hash/crc32"
"unsafe"
)
// enbuf is a helper type to populate a byte slice with various types.
type encbuf struct {
b []byte
c [binary.MaxVarintLen64]byte
}
func (e *encbuf) reset() { e.b = e.b[:0] }
func (e *encbuf) get() []byte { return e.b }
func (e *encbuf) len() int { return len(e.b) }
func (e *encbuf) putString(s string) { e.b = append(e.b, s...) }
func (e *encbuf) putBytes(b []byte) { e.b = append(e.b, b...) }
func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) }
func (e *encbuf) putBE32int(x int) { e.putBE32(uint32(x)) }
func (e *encbuf) putBE64int(x int) { e.putBE64(uint64(x)) }
func (e *encbuf) putBE64int64(x int64) { e.putBE64(uint64(x)) }
func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) }
func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) }
func (e *encbuf) putBE32(x uint32) {
binary.BigEndian.PutUint32(e.c[:], x)
e.b = append(e.b, e.c[:4]...)
}
func (e *encbuf) putBE64(x uint64) {
binary.BigEndian.PutUint64(e.c[:], x)
e.b = append(e.b, e.c[:8]...)
}
func (e *encbuf) putUvarint64(x uint64) {
n := binary.PutUvarint(e.c[:], x)
e.b = append(e.b, e.c[:n]...)
}
func (e *encbuf) putVarint64(x int64) {
n := binary.PutVarint(e.c[:], x)
e.b = append(e.b, e.c[:n]...)
}
// putVarintStr writes a string to the buffer prefixed by its varint length (in bytes!).
func (e *encbuf) putUvarintStr(s string) {
b := *(*[]byte)(unsafe.Pointer(&s))
e.putUvarint(len(b))
e.putString(s)
}
// putHash appends a hash over the buffers current contents to the buffer.
func (e *encbuf) putHash(h hash.Hash) {
h.Reset()
_, err := h.Write(e.b)
if err != nil {
panic(err) // The CRC32 implementation does not error
}
e.b = h.Sum(e.b)
}
// decbuf provides safe methods to extract data from a byte slice. It does all
// necessary bounds checking and advancing of the byte slice.
// Several datums can be extracted without checking for errors. However, before using
// any datum, the err() method must be checked.
type decbuf struct {
b []byte
e error
}
func (d *decbuf) uvarint() int { return int(d.uvarint64()) }
func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) }
func (d *decbuf) be32int() int { return int(d.be32()) }
func (d *decbuf) be64int64() int64 { return int64(d.be64()) }
// crc32 returns a CRC32 checksum over the remaining bytes.
func (d *decbuf) crc32() uint32 {
return crc32.Checksum(d.b, castagnoliTable)
}
func (d *decbuf) uvarintStr() string {
l := d.uvarint64()
if d.e != nil {
return ""
}
if len(d.b) < int(l) {
d.e = errInvalidSize
return ""
}
s := string(d.b[:l])
d.b = d.b[l:]
return s
}
func (d *decbuf) varint64() int64 {
if d.e != nil {
return 0
}
x, n := binary.Varint(d.b)
if n < 1 {
d.e = errInvalidSize
return 0
}
d.b = d.b[n:]
return x
}
func (d *decbuf) uvarint64() uint64 {
if d.e != nil {
return 0
}
x, n := binary.Uvarint(d.b)
if n < 1 {
d.e = errInvalidSize
return 0
}
d.b = d.b[n:]
return x
}
func (d *decbuf) be64() uint64 {
if d.e != nil {
return 0
}
if len(d.b) < 4 {
d.e = errInvalidSize
return 0
}
x := binary.BigEndian.Uint64(d.b)
d.b = d.b[8:]
return x
}
func (d *decbuf) be32() uint32 {
if d.e != nil {
return 0
}
if len(d.b) < 4 {
d.e = errInvalidSize
return 0
}
x := binary.BigEndian.Uint32(d.b)
d.b = d.b[4:]
return x
}
func (d *decbuf) byte() byte {
if d.e != nil {
return 0
}
if len(d.b) < 1 {
d.e = errInvalidSize
return 0
}
x := d.b[0]
d.b = d.b[1:]
return x
}
func (d *decbuf) decbuf(l int) decbuf {
if d.e != nil {
return decbuf{e: d.e}
}
if l > len(d.b) {
return decbuf{e: errInvalidSize}
}
r := decbuf{b: d.b[:l]}
d.b = d.b[l:]
return r
}
func (d *decbuf) err() error { return d.e }
func (d *decbuf) len() int { return len(d.b) }
func (d *decbuf) get() []byte { return d.b }

View file

@ -11,13 +11,14 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package tsdb package index
import ( import (
"bufio" "bufio"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"hash" "hash"
"hash/crc32"
"io" "io"
"math" "math"
"os" "os"
@ -26,6 +27,7 @@ import (
"strings" "strings"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
) )
@ -35,17 +37,11 @@ const (
MagicIndex = 0xBAAAD700 MagicIndex = 0xBAAAD700
indexFormatV1 = 1 indexFormatV1 = 1
size_unit = 4
) )
const indexFilename = "index"
const compactionPageBytes = minSectorSize * 64
type indexWriterSeries struct { type indexWriterSeries struct {
labels labels.Labels labels labels.Labels
chunks []ChunkMeta // series file offset of chunks chunks []chunks.Meta // series file offset of chunks
offset uint32 // index file offset of series reference offset uint32 // index file offset of series reference
} }
@ -87,37 +83,24 @@ func (s indexWriterStage) String() string {
return "<unknown>" return "<unknown>"
} }
// IndexWriter serializes the index for a block of series data. // The table gets initialized with sync.Once but may still cause a race
// The methods must be called in the order they are specified in. // with any other use of the crc32 package anywhere. Thus we initialize it
type IndexWriter interface { // before.
// AddSymbols registers all string symbols that are encountered in series var castagnoliTable *crc32.Table
// and other indices.
AddSymbols(sym map[string]struct{}) error
// AddSeries populates the index writer with a series and its offsets func init() {
// of chunks that the index can reference. castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
// Implementations may require series to be insert in increasing order by }
// their labels.
// The reference numbers are used to resolve entries in postings lists that
// are added later.
AddSeries(ref uint64, l labels.Labels, chunks ...ChunkMeta) error
// WriteLabelIndex serializes an index from label names to values. // newCRC32 initializes a CRC32 hash with a preconfigured polynomial, so the
// The passed in values chained tuples of strings of the length of names. // polynomial may be easily changed in one location at a later time, if necessary.
WriteLabelIndex(names []string, values []string) error func newCRC32() hash.Hash32 {
return crc32.New(castagnoliTable)
// WritePostings writes a postings list for a single label pair.
// The Postings here contain refs to the series that were added.
WritePostings(name, value string, it Postings) error
// Close writes any finalization and closes the resources associated with
// the underlying writer.
Close() error
} }
// indexWriter implements the IndexWriter interface for the standard // indexWriter implements the IndexWriter interface for the standard
// serialization format. // serialization format.
type indexWriter struct { type Writer struct {
f *os.File f *os.File
fbuf *bufio.Writer fbuf *bufio.Writer
pos uint64 pos uint64
@ -150,14 +133,17 @@ type indexTOC struct {
postingsTable uint64 postingsTable uint64
} }
func newIndexWriter(dir string) (*indexWriter, error) { // NewWriter returns a new Writer to the given filename.
func NewWriter(fn string) (*Writer, error) {
dir := filepath.Dir(fn)
df, err := fileutil.OpenDir(dir) df, err := fileutil.OpenDir(dir)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer df.Close() // close for flatform windows defer df.Close() // close for flatform windows
f, err := os.OpenFile(filepath.Join(dir, indexFilename), os.O_CREATE|os.O_WRONLY, 0666) f, err := os.OpenFile(fn, os.O_CREATE|os.O_WRONLY, 0666)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -165,7 +151,7 @@ func newIndexWriter(dir string) (*indexWriter, error) {
return nil, errors.Wrap(err, "sync dir") return nil, errors.Wrap(err, "sync dir")
} }
iw := &indexWriter{ iw := &Writer{
f: f, f: f,
fbuf: bufio.NewWriterSize(f, 1<<22), fbuf: bufio.NewWriterSize(f, 1<<22),
pos: 0, pos: 0,
@ -187,7 +173,7 @@ func newIndexWriter(dir string) (*indexWriter, error) {
return iw, nil return iw, nil
} }
func (w *indexWriter) write(bufs ...[]byte) error { func (w *Writer) write(bufs ...[]byte) error {
for _, b := range bufs { for _, b := range bufs {
n, err := w.fbuf.Write(b) n, err := w.fbuf.Write(b)
w.pos += uint64(n) w.pos += uint64(n)
@ -206,18 +192,18 @@ func (w *indexWriter) write(bufs ...[]byte) error {
} }
// addPadding adds zero byte padding until the file size is a multiple size_unit. // addPadding adds zero byte padding until the file size is a multiple size_unit.
func (w *indexWriter) addPadding() error { func (w *Writer) addPadding(size int) error {
p := w.pos % size_unit p := w.pos % uint64(size)
if p == 0 { if p == 0 {
return nil return nil
} }
p = size_unit - p p = uint64(size) - p
return errors.Wrap(w.write(make([]byte, p)), "add padding") return errors.Wrap(w.write(make([]byte, p)), "add padding")
} }
// ensureStage handles transitions between write stages and ensures that IndexWriter // ensureStage handles transitions between write stages and ensures that IndexWriter
// methods are called in an order valid for the implementation. // methods are called in an order valid for the implementation.
func (w *indexWriter) ensureStage(s indexWriterStage) error { func (w *Writer) ensureStage(s indexWriterStage) error {
if w.stage == s { if w.stage == s {
return nil return nil
} }
@ -256,7 +242,7 @@ func (w *indexWriter) ensureStage(s indexWriterStage) error {
return nil return nil
} }
func (w *indexWriter) writeMeta() error { func (w *Writer) writeMeta() error {
w.buf1.reset() w.buf1.reset()
w.buf1.putBE32(MagicIndex) w.buf1.putBE32(MagicIndex)
w.buf1.putByte(indexFormatV1) w.buf1.putByte(indexFormatV1)
@ -264,7 +250,7 @@ func (w *indexWriter) writeMeta() error {
return w.write(w.buf1.get()) return w.write(w.buf1.get())
} }
func (w *indexWriter) AddSeries(ref uint64, lset labels.Labels, chunks ...ChunkMeta) error { func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta) error {
if err := w.ensureStage(idxStageSeries); err != nil { if err := w.ensureStage(idxStageSeries); err != nil {
return err return err
} }
@ -328,7 +314,7 @@ func (w *indexWriter) AddSeries(ref uint64, lset labels.Labels, chunks ...ChunkM
return nil return nil
} }
func (w *indexWriter) AddSymbols(sym map[string]struct{}) error { func (w *Writer) AddSymbols(sym map[string]struct{}) error {
if err := w.ensureStage(idxStageSymbols); err != nil { if err := w.ensureStage(idxStageSymbols); err != nil {
return err return err
} }
@ -361,7 +347,7 @@ func (w *indexWriter) AddSymbols(sym map[string]struct{}) error {
return errors.Wrap(err, "write symbols") return errors.Wrap(err, "write symbols")
} }
func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { func (w *Writer) WriteLabelIndex(names []string, values []string) error {
if len(values)%len(names) != 0 { if len(values)%len(names) != 0 {
return errors.Errorf("invalid value list length %d for %d names", len(values), len(names)) return errors.Errorf("invalid value list length %d for %d names", len(values), len(names))
} }
@ -369,14 +355,14 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
return errors.Wrap(err, "ensure stage") return errors.Wrap(err, "ensure stage")
} }
valt, err := newStringTuples(values, len(names)) valt, err := NewStringTuples(values, len(names))
if err != nil { if err != nil {
return err return err
} }
sort.Sort(valt) sort.Sort(valt)
// Align beginning to 4 bytes for more efficient index list scans. // Align beginning to 4 bytes for more efficient index list scans.
if err := w.addPadding(); err != nil { if err := w.addPadding(4); err != nil {
return err return err
} }
@ -407,7 +393,7 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
} }
// writeOffsetTable writes a sequence of readable hash entries. // writeOffsetTable writes a sequence of readable hash entries.
func (w *indexWriter) writeOffsetTable(entries []hashEntry) error { func (w *Writer) writeOffsetTable(entries []hashEntry) error {
w.buf2.reset() w.buf2.reset()
w.buf2.putBE32int(len(entries)) w.buf2.putBE32int(len(entries))
@ -428,7 +414,7 @@ func (w *indexWriter) writeOffsetTable(entries []hashEntry) error {
const indexTOCLen = 6*8 + 4 const indexTOCLen = 6*8 + 4
func (w *indexWriter) writeTOC() error { func (w *Writer) writeTOC() error {
w.buf1.reset() w.buf1.reset()
w.buf1.putBE64(w.toc.symbols) w.buf1.putBE64(w.toc.symbols)
@ -443,13 +429,13 @@ func (w *indexWriter) writeTOC() error {
return w.write(w.buf1.get()) return w.write(w.buf1.get())
} }
func (w *indexWriter) WritePostings(name, value string, it Postings) error { func (w *Writer) WritePostings(name, value string, it Postings) error {
if err := w.ensureStage(idxStagePostings); err != nil { if err := w.ensureStage(idxStagePostings); err != nil {
return errors.Wrap(err, "ensure stage") return errors.Wrap(err, "ensure stage")
} }
// Align beginning to 4 bytes for more efficient postings list scans. // Align beginning to 4 bytes for more efficient postings list scans.
if err := w.addPadding(); err != nil { if err := w.addPadding(4); err != nil {
return err return err
} }
@ -506,7 +492,7 @@ type hashEntry struct {
offset uint64 offset uint64
} }
func (w *indexWriter) Close() error { func (w *Writer) Close() error {
if err := w.ensureStage(idxStageDone); err != nil { if err := w.ensureStage(idxStageDone); err != nil {
return err return err
} }
@ -519,37 +505,6 @@ func (w *indexWriter) Close() error {
return w.f.Close() return w.f.Close()
} }
// IndexReader provides reading access of serialized index data.
type IndexReader interface {
// Symbols returns a set of string symbols that may occur in series' labels
// and indices.
Symbols() (map[string]struct{}, error)
// LabelValues returns the possible label values
LabelValues(names ...string) (StringTuples, error)
// Postings returns the postings list iterator for the label pair.
// The Postings here contain the offsets to the series inside the index.
// Found IDs are not strictly required to point to a valid Series, e.g. during
// background garbage collections.
Postings(name, value string) (Postings, error)
// SortedPostings returns a postings list that is reordered to be sorted
// by the label set of the underlying series.
SortedPostings(Postings) Postings
// Series populates the given labels and chunk metas for the series identified
// by the reference.
// Returns ErrNotFound if the ref does not resolve to a known series.
Series(ref uint64, lset *labels.Labels, chks *[]ChunkMeta) error
// LabelIndices returns the label pairs for which indices exist.
LabelIndices() ([][]string, error)
// Close released the underlying resources of the reader.
Close() error
}
// StringTuples provides access to a sorted list of string tuples. // StringTuples provides access to a sorted list of string tuples.
type StringTuples interface { type StringTuples interface {
// Total number of tuples in the list. // Total number of tuples in the list.
@ -558,7 +513,7 @@ type StringTuples interface {
At(i int) ([]string, error) At(i int) ([]string, error)
} }
type indexReader struct { type Reader struct {
// The underlying byte slice holding the encoded series data. // The underlying byte slice holding the encoded series data.
b ByteSlice b ByteSlice
toc indexTOC toc indexTOC
@ -605,22 +560,22 @@ func (b realByteSlice) Sub(start, end int) ByteSlice {
return b[start:end] return b[start:end]
} }
// NewIndexReader returns a new IndexReader on the given byte slice. // NewReader returns a new IndexReader on the given byte slice.
func NewIndexReader(b ByteSlice) (IndexReader, error) { func NewReader(b ByteSlice) (*Reader, error) {
return newIndexReader(b, nil) return newReader(b, nil)
} }
// NewFileIndexReader returns a new index reader against the given index file. // NewFileReader returns a new index reader against the given index file.
func NewFileIndexReader(path string) (IndexReader, error) { func NewFileReader(path string) (*Reader, error) {
f, err := openMmapFile(path) f, err := fileutil.OpenMmapFile(path)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return newIndexReader(realByteSlice(f.b), f) return newReader(realByteSlice(f.Bytes()), f)
} }
func newIndexReader(b ByteSlice, c io.Closer) (*indexReader, error) { func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
r := &indexReader{ r := &Reader{
b: b, b: b,
c: c, c: c,
symbols: map[uint32]string{}, symbols: map[uint32]string{},
@ -650,7 +605,7 @@ func newIndexReader(b ByteSlice, c io.Closer) (*indexReader, error) {
return r, errors.Wrap(err, "read postings table") return r, errors.Wrap(err, "read postings table")
} }
func (r *indexReader) readTOC() error { func (r *Reader) readTOC() error {
if r.b.Len() < indexTOCLen { if r.b.Len() < indexTOCLen {
return errInvalidSize return errInvalidSize
} }
@ -676,7 +631,7 @@ func (r *indexReader) readTOC() error {
// decbufAt returns a new decoding buffer. It expects the first 4 bytes // decbufAt returns a new decoding buffer. It expects the first 4 bytes
// after offset to hold the big endian encoded content length, followed by the contents and the expected // after offset to hold the big endian encoded content length, followed by the contents and the expected
// checksum. // checksum.
func (r *indexReader) decbufAt(off int) decbuf { func (r *Reader) decbufAt(off int) decbuf {
if r.b.Len() < off+4 { if r.b.Len() < off+4 {
return decbuf{e: errInvalidSize} return decbuf{e: errInvalidSize}
} }
@ -700,7 +655,7 @@ func (r *indexReader) decbufAt(off int) decbuf {
// decbufUvarintAt returns a new decoding buffer. It expects the first bytes // decbufUvarintAt returns a new decoding buffer. It expects the first bytes
// after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected // after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected
// checksum. // checksum.
func (r *indexReader) decbufUvarintAt(off int) decbuf { func (r *Reader) decbufUvarintAt(off int) decbuf {
// We never have to access this method at the far end of the byte slice. Thus just checking // We never have to access this method at the far end of the byte slice. Thus just checking
// against the MaxVarintLen32 is sufficient. // against the MaxVarintLen32 is sufficient.
if r.b.Len() < off+binary.MaxVarintLen32 { if r.b.Len() < off+binary.MaxVarintLen32 {
@ -730,7 +685,7 @@ func (r *indexReader) decbufUvarintAt(off int) decbuf {
// readSymbols reads the symbol table fully into memory and allocates proper strings for them. // readSymbols reads the symbol table fully into memory and allocates proper strings for them.
// Strings backed by the mmap'd memory would cause memory faults if applications keep using them // Strings backed by the mmap'd memory would cause memory faults if applications keep using them
// after the reader is closed. // after the reader is closed.
func (r *indexReader) readSymbols(off int) error { func (r *Reader) readSymbols(off int) error {
if off == 0 { if off == 0 {
return nil return nil
} }
@ -754,7 +709,7 @@ func (r *indexReader) readSymbols(off int) error {
// readOffsetTable reads an offset table at the given position and returns a map // readOffsetTable reads an offset table at the given position and returns a map
// with the key strings concatenated by the 0xff unicode non-character. // with the key strings concatenated by the 0xff unicode non-character.
func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) { func (r *Reader) readOffsetTable(off uint64) (map[string]uint32, error) {
const sep = "\xff" const sep = "\xff"
d := r.decbufAt(int(off)) d := r.decbufAt(int(off))
@ -776,11 +731,11 @@ func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
return res, d.err() return res, d.err()
} }
func (r *indexReader) Close() error { func (r *Reader) Close() error {
return r.c.Close() return r.c.Close()
} }
func (r *indexReader) lookupSymbol(o uint32) (string, error) { func (r *Reader) lookupSymbol(o uint32) (string, error) {
s, ok := r.symbols[o] s, ok := r.symbols[o]
if !ok { if !ok {
return "", errors.Errorf("unknown symbol offset %d", o) return "", errors.Errorf("unknown symbol offset %d", o)
@ -788,7 +743,7 @@ func (r *indexReader) lookupSymbol(o uint32) (string, error) {
return s, nil return s, nil
} }
func (r *indexReader) Symbols() (map[string]struct{}, error) { func (r *Reader) Symbols() (map[string]struct{}, error) {
res := make(map[string]struct{}, len(r.symbols)) res := make(map[string]struct{}, len(r.symbols))
for _, s := range r.symbols { for _, s := range r.symbols {
@ -797,7 +752,7 @@ func (r *indexReader) Symbols() (map[string]struct{}, error) {
return res, nil return res, nil
} }
func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { func (r *Reader) LabelValues(names ...string) (StringTuples, error) {
const sep = "\xff" const sep = "\xff"
key := strings.Join(names, sep) key := strings.Join(names, sep)
@ -830,7 +785,7 @@ type emptyStringTuples struct{}
func (emptyStringTuples) At(i int) ([]string, error) { return nil, nil } func (emptyStringTuples) At(i int) ([]string, error) { return nil, nil }
func (emptyStringTuples) Len() int { return 0 } func (emptyStringTuples) Len() int { return 0 }
func (r *indexReader) LabelIndices() ([][]string, error) { func (r *Reader) LabelIndices() ([][]string, error) {
const sep = "\xff" const sep = "\xff"
res := [][]string{} res := [][]string{}
@ -841,7 +796,7 @@ func (r *indexReader) LabelIndices() ([][]string, error) {
return res, nil return res, nil
} }
func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error { func (r *Reader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks.Meta) error {
d := r.decbufUvarintAt(int(ref)) d := r.decbufUvarintAt(int(ref))
*lbls = (*lbls)[:0] *lbls = (*lbls)[:0]
@ -880,7 +835,7 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta)
maxt := int64(d.uvarint64()) + t0 maxt := int64(d.uvarint64()) + t0
ref0 := int64(d.uvarint64()) ref0 := int64(d.uvarint64())
*chks = append(*chks, ChunkMeta{ *chks = append(*chks, chunks.Meta{
Ref: uint64(ref0), Ref: uint64(ref0),
MinTime: t0, MinTime: t0,
MaxTime: maxt, MaxTime: maxt,
@ -898,7 +853,7 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta)
return errors.Wrapf(d.err(), "read meta for chunk %d", i) return errors.Wrapf(d.err(), "read meta for chunk %d", i)
} }
*chks = append(*chks, ChunkMeta{ *chks = append(*chks, chunks.Meta{
Ref: uint64(ref0), Ref: uint64(ref0),
MinTime: mint, MinTime: mint,
MaxTime: maxt, MaxTime: maxt,
@ -907,7 +862,7 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta)
return d.err() return d.err()
} }
func (r *indexReader) Postings(name, value string) (Postings, error) { func (r *Reader) Postings(name, value string) (Postings, error) {
const sep = "\xff" const sep = "\xff"
key := strings.Join([]string{name, value}, sep) key := strings.Join([]string{name, value}, sep)
@ -921,7 +876,7 @@ func (r *indexReader) Postings(name, value string) (Postings, error) {
return newBigEndianPostings(d.get()), errors.Wrap(d.err(), "get postings bytes") return newBigEndianPostings(d.get()), errors.Wrap(d.err(), "get postings bytes")
} }
func (r *indexReader) SortedPostings(p Postings) Postings { func (r *Reader) SortedPostings(p Postings) Postings {
return p return p
} }
@ -930,7 +885,7 @@ type stringTuples struct {
s []string // flattened tuple entries s []string // flattened tuple entries
} }
func newStringTuples(s []string, l int) (*stringTuples, error) { func NewStringTuples(s []string, l int) (*stringTuples, error) {
if len(s)%l != 0 { if len(s)%l != 0 {
return nil, errors.Wrap(errInvalidSize, "string tuple list") return nil, errors.Wrap(errInvalidSize, "string tuple list")
} }

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package tsdb package index
import ( import (
"io/ioutil" "io/ioutil"
@ -20,8 +20,12 @@ import (
"path/filepath" "path/filepath"
"sort" "sort"
"testing" "testing"
"unsafe"
"github.com/pkg/errors" "github.com/pkg/errors"
promlabels "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil" "github.com/prometheus/tsdb/testutil"
@ -29,13 +33,13 @@ import (
type series struct { type series struct {
l labels.Labels l labels.Labels
chunks []ChunkMeta chunks []chunks.Meta
} }
type mockIndex struct { type mockIndex struct {
series map[uint64]series series map[uint64]series
labelIndex map[string][]string labelIndex map[string][]string
postings *memPostings postings map[labels.Label][]uint64
symbols map[string]struct{} symbols map[string]struct{}
} }
@ -43,11 +47,9 @@ func newMockIndex() mockIndex {
ix := mockIndex{ ix := mockIndex{
series: make(map[uint64]series), series: make(map[uint64]series),
labelIndex: make(map[string][]string), labelIndex: make(map[string][]string),
postings: newMemPostings(), postings: make(map[labels.Label][]uint64),
symbols: make(map[string]struct{}), symbols: make(map[string]struct{}),
} }
ix.postings.ensureOrder()
return ix return ix
} }
@ -55,7 +57,7 @@ func (m mockIndex) Symbols() (map[string]struct{}, error) {
return m.symbols, nil return m.symbols, nil
} }
func (m mockIndex) AddSeries(ref uint64, l labels.Labels, chunks ...ChunkMeta) error { func (m mockIndex) AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) error {
if _, ok := m.series[ref]; ok { if _, ok := m.series[ref]; ok {
return errors.Errorf("series with reference %d already added", ref) return errors.Errorf("series with reference %d already added", ref)
} }
@ -80,23 +82,22 @@ func (m mockIndex) WriteLabelIndex(names []string, values []string) error {
if len(names) != 1 { if len(names) != 1 {
return errors.New("composite indexes not supported yet") return errors.New("composite indexes not supported yet")
} }
sort.Strings(values) sort.Strings(values)
m.labelIndex[names[0]] = values m.labelIndex[names[0]] = values
return nil return nil
} }
func (m mockIndex) WritePostings(name, value string, it Postings) error { func (m mockIndex) WritePostings(name, value string, it Postings) error {
if _, ok := m.postings.m[labels.Label{name, value}]; ok { l := labels.Label{Name: name, Value: value}
return errors.Errorf("postings for %s=%q already added", name, value) if _, ok := m.postings[l]; ok {
return errors.Errorf("postings for %s already added", l)
} }
ep, err := expandPostings(it) ep, err := ExpandPostings(it)
if err != nil { if err != nil {
return err return err
} }
m.postings.m[labels.Label{name, value}] = ep m.postings[l] = ep
return nil
return it.Err()
} }
func (m mockIndex) Close() error { func (m mockIndex) Close() error {
@ -109,29 +110,30 @@ func (m mockIndex) LabelValues(names ...string) (StringTuples, error) {
return nil, errors.New("composite indexes not supported yet") return nil, errors.New("composite indexes not supported yet")
} }
return newStringTuples(m.labelIndex[names[0]], 1) return NewStringTuples(m.labelIndex[names[0]], 1)
} }
func (m mockIndex) Postings(name, value string) (Postings, error) { func (m mockIndex) Postings(name, value string) (Postings, error) {
return m.postings.get(name, value), nil l := labels.Label{Name: name, Value: value}
return NewListPostings(m.postings[l]), nil
} }
func (m mockIndex) SortedPostings(p Postings) Postings { func (m mockIndex) SortedPostings(p Postings) Postings {
ep, err := expandPostings(p) ep, err := ExpandPostings(p)
if err != nil { if err != nil {
return errPostings{err: errors.Wrap(err, "expand postings")} return ErrPostings(errors.Wrap(err, "expand postings"))
} }
sort.Slice(ep, func(i, j int) bool { sort.Slice(ep, func(i, j int) bool {
return labels.Compare(m.series[ep[i]].l, m.series[ep[j]].l) < 0 return labels.Compare(m.series[ep[i]].l, m.series[ep[j]].l) < 0
}) })
return newListPostings(ep) return NewListPostings(ep)
} }
func (m mockIndex) Series(ref uint64, lset *labels.Labels, chks *[]ChunkMeta) error { func (m mockIndex) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
s, ok := m.series[ref] s, ok := m.series[ref]
if !ok { if !ok {
return ErrNotFound return errors.New("not found")
} }
*lset = append((*lset)[:0], s.l...) *lset = append((*lset)[:0], s.l...)
*chks = append((*chks)[:0], s.chunks...) *chks = append((*chks)[:0], s.chunks...)
@ -154,22 +156,24 @@ func TestIndexRW_Create_Open(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
fn := filepath.Join(dir, "index")
// An empty index must still result in a readable file. // An empty index must still result in a readable file.
iw, err := newIndexWriter(dir) iw, err := NewWriter(fn)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, iw.Close()) testutil.Ok(t, iw.Close())
ir, err := NewFileIndexReader(filepath.Join(dir, "index")) ir, err := NewFileReader(fn)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, ir.Close()) testutil.Ok(t, ir.Close())
// Modify magic header must cause open to fail. // Modify magic header must cause open to fail.
f, err := os.OpenFile(filepath.Join(dir, "index"), os.O_WRONLY, 0666) f, err := os.OpenFile(fn, os.O_WRONLY, 0666)
testutil.Ok(t, err) testutil.Ok(t, err)
_, err = f.WriteAt([]byte{0, 0}, 0) _, err = f.WriteAt([]byte{0, 0}, 0)
testutil.Ok(t, err) testutil.Ok(t, err)
_, err = NewFileIndexReader(dir) _, err = NewFileReader(dir)
testutil.NotOk(t, err) testutil.NotOk(t, err)
} }
@ -178,7 +182,9 @@ func TestIndexRW_Postings(t *testing.T) {
testutil.Ok(t, err) testutil.Ok(t, err)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
iw, err := newIndexWriter(dir) fn := filepath.Join(dir, "index")
iw, err := NewWriter(fn)
testutil.Ok(t, err) testutil.Ok(t, err)
series := []labels.Labels{ series := []labels.Labels{
@ -210,14 +216,14 @@ func TestIndexRW_Postings(t *testing.T) {
testutil.Ok(t, iw.Close()) testutil.Ok(t, iw.Close())
ir, err := NewFileIndexReader(filepath.Join(dir, "index")) ir, err := NewFileReader(fn)
testutil.Ok(t, err) testutil.Ok(t, err)
p, err := ir.Postings("a", "1") p, err := ir.Postings("a", "1")
testutil.Ok(t, err) testutil.Ok(t, err)
var l labels.Labels var l labels.Labels
var c []ChunkMeta var c []chunks.Meta
for i := 0; p.Next(); i++ { for i := 0; p.Next(); i++ {
err := ir.Series(p.At(), &l, &c) err := ir.Series(p.At(), &l, &c)
@ -254,14 +260,14 @@ func TestPersistence_index_e2e(t *testing.T) {
// Generate ChunkMetas for every label set. // Generate ChunkMetas for every label set.
for i, lset := range lbls { for i, lset := range lbls {
var metas []ChunkMeta var metas []chunks.Meta
for j := 0; j <= (i % 20); j++ { for j := 0; j <= (i % 20); j++ {
metas = append(metas, ChunkMeta{ metas = append(metas, chunks.Meta{
MinTime: int64(j * 10000), MinTime: int64(j * 10000),
MaxTime: int64((j + 1) * 10000), MaxTime: int64((j + 1) * 10000),
Ref: rand.Uint64(), Ref: rand.Uint64(),
Chunk: chunks.NewXORChunk(), Chunk: chunkenc.NewXORChunk(),
}) })
} }
input = append(input, &indexWriterSeries{ input = append(input, &indexWriterSeries{
@ -270,17 +276,16 @@ func TestPersistence_index_e2e(t *testing.T) {
}) })
} }
iw, err := newIndexWriter(dir) iw, err := NewWriter(filepath.Join(dir, "index"))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, iw.AddSymbols(symbols)) testutil.Ok(t, iw.AddSymbols(symbols))
// Population procedure as done by compaction. // Population procedure as done by compaction.
var ( var (
postings = newMemPostings() postings = NewMemPostings()
values = map[string]stringset{} values = map[string]map[string]struct{}{}
) )
postings.ensureOrder()
mi := newMockIndex() mi := newMockIndex()
@ -292,12 +297,12 @@ func TestPersistence_index_e2e(t *testing.T) {
for _, l := range s.labels { for _, l := range s.labels {
valset, ok := values[l.Name] valset, ok := values[l.Name]
if !ok { if !ok {
valset = stringset{} valset = map[string]struct{}{}
values[l.Name] = valset values[l.Name] = valset
} }
valset.set(l.Value) valset[l.Value] = struct{}{}
} }
postings.add(uint64(i), s.labels) postings.Add(uint64(i), s.labels)
i++ i++
} }
@ -328,14 +333,14 @@ func TestPersistence_index_e2e(t *testing.T) {
ir, err := NewFileIndexReader(filepath.Join(dir, "index")) ir, err := NewFileIndexReader(filepath.Join(dir, "index"))
testutil.Ok(t, err) testutil.Ok(t, err)
for p := range mi.postings.m { for p := range mi.postings {
gotp, err := ir.Postings(p.Name, p.Value) gotp, err := ir.Postings(p.Name, p.Value)
testutil.Ok(t, err) testutil.Ok(t, err)
expp, err := mi.Postings(p.Name, p.Value) expp, err := mi.Postings(p.Name, p.Value)
var lset, explset labels.Labels var lset, explset labels.Labels
var chks, expchks []ChunkMeta var chks, expchks []chunks.Meta
for gotp.Next() { for gotp.Next() {
testutil.Assert(t, expp.Next() == true, "") testutil.Assert(t, expp.Next() == true, "")
@ -374,3 +379,41 @@ func TestPersistence_index_e2e(t *testing.T) {
testutil.Ok(t, ir.Close()) testutil.Ok(t, ir.Close())
} }
func readPrometheusLabels(fn string, n int) ([]labels.Labels, error) {
f, err := os.Open(fn)
if err != nil {
return nil, err
}
defer f.Close()
b, err := ioutil.ReadAll(f)
if err != nil {
return nil, err
}
p := textparse.New(b)
i := 0
var mets []labels.Labels
hashes := map[uint64]struct{}{}
for p.Next() && i < n {
m := make(labels.Labels, 0, 10)
p.Metric((*promlabels.Labels)(unsafe.Pointer(&m)))
h := m.Hash()
if _, ok := hashes[h]; ok {
continue
}
mets = append(mets, m)
hashes[h] = struct{}{}
i++
}
if err := p.Err(); err != nil {
return nil, err
}
if i != n {
return mets, errors.Errorf("requested %d metrics but found %d", n, i)
}
return mets, nil
}

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package tsdb package index
import ( import (
"encoding/binary" "encoding/binary"
@ -23,35 +23,35 @@ import (
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
) )
// memPostings holds postings list for series ID per label pair. They may be written // MemPostings holds postings list for series ID per label pair. They may be written
// to out of order. // to out of order.
// ensureOrder() must be called once before any reads are done. This allows for quick // ensureOrder() must be called once before any reads are done. This allows for quick
// unordered batch fills on startup. // unordered batch fills on startup.
type memPostings struct { type MemPostings struct {
mtx sync.RWMutex mtx sync.RWMutex
m map[labels.Label][]uint64 m map[labels.Label][]uint64
ordered bool ordered bool
} }
// newMemPoistings returns a memPostings that's ready for reads and writes. // NewMemPostings returns a memPostings that's ready for reads and writes.
func newMemPostings() *memPostings { func NewMemPostings() *MemPostings {
return &memPostings{ return &MemPostings{
m: make(map[labels.Label][]uint64, 512), m: make(map[labels.Label][]uint64, 512),
ordered: true, ordered: true,
} }
} }
// newUnorderedMemPostings returns a memPostings that is not safe to be read from // NewUnorderedMemPostings returns a memPostings that is not safe to be read from
// until ensureOrder was called once. // until ensureOrder was called once.
func newUnorderedMemPostings() *memPostings { func NewUnorderedMemPostings() *MemPostings {
return &memPostings{ return &MemPostings{
m: make(map[labels.Label][]uint64, 512), m: make(map[labels.Label][]uint64, 512),
ordered: false, ordered: false,
} }
} }
// sortedKeys returns a list of sorted label keys of the postings. // SortedKeys returns a list of sorted label keys of the postings.
func (p *memPostings) sortedKeys() []labels.Label { func (p *MemPostings) SortedKeys() []labels.Label {
p.mtx.RLock() p.mtx.RLock()
keys := make([]labels.Label, 0, len(p.m)) keys := make([]labels.Label, 0, len(p.m))
@ -69,23 +69,28 @@ func (p *memPostings) sortedKeys() []labels.Label {
return keys return keys
} }
// Postings returns an iterator over the postings list for s. // Get returns a postings list for the given label pair.
func (p *memPostings) get(name, value string) Postings { func (p *MemPostings) Get(name, value string) Postings {
p.mtx.RLock() p.mtx.RLock()
l := p.m[labels.Label{Name: name, Value: value}] l := p.m[labels.Label{Name: name, Value: value}]
p.mtx.RUnlock() p.mtx.RUnlock()
if l == nil { if l == nil {
return emptyPostings return EmptyPostings()
} }
return newListPostings(l) return newListPostings(l)
} }
// All returns a postings list over all documents ever added.
func (p *MemPostings) All() Postings {
return p.Get(allPostingsKey.Name, allPostingsKey.Value)
}
var allPostingsKey = labels.Label{} var allPostingsKey = labels.Label{}
// ensurePostings ensures that all postings lists are sorted. After it returns all further // EnsureOrder ensures that all postings lists are sorted. After it returns all further
// calls to add and addFor will insert new IDs in a sorted manner. // calls to add and addFor will insert new IDs in a sorted manner.
func (p *memPostings) ensureOrder() { func (p *MemPostings) EnsureOrder() {
p.mtx.Lock() p.mtx.Lock()
defer p.mtx.Unlock() defer p.mtx.Unlock()
@ -117,9 +122,61 @@ func (p *memPostings) ensureOrder() {
p.ordered = true p.ordered = true
} }
// add adds a document to the index. The caller has to ensure that no // Delete removes all ids in the given map from the postings lists.
// term argument appears twice. func (p *MemPostings) Delete(deleted map[uint64]struct{}) {
func (p *memPostings) add(id uint64, lset labels.Labels) { var keys []labels.Label
p.mtx.RLock()
for l := range p.m {
keys = append(keys, l)
}
p.mtx.RUnlock()
for _, l := range keys {
p.mtx.Lock()
found := false
for _, id := range p.m[l] {
if _, ok := deleted[id]; ok {
found = true
break
}
}
if !found {
p.mtx.Unlock()
continue
}
repl := make([]uint64, 0, len(p.m[l]))
for _, id := range p.m[l] {
if _, ok := deleted[id]; !ok {
repl = append(repl, id)
}
}
if len(repl) > 0 {
p.m[l] = repl
} else {
delete(p.m, l)
}
p.mtx.Unlock()
}
}
// Iter calls f for each postings list. It aborts if f returns an error and returns it.
func (p *MemPostings) Iter(f func(labels.Label, Postings) error) error {
p.mtx.RLock()
defer p.mtx.RUnlock()
for l, p := range p.m {
if err := f(l, newListPostings(p)); err != nil {
return err
}
}
return nil
}
// Add a label set to the postings index.
func (p *MemPostings) Add(id uint64, lset labels.Labels) {
p.mtx.Lock() p.mtx.Lock()
for _, l := range lset { for _, l := range lset {
@ -130,7 +187,7 @@ func (p *memPostings) add(id uint64, lset labels.Labels) {
p.mtx.Unlock() p.mtx.Unlock()
} }
func (p *memPostings) addFor(id uint64, l labels.Label) { func (p *MemPostings) addFor(id uint64, l labels.Label) {
list := append(p.m[l], id) list := append(p.m[l], id)
p.m[l] = list p.m[l] = list
@ -149,7 +206,8 @@ func (p *memPostings) addFor(id uint64, l labels.Label) {
} }
} }
func expandPostings(p Postings) (res []uint64, err error) { // ExpandPostings returns the postings expanded as a slice.
func ExpandPostings(p Postings) (res []uint64, err error) {
for p.Next() { for p.Next() {
res = append(res, p.At()) res = append(res, p.At())
} }
@ -189,6 +247,11 @@ func EmptyPostings() Postings {
return emptyPostings return emptyPostings
} }
// ErrPostings returns new postings that immediately error.
func ErrPostings(err error) Postings {
return errPostings{err}
}
// Intersect returns a new postings list over the intersection of the // Intersect returns a new postings list over the intersection of the
// input postings. // input postings.
func Intersect(its ...Postings) Postings { func Intersect(its ...Postings) Postings {
@ -340,6 +403,12 @@ func (it *mergedPostings) Err() error {
return it.b.Err() return it.b.Err()
} }
// Without returns a new postings list that contains all elements from the full list that
// are not in the drop list
func Without(full, drop Postings) Postings {
return newRemovedPostings(full, drop)
}
type removedPostings struct { type removedPostings struct {
full, remove Postings full, remove Postings
@ -420,6 +489,10 @@ type listPostings struct {
cur uint64 cur uint64
} }
func NewListPostings(list []uint64) Postings {
return newListPostings(list)
}
func newListPostings(list []uint64) *listPostings { func newListPostings(list []uint64) *listPostings {
return &listPostings{list: list} return &listPostings{list: list}
} }
@ -508,27 +581,3 @@ func (it *bigEndianPostings) Seek(x uint64) bool {
func (it *bigEndianPostings) Err() error { func (it *bigEndianPostings) Err() error {
return nil return nil
} }
type stringset map[string]struct{}
func (ss stringset) set(s string) {
ss[s] = struct{}{}
}
func (ss stringset) has(s string) bool {
_, ok := ss[s]
return ok
}
func (ss stringset) String() string {
return strings.Join(ss.slice(), ",")
}
func (ss stringset) slice() []string {
slice := make([]string, 0, len(ss))
for k := range ss {
slice = append(slice, k)
}
sort.Strings(slice)
return slice
}

View file

@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package tsdb package index
import ( import (
"encoding/binary" "encoding/binary"
@ -25,7 +25,7 @@ import (
) )
func TestMemPostings_addFor(t *testing.T) { func TestMemPostings_addFor(t *testing.T) {
p := newMemPostings() p := NewMemPostings()
p.m[allPostingsKey] = []uint64{1, 2, 3, 4, 6, 7, 8} p.m[allPostingsKey] = []uint64{1, 2, 3, 4, 6, 7, 8}
p.addFor(5, allPostingsKey) p.addFor(5, allPostingsKey)
@ -34,7 +34,7 @@ func TestMemPostings_addFor(t *testing.T) {
} }
func TestMemPostings_ensureOrder(t *testing.T) { func TestMemPostings_ensureOrder(t *testing.T) {
p := newUnorderedMemPostings() p := NewUnorderedMemPostings()
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
l := make([]uint64, 100) l := make([]uint64, 100)
@ -46,7 +46,7 @@ func TestMemPostings_ensureOrder(t *testing.T) {
p.m[labels.Label{"a", v}] = l p.m[labels.Label{"a", v}] = l
} }
p.ensureOrder() p.EnsureOrder()
for _, l := range p.m { for _, l := range p.m {
ok := sort.SliceIsSorted(l, func(i, j int) bool { ok := sort.SliceIsSorted(l, func(i, j int) bool {
@ -100,7 +100,7 @@ func TestIntersect(t *testing.T) {
a := newListPostings(c.a) a := newListPostings(c.a)
b := newListPostings(c.b) b := newListPostings(c.b)
res, err := expandPostings(Intersect(a, b)) res, err := ExpandPostings(Intersect(a, b))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, c.res, res) testutil.Equals(t, c.res, res)
} }
@ -140,7 +140,7 @@ func TestMultiIntersect(t *testing.T) {
ps = append(ps, newListPostings(postings)) ps = append(ps, newListPostings(postings))
} }
res, err := expandPostings(Intersect(ps...)) res, err := ExpandPostings(Intersect(ps...))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, c.res, res) testutil.Equals(t, c.res, res)
@ -174,7 +174,7 @@ func BenchmarkIntersect(t *testing.B) {
t.ResetTimer() t.ResetTimer()
for i := 0; i < t.N; i++ { for i := 0; i < t.N; i++ {
if _, err := expandPostings(Intersect(i1, i2, i3, i4)); err != nil { if _, err := ExpandPostings(Intersect(i1, i2, i3, i4)); err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
@ -198,7 +198,7 @@ func TestMultiMerge(t *testing.T) {
i2 := newListPostings(c.b) i2 := newListPostings(c.b)
i3 := newListPostings(c.c) i3 := newListPostings(c.c)
res, err := expandPostings(Merge(i1, i2, i3)) res, err := ExpandPostings(Merge(i1, i2, i3))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, c.res, res) testutil.Equals(t, c.res, res)
} }
@ -230,7 +230,7 @@ func TestMergedPostings(t *testing.T) {
a := newListPostings(c.a) a := newListPostings(c.a)
b := newListPostings(c.b) b := newListPostings(c.b)
res, err := expandPostings(newMergedPostings(a, b)) res, err := ExpandPostings(newMergedPostings(a, b))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, c.res, res) testutil.Equals(t, c.res, res)
} }
@ -290,7 +290,7 @@ func TestMergedPostingsSeek(t *testing.T) {
// After Seek(), At() should be called. // After Seek(), At() should be called.
if c.success { if c.success {
start := p.At() start := p.At()
lst, err := expandPostings(p) lst, err := ExpandPostings(p)
testutil.Ok(t, err) testutil.Ok(t, err)
lst = append([]uint64{start}, lst...) lst = append([]uint64{start}, lst...)
@ -347,7 +347,7 @@ func TestRemovedPostings(t *testing.T) {
a := newListPostings(c.a) a := newListPostings(c.a)
b := newListPostings(c.b) b := newListPostings(c.b)
res, err := expandPostings(newRemovedPostings(a, b)) res, err := ExpandPostings(newRemovedPostings(a, b))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, c.res, res) testutil.Equals(t, c.res, res)
} }
@ -431,7 +431,7 @@ func TestRemovedPostingsSeek(t *testing.T) {
// After Seek(), At() should be called. // After Seek(), At() should be called.
if c.success { if c.success {
start := p.At() start := p.At()
lst, err := expandPostings(p) lst, err := ExpandPostings(p)
testutil.Ok(t, err) testutil.Ok(t, err)
lst = append([]uint64{start}, lst...) lst = append([]uint64{start}, lst...)
@ -527,7 +527,7 @@ func TestIntersectWithMerge(t *testing.T) {
) )
p := Intersect(a, b) p := Intersect(a, b)
res, err := expandPostings(p) res, err := ExpandPostings(p)
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Equals(t, []uint64{30}, res) testutil.Equals(t, []uint64{30}, res)

View file

@ -19,7 +19,9 @@ import (
"strings" "strings"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
) )
@ -202,18 +204,17 @@ func (q *blockQuerier) Close() error {
// PostingsForMatchers assembles a single postings iterator against the index reader // PostingsForMatchers assembles a single postings iterator against the index reader
// based on the given matchers. It returns a list of label names that must be manually // based on the given matchers. It returns a list of label names that must be manually
// checked to not exist in series the postings list points to. // checked to not exist in series the postings list points to.
func PostingsForMatchers(index IndexReader, ms ...labels.Matcher) (Postings, error) { func PostingsForMatchers(ix IndexReader, ms ...labels.Matcher) (index.Postings, error) {
var ( var its []index.Postings
its []Postings
)
for _, m := range ms { for _, m := range ms {
it, err := postingsForMatcher(index, m) it, err := postingsForMatcher(ix, m)
if err != nil { if err != nil {
return nil, err return nil, err
} }
its = append(its, it) its = append(its, it)
} }
return index.SortedPostings(Intersect(its...)), nil return ix.SortedPostings(index.Intersect(its...)), nil
} }
// tuplesByPrefix uses binary search to find prefix matches within ts. // tuplesByPrefix uses binary search to find prefix matches within ts.
@ -247,24 +248,24 @@ func tuplesByPrefix(m *labels.PrefixMatcher, ts StringTuples) ([]string, error)
return matches, nil return matches, nil
} }
func postingsForMatcher(index IndexReader, m labels.Matcher) (Postings, error) { func postingsForMatcher(ix IndexReader, m labels.Matcher) (index.Postings, error) {
// If the matcher selects an empty value, it selects all the series which dont // If the matcher selects an empty value, it selects all the series which dont
// have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575 // have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575
// and https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555 // and https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555
if m.Matches("") { if m.Matches("") {
return postingsForUnsetLabelMatcher(index, m) return postingsForUnsetLabelMatcher(ix, m)
} }
// Fast-path for equal matching. // Fast-path for equal matching.
if em, ok := m.(*labels.EqualMatcher); ok { if em, ok := m.(*labels.EqualMatcher); ok {
it, err := index.Postings(em.Name(), em.Value()) it, err := ix.Postings(em.Name(), em.Value())
if err != nil { if err != nil {
return nil, err return nil, err
} }
return it, nil return it, nil
} }
tpls, err := index.LabelValues(m.Name()) tpls, err := ix.LabelValues(m.Name())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -289,24 +290,24 @@ func postingsForMatcher(index IndexReader, m labels.Matcher) (Postings, error) {
} }
if len(res) == 0 { if len(res) == 0 {
return EmptyPostings(), nil return index.EmptyPostings(), nil
} }
var rit []Postings var rit []index.Postings
for _, v := range res { for _, v := range res {
it, err := index.Postings(m.Name(), v) it, err := ix.Postings(m.Name(), v)
if err != nil { if err != nil {
return nil, err return nil, err
} }
rit = append(rit, it) rit = append(rit, it)
} }
return Merge(rit...), nil return index.Merge(rit...), nil
} }
func postingsForUnsetLabelMatcher(index IndexReader, m labels.Matcher) (Postings, error) { func postingsForUnsetLabelMatcher(ix IndexReader, m labels.Matcher) (index.Postings, error) {
tpls, err := index.LabelValues(m.Name()) tpls, err := ix.LabelValues(m.Name())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -323,23 +324,22 @@ func postingsForUnsetLabelMatcher(index IndexReader, m labels.Matcher) (Postings
} }
} }
var rit []Postings var rit []index.Postings
for _, v := range res { for _, v := range res {
it, err := index.Postings(m.Name(), v) it, err := ix.Postings(m.Name(), v)
if err != nil { if err != nil {
return nil, err return nil, err
} }
rit = append(rit, it) rit = append(rit, it)
} }
mrit := Merge(rit...)
allPostings, err := index.Postings(allPostingsKey.Name, allPostingsKey.Value) allPostings, err := ix.Postings("", "")
if err != nil { if err != nil {
return nil, err return nil, err
} }
return newRemovedPostings(allPostings, mrit), nil return index.Without(allPostings, index.Merge(rit...)), nil
} }
func mergeStrings(a, b []string) []string { func mergeStrings(a, b []string) []string {
@ -458,19 +458,19 @@ func (s *mergedSeriesSet) Next() bool {
// actual series itself. // actual series itself.
type ChunkSeriesSet interface { type ChunkSeriesSet interface {
Next() bool Next() bool
At() (labels.Labels, []ChunkMeta, Intervals) At() (labels.Labels, []chunks.Meta, Intervals)
Err() error Err() error
} }
// baseChunkSeries loads the label set and chunk references for a postings // baseChunkSeries loads the label set and chunk references for a postings
// list from an index. It filters out series that have labels set that should be unset. // list from an index. It filters out series that have labels set that should be unset.
type baseChunkSeries struct { type baseChunkSeries struct {
p Postings p index.Postings
index IndexReader index IndexReader
tombstones TombstoneReader tombstones TombstoneReader
lset labels.Labels lset labels.Labels
chks []ChunkMeta chks []chunks.Meta
intervals Intervals intervals Intervals
err error err error
} }
@ -492,7 +492,7 @@ func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher)
}, nil }, nil
} }
func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta, Intervals) { func (s *baseChunkSeries) At() (labels.Labels, []chunks.Meta, Intervals) {
return s.lset, s.chks, s.intervals return s.lset, s.chks, s.intervals
} }
@ -501,13 +501,13 @@ func (s *baseChunkSeries) Err() error { return s.err }
func (s *baseChunkSeries) Next() bool { func (s *baseChunkSeries) Next() bool {
var ( var (
lset labels.Labels lset labels.Labels
chunks []ChunkMeta chkMetas []chunks.Meta
err error err error
) )
for s.p.Next() { for s.p.Next() {
ref := s.p.At() ref := s.p.At()
if err := s.index.Series(ref, &lset, &chunks); err != nil { if err := s.index.Series(ref, &lset, &chkMetas); err != nil {
// Postings may be stale. Skip if no underlying series exists. // Postings may be stale. Skip if no underlying series exists.
if errors.Cause(err) == ErrNotFound { if errors.Cause(err) == ErrNotFound {
continue continue
@ -517,7 +517,7 @@ func (s *baseChunkSeries) Next() bool {
} }
s.lset = lset s.lset = lset
s.chks = chunks s.chks = chkMetas
s.intervals, err = s.tombstones.Get(s.p.At()) s.intervals, err = s.tombstones.Get(s.p.At())
if err != nil { if err != nil {
s.err = errors.Wrap(err, "get tombstones") s.err = errors.Wrap(err, "get tombstones")
@ -526,7 +526,7 @@ func (s *baseChunkSeries) Next() bool {
if len(s.intervals) > 0 { if len(s.intervals) > 0 {
// Only those chunks that are not entirely deleted. // Only those chunks that are not entirely deleted.
chks := make([]ChunkMeta, 0, len(s.chks)) chks := make([]chunks.Meta, 0, len(s.chks))
for _, chk := range s.chks { for _, chk := range s.chks {
if !(Interval{chk.MinTime, chk.MaxTime}.isSubrange(s.intervals)) { if !(Interval{chk.MinTime, chk.MaxTime}.isSubrange(s.intervals)) {
chks = append(chks, chk) chks = append(chks, chk)
@ -553,12 +553,12 @@ type populatedChunkSeries struct {
mint, maxt int64 mint, maxt int64
err error err error
chks []ChunkMeta chks []chunks.Meta
lset labels.Labels lset labels.Labels
intervals Intervals intervals Intervals
} }
func (s *populatedChunkSeries) At() (labels.Labels, []ChunkMeta, Intervals) { func (s *populatedChunkSeries) At() (labels.Labels, []chunks.Meta, Intervals) {
return s.lset, s.chks, s.intervals return s.lset, s.chks, s.intervals
} }
@ -651,7 +651,7 @@ func (s *blockSeriesSet) Err() error { return s.err }
// time series data. // time series data.
type chunkSeries struct { type chunkSeries struct {
labels labels.Labels labels labels.Labels
chunks []ChunkMeta // in-order chunk refs chunks []chunks.Meta // in-order chunk refs
mint, maxt int64 mint, maxt int64
@ -754,17 +754,17 @@ func (it *chainedSeriesIterator) Err() error {
// chunkSeriesIterator implements a series iterator on top // chunkSeriesIterator implements a series iterator on top
// of a list of time-sorted, non-overlapping chunks. // of a list of time-sorted, non-overlapping chunks.
type chunkSeriesIterator struct { type chunkSeriesIterator struct {
chunks []ChunkMeta chunks []chunks.Meta
i int i int
cur chunks.Iterator cur chunkenc.Iterator
maxt, mint int64 maxt, mint int64
intervals Intervals intervals Intervals
} }
func newChunkSeriesIterator(cs []ChunkMeta, dranges Intervals, mint, maxt int64) *chunkSeriesIterator { func newChunkSeriesIterator(cs []chunks.Meta, dranges Intervals, mint, maxt int64) *chunkSeriesIterator {
it := cs[0].Chunk.Iterator() it := cs[0].Chunk.Iterator()
if len(dranges) > 0 { if len(dranges) > 0 {
@ -853,6 +853,46 @@ func (it *chunkSeriesIterator) Err() error {
return it.cur.Err() return it.cur.Err()
} }
// deletedIterator wraps an Iterator and makes sure any deleted metrics are not
// returned.
type deletedIterator struct {
it chunkenc.Iterator
intervals Intervals
}
func (it *deletedIterator) At() (int64, float64) {
return it.it.At()
}
func (it *deletedIterator) Next() bool {
Outer:
for it.it.Next() {
ts, _ := it.it.At()
for _, tr := range it.intervals {
if tr.inBounds(ts) {
continue Outer
}
if ts > tr.Maxt {
it.intervals = it.intervals[1:]
continue
}
return true
}
return true
}
return false
}
func (it *deletedIterator) Err() error {
return it.it.Err()
}
type mockSeriesSet struct { type mockSeriesSet struct {
next func() bool next func() bool
series func() Series series func() Series

View file

@ -20,7 +20,10 @@ import (
"sort" "sort"
"testing" "testing"
"github.com/pkg/errors"
"github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
"github.com/prometheus/tsdb/testutil" "github.com/prometheus/tsdb/testutil"
) )
@ -228,25 +231,25 @@ func createIdxChkReaders(tc []struct {
return labels.Compare(labels.FromMap(tc[i].lset), labels.FromMap(tc[i].lset)) < 0 return labels.Compare(labels.FromMap(tc[i].lset), labels.FromMap(tc[i].lset)) < 0
}) })
postings := newMemPostings() postings := index.NewMemPostings()
chkReader := mockChunkReader(make(map[uint64]chunks.Chunk)) chkReader := mockChunkReader(make(map[uint64]chunkenc.Chunk))
lblIdx := make(map[string]stringset) lblIdx := make(map[string]stringset)
mi := newMockIndex() mi := newMockIndex()
for i, s := range tc { for i, s := range tc {
i = i + 1 // 0 is not a valid posting. i = i + 1 // 0 is not a valid posting.
metas := make([]ChunkMeta, 0, len(s.chunks)) metas := make([]chunks.Meta, 0, len(s.chunks))
for _, chk := range s.chunks { for _, chk := range s.chunks {
// Collisions can be there, but for tests, its fine. // Collisions can be there, but for tests, its fine.
ref := rand.Uint64() ref := rand.Uint64()
metas = append(metas, ChunkMeta{ metas = append(metas, chunks.Meta{
MinTime: chk[0].t, MinTime: chk[0].t,
MaxTime: chk[len(chk)-1].t, MaxTime: chk[len(chk)-1].t,
Ref: ref, Ref: ref,
}) })
chunk := chunks.NewXORChunk() chunk := chunkenc.NewXORChunk()
app, _ := chunk.Appender() app, _ := chunk.Appender()
for _, smpl := range chk { for _, smpl := range chk {
app.Append(smpl.t, smpl.v) app.Append(smpl.t, smpl.v)
@ -257,7 +260,7 @@ func createIdxChkReaders(tc []struct {
ls := labels.FromMap(s.lset) ls := labels.FromMap(s.lset)
mi.AddSeries(uint64(i), ls, metas...) mi.AddSeries(uint64(i), ls, metas...)
postings.add(uint64(i), ls) postings.Add(uint64(i), ls)
for _, l := range ls { for _, l := range ls {
vs, present := lblIdx[l.Name] vs, present := lblIdx[l.Name]
@ -273,9 +276,9 @@ func createIdxChkReaders(tc []struct {
mi.WriteLabelIndex([]string{l}, vs.slice()) mi.WriteLabelIndex([]string{l}, vs.slice())
} }
for l := range postings.m { postings.Iter(func(l labels.Label, p index.Postings) error {
mi.WritePostings(l.Name, l.Value, postings.get(l.Name, l.Value)) return mi.WritePostings(l.Name, l.Value, p)
} })
return mi, chkReader return mi, chkReader
} }
@ -660,7 +663,7 @@ Outer:
func TestBaseChunkSeries(t *testing.T) { func TestBaseChunkSeries(t *testing.T) {
type refdSeries struct { type refdSeries struct {
lset labels.Labels lset labels.Labels
chunks []ChunkMeta chunks []chunks.Meta
ref uint64 ref uint64
} }
@ -676,7 +679,7 @@ func TestBaseChunkSeries(t *testing.T) {
series: []refdSeries{ series: []refdSeries{
{ {
lset: labels.New([]labels.Label{{"a", "a"}}...), lset: labels.New([]labels.Label{{"a", "a"}}...),
chunks: []ChunkMeta{ chunks: []chunks.Meta{
{Ref: 29}, {Ref: 45}, {Ref: 245}, {Ref: 123}, {Ref: 4232}, {Ref: 5344}, {Ref: 29}, {Ref: 45}, {Ref: 245}, {Ref: 123}, {Ref: 4232}, {Ref: 5344},
{Ref: 121}, {Ref: 121},
}, },
@ -684,19 +687,19 @@ func TestBaseChunkSeries(t *testing.T) {
}, },
{ {
lset: labels.New([]labels.Label{{"a", "a"}, {"b", "b"}}...), lset: labels.New([]labels.Label{{"a", "a"}, {"b", "b"}}...),
chunks: []ChunkMeta{ chunks: []chunks.Meta{
{Ref: 82}, {Ref: 23}, {Ref: 234}, {Ref: 65}, {Ref: 26}, {Ref: 82}, {Ref: 23}, {Ref: 234}, {Ref: 65}, {Ref: 26},
}, },
ref: 10, ref: 10,
}, },
{ {
lset: labels.New([]labels.Label{{"b", "c"}}...), lset: labels.New([]labels.Label{{"b", "c"}}...),
chunks: []ChunkMeta{{Ref: 8282}}, chunks: []chunks.Meta{{Ref: 8282}},
ref: 1, ref: 1,
}, },
{ {
lset: labels.New([]labels.Label{{"b", "b"}}...), lset: labels.New([]labels.Label{{"b", "b"}}...),
chunks: []ChunkMeta{ chunks: []chunks.Meta{
{Ref: 829}, {Ref: 239}, {Ref: 2349}, {Ref: 659}, {Ref: 269}, {Ref: 829}, {Ref: 239}, {Ref: 2349}, {Ref: 659}, {Ref: 269},
}, },
ref: 108, ref: 108,
@ -709,14 +712,14 @@ func TestBaseChunkSeries(t *testing.T) {
series: []refdSeries{ series: []refdSeries{
{ {
lset: labels.New([]labels.Label{{"a", "a"}, {"b", "b"}}...), lset: labels.New([]labels.Label{{"a", "a"}, {"b", "b"}}...),
chunks: []ChunkMeta{ chunks: []chunks.Meta{
{Ref: 82}, {Ref: 23}, {Ref: 234}, {Ref: 65}, {Ref: 26}, {Ref: 82}, {Ref: 23}, {Ref: 234}, {Ref: 65}, {Ref: 26},
}, },
ref: 10, ref: 10,
}, },
{ {
lset: labels.New([]labels.Label{{"b", "c"}}...), lset: labels.New([]labels.Label{{"b", "c"}}...),
chunks: []ChunkMeta{{Ref: 8282}}, chunks: []chunks.Meta{{Ref: 8282}},
ref: 3, ref: 3,
}, },
}, },
@ -732,7 +735,7 @@ func TestBaseChunkSeries(t *testing.T) {
} }
bcs := &baseChunkSeries{ bcs := &baseChunkSeries{
p: newListPostings(tc.postings), p: index.NewListPostings(tc.postings),
index: mi, index: mi,
tombstones: EmptyTombstoneReader(), tombstones: EmptyTombstoneReader(),
} }
@ -763,20 +766,20 @@ type itSeries struct {
func (s itSeries) Iterator() SeriesIterator { return s.si } func (s itSeries) Iterator() SeriesIterator { return s.si }
func (s itSeries) Labels() labels.Labels { return labels.Labels{} } func (s itSeries) Labels() labels.Labels { return labels.Labels{} }
func chunkFromSamples(s []sample) ChunkMeta { func chunkFromSamples(s []sample) chunks.Meta {
mint, maxt := int64(0), int64(0) mint, maxt := int64(0), int64(0)
if len(s) > 0 { if len(s) > 0 {
mint, maxt = s[0].t, s[len(s)-1].t mint, maxt = s[0].t, s[len(s)-1].t
} }
c := chunks.NewXORChunk() c := chunkenc.NewXORChunk()
ca, _ := c.Appender() ca, _ := c.Appender()
for _, s := range s { for _, s := range s {
ca.Append(s.t, s.v) ca.Append(s.t, s.v)
} }
return ChunkMeta{ return chunks.Meta{
MinTime: mint, MinTime: mint,
MaxTime: maxt, MaxTime: maxt,
Chunk: c, Chunk: c,
@ -941,7 +944,7 @@ func TestSeriesIterator(t *testing.T) {
t.Run("Chunk", func(t *testing.T) { t.Run("Chunk", func(t *testing.T) {
for _, tc := range itcases { for _, tc := range itcases {
chkMetas := []ChunkMeta{ chkMetas := []chunks.Meta{
chunkFromSamples(tc.a), chunkFromSamples(tc.a),
chunkFromSamples(tc.b), chunkFromSamples(tc.b),
chunkFromSamples(tc.c), chunkFromSamples(tc.c),
@ -1012,7 +1015,7 @@ func TestSeriesIterator(t *testing.T) {
seekcases2 := append(seekcases, extra...) seekcases2 := append(seekcases, extra...)
for _, tc := range seekcases2 { for _, tc := range seekcases2 {
chkMetas := []ChunkMeta{ chkMetas := []chunks.Meta{
chunkFromSamples(tc.a), chunkFromSamples(tc.a),
chunkFromSamples(tc.b), chunkFromSamples(tc.b),
chunkFromSamples(tc.c), chunkFromSamples(tc.c),
@ -1099,7 +1102,7 @@ func TestSeriesIterator(t *testing.T) {
// Regression for: https://github.com/prometheus/tsdb/pull/97 // Regression for: https://github.com/prometheus/tsdb/pull/97
func TestChunkSeriesIterator_DoubleSeek(t *testing.T) { func TestChunkSeriesIterator_DoubleSeek(t *testing.T) {
chkMetas := []ChunkMeta{ chkMetas := []chunks.Meta{
chunkFromSamples([]sample{}), chunkFromSamples([]sample{}),
chunkFromSamples([]sample{{1, 1}, {2, 2}, {3, 3}}), chunkFromSamples([]sample{{1, 1}, {2, 2}, {3, 3}}),
chunkFromSamples([]sample{{4, 4}, {5, 5}}), chunkFromSamples([]sample{{4, 4}, {5, 5}}),
@ -1116,7 +1119,7 @@ func TestChunkSeriesIterator_DoubleSeek(t *testing.T) {
// Regression when seeked chunks were still found via binary search and we always // Regression when seeked chunks were still found via binary search and we always
// skipped to the end when seeking a value in the current chunk. // skipped to the end when seeking a value in the current chunk.
func TestChunkSeriesIterator_SeekInCurrentChunk(t *testing.T) { func TestChunkSeriesIterator_SeekInCurrentChunk(t *testing.T) {
metas := []ChunkMeta{ metas := []chunks.Meta{
chunkFromSamples([]sample{}), chunkFromSamples([]sample{}),
chunkFromSamples([]sample{{1, 2}, {3, 4}, {5, 6}, {7, 8}}), chunkFromSamples([]sample{{1, 2}, {3, 4}, {5, 6}, {7, 8}}),
chunkFromSamples([]sample{}), chunkFromSamples([]sample{}),
@ -1138,7 +1141,7 @@ func TestChunkSeriesIterator_SeekInCurrentChunk(t *testing.T) {
// Regression when calling Next() with a time bounded to fit within two samples. // Regression when calling Next() with a time bounded to fit within two samples.
// Seek gets called and advances beyond the max time, which was just accepted as a valid sample. // Seek gets called and advances beyond the max time, which was just accepted as a valid sample.
func TestChunkSeriesIterator_NextWithMinTime(t *testing.T) { func TestChunkSeriesIterator_NextWithMinTime(t *testing.T) {
metas := []ChunkMeta{ metas := []chunks.Meta{
chunkFromSamples([]sample{{1, 6}, {5, 6}, {7, 8}}), chunkFromSamples([]sample{{1, 6}, {5, 6}, {7, 8}}),
} }
@ -1148,7 +1151,7 @@ func TestChunkSeriesIterator_NextWithMinTime(t *testing.T) {
func TestPopulatedCSReturnsValidChunkSlice(t *testing.T) { func TestPopulatedCSReturnsValidChunkSlice(t *testing.T) {
lbls := []labels.Labels{labels.New(labels.Label{"a", "b"})} lbls := []labels.Labels{labels.New(labels.Label{"a", "b"})}
chunkMetas := [][]ChunkMeta{ chunkMetas := [][]chunks.Meta{
{ {
{MinTime: 1, MaxTime: 2, Ref: 1}, {MinTime: 1, MaxTime: 2, Ref: 1},
{MinTime: 3, MaxTime: 4, Ref: 2}, {MinTime: 3, MaxTime: 4, Ref: 2},
@ -1157,10 +1160,10 @@ func TestPopulatedCSReturnsValidChunkSlice(t *testing.T) {
} }
cr := mockChunkReader( cr := mockChunkReader(
map[uint64]chunks.Chunk{ map[uint64]chunkenc.Chunk{
1: chunks.NewXORChunk(), 1: chunkenc.NewXORChunk(),
2: chunks.NewXORChunk(), 2: chunkenc.NewXORChunk(),
3: chunks.NewXORChunk(), 3: chunkenc.NewXORChunk(),
}, },
) )
@ -1180,7 +1183,7 @@ func TestPopulatedCSReturnsValidChunkSlice(t *testing.T) {
testutil.Assert(t, p.Next() == false, "") testutil.Assert(t, p.Next() == false, "")
// Test the case where 1 chunk could cause an unpopulated chunk to be returned. // Test the case where 1 chunk could cause an unpopulated chunk to be returned.
chunkMetas = [][]ChunkMeta{ chunkMetas = [][]chunks.Meta{
{ {
{MinTime: 1, MaxTime: 2, Ref: 1}, {MinTime: 1, MaxTime: 2, Ref: 1},
}, },
@ -1200,7 +1203,7 @@ func TestPopulatedCSReturnsValidChunkSlice(t *testing.T) {
type mockChunkSeriesSet struct { type mockChunkSeriesSet struct {
l []labels.Labels l []labels.Labels
cm [][]ChunkMeta cm [][]chunks.Meta
i int i int
} }
@ -1213,7 +1216,7 @@ func (m *mockChunkSeriesSet) Next() bool {
return m.i < len(m.l) return m.i < len(m.l)
} }
func (m *mockChunkSeriesSet) At() (labels.Labels, []ChunkMeta, Intervals) { func (m *mockChunkSeriesSet) At() (labels.Labels, []chunks.Meta, Intervals) {
return m.l[m.i], m.cm[m.i], nil return m.l[m.i], m.cm[m.i], nil
} }
@ -1279,3 +1282,198 @@ func BenchmarkMergedSeriesSet(b *testing.B) {
} }
} }
} }
type mockChunkReader map[uint64]chunkenc.Chunk
func (cr mockChunkReader) Chunk(id uint64) (chunkenc.Chunk, error) {
chk, ok := cr[id]
if ok {
return chk, nil
}
return nil, errors.New("Chunk with ref not found")
}
func (cr mockChunkReader) Close() error {
return nil
}
func TestDeletedIterator(t *testing.T) {
chk := chunks.NewXORChunk()
app, err := chk.Appender()
testutil.Ok(t, err)
// Insert random stuff from (0, 1000).
act := make([]sample, 1000)
for i := 0; i < 1000; i++ {
act[i].t = int64(i)
act[i].v = rand.Float64()
app.Append(act[i].t, act[i].v)
}
cases := []struct {
r Intervals
}{
{r: Intervals{{1, 20}}},
{r: Intervals{{1, 10}, {12, 20}, {21, 23}, {25, 30}}},
{r: Intervals{{1, 10}, {12, 20}, {20, 30}}},
{r: Intervals{{1, 10}, {12, 23}, {25, 30}}},
{r: Intervals{{1, 23}, {12, 20}, {25, 30}}},
{r: Intervals{{1, 23}, {12, 20}, {25, 3000}}},
{r: Intervals{{0, 2000}}},
{r: Intervals{{500, 2000}}},
{r: Intervals{{0, 200}}},
{r: Intervals{{1000, 20000}}},
}
for _, c := range cases {
i := int64(-1)
it := &deletedIterator{it: chk.Iterator(), intervals: c.r[:]}
ranges := c.r[:]
for it.Next() {
i++
for _, tr := range ranges {
if tr.inBounds(i) {
i = tr.Maxt + 1
ranges = ranges[1:]
}
}
testutil.Assert(t, i < 1000 == true, "")
ts, v := it.At()
testutil.Equals(t, act[i].t, ts)
testutil.Equals(t, act[i].v, v)
}
// There has been an extra call to Next().
i++
for _, tr := range ranges {
if tr.inBounds(i) {
i = tr.Maxt + 1
ranges = ranges[1:]
}
}
testutil.Assert(t, i < 1000 == false, "")
testutil.Ok(t, it.Err())
}
}
type series struct {
l labels.Labels
chunks []chunks.Meta
}
type mockIndex struct {
series map[uint64]series
labelIndex map[string][]string
postings map[labels.Label][]uint64
symbols map[string]struct{}
}
func newMockIndex() mockIndex {
ix := mockIndex{
series: make(map[uint64]series),
labelIndex: make(map[string][]string),
postings: make(map[labels.Label][]uint64),
symbols: make(map[string]struct{}),
}
return ix
}
func (m mockIndex) Symbols() (map[string]struct{}, error) {
return m.symbols, nil
}
func (m mockIndex) AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) error {
if _, ok := m.series[ref]; ok {
return errors.Errorf("series with reference %d already added", ref)
}
for _, lbl := range l {
m.symbols[lbl.Name] = struct{}{}
m.symbols[lbl.Value] = struct{}{}
}
s := series{l: l}
// Actual chunk data is not stored in the index.
for _, c := range chunks {
c.Chunk = nil
s.chunks = append(s.chunks, c)
}
m.series[ref] = s
return nil
}
func (m mockIndex) WriteLabelIndex(names []string, values []string) error {
// TODO support composite indexes
if len(names) != 1 {
return errors.New("composite indexes not supported yet")
}
sort.Strings(values)
m.labelIndex[names[0]] = values
return nil
}
func (m mockIndex) WritePostings(name, value string, it index.Postings) error {
l := labels.Label{Name: name, Value: value}
if _, ok := m.postings[l]; ok {
return errors.Errorf("postings for %s already added", l)
}
ep, err := index.ExpandPostings(it)
if err != nil {
return err
}
m.postings[l] = ep
return nil
}
func (m mockIndex) Close() error {
return nil
}
func (m mockIndex) LabelValues(names ...string) (index.StringTuples, error) {
// TODO support composite indexes
if len(names) != 1 {
return nil, errors.New("composite indexes not supported yet")
}
return index.NewStringTuples(m.labelIndex[names[0]], 1)
}
func (m mockIndex) Postings(name, value string) (index.Postings, error) {
l := labels.Label{Name: name, Value: value}
return index.NewListPostings(m.postings[l]), nil
}
func (m mockIndex) SortedPostings(p index.Postings) index.Postings {
ep, err := index.ExpandPostings(p)
if err != nil {
return index.ErrPostings(errors.Wrap(err, "expand postings"))
}
sort.Slice(ep, func(i, j int) bool {
return labels.Compare(m.series[ep[i]].l, m.series[ep[j]].l) < 0
})
return index.NewListPostings(ep)
}
func (m mockIndex) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
s, ok := m.series[ref]
if !ok {
return ErrNotFound
}
*lset = append((*lset)[:0], s.l...)
*chks = append((*chks)[:0], s.chunks...)
return nil
}
func (m mockIndex) LabelIndices() ([][]string, error) {
res := make([][]string, 0, len(m.labelIndex))
for k := range m.labelIndex {
res = append(res, []string{k})
}
return res, nil
}