mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Load head with WALs correctly
This commit is contained in:
parent
1dde3b6d31
commit
9c6a72aadd
28
block.go
28
block.go
|
@ -6,6 +6,9 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strconv"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// Block handles reads against a block of time series data within a time window.
|
||||
|
@ -104,20 +107,37 @@ func (p persistedBlocks) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
|
|||
func (p persistedBlocks) Less(i, j int) bool { return p[i].stats.MinTime < p[j].stats.MinTime }
|
||||
|
||||
// findBlocks finds time-ordered persisted blocks within a directory.
|
||||
func findPersistedBlocks(path string) ([]*persistedBlock, error) {
|
||||
func findBlocks(path string) ([]*persistedBlock, *HeadBlock, error) {
|
||||
var pbs persistedBlocks
|
||||
|
||||
files, err := ioutil.ReadDir(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
var head *HeadBlock
|
||||
|
||||
for _, fi := range files {
|
||||
p := filepath.Join(path, fi.Name())
|
||||
|
||||
if _, err := os.Stat(chunksFileName(p)); os.IsNotExist(err) {
|
||||
fmt.Println("found head dir", p)
|
||||
if head != nil {
|
||||
return nil, nil, errors.Errorf("found two head blocks")
|
||||
}
|
||||
ts, err := strconv.Atoi(filepath.Base(p))
|
||||
if err != nil {
|
||||
return nil, nil, errors.Errorf("invalid directory name")
|
||||
}
|
||||
head, err = NewHeadBlock(p, int64(ts))
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
pb, err := newPersistedBlock(p)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error initializing block %q: %s", p, err)
|
||||
return nil, nil, fmt.Errorf("error initializing block %q: %s", p, err)
|
||||
}
|
||||
pbs = append(pbs, pb)
|
||||
}
|
||||
|
@ -126,7 +146,7 @@ func findPersistedBlocks(path string) ([]*persistedBlock, error) {
|
|||
// range of time.
|
||||
sort.Sort(pbs)
|
||||
|
||||
return pbs, nil
|
||||
return pbs, head, nil
|
||||
}
|
||||
|
||||
func chunksFileName(path string) string {
|
||||
|
|
15
db.go
15
db.go
|
@ -41,7 +41,7 @@ type DB struct {
|
|||
|
||||
// TODO(fabxc): make configurable
|
||||
const (
|
||||
shardShift = 4
|
||||
shardShift = 3
|
||||
numShards = 1 << shardShift
|
||||
maxChunkSize = 1024
|
||||
)
|
||||
|
@ -193,7 +193,7 @@ func OpenShard(path string, logger log.Logger) (*Shard, error) {
|
|||
}
|
||||
|
||||
// Initialize previously persisted blocks.
|
||||
pbs, err := findPersistedBlocks(path)
|
||||
pbs, head, err := findBlocks(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -201,12 +201,15 @@ func OpenShard(path string, logger log.Logger) (*Shard, error) {
|
|||
// TODO(fabxc): get time from client-defined `now` function.
|
||||
baset := time.Now().UnixNano() / int64(time.Millisecond)
|
||||
if len(pbs) > 0 {
|
||||
baset = pbs[0].stats.MaxTime
|
||||
baset = pbs[len(pbs)-1].stats.MaxTime
|
||||
}
|
||||
if head == nil {
|
||||
fmt.Println("creating new head", baset)
|
||||
|
||||
head, err := NewHeadBlock(filepath.Join(path, fmt.Sprintf("%d", baset)), baset)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
head, err = NewHeadBlock(filepath.Join(path, fmt.Sprintf("%d", baset)), baset)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
s := &Shard{
|
||||
|
|
4
wal.go
4
wal.go
|
@ -41,13 +41,13 @@ func OpenWAL(dir string) (*WAL, error) {
|
|||
|
||||
p := filepath.Join(dir, "wal")
|
||||
|
||||
f, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode)
|
||||
f, err := fileutil.TryLockFile(p, os.O_RDWR, 0666)
|
||||
if err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
f, err = fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
|
||||
f, err = fileutil.LockFile(p, os.O_RDWR|os.O_CREATE, 0666)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue