mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
De-flag preloadChunksForRange
Now there is preloadChunksForRange and preloadChunksForInstant in both, the series and the storage.
This commit is contained in:
parent
dcb7c0d3ee
commit
beb36df4bb
|
@ -30,9 +30,9 @@ func (p *memorySeriesPreloader) PreloadRange(
|
||||||
fp model.Fingerprint,
|
fp model.Fingerprint,
|
||||||
from model.Time, through model.Time,
|
from model.Time, through model.Time,
|
||||||
) (SeriesIterator, error) {
|
) (SeriesIterator, error) {
|
||||||
cds, iter, err := p.storage.preloadChunksForRange(fp, from, through, false)
|
cds, iter, err := p.storage.preloadChunksForRange(fp, from, through)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return iter, err
|
return nil, err
|
||||||
}
|
}
|
||||||
p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...)
|
p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...)
|
||||||
return iter, nil
|
return iter, nil
|
||||||
|
@ -43,7 +43,7 @@ func (p *memorySeriesPreloader) PreloadInstant(
|
||||||
fp model.Fingerprint,
|
fp model.Fingerprint,
|
||||||
timestamp model.Time, stalenessDelta time.Duration,
|
timestamp model.Time, stalenessDelta time.Duration,
|
||||||
) (SeriesIterator, error) {
|
) (SeriesIterator, error) {
|
||||||
cds, iter, err := p.storage.preloadChunksForRange(fp, timestamp.Add(-stalenessDelta), timestamp, true)
|
cds, iter, err := p.storage.preloadChunksForInstant(fp, timestamp.Add(-stalenessDelta), timestamp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -383,19 +383,18 @@ func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc) SeriesIterator
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// preloadChunksForRange loads chunks for the given range from the persistence.
|
// preloadChunksForInstant preloads chunks for the latest value in the given
|
||||||
// The caller must have locked the fingerprint of the series.
|
// range. If the last sample saved in the memorySeries itself is the latest
|
||||||
func (s *memorySeries) preloadChunksForRange(
|
// value in the given range, it will in fact preload zero chunks and just take
|
||||||
|
// that value.
|
||||||
|
func (s *memorySeries) preloadChunksForInstant(
|
||||||
fp model.Fingerprint,
|
fp model.Fingerprint,
|
||||||
from model.Time, through model.Time,
|
from model.Time, through model.Time,
|
||||||
lastSampleOnly bool,
|
|
||||||
mss *memorySeriesStorage,
|
mss *memorySeriesStorage,
|
||||||
) ([]*chunkDesc, SeriesIterator, error) {
|
) ([]*chunkDesc, SeriesIterator, error) {
|
||||||
// If we have to preload for only one sample, and we have a
|
// If we have a lastSamplePair in the series, and thas last samplePair
|
||||||
// lastSamplePair in the series, and thas last samplePair is in the
|
// is in the interval, just take it in a singleSampleSeriesIterator. No
|
||||||
// interval, just take it in a singleSampleSeriesIterator. No need to
|
// need to pin or load anything.
|
||||||
// pin or load anything.
|
|
||||||
if lastSampleOnly {
|
|
||||||
lastSample := s.lastSamplePair()
|
lastSample := s.lastSamplePair()
|
||||||
if !through.Before(lastSample.Timestamp) &&
|
if !through.Before(lastSample.Timestamp) &&
|
||||||
!from.After(lastSample.Timestamp) &&
|
!from.After(lastSample.Timestamp) &&
|
||||||
|
@ -406,7 +405,18 @@ func (s *memorySeries) preloadChunksForRange(
|
||||||
}
|
}
|
||||||
return nil, iter, nil
|
return nil, iter, nil
|
||||||
}
|
}
|
||||||
|
// If we are here, we are out of luck and have to delegate to the more
|
||||||
|
// expensive method.
|
||||||
|
return s.preloadChunksForRange(fp, from, through, mss)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// preloadChunksForRange loads chunks for the given range from the persistence.
|
||||||
|
// The caller must have locked the fingerprint of the series.
|
||||||
|
func (s *memorySeries) preloadChunksForRange(
|
||||||
|
fp model.Fingerprint,
|
||||||
|
from model.Time, through model.Time,
|
||||||
|
mss *memorySeriesStorage,
|
||||||
|
) ([]*chunkDesc, SeriesIterator, error) {
|
||||||
firstChunkDescTime := model.Latest
|
firstChunkDescTime := model.Latest
|
||||||
if len(s.chunkDescs) > 0 {
|
if len(s.chunkDescs) > 0 {
|
||||||
firstChunkDescTime = s.chunkDescs[0].firstTime()
|
firstChunkDescTime = s.chunkDescs[0].firstTime()
|
||||||
|
|
|
@ -688,35 +688,58 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me
|
||||||
return series
|
return series
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *memorySeriesStorage) getSeriesForRange(
|
||||||
|
fp model.Fingerprint,
|
||||||
|
from model.Time, through model.Time,
|
||||||
|
) (*memorySeries, error) {
|
||||||
|
series, ok := s.fpToSeries.get(fp)
|
||||||
|
if ok {
|
||||||
|
return series, nil
|
||||||
|
}
|
||||||
|
has, first, last, err := s.persistence.hasArchivedMetric(fp)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if !has {
|
||||||
|
s.invalidPreloadRequestsCount.Inc()
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
if last.Before(from) || first.After(through) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
metric, err := s.persistence.archivedMetric(fp)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return s.getOrCreateSeries(fp, metric), nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *memorySeriesStorage) preloadChunksForRange(
|
func (s *memorySeriesStorage) preloadChunksForRange(
|
||||||
fp model.Fingerprint,
|
fp model.Fingerprint,
|
||||||
from model.Time, through model.Time,
|
from model.Time, through model.Time,
|
||||||
lastSampleOnly bool,
|
|
||||||
) ([]*chunkDesc, SeriesIterator, error) {
|
) ([]*chunkDesc, SeriesIterator, error) {
|
||||||
s.fpLocker.Lock(fp)
|
s.fpLocker.Lock(fp)
|
||||||
defer s.fpLocker.Unlock(fp)
|
defer s.fpLocker.Unlock(fp)
|
||||||
|
|
||||||
series, ok := s.fpToSeries.get(fp)
|
series, err := s.getSeriesForRange(fp, from, through)
|
||||||
if !ok {
|
if err != nil || series == nil {
|
||||||
has, first, last, err := s.persistence.hasArchivedMetric(fp)
|
|
||||||
if err != nil {
|
|
||||||
return nil, nopIter, err
|
return nil, nopIter, err
|
||||||
}
|
}
|
||||||
if !has {
|
return series.preloadChunksForRange(fp, from, through, s)
|
||||||
s.invalidPreloadRequestsCount.Inc()
|
|
||||||
return nil, nopIter, nil
|
|
||||||
}
|
}
|
||||||
if from.Before(last) && through.After(first) {
|
|
||||||
metric, err := s.persistence.archivedMetric(fp)
|
func (s *memorySeriesStorage) preloadChunksForInstant(
|
||||||
if err != nil {
|
fp model.Fingerprint,
|
||||||
|
from model.Time, through model.Time,
|
||||||
|
) ([]*chunkDesc, SeriesIterator, error) {
|
||||||
|
s.fpLocker.Lock(fp)
|
||||||
|
defer s.fpLocker.Unlock(fp)
|
||||||
|
|
||||||
|
series, err := s.getSeriesForRange(fp, from, through)
|
||||||
|
if err != nil || series == nil {
|
||||||
return nil, nopIter, err
|
return nil, nopIter, err
|
||||||
}
|
}
|
||||||
series = s.getOrCreateSeries(fp, metric)
|
return series.preloadChunksForInstant(fp, from, through, s)
|
||||||
} else {
|
|
||||||
return nil, nopIter, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return series.preloadChunksForRange(fp, from, through, lastSampleOnly, s)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *memorySeriesStorage) handleEvictList() {
|
func (s *memorySeriesStorage) handleEvictList() {
|
||||||
|
|
|
@ -500,7 +500,7 @@ func TestDropMetrics(t *testing.T) {
|
||||||
t.Errorf("unexpected number of fingerprints: %d", len(fps2))
|
t.Errorf("unexpected number of fingerprints: %d", len(fps2))
|
||||||
}
|
}
|
||||||
|
|
||||||
_, it, err := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest, false)
|
_, it, err := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error preloading everything: %s", err)
|
t.Fatalf("Error preloading everything: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -508,7 +508,7 @@ func TestDropMetrics(t *testing.T) {
|
||||||
t.Errorf("unexpected number of samples: %d", len(vals))
|
t.Errorf("unexpected number of samples: %d", len(vals))
|
||||||
}
|
}
|
||||||
|
|
||||||
_, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest, false)
|
_, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error preloading everything: %s", err)
|
t.Fatalf("Error preloading everything: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -533,7 +533,7 @@ func TestDropMetrics(t *testing.T) {
|
||||||
t.Errorf("unexpected number of fingerprints: %d", len(fps3))
|
t.Errorf("unexpected number of fingerprints: %d", len(fps3))
|
||||||
}
|
}
|
||||||
|
|
||||||
_, it, err = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest, false)
|
_, it, err = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error preloading everything: %s", err)
|
t.Fatalf("Error preloading everything: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -541,7 +541,7 @@ func TestDropMetrics(t *testing.T) {
|
||||||
t.Errorf("unexpected number of samples: %d", len(vals))
|
t.Errorf("unexpected number of samples: %d", len(vals))
|
||||||
}
|
}
|
||||||
|
|
||||||
_, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest, false)
|
_, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error preloading everything: %s", err)
|
t.Fatalf("Error preloading everything: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -670,7 +670,7 @@ func testValueAtOrBeforeTime(t *testing.T, encoding chunkEncoding) {
|
||||||
|
|
||||||
fp := model.Metric{}.FastFingerprint()
|
fp := model.Metric{}.FastFingerprint()
|
||||||
|
|
||||||
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
|
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error preloading everything: %s", err)
|
t.Fatalf("Error preloading everything: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -747,7 +747,7 @@ func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunkEncoding) {
|
||||||
|
|
||||||
fp := model.Metric{}.FastFingerprint()
|
fp := model.Metric{}.FastFingerprint()
|
||||||
|
|
||||||
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
|
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatalf("Error preloading everything: %s", err)
|
b.Fatalf("Error preloading everything: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -828,7 +828,7 @@ func testRangeValues(t *testing.T, encoding chunkEncoding) {
|
||||||
|
|
||||||
fp := model.Metric{}.FastFingerprint()
|
fp := model.Metric{}.FastFingerprint()
|
||||||
|
|
||||||
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
|
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error preloading everything: %s", err)
|
t.Fatalf("Error preloading everything: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -983,7 +983,7 @@ func benchmarkRangeValues(b *testing.B, encoding chunkEncoding) {
|
||||||
|
|
||||||
fp := model.Metric{}.FastFingerprint()
|
fp := model.Metric{}.FastFingerprint()
|
||||||
|
|
||||||
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
|
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
b.Fatalf("Error preloading everything: %s", err)
|
b.Fatalf("Error preloading everything: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -1032,7 +1032,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
|
||||||
|
|
||||||
// Drop ~half of the chunks.
|
// Drop ~half of the chunks.
|
||||||
s.maintainMemorySeries(fp, 10000)
|
s.maintainMemorySeries(fp, 10000)
|
||||||
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
|
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error preloading everything: %s", err)
|
t.Fatalf("Error preloading everything: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -1053,7 +1053,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
|
||||||
|
|
||||||
// Drop everything.
|
// Drop everything.
|
||||||
s.maintainMemorySeries(fp, 100000)
|
s.maintainMemorySeries(fp, 100000)
|
||||||
_, it, err = s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
|
_, it, err = s.preloadChunksForRange(fp, model.Earliest, model.Latest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error preloading everything: %s", err)
|
t.Fatalf("Error preloading everything: %s", err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue