Make concurrent head chunk reads safe, fix misc races

This adds a 4 sample buffer to every head chunk. The XOR
compression scheme may edit bytes in place. The minimum size
of a sample is 2 bits. So keeping the last 4 samples in an in-memory
buffer makes it safe to query the preceeding ones while samples
are added
This commit is contained in:
Fabian Reinartz 2017-01-09 16:51:39 +01:00
parent 1943f8d1bb
commit 8c31c6e934
3 changed files with 125 additions and 28 deletions

27
db.go
View file

@ -18,7 +18,6 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/fileutil"
"github.com/fabxc/tsdb/chunks"
"github.com/fabxc/tsdb/labels" "github.com/fabxc/tsdb/labels"
"github.com/go-kit/kit/log" "github.com/go-kit/kit/log"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -523,32 +522,6 @@ func (db *DB) nextBlockDir() (string, error) {
return filepath.Join(db.dir, fmt.Sprintf("b-%0.6d", i+1)), nil return filepath.Join(db.dir, fmt.Sprintf("b-%0.6d", i+1)), nil
} }
// chunkDesc wraps a plain data chunk and provides cached meta data about it.
type chunkDesc struct {
ref uint32
lset labels.Labels
chunk chunks.Chunk
// Caching fielddb.
firstTimestamp int64
lastTimestamp int64
lastValue float64
numSamples int
app chunks.Appender // Current appender for the chunkdb.
}
func (cd *chunkDesc) append(ts int64, v float64) {
if cd.numSamples == 0 {
cd.firstTimestamp = ts
}
cd.app.Append(ts, v)
cd.lastTimestamp = ts
cd.lastValue = v
cd.numSamples++
}
// PartitionedDB is a time series storage. // PartitionedDB is a time series storage.
type PartitionedDB struct { type PartitionedDB struct {
logger log.Logger logger log.Logger

120
head.go
View file

@ -116,12 +116,55 @@ func (h *HeadBlock) Chunk(ref uint32) (chunks.Chunk, error) {
return h.descs[int(ref)].chunk, nil return h.descs[int(ref)].chunk, nil
} }
type headSeriesReader struct {
h *HeadBlock
}
func (h *headSeriesReader) Chunk(ref uint32) (chunks.Chunk, error) {
h.h.mtx.RLock()
defer h.h.mtx.RUnlock()
if int(ref) >= len(h.h.descs) {
return nil, errNotFound
}
return &safeChunk{
cd: h.h.descs[int(ref)],
}, nil
}
type safeChunk struct {
cd *chunkDesc
}
func (c *safeChunk) Iterator() chunks.Iterator {
c.cd.mtx.Lock()
defer c.cd.mtx.Unlock()
return c.cd.iterator()
}
func (c *safeChunk) Appender() (chunks.Appender, error) {
panic("illegal")
}
func (c *safeChunk) Bytes() []byte {
panic("illegal")
}
func (c *safeChunk) Encoding() chunks.Encoding {
panic("illegal")
}
func (h *HeadBlock) interval() (int64, int64) { func (h *HeadBlock) interval() (int64, int64) {
h.bstats.mtx.RLock()
defer h.bstats.mtx.RUnlock()
return h.bstats.MinTime, h.bstats.MaxTime return h.bstats.MinTime, h.bstats.MaxTime
} }
// Stats returns statisitics about the indexed data. // Stats returns statisitics about the indexed data.
func (h *HeadBlock) Stats() (BlockStats, error) { func (h *HeadBlock) Stats() (BlockStats, error) {
h.bstats.mtx.RLock()
defer h.bstats.mtx.RUnlock()
return *h.bstats, nil return *h.bstats, nil
} }
@ -161,11 +204,13 @@ func (h *HeadBlock) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
} }
cd := h.descs[ref] cd := h.descs[ref]
cd.mtx.RLock()
meta := ChunkMeta{ meta := ChunkMeta{
MinTime: cd.firstTimestamp, MinTime: cd.firstTimestamp,
MaxTime: cd.lastTimestamp, MaxTime: cd.lastTimestamp,
Ref: ref, Ref: ref,
} }
cd.mtx.RUnlock()
return cd.lset, []ChunkMeta{meta}, nil return cd.lset, []ChunkMeta{meta}, nil
} }
@ -273,6 +318,7 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error {
// We don't want to reserve a new space for each. // We don't want to reserve a new space for each.
if ref, ok := uniqueHashes[s.hash]; ok { if ref, ok := uniqueHashes[s.hash]; ok {
s.ref = ref s.ref = ref
newSamples = append(newSamples, s)
continue continue
} }
s.ref = uint32(len(newSeries)) s.ref = uint32(len(newSeries))
@ -315,12 +361,14 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error {
) )
for _, s := range samples { for _, s := range samples {
cd := h.descs[s.ref] cd := h.descs[s.ref]
cd.mtx.Lock()
// Skip duplicate samples. // Skip duplicate samples.
if cd.lastTimestamp == s.t && cd.lastValue != s.v { if cd.lastTimestamp == s.t && cd.lastValue != s.v {
total-- total--
continue continue
} }
cd.append(s.t, s.v) cd.append(s.t, s.v)
cd.mtx.Unlock()
if mint > s.t { if mint > s.t {
mint = s.t mint = s.t
@ -394,6 +442,78 @@ func (h *HeadBlock) remapPostings(p Postings) Postings {
return newListPostings(list) return newListPostings(list)
} }
// chunkDesc wraps a plain data chunk and provides cached meta data about it.
type chunkDesc struct {
mtx sync.RWMutex
ref uint32
lset labels.Labels
chunk chunks.Chunk
// Caching fielddb.
firstTimestamp int64
lastTimestamp int64
lastValue float64
numSamples int
sampleBuf [4]sample
app chunks.Appender // Current appender for the chunkdb.
}
func (cd *chunkDesc) append(ts int64, v float64) {
if cd.numSamples == 0 {
cd.firstTimestamp = ts
}
cd.app.Append(ts, v)
cd.lastTimestamp = ts
cd.lastValue = v
cd.numSamples++
cd.sampleBuf[0] = cd.sampleBuf[1]
cd.sampleBuf[1] = cd.sampleBuf[2]
cd.sampleBuf[2] = cd.sampleBuf[3]
cd.sampleBuf[3] = sample{t: ts, v: v}
}
func (cd *chunkDesc) iterator() chunks.Iterator {
it := &memSafeIterator{
Iterator: cd.chunk.Iterator(),
i: -1,
total: cd.numSamples,
buf: cd.sampleBuf,
}
return it
}
type memSafeIterator struct {
chunks.Iterator
i int
total int
buf [4]sample
}
func (it *memSafeIterator) Next() bool {
if it.i+1 >= it.total {
return false
}
it.i++
if it.total-it.i > 4 {
return it.Iterator.Next()
}
return true
}
func (it *memSafeIterator) At() (int64, float64) {
if it.total-it.i > 4 {
return it.Iterator.At()
}
s := it.buf[4-(it.total-it.i)]
return s.t, s.v
}
// positionMapper stores a position mapping from unsorted to // positionMapper stores a position mapping from unsorted to
// sorted indices of a sortable collection. // sorted indices of a sortable collection.
type positionMapper struct { type positionMapper struct {

View file

@ -61,13 +61,17 @@ func (s *DB) Querier(mint, maxt int64) Querier {
index: b.index(), index: b.index(),
series: b.series(), series: b.series(),
} }
sq.blocks = append(sq.blocks, q)
// TODO(fabxc): find nicer solution. // TODO(fabxc): find nicer solution.
if hb, ok := b.(*HeadBlock); ok { if hb, ok := b.(*HeadBlock); ok {
hb.updateMapping() hb.updateMapping()
q.postingsMapper = hb.remapPostings q.postingsMapper = hb.remapPostings
q.series = &headSeriesReader{
h: hb,
}
} }
sq.blocks = append(sq.blocks, q)
} }
return sq return sq