// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package local

import (
	"sort"
	"sync"
	"sync/atomic"

	clientmodel "github.com/prometheus/client_golang/model"

	"github.com/prometheus/prometheus/storage/metric"
)

// chunkDescEvictionFactor is a factor used for chunkDesc eviction (as opposed
// to evictions of chunks, see method evictOlderThan). A chunk takes about 20x
// more memory than a chunkDesc. With a chunkDescEvictionFactor of 10, not more
// than a third of the total memory taken by a series will be used for
// chunkDescs.
const chunkDescEvictionFactor = 10
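
// To illustrate the "a third" figure above (a rough estimate, not a hard
// guarantee): with n non-evicted chunks, at most 10n chunkDescs are kept.
// Measured in chunkDesc-sized units, that is roughly 20n of chunk memory plus
// 10n of chunkDesc memory, so chunkDescs account for at most 10n/30n = 1/3.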

// fingerprintSeriesPair pairs a fingerprint with a memorySeries pointer.
type fingerprintSeriesPair struct {
	fp     clientmodel.Fingerprint
	series *memorySeries
}

// seriesMap maps fingerprints to memory series. All its methods are
// goroutine-safe. A seriesMap is effectively a goroutine-safe version of
// map[clientmodel.Fingerprint]*memorySeries.
type seriesMap struct {
	mtx sync.RWMutex
	m   map[clientmodel.Fingerprint]*memorySeries
}

// newSeriesMap returns a newly allocated empty seriesMap. To create a seriesMap
// based on a prefilled map, use an explicit initializer.
func newSeriesMap() *seriesMap {
	return &seriesMap{m: make(map[clientmodel.Fingerprint]*memorySeries)}
}

// length returns the number of mappings in the seriesMap.
func (sm *seriesMap) length() int {
	sm.mtx.RLock()
	defer sm.mtx.RUnlock()

	return len(sm.m)
}

// get returns a memorySeries for a fingerprint. Return values have the same
// semantics as the native Go map.
func (sm *seriesMap) get(fp clientmodel.Fingerprint) (s *memorySeries, ok bool) {
	sm.mtx.RLock()
	defer sm.mtx.RUnlock()

	s, ok = sm.m[fp]
	return
}

// put adds a mapping to the seriesMap. It panics if s == nil.
func (sm *seriesMap) put(fp clientmodel.Fingerprint, s *memorySeries) {
	sm.mtx.Lock()
	defer sm.mtx.Unlock()

	if s == nil {
		panic("tried to add nil pointer to seriesMap")
	}
	sm.m[fp] = s
}

// del removes a mapping from the seriesMap.
func (sm *seriesMap) del(fp clientmodel.Fingerprint) {
	sm.mtx.Lock()
	defer sm.mtx.Unlock()

	delete(sm.m, fp)
}

// iter returns a channel that produces all mappings in the seriesMap. The
// channel will be closed once all mappings have been received. Not
// consuming all mappings from the channel will leak a goroutine. The
// semantics of concurrent modification of seriesMap are the same as for
// iterating over a map with a 'range' clause. However, if the next element
// in iteration order is removed after the current element has been received
// from the channel, it will still be produced by the channel.
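//
// A minimal consumption sketch; every element must be received, or the sending
// goroutine leaks (the blank identifiers just stand in for real work):
//
//	for pair := range sm.iter() {
//	    _, _ = pair.fp, pair.series
//	}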
func (sm *seriesMap) iter() <-chan fingerprintSeriesPair {
	ch := make(chan fingerprintSeriesPair)
	go func() {
		sm.mtx.RLock()
		for fp, s := range sm.m {
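			// Release the read lock while blocking on the channel send
			// (the consumer may be slow), then re-acquire it before the
			// range loop advances.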
			sm.mtx.RUnlock()
			ch <- fingerprintSeriesPair{fp, s}
			sm.mtx.RLock()
		}
		sm.mtx.RUnlock()
		close(ch)
	}()
	return ch
}

// fpIter returns a channel that produces all fingerprints in the seriesMap. The
// channel will be closed once all fingerprints have been received. Not
// consuming all fingerprints from the channel will leak a goroutine. The
// semantics of concurrent modification of seriesMap are the same as for
// iterating over a map with a 'range' clause. However, if the next element
// in iteration order is removed after the current element has been received
// from the channel, it will still be produced by the channel.
func (sm *seriesMap) fpIter() <-chan clientmodel.Fingerprint {
	ch := make(chan clientmodel.Fingerprint)
	go func() {
		sm.mtx.RLock()
		for fp := range sm.m {
			sm.mtx.RUnlock()
			ch <- fp
			sm.mtx.RLock()
		}
		sm.mtx.RUnlock()
		close(ch)
	}()
	return ch
}

type memorySeries struct {
	metric clientmodel.Metric
	// Sorted by start time; overlapping chunk ranges are forbidden.
	chunkDescs []*chunkDesc
	// The chunkDescs in memory might not have all the chunkDescs for the
	// chunks that are persisted to disk. The missing chunkDescs are all
	// contiguous and at the tail end. chunkDescsOffset is the index of the
	// chunk on disk that corresponds to the first chunkDesc in memory. If
	// it is 0, the chunkDescs are all loaded. A value of -1 denotes a
	// special case: There are chunks on disk, but the offset to the
	// chunkDescs in memory is unknown. Also, there is no overlap between
	// chunks on disk and chunks in memory (implying that upon first
	// persisting of a chunk in memory, the offset has to be set).
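	// For illustration (hypothetical numbers): if a series has 7 chunks on
	// disk and only the chunkDescs of the last two are held in memory,
	// chunkDescsOffset is 5.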
	chunkDescsOffset int
	// The savedFirstTime field is used as a fallback when the
	// chunkDescsOffset is not 0. It can be used to save the firstTime of the
	// first chunk before its chunkDesc is evicted. If in doubt, this field
	// is just set to the oldest possible timestamp.
	savedFirstTime clientmodel.Timestamp
	// Whether the current head chunk has already been scheduled to be
	// persisted. If true, the current head chunk must not be modified
	// anymore.
	headChunkPersisted bool
	// Whether the current head chunk is used by an iterator. In that case,
	// a non-persisted head chunk has to be cloned before more samples are
	// appended.
	headChunkUsedByIterator bool
	// Which type of chunk to create if a new chunk is needed.
	chunkType byte
}

// newMemorySeries returns a pointer to a newly allocated memorySeries for the
// given metric. reallyNew defines if the memorySeries is a genuinely new series
// or (if false) a series for a metric being unarchived, i.e. a series that
// existed before but has been evicted from memory. If reallyNew is true,
// firstTime is ignored (and set to the lowest possible timestamp instead - it
// will be set properly upon the first eviction of chunkDescs). chunkType is the
// type of chunks newly created by this memorySeries.
func newMemorySeries(
	m clientmodel.Metric,
	reallyNew bool,
	firstTime clientmodel.Timestamp,
	chunkType byte,
) *memorySeries {
	if reallyNew {
		firstTime = clientmodel.Earliest
	}
	s := memorySeries{
		metric:             m,
		headChunkPersisted: !reallyNew,
		savedFirstTime:     firstTime,
		chunkType:          chunkType,
	}
	if !reallyNew {
		s.chunkDescsOffset = -1
	}
	return &s
}

// add adds a sample pair to the series.
// It returns chunkDescs that must be queued to be persisted.
// The caller must have locked the fingerprint of the series.
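//
// A minimal caller sketch (persistQueue is a placeholder for whatever queueing
// mechanism the caller uses; it is not defined in this file):
//
//	for _, cd := range series.add(fp, sample) {
//	    persistQueue <- cd
//	}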
func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair) []*chunkDesc {
	if len(s.chunkDescs) == 0 || s.headChunkPersisted {
		newHead := newChunkDesc(chunkForType(s.chunkType))
		s.chunkDescs = append(s.chunkDescs, newHead)
		s.headChunkPersisted = false
	} else if s.headChunkUsedByIterator && s.head().getRefCount() > 1 {
		// We only need to clone the head chunk if the current head
		// chunk was used in an iterator at all and if the refCount is
		// still greater than the 1 we always have because the head
		// chunk is not yet persisted. The latter is just an
		// approximation. We will still clone unnecessarily if an older
		// iterator using a previous version of the head chunk is still
		// around and keeps the head chunk pinned. We would need to
		// track pins by version of the head chunk, which is probably
		// not worth the effort.
		chunkOps.WithLabelValues(clone).Inc()
		// No locking needed here because a non-persisted head chunk can
		// not get evicted concurrently.
		s.head().chunk = s.head().chunk.clone()
		s.headChunkUsedByIterator = false
	}

	chunks := s.head().add(v)
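	// Note: add may have returned a replacement for the head chunk (e.g. a
	// transcoded version using a different encoding), which is why the head
	// chunkDesc is re-pointed at chunks[0] below; the details depend on the
	// chunk implementation.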
	s.head().chunk = chunks[0]

	var chunkDescsToPersist []*chunkDesc
	if len(chunks) > 1 {
		chunkDescsToPersist = append(chunkDescsToPersist, s.head())
		for i, c := range chunks[1:] {
			cd := newChunkDesc(c)
			s.chunkDescs = append(s.chunkDescs, cd)
			// The last chunk is still growing.
			if i < len(chunks[1:])-1 {
				chunkDescsToPersist = append(chunkDescsToPersist, cd)
			}
		}
	}
	return chunkDescsToPersist
}

// evictChunkDescs evicts chunkDescs if there are chunkDescEvictionFactor times
// more than non-evicted chunks. iOldestNotEvicted is the index within the
// current chunkDescs of the oldest chunk that is not evicted.
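//
// For example (illustrative numbers only): with 100 chunkDescs and the oldest
// non-evicted chunk at index 98, lenToKeep below is 10*(100-98) = 20, so the
// 80 oldest chunkDescs are evicted.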
func (s *memorySeries) evictChunkDescs(iOldestNotEvicted int) {
	lenToKeep := chunkDescEvictionFactor * (len(s.chunkDescs) - iOldestNotEvicted)
	if lenToKeep < len(s.chunkDescs) {
		s.savedFirstTime = s.firstTime()
		lenEvicted := len(s.chunkDescs) - lenToKeep
		s.chunkDescsOffset += lenEvicted
		chunkDescOps.WithLabelValues(evict).Add(float64(lenEvicted))
		numMemChunkDescs.Sub(float64(lenEvicted))
		s.chunkDescs = append(
			make([]*chunkDesc, 0, lenToKeep),
			s.chunkDescs[lenEvicted:]...,
		)
	}
}

// dropChunks removes chunkDescs older than t. It returns the number of dropped
// chunkDescs and true if all chunkDescs have been dropped.
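// ("Older than t" means the chunkDesc's lastTime is before t; a chunkDesc
// whose chunk still covers t or a later sample is kept.)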
//
// The caller must have locked the fingerprint of the series.
func (s *memorySeries) dropChunks(t clientmodel.Timestamp) (int, bool) {
	keepIdx := len(s.chunkDescs)
	for i, cd := range s.chunkDescs {
		if !cd.lastTime().Before(t) {
			keepIdx = i
			break
		}
	}
	if keepIdx > 0 {
		s.chunkDescs = append(make([]*chunkDesc, 0, len(s.chunkDescs)-keepIdx), s.chunkDescs[keepIdx:]...)
		numMemChunkDescs.Sub(float64(keepIdx))
	}
	return keepIdx, len(s.chunkDescs) == 0
}

// preloadChunks is an internal helper method.
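// It pins the chunkDescs at the given indexes and loads the chunk data for any
// of them that have been evicted from memory back from the persistence layer.
// On error, all pins taken so far are released again.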
func (s *memorySeries) preloadChunks(indexes []int, mss *memorySeriesStorage) ([]*chunkDesc, error) {
	loadIndexes := []int{}
	pinnedChunkDescs := make([]*chunkDesc, 0, len(indexes))
	for _, idx := range indexes {
		cd := s.chunkDescs[idx]
		pinnedChunkDescs = append(pinnedChunkDescs, cd)
		cd.pin(mss.evictRequests) // Have to pin everything first to prevent immediate eviction on chunk loading.
		if cd.isEvicted() {
			loadIndexes = append(loadIndexes, idx)
		}
	}
	chunkOps.WithLabelValues(pin).Add(float64(len(pinnedChunkDescs)))

	if len(loadIndexes) > 0 {
		if s.chunkDescsOffset == -1 {
			panic("requested loading chunks from persistence in a situation where we must not have persisted data for chunk descriptors in memory")
		}
		fp := s.metric.Fingerprint()
		chunks, err := mss.loadChunks(fp, loadIndexes, s.chunkDescsOffset)
		if err != nil {
			// Unpin the chunks since we won't return them as pinned chunks now.
			for _, cd := range pinnedChunkDescs {
				cd.unpin(mss.evictRequests)
			}
			chunkOps.WithLabelValues(unpin).Add(float64(len(pinnedChunkDescs)))
			return nil, err
		}
		for i, c := range chunks {
			s.chunkDescs[loadIndexes[i]].setChunk(c)
		}
		chunkOps.WithLabelValues(load).Add(float64(len(chunks)))
		atomic.AddInt64(&numMemChunks, int64(len(chunks)))
	}

	return pinnedChunkDescs, nil
}

/*
func (s *memorySeries) preloadChunksAtTime(t clientmodel.Timestamp, p *persistence) (chunkDescs, error) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	if len(s.chunkDescs) == 0 {
		return nil, nil
	}

	var pinIndexes []int
	// Find first chunk where lastTime() is after or equal to t.
	i := sort.Search(len(s.chunkDescs), func(i int) bool {
		return !s.chunkDescs[i].lastTime().Before(t)
	})
	switch i {
	case 0:
		pinIndexes = []int{0}
	case len(s.chunkDescs):
		pinIndexes = []int{i - 1}
	default:
		if s.chunkDescs[i].contains(t) {
			pinIndexes = []int{i}
		} else {
			pinIndexes = []int{i - 1, i}
		}
	}

	return s.preloadChunks(pinIndexes, p)
}
*/

// preloadChunksForRange loads chunks for the given range from the persistence.
// The caller must have locked the fingerprint of the series.
func (s *memorySeries) preloadChunksForRange(
	from clientmodel.Timestamp, through clientmodel.Timestamp,
	fp clientmodel.Fingerprint, mss *memorySeriesStorage,
) ([]*chunkDesc, error) {
	firstChunkDescTime := clientmodel.Latest
	if len(s.chunkDescs) > 0 {
		firstChunkDescTime = s.chunkDescs[0].firstTime()
	}
	if s.chunkDescsOffset != 0 && from.Before(firstChunkDescTime) {
		cds, err := mss.loadChunkDescs(fp, firstChunkDescTime)
		if err != nil {
			return nil, err
		}
		s.chunkDescs = append(cds, s.chunkDescs...)
		s.chunkDescsOffset = 0
	}

	if len(s.chunkDescs) == 0 {
		return nil, nil
	}

	// Find first chunk with start time after "from".
	fromIdx := sort.Search(len(s.chunkDescs), func(i int) bool {
		return s.chunkDescs[i].firstTime().After(from)
	})
	// Find first chunk with start time after "through".
	throughIdx := sort.Search(len(s.chunkDescs), func(i int) bool {
		return s.chunkDescs[i].firstTime().After(through)
	})
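	// Both searches return the first chunk that starts strictly after the
	// respective bound. Step fromIdx back by one so the chunk that may
	// actually contain "from" is included, and clamp throughIdx to the last
	// existing chunk if the search ran off the end.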
	if fromIdx > 0 {
		fromIdx--
	}
	if throughIdx == len(s.chunkDescs) {
		throughIdx--
	}

	pinIndexes := make([]int, 0, throughIdx-fromIdx+1)
	for i := fromIdx; i <= throughIdx; i++ {
		pinIndexes = append(pinIndexes, i)
	}
	return s.preloadChunks(pinIndexes, mss)
}

// newIterator returns a new SeriesIterator. The caller must have locked the
// fingerprint of the memorySeries.
func (s *memorySeries) newIterator(lockFunc, unlockFunc func()) SeriesIterator {
	chunks := make([]chunk, 0, len(s.chunkDescs))
	for i, cd := range s.chunkDescs {
		if chunk := cd.getChunk(); chunk != nil {
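			// If this is the non-persisted head chunk, flag it as in
			// use by an iterator so that add clones it before appending
			// further samples (see headChunkUsedByIterator above).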
			if i == len(s.chunkDescs)-1 && !s.headChunkPersisted {
				s.headChunkUsedByIterator = true
			}
			chunks = append(chunks, chunk)
		}
	}

	return &memorySeriesIterator{
		lock:   lockFunc,
		unlock: unlockFunc,
		chunks: chunks,
	}
}

// head returns a pointer to the head chunk descriptor. The caller must have
// locked the fingerprint of the memorySeries. This method will panic if this
// series has no chunk descriptors.
func (s *memorySeries) head() *chunkDesc {
	return s.chunkDescs[len(s.chunkDescs)-1]
}

// firstTime returns the timestamp of the first sample in the series. The caller
// must have locked the fingerprint of the memorySeries.
func (s *memorySeries) firstTime() clientmodel.Timestamp {
	if s.chunkDescsOffset == 0 && len(s.chunkDescs) > 0 {
		return s.chunkDescs[0].firstTime()
	}
	return s.savedFirstTime
}

// memorySeriesIterator implements SeriesIterator.
type memorySeriesIterator struct {
	lock, unlock func()
	chunkIt      chunkIterator
	chunks       []chunk
}

// GetValueAtTime implements SeriesIterator.
func (it *memorySeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.Values {
	it.lock()
	defer it.unlock()

	// The most common case. We are iterating through a chunk.
	if it.chunkIt != nil && it.chunkIt.contains(t) {
		return it.chunkIt.getValueAtTime(t)
	}

	it.chunkIt = nil

	if len(it.chunks) == 0 {
		return nil
	}

	// Before or exactly on the first sample of the series.
	if !t.After(it.chunks[0].firstTime()) {
		// return first value of first chunk
		return it.chunks[0].newIterator().getValueAtTime(t)
	}
	// After or exactly on the last sample of the series.
	if !t.Before(it.chunks[len(it.chunks)-1].lastTime()) {
		// return last value of last chunk
		return it.chunks[len(it.chunks)-1].newIterator().getValueAtTime(t)
	}

	// Find first chunk where lastTime() is after or equal to t.
	i := sort.Search(len(it.chunks), func(i int) bool {
		return !it.chunks[i].lastTime().Before(t)
	})
	if i == len(it.chunks) {
		panic("out of bounds")
	}

	if t.Before(it.chunks[i].firstTime()) {
		// We ended up between two chunks.
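		// Return the two samples surrounding t: the last sample of the
		// previous chunk and the first sample of this one.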
		return metric.Values{
			it.chunks[i-1].newIterator().getValueAtTime(t)[0],
			it.chunks[i].newIterator().getValueAtTime(t)[0],
		}
	}
	// We ended up in the middle of a chunk. We might stay there for a while,
	// so save it as the current chunk iterator.
	it.chunkIt = it.chunks[i].newIterator()
	return it.chunkIt.getValueAtTime(t)
}

// GetBoundaryValues implements SeriesIterator.
func (it *memorySeriesIterator) GetBoundaryValues(in metric.Interval) metric.Values {
	it.lock()
	defer it.unlock()

	// Find the first relevant chunk.
	i := sort.Search(len(it.chunks), func(i int) bool {
		return !it.chunks[i].lastTime().Before(in.OldestInclusive)
	})
	values := make(metric.Values, 0, 2)
	for i, c := range it.chunks[i:] {
		var chunkIt chunkIterator
		if c.firstTime().After(in.NewestInclusive) {
			if len(values) == 1 {
				// We found the first value before, but are now
				// already past the last value. The value we
				// want must be the last value of the previous
				// chunk. So backtrack...
				chunkIt = it.chunks[i-1].newIterator()
				values = append(values, chunkIt.getValueAtTime(in.NewestInclusive)[0])
			}
			break
		}
		if len(values) == 0 {
			chunkIt = c.newIterator()
			firstValues := chunkIt.getValueAtTime(in.OldestInclusive)
			switch len(firstValues) {
			case 2:
				values = append(values, firstValues[1])
			case 1:
				values = firstValues
			default:
				panic("unexpected return from getValueAtTime")
			}
		}
		if c.lastTime().After(in.NewestInclusive) {
			if chunkIt == nil {
				chunkIt = c.newIterator()
			}
			values = append(values, chunkIt.getValueAtTime(in.NewestInclusive)[0])
			break
		}
	}
	if len(values) == 1 {
		// We found exactly one value. In that case, add the most recent we know.
		values = append(
			values,
			it.chunks[len(it.chunks)-1].newIterator().getValueAtTime(in.NewestInclusive)[0],
		)
	}
	if len(values) == 2 && values[0].Equal(&values[1]) {
		return values[:1]
	}
	return values
}

// GetRangeValues implements SeriesIterator.
func (it *memorySeriesIterator) GetRangeValues(in metric.Interval) metric.Values {
	it.lock()
	defer it.unlock()

	// Find the first relevant chunk.
	i := sort.Search(len(it.chunks), func(i int) bool {
		return !it.chunks[i].lastTime().Before(in.OldestInclusive)
	})
	values := metric.Values{}
	for _, c := range it.chunks[i:] {
		if c.firstTime().After(in.NewestInclusive) {
			break
		}
		// TODO: actually reuse an iterator between calls if we get multiple ranges
		// from the same chunk.
		values = append(values, c.newIterator().getRangeValues(in)...)
	}
	return values
}

// nopSeriesIterator implements SeriesIterator. It never returns any values.
type nopSeriesIterator struct{}

// GetValueAtTime implements SeriesIterator.
func (_ nopSeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.Values {
	return metric.Values{}
}

// GetBoundaryValues implements SeriesIterator.
func (_ nopSeriesIterator) GetBoundaryValues(in metric.Interval) metric.Values {
	return metric.Values{}
}

// GetRangeValues implements SeriesIterator.
func (_ nopSeriesIterator) GetRangeValues(in metric.Interval) metric.Values {
	return metric.Values{}
}