Fix wrong byte size in WAL base ref

This commit is contained in:
Fabian Reinartz 2017-01-17 08:40:31 +01:00
parent 5fb01d41aa
commit 343dd9d94c
3 changed files with 28 additions and 45 deletions

9
db.go
View file

@ -58,15 +58,6 @@ type Appender interface {
Rollback() error Rollback() error
} }
type hashedSample struct {
hash uint64
labels labels.Labels
ref uint32
t int64
v float64
}
const sep = '\xff' const sep = '\xff'
// DB handles reads and writes of time series falling into // DB handles reads and writes of time series falling into

56
head.go
View file

@ -65,10 +65,7 @@ func openHeadBlock(dir string, l log.Logger) (*headBlock, error) {
b.stats.SeriesCount++ b.stats.SeriesCount++
}, },
sample: func(s refdSample) { sample: func(s refdSample) {
si := s.ref b.series[s.ref].append(s.t, s.v)
cd := b.series[si]
cd.append(s.t, s.v)
if s.t > b.stats.MaxTime { if s.t > b.stats.MaxTime {
b.stats.MaxTime = s.t b.stats.MaxTime = s.t
@ -177,11 +174,11 @@ func (a *headAppender) setSeries(hash uint64, lset labels.Labels) (uint64, error
func (a *headAppender) Add(ref uint64, t int64, v float64) error { func (a *headAppender) Add(ref uint64, t int64, v float64) error {
// We only own the first 5 bytes of the reference. Anything before is // We only own the first 5 bytes of the reference. Anything before is
// used by higher-order appenders. We erase it to avoid issues. // used by higher-order appenders. We erase it to avoid issues.
ref = (ref << 31) >> 31 ref = (ref << 24) >> 24
// Distinguish between existing series and series created in // Distinguish between existing series and series created in
// this transaction. // this transaction.
if ref&(1<<32) > 0 { if ref&(1<<32) != 0 {
if _, ok := a.newSeries[ref]; !ok { if _, ok := a.newSeries[ref]; !ok {
return ErrNotFound return ErrNotFound
} }
@ -189,27 +186,21 @@ func (a *headAppender) Add(ref uint64, t int64, v float64) error {
// sample sequence is valid. // sample sequence is valid.
// We also have to revalidate it as we switch locks an create // We also have to revalidate it as we switch locks an create
// the new series. // the new series.
a.samples = append(a.samples, refdSample{ } else {
ref: ref, ms := a.series[int(ref)]
t: t, if ms == nil {
v: v, return ErrNotFound
}) }
return nil c := ms.head()
}
ms := a.series[int(ref)] // TODO(fabxc): memory series should be locked here already.
if ms == nil { // Only problem is release of locks in case of a rollback.
return ErrNotFound if t < c.maxTime {
} return ErrOutOfOrderSample
c := ms.head() }
if c.maxTime == t && ms.lastValue != v {
// TODO(fabxc): memory series should be locked here already. return ErrAmendSample
// Only problem is release of locks in case of a rollback. }
if t < c.maxTime {
return ErrOutOfOrderSample
}
if c.maxTime == t && ms.lastValue != v {
return ErrAmendSample
} }
a.samples = append(a.samples, refdSample{ a.samples = append(a.samples, refdSample{
@ -254,12 +245,6 @@ func (a *headAppender) createSeries() {
func (a *headAppender) Commit() error { func (a *headAppender) Commit() error {
defer putHeadAppendBuffer(a.samples) defer putHeadAppendBuffer(a.samples)
// Write all new series and samples to the WAL and add it to the
// in-mem database on success.
if err := a.wal.Log(a.newLabels, a.samples); err != nil {
a.mtx.RUnlock()
return err
}
a.createSeries() a.createSeries()
@ -279,8 +264,15 @@ func (a *headAppender) Commit() error {
total-- total--
} }
} }
a.mtx.RUnlock() a.mtx.RUnlock()
// Write all new series and samples to the WAL and add it to the
// in-mem database on success.
if err := a.wal.Log(a.newLabels, a.samples); err != nil {
return err
}
a.stats.mtx.Lock() a.stats.mtx.Lock()
defer a.stats.mtx.Unlock() defer a.stats.mtx.Unlock()

8
wal.go
View file

@ -285,7 +285,7 @@ func (e *walEncoder) encodeSamples(samples []refdSample) error {
first := samples[0] first := samples[0]
binary.BigEndian.PutUint64(b, first.ref) binary.BigEndian.PutUint64(b, first.ref)
buf = append(buf, b[:4]...) buf = append(buf, b[:8]...)
binary.BigEndian.PutUint64(b, uint64(first.t)) binary.BigEndian.PutUint64(b, uint64(first.t))
buf = append(buf, b[:8]...) buf = append(buf, b[:8]...)
@ -349,14 +349,14 @@ func (d *walDecoder) decodeSeries(flag byte, b []byte) error {
} }
func (d *walDecoder) decodeSamples(flag byte, b []byte) error { func (d *walDecoder) decodeSamples(flag byte, b []byte) error {
if len(b) < 12 { if len(b) < 16 {
return errors.Wrap(errInvalidSize, "header length") return errors.Wrap(errInvalidSize, "header length")
} }
var ( var (
baseRef = binary.BigEndian.Uint64(b) baseRef = binary.BigEndian.Uint64(b)
baseTime = int64(binary.BigEndian.Uint64(b[4:])) baseTime = int64(binary.BigEndian.Uint64(b[8:]))
) )
b = b[12:] b = b[16:]
for len(b) > 0 { for len(b) > 0 {
var smpl refdSample var smpl refdSample