diff --git a/block.go b/block.go index 48f768223..4ef58e913 100644 --- a/block.go +++ b/block.go @@ -23,10 +23,101 @@ import ( "github.com/oklog/ulid" "github.com/pkg/errors" + "github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/chunks" + "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" ) +// IndexWriter serializes the index for a block of series data. +// The methods must be called in the order they are specified in. +type IndexWriter interface { + // AddSymbols registers all string symbols that are encountered in series + // and other indices. + AddSymbols(sym map[string]struct{}) error + + // AddSeries populates the index writer with a series and its offsets + // of chunks that the index can reference. + // Implementations may require series to be insert in increasing order by + // their labels. + // The reference numbers are used to resolve entries in postings lists that + // are added later. + AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) error + + // WriteLabelIndex serializes an index from label names to values. + // The passed in values chained tuples of strings of the length of names. + WriteLabelIndex(names []string, values []string) error + + // WritePostings writes a postings list for a single label pair. + // The Postings here contain refs to the series that were added. + WritePostings(name, value string, it index.Postings) error + + // Close writes any finalization and closes the resources associated with + // the underlying writer. + Close() error +} + +// IndexReader provides reading access of serialized index data. +type IndexReader interface { + // Symbols returns a set of string symbols that may occur in series' labels + // and indices. + Symbols() (map[string]struct{}, error) + + // LabelValues returns the possible label values + LabelValues(names ...string) (index.StringTuples, error) + + // Postings returns the postings list iterator for the label pair. 
+ // The Postings here contain the offsets to the series inside the index. + // Found IDs are not strictly required to point to a valid Series, e.g. during + // background garbage collections. + Postings(name, value string) (index.Postings, error) + + // SortedPostings returns a postings list that is reordered to be sorted + // by the label set of the underlying series. + SortedPostings(index.Postings) index.Postings + + // Series populates the given labels and chunk metas for the series identified + // by the reference. + // Returns ErrNotFound if the ref does not resolve to a known series. + Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error + + // LabelIndices returns the label pairs for which indices exist. + LabelIndices() ([][]string, error) + + // Close released the underlying resources of the reader. + Close() error +} + +// StringTuples provides access to a sorted list of string tuples. +type StringTuples interface { + // Total number of tuples in the list. + Len() int + // At returns the tuple at position i. + At(i int) ([]string, error) +} + +// ChunkWriter serializes a time block of chunked series data. +type ChunkWriter interface { + // WriteChunks writes several chunks. The Chunk field of the ChunkMetas + // must be populated. + // After returning successfully, the Ref fields in the ChunkMetas + // are set and can be used to retrieve the chunks from the written data. + WriteChunks(chunks ...chunks.Meta) error + + // Close writes any required finalization and closes the resources + // associated with the underlying writer. + Close() error +} + +// ChunkReader provides reading access of serialized time series data. +type ChunkReader interface { + // Chunk returns the series data chunk with the given reference. + Chunk(ref uint64) (chunkenc.Chunk, error) + + // Close releases all underlying resources of the reader. + Close() error +} + // BlockReader provides reading access to a data block. 
type BlockReader interface { // Index returns an IndexReader over the block's data. @@ -91,8 +182,12 @@ type blockMeta struct { *BlockMeta } +const indexFilename = "index" const metaFilename = "meta.json" +func chunkDir(dir string) string { return filepath.Join(dir, "chunks") } +func walDir(dir string) string { return filepath.Join(dir, "wal") } + func readMetaFile(dir string) (*BlockMeta, error) { b, err := ioutil.ReadFile(filepath.Join(dir, metaFilename)) if err != nil { @@ -150,17 +245,17 @@ type Block struct { // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used // to instantiate chunk structs. -func OpenBlock(dir string, pool chunks.Pool) (*Block, error) { +func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) { meta, err := readMetaFile(dir) if err != nil { return nil, err } - cr, err := NewDirChunkReader(chunkDir(dir), pool) + cr, err := chunks.NewDirReader(chunkDir(dir), pool) if err != nil { return nil, err } - ir, err := NewFileIndexReader(filepath.Join(dir, "index")) + ir, err := index.NewFileReader(filepath.Join(dir, "index")) if err != nil { return nil, err } @@ -300,7 +395,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error { stones := memTombstones{} var lset labels.Labels - var chks []ChunkMeta + var chks []chunks.Meta Outer: for p.Next() { @@ -405,9 +500,6 @@ func (pb *Block) Snapshot(dir string) error { return nil } -func chunkDir(dir string) string { return filepath.Join(dir, "chunks") } -func walDir(dir string) string { return filepath.Join(dir, "wal") } - func clampInterval(a, b, mint, maxt int64) (int64, int64) { if a < mint { a = mint @@ -417,36 +509,3 @@ func clampInterval(a, b, mint, maxt int64) (int64, int64) { } return a, b } - -type mmapFile struct { - f *os.File - b []byte -} - -func openMmapFile(path string) (*mmapFile, error) { - f, err := os.Open(path) - if err != nil { - return nil, errors.Wrap(err, "try lock file") - } - info, err := f.Stat() - if err != nil { - 
return nil, errors.Wrap(err, "stat") - } - - b, err := mmap(f, int(info.Size())) - if err != nil { - return nil, errors.Wrap(err, "mmap") - } - - return &mmapFile{f: f, b: b}, nil -} - -func (f *mmapFile) Close() error { - err0 := munmap(f.b) - err1 := f.f.Close() - - if err0 != nil { - return err0 - } - return err1 -} diff --git a/block_test.go b/block_test.go index be6a88090..60d9fe54f 100644 --- a/block_test.go +++ b/block_test.go @@ -16,8 +16,10 @@ package tsdb import ( "io/ioutil" "os" + "path/filepath" "testing" + "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/testutil" ) @@ -42,9 +44,9 @@ func createEmptyBlock(t *testing.T, dir string) *Block { testutil.Ok(t, writeMetaFile(dir, &BlockMeta{})) - ir, err := newIndexWriter(dir) - testutil.Ok(t, err) - testutil.Ok(t, ir.Close()) + ir, err := index.NewWriter(filepath.Join(dir, indexFilename)) + Ok(t, err) + Ok(t, ir.Close()) testutil.Ok(t, os.MkdirAll(chunkDir(dir), 0777)) diff --git a/chunks/bstream.go b/chunkenc/bstream.go similarity index 99% rename from chunks/bstream.go rename to chunkenc/bstream.go index ed651db23..a352f078e 100644 --- a/chunks/bstream.go +++ b/chunkenc/bstream.go @@ -39,7 +39,7 @@ // OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -package chunks +package chunkenc import "io" diff --git a/chunks/chunk.go b/chunkenc/chunk.go similarity index 99% rename from chunks/chunk.go rename to chunkenc/chunk.go index 4d298041e..4c85fa054 100644 --- a/chunks/chunk.go +++ b/chunkenc/chunk.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package chunks +package chunkenc import ( "fmt" diff --git a/chunks/chunk_test.go b/chunkenc/chunk_test.go similarity index 99% rename from chunks/chunk_test.go rename to chunkenc/chunk_test.go index b03e52819..edabd749f 100644 --- a/chunks/chunk_test.go +++ b/chunkenc/chunk_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package chunks +package chunkenc import ( "fmt" diff --git a/chunks/xor.go b/chunkenc/xor.go similarity index 99% rename from chunks/xor.go rename to chunkenc/xor.go index dcee466f7..b3f5b5135 100644 --- a/chunks/xor.go +++ b/chunkenc/xor.go @@ -41,7 +41,7 @@ // OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -package chunks +package chunkenc import ( "encoding/binary" diff --git a/chunks.go b/chunks/chunks.go similarity index 63% rename from chunks.go rename to chunks/chunks.go index 30ab93f80..d07cf11a6 100644 --- a/chunks.go +++ b/chunks/chunks.go @@ -11,18 +11,22 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tsdb +package chunks import ( "bufio" "encoding/binary" "fmt" "hash" + "hash/crc32" "io" + "io/ioutil" "os" + "path/filepath" + "strconv" "github.com/pkg/errors" - "github.com/prometheus/tsdb/chunks" + "github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/fileutil" ) @@ -31,19 +35,19 @@ const ( MagicChunks = 0x85BD40DD ) -// ChunkMeta holds information about a chunk of data. -type ChunkMeta struct { +// Meta holds information about a chunk of data. +type Meta struct { // Ref and Chunk hold either a reference that can be used to retrieve // chunk data or the data itself. // Generally, only one of them is set. 
Ref uint64 - Chunk chunks.Chunk + Chunk chunkenc.Chunk MinTime, MaxTime int64 // time range the data covers } // writeHash writes the chunk encoding and raw data into the provided hash. -func (cm *ChunkMeta) writeHash(h hash.Hash) error { +func (cm *Meta) writeHash(h hash.Hash) error { if _, err := h.Write([]byte{byte(cm.Chunk.Encoding())}); err != nil { return err } @@ -53,62 +57,30 @@ func (cm *ChunkMeta) writeHash(h hash.Hash) error { return nil } -// deletedIterator wraps an Iterator and makes sure any deleted metrics are not -// returned. -type deletedIterator struct { - it chunks.Iterator +var ( + errInvalidSize = fmt.Errorf("invalid size") + errInvalidFlag = fmt.Errorf("invalid flag") + errInvalidChecksum = fmt.Errorf("invalid checksum") +) - intervals Intervals +// The table gets initialized with sync.Once but may still cause a race +// with any other use of the crc32 package anywhere. Thus we initialize it +// before. +var castagnoliTable *crc32.Table + +func init() { + castagnoliTable = crc32.MakeTable(crc32.Castagnoli) } -func (it *deletedIterator) At() (int64, float64) { - return it.it.At() +// newCRC32 initializes a CRC32 hash with a preconfigured polynomial, so the +// polynomial may be easily changed in one location at a later time, if necessary. +func newCRC32() hash.Hash32 { + return crc32.New(castagnoliTable) } -func (it *deletedIterator) Next() bool { -Outer: - for it.it.Next() { - ts, _ := it.it.At() - - for _, tr := range it.intervals { - if tr.inBounds(ts) { - continue Outer - } - - if ts > tr.Maxt { - it.intervals = it.intervals[1:] - continue - } - - return true - } - - return true - } - - return false -} - -func (it *deletedIterator) Err() error { - return it.it.Err() -} - -// ChunkWriter serializes a time block of chunked series data. -type ChunkWriter interface { - // WriteChunks writes several chunks. The Chunk field of the ChunkMetas - // must be populated. 
- // After returning successfully, the Ref fields in the ChunkMetas - // are set and can be used to retrieve the chunks from the written data. - WriteChunks(chunks ...ChunkMeta) error - - // Close writes any required finalization and closes the resources - // associated with the underlying writer. - Close() error -} - -// chunkWriter implements the ChunkWriter interface for the standard +// Writer implements the ChunkWriter interface for the standard // serialization format. -type chunkWriter struct { +type Writer struct { dirFile *os.File files []*os.File wbuf *bufio.Writer @@ -124,7 +96,8 @@ const ( chunksFormatV1 = 1 ) -func newChunkWriter(dir string) (*chunkWriter, error) { +// NewWriter returns a new writer against the given directory. +func NewWriter(dir string) (*Writer, error) { if err := os.MkdirAll(dir, 0777); err != nil { return nil, err } @@ -132,7 +105,7 @@ func newChunkWriter(dir string) (*chunkWriter, error) { if err != nil { return nil, err } - cw := &chunkWriter{ + cw := &Writer{ dirFile: dirFile, n: 0, crc32: newCRC32(), @@ -141,7 +114,7 @@ func newChunkWriter(dir string) (*chunkWriter, error) { return cw, nil } -func (w *chunkWriter) tail() *os.File { +func (w *Writer) tail() *os.File { if len(w.files) == 0 { return nil } @@ -150,7 +123,7 @@ func (w *chunkWriter) tail() *os.File { // finalizeTail writes all pending data to the current tail file, // truncates its size, and closes it. -func (w *chunkWriter) finalizeTail() error { +func (w *Writer) finalizeTail() error { tf := w.tail() if tf == nil { return nil @@ -174,7 +147,7 @@ func (w *chunkWriter) finalizeTail() error { return tf.Close() } -func (w *chunkWriter) cut() error { +func (w *Writer) cut() error { // Sync current tail to disk and close. 
if err := w.finalizeTail(); err != nil { return err @@ -216,13 +189,13 @@ func (w *chunkWriter) cut() error { return nil } -func (w *chunkWriter) write(b []byte) error { +func (w *Writer) write(b []byte) error { n, err := w.wbuf.Write(b) w.n += int64(n) return err } -func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error { +func (w *Writer) WriteChunks(chks ...Meta) error { // Calculate maximum space we need and cut a new segment in case // we don't fit into the current one. maxLen := int64(binary.MaxVarintLen32) // The number of chunks. @@ -272,11 +245,11 @@ func (w *chunkWriter) WriteChunks(chks ...ChunkMeta) error { return nil } -func (w *chunkWriter) seq() int { +func (w *Writer) seq() int { return len(w.files) - 1 } -func (w *chunkWriter) Close() error { +func (w *Writer) Close() error { if err := w.finalizeTail(); err != nil { return err } @@ -285,29 +258,40 @@ func (w *chunkWriter) Close() error { return w.dirFile.Close() } -// ChunkReader provides reading access of serialized time series data. -type ChunkReader interface { - // Chunk returns the series data chunk with the given reference. - Chunk(ref uint64) (chunks.Chunk, error) - - // Close releases all underlying resources of the reader. - Close() error +// ByteSlice abstracts a byte slice. +type ByteSlice interface { + Len() int + Range(start, end int) []byte } -// chunkReader implements a SeriesReader for a serialized byte stream +type realByteSlice []byte + +func (b realByteSlice) Len() int { + return len(b) +} + +func (b realByteSlice) Range(start, end int) []byte { + return b[start:end] +} + +func (b realByteSlice) Sub(start, end int) ByteSlice { + return b[start:end] +} + +// Reader implements a SeriesReader for a serialized byte stream // of series data. -type chunkReader struct { +type Reader struct { // The underlying bytes holding the encoded series data. bs []ByteSlice // Closers for resources behind the byte slices. 
cs []io.Closer - pool chunks.Pool + pool chunkenc.Pool } -func newChunkReader(bs []ByteSlice, cs []io.Closer, pool chunks.Pool) (*chunkReader, error) { - cr := chunkReader{pool: pool, bs: bs, cs: cs} +func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) { + cr := Reader{pool: pool, bs: bs, cs: cs} for i, b := range cr.bs { if b.Len() < 4 { @@ -321,44 +305,44 @@ func newChunkReader(bs []ByteSlice, cs []io.Closer, pool chunks.Pool) (*chunkRea return &cr, nil } -// NewChunkReader returns a new chunk reader against the given byte slices. -func NewChunkReader(bs []ByteSlice, pool chunks.Pool) (ChunkReader, error) { +// NewReader returns a new chunk reader against the given byte slices. +func NewReader(bs []ByteSlice, pool chunkenc.Pool) (*Reader, error) { if pool == nil { - pool = chunks.NewPool() + pool = chunkenc.NewPool() } - return newChunkReader(bs, nil, pool) + return newReader(bs, nil, pool) } -// NewDirChunkReader returns a new ChunkReader against sequentially numbered files in the +// NewDirReader returns a new Reader against sequentially numbered files in the // given directory. -func NewDirChunkReader(dir string, pool chunks.Pool) (ChunkReader, error) { +func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) { files, err := sequenceFiles(dir) if err != nil { return nil, err } if pool == nil { - pool = chunks.NewPool() + pool = chunkenc.NewPool() } var bs []ByteSlice var cs []io.Closer for _, fn := range files { - f, err := openMmapFile(fn) + f, err := fileutil.OpenMmapFile(fn) if err != nil { return nil, errors.Wrapf(err, "mmap files") } cs = append(cs, f) - bs = append(bs, realByteSlice(f.b)) + bs = append(bs, realByteSlice(f.Bytes())) } - return newChunkReader(bs, cs, pool) + return newReader(bs, cs, pool) } -func (s *chunkReader) Close() error { +func (s *Reader) Close() error { return closeAll(s.cs...) 
} -func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) { +func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) { var ( seq = int(ref >> 32) off = int((ref << 32) >> 32) @@ -381,5 +365,47 @@ func (s *chunkReader) Chunk(ref uint64) (chunks.Chunk, error) { } r = b.Range(off+n, off+n+int(l)) - return s.pool.Get(chunks.Encoding(r[0]), r[1:1+l]) + return s.pool.Get(chunkenc.Encoding(r[0]), r[1:1+l]) +} + +func nextSequenceFile(dir string) (string, int, error) { + names, err := fileutil.ReadDir(dir) + if err != nil { + return "", 0, err + } + + i := uint64(0) + for _, n := range names { + j, err := strconv.ParseUint(n, 10, 64) + if err != nil { + continue + } + i = j + } + return filepath.Join(dir, fmt.Sprintf("%0.6d", i+1)), int(i + 1), nil +} + +func sequenceFiles(dir string) ([]string, error) { + files, err := ioutil.ReadDir(dir) + if err != nil { + return nil, err + } + var res []string + + for _, fi := range files { + if _, err := strconv.ParseUint(fi.Name(), 10, 64); err != nil { + continue + } + res = append(res, filepath.Join(dir, fi.Name())) + } + return res, nil +} + +func closeAll(cs ...io.Closer) (err error) { + for _, c := range cs { + if e := c.Close(); e != nil { + err = e + } + } + return err } diff --git a/chunks_test.go b/chunks_test.go deleted file mode 100644 index 40a282814..000000000 --- a/chunks_test.go +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright 2017 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package tsdb - -import ( - "math/rand" - "testing" - - "github.com/pkg/errors" - "github.com/prometheus/tsdb/chunks" - "github.com/prometheus/tsdb/testutil" -) - -type mockChunkReader map[uint64]chunks.Chunk - -func (cr mockChunkReader) Chunk(ref uint64) (chunks.Chunk, error) { - chk, ok := cr[ref] - if ok { - return chk, nil - } - - return nil, errors.New("Chunk with ref not found") -} - -func (cr mockChunkReader) Close() error { - return nil -} - -func TestDeletedIterator(t *testing.T) { - chk := chunks.NewXORChunk() - app, err := chk.Appender() - testutil.Ok(t, err) - // Insert random stuff from (0, 1000). - act := make([]sample, 1000) - for i := 0; i < 1000; i++ { - act[i].t = int64(i) - act[i].v = rand.Float64() - app.Append(act[i].t, act[i].v) - } - - cases := []struct { - r Intervals - }{ - {r: Intervals{{1, 20}}}, - {r: Intervals{{1, 10}, {12, 20}, {21, 23}, {25, 30}}}, - {r: Intervals{{1, 10}, {12, 20}, {20, 30}}}, - {r: Intervals{{1, 10}, {12, 23}, {25, 30}}}, - {r: Intervals{{1, 23}, {12, 20}, {25, 30}}}, - {r: Intervals{{1, 23}, {12, 20}, {25, 3000}}}, - {r: Intervals{{0, 2000}}}, - {r: Intervals{{500, 2000}}}, - {r: Intervals{{0, 200}}}, - {r: Intervals{{1000, 20000}}}, - } - - for _, c := range cases { - i := int64(-1) - it := &deletedIterator{it: chk.Iterator(), intervals: c.r[:]} - ranges := c.r[:] - for it.Next() { - i++ - for _, tr := range ranges { - if tr.inBounds(i) { - i = tr.Maxt + 1 - ranges = ranges[1:] - } - } - - testutil.Assert(t, i < 1000 == true, "") - - ts, v := it.At() - testutil.Equals(t, act[i].t, ts) - testutil.Equals(t, act[i].v, v) - } - // There has been an extra call to Next(). 
- i++ - for _, tr := range ranges { - if tr.inBounds(i) { - i = tr.Maxt + 1 - ranges = ranges[1:] - } - } - - testutil.Assert(t, i < 1000 == false, "") - testutil.Ok(t, it.Err()) - } -} diff --git a/compact.go b/compact.go index 5bb328aa3..989756bdb 100644 --- a/compact.go +++ b/compact.go @@ -26,8 +26,10 @@ import ( "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/fileutil" + "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" ) @@ -65,7 +67,7 @@ type LeveledCompactor struct { metrics *compactorMetrics logger log.Logger ranges []int64 - chunkPool chunks.Pool + chunkPool chunkenc.Pool } type compactorMetrics struct { @@ -123,12 +125,12 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { } // NewLeveledCompactor returns a LeveledCompactor. -func NewLeveledCompactor(r prometheus.Registerer, l log.Logger, ranges []int64, pool chunks.Pool) (*LeveledCompactor, error) { +func NewLeveledCompactor(r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool) (*LeveledCompactor, error) { if len(ranges) == 0 { return nil, errors.Errorf("at least one range must be provided") } if pool == nil { - pool = chunks.NewPool() + pool = chunkenc.NewPool() } return &LeveledCompactor{ ranges: ranges, @@ -370,7 +372,7 @@ type instrumentedChunkWriter struct { trange prometheus.Histogram } -func (w *instrumentedChunkWriter) WriteChunks(chunks ...ChunkMeta) error { +func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error { for _, c := range chunks { w.size.Observe(float64(len(c.Chunk.Bytes()))) w.samples.Observe(float64(c.Chunk.NumSamples())) @@ -411,7 +413,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe // data of all blocks. 
var chunkw ChunkWriter - chunkw, err = newChunkWriter(chunkDir(tmp)) + chunkw, err = chunks.NewWriter(chunkDir(tmp)) if err != nil { return errors.Wrap(err, "open chunk writer") } @@ -425,7 +427,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe } } - indexw, err := newIndexWriter(tmp) + indexw, err := index.NewWriter(filepath.Join(tmp, indexFilename)) if err != nil { return errors.Wrap(err, "open index writer") } @@ -514,7 +516,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, allSymbols[s] = struct{}{} } - all, err := indexr.Postings(allPostingsKey.Name, allPostingsKey.Value) + all, err := indexr.Postings("", "") if err != nil { return err } @@ -534,7 +536,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, // We fully rebuild the postings list index from merged series. var ( - postings = newMemPostings() + postings = index.NewMemPostings() values = map[string]stringset{} i = uint64(0) ) @@ -558,7 +560,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, continue } - newChunk := chunks.NewXORChunk() + newChunk := chunkenc.NewXORChunk() app, err := newChunk.Appender() if err != nil { return err @@ -599,7 +601,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } valset.set(l.Value) } - postings.add(i, lset) + postings.Add(i, lset) i++ } @@ -619,8 +621,8 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } } - for _, l := range postings.sortedKeys() { - if err := indexw.WritePostings(l.Name, l.Value, postings.get(l.Name, l.Value)); err != nil { + for _, l := range postings.SortedKeys() { + if err := indexw.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value)); err != nil { return errors.Wrap(err, "write postings") } } @@ -628,18 +630,18 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } type compactionSeriesSet struct { - p 
Postings + p index.Postings index IndexReader chunks ChunkReader tombstones TombstoneReader l labels.Labels - c []ChunkMeta + c []chunks.Meta intervals Intervals err error } -func newCompactionSeriesSet(i IndexReader, c ChunkReader, t TombstoneReader, p Postings) *compactionSeriesSet { +func newCompactionSeriesSet(i IndexReader, c ChunkReader, t TombstoneReader, p index.Postings) *compactionSeriesSet { return &compactionSeriesSet{ index: i, chunks: c, @@ -667,7 +669,7 @@ func (c *compactionSeriesSet) Next() bool { // Remove completely deleted chunks. if len(c.intervals) > 0 { - chks := make([]ChunkMeta, 0, len(c.c)) + chks := make([]chunks.Meta, 0, len(c.c)) for _, chk := range c.c { if !(Interval{chk.MinTime, chk.MaxTime}.isSubrange(c.intervals)) { chks = append(chks, chk) @@ -697,7 +699,7 @@ func (c *compactionSeriesSet) Err() error { return c.p.Err() } -func (c *compactionSeriesSet) At() (labels.Labels, []ChunkMeta, Intervals) { +func (c *compactionSeriesSet) At() (labels.Labels, []chunks.Meta, Intervals) { return c.l, c.c, c.intervals } @@ -706,13 +708,13 @@ type compactionMerger struct { aok, bok bool l labels.Labels - c []ChunkMeta + c []chunks.Meta intervals Intervals } type compactionSeries struct { labels labels.Labels - chunks []*ChunkMeta + chunks []*chunks.Meta } func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) { @@ -747,7 +749,7 @@ func (c *compactionMerger) Next() bool { // While advancing child iterators the memory used for labels and chunks // may be reused. When picking a series we have to store the result. var lset labels.Labels - var chks []ChunkMeta + var chks []chunks.Meta d := c.compare() // Both sets contain the current series. Chain them into a single one. 
@@ -788,7 +790,7 @@ func (c *compactionMerger) Err() error { return c.b.Err() } -func (c *compactionMerger) At() (labels.Labels, []ChunkMeta, Intervals) { +func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, Intervals) { return c.l, c.c, c.intervals } diff --git a/db.go b/db.go index eeb81e8d6..522330b63 100644 --- a/db.go +++ b/db.go @@ -36,7 +36,7 @@ import ( "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/tsdb/chunks" + "github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" ) @@ -99,7 +99,7 @@ type DB struct { logger log.Logger metrics *dbMetrics opts *Options - chunkPool chunks.Pool + chunkPool chunkenc.Pool compactor Compactor // Mutex for that must be held when modifying the general block layout. @@ -185,7 +185,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db donec: make(chan struct{}), stopc: make(chan struct{}), compactionsEnabled: true, - chunkPool: chunks.NewPool(), + chunkPool: chunkenc.NewPool(), } db.metrics = newDBMetrics(db, r) diff --git a/encoding_helpers.go b/encoding_helpers.go index b55c7fda9..d0805fe07 100644 --- a/encoding_helpers.go +++ b/encoding_helpers.go @@ -5,8 +5,12 @@ import ( "hash" "hash/crc32" "unsafe" + + "github.com/pkg/errors" ) +var errInvalidSize = errors.New("invalid size") + // enbuf is a helper type to populate a byte slice with various types. 
type encbuf struct { b []byte diff --git a/fileutil/mmap.go b/fileutil/mmap.go new file mode 100644 index 000000000..a0c598254 --- /dev/null +++ b/fileutil/mmap.go @@ -0,0 +1,57 @@ +package fileutil + +import ( + "os" + + "github.com/pkg/errors" +) + +// MmapFile pairs a file opened for reading with the byte slice obtained by mapping it. +type MmapFile struct { + f *os.File + b []byte +} + +// OpenMmapFile opens the file at path and maps info.Size() bytes of it via mmap. +// The file handle is closed again when Stat or mmap fails, so no descriptor leaks. +func OpenMmapFile(path string) (*MmapFile, error) { + f, err := os.Open(path) + if err != nil { + return nil, errors.Wrap(err, "open file") + } + info, err := f.Stat() + if err != nil { + f.Close() + return nil, errors.Wrap(err, "stat") + } + + b, err := mmap(f, int(info.Size())) + if err != nil { + f.Close() + return nil, errors.Wrap(err, "mmap") + } + + return &MmapFile{f: f, b: b}, nil +} + +// Close unmaps the contents and closes the file. Both steps always run; +// the munmap error takes precedence when both fail. +func (f *MmapFile) Close() error { + err0 := munmap(f.b) + err1 := f.f.Close() + + if err0 != nil { + return err0 + } + return err1 +} + +// File returns the underlying file handle. +func (f *MmapFile) File() *os.File { + return f.f +} + +// Bytes returns the mapped contents of the file. +func (f *MmapFile) Bytes() []byte { + return f.b +} diff --git a/db_unix.go b/fileutil/mmap_unix.go similarity index 98% rename from db_unix.go rename to fileutil/mmap_unix.go index 02c411d7f..043f4d408 100644 --- a/db_unix.go +++ b/fileutil/mmap_unix.go @@ -13,7 +13,7 @@ // +build !windows,!plan9 -package tsdb +package fileutil import ( "os" diff --git a/db_windows.go b/fileutil/mmap_windows.go similarity index 98% rename from db_windows.go rename to fileutil/mmap_windows.go index 444bf4103..3bee807c2 100644 --- a/db_windows.go +++ b/fileutil/mmap_windows.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License.
-package tsdb +package fileutil import ( "os" diff --git a/head.go b/head.go index 2c1468aca..4652c9579 100644 --- a/head.go +++ b/head.go @@ -17,6 +17,7 @@ import ( "math" "runtime" "sort" + "strings" "sync" "sync/atomic" "time" @@ -25,7 +26,9 @@ import ( "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/chunks" + "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" ) @@ -64,7 +67,7 @@ type Head struct { symbols map[string]struct{} values map[string]stringset // label names to possible values - postings *memPostings // postings lists for terms + postings *index.MemPostings // postings lists for terms tombstones memTombstones } @@ -185,7 +188,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( series: newStripeSeries(), values: map[string]stringset{}, symbols: map[string]struct{}{}, - postings: newUnorderedMemPostings(), + postings: index.NewUnorderedMemPostings(), tombstones: memTombstones{}, } h.metrics = newHeadMetrics(h, r) @@ -226,7 +229,7 @@ func (h *Head) processWALSamples( // ReadWAL initializes the head by consuming the write ahead log. func (h *Head) ReadWAL() error { - defer h.postings.ensureOrder() + defer h.postings.EnsureOrder() r := h.wal.Reader() mint := h.MinTime() @@ -616,64 +619,14 @@ func (h *Head) gc() { h.metrics.chunksRemoved.Add(float64(chunksRemoved)) h.metrics.chunks.Sub(float64(chunksRemoved)) - // Remove deleted series IDs from the postings lists. 
First do a collection - // run where we rebuild all postings that have something to delete - h.postings.mtx.RLock() - - type replEntry struct { - idx int - l []uint64 - } - collected := map[labels.Label]replEntry{} - - for t, p := range h.postings.m { - repl := replEntry{idx: len(p)} - - for i, id := range p { - if _, ok := deleted[id]; ok { - // First ID that got deleted, initialize replacement with - // all remaining IDs so far. - if repl.l == nil { - repl.l = make([]uint64, 0, len(p)) - repl.l = append(repl.l, p[:i]...) - } - continue - } - // Only add to the replacement once we know we have to do it. - if repl.l != nil { - repl.l = append(repl.l, id) - } - } - if repl.l != nil { - collected[t] = repl - } - } - - h.postings.mtx.RUnlock() - - // Replace all postings that have changed. Append all IDs that may have - // been added while we switched locks. - h.postings.mtx.Lock() - - for t, repl := range collected { - l := append(repl.l, h.postings.m[t][repl.idx:]...) - - if len(l) > 0 { - h.postings.m[t] = l - } else { - delete(h.postings.m, t) - } - } - - h.postings.mtx.Unlock() + // Remove deleted series IDs from the postings lists. + h.postings.Delete(deleted) // Rebuild symbols and label value indices from what is left in the postings terms. - h.postings.mtx.RLock() - symbols := make(map[string]struct{}) values := make(map[string]stringset, len(h.values)) - for t := range h.postings.m { + h.postings.Iter(func(t labels.Label, _ index.Postings) error { symbols[t.Name] = struct{}{} symbols[t.Value] = struct{}{} @@ -683,9 +636,8 @@ func (h *Head) gc() { values[t.Name] = ss } ss.set(t.Value) - } - - h.postings.mtx.RUnlock() + return nil + }) h.symMtx.Lock() @@ -765,7 +717,7 @@ func unpackChunkID(id uint64) (seriesID, chunkID uint64) { } // Chunk returns the chunk for the reference number. 
-func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) { +func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) { sid, cid := unpackChunkID(ref) s := h.head.series.getByID(sid) @@ -798,12 +750,12 @@ func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) { } type safeChunk struct { - chunks.Chunk + chunkenc.Chunk s *memSeries cid int } -func (c *safeChunk) Iterator() chunks.Iterator { +func (c *safeChunk) Iterator() chunkenc.Iterator { c.s.Lock() it := c.s.iterator(c.cid) c.s.Unlock() @@ -836,7 +788,7 @@ func (h *headIndexReader) Symbols() (map[string]struct{}, error) { } // LabelValues returns the possible label values -func (h *headIndexReader) LabelValues(names ...string) (StringTuples, error) { +func (h *headIndexReader) LabelValues(names ...string) (index.StringTuples, error) { if len(names) != 1 { return nil, errInvalidSize } @@ -850,22 +802,22 @@ func (h *headIndexReader) LabelValues(names ...string) (StringTuples, error) { } sort.Strings(sl) - return &stringTuples{l: len(names), s: sl}, nil + return index.NewStringTuples(sl, len(names)) } // Postings returns the postings list iterator for the label pair. 
-func (h *headIndexReader) Postings(name, value string) (Postings, error) { - return h.head.postings.get(name, value), nil +func (h *headIndexReader) Postings(name, value string) (index.Postings, error) { + return h.head.postings.Get(name, value), nil } -func (h *headIndexReader) SortedPostings(p Postings) Postings { +func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings { ep := make([]uint64, 0, 128) for p.Next() { ep = append(ep, p.At()) } if err := p.Err(); err != nil { - return errPostings{err: errors.Wrap(err, "expand postings")} + return index.ErrPostings(errors.Wrap(err, "expand postings")) } sort.Slice(ep, func(i, j int) bool { @@ -878,11 +830,11 @@ func (h *headIndexReader) SortedPostings(p Postings) Postings { } return labels.Compare(a.lset, b.lset) < 0 }) - return newListPostings(ep) + return index.NewListPostings(ep) } // Series returns the series for the given reference. -func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error { +func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks.Meta) error { s := h.head.series.getByID(ref) if s == nil { @@ -901,7 +853,7 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkM if !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) { continue } - *chks = append(*chks, ChunkMeta{ + *chks = append(*chks, chunks.Meta{ MinTime: c.minTime, MaxTime: c.maxTime, Ref: packChunkID(s.ref, uint64(s.chunkID(i))), @@ -949,7 +901,7 @@ func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSerie h.metrics.series.Inc() h.metrics.seriesCreated.Inc() - h.postings.add(id, lset) + h.postings.Add(id, lset) h.symMtx.Lock() defer h.symMtx.Unlock() @@ -1154,7 +1106,7 @@ type memSeries struct { lastValue float64 sampleBuf [4]sample - app chunks.Appender // Current appender for the chunk. + app chunkenc.Appender // Current appender for the chunk. 
} func (s *memSeries) minTime() int64 { @@ -1167,7 +1119,7 @@ func (s *memSeries) maxTime() int64 { func (s *memSeries) cut(mint int64) *memChunk { c := &memChunk{ - chunk: chunks.NewXORChunk(), + chunk: chunkenc.NewXORChunk(), minTime: mint, maxTime: math.MinInt64, } @@ -1295,13 +1247,13 @@ func computeChunkEndTime(start, cur, max int64) int64 { return start + (max-start)/a } -func (s *memSeries) iterator(id int) chunks.Iterator { +func (s *memSeries) iterator(id int) chunkenc.Iterator { c := s.chunk(id) // TODO(fabxc): Work around! A querier may have retrieved a pointer to a series' chunk, // which got then garbage collected before it got accessed. // We must ensure to not garbage collect as long as any readers still hold a reference. if c == nil { - return chunks.NewNopIterator() + return chunkenc.NewNopIterator() } if id-s.firstChunkID < len(s.chunks)-1 { @@ -1326,12 +1278,12 @@ func (s *memSeries) head() *memChunk { } type memChunk struct { - chunk chunks.Chunk + chunk chunkenc.Chunk minTime, maxTime int64 } type memSafeIterator struct { - chunks.Iterator + chunkenc.Iterator i int total int @@ -1356,3 +1308,27 @@ func (it *memSafeIterator) At() (int64, float64) { s := it.buf[4-(it.total-it.i)] return s.t, s.v } + +type stringset map[string]struct{} + +func (ss stringset) set(s string) { + ss[s] = struct{}{} +} + +func (ss stringset) has(s string) bool { + _, ok := ss[s] + return ok +} + +func (ss stringset) String() string { + return strings.Join(ss.slice(), ",") +} + +func (ss stringset) slice() []string { + slice := make([]string, 0, len(ss)) + for k := range ss { + slice = append(slice, k) + } + sort.Strings(slice) + return slice +} diff --git a/head_test.go b/head_test.go index 4cc675442..b88f34a1e 100644 --- a/head_test.go +++ b/head_test.go @@ -22,7 +22,8 @@ import ( "testing" "github.com/pkg/errors" - "github.com/prometheus/tsdb/chunks" + "github.com/prometheus/tsdb/chunkenc" + "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" 
"github.com/prometheus/tsdb/testutil" ) @@ -150,7 +151,7 @@ func TestHead_ReadWAL(t *testing.T) { testutil.Equals(t, labels.FromStrings("a", "4"), s50.lset) testutil.Equals(t, labels.FromStrings("a", "3"), s100.lset) - expandChunk := func(c chunks.Iterator) (x []sample) { + expandChunk := func(c chunkenc.Iterator) (x []sample) { for c.Next() { t, v := c.At() x = append(x, sample{t: t, v: v}) @@ -210,12 +211,12 @@ func TestHead_Truncate(t *testing.T) { testutil.Assert(t, h.series.getByID(s3.ref) == nil, "") testutil.Assert(t, h.series.getByID(s4.ref) == nil, "") - postingsA1, _ := expandPostings(h.postings.get("a", "1")) - postingsA2, _ := expandPostings(h.postings.get("a", "2")) - postingsB1, _ := expandPostings(h.postings.get("b", "1")) - postingsB2, _ := expandPostings(h.postings.get("b", "2")) - postingsC1, _ := expandPostings(h.postings.get("c", "1")) - postingsAll, _ := expandPostings(h.postings.get("", "")) + postingsA1, _ := index.ExpandPostings(h.postings.Get("a", "1")) + postingsA2, _ := index.ExpandPostings(h.postings.Get("a", "2")) + postingsB1, _ := index.ExpandPostings(h.postings.Get("b", "1")) + postingsB2, _ := index.ExpandPostings(h.postings.Get("b", "2")) + postingsC1, _ := index.ExpandPostings(h.postings.Get("c", "1")) + postingsAll, _ := index.ExpandPostings(h.postings.Get("", "")) testutil.Equals(t, []uint64{s1.ref}, postingsA1) testutil.Equals(t, []uint64{s2.ref}, postingsA2) diff --git a/index/encoding_helpers.go b/index/encoding_helpers.go new file mode 100644 index 000000000..69e729791 --- /dev/null +++ b/index/encoding_helpers.go @@ -0,0 +1,179 @@ +package index + +import ( + "encoding/binary" + "hash" + "hash/crc32" + "unsafe" +) + +// enbuf is a helper type to populate a byte slice with various types. 
+type encbuf struct { + b []byte + c [binary.MaxVarintLen64]byte +} + +func (e *encbuf) reset() { e.b = e.b[:0] } +func (e *encbuf) get() []byte { return e.b } +func (e *encbuf) len() int { return len(e.b) } + +func (e *encbuf) putString(s string) { e.b = append(e.b, s...) } +func (e *encbuf) putBytes(b []byte) { e.b = append(e.b, b...) } +func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) } + +func (e *encbuf) putBE32int(x int) { e.putBE32(uint32(x)) } +func (e *encbuf) putBE64int(x int) { e.putBE64(uint64(x)) } +func (e *encbuf) putBE64int64(x int64) { e.putBE64(uint64(x)) } +func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) } +func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) } + +func (e *encbuf) putBE32(x uint32) { + binary.BigEndian.PutUint32(e.c[:], x) + e.b = append(e.b, e.c[:4]...) +} + +func (e *encbuf) putBE64(x uint64) { + binary.BigEndian.PutUint64(e.c[:], x) + e.b = append(e.b, e.c[:8]...) +} + +func (e *encbuf) putUvarint64(x uint64) { + n := binary.PutUvarint(e.c[:], x) + e.b = append(e.b, e.c[:n]...) +} + +func (e *encbuf) putVarint64(x int64) { + n := binary.PutVarint(e.c[:], x) + e.b = append(e.b, e.c[:n]...) +} + +// putUvarintStr writes a string to the buffer prefixed by its uvarint length (in bytes!). +func (e *encbuf) putUvarintStr(s string) { + b := *(*[]byte)(unsafe.Pointer(&s)) + e.putUvarint(len(b)) + e.putString(s) +} + +// putHash appends a hash over the buffer's current contents to the buffer. +func (e *encbuf) putHash(h hash.Hash) { + h.Reset() + _, err := h.Write(e.b) + if err != nil { + panic(err) // The CRC32 implementation does not error + } + e.b = h.Sum(e.b) +} + +// decbuf provides safe methods to extract data from a byte slice. It does all +// necessary bounds checking and advancing of the byte slice. +// Several datums can be extracted without checking for errors. However, before using +// any datum, the err() method must be checked.
+type decbuf struct { + b []byte + e error +} + +func (d *decbuf) uvarint() int { return int(d.uvarint64()) } +func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) } +func (d *decbuf) be32int() int { return int(d.be32()) } +func (d *decbuf) be64int64() int64 { return int64(d.be64()) } + +// crc32 returns a CRC32 checksum over the remaining bytes. +func (d *decbuf) crc32() uint32 { + return crc32.Checksum(d.b, castagnoliTable) +} + +func (d *decbuf) uvarintStr() string { + l := d.uvarint64() + if d.e != nil { + return "" + } + if uint64(len(d.b)) < l { // compare unsigned: int(l) could wrap negative and bypass the check + d.e = errInvalidSize + return "" + } + s := string(d.b[:l]) + d.b = d.b[l:] + return s +} + +func (d *decbuf) varint64() int64 { + if d.e != nil { + return 0 + } + x, n := binary.Varint(d.b) + if n < 1 { + d.e = errInvalidSize + return 0 + } + d.b = d.b[n:] + return x +} + +func (d *decbuf) uvarint64() uint64 { + if d.e != nil { + return 0 + } + x, n := binary.Uvarint(d.b) + if n < 1 { + d.e = errInvalidSize + return 0 + } + d.b = d.b[n:] + return x +} + +func (d *decbuf) be64() uint64 { + if d.e != nil { + return 0 + } + if len(d.b) < 8 { // Uint64 below reads 8 bytes; fewer must yield an error, not a panic + d.e = errInvalidSize + return 0 + } + x := binary.BigEndian.Uint64(d.b) + d.b = d.b[8:] + return x +} + +func (d *decbuf) be32() uint32 { + if d.e != nil { + return 0 + } + if len(d.b) < 4 { + d.e = errInvalidSize + return 0 + } + x := binary.BigEndian.Uint32(d.b) + d.b = d.b[4:] + return x +} + +func (d *decbuf) byte() byte { + if d.e != nil { + return 0 + } + if len(d.b) < 1 { + d.e = errInvalidSize + return 0 + } + x := d.b[0] + d.b = d.b[1:] + return x +} + +func (d *decbuf) decbuf(l int) decbuf { + if d.e != nil { + return decbuf{e: d.e} + } + if l > len(d.b) { + return decbuf{e: errInvalidSize} + } + r := decbuf{b: d.b[:l]} + d.b = d.b[l:] + return r +} + +func (d *decbuf) err() error { return d.e } +func (d *decbuf) len() int { return len(d.b) } +func (d *decbuf) get() []byte { return d.b } diff --git a/index.go b/index/index.go similarity index 79% rename from
index.go rename to index/index.go index 6895c16f4..dc84386bd 100644 --- a/index.go +++ b/index/index.go @@ -11,13 +11,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tsdb +package index import ( "bufio" "encoding/binary" "fmt" "hash" + "hash/crc32" "io" "math" "os" @@ -26,6 +27,7 @@ import ( "strings" "github.com/pkg/errors" + "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/fileutil" "github.com/prometheus/tsdb/labels" ) @@ -35,18 +37,12 @@ const ( MagicIndex = 0xBAAAD700 indexFormatV1 = 1 - - size_unit = 4 ) -const indexFilename = "index" - -const compactionPageBytes = minSectorSize * 64 - type indexWriterSeries struct { labels labels.Labels - chunks []ChunkMeta // series file offset of chunks - offset uint32 // index file offset of series reference + chunks []chunks.Meta // series file offset of chunks + offset uint32 // index file offset of series reference } type indexWriterSeriesSlice []*indexWriterSeries @@ -87,37 +83,24 @@ func (s indexWriterStage) String() string { return "" } -// IndexWriter serializes the index for a block of series data. -// The methods must be called in the order they are specified in. -type IndexWriter interface { - // AddSymbols registers all string symbols that are encountered in series - // and other indices. - AddSymbols(sym map[string]struct{}) error +// The table gets initialized with sync.Once but may still cause a race +// with any other use of the crc32 package anywhere. Thus we initialize it +// before. +var castagnoliTable *crc32.Table - // AddSeries populates the index writer with a series and its offsets - // of chunks that the index can reference. - // Implementations may require series to be insert in increasing order by - // their labels. - // The reference numbers are used to resolve entries in postings lists that - // are added later. 
- AddSeries(ref uint64, l labels.Labels, chunks ...ChunkMeta) error +func init() { + castagnoliTable = crc32.MakeTable(crc32.Castagnoli) +} - // WriteLabelIndex serializes an index from label names to values. - // The passed in values chained tuples of strings of the length of names. - WriteLabelIndex(names []string, values []string) error - - // WritePostings writes a postings list for a single label pair. - // The Postings here contain refs to the series that were added. - WritePostings(name, value string, it Postings) error - - // Close writes any finalization and closes the resources associated with - // the underlying writer. - Close() error +// newCRC32 initializes a CRC32 hash with a preconfigured polynomial, so the +// polynomial may be easily changed in one location at a later time, if necessary. +func newCRC32() hash.Hash32 { + return crc32.New(castagnoliTable) } // indexWriter implements the IndexWriter interface for the standard // serialization format. -type indexWriter struct { +type Writer struct { f *os.File fbuf *bufio.Writer pos uint64 @@ -150,14 +133,17 @@ type indexTOC struct { postingsTable uint64 } -func newIndexWriter(dir string) (*indexWriter, error) { +// NewWriter returns a new Writer to the given filename. 
+func NewWriter(fn string) (*Writer, error) { + dir := filepath.Dir(fn) + df, err := fileutil.OpenDir(dir) if err != nil { return nil, err } defer df.Close() // close for flatform windows - f, err := os.OpenFile(filepath.Join(dir, indexFilename), os.O_CREATE|os.O_WRONLY, 0666) + f, err := os.OpenFile(fn, os.O_CREATE|os.O_WRONLY, 0666) if err != nil { return nil, err } @@ -165,7 +151,7 @@ func newIndexWriter(dir string) (*indexWriter, error) { return nil, errors.Wrap(err, "sync dir") } - iw := &indexWriter{ + iw := &Writer{ f: f, fbuf: bufio.NewWriterSize(f, 1<<22), pos: 0, @@ -187,7 +173,7 @@ func newIndexWriter(dir string) (*indexWriter, error) { return iw, nil } -func (w *indexWriter) write(bufs ...[]byte) error { +func (w *Writer) write(bufs ...[]byte) error { for _, b := range bufs { n, err := w.fbuf.Write(b) w.pos += uint64(n) @@ -206,18 +192,18 @@ func (w *indexWriter) write(bufs ...[]byte) error { } // addPadding adds zero byte padding until the file size is a multiple size_unit. -func (w *indexWriter) addPadding() error { - p := w.pos % size_unit +func (w *Writer) addPadding(size int) error { + p := w.pos % uint64(size) if p == 0 { return nil } - p = size_unit - p + p = uint64(size) - p return errors.Wrap(w.write(make([]byte, p)), "add padding") } // ensureStage handles transitions between write stages and ensures that IndexWriter // methods are called in an order valid for the implementation. 
-func (w *indexWriter) ensureStage(s indexWriterStage) error { +func (w *Writer) ensureStage(s indexWriterStage) error { if w.stage == s { return nil } @@ -256,7 +242,7 @@ func (w *indexWriter) ensureStage(s indexWriterStage) error { return nil } -func (w *indexWriter) writeMeta() error { +func (w *Writer) writeMeta() error { w.buf1.reset() w.buf1.putBE32(MagicIndex) w.buf1.putByte(indexFormatV1) @@ -264,7 +250,7 @@ func (w *indexWriter) writeMeta() error { return w.write(w.buf1.get()) } -func (w *indexWriter) AddSeries(ref uint64, lset labels.Labels, chunks ...ChunkMeta) error { +func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta) error { if err := w.ensureStage(idxStageSeries); err != nil { return err } @@ -328,7 +314,7 @@ func (w *indexWriter) AddSeries(ref uint64, lset labels.Labels, chunks ...ChunkM return nil } -func (w *indexWriter) AddSymbols(sym map[string]struct{}) error { +func (w *Writer) AddSymbols(sym map[string]struct{}) error { if err := w.ensureStage(idxStageSymbols); err != nil { return err } @@ -361,7 +347,7 @@ func (w *indexWriter) AddSymbols(sym map[string]struct{}) error { return errors.Wrap(err, "write symbols") } -func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { +func (w *Writer) WriteLabelIndex(names []string, values []string) error { if len(values)%len(names) != 0 { return errors.Errorf("invalid value list length %d for %d names", len(values), len(names)) } @@ -369,14 +355,14 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { return errors.Wrap(err, "ensure stage") } - valt, err := newStringTuples(values, len(names)) + valt, err := NewStringTuples(values, len(names)) if err != nil { return err } sort.Sort(valt) // Align beginning to 4 bytes for more efficient index list scans. 
- if err := w.addPadding(); err != nil { + if err := w.addPadding(4); err != nil { return err } @@ -407,7 +393,7 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { } // writeOffsetTable writes a sequence of readable hash entries. -func (w *indexWriter) writeOffsetTable(entries []hashEntry) error { +func (w *Writer) writeOffsetTable(entries []hashEntry) error { w.buf2.reset() w.buf2.putBE32int(len(entries)) @@ -428,7 +414,7 @@ func (w *indexWriter) writeOffsetTable(entries []hashEntry) error { const indexTOCLen = 6*8 + 4 -func (w *indexWriter) writeTOC() error { +func (w *Writer) writeTOC() error { w.buf1.reset() w.buf1.putBE64(w.toc.symbols) @@ -443,13 +429,13 @@ func (w *indexWriter) writeTOC() error { return w.write(w.buf1.get()) } -func (w *indexWriter) WritePostings(name, value string, it Postings) error { +func (w *Writer) WritePostings(name, value string, it Postings) error { if err := w.ensureStage(idxStagePostings); err != nil { return errors.Wrap(err, "ensure stage") } // Align beginning to 4 bytes for more efficient postings list scans. - if err := w.addPadding(); err != nil { + if err := w.addPadding(4); err != nil { return err } @@ -506,7 +492,7 @@ type hashEntry struct { offset uint64 } -func (w *indexWriter) Close() error { +func (w *Writer) Close() error { if err := w.ensureStage(idxStageDone); err != nil { return err } @@ -519,37 +505,6 @@ func (w *indexWriter) Close() error { return w.f.Close() } -// IndexReader provides reading access of serialized index data. -type IndexReader interface { - // Symbols returns a set of string symbols that may occur in series' labels - // and indices. - Symbols() (map[string]struct{}, error) - - // LabelValues returns the possible label values - LabelValues(names ...string) (StringTuples, error) - - // Postings returns the postings list iterator for the label pair. - // The Postings here contain the offsets to the series inside the index. 
- // Found IDs are not strictly required to point to a valid Series, e.g. during - // background garbage collections. - Postings(name, value string) (Postings, error) - - // SortedPostings returns a postings list that is reordered to be sorted - // by the label set of the underlying series. - SortedPostings(Postings) Postings - - // Series populates the given labels and chunk metas for the series identified - // by the reference. - // Returns ErrNotFound if the ref does not resolve to a known series. - Series(ref uint64, lset *labels.Labels, chks *[]ChunkMeta) error - - // LabelIndices returns the label pairs for which indices exist. - LabelIndices() ([][]string, error) - - // Close released the underlying resources of the reader. - Close() error -} - // StringTuples provides access to a sorted list of string tuples. type StringTuples interface { // Total number of tuples in the list. @@ -558,7 +513,7 @@ type StringTuples interface { At(i int) ([]string, error) } -type indexReader struct { +type Reader struct { // The underlying byte slice holding the encoded series data. b ByteSlice toc indexTOC @@ -605,22 +560,22 @@ func (b realByteSlice) Sub(start, end int) ByteSlice { return b[start:end] } -// NewIndexReader returns a new IndexReader on the given byte slice. -func NewIndexReader(b ByteSlice) (IndexReader, error) { - return newIndexReader(b, nil) +// NewReader returns a new IndexReader on the given byte slice. +func NewReader(b ByteSlice) (*Reader, error) { + return newReader(b, nil) } -// NewFileIndexReader returns a new index reader against the given index file. -func NewFileIndexReader(path string) (IndexReader, error) { - f, err := openMmapFile(path) +// NewFileReader returns a new index reader against the given index file. 
+func NewFileReader(path string) (*Reader, error) { + f, err := fileutil.OpenMmapFile(path) if err != nil { return nil, err } - return newIndexReader(realByteSlice(f.b), f) + return newReader(realByteSlice(f.Bytes()), f) } -func newIndexReader(b ByteSlice, c io.Closer) (*indexReader, error) { - r := &indexReader{ +func newReader(b ByteSlice, c io.Closer) (*Reader, error) { + r := &Reader{ b: b, c: c, symbols: map[uint32]string{}, @@ -650,7 +605,7 @@ func newIndexReader(b ByteSlice, c io.Closer) (*indexReader, error) { return r, errors.Wrap(err, "read postings table") } -func (r *indexReader) readTOC() error { +func (r *Reader) readTOC() error { if r.b.Len() < indexTOCLen { return errInvalidSize } @@ -676,7 +631,7 @@ func (r *indexReader) readTOC() error { // decbufAt returns a new decoding buffer. It expects the first 4 bytes // after offset to hold the big endian encoded content length, followed by the contents and the expected // checksum. -func (r *indexReader) decbufAt(off int) decbuf { +func (r *Reader) decbufAt(off int) decbuf { if r.b.Len() < off+4 { return decbuf{e: errInvalidSize} } @@ -700,7 +655,7 @@ func (r *indexReader) decbufAt(off int) decbuf { // decbufUvarintAt returns a new decoding buffer. It expects the first bytes // after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected // checksum. -func (r *indexReader) decbufUvarintAt(off int) decbuf { +func (r *Reader) decbufUvarintAt(off int) decbuf { // We never have to access this method at the far end of the byte slice. Thus just checking // against the MaxVarintLen32 is sufficient. if r.b.Len() < off+binary.MaxVarintLen32 { @@ -730,7 +685,7 @@ func (r *indexReader) decbufUvarintAt(off int) decbuf { // readSymbols reads the symbol table fully into memory and allocates proper strings for them. // Strings backed by the mmap'd memory would cause memory faults if applications keep using them // after the reader is closed. 
-func (r *indexReader) readSymbols(off int) error { +func (r *Reader) readSymbols(off int) error { if off == 0 { return nil } @@ -754,7 +709,7 @@ func (r *indexReader) readSymbols(off int) error { // readOffsetTable reads an offset table at the given position and returns a map // with the key strings concatenated by the 0xff unicode non-character. -func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) { +func (r *Reader) readOffsetTable(off uint64) (map[string]uint32, error) { const sep = "\xff" d := r.decbufAt(int(off)) @@ -776,11 +731,11 @@ func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) { return res, d.err() } -func (r *indexReader) Close() error { +func (r *Reader) Close() error { return r.c.Close() } -func (r *indexReader) lookupSymbol(o uint32) (string, error) { +func (r *Reader) lookupSymbol(o uint32) (string, error) { s, ok := r.symbols[o] if !ok { return "", errors.Errorf("unknown symbol offset %d", o) @@ -788,7 +743,7 @@ func (r *indexReader) lookupSymbol(o uint32) (string, error) { return s, nil } -func (r *indexReader) Symbols() (map[string]struct{}, error) { +func (r *Reader) Symbols() (map[string]struct{}, error) { res := make(map[string]struct{}, len(r.symbols)) for _, s := range r.symbols { @@ -797,7 +752,7 @@ func (r *indexReader) Symbols() (map[string]struct{}, error) { return res, nil } -func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { +func (r *Reader) LabelValues(names ...string) (StringTuples, error) { const sep = "\xff" key := strings.Join(names, sep) @@ -830,7 +785,7 @@ type emptyStringTuples struct{} func (emptyStringTuples) At(i int) ([]string, error) { return nil, nil } func (emptyStringTuples) Len() int { return 0 } -func (r *indexReader) LabelIndices() ([][]string, error) { +func (r *Reader) LabelIndices() ([][]string, error) { const sep = "\xff" res := [][]string{} @@ -841,7 +796,7 @@ func (r *indexReader) LabelIndices() ([][]string, error) { return res, nil 
} -func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error { +func (r *Reader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks.Meta) error { d := r.decbufUvarintAt(int(ref)) *lbls = (*lbls)[:0] @@ -880,7 +835,7 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) maxt := int64(d.uvarint64()) + t0 ref0 := int64(d.uvarint64()) - *chks = append(*chks, ChunkMeta{ + *chks = append(*chks, chunks.Meta{ Ref: uint64(ref0), MinTime: t0, MaxTime: maxt, @@ -898,7 +853,7 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) return errors.Wrapf(d.err(), "read meta for chunk %d", i) } - *chks = append(*chks, ChunkMeta{ + *chks = append(*chks, chunks.Meta{ Ref: uint64(ref0), MinTime: mint, MaxTime: maxt, @@ -907,7 +862,7 @@ func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) return d.err() } -func (r *indexReader) Postings(name, value string) (Postings, error) { +func (r *Reader) Postings(name, value string) (Postings, error) { const sep = "\xff" key := strings.Join([]string{name, value}, sep) @@ -921,7 +876,7 @@ func (r *indexReader) Postings(name, value string) (Postings, error) { return newBigEndianPostings(d.get()), errors.Wrap(d.err(), "get postings bytes") } -func (r *indexReader) SortedPostings(p Postings) Postings { +func (r *Reader) SortedPostings(p Postings) Postings { return p } @@ -930,7 +885,7 @@ type stringTuples struct { s []string // flattened tuple entries } -func newStringTuples(s []string, l int) (*stringTuples, error) { +func NewStringTuples(s []string, l int) (*stringTuples, error) { if len(s)%l != 0 { return nil, errors.Wrap(errInvalidSize, "string tuple list") } diff --git a/index_test.go b/index/index_test.go similarity index 77% rename from index_test.go rename to index/index_test.go index 4c272f897..f4ba813f3 100644 --- a/index_test.go +++ b/index/index_test.go @@ -11,7 +11,7 @@ // See the License for the specific language 
governing permissions and // limitations under the License. -package tsdb +package index import ( "io/ioutil" @@ -20,8 +20,12 @@ import ( "path/filepath" "sort" "testing" + "unsafe" "github.com/pkg/errors" + promlabels "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/textparse" + "github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" @@ -29,13 +33,13 @@ import ( type series struct { l labels.Labels - chunks []ChunkMeta + chunks []chunks.Meta } type mockIndex struct { series map[uint64]series labelIndex map[string][]string - postings *memPostings + postings map[labels.Label][]uint64 symbols map[string]struct{} } @@ -43,11 +47,9 @@ func newMockIndex() mockIndex { ix := mockIndex{ series: make(map[uint64]series), labelIndex: make(map[string][]string), - postings: newMemPostings(), + postings: make(map[labels.Label][]uint64), symbols: make(map[string]struct{}), } - ix.postings.ensureOrder() - return ix } @@ -55,7 +57,7 @@ func (m mockIndex) Symbols() (map[string]struct{}, error) { return m.symbols, nil } -func (m mockIndex) AddSeries(ref uint64, l labels.Labels, chunks ...ChunkMeta) error { +func (m mockIndex) AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) error { if _, ok := m.series[ref]; ok { return errors.Errorf("series with reference %d already added", ref) } @@ -80,23 +82,22 @@ func (m mockIndex) WriteLabelIndex(names []string, values []string) error { if len(names) != 1 { return errors.New("composite indexes not supported yet") } - sort.Strings(values) m.labelIndex[names[0]] = values return nil } func (m mockIndex) WritePostings(name, value string, it Postings) error { - if _, ok := m.postings.m[labels.Label{name, value}]; ok { - return errors.Errorf("postings for %s=%q already added", name, value) + l := labels.Label{Name: name, Value: value} + if _, ok := m.postings[l]; ok { + return errors.Errorf("postings for %s 
already added", l) } - ep, err := expandPostings(it) + ep, err := ExpandPostings(it) if err != nil { return err } - m.postings.m[labels.Label{name, value}] = ep - - return it.Err() + m.postings[l] = ep + return nil } func (m mockIndex) Close() error { @@ -109,29 +110,30 @@ func (m mockIndex) LabelValues(names ...string) (StringTuples, error) { return nil, errors.New("composite indexes not supported yet") } - return newStringTuples(m.labelIndex[names[0]], 1) + return NewStringTuples(m.labelIndex[names[0]], 1) } func (m mockIndex) Postings(name, value string) (Postings, error) { - return m.postings.get(name, value), nil + l := labels.Label{Name: name, Value: value} + return NewListPostings(m.postings[l]), nil } func (m mockIndex) SortedPostings(p Postings) Postings { - ep, err := expandPostings(p) + ep, err := ExpandPostings(p) if err != nil { - return errPostings{err: errors.Wrap(err, "expand postings")} + return ErrPostings(errors.Wrap(err, "expand postings")) } sort.Slice(ep, func(i, j int) bool { return labels.Compare(m.series[ep[i]].l, m.series[ep[j]].l) < 0 }) - return newListPostings(ep) + return NewListPostings(ep) } -func (m mockIndex) Series(ref uint64, lset *labels.Labels, chks *[]ChunkMeta) error { +func (m mockIndex) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error { s, ok := m.series[ref] if !ok { - return ErrNotFound + return errors.New("not found") } *lset = append((*lset)[:0], s.l...) *chks = append((*chks)[:0], s.chunks...) @@ -154,22 +156,24 @@ func TestIndexRW_Create_Open(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) + fn := filepath.Join(dir, "index") + // An empty index must still result in a readable file. - iw, err := newIndexWriter(dir) + iw, err := NewWriter(fn) testutil.Ok(t, err) testutil.Ok(t, iw.Close()) - ir, err := NewFileIndexReader(filepath.Join(dir, "index")) + ir, err := NewFileReader(fn) testutil.Ok(t, err) testutil.Ok(t, ir.Close()) // Modify magic header must cause open to fail. 
- f, err := os.OpenFile(filepath.Join(dir, "index"), os.O_WRONLY, 0666) + f, err := os.OpenFile(fn, os.O_WRONLY, 0666) testutil.Ok(t, err) _, err = f.WriteAt([]byte{0, 0}, 0) testutil.Ok(t, err) - _, err = NewFileIndexReader(dir) + _, err = NewFileReader(dir) testutil.NotOk(t, err) } @@ -178,7 +182,9 @@ func TestIndexRW_Postings(t *testing.T) { testutil.Ok(t, err) defer os.RemoveAll(dir) - iw, err := newIndexWriter(dir) + fn := filepath.Join(dir, "index") + + iw, err := NewWriter(fn) testutil.Ok(t, err) series := []labels.Labels{ @@ -210,14 +216,14 @@ func TestIndexRW_Postings(t *testing.T) { testutil.Ok(t, iw.Close()) - ir, err := NewFileIndexReader(filepath.Join(dir, "index")) + ir, err := NewFileReader(fn) testutil.Ok(t, err) p, err := ir.Postings("a", "1") testutil.Ok(t, err) var l labels.Labels - var c []ChunkMeta + var c []chunks.Meta for i := 0; p.Next(); i++ { err := ir.Series(p.At(), &l, &c) @@ -254,14 +260,14 @@ func TestPersistence_index_e2e(t *testing.T) { // Generate ChunkMetas for every label set. for i, lset := range lbls { - var metas []ChunkMeta + var metas []chunks.Meta for j := 0; j <= (i % 20); j++ { - metas = append(metas, ChunkMeta{ + metas = append(metas, chunks.Meta{ MinTime: int64(j * 10000), MaxTime: int64((j + 1) * 10000), Ref: rand.Uint64(), - Chunk: chunks.NewXORChunk(), + Chunk: chunkenc.NewXORChunk(), }) } input = append(input, &indexWriterSeries{ @@ -270,17 +276,16 @@ func TestPersistence_index_e2e(t *testing.T) { }) } - iw, err := newIndexWriter(dir) + iw, err := NewWriter(filepath.Join(dir, "index")) testutil.Ok(t, err) testutil.Ok(t, iw.AddSymbols(symbols)) // Population procedure as done by compaction. 
var ( - postings = newMemPostings() - values = map[string]stringset{} + postings = NewMemPostings() + values = map[string]map[string]struct{}{} ) - postings.ensureOrder() mi := newMockIndex() @@ -292,12 +297,12 @@ func TestPersistence_index_e2e(t *testing.T) { for _, l := range s.labels { valset, ok := values[l.Name] if !ok { - valset = stringset{} + valset = map[string]struct{}{} values[l.Name] = valset } - valset.set(l.Value) + valset[l.Value] = struct{}{} } - postings.add(uint64(i), s.labels) + postings.Add(uint64(i), s.labels) i++ } @@ -328,14 +333,14 @@ func TestPersistence_index_e2e(t *testing.T) { ir, err := NewFileIndexReader(filepath.Join(dir, "index")) testutil.Ok(t, err) - for p := range mi.postings.m { + for p := range mi.postings { gotp, err := ir.Postings(p.Name, p.Value) testutil.Ok(t, err) expp, err := mi.Postings(p.Name, p.Value) var lset, explset labels.Labels - var chks, expchks []ChunkMeta + var chks, expchks []chunks.Meta for gotp.Next() { testutil.Assert(t, expp.Next() == true, "") @@ -374,3 +379,41 @@ func TestPersistence_index_e2e(t *testing.T) { testutil.Ok(t, ir.Close()) } + +func readPrometheusLabels(fn string, n int) ([]labels.Labels, error) { + f, err := os.Open(fn) + if err != nil { + return nil, err + } + defer f.Close() + + b, err := ioutil.ReadAll(f) + if err != nil { + return nil, err + } + + p := textparse.New(b) + i := 0 + var mets []labels.Labels + hashes := map[uint64]struct{}{} + + for p.Next() && i < n { + m := make(labels.Labels, 0, 10) + p.Metric((*promlabels.Labels)(unsafe.Pointer(&m))) + + h := m.Hash() + if _, ok := hashes[h]; ok { + continue + } + mets = append(mets, m) + hashes[h] = struct{}{} + i++ + } + if err := p.Err(); err != nil { + return nil, err + } + if i != n { + return mets, errors.Errorf("requested %d metrics but found %d", n, i) + } + return mets, nil +} diff --git a/postings.go b/index/postings.go similarity index 78% rename from postings.go rename to index/postings.go index 200917e13..bc63b4dac 100644 
--- a/postings.go +++ b/index/postings.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package tsdb +package index import ( "encoding/binary" @@ -23,35 +23,35 @@ import ( "github.com/prometheus/tsdb/labels" ) -// memPostings holds postings list for series ID per label pair. They may be written +// MemPostings holds postings list for series ID per label pair. They may be written // to out of order. // ensureOrder() must be called once before any reads are done. This allows for quick // unordered batch fills on startup. -type memPostings struct { +type MemPostings struct { mtx sync.RWMutex m map[labels.Label][]uint64 ordered bool } -// newMemPoistings returns a memPostings that's ready for reads and writes. -func newMemPostings() *memPostings { - return &memPostings{ +// NewMemPostings returns a memPostings that's ready for reads and writes. +func NewMemPostings() *MemPostings { + return &MemPostings{ m: make(map[labels.Label][]uint64, 512), ordered: true, } } -// newUnorderedMemPostings returns a memPostings that is not safe to be read from +// NewUnorderedMemPostings returns a memPostings that is not safe to be read from // until ensureOrder was called once. -func newUnorderedMemPostings() *memPostings { - return &memPostings{ +func NewUnorderedMemPostings() *MemPostings { + return &MemPostings{ m: make(map[labels.Label][]uint64, 512), ordered: false, } } -// sortedKeys returns a list of sorted label keys of the postings. -func (p *memPostings) sortedKeys() []labels.Label { +// SortedKeys returns a list of sorted label keys of the postings. +func (p *MemPostings) SortedKeys() []labels.Label { p.mtx.RLock() keys := make([]labels.Label, 0, len(p.m)) @@ -69,23 +69,28 @@ func (p *memPostings) sortedKeys() []labels.Label { return keys } -// Postings returns an iterator over the postings list for s. 
-func (p *memPostings) get(name, value string) Postings { +// Get returns a postings list for the given label pair. +func (p *MemPostings) Get(name, value string) Postings { p.mtx.RLock() l := p.m[labels.Label{Name: name, Value: value}] p.mtx.RUnlock() if l == nil { - return emptyPostings + return EmptyPostings() } return newListPostings(l) } +// All returns a postings list over all documents ever added. +func (p *MemPostings) All() Postings { + return p.Get(allPostingsKey.Name, allPostingsKey.Value) +} + var allPostingsKey = labels.Label{} -// ensurePostings ensures that all postings lists are sorted. After it returns all further +// EnsureOrder ensures that all postings lists are sorted. After it returns all further // calls to add and addFor will insert new IDs in a sorted manner. -func (p *memPostings) ensureOrder() { +func (p *MemPostings) EnsureOrder() { p.mtx.Lock() defer p.mtx.Unlock() @@ -117,9 +122,61 @@ func (p *memPostings) ensureOrder() { p.ordered = true } -// add adds a document to the index. The caller has to ensure that no -// term argument appears twice. -func (p *memPostings) add(id uint64, lset labels.Labels) { +// Delete removes all ids in the given map from the postings lists. +func (p *MemPostings) Delete(deleted map[uint64]struct{}) { + var keys []labels.Label + + p.mtx.RLock() + for l := range p.m { + keys = append(keys, l) + } + p.mtx.RUnlock() + + for _, l := range keys { + p.mtx.Lock() + + found := false + for _, id := range p.m[l] { + if _, ok := deleted[id]; ok { + found = true + break + } + } + if !found { + p.mtx.Unlock() + continue + } + repl := make([]uint64, 0, len(p.m[l])) + + for _, id := range p.m[l] { + if _, ok := deleted[id]; !ok { + repl = append(repl, id) + } + } + if len(repl) > 0 { + p.m[l] = repl + } else { + delete(p.m, l) + } + p.mtx.Unlock() + } +} + +// Iter calls f for each postings list. It aborts if f returns an error and returns it. 
+func (p *MemPostings) Iter(f func(labels.Label, Postings) error) error { + p.mtx.RLock() + defer p.mtx.RUnlock() + + for l, p := range p.m { + if err := f(l, newListPostings(p)); err != nil { + return err + } + } + return nil +} + +// Add a label set to the postings index. +func (p *MemPostings) Add(id uint64, lset labels.Labels) { p.mtx.Lock() for _, l := range lset { @@ -130,7 +187,7 @@ func (p *memPostings) add(id uint64, lset labels.Labels) { p.mtx.Unlock() } -func (p *memPostings) addFor(id uint64, l labels.Label) { +func (p *MemPostings) addFor(id uint64, l labels.Label) { list := append(p.m[l], id) p.m[l] = list @@ -149,7 +206,8 @@ func (p *memPostings) addFor(id uint64, l labels.Label) { } } -func expandPostings(p Postings) (res []uint64, err error) { +// ExpandPostings returns the postings expanded as a slice. +func ExpandPostings(p Postings) (res []uint64, err error) { for p.Next() { res = append(res, p.At()) } @@ -189,6 +247,11 @@ func EmptyPostings() Postings { return emptyPostings } +// ErrPostings returns new postings that immediately error. +func ErrPostings(err error) Postings { + return errPostings{err} +} + // Intersect returns a new postings list over the intersection of the // input postings. 
func Intersect(its ...Postings) Postings { @@ -340,6 +403,12 @@ func (it *mergedPostings) Err() error { return it.b.Err() } +// Without returns a new postings list that contains all elements from the full list that +// are not in the drop list +func Without(full, drop Postings) Postings { + return newRemovedPostings(full, drop) +} + type removedPostings struct { full, remove Postings @@ -420,6 +489,10 @@ type listPostings struct { cur uint64 } +func NewListPostings(list []uint64) Postings { + return newListPostings(list) +} + func newListPostings(list []uint64) *listPostings { return &listPostings{list: list} } @@ -508,27 +581,3 @@ func (it *bigEndianPostings) Seek(x uint64) bool { func (it *bigEndianPostings) Err() error { return nil } - -type stringset map[string]struct{} - -func (ss stringset) set(s string) { - ss[s] = struct{}{} -} - -func (ss stringset) has(s string) bool { - _, ok := ss[s] - return ok -} - -func (ss stringset) String() string { - return strings.Join(ss.slice(), ",") -} - -func (ss stringset) slice() []string { - slice := make([]string, 0, len(ss)) - for k := range ss { - slice = append(slice, k) - } - sort.Strings(slice) - return slice -} diff --git a/postings_test.go b/index/postings_test.go similarity index 95% rename from postings_test.go rename to index/postings_test.go index abaf1b054..68553e357 100644 --- a/postings_test.go +++ b/index/postings_test.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package tsdb +package index import ( "encoding/binary" @@ -25,7 +25,7 @@ import ( ) func TestMemPostings_addFor(t *testing.T) { - p := newMemPostings() + p := NewMemPostings() p.m[allPostingsKey] = []uint64{1, 2, 3, 4, 6, 7, 8} p.addFor(5, allPostingsKey) @@ -34,7 +34,7 @@ func TestMemPostings_addFor(t *testing.T) { } func TestMemPostings_ensureOrder(t *testing.T) { - p := newUnorderedMemPostings() + p := NewUnorderedMemPostings() for i := 0; i < 100; i++ { l := make([]uint64, 100) @@ -46,7 +46,7 @@ func TestMemPostings_ensureOrder(t *testing.T) { p.m[labels.Label{"a", v}] = l } - p.ensureOrder() + p.EnsureOrder() for _, l := range p.m { ok := sort.SliceIsSorted(l, func(i, j int) bool { @@ -100,7 +100,7 @@ func TestIntersect(t *testing.T) { a := newListPostings(c.a) b := newListPostings(c.b) - res, err := expandPostings(Intersect(a, b)) + res, err := ExpandPostings(Intersect(a, b)) testutil.Ok(t, err) testutil.Equals(t, c.res, res) } @@ -140,7 +140,7 @@ func TestMultiIntersect(t *testing.T) { ps = append(ps, newListPostings(postings)) } - res, err := expandPostings(Intersect(ps...)) + res, err := ExpandPostings(Intersect(ps...)) testutil.Ok(t, err) testutil.Equals(t, c.res, res) @@ -174,7 +174,7 @@ func BenchmarkIntersect(t *testing.B) { t.ResetTimer() for i := 0; i < t.N; i++ { - if _, err := expandPostings(Intersect(i1, i2, i3, i4)); err != nil { + if _, err := ExpandPostings(Intersect(i1, i2, i3, i4)); err != nil { t.Fatal(err) } } @@ -198,7 +198,7 @@ func TestMultiMerge(t *testing.T) { i2 := newListPostings(c.b) i3 := newListPostings(c.c) - res, err := expandPostings(Merge(i1, i2, i3)) + res, err := ExpandPostings(Merge(i1, i2, i3)) testutil.Ok(t, err) testutil.Equals(t, c.res, res) } @@ -230,7 +230,7 @@ func TestMergedPostings(t *testing.T) { a := newListPostings(c.a) b := newListPostings(c.b) - res, err := expandPostings(newMergedPostings(a, b)) + res, err := ExpandPostings(newMergedPostings(a, b)) testutil.Ok(t, err) testutil.Equals(t, c.res, res) } @@ 
-290,7 +290,7 @@ func TestMergedPostingsSeek(t *testing.T) { // After Seek(), At() should be called. if c.success { start := p.At() - lst, err := expandPostings(p) + lst, err := ExpandPostings(p) testutil.Ok(t, err) lst = append([]uint64{start}, lst...) @@ -347,7 +347,7 @@ func TestRemovedPostings(t *testing.T) { a := newListPostings(c.a) b := newListPostings(c.b) - res, err := expandPostings(newRemovedPostings(a, b)) + res, err := ExpandPostings(newRemovedPostings(a, b)) testutil.Ok(t, err) testutil.Equals(t, c.res, res) } @@ -431,7 +431,7 @@ func TestRemovedPostingsSeek(t *testing.T) { // After Seek(), At() should be called. if c.success { start := p.At() - lst, err := expandPostings(p) + lst, err := ExpandPostings(p) testutil.Ok(t, err) lst = append([]uint64{start}, lst...) @@ -527,7 +527,7 @@ func TestIntersectWithMerge(t *testing.T) { ) p := Intersect(a, b) - res, err := expandPostings(p) + res, err := ExpandPostings(p) testutil.Ok(t, err) testutil.Equals(t, []uint64{30}, res) diff --git a/querier.go b/querier.go index 145258b8e..48d7ba9c2 100644 --- a/querier.go +++ b/querier.go @@ -19,7 +19,9 @@ import ( "strings" "github.com/pkg/errors" + "github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/chunks" + "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" ) @@ -202,18 +204,17 @@ func (q *blockQuerier) Close() error { // PostingsForMatchers assembles a single postings iterator against the index reader // based on the given matchers. It returns a list of label names that must be manually // checked to not exist in series the postings list points to. 
-func PostingsForMatchers(index IndexReader, ms ...labels.Matcher) (Postings, error) { - var ( - its []Postings - ) +func PostingsForMatchers(ix IndexReader, ms ...labels.Matcher) (index.Postings, error) { + var its []index.Postings + for _, m := range ms { - it, err := postingsForMatcher(index, m) + it, err := postingsForMatcher(ix, m) if err != nil { return nil, err } its = append(its, it) } - return index.SortedPostings(Intersect(its...)), nil + return ix.SortedPostings(index.Intersect(its...)), nil } // tuplesByPrefix uses binary search to find prefix matches within ts. @@ -247,24 +248,24 @@ func tuplesByPrefix(m *labels.PrefixMatcher, ts StringTuples) ([]string, error) return matches, nil } -func postingsForMatcher(index IndexReader, m labels.Matcher) (Postings, error) { +func postingsForMatcher(ix IndexReader, m labels.Matcher) (index.Postings, error) { // If the matcher selects an empty value, it selects all the series which dont // have the label name set too. See: https://github.com/prometheus/prometheus/issues/3575 // and https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555 if m.Matches("") { - return postingsForUnsetLabelMatcher(index, m) + return postingsForUnsetLabelMatcher(ix, m) } // Fast-path for equal matching. 
if em, ok := m.(*labels.EqualMatcher); ok { - it, err := index.Postings(em.Name(), em.Value()) + it, err := ix.Postings(em.Name(), em.Value()) if err != nil { return nil, err } return it, nil } - tpls, err := index.LabelValues(m.Name()) + tpls, err := ix.LabelValues(m.Name()) if err != nil { return nil, err } @@ -289,24 +290,24 @@ func postingsForMatcher(index IndexReader, m labels.Matcher) (Postings, error) { } if len(res) == 0 { - return EmptyPostings(), nil + return index.EmptyPostings(), nil } - var rit []Postings + var rit []index.Postings for _, v := range res { - it, err := index.Postings(m.Name(), v) + it, err := ix.Postings(m.Name(), v) if err != nil { return nil, err } rit = append(rit, it) } - return Merge(rit...), nil + return index.Merge(rit...), nil } -func postingsForUnsetLabelMatcher(index IndexReader, m labels.Matcher) (Postings, error) { - tpls, err := index.LabelValues(m.Name()) +func postingsForUnsetLabelMatcher(ix IndexReader, m labels.Matcher) (index.Postings, error) { + tpls, err := ix.LabelValues(m.Name()) if err != nil { return nil, err } @@ -323,23 +324,22 @@ func postingsForUnsetLabelMatcher(index IndexReader, m labels.Matcher) (Postings } } - var rit []Postings + var rit []index.Postings for _, v := range res { - it, err := index.Postings(m.Name(), v) + it, err := ix.Postings(m.Name(), v) if err != nil { return nil, err } rit = append(rit, it) } - mrit := Merge(rit...) - allPostings, err := index.Postings(allPostingsKey.Name, allPostingsKey.Value) + allPostings, err := ix.Postings("", "") if err != nil { return nil, err } - return newRemovedPostings(allPostings, mrit), nil + return index.Without(allPostings, index.Merge(rit...)), nil } func mergeStrings(a, b []string) []string { @@ -458,19 +458,19 @@ func (s *mergedSeriesSet) Next() bool { // actual series itself. 
type ChunkSeriesSet interface { Next() bool - At() (labels.Labels, []ChunkMeta, Intervals) + At() (labels.Labels, []chunks.Meta, Intervals) Err() error } // baseChunkSeries loads the label set and chunk references for a postings // list from an index. It filters out series that have labels set that should be unset. type baseChunkSeries struct { - p Postings + p index.Postings index IndexReader tombstones TombstoneReader lset labels.Labels - chks []ChunkMeta + chks []chunks.Meta intervals Intervals err error } @@ -492,7 +492,7 @@ func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) }, nil } -func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta, Intervals) { +func (s *baseChunkSeries) At() (labels.Labels, []chunks.Meta, Intervals) { return s.lset, s.chks, s.intervals } @@ -500,14 +500,14 @@ func (s *baseChunkSeries) Err() error { return s.err } func (s *baseChunkSeries) Next() bool { var ( - lset labels.Labels - chunks []ChunkMeta - err error + lset labels.Labels + chkMetas []chunks.Meta + err error ) for s.p.Next() { ref := s.p.At() - if err := s.index.Series(ref, &lset, &chunks); err != nil { + if err := s.index.Series(ref, &lset, &chkMetas); err != nil { // Postings may be stale. Skip if no underlying series exists. if errors.Cause(err) == ErrNotFound { continue @@ -517,7 +517,7 @@ func (s *baseChunkSeries) Next() bool { } s.lset = lset - s.chks = chunks + s.chks = chkMetas s.intervals, err = s.tombstones.Get(s.p.At()) if err != nil { s.err = errors.Wrap(err, "get tombstones") @@ -526,7 +526,7 @@ func (s *baseChunkSeries) Next() bool { if len(s.intervals) > 0 { // Only those chunks that are not entirely deleted. 
- chks := make([]ChunkMeta, 0, len(s.chks)) + chks := make([]chunks.Meta, 0, len(s.chks)) for _, chk := range s.chks { if !(Interval{chk.MinTime, chk.MaxTime}.isSubrange(s.intervals)) { chks = append(chks, chk) @@ -553,12 +553,12 @@ type populatedChunkSeries struct { mint, maxt int64 err error - chks []ChunkMeta + chks []chunks.Meta lset labels.Labels intervals Intervals } -func (s *populatedChunkSeries) At() (labels.Labels, []ChunkMeta, Intervals) { +func (s *populatedChunkSeries) At() (labels.Labels, []chunks.Meta, Intervals) { return s.lset, s.chks, s.intervals } @@ -651,7 +651,7 @@ func (s *blockSeriesSet) Err() error { return s.err } // time series data. type chunkSeries struct { labels labels.Labels - chunks []ChunkMeta // in-order chunk refs + chunks []chunks.Meta // in-order chunk refs mint, maxt int64 @@ -754,17 +754,17 @@ func (it *chainedSeriesIterator) Err() error { // chunkSeriesIterator implements a series iterator on top // of a list of time-sorted, non-overlapping chunks. type chunkSeriesIterator struct { - chunks []ChunkMeta + chunks []chunks.Meta i int - cur chunks.Iterator + cur chunkenc.Iterator maxt, mint int64 intervals Intervals } -func newChunkSeriesIterator(cs []ChunkMeta, dranges Intervals, mint, maxt int64) *chunkSeriesIterator { +func newChunkSeriesIterator(cs []chunks.Meta, dranges Intervals, mint, maxt int64) *chunkSeriesIterator { it := cs[0].Chunk.Iterator() if len(dranges) > 0 { @@ -853,6 +853,46 @@ func (it *chunkSeriesIterator) Err() error { return it.cur.Err() } +// deletedIterator wraps an Iterator and makes sure any deleted metrics are not +// returned. 
+type deletedIterator struct { + it chunkenc.Iterator + + intervals Intervals +} + +func (it *deletedIterator) At() (int64, float64) { + return it.it.At() +} + +func (it *deletedIterator) Next() bool { +Outer: + for it.it.Next() { + ts, _ := it.it.At() + + for _, tr := range it.intervals { + if tr.inBounds(ts) { + continue Outer + } + + if ts > tr.Maxt { + it.intervals = it.intervals[1:] + continue + } + + return true + } + + return true + } + + return false +} + +func (it *deletedIterator) Err() error { + return it.it.Err() +} + type mockSeriesSet struct { next func() bool series func() Series diff --git a/querier_test.go b/querier_test.go index fee93d9af..2eb34f450 100644 --- a/querier_test.go +++ b/querier_test.go @@ -20,7 +20,10 @@ import ( "sort" "testing" + "github.com/pkg/errors" + "github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/chunks" + "github.com/prometheus/tsdb/index" "github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/testutil" ) @@ -228,25 +231,25 @@ func createIdxChkReaders(tc []struct { return labels.Compare(labels.FromMap(tc[i].lset), labels.FromMap(tc[i].lset)) < 0 }) - postings := newMemPostings() - chkReader := mockChunkReader(make(map[uint64]chunks.Chunk)) + postings := index.NewMemPostings() + chkReader := mockChunkReader(make(map[uint64]chunkenc.Chunk)) lblIdx := make(map[string]stringset) mi := newMockIndex() for i, s := range tc { i = i + 1 // 0 is not a valid posting. - metas := make([]ChunkMeta, 0, len(s.chunks)) + metas := make([]chunks.Meta, 0, len(s.chunks)) for _, chk := range s.chunks { // Collisions can be there, but for tests, its fine. 
ref := rand.Uint64() - metas = append(metas, ChunkMeta{ + metas = append(metas, chunks.Meta{ MinTime: chk[0].t, MaxTime: chk[len(chk)-1].t, Ref: ref, }) - chunk := chunks.NewXORChunk() + chunk := chunkenc.NewXORChunk() app, _ := chunk.Appender() for _, smpl := range chk { app.Append(smpl.t, smpl.v) @@ -257,7 +260,7 @@ func createIdxChkReaders(tc []struct { ls := labels.FromMap(s.lset) mi.AddSeries(uint64(i), ls, metas...) - postings.add(uint64(i), ls) + postings.Add(uint64(i), ls) for _, l := range ls { vs, present := lblIdx[l.Name] @@ -273,9 +276,9 @@ func createIdxChkReaders(tc []struct { mi.WriteLabelIndex([]string{l}, vs.slice()) } - for l := range postings.m { - mi.WritePostings(l.Name, l.Value, postings.get(l.Name, l.Value)) - } + postings.Iter(func(l labels.Label, p index.Postings) error { + return mi.WritePostings(l.Name, l.Value, p) + }) return mi, chkReader } @@ -660,7 +663,7 @@ Outer: func TestBaseChunkSeries(t *testing.T) { type refdSeries struct { lset labels.Labels - chunks []ChunkMeta + chunks []chunks.Meta ref uint64 } @@ -676,7 +679,7 @@ func TestBaseChunkSeries(t *testing.T) { series: []refdSeries{ { lset: labels.New([]labels.Label{{"a", "a"}}...), - chunks: []ChunkMeta{ + chunks: []chunks.Meta{ {Ref: 29}, {Ref: 45}, {Ref: 245}, {Ref: 123}, {Ref: 4232}, {Ref: 5344}, {Ref: 121}, }, @@ -684,19 +687,19 @@ func TestBaseChunkSeries(t *testing.T) { }, { lset: labels.New([]labels.Label{{"a", "a"}, {"b", "b"}}...), - chunks: []ChunkMeta{ + chunks: []chunks.Meta{ {Ref: 82}, {Ref: 23}, {Ref: 234}, {Ref: 65}, {Ref: 26}, }, ref: 10, }, { lset: labels.New([]labels.Label{{"b", "c"}}...), - chunks: []ChunkMeta{{Ref: 8282}}, + chunks: []chunks.Meta{{Ref: 8282}}, ref: 1, }, { lset: labels.New([]labels.Label{{"b", "b"}}...), - chunks: []ChunkMeta{ + chunks: []chunks.Meta{ {Ref: 829}, {Ref: 239}, {Ref: 2349}, {Ref: 659}, {Ref: 269}, }, ref: 108, @@ -709,14 +712,14 @@ func TestBaseChunkSeries(t *testing.T) { series: []refdSeries{ { lset: 
labels.New([]labels.Label{{"a", "a"}, {"b", "b"}}...), - chunks: []ChunkMeta{ + chunks: []chunks.Meta{ {Ref: 82}, {Ref: 23}, {Ref: 234}, {Ref: 65}, {Ref: 26}, }, ref: 10, }, { lset: labels.New([]labels.Label{{"b", "c"}}...), - chunks: []ChunkMeta{{Ref: 8282}}, + chunks: []chunks.Meta{{Ref: 8282}}, ref: 3, }, }, @@ -732,7 +735,7 @@ func TestBaseChunkSeries(t *testing.T) { } bcs := &baseChunkSeries{ - p: newListPostings(tc.postings), + p: index.NewListPostings(tc.postings), index: mi, tombstones: EmptyTombstoneReader(), } @@ -763,20 +766,20 @@ type itSeries struct { func (s itSeries) Iterator() SeriesIterator { return s.si } func (s itSeries) Labels() labels.Labels { return labels.Labels{} } -func chunkFromSamples(s []sample) ChunkMeta { +func chunkFromSamples(s []sample) chunks.Meta { mint, maxt := int64(0), int64(0) if len(s) > 0 { mint, maxt = s[0].t, s[len(s)-1].t } - c := chunks.NewXORChunk() + c := chunkenc.NewXORChunk() ca, _ := c.Appender() for _, s := range s { ca.Append(s.t, s.v) } - return ChunkMeta{ + return chunks.Meta{ MinTime: mint, MaxTime: maxt, Chunk: c, @@ -941,7 +944,7 @@ func TestSeriesIterator(t *testing.T) { t.Run("Chunk", func(t *testing.T) { for _, tc := range itcases { - chkMetas := []ChunkMeta{ + chkMetas := []chunks.Meta{ chunkFromSamples(tc.a), chunkFromSamples(tc.b), chunkFromSamples(tc.c), @@ -1012,7 +1015,7 @@ func TestSeriesIterator(t *testing.T) { seekcases2 := append(seekcases, extra...) 
for _, tc := range seekcases2 { - chkMetas := []ChunkMeta{ + chkMetas := []chunks.Meta{ chunkFromSamples(tc.a), chunkFromSamples(tc.b), chunkFromSamples(tc.c), @@ -1099,7 +1102,7 @@ func TestSeriesIterator(t *testing.T) { // Regression for: https://github.com/prometheus/tsdb/pull/97 func TestChunkSeriesIterator_DoubleSeek(t *testing.T) { - chkMetas := []ChunkMeta{ + chkMetas := []chunks.Meta{ chunkFromSamples([]sample{}), chunkFromSamples([]sample{{1, 1}, {2, 2}, {3, 3}}), chunkFromSamples([]sample{{4, 4}, {5, 5}}), @@ -1116,7 +1119,7 @@ func TestChunkSeriesIterator_DoubleSeek(t *testing.T) { // Regression when seeked chunks were still found via binary search and we always // skipped to the end when seeking a value in the current chunk. func TestChunkSeriesIterator_SeekInCurrentChunk(t *testing.T) { - metas := []ChunkMeta{ + metas := []chunks.Meta{ chunkFromSamples([]sample{}), chunkFromSamples([]sample{{1, 2}, {3, 4}, {5, 6}, {7, 8}}), chunkFromSamples([]sample{}), @@ -1138,7 +1141,7 @@ func TestChunkSeriesIterator_SeekInCurrentChunk(t *testing.T) { // Regression when calling Next() with a time bounded to fit within two samples. // Seek gets called and advances beyond the max time, which was just accepted as a valid sample. 
func TestChunkSeriesIterator_NextWithMinTime(t *testing.T) { - metas := []ChunkMeta{ + metas := []chunks.Meta{ chunkFromSamples([]sample{{1, 6}, {5, 6}, {7, 8}}), } @@ -1148,7 +1151,7 @@ func TestChunkSeriesIterator_NextWithMinTime(t *testing.T) { func TestPopulatedCSReturnsValidChunkSlice(t *testing.T) { lbls := []labels.Labels{labels.New(labels.Label{"a", "b"})} - chunkMetas := [][]ChunkMeta{ + chunkMetas := [][]chunks.Meta{ { {MinTime: 1, MaxTime: 2, Ref: 1}, {MinTime: 3, MaxTime: 4, Ref: 2}, @@ -1157,10 +1160,10 @@ func TestPopulatedCSReturnsValidChunkSlice(t *testing.T) { } cr := mockChunkReader( - map[uint64]chunks.Chunk{ - 1: chunks.NewXORChunk(), - 2: chunks.NewXORChunk(), - 3: chunks.NewXORChunk(), + map[uint64]chunkenc.Chunk{ + 1: chunkenc.NewXORChunk(), + 2: chunkenc.NewXORChunk(), + 3: chunkenc.NewXORChunk(), }, ) @@ -1180,7 +1183,7 @@ func TestPopulatedCSReturnsValidChunkSlice(t *testing.T) { testutil.Assert(t, p.Next() == false, "") // Test the case where 1 chunk could cause an unpopulated chunk to be returned. 
- chunkMetas = [][]ChunkMeta{ + chunkMetas = [][]chunks.Meta{ { {MinTime: 1, MaxTime: 2, Ref: 1}, }, @@ -1200,7 +1203,7 @@ func TestPopulatedCSReturnsValidChunkSlice(t *testing.T) { type mockChunkSeriesSet struct { l []labels.Labels - cm [][]ChunkMeta + cm [][]chunks.Meta i int } @@ -1213,7 +1216,7 @@ func (m *mockChunkSeriesSet) Next() bool { return m.i < len(m.l) } -func (m *mockChunkSeriesSet) At() (labels.Labels, []ChunkMeta, Intervals) { +func (m *mockChunkSeriesSet) At() (labels.Labels, []chunks.Meta, Intervals) { return m.l[m.i], m.cm[m.i], nil } @@ -1279,3 +1282,198 @@ func BenchmarkMergedSeriesSet(b *testing.B) { } } } + +type mockChunkReader map[uint64]chunkenc.Chunk + +func (cr mockChunkReader) Chunk(id uint64) (chunkenc.Chunk, error) { + chk, ok := cr[id] + if ok { + return chk, nil + } + + return nil, errors.New("Chunk with ref not found") +} + +func (cr mockChunkReader) Close() error { + return nil +} + +func TestDeletedIterator(t *testing.T) { + chk := chunks.NewXORChunk() + app, err := chk.Appender() + testutil.Ok(t, err) + // Insert random stuff from (0, 1000). 
+ act := make([]sample, 1000) + for i := 0; i < 1000; i++ { + act[i].t = int64(i) + act[i].v = rand.Float64() + app.Append(act[i].t, act[i].v) + } + + cases := []struct { + r Intervals + }{ + {r: Intervals{{1, 20}}}, + {r: Intervals{{1, 10}, {12, 20}, {21, 23}, {25, 30}}}, + {r: Intervals{{1, 10}, {12, 20}, {20, 30}}}, + {r: Intervals{{1, 10}, {12, 23}, {25, 30}}}, + {r: Intervals{{1, 23}, {12, 20}, {25, 30}}}, + {r: Intervals{{1, 23}, {12, 20}, {25, 3000}}}, + {r: Intervals{{0, 2000}}}, + {r: Intervals{{500, 2000}}}, + {r: Intervals{{0, 200}}}, + {r: Intervals{{1000, 20000}}}, + } + + for _, c := range cases { + i := int64(-1) + it := &deletedIterator{it: chk.Iterator(), intervals: c.r[:]} + ranges := c.r[:] + for it.Next() { + i++ + for _, tr := range ranges { + if tr.inBounds(i) { + i = tr.Maxt + 1 + ranges = ranges[1:] + } + } + + testutil.Assert(t, i < 1000 == true, "") + + ts, v := it.At() + testutil.Equals(t, act[i].t, ts) + testutil.Equals(t, act[i].v, v) + } + // There has been an extra call to Next(). 
+ i++ + for _, tr := range ranges { + if tr.inBounds(i) { + i = tr.Maxt + 1 + ranges = ranges[1:] + } + } + + testutil.Assert(t, i < 1000 == false, "") + testutil.Ok(t, it.Err()) + } +} + +type series struct { + l labels.Labels + chunks []chunks.Meta +} + +type mockIndex struct { + series map[uint64]series + labelIndex map[string][]string + postings map[labels.Label][]uint64 + symbols map[string]struct{} +} + +func newMockIndex() mockIndex { + ix := mockIndex{ + series: make(map[uint64]series), + labelIndex: make(map[string][]string), + postings: make(map[labels.Label][]uint64), + symbols: make(map[string]struct{}), + } + return ix +} + +func (m mockIndex) Symbols() (map[string]struct{}, error) { + return m.symbols, nil +} + +func (m mockIndex) AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) error { + if _, ok := m.series[ref]; ok { + return errors.Errorf("series with reference %d already added", ref) + } + for _, lbl := range l { + m.symbols[lbl.Name] = struct{}{} + m.symbols[lbl.Value] = struct{}{} + } + + s := series{l: l} + // Actual chunk data is not stored in the index. 
+ for _, c := range chunks { + c.Chunk = nil + s.chunks = append(s.chunks, c) + } + m.series[ref] = s + + return nil +} + +func (m mockIndex) WriteLabelIndex(names []string, values []string) error { + // TODO support composite indexes + if len(names) != 1 { + return errors.New("composite indexes not supported yet") + } + sort.Strings(values) + m.labelIndex[names[0]] = values + return nil +} + +func (m mockIndex) WritePostings(name, value string, it index.Postings) error { + l := labels.Label{Name: name, Value: value} + if _, ok := m.postings[l]; ok { + return errors.Errorf("postings for %s already added", l) + } + ep, err := index.ExpandPostings(it) + if err != nil { + return err + } + m.postings[l] = ep + return nil +} + +func (m mockIndex) Close() error { + return nil +} + +func (m mockIndex) LabelValues(names ...string) (index.StringTuples, error) { + // TODO support composite indexes + if len(names) != 1 { + return nil, errors.New("composite indexes not supported yet") + } + + return index.NewStringTuples(m.labelIndex[names[0]], 1) +} + +func (m mockIndex) Postings(name, value string) (index.Postings, error) { + l := labels.Label{Name: name, Value: value} + return index.NewListPostings(m.postings[l]), nil +} + +func (m mockIndex) SortedPostings(p index.Postings) index.Postings { + ep, err := index.ExpandPostings(p) + if err != nil { + return index.ErrPostings(errors.Wrap(err, "expand postings")) + } + + sort.Slice(ep, func(i, j int) bool { + return labels.Compare(m.series[ep[i]].l, m.series[ep[j]].l) < 0 + }) + return index.NewListPostings(ep) +} + +func (m mockIndex) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error { + s, ok := m.series[ref] + if !ok { + return ErrNotFound + } + *lset = append((*lset)[:0], s.l...) + *chks = append((*chks)[:0], s.chunks...) 
+ + return nil +} + +func (m mockIndex) LabelIndices() ([][]string, error) { + res := make([][]string, 0, len(m.labelIndex)) + + for k := range m.labelIndex { + res = append(res, []string{k}) + } + + return res, nil +}