mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Add WAL decoder+loading and benchmarks
This commit is contained in:
parent
0b8c77361e
commit
1dde3b6d31
2
db.go
2
db.go
|
@ -41,7 +41,7 @@ type DB struct {
|
||||||
|
|
||||||
// TODO(fabxc): make configurable
|
// TODO(fabxc): make configurable
|
||||||
const (
|
const (
|
||||||
shardShift = 3
|
shardShift = 4
|
||||||
numShards = 1 << shardShift
|
numShards = 1 << shardShift
|
||||||
maxChunkSize = 1024
|
maxChunkSize = 1024
|
||||||
)
|
)
|
||||||
|
|
17
head.go
17
head.go
|
@ -31,7 +31,7 @@ type HeadBlock struct {
|
||||||
|
|
||||||
// NewHeadBlock creates a new empty head block.
|
// NewHeadBlock creates a new empty head block.
|
||||||
func NewHeadBlock(dir string, baseTime int64) (*HeadBlock, error) {
|
func NewHeadBlock(dir string, baseTime int64) (*HeadBlock, error) {
|
||||||
wal, err := CreateWAL(dir)
|
wal, err := OpenWAL(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -45,6 +45,21 @@ func NewHeadBlock(dir string, baseTime int64) (*HeadBlock, error) {
|
||||||
}
|
}
|
||||||
b.stats.MinTime = baseTime
|
b.stats.MinTime = baseTime
|
||||||
|
|
||||||
|
err = wal.ReadAll(&walHandler{
|
||||||
|
series: func(lset labels.Labels) {
|
||||||
|
b.create(lset.Hash(), lset)
|
||||||
|
},
|
||||||
|
sample: func(s hashedSample) {
|
||||||
|
if err := b.descs[s.ref].append(s.t, s.v); err != nil {
|
||||||
|
panic(err) // TODO(fabxc): cannot actually error
|
||||||
|
}
|
||||||
|
b.stats.SampleCount++
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
return b, nil
|
return b, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
205
wal.go
205
wal.go
|
@ -10,6 +10,7 @@ import (
|
||||||
|
|
||||||
"github.com/coreos/etcd/pkg/fileutil"
|
"github.com/coreos/etcd/pkg/fileutil"
|
||||||
"github.com/fabxc/tsdb/labels"
|
"github.com/fabxc/tsdb/labels"
|
||||||
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
// WALEntryType indicates what data a WAL entry contains.
|
// WALEntryType indicates what data a WAL entry contains.
|
||||||
|
@ -31,20 +32,28 @@ type WAL struct {
|
||||||
symbols map[string]uint32
|
symbols map[string]uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateWAL creates a new write ahead log in the given directory.
|
// OpenWAL opens or creates a write ahead log in the given directory.
|
||||||
func CreateWAL(dir string) (*WAL, error) {
|
// The WAL must be read completely before new data is written.
|
||||||
|
func OpenWAL(dir string) (*WAL, error) {
|
||||||
if err := os.MkdirAll(dir, 0777); err != nil {
|
if err := os.MkdirAll(dir, 0777); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
p := filepath.Join(dir, "wal")
|
p := filepath.Join(dir, "wal")
|
||||||
|
|
||||||
f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
|
f, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
if !os.IsNotExist(err) {
|
||||||
}
|
return nil, err
|
||||||
if _, err = f.Seek(0, os.SEEK_END); err != nil {
|
}
|
||||||
return nil, err
|
|
||||||
|
f, err = fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if _, err = f.Seek(0, os.SEEK_END); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
w := &WAL{
|
w := &WAL{
|
||||||
|
@ -55,6 +64,27 @@ func CreateWAL(dir string) (*WAL, error) {
|
||||||
return w, nil
|
return w, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type walHandler struct {
|
||||||
|
sample func(hashedSample)
|
||||||
|
series func(labels.Labels)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WAL) ReadAll(h *walHandler) error {
|
||||||
|
dec := &walDecoder{
|
||||||
|
r: w.f,
|
||||||
|
handler: h,
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
if err := dec.entry(); err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Log writes a batch of new series labels and samples to the log.
|
// Log writes a batch of new series labels and samples to the log.
|
||||||
func (w *WAL) Log(series []labels.Labels, samples []hashedSample) error {
|
func (w *WAL) Log(series []labels.Labels, samples []hashedSample) error {
|
||||||
if err := w.enc.encodeSeries(series); err != nil {
|
if err := w.enc.encodeSeries(series); err != nil {
|
||||||
|
@ -78,11 +108,6 @@ func (w *WAL) Close() error {
|
||||||
return w.f.Close()
|
return w.f.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// OpenWAL does things.
|
|
||||||
func OpenWAL(dir string) (*WAL, error) {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type walEncoder struct {
|
type walEncoder struct {
|
||||||
w io.Writer
|
w io.Writer
|
||||||
|
|
||||||
|
@ -92,7 +117,7 @@ type walEncoder struct {
|
||||||
func newWALEncoder(w io.Writer) *walEncoder {
|
func newWALEncoder(w io.Writer) *walEncoder {
|
||||||
return &walEncoder{
|
return &walEncoder{
|
||||||
w: w,
|
w: w,
|
||||||
buf: make([]byte, 1024*1024),
|
buf: make([]byte, 0, 1024*1024),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -116,6 +141,8 @@ func (e *walEncoder) entry(et WALEntryType, flag byte, n int) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
e.buf = e.buf[:0]
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -128,35 +155,33 @@ func (e *walEncoder) encodeSeries(series []labels.Labels) error {
|
||||||
if len(series) == 0 {
|
if len(series) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
var (
|
|
||||||
b = make([]byte, binary.MaxVarintLen32)
|
b := make([]byte, binary.MaxVarintLen32)
|
||||||
buf = e.buf[:0]
|
|
||||||
)
|
|
||||||
|
|
||||||
for _, lset := range series {
|
for _, lset := range series {
|
||||||
n := binary.PutUvarint(b, uint64(len(lset)))
|
n := binary.PutUvarint(b, uint64(len(lset)))
|
||||||
buf = append(buf, b[:n]...)
|
e.buf = append(e.buf, b[:n]...)
|
||||||
|
|
||||||
for _, l := range lset {
|
for _, l := range lset {
|
||||||
n = binary.PutUvarint(b, uint64(len(l.Name)))
|
n = binary.PutUvarint(b, uint64(len(l.Name)))
|
||||||
buf = append(buf, b[:n]...)
|
e.buf = append(e.buf, b[:n]...)
|
||||||
|
e.buf = append(e.buf, l.Name...)
|
||||||
|
|
||||||
n = binary.PutUvarint(b, uint64(len(l.Value)))
|
n = binary.PutUvarint(b, uint64(len(l.Value)))
|
||||||
buf = append(buf, b[:n]...)
|
e.buf = append(e.buf, b[:n]...)
|
||||||
|
e.buf = append(e.buf, l.Value...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return e.entry(WALEntrySeries, walSeriesSimple, len(buf))
|
return e.entry(WALEntrySeries, walSeriesSimple, len(e.buf))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *walEncoder) encodeSamples(samples []hashedSample) error {
|
func (e *walEncoder) encodeSamples(samples []hashedSample) error {
|
||||||
if len(samples) == 0 {
|
if len(samples) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
var (
|
|
||||||
b = make([]byte, binary.MaxVarintLen64)
|
b := make([]byte, binary.MaxVarintLen64)
|
||||||
buf = e.buf[:0]
|
|
||||||
)
|
|
||||||
|
|
||||||
// Store base timestamp and base reference number of first sample.
|
// Store base timestamp and base reference number of first sample.
|
||||||
// All samples encode their timestamp and ref as delta to those.
|
// All samples encode their timestamp and ref as delta to those.
|
||||||
|
@ -165,27 +190,139 @@ func (e *walEncoder) encodeSamples(samples []hashedSample) error {
|
||||||
first := samples[0]
|
first := samples[0]
|
||||||
|
|
||||||
binary.BigEndian.PutUint32(b, first.ref)
|
binary.BigEndian.PutUint32(b, first.ref)
|
||||||
buf = append(buf, b[:4]...)
|
e.buf = append(e.buf, b[:4]...)
|
||||||
binary.BigEndian.PutUint64(b, uint64(first.t))
|
binary.BigEndian.PutUint64(b, uint64(first.t))
|
||||||
buf = append(buf, b[:8]...)
|
e.buf = append(e.buf, b[:8]...)
|
||||||
|
|
||||||
for _, s := range samples {
|
for _, s := range samples {
|
||||||
n := binary.PutVarint(b, int64(s.ref)-int64(first.ref))
|
n := binary.PutVarint(b, int64(s.ref)-int64(first.ref))
|
||||||
buf = append(buf, b[:n]...)
|
e.buf = append(e.buf, b[:n]...)
|
||||||
|
|
||||||
n = binary.PutVarint(b, s.t-first.t)
|
n = binary.PutVarint(b, s.t-first.t)
|
||||||
buf = append(buf, b[:n]...)
|
e.buf = append(e.buf, b[:n]...)
|
||||||
|
|
||||||
binary.BigEndian.PutUint64(b, math.Float64bits(s.v))
|
binary.BigEndian.PutUint64(b, math.Float64bits(s.v))
|
||||||
buf = append(buf, b[:8]...)
|
e.buf = append(e.buf, b[:8]...)
|
||||||
}
|
}
|
||||||
|
|
||||||
return e.entry(WALEntrySamples, walSamplesSimple, len(buf))
|
return e.entry(WALEntrySamples, walSamplesSimple, len(e.buf))
|
||||||
}
|
}
|
||||||
|
|
||||||
type walDecoder struct {
|
type walDecoder struct {
|
||||||
r io.Reader
|
r io.Reader
|
||||||
|
handler *walHandler
|
||||||
|
|
||||||
handleSeries func(labels.Labels)
|
buf []byte
|
||||||
handleSample func(hashedSample)
|
}
|
||||||
|
|
||||||
|
func newWALDecoer(r io.Reader, h *walHandler) *walDecoder {
|
||||||
|
return &walDecoder{
|
||||||
|
r: r,
|
||||||
|
handler: h,
|
||||||
|
buf: make([]byte, 0, 1024*1024),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *walDecoder) decodeSeries(flag byte, b []byte) error {
|
||||||
|
for len(b) > 0 {
|
||||||
|
l, n := binary.Uvarint(b)
|
||||||
|
if n < 1 {
|
||||||
|
return errors.Wrap(errInvalidSize, "number of labels")
|
||||||
|
}
|
||||||
|
b = b[n:]
|
||||||
|
lset := make(labels.Labels, l)
|
||||||
|
|
||||||
|
for i := 0; i < int(l); i++ {
|
||||||
|
nl, n := binary.Uvarint(b)
|
||||||
|
if n < 1 || len(b) < n+int(nl) {
|
||||||
|
return errors.Wrap(errInvalidSize, "label name")
|
||||||
|
}
|
||||||
|
lset[i].Name = string(b[n : n+int(nl)])
|
||||||
|
b = b[n+int(nl):]
|
||||||
|
|
||||||
|
vl, n := binary.Uvarint(b)
|
||||||
|
if n < 1 || len(b) < n+int(vl) {
|
||||||
|
return errors.Wrap(errInvalidSize, "label value")
|
||||||
|
}
|
||||||
|
lset[i].Value = string(b[n : n+int(vl)])
|
||||||
|
b = b[n+int(vl):]
|
||||||
|
}
|
||||||
|
|
||||||
|
d.handler.series(lset)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *walDecoder) decodeSamples(flag byte, b []byte) error {
|
||||||
|
if len(b) < 12 {
|
||||||
|
return errors.Wrap(errInvalidSize, "header length")
|
||||||
|
}
|
||||||
|
var (
|
||||||
|
baseRef = binary.BigEndian.Uint32(b)
|
||||||
|
baseTime = int64(binary.BigEndian.Uint64(b[4:]))
|
||||||
|
)
|
||||||
|
b = b[12:]
|
||||||
|
|
||||||
|
for len(b) > 0 {
|
||||||
|
var smpl hashedSample
|
||||||
|
|
||||||
|
dref, n := binary.Varint(b)
|
||||||
|
if n < 1 {
|
||||||
|
return errors.Wrap(errInvalidSize, "sample ref delta")
|
||||||
|
}
|
||||||
|
b = b[n:]
|
||||||
|
smpl.ref = uint32(int64(baseRef) + dref)
|
||||||
|
|
||||||
|
dtime, n := binary.Varint(b)
|
||||||
|
if n < 1 {
|
||||||
|
return errors.Wrap(errInvalidSize, "sample timestamp delta")
|
||||||
|
}
|
||||||
|
b = b[n:]
|
||||||
|
smpl.t = baseTime + dtime
|
||||||
|
|
||||||
|
if len(b) < 8 {
|
||||||
|
return errors.Wrapf(errInvalidSize, "sample value bits %d", len(b))
|
||||||
|
}
|
||||||
|
smpl.v = float64(math.Float64frombits(binary.BigEndian.Uint64(b)))
|
||||||
|
b = b[8:]
|
||||||
|
|
||||||
|
d.handler.sample(smpl)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *walDecoder) entry() error {
|
||||||
|
b := make([]byte, 6)
|
||||||
|
if _, err := d.r.Read(b); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
etype = WALEntryType(b[0])
|
||||||
|
flag = b[1]
|
||||||
|
length = int(binary.BigEndian.Uint32(b[2:]))
|
||||||
|
)
|
||||||
|
|
||||||
|
if length > len(d.buf) {
|
||||||
|
d.buf = make([]byte, length)
|
||||||
|
}
|
||||||
|
buf := d.buf[:length]
|
||||||
|
|
||||||
|
if _, err := d.r.Read(buf); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// Read away checksum.
|
||||||
|
// TODO(fabxc): verify it
|
||||||
|
if _, err := d.r.Read(b[:4]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
switch etype {
|
||||||
|
case WALEntrySeries:
|
||||||
|
return d.decodeSeries(flag, buf)
|
||||||
|
case WALEntrySamples:
|
||||||
|
return d.decodeSamples(flag, buf)
|
||||||
|
}
|
||||||
|
|
||||||
|
return errors.Errorf("unknown WAL entry type %q", etype)
|
||||||
}
|
}
|
||||||
|
|
234
wal_test.go
Normal file
234
wal_test.go
Normal file
|
@ -0,0 +1,234 @@
|
||||||
|
package tsdb
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/fabxc/tsdb/labels"
|
||||||
|
dto "github.com/prometheus/client_model/go"
|
||||||
|
"github.com/prometheus/common/expfmt"
|
||||||
|
"github.com/prometheus/common/model"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func BenchmarkWALWrite(b *testing.B) {
|
||||||
|
d, err := ioutil.TempDir("", "wal_read_test")
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
require.NoError(b, os.RemoveAll(d))
|
||||||
|
}()
|
||||||
|
|
||||||
|
wal, err := OpenWAL(d)
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
f, err := os.Open("cmd/tsdb/testdata.1m")
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
series, err := readPrometheusLabels(f, b.N)
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
var (
|
||||||
|
samples [][]hashedSample
|
||||||
|
ts int64
|
||||||
|
)
|
||||||
|
for i := 0; i < 300; i++ {
|
||||||
|
ts += int64(30000)
|
||||||
|
scrape := make([]hashedSample, 0, len(series))
|
||||||
|
|
||||||
|
for ref := range series {
|
||||||
|
scrape = append(scrape, hashedSample{
|
||||||
|
ref: uint32(ref),
|
||||||
|
t: ts,
|
||||||
|
v: 12345788,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
samples = append(samples, scrape)
|
||||||
|
}
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
|
||||||
|
err = wal.Log(series, samples[0])
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
for _, s := range samples[1:] {
|
||||||
|
err = wal.Log(nil, s)
|
||||||
|
require.NoError(b, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(b, wal.Close())
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkWALRead(b *testing.B) {
|
||||||
|
f, err := os.Open("cmd/tsdb/testdata.1m")
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
series, err := readPrometheusLabels(f, 1000000)
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
b.Run("test", func(b *testing.B) {
|
||||||
|
bseries := series[:b.N]
|
||||||
|
|
||||||
|
d, err := ioutil.TempDir("", "wal_read_test")
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
require.NoError(b, os.RemoveAll(d))
|
||||||
|
}()
|
||||||
|
|
||||||
|
wal, err := OpenWAL(d)
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
var (
|
||||||
|
samples [][]hashedSample
|
||||||
|
ts int64
|
||||||
|
)
|
||||||
|
for i := 0; i < 300; i++ {
|
||||||
|
ts += int64(30000)
|
||||||
|
scrape := make([]hashedSample, 0, len(bseries))
|
||||||
|
|
||||||
|
for ref := range bseries {
|
||||||
|
scrape = append(scrape, hashedSample{
|
||||||
|
ref: uint32(ref),
|
||||||
|
t: ts,
|
||||||
|
v: 12345788,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
samples = append(samples, scrape)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = wal.Log(bseries, samples[0])
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
for _, s := range samples[1:] {
|
||||||
|
err = wal.Log(nil, s)
|
||||||
|
require.NoError(b, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(b, wal.Close())
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
|
||||||
|
wal, err = OpenWAL(d)
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
var numSeries, numSamples int
|
||||||
|
|
||||||
|
err = wal.ReadAll(&walHandler{
|
||||||
|
series: func(lset labels.Labels) { numSeries++ },
|
||||||
|
sample: func(smpl hashedSample) { numSamples++ },
|
||||||
|
})
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
stat, _ := wal.f.Stat()
|
||||||
|
fmt.Println("read series", numSeries, "read samples", numSamples, "wal size", fmt.Sprintf("%.2fMiB", float64(stat.Size())/1024/1024))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func BenchmarkWALReadIntoHead(b *testing.B) {
|
||||||
|
f, err := os.Open("cmd/tsdb/testdata.1m")
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
series, err := readPrometheusLabels(f, 1000000)
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
b.Run("test", func(b *testing.B) {
|
||||||
|
bseries := series[:b.N]
|
||||||
|
|
||||||
|
d, err := ioutil.TempDir("", "wal_read_test")
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
require.NoError(b, os.RemoveAll(d))
|
||||||
|
}()
|
||||||
|
|
||||||
|
wal, err := OpenWAL(d)
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
var (
|
||||||
|
samples [][]hashedSample
|
||||||
|
ts int64
|
||||||
|
)
|
||||||
|
for i := 0; i < 300; i++ {
|
||||||
|
ts += int64(30000)
|
||||||
|
scrape := make([]hashedSample, 0, len(bseries))
|
||||||
|
|
||||||
|
for ref := range bseries {
|
||||||
|
scrape = append(scrape, hashedSample{
|
||||||
|
ref: uint32(ref),
|
||||||
|
t: ts,
|
||||||
|
v: 12345788,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
samples = append(samples, scrape)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = wal.Log(bseries, samples[0])
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
for _, s := range samples[1:] {
|
||||||
|
err = wal.Log(nil, s)
|
||||||
|
require.NoError(b, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(b, wal.Close())
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
|
||||||
|
head, err := NewHeadBlock(d, 0)
|
||||||
|
require.NoError(b, err)
|
||||||
|
|
||||||
|
stat, _ := head.wal.f.Stat()
|
||||||
|
fmt.Println("head block initialized from WAL")
|
||||||
|
fmt.Println("read series", head.stats.SeriesCount, "read samples", head.stats.SampleCount, "wal size", fmt.Sprintf("%.2fMiB", float64(stat.Size())/1024/1024))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) {
|
||||||
|
dec := expfmt.NewDecoder(r, expfmt.FmtProtoText)
|
||||||
|
|
||||||
|
var mets []model.Metric
|
||||||
|
fps := map[model.Fingerprint]struct{}{}
|
||||||
|
var mf dto.MetricFamily
|
||||||
|
var dups int
|
||||||
|
|
||||||
|
for i := 0; i < n; {
|
||||||
|
if err := dec.Decode(&mf); err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, m := range mf.GetMetric() {
|
||||||
|
met := make(model.Metric, len(m.GetLabel())+1)
|
||||||
|
met["__name__"] = model.LabelValue(mf.GetName())
|
||||||
|
|
||||||
|
for _, l := range m.GetLabel() {
|
||||||
|
met[model.LabelName(l.GetName())] = model.LabelValue(l.GetValue())
|
||||||
|
}
|
||||||
|
if _, ok := fps[met.Fingerprint()]; ok {
|
||||||
|
dups++
|
||||||
|
} else {
|
||||||
|
mets = append(mets, met)
|
||||||
|
fps[met.Fingerprint()] = struct{}{}
|
||||||
|
}
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
lbls := make([]labels.Labels, 0, n)
|
||||||
|
|
||||||
|
for _, m := range mets[:n] {
|
||||||
|
lset := make(labels.Labels, 0, len(m))
|
||||||
|
for k, v := range m {
|
||||||
|
lset = append(lset, labels.Label{Name: string(k), Value: string(v)})
|
||||||
|
}
|
||||||
|
lbls = append(lbls, lset)
|
||||||
|
}
|
||||||
|
|
||||||
|
return lbls, nil
|
||||||
|
}
|
Loading…
Reference in a new issue