alternative implementation

commit 07f5c209e6
parent d3d5e2af0e
Author: Callum Styan <callumstyan@gmail.com>
Date:   2024-08-27 16:30:47 -07:00
Signed-off-by: Callum Styan <callumstyan@gmail.com>

3 changed files with 59 additions and 48 deletions
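In short: the interner keeps its map keyed by the raw string but now stores a unique.Handle[string] (from Go 1.23's unique package) in each entry, and intern returns that handle instead of the string. QueueManager wraps each series' labels in a new internRef struct, and the explicit releaseLabels calls are dropped; cleanup appears to be left to map deletion, since unique handles let the runtime reclaim canonical strings once nothing references them.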

storage/remote/intern.go

@@ -36,51 +36,47 @@ var noReferenceReleases = promauto.NewCounter(prometheus.CounterOpts{
 type pool struct {
 	mtx  sync.RWMutex
-	pool map[unique.Handle[string]]*entry
+	pool map[string]*entry
 }

 type entry struct {
 	refs atomic.Int64
+	handle unique.Handle[string]
 }

-func newEntry() *entry {
-	return &entry{}
+func newEntry(s string) *entry {
+	return &entry{handle: unique.Make(s)}
 }

 func newPool() *pool {
 	return &pool{
-		pool: map[unique.Handle[string]]*entry{},
+		pool: map[string]*entry{},
 	}
 }

-func (p *pool) intern(s string) string {
-	if s == "" {
-		return ""
-	}
-
+func (p *pool) intern(s string) unique.Handle[string] {
 	p.mtx.RLock()
-	h := unique.Make(s)
-	interned, ok := p.pool[h]
+	interned, ok := p.pool[s]
 	p.mtx.RUnlock()
 	if ok {
 		interned.refs.Inc()
-		return s
+		return interned.handle
 	}
 	p.mtx.Lock()
 	defer p.mtx.Unlock()
-	if interned, ok := p.pool[h]; ok {
 		interned.refs.Inc()
-		return s
+	if interned, ok := p.pool[s]; ok {
+		interned.refs.Inc()
+		return interned.handle
 	}

-	p.pool[h] = newEntry()
-	p.pool[h].refs.Store(1)
-	return s
+	p.pool[s] = newEntry(s)
+	p.pool[s].refs.Store(1)
+	return p.pool[s].handle
 }

 func (p *pool) release(s string) {
 	p.mtx.RLock()
-	h := unique.Make(s)
-	interned, ok := p.pool[h]
+	interned, ok := p.pool[s]
 	p.mtx.RUnlock()

 	if !ok {
@@ -98,5 +94,5 @@ func (p *pool) release(s string) {
 	if interned.refs.Load() != 0 {
 		return
 	}
-	delete(p.pool, h)
+	delete(p.pool, s)
 }
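As an aside (not part of the diff): the rewrite leans on Go 1.23's unique package, whose Make returns a canonical Handle for a given value; handles compare equal exactly when their values are equal, and Value() recovers the string. A minimal standalone sketch:

	package main

	import (
		"fmt"
		"strings"
		"unique"
	)

	func main() {
		a := unique.Make("series_name")
		// Build an equal string at runtime so it is a distinct allocation.
		b := unique.Make(strings.Clone("series_name"))

		fmt.Println(a == b)    // true: handles are canonical per value
		fmt.Println(a.Value()) // "series_name"
	}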

storage/remote/intern_test.go

@@ -20,18 +20,16 @@ package remote
 import (
 	"fmt"
+	"github.com/stretchr/testify/require"
 	"testing"
 	"time"
-	"unique"
-
-	"github.com/stretchr/testify/require"
 )

 func TestIntern(t *testing.T) {
 	interner := newPool()
 	testString := "TestIntern"

 	interner.intern(testString)
-	interned, ok := interner.pool[unique.Make(testString)]
+	interned, ok := interner.pool[testString]

 	require.True(t, ok)
 	require.Equal(t, int64(1), interned.refs.Load(), fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs.Load()))
@@ -42,13 +40,13 @@ func TestIntern_MultiRef(t *testing.T) {
 	testString := "TestIntern_MultiRef"

 	interner.intern(testString)
-	interned, ok := interner.pool[unique.Make(testString)]
+	interned, ok := interner.pool[testString]

 	require.True(t, ok)
 	require.Equal(t, int64(1), interned.refs.Load(), fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs.Load()))

 	interner.intern(testString)
-	interned, ok = interner.pool[unique.Make(testString)]
+	interned, ok = interner.pool[testString]

 	require.True(t, ok)
 	require.Equal(t, int64(2), interned.refs.Load(), fmt.Sprintf("expected refs to be 2 but it was %d", interned.refs.Load()))
@@ -59,13 +57,13 @@ func TestIntern_DeleteRef(t *testing.T) {
 	testString := "TestIntern_DeleteRef"

 	interner.intern(testString)
-	interned, ok := interner.pool[unique.Make(testString)]
+	interned, ok := interner.pool[testString]

 	require.True(t, ok)
 	require.Equal(t, int64(1), interned.refs.Load(), fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs.Load()))

 	interner.release(testString)
-	_, ok = interner.pool[unique.Make(testString)]
+	_, ok = interner.pool[testString]
 	require.False(t, ok)
 }
@@ -74,7 +72,7 @@ func TestIntern_MultiRef_Concurrent(t *testing.T) {
 	testString := "TestIntern_MultiRef_Concurrent"

 	interner.intern(testString)
-	interned, ok := interner.pool[unique.Make(testString)]
+	interned, ok := interner.pool[testString]
 	require.True(t, ok)
 	require.Equal(t, int64(1), interned.refs.Load(), fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs.Load()))
@@ -85,7 +83,7 @@ func TestIntern_MultiRef_Concurrent(t *testing.T) {
 	time.Sleep(time.Millisecond)

 	interner.mtx.RLock()
-	interned, ok = interner.pool[unique.Make(testString)]
+	interned, ok = interner.pool[testString]
 	interner.mtx.RUnlock()
 	require.True(t, ok)
 	require.Equal(t, int64(1), interned.refs.Load(), fmt.Sprintf("expected refs to be 1 but it was %d", interned.refs.Load()))
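For orientation, the reference-counting lifecycle these tests exercise, written against the new API; a sketch that assumes the pool code from the first file (including a refs decrement in the portion of release not shown in this diff):

	p := newPool()

	h1 := p.intern("job") // refs == 1; returns unique.Handle[string]
	h2 := p.intern("job") // refs == 2; h1 == h2

	p.release("job") // refs == 1; entry kept
	p.release("job") // refs == 0; entry deleted from p.pool

	_ = h1.Value() // the handle itself stays usable; Value() returns "job"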

storage/remote/queue_manager.go

@@ -21,6 +21,7 @@ import (
 	"strconv"
 	"sync"
 	"time"
+	"unique"

 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
@@ -61,6 +62,13 @@ const (
 	reasonUnintentionalDroppedSeries = "unintentionally_dropped_series"
 )

+var handlesPool = sync.Pool{
+	New: func() interface{} {
+		//t.Log("Created")
+		return make([]unique.Handle[string], 0)
+	},
+}
+
 type queueManagerMetrics struct {
 	reg prometheus.Registerer
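handlesPool is introduced above but not yet used anywhere in this commit (note the leftover //t.Log debug comment). If it were wired in, the usual sync.Pool borrow/return cycle would look roughly like this sketch, not the commit's code:

	// Borrow a handle slice, append to it, then return it with length
	// reset so the backing array can be reused by other goroutines.
	hs := handlesPool.Get().([]unique.Handle[string])
	hs = append(hs, unique.Make("up"))
	// ... use hs ...
	handlesPool.Put(hs[:0])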
@@ -398,6 +406,11 @@ type WriteClient interface {
 	Endpoint() string
 }

+type internRef struct {
+	//handles []unique.Handle[string]
+	lset labels.Labels
+}
+
 // QueueManager manages a queue of samples to be sent to the Storage
 // indicated by the provided WriteClient. Implements writeTo interface
 // used by WAL Watcher.
@@ -424,10 +437,11 @@ type QueueManager struct {
 	enc Compression

 	seriesMtx      sync.Mutex // Covers seriesLabels, seriesMetadata, droppedSeries and builder.
-	seriesLabels   map[chunks.HeadSeriesRef]labels.Labels
+	seriesLabels   map[chunks.HeadSeriesRef]internRef
 	seriesMetadata map[chunks.HeadSeriesRef]*metadata.Metadata
 	droppedSeries  map[chunks.HeadSeriesRef]struct{}
 	builder        *labels.Builder
+	handles        []unique.Handle[string]

 	seriesSegmentMtx     sync.Mutex // Covers seriesSegmentIndexes - if you also lock seriesMtx, take seriesMtx first.
 	seriesSegmentIndexes map[chunks.HeadSeriesRef]int
@@ -492,7 +506,7 @@ func NewQueueManager(
 		sendExemplars:        enableExemplarRemoteWrite,
 		sendNativeHistograms: enableNativeHistogramRemoteWrite,
-		seriesLabels:         make(map[chunks.HeadSeriesRef]labels.Labels),
+		seriesLabels:         make(map[chunks.HeadSeriesRef]internRef),
 		seriesMetadata:       make(map[chunks.HeadSeriesRef]*metadata.Metadata),
 		seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
 		droppedSeries:        make(map[chunks.HeadSeriesRef]struct{}),
@@ -730,7 +744,7 @@ outer:
 			default:
 			}
 			if t.shards.enqueue(s.Ref, timeSeries{
-				seriesLabels: lbls,
+				seriesLabels: lbls.lset,
 				metadata:     meta,
 				timestamp:    s.T,
 				value:        s.V,
@@ -788,7 +802,7 @@ outer:
 			default:
 			}
 			if t.shards.enqueue(e.Ref, timeSeries{
-				seriesLabels: lbls,
+				seriesLabels: lbls.lset,
 				metadata:     meta,
 				timestamp:    e.T,
 				value:        e.V,
@@ -834,6 +848,7 @@ outer:
 			continue
 		}
 		meta := t.seriesMetadata[h.Ref]
+
 		t.seriesMtx.Unlock()

 		backoff := model.Duration(5 * time.Millisecond)
@@ -844,7 +859,7 @@ outer:
 			default:
 			}
 			if t.shards.enqueue(h.Ref, timeSeries{
-				seriesLabels: lbls,
+				seriesLabels: lbls.lset,
 				metadata:     meta,
 				timestamp:    h.T,
 				histogram:    h.H,
@@ -889,6 +904,7 @@ outer:
 			continue
 		}
 		meta := t.seriesMetadata[h.Ref]
+
 		t.seriesMtx.Unlock()

 		backoff := model.Duration(5 * time.Millisecond)
@@ -899,7 +915,7 @@ outer:
 			default:
 			}
 			if t.shards.enqueue(h.Ref, timeSeries{
-				seriesLabels: lbls,
+				seriesLabels: lbls.lset,
 				metadata:       meta,
 				timestamp:      h.T,
 				floatHistogram: h.FH,
@@ -960,8 +976,8 @@ func (t *QueueManager) Stop() {
 	// On shutdown, release the strings in the labels from the intern pool.
 	t.seriesMtx.Lock()
-	for _, labels := range t.seriesLabels {
-		t.releaseLabels(labels)
+	for k := range t.seriesLabels {
+		delete(t.seriesLabels, k)
 	}
 	t.seriesMtx.Unlock()
 	t.metrics.unregister()
@@ -990,10 +1006,11 @@ func (t *QueueManager) StoreSeries(series []record.RefSeries, index int) {
 		// We should not ever be replacing a series labels in the map, but just
 		// in case we do we need to ensure we do not leak the replaced interned
 		// strings.
-		if orig, ok := t.seriesLabels[s.Ref]; ok {
-			t.releaseLabels(orig)
+		if _, ok := t.seriesLabels[s.Ref]; ok {
+			delete(t.seriesLabels, s.Ref)
 		}
-		t.seriesLabels[s.Ref] = lbls
+
+		t.seriesLabels[s.Ref] = internRef{lset: lbls}
 	}
 }
@@ -1037,7 +1054,8 @@ func (t *QueueManager) SeriesReset(index int) {
 	for k, v := range t.seriesSegmentIndexes {
 		if v < index {
 			delete(t.seriesSegmentIndexes, k)
-			t.releaseLabels(t.seriesLabels[k])
+			//t.releaseLabels(t.seriesLabels[k])
+			//delete(t.seriesLabels, k)
 			delete(t.seriesLabels, k)
 			delete(t.seriesMetadata, k)
 			delete(t.droppedSeries, k)
@@ -1060,11 +1078,10 @@ func (t *QueueManager) client() WriteClient {
 }

 func (t *QueueManager) internLabels(lbls labels.Labels) {
-	lbls.InternStrings(t.interner.intern)
-}
-
-func (t *QueueManager) releaseLabels(ls labels.Labels) {
-	ls.ReleaseStrings(t.interner.release)
+	for i := range lbls {
+		t.interner.intern(lbls[i].Name)
+		t.interner.intern(lbls[i].Value)
+	}
 }

 // processExternalLabels merges externalLabels into b. If b contains
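One caveat on the new internLabels (an observation, not part of the diff): indexing lbls[i] assumes the slice-backed labels.Labels representation; when Prometheus is built with the stringlabels tag, Labels is not an indexable slice. A sketch of an equivalent loop using the Range accessor, which works with either representation:

	func (t *QueueManager) internLabels(lbls labels.Labels) {
		// Range visits each label in order regardless of the underlying
		// Labels representation.
		lbls.Range(func(l labels.Label) {
			t.interner.intern(l.Name)
			t.interner.intern(l.Value)
		})
	}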