mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-15 01:54:06 -08:00
Merge pull request #221: Update prometheus to af0f6da5
Update prometheus to `af0f6da5`
This commit is contained in:
commit
19bfb1e6e1
|
@ -127,7 +127,7 @@ func (Chunk_Encoding) EnumDescriptor() ([]byte, []int) {
|
|||
|
||||
type MetricMetadata struct {
|
||||
// Represents the metric type, these match the set from Prometheus.
|
||||
// Refer to pkg/textparse/interface.go for details.
|
||||
// Refer to model/textparse/interface.go for details.
|
||||
Type MetricMetadata_MetricType `protobuf:"varint,1,opt,name=type,proto3,enum=prometheus.MetricMetadata_MetricType" json:"type,omitempty"`
|
||||
MetricFamilyName string `protobuf:"bytes,2,opt,name=metric_family_name,json=metricFamilyName,proto3" json:"metric_family_name,omitempty"`
|
||||
Help string `protobuf:"bytes,4,opt,name=help,proto3" json:"help,omitempty"`
|
||||
|
@ -200,7 +200,7 @@ func (m *MetricMetadata) GetUnit() string {
|
|||
|
||||
type Sample struct {
|
||||
Value float64 `protobuf:"fixed64,1,opt,name=value,proto3" json:"value,omitempty"`
|
||||
// timestamp is in ms format, see pkg/timestamp/timestamp.go for
|
||||
// timestamp is in ms format, see model/timestamp/timestamp.go for
|
||||
// conversion from time.Time to Prometheus timestamp.
|
||||
Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
|
||||
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
||||
|
@ -259,7 +259,7 @@ type Exemplar struct {
|
|||
// Optional, can be empty.
|
||||
Labels []Label `protobuf:"bytes,1,rep,name=labels,proto3" json:"labels"`
|
||||
Value float64 `protobuf:"fixed64,2,opt,name=value,proto3" json:"value,omitempty"`
|
||||
// timestamp is in ms format, see pkg/timestamp/timestamp.go for
|
||||
// timestamp is in ms format, see model/timestamp/timestamp.go for
|
||||
// conversion from time.Time to Prometheus timestamp.
|
||||
Timestamp int64 `protobuf:"varint,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
|
||||
XXX_NoUnkeyedLiteral struct{} `json:"-"`
|
||||
|
|
|
@ -31,7 +31,7 @@ message MetricMetadata {
|
|||
}
|
||||
|
||||
// Represents the metric type, these match the set from Prometheus.
|
||||
// Refer to pkg/textparse/interface.go for details.
|
||||
// Refer to model/textparse/interface.go for details.
|
||||
MetricType type = 1;
|
||||
string metric_family_name = 2;
|
||||
string help = 4;
|
||||
|
@ -40,7 +40,7 @@ message MetricMetadata {
|
|||
|
||||
message Sample {
|
||||
double value = 1;
|
||||
// timestamp is in ms format, see pkg/timestamp/timestamp.go for
|
||||
// timestamp is in ms format, see model/timestamp/timestamp.go for
|
||||
// conversion from time.Time to Prometheus timestamp.
|
||||
int64 timestamp = 2;
|
||||
}
|
||||
|
@ -49,7 +49,7 @@ message Exemplar {
|
|||
// Optional, can be empty.
|
||||
repeated Label labels = 1 [(gogoproto.nullable) = false];
|
||||
double value = 2;
|
||||
// timestamp is in ms format, see pkg/timestamp/timestamp.go for
|
||||
// timestamp is in ms format, see model/timestamp/timestamp.go for
|
||||
// conversion from time.Time to Prometheus timestamp.
|
||||
int64 timestamp = 3;
|
||||
}
|
||||
|
|
|
@ -440,6 +440,8 @@ func (db *DB) loadWAL(r *wal.Reader, multiRef map[chunks.HeadSeriesRef]chunks.He
|
|||
decoded <- samples
|
||||
case record.Tombstones, record.Exemplars:
|
||||
// We don't care about tombstones or exemplars during replay.
|
||||
// TODO: If decide to decode exemplars, we should make sure to prepopulate
|
||||
// stripeSeries.exemplars in the next block by using setLatestExemplar.
|
||||
continue
|
||||
default:
|
||||
errCh <- &wal.CorruptionErr{
|
||||
|
@ -789,6 +791,16 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exem
|
|||
}
|
||||
}
|
||||
|
||||
// Check for duplicate vs last stored exemplar for this series, and discard those.
|
||||
// Otherwise, record the current exemplar as the latest.
|
||||
// Prometheus' TSDB returns 0 when encountering duplicates, so we do the same here.
|
||||
prevExemplar := a.series.GetLatestExemplar(s.ref)
|
||||
if prevExemplar != nil && prevExemplar.Equals(e) {
|
||||
// Duplicate, don't return an error but don't accept the exemplar.
|
||||
return 0, nil
|
||||
}
|
||||
a.series.SetLatestExemplar(s.ref, &e)
|
||||
|
||||
a.pendingExamplars = append(a.pendingExamplars, record.RefExemplar{
|
||||
Ref: s.ref,
|
||||
T: e.Ts,
|
||||
|
@ -796,6 +808,7 @@ func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exem
|
|||
Labels: e.Labels,
|
||||
})
|
||||
|
||||
a.metrics.totalAppendedExemplars.Inc()
|
||||
return storage.SeriesRef(s.ref), nil
|
||||
}
|
||||
|
||||
|
|
|
@ -129,7 +129,7 @@ func TestCommit(t *testing.T) {
|
|||
|
||||
e := exemplar.Exemplar{
|
||||
Labels: lset,
|
||||
Ts: sample[0].T(),
|
||||
Ts: sample[0].T() + int64(i),
|
||||
Value: sample[0].V(),
|
||||
HasTs: true,
|
||||
}
|
||||
|
@ -482,3 +482,56 @@ func gatherFamily(t *testing.T, reg prometheus.Gatherer, familyName string) *dto
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestStorage_DuplicateExemplarsIgnored(t *testing.T) {
|
||||
s := createTestAgentDB(t, nil, DefaultOptions())
|
||||
app := s.Appender(context.Background())
|
||||
defer s.Close()
|
||||
|
||||
sRef, err := app.Append(0, labels.Labels{{Name: "a", Value: "1"}}, 0, 0)
|
||||
require.NoError(t, err, "should not reject valid series")
|
||||
|
||||
// Write a few exemplars to our appender and call Commit().
|
||||
// If the Labels, Value or Timestamp are different than the last exemplar,
|
||||
// then a new one should be appended; Otherwise, it should be skipped.
|
||||
e := exemplar.Exemplar{Labels: labels.Labels{{Name: "a", Value: "1"}}, Value: 20, Ts: 10, HasTs: true}
|
||||
_, _ = app.AppendExemplar(sRef, nil, e)
|
||||
_, _ = app.AppendExemplar(sRef, nil, e)
|
||||
|
||||
e.Labels = labels.Labels{{Name: "b", Value: "2"}}
|
||||
_, _ = app.AppendExemplar(sRef, nil, e)
|
||||
_, _ = app.AppendExemplar(sRef, nil, e)
|
||||
_, _ = app.AppendExemplar(sRef, nil, e)
|
||||
|
||||
e.Value = 42
|
||||
_, _ = app.AppendExemplar(sRef, nil, e)
|
||||
_, _ = app.AppendExemplar(sRef, nil, e)
|
||||
|
||||
e.Ts = 25
|
||||
_, _ = app.AppendExemplar(sRef, nil, e)
|
||||
_, _ = app.AppendExemplar(sRef, nil, e)
|
||||
|
||||
require.NoError(t, app.Commit())
|
||||
|
||||
// Read back what was written to the WAL.
|
||||
var walExemplarsCount int
|
||||
sr, err := wal.NewSegmentsReader(s.wal.Dir())
|
||||
require.NoError(t, err)
|
||||
defer sr.Close()
|
||||
r := wal.NewReader(sr)
|
||||
|
||||
var dec record.Decoder
|
||||
for r.Next() {
|
||||
rec := r.Record()
|
||||
switch dec.Type(rec) {
|
||||
case record.Exemplars:
|
||||
var exemplars []record.RefExemplar
|
||||
exemplars, err = dec.Exemplars(rec, exemplars)
|
||||
require.NoError(t, err)
|
||||
walExemplarsCount += len(exemplars)
|
||||
}
|
||||
}
|
||||
|
||||
// We had 9 calls to AppendExemplar but only 4 of those should have gotten through.
|
||||
require.Equal(t, 4, walExemplarsCount)
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ package agent
|
|||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/prometheus/prometheus/model/exemplar"
|
||||
"github.com/prometheus/prometheus/model/labels"
|
||||
"github.com/prometheus/prometheus/tsdb/chunks"
|
||||
)
|
||||
|
@ -89,10 +90,11 @@ func (m seriesHashmap) Delete(hash uint64, ref chunks.HeadSeriesRef) {
|
|||
// Filling the padded space with the maps was profiled to be slower -
|
||||
// likely due to the additional pointer dereferences.
|
||||
type stripeSeries struct {
|
||||
size int
|
||||
series []map[chunks.HeadSeriesRef]*memSeries
|
||||
hashes []seriesHashmap
|
||||
locks []stripeLock
|
||||
size int
|
||||
series []map[chunks.HeadSeriesRef]*memSeries
|
||||
hashes []seriesHashmap
|
||||
exemplars []map[chunks.HeadSeriesRef]*exemplar.Exemplar
|
||||
locks []stripeLock
|
||||
|
||||
gcMut sync.Mutex
|
||||
}
|
||||
|
@ -105,10 +107,11 @@ type stripeLock struct {
|
|||
|
||||
func newStripeSeries(stripeSize int) *stripeSeries {
|
||||
s := &stripeSeries{
|
||||
size: stripeSize,
|
||||
series: make([]map[chunks.HeadSeriesRef]*memSeries, stripeSize),
|
||||
hashes: make([]seriesHashmap, stripeSize),
|
||||
locks: make([]stripeLock, stripeSize),
|
||||
size: stripeSize,
|
||||
series: make([]map[chunks.HeadSeriesRef]*memSeries, stripeSize),
|
||||
hashes: make([]seriesHashmap, stripeSize),
|
||||
exemplars: make([]map[chunks.HeadSeriesRef]*exemplar.Exemplar, stripeSize),
|
||||
locks: make([]stripeLock, stripeSize),
|
||||
}
|
||||
for i := range s.series {
|
||||
s.series[i] = map[chunks.HeadSeriesRef]*memSeries{}
|
||||
|
@ -116,6 +119,9 @@ func newStripeSeries(stripeSize int) *stripeSeries {
|
|||
for i := range s.hashes {
|
||||
s.hashes[i] = seriesHashmap{}
|
||||
}
|
||||
for i := range s.exemplars {
|
||||
s.exemplars[i] = map[chunks.HeadSeriesRef]*exemplar.Exemplar{}
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
|
@ -154,6 +160,10 @@ func (s *stripeSeries) GC(mint int64) map[chunks.HeadSeriesRef]struct{} {
|
|||
delete(s.series[refLock], series.ref)
|
||||
s.hashes[hashLock].Delete(hash, series.ref)
|
||||
|
||||
// Since the series is gone, we'll also delete
|
||||
// the latest stored exemplar.
|
||||
delete(s.exemplars[refLock], series.ref)
|
||||
|
||||
if hashLock != refLock {
|
||||
s.locks[refLock].Unlock()
|
||||
}
|
||||
|
@ -201,3 +211,24 @@ func (s *stripeSeries) Set(hash uint64, series *memSeries) {
|
|||
s.hashes[hashLock].Set(hash, series)
|
||||
s.locks[hashLock].Unlock()
|
||||
}
|
||||
|
||||
func (s *stripeSeries) GetLatestExemplar(ref chunks.HeadSeriesRef) *exemplar.Exemplar {
|
||||
i := uint64(ref) & uint64(s.size-1)
|
||||
|
||||
s.locks[i].RLock()
|
||||
exemplar := s.exemplars[i][ref]
|
||||
s.locks[i].RUnlock()
|
||||
|
||||
return exemplar
|
||||
}
|
||||
|
||||
func (s *stripeSeries) SetLatestExemplar(ref chunks.HeadSeriesRef, exemplar *exemplar.Exemplar) {
|
||||
i := uint64(ref) & uint64(s.size-1)
|
||||
|
||||
// Make sure that's a valid series id and record its latest exemplar
|
||||
s.locks[i].Lock()
|
||||
if s.series[i][ref] != nil {
|
||||
s.exemplars[i][ref] = exemplar
|
||||
}
|
||||
s.locks[i].Unlock()
|
||||
}
|
||||
|
|
|
@ -530,7 +530,12 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
|
|||
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, maxNextAt)
|
||||
s.nextAt = addJitterToChunkEndTime(s.hash, c.minTime, s.nextAt, maxNextAt, s.chunkEndTimeVariance)
|
||||
}
|
||||
if t >= s.nextAt {
|
||||
// If numSamples > samplesPerChunk*2 then our previous prediction was invalid,
|
||||
// most likely because samples rate has changed and now they are arriving more frequently.
|
||||
// Since we assume that the rate is higher, we're being conservative and cutting at 2*samplesPerChunk
|
||||
// as we expect more chunks to come.
|
||||
// Note that next chunk will have its nextAt recalculated for the new rate.
|
||||
if t >= s.nextAt || numSamples >= samplesPerChunk*2 {
|
||||
c = s.cutNewHeadChunk(t, chunkDiskMapper)
|
||||
chunkCreated = true
|
||||
}
|
||||
|
|
|
@ -1313,6 +1313,51 @@ func TestMemSeries_append(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestMemSeries_append_atVariableRate(t *testing.T) {
|
||||
const samplesPerChunk = 120
|
||||
dir := t.TempDir()
|
||||
// This is usually taken from the Head, but passing manually here.
|
||||
chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, chunkDiskMapper.Close())
|
||||
})
|
||||
|
||||
lbls := labels.Labels{}
|
||||
s := newMemSeries(lbls, 1, lbls.Hash(), DefaultBlockDuration, 0, nil, defaultIsolationDisabled)
|
||||
|
||||
// At this slow rate, we will fill the chunk in two block durations.
|
||||
slowRate := (DefaultBlockDuration * 2) / samplesPerChunk
|
||||
|
||||
var nextTs int64
|
||||
var totalAppendedSamples int
|
||||
for i := 0; i < samplesPerChunk/4; i++ {
|
||||
_, ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper)
|
||||
require.Truef(t, ok, "slow sample %d was not appended", i)
|
||||
nextTs += slowRate
|
||||
totalAppendedSamples++
|
||||
}
|
||||
require.Equal(t, DefaultBlockDuration, s.nextAt, "after appending a samplesPerChunk/4 samples at a slow rate, we should aim to cut a new block at the default block duration %d, but it's set to %d", DefaultBlockDuration, s.nextAt)
|
||||
|
||||
// Suddenly, the rate increases and we receive a sample every millisecond.
|
||||
for i := 0; i < math.MaxUint16; i++ {
|
||||
_, ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper)
|
||||
require.Truef(t, ok, "quick sample %d was not appended", i)
|
||||
nextTs++
|
||||
totalAppendedSamples++
|
||||
}
|
||||
_, ok, chunkCreated := s.append(DefaultBlockDuration, float64(0), 0, chunkDiskMapper)
|
||||
require.True(t, ok, "new chunk sample was not appended")
|
||||
require.True(t, chunkCreated, "sample at block duration timestamp should create a new chunk")
|
||||
|
||||
var totalSamplesInChunks int
|
||||
for i, c := range s.mmappedChunks {
|
||||
totalSamplesInChunks += int(c.numSamples)
|
||||
require.LessOrEqualf(t, c.numSamples, uint16(2*samplesPerChunk), "mmapped chunk %d has more than %d samples", i, 2*samplesPerChunk)
|
||||
}
|
||||
require.Equal(t, totalAppendedSamples, totalSamplesInChunks, "wrong number of samples in %d mmapped chunks", len(s.mmappedChunks))
|
||||
}
|
||||
|
||||
func TestGCChunkAccess(t *testing.T) {
|
||||
// Put a chunk, select it. GC it and then access it.
|
||||
h, _ := newTestHead(t, 1000, false)
|
||||
|
|
Loading…
Reference in a new issue