Merge remote-tracking branch 'upstream/master' into delete-compact-block-on-reload-error

Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
This commit is contained in:
Krasi Georgiev 2019-01-30 11:39:19 +02:00
commit 7245c6dc33
3 changed files with 23 additions and 14 deletions

View file

@ -352,30 +352,31 @@ func (s *Reader) Size() int64 {
return s.size return s.size
} }
// Chunk returns a chunk from a given reference.
func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) { func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
var ( var (
seq = int(ref >> 32) sgmSeq = int(ref >> 32)
off = int((ref << 32) >> 32) sgmOffset = int((ref << 32) >> 32)
) )
if seq >= len(s.bs) { if sgmSeq >= len(s.bs) {
return nil, errors.Errorf("reference sequence %d out of range", seq) return nil, errors.Errorf("reference sequence %d out of range", sgmSeq)
} }
b := s.bs[seq] chkS := s.bs[sgmSeq]
if off >= b.Len() { if sgmOffset >= chkS.Len() {
return nil, errors.Errorf("offset %d beyond data size %d", off, b.Len()) return nil, errors.Errorf("offset %d beyond data size %d", sgmOffset, chkS.Len())
} }
// With the minimum chunk length this should never cause us reading // With the minimum chunk length this should never cause us reading
// over the end of the slice. // over the end of the slice.
r := b.Range(off, off+binary.MaxVarintLen32) chk := chkS.Range(sgmOffset, sgmOffset+binary.MaxVarintLen32)
l, n := binary.Uvarint(r) chkLen, n := binary.Uvarint(chk)
if n <= 0 { if n <= 0 {
return nil, errors.Errorf("reading chunk length failed with %d", n) return nil, errors.Errorf("reading chunk length failed with %d", n)
} }
r = b.Range(off+n, off+n+1+int(l)) chk = chkS.Range(sgmOffset+n, sgmOffset+n+1+int(chkLen))
return s.pool.Get(chunkenc.Encoding(r[0]), r[1:1+l]) return s.pool.Get(chunkenc.Encoding(chk[0]), chk[1:1+chkLen])
} }
func nextSequenceFile(dir string) (string, int, error) { func nextSequenceFile(dir string) (string, int, error) {

View file

@ -414,6 +414,8 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u
} }
func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) { func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) {
start := time.Now()
entropy := rand.New(rand.NewSource(time.Now().UnixNano())) entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
uid := ulid.MustNew(ulid.Now(), entropy) uid := ulid.MustNew(ulid.Now(), entropy)
@ -440,7 +442,13 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, p
return ulid.ULID{}, nil return ulid.ULID{}, nil
} }
level.Info(c.logger).Log("msg", "write block", "mint", meta.MinTime, "maxt", meta.MaxTime, "ulid", meta.ULID) level.Info(c.logger).Log(
"msg", "write block",
"mint", meta.MinTime,
"maxt", meta.MaxTime,
"ulid", meta.ULID,
"duration", time.Since(start),
)
return uid, nil return uid, nil
} }

View file

@ -1604,9 +1604,9 @@ func TestCorrectNumTombstones(t *testing.T) {
// TestBlockRanges checks the following use cases: // TestBlockRanges checks the following use cases:
// - No samples can be added with timestamps lower than the last block maxt. // - No samples can be added with timestamps lower than the last block maxt.
// - The compactor doesn't create overlaping blocks // - The compactor doesn't create overlapping blocks
// even when the last blocks is not within the default boundaries. // even when the last blocks is not within the default boundaries.
// - Lower bondary is based on the smallest sample in the head and // - Lower boundary is based on the smallest sample in the head and
// upper boundary is rounded to the configured block range. // upper boundary is rounded to the configured block range.
// //
// This ensures that a snapshot that includes the head and creates a block with a custom time range // This ensures that a snapshot that includes the head and creates a block with a custom time range