diff --git a/db.go b/db.go index ac57bc710..6c0382ece 100644 --- a/db.go +++ b/db.go @@ -18,7 +18,6 @@ import ( "golang.org/x/sync/errgroup" "github.com/coreos/etcd/pkg/fileutil" - "github.com/fabxc/tsdb/chunks" "github.com/fabxc/tsdb/labels" "github.com/go-kit/kit/log" "github.com/pkg/errors" @@ -523,32 +522,6 @@ func (db *DB) nextBlockDir() (string, error) { return filepath.Join(db.dir, fmt.Sprintf("b-%0.6d", i+1)), nil } -// chunkDesc wraps a plain data chunk and provides cached meta data about it. -type chunkDesc struct { - ref uint32 - lset labels.Labels - chunk chunks.Chunk - - // Caching fielddb. - firstTimestamp int64 - lastTimestamp int64 - lastValue float64 - numSamples int - - app chunks.Appender // Current appender for the chunkdb. -} - -func (cd *chunkDesc) append(ts int64, v float64) { - if cd.numSamples == 0 { - cd.firstTimestamp = ts - } - cd.app.Append(ts, v) - - cd.lastTimestamp = ts - cd.lastValue = v - cd.numSamples++ -} - // PartitionedDB is a time series storage. type PartitionedDB struct { logger log.Logger diff --git a/head.go b/head.go index 5685dba0b..a56f3ad2f 100644 --- a/head.go +++ b/head.go @@ -116,12 +116,55 @@ func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) { return h.descs[int(ref)].chunk, nil } +type headSeriesReader struct { + h *HeadBlock +} + +func (h *headSeriesReader) Chunk(ref uint32) (chunks.Chunk, error) { + h.h.mtx.RLock() + defer h.h.mtx.RUnlock() + + if int(ref) >= len(h.h.descs) { + return nil, errNotFound + } + return &safeChunk{ + cd: h.h.descs[int(ref)], + }, nil +} + +type safeChunk struct { + cd *chunkDesc +} + +func (c *safeChunk) Iterator() chunks.Iterator { + c.cd.mtx.Lock() + defer c.cd.mtx.Unlock() + + return c.cd.iterator() +} + +func (c *safeChunk) Appender() (chunks.Appender, error) { + panic("illegal") +} + +func (c *safeChunk) Bytes() []byte { + panic("illegal") +} + +func (c *safeChunk) Encoding() chunks.Encoding { + panic("illegal") +} + func (h *HeadBlock) interval() (int64, int64) { + h.bstats.mtx.RLock() + defer h.bstats.mtx.RUnlock() return h.bstats.MinTime, h.bstats.MaxTime } // Stats returns statisitics about the indexed data. func (h *HeadBlock) Stats() (BlockStats, error) { + h.bstats.mtx.RLock() + defer h.bstats.mtx.RUnlock() return *h.bstats, nil } @@ -161,11 +204,13 @@ func (h *HeadBlock) Series(ref uint32) (labels.Labels, []ChunkMeta, error) { } cd := h.descs[ref] + cd.mtx.RLock() meta := ChunkMeta{ MinTime: cd.firstTimestamp, MaxTime: cd.lastTimestamp, Ref: ref, } + cd.mtx.RUnlock() return cd.lset, []ChunkMeta{meta}, nil } @@ -273,6 +318,7 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error { // We don't want to reserve a new space for each. if ref, ok := uniqueHashes[s.hash]; ok { s.ref = ref + newSamples = append(newSamples, s) continue } s.ref = uint32(len(newSeries)) @@ -315,12 +361,14 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error { ) for _, s := range samples { cd := h.descs[s.ref] + cd.mtx.Lock() // Skip duplicate samples. if cd.lastTimestamp == s.t && cd.lastValue != s.v { total-- continue } cd.append(s.t, s.v) + cd.mtx.Unlock() if mint > s.t { mint = s.t @@ -394,6 +442,78 @@ func (h *HeadBlock) remapPostings(p Postings) Postings { return newListPostings(list) } +// chunkDesc wraps a plain data chunk and provides cached meta data about it. +type chunkDesc struct { + mtx sync.RWMutex + + ref uint32 + lset labels.Labels + chunk chunks.Chunk + + // Caching fielddb. + firstTimestamp int64 + lastTimestamp int64 + lastValue float64 + numSamples int + + sampleBuf [4]sample + + app chunks.Appender // Current appender for the chunkdb. +} + +func (cd *chunkDesc) append(ts int64, v float64) { + if cd.numSamples == 0 { + cd.firstTimestamp = ts + } + cd.app.Append(ts, v) + + cd.lastTimestamp = ts + cd.lastValue = v + cd.numSamples++ + + cd.sampleBuf[0] = cd.sampleBuf[1] + cd.sampleBuf[1] = cd.sampleBuf[2] + cd.sampleBuf[2] = cd.sampleBuf[3] + cd.sampleBuf[3] = sample{t: ts, v: v} +} + +func (cd *chunkDesc) iterator() chunks.Iterator { + it := &memSafeIterator{ + Iterator: cd.chunk.Iterator(), + i: -1, + total: cd.numSamples, + buf: cd.sampleBuf, + } + return it +} + +type memSafeIterator struct { + chunks.Iterator + + i int + total int + buf [4]sample +} + +func (it *memSafeIterator) Next() bool { + if it.i+1 >= it.total { + return false + } + it.i++ + if it.total-it.i > 4 { + return it.Iterator.Next() + } + return true +} + +func (it *memSafeIterator) At() (int64, float64) { + if it.total-it.i > 4 { + return it.Iterator.At() + } + s := it.buf[4-(it.total-it.i)] + return s.t, s.v +} + // positionMapper stores a position mapping from unsorted to // sorted indices of a sortable collection. type positionMapper struct { diff --git a/querier.go b/querier.go index 3080a29d1..73158b6ec 100644 --- a/querier.go +++ b/querier.go @@ -61,13 +61,17 @@ func (s *DB) Querier(mint, maxt int64) Querier { index: b.index(), series: b.series(), } - sq.blocks = append(sq.blocks, q) // TODO(fabxc): find nicer solution. if hb, ok := b.(*HeadBlock); ok { hb.updateMapping() q.postingsMapper = hb.remapPostings + q.series = &headSeriesReader{ + h: hb, + } } + + sq.blocks = append(sq.blocks, q) } return sq