Squash a number of TODOs.

- Staleness delta is now a proper function parameter and no longer
  replicated from package ast.

- Named type 'chunks' replaced by explicit '[]chunk' to avoid confusion.

- For the same reason, replaced 'chunkDescs' with '[]*chunkDesc'.

- Verified that math.Modf is not a speed enhancement over a plain
  conversion (actually 5x slower); see the benchmark sketch below this
  list.

- Renamed firstTimeField and lastTimeField to chunkFirstTime and
  chunkLastTime.

- Verified unpin() is sufficiently goroutine-safe.

- Decided not to update archivedFingerprintToTimeRange upon series
  truncation and added a rationale why.
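
The integrality check in question is of the form
'clientmodel.SampleValue(int64(dv)) != dv'. Below is a minimal benchmark
sketch of how the two variants can be compared (a standalone _test.go
file; the function names and the sample value are illustrative, not part
of this change):

    package local

    import (
        "math"
        "testing"
    )

    // sink keeps the compiler from optimizing the checks away.
    var sink bool

    // Integer check via an int64 round trip, the variant kept in
    // deltaEncodedChunk.add.
    func BenchmarkIntCheckConversion(b *testing.B) {
        v := 12345.0
        for i := 0; i < b.N; i++ {
            sink = float64(int64(v)) == v
        }
    }

    // Integer check via math.Modf, the rejected alternative.
    func BenchmarkIntCheckModf(b *testing.B) {
        v := 12345.0
        for i := 0; i < b.N; i++ {
            _, frac := math.Modf(v)
            sink = frac == 0
        }
    }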

Change-Id: I863b8d785e5ad9f71eb63e229845eacf1bed8534
Author: Bjoern Rabenstein
Date:   2014-10-15 15:53:05 +02:00
Parent: 427c8d53a5
Commit: 096fa0f8b2

9 changed files with 68 additions and 65 deletions

@@ -119,13 +119,13 @@ func prepareInstantQuery(node Node, timestamp clientmodel.Timestamp, storage loc
preloadTimer := queryStats.GetTimer(stats.PreloadTime).Start()
p := storage.NewPreloader()
for fp, rangeDuration := range analyzer.FullRanges {
if err := p.PreloadRange(fp, timestamp.Add(-rangeDuration), timestamp); err != nil {
if err := p.PreloadRange(fp, timestamp.Add(-rangeDuration), timestamp, *stalenessDelta); err != nil {
p.Close()
return nil, err
}
}
for fp := range analyzer.IntervalRanges {
if err := p.PreloadRange(fp, timestamp, timestamp); err != nil {
if err := p.PreloadRange(fp, timestamp, timestamp, *stalenessDelta); err != nil {
p.Close()
return nil, err
}
@@ -150,7 +150,7 @@ func prepareRangeQuery(node Node, start clientmodel.Timestamp, end clientmodel.T
preloadTimer := queryStats.GetTimer(stats.PreloadTime).Start()
p := storage.NewPreloader()
for fp, rangeDuration := range analyzer.FullRanges {
if err := p.PreloadRange(fp, start.Add(-rangeDuration), end); err != nil {
if err := p.PreloadRange(fp, start.Add(-rangeDuration), end, *stalenessDelta); err != nil {
p.Close()
return nil, err
}
@@ -169,7 +169,7 @@ func prepareRangeQuery(node Node, start clientmodel.Timestamp, end clientmodel.T
*/
}
for fp := range analyzer.IntervalRanges {
if err := p.PreloadRange(fp, start, end); err != nil {
if err := p.PreloadRange(fp, start, end, *stalenessDelta); err != nil {
p.Close()
return nil, err
}

@@ -21,12 +21,6 @@ import (
"github.com/prometheus/prometheus/storage/metric"
)
// chunks is just a chunk slice. No methods are defined for this named type.
// TODO: Perhaps we should remove it? It might avoid errors if it's
// syntactically clear that we are dealing with a vanilla slice and not some
// kind of more complex collection.
type chunks []chunk
// chunk is the interface for all chunks. Chunks are generally not
// goroutine-safe.
type chunk interface {
@@ -36,7 +30,7 @@ type chunk interface {
// any. The first chunk returned might be the same as the original one
// or a newly allocated version. In any case, take the returned chunk as
// the relevant one and discard the original chunk.
add(*metric.SamplePair) chunks
add(*metric.SamplePair) []chunk
clone() chunk
firstTime() clientmodel.Timestamp
lastTime() clientmodel.Timestamp
@@ -67,11 +61,11 @@ type chunkIterator interface {
contains(clientmodel.Timestamp) bool
}
func transcodeAndAdd(dst chunk, src chunk, s *metric.SamplePair) chunks {
func transcodeAndAdd(dst chunk, src chunk, s *metric.SamplePair) []chunk {
numTranscodes.Inc()
head := dst
body := chunks{}
body := []chunk{}
for v := range src.values() {
newChunks := head.add(v)
body = append(body, newChunks[:len(newChunks)-1]...)

@@ -157,7 +157,7 @@ func (c *deltaEncodedChunk) baseValue() clientmodel.SampleValue {
}
// add implements chunk.
func (c *deltaEncodedChunk) add(s *metric.SamplePair) chunks {
func (c *deltaEncodedChunk) add(s *metric.SamplePair) []chunk {
if len(c.buf) < deltaHeaderBytes {
c.buf = c.buf[:deltaHeaderBytes]
binary.LittleEndian.PutUint64(c.buf[deltaHeaderBaseTimeOffset:], uint64(s.Timestamp))
@@ -172,7 +172,7 @@ func (c *deltaEncodedChunk) add(s *metric.SamplePair) chunks {
if remainingBytes < sampleSize {
//fmt.Println("overflow")
overflowChunks := c.newFollowupChunk().add(s)
return chunks{c, overflowChunks[0]}
return []chunk{c, overflowChunks[0]}
}
dt := s.Timestamp - c.baseTime()
@@ -184,7 +184,7 @@ func (c *deltaEncodedChunk) add(s *metric.SamplePair) chunks {
// existing chunk data into new chunk(s).
//
// int->float.
// TODO: compare speed with Math.Modf.
// Note: Using math.Modf is slower than the conversion approach below.
if c.isInt() && clientmodel.SampleValue(int64(dv)) != dv {
//fmt.Println("int->float", len(c.buf), cap(c.buf), dv)
return transcodeAndAdd(newDeltaEncodedChunk(tb, d4, false), c, s)
@@ -247,7 +247,7 @@ func (c *deltaEncodedChunk) add(s *metric.SamplePair) chunks {
panic("invalid number of bytes for floating point delta")
}
}
return chunks{c}
return []chunk{c}
}
func (c *deltaEncodedChunk) sampleSize() int {

@@ -14,6 +14,7 @@
package local
import (
"time"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/client_golang/prometheus"
@@ -73,7 +74,11 @@ type SeriesIterator interface {
// them until released via Close(). Its methods are generally not
// goroutine-safe.
type Preloader interface {
PreloadRange(fp clientmodel.Fingerprint, from clientmodel.Timestamp, through clientmodel.Timestamp) error
PreloadRange(
fp clientmodel.Fingerprint,
from clientmodel.Timestamp, through clientmodel.Timestamp,
stalenessDelta time.Duration,
) error
/*
// GetMetricAtTime loads and pins samples around a given time.
GetMetricAtTime(clientmodel.Fingerprint, clientmodel.Timestamp) error

@@ -247,7 +247,7 @@ func (p *persistence) persistChunk(fp clientmodel.Fingerprint, c chunk) error {
// with the earliest time will have index 0, the following ones will have
// incrementally larger indexes. It is the caller's responsibility to not
// persist or drop anything for the same fingerprint concurrently.
func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int) (chunks, error) {
func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int) ([]chunk, error) {
// TODO: we need to verify at some point that file length is a multiple of
// the chunk size. When is the best time to do this, and where to remember
// it? Right now, we only do it when loading chunkDescs.
@@ -257,7 +257,7 @@ func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int) (chu
}
defer f.Close()
chunks := make(chunks, 0, len(indexes))
chunks := make([]chunk, 0, len(indexes))
defer func() {
if err == nil {
return
@@ -270,14 +270,12 @@ func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int) (chu
if err != nil {
return nil, err
}
// TODO: check seek offset too?
n, err := f.Read(typeBuf)
if err != nil {
return nil, err
}
if n != 1 {
// Shouldn't happen?
panic("read returned != 1 bytes")
}
@@ -295,7 +293,7 @@ func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int) (chu
// loadChunkDescs loads chunkDescs for a series up until a given time. It is
// the caller's responsibility to not persist or drop anything for the same
// fingerprint concurrently.
func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (chunkDescs, error) {
func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) ([]*chunkDesc, error) {
f, err := p.openChunkFileForReading(fp)
if os.IsNotExist(err) {
return nil, nil
@@ -323,7 +321,7 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie
}
numChunks := int(fi.Size()) / totalChunkLen
cds := make(chunkDescs, 0, numChunks)
cds := make([]*chunkDesc, 0, numChunks)
for i := 0; i < numChunks; i++ {
_, err := f.Seek(p.offsetForChunkIndex(i)+chunkHeaderFirstTimeOffset, os.SEEK_SET)
if err != nil {
@@ -336,8 +334,8 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie
return nil, err
}
cd := &chunkDesc{
firstTimeField: clientmodel.Timestamp(binary.LittleEndian.Uint64(chunkTimesBuf)),
lastTimeField: clientmodel.Timestamp(binary.LittleEndian.Uint64(chunkTimesBuf[8:])),
chunkFirstTime: clientmodel.Timestamp(binary.LittleEndian.Uint64(chunkTimesBuf)),
chunkLastTime: clientmodel.Timestamp(binary.LittleEndian.Uint64(chunkTimesBuf[8:])),
}
if !cd.firstTime().Before(beforeTime) {
// From here on, we have chunkDescs in memory already.
@@ -480,7 +478,7 @@ func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) {
if err != nil {
return nil, err
}
chunkDescs := make(chunkDescs, numChunkDescs)
chunkDescs := make([]*chunkDesc, numChunkDescs)
for i := int64(0); i < numChunkDescs; i++ {
if headChunkPersisted || i < numChunkDescs-1 {
@@ -493,8 +491,8 @@ func (p *persistence) loadSeriesMapAndHeads() (*seriesMap, error) {
return nil, err
}
chunkDescs[i] = &chunkDesc{
firstTimeField: clientmodel.Timestamp(firstTime),
lastTimeField: clientmodel.Timestamp(lastTime),
chunkFirstTime: clientmodel.Timestamp(firstTime),
chunkLastTime: clientmodel.Timestamp(lastTime),
}
} else {
// Non-persisted head chunk.

@@ -38,7 +38,7 @@ func newTestPersistence(t *testing.T) (*persistence, test.Closer) {
})
}
func buildTestChunks() map[clientmodel.Fingerprint]chunks {
func buildTestChunks() map[clientmodel.Fingerprint][]chunk {
fps := clientmodel.Fingerprints{
clientmodel.Metric{
"label": "value1",
@@ -50,10 +50,10 @@ func buildTestChunks() map[clientmodel.Fingerprint]chunks {
"label": "value3",
}.Fingerprint(),
}
fpToChunks := map[clientmodel.Fingerprint]chunks{}
fpToChunks := map[clientmodel.Fingerprint][]chunk{}
for _, fp := range fps {
fpToChunks[fp] = make(chunks, 0, 10)
fpToChunks[fp] = make([]chunk, 0, 10)
for i := 0; i < 10; i++ {
fpToChunks[fp] = append(fpToChunks[fp], newDeltaEncodedChunk(d1, d1, true).add(&metric.SamplePair{
Timestamp: clientmodel.Timestamp(i),

@@ -14,18 +14,24 @@
package local
import (
"time"
clientmodel "github.com/prometheus/client_golang/model"
)
// memorySeriesPreloader is a Preloader for the memorySeriesStorage.
type memorySeriesPreloader struct {
storage *memorySeriesStorage
pinnedChunkDescs chunkDescs
pinnedChunkDescs []*chunkDesc
}
// PreloadRange implements Preloader.
func (p *memorySeriesPreloader) PreloadRange(fp clientmodel.Fingerprint, from clientmodel.Timestamp, through clientmodel.Timestamp) error {
cds, err := p.storage.preloadChunksForRange(fp, from, through)
func (p *memorySeriesPreloader) PreloadRange(
fp clientmodel.Fingerprint,
from clientmodel.Timestamp, through clientmodel.Timestamp,
stalenessDelta time.Duration,
) error {
cds, err := p.storage.preloadChunksForRange(fp, from, through, stalenessDelta)
if err != nil {
return err
}
@@ -93,11 +99,11 @@ func (p *memorySeriesPreloader) GetMetricRangeAtInterval(fp clientmodel.Fingerpr
// Close implements Preloader.
func (p *memorySeriesPreloader) Close() {
// TODO: Idea about a primitive but almost free heuristic to not evict
// "recently used" chunks: Do not unpin the chunks right here, but hand
// over the pinnedChunkDescs to a manager that will delay the unpinning
// based on time and memory pressure.
for _, cd := range p.pinnedChunkDescs {
// TODO: unpinning may synchronously cause closing of chunks if they have
// been marked to be evicted. This could interfere with other parts of the
// storage that check whether a chunk is swapped in or not. Is it a good
// idea / sufficient to take the storage lock here?
cd.unpin()
}
}

@@ -123,18 +123,16 @@ func (sm *seriesMap) fpIter() <-chan clientmodel.Fingerprint {
return ch
}
type chunkDescs []*chunkDesc
type chunkDesc struct {
sync.Mutex
chunk chunk
refCount int
evict bool
firstTimeField clientmodel.Timestamp // TODO: stupid name, reorganize.
lastTimeField clientmodel.Timestamp
chunkFirstTime clientmodel.Timestamp // Used if chunk is evicted.
chunkLastTime clientmodel.Timestamp // Used if chunk is evicted.
}
func (cd *chunkDesc) add(s *metric.SamplePair) chunks {
func (cd *chunkDesc) add(s *metric.SamplePair) []chunk {
cd.Lock()
defer cd.Unlock()
@@ -168,7 +166,7 @@ func (cd *chunkDesc) firstTime() clientmodel.Timestamp {
defer cd.Unlock()
if cd.chunk == nil {
return cd.firstTimeField
return cd.chunkFirstTime
}
return cd.chunk.firstTime()
}
@@ -178,7 +176,7 @@ func (cd *chunkDesc) lastTime() clientmodel.Timestamp {
defer cd.Unlock()
if cd.chunk == nil {
return cd.lastTimeField
return cd.chunkLastTime
}
return cd.chunk.lastTime()
}
@@ -228,15 +226,15 @@ func (cd *chunkDesc) evictOnUnpin() bool {
// evictNow is an internal helper method.
func (cd *chunkDesc) evictNow() {
cd.firstTimeField = cd.chunk.firstTime()
cd.lastTimeField = cd.chunk.lastTime()
cd.chunkFirstTime = cd.chunk.firstTime()
cd.chunkLastTime = cd.chunk.lastTime()
cd.chunk = nil
}
type memorySeries struct {
metric clientmodel.Metric
// Sorted by start time, overlapping chunk ranges are forbidden.
chunkDescs chunkDescs
chunkDescs []*chunkDesc
// Whether chunkDescs for chunks on disk are all loaded. If false, some
// (or all) chunkDescs are only on disk. These chunks are all contiguous
// and at the tail end.
@@ -367,9 +365,9 @@ func (s *memorySeries) purgeOlderThan(t clientmodel.Timestamp) bool {
}
// preloadChunks is an internal helper method.
func (s *memorySeries) preloadChunks(indexes []int, p *persistence) (chunkDescs, error) {
func (s *memorySeries) preloadChunks(indexes []int, p *persistence) ([]*chunkDesc, error) {
loadIndexes := []int{}
pinnedChunkDescs := make(chunkDescs, 0, len(indexes))
pinnedChunkDescs := make([]*chunkDesc, 0, len(indexes))
for _, idx := range indexes {
pinnedChunkDescs = append(pinnedChunkDescs, s.chunkDescs[idx])
if s.chunkDescs[idx].isEvicted() {
@@ -436,7 +434,7 @@ func (s *memorySeries) preloadChunksAtTime(t clientmodel.Timestamp, p *persisten
func (s *memorySeries) preloadChunksForRange(
from clientmodel.Timestamp, through clientmodel.Timestamp,
fp clientmodel.Fingerprint, p *persistence,
) (chunkDescs, error) {
) ([]*chunkDesc, error) {
firstChunkDescTime := through
if len(s.chunkDescs) > 0 {
firstChunkDescTime = s.chunkDescs[0].firstTime()
@@ -477,7 +475,7 @@ func (s *memorySeries) preloadChunksForRange(
}
func (s *memorySeries) newIterator(lockFunc, unlockFunc func()) SeriesIterator {
chunks := make(chunks, 0, len(s.chunkDescs))
chunks := make([]chunk, 0, len(s.chunkDescs))
for i, cd := range s.chunkDescs {
if !cd.isEvicted() {
if i == len(s.chunkDescs)-1 {
@@ -521,7 +519,7 @@ func (s *memorySeries) lastTime() clientmodel.Timestamp {
type memorySeriesIterator struct {
lock, unlock func()
chunkIt chunkIterator
chunks chunks
chunks []chunk
}
// GetValueAtTime implements SeriesIterator.

@@ -151,8 +151,11 @@ func (s *memorySeriesStorage) preloadChunksAtTime(fp clientmodel.Fingerprint, ts
}
*/
func (s *memorySeriesStorage) preloadChunksForRange(fp clientmodel.Fingerprint, from clientmodel.Timestamp, through clientmodel.Timestamp) (chunkDescs, error) {
stalenessDelta := 300 * time.Second // TODO: Turn into parameter.
func (s *memorySeriesStorage) preloadChunksForRange(
fp clientmodel.Fingerprint,
from clientmodel.Timestamp, through clientmodel.Timestamp,
stalenessDelta time.Duration,
) ([]*chunkDesc, error) {
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
@@ -312,9 +315,6 @@ func (s *memorySeriesStorage) purgePeriodically(stop <-chan bool) {
glog.Info("Interrupted running series purge.")
return
default:
// TODO: Decide whether we also want to adjust the timerange index
// entries here. Not updating them shouldn't break anything, but will
// make some scenarios less efficient.
s.purgeSeries(fp, ts)
}
}
@@ -348,12 +348,14 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime
// If we arrive here, nothing was in memory, so the metric must have
// been archived. Drop the archived metric if there are no persisted
// chunks left.
if !allDropped {
return
}
if err := s.persistence.dropArchivedMetric(fp); err != nil {
glog.Errorf("Error dropping archived metric for fingerprint %v: %v", fp, err)
// chunks left. If we do drop the archived metric, we should update the
// archivedFingerprintToTimeRange index according to the remaining
// chunks, but it's probably not worth the effort. Queries going beyond
// the purge cut-off can be truncated in a more direct fashion.
if allDropped {
if err := s.persistence.dropArchivedMetric(fp); err != nil {
glog.Errorf("Error dropping archived metric for fingerprint %v: %v", fp, err)
}
}
}