vendor: update prometheus/tsdb to single head mode

This commit is contained in:
Fabian Reinartz 2017-09-07 14:14:33 +02:00
parent 87918f3097
commit a007eb2e1e
25 changed files with 2562 additions and 1478 deletions

21
vendor/github.com/go-kit/kit/log/term/LICENSE generated vendored Normal file
View file

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2014 Simon Eskildsen
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

144
vendor/github.com/go-kit/kit/log/term/colorlogger.go generated vendored Normal file
View file

@ -0,0 +1,144 @@
package term
import (
"bytes"
"fmt"
"io"
"sync"
"github.com/go-kit/kit/log"
)
// Color represents an ANSI color. The zero value is Default.
type Color uint8
// ANSI colors.
const (
Default = Color(iota)
Black
DarkRed
DarkGreen
Brown
DarkBlue
DarkMagenta
DarkCyan
Gray
DarkGray
Red
Green
Yellow
Blue
Magenta
Cyan
White
numColors
)
// For more on ANSI escape codes see
// https://en.wikipedia.org/wiki/ANSI_escape_code. See in particular
// https://en.wikipedia.org/wiki/ANSI_escape_code#Colors.
var (
resetColorBytes = []byte("\x1b[39;49;22m")
fgColorBytes [][]byte
bgColorBytes [][]byte
)
func init() {
// Default
fgColorBytes = append(fgColorBytes, []byte("\x1b[39m"))
bgColorBytes = append(bgColorBytes, []byte("\x1b[49m"))
// dark colors
for color := Black; color < DarkGray; color++ {
fgColorBytes = append(fgColorBytes, []byte(fmt.Sprintf("\x1b[%dm", 30+color-Black)))
bgColorBytes = append(bgColorBytes, []byte(fmt.Sprintf("\x1b[%dm", 40+color-Black)))
}
// bright colors
for color := DarkGray; color < numColors; color++ {
fgColorBytes = append(fgColorBytes, []byte(fmt.Sprintf("\x1b[%d;1m", 30+color-DarkGray)))
bgColorBytes = append(bgColorBytes, []byte(fmt.Sprintf("\x1b[%d;1m", 40+color-DarkGray)))
}
}
// FgBgColor represents a foreground and background color.
type FgBgColor struct {
Fg, Bg Color
}
func (c FgBgColor) isZero() bool {
return c.Fg == Default && c.Bg == Default
}
// NewColorLogger returns a Logger which writes colored logs to w. ANSI color
// codes for the colors returned by color are added to the formatted output
// from the Logger returned by newLogger and the combined result written to w.
func NewColorLogger(w io.Writer, newLogger func(io.Writer) log.Logger, color func(keyvals ...interface{}) FgBgColor) log.Logger {
if color == nil {
panic("color func nil")
}
return &colorLogger{
w: w,
newLogger: newLogger,
color: color,
bufPool: sync.Pool{New: func() interface{} { return &loggerBuf{} }},
noColorLogger: newLogger(w),
}
}
type colorLogger struct {
w io.Writer
newLogger func(io.Writer) log.Logger
color func(keyvals ...interface{}) FgBgColor
bufPool sync.Pool
noColorLogger log.Logger
}
func (l *colorLogger) Log(keyvals ...interface{}) error {
color := l.color(keyvals...)
if color.isZero() {
return l.noColorLogger.Log(keyvals...)
}
lb := l.getLoggerBuf()
defer l.putLoggerBuf(lb)
if color.Fg != Default {
lb.buf.Write(fgColorBytes[color.Fg])
}
if color.Bg != Default {
lb.buf.Write(bgColorBytes[color.Bg])
}
err := lb.logger.Log(keyvals...)
if err != nil {
return err
}
if color.Fg != Default || color.Bg != Default {
lb.buf.Write(resetColorBytes)
}
_, err = io.Copy(l.w, lb.buf)
return err
}
type loggerBuf struct {
buf *bytes.Buffer
logger log.Logger
}
func (l *colorLogger) getLoggerBuf() *loggerBuf {
lb := l.bufPool.Get().(*loggerBuf)
if lb.buf == nil {
lb.buf = &bytes.Buffer{}
lb.logger = l.newLogger(lb.buf)
} else {
lb.buf.Reset()
}
return lb
}
func (l *colorLogger) putLoggerBuf(cb *loggerBuf) {
l.bufPool.Put(cb)
}

View file

@ -0,0 +1,12 @@
// +build !windows
package term
import "io"
// NewColorWriter returns an io.Writer that writes to w and provides cross
// platform support for ANSI color codes. If w is not a terminal it is
// returned unmodified.
func NewColorWriter(w io.Writer) io.Writer {
return w
}

View file

@ -0,0 +1,190 @@
// The code in this file is adapted from github.com/mattn/go-colorable.
// +build windows
package term
import (
"bytes"
"fmt"
"io"
"strconv"
"strings"
"syscall"
"unsafe"
)
type colorWriter struct {
out io.Writer
handle syscall.Handle
lastbuf bytes.Buffer
oldattr word
}
// NewColorWriter returns an io.Writer that writes to w and provides cross
// platform support for ANSI color codes. If w is not a terminal it is
// returned unmodified.
func NewColorWriter(w io.Writer) io.Writer {
if !IsConsole(w) {
return w
}
var csbi consoleScreenBufferInfo
handle := syscall.Handle(w.(fder).Fd())
procGetConsoleScreenBufferInfo.Call(uintptr(handle), uintptr(unsafe.Pointer(&csbi)))
return &colorWriter{
out: w,
handle: handle,
oldattr: csbi.attributes,
}
}
func (w *colorWriter) Write(data []byte) (n int, err error) {
var csbi consoleScreenBufferInfo
procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi)))
er := bytes.NewBuffer(data)
loop:
for {
r1, _, err := procGetConsoleScreenBufferInfo.Call(uintptr(w.handle), uintptr(unsafe.Pointer(&csbi)))
if r1 == 0 {
break loop
}
c1, _, err := er.ReadRune()
if err != nil {
break loop
}
if c1 != 0x1b {
fmt.Fprint(w.out, string(c1))
continue
}
c2, _, err := er.ReadRune()
if err != nil {
w.lastbuf.WriteRune(c1)
break loop
}
if c2 != 0x5b {
w.lastbuf.WriteRune(c1)
w.lastbuf.WriteRune(c2)
continue
}
var buf bytes.Buffer
var m rune
for {
c, _, err := er.ReadRune()
if err != nil {
w.lastbuf.WriteRune(c1)
w.lastbuf.WriteRune(c2)
w.lastbuf.Write(buf.Bytes())
break loop
}
if ('a' <= c && c <= 'z') || ('A' <= c && c <= 'Z') || c == '@' {
m = c
break
}
buf.Write([]byte(string(c)))
}
switch m {
case 'm':
attr := csbi.attributes
cs := buf.String()
if cs == "" {
procSetConsoleTextAttribute.Call(uintptr(w.handle), uintptr(w.oldattr))
continue
}
token := strings.Split(cs, ";")
intensityMode := word(0)
for _, ns := range token {
if n, err = strconv.Atoi(ns); err == nil {
switch {
case n == 0:
attr = w.oldattr
case n == 1:
attr |= intensityMode
case 30 <= n && n <= 37:
attr = (attr & backgroundMask)
if (n-30)&1 != 0 {
attr |= foregroundRed
}
if (n-30)&2 != 0 {
attr |= foregroundGreen
}
if (n-30)&4 != 0 {
attr |= foregroundBlue
}
intensityMode = foregroundIntensity
case n == 39: // reset foreground color
attr &= backgroundMask
attr |= w.oldattr & foregroundMask
case 40 <= n && n <= 47:
attr = (attr & foregroundMask)
if (n-40)&1 != 0 {
attr |= backgroundRed
}
if (n-40)&2 != 0 {
attr |= backgroundGreen
}
if (n-40)&4 != 0 {
attr |= backgroundBlue
}
intensityMode = backgroundIntensity
case n == 49: // reset background color
attr &= foregroundMask
attr |= w.oldattr & backgroundMask
}
procSetConsoleTextAttribute.Call(uintptr(w.handle), uintptr(attr))
}
}
}
}
return len(data) - w.lastbuf.Len(), nil
}
var (
procGetConsoleScreenBufferInfo = kernel32.NewProc("GetConsoleScreenBufferInfo")
procSetConsoleTextAttribute = kernel32.NewProc("SetConsoleTextAttribute")
)
const (
foregroundBlue = 0x1
foregroundGreen = 0x2
foregroundRed = 0x4
foregroundIntensity = 0x8
foregroundMask = (foregroundRed | foregroundBlue | foregroundGreen | foregroundIntensity)
backgroundBlue = 0x10
backgroundGreen = 0x20
backgroundRed = 0x40
backgroundIntensity = 0x80
backgroundMask = (backgroundRed | backgroundBlue | backgroundGreen | backgroundIntensity)
)
type (
wchar uint16
short int16
dword uint32
word uint16
)
type coord struct {
x short
y short
}
type smallRect struct {
left short
top short
right short
bottom short
}
type consoleScreenBufferInfo struct {
size coord
cursorPosition coord
attributes word
window smallRect
maximumWindowSize coord
}

22
vendor/github.com/go-kit/kit/log/term/term.go generated vendored Normal file
View file

@ -0,0 +1,22 @@
// Package term provides tools for logging to a terminal.
package term
import (
"io"
"github.com/go-kit/kit/log"
)
// NewLogger returns a Logger that takes advantage of terminal features if
// possible. Log events are formatted by the Logger returned by newLogger. If
// w is a terminal each log event is colored according to the color function.
func NewLogger(w io.Writer, newLogger func(io.Writer) log.Logger, color func(keyvals ...interface{}) FgBgColor) log.Logger {
if !IsTerminal(w) {
return newLogger(w)
}
return NewColorLogger(NewColorWriter(w), newLogger, color)
}
type fder interface {
Fd() uintptr
}

View file

@ -0,0 +1,15 @@
// Based on ssh/terminal:
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build appengine
package term
import "io"
// IsTerminal always returns false on AppEngine.
func IsTerminal(w io.Writer) bool {
return false
}

View file

