mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-13 06:47:28 -08:00
Merge pull request #706 from prometheus/beorn7/persistence2
Improve iterator performance.
This commit is contained in:
commit
c44e7cd105
|
@ -73,7 +73,7 @@ func (a *Analyzer) Analyze(ctx context.Context) error {
|
||||||
switch n := node.(type) {
|
switch n := node.(type) {
|
||||||
case *VectorSelector:
|
case *VectorSelector:
|
||||||
pt := getPreloadTimes(n.Offset)
|
pt := getPreloadTimes(n.Offset)
|
||||||
fpts := a.Storage.GetFingerprintsForLabelMatchers(n.LabelMatchers)
|
fpts := a.Storage.FingerprintsForLabelMatchers(n.LabelMatchers)
|
||||||
n.fingerprints = fpts
|
n.fingerprints = fpts
|
||||||
n.metrics = map[clientmodel.Fingerprint]clientmodel.COWMetric{}
|
n.metrics = map[clientmodel.Fingerprint]clientmodel.COWMetric{}
|
||||||
n.iterators = map[clientmodel.Fingerprint]local.SeriesIterator{}
|
n.iterators = map[clientmodel.Fingerprint]local.SeriesIterator{}
|
||||||
|
@ -84,11 +84,11 @@ func (a *Analyzer) Analyze(ctx context.Context) error {
|
||||||
if _, alreadyInRanges := pt.ranges[fp]; !alreadyInRanges {
|
if _, alreadyInRanges := pt.ranges[fp]; !alreadyInRanges {
|
||||||
pt.instants[fp] = struct{}{}
|
pt.instants[fp] = struct{}{}
|
||||||
}
|
}
|
||||||
n.metrics[fp] = a.Storage.GetMetricForFingerprint(fp)
|
n.metrics[fp] = a.Storage.MetricForFingerprint(fp)
|
||||||
}
|
}
|
||||||
case *MatrixSelector:
|
case *MatrixSelector:
|
||||||
pt := getPreloadTimes(n.Offset)
|
pt := getPreloadTimes(n.Offset)
|
||||||
fpts := a.Storage.GetFingerprintsForLabelMatchers(n.LabelMatchers)
|
fpts := a.Storage.FingerprintsForLabelMatchers(n.LabelMatchers)
|
||||||
n.fingerprints = fpts
|
n.fingerprints = fpts
|
||||||
n.metrics = map[clientmodel.Fingerprint]clientmodel.COWMetric{}
|
n.metrics = map[clientmodel.Fingerprint]clientmodel.COWMetric{}
|
||||||
n.iterators = map[clientmodel.Fingerprint]local.SeriesIterator{}
|
n.iterators = map[clientmodel.Fingerprint]local.SeriesIterator{}
|
||||||
|
@ -100,7 +100,7 @@ func (a *Analyzer) Analyze(ctx context.Context) error {
|
||||||
// an instant for the same fingerprint, should we have one.
|
// an instant for the same fingerprint, should we have one.
|
||||||
delete(pt.instants, fp)
|
delete(pt.instants, fp)
|
||||||
}
|
}
|
||||||
n.metrics[fp] = a.Storage.GetMetricForFingerprint(fp)
|
n.metrics[fp] = a.Storage.MetricForFingerprint(fp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
|
|
|
@ -644,7 +644,7 @@ func (ev *evaluator) eval(expr Expr) Value {
|
||||||
func (ev *evaluator) vectorSelector(node *VectorSelector) Vector {
|
func (ev *evaluator) vectorSelector(node *VectorSelector) Vector {
|
||||||
vec := Vector{}
|
vec := Vector{}
|
||||||
for fp, it := range node.iterators {
|
for fp, it := range node.iterators {
|
||||||
sampleCandidates := it.GetValueAtTime(ev.Timestamp.Add(-node.Offset))
|
sampleCandidates := it.ValueAtTime(ev.Timestamp.Add(-node.Offset))
|
||||||
samplePair := chooseClosestSample(sampleCandidates, ev.Timestamp.Add(-node.Offset))
|
samplePair := chooseClosestSample(sampleCandidates, ev.Timestamp.Add(-node.Offset))
|
||||||
if samplePair != nil {
|
if samplePair != nil {
|
||||||
vec = append(vec, &Sample{
|
vec = append(vec, &Sample{
|
||||||
|
@ -666,7 +666,7 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix {
|
||||||
|
|
||||||
sampleStreams := make([]*SampleStream, 0, len(node.iterators))
|
sampleStreams := make([]*SampleStream, 0, len(node.iterators))
|
||||||
for fp, it := range node.iterators {
|
for fp, it := range node.iterators {
|
||||||
samplePairs := it.GetRangeValues(interval)
|
samplePairs := it.RangeValues(interval)
|
||||||
if len(samplePairs) == 0 {
|
if len(samplePairs) == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -695,7 +695,7 @@ func (ev *evaluator) matrixSelectorBounds(node *MatrixSelector) Matrix {
|
||||||
|
|
||||||
sampleStreams := make([]*SampleStream, 0, len(node.iterators))
|
sampleStreams := make([]*SampleStream, 0, len(node.iterators))
|
||||||
for fp, it := range node.iterators {
|
for fp, it := range node.iterators {
|
||||||
samplePairs := it.GetBoundaryValues(interval)
|
samplePairs := it.BoundaryValues(interval)
|
||||||
if len(samplePairs) == 0 {
|
if len(samplePairs) == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,8 +41,8 @@ const (
|
||||||
// goroutine-safe proxies for chunk methods.
|
// goroutine-safe proxies for chunk methods.
|
||||||
type chunkDesc struct {
|
type chunkDesc struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
chunk chunk // nil if chunk is evicted.
|
c chunk // nil if chunk is evicted.
|
||||||
refCount int
|
rCnt int
|
||||||
chunkFirstTime clientmodel.Timestamp // Used if chunk is evicted.
|
chunkFirstTime clientmodel.Timestamp // Used if chunk is evicted.
|
||||||
chunkLastTime clientmodel.Timestamp // Used if chunk is evicted.
|
chunkLastTime clientmodel.Timestamp // Used if chunk is evicted.
|
||||||
|
|
||||||
|
@ -59,14 +59,14 @@ func newChunkDesc(c chunk) *chunkDesc {
|
||||||
chunkOps.WithLabelValues(createAndPin).Inc()
|
chunkOps.WithLabelValues(createAndPin).Inc()
|
||||||
atomic.AddInt64(&numMemChunks, 1)
|
atomic.AddInt64(&numMemChunks, 1)
|
||||||
numMemChunkDescs.Inc()
|
numMemChunkDescs.Inc()
|
||||||
return &chunkDesc{chunk: c, refCount: 1}
|
return &chunkDesc{c: c, rCnt: 1}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cd *chunkDesc) add(s *metric.SamplePair) []chunk {
|
func (cd *chunkDesc) add(s *metric.SamplePair) []chunk {
|
||||||
cd.Lock()
|
cd.Lock()
|
||||||
defer cd.Unlock()
|
defer cd.Unlock()
|
||||||
|
|
||||||
return cd.chunk.add(s)
|
return cd.c.add(s)
|
||||||
}
|
}
|
||||||
|
|
||||||
// pin increments the refCount by one. Upon increment from 0 to 1, this
|
// pin increments the refCount by one. Upon increment from 0 to 1, this
|
||||||
|
@ -76,11 +76,11 @@ func (cd *chunkDesc) pin(evictRequests chan<- evictRequest) {
|
||||||
cd.Lock()
|
cd.Lock()
|
||||||
defer cd.Unlock()
|
defer cd.Unlock()
|
||||||
|
|
||||||
if cd.refCount == 0 {
|
if cd.rCnt == 0 {
|
||||||
// Remove ourselves from the evict list.
|
// Remove ourselves from the evict list.
|
||||||
evictRequests <- evictRequest{cd, false}
|
evictRequests <- evictRequest{cd, false}
|
||||||
}
|
}
|
||||||
cd.refCount++
|
cd.rCnt++
|
||||||
}
|
}
|
||||||
|
|
||||||
// unpin decrements the refCount by one. Upon decrement from 1 to 0, this
|
// unpin decrements the refCount by one. Upon decrement from 1 to 0, this
|
||||||
|
@ -90,69 +90,69 @@ func (cd *chunkDesc) unpin(evictRequests chan<- evictRequest) {
|
||||||
cd.Lock()
|
cd.Lock()
|
||||||
defer cd.Unlock()
|
defer cd.Unlock()
|
||||||
|
|
||||||
if cd.refCount == 0 {
|
if cd.rCnt == 0 {
|
||||||
panic("cannot unpin already unpinned chunk")
|
panic("cannot unpin already unpinned chunk")
|
||||||
}
|
}
|
||||||
cd.refCount--
|
cd.rCnt--
|
||||||
if cd.refCount == 0 {
|
if cd.rCnt == 0 {
|
||||||
// Add ourselves to the back of the evict list.
|
// Add ourselves to the back of the evict list.
|
||||||
evictRequests <- evictRequest{cd, true}
|
evictRequests <- evictRequest{cd, true}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cd *chunkDesc) getRefCount() int {
|
func (cd *chunkDesc) refCount() int {
|
||||||
cd.Lock()
|
cd.Lock()
|
||||||
defer cd.Unlock()
|
defer cd.Unlock()
|
||||||
|
|
||||||
return cd.refCount
|
return cd.rCnt
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cd *chunkDesc) firstTime() clientmodel.Timestamp {
|
func (cd *chunkDesc) firstTime() clientmodel.Timestamp {
|
||||||
cd.Lock()
|
cd.Lock()
|
||||||
defer cd.Unlock()
|
defer cd.Unlock()
|
||||||
|
|
||||||
if cd.chunk == nil {
|
if cd.c == nil {
|
||||||
return cd.chunkFirstTime
|
return cd.chunkFirstTime
|
||||||
}
|
}
|
||||||
return cd.chunk.firstTime()
|
return cd.c.firstTime()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cd *chunkDesc) lastTime() clientmodel.Timestamp {
|
func (cd *chunkDesc) lastTime() clientmodel.Timestamp {
|
||||||
cd.Lock()
|
cd.Lock()
|
||||||
defer cd.Unlock()
|
defer cd.Unlock()
|
||||||
|
|
||||||
if cd.chunk == nil {
|
if cd.c == nil {
|
||||||
return cd.chunkLastTime
|
return cd.chunkLastTime
|
||||||
}
|
}
|
||||||
return cd.chunk.lastTime()
|
return cd.c.newIterator().lastTimestamp()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cd *chunkDesc) isEvicted() bool {
|
func (cd *chunkDesc) isEvicted() bool {
|
||||||
cd.Lock()
|
cd.Lock()
|
||||||
defer cd.Unlock()
|
defer cd.Unlock()
|
||||||
|
|
||||||
return cd.chunk == nil
|
return cd.c == nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cd *chunkDesc) contains(t clientmodel.Timestamp) bool {
|
func (cd *chunkDesc) contains(t clientmodel.Timestamp) bool {
|
||||||
return !t.Before(cd.firstTime()) && !t.After(cd.lastTime())
|
return !t.Before(cd.firstTime()) && !t.After(cd.lastTime())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cd *chunkDesc) getChunk() chunk {
|
func (cd *chunkDesc) chunk() chunk {
|
||||||
cd.Lock()
|
cd.Lock()
|
||||||
defer cd.Unlock()
|
defer cd.Unlock()
|
||||||
|
|
||||||
return cd.chunk
|
return cd.c
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cd *chunkDesc) setChunk(c chunk) {
|
func (cd *chunkDesc) setChunk(c chunk) {
|
||||||
cd.Lock()
|
cd.Lock()
|
||||||
defer cd.Unlock()
|
defer cd.Unlock()
|
||||||
|
|
||||||
if cd.chunk != nil {
|
if cd.c != nil {
|
||||||
panic("chunk already set")
|
panic("chunk already set")
|
||||||
}
|
}
|
||||||
cd.chunk = c
|
cd.c = c
|
||||||
}
|
}
|
||||||
|
|
||||||
// maybeEvict evicts the chunk if the refCount is 0. It returns whether the chunk
|
// maybeEvict evicts the chunk if the refCount is 0. It returns whether the chunk
|
||||||
|
@ -162,15 +162,15 @@ func (cd *chunkDesc) maybeEvict() bool {
|
||||||
cd.Lock()
|
cd.Lock()
|
||||||
defer cd.Unlock()
|
defer cd.Unlock()
|
||||||
|
|
||||||
if cd.chunk == nil {
|
if cd.c == nil {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
if cd.refCount != 0 {
|
if cd.rCnt != 0 {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
cd.chunkFirstTime = cd.chunk.firstTime()
|
cd.chunkFirstTime = cd.c.firstTime()
|
||||||
cd.chunkLastTime = cd.chunk.lastTime()
|
cd.chunkLastTime = cd.c.newIterator().lastTimestamp()
|
||||||
cd.chunk = nil
|
cd.c = nil
|
||||||
chunkOps.WithLabelValues(evict).Inc()
|
chunkOps.WithLabelValues(evict).Inc()
|
||||||
atomic.AddInt64(&numMemChunks, -1)
|
atomic.AddInt64(&numMemChunks, -1)
|
||||||
return true
|
return true
|
||||||
|
@ -188,12 +188,38 @@ type chunk interface {
|
||||||
add(sample *metric.SamplePair) []chunk
|
add(sample *metric.SamplePair) []chunk
|
||||||
clone() chunk
|
clone() chunk
|
||||||
firstTime() clientmodel.Timestamp
|
firstTime() clientmodel.Timestamp
|
||||||
lastTime() clientmodel.Timestamp
|
|
||||||
newIterator() chunkIterator
|
newIterator() chunkIterator
|
||||||
marshal(io.Writer) error
|
marshal(io.Writer) error
|
||||||
unmarshal(io.Reader) error
|
unmarshal(io.Reader) error
|
||||||
unmarshalFromBuf([]byte)
|
unmarshalFromBuf([]byte)
|
||||||
encoding() chunkEncoding
|
encoding() chunkEncoding
|
||||||
|
}
|
||||||
|
|
||||||
|
// A chunkIterator enables efficient access to the content of a chunk. It is
|
||||||
|
// generally not safe to use a chunkIterator concurrently with or after chunk
|
||||||
|
// mutation.
|
||||||
|
type chunkIterator interface {
|
||||||
|
// length returns the number of samples in the chunk.
|
||||||
|
length() int
|
||||||
|
// Gets the timestamp of the n-th sample in the chunk.
|
||||||
|
timestampAtIndex(int) clientmodel.Timestamp
|
||||||
|
// Gets the last timestamp in the chunk.
|
||||||
|
lastTimestamp() clientmodel.Timestamp
|
||||||
|
// Gets the sample value of the n-th sample in the chunk.
|
||||||
|
sampleValueAtIndex(int) clientmodel.SampleValue
|
||||||
|
// Gets the last sample value in the chunk.
|
||||||
|
lastSampleValue() clientmodel.SampleValue
|
||||||
|
// Gets the two values that are immediately adjacent to a given time. In
|
||||||
|
// case a value exist at precisely the given time, only that single
|
||||||
|
// value is returned. Only the first or last value is returned (as a
|
||||||
|
// single value), if the given time is before or after the first or last
|
||||||
|
// value, respectively.
|
||||||
|
valueAtTime(clientmodel.Timestamp) metric.Values
|
||||||
|
// Gets all values contained within a given interval.
|
||||||
|
rangeValues(metric.Interval) metric.Values
|
||||||
|
// Whether a given timestamp is contained between first and last value
|
||||||
|
// in the chunk.
|
||||||
|
contains(clientmodel.Timestamp) bool
|
||||||
// values returns a channel, from which all sample values in the chunk
|
// values returns a channel, from which all sample values in the chunk
|
||||||
// can be received in order. The channel is closed after the last
|
// can be received in order. The channel is closed after the last
|
||||||
// one. It is generally not safe to mutate the chunk while the channel
|
// one. It is generally not safe to mutate the chunk while the channel
|
||||||
|
@ -201,29 +227,12 @@ type chunk interface {
|
||||||
values() <-chan *metric.SamplePair
|
values() <-chan *metric.SamplePair
|
||||||
}
|
}
|
||||||
|
|
||||||
// A chunkIterator enables efficient access to the content of a chunk. It is
|
|
||||||
// generally not safe to use a chunkIterator concurrently with or after chunk
|
|
||||||
// mutation.
|
|
||||||
type chunkIterator interface {
|
|
||||||
// Gets the two values that are immediately adjacent to a given time. In
|
|
||||||
// case a value exist at precisely the given time, only that single
|
|
||||||
// value is returned. Only the first or last value is returned (as a
|
|
||||||
// single value), if the given time is before or after the first or last
|
|
||||||
// value, respectively.
|
|
||||||
getValueAtTime(clientmodel.Timestamp) metric.Values
|
|
||||||
// Gets all values contained within a given interval.
|
|
||||||
getRangeValues(metric.Interval) metric.Values
|
|
||||||
// Whether a given timestamp is contained between first and last value
|
|
||||||
// in the chunk.
|
|
||||||
contains(clientmodel.Timestamp) bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func transcodeAndAdd(dst chunk, src chunk, s *metric.SamplePair) []chunk {
|
func transcodeAndAdd(dst chunk, src chunk, s *metric.SamplePair) []chunk {
|
||||||
chunkOps.WithLabelValues(transcode).Inc()
|
chunkOps.WithLabelValues(transcode).Inc()
|
||||||
|
|
||||||
head := dst
|
head := dst
|
||||||
body := []chunk{}
|
body := []chunk{}
|
||||||
for v := range src.values() {
|
for v := range src.newIterator().values() {
|
||||||
newChunks := head.add(v)
|
newChunks := head.add(v)
|
||||||
body = append(body, newChunks[:len(newChunks)-1]...)
|
body = append(body, newChunks[:len(newChunks)-1]...)
|
||||||
head = newChunks[len(newChunks)-1]
|
head = newChunks[len(newChunks)-1]
|
||||||
|
|
|
@ -318,7 +318,7 @@ func (p *persistence) sanitizeSeries(
|
||||||
return fp, true
|
return fp, true
|
||||||
}
|
}
|
||||||
// This series is supposed to be archived.
|
// This series is supposed to be archived.
|
||||||
metric, err := p.getArchivedMetric(fp)
|
metric, err := p.archivedMetric(fp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf(
|
log.Errorf(
|
||||||
"Fingerprint %v assumed archived but couldn't be looked up in archived index: %s",
|
"Fingerprint %v assumed archived but couldn't be looked up in archived index: %s",
|
||||||
|
|
|
@ -188,18 +188,19 @@ func (c deltaEncodedChunk) clone() chunk {
|
||||||
|
|
||||||
// firstTime implements chunk.
|
// firstTime implements chunk.
|
||||||
func (c deltaEncodedChunk) firstTime() clientmodel.Timestamp {
|
func (c deltaEncodedChunk) firstTime() clientmodel.Timestamp {
|
||||||
return c.valueAtIndex(0).Timestamp
|
return c.baseTime()
|
||||||
}
|
|
||||||
|
|
||||||
// lastTime implements chunk.
|
|
||||||
func (c deltaEncodedChunk) lastTime() clientmodel.Timestamp {
|
|
||||||
return c.valueAtIndex(c.len() - 1).Timestamp
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// newIterator implements chunk.
|
// newIterator implements chunk.
|
||||||
func (c *deltaEncodedChunk) newIterator() chunkIterator {
|
func (c *deltaEncodedChunk) newIterator() chunkIterator {
|
||||||
return &deltaEncodedChunkIterator{
|
return &deltaEncodedChunkIterator{
|
||||||
chunk: c,
|
c: *c,
|
||||||
|
len: c.len(),
|
||||||
|
baseT: c.baseTime(),
|
||||||
|
baseV: c.baseValue(),
|
||||||
|
tBytes: c.timeBytes(),
|
||||||
|
vBytes: c.valueBytes(),
|
||||||
|
isInt: c.isInt(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -237,19 +238,6 @@ func (c *deltaEncodedChunk) unmarshalFromBuf(buf []byte) {
|
||||||
*c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])]
|
*c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])]
|
||||||
}
|
}
|
||||||
|
|
||||||
// values implements chunk.
|
|
||||||
func (c deltaEncodedChunk) values() <-chan *metric.SamplePair {
|
|
||||||
n := c.len()
|
|
||||||
valuesChan := make(chan *metric.SamplePair)
|
|
||||||
go func() {
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
valuesChan <- c.valueAtIndex(i)
|
|
||||||
}
|
|
||||||
close(valuesChan)
|
|
||||||
}()
|
|
||||||
return valuesChan
|
|
||||||
}
|
|
||||||
|
|
||||||
// encoding implements chunk.
|
// encoding implements chunk.
|
||||||
func (c deltaEncodedChunk) encoding() chunkEncoding { return delta }
|
func (c deltaEncodedChunk) encoding() chunkEncoding { return delta }
|
||||||
|
|
||||||
|
@ -284,106 +272,157 @@ func (c deltaEncodedChunk) len() int {
|
||||||
return (len(c) - deltaHeaderBytes) / c.sampleSize()
|
return (len(c) - deltaHeaderBytes) / c.sampleSize()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c deltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair {
|
|
||||||
offset := deltaHeaderBytes + idx*c.sampleSize()
|
|
||||||
|
|
||||||
var ts clientmodel.Timestamp
|
|
||||||
switch c.timeBytes() {
|
|
||||||
case d1:
|
|
||||||
ts = c.baseTime() + clientmodel.Timestamp(uint8(c[offset]))
|
|
||||||
case d2:
|
|
||||||
ts = c.baseTime() + clientmodel.Timestamp(binary.LittleEndian.Uint16(c[offset:]))
|
|
||||||
case d4:
|
|
||||||
ts = c.baseTime() + clientmodel.Timestamp(binary.LittleEndian.Uint32(c[offset:]))
|
|
||||||
case d8:
|
|
||||||
// Take absolute value for d8.
|
|
||||||
ts = clientmodel.Timestamp(binary.LittleEndian.Uint64(c[offset:]))
|
|
||||||
default:
|
|
||||||
panic("Invalid number of bytes for time delta")
|
|
||||||
}
|
|
||||||
|
|
||||||
offset += int(c.timeBytes())
|
|
||||||
|
|
||||||
var v clientmodel.SampleValue
|
|
||||||
if c.isInt() {
|
|
||||||
switch c.valueBytes() {
|
|
||||||
case d0:
|
|
||||||
v = c.baseValue()
|
|
||||||
case d1:
|
|
||||||
v = c.baseValue() + clientmodel.SampleValue(int8(c[offset]))
|
|
||||||
case d2:
|
|
||||||
v = c.baseValue() + clientmodel.SampleValue(int16(binary.LittleEndian.Uint16(c[offset:])))
|
|
||||||
case d4:
|
|
||||||
v = c.baseValue() + clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(c[offset:])))
|
|
||||||
// No d8 for ints.
|
|
||||||
default:
|
|
||||||
panic("Invalid number of bytes for integer delta")
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
switch c.valueBytes() {
|
|
||||||
case d4:
|
|
||||||
v = c.baseValue() + clientmodel.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(c[offset:])))
|
|
||||||
case d8:
|
|
||||||
// Take absolute value for d8.
|
|
||||||
v = clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c[offset:])))
|
|
||||||
default:
|
|
||||||
panic("Invalid number of bytes for floating point delta")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return &metric.SamplePair{
|
|
||||||
Timestamp: ts,
|
|
||||||
Value: v,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// deltaEncodedChunkIterator implements chunkIterator.
|
// deltaEncodedChunkIterator implements chunkIterator.
|
||||||
type deltaEncodedChunkIterator struct {
|
type deltaEncodedChunkIterator struct {
|
||||||
chunk *deltaEncodedChunk
|
c deltaEncodedChunk
|
||||||
// TODO: add more fields here to keep track of last position.
|
len int
|
||||||
|
baseT clientmodel.Timestamp
|
||||||
|
baseV clientmodel.SampleValue
|
||||||
|
tBytes, vBytes deltaBytes
|
||||||
|
isInt bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// getValueAtTime implements chunkIterator.
|
// length implements chunkIterator.
|
||||||
func (it *deltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestamp) metric.Values {
|
func (it *deltaEncodedChunkIterator) length() int { return it.len }
|
||||||
i := sort.Search(it.chunk.len(), func(i int) bool {
|
|
||||||
return !it.chunk.valueAtIndex(i).Timestamp.Before(t)
|
// valueAtTime implements chunkIterator.
|
||||||
|
func (it *deltaEncodedChunkIterator) valueAtTime(t clientmodel.Timestamp) metric.Values {
|
||||||
|
i := sort.Search(it.len, func(i int) bool {
|
||||||
|
return !it.timestampAtIndex(i).Before(t)
|
||||||
})
|
})
|
||||||
|
|
||||||
switch i {
|
switch i {
|
||||||
case 0:
|
case 0:
|
||||||
return metric.Values{*it.chunk.valueAtIndex(0)}
|
return metric.Values{metric.SamplePair{
|
||||||
case it.chunk.len():
|
Timestamp: it.timestampAtIndex(0),
|
||||||
return metric.Values{*it.chunk.valueAtIndex(it.chunk.len() - 1)}
|
Value: it.sampleValueAtIndex(0),
|
||||||
|
}}
|
||||||
|
case it.len:
|
||||||
|
return metric.Values{metric.SamplePair{
|
||||||
|
Timestamp: it.timestampAtIndex(it.len - 1),
|
||||||
|
Value: it.sampleValueAtIndex(it.len - 1),
|
||||||
|
}}
|
||||||
default:
|
default:
|
||||||
v := it.chunk.valueAtIndex(i)
|
ts := it.timestampAtIndex(i)
|
||||||
if v.Timestamp.Equal(t) {
|
if ts.Equal(t) {
|
||||||
return metric.Values{*v}
|
return metric.Values{metric.SamplePair{
|
||||||
|
Timestamp: ts,
|
||||||
|
Value: it.sampleValueAtIndex(i),
|
||||||
|
}}
|
||||||
|
}
|
||||||
|
return metric.Values{
|
||||||
|
metric.SamplePair{
|
||||||
|
Timestamp: it.timestampAtIndex(i - 1),
|
||||||
|
Value: it.sampleValueAtIndex(i - 1),
|
||||||
|
},
|
||||||
|
metric.SamplePair{
|
||||||
|
Timestamp: ts,
|
||||||
|
Value: it.sampleValueAtIndex(i),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
return metric.Values{*it.chunk.valueAtIndex(i - 1), *v}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// getRangeValues implements chunkIterator.
|
// rangeValues implements chunkIterator.
|
||||||
func (it *deltaEncodedChunkIterator) getRangeValues(in metric.Interval) metric.Values {
|
func (it *deltaEncodedChunkIterator) rangeValues(in metric.Interval) metric.Values {
|
||||||
oldest := sort.Search(it.chunk.len(), func(i int) bool {
|
oldest := sort.Search(it.len, func(i int) bool {
|
||||||
return !it.chunk.valueAtIndex(i).Timestamp.Before(in.OldestInclusive)
|
return !it.timestampAtIndex(i).Before(in.OldestInclusive)
|
||||||
})
|
})
|
||||||
|
|
||||||
newest := sort.Search(it.chunk.len(), func(i int) bool {
|
newest := sort.Search(it.len, func(i int) bool {
|
||||||
return it.chunk.valueAtIndex(i).Timestamp.After(in.NewestInclusive)
|
return it.timestampAtIndex(i).After(in.NewestInclusive)
|
||||||
})
|
})
|
||||||
|
|
||||||
if oldest == it.chunk.len() {
|
if oldest == it.len {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
result := make(metric.Values, 0, newest-oldest)
|
result := make(metric.Values, 0, newest-oldest)
|
||||||
for i := oldest; i < newest; i++ {
|
for i := oldest; i < newest; i++ {
|
||||||
result = append(result, *it.chunk.valueAtIndex(i))
|
result = append(result, metric.SamplePair{
|
||||||
|
Timestamp: it.timestampAtIndex(i),
|
||||||
|
Value: it.sampleValueAtIndex(i),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
// contains implements chunkIterator.
|
// contains implements chunkIterator.
|
||||||
func (it *deltaEncodedChunkIterator) contains(t clientmodel.Timestamp) bool {
|
func (it *deltaEncodedChunkIterator) contains(t clientmodel.Timestamp) bool {
|
||||||
return !t.Before(it.chunk.firstTime()) && !t.After(it.chunk.lastTime())
|
return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1))
|
||||||
|
}
|
||||||
|
|
||||||
|
// values implements chunkIterator.
|
||||||
|
func (it *deltaEncodedChunkIterator) values() <-chan *metric.SamplePair {
|
||||||
|
valuesChan := make(chan *metric.SamplePair)
|
||||||
|
go func() {
|
||||||
|
for i := 0; i < it.len; i++ {
|
||||||
|
valuesChan <- &metric.SamplePair{
|
||||||
|
Timestamp: it.timestampAtIndex(i),
|
||||||
|
Value: it.sampleValueAtIndex(i),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(valuesChan)
|
||||||
|
}()
|
||||||
|
return valuesChan
|
||||||
|
}
|
||||||
|
|
||||||
|
// timestampAtIndex implements chunkIterator.
|
||||||
|
func (it *deltaEncodedChunkIterator) timestampAtIndex(idx int) clientmodel.Timestamp {
|
||||||
|
offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes)
|
||||||
|
|
||||||
|
switch it.tBytes {
|
||||||
|
case d1:
|
||||||
|
return it.baseT + clientmodel.Timestamp(uint8(it.c[offset]))
|
||||||
|
case d2:
|
||||||
|
return it.baseT + clientmodel.Timestamp(binary.LittleEndian.Uint16(it.c[offset:]))
|
||||||
|
case d4:
|
||||||
|
return it.baseT + clientmodel.Timestamp(binary.LittleEndian.Uint32(it.c[offset:]))
|
||||||
|
case d8:
|
||||||
|
// Take absolute value for d8.
|
||||||
|
return clientmodel.Timestamp(binary.LittleEndian.Uint64(it.c[offset:]))
|
||||||
|
default:
|
||||||
|
panic("Invalid number of bytes for time delta")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// lastTimestamp implements chunkIterator.
|
||||||
|
func (it *deltaEncodedChunkIterator) lastTimestamp() clientmodel.Timestamp {
|
||||||
|
return it.timestampAtIndex(it.len - 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// sampleValueAtIndex implements chunkIterator.
|
||||||
|
func (it *deltaEncodedChunkIterator) sampleValueAtIndex(idx int) clientmodel.SampleValue {
|
||||||
|
offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + int(it.tBytes)
|
||||||
|
|
||||||
|
if it.isInt {
|
||||||
|
switch it.vBytes {
|
||||||
|
case d0:
|
||||||
|
return it.baseV
|
||||||
|
case d1:
|
||||||
|
return it.baseV + clientmodel.SampleValue(int8(it.c[offset]))
|
||||||
|
case d2:
|
||||||
|
return it.baseV + clientmodel.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:])))
|
||||||
|
case d4:
|
||||||
|
return it.baseV + clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:])))
|
||||||
|
// No d8 for ints.
|
||||||
|
default:
|
||||||
|
panic("Invalid number of bytes for integer delta")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
switch it.vBytes {
|
||||||
|
case d4:
|
||||||
|
return it.baseV + clientmodel.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:])))
|
||||||
|
case d8:
|
||||||
|
// Take absolute value for d8.
|
||||||
|
return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:])))
|
||||||
|
default:
|
||||||
|
panic("Invalid number of bytes for floating point delta")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// lastSampleValue implements chunkIterator.
|
||||||
|
func (it *deltaEncodedChunkIterator) lastSampleValue() clientmodel.SampleValue {
|
||||||
|
return it.sampleValueAtIndex(it.len - 1)
|
||||||
}
|
}
|
||||||
|
|
|
@ -199,15 +199,18 @@ func (c doubleDeltaEncodedChunk) firstTime() clientmodel.Timestamp {
|
||||||
return c.baseTime()
|
return c.baseTime()
|
||||||
}
|
}
|
||||||
|
|
||||||
// lastTime implements chunk.
|
|
||||||
func (c doubleDeltaEncodedChunk) lastTime() clientmodel.Timestamp {
|
|
||||||
return c.valueAtIndex(c.len() - 1).Timestamp
|
|
||||||
}
|
|
||||||
|
|
||||||
// newIterator implements chunk.
|
// newIterator implements chunk.
|
||||||
func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator {
|
func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator {
|
||||||
return &doubleDeltaEncodedChunkIterator{
|
return &doubleDeltaEncodedChunkIterator{
|
||||||
chunk: c,
|
c: *c,
|
||||||
|
len: c.len(),
|
||||||
|
baseT: c.baseTime(),
|
||||||
|
baseΔT: c.baseTimeDelta(),
|
||||||
|
baseV: c.baseValue(),
|
||||||
|
baseΔV: c.baseValueDelta(),
|
||||||
|
tBytes: c.timeBytes(),
|
||||||
|
vBytes: c.valueBytes(),
|
||||||
|
isInt: c.isInt(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -245,19 +248,6 @@ func (c *doubleDeltaEncodedChunk) unmarshalFromBuf(buf []byte) {
|
||||||
*c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])]
|
*c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])]
|
||||||
}
|
}
|
||||||
|
|
||||||
// values implements chunk.
|
|
||||||
func (c doubleDeltaEncodedChunk) values() <-chan *metric.SamplePair {
|
|
||||||
n := c.len()
|
|
||||||
valuesChan := make(chan *metric.SamplePair)
|
|
||||||
go func() {
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
valuesChan <- c.valueAtIndex(i)
|
|
||||||
}
|
|
||||||
close(valuesChan)
|
|
||||||
}()
|
|
||||||
return valuesChan
|
|
||||||
}
|
|
||||||
|
|
||||||
// encoding implements chunk.
|
// encoding implements chunk.
|
||||||
func (c doubleDeltaEncodedChunk) encoding() chunkEncoding { return doubleDelta }
|
func (c doubleDeltaEncodedChunk) encoding() chunkEncoding { return doubleDelta }
|
||||||
|
|
||||||
|
@ -280,6 +270,9 @@ func (c doubleDeltaEncodedChunk) baseValue() clientmodel.SampleValue {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c doubleDeltaEncodedChunk) baseTimeDelta() clientmodel.Timestamp {
|
func (c doubleDeltaEncodedChunk) baseTimeDelta() clientmodel.Timestamp {
|
||||||
|
if len(c) < doubleDeltaHeaderBaseTimeDeltaOffset+8 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
return clientmodel.Timestamp(
|
return clientmodel.Timestamp(
|
||||||
binary.LittleEndian.Uint64(
|
binary.LittleEndian.Uint64(
|
||||||
c[doubleDeltaHeaderBaseTimeDeltaOffset:],
|
c[doubleDeltaHeaderBaseTimeDeltaOffset:],
|
||||||
|
@ -288,6 +281,9 @@ func (c doubleDeltaEncodedChunk) baseTimeDelta() clientmodel.Timestamp {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c doubleDeltaEncodedChunk) baseValueDelta() clientmodel.SampleValue {
|
func (c doubleDeltaEncodedChunk) baseValueDelta() clientmodel.SampleValue {
|
||||||
|
if len(c) < doubleDeltaHeaderBaseValueDeltaOffset+8 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
return clientmodel.SampleValue(
|
return clientmodel.SampleValue(
|
||||||
math.Float64frombits(
|
math.Float64frombits(
|
||||||
binary.LittleEndian.Uint64(
|
binary.LittleEndian.Uint64(
|
||||||
|
@ -387,147 +383,196 @@ func (c doubleDeltaEncodedChunk) addSecondSample(s *metric.SamplePair, tb, vb de
|
||||||
return []chunk{&c}
|
return []chunk{&c}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c doubleDeltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair {
|
|
||||||
if idx == 0 {
|
|
||||||
return &metric.SamplePair{
|
|
||||||
Timestamp: c.baseTime(),
|
|
||||||
Value: c.baseValue(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if idx == 1 {
|
|
||||||
// If time and/or value bytes are at d8, the time and value is
|
|
||||||
// saved directly rather than as a difference.
|
|
||||||
timestamp := c.baseTimeDelta()
|
|
||||||
if c.timeBytes() < d8 {
|
|
||||||
timestamp += c.baseTime()
|
|
||||||
}
|
|
||||||
value := c.baseValueDelta()
|
|
||||||
if c.valueBytes() < d8 {
|
|
||||||
value += c.baseValue()
|
|
||||||
}
|
|
||||||
return &metric.SamplePair{
|
|
||||||
Timestamp: timestamp,
|
|
||||||
Value: value,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
offset := doubleDeltaHeaderBytes + (idx-2)*c.sampleSize()
|
|
||||||
|
|
||||||
var ts clientmodel.Timestamp
|
|
||||||
switch c.timeBytes() {
|
|
||||||
case d1:
|
|
||||||
ts = c.baseTime() +
|
|
||||||
clientmodel.Timestamp(idx)*c.baseTimeDelta() +
|
|
||||||
clientmodel.Timestamp(int8(c[offset]))
|
|
||||||
case d2:
|
|
||||||
ts = c.baseTime() +
|
|
||||||
clientmodel.Timestamp(idx)*c.baseTimeDelta() +
|
|
||||||
clientmodel.Timestamp(int16(binary.LittleEndian.Uint16(c[offset:])))
|
|
||||||
case d4:
|
|
||||||
ts = c.baseTime() +
|
|
||||||
clientmodel.Timestamp(idx)*c.baseTimeDelta() +
|
|
||||||
clientmodel.Timestamp(int32(binary.LittleEndian.Uint32(c[offset:])))
|
|
||||||
case d8:
|
|
||||||
// Take absolute value for d8.
|
|
||||||
ts = clientmodel.Timestamp(binary.LittleEndian.Uint64(c[offset:]))
|
|
||||||
default:
|
|
||||||
panic("Invalid number of bytes for time delta")
|
|
||||||
}
|
|
||||||
|
|
||||||
offset += int(c.timeBytes())
|
|
||||||
|
|
||||||
var v clientmodel.SampleValue
|
|
||||||
if c.isInt() {
|
|
||||||
switch c.valueBytes() {
|
|
||||||
case d0:
|
|
||||||
v = c.baseValue() +
|
|
||||||
clientmodel.SampleValue(idx)*c.baseValueDelta()
|
|
||||||
case d1:
|
|
||||||
v = c.baseValue() +
|
|
||||||
clientmodel.SampleValue(idx)*c.baseValueDelta() +
|
|
||||||
clientmodel.SampleValue(int8(c[offset]))
|
|
||||||
case d2:
|
|
||||||
v = c.baseValue() +
|
|
||||||
clientmodel.SampleValue(idx)*c.baseValueDelta() +
|
|
||||||
clientmodel.SampleValue(int16(binary.LittleEndian.Uint16(c[offset:])))
|
|
||||||
case d4:
|
|
||||||
v = c.baseValue() +
|
|
||||||
clientmodel.SampleValue(idx)*c.baseValueDelta() +
|
|
||||||
clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(c[offset:])))
|
|
||||||
// No d8 for ints.
|
|
||||||
default:
|
|
||||||
panic("Invalid number of bytes for integer delta")
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
switch c.valueBytes() {
|
|
||||||
case d4:
|
|
||||||
v = c.baseValue() +
|
|
||||||
clientmodel.SampleValue(idx)*c.baseValueDelta() +
|
|
||||||
clientmodel.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(c[offset:])))
|
|
||||||
case d8:
|
|
||||||
// Take absolute value for d8.
|
|
||||||
v = clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c[offset:])))
|
|
||||||
default:
|
|
||||||
panic("Invalid number of bytes for floating point delta")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return &metric.SamplePair{
|
|
||||||
Timestamp: ts,
|
|
||||||
Value: v,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// doubleDeltaEncodedChunkIterator implements chunkIterator.
|
// doubleDeltaEncodedChunkIterator implements chunkIterator.
|
||||||
type doubleDeltaEncodedChunkIterator struct {
|
type doubleDeltaEncodedChunkIterator struct {
|
||||||
chunk *doubleDeltaEncodedChunk
|
c doubleDeltaEncodedChunk
|
||||||
// TODO(beorn7): add more fields here to keep track of last position.
|
len int
|
||||||
|
baseT, baseΔT clientmodel.Timestamp
|
||||||
|
baseV, baseΔV clientmodel.SampleValue
|
||||||
|
tBytes, vBytes deltaBytes
|
||||||
|
isInt bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// getValueAtTime implements chunkIterator.
|
// length implements chunkIterator.
|
||||||
func (it *doubleDeltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestamp) metric.Values {
|
func (it *doubleDeltaEncodedChunkIterator) length() int { return it.len }
|
||||||
// TODO(beorn7): Implement in a more efficient way making use of the
|
|
||||||
// state of the iterator and internals of the doubleDeltaChunk.
|
// valueAtTime implements chunkIterator.
|
||||||
i := sort.Search(it.chunk.len(), func(i int) bool {
|
func (it *doubleDeltaEncodedChunkIterator) valueAtTime(t clientmodel.Timestamp) metric.Values {
|
||||||
return !it.chunk.valueAtIndex(i).Timestamp.Before(t)
|
i := sort.Search(it.len, func(i int) bool {
|
||||||
|
return !it.timestampAtIndex(i).Before(t)
|
||||||
})
|
})
|
||||||
|
|
||||||
switch i {
|
switch i {
|
||||||
case 0:
|
case 0:
|
||||||
return metric.Values{*it.chunk.valueAtIndex(0)}
|
return metric.Values{metric.SamplePair{
|
||||||
case it.chunk.len():
|
Timestamp: it.timestampAtIndex(0),
|
||||||
return metric.Values{*it.chunk.valueAtIndex(it.chunk.len() - 1)}
|
Value: it.sampleValueAtIndex(0),
|
||||||
|
}}
|
||||||
|
case it.len:
|
||||||
|
return metric.Values{metric.SamplePair{
|
||||||
|
Timestamp: it.timestampAtIndex(it.len - 1),
|
||||||
|
Value: it.sampleValueAtIndex(it.len - 1),
|
||||||
|
}}
|
||||||
default:
|
default:
|
||||||
v := it.chunk.valueAtIndex(i)
|
ts := it.timestampAtIndex(i)
|
||||||
if v.Timestamp.Equal(t) {
|
if ts.Equal(t) {
|
||||||
return metric.Values{*v}
|
return metric.Values{metric.SamplePair{
|
||||||
|
Timestamp: ts,
|
||||||
|
Value: it.sampleValueAtIndex(i),
|
||||||
|
}}
|
||||||
|
}
|
||||||
|
return metric.Values{
|
||||||
|
metric.SamplePair{
|
||||||
|
Timestamp: it.timestampAtIndex(i - 1),
|
||||||
|
Value: it.sampleValueAtIndex(i - 1),
|
||||||
|
},
|
||||||
|
metric.SamplePair{
|
||||||
|
Timestamp: ts,
|
||||||
|
Value: it.sampleValueAtIndex(i),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
return metric.Values{*it.chunk.valueAtIndex(i - 1), *v}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// getRangeValues implements chunkIterator.
|
// rangeValues implements chunkIterator.
|
||||||
func (it *doubleDeltaEncodedChunkIterator) getRangeValues(in metric.Interval) metric.Values {
|
func (it *doubleDeltaEncodedChunkIterator) rangeValues(in metric.Interval) metric.Values {
|
||||||
// TODO(beorn7): Implement in a more efficient way making use of the
|
oldest := sort.Search(it.len, func(i int) bool {
|
||||||
// state of the iterator and internals of the doubleDeltaChunk.
|
return !it.timestampAtIndex(i).Before(in.OldestInclusive)
|
||||||
oldest := sort.Search(it.chunk.len(), func(i int) bool {
|
|
||||||
return !it.chunk.valueAtIndex(i).Timestamp.Before(in.OldestInclusive)
|
|
||||||
})
|
})
|
||||||
|
|
||||||
newest := sort.Search(it.chunk.len(), func(i int) bool {
|
newest := sort.Search(it.len, func(i int) bool {
|
||||||
return it.chunk.valueAtIndex(i).Timestamp.After(in.NewestInclusive)
|
return it.timestampAtIndex(i).After(in.NewestInclusive)
|
||||||
})
|
})
|
||||||
|
|
||||||
if oldest == it.chunk.len() {
|
if oldest == it.len {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
result := make(metric.Values, 0, newest-oldest)
|
result := make(metric.Values, 0, newest-oldest)
|
||||||
for i := oldest; i < newest; i++ {
|
for i := oldest; i < newest; i++ {
|
||||||
result = append(result, *it.chunk.valueAtIndex(i))
|
result = append(result, metric.SamplePair{
|
||||||
|
Timestamp: it.timestampAtIndex(i),
|
||||||
|
Value: it.sampleValueAtIndex(i),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
// contains implements chunkIterator.
|
// contains implements chunkIterator.
|
||||||
func (it *doubleDeltaEncodedChunkIterator) contains(t clientmodel.Timestamp) bool {
|
func (it *doubleDeltaEncodedChunkIterator) contains(t clientmodel.Timestamp) bool {
|
||||||
return !t.Before(it.chunk.firstTime()) && !t.After(it.chunk.lastTime())
|
return !t.Before(it.baseT) && !t.After(it.timestampAtIndex(it.len-1))
|
||||||
|
}
|
||||||
|
|
||||||
|
// values implements chunkIterator.
|
||||||
|
func (it *doubleDeltaEncodedChunkIterator) values() <-chan *metric.SamplePair {
|
||||||
|
valuesChan := make(chan *metric.SamplePair)
|
||||||
|
go func() {
|
||||||
|
for i := 0; i < it.len; i++ {
|
||||||
|
valuesChan <- &metric.SamplePair{
|
||||||
|
Timestamp: it.timestampAtIndex(i),
|
||||||
|
Value: it.sampleValueAtIndex(i),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
close(valuesChan)
|
||||||
|
}()
|
||||||
|
return valuesChan
|
||||||
|
}
|
||||||
|
|
||||||
|
// timestampAtIndex implements chunkIterator.
|
||||||
|
func (it *doubleDeltaEncodedChunkIterator) timestampAtIndex(idx int) clientmodel.Timestamp {
|
||||||
|
if idx == 0 {
|
||||||
|
return it.baseT
|
||||||
|
}
|
||||||
|
if idx == 1 {
|
||||||
|
// If time bytes are at d8, the time is saved directly rather
|
||||||
|
// than as a difference.
|
||||||
|
if it.tBytes == d8 {
|
||||||
|
return it.baseΔT
|
||||||
|
}
|
||||||
|
return it.baseT + it.baseΔT
|
||||||
|
}
|
||||||
|
|
||||||
|
offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes)
|
||||||
|
|
||||||
|
switch it.tBytes {
|
||||||
|
case d1:
|
||||||
|
return it.baseT +
|
||||||
|
clientmodel.Timestamp(idx)*it.baseΔT +
|
||||||
|
clientmodel.Timestamp(int8(it.c[offset]))
|
||||||
|
case d2:
|
||||||
|
return it.baseT +
|
||||||
|
clientmodel.Timestamp(idx)*it.baseΔT +
|
||||||
|
clientmodel.Timestamp(int16(binary.LittleEndian.Uint16(it.c[offset:])))
|
||||||
|
case d4:
|
||||||
|
return it.baseT +
|
||||||
|
clientmodel.Timestamp(idx)*it.baseΔT +
|
||||||
|
clientmodel.Timestamp(int32(binary.LittleEndian.Uint32(it.c[offset:])))
|
||||||
|
case d8:
|
||||||
|
// Take absolute value for d8.
|
||||||
|
return clientmodel.Timestamp(binary.LittleEndian.Uint64(it.c[offset:]))
|
||||||
|
default:
|
||||||
|
panic("Invalid number of bytes for time delta")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// lastTimestamp implements chunkIterator.
|
||||||
|
func (it *doubleDeltaEncodedChunkIterator) lastTimestamp() clientmodel.Timestamp {
|
||||||
|
return it.timestampAtIndex(it.len - 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// sampleValueAtIndex implements chunkIterator.
|
||||||
|
func (it *doubleDeltaEncodedChunkIterator) sampleValueAtIndex(idx int) clientmodel.SampleValue {
|
||||||
|
if idx == 0 {
|
||||||
|
return it.baseV
|
||||||
|
}
|
||||||
|
if idx == 1 {
|
||||||
|
// If value bytes are at d8, the value is saved directly rather
|
||||||
|
// than as a difference.
|
||||||
|
if it.vBytes == d8 {
|
||||||
|
return it.baseΔV
|
||||||
|
}
|
||||||
|
return it.baseV + it.baseΔV
|
||||||
|
}
|
||||||
|
|
||||||
|
offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + int(it.tBytes)
|
||||||
|
|
||||||
|
if it.isInt {
|
||||||
|
switch it.vBytes {
|
||||||
|
case d0:
|
||||||
|
return it.baseV +
|
||||||
|
clientmodel.SampleValue(idx)*it.baseΔV
|
||||||
|
case d1:
|
||||||
|
return it.baseV +
|
||||||
|
clientmodel.SampleValue(idx)*it.baseΔV +
|
||||||
|
clientmodel.SampleValue(int8(it.c[offset]))
|
||||||
|
case d2:
|
||||||
|
return it.baseV +
|
||||||
|
clientmodel.SampleValue(idx)*it.baseΔV +
|
||||||
|
clientmodel.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:])))
|
||||||
|
case d4:
|
||||||
|
return it.baseV +
|
||||||
|
clientmodel.SampleValue(idx)*it.baseΔV +
|
||||||
|
clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:])))
|
||||||
|
// No d8 for ints.
|
||||||
|
default:
|
||||||
|
panic("Invalid number of bytes for integer delta")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
switch it.vBytes {
|
||||||
|
case d4:
|
||||||
|
return it.baseV +
|
||||||
|
clientmodel.SampleValue(idx)*it.baseΔV +
|
||||||
|
clientmodel.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:])))
|
||||||
|
case d8:
|
||||||
|
// Take absolute value for d8.
|
||||||
|
return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:])))
|
||||||
|
default:
|
||||||
|
panic("Invalid number of bytes for floating point delta")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// lastSampleValue implements chunkIterator.
|
||||||
|
func (it *doubleDeltaEncodedChunkIterator) lastSampleValue() clientmodel.SampleValue {
|
||||||
|
return it.sampleValueAtIndex(it.len - 1)
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,11 +38,11 @@ type Storage interface {
|
||||||
NewPreloader() Preloader
|
NewPreloader() Preloader
|
||||||
// Get all of the metric fingerprints that are associated with the
|
// Get all of the metric fingerprints that are associated with the
|
||||||
// provided label matchers.
|
// provided label matchers.
|
||||||
GetFingerprintsForLabelMatchers(metric.LabelMatchers) clientmodel.Fingerprints
|
FingerprintsForLabelMatchers(metric.LabelMatchers) clientmodel.Fingerprints
|
||||||
// Get all of the label values that are associated with a given label name.
|
// Get all of the label values that are associated with a given label name.
|
||||||
GetLabelValuesForLabelName(clientmodel.LabelName) clientmodel.LabelValues
|
LabelValuesForLabelName(clientmodel.LabelName) clientmodel.LabelValues
|
||||||
// Get the metric associated with the provided fingerprint.
|
// Get the metric associated with the provided fingerprint.
|
||||||
GetMetricForFingerprint(clientmodel.Fingerprint) clientmodel.COWMetric
|
MetricForFingerprint(clientmodel.Fingerprint) clientmodel.COWMetric
|
||||||
// Construct an iterator for a given fingerprint.
|
// Construct an iterator for a given fingerprint.
|
||||||
NewIterator(clientmodel.Fingerprint) SeriesIterator
|
NewIterator(clientmodel.Fingerprint) SeriesIterator
|
||||||
// Run the various maintenance loops in goroutines. Returns when the
|
// Run the various maintenance loops in goroutines. Returns when the
|
||||||
|
@ -53,28 +53,28 @@ type Storage interface {
|
||||||
// operations, stops all maintenance loops,and frees all resources.
|
// operations, stops all maintenance loops,and frees all resources.
|
||||||
Stop() error
|
Stop() error
|
||||||
// WaitForIndexing returns once all samples in the storage are
|
// WaitForIndexing returns once all samples in the storage are
|
||||||
// indexed. Indexing is needed for GetFingerprintsForLabelMatchers and
|
// indexed. Indexing is needed for FingerprintsForLabelMatchers and
|
||||||
// GetLabelValuesForLabelName and may lag behind.
|
// LabelValuesForLabelName and may lag behind.
|
||||||
WaitForIndexing()
|
WaitForIndexing()
|
||||||
}
|
}
|
||||||
|
|
||||||
// SeriesIterator enables efficient access of sample values in a series. All
|
// SeriesIterator enables efficient access of sample values in a series. Its
|
||||||
// methods are goroutine-safe. A SeriesIterator iterates over a snapshot of a
|
// methods are not goroutine-safe. A SeriesIterator iterates over a snapshot of
|
||||||
// series, i.e. it is safe to continue using a SeriesIterator after modifying
|
// a series, i.e. it is safe to continue using a SeriesIterator after or during
|
||||||
// the corresponding series, but the iterator will represent the state of the
|
// modifying the corresponding series, but the iterator will represent the state
|
||||||
// series prior the modification.
|
// of the series prior the modification.
|
||||||
type SeriesIterator interface {
|
type SeriesIterator interface {
|
||||||
// Gets the two values that are immediately adjacent to a given time. In
|
// Gets the two values that are immediately adjacent to a given time. In
|
||||||
// case a value exist at precisely the given time, only that single
|
// case a value exist at precisely the given time, only that single
|
||||||
// value is returned. Only the first or last value is returned (as a
|
// value is returned. Only the first or last value is returned (as a
|
||||||
// single value), if the given time is before or after the first or last
|
// single value), if the given time is before or after the first or last
|
||||||
// value, respectively.
|
// value, respectively.
|
||||||
GetValueAtTime(clientmodel.Timestamp) metric.Values
|
ValueAtTime(clientmodel.Timestamp) metric.Values
|
||||||
// Gets the boundary values of an interval: the first and last value
|
// Gets the boundary values of an interval: the first and last value
|
||||||
// within a given interval.
|
// within a given interval.
|
||||||
GetBoundaryValues(metric.Interval) metric.Values
|
BoundaryValues(metric.Interval) metric.Values
|
||||||
// Gets all values contained within a given interval.
|
// Gets all values contained within a given interval.
|
||||||
GetRangeValues(metric.Interval) metric.Values
|
RangeValues(metric.Interval) metric.Values
|
||||||
}
|
}
|
||||||
|
|
||||||
// A Preloader preloads series data necessary for a query into memory and pins
|
// A Preloader preloads series data necessary for a query into memory and pins
|
||||||
|
|
|
@ -93,7 +93,7 @@ func (r *fpMapper) mapFP(fp clientmodel.Fingerprint, m clientmodel.Metric) (clie
|
||||||
// If we are here, FP does not exist in memory and is either not mapped
|
// If we are here, FP does not exist in memory and is either not mapped
|
||||||
// at all, or existing mappings for FP are not for m. Check if we have
|
// at all, or existing mappings for FP are not for m. Check if we have
|
||||||
// something for FP in the archive.
|
// something for FP in the archive.
|
||||||
archivedMetric, err := r.p.getArchivedMetric(fp)
|
archivedMetric, err := r.p.archivedMetric(fp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fp, err
|
return fp, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -319,11 +319,11 @@ func (p *persistence) setDirty(dirty bool) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// getFingerprintsForLabelPair returns the fingerprints for the given label
|
// fingerprintsForLabelPair returns the fingerprints for the given label
|
||||||
// pair. This method is goroutine-safe but take into account that metrics queued
|
// pair. This method is goroutine-safe but take into account that metrics queued
|
||||||
// for indexing with IndexMetric might not have made it into the index
|
// for indexing with IndexMetric might not have made it into the index
|
||||||
// yet. (Same applies correspondingly to UnindexMetric.)
|
// yet. (Same applies correspondingly to UnindexMetric.)
|
||||||
func (p *persistence) getFingerprintsForLabelPair(lp metric.LabelPair) (clientmodel.Fingerprints, error) {
|
func (p *persistence) fingerprintsForLabelPair(lp metric.LabelPair) (clientmodel.Fingerprints, error) {
|
||||||
fps, _, err := p.labelPairToFingerprints.Lookup(lp)
|
fps, _, err := p.labelPairToFingerprints.Lookup(lp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -331,11 +331,11 @@ func (p *persistence) getFingerprintsForLabelPair(lp metric.LabelPair) (clientmo
|
||||||
return fps, nil
|
return fps, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getLabelValuesForLabelName returns the label values for the given label
|
// labelValuesForLabelName returns the label values for the given label
|
||||||
// name. This method is goroutine-safe but take into account that metrics queued
|
// name. This method is goroutine-safe but take into account that metrics queued
|
||||||
// for indexing with IndexMetric might not have made it into the index
|
// for indexing with IndexMetric might not have made it into the index
|
||||||
// yet. (Same applies correspondingly to UnindexMetric.)
|
// yet. (Same applies correspondingly to UnindexMetric.)
|
||||||
func (p *persistence) getLabelValuesForLabelName(ln clientmodel.LabelName) (clientmodel.LabelValues, error) {
|
func (p *persistence) labelValuesForLabelName(ln clientmodel.LabelName) (clientmodel.LabelValues, error) {
|
||||||
lvs, _, err := p.labelNameToLabelValues.Lookup(ln)
|
lvs, _, err := p.labelNameToLabelValues.Lookup(ln)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -632,10 +632,10 @@ func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// This is the non-persisted head chunk. Fully marshal it.
|
// This is the non-persisted head chunk. Fully marshal it.
|
||||||
if err = w.WriteByte(byte(chunkDesc.chunk.encoding())); err != nil {
|
if err = w.WriteByte(byte(chunkDesc.c.encoding())); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err = chunkDesc.chunk.marshal(w); err != nil {
|
if err = chunkDesc.c.marshal(w); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -880,7 +880,7 @@ func (p *persistence) dropAndPersistChunks(
|
||||||
// too old. If that's the case, the chunks in the series file
|
// too old. If that's the case, the chunks in the series file
|
||||||
// are all too old, too.
|
// are all too old, too.
|
||||||
i := 0
|
i := 0
|
||||||
for ; i < len(chunks) && chunks[i].lastTime().Before(beforeTime); i++ {
|
for ; i < len(chunks) && chunks[i].newIterator().lastTimestamp().Before(beforeTime); i++ {
|
||||||
}
|
}
|
||||||
if i < len(chunks) {
|
if i < len(chunks) {
|
||||||
firstTimeNotDropped = chunks[i].firstTime()
|
firstTimeNotDropped = chunks[i].firstTime()
|
||||||
|
@ -1017,10 +1017,10 @@ func (p *persistence) deleteSeriesFile(fp clientmodel.Fingerprint) (int, error)
|
||||||
return numChunks, nil
|
return numChunks, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getSeriesFileModTime returns the modification time of the series file
|
// seriesFileModTime returns the modification time of the series file belonging
|
||||||
// belonging to the provided fingerprint. In case of an error, the zero value of
|
// to the provided fingerprint. In case of an error, the zero value of time.Time
|
||||||
// time.Time is returned.
|
// is returned.
|
||||||
func (p *persistence) getSeriesFileModTime(fp clientmodel.Fingerprint) time.Time {
|
func (p *persistence) seriesFileModTime(fp clientmodel.Fingerprint) time.Time {
|
||||||
var modTime time.Time
|
var modTime time.Time
|
||||||
if fi, err := os.Stat(p.fileNameForFingerprint(fp)); err == nil {
|
if fi, err := os.Stat(p.fileNameForFingerprint(fp)); err == nil {
|
||||||
return fi.ModTime()
|
return fi.ModTime()
|
||||||
|
@ -1029,17 +1029,17 @@ func (p *persistence) getSeriesFileModTime(fp clientmodel.Fingerprint) time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// indexMetric queues the given metric for addition to the indexes needed by
|
// indexMetric queues the given metric for addition to the indexes needed by
|
||||||
// getFingerprintsForLabelPair, getLabelValuesForLabelName, and
|
// fingerprintsForLabelPair, labelValuesForLabelName, and
|
||||||
// getFingerprintsModifiedBefore. If the queue is full, this method blocks
|
// fingerprintsModifiedBefore. If the queue is full, this method blocks until
|
||||||
// until the metric can be queued. This method is goroutine-safe.
|
// the metric can be queued. This method is goroutine-safe.
|
||||||
func (p *persistence) indexMetric(fp clientmodel.Fingerprint, m clientmodel.Metric) {
|
func (p *persistence) indexMetric(fp clientmodel.Fingerprint, m clientmodel.Metric) {
|
||||||
p.indexingQueue <- indexingOp{fp, m, add}
|
p.indexingQueue <- indexingOp{fp, m, add}
|
||||||
}
|
}
|
||||||
|
|
||||||
// unindexMetric queues references to the given metric for removal from the
|
// unindexMetric queues references to the given metric for removal from the
|
||||||
// indexes used for getFingerprintsForLabelPair, getLabelValuesForLabelName, and
|
// indexes used for fingerprintsForLabelPair, labelValuesForLabelName, and
|
||||||
// getFingerprintsModifiedBefore. The index of fingerprints to archived metrics
|
// fingerprintsModifiedBefore. The index of fingerprints to archived metrics is
|
||||||
// is not affected by this removal. (In fact, never call this method for an
|
// not affected by this removal. (In fact, never call this method for an
|
||||||
// archived metric. To purge an archived metric, call purgeArchivedFingerprint.)
|
// archived metric. To purge an archived metric, call purgeArchivedFingerprint.)
|
||||||
// If the queue is full, this method blocks until the metric can be queued. This
|
// If the queue is full, this method blocks until the metric can be queued. This
|
||||||
// method is goroutine-safe.
|
// method is goroutine-safe.
|
||||||
|
@ -1097,10 +1097,10 @@ func (p *persistence) updateArchivedTimeRange(
|
||||||
return p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last})
|
return p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last})
|
||||||
}
|
}
|
||||||
|
|
||||||
// getFingerprintsModifiedBefore returns the fingerprints of archived timeseries
|
// fingerprintsModifiedBefore returns the fingerprints of archived timeseries
|
||||||
// that have live samples before the provided timestamp. This method is
|
// that have live samples before the provided timestamp. This method is
|
||||||
// goroutine-safe.
|
// goroutine-safe.
|
||||||
func (p *persistence) getFingerprintsModifiedBefore(beforeTime clientmodel.Timestamp) ([]clientmodel.Fingerprint, error) {
|
func (p *persistence) fingerprintsModifiedBefore(beforeTime clientmodel.Timestamp) ([]clientmodel.Fingerprint, error) {
|
||||||
var fp codable.Fingerprint
|
var fp codable.Fingerprint
|
||||||
var tr codable.TimeRange
|
var tr codable.TimeRange
|
||||||
fps := []clientmodel.Fingerprint{}
|
fps := []clientmodel.Fingerprint{}
|
||||||
|
@ -1119,9 +1119,9 @@ func (p *persistence) getFingerprintsModifiedBefore(beforeTime clientmodel.Times
|
||||||
return fps, nil
|
return fps, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getArchivedMetric retrieves the archived metric with the given
|
// archivedMetric retrieves the archived metric with the given fingerprint. This
|
||||||
// fingerprint. This method is goroutine-safe.
|
// method is goroutine-safe.
|
||||||
func (p *persistence) getArchivedMetric(fp clientmodel.Fingerprint) (clientmodel.Metric, error) {
|
func (p *persistence) archivedMetric(fp clientmodel.Fingerprint) (clientmodel.Metric, error) {
|
||||||
metric, _, err := p.archivedFingerprintToMetrics.Lookup(fp)
|
metric, _, err := p.archivedFingerprintToMetrics.Lookup(fp)
|
||||||
return metric, err
|
return metric, err
|
||||||
}
|
}
|
||||||
|
@ -1137,7 +1137,7 @@ func (p *persistence) purgeArchivedMetric(fp clientmodel.Fingerprint) (err error
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
metric, err := p.getArchivedMetric(fp)
|
metric, err := p.archivedMetric(fp)
|
||||||
if err != nil || metric == nil {
|
if err != nil || metric == nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -1567,8 +1567,14 @@ func chunkIndexForOffset(offset int64) (int, error) {
|
||||||
func writeChunkHeader(w io.Writer, c chunk) error {
|
func writeChunkHeader(w io.Writer, c chunk) error {
|
||||||
header := make([]byte, chunkHeaderLen)
|
header := make([]byte, chunkHeaderLen)
|
||||||
header[chunkHeaderTypeOffset] = byte(c.encoding())
|
header[chunkHeaderTypeOffset] = byte(c.encoding())
|
||||||
binary.LittleEndian.PutUint64(header[chunkHeaderFirstTimeOffset:], uint64(c.firstTime()))
|
binary.LittleEndian.PutUint64(
|
||||||
binary.LittleEndian.PutUint64(header[chunkHeaderLastTimeOffset:], uint64(c.lastTime()))
|
header[chunkHeaderFirstTimeOffset:],
|
||||||
|
uint64(c.firstTime()),
|
||||||
|
)
|
||||||
|
binary.LittleEndian.PutUint64(
|
||||||
|
header[chunkHeaderLastTimeOffset:],
|
||||||
|
uint64(c.newIterator().lastTimestamp()),
|
||||||
|
)
|
||||||
_, err := w.Write(header)
|
_, err := w.Write(header)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,8 +70,8 @@ func buildTestChunks(encoding chunkEncoding) map[clientmodel.Fingerprint][]chunk
|
||||||
}
|
}
|
||||||
|
|
||||||
func chunksEqual(c1, c2 chunk) bool {
|
func chunksEqual(c1, c2 chunk) bool {
|
||||||
values2 := c2.values()
|
values2 := c2.newIterator().values()
|
||||||
for v1 := range c1.values() {
|
for v1 := range c1.newIterator().values() {
|
||||||
v2 := <-values2
|
v2 := <-values2
|
||||||
if !v1.Equal(v2) {
|
if !v1.Equal(v2) {
|
||||||
return false
|
return false
|
||||||
|
@ -397,7 +397,7 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
|
||||||
if !reflect.DeepEqual(loadedS1.metric, m1) {
|
if !reflect.DeepEqual(loadedS1.metric, m1) {
|
||||||
t.Errorf("want metric %v, got %v", m1, loadedS1.metric)
|
t.Errorf("want metric %v, got %v", m1, loadedS1.metric)
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(loadedS1.head().chunk, s1.head().chunk) {
|
if !reflect.DeepEqual(loadedS1.head().c, s1.head().c) {
|
||||||
t.Error("head chunks differ")
|
t.Error("head chunks differ")
|
||||||
}
|
}
|
||||||
if loadedS1.chunkDescsOffset != 0 {
|
if loadedS1.chunkDescsOffset != 0 {
|
||||||
|
@ -413,7 +413,7 @@ func testCheckpointAndLoadSeriesMapAndHeads(t *testing.T, encoding chunkEncoding
|
||||||
if !reflect.DeepEqual(loadedS3.metric, m3) {
|
if !reflect.DeepEqual(loadedS3.metric, m3) {
|
||||||
t.Errorf("want metric %v, got %v", m3, loadedS3.metric)
|
t.Errorf("want metric %v, got %v", m3, loadedS3.metric)
|
||||||
}
|
}
|
||||||
if loadedS3.head().chunk != nil {
|
if loadedS3.head().c != nil {
|
||||||
t.Error("head chunk not evicted")
|
t.Error("head chunk not evicted")
|
||||||
}
|
}
|
||||||
if loadedS3.chunkDescsOffset != -1 {
|
if loadedS3.chunkDescsOffset != -1 {
|
||||||
|
@ -515,7 +515,7 @@ func TestCheckpointAndLoadFPMappings(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testGetFingerprintsModifiedBefore(t *testing.T, encoding chunkEncoding) {
|
func testFingerprintsModifiedBefore(t *testing.T, encoding chunkEncoding) {
|
||||||
p, closer := newTestPersistence(t, encoding)
|
p, closer := newTestPersistence(t, encoding)
|
||||||
defer closer.Close()
|
defer closer.Close()
|
||||||
|
|
||||||
|
@ -537,7 +537,7 @@ func testGetFingerprintsModifiedBefore(t *testing.T, encoding chunkEncoding) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for ts, want := range expectedFPs {
|
for ts, want := range expectedFPs {
|
||||||
got, err := p.getFingerprintsModifiedBefore(ts)
|
got, err := p.fingerprintsModifiedBefore(ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -575,7 +575,7 @@ func testGetFingerprintsModifiedBefore(t *testing.T, encoding chunkEncoding) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for ts, want := range expectedFPs {
|
for ts, want := range expectedFPs {
|
||||||
got, err := p.getFingerprintsModifiedBefore(ts)
|
got, err := p.fingerprintsModifiedBefore(ts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -585,12 +585,12 @@ func testGetFingerprintsModifiedBefore(t *testing.T, encoding chunkEncoding) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetFingerprintsModifiedBeforeChunkType0(t *testing.T) {
|
func TestFingerprintsModifiedBeforeChunkType0(t *testing.T) {
|
||||||
testGetFingerprintsModifiedBefore(t, 0)
|
testFingerprintsModifiedBefore(t, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetFingerprintsModifiedBeforeChunkType1(t *testing.T) {
|
func TestFingerprintsModifiedBeforeChunkType1(t *testing.T) {
|
||||||
testGetFingerprintsModifiedBefore(t, 1)
|
testFingerprintsModifiedBefore(t, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testDropArchivedMetric(t *testing.T, encoding chunkEncoding) {
|
func testDropArchivedMetric(t *testing.T, encoding chunkEncoding) {
|
||||||
|
@ -605,7 +605,7 @@ func testDropArchivedMetric(t *testing.T, encoding chunkEncoding) {
|
||||||
p.indexMetric(2, m2)
|
p.indexMetric(2, m2)
|
||||||
p.waitForIndexing()
|
p.waitForIndexing()
|
||||||
|
|
||||||
outFPs, err := p.getFingerprintsForLabelPair(metric.LabelPair{Name: "n1", Value: "v1"})
|
outFPs, err := p.fingerprintsForLabelPair(metric.LabelPair{Name: "n1", Value: "v1"})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -613,7 +613,7 @@ func testDropArchivedMetric(t *testing.T, encoding chunkEncoding) {
|
||||||
if !reflect.DeepEqual(outFPs, want) {
|
if !reflect.DeepEqual(outFPs, want) {
|
||||||
t.Errorf("want %#v, got %#v", want, outFPs)
|
t.Errorf("want %#v, got %#v", want, outFPs)
|
||||||
}
|
}
|
||||||
outFPs, err = p.getFingerprintsForLabelPair(metric.LabelPair{Name: "n2", Value: "v2"})
|
outFPs, err = p.fingerprintsForLabelPair(metric.LabelPair{Name: "n2", Value: "v2"})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -637,7 +637,7 @@ func testDropArchivedMetric(t *testing.T, encoding chunkEncoding) {
|
||||||
}
|
}
|
||||||
p.waitForIndexing()
|
p.waitForIndexing()
|
||||||
|
|
||||||
outFPs, err = p.getFingerprintsForLabelPair(metric.LabelPair{Name: "n1", Value: "v1"})
|
outFPs, err = p.fingerprintsForLabelPair(metric.LabelPair{Name: "n1", Value: "v1"})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -645,7 +645,7 @@ func testDropArchivedMetric(t *testing.T, encoding chunkEncoding) {
|
||||||
if !reflect.DeepEqual(outFPs, want) {
|
if !reflect.DeepEqual(outFPs, want) {
|
||||||
t.Errorf("want %#v, got %#v", want, outFPs)
|
t.Errorf("want %#v, got %#v", want, outFPs)
|
||||||
}
|
}
|
||||||
outFPs, err = p.getFingerprintsForLabelPair(metric.LabelPair{Name: "n2", Value: "v2"})
|
outFPs, err = p.fingerprintsForLabelPair(metric.LabelPair{Name: "n2", Value: "v2"})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -858,7 +858,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet
|
||||||
p.waitForIndexing()
|
p.waitForIndexing()
|
||||||
for fp, m := range indexedFpsToMetrics {
|
for fp, m := range indexedFpsToMetrics {
|
||||||
// Compare archived metrics with input metrics.
|
// Compare archived metrics with input metrics.
|
||||||
mOut, err := p.getArchivedMetric(fp)
|
mOut, err := p.archivedMetric(fp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -884,7 +884,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet
|
||||||
|
|
||||||
// Compare label name -> label values mappings.
|
// Compare label name -> label values mappings.
|
||||||
for ln, lvs := range b.expectedLnToLvs {
|
for ln, lvs := range b.expectedLnToLvs {
|
||||||
outLvs, err := p.getLabelValuesForLabelName(ln)
|
outLvs, err := p.labelValuesForLabelName(ln)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -901,7 +901,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet
|
||||||
|
|
||||||
// Compare label pair -> fingerprints mappings.
|
// Compare label pair -> fingerprints mappings.
|
||||||
for lp, fps := range b.expectedLpToFps {
|
for lp, fps := range b.expectedLpToFps {
|
||||||
outFPs, err := p.getFingerprintsForLabelPair(lp)
|
outFPs, err := p.fingerprintsForLabelPair(lp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,8 +40,8 @@ func (p *memorySeriesPreloader) PreloadRange(
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
// GetMetricAtTime implements Preloader.
|
// MetricAtTime implements Preloader.
|
||||||
func (p *memorySeriesPreloader) GetMetricAtTime(fp clientmodel.Fingerprint, t clientmodel.Timestamp) error {
|
func (p *memorySeriesPreloader) MetricAtTime(fp clientmodel.Fingerprint, t clientmodel.Timestamp) error {
|
||||||
cds, err := p.storage.preloadChunks(fp, &timeSelector{
|
cds, err := p.storage.preloadChunks(fp, &timeSelector{
|
||||||
from: t,
|
from: t,
|
||||||
through: t,
|
through: t,
|
||||||
|
@ -53,8 +53,8 @@ func (p *memorySeriesPreloader) GetMetricAtTime(fp clientmodel.Fingerprint, t cl
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetMetricAtInterval implements Preloader.
|
// MetricAtInterval implements Preloader.
|
||||||
func (p *memorySeriesPreloader) GetMetricAtInterval(fp clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval time.Duration) error {
|
func (p *memorySeriesPreloader) MetricAtInterval(fp clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval time.Duration) error {
|
||||||
cds, err := p.storage.preloadChunks(fp, &timeSelector{
|
cds, err := p.storage.preloadChunks(fp, &timeSelector{
|
||||||
from: from,
|
from: from,
|
||||||
through: through,
|
through: through,
|
||||||
|
@ -67,8 +67,8 @@ func (p *memorySeriesPreloader) GetMetricAtInterval(fp clientmodel.Fingerprint,
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetMetricRange implements Preloader.
|
// MetricRange implements Preloader.
|
||||||
func (p *memorySeriesPreloader) GetMetricRange(fp clientmodel.Fingerprint, t clientmodel.Timestamp, rangeDuration time.Duration) error {
|
func (p *memorySeriesPreloader) MetricRange(fp clientmodel.Fingerprint, t clientmodel.Timestamp, rangeDuration time.Duration) error {
|
||||||
cds, err := p.storage.preloadChunks(fp, &timeSelector{
|
cds, err := p.storage.preloadChunks(fp, &timeSelector{
|
||||||
from: t,
|
from: t,
|
||||||
through: t,
|
through: t,
|
||||||
|
@ -81,8 +81,8 @@ func (p *memorySeriesPreloader) GetMetricRange(fp clientmodel.Fingerprint, t cli
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetMetricRangeAtInterval implements Preloader.
|
// MetricRangeAtInterval implements Preloader.
|
||||||
func (p *memorySeriesPreloader) GetMetricRangeAtInterval(fp clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval, rangeDuration time.Duration) error {
|
func (p *memorySeriesPreloader) MetricRangeAtInterval(fp clientmodel.Fingerprint, from, through clientmodel.Timestamp, interval, rangeDuration time.Duration) error {
|
||||||
cds, err := p.storage.preloadChunks(fp, &timeSelector{
|
cds, err := p.storage.preloadChunks(fp, &timeSelector{
|
||||||
from: from,
|
from: from,
|
||||||
through: through,
|
through: through,
|
||||||
|
|
|
@ -208,7 +208,7 @@ func (s *memorySeries) add(v *metric.SamplePair) int {
|
||||||
newHead := newChunkDesc(newChunk())
|
newHead := newChunkDesc(newChunk())
|
||||||
s.chunkDescs = append(s.chunkDescs, newHead)
|
s.chunkDescs = append(s.chunkDescs, newHead)
|
||||||
s.headChunkClosed = false
|
s.headChunkClosed = false
|
||||||
} else if s.headChunkUsedByIterator && s.head().getRefCount() > 1 {
|
} else if s.headChunkUsedByIterator && s.head().refCount() > 1 {
|
||||||
// We only need to clone the head chunk if the current head
|
// We only need to clone the head chunk if the current head
|
||||||
// chunk was used in an iterator at all and if the refCount is
|
// chunk was used in an iterator at all and if the refCount is
|
||||||
// still greater than the 1 we always have because the head
|
// still greater than the 1 we always have because the head
|
||||||
|
@ -221,12 +221,12 @@ func (s *memorySeries) add(v *metric.SamplePair) int {
|
||||||
chunkOps.WithLabelValues(clone).Inc()
|
chunkOps.WithLabelValues(clone).Inc()
|
||||||
// No locking needed here because a non-persisted head chunk can
|
// No locking needed here because a non-persisted head chunk can
|
||||||
// not get evicted concurrently.
|
// not get evicted concurrently.
|
||||||
s.head().chunk = s.head().chunk.clone()
|
s.head().c = s.head().c.clone()
|
||||||
s.headChunkUsedByIterator = false
|
s.headChunkUsedByIterator = false
|
||||||
}
|
}
|
||||||
|
|
||||||
chunks := s.head().add(v)
|
chunks := s.head().add(v)
|
||||||
s.head().chunk = chunks[0]
|
s.head().c = chunks[0]
|
||||||
|
|
||||||
for _, c := range chunks[1:] {
|
for _, c := range chunks[1:] {
|
||||||
s.chunkDescs = append(s.chunkDescs, newChunkDesc(c))
|
s.chunkDescs = append(s.chunkDescs, newChunkDesc(c))
|
||||||
|
@ -415,10 +415,10 @@ func (s *memorySeries) preloadChunksForRange(
|
||||||
|
|
||||||
// newIterator returns a new SeriesIterator. The caller must have locked the
|
// newIterator returns a new SeriesIterator. The caller must have locked the
|
||||||
// fingerprint of the memorySeries.
|
// fingerprint of the memorySeries.
|
||||||
func (s *memorySeries) newIterator(lockFunc, unlockFunc func()) SeriesIterator {
|
func (s *memorySeries) newIterator() SeriesIterator {
|
||||||
chunks := make([]chunk, 0, len(s.chunkDescs))
|
chunks := make([]chunk, 0, len(s.chunkDescs))
|
||||||
for i, cd := range s.chunkDescs {
|
for i, cd := range s.chunkDescs {
|
||||||
if chunk := cd.getChunk(); chunk != nil {
|
if chunk := cd.chunk(); chunk != nil {
|
||||||
if i == len(s.chunkDescs)-1 && !s.headChunkClosed {
|
if i == len(s.chunkDescs)-1 && !s.headChunkClosed {
|
||||||
s.headChunkUsedByIterator = true
|
s.headChunkUsedByIterator = true
|
||||||
}
|
}
|
||||||
|
@ -427,9 +427,8 @@ func (s *memorySeries) newIterator(lockFunc, unlockFunc func()) SeriesIterator {
|
||||||
}
|
}
|
||||||
|
|
||||||
return &memorySeriesIterator{
|
return &memorySeriesIterator{
|
||||||
lock: lockFunc,
|
chunks: chunks,
|
||||||
unlock: unlockFunc,
|
chunkIts: make([]chunkIterator, len(chunks)),
|
||||||
chunks: chunks,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -449,13 +448,13 @@ func (s *memorySeries) firstTime() clientmodel.Timestamp {
|
||||||
return s.savedFirstTime
|
return s.savedFirstTime
|
||||||
}
|
}
|
||||||
|
|
||||||
// getChunksToPersist returns a slice of chunkDescs eligible for
|
// chunksToPersist returns a slice of chunkDescs eligible for persistence. It's
|
||||||
// persistence. It's the caller's responsibility to actually persist the
|
// the caller's responsibility to actually persist the returned chunks
|
||||||
// returned chunks afterwards. The method sets the persistWatermark and the
|
// afterwards. The method sets the persistWatermark and the dirty flag
|
||||||
// dirty flag accordingly.
|
// accordingly.
|
||||||
//
|
//
|
||||||
// The caller must have locked the fingerprint of the series.
|
// The caller must have locked the fingerprint of the series.
|
||||||
func (s *memorySeries) getChunksToPersist() []*chunkDesc {
|
func (s *memorySeries) chunksToPersist() []*chunkDesc {
|
||||||
newWatermark := len(s.chunkDescs)
|
newWatermark := len(s.chunkDescs)
|
||||||
if !s.headChunkClosed {
|
if !s.headChunkClosed {
|
||||||
newWatermark--
|
newWatermark--
|
||||||
|
@ -471,108 +470,124 @@ func (s *memorySeries) getChunksToPersist() []*chunkDesc {
|
||||||
|
|
||||||
// memorySeriesIterator implements SeriesIterator.
|
// memorySeriesIterator implements SeriesIterator.
|
||||||
type memorySeriesIterator struct {
|
type memorySeriesIterator struct {
|
||||||
lock, unlock func()
|
chunkIt chunkIterator // Last chunkIterator used by ValueAtTime.
|
||||||
chunkIt chunkIterator
|
chunkIts []chunkIterator // Caches chunkIterators.
|
||||||
chunks []chunk
|
chunks []chunk
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetValueAtTime implements SeriesIterator.
|
// ValueAtTime implements SeriesIterator.
|
||||||
func (it *memorySeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.Values {
|
func (it *memorySeriesIterator) ValueAtTime(t clientmodel.Timestamp) metric.Values {
|
||||||
it.lock()
|
|
||||||
defer it.unlock()
|
|
||||||
|
|
||||||
// The most common case. We are iterating through a chunk.
|
// The most common case. We are iterating through a chunk.
|
||||||
if it.chunkIt != nil && it.chunkIt.contains(t) {
|
if it.chunkIt != nil && it.chunkIt.contains(t) {
|
||||||
return it.chunkIt.getValueAtTime(t)
|
return it.chunkIt.valueAtTime(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
it.chunkIt = nil
|
|
||||||
|
|
||||||
if len(it.chunks) == 0 {
|
if len(it.chunks) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Before or exactly on the first sample of the series.
|
// Before or exactly on the first sample of the series.
|
||||||
if !t.After(it.chunks[0].firstTime()) {
|
it.chunkIt = it.chunkIterator(0)
|
||||||
|
ts := it.chunkIt.timestampAtIndex(0)
|
||||||
|
if !t.After(ts) {
|
||||||
// return first value of first chunk
|
// return first value of first chunk
|
||||||
return it.chunks[0].newIterator().getValueAtTime(t)
|
return metric.Values{metric.SamplePair{
|
||||||
}
|
Timestamp: ts,
|
||||||
// After or exactly on the last sample of the series.
|
Value: it.chunkIt.sampleValueAtIndex(0),
|
||||||
if !t.Before(it.chunks[len(it.chunks)-1].lastTime()) {
|
}}
|
||||||
// return last value of last chunk
|
|
||||||
return it.chunks[len(it.chunks)-1].newIterator().getValueAtTime(t)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Find first chunk where lastTime() is after or equal to t.
|
// After or exactly on the last sample of the series.
|
||||||
|
it.chunkIt = it.chunkIterator(len(it.chunks) - 1)
|
||||||
|
ts = it.chunkIt.lastTimestamp()
|
||||||
|
if !t.Before(ts) {
|
||||||
|
// return last value of last chunk
|
||||||
|
return metric.Values{metric.SamplePair{
|
||||||
|
Timestamp: ts,
|
||||||
|
Value: it.chunkIt.sampleValueAtIndex(it.chunkIt.length() - 1),
|
||||||
|
}}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Find last chunk where firstTime() is before or equal to t.
|
||||||
|
l := len(it.chunks) - 1
|
||||||
i := sort.Search(len(it.chunks), func(i int) bool {
|
i := sort.Search(len(it.chunks), func(i int) bool {
|
||||||
return !it.chunks[i].lastTime().Before(t)
|
return !it.chunks[l-i].firstTime().After(t)
|
||||||
})
|
})
|
||||||
if i == len(it.chunks) {
|
if i == len(it.chunks) {
|
||||||
panic("out of bounds")
|
panic("out of bounds")
|
||||||
}
|
}
|
||||||
|
it.chunkIt = it.chunkIterator(l - i)
|
||||||
if t.Before(it.chunks[i].firstTime()) {
|
ts = it.chunkIt.lastTimestamp()
|
||||||
|
if t.After(ts) {
|
||||||
// We ended up between two chunks.
|
// We ended up between two chunks.
|
||||||
|
sp1 := metric.SamplePair{
|
||||||
|
Timestamp: ts,
|
||||||
|
Value: it.chunkIt.sampleValueAtIndex(it.chunkIt.length() - 1),
|
||||||
|
}
|
||||||
|
it.chunkIt = it.chunkIterator(l - i + 1)
|
||||||
return metric.Values{
|
return metric.Values{
|
||||||
it.chunks[i-1].newIterator().getValueAtTime(t)[0],
|
sp1,
|
||||||
it.chunks[i].newIterator().getValueAtTime(t)[0],
|
metric.SamplePair{
|
||||||
|
Timestamp: it.chunkIt.timestampAtIndex(0),
|
||||||
|
Value: it.chunkIt.sampleValueAtIndex(0),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// We ended up in the middle of a chunk. We might stay there for a while,
|
return it.chunkIt.valueAtTime(t)
|
||||||
// so save it as the current chunk iterator.
|
|
||||||
it.chunkIt = it.chunks[i].newIterator()
|
|
||||||
return it.chunkIt.getValueAtTime(t)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetBoundaryValues implements SeriesIterator.
|
// BoundaryValues implements SeriesIterator.
|
||||||
func (it *memorySeriesIterator) GetBoundaryValues(in metric.Interval) metric.Values {
|
func (it *memorySeriesIterator) BoundaryValues(in metric.Interval) metric.Values {
|
||||||
it.lock()
|
// Find the first chunk for which the first sample is within the interval.
|
||||||
defer it.unlock()
|
|
||||||
|
|
||||||
// Find the first relevant chunk.
|
|
||||||
i := sort.Search(len(it.chunks), func(i int) bool {
|
i := sort.Search(len(it.chunks), func(i int) bool {
|
||||||
return !it.chunks[i].lastTime().Before(in.OldestInclusive)
|
return !it.chunks[i].firstTime().Before(in.OldestInclusive)
|
||||||
})
|
})
|
||||||
|
// Only now check the last timestamp of the previous chunk (which is
|
||||||
|
// fairly expensive).
|
||||||
|
if i > 0 && !it.chunkIterator(i-1).lastTimestamp().Before(in.OldestInclusive) {
|
||||||
|
i--
|
||||||
|
}
|
||||||
|
|
||||||
values := make(metric.Values, 0, 2)
|
values := make(metric.Values, 0, 2)
|
||||||
for i, c := range it.chunks[i:] {
|
for j, c := range it.chunks[i:] {
|
||||||
var chunkIt chunkIterator
|
|
||||||
if c.firstTime().After(in.NewestInclusive) {
|
if c.firstTime().After(in.NewestInclusive) {
|
||||||
if len(values) == 1 {
|
if len(values) == 1 {
|
||||||
// We found the first value before, but are now
|
// We found the first value before but are now
|
||||||
// already past the last value. The value we
|
// already past the last value. The value we
|
||||||
// want must be the last value of the previous
|
// want must be the last value of the previous
|
||||||
// chunk. So backtrack...
|
// chunk. So backtrack...
|
||||||
chunkIt = it.chunks[i-1].newIterator()
|
chunkIt := it.chunkIterator(i + j - 1)
|
||||||
values = append(values, chunkIt.getValueAtTime(in.NewestInclusive)[0])
|
values = append(values, metric.SamplePair{
|
||||||
|
Timestamp: chunkIt.lastTimestamp(),
|
||||||
|
Value: chunkIt.lastSampleValue(),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
chunkIt := it.chunkIterator(i + j)
|
||||||
if len(values) == 0 {
|
if len(values) == 0 {
|
||||||
chunkIt = c.newIterator()
|
firstValues := chunkIt.valueAtTime(in.OldestInclusive)
|
||||||
firstValues := chunkIt.getValueAtTime(in.OldestInclusive)
|
|
||||||
switch len(firstValues) {
|
switch len(firstValues) {
|
||||||
case 2:
|
case 2:
|
||||||
values = append(values, firstValues[1])
|
values = append(values, firstValues[1])
|
||||||
case 1:
|
case 1:
|
||||||
values = firstValues
|
values = firstValues
|
||||||
default:
|
default:
|
||||||
panic("unexpected return from getValueAtTime")
|
panic("unexpected return from valueAtTime")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if c.lastTime().After(in.NewestInclusive) {
|
if chunkIt.lastTimestamp().After(in.NewestInclusive) {
|
||||||
if chunkIt == nil {
|
values = append(values, chunkIt.valueAtTime(in.NewestInclusive)[0])
|
||||||
chunkIt = c.newIterator()
|
|
||||||
}
|
|
||||||
values = append(values, chunkIt.getValueAtTime(in.NewestInclusive)[0])
|
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(values) == 1 {
|
if len(values) == 1 {
|
||||||
// We found exactly one value. In that case, add the most recent we know.
|
// We found exactly one value. In that case, add the most recent we know.
|
||||||
values = append(
|
chunkIt := it.chunkIterator(len(it.chunks) - 1)
|
||||||
values,
|
values = append(values, metric.SamplePair{
|
||||||
it.chunks[len(it.chunks)-1].newIterator().getValueAtTime(in.NewestInclusive)[0],
|
Timestamp: chunkIt.lastTimestamp(),
|
||||||
)
|
Value: chunkIt.lastSampleValue(),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
if len(values) == 2 && values[0].Equal(&values[1]) {
|
if len(values) == 2 && values[0].Equal(&values[1]) {
|
||||||
return values[:1]
|
return values[:1]
|
||||||
|
@ -580,41 +595,53 @@ func (it *memorySeriesIterator) GetBoundaryValues(in metric.Interval) metric.Val
|
||||||
return values
|
return values
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetRangeValues implements SeriesIterator.
|
// RangeValues implements SeriesIterator.
|
||||||
func (it *memorySeriesIterator) GetRangeValues(in metric.Interval) metric.Values {
|
func (it *memorySeriesIterator) RangeValues(in metric.Interval) metric.Values {
|
||||||
it.lock()
|
// Find the first chunk for which the first sample is within the interval.
|
||||||
defer it.unlock()
|
|
||||||
|
|
||||||
// Find the first relevant chunk.
|
|
||||||
i := sort.Search(len(it.chunks), func(i int) bool {
|
i := sort.Search(len(it.chunks), func(i int) bool {
|
||||||
return !it.chunks[i].lastTime().Before(in.OldestInclusive)
|
return !it.chunks[i].firstTime().Before(in.OldestInclusive)
|
||||||
})
|
})
|
||||||
|
// Only now check the last timestamp of the previous chunk (which is
|
||||||
|
// fairly expensive).
|
||||||
|
if i > 0 && !it.chunkIterator(i-1).lastTimestamp().Before(in.OldestInclusive) {
|
||||||
|
i--
|
||||||
|
}
|
||||||
|
|
||||||
values := metric.Values{}
|
values := metric.Values{}
|
||||||
for _, c := range it.chunks[i:] {
|
for j, c := range it.chunks[i:] {
|
||||||
if c.firstTime().After(in.NewestInclusive) {
|
if c.firstTime().After(in.NewestInclusive) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
// TODO: actually reuse an iterator between calls if we get multiple ranges
|
values = append(values, it.chunkIterator(i+j).rangeValues(in)...)
|
||||||
// from the same chunk.
|
|
||||||
values = append(values, c.newIterator().getRangeValues(in)...)
|
|
||||||
}
|
}
|
||||||
return values
|
return values
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// chunkIterator returns the chunkIterator for the chunk at position i (and
|
||||||
|
// creates it if needed).
|
||||||
|
func (it *memorySeriesIterator) chunkIterator(i int) chunkIterator {
|
||||||
|
chunkIt := it.chunkIts[i]
|
||||||
|
if chunkIt == nil {
|
||||||
|
chunkIt = it.chunks[i].newIterator()
|
||||||
|
it.chunkIts[i] = chunkIt
|
||||||
|
}
|
||||||
|
return chunkIt
|
||||||
|
}
|
||||||
|
|
||||||
// nopSeriesIterator implements Series Iterator. It never returns any values.
|
// nopSeriesIterator implements Series Iterator. It never returns any values.
|
||||||
type nopSeriesIterator struct{}
|
type nopSeriesIterator struct{}
|
||||||
|
|
||||||
// GetValueAtTime implements SeriesIterator.
|
// ValueAtTime implements SeriesIterator.
|
||||||
func (_ nopSeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.Values {
|
func (_ nopSeriesIterator) ValueAtTime(t clientmodel.Timestamp) metric.Values {
|
||||||
return metric.Values{}
|
return metric.Values{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetBoundaryValues implements SeriesIterator.
|
// BoundaryValues implements SeriesIterator.
|
||||||
func (_ nopSeriesIterator) GetBoundaryValues(in metric.Interval) metric.Values {
|
func (_ nopSeriesIterator) BoundaryValues(in metric.Interval) metric.Values {
|
||||||
return metric.Values{}
|
return metric.Values{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetRangeValues implements SeriesIterator.
|
// RangeValues implements SeriesIterator.
|
||||||
func (_ nopSeriesIterator) GetRangeValues(in metric.Interval) metric.Values {
|
func (_ nopSeriesIterator) RangeValues(in metric.Interval) metric.Values {
|
||||||
return metric.Values{}
|
return metric.Values{}
|
||||||
}
|
}
|
||||||
|
|
|
@ -278,10 +278,7 @@ func (s *memorySeriesStorage) NewIterator(fp clientmodel.Fingerprint) SeriesIter
|
||||||
// return any values.
|
// return any values.
|
||||||
return nopSeriesIterator{}
|
return nopSeriesIterator{}
|
||||||
}
|
}
|
||||||
return series.newIterator(
|
return series.newIterator()
|
||||||
func() { s.fpLocker.Lock(fp) },
|
|
||||||
func() { s.fpLocker.Unlock(fp) },
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPreloader implements Storage.
|
// NewPreloader implements Storage.
|
||||||
|
@ -291,14 +288,14 @@ func (s *memorySeriesStorage) NewPreloader() Preloader {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetFingerprintsForLabelMatchers implements Storage.
|
// FingerprintsForLabelMatchers implements Storage.
|
||||||
func (s *memorySeriesStorage) GetFingerprintsForLabelMatchers(labelMatchers metric.LabelMatchers) clientmodel.Fingerprints {
|
func (s *memorySeriesStorage) FingerprintsForLabelMatchers(labelMatchers metric.LabelMatchers) clientmodel.Fingerprints {
|
||||||
var result map[clientmodel.Fingerprint]struct{}
|
var result map[clientmodel.Fingerprint]struct{}
|
||||||
for _, matcher := range labelMatchers {
|
for _, matcher := range labelMatchers {
|
||||||
intersection := map[clientmodel.Fingerprint]struct{}{}
|
intersection := map[clientmodel.Fingerprint]struct{}{}
|
||||||
switch matcher.Type {
|
switch matcher.Type {
|
||||||
case metric.Equal:
|
case metric.Equal:
|
||||||
fps, err := s.persistence.getFingerprintsForLabelPair(
|
fps, err := s.persistence.fingerprintsForLabelPair(
|
||||||
metric.LabelPair{
|
metric.LabelPair{
|
||||||
Name: matcher.Name,
|
Name: matcher.Name,
|
||||||
Value: matcher.Value,
|
Value: matcher.Value,
|
||||||
|
@ -316,7 +313,7 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelMatchers(labelMatchers metr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
values, err := s.persistence.getLabelValuesForLabelName(matcher.Name)
|
values, err := s.persistence.labelValuesForLabelName(matcher.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Error getting label values for label name %q: %v", matcher.Name, err)
|
log.Errorf("Error getting label values for label name %q: %v", matcher.Name, err)
|
||||||
}
|
}
|
||||||
|
@ -325,7 +322,7 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelMatchers(labelMatchers metr
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
for _, v := range matches {
|
for _, v := range matches {
|
||||||
fps, err := s.persistence.getFingerprintsForLabelPair(
|
fps, err := s.persistence.fingerprintsForLabelPair(
|
||||||
metric.LabelPair{
|
metric.LabelPair{
|
||||||
Name: matcher.Name,
|
Name: matcher.Name,
|
||||||
Value: v,
|
Value: v,
|
||||||
|
@ -354,17 +351,17 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelMatchers(labelMatchers metr
|
||||||
return fps
|
return fps
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetLabelValuesForLabelName implements Storage.
|
// LabelValuesForLabelName implements Storage.
|
||||||
func (s *memorySeriesStorage) GetLabelValuesForLabelName(labelName clientmodel.LabelName) clientmodel.LabelValues {
|
func (s *memorySeriesStorage) LabelValuesForLabelName(labelName clientmodel.LabelName) clientmodel.LabelValues {
|
||||||
lvs, err := s.persistence.getLabelValuesForLabelName(labelName)
|
lvs, err := s.persistence.labelValuesForLabelName(labelName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Error getting label values for label name %q: %v", labelName, err)
|
log.Errorf("Error getting label values for label name %q: %v", labelName, err)
|
||||||
}
|
}
|
||||||
return lvs
|
return lvs
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetMetricForFingerprint implements Storage.
|
// MetricForFingerprint implements Storage.
|
||||||
func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint) clientmodel.COWMetric {
|
func (s *memorySeriesStorage) MetricForFingerprint(fp clientmodel.Fingerprint) clientmodel.COWMetric {
|
||||||
s.fpLocker.Lock(fp)
|
s.fpLocker.Lock(fp)
|
||||||
defer s.fpLocker.Unlock(fp)
|
defer s.fpLocker.Unlock(fp)
|
||||||
|
|
||||||
|
@ -376,7 +373,7 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint
|
||||||
Metric: series.metric,
|
Metric: series.metric,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
metric, err := s.persistence.getArchivedMetric(fp)
|
metric, err := s.persistence.archivedMetric(fp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Error retrieving archived metric for fingerprint %v: %v", fp, err)
|
log.Errorf("Error retrieving archived metric for fingerprint %v: %v", fp, err)
|
||||||
}
|
}
|
||||||
|
@ -459,7 +456,7 @@ func (s *memorySeriesStorage) preloadChunksForRange(
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
if from.Add(-stalenessDelta).Before(last) && through.Add(stalenessDelta).After(first) {
|
if from.Add(-stalenessDelta).Before(last) && through.Add(stalenessDelta).After(first) {
|
||||||
metric, err := s.persistence.getArchivedMetric(fp)
|
metric, err := s.persistence.archivedMetric(fp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -648,7 +645,7 @@ func (s *memorySeriesStorage) cycleThroughArchivedFingerprints() chan clientmode
|
||||||
defer close(archivedFingerprints)
|
defer close(archivedFingerprints)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
archivedFPs, err := s.persistence.getFingerprintsModifiedBefore(
|
archivedFPs, err := s.persistence.fingerprintsModifiedBefore(
|
||||||
clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter),
|
clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -844,20 +841,21 @@ func (s *memorySeriesStorage) maintainMemorySeries(
|
||||||
func (s *memorySeriesStorage) writeMemorySeries(
|
func (s *memorySeriesStorage) writeMemorySeries(
|
||||||
fp clientmodel.Fingerprint, series *memorySeries, beforeTime clientmodel.Timestamp,
|
fp clientmodel.Fingerprint, series *memorySeries, beforeTime clientmodel.Timestamp,
|
||||||
) bool {
|
) bool {
|
||||||
cds := series.getChunksToPersist()
|
cds := series.chunksToPersist()
|
||||||
defer func() {
|
defer func() {
|
||||||
for _, cd := range cds {
|
for _, cd := range cds {
|
||||||
cd.unpin(s.evictRequests)
|
cd.unpin(s.evictRequests)
|
||||||
}
|
}
|
||||||
s.incNumChunksToPersist(-len(cds))
|
s.incNumChunksToPersist(-len(cds))
|
||||||
chunkOps.WithLabelValues(persistAndUnpin).Add(float64(len(cds)))
|
chunkOps.WithLabelValues(persistAndUnpin).Add(float64(len(cds)))
|
||||||
series.modTime = s.persistence.getSeriesFileModTime(fp)
|
series.modTime = s.persistence.seriesFileModTime(fp)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Get the actual chunks from underneath the chunkDescs.
|
// Get the actual chunks from underneath the chunkDescs.
|
||||||
|
// No lock required as chunks still to persist cannot be evicted.
|
||||||
chunks := make([]chunk, len(cds))
|
chunks := make([]chunk, len(cds))
|
||||||
for i, cd := range cds {
|
for i, cd := range cds {
|
||||||
chunks[i] = cd.chunk
|
chunks[i] = cd.c
|
||||||
}
|
}
|
||||||
|
|
||||||
if !series.firstTime().Before(beforeTime) {
|
if !series.firstTime().Before(beforeTime) {
|
||||||
|
|
|
@ -28,7 +28,7 @@ import (
|
||||||
"github.com/prometheus/prometheus/utility/test"
|
"github.com/prometheus/prometheus/utility/test"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestGetFingerprintsForLabelMatchers(t *testing.T) {
|
func TestFingerprintsForLabelMatchers(t *testing.T) {
|
||||||
storage, closer := NewTestStorage(t, 1)
|
storage, closer := NewTestStorage(t, 1)
|
||||||
defer closer.Close()
|
defer closer.Close()
|
||||||
|
|
||||||
|
@ -121,7 +121,7 @@ func TestGetFingerprintsForLabelMatchers(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, mt := range matcherTests {
|
for _, mt := range matcherTests {
|
||||||
resfps := storage.GetFingerprintsForLabelMatchers(mt.matchers)
|
resfps := storage.FingerprintsForLabelMatchers(mt.matchers)
|
||||||
if len(mt.expected) != len(resfps) {
|
if len(mt.expected) != len(resfps) {
|
||||||
t.Fatalf("expected %d matches for %q, found %d", len(mt.expected), mt.matchers, len(resfps))
|
t.Fatalf("expected %d matches for %q, found %d", len(mt.expected), mt.matchers, len(resfps))
|
||||||
}
|
}
|
||||||
|
@ -208,7 +208,7 @@ func testChunk(t *testing.T, encoding chunkEncoding) {
|
||||||
if cd.isEvicted() {
|
if cd.isEvicted() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for sample := range cd.chunk.values() {
|
for sample := range cd.c.newIterator().values() {
|
||||||
values = append(values, *sample)
|
values = append(values, *sample)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -234,8 +234,8 @@ func TestChunkType1(t *testing.T) {
|
||||||
testChunk(t, 1)
|
testChunk(t, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testGetValueAtTime(t *testing.T, encoding chunkEncoding) {
|
func testValueAtTime(t *testing.T, encoding chunkEncoding) {
|
||||||
samples := make(clientmodel.Samples, 1000)
|
samples := make(clientmodel.Samples, 10000)
|
||||||
for i := range samples {
|
for i := range samples {
|
||||||
samples[i] = &clientmodel.Sample{
|
samples[i] = &clientmodel.Sample{
|
||||||
Timestamp: clientmodel.Timestamp(2 * i),
|
Timestamp: clientmodel.Timestamp(2 * i),
|
||||||
|
@ -256,7 +256,7 @@ func testGetValueAtTime(t *testing.T, encoding chunkEncoding) {
|
||||||
|
|
||||||
// #1 Exactly on a sample.
|
// #1 Exactly on a sample.
|
||||||
for i, expected := range samples {
|
for i, expected := range samples {
|
||||||
actual := it.GetValueAtTime(expected.Timestamp)
|
actual := it.ValueAtTime(expected.Timestamp)
|
||||||
|
|
||||||
if len(actual) != 1 {
|
if len(actual) != 1 {
|
||||||
t.Fatalf("1.%d. Expected exactly one result, got %d.", i, len(actual))
|
t.Fatalf("1.%d. Expected exactly one result, got %d.", i, len(actual))
|
||||||
|
@ -275,7 +275,7 @@ func testGetValueAtTime(t *testing.T, encoding chunkEncoding) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
expected2 := samples[i+1]
|
expected2 := samples[i+1]
|
||||||
actual := it.GetValueAtTime(expected1.Timestamp + 1)
|
actual := it.ValueAtTime(expected1.Timestamp + 1)
|
||||||
|
|
||||||
if len(actual) != 2 {
|
if len(actual) != 2 {
|
||||||
t.Fatalf("2.%d. Expected exactly 2 results, got %d.", i, len(actual))
|
t.Fatalf("2.%d. Expected exactly 2 results, got %d.", i, len(actual))
|
||||||
|
@ -296,7 +296,7 @@ func testGetValueAtTime(t *testing.T, encoding chunkEncoding) {
|
||||||
|
|
||||||
// #3 Corner cases: Just before the first sample, just after the last.
|
// #3 Corner cases: Just before the first sample, just after the last.
|
||||||
expected := samples[0]
|
expected := samples[0]
|
||||||
actual := it.GetValueAtTime(expected.Timestamp - 1)
|
actual := it.ValueAtTime(expected.Timestamp - 1)
|
||||||
if len(actual) != 1 {
|
if len(actual) != 1 {
|
||||||
t.Fatalf("3.1. Expected exactly one result, got %d.", len(actual))
|
t.Fatalf("3.1. Expected exactly one result, got %d.", len(actual))
|
||||||
}
|
}
|
||||||
|
@ -307,7 +307,7 @@ func testGetValueAtTime(t *testing.T, encoding chunkEncoding) {
|
||||||
t.Errorf("3.1. Got %v; want %v", actual[0].Value, expected.Value)
|
t.Errorf("3.1. Got %v; want %v", actual[0].Value, expected.Value)
|
||||||
}
|
}
|
||||||
expected = samples[len(samples)-1]
|
expected = samples[len(samples)-1]
|
||||||
actual = it.GetValueAtTime(expected.Timestamp + 1)
|
actual = it.ValueAtTime(expected.Timestamp + 1)
|
||||||
if len(actual) != 1 {
|
if len(actual) != 1 {
|
||||||
t.Fatalf("3.2. Expected exactly one result, got %d.", len(actual))
|
t.Fatalf("3.2. Expected exactly one result, got %d.", len(actual))
|
||||||
}
|
}
|
||||||
|
@ -319,16 +319,89 @@ func testGetValueAtTime(t *testing.T, encoding chunkEncoding) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetValueAtTimeChunkType0(t *testing.T) {
|
func TestValueAtTimeChunkType0(t *testing.T) {
|
||||||
testGetValueAtTime(t, 0)
|
testValueAtTime(t, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetValueAtTimeChunkType1(t *testing.T) {
|
func TestValueAtTimeChunkType1(t *testing.T) {
|
||||||
testGetValueAtTime(t, 1)
|
testValueAtTime(t, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testGetRangeValues(t *testing.T, encoding chunkEncoding) {
|
func benchmarkValueAtTime(b *testing.B, encoding chunkEncoding) {
|
||||||
samples := make(clientmodel.Samples, 1000)
|
samples := make(clientmodel.Samples, 10000)
|
||||||
|
for i := range samples {
|
||||||
|
samples[i] = &clientmodel.Sample{
|
||||||
|
Timestamp: clientmodel.Timestamp(2 * i),
|
||||||
|
Value: clientmodel.SampleValue(float64(i) * 0.2),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s, closer := NewTestStorage(b, encoding)
|
||||||
|
defer closer.Close()
|
||||||
|
|
||||||
|
for _, sample := range samples {
|
||||||
|
s.Append(sample)
|
||||||
|
}
|
||||||
|
s.WaitForIndexing()
|
||||||
|
|
||||||
|
fp := clientmodel.Metric{}.FastFingerprint()
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
it := s.NewIterator(fp)
|
||||||
|
|
||||||
|
// #1 Exactly on a sample.
|
||||||
|
for i, expected := range samples {
|
||||||
|
actual := it.ValueAtTime(expected.Timestamp)
|
||||||
|
|
||||||
|
if len(actual) != 1 {
|
||||||
|
b.Fatalf("1.%d. Expected exactly one result, got %d.", i, len(actual))
|
||||||
|
}
|
||||||
|
if expected.Timestamp != actual[0].Timestamp {
|
||||||
|
b.Errorf("1.%d. Got %v; want %v", i, actual[0].Timestamp, expected.Timestamp)
|
||||||
|
}
|
||||||
|
if expected.Value != actual[0].Value {
|
||||||
|
b.Errorf("1.%d. Got %v; want %v", i, actual[0].Value, expected.Value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// #2 Between samples.
|
||||||
|
for i, expected1 := range samples {
|
||||||
|
if i == len(samples)-1 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
expected2 := samples[i+1]
|
||||||
|
actual := it.ValueAtTime(expected1.Timestamp + 1)
|
||||||
|
|
||||||
|
if len(actual) != 2 {
|
||||||
|
b.Fatalf("2.%d. Expected exactly 2 results, got %d.", i, len(actual))
|
||||||
|
}
|
||||||
|
if expected1.Timestamp != actual[0].Timestamp {
|
||||||
|
b.Errorf("2.%d. Got %v; want %v", i, actual[0].Timestamp, expected1.Timestamp)
|
||||||
|
}
|
||||||
|
if expected1.Value != actual[0].Value {
|
||||||
|
b.Errorf("2.%d. Got %v; want %v", i, actual[0].Value, expected1.Value)
|
||||||
|
}
|
||||||
|
if expected2.Timestamp != actual[1].Timestamp {
|
||||||
|
b.Errorf("2.%d. Got %v; want %v", i, actual[1].Timestamp, expected1.Timestamp)
|
||||||
|
}
|
||||||
|
if expected2.Value != actual[1].Value {
|
||||||
|
b.Errorf("2.%d. Got %v; want %v", i, actual[1].Value, expected1.Value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkValueAtTimeChunkType0(b *testing.B) {
|
||||||
|
benchmarkValueAtTime(b, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkValueAtTimeChunkType1(b *testing.B) {
|
||||||
|
benchmarkValueAtTime(b, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testRangeValues(t *testing.T, encoding chunkEncoding) {
|
||||||
|
samples := make(clientmodel.Samples, 10000)
|
||||||
for i := range samples {
|
for i := range samples {
|
||||||
samples[i] = &clientmodel.Sample{
|
samples[i] = &clientmodel.Sample{
|
||||||
Timestamp: clientmodel.Timestamp(2 * i),
|
Timestamp: clientmodel.Timestamp(2 * i),
|
||||||
|
@ -349,7 +422,7 @@ func testGetRangeValues(t *testing.T, encoding chunkEncoding) {
|
||||||
|
|
||||||
// #1 Zero length interval at sample.
|
// #1 Zero length interval at sample.
|
||||||
for i, expected := range samples {
|
for i, expected := range samples {
|
||||||
actual := it.GetRangeValues(metric.Interval{
|
actual := it.RangeValues(metric.Interval{
|
||||||
OldestInclusive: expected.Timestamp,
|
OldestInclusive: expected.Timestamp,
|
||||||
NewestInclusive: expected.Timestamp,
|
NewestInclusive: expected.Timestamp,
|
||||||
})
|
})
|
||||||
|
@ -367,7 +440,7 @@ func testGetRangeValues(t *testing.T, encoding chunkEncoding) {
|
||||||
|
|
||||||
// #2 Zero length interval off sample.
|
// #2 Zero length interval off sample.
|
||||||
for i, expected := range samples {
|
for i, expected := range samples {
|
||||||
actual := it.GetRangeValues(metric.Interval{
|
actual := it.RangeValues(metric.Interval{
|
||||||
OldestInclusive: expected.Timestamp + 1,
|
OldestInclusive: expected.Timestamp + 1,
|
||||||
NewestInclusive: expected.Timestamp + 1,
|
NewestInclusive: expected.Timestamp + 1,
|
||||||
})
|
})
|
||||||
|
@ -379,7 +452,7 @@ func testGetRangeValues(t *testing.T, encoding chunkEncoding) {
|
||||||
|
|
||||||
// #3 2sec interval around sample.
|
// #3 2sec interval around sample.
|
||||||
for i, expected := range samples {
|
for i, expected := range samples {
|
||||||
actual := it.GetRangeValues(metric.Interval{
|
actual := it.RangeValues(metric.Interval{
|
||||||
OldestInclusive: expected.Timestamp - 1,
|
OldestInclusive: expected.Timestamp - 1,
|
||||||
NewestInclusive: expected.Timestamp + 1,
|
NewestInclusive: expected.Timestamp + 1,
|
||||||
})
|
})
|
||||||
|
@ -401,7 +474,7 @@ func testGetRangeValues(t *testing.T, encoding chunkEncoding) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
expected2 := samples[i+1]
|
expected2 := samples[i+1]
|
||||||
actual := it.GetRangeValues(metric.Interval{
|
actual := it.RangeValues(metric.Interval{
|
||||||
OldestInclusive: expected1.Timestamp,
|
OldestInclusive: expected1.Timestamp,
|
||||||
NewestInclusive: expected1.Timestamp + 2,
|
NewestInclusive: expected1.Timestamp + 2,
|
||||||
})
|
})
|
||||||
|
@ -426,7 +499,7 @@ func testGetRangeValues(t *testing.T, encoding chunkEncoding) {
|
||||||
// #5 corner cases: Interval ends at first sample, interval starts
|
// #5 corner cases: Interval ends at first sample, interval starts
|
||||||
// at last sample, interval entirely before/after samples.
|
// at last sample, interval entirely before/after samples.
|
||||||
expected := samples[0]
|
expected := samples[0]
|
||||||
actual := it.GetRangeValues(metric.Interval{
|
actual := it.RangeValues(metric.Interval{
|
||||||
OldestInclusive: expected.Timestamp - 2,
|
OldestInclusive: expected.Timestamp - 2,
|
||||||
NewestInclusive: expected.Timestamp,
|
NewestInclusive: expected.Timestamp,
|
||||||
})
|
})
|
||||||
|
@ -440,7 +513,7 @@ func testGetRangeValues(t *testing.T, encoding chunkEncoding) {
|
||||||
t.Errorf("5.1. Got %v; want %v.", actual[0].Value, expected.Value)
|
t.Errorf("5.1. Got %v; want %v.", actual[0].Value, expected.Value)
|
||||||
}
|
}
|
||||||
expected = samples[len(samples)-1]
|
expected = samples[len(samples)-1]
|
||||||
actual = it.GetRangeValues(metric.Interval{
|
actual = it.RangeValues(metric.Interval{
|
||||||
OldestInclusive: expected.Timestamp,
|
OldestInclusive: expected.Timestamp,
|
||||||
NewestInclusive: expected.Timestamp + 2,
|
NewestInclusive: expected.Timestamp + 2,
|
||||||
})
|
})
|
||||||
|
@ -454,7 +527,7 @@ func testGetRangeValues(t *testing.T, encoding chunkEncoding) {
|
||||||
t.Errorf("5.2. Got %v; want %v.", actual[0].Value, expected.Value)
|
t.Errorf("5.2. Got %v; want %v.", actual[0].Value, expected.Value)
|
||||||
}
|
}
|
||||||
firstSample := samples[0]
|
firstSample := samples[0]
|
||||||
actual = it.GetRangeValues(metric.Interval{
|
actual = it.RangeValues(metric.Interval{
|
||||||
OldestInclusive: firstSample.Timestamp - 4,
|
OldestInclusive: firstSample.Timestamp - 4,
|
||||||
NewestInclusive: firstSample.Timestamp - 2,
|
NewestInclusive: firstSample.Timestamp - 2,
|
||||||
})
|
})
|
||||||
|
@ -462,7 +535,7 @@ func testGetRangeValues(t *testing.T, encoding chunkEncoding) {
|
||||||
t.Fatalf("5.3. Expected no results, got %d.", len(actual))
|
t.Fatalf("5.3. Expected no results, got %d.", len(actual))
|
||||||
}
|
}
|
||||||
lastSample := samples[len(samples)-1]
|
lastSample := samples[len(samples)-1]
|
||||||
actual = it.GetRangeValues(metric.Interval{
|
actual = it.RangeValues(metric.Interval{
|
||||||
OldestInclusive: lastSample.Timestamp + 2,
|
OldestInclusive: lastSample.Timestamp + 2,
|
||||||
NewestInclusive: lastSample.Timestamp + 4,
|
NewestInclusive: lastSample.Timestamp + 4,
|
||||||
})
|
})
|
||||||
|
@ -471,16 +544,61 @@ func testGetRangeValues(t *testing.T, encoding chunkEncoding) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetRangeValuesChunkType0(t *testing.T) {
|
func TestRangeValuesChunkType0(t *testing.T) {
|
||||||
testGetRangeValues(t, 0)
|
testRangeValues(t, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetRangeValuesChunkType1(t *testing.T) {
|
func TestRangeValuesChunkType1(t *testing.T) {
|
||||||
testGetRangeValues(t, 1)
|
testRangeValues(t, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
func benchmarkRangeValues(b *testing.B, encoding chunkEncoding) {
|
||||||
|
samples := make(clientmodel.Samples, 10000)
|
||||||
|
for i := range samples {
|
||||||
|
samples[i] = &clientmodel.Sample{
|
||||||
|
Timestamp: clientmodel.Timestamp(2 * i),
|
||||||
|
Value: clientmodel.SampleValue(float64(i) * 0.2),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s, closer := NewTestStorage(b, encoding)
|
||||||
|
defer closer.Close()
|
||||||
|
|
||||||
|
for _, sample := range samples {
|
||||||
|
s.Append(sample)
|
||||||
|
}
|
||||||
|
s.WaitForIndexing()
|
||||||
|
|
||||||
|
fp := clientmodel.Metric{}.FastFingerprint()
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
|
||||||
|
it := s.NewIterator(fp)
|
||||||
|
|
||||||
|
for _, sample := range samples {
|
||||||
|
actual := it.RangeValues(metric.Interval{
|
||||||
|
OldestInclusive: sample.Timestamp - 20,
|
||||||
|
NewestInclusive: sample.Timestamp + 20,
|
||||||
|
})
|
||||||
|
|
||||||
|
if len(actual) < 10 {
|
||||||
|
b.Fatalf("not enough samples found")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkRangeValuesChunkType0(b *testing.B) {
|
||||||
|
benchmarkRangeValues(b, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkRangeValuesChunkType1(b *testing.B) {
|
||||||
|
benchmarkRangeValues(b, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
|
func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
|
||||||
samples := make(clientmodel.Samples, 1000)
|
samples := make(clientmodel.Samples, 10000)
|
||||||
for i := range samples {
|
for i := range samples {
|
||||||
samples[i] = &clientmodel.Sample{
|
samples[i] = &clientmodel.Sample{
|
||||||
Timestamp: clientmodel.Timestamp(2 * i),
|
Timestamp: clientmodel.Timestamp(2 * i),
|
||||||
|
@ -498,29 +616,29 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
|
||||||
fp := clientmodel.Metric{}.FastFingerprint()
|
fp := clientmodel.Metric{}.FastFingerprint()
|
||||||
|
|
||||||
// Drop ~half of the chunks.
|
// Drop ~half of the chunks.
|
||||||
s.maintainMemorySeries(fp, 1000)
|
s.maintainMemorySeries(fp, 10000)
|
||||||
it := s.NewIterator(fp)
|
it := s.NewIterator(fp)
|
||||||
actual := it.GetBoundaryValues(metric.Interval{
|
actual := it.BoundaryValues(metric.Interval{
|
||||||
OldestInclusive: 0,
|
OldestInclusive: 0,
|
||||||
NewestInclusive: 10000,
|
NewestInclusive: 100000,
|
||||||
})
|
})
|
||||||
if len(actual) != 2 {
|
if len(actual) != 2 {
|
||||||
t.Fatal("expected two results after purging half of series")
|
t.Fatal("expected two results after purging half of series")
|
||||||
}
|
}
|
||||||
if actual[0].Timestamp < 600 || actual[0].Timestamp > 1000 {
|
if actual[0].Timestamp < 6000 || actual[0].Timestamp > 10000 {
|
||||||
t.Errorf("1st timestamp out of expected range: %v", actual[0].Timestamp)
|
t.Errorf("1st timestamp out of expected range: %v", actual[0].Timestamp)
|
||||||
}
|
}
|
||||||
want := clientmodel.Timestamp(1998)
|
want := clientmodel.Timestamp(19998)
|
||||||
if actual[1].Timestamp != want {
|
if actual[1].Timestamp != want {
|
||||||
t.Errorf("2nd timestamp: want %v, got %v", want, actual[1].Timestamp)
|
t.Errorf("2nd timestamp: want %v, got %v", want, actual[1].Timestamp)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Drop everything.
|
// Drop everything.
|
||||||
s.maintainMemorySeries(fp, 10000)
|
s.maintainMemorySeries(fp, 100000)
|
||||||
it = s.NewIterator(fp)
|
it = s.NewIterator(fp)
|
||||||
actual = it.GetBoundaryValues(metric.Interval{
|
actual = it.BoundaryValues(metric.Interval{
|
||||||
OldestInclusive: 0,
|
OldestInclusive: 0,
|
||||||
NewestInclusive: 10000,
|
NewestInclusive: 100000,
|
||||||
})
|
})
|
||||||
if len(actual) != 0 {
|
if len(actual) != 0 {
|
||||||
t.Fatal("expected zero results after purging the whole series")
|
t.Fatal("expected zero results after purging the whole series")
|
||||||
|
@ -558,7 +676,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Drop ~half of the chunks of an archived series.
|
// Drop ~half of the chunks of an archived series.
|
||||||
s.maintainArchivedSeries(fp, 1000)
|
s.maintainArchivedSeries(fp, 10000)
|
||||||
archived, _, _, err = s.persistence.hasArchivedMetric(fp)
|
archived, _, _, err = s.persistence.hasArchivedMetric(fp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -568,7 +686,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Drop everything.
|
// Drop everything.
|
||||||
s.maintainArchivedSeries(fp, 10000)
|
s.maintainArchivedSeries(fp, 100000)
|
||||||
archived, _, _, err = s.persistence.hasArchivedMetric(fp)
|
archived, _, _, err = s.persistence.hasArchivedMetric(fp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -625,7 +743,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
|
||||||
|
|
||||||
// This will archive again, but must not drop it completely, despite the
|
// This will archive again, but must not drop it completely, despite the
|
||||||
// memorySeries being empty.
|
// memorySeries being empty.
|
||||||
s.maintainMemorySeries(fp, 1000)
|
s.maintainMemorySeries(fp, 10000)
|
||||||
archived, _, _, err = s.persistence.hasArchivedMetric(fp)
|
archived, _, _, err = s.persistence.hasArchivedMetric(fp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -685,7 +803,7 @@ func testFuzz(t *testing.T, encoding chunkEncoding) {
|
||||||
s, c := NewTestStorage(t, encoding)
|
s, c := NewTestStorage(t, encoding)
|
||||||
defer c.Close()
|
defer c.Close()
|
||||||
|
|
||||||
samples := createRandomSamples("test_fuzz", 1000)
|
samples := createRandomSamples("test_fuzz", 10000)
|
||||||
for _, sample := range samples {
|
for _, sample := range samples {
|
||||||
s.Append(sample)
|
s.Append(sample)
|
||||||
}
|
}
|
||||||
|
@ -917,7 +1035,7 @@ func verifyStorage(t testing.TB, s *memorySeriesStorage, samples clientmodel.Sam
|
||||||
}
|
}
|
||||||
p := s.NewPreloader()
|
p := s.NewPreloader()
|
||||||
p.PreloadRange(fp, sample.Timestamp, sample.Timestamp, time.Hour)
|
p.PreloadRange(fp, sample.Timestamp, sample.Timestamp, time.Hour)
|
||||||
found := s.NewIterator(fp).GetValueAtTime(sample.Timestamp)
|
found := s.NewIterator(fp).ValueAtTime(sample.Timestamp)
|
||||||
if len(found) != 1 {
|
if len(found) != 1 {
|
||||||
t.Errorf("Sample %#v: Expected exactly one value, found %d.", sample, len(found))
|
t.Errorf("Sample %#v: Expected exactly one value, found %d.", sample, len(found))
|
||||||
result = false
|
result = false
|
||||||
|
|
|
@ -156,7 +156,7 @@ func (serv MetricsService) Metrics(w http.ResponseWriter, r *http.Request) {
|
||||||
setAccessControlHeaders(w)
|
setAccessControlHeaders(w)
|
||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
|
||||||
metricNames := serv.Storage.GetLabelValuesForLabelName(clientmodel.MetricNameLabel)
|
metricNames := serv.Storage.LabelValuesForLabelName(clientmodel.MetricNameLabel)
|
||||||
sort.Sort(metricNames)
|
sort.Sort(metricNames)
|
||||||
resultBytes, err := json.Marshal(metricNames)
|
resultBytes, err := json.Marshal(metricNames)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
Loading…
Reference in a new issue