Improve heuristic to spread chunks across block

This commit is contained in:
Fabian Reinartz 2017-06-07 13:42:53 +02:00
parent f006e2d1ab
commit 05e411a8eb
4 changed files with 90 additions and 21 deletions

View file

@ -113,7 +113,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
st, err := tsdb.Open(dir, nil, nil, &tsdb.Options{ st, err := tsdb.Open(dir, nil, nil, &tsdb.Options{
WALFlushInterval: 200 * time.Millisecond, WALFlushInterval: 200 * time.Millisecond,
RetentionDuration: 2 * 24 * 60 * 60 * 1000, // 1 days in milliseconds RetentionDuration: 2 * 24 * 60 * 60 * 1000, // 1 days in milliseconds
MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds MinBlockDuration: 3 * 60 * 60 * 1000, // 3 hours in milliseconds
MaxBlockDuration: 27 * 60 * 60 * 1000, // 1 days in milliseconds MaxBlockDuration: 27 * 60 * 60 * 1000, // 1 days in milliseconds
}) })
if err != nil { if err != nil {
@ -157,6 +157,8 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
}) })
} }
const timeDelta = 30000
func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (uint64, error) { func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (uint64, error) {
var mu sync.Mutex var mu sync.Mutex
var total uint64 var total uint64
@ -174,7 +176,7 @@ func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (u
wg.Add(1) wg.Add(1)
go func() { go func() {
n, err := b.ingestScrapesShard(batch, 100, int64(30000*i)) n, err := b.ingestScrapesShard(batch, 100, int64(timeDelta*i))
if err != nil { if err != nil {
// exitWithError(err) // exitWithError(err)
fmt.Println(" err", err) fmt.Println(" err", err)
@ -212,7 +214,7 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount
for i := 0; i < scrapeCount; i++ { for i := 0; i < scrapeCount; i++ {
app := b.storage.Appender() app := b.storage.Appender()
ts += int64(30000) ts += timeDelta
for _, s := range scrape { for _, s := range scrape {
s.value += 1000 s.value += 1000

View file

@ -314,6 +314,8 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo
var metas []BlockMeta var metas []BlockMeta
for i, b := range blocks { for i, b := range blocks {
metas = append(metas, b.Meta())
all, err := b.Index().Postings("", "") all, err := b.Index().Postings("", "")
if err != nil { if err != nil {
return nil, err return nil, err
@ -328,7 +330,6 @@ func populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*Blo
if err != nil { if err != nil {
return nil, err return nil, err
} }
metas = append(metas, b.Meta())
} }
// We fully rebuild the postings list index from merged series. // We fully rebuild the postings list index from merged series.

58
head.go
View file

@ -719,12 +719,7 @@ func (h *HeadBlock) get(hash uint64, lset labels.Labels) *memSeries {
} }
func (h *HeadBlock) create(hash uint64, lset labels.Labels) *memSeries { func (h *HeadBlock) create(hash uint64, lset labels.Labels) *memSeries {
s := &memSeries{ s := newMemSeries(lset, uint32(len(h.series)), h.meta.MaxTime)
lset: lset,
ref: uint32(len(h.series)),
}
// create the initial chunk and appender
s.cut()
// Allocate empty space until we can insert at the given index. // Allocate empty space until we can insert at the given index.
h.series = append(h.series, s) h.series = append(h.series, s)
@ -759,15 +754,18 @@ type memSeries struct {
lset labels.Labels lset labels.Labels
chunks []*memChunk chunks []*memChunk
nextAt int64 // timestamp at which to cut the next chunk.
maxt int64 // maximum timestamp for the series.
lastValue float64 lastValue float64
sampleBuf [4]sample sampleBuf [4]sample
app chunks.Appender // Current appender for the chunk. app chunks.Appender // Current appender for the chunk.
} }
func (s *memSeries) cut() *memChunk { func (s *memSeries) cut(mint int64) *memChunk {
c := &memChunk{ c := &memChunk{
chunk: chunks.NewXORChunk(), chunk: chunks.NewXORChunk(),
minTime: mint,
maxTime: math.MinInt64, maxTime: math.MinInt64,
} }
s.chunks = append(s.chunks, c) s.chunks = append(s.chunks, c)
@ -776,32 +774,47 @@ func (s *memSeries) cut() *memChunk {
if err != nil { if err != nil {
panic(err) panic(err)
} }
s.app = app s.app = app
return c return c
} }
func newMemSeries(lset labels.Labels, id uint32, maxt int64) *memSeries {
s := &memSeries{
lset: lset,
ref: id,
maxt: maxt,
nextAt: math.MinInt64,
}
return s
}
func (s *memSeries) append(t int64, v float64) bool { func (s *memSeries) append(t int64, v float64) bool {
const samplesPerChunk = 120
s.mtx.Lock() s.mtx.Lock()
defer s.mtx.Unlock() defer s.mtx.Unlock()
var c *memChunk var c *memChunk
if s.head().samples > 130 { if len(s.chunks) == 0 {
c = s.cut() c = s.cut(t)
c.minTime = t }
} else { c = s.head()
c = s.head() if c.maxTime >= t {
// Skip duplicate and out of order samples. return false
if c.maxTime >= t { }
return false if c.samples > samplesPerChunk/4 && t >= s.nextAt {
} c = s.cut(t)
} }
s.app.Append(t, v) s.app.Append(t, v)
c.maxTime = t c.maxTime = t
c.samples++ c.samples++
if c.samples == samplesPerChunk/4 {
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.maxt)
}
s.lastValue = v s.lastValue = v
s.sampleBuf[0] = s.sampleBuf[1] s.sampleBuf[0] = s.sampleBuf[1]
@ -812,6 +825,17 @@ func (s *memSeries) append(t int64, v float64) bool {
return true return true
} }
// computeChunkEndTime estimates the end timestamp based the beginning of a chunk,
// its current timestamp and the upper bound up to which we insert data.
// It assumes that the time range is 1/4 full.
func computeChunkEndTime(start, cur, max int64) int64 {
a := (max - start) / ((cur - start + 1) * 4)
if a == 0 {
return max
}
return start + (max-start)/a
}
func (s *memSeries) iterator(i int) chunks.Iterator { func (s *memSeries) iterator(i int) chunks.Iterator {
c := s.chunks[i] c := s.chunks[i]

View file

@ -731,3 +731,45 @@ Outer:
return ds return ds
} }
func TestComputeChunkEndTime(t *testing.T) {
cases := []struct {
start, cur, max int64
res int64
}{
{
start: 0,
cur: 250,
max: 1000,
res: 1000,
},
{
start: 100,
cur: 200,
max: 1000,
res: 550,
},
// Case where we fit floored 0 chunks. Must catch division by 0
// and default to maximum time.
{
start: 0,
cur: 500,
max: 1000,
res: 1000,
},
// Catch divison by zero for cur == start. Strictly not a possible case.
{
start: 100,
cur: 100,
max: 1000,
res: 104,
},
}
for _, c := range cases {
got := computeChunkEndTime(c.start, c.cur, c.max)
if got != c.res {
t.Errorf("expected %d for (start: %d, cur: %d, max: %d), got %d", c.res, c.start, c.cur, c.max, got)
}
}
}