mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 22:07:27 -08:00
Merge remote-tracking branch 'upstream/main' into merge-upstream
This commit is contained in:
commit
f93d38aca7
|
@ -36,6 +36,7 @@ jobs:
|
||||||
GOOPTS: "-p 2"
|
GOOPTS: "-p 2"
|
||||||
GOMAXPROCS: "2"
|
GOMAXPROCS: "2"
|
||||||
GO111MODULE: "on"
|
GO111MODULE: "on"
|
||||||
|
- run: go test ./tsdb/ -test.tsdb-isolation=false
|
||||||
- prometheus/check_proto:
|
- prometheus/check_proto:
|
||||||
version: "3.15.8"
|
version: "3.15.8"
|
||||||
- prometheus/store_artifact:
|
- prometheus/store_artifact:
|
||||||
|
@ -93,6 +94,7 @@ jobs:
|
||||||
steps:
|
steps:
|
||||||
- checkout
|
- checkout
|
||||||
- run: go test ./tsdb/...
|
- run: go test ./tsdb/...
|
||||||
|
- run: go test ./tsdb/ -test.tsdb-isolation=false
|
||||||
|
|
||||||
test_mixins:
|
test_mixins:
|
||||||
executor: golang
|
executor: golang
|
||||||
|
|
|
@ -312,9 +312,9 @@ local template = grafana.template;
|
||||||
)
|
)
|
||||||
.addTemplate(
|
.addTemplate(
|
||||||
template.new(
|
template.new(
|
||||||
'instance',
|
'cluster',
|
||||||
'$datasource',
|
'$datasource',
|
||||||
'label_values(prometheus_build_info, instance)' % $._config,
|
'label_values(kube_pod_container_info{image=~".*prometheus.*"}, cluster)' % $._config,
|
||||||
refresh='time',
|
refresh='time',
|
||||||
current={
|
current={
|
||||||
selected: true,
|
selected: true,
|
||||||
|
@ -326,9 +326,9 @@ local template = grafana.template;
|
||||||
)
|
)
|
||||||
.addTemplate(
|
.addTemplate(
|
||||||
template.new(
|
template.new(
|
||||||
'cluster',
|
'instance',
|
||||||
'$datasource',
|
'$datasource',
|
||||||
'label_values(kube_pod_container_info{image=~".*prometheus.*"}, cluster)' % $._config,
|
'label_values(prometheus_build_info{cluster=~"$cluster"}, instance)' % $._config,
|
||||||
refresh='time',
|
refresh='time',
|
||||||
current={
|
current={
|
||||||
selected: true,
|
selected: true,
|
||||||
|
|
10
tsdb/db.go
10
tsdb/db.go
|
@ -81,6 +81,7 @@ func DefaultOptions() *Options {
|
||||||
WALCompression: false,
|
WALCompression: false,
|
||||||
StripeSize: DefaultStripeSize,
|
StripeSize: DefaultStripeSize,
|
||||||
HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize,
|
HeadChunksWriteBufferSize: chunks.DefaultWriteBufferSize,
|
||||||
|
IsolationDisabled: defaultIsolationDisabled,
|
||||||
HeadChunksEndTimeVariance: 0,
|
HeadChunksEndTimeVariance: 0,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -150,7 +151,7 @@ type Options struct {
|
||||||
// mainly meant for external users who import TSDB.
|
// mainly meant for external users who import TSDB.
|
||||||
BlocksToDelete BlocksToDeleteFunc
|
BlocksToDelete BlocksToDeleteFunc
|
||||||
|
|
||||||
// Enables the in memory exemplar storage,.
|
// Enables the in memory exemplar storage.
|
||||||
EnableExemplarStorage bool
|
EnableExemplarStorage bool
|
||||||
|
|
||||||
// Enables the snapshot of in-memory chunks on shutdown. This makes restarts faster.
|
// Enables the snapshot of in-memory chunks on shutdown. This makes restarts faster.
|
||||||
|
@ -160,6 +161,9 @@ type Options struct {
|
||||||
// See tsdb/exemplar.go, specifically the CircularExemplarStorage struct and it's constructor NewCircularExemplarStorage.
|
// See tsdb/exemplar.go, specifically the CircularExemplarStorage struct and it's constructor NewCircularExemplarStorage.
|
||||||
MaxExemplars int64
|
MaxExemplars int64
|
||||||
|
|
||||||
|
// Disables isolation between reads and in-flight appends.
|
||||||
|
IsolationDisabled bool
|
||||||
|
|
||||||
// SeriesHashCache specifies the series hash cache used when querying shards via Querier.Select().
|
// SeriesHashCache specifies the series hash cache used when querying shards via Querier.Select().
|
||||||
// If nil, the cache won't be used.
|
// If nil, the cache won't be used.
|
||||||
SeriesHashCache *hashcache.SeriesHashCache
|
SeriesHashCache *hashcache.SeriesHashCache
|
||||||
|
@ -720,6 +724,10 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
|
||||||
headOpts.EnableExemplarStorage = opts.EnableExemplarStorage
|
headOpts.EnableExemplarStorage = opts.EnableExemplarStorage
|
||||||
headOpts.MaxExemplars.Store(opts.MaxExemplars)
|
headOpts.MaxExemplars.Store(opts.MaxExemplars)
|
||||||
headOpts.EnableMemorySnapshotOnShutdown = opts.EnableMemorySnapshotOnShutdown
|
headOpts.EnableMemorySnapshotOnShutdown = opts.EnableMemorySnapshotOnShutdown
|
||||||
|
if opts.IsolationDisabled {
|
||||||
|
// We only override this flag if isolation is disabled at DB level. We use the default otherwise.
|
||||||
|
headOpts.IsolationDisabled = opts.IsolationDisabled
|
||||||
|
}
|
||||||
db.head, err = NewHead(r, l, wlog, headOpts, stats.Head)
|
db.head, err = NewHead(r, l, wlog, headOpts, stats.Head)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -17,6 +17,7 @@ import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"context"
|
"context"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"hash/crc32"
|
"hash/crc32"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
@ -54,6 +55,11 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMain(m *testing.M) {
|
func TestMain(m *testing.M) {
|
||||||
|
var isolationEnabled bool
|
||||||
|
flag.BoolVar(&isolationEnabled, "test.tsdb-isolation", true, "enable isolation")
|
||||||
|
flag.Parse()
|
||||||
|
defaultIsolationDisabled = !isolationEnabled
|
||||||
|
|
||||||
goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("github.com/prometheus/prometheus/tsdb.(*SegmentWAL).cut.func1"), goleak.IgnoreTopFunction("github.com/prometheus/prometheus/tsdb.(*SegmentWAL).cut.func2"))
|
goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("github.com/prometheus/prometheus/tsdb.(*SegmentWAL).cut.func1"), goleak.IgnoreTopFunction("github.com/prometheus/prometheus/tsdb.(*SegmentWAL).cut.func2"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2407,6 +2413,10 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDBCannotSeePartialCommits(t *testing.T) {
|
func TestDBCannotSeePartialCommits(t *testing.T) {
|
||||||
|
if defaultIsolationDisabled {
|
||||||
|
t.Skip("skipping test since tsdb isolation is disabled")
|
||||||
|
}
|
||||||
|
|
||||||
tmpdir, _ := ioutil.TempDir("", "test")
|
tmpdir, _ := ioutil.TempDir("", "test")
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, os.RemoveAll(tmpdir))
|
require.NoError(t, os.RemoveAll(tmpdir))
|
||||||
|
@ -2477,6 +2487,10 @@ func TestDBCannotSeePartialCommits(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) {
|
func TestDBQueryDoesntSeeAppendsAfterCreation(t *testing.T) {
|
||||||
|
if defaultIsolationDisabled {
|
||||||
|
t.Skip("skipping test since tsdb isolation is disabled")
|
||||||
|
}
|
||||||
|
|
||||||
tmpdir, _ := ioutil.TempDir("", "test")
|
tmpdir, _ := ioutil.TempDir("", "test")
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, os.RemoveAll(tmpdir))
|
require.NoError(t, os.RemoveAll(tmpdir))
|
||||||
|
|
20
tsdb/head.go
20
tsdb/head.go
|
@ -52,6 +52,9 @@ var (
|
||||||
// ErrAppenderClosed is returned if an appender has already be successfully
|
// ErrAppenderClosed is returned if an appender has already be successfully
|
||||||
// rolled back or committed.
|
// rolled back or committed.
|
||||||
ErrAppenderClosed = errors.New("appender closed")
|
ErrAppenderClosed = errors.New("appender closed")
|
||||||
|
|
||||||
|
// defaultIsolationDisabled is true if isolation is disabled by default.
|
||||||
|
defaultIsolationDisabled = false
|
||||||
)
|
)
|
||||||
|
|
||||||
// Head handles reads and writes of time series data within a time window.
|
// Head handles reads and writes of time series data within a time window.
|
||||||
|
@ -134,6 +137,8 @@ type HeadOptions struct {
|
||||||
SeriesCallback SeriesLifecycleCallback
|
SeriesCallback SeriesLifecycleCallback
|
||||||
EnableExemplarStorage bool
|
EnableExemplarStorage bool
|
||||||
EnableMemorySnapshotOnShutdown bool
|
EnableMemorySnapshotOnShutdown bool
|
||||||
|
|
||||||
|
IsolationDisabled bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func DefaultHeadOptions() *HeadOptions {
|
func DefaultHeadOptions() *HeadOptions {
|
||||||
|
@ -145,6 +150,7 @@ func DefaultHeadOptions() *HeadOptions {
|
||||||
ChunkEndTimeVariance: 0,
|
ChunkEndTimeVariance: 0,
|
||||||
StripeSize: DefaultStripeSize,
|
StripeSize: DefaultStripeSize,
|
||||||
SeriesCallback: &noopSeriesLifecycleCallback{},
|
SeriesCallback: &noopSeriesLifecycleCallback{},
|
||||||
|
IsolationDisabled: defaultIsolationDisabled,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -236,12 +242,13 @@ func (h *Head) resetInMemoryState() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
h.iso = newIsolation(h.opts.IsolationDisabled)
|
||||||
|
|
||||||
h.exemplarMetrics = em
|
h.exemplarMetrics = em
|
||||||
h.exemplars = es
|
h.exemplars = es
|
||||||
h.series = newStripeSeries(h.opts.StripeSize, h.opts.SeriesCallback)
|
h.series = newStripeSeries(h.opts.StripeSize, h.opts.SeriesCallback)
|
||||||
h.postings = index.NewUnorderedMemPostings()
|
h.postings = index.NewUnorderedMemPostings()
|
||||||
h.tombstones = tombstones.NewMemTombstones()
|
h.tombstones = tombstones.NewMemTombstones()
|
||||||
h.iso = newIsolation()
|
|
||||||
h.deleted = map[chunks.HeadSeriesRef]int{}
|
h.deleted = map[chunks.HeadSeriesRef]int{}
|
||||||
h.chunkRange.Store(h.opts.ChunkRange)
|
h.chunkRange.Store(h.opts.ChunkRange)
|
||||||
h.minTime.Store(math.MaxInt64)
|
h.minTime.Store(math.MaxInt64)
|
||||||
|
@ -1232,7 +1239,7 @@ func (h *Head) getOrCreate(hash uint64, lset labels.Labels) (*memSeries, bool, e
|
||||||
|
|
||||||
func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels) (*memSeries, bool, error) {
|
func (h *Head) getOrCreateWithID(id chunks.HeadSeriesRef, hash uint64, lset labels.Labels) (*memSeries, bool, error) {
|
||||||
s, created, err := h.series.getOrSet(hash, lset, func() *memSeries {
|
s, created, err := h.series.getOrSet(hash, lset, func() *memSeries {
|
||||||
return newMemSeries(lset, id, hash, h.chunkRange.Load(), h.opts.ChunkEndTimeVariance, &h.memChunkPool)
|
return newMemSeries(lset, id, hash, h.chunkRange.Load(), h.opts.ChunkEndTimeVariance, &h.memChunkPool, h.opts.IsolationDisabled)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, false, err
|
return nil, false, err
|
||||||
|
@ -1513,10 +1520,11 @@ type memSeries struct {
|
||||||
|
|
||||||
memChunkPool *sync.Pool
|
memChunkPool *sync.Pool
|
||||||
|
|
||||||
|
// txs is nil if isolation is disabled.
|
||||||
txs *txRing
|
txs *txRing
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, hash uint64, chunkRange int64, chunkEndTimeVariance float64, memChunkPool *sync.Pool) *memSeries {
|
func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, hash uint64, chunkRange int64, chunkEndTimeVariance float64, memChunkPool *sync.Pool, isolationDisabled bool) *memSeries {
|
||||||
s := &memSeries{
|
s := &memSeries{
|
||||||
lset: lset,
|
lset: lset,
|
||||||
hash: hash,
|
hash: hash,
|
||||||
|
@ -1524,9 +1532,11 @@ func newMemSeries(lset labels.Labels, id chunks.HeadSeriesRef, hash uint64, chun
|
||||||
chunkRange: chunkRange,
|
chunkRange: chunkRange,
|
||||||
chunkEndTimeVariance: chunkEndTimeVariance,
|
chunkEndTimeVariance: chunkEndTimeVariance,
|
||||||
nextAt: math.MinInt64,
|
nextAt: math.MinInt64,
|
||||||
txs: newTxRing(4),
|
|
||||||
memChunkPool: memChunkPool,
|
memChunkPool: memChunkPool,
|
||||||
}
|
}
|
||||||
|
if !isolationDisabled {
|
||||||
|
s.txs = newTxRing(4)
|
||||||
|
}
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1579,7 +1589,9 @@ func (s *memSeries) truncateChunksBefore(mint int64) (removed int) {
|
||||||
// cleanupAppendIDsBelow cleans up older appendIDs. Has to be called after
|
// cleanupAppendIDsBelow cleans up older appendIDs. Has to be called after
|
||||||
// acquiring lock.
|
// acquiring lock.
|
||||||
func (s *memSeries) cleanupAppendIDsBelow(bound uint64) {
|
func (s *memSeries) cleanupAppendIDsBelow(bound uint64) {
|
||||||
|
if s.txs != nil {
|
||||||
s.txs.cleanupAppendIDsBelow(bound)
|
s.txs.cleanupAppendIDsBelow(bound)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *memSeries) head() *memChunk {
|
func (s *memSeries) head() *memChunk {
|
||||||
|
|
|
@ -534,7 +534,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
|
||||||
s.sampleBuf[2] = s.sampleBuf[3]
|
s.sampleBuf[2] = s.sampleBuf[3]
|
||||||
s.sampleBuf[3] = sample{t: t, v: v}
|
s.sampleBuf[3] = sample{t: t, v: v}
|
||||||
|
|
||||||
if appendID > 0 {
|
if appendID > 0 && s.txs != nil {
|
||||||
s.txs.add(appendID)
|
s.txs.add(appendID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -391,7 +391,7 @@ func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, ch
|
||||||
numSamples := c.chunk.NumSamples()
|
numSamples := c.chunk.NumSamples()
|
||||||
stopAfter := numSamples
|
stopAfter := numSamples
|
||||||
|
|
||||||
if isoState != nil {
|
if isoState != nil && !isoState.IsolationDisabled() {
|
||||||
totalSamples := 0 // Total samples in this series.
|
totalSamples := 0 // Total samples in this series.
|
||||||
previousSamples := 0 // Samples before this chunk.
|
previousSamples := 0 // Samples before this chunk.
|
||||||
|
|
||||||
|
|
|
@ -60,6 +60,7 @@ func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal.
|
||||||
opts.ChunkDirRoot = dir
|
opts.ChunkDirRoot = dir
|
||||||
opts.EnableExemplarStorage = true
|
opts.EnableExemplarStorage = true
|
||||||
opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars)
|
opts.MaxExemplars.Store(config.DefaultExemplarsConfig.MaxExemplars)
|
||||||
|
|
||||||
h, err := NewHead(nil, nil, wlog, opts, nil)
|
h, err := NewHead(nil, nil, wlog, opts, nil)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -228,7 +229,7 @@ func BenchmarkLoadWAL(b *testing.B) {
|
||||||
for k := 0; k < c.batches*c.seriesPerBatch; k++ {
|
for k := 0; k < c.batches*c.seriesPerBatch; k++ {
|
||||||
// Create one mmapped chunk per series, with one sample at the given time.
|
// Create one mmapped chunk per series, with one sample at the given time.
|
||||||
lbls := labels.Labels{}
|
lbls := labels.Labels{}
|
||||||
s := newMemSeries(lbls, chunks.HeadSeriesRef(k)*101, lbls.Hash(), c.mmappedChunkT, 0, nil)
|
s := newMemSeries(lbls, chunks.HeadSeriesRef(k)*101, lbls.Hash(), c.mmappedChunkT, 0, nil, defaultIsolationDisabled)
|
||||||
s.append(c.mmappedChunkT, 42, 0, chunkDiskMapper)
|
s.append(c.mmappedChunkT, 42, 0, chunkDiskMapper)
|
||||||
s.mmapCurrentHeadChunk(chunkDiskMapper)
|
s.mmapCurrentHeadChunk(chunkDiskMapper)
|
||||||
}
|
}
|
||||||
|
@ -553,7 +554,7 @@ func TestMemSeries_truncateChunks(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
lbls := labels.FromStrings("a", "b")
|
lbls := labels.FromStrings("a", "b")
|
||||||
s := newMemSeries(lbls, 1, lbls.Hash(), 2000, 0, &memChunkPool)
|
s := newMemSeries(lbls, 1, lbls.Hash(), 2000, 0, &memChunkPool, defaultIsolationDisabled)
|
||||||
|
|
||||||
for i := 0; i < 4000; i += 5 {
|
for i := 0; i < 4000; i += 5 {
|
||||||
ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper)
|
ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper)
|
||||||
|
@ -1092,7 +1093,7 @@ func TestMemSeries_append(t *testing.T) {
|
||||||
}()
|
}()
|
||||||
|
|
||||||
lbls := labels.Labels{}
|
lbls := labels.Labels{}
|
||||||
s := newMemSeries(lbls, 1, lbls.Hash(), 500, 0, nil)
|
s := newMemSeries(lbls, 1, lbls.Hash(), 500, 0, nil, defaultIsolationDisabled)
|
||||||
|
|
||||||
// Add first two samples at the very end of a chunk range and the next two
|
// Add first two samples at the very end of a chunk range and the next two
|
||||||
// on and after it.
|
// on and after it.
|
||||||
|
@ -1550,6 +1551,10 @@ func TestAddDuplicateLabelName(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMemSeriesIsolation(t *testing.T) {
|
func TestMemSeriesIsolation(t *testing.T) {
|
||||||
|
if defaultIsolationDisabled {
|
||||||
|
t.Skip("skipping test since tsdb isolation is disabled")
|
||||||
|
}
|
||||||
|
|
||||||
// Put a series, select it. GC it and then access it.
|
// Put a series, select it. GC it and then access it.
|
||||||
lastValue := func(h *Head, maxAppendID uint64) int {
|
lastValue := func(h *Head, maxAppendID uint64) int {
|
||||||
idx, err := h.Index()
|
idx, err := h.Index()
|
||||||
|
@ -1721,6 +1726,10 @@ func TestMemSeriesIsolation(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestIsolationRollback(t *testing.T) {
|
func TestIsolationRollback(t *testing.T) {
|
||||||
|
if defaultIsolationDisabled {
|
||||||
|
t.Skip("skipping test since tsdb isolation is disabled")
|
||||||
|
}
|
||||||
|
|
||||||
// Rollback after a failed append and test if the low watermark has progressed anyway.
|
// Rollback after a failed append and test if the low watermark has progressed anyway.
|
||||||
hb, _ := newTestHead(t, 1000, false)
|
hb, _ := newTestHead(t, 1000, false)
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -1749,6 +1758,10 @@ func TestIsolationRollback(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestIsolationLowWatermarkMonotonous(t *testing.T) {
|
func TestIsolationLowWatermarkMonotonous(t *testing.T) {
|
||||||
|
if defaultIsolationDisabled {
|
||||||
|
t.Skip("skipping test since tsdb isolation is disabled")
|
||||||
|
}
|
||||||
|
|
||||||
hb, _ := newTestHead(t, 1000, false)
|
hb, _ := newTestHead(t, 1000, false)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, hb.Close())
|
require.NoError(t, hb.Close())
|
||||||
|
@ -1782,6 +1795,10 @@ func TestIsolationLowWatermarkMonotonous(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestIsolationAppendIDZeroIsNoop(t *testing.T) {
|
func TestIsolationAppendIDZeroIsNoop(t *testing.T) {
|
||||||
|
if defaultIsolationDisabled {
|
||||||
|
t.Skip("skipping test since tsdb isolation is disabled")
|
||||||
|
}
|
||||||
|
|
||||||
h, _ := newTestHead(t, 1000, false)
|
h, _ := newTestHead(t, 1000, false)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, h.Close())
|
require.NoError(t, h.Close())
|
||||||
|
@ -1803,6 +1820,10 @@ func TestHeadSeriesChunkRace(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestIsolationWithoutAdd(t *testing.T) {
|
func TestIsolationWithoutAdd(t *testing.T) {
|
||||||
|
if defaultIsolationDisabled {
|
||||||
|
t.Skip("skipping test since tsdb isolation is disabled")
|
||||||
|
}
|
||||||
|
|
||||||
hb, _ := newTestHead(t, 1000, false)
|
hb, _ := newTestHead(t, 1000, false)
|
||||||
defer func() {
|
defer func() {
|
||||||
require.NoError(t, hb.Close())
|
require.NoError(t, hb.Close())
|
||||||
|
@ -2323,7 +2344,7 @@ func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) {
|
||||||
}()
|
}()
|
||||||
|
|
||||||
lbls := labels.Labels{}
|
lbls := labels.Labels{}
|
||||||
s := newMemSeries(lbls, 1, lbls.Hash(), 500, 0, nil)
|
s := newMemSeries(lbls, 1, lbls.Hash(), 500, 0, nil, defaultIsolationDisabled)
|
||||||
|
|
||||||
for i := 0; i < 7; i++ {
|
for i := 0; i < 7; i++ {
|
||||||
ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper)
|
ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper)
|
||||||
|
|
|
@ -39,6 +39,10 @@ func (i *isolationState) Close() {
|
||||||
i.prev.next = i.next
|
i.prev.next = i.next
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (i *isolationState) IsolationDisabled() bool {
|
||||||
|
return i.isolation.disabled
|
||||||
|
}
|
||||||
|
|
||||||
type isolationAppender struct {
|
type isolationAppender struct {
|
||||||
appendID uint64
|
appendID uint64
|
||||||
prev *isolationAppender
|
prev *isolationAppender
|
||||||
|
@ -63,9 +67,11 @@ type isolation struct {
|
||||||
readMtx sync.RWMutex
|
readMtx sync.RWMutex
|
||||||
// All current in use isolationStates. This is a doubly-linked list.
|
// All current in use isolationStates. This is a doubly-linked list.
|
||||||
readsOpen *isolationState
|
readsOpen *isolationState
|
||||||
|
// If true, writes are not tracked while reads are still tracked.
|
||||||
|
disabled bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func newIsolation() *isolation {
|
func newIsolation(disabled bool) *isolation {
|
||||||
isoState := &isolationState{}
|
isoState := &isolationState{}
|
||||||
isoState.next = isoState
|
isoState.next = isoState
|
||||||
isoState.prev = isoState
|
isoState.prev = isoState
|
||||||
|
@ -78,6 +84,7 @@ func newIsolation() *isolation {
|
||||||
appendsOpen: map[uint64]*isolationAppender{},
|
appendsOpen: map[uint64]*isolationAppender{},
|
||||||
appendsOpenList: appender,
|
appendsOpenList: appender,
|
||||||
readsOpen: isoState,
|
readsOpen: isoState,
|
||||||
|
disabled: disabled,
|
||||||
appendersPool: sync.Pool{New: func() interface{} { return &isolationAppender{} }},
|
appendersPool: sync.Pool{New: func() interface{} { return &isolationAppender{} }},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -85,12 +92,20 @@ func newIsolation() *isolation {
|
||||||
// lowWatermark returns the appendID below which we no longer need to track
|
// lowWatermark returns the appendID below which we no longer need to track
|
||||||
// which appends were from which appendID.
|
// which appends were from which appendID.
|
||||||
func (i *isolation) lowWatermark() uint64 {
|
func (i *isolation) lowWatermark() uint64 {
|
||||||
|
if i.disabled {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
i.appendMtx.RLock() // Take appendMtx first.
|
i.appendMtx.RLock() // Take appendMtx first.
|
||||||
defer i.appendMtx.RUnlock()
|
defer i.appendMtx.RUnlock()
|
||||||
return i.lowWatermarkLocked()
|
return i.lowWatermarkLocked()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *isolation) lowWatermarkLocked() uint64 {
|
func (i *isolation) lowWatermarkLocked() uint64 {
|
||||||
|
if i.disabled {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
i.readMtx.RLock()
|
i.readMtx.RLock()
|
||||||
defer i.readMtx.RUnlock()
|
defer i.readMtx.RUnlock()
|
||||||
if i.readsOpen.prev != i.readsOpen {
|
if i.readsOpen.prev != i.readsOpen {
|
||||||
|
@ -106,6 +121,8 @@ func (i *isolation) lowWatermarkLocked() uint64 {
|
||||||
func (i *isolation) State(mint, maxt int64) *isolationState {
|
func (i *isolation) State(mint, maxt int64) *isolationState {
|
||||||
i.appendMtx.RLock() // Take append mutex before read mutex.
|
i.appendMtx.RLock() // Take append mutex before read mutex.
|
||||||
defer i.appendMtx.RUnlock()
|
defer i.appendMtx.RUnlock()
|
||||||
|
|
||||||
|
// We need to track the reads even when isolation is disabled.
|
||||||
isoState := &isolationState{
|
isoState := &isolationState{
|
||||||
maxAppendID: i.appendsOpenList.appendID,
|
maxAppendID: i.appendsOpenList.appendID,
|
||||||
lowWatermark: i.appendsOpenList.next.appendID, // Lowest appendID from appenders, or lastAppendId.
|
lowWatermark: i.appendsOpenList.next.appendID, // Lowest appendID from appenders, or lastAppendId.
|
||||||
|
@ -124,6 +141,7 @@ func (i *isolation) State(mint, maxt int64) *isolationState {
|
||||||
isoState.next = i.readsOpen.next
|
isoState.next = i.readsOpen.next
|
||||||
i.readsOpen.next.prev = isoState
|
i.readsOpen.next.prev = isoState
|
||||||
i.readsOpen.next = isoState
|
i.readsOpen.next = isoState
|
||||||
|
|
||||||
return isoState
|
return isoState
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -146,6 +164,10 @@ func (i *isolation) TraverseOpenReads(f func(s *isolationState) bool) {
|
||||||
// ID. The first ID returned is 1.
|
// ID. The first ID returned is 1.
|
||||||
// Also returns the low watermark, to keep lock/unlock operations down.
|
// Also returns the low watermark, to keep lock/unlock operations down.
|
||||||
func (i *isolation) newAppendID() (uint64, uint64) {
|
func (i *isolation) newAppendID() (uint64, uint64) {
|
||||||
|
if i.disabled {
|
||||||
|
return 0, 0
|
||||||
|
}
|
||||||
|
|
||||||
i.appendMtx.Lock()
|
i.appendMtx.Lock()
|
||||||
defer i.appendMtx.Unlock()
|
defer i.appendMtx.Unlock()
|
||||||
|
|
||||||
|
@ -165,6 +187,10 @@ func (i *isolation) newAppendID() (uint64, uint64) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *isolation) lastAppendID() uint64 {
|
func (i *isolation) lastAppendID() uint64 {
|
||||||
|
if i.disabled {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
i.appendMtx.RLock()
|
i.appendMtx.RLock()
|
||||||
defer i.appendMtx.RUnlock()
|
defer i.appendMtx.RUnlock()
|
||||||
|
|
||||||
|
@ -172,6 +198,10 @@ func (i *isolation) lastAppendID() uint64 {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *isolation) closeAppend(appendID uint64) {
|
func (i *isolation) closeAppend(appendID uint64) {
|
||||||
|
if i.disabled {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
i.appendMtx.Lock()
|
i.appendMtx.Lock()
|
||||||
defer i.appendMtx.Unlock()
|
defer i.appendMtx.Unlock()
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,7 @@ import (
|
||||||
func BenchmarkIsolation(b *testing.B) {
|
func BenchmarkIsolation(b *testing.B) {
|
||||||
for _, goroutines := range []int{10, 100, 1000, 10000} {
|
for _, goroutines := range []int{10, 100, 1000, 10000} {
|
||||||
b.Run(strconv.Itoa(goroutines), func(b *testing.B) {
|
b.Run(strconv.Itoa(goroutines), func(b *testing.B) {
|
||||||
iso := newIsolation()
|
iso := newIsolation(false)
|
||||||
|
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
start := make(chan struct{})
|
start := make(chan struct{})
|
||||||
|
@ -53,7 +53,7 @@ func BenchmarkIsolation(b *testing.B) {
|
||||||
func BenchmarkIsolationWithState(b *testing.B) {
|
func BenchmarkIsolationWithState(b *testing.B) {
|
||||||
for _, goroutines := range []int{10, 100, 1000, 10000} {
|
for _, goroutines := range []int{10, 100, 1000, 10000} {
|
||||||
b.Run(strconv.Itoa(goroutines), func(b *testing.B) {
|
b.Run(strconv.Itoa(goroutines), func(b *testing.B) {
|
||||||
iso := newIsolation()
|
iso := newIsolation(false)
|
||||||
|
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
start := make(chan struct{})
|
start := make(chan struct{})
|
||||||
|
|
|
@ -63,7 +63,7 @@
|
||||||
"prettier": "^2.4.1",
|
"prettier": "^2.4.1",
|
||||||
"ts-loader": "^7.0.4",
|
"ts-loader": "^7.0.4",
|
||||||
"ts-mocha": "^8.0.0",
|
"ts-mocha": "^8.0.0",
|
||||||
"ts-node": "^9.0.0",
|
"ts-node": "^10.4.0",
|
||||||
"typescript": "^4.5.2"
|
"typescript": "^4.5.2"
|
||||||
},
|
},
|
||||||
"peerDependencies": {
|
"peerDependencies": {
|
||||||
|
|
1061
web/ui/package-lock.json
generated
1061
web/ui/package-lock.json
generated
File diff suppressed because it is too large
Load diff
|
@ -11,7 +11,7 @@
|
||||||
"@codemirror/history": "^0.19.0",
|
"@codemirror/history": "^0.19.0",
|
||||||
"@codemirror/language": "^0.19.5",
|
"@codemirror/language": "^0.19.5",
|
||||||
"@codemirror/lint": "^0.19.3",
|
"@codemirror/lint": "^0.19.3",
|
||||||
"@codemirror/matchbrackets": "^0.19.1",
|
"@codemirror/matchbrackets": "^0.19.3",
|
||||||
"@codemirror/search": "^0.19.2",
|
"@codemirror/search": "^0.19.2",
|
||||||
"@codemirror/state": "^0.19.5",
|
"@codemirror/state": "^0.19.5",
|
||||||
"@codemirror/view": "^0.19.19",
|
"@codemirror/view": "^0.19.19",
|
||||||
|
@ -37,7 +37,7 @@
|
||||||
"react-router-dom": "^5.2.1",
|
"react-router-dom": "^5.2.1",
|
||||||
"react-test-renderer": "^17.0.2",
|
"react-test-renderer": "^17.0.2",
|
||||||
"reactstrap": "^8.9.0",
|
"reactstrap": "^8.9.0",
|
||||||
"sanitize-html": "^2.3.3",
|
"sanitize-html": "^2.5.3",
|
||||||
"sass": "1.43.4",
|
"sass": "1.43.4",
|
||||||
"tempusdominus-bootstrap-4": "^5.1.2",
|
"tempusdominus-bootstrap-4": "^5.1.2",
|
||||||
"tempusdominus-core": "^5.0.3"
|
"tempusdominus-core": "^5.0.3"
|
||||||
|
@ -67,7 +67,7 @@
|
||||||
"@testing-library/react-hooks": "^7.0.1",
|
"@testing-library/react-hooks": "^7.0.1",
|
||||||
"@types/enzyme": "^3.10.10",
|
"@types/enzyme": "^3.10.10",
|
||||||
"@types/flot": "0.0.32",
|
"@types/flot": "0.0.32",
|
||||||
"@types/jest": "^27.0.1",
|
"@types/jest": "^27.0.2",
|
||||||
"@types/jquery": "^3.5.8",
|
"@types/jquery": "^3.5.8",
|
||||||
"@types/node": "^16.11.7",
|
"@types/node": "^16.11.7",
|
||||||
"@types/react": "^17.0.35",
|
"@types/react": "^17.0.35",
|
||||||
|
|
Loading…
Reference in a new issue