mirror of
https://github.com/prometheus/prometheus.git
synced 2025-02-02 08:31:11 -08:00
Correctly write empty blocks and extend appenders by new blocks
This commit is contained in:
parent
42fa342229
commit
ac5229e1b4
64
db.go
64
db.go
|
@ -28,7 +28,7 @@ import (
|
||||||
// millisecond precision timestampdb.
|
// millisecond precision timestampdb.
|
||||||
var DefaultOptions = &Options{
|
var DefaultOptions = &Options{
|
||||||
WALFlushInterval: 5 * time.Second,
|
WALFlushInterval: 5 * time.Second,
|
||||||
MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds
|
MinBlockDuration: 2 * 60 * 60 * 1000, // 2 hours in milliseconds
|
||||||
MaxBlockDuration: 48 * 60 * 60 * 1000, // 1 day in milliseconds
|
MaxBlockDuration: 48 * 60 * 60 * 1000, // 1 day in milliseconds
|
||||||
AppendableBlocks: 2,
|
AppendableBlocks: 2,
|
||||||
}
|
}
|
||||||
|
@ -93,7 +93,6 @@ type DB struct {
|
||||||
compactor *compactor
|
compactor *compactor
|
||||||
|
|
||||||
compactc chan struct{}
|
compactc chan struct{}
|
||||||
cutc chan struct{}
|
|
||||||
donec chan struct{}
|
donec chan struct{}
|
||||||
stopc chan struct{}
|
stopc chan struct{}
|
||||||
}
|
}
|
||||||
|
@ -137,6 +136,9 @@ func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) {
|
||||||
if opts == nil {
|
if opts == nil {
|
||||||
opts = DefaultOptions
|
opts = DefaultOptions
|
||||||
}
|
}
|
||||||
|
if opts.AppendableBlocks < 1 {
|
||||||
|
return nil, errors.Errorf("AppendableBlocks must be greater than 0")
|
||||||
|
}
|
||||||
|
|
||||||
db = &DB{
|
db = &DB{
|
||||||
dir: dir,
|
dir: dir,
|
||||||
|
@ -144,7 +146,6 @@ func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) {
|
||||||
metrics: newDBMetrics(r),
|
metrics: newDBMetrics(r),
|
||||||
opts: opts,
|
opts: opts,
|
||||||
compactc: make(chan struct{}, 1),
|
compactc: make(chan struct{}, 1),
|
||||||
cutc: make(chan struct{}, 1),
|
|
||||||
donec: make(chan struct{}),
|
donec: make(chan struct{}),
|
||||||
stopc: make(chan struct{}),
|
stopc: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
@ -164,32 +165,6 @@ func Open(dir string, logger log.Logger, opts *Options) (db *DB, err error) {
|
||||||
func (db *DB) run() {
|
func (db *DB) run() {
|
||||||
defer close(db.donec)
|
defer close(db.donec)
|
||||||
|
|
||||||
// go func() {
|
|
||||||
// for {
|
|
||||||
// select {
|
|
||||||
// case <-db.cutc:
|
|
||||||
// db.mtx.Lock()
|
|
||||||
// _, err := db.cut()
|
|
||||||
// db.mtx.Unlock()
|
|
||||||
|
|
||||||
// if err != nil {
|
|
||||||
// db.logger.Log("msg", "cut failed", "err", err)
|
|
||||||
// } else {
|
|
||||||
// select {
|
|
||||||
// case db.compactc <- struct{}{}:
|
|
||||||
// default:
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// // Drain cut channel so we don't trigger immediately again.
|
|
||||||
// select {
|
|
||||||
// case <-db.cutc:
|
|
||||||
// default:
|
|
||||||
// }
|
|
||||||
// case <-db.stopc:
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }()
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-db.compactc:
|
case <-db.compactc:
|
||||||
|
@ -342,9 +317,6 @@ func (db *DB) initBlocks() error {
|
||||||
db.persisted = persisted
|
db.persisted = persisted
|
||||||
db.heads = heads
|
db.heads = heads
|
||||||
|
|
||||||
// if len(heads) == 0 {
|
|
||||||
// _, err = db.cut()
|
|
||||||
// }
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -433,18 +405,31 @@ func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error {
|
||||||
// If the head block doesn't exist yet, it gets created.
|
// If the head block doesn't exist yet, it gets created.
|
||||||
func (a *dbAppender) appenderFor(t int64) (*headAppender, error) {
|
func (a *dbAppender) appenderFor(t int64) (*headAppender, error) {
|
||||||
// If there's no fitting head block for t, ensure it gets created.
|
// If there's no fitting head block for t, ensure it gets created.
|
||||||
if len(a.heads) == 0 || t > a.heads[len(a.heads)-1].meta.MaxTime {
|
if len(a.heads) == 0 || t >= a.heads[len(a.heads)-1].meta.MaxTime {
|
||||||
a.db.mtx.RUnlock()
|
a.db.mtx.RUnlock()
|
||||||
|
var mints []int64
|
||||||
|
for _, h := range a.heads {
|
||||||
|
mints = append(mints, h.meta.MinTime)
|
||||||
|
}
|
||||||
|
fmt.Println("ensure head", t, mints)
|
||||||
if err := a.db.ensureHead(t); err != nil {
|
if err := a.db.ensureHead(t); err != nil {
|
||||||
a.db.mtx.RLock()
|
a.db.mtx.RLock()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
a.db.mtx.RLock()
|
a.db.mtx.RLock()
|
||||||
|
|
||||||
a.heads = nil
|
if len(a.heads) == 0 {
|
||||||
for _, b := range a.db.appendable() {
|
for _, b := range a.db.appendable() {
|
||||||
a.heads = append(a.heads, b.Appender().(*headAppender))
|
a.heads = append(a.heads, b.Appender().(*headAppender))
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
maxSeq := a.heads[len(a.heads)-1].meta.Sequence
|
||||||
|
for _, b := range a.db.appendable() {
|
||||||
|
if b.meta.Sequence > maxSeq {
|
||||||
|
a.heads = append(a.heads, b.Appender().(*headAppender))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for i := len(a.heads) - 1; i >= 0; i-- {
|
for i := len(a.heads) - 1; i >= 0; i-- {
|
||||||
if h := a.heads[i]; t >= h.meta.MinTime {
|
if h := a.heads[i]; t >= h.meta.MinTime {
|
||||||
|
@ -463,6 +448,7 @@ func (db *DB) ensureHead(t int64) error {
|
||||||
// AppendableBlocks-1 front padding heads.
|
// AppendableBlocks-1 front padding heads.
|
||||||
if len(db.heads) == 0 {
|
if len(db.heads) == 0 {
|
||||||
for i := int64(db.opts.AppendableBlocks - 1); i >= 0; i-- {
|
for i := int64(db.opts.AppendableBlocks - 1); i >= 0; i-- {
|
||||||
|
fmt.Println("cut init for", t-i*int64(db.opts.MinBlockDuration))
|
||||||
if _, err := db.cut(t - i*int64(db.opts.MinBlockDuration)); err != nil {
|
if _, err := db.cut(t - i*int64(db.opts.MinBlockDuration)); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -472,10 +458,11 @@ func (db *DB) ensureHead(t int64) error {
|
||||||
for {
|
for {
|
||||||
h := db.heads[len(db.heads)-1]
|
h := db.heads[len(db.heads)-1]
|
||||||
// If t doesn't exceed the range of heads blocks, there's nothing to do.
|
// If t doesn't exceed the range of heads blocks, there's nothing to do.
|
||||||
if t <= h.meta.MaxTime {
|
if t < h.meta.MaxTime {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if _, err := db.cut(h.meta.MaxTime + 1); err != nil {
|
fmt.Println("cut for", h.meta.MaxTime)
|
||||||
|
if _, err := db.cut(h.meta.MaxTime); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -567,7 +554,7 @@ func (db *DB) blocksForInterval(mint, maxt int64) []Block {
|
||||||
// cut starts a new head block to append to. The completed head block
|
// cut starts a new head block to append to. The completed head block
|
||||||
// will still be appendable for the configured grace period.
|
// will still be appendable for the configured grace period.
|
||||||
func (db *DB) cut(mint int64) (*headBlock, error) {
|
func (db *DB) cut(mint int64) (*headBlock, error) {
|
||||||
maxt := mint + int64(db.opts.MinBlockDuration) - 1
|
maxt := mint + int64(db.opts.MinBlockDuration)
|
||||||
|
|
||||||
dir, seq, err := nextBlockDir(db.dir)
|
dir, seq, err := nextBlockDir(db.dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -664,9 +651,6 @@ func OpenPartitioned(dir string, n int, l log.Logger, opts *Options) (*Partition
|
||||||
l = log.NewLogfmtLogger(os.Stdout)
|
l = log.NewLogfmtLogger(os.Stdout)
|
||||||
l = log.NewContext(l).With("ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
|
l = log.NewContext(l).With("ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
|
||||||
}
|
}
|
||||||
if opts.AppendableBlocks < 1 {
|
|
||||||
return nil, errors.Errorf("AppendableBlocks must be greater than 0")
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := os.MkdirAll(dir, 0777); err != nil {
|
if err := os.MkdirAll(dir, 0777); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
7
head.go
7
head.go
|
@ -128,6 +128,9 @@ func (h *headBlock) inBounds(t int64) bool {
|
||||||
|
|
||||||
// Close syncs all data and closes underlying resources of the head block.
|
// Close syncs all data and closes underlying resources of the head block.
|
||||||
func (h *headBlock) Close() error {
|
func (h *headBlock) Close() error {
|
||||||
|
if err := writeMetaFile(h.dir, &h.meta); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
return h.wal.Close()
|
return h.wal.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -190,11 +193,9 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro
|
||||||
|
|
||||||
func (a *headAppender) hashedAdd(hash uint64, lset labels.Labels, t int64, v float64) (uint64, error) {
|
func (a *headAppender) hashedAdd(hash uint64, lset labels.Labels, t int64, v float64) (uint64, error) {
|
||||||
if ms := a.get(hash, lset); ms != nil {
|
if ms := a.get(hash, lset); ms != nil {
|
||||||
// fmt.Println("add ref get", ms.ref)
|
|
||||||
return uint64(ms.ref), nil
|
return uint64(ms.ref), nil
|
||||||
}
|
}
|
||||||
if ref, ok := a.newHashes[hash]; ok {
|
if ref, ok := a.newHashes[hash]; ok {
|
||||||
// fmt.Println("add ref newHashes", ref)
|
|
||||||
return uint64(ref), nil
|
return uint64(ref), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -215,8 +216,6 @@ func (a *headAppender) hashedAdd(hash uint64, lset labels.Labels, t int64, v flo
|
||||||
a.newSeries[ref] = hashedLabels{hash: hash, labels: lset}
|
a.newSeries[ref] = hashedLabels{hash: hash, labels: lset}
|
||||||
a.newHashes[hash] = ref
|
a.newHashes[hash] = ref
|
||||||
|
|
||||||
// fmt.Println("add ref", ref)
|
|
||||||
|
|
||||||
return ref, a.AddFast(ref, t, v)
|
return ref, a.AddFast(ref, t, v)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
30
writer.go
30
writer.go
|
@ -132,6 +132,12 @@ func (w *seriesWriter) Size() int64 {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *seriesWriter) Close() error {
|
func (w *seriesWriter) Close() error {
|
||||||
|
// Initialize block in case no data was written to it.
|
||||||
|
if w.n == 0 {
|
||||||
|
if err := w.writeMeta(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
return w.w.Flush()
|
return w.w.Flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -334,8 +340,7 @@ func (w *indexWriter) writeSeries() error {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
|
func (w *indexWriter) init() error {
|
||||||
if !w.started {
|
|
||||||
if err := w.writeMeta(); err != nil {
|
if err := w.writeMeta(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -346,6 +351,15 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
w.started = true
|
w.started = true
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
|
||||||
|
if !w.started {
|
||||||
|
if err := w.init(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
valt, err := newStringTuples(values, len(names))
|
valt, err := newStringTuples(values, len(names))
|
||||||
|
@ -382,6 +396,12 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *indexWriter) WritePostings(name, value string, it Postings) error {
|
func (w *indexWriter) WritePostings(name, value string, it Postings) error {
|
||||||
|
if !w.started {
|
||||||
|
if err := w.init(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
key := name + string(sep) + value
|
key := name + string(sep) + value
|
||||||
|
|
||||||
w.postings = append(w.postings, hashEntry{
|
w.postings = append(w.postings, hashEntry{
|
||||||
|
@ -473,6 +493,12 @@ func (w *indexWriter) finalize() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *indexWriter) Close() error {
|
func (w *indexWriter) Close() error {
|
||||||
|
// Handle blocks without any data.
|
||||||
|
if !w.started {
|
||||||
|
if err := w.init(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
if err := w.finalize(); err != nil {
|
if err := w.finalize(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue