Replace single head chunk per series with memSeries

This adds a memory series holding several chunk to replace
the single head chunk per series so far.
This is necessary for uniform maximum chunk sizes in cases
where some series have higher frequency samples than others.
This commit is contained in:
Fabian Reinartz 2017-01-11 13:02:38 +01:00
parent 80affd98a8
commit 0ca755b4ae

238
head.go
View file

@ -20,13 +20,13 @@ type headBlock struct {
// descs holds all chunk descs for the head block. Each chunk implicitly // descs holds all chunk descs for the head block. Each chunk implicitly
// is assigned the index as its ID. // is assigned the index as its ID.
descs []*chunkDesc series []*memSeries
// mapping maps a series ID to its position in an ordered list // mapping maps a series ID to its position in an ordered list
// of all series. The orderDirty flag indicates that it has gone stale. // of all series. The orderDirty flag indicates that it has gone stale.
mapper *positionMapper mapper *positionMapper
// hashes contains a collision map of label set hashes of chunks // hashes contains a collision map of label set hashes of chunks
// to their chunk descs. // to their chunk descs.
hashes map[uint64][]*chunkDesc hashes map[uint64][]*memSeries
values map[string]stringset // label names to possible values values map[string]stringset // label names to possible values
postings *memPostings // postings lists for terms postings *memPostings // postings lists for terms
@ -45,8 +45,8 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) {
b := &headBlock{ b := &headBlock{
dir: dir, dir: dir,
descs: []*chunkDesc{}, series: []*memSeries{},
hashes: map[uint64][]*chunkDesc{}, hashes: map[uint64][]*memSeries{},
values: map[string]stringset{}, values: map[string]stringset{},
postings: &memPostings{m: make(map[term][]uint32)}, postings: &memPostings{m: make(map[term][]uint32)},
wal: wal, wal: wal,
@ -64,7 +64,9 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) {
b.stats.ChunkCount++ // head block has one chunk/series b.stats.ChunkCount++ // head block has one chunk/series
}, },
sample: func(s hashedSample) { sample: func(s hashedSample) {
cd := b.descs[s.ref] si := s.ref
cd := b.series[si]
cd.append(s.t, s.v) cd.append(s.t, s.v)
if s.t > b.stats.MaxTime { if s.t > b.stats.MaxTime {
@ -109,26 +111,29 @@ func (h *headSeriesReader) Chunk(ref uint32) (chunks.Chunk, error) {
h.mtx.RLock() h.mtx.RLock()
defer h.mtx.RUnlock() defer h.mtx.RUnlock()
if int(ref) >= len(h.descs) { c := &safeChunk{
return nil, errNotFound Chunk: h.series[ref>>8].chunks[int((ref<<24)>>24)].chunk,
s: h.series[ref>>8],
i: int((ref << 24) >> 24),
} }
return h.descs[int(ref)].chunk, nil return c, nil
} }
type safeChunk struct { type safeChunk struct {
cd *chunkDesc chunks.Chunk
s *memSeries
i int
} }
func (c *safeChunk) Iterator() chunks.Iterator { func (c *safeChunk) Iterator() chunks.Iterator {
c.cd.mtx.Lock() c.s.mtx.RLock()
defer c.cd.mtx.Unlock() defer c.s.mtx.RUnlock()
return c.s.iterator(c.i)
return c.cd.iterator()
} }
func (c *safeChunk) Appender() (chunks.Appender, error) { panic("illegal") } // func (c *safeChunk) Appender() (chunks.Appender, error) { panic("illegal") }
func (c *safeChunk) Bytes() []byte { panic("illegal") } // func (c *safeChunk) Bytes() []byte { panic("illegal") }
func (c *safeChunk) Encoding() chunks.Encoding { panic("illegal") } // func (c *safeChunk) Encoding() chunks.Encoding { panic("illegal") }
type headIndexReader struct { type headIndexReader struct {
*headBlock *headBlock
@ -165,19 +170,24 @@ func (h *headIndexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error)
h.mtx.RLock() h.mtx.RLock()
defer h.mtx.RUnlock() defer h.mtx.RUnlock()
if int(ref) >= len(h.descs) { if int(ref) >= len(h.series) {
return nil, nil, errNotFound return nil, nil, errNotFound
} }
cd := h.descs[ref] s := h.series[ref]
metas := make([]ChunkMeta, 0, len(s.chunks))
cd.mtx.RLock() s.mtx.RLock()
meta := ChunkMeta{ defer s.mtx.RUnlock()
MinTime: cd.firstTimestamp,
MaxTime: cd.lastTimestamp, for i, c := range s.chunks {
Ref: ref, metas = append(metas, ChunkMeta{
MinTime: c.minTime,
MaxTime: c.maxTime,
Ref: (ref << 8) | uint32(i),
})
} }
cd.mtx.RUnlock()
return cd.lset, []ChunkMeta{meta}, nil return s.lset, metas, nil
} }
func (h *headIndexReader) LabelIndices() ([][]string, error) { func (h *headIndexReader) LabelIndices() ([][]string, error) {
@ -200,35 +210,25 @@ func (h *headIndexReader) Stats() (BlockStats, error) {
// get retrieves the chunk with the hash and label set and creates // get retrieves the chunk with the hash and label set and creates
// a new one if it doesn't exist yet. // a new one if it doesn't exist yet.
func (h *headBlock) get(hash uint64, lset labels.Labels) *chunkDesc { func (h *headBlock) get(hash uint64, lset labels.Labels) *memSeries {
cds := h.hashes[hash] series := h.hashes[hash]
for _, cd := range cds { for _, s := range series {
if cd.lset.Equals(lset) { if s.lset.Equals(lset) {
return cd return s
} }
} }
return nil return nil
} }
func (h *headBlock) create(hash uint64, lset labels.Labels) *chunkDesc { func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries {
cd := &chunkDesc{ s := &memSeries{lset: lset}
lset: lset,
chunk: chunks.NewXORChunk(),
lastTimestamp: math.MinInt64,
}
var err error
cd.app, err = cd.chunk.Appender()
if err != nil {
// Getting an Appender for a new chunk must not panic.
panic(err)
}
// Index the new chunk. // Index the new chunk.
cd.ref = uint32(len(h.descs)) s.ref = uint32(len(h.series))
h.descs = append(h.descs, cd) h.series = append(h.series, s)
h.hashes[hash] = append(h.hashes[hash], cd) h.hashes[hash] = append(h.hashes[hash], s)
for _, l := range lset { for _, l := range lset {
valset, ok := h.values[l.Name] valset, ok := h.values[l.Name]
@ -238,12 +238,12 @@ func (h *headBlock) create(hash uint64, lset labels.Labels) *chunkDesc {
} }
valset.set(l.Value) valset.set(l.Value)
h.postings.add(cd.ref, term{name: l.Name, value: l.Value}) h.postings.add(s.ref, term{name: l.Name, value: l.Value})
} }
h.postings.add(cd.ref, term{}) h.postings.add(s.ref, term{})
return cd return s
} }
var ( var (
@ -273,18 +273,19 @@ func (h *headBlock) appendBatch(samples []hashedSample) (int, error) {
for i := range samples { for i := range samples {
s := &samples[i] s := &samples[i]
cd := h.get(s.hash, s.labels) ms := h.get(s.hash, s.labels)
if cd != nil { if ms != nil {
// Samples must only occur in order. c := ms.head()
if s.t < cd.lastTimestamp {
if s.t < c.maxTime {
return 0, ErrOutOfOrderSample return 0, ErrOutOfOrderSample
} }
if cd.lastTimestamp == s.t && cd.lastValue != s.v { if c.maxTime == s.t && ms.lastValue != s.v {
return 0, ErrAmendSample return 0, ErrAmendSample
} }
// TODO(fabxc): sample refs are only scoped within a block for // TODO(fabxc): sample refs are only scoped within a block for
// now and we ignore any previously set value // now and we ignore any previously set value
s.ref = cd.ref s.ref = ms.ref
continue continue
} }
@ -303,19 +304,17 @@ func (h *headBlock) appendBatch(samples []hashedSample) (int, error) {
newSamples = append(newSamples, s) newSamples = append(newSamples, s)
} }
// Write all new series and samples to the WAL and add it to the
// in-mem database on success.
if err := h.wal.Log(newSeries, samples); err != nil {
return 0, err
}
// After the samples were successfully written to the WAL, there may // After the samples were successfully written to the WAL, there may
// be no further failures. // be no further failures.
if len(newSeries) > 0 { if len(newSeries) > 0 {
// TODO(fabxc): re-check if we actually have to create a new series
// after acquiring the write lock.
// If concurrent appenders attempt to create the same series, there's
// a semantical race between switching locks.
h.mtx.RUnlock() h.mtx.RUnlock()
h.mtx.Lock() h.mtx.Lock()
base := len(h.descs) base := len(h.series)
for i, s := range newSeries { for i, s := range newSeries {
h.create(newHashes[i], s) h.create(newHashes[i], s)
@ -327,6 +326,11 @@ func (h *headBlock) appendBatch(samples []hashedSample) (int, error) {
h.mtx.Unlock() h.mtx.Unlock()
h.mtx.RLock() h.mtx.RLock()
} }
// Write all new series and samples to the WAL and add it to the
// in-mem database on success.
if err := h.wal.Log(newSeries, samples); err != nil {
return 0, err
}
var ( var (
total = uint64(len(samples)) total = uint64(len(samples))
@ -334,16 +338,14 @@ func (h *headBlock) appendBatch(samples []hashedSample) (int, error) {
maxt = int64(math.MinInt64) maxt = int64(math.MinInt64)
) )
for _, s := range samples { for _, s := range samples {
cd := h.descs[s.ref] ser := h.series[s.ref]
cd.mtx.Lock() ser.mtx.Lock()
// Skip duplicate samples. ok := ser.append(s.t, s.v)
if cd.lastTimestamp == s.t && cd.lastValue != s.v { ser.mtx.Unlock()
if !ok {
total-- total--
continue continue
} }
cd.append(s.t, s.v)
cd.mtx.Unlock()
if mint > s.t { if mint > s.t {
mint = s.t mint = s.t
} }
@ -379,18 +381,18 @@ func (h *headBlock) fullness() float64 {
func (h *headBlock) updateMapping() { func (h *headBlock) updateMapping() {
h.mtx.RLock() h.mtx.RLock()
if h.mapper.sortable != nil && h.mapper.Len() == len(h.descs) { if h.mapper.sortable != nil && h.mapper.Len() == len(h.series) {
h.mtx.RUnlock() h.mtx.RUnlock()
return return
} }
cds := make([]*chunkDesc, len(h.descs)) series := make([]*memSeries, len(h.series))
copy(cds, h.descs) copy(series, h.series)
h.mtx.RUnlock() h.mtx.RUnlock()
s := slice.SortInterface(cds, func(i, j int) bool { s := slice.SortInterface(series, func(i, j int) bool {
return labels.Compare(cds[i].lset, cds[j].lset) < 0 return labels.Compare(series[i].lset, series[j].lset) < 0
}) })
h.mapper.update(s) h.mapper.update(s)
@ -415,51 +417,89 @@ func (h *headBlock) remapPostings(p Postings) Postings {
return newListPostings(list) return newListPostings(list)
} }
// chunkDesc wraps a plain data chunk and provides cached meta data about it. type memSeries struct {
type chunkDesc struct {
mtx sync.RWMutex mtx sync.RWMutex
ref uint32 ref uint32
lset labels.Labels lset labels.Labels
chunk chunks.Chunk chunks []*memChunk
// Caching fielddb.
firstTimestamp int64
lastTimestamp int64
lastValue float64
numSamples int
lastValue float64
sampleBuf [4]sample sampleBuf [4]sample
app chunks.Appender // Current appender for the chunkdb. app chunks.Appender // Current appender for the chunkdb.
} }
func (cd *chunkDesc) append(ts int64, v float64) { func (s *memSeries) cut() *memChunk {
if cd.numSamples == 0 { c := &memChunk{
cd.firstTimestamp = ts chunk: chunks.NewXORChunk(),
maxTime: math.MinInt64,
} }
cd.app.Append(ts, v) s.chunks = append(s.chunks, c)
cd.lastTimestamp = ts app, err := c.chunk.Appender()
cd.lastValue = v if err != nil {
cd.numSamples++ panic(err)
}
cd.sampleBuf[0] = cd.sampleBuf[1] s.app = app
cd.sampleBuf[1] = cd.sampleBuf[2] return c
cd.sampleBuf[2] = cd.sampleBuf[3]
cd.sampleBuf[3] = sample{t: ts, v: v}
} }
func (cd *chunkDesc) iterator() chunks.Iterator { func (s *memSeries) append(t int64, v float64) bool {
var c *memChunk
if s.app == nil || s.head().samples > 10050 {
c = s.cut()
c.minTime = t
} else {
c = s.head()
// Skip duplicate samples.
if c.maxTime == t && s.lastValue != v {
return false
}
}
s.app.Append(t, v)
c.maxTime = t
c.samples++
s.lastValue = v
s.sampleBuf[0] = s.sampleBuf[1]
s.sampleBuf[1] = s.sampleBuf[2]
s.sampleBuf[2] = s.sampleBuf[3]
s.sampleBuf[3] = sample{t: t, v: v}
return true
}
func (s *memSeries) iterator(i int) chunks.Iterator {
c := s.chunks[i]
if i < len(s.chunks)-1 {
return c.chunk.Iterator()
}
it := &memSafeIterator{ it := &memSafeIterator{
Iterator: cd.chunk.Iterator(), Iterator: c.chunk.Iterator(),
i: -1, i: -1,
total: cd.numSamples, total: c.samples,
buf: cd.sampleBuf, buf: s.sampleBuf,
} }
return it return it
} }
func (s *memSeries) head() *memChunk {
return s.chunks[len(s.chunks)-1]
}
type memChunk struct {
chunk chunks.Chunk
minTime, maxTime int64
samples int
}
type memSafeIterator struct { type memSafeIterator struct {
chunks.Iterator chunks.Iterator