Merge pull request #14438 from prometheus/cedwards/ooo-chunk-encoding

tsdb: Add support for handling multiple chunks in OOO head

commit a28d1974b4

An out-of-order head chunk was previously always encoded into a single XOR chunk when m-mapped. Because the OOO head can now also hold histogram and float histogram samples, one in-memory chunk may encode into several chunks (one per run of same-encoding samples), so the m-map chunk references become slices throughout this diff.
tsdb/head_append.go

@@ -848,10 +848,11 @@ func (a *headAppender) Commit() (err error) {
 		inOrderMint         int64 = math.MaxInt64
 		inOrderMaxt         int64 = math.MinInt64
-		ooomint             int64 = math.MaxInt64
-		ooomaxt             int64 = math.MinInt64
+		oooMinT             int64 = math.MaxInt64
+		oooMaxT             int64 = math.MinInt64
 		wblSamples          []record.RefSample
-		oooMmapMarkers      map[chunks.HeadSeriesRef]chunks.ChunkDiskMapperRef
+		oooMmapMarkers      map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef
+		oooMmapMarkersCount int
 		oooRecords          [][]byte
 		oooCapMax           = a.head.opts.OutOfOrderCapMax.Load()
 		series              *memSeries
@@ -872,6 +873,7 @@ func (a *headAppender) Commit() (err error) {
 			// WBL is not enabled. So no need to collect.
 			wblSamples = nil
 			oooMmapMarkers = nil
+			oooMmapMarkersCount = 0
 			return
 		}
 		// The m-map happens before adding a new sample. So we collect
@@ -880,13 +882,15 @@ func (a *headAppender) Commit() (err error) {
 		// WBL Before this Commit(): [old samples before this commit for chunk 1]
 		// WBL After this Commit(): [old samples before this commit for chunk 1][new samples in this commit for chunk 1]mmapmarker1[samples for chunk 2]mmapmarker2[samples for chunk 3]
 		if oooMmapMarkers != nil {
-			markers := make([]record.RefMmapMarker, 0, len(oooMmapMarkers))
-			for ref, mmapRef := range oooMmapMarkers {
+			markers := make([]record.RefMmapMarker, 0, oooMmapMarkersCount)
+			for ref, mmapRefs := range oooMmapMarkers {
+				for _, mmapRef := range mmapRefs {
 					markers = append(markers, record.RefMmapMarker{
 						Ref:     ref,
 						MmapRef: mmapRef,
 					})
 				}
+			}
 			r := enc.MmapMarkers(markers, a.head.getBytesBuffer())
 			oooRecords = append(oooRecords, r)
 		}
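Note: with a slice of refs per series, len(oooMmapMarkers) only counts series, not markers, which is why the new oooMmapMarkersCount is carried alongside the map to pre-size the flattened slice. A minimal standalone sketch of that flattening, using the real record and chunks types (the flattenMarkers helper and main wrapper are illustrative only, not part of the commit):

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb/chunks"
	"github.com/prometheus/prometheus/tsdb/record"
)

// flattenMarkers (hypothetical helper) mirrors the loop in Commit():
// one RefMmapMarker per (series, mmap ref) pair, pre-sized with the
// running count rather than len(map).
func flattenMarkers(markersBySeries map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef, count int) []record.RefMmapMarker {
	markers := make([]record.RefMmapMarker, 0, count)
	for ref, mmapRefs := range markersBySeries {
		for _, mmapRef := range mmapRefs {
			markers = append(markers, record.RefMmapMarker{Ref: ref, MmapRef: mmapRef})
		}
	}
	return markers
}

func main() {
	m := map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef{
		1: {100, 101}, // one series can now m-map several chunks per commit
		2: {102},
	}
	fmt.Println(len(flattenMarkers(m, 3))) // 3, not len(m) == 2
}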
@@ -928,32 +932,39 @@ func (a *headAppender) Commit() (err error) {
 		case oooSample:
 			// Sample is OOO and OOO handling is enabled
 			// and the delta is within the OOO tolerance.
-			var mmapRef chunks.ChunkDiskMapperRef
-			ok, chunkCreated, mmapRef = series.insert(s.T, s.V, a.head.chunkDiskMapper, oooCapMax)
+			var mmapRefs []chunks.ChunkDiskMapperRef
+			ok, chunkCreated, mmapRefs = series.insert(s.T, s.V, a.head.chunkDiskMapper, oooCapMax)
 			if chunkCreated {
 				r, ok := oooMmapMarkers[series.ref]
-				if !ok || r != 0 {
+				if !ok || r != nil {
 					// !ok means there are no markers collected for these samples yet. So we first flush the samples
 					// before setting this m-map marker.

-					// r != 0 means we have already m-mapped a chunk for this series in the same Commit().
+					// r != nil means we have already m-mapped a chunk for this series in the same Commit().
 					// Hence, before we m-map again, we should add the samples and m-map markers
 					// seen till now to the WBL records.
 					collectOOORecords()
 				}

 				if oooMmapMarkers == nil {
-					oooMmapMarkers = make(map[chunks.HeadSeriesRef]chunks.ChunkDiskMapperRef)
+					oooMmapMarkers = make(map[chunks.HeadSeriesRef][]chunks.ChunkDiskMapperRef)
 				}
-				oooMmapMarkers[series.ref] = mmapRef
+				if len(mmapRefs) > 0 {
+					oooMmapMarkers[series.ref] = mmapRefs
+					oooMmapMarkersCount += len(mmapRefs)
+				} else {
+					// No chunk was written to disk, so we need to set an initial marker for this series.
+					oooMmapMarkers[series.ref] = []chunks.ChunkDiskMapperRef{0}
+					oooMmapMarkersCount++
+				}
 			}
 			if ok {
 				wblSamples = append(wblSamples, s)
-				if s.T < ooomint {
-					ooomint = s.T
+				if s.T < oooMinT {
+					oooMinT = s.T
 				}
-				if s.T > ooomaxt {
-					ooomaxt = s.T
+				if s.T > oooMaxT {
+					oooMaxT = s.T
 				}
 				floatOOOAccepted++
 			} else {
@@ -1053,7 +1064,7 @@ func (a *headAppender) Commit() (err error) {
 	a.head.metrics.samplesAppended.WithLabelValues(sampleMetricTypeHistogram).Add(float64(histogramsAppended))
 	a.head.metrics.outOfOrderSamplesAppended.WithLabelValues(sampleMetricTypeFloat).Add(float64(floatOOOAccepted))
 	a.head.updateMinMaxTime(inOrderMint, inOrderMaxt)
-	a.head.updateMinOOOMaxOOOTime(ooomint, ooomaxt)
+	a.head.updateMinOOOMaxOOOTime(oooMinT, oooMaxT)

 	collectOOORecords()
 	if a.head.wbl != nil {
@@ -1069,14 +1080,14 @@ func (a *headAppender) Commit() (err error) {
 }

 // insert is like append, except it inserts. Used for OOO samples.
-func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRef chunks.ChunkDiskMapperRef) {
+func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRefs []chunks.ChunkDiskMapperRef) {
 	if s.ooo == nil {
 		s.ooo = &memSeriesOOOFields{}
 	}
 	c := s.ooo.oooHeadChunk
 	if c == nil || c.chunk.NumSamples() == int(oooCapMax) {
 		// Note: If no new samples come in then we rely on compaction to clean up stale in-memory OOO chunks.
-		c, mmapRef = s.cutNewOOOHeadChunk(t, chunkDiskMapper)
+		c, mmapRefs = s.cutNewOOOHeadChunk(t, chunkDiskMapper)
 		chunkCreated = true
 	}
@@ -1089,7 +1100,7 @@ func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDisk
 			c.maxTime = t
 		}
 	}
-	return ok, chunkCreated, mmapRef
+	return ok, chunkCreated, mmapRefs
 }

 // chunkOpts are chunk-level options that are passed when appending to a memSeries.
@@ -1431,7 +1442,7 @@ func (s *memSeries) cutNewHeadChunk(mint int64, e chunkenc.Encoding, chunkRange

 // cutNewOOOHeadChunk cuts a new OOO chunk and m-maps the old chunk.
 // The caller must ensure that s.ooo is not nil.
-func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) (*oooHeadChunk, chunks.ChunkDiskMapperRef) {
+func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) (*oooHeadChunk, []chunks.ChunkDiskMapperRef) {
 	ref := s.mmapCurrentOOOHeadChunk(chunkDiskMapper)

 	s.ooo.oooHeadChunk = &oooHeadChunk{
@@ -1443,21 +1454,29 @@ func (s *memSeries) cutNewOOOHeadChunk(mint int64, chunkDiskMapper *chunks.Chunk
 	return s.ooo.oooHeadChunk, ref
 }

-func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) chunks.ChunkDiskMapperRef {
+func (s *memSeries) mmapCurrentOOOHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) []chunks.ChunkDiskMapperRef {
 	if s.ooo == nil || s.ooo.oooHeadChunk == nil {
-		// There is no head chunk, so nothing to m-map here.
-		return 0
+		// OOO is not enabled or there is no head chunk, so nothing to m-map here.
+		return nil
 	}
-	xor, _ := s.ooo.oooHeadChunk.chunk.ToXOR() // Encode to XorChunk which is more compact and implements all of the needed functionality.
-	chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.ooo.oooHeadChunk.minTime, s.ooo.oooHeadChunk.maxTime, xor, true, handleChunkWriteError)
+	chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64)
+	if err != nil {
+		handleChunkWriteError(err)
+		return nil
+	}
+	chunkRefs := make([]chunks.ChunkDiskMapperRef, 0, 1)
+	for _, memchunk := range chks {
+		chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.ooo.oooHeadChunk.minTime, s.ooo.oooHeadChunk.maxTime, memchunk.chunk, true, handleChunkWriteError)
+		chunkRefs = append(chunkRefs, chunkRef)
 		s.ooo.oooMmappedChunks = append(s.ooo.oooMmappedChunks, &mmappedChunk{
 			ref:        chunkRef,
-			numSamples: uint16(xor.NumSamples()),
-			minTime:    s.ooo.oooHeadChunk.minTime,
-			maxTime:    s.ooo.oooHeadChunk.maxTime,
+			numSamples: uint16(memchunk.chunk.NumSamples()),
+			minTime:    memchunk.minTime,
+			maxTime:    memchunk.maxTime,
 		})
+	}
 	s.ooo.oooHeadChunk = nil
-	return chunkRef
+	return chunkRefs
 }

 // mmapChunks will m-map all but first chunk on s.headChunks list.
tsdb/head_test.go

@@ -4730,6 +4730,14 @@ func TestChunkSnapshotTakenAfterIncompleteSnapshot(t *testing.T) {

 // TestWBLReplay checks the replay at a low level.
 func TestWBLReplay(t *testing.T) {
+	for name, scenario := range sampleTypeScenarios {
+		t.Run(name, func(t *testing.T) {
+			testWBLReplay(t, scenario)
+		})
+	}
+}
+
+func testWBLReplay(t *testing.T, scenario sampleTypeScenario) {
 	dir := t.TempDir()
 	wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, wlog.CompressionSnappy)
 	require.NoError(t, err)
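Note: the test is restructured into the standard Go table-driven subtest pattern, one subtest per sample-type scenario. A self-contained sketch of just that structure; the scenario struct, its fields, and the map keys below are placeholders, not the real sampleTypeScenario definitions:

package tsdb_test

import "testing"

// scenario is a placeholder for the real sampleTypeScenario struct, which
// bundles per-sample-type append and query helpers.
type scenario struct{ kind string }

// Placeholder scenario set; the real sampleTypeScenarios map covers float,
// histogram, and float-histogram samples.
var sampleTypeScenarios = map[string]scenario{
	"float":           {kind: "float"},
	"histogram":       {kind: "histogram"},
	"float histogram": {kind: "float histogram"},
}

func TestReplayScenarios(t *testing.T) {
	for name, sc := range sampleTypeScenarios {
		// One subtest per sample type, so a failure names the encoding
		// that broke replay.
		t.Run(name, func(t *testing.T) {
			_ = sc // scenario-parameterized test body goes here
		})
	}
}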
@@ -4745,11 +4753,11 @@ func TestWBLReplay(t *testing.T) {
 	require.NoError(t, err)
 	require.NoError(t, h.Init(0))

-	var expOOOSamples []sample
+	var expOOOSamples []chunks.Sample
 	l := labels.FromStrings("foo", "bar")
-	appendSample := func(mins int64, isOOO bool) {
+	appendSample := func(mins int64, val float64, isOOO bool) {
 		app := h.Appender(context.Background())
-		ts, v := mins*time.Minute.Milliseconds(), float64(mins)
+		ts, v := mins*time.Minute.Milliseconds(), val
 		_, err := app.Append(0, l, ts, v)
 		require.NoError(t, err)
 		require.NoError(t, app.Commit())
@@ -4760,15 +4768,15 @@ func TestWBLReplay(t *testing.T) {
 	}

 	// In-order sample.
-	appendSample(60, false)
+	appendSample(60, 60, false)

 	// Out of order samples.
-	appendSample(40, true)
-	appendSample(35, true)
-	appendSample(50, true)
-	appendSample(55, true)
-	appendSample(59, true)
-	appendSample(31, true)
+	appendSample(40, 40, true)
+	appendSample(35, 35, true)
+	appendSample(50, 50, true)
+	appendSample(55, 55, true)
+	appendSample(59, 59, true)
+	appendSample(31, 31, true)

 	// Check that Head's time ranges are set properly.
 	require.Equal(t, 60*time.Minute.Milliseconds(), h.MinTime())
@@ -4792,22 +4800,23 @@ func TestWBLReplay(t *testing.T) {
 	require.False(t, ok)
 	require.NotNil(t, ms)

-	xor, err := ms.ooo.oooHeadChunk.chunk.ToXOR()
+	chks, err := ms.ooo.oooHeadChunk.chunk.ToEncodedChunks(math.MinInt64, math.MaxInt64)
 	require.NoError(t, err)
+	require.Len(t, chks, 1)

-	it := xor.Iterator(nil)
-	actOOOSamples := make([]sample, 0, len(expOOOSamples))
-	for it.Next() == chunkenc.ValFloat {
-		ts, v := it.At()
-		actOOOSamples = append(actOOOSamples, sample{t: ts, f: v})
-	}
+	it := chks[0].chunk.Iterator(nil)
+	actOOOSamples, err := storage.ExpandSamples(it, nil)
+	require.NoError(t, err)

 	// OOO chunk will be sorted. Hence sort the expected samples.
 	sort.Slice(expOOOSamples, func(i, j int) bool {
-		return expOOOSamples[i].t < expOOOSamples[j].t
+		return expOOOSamples[i].T() < expOOOSamples[j].T()
 	})

-	require.Equal(t, expOOOSamples, actOOOSamples)
+	// Passing in true for the 'ignoreCounterResets' parameter prevents differences in counter reset headers
+	// from being factored in to the sample comparison
+	// TODO(fionaliao): understand counter reset behaviour, might want to modify this later
+	requireEqualSamples(t, l.String(), expOOOSamples, actOOOSamples, true)

 	require.NoError(t, h.Close())
 }
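Note: the hand-rolled float iterator loop is replaced with storage.ExpandSamples, which drains any chunkenc.Iterator (float, histogram, or float histogram) into a []chunks.Sample. A minimal usage sketch against a real XOR chunk; passing nil for the sample constructor falls back to the function's default sample type, as the test does:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

func main() {
	c := chunkenc.NewXORChunk()
	app, err := c.Appender()
	if err != nil {
		panic(err)
	}
	app.Append(1000, 1.5)
	app.Append(2000, 2.5)

	// nil constructor: ExpandSamples uses its default sample type.
	samples, err := storage.ExpandSamples(c.Iterator(nil), nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(samples), samples[0].T(), samples[0].F()) // 2 1000 1.5
}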
tsdb/ooo_head.go (107 changed lines)
@@ -17,9 +17,10 @@ import (
 	"fmt"
 	"sort"

+	"github.com/prometheus/prometheus/tsdb/chunkenc"
+
 	"github.com/oklog/ulid"

-	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/tsdb/chunks"
 	"github.com/prometheus/prometheus/tsdb/tombstones"
 )
@@ -74,24 +75,22 @@ func (o *OOOChunk) NumSamples() int {
 	return len(o.samples)
 }

-func (o *OOOChunk) ToXOR() (*chunkenc.XORChunk, error) {
-	x := chunkenc.NewXORChunk()
-	app, err := x.Appender()
-	if err != nil {
-		return nil, err
-	}
-	for _, s := range o.samples {
-		app.Append(s.t, s.f)
-	}
-	return x, nil
-}
-
-func (o *OOOChunk) ToXORBetweenTimestamps(mint, maxt int64) (*chunkenc.XORChunk, error) {
-	x := chunkenc.NewXORChunk()
-	app, err := x.Appender()
-	if err != nil {
-		return nil, err
-	}
+// ToEncodedChunks returns chunks with the samples in the OOOChunk.
+//
+//nolint:revive // unexported-return.
+func (o *OOOChunk) ToEncodedChunks(mint, maxt int64) (chks []memChunk, err error) {
+	if len(o.samples) == 0 {
+		return nil, nil
+	}
+	// The most common case is that there will be a single chunk, with the same type of samples in it - this is always true for float samples.
+	chks = make([]memChunk, 0, 1)
+	var (
+		cmint int64
+		cmaxt int64
+		chunk chunkenc.Chunk
+		app   chunkenc.Appender
+	)
+	prevEncoding := chunkenc.EncNone // Yes we could call the chunk for this, but this is more efficient.
 	for _, s := range o.samples {
 		if s.t < mint {
 			continue
@@ -99,9 +98,77 @@ func (o *OOOChunk) ToXORBetweenTimestamps(mint, maxt int64) (*chunkenc.XORChunk,
 		if s.t > maxt {
 			break
 		}
-		app.Append(s.t, s.f)
+		encoding := chunkenc.EncXOR
+		if s.h != nil {
+			encoding = chunkenc.EncHistogram
+		} else if s.fh != nil {
+			encoding = chunkenc.EncFloatHistogram
+		}
+
+		// prevApp is the appender for the previous sample.
+		prevApp := app
+
+		if encoding != prevEncoding { // For the first sample, this will always be true as EncNone != EncXOR | EncHistogram | EncFloatHistogram
+			if prevEncoding != chunkenc.EncNone {
+				chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
+			}
+			cmint = s.t
+			switch encoding {
+			case chunkenc.EncXOR:
+				chunk = chunkenc.NewXORChunk()
+			case chunkenc.EncHistogram:
+				chunk = chunkenc.NewHistogramChunk()
+			case chunkenc.EncFloatHistogram:
+				chunk = chunkenc.NewFloatHistogramChunk()
+			default:
+				chunk = chunkenc.NewXORChunk()
+			}
+			app, err = chunk.Appender()
+			if err != nil {
+				return
+			}
+		}
+		switch encoding {
+		case chunkenc.EncXOR:
+			app.Append(s.t, s.f)
+		case chunkenc.EncHistogram:
+			// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
+			prevHApp, _ := prevApp.(*chunkenc.HistogramAppender)
+			var (
+				newChunk chunkenc.Chunk
+				recoded  bool
+			)
+			newChunk, recoded, app, _ = app.AppendHistogram(prevHApp, s.t, s.h, false)
+			if newChunk != nil { // A new chunk was allocated.
+				if !recoded {
+					chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
+				}
+				chunk = newChunk
+				cmint = s.t
+			}
+		case chunkenc.EncFloatHistogram:
+			// Ignoring ok is ok, since we don't want to compare to the wrong previous appender anyway.
+			prevHApp, _ := prevApp.(*chunkenc.FloatHistogramAppender)
+			var (
+				newChunk chunkenc.Chunk
+				recoded  bool
+			)
+			newChunk, recoded, app, _ = app.AppendFloatHistogram(prevHApp, s.t, s.fh, false)
+			if newChunk != nil { // A new chunk was allocated.
+				if !recoded {
+					chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
+				}
+				chunk = newChunk
+				cmint = s.t
+			}
+		}
+		cmaxt = s.t
+		prevEncoding = encoding
 	}
-	return x, nil
+	if prevEncoding != chunkenc.EncNone {
+		chks = append(chks, memChunk{chunk, cmint, cmaxt, nil})
+	}
+	return chks, nil
 }

 var _ BlockReader = &OOORangeHead{}
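Note: the core mechanism of ToEncodedChunks is "cut a fresh chunk whenever the required encoding changes". A condensed, runnable sketch of just that mechanism against the real chunkenc package; the sample struct and splitByEncoding helper are simplified stand-ins, and histogram appends are elided here (the real code routes them through AppendHistogram/AppendFloatHistogram, which can themselves recode or cut chunks):

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/histogram"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// sample is a simplified stand-in for the tsdb-internal sample type.
type sample struct {
	t  int64
	f  float64
	h  *histogram.Histogram
	fh *histogram.FloatHistogram
}

// encodingFor mirrors the per-sample encoding choice in ToEncodedChunks.
func encodingFor(s sample) chunkenc.Encoding {
	switch {
	case s.h != nil:
		return chunkenc.EncHistogram
	case s.fh != nil:
		return chunkenc.EncFloatHistogram
	default:
		return chunkenc.EncXOR
	}
}

// splitByEncoding (hypothetical helper) returns one chunk per run of
// same-encoding samples.
func splitByEncoding(samples []sample) ([]chunkenc.Chunk, error) {
	var (
		chks  []chunkenc.Chunk
		chunk chunkenc.Chunk
		app   chunkenc.Appender
		err   error
	)
	prev := chunkenc.EncNone // Sentinel: guarantees a cut on the first sample.
	for _, s := range samples {
		enc := encodingFor(s)
		if enc != prev {
			if prev != chunkenc.EncNone {
				chks = append(chks, chunk)
			}
			switch enc {
			case chunkenc.EncHistogram:
				chunk = chunkenc.NewHistogramChunk()
			case chunkenc.EncFloatHistogram:
				chunk = chunkenc.NewFloatHistogramChunk()
			default:
				chunk = chunkenc.NewXORChunk()
			}
			if app, err = chunk.Appender(); err != nil {
				return nil, err
			}
			prev = enc
		}
		// Only float appends are shown in this sketch.
		if enc == chunkenc.EncXOR {
			app.Append(s.t, s.f)
		}
	}
	if prev != chunkenc.EncNone {
		chks = append(chks, chunk)
	}
	return chks, nil
}

func main() {
	samples := []sample{
		{t: 1, f: 1.5}, {t: 2, f: 2.5}, // first XOR run
		{t: 3, h: &histogram.Histogram{}}, // encoding change: cut chunk 2
		{t: 4, f: 4.5}, // back to XOR: chunk 3
	}
	chks, err := splitByEncoding(samples)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(chks)) // 3
}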
tsdb/ooo_head_read.go

@@ -108,11 +108,19 @@ func (oh *OOOHeadIndexReader) series(ref storage.SeriesRef, builder *labels.Scra
 		c := s.ooo.oooHeadChunk
 		if c.OverlapsClosedInterval(oh.mint, oh.maxt) && maxMmapRef == 0 {
 			ref := chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.oooHeadChunkID(len(s.ooo.oooMmappedChunks))))
-			var xor chunkenc.Chunk
 			if len(c.chunk.samples) > 0 { // Empty samples happens in tests, at least.
-				xor, _ = c.chunk.ToXOR() // Ignoring error because it can't fail.
+				chks, err := s.ooo.oooHeadChunk.chunk.ToEncodedChunks(c.minTime, c.maxTime)
+				if err != nil {
+					handleChunkWriteError(err)
+					return nil
+				}
+				for _, chk := range chks {
+					addChunk(c.minTime, c.maxTime, ref, chk.chunk)
+				}
+			} else {
+				var emptyChunk chunkenc.Chunk
+				addChunk(c.minTime, c.maxTime, ref, emptyChunk)
 			}
-			addChunk(c.minTime, c.maxTime, ref, xor)
 		}
 	}
 	for i := len(s.ooo.oooMmappedChunks) - 1; i >= 0; i-- {
@@ -341,14 +349,20 @@ func NewOOOCompactionHead(ctx context.Context, head *Head) (*OOOCompactionHead,
 			continue
 		}

-		mmapRef := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper)
-		if mmapRef == 0 && len(ms.ooo.oooMmappedChunks) > 0 {
+		var lastMmapRef chunks.ChunkDiskMapperRef
+		mmapRefs := ms.mmapCurrentOOOHeadChunk(head.chunkDiskMapper)
+		if len(mmapRefs) == 0 && len(ms.ooo.oooMmappedChunks) > 0 {
 			// Nothing was m-mapped. So take the mmapRef from the existing slice if it exists.
-			mmapRef = ms.ooo.oooMmappedChunks[len(ms.ooo.oooMmappedChunks)-1].ref
+			mmapRefs = []chunks.ChunkDiskMapperRef{ms.ooo.oooMmappedChunks[len(ms.ooo.oooMmappedChunks)-1].ref}
 		}
-		seq, off := mmapRef.Unpack()
+		if len(mmapRefs) == 0 {
+			lastMmapRef = 0
+		} else {
+			lastMmapRef = mmapRefs[len(mmapRefs)-1]
+		}
+		seq, off := lastMmapRef.Unpack()
 		if seq > lastSeq || (seq == lastSeq && off > lastOff) {
-			ch.lastMmapRef, lastSeq, lastOff = mmapRef, seq, off
+			ch.lastMmapRef, lastSeq, lastOff = lastMmapRef, seq, off
 		}
 		if len(ms.ooo.oooMmappedChunks) > 0 {
 			ch.postings = append(ch.postings, seriesRef)
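Note: Unpack splits a ChunkDiskMapperRef into its chunk-file sequence number and in-file offset, so comparing only the last ref of each series' slice is enough to find the newest m-mapped chunk. A tiny sketch; the bit layout used to hand-pack the refs below (sequence in the high 32 bits) is an assumption inferred from Unpack, since the packing helper is not exported:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb/chunks"
)

func main() {
	// Hand-packed refs, assumed layout seq<<32 | offset.
	mmapRefs := []chunks.ChunkDiskMapperRef{
		chunks.ChunkDiskMapperRef(1<<32 | 128),
		chunks.ChunkDiskMapperRef(1<<32 | 4096), // same file, later offset
	}
	lastMmapRef := mmapRefs[len(mmapRefs)-1] // refs are appended in write order
	seq, off := lastMmapRef.Unpack()
	fmt.Println(seq, off) // 1 4096
}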