TSDB: Isolation: avoid creating appenderId's without appender (#7135)

Prior to this commit we could have situations where we are creating an
appenderId but never creating an appender to go with it, therefore
blocking the low watermak.

Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>
This commit is contained in:
Julien Pivotto 2020-04-17 20:51:03 +02:00 committed by GitHub
parent ae041f97cf
commit ed1852ab95
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 36 additions and 19 deletions

View file

@ -803,8 +803,6 @@ func (h *RangeHead) Meta() BlockMeta {
type initAppender struct { type initAppender struct {
app storage.Appender app storage.Appender
head *Head head *Head
appendID, cleanupAppendIDsBelow uint64
} }
func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
@ -812,7 +810,7 @@ func (a *initAppender) Add(lset labels.Labels, t int64, v float64) (uint64, erro
return a.app.Add(lset, t, v) return a.app.Add(lset, t, v)
} }
a.head.initTime(t) a.head.initTime(t)
a.app = a.head.appender(a.appendID, a.cleanupAppendIDsBelow) a.app = a.head.appender()
return a.app.Add(lset, t, v) return a.app.Add(lset, t, v)
} }
@ -842,22 +840,20 @@ func (a *initAppender) Rollback() error {
func (h *Head) Appender() storage.Appender { func (h *Head) Appender() storage.Appender {
h.metrics.activeAppenders.Inc() h.metrics.activeAppenders.Inc()
appendID := h.iso.newAppendID()
cleanupAppendIDsBelow := h.iso.lowWatermark()
// The head cache might not have a starting point yet. The init appender // The head cache might not have a starting point yet. The init appender
// picks up the first appended timestamp as the base. // picks up the first appended timestamp as the base.
if h.MinTime() == math.MaxInt64 { if h.MinTime() == math.MaxInt64 {
return &initAppender{ return &initAppender{
head: h, head: h,
appendID: appendID,
cleanupAppendIDsBelow: cleanupAppendIDsBelow,
} }
} }
return h.appender(appendID, cleanupAppendIDsBelow) return h.appender()
} }
func (h *Head) appender(appendID, cleanupAppendIDsBelow uint64) *headAppender { func (h *Head) appender() *headAppender {
appendID := h.iso.newAppendID()
cleanupAppendIDsBelow := h.iso.lowWatermark()
return &headAppender{ return &headAppender{
head: h, head: h,
// Set the minimum valid time to whichever is greater the head min valid time or the compaction window. // Set the minimum valid time to whichever is greater the head min valid time or the compaction window.

View file

@ -1078,7 +1078,7 @@ func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) {
h.initTime(0) h.initTime(0)
app := h.appender(0, 0) app := h.appender()
lset := labels.FromStrings("a", "1") lset := labels.FromStrings("a", "1")
_, err = app.Add(lset, 2100, 1) _, err = app.Add(lset, 2100, 1)
testutil.Ok(t, err) testutil.Ok(t, err)
@ -1106,7 +1106,7 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) {
h.initTime(0) h.initTime(0)
app := h.appender(0, 0) app := h.appender()
lset := labels.FromStrings("a", "1") lset := labels.FromStrings("a", "1")
_, err = app.Add(lset, 2100, 1) _, err = app.Add(lset, 2100, 1)
testutil.Ok(t, err) testutil.Ok(t, err)
@ -1368,14 +1368,16 @@ func TestMemSeriesIsolation(t *testing.T) {
return -1 return -1
} }
i := 0 i := 1
for ; i <= 1000; i++ { for ; i <= 1000; i++ {
var app storage.Appender var app storage.Appender
// To initialize bounds. // To initialize bounds.
if hb.MinTime() == math.MaxInt64 { if hb.MinTime() == math.MaxInt64 {
app = &initAppender{head: hb, appendID: uint64(i), cleanupAppendIDsBelow: 0} app = &initAppender{head: hb}
} else { } else {
app = hb.appender(uint64(i), 0) a := hb.appender()
a.cleanupAppendIDsBelow = 0
app = a
} }
_, err := app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) _, err := app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
@ -1394,7 +1396,8 @@ func TestMemSeriesIsolation(t *testing.T) {
testutil.Equals(t, 999, lastValue(999)) testutil.Equals(t, 999, lastValue(999))
// Cleanup appendIDs below 500. // Cleanup appendIDs below 500.
app := hb.appender(uint64(i), 500) app := hb.appender()
app.cleanupAppendIDsBelow = 500
_, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, app.Commit()) testutil.Ok(t, app.Commit())
@ -1412,7 +1415,8 @@ func TestMemSeriesIsolation(t *testing.T) {
// Cleanup appendIDs below 1000, which means the sample buffer is // Cleanup appendIDs below 1000, which means the sample buffer is
// the only thing with appendIDs. // the only thing with appendIDs.
app = hb.appender(uint64(i), 1000) app = hb.appender()
app.cleanupAppendIDsBelow = 1000
_, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, app.Commit()) testutil.Ok(t, app.Commit())
@ -1425,7 +1429,8 @@ func TestMemSeriesIsolation(t *testing.T) {
i++ i++
// Cleanup appendIDs below 1001, but with a rollback. // Cleanup appendIDs below 1001, but with a rollback.
app = hb.appender(uint64(i), 1001) app = hb.appender()
app.cleanupAppendIDsBelow = 1001
_, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i)) _, err = app.Add(labels.FromStrings("foo", "bar"), int64(i), float64(i))
testutil.Ok(t, err) testutil.Ok(t, err)
testutil.Ok(t, app.Rollback()) testutil.Ok(t, app.Rollback())
@ -1515,6 +1520,22 @@ func TestHeadSeriesChunkRace(t *testing.T) {
} }
} }
func TestIsolationWithoutAdd(t *testing.T) {
hb, err := NewHead(nil, nil, nil, 1000, DefaultStripeSize)
testutil.Ok(t, err)
defer hb.Close()
app := hb.Appender()
testutil.Ok(t, app.Commit())
app = hb.Appender()
_, err = app.Add(labels.FromStrings("foo", "baz"), 1, 1)
testutil.Ok(t, err)
testutil.Ok(t, app.Commit())
testutil.Equals(t, hb.iso.lastAppendID, hb.iso.lowWatermark(), "High watermark should be equal to the low watermark")
}
func testHeadSeriesChunkRace(t *testing.T) { func testHeadSeriesChunkRace(t *testing.T) {
h, err := NewHead(nil, nil, nil, 30, DefaultStripeSize) h, err := NewHead(nil, nil, nil, 30, DefaultStripeSize)
testutil.Ok(t, err) testutil.Ok(t, err)