From 1a7923dde37c5699c523cbd400b4da6d1c616bf5 Mon Sep 17 00:00:00 2001 From: Callum Styan Date: Mon, 11 Mar 2019 16:44:23 -0700 Subject: [PATCH] Add ref counting to string interning so we can remove a string when there are no longer any refs. Add tests for interning. Co-authored-by: Tom Wilkie Signed-off-by: Callum Styan --- storage/remote/intern.go | 54 +++++++++++++++++---- storage/remote/intern_test.go | 85 +++++++++++++++++++++++++++++++++ storage/remote/queue_manager.go | 16 ++++++- 3 files changed, 143 insertions(+), 12 deletions(-) create mode 100644 storage/remote/intern_test.go diff --git a/storage/remote/intern.go b/storage/remote/intern.go index 8dfc37fc4..785741f85 100644 --- a/storage/remote/intern.go +++ b/storage/remote/intern.go @@ -18,18 +18,26 @@ package remote -import "sync" +import ( + "sync" + "sync/atomic" +) var interner = newPool() type pool struct { mtx sync.RWMutex - pool map[string]string + pool map[string]*entry +} + +type entry struct { + s string + refs int64 } func newPool() *pool { return &pool{ - pool: map[string]string{}, + pool: map[string]*entry{}, } } @@ -42,18 +50,44 @@ func (p *pool) intern(s string) string { interned, ok := p.pool[s] p.mtx.RUnlock() if ok { - return interned + atomic.AddInt64(&interned.refs, 1) + return interned.s + } + p.mtx.Lock() + defer p.mtx.Unlock() + if interned, ok := p.pool[s]; ok { + atomic.AddInt64(&interned.refs, 1) + return interned.s + } + + s = pack(s) + p.pool[s] = &entry{ + s: s, + refs: 1, + } + return s +} + +func (p *pool) release(s string) { + p.mtx.RLock() + interned, ok := p.pool[s] + p.mtx.RUnlock() + + if !ok { + panic("released unknown string") + } + + refs := atomic.AddInt64(&interned.refs, -1) + if refs > 0 { + return } p.mtx.Lock() defer p.mtx.Unlock() - if interned, ok := p.pool[s]; ok { - return interned + if atomic.LoadInt64(&interned.refs) != 0 { + return } - - s = pack(s) - p.pool[s] = s - return s + delete(p.pool, s) } // StrPack returns a new instance of s which is tightly packed in memory. diff --git a/storage/remote/intern_test.go b/storage/remote/intern_test.go new file mode 100644 index 000000000..c6a915356 --- /dev/null +++ b/storage/remote/intern_test.go @@ -0,0 +1,85 @@ +// Copyright 2019 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Inspired / copied / modified from https://gitlab.com/cznic/strutil/blob/master/strutil.go, +// which is MIT licensed, so: +// +// Copyright (c) 2014 The strutil Authors. All rights reserved. + +package remote + +import ( + "fmt" + "testing" + "time" + + "github.com/prometheus/prometheus/util/testutil" +) + +func TestIntern(t *testing.T) { + testString := "TestIntern" + interner.intern(testString) + interned, ok := interner.pool[testString] + + testutil.Equals(t, ok, true) + testutil.Assert(t, interned.refs == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs)) +} + +func TestIntern_MultiRef(t *testing.T) { + testString := "TestIntern_MultiRef" + + interner.intern(testString) + interned, ok := interner.pool[testString] + + testutil.Equals(t, ok, true) + testutil.Assert(t, interned.refs == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs)) + + interner.intern(testString) + interned, ok = interner.pool[testString] + + testutil.Equals(t, ok, true) + testutil.Assert(t, interned.refs == 2, fmt.Sprintf("expected refs to be 2 but it was %d", interned.refs)) +} + +func TestIntern_DeleteRef(t *testing.T) { + testString := "TestIntern_DeleteRef" + + interner.intern(testString) + interned, ok := interner.pool[testString] + + testutil.Equals(t, ok, true) + testutil.Assert(t, interned.refs == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs)) + + interner.release(testString) + _, ok = interner.pool[testString] + testutil.Equals(t, ok, false) +} + +func TestIntern_MultiRef_Concurrent(t *testing.T) { + testString := "TestIntern_MultiRef_Concurrent" + + interner.intern(testString) + interned, ok := interner.pool[testString] + testutil.Equals(t, ok, true) + testutil.Assert(t, interned.refs == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs)) + + go interner.release(testString) + + interner.intern(testString) + + time.Sleep(time.Millisecond) + + interned, ok = interner.pool[testString] + testutil.Equals(t, ok, true) + testutil.Assert(t, interned.refs == 1, fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs)) +} diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index beeafbff4..dd5610237 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -343,8 +343,12 @@ func (t *QueueManager) StoreSeries(series []tsdb.RefSeries, index int) { t.seriesMtx.Lock() defer t.seriesMtx.Unlock() for ref, labels := range temp { - t.seriesLabels[ref] = labels t.seriesSegmentIndexes[ref] = index + + if orig, ok := t.seriesLabels[ref]; ok { + release(orig) + } + t.seriesLabels[ref] = labels } } @@ -359,12 +363,20 @@ func (t *QueueManager) SeriesReset(index int) { // that were not also present in the checkpoint. for k, v := range t.seriesSegmentIndexes { if v < index { - delete(t.seriesLabels, k) delete(t.seriesSegmentIndexes, k) + release(t.seriesLabels[k]) + delete(t.seriesLabels, k) } } } +func release(ls []prompb.Label) { + for _, l := range ls { + interner.release(l.Name) + interner.release(l.Value) + } +} + // processExternalLabels merges externalLabels into ls. If ls contains // a label in externalLabels, the value in ls wins. func processExternalLabels(ls tsdbLabels.Labels, externalLabels labels.Labels) labels.Labels {