@ -0,0 +1,10 @@
// Based on ssh/terminal:
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package term
import "syscall"
const ioctlReadTermios = syscall.TIOCGETA

View file

@ -0,0 +1,7 @@
package term
import (
"syscall"
)
const ioctlReadTermios = syscall.TIOCGETA

View file

@ -0,0 +1,12 @@
// Based on ssh/terminal:
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build !appengine
package term
import "syscall"
const ioctlReadTermios = syscall.TCGETS

View file

@ -0,0 +1,25 @@
// Based on ssh/terminal:
// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build linux,!appengine darwin freebsd openbsd
package term
import (
"io"
"syscall"
"unsafe"
)
// IsTerminal returns true if w writes to a terminal.
func IsTerminal(w io.Writer) bool {
fw, ok := w.(fder)
if !ok {
return false
}
var termios syscall.Termios
_, _, err := syscall.Syscall6(syscall.SYS_IOCTL, fw.Fd(), ioctlReadTermios, uintptr(unsafe.Pointer(&termios)), 0, 0, 0)
return err == 0
}

View file

@ -0,0 +1,5 @@
package term
import "syscall"
const ioctlReadTermios = syscall.TIOCGETA

View file

@ -0,0 +1,102 @@
// Based on ssh/terminal:
// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build windows
package term
import (
"encoding/binary"
"io"
"regexp"
"syscall"
"unsafe"
)
var kernel32 = syscall.NewLazyDLL("kernel32.dll")
var (
procGetFileInformationByHandleEx = kernel32.NewProc("GetFileInformationByHandleEx")
msysPipeNameRegex = regexp.MustCompile(`\\(cygwin|msys)-\w+-pty\d?-(to|from)-master`)
)
const (
fileNameInfo = 0x02
)
// IsTerminal returns true if w writes to a terminal.
func IsTerminal(w io.Writer) bool {
return IsConsole(w) || IsMSYSTerminal(w)
}
// IsConsole returns true if w writes to a Windows console.
func IsConsole(w io.Writer) bool {
var handle syscall.Handle
if fw, ok := w.(fder); ok {
handle = syscall.Handle(fw.Fd())
} else {
// The writer has no file-descriptor and so can't be a terminal.
return false
}
var st uint32
err := syscall.GetConsoleMode(handle, &st)
// If the handle is attached to a terminal, GetConsoleMode returns a
// non-zero value containing the console mode flags. We don't care about
// the specifics of flags, just that it is not zero.
return (err == nil && st != 0)
}
// IsMSYSTerminal returns true if w writes to a MSYS/MSYS2 terminal.
func IsMSYSTerminal(w io.Writer) bool {
var handle syscall.Handle
if fw, ok := w.(fder); ok {
handle = syscall.Handle(fw.Fd())
} else {
// The writer has no file-descriptor and so can't be a terminal.
return false
}
// MSYS(2) terminal reports as a pipe for STDIN/STDOUT/STDERR. If it isn't
// a pipe, it can't be a MSYS(2) terminal.
filetype, err := syscall.GetFileType(handle)
if filetype != syscall.FILE_TYPE_PIPE || err != nil {
return false
}
// MSYS2/Cygwin terminal's name looks like: \msys-dd50a72ab4668b33-pty2-to-master
data := make([]byte, 256, 256)
r, _, e := syscall.Syscall6(
procGetFileInformationByHandleEx.Addr(),
4,
uintptr(handle),
uintptr(fileNameInfo),
uintptr(unsafe.Pointer(&data[0])),
uintptr(len(data)),
0,
0,
)
if r != 0 && e == 0 {
// The first 4 bytes of the buffer are the size of the UTF16 name, in bytes.
unameLen := binary.LittleEndian.Uint32(data[:4]) / 2
uname := make([]uint16, unameLen, unameLen)
for i := uint32(0); i < unameLen; i++ {
uname[i] = binary.LittleEndian.Uint16(data[i*2+4 : i*2+2+4])
}
name := syscall.UTF16ToString(uname)
return msysPipeNameRegex.MatchString(name)
}
return false
}

View file

@ -26,14 +26,23 @@ import (
"github.com/prometheus/tsdb/labels"
)
// DiskBlock handles reads against a Block of time series data.
type DiskBlock interface {
BlockReader
// Directory where block data is stored.
Dir() string
// Stats returns statistics about the block.
Meta() BlockMeta
Delete(mint, maxt int64, m ...labels.Matcher) error
Snapshot(dir string) error
Close() error
}
type BlockReader interface {
// Index returns an IndexReader over the block's data.
Index() IndexReader
@ -42,30 +51,6 @@ type DiskBlock interface {
// Tombstones returns a TombstoneReader over the block's deleted data.
Tombstones() TombstoneReader
// Delete deletes data from the block.
Delete(mint, maxt int64, ms ...labels.Matcher) error
// Close releases all underlying resources of the block.
Close() error
}
// Block is an interface to a DiskBlock that can also be queried.
type Block interface {
DiskBlock
Queryable
Snapshottable
}
// headBlock is a regular block that can still be appended to.
type headBlock interface {
Block
Appendable
// ActiveWriters returns the number of currently active appenders.
ActiveWriters() int
// HighTimestamp returns the highest currently inserted timestamp.
HighTimestamp() int64
}
// Snapshottable defines an entity that can be backedup online.
@ -225,16 +210,6 @@ func (pb *persistedBlock) String() string {
return pb.meta.ULID.String()
}
func (pb *persistedBlock) Querier(mint, maxt int64) Querier {
return &blockQuerier{
mint: mint,
maxt: maxt,
index: pb.Index(),
chunks: pb.Chunks(),
tombstones: pb.Tombstones(),
}
}
func (pb *persistedBlock) Dir() string { return pb.dir }
func (pb *persistedBlock) Index() IndexReader { return pb.indexr }
func (pb *persistedBlock) Chunks() ChunkReader { return pb.chunkr }
@ -250,7 +225,7 @@ func (pb *persistedBlock) Delete(mint, maxt int64, ms ...labels.Matcher) error {
ir := pb.indexr
// Choose only valid postings which have chunks in the time-range.
stones := map[uint32]intervals{}
stones := map[uint64]Intervals{}
var lset labels.Labels
var chks []ChunkMeta
@ -272,7 +247,7 @@ Outer:
if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) {
// Delete only until the current vlaues and not beyond.
tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime)
stones[p.At()] = intervals{{tmin, tmax}}
stones[p.At()] = Intervals{{tmin, tmax}}
continue Outer
}
}

View file

