update tsdb

Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
This commit is contained in:
Krasi Georgiev 2018-05-22 13:35:35 +03:00
parent 7e376dfc89
commit 229f4e63f7
25 changed files with 463 additions and 447 deletions

View file

@ -1,19 +0,0 @@
Copyright (c) 2012 Ingo Oeser
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View file

@ -1,52 +0,0 @@
lockfile
=========
Handle locking via pid files.
[![Build Status Unix][1]][2]
[![Build status Windows][3]][4]
[1]: https://secure.travis-ci.org/nightlyone/lockfile.png
[2]: https://travis-ci.org/nightlyone/lockfile
[3]: https://ci.appveyor.com/api/projects/status/7mojkmauj81uvp8u/branch/master?svg=true
[4]: https://ci.appveyor.com/project/nightlyone/lockfile/branch/master
install
-------
Install [Go 1][5], either [from source][6] or [with a prepackaged binary][7].
For Windows suport, Go 1.4 or newer is required.
Then run
go get github.com/nightlyone/lockfile
[5]: http://golang.org
[6]: http://golang.org/doc/install/source
[7]: http://golang.org/doc/install
LICENSE
-------
BSD
documentation
-------------
[package documentation at godoc.org](http://godoc.org/github.com/nightlyone/lockfile)
install
-------------------
go get github.com/nightlyone/lockfile
contributing
============
Contributions are welcome. Please open an issue or send me a pull request for a dedicated branch.
Make sure the git commit hooks show it works.
git commit hooks
-----------------------
enable commit hooks via
cd .git ; rm -rf hooks; ln -s ../git-hooks hooks ; cd ..

View file

@ -1,12 +0,0 @@
clone_folder: c:\gopath\src\github.com\nightlyone\lockfile
environment:
GOPATH: c:\gopath
install:
- go version
- go env
- go get -v -t ./...
build_script:
- go test -v ./...

View file

@ -1,201 +0,0 @@
// Package lockfile handles pid file based locking.
// While a sync.Mutex helps against concurrency issues within a single process,
// this package is designed to help against concurrency issues between cooperating processes
// or serializing multiple invocations of the same process. You can also combine sync.Mutex
// with Lockfile in order to serialize an action between different goroutines in a single program
// and also multiple invocations of this program.
package lockfile
import (
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
)
// Lockfile is a pid file which can be locked
type Lockfile string
// TemporaryError is a type of error where a retry after a random amount of sleep should help to mitigate it.
type TemporaryError string
func (t TemporaryError) Error() string { return string(t) }
// Temporary returns always true.
// It exists, so you can detect it via
// if te, ok := err.(interface{ Temporary() bool }); ok {
// fmt.Println("I am a temporay error situation, so wait and retry")
// }
func (t TemporaryError) Temporary() bool { return true }
// Various errors returned by this package
var (
ErrBusy = TemporaryError("Locked by other process") // If you get this, retry after a short sleep might help
ErrNotExist = TemporaryError("Lockfile created, but doesn't exist") // If you get this, retry after a short sleep might help
ErrNeedAbsPath = errors.New("Lockfiles must be given as absolute path names")
ErrInvalidPid = errors.New("Lockfile contains invalid pid for system")
ErrDeadOwner = errors.New("Lockfile contains pid of process not existent on this system anymore")
ErrRogueDeletion = errors.New("Lockfile owned by me has been removed unexpectedly")
)
// New describes a new filename located at the given absolute path.
func New(path string) (Lockfile, error) {
if !filepath.IsAbs(path) {
return Lockfile(""), ErrNeedAbsPath
}
return Lockfile(path), nil
}
// GetOwner returns who owns the lockfile.
func (l Lockfile) GetOwner() (*os.Process, error) {
name := string(l)
// Ok, see, if we have a stale lockfile here
content, err := ioutil.ReadFile(name)
if err != nil {
return nil, err
}
// try hard for pids. If no pid, the lockfile is junk anyway and we delete it.
pid, err := scanPidLine(content)
if err != nil {
return nil, err
}
running, err := isRunning(pid)
if err != nil {
return nil, err
}
if running {
proc, err := os.FindProcess(pid)
if err != nil {
return nil, err
}
return proc, nil
}
return nil, ErrDeadOwner
}
// TryLock tries to own the lock.
// It Returns nil, if successful and and error describing the reason, it didn't work out.
// Please note, that existing lockfiles containing pids of dead processes
// and lockfiles containing no pid at all are simply deleted.
func (l Lockfile) TryLock() error {
name := string(l)
// This has been checked by New already. If we trigger here,
// the caller didn't use New and re-implemented it's functionality badly.
// So panic, that he might find this easily during testing.
if !filepath.IsAbs(name) {
panic(ErrNeedAbsPath)
}
tmplock, err := ioutil.TempFile(filepath.Dir(name), "")
if err != nil {
return err
}
cleanup := func() {
_ = tmplock.Close()
_ = os.Remove(tmplock.Name())
}
defer cleanup()
if err := writePidLine(tmplock, os.Getpid()); err != nil {
return err
}
// return value intentionally ignored, as ignoring it is part of the algorithm
_ = os.Link(tmplock.Name(), name)
fiTmp, err := os.Lstat(tmplock.Name())
if err != nil {
return err
}
fiLock, err := os.Lstat(name)
if err != nil {
// tell user that a retry would be a good idea
if os.IsNotExist(err) {
return ErrNotExist
}
return err
}
// Success
if os.SameFile(fiTmp, fiLock) {
return nil
}
proc, err := l.GetOwner()
switch err {
default:
// Other errors -> defensively fail and let caller handle this
return err
case nil:
if proc.Pid != os.Getpid() {
return ErrBusy
}
case ErrDeadOwner, ErrInvalidPid:
// cases we can fix below
}
// clean stale/invalid lockfile
err = os.Remove(name)
if err != nil {
// If it doesn't exist, then it doesn't matter who removed it.
if !os.IsNotExist(err) {
return err
}
}
// now that the stale lockfile is gone, let's recurse
return l.TryLock()
}
// Unlock a lock again, if we owned it. Returns any error that happend during release of lock.
func (l Lockfile) Unlock() error {
proc, err := l.GetOwner()
switch err {
case ErrInvalidPid, ErrDeadOwner:
return ErrRogueDeletion
case nil:
if proc.Pid == os.Getpid() {
// we really own it, so let's remove it.
return os.Remove(string(l))
}
// Not owned by me, so don't delete it.
return ErrRogueDeletion
default:
// This is an application error or system error.
// So give a better error for logging here.
if os.IsNotExist(err) {
return ErrRogueDeletion
}
// Other errors -> defensively fail and let caller handle this
return err
}
}
func writePidLine(w io.Writer, pid int) error {
_, err := io.WriteString(w, fmt.Sprintf("%d\n", pid))
return err
}
func scanPidLine(content []byte) (int, error) {
if len(content) == 0 {
return 0, ErrInvalidPid
}
var pid int
if _, err := fmt.Sscanln(string(content), &pid); err != nil {
return 0, ErrInvalidPid
}
if pid <= 0 {
return 0, ErrInvalidPid
}
return pid, nil
}

View file

@ -1,20 +0,0 @@
// +build darwin dragonfly freebsd linux nacl netbsd openbsd solaris
package lockfile
import (
"os"
"syscall"
)
func isRunning(pid int) (bool, error) {
proc, err := os.FindProcess(pid)
if err != nil {
return false, err
}
if err := proc.Signal(syscall.Signal(0)); err != nil {
return false, nil
}
return true, nil
}

View file

@ -1,30 +0,0 @@
package lockfile
import (
"syscall"
)
//For some reason these consts don't exist in syscall.
const (
error_invalid_parameter = 87
code_still_active = 259
)
func isRunning(pid int) (bool, error) {
procHnd, err := syscall.OpenProcess(syscall.PROCESS_QUERY_INFORMATION, true, uint32(pid))
if err != nil {
if scerr, ok := err.(syscall.Errno); ok {
if uintptr(scerr) == error_invalid_parameter {
return false, nil
}
}
}
var code uint32
err = syscall.GetExitCodeProcess(procHnd, &code)
if err != nil {
return false, err
}
return code == code_still_active, nil
}

View file

@ -1,5 +1,10 @@
# TSDB
# TSDB [![Build Status](https://travis-ci.org/prometheus/tsdb.svg?branch=master)](https://travis-ci.org/prometheus/tsdb)
This repository contains the new Prometheus storage layer that will be used in its 2.0 release.
[![GoDoc](https://godoc.org/github.com/prometheus/tsdb?status.svg)](https://godoc.org/github.com/prometheus/tsdb)
[![Go Report Card](https://goreportcard.com/badge/github.com/prometheus/tsdb)](https://goreportcard.com/report/github.com/prometheus/tsdb)
This repository contains the Prometheus storage layer that is used in its 2.x releases.
A writeup of its design can be found [here](https://fabxc.org/blog/2017-04-10-writing-a-tsdb/).
See also the [format documentation](docs/format/README.md).

View file

@ -438,7 +438,7 @@ Outer:
for _, chk := range chks {
if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) {
// Delete only until the current vlaues and not beyond.
// Delete only until the current values and not beyond.
tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime)
stones[p.At()] = Intervals{{tmin, tmax}}
continue Outer
@ -474,9 +474,7 @@ func (pb *Block) CleanTombstones(dest string, c Compactor) (bool, error) {
numStones := 0
pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
for _ = range ivs {
numStones++
}
numStones += len(ivs)
return nil
})

View file

@ -13,7 +13,7 @@
// The code in this file was largely written by Damian Gryski as part of
// https://github.com/dgryski/go-tsz and published under the license below.
// It was modified to accomodate reading from byte slices without modifying
// It was modified to accommodate reading from byte slices without modifying
// the underlying bytes, which would panic when reading from mmaped
// read-only byte slices.
@ -259,7 +259,7 @@ func (it *xorIterator) Next() bool {
it.err = err
return false
}
it.t = int64(t)
it.t = t
it.val = math.Float64frombits(v)
it.numRead++

View file

@ -133,7 +133,7 @@ func (w *Writer) finalizeTail() error {
return err
}
// As the file was pre-allocated, we truncate any superfluous zero bytes.
off, err := tf.Seek(0, os.SEEK_CUR)
off, err := tf.Seek(0, io.SeekCurrent)
if err != nil {
return err
}
@ -349,7 +349,7 @@ func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
}
b := s.bs[seq]
if int(off) >= b.Len() {
if off >= b.Len() {
return nil, errors.Errorf("offset %d beyond data size %d", off, b.Len())
}
// With the minimum chunk length this should never cause us reading

View file

@ -14,6 +14,7 @@
package tsdb
import (
"fmt"
"io"
"math/rand"
"os"
@ -33,7 +34,7 @@ import (
"github.com/prometheus/tsdb/labels"
)
// ExponentialBlockRanges returns the time ranges based on the stepSize
// ExponentialBlockRanges returns the time ranges based on the stepSize.
func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64 {
ranges := make([]int64, 0, steps)
curRange := minSize
@ -215,7 +216,7 @@ func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
Outer:
for _, p := range parts {
// Donot select the range if it has a block whose compaction failed.
// Do not select the range if it has a block whose compaction failed.
for _, dm := range p {
if dm.meta.Compaction.Failed {
continue Outer
@ -312,9 +313,12 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
// Compact creates a new block in the compactor's directory from the blocks in the
// provided directories.
func (c *LeveledCompactor) Compact(dest string, dirs ...string) (uid ulid.ULID, err error) {
var blocks []BlockReader
var bs []*Block
var metas []*BlockMeta
var (
blocks []BlockReader
bs []*Block
metas []*BlockMeta
uids []string
)
for _, d := range dirs {
b, err := OpenBlock(d, c.chunkPool)
@ -331,13 +335,23 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (uid ulid.ULID,
metas = append(metas, meta)
blocks = append(blocks, b)
bs = append(bs, b)
uids = append(uids, meta.ULID.String())
}
entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
uid = ulid.MustNew(ulid.Now(), entropy)
err = c.write(dest, compactBlockMetas(uid, metas...), blocks...)
meta := compactBlockMetas(uid, metas...)
err = c.write(dest, meta, blocks...)
if err == nil {
level.Info(c.logger).Log(
"msg", "compact blocks",
"count", len(blocks),
"mint", meta.MinTime,
"maxt", meta.MaxTime,
"ulid", meta.ULID,
"sources", fmt.Sprintf("%v", uids),
)
return uid, nil
}
@ -365,7 +379,13 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) (
meta.Compaction.Level = 1
meta.Compaction.Sources = []ulid.ULID{uid}
return uid, c.write(dest, meta, b)
err := c.write(dest, meta, b)
if err != nil {
return uid, err
}
level.Info(c.logger).Log("msg", "write block", "mint", meta.MinTime, "maxt", meta.MaxTime, "ulid", meta.ULID)
return uid, nil
}
// instrumentedChunkWriter is used for level 1 compactions to record statistics
@ -390,8 +410,6 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error {
// write creates a new block that is the union of the provided blocks into dir.
// It cleans up all files of the old blocks after completing successfully.
func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) {
level.Info(c.logger).Log("msg", "compact blocks", "count", len(blocks), "mint", meta.MinTime, "maxt", meta.MaxTime)
dir := filepath.Join(dest, meta.ULID.String())
tmp := dir + ".tmp"
@ -472,7 +490,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
return errors.Wrap(err, "sync temporary dir file")
}
// close temp dir before rename block dir(for windows platform)
// Close temp dir before rename block dir (for windows platform).
if err = df.Close(); err != nil {
return errors.Wrap(err, "close temporary dir")
}
@ -482,6 +500,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe
if err := renameFile(tmp, dir); err != nil {
return errors.Wrap(err, "rename block dir")
}
return nil
}
@ -718,11 +737,6 @@ type compactionMerger struct {
intervals Intervals
}
type compactionSeries struct {
labels labels.Labels
chunks []*chunks.Meta
}
func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) {
c := &compactionMerger{
a: a,

View file

@ -19,26 +19,25 @@ import (
"fmt"
"io"
"io/ioutil"
"math"
"os"
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"
"unsafe"
"golang.org/x/sync/errgroup"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/nightlyone/lockfile"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/fileutil"
"github.com/prometheus/tsdb/labels"
"golang.org/x/sync/errgroup"
)
// DefaultOptions used for the DB. They are sane for setups using
@ -94,7 +93,7 @@ type Appender interface {
// a hashed partition of a seriedb.
type DB struct {
dir string
lockf *lockfile.Lockfile
lockf fileutil.Releaser
logger log.Logger
metrics *dbMetrics
@ -210,14 +209,11 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
if err != nil {
return nil, err
}
lockf, err := lockfile.New(filepath.Join(absdir, "lock"))
lockf, _, err := fileutil.Flock(filepath.Join(absdir, "lock"))
if err != nil {
return nil, err
return nil, errors.Wrap(err, "lock DB directory")
}
if err := lockf.TryLock(); err != nil {
return nil, errors.Wrapf(err, "open DB in %s", dir)
}
db.lockf = &lockf
db.lockf = lockf
}
db.compactor, err = NewLeveledCompactor(r, l, opts.BlockRanges, db.chunkPool)
@ -522,6 +518,9 @@ func (db *DB) reload(deleteable ...string) (err error) {
blocks = append(blocks, b)
exist[meta.ULID] = struct{}{}
}
sort.Slice(blocks, func(i, j int) bool {
return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime
})
if err := validateBlockSequence(blocks); err != nil {
return errors.Wrap(err, "invalid block sequence")
@ -556,22 +555,129 @@ func (db *DB) reload(deleteable ...string) (err error) {
return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
}
// validateBlockSequence returns error if given block meta files indicate that some blocks overlaps within sequence.
func validateBlockSequence(bs []*Block) error {
if len(bs) == 0 {
if len(bs) <= 1 {
return nil
}
sort.Slice(bs, func(i, j int) bool {
return bs[i].Meta().MinTime < bs[j].Meta().MinTime
})
prev := bs[0]
for _, b := range bs[1:] {
if b.Meta().MinTime < prev.Meta().MaxTime {
return errors.Errorf("block time ranges overlap (%d, %d)", b.Meta().MinTime, prev.Meta().MaxTime)
}
var metas []BlockMeta
for _, b := range bs {
metas = append(metas, b.meta)
}
overlaps := OverlappingBlocks(metas)
if len(overlaps) > 0 {
return errors.Errorf("block time ranges overlap: %s", overlaps)
}
return nil
}
// TimeRange specifies minTime and maxTime range.
type TimeRange struct {
Min, Max int64
}
// Overlaps contains overlapping blocks aggregated by overlapping range.
type Overlaps map[TimeRange][]BlockMeta
// String returns human readable string form of overlapped blocks.
func (o Overlaps) String() string {
var res []string
for r, overlaps := range o {
var groups []string
for _, m := range overlaps {
groups = append(groups, fmt.Sprintf(
"<ulid: %s, mint: %d, maxt: %d, range: %s>",
m.ULID.String(),
m.MinTime,
m.MaxTime,
(time.Duration((m.MaxTime-m.MinTime)/1000)*time.Second).String(),
))
}
res = append(res, fmt.Sprintf(
"[mint: %d, maxt: %d, range: %s, blocks: %d]: %s",
r.Min, r.Max,
(time.Duration((r.Max-r.Min)/1000)*time.Second).String(),
len(overlaps),
strings.Join(groups, ", ")),
)
}
return strings.Join(res, "\n")
}
// OverlappingBlocks returns all overlapping blocks from given meta files.
func OverlappingBlocks(bm []BlockMeta) Overlaps {
if len(bm) <= 1 {
return nil
}
sort.Slice(bm, func(i, j int) bool {
return bm[i].MinTime < bm[j].MinTime
})
var (
overlaps [][]BlockMeta
// pending contains not ended blocks in regards to "current" timestamp.
pending = []BlockMeta{bm[0]}
// continuousPending helps to aggregate same overlaps to single group.
continuousPending = true
)
// We have here blocks sorted by minTime. We iterate over each block and treat its minTime as our "current" timestamp.
// We check if any of the pending block finished (blocks that we have seen before, but their maxTime was still ahead current
// timestamp). If not, it means they overlap with our current block. In the same time current block is assumed pending.
for _, b := range bm[1:] {
var newPending []BlockMeta
for _, p := range pending {
// "b.MinTime" is our current time.
if b.MinTime >= p.MaxTime {
continuousPending = false
continue
}
// "p" overlaps with "b" and "p" is still pending.
newPending = append(newPending, p)
}
// Our block "b" is now pending.
pending = append(newPending, b)
if len(newPending) == 0 {
// No overlaps.
continue
}
if continuousPending && len(overlaps) > 0 {
overlaps[len(overlaps)-1] = append(overlaps[len(overlaps)-1], b)
continue
}
overlaps = append(overlaps, append(newPending, b))
// Start new pendings.
continuousPending = true
}
// Fetch the critical overlapped time range foreach overlap groups.
overlapGroups := Overlaps{}
for _, overlap := range overlaps {
minRange := TimeRange{Min: 0, Max: math.MaxInt64}
for _, b := range overlap {
if minRange.Max > b.MaxTime {
minRange.Max = b.MaxTime
}
if minRange.Min < b.MinTime {
minRange.Min = b.MinTime
}
}
overlapGroups[minRange] = overlap
}
return overlapGroups
}
func (db *DB) String() string {
return "HEAD"
}
@ -609,7 +715,7 @@ func (db *DB) Close() error {
merr.Add(g.Wait())
if db.lockf != nil {
merr.Add(db.lockf.Unlock())
merr.Add(db.lockf.Release())
}
merr.Add(db.head.Close())
return merr.Err()
@ -725,10 +831,7 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error {
g.Go(func() error {
return db.head.Delete(mint, maxt, ms...)
})
if err := g.Wait(); err != nil {
return err
}
return nil
return g.Wait()
}
// CleanTombstones re-writes any blocks with tombstones.
@ -737,7 +840,7 @@ func (db *DB) CleanTombstones() error {
defer db.cmtx.Unlock()
start := time.Now()
defer db.metrics.tombCleanTimer.Observe(float64(time.Since(start).Seconds()))
defer db.metrics.tombCleanTimer.Observe(time.Since(start).Seconds())
db.mtx.RLock()
blocks := db.blocks[:]
@ -767,10 +870,6 @@ func intervalOverlap(amin, amax, bmin, bmax int64) bool {
return amin <= bmax && bmin <= amax
}
func intervalContains(min, max, t int64) bool {
return t >= min && t <= max
}
func isBlockDir(fi os.FileInfo) bool {
if !fi.IsDir() {
return false
@ -869,9 +968,6 @@ func (es MultiError) Err() error {
return es
}
func yoloString(b []byte) string { return *((*string)(unsafe.Pointer(&b))) }
func yoloBytes(s string) []byte { return *((*[]byte)(unsafe.Pointer(&s))) }
func closeAll(cs ...io.Closer) error {
var merr MultiError

View file

@ -11,7 +11,7 @@ import (
var errInvalidSize = errors.New("invalid size")
// enbuf is a helper type to populate a byte slice with various types.
// encbuf is a helper type to populate a byte slice with various types.
type encbuf struct {
b []byte
c [binary.MaxVarintLen64]byte

41
vendor/github.com/prometheus/tsdb/fileutil/flock.go generated vendored Normal file
View file

@ -0,0 +1,41 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fileutil
import (
"os"
"path/filepath"
)
// Releaser provides the Release method to release a file lock.
type Releaser interface {
Release() error
}
// Flock locks the file with the provided name. If the file does not exist, it is
// created. The returned Releaser is used to release the lock. existed is true
// if the file to lock already existed. A non-nil error is returned if the
// locking has failed. Neither this function nor the returned Releaser is
// goroutine-safe.
func Flock(fileName string) (r Releaser, existed bool, err error) {
if err = os.MkdirAll(filepath.Dir(fileName), 0755); err != nil {
return nil, false, err
}
_, err = os.Stat(fileName)
existed = err == nil
r, err = newLock(fileName)
return r, existed, err
}

View file

@ -0,0 +1,32 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fileutil
import "os"
type plan9Lock struct {
f *os.File
}
func (l *plan9Lock) Release() error {
return l.f.Close()
}
func newLock(fileName string) (Releaser, error) {
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, os.ModeExclusive|0644)
if err != nil {
return nil, err
}
return &plan9Lock{f}, nil
}

View file

@ -0,0 +1,59 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build solaris
package fileutil
import (
"os"
"syscall"
)
type unixLock struct {
f *os.File
}
func (l *unixLock) Release() error {
if err := l.set(false); err != nil {
return err
}
return l.f.Close()
}
func (l *unixLock) set(lock bool) error {
flock := syscall.Flock_t{
Type: syscall.F_UNLCK,
Start: 0,
Len: 0,
Whence: 1,
}
if lock {
flock.Type = syscall.F_WRLCK
}
return syscall.FcntlFlock(l.f.Fd(), syscall.F_SETLK, &flock)
}
func newLock(fileName string) (Releaser, error) {
f, err := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
return nil, err
}
l := &unixLock{f}
err = l.set(true)
if err != nil {
f.Close()
return nil, err
}
return l, nil
}

View file

@ -0,0 +1,54 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build darwin dragonfly freebsd linux netbsd openbsd
package fileutil
import (
"os"
"syscall"
)
type unixLock struct {
f *os.File
}
func (l *unixLock) Release() error {
if err := l.set(false); err != nil {
return err
}
return l.f.Close()
}
func (l *unixLock) set(lock bool) error {
how := syscall.LOCK_UN
if lock {
how = syscall.LOCK_EX
}
return syscall.Flock(int(l.f.Fd()), how|syscall.LOCK_NB)
}
func newLock(fileName string) (Releaser, error) {
f, err := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
return nil, err
}
l := &unixLock{f}
err = l.set(true)
if err != nil {
f.Close()
return nil, err
}
return l, nil
}

View file

@ -0,0 +1,36 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fileutil
import "syscall"
type windowsLock struct {
fd syscall.Handle
}
func (fl *windowsLock) Release() error {
return syscall.Close(fl.fd)
}
func newLock(fileName string) (Releaser, error) {
pathp, err := syscall.UTF16PtrFromString(fileName)
if err != nil {
return nil, err
}
fd, err := syscall.CreateFile(pathp, syscall.GENERIC_READ|syscall.GENERIC_WRITE, 0, nil, syscall.CREATE_ALWAYS, syscall.FILE_ATTRIBUTE_NORMAL, 0)
if err != nil {
return nil, err
}
return &windowsLock{fd}, nil
}

View file

@ -31,4 +31,4 @@ func Fsync(f *os.File) error {
// to be correctly handled.
func Fdatasync(f *os.File) error {
return syscall.Fdatasync(int(f.Fd()))
}
}

View file

@ -449,10 +449,10 @@ func (h *Head) Appender() Appender {
func (h *Head) appender() *headAppender {
return &headAppender{
head: h,
mint: h.MaxTime() - h.chunkRange/2,
samples: h.getAppendBuffer(),
highTimestamp: math.MinInt64,
head: h,
mint: h.MaxTime() - h.chunkRange/2,
maxt: math.MinInt64,
samples: h.getAppendBuffer(),
}
}
@ -469,12 +469,11 @@ func (h *Head) putAppendBuffer(b []RefSample) {
}
type headAppender struct {
head *Head
mint int64
head *Head
mint, maxt int64
series []RefSeries
samples []RefSample
highTimestamp int64
series []RefSeries
samples []RefSample
}
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
@ -508,8 +507,8 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
if t < a.mint {
return ErrOutOfBounds
}
if t > a.highTimestamp {
a.highTimestamp = t
if t > a.maxt {
a.maxt = t
}
a.samples = append(a.samples, RefSample{
@ -551,10 +550,10 @@ func (a *headAppender) Commit() error {
for {
ht := a.head.MaxTime()
if a.highTimestamp <= ht {
if a.maxt <= ht {
break
}
if atomic.CompareAndSwapInt64(&a.head.maxTime, ht, a.highTimestamp) {
if atomic.CompareAndSwapInt64(&a.head.maxTime, ht, a.maxt) {
break
}
}
@ -587,8 +586,12 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error {
for p.Next() {
series := h.series.getByID(p.At())
t0, t1 := series.minTime(), series.maxTime()
if t0 == math.MinInt64 || t1 == math.MinInt64 {
continue
}
// Delete only until the current values and not beyond.
t0, t1 := clampInterval(mint, maxt, series.minTime(), series.maxTime())
t0, t1 = clampInterval(mint, maxt, t0, t1)
stones = append(stones, Stone{p.At(), Intervals{{t0, t1}}})
}
@ -1106,11 +1109,18 @@ type memSeries struct {
}
func (s *memSeries) minTime() int64 {
if len(s.chunks) == 0 {
return math.MinInt64
}
return s.chunks[0].minTime
}
func (s *memSeries) maxTime() int64 {
return s.head().maxTime
c := s.head()
if c == nil {
return math.MinInt64
}
return c.maxTime
}
func (s *memSeries) cut(mint int64) *memChunk {

View file

@ -780,7 +780,7 @@ func (r *Reader) readSymbols(off int) error {
for d.err() == nil && d.len() > 0 && cnt > 0 {
s := d.uvarintStr()
r.symbols[uint32(nextPos)] = s
r.symbols[nextPos] = s
if r.version == 2 {
nextPos++
@ -800,7 +800,7 @@ func (r *Reader) readOffsetTable(off uint64, f func([]string, uint64) error) err
cnt := d.be32()
for d.err() == nil && d.len() > 0 && cnt > 0 {
keyCount := int(d.uvarint())
keyCount := d.uvarint()
keys := make([]string, 0, keyCount)
for i := 0; i < keyCount; i++ {
@ -1038,7 +1038,7 @@ func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) e
d := decbuf{b: b}
k := int(d.uvarint())
k := d.uvarint()
for i := 0; i < k; i++ {
lno := uint32(d.uvarint())
@ -1061,7 +1061,7 @@ func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) e
}
// Read the chunks meta data.
k = int(d.uvarint())
k = d.uvarint()
if k == 0 {
return nil

View file

@ -117,7 +117,7 @@ func (q *querier) Close() error {
return merr.Err()
}
// NewBlockQuerier returns a queries against the readers.
// NewBlockQuerier returns a querier against the reader.
func NewBlockQuerier(b BlockReader, mint, maxt int64) (Querier, error) {
indexr, err := b.Index()
if err != nil {

View file

@ -36,9 +36,20 @@ func repairBadIndexVersion(logger log.Logger, dir string) error {
return err
}
if meta.Version == 1 {
level.Info(logger).Log(
"msg", "found healthy block",
"mint", meta.MinTime,
"maxt", meta.MaxTime,
"ulid", meta.ULID,
)
continue
}
level.Info(logger).Log("msg", "fixing broken block", "ulid", meta.ULID)
level.Info(logger).Log(
"msg", "fixing broken block",
"mint", meta.MinTime,
"maxt", meta.MaxTime,
"ulid", meta.ULID,
)
repl, err := os.Create(filepath.Join(d, "index.repaired"))
if err != nil {

View file

@ -290,7 +290,7 @@ func (w *SegmentWAL) truncate(err error, file int, lastOffset int64) error {
w.files = w.files[:file+1]
// Seek the current file to the last valid offset where we continue writing from.
_, err = w.files[file].Seek(lastOffset, os.SEEK_SET)
_, err = w.files[file].Seek(lastOffset, io.SeekStart)
return err
}
@ -393,7 +393,7 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error {
return errors.Wrap(r.Err(), "read candidate WAL files")
}
off, err := csf.Seek(0, os.SEEK_CUR)
off, err := csf.Seek(0, io.SeekCurrent)
if err != nil {
return err
}
@ -418,7 +418,7 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error {
}
// The file object of csf still holds the name before rename. Recreate it so
// subsequent truncations do not look at a non-existant file name.
// subsequent truncations do not look at a non-existent file name.
csf.File, err = w.openSegmentFile(candidates[0].Name())
if err != nil {
return err
@ -583,7 +583,7 @@ func (w *SegmentWAL) cut() error {
// in the new segment.
go func() {
w.actorc <- func() error {
off, err := hf.Seek(0, os.SEEK_CUR)
off, err := hf.Seek(0, io.SeekCurrent)
if err != nil {
return errors.Wrapf(err, "finish old segment %s", hf.Name())
}
@ -937,7 +937,7 @@ func (r *walReader) Read(
series = v.([]RefSeries)
}
err := r.decodeSeries(flag, b, &series)
err = r.decodeSeries(flag, b, &series)
if err != nil {
err = errors.Wrap(err, "decode series entry")
break
@ -958,7 +958,7 @@ func (r *walReader) Read(
samples = v.([]RefSample)
}
err := r.decodeSamples(flag, b, &samples)
err = r.decodeSamples(flag, b, &samples)
if err != nil {
err = errors.Wrap(err, "decode samples entry")
break
@ -980,7 +980,7 @@ func (r *walReader) Read(
deletes = v.([]Stone)
}
err := r.decodeDeletes(flag, b, &deletes)
err = r.decodeDeletes(flag, b, &deletes)
if err != nil {
err = errors.Wrap(err, "decode delete entry")
break
@ -1015,7 +1015,7 @@ func (r *walReader) at() (WALEntryType, byte, []byte) {
}
// next returns decodes the next entry pair and returns true
// if it was succesful.
// if it was successful.
func (r *walReader) next() bool {
if r.cur >= len(r.files) {
return false
@ -1024,7 +1024,7 @@ func (r *walReader) next() bool {
// Remember the offset after the last correctly read entry. If the next one
// is corrupted, this is where we can safely truncate.
r.lastOffset, r.err = cf.Seek(0, os.SEEK_CUR)
r.lastOffset, r.err = cf.Seek(0, io.SeekCurrent)
if r.err != nil {
return false
}

40
vendor/vendor.json vendored
View file

@ -679,12 +679,6 @@
"revision": "cc309e4a22231782e8893f3c35ced0967807a33e",
"revisionTime": "2016-11-29T09:58:57Z"
},
{
"checksumSHA1": "aCtmlyAgau9n0UHs8Pk+3xfIaVk=",
"path": "github.com/nightlyone/lockfile",
"revision": "1d49c987357a327b5b03aa84cbddd582c328615d",
"revisionTime": "2016-09-28T00:14:32Z"
},
{
"checksumSHA1": "gkyBg/2hcIWR/8qGEeGVoHwOyfo=",
"path": "github.com/oklog/oklog/pkg/group",
@ -826,40 +820,40 @@
"revisionTime": "2016-04-11T19:08:41Z"
},
{
"checksumSHA1": "vNslgGjRBqauFmVIBTkvEWwvURg=",
"checksumSHA1": "e7QdIY1+bs+75qObh1MLVdGNLvE=",
"path": "github.com/prometheus/tsdb",
"revision": "659ed644294eec6310cef0685b002a3aed8c8f85",
"revisionTime": "2018-03-14T13:49:50Z"
"revision": "ae33d7873d94cec6b3da3ffbf903b3b35a2db9ee",
"revisionTime": "2018-05-29T19:14:13Z"
},
{
"checksumSHA1": "S7F4yWxVLhxQNHMdgoOo6plmOOs=",
"checksumSHA1": "QI0UME2olSr4kH6Z8UkpffM59Mc=",
"path": "github.com/prometheus/tsdb/chunkenc",
"revision": "659ed644294eec6310cef0685b002a3aed8c8f85",
"revisionTime": "2018-03-14T13:49:50Z"
"revision": "ae33d7873d94cec6b3da3ffbf903b3b35a2db9ee",
"revisionTime": "2018-05-29T19:14:13Z"
},
{
"checksumSHA1": "+zsn1i8cqwgZXL8Bg6jDy32xjAo=",
"checksumSHA1": "746Mjy2y6wdsGjY/FcGhc8tI4w8=",
"path": "github.com/prometheus/tsdb/chunks",
"revision": "659ed644294eec6310cef0685b002a3aed8c8f85",
"revisionTime": "2018-03-14T13:49:50Z"
"revision": "ae33d7873d94cec6b3da3ffbf903b3b35a2db9ee",
"revisionTime": "2018-05-29T19:14:13Z"
},
{
"checksumSHA1": "T7qvg4VhFLklT3g+qPkUWxBo0yw=",
"checksumSHA1": "dnyelqeik/xHDRCvCmKFv/Op9XQ=",
"path": "github.com/prometheus/tsdb/fileutil",
"revision": "659ed644294eec6310cef0685b002a3aed8c8f85",
"revisionTime": "2018-03-14T13:49:50Z"
"revision": "ae33d7873d94cec6b3da3ffbf903b3b35a2db9ee",
"revisionTime": "2018-05-29T19:14:13Z"
},
{
"checksumSHA1": "4ebzIE2Jvj6+SG6yGFSXN8scgfo=",
"checksumSHA1": "A2uIFwIgeHmXGBzOpna95kM80RY=",
"path": "github.com/prometheus/tsdb/index",
"revision": "659ed644294eec6310cef0685b002a3aed8c8f85",
"revisionTime": "2018-03-14T13:49:50Z"
"revision": "ae33d7873d94cec6b3da3ffbf903b3b35a2db9ee",
"revisionTime": "2018-05-29T19:14:13Z"
},
{
"checksumSHA1": "Va8HWvOFTwFeewZFadMAOzNGDps=",
"path": "github.com/prometheus/tsdb/labels",
"revision": "659ed644294eec6310cef0685b002a3aed8c8f85",
"revisionTime": "2018-03-14T13:49:50Z"
"revision": "ae33d7873d94cec6b3da3ffbf903b3b35a2db9ee",
"revisionTime": "2018-05-29T19:14:13Z"
},
{
"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",