Change series ID from uint32 to uint64

This commit is contained in:
Fabian Reinartz 2017-09-04 16:08:38 +02:00
parent af2c2f9674
commit 1ddedf2b30
13 changed files with 198 additions and 181 deletions

View file

@ -225,7 +225,7 @@ func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error {
ir := pb.indexr ir := pb.indexr
// Choose only valid postings which have chunks in the time-range. // Choose only valid postings which have chunks in the time-range.
stones := map[uint32]Intervals{} stones := map[uint64]Intervals{}
var lset labels.Labels var lset labels.Labels
var chks []ChunkMeta var chks []ChunkMeta

View file

@ -477,9 +477,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
// We fully rebuild the postings list index from merged series. // We fully rebuild the postings list index from merged series.
var ( var (
postings = &memPostings{m: make(map[term][]uint32, 512)} postings = &memPostings{m: make(map[term][]uint64, 512)}
values = map[string]stringset{} values = map[string]stringset{}
i = uint32(0) i = uint64(0)
) )
if err := indexw.AddSymbols(allSymbols); err != nil { if err := indexw.AddSymbols(allSymbols); err != nil {
@ -568,9 +568,9 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
} }
} }
// Write a postings list containing all series. // Write a postings list containing all series.
all := make([]uint32, i) all := make([]uint64, i)
for i := range all { for i := range all {
all[i] = uint32(i) all[i] = uint64(i)
} }
if err := indexw.WritePostings("", "", newListPostings(all)); err != nil { if err := indexw.WritePostings("", "", newListPostings(all)); err != nil {
return errors.Wrap(err, "write 'all' postings") return errors.Wrap(err, "write 'all' postings")

60
head.go
View file

@ -55,11 +55,11 @@ type Head struct {
appendPool sync.Pool appendPool sync.Pool
minTime, maxTime int64 minTime, maxTime int64
lastSeriesID uint32 lastSeriesID uint64
// descs holds all chunk descs for the head block. Each chunk implicitly // descs holds all chunk descs for the head block. Each chunk implicitly
// is assigned the index as its ID. // is assigned the index as its ID.
series map[uint32]*memSeries series map[uint64]*memSeries
// hashes contains a collision map of label set hashes of chunks // hashes contains a collision map of label set hashes of chunks
// to their chunk descs. // to their chunk descs.
hashes map[uint64][]*memSeries hashes map[uint64][]*memSeries
@ -178,11 +178,11 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) (
chunkRange: chunkRange, chunkRange: chunkRange,
minTime: math.MaxInt64, minTime: math.MaxInt64,
maxTime: math.MinInt64, maxTime: math.MinInt64,
series: map[uint32]*memSeries{}, series: map[uint64]*memSeries{},
hashes: map[uint64][]*memSeries{}, hashes: map[uint64][]*memSeries{},
values: map[string]stringset{}, values: map[string]stringset{},
symbols: map[string]struct{}{}, symbols: map[string]struct{}{},
postings: &memPostings{m: make(map[term][]uint32)}, postings: &memPostings{m: make(map[term][]uint64)},
tombstones: newEmptyTombstoneReader(), tombstones: newEmptyTombstoneReader(),
} }
h.metrics = newHeadMetrics(h, r) h.metrics = newHeadMetrics(h, r)
@ -201,7 +201,7 @@ func (h *Head) readWAL() error {
} }
samplesFunc := func(samples []RefSample) error { samplesFunc := func(samples []RefSample) error {
for _, s := range samples { for _, s := range samples {
ms, ok := h.series[uint32(s.Ref)] ms, ok := h.series[s.Ref]
if !ok { if !ok {
return errors.Errorf("unknown series reference %d; abort WAL restore", s.Ref) return errors.Errorf("unknown series reference %d; abort WAL restore", s.Ref)
} }
@ -424,13 +424,13 @@ func (a *headAppender) AddFast(ref string, t int64, v float64) error {
} }
var ( var (
refn = binary.BigEndian.Uint64(yoloBytes(ref)) refn = binary.BigEndian.Uint64(yoloBytes(ref))
id = uint32(refn) id = (refn << 1) >> 1
inTx = refn&(1<<63) != 0 inTx = refn&(1<<63) != 0
) )
// Distinguish between existing series and series created in // Distinguish between existing series and series created in
// this transaction. // this transaction.
if inTx { if inTx {
if id > uint32(len(a.newSeries)-1) { if id > uint64(len(a.newSeries)-1) {
return errors.Wrap(ErrNotFound, "transaction series ID too high") return errors.Wrap(ErrNotFound, "transaction series ID too high")
} }
// TODO(fabxc): we also have to validate here that the // TODO(fabxc): we also have to validate here that the
@ -527,7 +527,7 @@ func (a *headAppender) Commit() error {
total := uint64(len(a.samples)) total := uint64(len(a.samples))
for _, s := range a.samples { for _, s := range a.samples {
series, ok := a.head.series[uint32(s.Ref)] series, ok := a.head.series[s.Ref]
if !ok { if !ok {
return errors.Errorf("series with ID %d not found", s.Ref) return errors.Errorf("series with ID %d not found", s.Ref)
} }
@ -614,7 +614,7 @@ func (h *Head) gc() {
// Only data strictly lower than this timestamp must be deleted. // Only data strictly lower than this timestamp must be deleted.
mint := h.MinTime() mint := h.MinTime()
deletedHashes := map[uint64][]uint32{} deletedHashes := map[uint64][]uint64{}
h.mtx.RLock() h.mtx.RLock()
@ -630,7 +630,7 @@ func (h *Head) gc() {
} }
} }
deletedIDs := make(map[uint32]struct{}, len(deletedHashes)) deletedIDs := make(map[uint64]struct{}, len(deletedHashes))
h.mtx.RUnlock() h.mtx.RUnlock()
@ -639,7 +639,7 @@ func (h *Head) gc() {
for hash, ids := range deletedHashes { for hash, ids := range deletedHashes {
inIDs := func(id uint32) bool { inIDs := func(id uint64) bool {
for _, o := range ids { for _, o := range ids {
if o == id { if o == id {
return true return true
@ -675,7 +675,7 @@ func (h *Head) gc() {
} }
for t, p := range h.postings.m { for t, p := range h.postings.m {
repl := make([]uint32, 0, len(p)) repl := make([]uint64, 0, len(p))
for _, id := range p { for _, id := range p {
if _, ok := deletedIDs[id]; !ok { if _, ok := deletedIDs[id]; !ok {
@ -761,16 +761,32 @@ func (h *headChunkReader) Close() error {
return nil return nil
} }
// packChunkID packs a seriesID and a chunkID within it into a global 8 byte ID.
// It panicks if the seriesID exceeds 5 bytes or the chunk ID 3 bytes.
func packChunkID(seriesID, chunkID uint64) uint64 {
if seriesID > (1<<40)-1 {
panic("series ID exceeds 5 bytes")
}
if chunkID > (1<<24)-1 {
panic("chunk ID exceeds 3 bytes")
}
return (seriesID << 24) | chunkID
}
func unpackChunkID(id uint64) (seriesID, chunkID uint64) {
return id >> 24, (id << 40) >> 40
}
// Chunk returns the chunk for the reference number. // Chunk returns the chunk for the reference number.
func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) { func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
h.head.mtx.RLock() h.head.mtx.RLock()
defer h.head.mtx.RUnlock() defer h.head.mtx.RUnlock()
s := h.head.series[uint32(ref>>32)] sid, cid := unpackChunkID(ref)
s := h.head.series[sid]
s.mtx.RLock() s.mtx.RLock()
cid := int((ref << 32) >> 32) c := s.chunk(int(cid))
c := s.chunk(cid)
s.mtx.RUnlock() s.mtx.RUnlock()
// Do not expose chunks that are outside of the specified range. // Do not expose chunks that are outside of the specified range.
@ -780,7 +796,7 @@ func (h *headChunkReader) Chunk(ref uint64) (chunks.Chunk, error) {
return &safeChunk{ return &safeChunk{
Chunk: c.chunk, Chunk: c.chunk,
s: s, s: s,
cid: cid, cid: int(cid),
}, nil }, nil
} }
@ -860,7 +876,7 @@ func (h *headIndexReader) SortedPostings(p Postings) Postings {
h.head.mtx.RLock() h.head.mtx.RLock()
defer h.head.mtx.RUnlock() defer h.head.mtx.RUnlock()
ep := make([]uint32, 0, 1024) ep := make([]uint64, 0, 1024)
for p.Next() { for p.Next() {
ep = append(ep, p.At()) ep = append(ep, p.At())
@ -890,7 +906,7 @@ func (h *headIndexReader) SortedPostings(p Postings) Postings {
} }
// Series returns the series for the given reference. // Series returns the series for the given reference.
func (h *headIndexReader) Series(ref uint32, lbls *labels.Labels, chks *[]ChunkMeta) error { func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error {
h.head.mtx.RLock() h.head.mtx.RLock()
defer h.head.mtx.RUnlock() defer h.head.mtx.RUnlock()
@ -913,7 +929,7 @@ func (h *headIndexReader) Series(ref uint32, lbls *labels.Labels, chks *[]ChunkM
*chks = append(*chks, ChunkMeta{ *chks = append(*chks, ChunkMeta{
MinTime: c.minTime, MinTime: c.minTime,
MaxTime: c.maxTime, MaxTime: c.maxTime,
Ref: (uint64(ref) << 32) | uint64(s.chunkID(i)), Ref: packChunkID(s.ref, uint64(s.chunkID(i))),
}) })
} }
@ -949,7 +965,7 @@ func (h *Head) create(hash uint64, lset labels.Labels) *memSeries {
h.metrics.series.Inc() h.metrics.series.Inc()
h.metrics.seriesCreated.Inc() h.metrics.seriesCreated.Inc()
id := atomic.AddUint32(&h.lastSeriesID, 1) id := atomic.AddUint64(&h.lastSeriesID, 1)
s := newMemSeries(lset, id, h.chunkRange) s := newMemSeries(lset, id, h.chunkRange)
h.series[id] = s h.series[id] = s
@ -983,7 +999,7 @@ type sample struct {
type memSeries struct { type memSeries struct {
mtx sync.RWMutex mtx sync.RWMutex
ref uint32 ref uint64
lset labels.Labels lset labels.Labels
chunks []*memChunk chunks []*memChunk
chunkRange int64 chunkRange int64
@ -1020,7 +1036,7 @@ func (s *memSeries) cut(mint int64) *memChunk {
return c return c
} }
func newMemSeries(lset labels.Labels, id uint32, chunkRange int64) *memSeries { func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries {
s := &memSeries{ s := &memSeries{
lset: lset, lset: lset,
ref: id, ref: id,

View file

@ -134,10 +134,10 @@ func TestHead_Truncate(t *testing.T) {
postingsC1, _ := expandPostings(h.postings.get(term{"c", "1"})) postingsC1, _ := expandPostings(h.postings.get(term{"c", "1"}))
postingsAll, _ := expandPostings(h.postings.get(term{"", ""})) postingsAll, _ := expandPostings(h.postings.get(term{"", ""}))
require.Equal(t, []uint32{s1.ref}, postingsA1) require.Equal(t, []uint64{s1.ref}, postingsA1)
require.Equal(t, []uint32{s2.ref}, postingsA2) require.Equal(t, []uint64{s2.ref}, postingsA2)
require.Equal(t, []uint32{s1.ref, s2.ref}, postingsB1) require.Equal(t, []uint64{s1.ref, s2.ref}, postingsB1)
require.Equal(t, []uint32{s1.ref, s2.ref}, postingsAll) require.Equal(t, []uint64{s1.ref, s2.ref}, postingsAll)
require.Nil(t, postingsB2) require.Nil(t, postingsB2)
require.Nil(t, postingsC1) require.Nil(t, postingsC1)

View file

@ -99,7 +99,7 @@ type IndexWriter interface {
// their labels. // their labels.
// The reference numbers are used to resolve entries in postings lists that // The reference numbers are used to resolve entries in postings lists that
// are added later. // are added later.
AddSeries(ref uint32, l labels.Labels, chunks ...ChunkMeta) error AddSeries(ref uint64, l labels.Labels, chunks ...ChunkMeta) error
// WriteLabelIndex serializes an index from label names to values. // WriteLabelIndex serializes an index from label names to values.
// The passed in values chained tuples of strings of the length of names. // The passed in values chained tuples of strings of the length of names.
@ -130,7 +130,7 @@ type indexWriter struct {
uint32s []uint32 uint32s []uint32
symbols map[string]uint32 // symbol offsets symbols map[string]uint32 // symbol offsets
seriesOffsets map[uint32]uint64 // offsets of series seriesOffsets map[uint64]uint64 // offsets of series
labelIndexes []hashEntry // label index offsets labelIndexes []hashEntry // label index offsets
postings []hashEntry // postings lists offsets postings []hashEntry // postings lists offsets
@ -175,7 +175,7 @@ func newIndexWriter(dir string) (*indexWriter, error) {
// Caches. // Caches.
symbols: make(map[string]uint32, 1<<13), symbols: make(map[string]uint32, 1<<13),
seriesOffsets: make(map[uint32]uint64, 1<<16), seriesOffsets: make(map[uint64]uint64, 1<<16),
crc32: newCRC32(), crc32: newCRC32(),
} }
if err := iw.writeMeta(); err != nil { if err := iw.writeMeta(); err != nil {
@ -260,7 +260,7 @@ func (w *indexWriter) writeMeta() error {
return w.write(w.buf1.get()) return w.write(w.buf1.get())
} }
func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...ChunkMeta) error { func (w *indexWriter) AddSeries(ref uint64, lset labels.Labels, chunks ...ChunkMeta) error {
if err := w.ensureStage(idxStageSeries); err != nil { if err := w.ensureStage(idxStageSeries); err != nil {
return err return err
} }
@ -457,7 +457,10 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error {
if !ok { if !ok {
return errors.Errorf("%p series for reference %d not found", w, it.At()) return errors.Errorf("%p series for reference %d not found", w, it.At())
} }
refs = append(refs, uint32(offset)) // XXX(fabxc): get uint64 vs uint32 sorted out. if offset > (1<<32)-1 {
return errors.Errorf("series offset %d exceeds 4 bytes", offset)
}
refs = append(refs, uint32(offset))
} }
if err := it.Err(); err != nil { if err := it.Err(); err != nil {
return err return err
@ -524,7 +527,7 @@ type IndexReader interface {
// Series populates the given labels and chunk metas for the series identified // Series populates the given labels and chunk metas for the series identified
// by the reference. // by the reference.
Series(ref uint32, lset *labels.Labels, chks *[]ChunkMeta) error Series(ref uint64, lset *labels.Labels, chks *[]ChunkMeta) error
// LabelIndices returns the label pairs for which indices exist. // LabelIndices returns the label pairs for which indices exist.
LabelIndices() ([][]string, error) LabelIndices() ([][]string, error)
@ -740,7 +743,7 @@ func (r *indexReader) LabelIndices() ([][]string, error) {
return res, nil return res, nil
} }
func (r *indexReader) Series(ref uint32, lbls *labels.Labels, chks *[]ChunkMeta) error { func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error {
d1 := r.decbufAt(int(ref)) d1 := r.decbufAt(int(ref))
d2 := d1.decbuf(int(d1.uvarint())) d2 := d1.decbuf(int(d1.uvarint()))

View file

@ -33,7 +33,7 @@ type series struct {
} }
type mockIndex struct { type mockIndex struct {
series map[uint32]series series map[uint64]series
labelIndex map[string][]string labelIndex map[string][]string
postings *memPostings postings *memPostings
symbols map[string]struct{} symbols map[string]struct{}
@ -41,9 +41,9 @@ type mockIndex struct {
func newMockIndex() mockIndex { func newMockIndex() mockIndex {
return mockIndex{ return mockIndex{
series: make(map[uint32]series), series: make(map[uint64]series),
labelIndex: make(map[string][]string), labelIndex: make(map[string][]string),
postings: &memPostings{m: make(map[term][]uint32)}, postings: &memPostings{m: make(map[term][]uint64)},
symbols: make(map[string]struct{}), symbols: make(map[string]struct{}),
} }
} }
@ -52,7 +52,7 @@ func (m mockIndex) Symbols() (map[string]struct{}, error) {
return m.symbols, nil return m.symbols, nil
} }
func (m mockIndex) AddSeries(ref uint32, l labels.Labels, chunks ...ChunkMeta) error { func (m mockIndex) AddSeries(ref uint64, l labels.Labels, chunks ...ChunkMeta) error {
if _, ok := m.series[ref]; ok { if _, ok := m.series[ref]; ok {
return errors.Errorf("series with reference %d already added", ref) return errors.Errorf("series with reference %d already added", ref)
} }
@ -125,7 +125,7 @@ func (m mockIndex) SortedPostings(p Postings) Postings {
return newListPostings(ep) return newListPostings(ep)
} }
func (m mockIndex) Series(ref uint32, lset *labels.Labels, chks *[]ChunkMeta) error { func (m mockIndex) Series(ref uint64, lset *labels.Labels, chks *[]ChunkMeta) error {
s, ok := m.series[ref] s, ok := m.series[ref]
if !ok { if !ok {
return ErrNotFound return ErrNotFound
@ -202,7 +202,7 @@ func TestIndexRW_Postings(t *testing.T) {
require.NoError(t, iw.AddSeries(3, series[2])) require.NoError(t, iw.AddSeries(3, series[2]))
require.NoError(t, iw.AddSeries(4, series[3])) require.NoError(t, iw.AddSeries(4, series[3]))
err = iw.WritePostings("a", "1", newListPostings([]uint32{1, 2, 3, 4})) err = iw.WritePostings("a", "1", newListPostings([]uint64{1, 2, 3, 4}))
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, iw.Close()) require.NoError(t, iw.Close())
@ -274,16 +274,16 @@ func TestPersistence_index_e2e(t *testing.T) {
// Population procedure as done by compaction. // Population procedure as done by compaction.
var ( var (
postings = &memPostings{m: make(map[term][]uint32, 512)} postings = &memPostings{m: make(map[term][]uint64, 512)}
values = map[string]stringset{} values = map[string]stringset{}
) )
mi := newMockIndex() mi := newMockIndex()
for i, s := range input { for i, s := range input {
err = iw.AddSeries(uint32(i), s.labels, s.chunks...) err = iw.AddSeries(uint64(i), s.labels, s.chunks...)
require.NoError(t, err) require.NoError(t, err)
mi.AddSeries(uint32(i), s.labels, s.chunks...) mi.AddSeries(uint64(i), s.labels, s.chunks...)
for _, l := range s.labels { for _, l := range s.labels {
valset, ok := values[l.Name] valset, ok := values[l.Name]
@ -293,7 +293,7 @@ func TestPersistence_index_e2e(t *testing.T) {
} }
valset.set(l.Value) valset.set(l.Value)
postings.add(uint32(i), term{name: l.Name, value: l.Value}) postings.add(uint64(i), term{name: l.Name, value: l.Value})
} }
i++ i++
} }
@ -305,9 +305,9 @@ func TestPersistence_index_e2e(t *testing.T) {
require.NoError(t, mi.WriteLabelIndex([]string{k}, vals)) require.NoError(t, mi.WriteLabelIndex([]string{k}, vals))
} }
all := make([]uint32, len(lbls)) all := make([]uint64, len(lbls))
for i := range all { for i := range all {
all[i] = uint32(i) all[i] = uint64(i)
} }
err = iw.WritePostings("", "", newListPostings(all)) err = iw.WritePostings("", "", newListPostings(all))
require.NoError(t, err) require.NoError(t, err)

View file

@ -20,7 +20,7 @@ import (
) )
type memPostings struct { type memPostings struct {
m map[term][]uint32 m map[term][]uint64
} }
type term struct { type term struct {
@ -38,7 +38,7 @@ func (p *memPostings) get(t term) Postings {
// add adds a document to the index. The caller has to ensure that no // add adds a document to the index. The caller has to ensure that no
// term argument appears twice. // term argument appears twice.
func (p *memPostings) add(id uint32, terms ...term) { func (p *memPostings) add(id uint64, terms ...term) {
for _, t := range terms { for _, t := range terms {
p.m[t] = append(p.m[t], id) p.m[t] = append(p.m[t], id)
} }
@ -51,10 +51,10 @@ type Postings interface {
// Seek advances the iterator to value v or greater and returns // Seek advances the iterator to value v or greater and returns
// true if a value was found. // true if a value was found.
Seek(v uint32) bool Seek(v uint64) bool
// At returns the value at the current iterator position. // At returns the value at the current iterator position.
At() uint32 At() uint64
// Err returns the last error of the iterator. // Err returns the last error of the iterator.
Err() error Err() error
@ -66,8 +66,8 @@ type errPostings struct {
} }
func (e errPostings) Next() bool { return false } func (e errPostings) Next() bool { return false }
func (e errPostings) Seek(uint32) bool { return false } func (e errPostings) Seek(uint64) bool { return false }
func (e errPostings) At() uint32 { return 0 } func (e errPostings) At() uint64 { return 0 }
func (e errPostings) Err() error { return e.err } func (e errPostings) Err() error { return e.err }
var emptyPostings = errPostings{} var emptyPostings = errPostings{}
@ -88,18 +88,18 @@ func Intersect(its ...Postings) Postings {
type intersectPostings struct { type intersectPostings struct {
a, b Postings a, b Postings
aok, bok bool aok, bok bool
cur uint32 cur uint64
} }
func newIntersectPostings(a, b Postings) *intersectPostings { func newIntersectPostings(a, b Postings) *intersectPostings {
return &intersectPostings{a: a, b: b} return &intersectPostings{a: a, b: b}
} }
func (it *intersectPostings) At() uint32 { func (it *intersectPostings) At() uint64 {
return it.cur return it.cur
} }
func (it *intersectPostings) doNext(id uint32) bool { func (it *intersectPostings) doNext(id uint64) bool {
for { for {
if !it.b.Seek(id) { if !it.b.Seek(id) {
return false return false
@ -125,7 +125,7 @@ func (it *intersectPostings) Next() bool {
return it.doNext(it.a.At()) return it.doNext(it.a.At())
} }
func (it *intersectPostings) Seek(id uint32) bool { func (it *intersectPostings) Seek(id uint64) bool {
if !it.a.Seek(id) { if !it.a.Seek(id) {
return false return false
} }
@ -155,14 +155,14 @@ type mergedPostings struct {
a, b Postings a, b Postings
initialized bool initialized bool
aok, bok bool aok, bok bool
cur uint32 cur uint64
} }
func newMergedPostings(a, b Postings) *mergedPostings { func newMergedPostings(a, b Postings) *mergedPostings {
return &mergedPostings{a: a, b: b} return &mergedPostings{a: a, b: b}
} }
func (it *mergedPostings) At() uint32 { func (it *mergedPostings) At() uint64 {
return it.cur return it.cur
} }
@ -204,7 +204,7 @@ func (it *mergedPostings) Next() bool {
return true return true
} }
func (it *mergedPostings) Seek(id uint32) bool { func (it *mergedPostings) Seek(id uint64) bool {
if it.cur >= id { if it.cur >= id {
return true return true
} }
@ -225,15 +225,15 @@ func (it *mergedPostings) Err() error {
// listPostings implements the Postings interface over a plain list. // listPostings implements the Postings interface over a plain list.
type listPostings struct { type listPostings struct {
list []uint32 list []uint64
cur uint32 cur uint64
} }
func newListPostings(list []uint32) *listPostings { func newListPostings(list []uint64) *listPostings {
return &listPostings{list: list} return &listPostings{list: list}
} }
func (it *listPostings) At() uint32 { func (it *listPostings) At() uint64 {
return it.cur return it.cur
} }
@ -247,7 +247,7 @@ func (it *listPostings) Next() bool {
return false return false
} }
func (it *listPostings) Seek(x uint32) bool { func (it *listPostings) Seek(x uint64) bool {
// If the current value satisfies, then return. // If the current value satisfies, then return.
if it.cur >= x { if it.cur >= x {
return true return true
@ -281,8 +281,8 @@ func newBigEndianPostings(list []byte) *bigEndianPostings {
return &bigEndianPostings{list: list} return &bigEndianPostings{list: list}
} }
func (it *bigEndianPostings) At() uint32 { func (it *bigEndianPostings) At() uint64 {
return it.cur return uint64(it.cur)
} }
func (it *bigEndianPostings) Next() bool { func (it *bigEndianPostings) Next() bool {
@ -294,15 +294,15 @@ func (it *bigEndianPostings) Next() bool {
return false return false
} }
func (it *bigEndianPostings) Seek(x uint32) bool { func (it *bigEndianPostings) Seek(x uint64) bool {
if it.cur >= x { if uint64(it.cur) >= x {
return true return true
} }
num := len(it.list) / 4 num := len(it.list) / 4
// Do binary search between current position and end. // Do binary search between current position and end.
i := sort.Search(num, func(i int) bool { i := sort.Search(num, func(i int) bool {
return binary.BigEndian.Uint32(it.list[i*4:]) >= x return binary.BigEndian.Uint32(it.list[i*4:]) >= uint32(x)
}) })
if i < num { if i < num {
j := i * 4 j := i * 4

View file

@ -23,17 +23,17 @@ import (
type mockPostings struct { type mockPostings struct {
next func() bool next func() bool
seek func(uint32) bool seek func(uint64) bool
value func() uint32 value func() uint64
err func() error err func() error
} }
func (m *mockPostings) Next() bool { return m.next() } func (m *mockPostings) Next() bool { return m.next() }
func (m *mockPostings) Seek(v uint32) bool { return m.seek(v) } func (m *mockPostings) Seek(v uint64) bool { return m.seek(v) }
func (m *mockPostings) Value() uint32 { return m.value() } func (m *mockPostings) Value() uint64 { return m.value() }
func (m *mockPostings) Err() error { return m.err() } func (m *mockPostings) Err() error { return m.err() }
func expandPostings(p Postings) (res []uint32, err error) { func expandPostings(p Postings) (res []uint64, err error) {
for p.Next() { for p.Next() {
res = append(res, p.At()) res = append(res, p.At())
} }
@ -42,27 +42,27 @@ func expandPostings(p Postings) (res []uint32, err error) {
func TestIntersect(t *testing.T) { func TestIntersect(t *testing.T) {
var cases = []struct { var cases = []struct {
a, b []uint32 a, b []uint64
res []uint32 res []uint64
}{ }{
{ {
a: []uint32{1, 2, 3, 4, 5}, a: []uint64{1, 2, 3, 4, 5},
b: []uint32{6, 7, 8, 9, 10}, b: []uint64{6, 7, 8, 9, 10},
res: nil, res: nil,
}, },
{ {
a: []uint32{1, 2, 3, 4, 5}, a: []uint64{1, 2, 3, 4, 5},
b: []uint32{4, 5, 6, 7, 8}, b: []uint64{4, 5, 6, 7, 8},
res: []uint32{4, 5}, res: []uint64{4, 5},
}, },
{ {
a: []uint32{1, 2, 3, 4, 9, 10}, a: []uint64{1, 2, 3, 4, 9, 10},
b: []uint32{1, 4, 5, 6, 7, 8, 10, 11}, b: []uint64{1, 4, 5, 6, 7, 8, 10, 11},
res: []uint32{1, 4, 10}, res: []uint64{1, 4, 10},
}, { }, {
a: []uint32{1}, a: []uint64{1},
b: []uint32{0, 1}, b: []uint64{0, 1},
res: []uint32{1}, res: []uint64{1},
}, },
} }
@ -78,29 +78,29 @@ func TestIntersect(t *testing.T) {
func TestMultiIntersect(t *testing.T) { func TestMultiIntersect(t *testing.T) {
var cases = []struct { var cases = []struct {
p [][]uint32 p [][]uint64
res []uint32 res []uint64
}{ }{
{ {
p: [][]uint32{ p: [][]uint64{
{1, 2, 3, 4, 5, 6, 1000, 1001}, {1, 2, 3, 4, 5, 6, 1000, 1001},
{2, 4, 5, 6, 7, 8, 999, 1001}, {2, 4, 5, 6, 7, 8, 999, 1001},
{1, 2, 5, 6, 7, 8, 1001, 1200}, {1, 2, 5, 6, 7, 8, 1001, 1200},
}, },
res: []uint32{2, 5, 6, 1001}, res: []uint64{2, 5, 6, 1001},
}, },
// One of the reproduceable cases for: // One of the reproduceable cases for:
// https://github.com/prometheus/prometheus/issues/2616 // https://github.com/prometheus/prometheus/issues/2616
// The initialisation of intersectPostings was moving the iterator forward // The initialisation of intersectPostings was moving the iterator forward
// prematurely making us miss some postings. // prematurely making us miss some postings.
{ {
p: [][]uint32{ p: [][]uint64{
{1, 2}, {1, 2},
{1, 2}, {1, 2},
{1, 2}, {1, 2},
{2}, {2},
}, },
res: []uint32{2}, res: []uint64{2},
}, },
} }
@ -118,22 +118,22 @@ func TestMultiIntersect(t *testing.T) {
} }
func BenchmarkIntersect(t *testing.B) { func BenchmarkIntersect(t *testing.B) {
var a, b, c, d []uint32 var a, b, c, d []uint64
for i := 0; i < 10000000; i += 2 { for i := 0; i < 10000000; i += 2 {
a = append(a, uint32(i)) a = append(a, uint64(i))
} }
for i := 5000000; i < 5000100; i += 4 { for i := 5000000; i < 5000100; i += 4 {
b = append(b, uint32(i)) b = append(b, uint64(i))
} }
for i := 5090000; i < 5090600; i += 4 { for i := 5090000; i < 5090600; i += 4 {
b = append(b, uint32(i)) b = append(b, uint64(i))
} }
for i := 4990000; i < 5100000; i++ { for i := 4990000; i < 5100000; i++ {
c = append(c, uint32(i)) c = append(c, uint64(i))
} }
for i := 4000000; i < 6000000; i++ { for i := 4000000; i < 6000000; i++ {
d = append(d, uint32(i)) d = append(d, uint64(i))
} }
i1 := newListPostings(a) i1 := newListPostings(a)
@ -152,14 +152,14 @@ func BenchmarkIntersect(t *testing.B) {
func TestMultiMerge(t *testing.T) { func TestMultiMerge(t *testing.T) {
var cases = []struct { var cases = []struct {
a, b, c []uint32 a, b, c []uint64
res []uint32 res []uint64
}{ }{
{ {
a: []uint32{1, 2, 3, 4, 5, 6, 1000, 1001}, a: []uint64{1, 2, 3, 4, 5, 6, 1000, 1001},
b: []uint32{2, 4, 5, 6, 7, 8, 999, 1001}, b: []uint64{2, 4, 5, 6, 7, 8, 999, 1001},
c: []uint32{1, 2, 5, 6, 7, 8, 1001, 1200}, c: []uint64{1, 2, 5, 6, 7, 8, 1001, 1200},
res: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 999, 1000, 1001, 1200}, res: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 999, 1000, 1001, 1200},
}, },
} }
@ -176,23 +176,23 @@ func TestMultiMerge(t *testing.T) {
func TestMergedPostings(t *testing.T) { func TestMergedPostings(t *testing.T) {
var cases = []struct { var cases = []struct {
a, b []uint32 a, b []uint64
res []uint32 res []uint64
}{ }{
{ {
a: []uint32{1, 2, 3, 4, 5}, a: []uint64{1, 2, 3, 4, 5},
b: []uint32{6, 7, 8, 9, 10}, b: []uint64{6, 7, 8, 9, 10},
res: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, res: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
}, },
{ {
a: []uint32{1, 2, 3, 4, 5}, a: []uint64{1, 2, 3, 4, 5},
b: []uint32{4, 5, 6, 7, 8}, b: []uint64{4, 5, 6, 7, 8},
res: []uint32{1, 2, 3, 4, 5, 6, 7, 8}, res: []uint64{1, 2, 3, 4, 5, 6, 7, 8},
}, },
{ {
a: []uint32{1, 2, 3, 4, 9, 10}, a: []uint64{1, 2, 3, 4, 9, 10},
b: []uint32{1, 4, 5, 6, 7, 8, 10, 11}, b: []uint64{1, 4, 5, 6, 7, 8, 10, 11},
res: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, res: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
}, },
} }
@ -209,43 +209,43 @@ func TestMergedPostings(t *testing.T) {
func TestMergedPostingsSeek(t *testing.T) { func TestMergedPostingsSeek(t *testing.T) {
var cases = []struct { var cases = []struct {
a, b []uint32 a, b []uint64
seek uint32 seek uint64
success bool success bool
res []uint32 res []uint64
}{ }{
{ {
a: []uint32{2, 3, 4, 5}, a: []uint64{2, 3, 4, 5},
b: []uint32{6, 7, 8, 9, 10}, b: []uint64{6, 7, 8, 9, 10},
seek: 1, seek: 1,
success: true, success: true,
res: []uint32{2, 3, 4, 5, 6, 7, 8, 9, 10}, res: []uint64{2, 3, 4, 5, 6, 7, 8, 9, 10},
}, },
{ {
a: []uint32{1, 2, 3, 4, 5}, a: []uint64{1, 2, 3, 4, 5},
b: []uint32{6, 7, 8, 9, 10}, b: []uint64{6, 7, 8, 9, 10},
seek: 2, seek: 2,
success: true, success: true,
res: []uint32{2, 3, 4, 5, 6, 7, 8, 9, 10}, res: []uint64{2, 3, 4, 5, 6, 7, 8, 9, 10},
}, },
{ {
a: []uint32{1, 2, 3, 4, 5}, a: []uint64{1, 2, 3, 4, 5},
b: []uint32{4, 5, 6, 7, 8}, b: []uint64{4, 5, 6, 7, 8},
seek: 9, seek: 9,
success: false, success: false,
res: nil, res: nil,
}, },
{ {
a: []uint32{1, 2, 3, 4, 9, 10}, a: []uint64{1, 2, 3, 4, 9, 10},
b: []uint32{1, 4, 5, 6, 7, 8, 10, 11}, b: []uint64{1, 4, 5, 6, 7, 8, 10, 11},
seek: 10, seek: 10,
success: true, success: true,
res: []uint32{10, 11}, res: []uint64{10, 11},
}, },
} }
@ -263,7 +263,7 @@ func TestMergedPostingsSeek(t *testing.T) {
lst, err := expandPostings(p) lst, err := expandPostings(p)
require.NoError(t, err) require.NoError(t, err)
lst = append([]uint32{start}, lst...) lst = append([]uint64{start}, lst...)
require.Equal(t, c.res, lst) require.Equal(t, c.res, lst)
} }
} }
@ -290,7 +290,7 @@ func TestBigEndian(t *testing.T) {
bep := newBigEndianPostings(beLst) bep := newBigEndianPostings(beLst)
for i := 0; i < num; i++ { for i := 0; i < num; i++ {
require.True(t, bep.Next()) require.True(t, bep.Next())
require.Equal(t, ls[i], bep.At()) require.Equal(t, uint64(ls[i]), bep.At())
} }
require.False(t, bep.Next()) require.False(t, bep.Next())
@ -338,8 +338,8 @@ func TestBigEndian(t *testing.T) {
bep := newBigEndianPostings(beLst) bep := newBigEndianPostings(beLst)
for _, v := range table { for _, v := range table {
require.Equal(t, v.found, bep.Seek(v.seek)) require.Equal(t, v.found, bep.Seek(uint64(v.seek)))
require.Equal(t, v.val, bep.At()) require.Equal(t, uint64(v.val), bep.At())
require.Nil(t, bep.Err()) require.Nil(t, bep.Err())
} }
}) })
@ -348,16 +348,16 @@ func TestBigEndian(t *testing.T) {
func TestIntersectWithMerge(t *testing.T) { func TestIntersectWithMerge(t *testing.T) {
// One of the reproduceable cases for: // One of the reproduceable cases for:
// https://github.com/prometheus/prometheus/issues/2616 // https://github.com/prometheus/prometheus/issues/2616
a := newListPostings([]uint32{21, 22, 23, 24, 25, 30}) a := newListPostings([]uint64{21, 22, 23, 24, 25, 30})
b := newMergedPostings( b := newMergedPostings(
newListPostings([]uint32{10, 20, 30}), newListPostings([]uint64{10, 20, 30}),
newListPostings([]uint32{15, 26, 30}), newListPostings([]uint64{15, 26, 30}),
) )
p := Intersect(a, b) p := Intersect(a, b)
res, err := expandPostings(p) res, err := expandPostings(p)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, []uint32{30}, res) require.Equal(t, []uint64{30}, res)
} }

View file

@ -228,7 +228,7 @@ func createIdxChkReaders(tc []struct {
return labels.Compare(labels.FromMap(tc[i].lset), labels.FromMap(tc[i].lset)) < 0 return labels.Compare(labels.FromMap(tc[i].lset), labels.FromMap(tc[i].lset)) < 0
}) })
postings := &memPostings{m: make(map[term][]uint32, 512)} postings := &memPostings{m: make(map[term][]uint64, 512)}
chkReader := mockChunkReader(make(map[uint64]chunks.Chunk)) chkReader := mockChunkReader(make(map[uint64]chunks.Chunk))
lblIdx := make(map[string]stringset) lblIdx := make(map[string]stringset)
mi := newMockIndex() mi := newMockIndex()
@ -255,11 +255,11 @@ func createIdxChkReaders(tc []struct {
} }
ls := labels.FromMap(s.lset) ls := labels.FromMap(s.lset)
mi.AddSeries(uint32(i), ls, metas...) mi.AddSeries(uint64(i), ls, metas...)
postings.add(uint32(i), term{}) postings.add(uint64(i), term{})
for _, l := range ls { for _, l := range ls {
postings.add(uint32(i), term{l.Name, l.Value}) postings.add(uint64(i), term{l.Name, l.Value})
vs, present := lblIdx[l.Name] vs, present := lblIdx[l.Name]
if !present { if !present {
@ -555,7 +555,7 @@ func TestBlockQuerierDelete(t *testing.T) {
}, },
}, },
tombstones: newTombstoneReader( tombstones: newTombstoneReader(
map[uint32]Intervals{ map[uint64]Intervals{
1: Intervals{{1, 3}}, 1: Intervals{{1, 3}},
2: Intervals{{1, 3}, {6, 10}}, 2: Intervals{{1, 3}, {6, 10}},
3: Intervals{{6, 10}}, 3: Intervals{{6, 10}},
@ -663,13 +663,13 @@ func TestBaseChunkSeries(t *testing.T) {
lset labels.Labels lset labels.Labels
chunks []ChunkMeta chunks []ChunkMeta
ref uint32 ref uint64
} }
cases := []struct { cases := []struct {
series []refdSeries series []refdSeries
// Postings should be in the sorted order of the the series // Postings should be in the sorted order of the the series
postings []uint32 postings []uint64
expIdxs []int expIdxs []int
}{ }{
@ -703,7 +703,7 @@ func TestBaseChunkSeries(t *testing.T) {
ref: 108, ref: 108,
}, },
}, },
postings: []uint32{12, 10, 108}, postings: []uint64{12, 10, 108},
expIdxs: []int{0, 1, 3}, expIdxs: []int{0, 1, 3},
}, },
@ -722,7 +722,7 @@ func TestBaseChunkSeries(t *testing.T) {
ref: 1, ref: 1,
}, },
}, },
postings: []uint32{}, postings: []uint64{},
expIdxs: []int{}, expIdxs: []int{},
}, },

View file

@ -33,6 +33,11 @@ const (
tombstoneFormatV1 = 1 tombstoneFormatV1 = 1
) )
// TombstoneReader is the iterator over tombstones.
type TombstoneReader interface {
Get(ref uint64) Intervals
}
func writeTombstoneFile(dir string, tr tombstoneReader) error { func writeTombstoneFile(dir string, tr tombstoneReader) error {
path := filepath.Join(dir, tombstoneFilename) path := filepath.Join(dir, tombstoneFilename)
tmp := path + ".tmp" tmp := path + ".tmp"
@ -59,7 +64,7 @@ func writeTombstoneFile(dir string, tr tombstoneReader) error {
for k, v := range tr { for k, v := range tr {
for _, itv := range v { for _, itv := range v {
buf.reset() buf.reset()
buf.putUvarint32(k) buf.putUvarint64(k)
buf.putVarint64(itv.Mint) buf.putVarint64(itv.Mint)
buf.putVarint64(itv.Maxt) buf.putVarint64(itv.Maxt)
@ -81,15 +86,10 @@ func writeTombstoneFile(dir string, tr tombstoneReader) error {
// Stone holds the information on the posting and time-range // Stone holds the information on the posting and time-range
// that is deleted. // that is deleted.
type Stone struct { type Stone struct {
ref uint32 ref uint64
intervals Intervals intervals Intervals
} }
// TombstoneReader is the iterator over tombstones.
type TombstoneReader interface {
Get(ref uint32) Intervals
}
func readTombstones(dir string) (tombstoneReader, error) { func readTombstones(dir string) (tombstoneReader, error) {
b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename))
if err != nil { if err != nil {
@ -123,7 +123,7 @@ func readTombstones(dir string) (tombstoneReader, error) {
stonesMap := newEmptyTombstoneReader() stonesMap := newEmptyTombstoneReader()
for d.len() > 0 { for d.len() > 0 {
k := d.uvarint32() k := d.uvarint64()
mint := d.varint64() mint := d.varint64()
maxt := d.varint64() maxt := d.varint64()
if d.err() != nil { if d.err() != nil {
@ -136,21 +136,21 @@ func readTombstones(dir string) (tombstoneReader, error) {
return newTombstoneReader(stonesMap), nil return newTombstoneReader(stonesMap), nil
} }
type tombstoneReader map[uint32]Intervals type tombstoneReader map[uint64]Intervals
func newTombstoneReader(ts map[uint32]Intervals) tombstoneReader { func newTombstoneReader(ts map[uint64]Intervals) tombstoneReader {
return tombstoneReader(ts) return tombstoneReader(ts)
} }
func newEmptyTombstoneReader() tombstoneReader { func newEmptyTombstoneReader() tombstoneReader {
return tombstoneReader(make(map[uint32]Intervals)) return tombstoneReader(make(map[uint64]Intervals))
} }
func (t tombstoneReader) Get(ref uint32) Intervals { func (t tombstoneReader) Get(ref uint64) Intervals {
return t[ref] return t[ref]
} }
func (t tombstoneReader) add(ref uint32, itv Interval) { func (t tombstoneReader) add(ref uint64, itv Interval) {
t[ref] = t[ref].add(itv) t[ref] = t[ref].add(itv)
} }

View file

@ -27,12 +27,12 @@ func TestWriteAndReadbackTombStones(t *testing.T) {
tmpdir, _ := ioutil.TempDir("", "test") tmpdir, _ := ioutil.TempDir("", "test")
defer os.RemoveAll(tmpdir) defer os.RemoveAll(tmpdir)
ref := uint32(0) ref := uint64(0)
stones := make(map[uint32]Intervals) stones := make(map[uint64]Intervals)
// Generate the tombstones. // Generate the tombstones.
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
ref += uint32(rand.Int31n(10)) + 1 ref += uint64(rand.Int31n(10)) + 1
numRanges := rand.Intn(5) + 1 numRanges := rand.Intn(5) + 1
dranges := make(Intervals, 0, numRanges) dranges := make(Intervals, 0, numRanges)
mint := rand.Int63n(time.Now().UnixNano()) mint := rand.Int63n(time.Now().UnixNano())

24
wal.go
View file

@ -238,11 +238,11 @@ WRLoop:
activeSeries := make([]RefSeries, 0, len(series)) activeSeries := make([]RefSeries, 0, len(series))
for _, s := range series { for _, s := range series {
if !p.Seek(uint32(s.Ref)) { if !p.Seek(s.Ref) {
break WRLoop break WRLoop
} }
if p.At() == uint32(s.Ref) { if p.At() == s.Ref {
activeSeries = append(activeSeries, s) activeSeries = append(activeSeries, s)
} }
} }
@ -596,11 +596,11 @@ func encodeSeries(buf []byte, series []RefSeries) []byte {
buf = append(buf, b[:8]...) buf = append(buf, b[:8]...)
for _, s := range series { for _, s := range series {
n := binary.PutVarint(b, int64(s.Ref)-int64(first.Ref)) binary.BigEndian.PutUint64(b, s.Ref)
buf = append(buf, b[:n]...) buf = append(buf, b[:8]...)
lset := s.Labels lset := s.Labels
n = binary.PutUvarint(b, uint64(len(lset))) n := binary.PutUvarint(b, uint64(len(lset)))
buf = append(buf, b[:n]...) buf = append(buf, b[:n]...)
for _, l := range lset { for _, l := range lset {
@ -662,7 +662,7 @@ func (w *SegmentWAL) encodeDeletes(stones []Stone) error {
for _, s := range stones { for _, s := range stones {
for _, itv := range s.intervals { for _, itv := range s.intervals {
eb.reset() eb.reset()
eb.putUvarint32(s.ref) eb.putUvarint64(s.ref)
eb.putVarint64(itv.Mint) eb.putVarint64(itv.Mint)
eb.putVarint64(itv.Maxt) eb.putVarint64(itv.Maxt)
buf = append(buf, eb.get()...) buf = append(buf, eb.get()...)
@ -913,18 +913,16 @@ func (r *walReader) decodeSeries(flag byte, b []byte) ([]RefSeries, error) {
return nil, errors.Wrap(errInvalidSize, "header length") return nil, errors.Wrap(errInvalidSize, "header length")
} }
baseRef := binary.BigEndian.Uint64(b)
b = b[8:] b = b[8:]
for len(b) > 0 { for len(b) > 0 {
var ser RefSeries var ser RefSeries
// TODO: Check again. // TODO: Check again.
dref, n := binary.Varint(b) if len(b) < 8 {
if n < 1 { return nil, errors.Wrap(errInvalidSize, "series ref")
return nil, errors.Wrap(errInvalidSize, "series ref delta")
} }
b = b[n:] ser.Ref = binary.BigEndian.Uint64(b)
ser.Ref = uint64(int64(baseRef) + dref) b = b[8:]
l, n := binary.Uvarint(b) l, n := binary.Uvarint(b)
if n < 1 { if n < 1 {
@ -1002,7 +1000,7 @@ func (r *walReader) decodeDeletes(flag byte, b []byte) ([]Stone, error) {
for db.len() > 0 { for db.len() > 0 {
var s Stone var s Stone
s.ref = db.uvarint32() s.ref = db.uvarint64()
s.intervals = Intervals{{db.varint64(), db.varint64()}} s.intervals = Intervals{{db.varint64(), db.varint64()}}
if db.err() != nil { if db.err() != nil {
return nil, db.err() return nil, db.err()

View file

@ -219,7 +219,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
for j := 0; j < i*20; j++ { for j := 0; j < i*20; j++ {
ts := rand.Int63() ts := rand.Int63()
stones = append(stones, Stone{rand.Uint32(), Intervals{{ts, ts + rand.Int63n(10000)}}}) stones = append(stones, Stone{rand.Uint64(), Intervals{{ts, ts + rand.Int63n(10000)}}})
} }
lbls := series[i : i+stepSize] lbls := series[i : i+stepSize]