@ -18,7 +18,6 @@ import (
"encoding/binary"
"fmt"
"hash"
"hash/crc32"
"io"
"os"
@ -59,7 +58,7 @@ func (cm *ChunkMeta) writeHash(h hash.Hash) error {
type deletedIterator struct {
it chunks.Iterator
intervals intervals
intervals Intervals
}
func (it *deletedIterator) At() (int64, float64) {
@ -76,7 +75,7 @@ Outer:
continue Outer
}
if ts > tr.maxt {
if ts > tr.Maxt {
it.intervals = it.intervals[1:]
continue
}
@ -136,7 +135,7 @@ func newChunkWriter(dir string) (*chunkWriter, error) {
cw := &chunkWriter{
dirFile: dirFile,
n: 0,
crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
crc32: newCRC32(),
segmentSize: defaultChunkSegmentSize,
}
return cw, nil
@ -180,7 +179,7 @@ func (w *chunkWriter) cut() error {
return err
}
p, _, err := nextSequenceFile(w.dirFile.Name(), "")
p, _, err := nextSequenceFile(w.dirFile.Name())
if err != nil {
return err
}
@ -303,7 +302,7 @@ type chunkReader struct {
// newChunkReader returns a new chunkReader based on mmaped files found in dir.
func newChunkReader(dir string, pool chunks.Pool) (*chunkReader, error) {
files, err := sequenceFiles(dir, "")
files, err := sequenceFiles(dir)
if err != nil {
return nil, err
}

View file

@ -14,10 +14,10 @@
package tsdb
import (
"fmt"
"math/rand"
"os"
"path/filepath"
"runtime"
"sort"
"time"
@ -51,7 +51,7 @@ type Compactor interface {
Plan(dir string) ([]string, error)
// Write persists a Block into a directory.
Write(dest string, b Block) error
Write(dest string, b BlockReader, mint, maxt int64) error
// Compact runs compaction against the provided directories. Must
// only be called concurrently with results of Plan().
@ -60,16 +60,20 @@ type Compactor interface {
// LeveledCompactor implements the Compactor interface.
type LeveledCompactor struct {
dir string
metrics *compactorMetrics
logger log.Logger
opts *LeveledCompactorOptions
dir string
metrics *compactorMetrics
logger log.Logger
ranges []int64
chunkPool chunks.Pool
}
type compactorMetrics struct {
ran prometheus.Counter
failed prometheus.Counter
duration prometheus.Histogram
ran prometheus.Counter
failed prometheus.Counter
duration prometheus.Histogram
chunkSize prometheus.Histogram
chunkSamples prometheus.Histogram
chunkRange prometheus.Histogram
}
func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
@ -83,9 +87,25 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
Name: "tsdb_compactions_failed_total",
Help: "Total number of compactions that failed for the partition.",
})
m.duration = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "tsdb_compaction_duration",
Help: "Duration of compaction runs.",
m.duration = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tsdb_compaction_duration",
Help: "Duration of compaction runs.",
Buckets: prometheus.ExponentialBuckets(1, 2, 10),
})
m.chunkSize = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tsdb_compaction_chunk_size",
Help: "Final size of chunks on their first compaction",
Buckets: prometheus.ExponentialBuckets(32, 1.5, 12),
})
m.chunkSamples = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tsdb_compaction_chunk_samples",
Help: "Final number of samples on their first compaction",
Buckets: prometheus.ExponentialBuckets(4, 1.5, 12),
})
m.chunkRange = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "tsdb_compaction_chunk_range",
Help: "Final time range of chunks on their first compaction",
Buckets: prometheus.ExponentialBuckets(100, 4, 10),
})
if r != nil {
@ -93,39 +113,30 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
m.ran,
m.failed,
m.duration,
m.chunkRange,
m.chunkSamples,
m.chunkSize,
)
}
return m
}
// LeveledCompactorOptions are the options for a LeveledCompactor.
type LeveledCompactorOptions struct {
blockRanges []int64
chunkPool chunks.Pool
}
// NewLeveledCompactor returns a LeveledCompactor.
func NewLeveledCompactor(r prometheus.Registerer, l log.Logger, opts *LeveledCompactorOptions) *LeveledCompactor {
if opts == nil {
opts = &LeveledCompactorOptions{
chunkPool: chunks.NewPool(),
}
func NewLeveledCompactor(r prometheus.Registerer, l log.Logger, ranges []int64, pool chunks.Pool) (*LeveledCompactor, error) {
if len(ranges) == 0 {
return nil, errors.Errorf("at least one range must be provided")
}
if pool == nil {
pool = chunks.NewPool()
}
return &LeveledCompactor{
opts: opts,
logger: l,
metrics: newCompactorMetrics(r),
}
ranges: ranges,
chunkPool: pool,
logger: l,
metrics: newCompactorMetrics(r),
}, nil
}
type compactionInfo struct {
seq int
generation int
mint, maxt int64
}
const compactionBlocksLen = 3
type dirMeta struct {
dir string
meta *BlockMeta
@ -145,21 +156,15 @@ func (c *LeveledCompactor) Plan(dir string) ([]string, error) {
if err != nil {
return nil, err
}
if meta.Compaction.Level > 0 {
dms = append(dms, dirMeta{dir, meta})
}
dms = append(dms, dirMeta{dir, meta})
}
sort.Slice(dms, func(i, j int) bool {
return dms[i].meta.MinTime < dms[j].meta.MinTime
})
return c.plan(dms)
}
func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
if len(dms) <= 1 {
return nil, nil
}
sort.Slice(dms, func(i, j int) bool {
return dms[i].meta.MinTime < dms[j].meta.MinTime
})
var res []string
for _, dm := range c.selectDirs(dms) {
@ -172,11 +177,11 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
// Compact any blocks that have >5% tombstones.
for i := len(dms) - 1; i >= 0; i-- {
meta := dms[i].meta
if meta.MaxTime-meta.MinTime < c.opts.blockRanges[len(c.opts.blockRanges)/2] {
if meta.MaxTime-meta.MinTime < c.ranges[len(c.ranges)/2] {
break
}
if meta.Stats.NumSeries/(meta.Stats.NumTombstones+1) <= 20 { // 5%
if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 {
return []string{dms[i].dir}, nil
}
}
@ -187,13 +192,13 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
// selectDirs returns the dir metas that should be compacted into a single new block.
// If only a single block range is configured, the result is always nil.
func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
if len(c.opts.blockRanges) < 2 || len(ds) < 1 {
if len(c.ranges) < 2 || len(ds) < 1 {
return nil
}
highTime := ds[len(ds)-1].meta.MinTime
for _, iv := range c.opts.blockRanges[1:] {
for _, iv := range c.ranges[1:] {
parts := splitByRange(ds, iv)
if len(parts) == 0 {
continue
@ -258,9 +263,12 @@ func splitByRange(ds []dirMeta, tr int64) [][]dirMeta {
return splitDirs
}
func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) {
res.MinTime = blocks[0].MinTime
res.MaxTime = blocks[len(blocks)-1].MaxTime
func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
res := &BlockMeta{
ULID: uid,
MinTime: blocks[0].MinTime,
MaxTime: blocks[len(blocks)-1].MaxTime,
}
sources := map[ulid.ULID]struct{}{}
@ -271,10 +279,6 @@ func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) {
for _, s := range b.Compaction.Sources {
sources[s] = struct{}{}
}
// If it's an in memory block, its ULID goes into the sources.
if b.Compaction.Level == 0 {
sources[b.ULID] = struct{}{}
}
}
res.Compaction.Level++
@ -291,40 +295,69 @@ func compactBlockMetas(blocks ...BlockMeta) (res BlockMeta) {
// Compact creates a new block in the compactor's directory from the blocks in the
// provided directories.
func (c *LeveledCompactor) Compact(dest string, dirs ...string) (err error) {
var blocks []Block
var blocks []BlockReader
var metas []*BlockMeta
for _, d := range dirs {
b, err := newPersistedBlock(d, c.opts.chunkPool)
b, err := newPersistedBlock(d, c.chunkPool)
if err != nil {
return err
}
defer b.Close()
meta, err := readMetaFile(d)
if err != nil {
return err
}
metas = append(metas, meta)
blocks = append(blocks, b)
}
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
uid := ulid.MustNew(ulid.Now(), entropy)
return c.write(dest, uid, blocks...)
return c.write(dest, compactBlockMetas(uid, metas...), blocks...)
}
func (c *LeveledCompactor) Write(dest string, b Block) error {
// Buffering blocks might have been created that often have no data.
if b.Meta().Stats.NumSeries == 0 {
return nil
}
func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) error {
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
uid := ulid.MustNew(ulid.Now(), entropy)
return c.write(dest, uid, b)
meta := &BlockMeta{
ULID: uid,
MinTime: mint,
MaxTime: maxt,
}
meta.Compaction.Level = 1
meta.Compaction.Sources = []ulid.ULID{uid}
return c.write(dest, meta, b)
}
// instrumentedChunkWriter is used for level 1 compactions to record statistics
// about compacted chunks.
type instrumentedChunkWriter struct {
ChunkWriter
size prometheus.Histogram
samples prometheus.Histogram
trange prometheus.Histogram
}
func (w *instrumentedChunkWriter) WriteChunks(chunks ...ChunkMeta) error {
for _, c := range chunks {
w.size.Observe(float64(len(c.Chunk.Bytes())))
w.samples.Observe(float64(c.Chunk.NumSamples()))
w.trange.Observe(float64(c.MaxTime - c.MinTime))
}
return w.ChunkWriter.WriteChunks(chunks...)
}
// write creates a new block that is the union of the provided blocks into dir.
// It cleans up all files of the old blocks after completing successfully.
func (c *LeveledCompactor) write(dest string, uid ulid.ULID, blocks ...Block) (err error) {
c.logger.Log("msg", "compact blocks", "blocks", fmt.Sprintf("%v", blocks))
func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
c.logger.Log("msg", "compact blocks", "count", len(blocks), "mint", meta.MinTime, "maxt", meta.MaxTime)
defer func(t time.Time) {
if err != nil {
@ -332,9 +365,13 @@ func (c *LeveledCompactor) write(dest string, uid ulid.ULID, blocks ...Block) (e
}
c.metrics.ran.Inc()
c.metrics.duration.Observe(time.Since(t).Seconds())
// We might have done quite a few allocs. Enforce a GC so they do not accumulate
// with subsequent compactions or head GCs.
runtime.GC()
}(time.Now())
dir := filepath.Join(dest, uid.String())
dir := filepath.Join(dest, meta.ULID.String())
tmp := dir + ".tmp"
if err = os.RemoveAll(tmp); err != nil {
@ -347,20 +384,30 @@ func (c *LeveledCompactor) write(dest string, uid ulid.ULID, blocks ...Block) (e
// Populate chunk and index files into temporary directory with
// data of all blocks.
chunkw, err := newChunkWriter(chunkDir(tmp))
var chunkw ChunkWriter
chunkw, err = newChunkWriter(chunkDir(tmp))
if err != nil {
return errors.Wrap(err, "open chunk writer")
}
// Record written chunk sizes on level 1 compactions.
if meta.Compaction.Level == 1 {
chunkw = &instrumentedChunkWriter{
ChunkWriter: chunkw,
size: c.metrics.chunkSize,
samples: c.metrics.chunkSamples,
trange: c.metrics.chunkRange,
}
}
indexw, err := newIndexWriter(tmp)
if err != nil {
return errors.Wrap(err, "open index writer")
}
meta, err := c.populateBlock(blocks, indexw, chunkw)
if err != nil {
if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil {
return errors.Wrap(err, "write compaction")
}
meta.ULID = uid
if err = writeMetaFile(tmp, meta); err != nil {
return errors.Wrap(err, "write merged meta")
@ -398,18 +445,16 @@ func (c *LeveledCompactor) write(dest string, uid ulid.ULID, blocks ...Block) (e
// populateBlock fills the index and chunk writers with new data gathered as the union
// of the provided blocks. It returns meta information for the new block.
func (c *LeveledCompactor) populateBlock(blocks []Block, indexw IndexWriter, chunkw ChunkWriter) (*BlockMeta, error) {
func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error {
var (
set compactionSet
metas []BlockMeta
allSymbols = make(map[string]struct{}, 1<<16)
)
for i, b := range blocks {
metas = append(metas, b.Meta())
symbols, err := b.Index().Symbols()
if err != nil {
return nil, errors.Wrap(err, "read symbols")
return errors.Wrap(err, "read symbols")
}
for s := range symbols {
allSymbols[s] = struct{}{}
@ -419,7 +464,7 @@ func (c *LeveledCompactor) populateBlock(blocks []Block, indexw IndexWriter, chu
all, err := indexr.Postings("", "")
if err != nil {
return nil, err
return err
}
all = indexr.SortedPostings(all)
@ -431,20 +476,19 @@ func (c *LeveledCompactor) populateBlock(blocks []Block, indexw IndexWriter, chu
}
set, err = newCompactionMerger(set, s)
if err != nil {
return nil, err
return err
}
}
// We fully rebuild the postings list index from merged series.
var (
postings = &memPostings{m: make(map[term][]uint32, 512)}
postings = newMemPostings()
values = map[string]stringset{}
i = uint32(0)
meta = compactBlockMetas(metas...)
i = uint64(0)
)
if err := indexw.AddSymbols(allSymbols); err != nil {
return nil, errors.Wrap(err, "add symbols")
return errors.Wrap(err, "add symbols")
}
for set.Next() {
@ -458,11 +502,11 @@ func (c *LeveledCompactor) populateBlock(blocks []Block, indexw IndexWriter, chu
if len(dranges) > 0 {
// Re-encode the chunk to not have deleted values.
for _, chk := range chks {
if intervalOverlap(dranges[0].mint, dranges[len(dranges)-1].maxt, chk.MinTime, chk.MaxTime) {
if intervalOverlap(dranges[0].Mint, dranges[len(dranges)-1].Maxt, chk.MinTime, chk.MaxTime) {
newChunk := chunks.NewXORChunk()
app, err := newChunk.Appender()
if err != nil {
return nil, err
return err
}
it := &deletedIterator{it: chk.Chunk.Iterator(), intervals: dranges}
@ -476,11 +520,11 @@ func (c *LeveledCompactor) populateBlock(blocks []Block, indexw IndexWriter, chu
}
}
if err := chunkw.WriteChunks(chks...); err != nil {
return nil, err
return errors.Wrap(err, "write chunks")
}
if err := indexw.AddSeries(i, lset, chks...); err != nil {
return nil, errors.Wrapf(err, "add series")
return errors.Wrap(err, "add series")
}
meta.Stats.NumChunks += uint64(len(chks))
@ -490,7 +534,7 @@ func (c *LeveledCompactor) populateBlock(blocks []Block, indexw IndexWriter, chu
}
for _, chk := range chks {
c.opts.chunkPool.Put(chk.Chunk)
c.chunkPool.Put(chk.Chunk)
}
for _, l := range lset {
@ -500,15 +544,13 @@ func (c *LeveledCompactor) populateBlock(blocks []Block, indexw IndexWriter, chu
values[l.Name] = valset
}
valset.set(l.Value)
t := term{name: l.Name, value: l.Value}
postings.add(i, t)
}
postings.add(i, lset)
i++
}
if set.Err() != nil {
return nil, set.Err()
return errors.Wrap(set.Err(), "iterate compaction set")
}
s := make([]string, 0, 256)
@ -519,30 +561,30 @@ func (c *LeveledCompactor) populateBlock(blocks []Block, indexw IndexWriter, chu
s = append(s, x)
}
if err := indexw.WriteLabelIndex([]string{n}, s); err != nil {
return nil, err
return errors.Wrap(err, "write label index")
}
}
for t := range postings.m {
if err := indexw.WritePostings(t.name, t.value, postings.get(t)); err != nil {
return nil, err
for l := range postings.m {
if err := indexw.WritePostings(l.Name, l.Value, postings.get(l.Name, l.Value)); err != nil {
return errors.Wrap(err, "write postings")
}
}
// Write a postings list containing all series.
all := make([]uint32, i)
all := make([]uint64, i)
for i := range all {
all[i] = uint32(i)
all[i] = uint64(i)
}
if err := indexw.WritePostings("", "", newListPostings(all)); err != nil {
return nil, err
return errors.Wrap(err, "write 'all' postings")
}
return &meta, nil
return nil
}
type compactionSet interface {
Next() bool
At() (labels.Labels, []ChunkMeta, intervals)
At() (labels.Labels, []ChunkMeta, Intervals)
Err() error
}
@ -555,7 +597,7 @@ type compactionSeriesSet struct {
l labels.Labels
c []ChunkMeta
intervals intervals
intervals Intervals
err error
}
@ -572,9 +614,12 @@ func (c *compactionSeriesSet) Next() bool {
if !c.p.Next() {
return false
}
var err error
c.intervals = c.tombstones.Get(c.p.At())
if c.err = c.index.Series(c.p.At(), &c.l, &c.c); c.err != nil {
if err = c.index.Series(c.p.At(), &c.l, &c.c); err != nil {
c.err = errors.Wrapf(err, "get series %d", c.p.At())
return false
}
@ -582,7 +627,7 @@ func (c *compactionSeriesSet) Next() bool {
if len(c.intervals) > 0 {
chks := make([]ChunkMeta, 0, len(c.c))
for _, chk := range c.c {
if !(interval{chk.MinTime, chk.MaxTime}.isSubrange(c.intervals)) {
if !(Interval{chk.MinTime, chk.MaxTime}.isSubrange(c.intervals)) {
chks = append(chks, chk)
}
}
@ -593,8 +638,9 @@ func (c *compactionSeriesSet) Next() bool {
for i := range c.c {
chk := &c.c[i]
chk.Chunk, c.err = c.chunks.Chunk(chk.Ref)
if c.err != nil {
chk.Chunk, err = c.chunks.Chunk(chk.Ref)
if err != nil {
c.err = errors.Wrapf(err, "chunk %d not found", chk.Ref)
return false
}
}
@ -609,7 +655,7 @@ func (c *compactionSeriesSet) Err() error {
return c.p.Err()
}
func (c *compactionSeriesSet) At() (labels.Labels, []ChunkMeta, intervals) {
func (c *compactionSeriesSet) At() (labels.Labels, []ChunkMeta, Intervals) {
return c.l, c.c, c.intervals
}
@ -619,7 +665,7 @@ type compactionMerger struct {
aok, bok bool
l labels.Labels
c []ChunkMeta
intervals intervals
intervals Intervals
}
type compactionSeries struct {
@ -700,7 +746,7 @@ func (c *compactionMerger) Err() error {
return c.b.Err()
}
func (c *compactionMerger) At() (labels.Labels, []ChunkMeta, intervals) {
func (c *compactionMerger) At() (labels.Labels, []ChunkMeta, Intervals) {
return c.l, c.c, c.intervals
}

View file

@ -21,10 +21,8 @@ import (
"io/ioutil"
"os"
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"
"unsafe"
@ -77,11 +75,11 @@ type Appender interface {
// to AddFast() at any point. Adding the sample via Add() returns a new
// reference number.
// If the reference is the empty string it must not be used for caching.
Add(l labels.Labels, t int64, v float64) (string, error)
Add(l labels.Labels, t int64, v float64) (uint64, error)
// Add adds a sample pair for the referenced series. It is generally faster
// than adding a sample by providing its full label set.
AddFast(ref string, t int64, v float64) error
AddFast(ref uint64, t int64, v float64) error
// Commit submits the collected samples and purges the batch.
Commit() error
@ -100,18 +98,14 @@ type DB struct {
metrics *dbMetrics
opts *Options
chunkPool chunks.Pool
compactor Compactor
wal WAL
// Mutex for that must be held when modifying the general block layout.
mtx sync.RWMutex
blocks []Block
blocks []DiskBlock
// Mutex that must be held when modifying just the head blocks
// or the general layout.
// mtx must be held before acquiring.
headmtx sync.RWMutex
heads []headBlock
compactor Compactor
head *Head
compactc chan struct{}
donec chan struct{}
@ -123,22 +117,15 @@ type DB struct {
}
type dbMetrics struct {
activeAppenders prometheus.Gauge
loadedBlocks prometheus.GaugeFunc
reloads prometheus.Counter
reloadsFailed prometheus.Counter
reloadDuration prometheus.Summary
samplesAppended prometheus.Counter
compactionsTriggered prometheus.Counter
}
func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
m := &dbMetrics{}
m.activeAppenders = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "tsdb_active_appenders",
Help: "Number of currently active appender transactions",
})
m.loadedBlocks = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "tsdb_blocks_loaded",
Help: "Number of currently loaded data blocks",
@ -155,14 +142,6 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
Name: "tsdb_reloads_failures_total",
Help: "Number of times the database failed to reload black data from disk.",
})
m.reloadDuration = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "tsdb_reload_duration_seconds",
Help: "Duration of block reloads.",
})
m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_samples_appended_total",
Help: "Total number of appended sampledb.",
})
m.compactionsTriggered = prometheus.NewCounter(prometheus.CounterOpts{
Name: "tsdb_compactions_triggered_total",
Help: "Total number of triggered compactions for the partition.",
@ -170,12 +149,9 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
if r != nil {
r.MustRegister(
m.activeAppenders,
m.loadedBlocks,
m.reloads,
m.reloadsFailed,
m.reloadDuration,
m.samplesAppended,
m.compactionsTriggered,
)
}
@ -187,12 +163,10 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
if err := os.MkdirAll(dir, 0777); err != nil {
return nil, err
}
if l == nil {
l = log.NewLogfmtLogger(os.Stdout)
l = log.With(l, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
}
if opts == nil {
opts = DefaultOptions
}
@ -224,29 +198,26 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
db.lockf = &lockf
}
copts := &LeveledCompactorOptions{
blockRanges: opts.BlockRanges,
chunkPool: db.chunkPool,
db.compactor, err = NewLeveledCompactor(r, l, opts.BlockRanges, db.chunkPool)
if err != nil {
return nil, errors.Wrap(err, "create leveled compactor")
}
if len(copts.blockRanges) == 0 {
return nil, errors.New("at least one block-range must exist")
}
for float64(copts.blockRanges[len(copts.blockRanges)-1])/float64(opts.RetentionDuration) > 0.2 {
if len(copts.blockRanges) == 1 {
break
}
// Max overflow is restricted to 20%.
copts.blockRanges = copts.blockRanges[:len(copts.blockRanges)-1]
}
db.compactor = NewLeveledCompactor(r, l, copts)
if err := db.reloadBlocks(); err != nil {
wal, err := OpenSegmentWAL(filepath.Join(dir, "wal"), l, 10*time.Second)
if err != nil {
return nil, err
}
db.head, err = NewHead(r, l, wal, opts.BlockRanges[0])
if err != nil {
return nil, err
}
if err := db.reload(); err != nil {
return nil, err
}
if err := db.head.ReadWAL(); err != nil {
return nil, errors.Wrap(err, "read WAL")
}
go db.run()
return db, nil
@ -260,12 +231,17 @@ func (db *DB) Dir() string {
func (db *DB) run() {
defer close(db.donec)
tick := time.NewTicker(30 * time.Second)
defer tick.Stop()
backoff := time.Duration(0)
for {
select {
case <-tick.C:
case <-db.stopc:
return
case <-time.After(backoff):
}
select {
case <-time.After(1 * time.Minute):
select {
case db.compactc <- struct{}{}:
default:
@ -273,20 +249,20 @@ func (db *DB) run() {
case <-db.compactc:
db.metrics.compactionsTriggered.Inc()
changes1, err := db.retentionCutoff()
if err != nil {
db.logger.Log("msg", "retention cutoff failed", "err", err)
_, err1 := db.retentionCutoff()
if err1 != nil {
db.logger.Log("msg", "retention cutoff failed", "err", err1)
}
changes2, err := db.compact()
if err != nil {
db.logger.Log("msg", "compaction failed", "err", err)
_, err2 := db.compact()
if err2 != nil {
db.logger.Log("msg", "compaction failed", "err", err2)
}
if changes1 || changes2 {
if err := db.reloadBlocks(); err != nil {
db.logger.Log("msg", "reloading blocks failed", "err", err)
}
if err1 != nil || err2 != nil {
backoff = exponential(backoff, 1*time.Second, 1*time.Minute)
} else {
backoff = 0
}
case <-db.stopc:
@ -303,74 +279,40 @@ func (db *DB) retentionCutoff() (bool, error) {
db.mtx.RLock()
defer db.mtx.RUnlock()
// We only consider the already persisted blocks. Head blocks generally
// only account for a fraction of the total data.
db.headmtx.RLock()
lenp := len(db.blocks) - len(db.heads)
db.headmtx.RUnlock()
if lenp == 0 {
if len(db.blocks) == 0 {
return false, nil
}
last := db.blocks[lenp-1]
last := db.blocks[len(db.blocks)-1]
mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
return retentionCutoff(db.dir, mint)
}
// headFullness returns up to which fraction of a blocks time range samples
// were already inserted.
func headFullness(h headBlock) float64 {
m := h.Meta()
a := float64(h.HighTimestamp() - m.MinTime)
b := float64(m.MaxTime - m.MinTime)
return a / b
// Appender opens a new appender against the database.
func (db *DB) Appender() Appender {
return dbAppender{db: db, Appender: db.head.Appender()}
}
// appendableHeads returns a copy of a slice of HeadBlocks that can still be appended to.
func (db *DB) appendableHeads() (r []headBlock) {
switch l := len(db.heads); l {
case 0:
case 1:
r = append(r, db.heads[0])
default:
if headFullness(db.heads[l-1]) < 0.5 {
r = append(r, db.heads[l-2])
}
r = append(r, db.heads[l-1])
}
return r
// dbAppender wraps the DB's head appender and triggers compactions on commit
// if necessary.
type dbAppender struct {
Appender
db *DB
}
func (db *DB) completedHeads() (r []headBlock) {
db.mtx.RLock()
defer db.mtx.RUnlock()
func (a dbAppender) Commit() error {
err := a.Appender.Commit()
db.headmtx.RLock()
defer db.headmtx.RUnlock()
if len(db.heads) < 2 {
return nil
}
// Select all old heads unless they still have pending appenders.
for _, h := range db.heads[:len(db.heads)-2] {
if h.ActiveWriters() > 0 {
return r
// We could just run this check every few minutes practically. But for benchmarks
// and high frequency use cases this is the safer way.
if a.db.head.MaxTime()-a.db.head.MinTime() > a.db.head.chunkRange/2*3 {
select {
case a.db.compactc <- struct{}{}:
default:
}
r = append(r, h)
}
// Add the 2nd last head if the last head is more than 50% filled.
// Compacting it early allows us to free its memory before allocating
// more for the next block and thus reduces spikes.
h0 := db.heads[len(db.heads)-1]
h1 := db.heads[len(db.heads)-2]
if headFullness(h0) >= 0.5 && h1.ActiveWriters() == 0 {
r = append(r, h1)
}
return r
return err
}
func (db *DB) compact() (changes bool, err error) {
@ -383,22 +325,33 @@ func (db *DB) compact() (changes bool, err error) {
// Check whether we have pending head blocks that are ready to be persisted.
// They have the highest priority.
for _, h := range db.completedHeads() {
for {
select {
case <-db.stopc:
return changes, nil
default:
}
// The head has a compactable range if 1.5 level 0 ranges are between the oldest
// and newest timestamp. The 0.5 acts as a buffer of the appendable window.
if db.head.MaxTime()-db.head.MinTime() <= db.opts.BlockRanges[0]/2*3 {
break
}
mint, maxt := rangeForTimestamp(db.head.MinTime(), db.opts.BlockRanges[0])
if err = db.compactor.Write(db.dir, h); err != nil {
// Wrap head into a range that bounds all reads to it.
head := &rangeHead{
head: db.head,
mint: mint,
maxt: maxt,
}
if err = db.compactor.Write(db.dir, head, mint, maxt); err != nil {
return changes, errors.Wrap(err, "persist head block")
}
changes = true
if err := os.RemoveAll(h.Dir()); err != nil {
return changes, errors.Wrap(err, "delete compacted head block")
if err := db.reload(); err != nil {
return changes, errors.Wrap(err, "reload blocks")
}
runtime.GC()
}
// Check for compactions of multiple blocks.
@ -427,7 +380,10 @@ func (db *DB) compact() (changes bool, err error) {
return changes, errors.Wrap(err, "delete compacted block")
}
}
runtime.GC()
if err := db.reload(); err != nil {
return changes, errors.Wrap(err, "reload blocks")
}
}
return changes, nil
@ -469,7 +425,7 @@ func retentionCutoff(dir string, mint int64) (bool, error) {
return changes, fileutil.Fsync(df)
}
func (db *DB) getBlock(id ulid.ULID) (Block, bool) {
func (db *DB) getBlock(id ulid.ULID) (DiskBlock, bool) {
for _, b := range db.blocks {
if b.Meta().ULID == id {
return b, true
@ -478,30 +434,23 @@ func (db *DB) getBlock(id ulid.ULID) (Block, bool) {
return nil, false
}
func (db *DB) reloadBlocks() (err error) {
defer func(t time.Time) {
func (db *DB) reload() (err error) {
defer func() {
if err != nil {
db.metrics.reloadsFailed.Inc()
}
db.metrics.reloads.Inc()
db.metrics.reloadDuration.Observe(time.Since(t).Seconds())
}(time.Now())
}()
var cs []io.Closer
defer func() { closeAll(cs...) }()
db.mtx.Lock()
defer db.mtx.Unlock()
db.headmtx.Lock()
defer db.headmtx.Unlock()
dirs, err := blockDirs(db.dir)
if err != nil {
return errors.Wrap(err, "find blocks")
}
var (
blocks []Block
blocks []DiskBlock
exist = map[ulid.ULID]struct{}{}
)
@ -513,11 +462,7 @@ func (db *DB) reloadBlocks() (err error) {
b, ok := db.getBlock(meta.ULID)
if !ok {
if meta.Compaction.Level == 0 {
b, err = db.openHeadBlock(dir)
} else {
b, err = newPersistedBlock(dir, db.chunkPool)
}
b, err = newPersistedBlock(dir, db.chunkPool)
if err != nil {
return errors.Wrapf(err, "open block %s", dir)
}
@ -532,25 +477,29 @@ func (db *DB) reloadBlocks() (err error) {
}
// Close all opened blocks that no longer exist after we returned all locks.
// TODO(fabxc: probably races with querier still reading from them. Can
// we just abandon them and have the open FDs be GC'd automatically eventually?
for _, b := range db.blocks {
if _, ok := exist[b.Meta().ULID]; !ok {
cs = append(cs, b)
}
}
db.mtx.Lock()
db.blocks = blocks
db.heads = nil
db.mtx.Unlock()
for _, b := range blocks {
if b.Meta().Compaction.Level == 0 {
db.heads = append(db.heads, b.(*HeadBlock))
}
// Garbage collect data in the head if the most recent persisted block
// covers data of its current time range.
if len(blocks) == 0 {
return nil
}
maxt := blocks[len(db.blocks)-1].Meta().MaxTime
return nil
return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
}
func validateBlockSequence(bs []Block) error {
func validateBlockSequence(bs []DiskBlock) error {
if len(bs) == 0 {
return nil
}
@ -584,10 +533,10 @@ func (db *DB) Close() error {
var merr MultiError
merr.Add(g.Wait())
if db.lockf != nil {
merr.Add(db.lockf.Unlock())
}
return merr.Err()
}
@ -614,125 +563,48 @@ func (db *DB) Snapshot(dir string) error {
if dir == db.dir {
return errors.Errorf("cannot snapshot into base directory")
}
if _, err := ulid.Parse(dir); err == nil {
return errors.Errorf("dir must not be a valid ULID")
}
db.cmtx.Lock()
defer db.cmtx.Unlock()
db.mtx.Lock() // To block any appenders.
defer db.mtx.Unlock()
db.mtx.RLock()
defer db.mtx.RUnlock()
blocks := db.blocks[:]
for _, b := range blocks {
for _, b := range db.blocks {
db.logger.Log("msg", "snapshotting block", "block", b)
if err := b.Snapshot(dir); err != nil {
return errors.Wrap(err, "error snapshotting headblock")
}
}
return nil
return db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime())
}
// Appender returns a new Appender on the database.
func (db *DB) Appender() Appender {
db.metrics.activeAppenders.Inc()
// Querier returns a new querier over the data partition for the given time range.
// A goroutine must not handle more than one open Querier.
func (db *DB) Querier(mint, maxt int64) Querier {
db.mtx.RLock()
return &dbAppender{db: db}
}
type dbAppender struct {
db *DB
heads []*metaAppender
blocks := db.blocksForInterval(mint, maxt)
samples int
}
type metaAppender struct {
meta BlockMeta
app Appender
}
func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (string, error) {
h, err := a.appenderAt(t)
if err != nil {
return "", err
sq := &querier{
blocks: make([]Querier, 0, len(blocks)),
db: db,
}
ref, err := h.app.Add(lset, t, v)
if err != nil {
return "", err
for _, b := range blocks {
sq.blocks = append(sq.blocks, &blockQuerier{
mint: mint,
maxt: maxt,
index: b.Index(),
chunks: b.Chunks(),
tombstones: b.Tombstones(),
})
}
a.samples++
if ref == "" {
return "", nil
}
return string(append(h.meta.ULID[:], ref...)), nil
}
func (a *dbAppender) AddFast(ref string, t int64, v float64) error {
if len(ref) < 16 {
return errors.Wrap(ErrNotFound, "invalid ref length")
}
// The first 16 bytes a ref hold the ULID of the head block.
h, err := a.appenderAt(t)
if err != nil {
return err
}
// Validate the ref points to the same block we got for t.
if string(h.meta.ULID[:]) != ref[:16] {
return ErrNotFound
}
if err := h.app.AddFast(ref[16:], t, v); err != nil {
// The block the ref points to might fit the given timestamp.
// We mask the error to stick with our contract.
if errors.Cause(err) == ErrOutOfBounds {
err = ErrNotFound
}
return err
}
a.samples++
return nil
}
// appenderFor gets the appender for the head containing timestamp t.
// If the head block doesn't exist yet, it gets created.
func (a *dbAppender) appenderAt(t int64) (*metaAppender, error) {
for _, h := range a.heads {
if intervalContains(h.meta.MinTime, h.meta.MaxTime-1, t) {
return h, nil
}
}
// Currently opened appenders do not cover t. Ensure the head block is
// created and add missing appenders.
a.db.headmtx.Lock()
if err := a.db.ensureHead(t); err != nil {
a.db.headmtx.Unlock()
return nil, err
}
var hb headBlock
for _, h := range a.db.appendableHeads() {
m := h.Meta()
if intervalContains(m.MinTime, m.MaxTime-1, t) {
hb = h
break
}
}
a.db.headmtx.Unlock()
if hb == nil {
return nil, ErrOutOfBounds
}
// Instantiate appender after returning headmtx!
app := &metaAppender{
meta: hb.Meta(),
app: hb.Appender(),
}
a.heads = append(a.heads, app)
return app, nil
return sq
}
func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
@ -740,87 +612,7 @@ func rangeForTimestamp(t int64, width int64) (mint, maxt int64) {
return mint, mint + width
}
// ensureHead makes sure that there is a head block for the timestamp t if
// it is within or after the currently appendable window.
func (db *DB) ensureHead(t int64) error {
var (
mint, maxt = rangeForTimestamp(t, int64(db.opts.BlockRanges[0]))
addBuffer = len(db.blocks) == 0
last BlockMeta
)
if !addBuffer {
last = db.blocks[len(db.blocks)-1].Meta()
addBuffer = last.MaxTime <= mint-int64(db.opts.BlockRanges[0])
}
// Create another block of buffer in front if the DB is initialized or retrieving
// new data after a long gap.
// This ensures we always have a full block width of append window.
if addBuffer {
if _, err := db.createHeadBlock(mint-int64(db.opts.BlockRanges[0]), mint); err != nil {
return err
}
// If the previous block reaches into our new window, make it smaller.
} else if mt := last.MaxTime; mt > mint {
mint = mt
}
if mint >= maxt {
return nil
}
// Error if the requested time for a head is before the appendable window.
if len(db.heads) > 0 && t < db.heads[0].Meta().MinTime {
return ErrOutOfBounds
}
_, err := db.createHeadBlock(mint, maxt)
return err
}
func (a *dbAppender) Commit() error {
defer a.db.metrics.activeAppenders.Dec()
defer a.db.mtx.RUnlock()
// Commits to partial appenders must be concurrent as concurrent appenders
// may have conflicting locks on head appenders.
// For high-throughput use cases the errgroup causes significant blocking. Typically,
// we just deal with a single appender and special case it.
var err error
switch len(a.heads) {
case 1:
err = a.heads[0].app.Commit()
default:
var g errgroup.Group
for _, h := range a.heads {
g.Go(h.app.Commit)
}
err = g.Wait()
}
if err != nil {
return err
}
// XXX(fabxc): Push the metric down into head block to account properly
// for partial appends?
a.db.metrics.samplesAppended.Add(float64(a.samples))
return nil
}
func (a *dbAppender) Rollback() error {
defer a.db.metrics.activeAppenders.Dec()
defer a.db.mtx.RUnlock()
var g errgroup.Group
for _, h := range a.heads {
g.Go(h.app.Rollback)
}
return g.Wait()
}
// Delete implements deletion of metrics.
// Delete implements deletion of metrics. It only has atomicity guarantees on a per-block basis.
func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
db.cmtx.Lock()
defer db.cmtx.Unlock()
@ -828,16 +620,21 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
db.mtx.Lock()
defer db.mtx.Unlock()
blocks := db.blocksForInterval(mint, maxt)
var g errgroup.Group
for _, b := range blocks {
g.Go(func(b Block) func() error {
return func() error { return b.Delete(mint, maxt, ms...) }
}(b))
for _, b := range db.blocks {
m := b.Meta()
if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) {
g.Go(func(b DiskBlock) func() error {
return func() error { return b.Delete(mint, maxt, ms...) }
}(b))
}
}
g.Go(func() error {
return db.head.Delete(mint, maxt, ms...)
})
if err := g.Wait(); err != nil {
return err
}
@ -856,8 +653,8 @@ func intervalContains(min, max, t int64) bool {
// blocksForInterval returns all blocks within the partition that may contain
// data for the given time range.
func (db *DB) blocksForInterval(mint, maxt int64) []Block {
var bs []Block
func (db *DB) blocksForInterval(mint, maxt int64) []BlockReader {
var bs []BlockReader
for _, b := range db.blocks {
m := b.Meta()
@ -865,52 +662,13 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block {
bs = append(bs, b)
}
}
if maxt >= db.head.MinTime() {
bs = append(bs, db.head)
}
return bs
}
// openHeadBlock opens the head block at dir.
func (db *DB) openHeadBlock(dir string) (*HeadBlock, error) {
var (
wdir = walDir(dir)
l = log.With(db.logger, "wal", wdir)
)
wal, err := OpenSegmentWAL(wdir, l, 5*time.Second)
if err != nil {
return nil, errors.Wrap(err, "open WAL %s")
}
h, err := OpenHeadBlock(dir, log.With(db.logger, "block", dir), wal, db.compactor)
if err != nil {
return nil, errors.Wrapf(err, "open head block %s", dir)
}
return h, nil
}
// createHeadBlock starts a new head block to append to.
func (db *DB) createHeadBlock(mint, maxt int64) (headBlock, error) {
dir, err := TouchHeadBlock(db.dir, mint, maxt)
if err != nil {
return nil, errors.Wrapf(err, "touch head block %s", dir)
}
newHead, err := db.openHeadBlock(dir)
if err != nil {
return nil, err
}
db.logger.Log("msg", "created head block", "ulid", newHead.meta.ULID, "mint", mint, "maxt", maxt)
db.blocks = append(db.blocks, newHead) // TODO(fabxc): this is a race!
db.heads = append(db.heads, newHead)
select {
case db.compactc <- struct{}{}:
default:
}
return newHead, nil
}
func isBlockDir(fi os.FileInfo) bool {
if !fi.IsDir() {
return false
@ -934,7 +692,7 @@ func blockDirs(dir string) ([]string, error) {
return dirs, nil
}
func sequenceFiles(dir, prefix string) ([]string, error) {
func sequenceFiles(dir string) ([]string, error) {
files, err := ioutil.ReadDir(dir)
if err != nil {
return nil, err
@ -942,24 +700,15 @@ func sequenceFiles(dir, prefix string) ([]string, error) {
var res []string
for _, fi := range files {
if isSequenceFile(fi, prefix) {
res = append(res, filepath.Join(dir, fi.Name()))
if _, err := strconv.ParseUint(fi.Name(), 10, 64); err != nil {
continue
}
res = append(res, filepath.Join(dir, fi.Name()))
}
return res, nil
}
func isSequenceFile(fi os.FileInfo, prefix string) bool {
if !strings.HasPrefix(fi.Name(), prefix) {
return false
}
if _, err := strconv.ParseUint(fi.Name()[len(prefix):], 10, 32); err != nil {
return false
}
return true
}
func nextSequenceFile(dir, prefix string) (string, int, error) {
func nextSequenceFile(dir string) (string, int, error) {
names, err := fileutil.ReadDir(dir)
if err != nil {
return "", 0, err
@ -967,16 +716,13 @@ func nextSequenceFile(dir, prefix string) (string, int, error) {
i := uint64(0)
for _, n := range names {
if !strings.HasPrefix(n, prefix) {
continue
}
j, err := strconv.ParseUint(n[len(prefix):], 10, 32)
j, err := strconv.ParseUint(n, 10, 64)
if err != nil {
continue
}
i = j
}
return filepath.Join(dir, fmt.Sprintf("%s%0.6d", prefix, i+1)), int(i + 1), nil
return filepath.Join(dir, fmt.Sprintf("%0.6d", i+1)), int(i + 1), nil
}
// The MultiError type implements the error interface, and contains the
@ -1032,3 +778,14 @@ func closeAll(cs ...io.Closer) error {
}
return merr.Err()
}
func exponential(d, min, max time.Duration) time.Duration {
d *= 2
if d < min {
d = min
}
if d > max {
d = max
}
return d
}

View file

@ -86,7 +86,7 @@ func (d *decbuf) uvarintStr() string {
d.e = errInvalidSize
return ""
}
s := yoloString(d.b[:l])
s := string(d.b[:l])
d.b = d.b[l:]
return s
}

File diff suppressed because it is too large Load diff

View file

@ -18,7 +18,6 @@ import (
"encoding/binary"
"fmt"
"hash"
"hash/crc32"
"io"
"os"
"path/filepath"
@ -100,7 +99,7 @@ type IndexWriter interface {
// their labels.
// The reference numbers are used to resolve entries in postings lists that
// are added later.
AddSeries(ref uint32, l labels.Labels, chunks ...ChunkMeta) error
AddSeries(ref uint64, l labels.Labels, chunks ...ChunkMeta) error
// WriteLabelIndex serializes an index from label names to values.
// The passed in values chained tuples of strings of the length of names.
@ -131,7 +130,7 @@ type indexWriter struct {
uint32s []uint32
symbols map[string]uint32 // symbol offsets
seriesOffsets map[uint32]uint64 // offsets of series
seriesOffsets map[uint64]uint64 // offsets of series
labelIndexes []hashEntry // label index offsets
postings []hashEntry // postings lists offsets
@ -176,8 +175,8 @@ func newIndexWriter(dir string) (*indexWriter, error) {
// Caches.
symbols: make(map[string]uint32, 1<<13),
seriesOffsets: make(map[uint32]uint64, 1<<16),
crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)),
seriesOffsets: make(map[uint64]uint64, 1<<16),
crc32: newCRC32(),
}
if err := iw.writeMeta(); err != nil {
return nil, err
@ -261,7 +260,7 @@ func (w *indexWriter) writeMeta() error {
return w.write(w.buf1.get())
}
func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...ChunkMeta) error {
func (w *indexWriter) AddSeries(ref uint64, lset labels.Labels, chunks ...ChunkMeta) error {
if err := w.ensureStage(idxStageSeries); err != nil {
return err
}
@ -458,7 +457,10 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error {
if !ok {
return errors.Errorf("%p series for reference %d not found", w, it.At())
}
refs = append(refs, uint32(offset)) // XXX(fabxc): get uint64 vs uint32 sorted out.
if offset > (1<<32)-1 {
return errors.Errorf("series offset %d exceeds 4 bytes", offset)
}
refs = append(refs, uint32(offset))
}
if err := it.Err(); err != nil {
return err
@ -525,7 +527,7 @@ type IndexReader interface {
// Series populates the given labels and chunk metas for the series identified
// by the reference.
Series(ref uint32, lset *labels.Labels, chks *[]ChunkMeta) error
Series(ref uint64, lset *labels.Labels, chks *[]ChunkMeta) error
// LabelIndices returns the label pairs for which indices exist.
LabelIndices() ([][]string, error)
@ -741,7 +743,7 @@ func (r *indexReader) LabelIndices() ([][]string, error) {
return res, nil
}
func (r *indexReader) Series(ref uint32, lbls *labels.Labels, chks *[]ChunkMeta) error {
func (r *indexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkMeta) error {
d1 := r.decbufAt(int(ref))
d2 := d1.decbuf(int(d1.uvarint()))

79
vendor/github.com/prometheus/tsdb/pool.go generated vendored Normal file
View file

@ -0,0 +1,79 @@
package tsdb
import "sync"
type bucketPool struct {
buckets []sync.Pool
sizes []int
new func(sz int) interface{}
}
func newBucketPool(minSize, maxSize int, factor float64, f func(sz int) interface{}) *bucketPool {
if minSize < 1 {
panic("invalid minimum pool size")
}
if maxSize < 1 {
panic("invalid maximum pool size")
}
if factor < 1 {
panic("invalid factor")
}
var sizes []int
for s := minSize; s <= maxSize; s = int(float64(s) * factor) {
sizes = append(sizes, s)
}
p := &bucketPool{
buckets: make([]sync.Pool, len(sizes)),
sizes: sizes,
new: f,
}
return p
}
func (p *bucketPool) get(sz int) interface{} {
for i, bktSize := range p.sizes {
if sz > bktSize {
continue
}
x := p.buckets[i].Get()
if x == nil {
x = p.new(sz)
}
return x
}
return p.new(sz)
}
func (p *bucketPool) put(x interface{}, sz int) {
for i, bktSize := range p.sizes {
if sz > bktSize {
continue
}
p.buckets[i].Put(x)
return
}
}
type poolUint64 struct {
p *bucketPool
}
func newPoolUint64(minSize, maxSize int, factor float64) poolUint64 {
return poolUint64{
p: newBucketPool(minSize, maxSize, factor, func(sz int) interface{} {
return make([]uint64, 0, sz)
}),
}
}
func (p poolUint64) get(sz int) []uint64 {
return p.p.get(sz).([]uint64)
}
func (p poolUint64) put(x []uint64) {
p.p.put(x[:0], cap(x))
}

View file

@ -17,31 +17,47 @@ import (
"encoding/binary"
"sort"
"strings"
"sync"
"github.com/prometheus/tsdb/labels"
)
type memPostings struct {
m map[term][]uint32
mtx sync.RWMutex
m map[labels.Label][]uint64
}
type term struct {
name, value string
func newMemPostings() *memPostings {
return &memPostings{
m: make(map[labels.Label][]uint64, 512),
}
}
// Postings returns an iterator over the postings list for s.
func (p *memPostings) get(t term) Postings {
l := p.m[t]
func (p *memPostings) get(name, value string) Postings {
p.mtx.RLock()
l := p.m[labels.Label{Name: name, Value: value}]
p.mtx.RUnlock()
if l == nil {
return emptyPostings
}
return newListPostings(l)
}
var allLabel = labels.Label{}
// add adds a document to the index. The caller has to ensure that no
// term argument appears twice.
func (p *memPostings) add(id uint32, terms ...term) {
for _, t := range terms {
p.m[t] = append(p.m[t], id)
func (p *memPostings) add(id uint64, lset labels.Labels) {
p.mtx.Lock()
for _, l := range lset {
p.m[l] = append(p.m[l], id)
}
p.m[allLabel] = append(p.m[allLabel], id)
p.mtx.Unlock()
}
// Postings provides iterative access over a postings list.
@ -51,10 +67,10 @@ type Postings interface {
// Seek advances the iterator to value v or greater and returns
// true if a value was found.
Seek(v uint32) bool
Seek(v uint64) bool
// At returns the value at the current iterator position.
At() uint32
At() uint64
// Err returns the last error of the iterator.
Err() error
@ -66,8 +82,8 @@ type errPostings struct {
}
func (e errPostings) Next() bool { return false }
func (e errPostings) Seek(uint32) bool { return false }
func (e errPostings) At() uint32 { return 0 }
func (e errPostings) Seek(uint64) bool { return false }
func (e errPostings) At() uint64 { return 0 }
func (e errPostings) Err() error { return e.err }
var emptyPostings = errPostings{}
@ -88,18 +104,18 @@ func Intersect(its ...Postings) Postings {
type intersectPostings struct {
a, b Postings
aok, bok bool
cur uint32
cur uint64
}
func newIntersectPostings(a, b Postings) *intersectPostings {
return &intersectPostings{a: a, b: b}
}
func (it *intersectPostings) At() uint32 {
func (it *intersectPostings) At() uint64 {
return it.cur
}
func (it *intersectPostings) doNext(id uint32) bool {
func (it *intersectPostings) doNext(id uint64) bool {
for {
if !it.b.Seek(id) {
return false
@ -125,7 +141,7 @@ func (it *intersectPostings) Next() bool {
return it.doNext(it.a.At())
}
func (it *intersectPostings) Seek(id uint32) bool {
func (it *intersectPostings) Seek(id uint64) bool {
if !it.a.Seek(id) {
return false
}
@ -155,14 +171,14 @@ type mergedPostings struct {
a, b Postings
initialized bool
aok, bok bool
cur uint32
cur uint64
}
func newMergedPostings(a, b Postings) *mergedPostings {
return &mergedPostings{a: a, b: b}
}
func (it *mergedPostings) At() uint32 {
func (it *mergedPostings) At() uint64 {
return it.cur
}
@ -204,7 +220,7 @@ func (it *mergedPostings) Next() bool {
return true
}
func (it *mergedPostings) Seek(id uint32) bool {
func (it *mergedPostings) Seek(id uint64) bool {
if it.cur >= id {
return true
}
@ -225,15 +241,15 @@ func (it *mergedPostings) Err() error {
// listPostings implements the Postings interface over a plain list.
type listPostings struct {
list []uint32
cur uint32
list []uint64
cur uint64
}
func newListPostings(list []uint32) *listPostings {
func newListPostings(list []uint64) *listPostings {
return &listPostings{list: list}
}
func (it *listPostings) At() uint32 {
func (it *listPostings) At() uint64 {
return it.cur
}
@ -247,7 +263,7 @@ func (it *listPostings) Next() bool {
return false
}
func (it *listPostings) Seek(x uint32) bool {
func (it *listPostings) Seek(x uint64) bool {
// If the current value satisfies, then return.
if it.cur >= x {
return true
@ -281,8 +297,8 @@ func newBigEndianPostings(list []byte) *bigEndianPostings {
return &bigEndianPostings{list: list}
}
func (it *bigEndianPostings) At() uint32 {
return it.cur
func (it *bigEndianPostings) At() uint64 {
return uint64(it.cur)
}
func (it *bigEndianPostings) Next() bool {
@ -294,15 +310,15 @@ func (it *bigEndianPostings) Next() bool {
return false
}
func (it *bigEndianPostings) Seek(x uint32) bool {
if it.cur >= x {
func (it *bigEndianPostings) Seek(x uint64) bool {
if uint64(it.cur) >= x {
return true
}
num := len(it.list) / 4
// Do binary search between current position and end.
i := sort.Search(num, func(i int) bool {
return binary.BigEndian.Uint32(it.list[i*4:]) >= x
return binary.BigEndian.Uint32(it.list[i*4:]) >= uint32(x)
})
if i < num {
j := i * 4

View file

@ -54,26 +54,6 @@ type querier struct {
blocks []Querier
}
// Querier returns a new querier over the data partition for the given time range.
// A goroutine must not handle more than one open Querier.
func (s *DB) Querier(mint, maxt int64) Querier {
s.mtx.RLock()
s.headmtx.RLock()
blocks := s.blocksForInterval(mint, maxt)
s.headmtx.RUnlock()
sq := &querier{
blocks: make([]Querier, 0, len(blocks)),
db: s,
}
for _, b := range blocks {
sq.blocks = append(sq.blocks, b.Querier(mint, maxt))
}
return sq
}
func (q *querier) LabelValues(n string) ([]string, error) {
return q.lvals(q.blocks, n)
}
@ -128,6 +108,18 @@ func (q *querier) Close() error {
return merr.Err()
}
// NewBlockQuerier returns a queries against the readers.
func NewBlockQuerier(ir IndexReader, cr ChunkReader, tr TombstoneReader, mint, maxt int64) Querier {
return &blockQuerier{
index: ir,
chunks: cr,
tombstones: tr,
mint: mint,
maxt: maxt,
}
}
// blockQuerier provides querying access to a single block database.
type blockQuerier struct {
index IndexReader
@ -348,6 +340,13 @@ type mergedSeriesSet struct {
adone, bdone bool
}
// NewMergedSeriesSet takes two series sets as a single series set. The input series sets
// must be sorted and sequential in time, i.e. if they have the same label set,
// the datapoints of a must be before the datapoints of b.
func NewMergedSeriesSet(a, b SeriesSet) SeriesSet {
return newMergedSeriesSet(a, b)
}
func newMergedSeriesSet(a, b SeriesSet) *mergedSeriesSet {
s := &mergedSeriesSet{a: a, b: b}
// Initialize first elements of both sets as Next() needs
@ -403,7 +402,7 @@ func (s *mergedSeriesSet) Next() bool {
type chunkSeriesSet interface {
Next() bool
At() (labels.Labels, []ChunkMeta, intervals)
At() (labels.Labels, []ChunkMeta, Intervals)
Err() error
}
@ -417,11 +416,11 @@ type baseChunkSeries struct {
lset labels.Labels
chks []ChunkMeta
intervals intervals
intervals Intervals
err error
}
func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta, intervals) {
func (s *baseChunkSeries) At() (labels.Labels, []ChunkMeta, Intervals) {
return s.lset, s.chks, s.intervals
}
@ -455,7 +454,7 @@ Outer:
// Only those chunks that are not entirely deleted.
chks := make([]ChunkMeta, 0, len(s.chks))
for _, chk := range s.chks {
if !(interval{chk.MinTime, chk.MaxTime}.isSubrange(s.intervals)) {
if !(Interval{chk.MinTime, chk.MaxTime}.isSubrange(s.intervals)) {
chks = append(chks, chk)
}
}
@ -482,10 +481,10 @@ type populatedChunkSeries struct {
err error
chks []ChunkMeta
lset labels.Labels
intervals intervals
intervals Intervals
}
func (s *populatedChunkSeries) At() (labels.Labels, []ChunkMeta, intervals) {
func (s *populatedChunkSeries) At() (labels.Labels, []ChunkMeta, Intervals) {
return s.lset, s.chks, s.intervals
}
func (s *populatedChunkSeries) Err() error { return s.err }
@ -570,7 +569,7 @@ type chunkSeries struct {
mint, maxt int64
intervals intervals
intervals Intervals
}
func (s *chunkSeries) Labels() labels.Labels {
@ -676,11 +675,12 @@ type chunkSeriesIterator struct {
maxt, mint int64
intervals intervals
intervals Intervals
}
func newChunkSeriesIterator(cs []ChunkMeta, dranges intervals, mint, maxt int64) *chunkSeriesIterator {
func newChunkSeriesIterator(cs []ChunkMeta, dranges Intervals, mint, maxt int64) *chunkSeriesIterator {
it := cs[0].Chunk.Iterator()
if len(dranges) > 0 {
it = &deletedIterator{it: it, intervals: dranges}
}
@ -731,19 +731,22 @@ func (it *chunkSeriesIterator) At() (t int64, v float64) {
}
func (it *chunkSeriesIterator) Next() bool {
for it.cur.Next() {
if it.cur.Next() {
t, _ := it.cur.At()
if t < it.mint {
return it.Seek(it.mint)
}
if t < it.mint {
if !it.Seek(it.mint) {
return false
}
t, _ = it.At()
return t <= it.maxt
}
if t > it.maxt {
return false
}
return true
}
if err := it.cur.Err(); err != nil {
return false
}

View file

@ -16,7 +16,6 @@ package tsdb
import (
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"io/ioutil"
"os"
@ -34,10 +33,15 @@ const (
tombstoneFormatV1 = 1
)
// TombstoneReader is the iterator over tombstones.
type TombstoneReader interface {
Get(ref uint64) Intervals
}
func writeTombstoneFile(dir string, tr tombstoneReader) error {
path := filepath.Join(dir, tombstoneFilename)
tmp := path + ".tmp"
hash := crc32.New(crc32.MakeTable(crc32.Castagnoli))
hash := newCRC32()
f, err := os.Create(tmp)
if err != nil {
@ -60,9 +64,9 @@ func writeTombstoneFile(dir string, tr tombstoneReader) error {
for k, v := range tr {
for _, itv := range v {
buf.reset()
buf.putUvarint32(k)
buf.putVarint64(itv.mint)
buf.putVarint64(itv.maxt)
buf.putUvarint64(k)
buf.putVarint64(itv.Mint)
buf.putVarint64(itv.Maxt)
_, err = mw.Write(buf.get())
if err != nil {
@ -82,13 +86,8 @@ func writeTombstoneFile(dir string, tr tombstoneReader) error {
// Stone holds the information on the posting and time-range
// that is deleted.
type Stone struct {
ref uint32
intervals intervals
}
// TombstoneReader is the iterator over tombstones.
type TombstoneReader interface {
Get(ref uint32) intervals
ref uint64
intervals Intervals
}
func readTombstones(dir string) (tombstoneReader, error) {
@ -114,7 +113,7 @@ func readTombstones(dir string) (tombstoneReader, error) {
}
// Verify checksum
hash := crc32.New(crc32.MakeTable(crc32.Castagnoli))
hash := newCRC32()
if _, err := hash.Write(d.get()); err != nil {
return nil, errors.Wrap(err, "write to hash")
}
@ -124,48 +123,49 @@ func readTombstones(dir string) (tombstoneReader, error) {
stonesMap := newEmptyTombstoneReader()
for d.len() > 0 {
k := d.uvarint32()
k := d.uvarint64()
mint := d.varint64()
maxt := d.varint64()
if d.err() != nil {
return nil, d.err()
}
stonesMap.add(k, interval{mint, maxt})
stonesMap.add(k, Interval{mint, maxt})
}
return newTombstoneReader(stonesMap), nil
}
type tombstoneReader map[uint32]intervals
type tombstoneReader map[uint64]Intervals
func newTombstoneReader(ts map[uint32]intervals) tombstoneReader {
func newTombstoneReader(ts map[uint64]Intervals) tombstoneReader {
return tombstoneReader(ts)
}
func newEmptyTombstoneReader() tombstoneReader {
return tombstoneReader(make(map[uint32]intervals))
return tombstoneReader(make(map[uint64]Intervals))
}
func (t tombstoneReader) Get(ref uint32) intervals {
func (t tombstoneReader) Get(ref uint64) Intervals {
return t[ref]
}
func (t tombstoneReader) add(ref uint32, itv interval) {
func (t tombstoneReader) add(ref uint64, itv Interval) {
t[ref] = t[ref].add(itv)
}
type interval struct {
mint, maxt int64
// Interval represents a single time-interval.
type Interval struct {
Mint, Maxt int64
}
func (tr interval) inBounds(t int64) bool {
return t >= tr.mint && t <= tr.maxt
func (tr Interval) inBounds(t int64) bool {
return t >= tr.Mint && t <= tr.Maxt
}
func (tr interval) isSubrange(dranges intervals) bool {
func (tr Interval) isSubrange(dranges Intervals) bool {
for _, r := range dranges {
if r.inBounds(tr.mint) && r.inBounds(tr.maxt) {
if r.inBounds(tr.Mint) && r.inBounds(tr.Maxt) {
return true
}
}
@ -173,43 +173,44 @@ func (tr interval) isSubrange(dranges intervals) bool {
return false
}
type intervals []interval
// Intervals represents a set of increasing and non-overlapping time-intervals.
type Intervals []Interval
// This adds the new time-range to the existing ones.
// The existing ones must be sorted.
func (itvs intervals) add(n interval) intervals {
func (itvs Intervals) add(n Interval) Intervals {
for i, r := range itvs {
// TODO(gouthamve): Make this codepath easier to digest.
if r.inBounds(n.mint-1) || r.inBounds(n.mint) {
if n.maxt > r.maxt {
itvs[i].maxt = n.maxt
if r.inBounds(n.Mint-1) || r.inBounds(n.Mint) {
if n.Maxt > r.Maxt {
itvs[i].Maxt = n.Maxt
}
j := 0
for _, r2 := range itvs[i+1:] {
if n.maxt < r2.mint {
if n.Maxt < r2.Mint {
break
}
j++
}
if j != 0 {
if itvs[i+j].maxt > n.maxt {
itvs[i].maxt = itvs[i+j].maxt
if itvs[i+j].Maxt > n.Maxt {
itvs[i].Maxt = itvs[i+j].Maxt
}
itvs = append(itvs[:i+1], itvs[i+j+1:]...)
}
return itvs
}
if r.inBounds(n.maxt+1) || r.inBounds(n.maxt) {
if n.mint < r.maxt {
itvs[i].mint = n.mint
if r.inBounds(n.Maxt+1) || r.inBounds(n.Maxt) {
if n.Mint < r.Maxt {
itvs[i].Mint = n.Mint
}
return itvs
}
if n.mint < r.mint {
newRange := make(intervals, i, len(itvs[:i])+1)
if n.Mint < r.Mint {
newRange := make(Intervals, i, len(itvs[:i])+1)
copy(newRange, itvs[:i])
newRange = append(newRange, n)
newRange = append(newRange, itvs[i:]...)

File diff suppressed because it is too large Load diff

14
vendor/vendor.json vendored
View file

@ -859,22 +859,22 @@
"revisionTime": "2016-04-11T19:08:41Z"
},
{
"checksumSHA1": "WvgmP/a6PVjj33/h8L7XrNUmoQE=",
"checksumSHA1": "AoNkGFKIyLNi4a/QcO8p5D7xIXs=",
"path": "github.com/prometheus/tsdb",
"revision": "c4ca881685ae1266a75caf57da46d8b6934213c0",
"revisionTime": "2017-08-18T07:54:27Z"
"revision": "0db4c227b72145418ad4c1fbda8fdb87bfe77a02",
"revisionTime": "2017-09-07T11:04:02Z"
},
{
"checksumSHA1": "Gua979gmISm4cJP/fR2hL8m5To8=",
"path": "github.com/prometheus/tsdb/chunks",
"revision": "c4ca881685ae1266a75caf57da46d8b6934213c0",
"revisionTime": "2017-08-18T07:54:27Z"
"revision": "0db4c227b72145418ad4c1fbda8fdb87bfe77a02",
"revisionTime": "2017-09-07T11:04:02Z"
},
{
"checksumSHA1": "zhmlvc322RH1L3l9DaA9d/HVVWs=",
"path": "github.com/prometheus/tsdb/labels",
"revision": "c4ca881685ae1266a75caf57da46d8b6934213c0",
"revisionTime": "2017-08-18T07:54:27Z"
"revision": "0db4c227b72145418ad4c1fbda8fdb87bfe77a02",
"revisionTime": "2017-09-07T11:04:02Z"
},
{
"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",