diff --git a/head.go b/head.go index 9753a15064..0a5f3c151e 100644 --- a/head.go +++ b/head.go @@ -620,21 +620,22 @@ func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro } func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { - s := a.head.series.getByID(ref) + if t < a.minValidTime { + return ErrOutOfBounds + } + s := a.head.series.getByID(ref) if s == nil { return errors.Wrap(ErrNotFound, "unknown series") } s.Lock() - err := s.appendable(t, v) - s.Unlock() - - if err != nil { + if err := s.appendable(t, v); err != nil { + s.Unlock() return err } - if t < a.minValidTime { - return ErrOutOfBounds - } + s.pendingCommit = true + s.Unlock() + if t < a.mint { a.mint = t } @@ -694,6 +695,7 @@ func (a *headAppender) Commit() error { for _, s := range a.samples { s.series.Lock() ok, chunkCreated := s.series.append(s.T, s.V) + s.series.pendingCommit = false s.series.Unlock() if !ok { @@ -713,6 +715,11 @@ func (a *headAppender) Commit() error { func (a *headAppender) Rollback() error { a.head.metrics.activeAppenders.Dec() + for _, s := range a.samples { + s.series.Lock() + s.series.pendingCommit = false + s.series.Unlock() + } a.head.putAppendBuffer(a.samples) // Series are created in the head memory regardless of rollback. Thus we have @@ -1165,7 +1172,7 @@ func (s *stripeSeries) gc(mint int64) (map[uint64]struct{}, int) { series.Lock() rmChunks += series.truncateChunksBefore(mint) - if len(series.chunks) > 0 { + if len(series.chunks) > 0 || series.pendingCommit { series.Unlock() continue } @@ -1256,9 +1263,10 @@ type memSeries struct { chunkRange int64 firstChunkID int - nextAt int64 // timestamp at which to cut the next chunk. - lastValue float64 - sampleBuf [4]sample + nextAt int64 // Timestamp at which to cut the next chunk. + lastValue float64 + sampleBuf [4]sample + pendingCommit bool // Whether there are samples waiting to be committed to this series. app chunkenc.Appender // Current appender for the chunk. } diff --git a/head_test.go b/head_test.go index 5383b2ba4f..d9de7bea0f 100644 --- a/head_test.go +++ b/head_test.go @@ -781,6 +781,64 @@ func TestGCSeriesAccess(t *testing.T) { testutil.Equals(t, ErrNotFound, err) } +func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { + h, err := NewHead(nil, nil, nil, 1000) + testutil.Ok(t, err) + defer h.Close() + + h.initTime(0) + + app := h.appender() + lset := labels.FromStrings("a", "1") + _, err = app.Add(lset, 2100, 1) + testutil.Ok(t, err) + + testutil.Ok(t, h.Truncate(2000)) + testutil.Assert(t, nil != h.series.getByHash(lset.Hash(), lset), "series should not have been garbage collected") + + testutil.Ok(t, app.Commit()) + + q, err := NewBlockQuerier(h, 1500, 2500) + testutil.Ok(t, err) + defer q.Close() + + ss, err := q.Select(labels.NewEqualMatcher("a", "1")) + testutil.Ok(t, err) + + testutil.Equals(t, true, ss.Next()) +} + +func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { + h, err := NewHead(nil, nil, nil, 1000) + testutil.Ok(t, err) + defer h.Close() + + h.initTime(0) + + app := h.appender() + lset := labels.FromStrings("a", "1") + _, err = app.Add(lset, 2100, 1) + testutil.Ok(t, err) + + testutil.Ok(t, h.Truncate(2000)) + testutil.Assert(t, nil != h.series.getByHash(lset.Hash(), lset), "series should not have been garbage collected") + + testutil.Ok(t, app.Rollback()) + + q, err := NewBlockQuerier(h, 1500, 2500) + testutil.Ok(t, err) + defer q.Close() + + ss, err := q.Select(labels.NewEqualMatcher("a", "1")) + testutil.Ok(t, err) + + testutil.Equals(t, false, ss.Next()) + + // Truncate again, this time the series should be deleted + testutil.Ok(t, h.Truncate(2050)) + testutil.Equals(t, (*memSeries)(nil), h.series.getByHash(lset.Hash(), lset)) +} + func TestHead_LogRollback(t *testing.T) { dir, err := ioutil.TempDir("", "wal_rollback") testutil.Ok(t, err)