From 5534e6c53c3b7fb408d4e3e0f0af1c02582fe6a3 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 12 May 2017 16:34:41 +0200 Subject: [PATCH 1/4] Make HeadBlock impl public, make interface private --- block.go | 4 ++-- db.go | 18 +++++++++--------- head.go | 49 +++++++++++++++++++++++++------------------------ head_test.go | 12 ++++++------ 4 files changed, 42 insertions(+), 41 deletions(-) diff --git a/block.go b/block.go index e1ccfd40d3..72ebb1f8a8 100644 --- a/block.go +++ b/block.go @@ -48,8 +48,8 @@ type Block interface { Queryable } -// HeadBlock is a regular block that can still be appended to. -type HeadBlock interface { +// headBlock is a regular block that can still be appended to. +type headBlock interface { Block Appendable } diff --git a/db.go b/db.go index 864b9cfbdc..51c49965c8 100644 --- a/db.go +++ b/db.go @@ -118,7 +118,7 @@ type DB struct { // or the general layout. // Must never be held when acquiring a blocks's mutex! headmtx sync.RWMutex - heads []HeadBlock + heads []headBlock compactor Compactor @@ -401,7 +401,7 @@ func (db *DB) reloadBlocks() error { var ( metas []*BlockMeta blocks []Block - heads []HeadBlock + heads []headBlock seqBlocks = make(map[int]Block, len(dirs)) ) @@ -418,7 +418,7 @@ func (db *DB) reloadBlocks() error { if meta.Compaction.Generation == 0 { if !ok { - b, err = openHeadBlock(dirs[i], db.logger) + b, err = OpenHeadBlock(dirs[i], db.logger) if err != nil { return errors.Wrapf(err, "load head at %s", dirs[i]) } @@ -426,7 +426,7 @@ func (db *DB) reloadBlocks() error { if meta.ULID != b.Meta().ULID { return errors.Errorf("head block ULID changed unexpectedly") } - heads = append(heads, b.(HeadBlock)) + heads = append(heads, b.(headBlock)) } else { if !ok || meta.ULID != b.Meta().ULID { b, err = newPersistedBlock(dirs[i]) @@ -559,7 +559,7 @@ func (a *dbAppender) appenderFor(t int64) (*metaAppender, error) { if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime { a.db.headmtx.Lock() - var newHeads []HeadBlock + var newHeads []headBlock if err := a.db.ensureHead(t); err != nil { a.db.headmtx.Unlock() @@ -670,9 +670,9 @@ func (a *dbAppender) Rollback() error { } // appendable returns a copy of a slice of HeadBlocks that can still be appended to. -func (db *DB) appendable() []HeadBlock { +func (db *DB) appendable() []headBlock { var i int - app := make([]HeadBlock, 0, db.opts.AppendableBlocks) + app := make([]headBlock, 0, db.opts.AppendableBlocks) if len(db.heads) > db.opts.AppendableBlocks { i = len(db.heads) - db.opts.AppendableBlocks @@ -711,14 +711,14 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block { // cut starts a new head block to append to. The completed head block // will still be appendable for the configured grace period. -func (db *DB) cut(mint int64) (HeadBlock, error) { +func (db *DB) cut(mint int64) (headBlock, error) { maxt := mint + int64(db.opts.MinBlockDuration) dir, seq, err := nextSequenceFile(db.dir, "b-") if err != nil { return nil, err } - newHead, err := createHeadBlock(dir, seq, db.logger, mint, maxt) + newHead, err := CreateHeadBlock(dir, seq, db.logger, mint, maxt) if err != nil { return nil, err } diff --git a/head.go b/head.go index 7d19011dd3..b39385fb64 100644 --- a/head.go +++ b/head.go @@ -47,8 +47,8 @@ var ( ErrOutOfBounds = errors.New("out of bounds") ) -// headBlock handles reads and writes of time series data within a time window. -type headBlock struct { +// HeadBlock handles reads and writes of time series data within a time window. +type HeadBlock struct { mtx sync.RWMutex dir string wal *WAL @@ -69,7 +69,8 @@ type headBlock struct { meta BlockMeta } -func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*headBlock, error) { +// CreateHeadBlock creates a new head block in dir that holds samples in the range [mint,maxt). +func CreateHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*HeadBlock, error) { // Make head block creation appear atomic. tmp := dir + ".tmp" @@ -95,11 +96,11 @@ func createHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*head if err := renameFile(tmp, dir); err != nil { return nil, err } - return openHeadBlock(dir, l) + return OpenHeadBlock(dir, l) } -// openHeadBlock creates a new empty head block. -func openHeadBlock(dir string, l log.Logger) (*headBlock, error) { +// OpenHeadBlock opens the head block in dir. +func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) { wal, err := OpenWAL(dir, log.With(l, "component", "wal"), 5*time.Second) if err != nil { return nil, err @@ -109,7 +110,7 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) { return nil, err } - h := &headBlock{ + h := &HeadBlock{ dir: dir, wal: wal, series: []*memSeries{nil}, // 0 is not a valid posting, filled with nil. @@ -151,16 +152,16 @@ Outer: // inBounds returns true if the given timestamp is within the valid // time bounds of the block. -func (h *headBlock) inBounds(t int64) bool { +func (h *HeadBlock) inBounds(t int64) bool { return t >= h.meta.MinTime && t <= h.meta.MaxTime } -func (h *headBlock) String() string { +func (h *HeadBlock) String() string { return fmt.Sprintf("(%d, %s)", h.meta.Sequence, h.meta.ULID) } // Close syncs all data and closes underlying resources of the head block. -func (h *headBlock) Close() error { +func (h *HeadBlock) Close() error { h.mtx.Lock() defer h.mtx.Unlock() @@ -184,7 +185,7 @@ func (h *headBlock) Close() error { return nil } -func (h *headBlock) Meta() BlockMeta { +func (h *HeadBlock) Meta() BlockMeta { m := BlockMeta{ ULID: h.meta.ULID, Sequence: h.meta.Sequence, @@ -200,12 +201,12 @@ func (h *headBlock) Meta() BlockMeta { return m } -func (h *headBlock) Dir() string { return h.dir } -func (h *headBlock) Persisted() bool { return false } -func (h *headBlock) Index() IndexReader { return &headIndexReader{h} } -func (h *headBlock) Chunks() ChunkReader { return &headChunkReader{h} } +func (h *HeadBlock) Dir() string { return h.dir } +func (h *HeadBlock) Persisted() bool { return false } +func (h *HeadBlock) Index() IndexReader { return &headIndexReader{h} } +func (h *HeadBlock) Chunks() ChunkReader { return &headChunkReader{h} } -func (h *headBlock) Querier(mint, maxt int64) Querier { +func (h *HeadBlock) Querier(mint, maxt int64) Querier { h.mtx.RLock() defer h.mtx.RUnlock() @@ -244,7 +245,7 @@ func (h *headBlock) Querier(mint, maxt int64) Querier { } } -func (h *headBlock) Appender() Appender { +func (h *HeadBlock) Appender() Appender { atomic.AddUint64(&h.activeWriters, 1) h.mtx.RLock() @@ -252,10 +253,10 @@ func (h *headBlock) Appender() Appender { if h.closed { panic(fmt.Sprintf("block %s already closed", h.dir)) } - return &headAppender{headBlock: h, samples: getHeadAppendBuffer()} + return &headAppender{HeadBlock: h, samples: getHeadAppendBuffer()} } -func (h *headBlock) Busy() bool { +func (h *HeadBlock) Busy() bool { return atomic.LoadUint64(&h.activeWriters) > 0 } @@ -274,7 +275,7 @@ func putHeadAppendBuffer(b []refdSample) { } type headAppender struct { - *headBlock + *HeadBlock newSeries map[uint64]hashedLabels newHashes map[uint64]uint64 @@ -454,7 +455,7 @@ func (a *headAppender) Rollback() error { } type headChunkReader struct { - *headBlock + *HeadBlock } // Chunk returns the chunk for the reference number. @@ -490,7 +491,7 @@ func (c *safeChunk) Iterator() chunks.Iterator { // func (c *safeChunk) Encoding() chunks.Encoding { panic("illegal") } type headIndexReader struct { - *headBlock + *HeadBlock } // LabelValues returns the possible label values @@ -558,7 +559,7 @@ func (h *headIndexReader) LabelIndices() ([][]string, error) { // get retrieves the chunk with the hash and label set and creates // a new one if it doesn't exist yet. -func (h *headBlock) get(hash uint64, lset labels.Labels) *memSeries { +func (h *HeadBlock) get(hash uint64, lset labels.Labels) *memSeries { series := h.hashes[hash] for _, s := range series { @@ -569,7 +570,7 @@ func (h *headBlock) get(hash uint64, lset labels.Labels) *memSeries { return nil } -func (h *headBlock) create(hash uint64, lset labels.Labels) *memSeries { +func (h *HeadBlock) create(hash uint64, lset labels.Labels) *memSeries { s := &memSeries{ lset: lset, ref: uint32(len(h.series)), diff --git a/head_test.go b/head_test.go index f7d9b19f8c..45b4f49d0f 100644 --- a/head_test.go +++ b/head_test.go @@ -39,7 +39,7 @@ func BenchmarkCreateSeries(b *testing.B) { require.NoError(b, err) defer os.RemoveAll(dir) - h, err := createHeadBlock(dir, 0, nil, 0, 1) + h, err := CreateHeadBlock(dir, 0, nil, 0, 1) require.NoError(b, err) b.ReportAllocs() @@ -93,7 +93,7 @@ func TestAmendDatapointCausesError(t *testing.T) { tmpdir, _ := ioutil.TempDir("", "test") defer os.RemoveAll(tmpdir) - hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) + hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) require.NoError(t, err, "Error creating head block") app := hb.Appender() @@ -110,7 +110,7 @@ func TestDuplicateNaNDatapointNoAmendError(t *testing.T) { tmpdir, _ := ioutil.TempDir("", "test") defer os.RemoveAll(tmpdir) - hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) + hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) require.NoError(t, err, "Error creating head block") app := hb.Appender() @@ -127,7 +127,7 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) { tmpdir, _ := ioutil.TempDir("", "test") defer os.RemoveAll(tmpdir) - hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) + hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) require.NoError(t, err, "Error creating head block") app := hb.Appender() @@ -144,7 +144,7 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) { tmpdir, _ := ioutil.TempDir("", "test") defer os.RemoveAll(tmpdir) - hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) + hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) require.NoError(t, err) // Append AmendedValue. @@ -246,7 +246,7 @@ func TestHeadBlock_e2e(t *testing.T) { tmpdir, _ := ioutil.TempDir("", "test") defer os.RemoveAll(tmpdir) - hb, err := createHeadBlock(tmpdir+"/hb", 0, nil, minTime, maxTime) + hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, minTime, maxTime) require.NoError(t, err) app := hb.Appender() From 535532ca02ec4ac11a2ef5844525953598a7d866 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Fri, 12 May 2017 17:06:26 +0200 Subject: [PATCH 2/4] Export refdSample The type was part of a exported method signatures and should therefore be exported as well. --- head.go | 38 ++++++++++++---------------- wal.go | 71 ++++++++++++++++++++++++++++------------------------- wal_test.go | 34 ++++++++++++------------- 3 files changed, 71 insertions(+), 72 deletions(-) diff --git a/head.go b/head.go index b39385fb64..6e3dd21c4c 100644 --- a/head.go +++ b/head.go @@ -131,13 +131,13 @@ Outer: h.meta.Stats.NumSeries++ } for _, s := range samples { - if int(s.ref) >= len(h.series) { - l.Log("msg", "unknown series reference, abort WAL restore", "got", s.ref, "max", len(h.series)-1) + if int(s.Ref) >= len(h.series) { + l.Log("msg", "unknown series reference, abort WAL restore", "got", s.Ref, "max", len(h.series)-1) break Outer } - h.series[s.ref].append(s.t, s.v) + h.series[s.Ref].append(s.T, s.V) - if !h.inBounds(s.t) { + if !h.inBounds(s.T) { return nil, errors.Wrap(ErrOutOfBounds, "consume WAL") } h.meta.Stats.NumSamples++ @@ -262,15 +262,15 @@ func (h *HeadBlock) Busy() bool { var headPool = sync.Pool{} -func getHeadAppendBuffer() []refdSample { +func getHeadAppendBuffer() []RefSample { b := headPool.Get() if b == nil { - return make([]refdSample, 0, 512) + return make([]RefSample, 0, 512) } - return b.([]refdSample) + return b.([]RefSample) } -func putHeadAppendBuffer(b []refdSample) { +func putHeadAppendBuffer(b []RefSample) { headPool.Put(b[:0]) } @@ -282,7 +282,7 @@ type headAppender struct { refmap map[uint64]uint64 newLabels []labels.Labels - samples []refdSample + samples []RefSample } type hashedLabels struct { @@ -290,12 +290,6 @@ type hashedLabels struct { labels labels.Labels } -type refdSample struct { - ref uint64 - t int64 - v float64 -} - func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { if !a.inBounds(t) { return 0, ErrOutOfBounds @@ -370,10 +364,10 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { } } - a.samples = append(a.samples, refdSample{ - ref: ref, - t: t, - v: v, + a.samples = append(a.samples, RefSample{ + Ref: ref, + T: t, + V: v, }) return nil } @@ -419,8 +413,8 @@ func (a *headAppender) Commit() error { for i := range a.samples { s := &a.samples[i] - if s.ref&(1<<32) > 0 { - s.ref = a.refmap[s.ref] + if s.Ref&(1<<32) > 0 { + s.Ref = a.refmap[s.Ref] } } @@ -434,7 +428,7 @@ func (a *headAppender) Commit() error { total := uint64(len(a.samples)) for _, s := range a.samples { - if !a.series[s.ref].append(s.t, s.v) { + if !a.series[s.Ref].append(s.T, s.V) { total-- } } diff --git a/wal.go b/wal.go index 853065f695..80f3508e8e 100644 --- a/wal.go +++ b/wal.go @@ -50,7 +50,7 @@ const ( ) // WAL is a write ahead log for series data. It can only be written to. -// Use WALReader to read back from a write ahead log. +// Use walReader to read back from a write ahead log. type WAL struct { mtx sync.Mutex @@ -69,6 +69,13 @@ type WAL struct { donec chan struct{} } +// RefSample is a timestamp/value pair associated with a reference to a series. +type RefSample struct { + Ref uint64 + T int64 + V float64 +} + const ( walDirName = "wal" walSegmentSizeBytes = 256 * 1024 * 1024 // 256 MB @@ -119,12 +126,12 @@ func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL, // Reader returns a new reader over the the write ahead log data. // It must be completely consumed before writing to the WAL. -func (w *WAL) Reader() *WALReader { - return NewWALReader(w.logger, w) +func (w *WAL) Reader() WALReader { + return newWALReader(w, w.logger) } // Log writes a batch of new series labels and samples to the log. -func (w *WAL) Log(series []labels.Labels, samples []refdSample) error { +func (w *WAL) Log(series []labels.Labels, samples []RefSample) error { if err := w.encodeSeries(series); err != nil { return err } @@ -395,7 +402,7 @@ func (w *WAL) encodeSeries(series []labels.Labels) error { return w.entry(WALEntrySeries, walSeriesSimple, buf) } -func (w *WAL) encodeSamples(samples []refdSample) error { +func (w *WAL) encodeSamples(samples []RefSample) error { if len(samples) == 0 { return nil } @@ -409,27 +416,27 @@ func (w *WAL) encodeSamples(samples []refdSample) error { // TODO(fabxc): optimize for all samples having the same timestamp. first := samples[0] - binary.BigEndian.PutUint64(b, first.ref) + binary.BigEndian.PutUint64(b, first.Ref) buf = append(buf, b[:8]...) - binary.BigEndian.PutUint64(b, uint64(first.t)) + binary.BigEndian.PutUint64(b, uint64(first.T)) buf = append(buf, b[:8]...) for _, s := range samples { - n := binary.PutVarint(b, int64(s.ref)-int64(first.ref)) + n := binary.PutVarint(b, int64(s.Ref)-int64(first.Ref)) buf = append(buf, b[:n]...) - n = binary.PutVarint(b, s.t-first.t) + n = binary.PutVarint(b, s.T-first.T) buf = append(buf, b[:n]...) - binary.BigEndian.PutUint64(b, math.Float64bits(s.v)) + binary.BigEndian.PutUint64(b, math.Float64bits(s.V)) buf = append(buf, b[:8]...) } return w.entry(WALEntrySamples, walSamplesSimple, buf) } -// WALReader decodes and emits write ahead log entries. -type WALReader struct { +// walReader decodes and emits write ahead log entries. +type walReader struct { logger log.Logger wal *WAL @@ -439,37 +446,35 @@ type WALReader struct { err error labels []labels.Labels - samples []refdSample + samples []RefSample } -// NewWALReader returns a new WALReader over the sequence of the given ReadClosers. -func NewWALReader(logger log.Logger, w *WAL) *WALReader { - if logger == nil { - logger = log.NewNopLogger() +func newWALReader(w *WAL, l log.Logger) *walReader { + if l == nil { + l = log.NewNopLogger() } - r := &WALReader{ - logger: logger, + return &walReader{ + logger: l, wal: w, buf: make([]byte, 0, 128*4096), crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), } - return r } // At returns the last decoded entry of labels or samples. // The returned slices are only valid until the next call to Next(). Their elements // have to be copied to preserve them. -func (r *WALReader) At() ([]labels.Labels, []refdSample) { +func (r *walReader) At() ([]labels.Labels, []RefSample) { return r.labels, r.samples } // Err returns the last error the reader encountered. -func (r *WALReader) Err() error { +func (r *walReader) Err() error { return r.err } // nextEntry retrieves the next entry. It is also used as a testing hook. -func (r *WALReader) nextEntry() (WALEntryType, byte, []byte, error) { +func (r *walReader) nextEntry() (WALEntryType, byte, []byte, error) { if r.cur >= len(r.wal.files) { return 0, 0, nil, io.EOF } @@ -492,7 +497,7 @@ func (r *WALReader) nextEntry() (WALEntryType, byte, []byte, error) { // Next returns decodes the next entry pair and returns true // if it was succesful. -func (r *WALReader) Next() bool { +func (r *walReader) Next() bool { r.labels = r.labels[:0] r.samples = r.samples[:0] @@ -549,12 +554,12 @@ func (r *WALReader) Next() bool { return r.err == nil } -func (r *WALReader) current() *os.File { +func (r *walReader) current() *os.File { return r.wal.files[r.cur] } // truncate the WAL after the last valid entry. -func (r *WALReader) truncate(lastOffset int64) error { +func (r *walReader) truncate(lastOffset int64) error { r.logger.Log("msg", "WAL corruption detected; truncating", "err", r.err, "file", r.current().Name(), "pos", lastOffset) @@ -582,7 +587,7 @@ func walCorruptionErrf(s string, args ...interface{}) error { return walCorruptionErr(errors.Errorf(s, args...)) } -func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { +func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { r.crc32.Reset() tr := io.TeeReader(cr, r.crc32) @@ -629,7 +634,7 @@ func (r *WALReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) { return etype, flag, buf, nil } -func (r *WALReader) decodeSeries(flag byte, b []byte) error { +func (r *walReader) decodeSeries(flag byte, b []byte) error { for len(b) > 0 { l, n := binary.Uvarint(b) if n < 1 { @@ -659,7 +664,7 @@ func (r *WALReader) decodeSeries(flag byte, b []byte) error { return nil } -func (r *WALReader) decodeSamples(flag byte, b []byte) error { +func (r *walReader) decodeSamples(flag byte, b []byte) error { if len(b) < 16 { return errors.Wrap(errInvalidSize, "header length") } @@ -670,7 +675,7 @@ func (r *WALReader) decodeSamples(flag byte, b []byte) error { b = b[16:] for len(b) > 0 { - var smpl refdSample + var smpl RefSample dref, n := binary.Varint(b) if n < 1 { @@ -678,19 +683,19 @@ func (r *WALReader) decodeSamples(flag byte, b []byte) error { } b = b[n:] - smpl.ref = uint64(int64(baseRef) + dref) + smpl.Ref = uint64(int64(baseRef) + dref) dtime, n := binary.Varint(b) if n < 1 { return errors.Wrap(errInvalidSize, "sample timestamp delta") } b = b[n:] - smpl.t = baseTime + dtime + smpl.T = baseTime + dtime if len(b) < 8 { return errors.Wrapf(errInvalidSize, "sample value bits %d", len(b)) } - smpl.v = float64(math.Float64frombits(binary.BigEndian.Uint64(b))) + smpl.V = float64(math.Float64frombits(binary.BigEndian.Uint64(b))) b = b[8:] r.samples = append(r.samples, smpl) diff --git a/wal_test.go b/wal_test.go index 34508a1b4f..99822ec01f 100644 --- a/wal_test.go +++ b/wal_test.go @@ -122,7 +122,7 @@ func TestWAL_cut(t *testing.T) { // We cannot actually check for correct pre-allocation as it is // optional per filesystem and handled transparently. - et, flag, b, err := NewWALReader(nil, nil).entry(f) + et, flag, b, err := newWALReader(nil, nil).entry(f) require.NoError(t, err) require.Equal(t, WALEntrySeries, et) require.Equal(t, flag, byte(walSeriesSimple)) @@ -148,7 +148,7 @@ func TestWAL_Log_Restore(t *testing.T) { var ( recordedSeries [][]labels.Labels - recordedSamples [][]refdSample + recordedSamples [][]RefSample ) var totalSamples int @@ -165,7 +165,7 @@ func TestWAL_Log_Restore(t *testing.T) { var ( resultSeries [][]labels.Labels - resultSamples [][]refdSample + resultSamples [][]RefSample ) for r.Next() { @@ -177,7 +177,7 @@ func TestWAL_Log_Restore(t *testing.T) { resultSeries = append(resultSeries, clsets) } if len(smpls) > 0 { - csmpls := make([]refdSample, len(smpls)) + csmpls := make([]RefSample, len(smpls)) copy(csmpls, smpls) resultSamples = append(resultSamples, csmpls) } @@ -191,13 +191,13 @@ func TestWAL_Log_Restore(t *testing.T) { // Insert in batches and generate different amounts of samples for each. for i := 0; i < len(series); i += stepSize { - var samples []refdSample + var samples []RefSample for j := 0; j < i*10; j++ { - samples = append(samples, refdSample{ - ref: uint64(j % 10000), - t: int64(j * 2), - v: rand.Float64(), + samples = append(samples, RefSample{ + Ref: uint64(j % 10000), + T: int64(j * 2), + V: rand.Float64(), }) } @@ -292,13 +292,13 @@ func TestWALRestoreCorrupted(t *testing.T) { w, err := OpenWAL(dir, nil, 0) require.NoError(t, err) - require.NoError(t, w.Log(nil, []refdSample{{t: 1, v: 2}})) - require.NoError(t, w.Log(nil, []refdSample{{t: 2, v: 3}})) + require.NoError(t, w.Log(nil, []RefSample{{T: 1, V: 2}})) + require.NoError(t, w.Log(nil, []RefSample{{T: 2, V: 3}})) require.NoError(t, w.cut()) - require.NoError(t, w.Log(nil, []refdSample{{t: 3, v: 4}})) - require.NoError(t, w.Log(nil, []refdSample{{t: 5, v: 6}})) + require.NoError(t, w.Log(nil, []RefSample{{T: 3, V: 4}})) + require.NoError(t, w.Log(nil, []RefSample{{T: 5, V: 6}})) require.NoError(t, w.Close()) @@ -318,13 +318,13 @@ func TestWALRestoreCorrupted(t *testing.T) { require.True(t, r.Next()) l, s := r.At() require.Equal(t, 0, len(l)) - require.Equal(t, []refdSample{{t: 1, v: 2}}, s) + require.Equal(t, []RefSample{{T: 1, V: 2}}, s) // Truncation should happen transparently and now cause an error. require.False(t, r.Next()) require.Nil(t, r.Err()) - require.NoError(t, w2.Log(nil, []refdSample{{t: 99, v: 100}})) + require.NoError(t, w2.Log(nil, []RefSample{{T: 99, V: 100}})) require.NoError(t, w2.Close()) files, err := fileutil.ReadDir(dir) @@ -341,12 +341,12 @@ func TestWALRestoreCorrupted(t *testing.T) { require.True(t, r.Next()) l, s = r.At() require.Equal(t, 0, len(l)) - require.Equal(t, []refdSample{{t: 1, v: 2}}, s) + require.Equal(t, []RefSample{{T: 1, V: 2}}, s) require.True(t, r.Next()) l, s = r.At() require.Equal(t, 0, len(l)) - require.Equal(t, []refdSample{{t: 99, v: 100}}, s) + require.Equal(t, []RefSample{{T: 99, V: 100}}, s) require.False(t, r.Next()) require.Nil(t, r.Err()) From 4862b261d0c5839bb2474bbd9a42ae0f71811c28 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Sat, 13 May 2017 17:09:26 +0200 Subject: [PATCH 3/4] Abstract WAL into interface --- head.go | 8 +++----- wal.go | 54 +++++++++++++++++++++++++++++++++-------------------- wal_test.go | 30 ++++++++++++++--------------- 3 files changed, 52 insertions(+), 40 deletions(-) diff --git a/head.go b/head.go index 6e3dd21c4c..6edbae8948 100644 --- a/head.go +++ b/head.go @@ -51,7 +51,7 @@ var ( type HeadBlock struct { mtx sync.RWMutex dir string - wal *WAL + wal WAL activeWriters uint64 closed bool @@ -101,7 +101,7 @@ func CreateHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*Head // OpenHeadBlock opens the head block in dir. func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) { - wal, err := OpenWAL(dir, log.With(l, "component", "wal"), 5*time.Second) + wal, err := OpenSegmentWAL(dir, log.With(l, "component", "wal"), 5*time.Second) if err != nil { return nil, err } @@ -122,7 +122,6 @@ func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) { r := wal.Reader() -Outer: for r.Next() { series, samples := r.At() @@ -132,8 +131,7 @@ Outer: } for _, s := range samples { if int(s.Ref) >= len(h.series) { - l.Log("msg", "unknown series reference, abort WAL restore", "got", s.Ref, "max", len(h.series)-1) - break Outer + return nil, errors.Errorf("unknown series reference %d (max %d); abort WAL restore", s.Ref, len(h.series)) } h.series[s.Ref].append(s.T, s.V) diff --git a/wal.go b/wal.go index 80f3508e8e..251944f0bf 100644 --- a/wal.go +++ b/wal.go @@ -49,9 +49,8 @@ const ( WALEntrySamples WALEntryType = 3 ) -// WAL is a write ahead log for series data. It can only be written to. -// Use walReader to read back from a write ahead log. -type WAL struct { +// SegmentWAL is a write ahead log for series data. +type SegmentWAL struct { mtx sync.Mutex dirFile *os.File @@ -69,6 +68,21 @@ type WAL struct { donec chan struct{} } +// WAL is a write ahead log that can log new series labels and samples. +// It must be completely read before new entries are logged. +type WAL interface { + Reader() WALReader + Log([]labels.Labels, []RefSample) error + Close() error +} + +// WALReader reads entries from a WAL. +type WALReader interface { + At() ([]labels.Labels, []RefSample) + Next() bool + Err() error +} + // RefSample is a timestamp/value pair associated with a reference to a series. type RefSample struct { Ref uint64 @@ -90,9 +104,9 @@ func init() { castagnoliTable = crc32.MakeTable(crc32.Castagnoli) } -// OpenWAL opens or creates a write ahead log in the given directory. +// OpenSegmentWAL opens or creates a write ahead log in the given directory. // The WAL must be read completely before new data is written. -func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL, error) { +func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration) (*SegmentWAL, error) { dir = filepath.Join(dir, walDirName) if err := os.MkdirAll(dir, 0777); err != nil { @@ -106,7 +120,7 @@ func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL, logger = log.NewNopLogger() } - w := &WAL{ + w := &SegmentWAL{ dirFile: df, logger: logger, flushInterval: flushInterval, @@ -126,12 +140,12 @@ func OpenWAL(dir string, logger log.Logger, flushInterval time.Duration) (*WAL, // Reader returns a new reader over the the write ahead log data. // It must be completely consumed before writing to the WAL. -func (w *WAL) Reader() WALReader { +func (w *SegmentWAL) Reader() WALReader { return newWALReader(w, w.logger) } // Log writes a batch of new series labels and samples to the log. -func (w *WAL) Log(series []labels.Labels, samples []RefSample) error { +func (w *SegmentWAL) Log(series []labels.Labels, samples []RefSample) error { if err := w.encodeSeries(series); err != nil { return err } @@ -146,7 +160,7 @@ func (w *WAL) Log(series []labels.Labels, samples []RefSample) error { // initSegments finds all existing segment files and opens them in the // appropriate file modes. -func (w *WAL) initSegments() error { +func (w *SegmentWAL) initSegments() error { fns, err := sequenceFiles(w.dirFile.Name(), "") if err != nil { return err @@ -187,7 +201,7 @@ func (w *WAL) initSegments() error { // cut finishes the currently active segments and opens the next one. // The encoder is reset to point to the new segment. -func (w *WAL) cut() error { +func (w *SegmentWAL) cut() error { // Sync current tail to disk and close. if tf := w.tail(); tf != nil { if err := w.sync(); err != nil { @@ -236,7 +250,7 @@ func (w *WAL) cut() error { return nil } -func (w *WAL) tail() *os.File { +func (w *SegmentWAL) tail() *os.File { if len(w.files) == 0 { return nil } @@ -244,14 +258,14 @@ func (w *WAL) tail() *os.File { } // Sync flushes the changes to disk. -func (w *WAL) Sync() error { +func (w *SegmentWAL) Sync() error { w.mtx.Lock() defer w.mtx.Unlock() return w.sync() } -func (w *WAL) sync() error { +func (w *SegmentWAL) sync() error { if w.cur == nil { return nil } @@ -261,7 +275,7 @@ func (w *WAL) sync() error { return fileutil.Fdatasync(w.tail()) } -func (w *WAL) run(interval time.Duration) { +func (w *SegmentWAL) run(interval time.Duration) { var tick <-chan time.Time if interval > 0 { @@ -284,7 +298,7 @@ func (w *WAL) run(interval time.Duration) { } // Close syncs all data and closes the underlying resources. -func (w *WAL) Close() error { +func (w *SegmentWAL) Close() error { close(w.stopc) <-w.donec @@ -312,7 +326,7 @@ const ( walPageBytes = 16 * minSectorSize ) -func (w *WAL) entry(et WALEntryType, flag byte, buf []byte) error { +func (w *SegmentWAL) entry(et WALEntryType, flag byte, buf []byte) error { w.mtx.Lock() defer w.mtx.Unlock() @@ -376,7 +390,7 @@ func putWALBuffer(b []byte) { walBuffers.Put(b) } -func (w *WAL) encodeSeries(series []labels.Labels) error { +func (w *SegmentWAL) encodeSeries(series []labels.Labels) error { if len(series) == 0 { return nil } @@ -402,7 +416,7 @@ func (w *WAL) encodeSeries(series []labels.Labels) error { return w.entry(WALEntrySeries, walSeriesSimple, buf) } -func (w *WAL) encodeSamples(samples []RefSample) error { +func (w *SegmentWAL) encodeSamples(samples []RefSample) error { if len(samples) == 0 { return nil } @@ -439,7 +453,7 @@ func (w *WAL) encodeSamples(samples []RefSample) error { type walReader struct { logger log.Logger - wal *WAL + wal *SegmentWAL cur int buf []byte crc32 hash.Hash32 @@ -449,7 +463,7 @@ type walReader struct { samples []RefSample } -func newWALReader(w *WAL, l log.Logger) *walReader { +func newWALReader(w *SegmentWAL, l log.Logger) *walReader { if l == nil { l = log.NewNopLogger() } diff --git a/wal_test.go b/wal_test.go index 99822ec01f..cbd6c62477 100644 --- a/wal_test.go +++ b/wal_test.go @@ -27,7 +27,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestWAL_initSegments(t *testing.T) { +func TestSegmentWAL_initSegments(t *testing.T) { tmpdir, err := ioutil.TempDir("", "test_wal_open") require.NoError(t, err) defer os.RemoveAll(tmpdir) @@ -35,7 +35,7 @@ func TestWAL_initSegments(t *testing.T) { df, err := fileutil.OpenDir(tmpdir) require.NoError(t, err) - w := &WAL{dirFile: df} + w := &SegmentWAL{dirFile: df} // Create segment files with an appropriate header. for i := 1; i <= 5; i++ { @@ -80,7 +80,7 @@ func TestWAL_initSegments(t *testing.T) { _, err = f.WriteAt([]byte{0}, 4) require.NoError(t, err) - w = &WAL{dirFile: df} + w = &SegmentWAL{dirFile: df} require.Error(t, w.initSegments(), "init corrupted segments") for _, f := range w.files { @@ -88,13 +88,13 @@ func TestWAL_initSegments(t *testing.T) { } } -func TestWAL_cut(t *testing.T) { +func TestSegmentWAL_cut(t *testing.T) { tmpdir, err := ioutil.TempDir("", "test_wal_cut") require.NoError(t, err) defer os.RemoveAll(tmpdir) // This calls cut() implicitly the first time without a previous tail. - w, err := OpenWAL(tmpdir, nil, 0) + w, err := OpenSegmentWAL(tmpdir, nil, 0) require.NoError(t, err) require.NoError(t, w.entry(WALEntrySeries, 1, []byte("Hello World!!"))) @@ -131,7 +131,7 @@ func TestWAL_cut(t *testing.T) { } // Symmetrical test of reading and writing to the WAL via its main interface. -func TestWAL_Log_Restore(t *testing.T) { +func TestSegmentWAL_Log_Restore(t *testing.T) { const ( numMetrics = 5000 iterations = 5 @@ -155,7 +155,7 @@ func TestWAL_Log_Restore(t *testing.T) { // Open WAL a bunch of times, validate all previous data can be read, // write more data to it, close it. for k := 0; k < numMetrics; k += numMetrics / iterations { - w, err := OpenWAL(dir, nil, 0) + w, err := OpenSegmentWAL(dir, nil, 0) require.NoError(t, err) // Set smaller segment size so we can actually write several files. @@ -222,11 +222,11 @@ func TestWAL_Log_Restore(t *testing.T) { func TestWALRestoreCorrupted(t *testing.T) { cases := []struct { name string - f func(*testing.T, *WAL) + f func(*testing.T, *SegmentWAL) }{ { name: "truncate_checksum", - f: func(t *testing.T, w *WAL) { + f: func(t *testing.T, w *SegmentWAL) { f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666) require.NoError(t, err) defer f.Close() @@ -239,7 +239,7 @@ func TestWALRestoreCorrupted(t *testing.T) { }, { name: "truncate_body", - f: func(t *testing.T, w *WAL) { + f: func(t *testing.T, w *SegmentWAL) { f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666) require.NoError(t, err) defer f.Close() @@ -252,7 +252,7 @@ func TestWALRestoreCorrupted(t *testing.T) { }, { name: "body_content", - f: func(t *testing.T, w *WAL) { + f: func(t *testing.T, w *SegmentWAL) { f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666) require.NoError(t, err) defer f.Close() @@ -267,7 +267,7 @@ func TestWALRestoreCorrupted(t *testing.T) { }, { name: "checksum", - f: func(t *testing.T, w *WAL) { + f: func(t *testing.T, w *SegmentWAL) { f, err := os.OpenFile(w.files[0].Name(), os.O_WRONLY, 0666) require.NoError(t, err) defer f.Close() @@ -289,7 +289,7 @@ func TestWALRestoreCorrupted(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(dir) - w, err := OpenWAL(dir, nil, 0) + w, err := OpenSegmentWAL(dir, nil, 0) require.NoError(t, err) require.NoError(t, w.Log(nil, []RefSample{{T: 1, V: 2}})) @@ -310,7 +310,7 @@ func TestWALRestoreCorrupted(t *testing.T) { logger := log.NewLogfmtLogger(os.Stderr) - w2, err := OpenWAL(dir, logger, 0) + w2, err := OpenSegmentWAL(dir, logger, 0) require.NoError(t, err) r := w2.Reader() @@ -333,7 +333,7 @@ func TestWALRestoreCorrupted(t *testing.T) { // We should see the first valid entry and the new one, everything after // is truncated. - w3, err := OpenWAL(dir, logger, 0) + w3, err := OpenSegmentWAL(dir, logger, 0) require.NoError(t, err) r = w3.Reader() From 8b51b7e2be38c0fb7f54c38df65dcd3b45053d70 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Sat, 13 May 2017 18:14:18 +0200 Subject: [PATCH 4/4] Make WAL for HeadBlock composeable. --- db.go | 25 ++++++++++++++++++++-- head.go | 37 +++++++++++++------------------- head_test.go | 60 +++++++++++++++++++++++++++++----------------------- 3 files changed, 72 insertions(+), 50 deletions(-) diff --git a/db.go b/db.go index 51c49965c8..c4e97abcc1 100644 --- a/db.go +++ b/db.go @@ -418,7 +418,7 @@ func (db *DB) reloadBlocks() error { if meta.Compaction.Generation == 0 { if !ok { - b, err = OpenHeadBlock(dirs[i], db.logger) + b, err = db.openHeadBlock(dirs[i]) if err != nil { return errors.Wrapf(err, "load head at %s", dirs[i]) } @@ -709,6 +709,24 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block { return bs } +// openHeadBlock opens the head block at dir. +func (db *DB) openHeadBlock(dir string) (*HeadBlock, error) { + var ( + wdir = filepath.Join(dir, "wal") + l = log.With(db.logger, "wal", wdir) + ) + wal, err := OpenSegmentWAL(wdir, l, 5*time.Second) + if err != nil { + return nil, errors.Wrap(err, "open WAL %s") + } + + h, err := OpenHeadBlock(dir, log.With(db.logger, "block", dir), wal) + if err != nil { + return nil, errors.Wrapf(err, "open head block %s", dir) + } + return h, nil +} + // cut starts a new head block to append to. The completed head block // will still be appendable for the configured grace period. func (db *DB) cut(mint int64) (headBlock, error) { @@ -718,7 +736,10 @@ func (db *DB) cut(mint int64) (headBlock, error) { if err != nil { return nil, err } - newHead, err := CreateHeadBlock(dir, seq, db.logger, mint, maxt) + if err := TouchHeadBlock(dir, seq, mint, maxt); err != nil { + return nil, errors.Wrapf(err, "touch head block %s", dir) + } + newHead, err := db.openHeadBlock(dir) if err != nil { return nil, err } diff --git a/head.go b/head.go index 6edbae8948..b71bbafc09 100644 --- a/head.go +++ b/head.go @@ -69,20 +69,21 @@ type HeadBlock struct { meta BlockMeta } -// CreateHeadBlock creates a new head block in dir that holds samples in the range [mint,maxt). -func CreateHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*HeadBlock, error) { +// TouchHeadBlock atomically touches a new head block in dir for +// samples in the range [mint,maxt). +func TouchHeadBlock(dir string, seq int, mint, maxt int64) error { // Make head block creation appear atomic. tmp := dir + ".tmp" if err := os.MkdirAll(tmp, 0777); err != nil { - return nil, err + return err } entropy := rand.New(rand.NewSource(time.Now().UnixNano())) ulid, err := ulid.New(ulid.Now(), entropy) if err != nil { - return nil, err + return err } if err := writeMetaFile(tmp, &BlockMeta{ @@ -91,20 +92,13 @@ func CreateHeadBlock(dir string, seq int, l log.Logger, mint, maxt int64) (*Head MinTime: mint, MaxTime: maxt, }); err != nil { - return nil, err + return err } - if err := renameFile(tmp, dir); err != nil { - return nil, err - } - return OpenHeadBlock(dir, l) + return renameFile(tmp, dir) } // OpenHeadBlock opens the head block in dir. -func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) { - wal, err := OpenSegmentWAL(dir, log.With(l, "component", "wal"), 5*time.Second) - if err != nil { - return nil, err - } +func OpenHeadBlock(dir string, l log.Logger, wal WAL) (*HeadBlock, error) { meta, err := readMetaFile(dir) if err != nil { return nil, err @@ -119,8 +113,11 @@ func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) { postings: &memPostings{m: make(map[term][]uint32)}, meta: *meta, } + return h, h.init() +} - r := wal.Reader() +func (h *HeadBlock) init() error { + r := h.wal.Reader() for r.Next() { series, samples := r.At() @@ -131,21 +128,17 @@ func OpenHeadBlock(dir string, l log.Logger) (*HeadBlock, error) { } for _, s := range samples { if int(s.Ref) >= len(h.series) { - return nil, errors.Errorf("unknown series reference %d (max %d); abort WAL restore", s.Ref, len(h.series)) + return errors.Errorf("unknown series reference %d (max %d); abort WAL restore", s.Ref, len(h.series)) } h.series[s.Ref].append(s.T, s.V) if !h.inBounds(s.T) { - return nil, errors.Wrap(ErrOutOfBounds, "consume WAL") + return errors.Wrap(ErrOutOfBounds, "consume WAL") } h.meta.Stats.NumSamples++ } } - if err := r.Err(); err != nil { - return nil, errors.Wrap(err, "consume WAL") - } - - return h, nil + return errors.Wrap(r.Err(), "consume WAL") } // inBounds returns true if the given timestamp is within the valid diff --git a/head_test.go b/head_test.go index 45b4f49d0f..aa9138060d 100644 --- a/head_test.go +++ b/head_test.go @@ -20,6 +20,7 @@ import ( "os" "sort" "testing" + "time" "unsafe" "github.com/pkg/errors" @@ -30,6 +31,19 @@ import ( "github.com/stretchr/testify/require" ) +// createTestHeadBlock creates a new head block with a SegmentWAL. +func createTestHeadBlock(t testing.TB, dir string, mint, maxt int64) *HeadBlock { + err := TouchHeadBlock(dir, 0, mint, maxt) + require.NoError(t, err) + + wal, err := OpenSegmentWAL(dir, nil, 5*time.Second) + require.NoError(t, err) + + h, err := OpenHeadBlock(dir, nil, wal) + require.NoError(t, err) + return h +} + func BenchmarkCreateSeries(b *testing.B) { lbls, err := readPrometheusLabels("cmd/tsdb/testdata.1m", 1e6) require.NoError(b, err) @@ -39,8 +53,7 @@ func BenchmarkCreateSeries(b *testing.B) { require.NoError(b, err) defer os.RemoveAll(dir) - h, err := CreateHeadBlock(dir, 0, nil, 0, 1) - require.NoError(b, err) + h := createTestHeadBlock(b, dir, 0, 1) b.ReportAllocs() b.ResetTimer() @@ -90,14 +103,13 @@ func readPrometheusLabels(fn string, n int) ([]labels.Labels, error) { } func TestAmendDatapointCausesError(t *testing.T) { - tmpdir, _ := ioutil.TempDir("", "test") - defer os.RemoveAll(tmpdir) + dir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(dir) - hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) - require.NoError(t, err, "Error creating head block") + hb := createTestHeadBlock(t, dir, 0, 1000) app := hb.Appender() - _, err = app.Add(labels.Labels{}, 0, 0) + _, err := app.Add(labels.Labels{}, 0, 0) require.NoError(t, err, "Failed to add sample") require.NoError(t, app.Commit(), "Unexpected error committing appender") @@ -107,14 +119,13 @@ func TestAmendDatapointCausesError(t *testing.T) { } func TestDuplicateNaNDatapointNoAmendError(t *testing.T) { - tmpdir, _ := ioutil.TempDir("", "test") - defer os.RemoveAll(tmpdir) + dir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(dir) - hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) - require.NoError(t, err, "Error creating head block") + hb := createTestHeadBlock(t, dir, 0, 1000) app := hb.Appender() - _, err = app.Add(labels.Labels{}, 0, math.NaN()) + _, err := app.Add(labels.Labels{}, 0, math.NaN()) require.NoError(t, err, "Failed to add sample") require.NoError(t, app.Commit(), "Unexpected error committing appender") @@ -124,14 +135,13 @@ func TestDuplicateNaNDatapointNoAmendError(t *testing.T) { } func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) { - tmpdir, _ := ioutil.TempDir("", "test") - defer os.RemoveAll(tmpdir) + dir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(dir) - hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) - require.NoError(t, err, "Error creating head block") + hb := createTestHeadBlock(t, dir, 0, 1000) app := hb.Appender() - _, err = app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000001)) + _, err := app.Add(labels.Labels{}, 0, math.Float64frombits(0x7ff0000000000001)) require.NoError(t, err, "Failed to add sample") require.NoError(t, app.Commit(), "Unexpected error committing appender") @@ -141,15 +151,14 @@ func TestNonDuplicateNaNDatapointsCausesAmendError(t *testing.T) { } func TestSkippingInvalidValuesInSameTxn(t *testing.T) { - tmpdir, _ := ioutil.TempDir("", "test") - defer os.RemoveAll(tmpdir) + dir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(dir) - hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, 0, 1000) - require.NoError(t, err) + hb := createTestHeadBlock(t, dir, 0, 1000) // Append AmendedValue. app := hb.Appender() - _, err = app.Add(labels.Labels{{"a", "b"}}, 0, 1) + _, err := app.Add(labels.Labels{{"a", "b"}}, 0, 1) require.NoError(t, err) _, err = app.Add(labels.Labels{{"a", "b"}}, 0, 2) require.NoError(t, err) @@ -243,11 +252,10 @@ func TestHeadBlock_e2e(t *testing.T) { seriesMap[labels.New(l...).String()] = []sample{} } - tmpdir, _ := ioutil.TempDir("", "test") - defer os.RemoveAll(tmpdir) + dir, _ := ioutil.TempDir("", "test") + defer os.RemoveAll(dir) - hb, err := CreateHeadBlock(tmpdir+"/hb", 0, nil, minTime, maxTime) - require.NoError(t, err) + hb := createTestHeadBlock(t, dir, minTime, maxTime) app := hb.Appender() for _, l := range lbls {