mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 22:37:27 -08:00
Support multiple chunk files in read path
This commit is contained in:
parent
afa084920c
commit
a3d042b54e
4
block.go
4
block.go
|
@ -130,11 +130,11 @@ func newPersistedBlock(dir string) (*persistedBlock, error) {
|
|||
return nil, errors.Wrap(err, "open index file")
|
||||
}
|
||||
|
||||
sr, err := newSeriesReader(chunksf.b)
|
||||
sr, err := newSeriesReader([][]byte{chunksf.b})
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "create series reader")
|
||||
}
|
||||
ir, err := newIndexReader(sr, indexf.b)
|
||||
ir, err := newIndexReader(indexf.b)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "create index reader")
|
||||
}
|
||||
|
|
13
head.go
13
head.go
|
@ -364,14 +364,17 @@ type headSeriesReader struct {
|
|||
}
|
||||
|
||||
// Chunk returns the chunk for the reference number.
|
||||
func (h *headSeriesReader) Chunk(ref uint32) (chunks.Chunk, error) {
|
||||
func (h *headSeriesReader) Chunk(ref uint64) (chunks.Chunk, error) {
|
||||
h.mtx.RLock()
|
||||
defer h.mtx.RUnlock()
|
||||
|
||||
si := ref >> 32
|
||||
ci := (ref << 32) >> 32
|
||||
|
||||
c := &safeChunk{
|
||||
Chunk: h.series[ref>>8].chunks[int((ref<<24)>>24)].chunk,
|
||||
s: h.series[ref>>8],
|
||||
i: int((ref << 24) >> 24),
|
||||
Chunk: h.series[si].chunks[ci].chunk,
|
||||
s: h.series[si],
|
||||
i: int(ci),
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
@ -440,7 +443,7 @@ func (h *headIndexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error)
|
|||
metas = append(metas, ChunkMeta{
|
||||
MinTime: c.minTime,
|
||||
MaxTime: c.maxTime,
|
||||
Ref: (ref << 8) | uint32(i),
|
||||
Ref: (uint64(ref) << 32) | uint64(i),
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -491,7 +491,7 @@ type chunkSeries struct {
|
|||
|
||||
// chunk is a function that retrieves chunks based on a reference
|
||||
// number contained in the chunk meta information.
|
||||
chunk func(ref uint32) (chunks.Chunk, error)
|
||||
chunk func(ref uint64) (chunks.Chunk, error)
|
||||
}
|
||||
|
||||
func (s *chunkSeries) Labels() labels.Labels {
|
||||
|
|
54
reader.go
54
reader.go
|
@ -13,32 +13,45 @@ import (
|
|||
// SeriesReader provides reading access of serialized time series data.
|
||||
type SeriesReader interface {
|
||||
// Chunk returns the series data chunk with the given reference.
|
||||
Chunk(ref uint32) (chunks.Chunk, error)
|
||||
Chunk(ref uint64) (chunks.Chunk, error)
|
||||
}
|
||||
|
||||
// seriesReader implements a SeriesReader for a serialized byte stream
|
||||
// of series data.
|
||||
type seriesReader struct {
|
||||
// The underlying byte slice holding the encoded series data.
|
||||
b []byte
|
||||
// The underlying bytes holding the encoded series data.
|
||||
bs [][]byte
|
||||
}
|
||||
|
||||
func newSeriesReader(b []byte) (*seriesReader, error) {
|
||||
if len(b) < 4 {
|
||||
return nil, errors.Wrap(errInvalidSize, "index header")
|
||||
func newSeriesReader(bs [][]byte) (*seriesReader, error) {
|
||||
s := &seriesReader{bs: bs}
|
||||
|
||||
for i, b := range bs {
|
||||
if len(b) < 4 {
|
||||
return nil, errors.Wrapf(errInvalidSize, "validate magic in segment %d", i)
|
||||
}
|
||||
// Verify magic number.
|
||||
if m := binary.BigEndian.Uint32(b[:4]); m != MagicSeries {
|
||||
return nil, fmt.Errorf("invalid magic number %x", m)
|
||||
}
|
||||
}
|
||||
// Verify magic number.
|
||||
if m := binary.BigEndian.Uint32(b[:4]); m != MagicSeries {
|
||||
return nil, fmt.Errorf("invalid magic number %x", m)
|
||||
}
|
||||
return &seriesReader{b: b}, nil
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *seriesReader) Chunk(offset uint32) (chunks.Chunk, error) {
|
||||
if int(offset) > len(s.b) {
|
||||
return nil, errors.Errorf("offset %d beyond data size %d", offset, len(s.b))
|
||||
func (s *seriesReader) Chunk(ref uint64) (chunks.Chunk, error) {
|
||||
var (
|
||||
seq = int(ref >> 32)
|
||||
off = int((ref << 32) >> 32)
|
||||
)
|
||||
if seq >= len(s.bs) {
|
||||
return nil, errors.Errorf("reference sequence %d out of range", seq)
|
||||
}
|
||||
b := s.b[offset:]
|
||||
b := s.bs[seq]
|
||||
|
||||
if int(off) >= len(b) {
|
||||
return nil, errors.Errorf("offset %d beyond data size %d", off, len(b))
|
||||
}
|
||||
b = b[off:]
|
||||
|
||||
l, n := binary.Uvarint(b)
|
||||
if n < 0 {
|
||||
|
@ -78,8 +91,6 @@ type StringTuples interface {
|
|||
}
|
||||
|
||||
type indexReader struct {
|
||||
series SeriesReader
|
||||
|
||||
// The underlying byte slice holding the encoded series data.
|
||||
b []byte
|
||||
|
||||
|
@ -93,14 +104,11 @@ var (
|
|||
errInvalidFlag = fmt.Errorf("invalid flag")
|
||||
)
|
||||
|
||||
func newIndexReader(s SeriesReader, b []byte) (*indexReader, error) {
|
||||
func newIndexReader(b []byte) (*indexReader, error) {
|
||||
if len(b) < 4 {
|
||||
return nil, errors.Wrap(errInvalidSize, "index header")
|
||||
}
|
||||
r := &indexReader{
|
||||
series: s,
|
||||
b: b,
|
||||
}
|
||||
r := &indexReader{b: b}
|
||||
|
||||
// Verify magic number.
|
||||
if m := binary.BigEndian.Uint32(b[:4]); m != MagicIndex {
|
||||
|
@ -299,7 +307,7 @@ func (r *indexReader) Series(ref uint32) (labels.Labels, []ChunkMeta, error) {
|
|||
b = b[n:]
|
||||
|
||||
chunks = append(chunks, ChunkMeta{
|
||||
Ref: uint32(o),
|
||||
Ref: o,
|
||||
MinTime: firstTime,
|
||||
MaxTime: lastTime,
|
||||
})
|
||||
|
|
|
@ -103,7 +103,7 @@ func (w *seriesWriter) WriteSeries(ref uint32, lset labels.Labels, chks []ChunkM
|
|||
for i := range chks {
|
||||
chk := &chks[i]
|
||||
|
||||
chk.Ref = uint32(w.n)
|
||||
chk.Ref = uint64(w.n)
|
||||
|
||||
n = binary.PutUvarint(b[:], uint64(len(chk.Chunk.Bytes())))
|
||||
|
||||
|
@ -148,7 +148,7 @@ type ChunkMeta struct {
|
|||
// Ref and Chunk hold either a reference that can be used to retrieve
|
||||
// chunk data or the data itself.
|
||||
// Generally, only one of them is set.
|
||||
Ref uint32
|
||||
Ref uint64
|
||||
Chunk chunks.Chunk
|
||||
|
||||
MinTime, MaxTime int64 // time range the data covers
|
||||
|
|
Loading…
Reference in a new issue