Added TSDB import with OpenMetrics and CSV file support.

Based on https://github.com/prometheus/prometheus/pull/5887 Thanks for your
work so far @dipack95, it helped a lot!

Changes on top of @dipack95:

* Addressed all review comments
* Use subcommands for different formats
* Simplified block creation; it does not need to be that complex for the first iteration.
* Simplified and separated concerns. No need to have access to the DB. Block
writing is separated as well, for ease of benchmarking and testing (see the
sketch below). This will also be needed by @JessicaGreben
* Added import support for different formats.
* Removed all tests - those had to be pulled over and adjusted ):
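
As a rough, illustrative sketch (not taken from this diff; the input path, output directory and 2h block size are placeholders) of how the separated pieces compose - a format parser feeding importer.Import, which writes blocks through blocks.NewMultiWriter:

package main

import (
	"os"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/prometheus/tsdb/importer"
	"github.com/prometheus/prometheus/tsdb/importer/blocks"
	"github.com/prometheus/prometheus/tsdb/importer/csv"
)

func main() {
	logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
	f, err := os.Open("samples.csv") // placeholder input file
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// Parse CSV input and cut one TSDB block per 2h range under "out".
	// Import takes ownership of the writer: it commits, flushes and closes it.
	p := csv.NewParser(f, ',')
	w := blocks.NewMultiWriter(logger, "out", int64(2*time.Hour/time.Millisecond))
	if err := importer.Import(logger, p, w); err != nil {
		panic(err)
	}
}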


Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
Bartlomiej Plotka 2020-07-13 18:33:27 +01:00
parent 575cdf2886
commit 3ac96c7841
12 changed files with 897 additions and 1417 deletions


@@ -16,6 +16,7 @@ package textparse
import (
"mime"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/labels"
)
@@ -55,8 +56,8 @@ type Parser interface {
// exemplar. It returns if an exemplar exists or not.
Exemplar(l *exemplar.Exemplar) bool
// Next advances the parser to the next sample. It returns false if no
// more samples were read or an error occurred.
// Next advances the parser to the next sample. It returns io.EOF if no
// more samples were read.
Next() (Entry, error)
}
@@ -94,3 +95,45 @@ const (
MetricTypeStateset = "stateset"
MetricTypeUnknown = "unknown"
)
func (m *MetricType) ParseForOpenMetrics(mtyp string) error {
switch mtyp {
case "counter":
*m = MetricTypeCounter
case "gauge":
*m = MetricTypeGauge
case "histogram":
*m = MetricTypeHistogram
case "gaugehistogram":
*m = MetricTypeGaugeHistogram
case "summary":
*m = MetricTypeSummary
case "info":
*m = MetricTypeInfo
case "stateset":
*m = MetricTypeStateset
case "unknown":
*m = MetricTypeUnknown
default:
return errors.Errorf("invalid metric type %q", mtyp)
}
return nil
}
func (m *MetricType) ParseForProm(mtyp string) error {
switch mtyp {
case "counter":
*m = MetricTypeCounter
case "gauge":
*m = MetricTypeGauge
case "histogram":
*m = MetricTypeHistogram
case "summary":
*m = MetricTypeSummary
case "unknown":
*m = MetricTypeUnknown
default:
return errors.Errorf("invalid metric type %q", mtyp)
}
return nil
}


@@ -259,25 +259,8 @@ func (p *OpenMetricsParser) Next() (Entry, error) {
}
switch t {
case tType:
switch s := yoloString(p.text); s {
case "counter":
p.mtype = MetricTypeCounter
case "gauge":
p.mtype = MetricTypeGauge
case "histogram":
p.mtype = MetricTypeHistogram
case "gaugehistogram":
p.mtype = MetricTypeGaugeHistogram
case "summary":
p.mtype = MetricTypeSummary
case "info":
p.mtype = MetricTypeInfo
case "stateset":
p.mtype = MetricTypeStateset
case "unknown":
p.mtype = MetricTypeUnknown
default:
return EntryInvalid, errors.Errorf("invalid metric type %q", s)
if err := p.mtype.ParseForOpenMetrics(yoloString(p.text)); err != nil {
return EntryInvalid, err
}
case tHelp:
if !utf8.Valid(p.text) {


@@ -289,19 +289,8 @@ func (p *PromParser) Next() (Entry, error) {
}
switch t {
case tType:
switch s := yoloString(p.text); s {
case "counter":
p.mtype = MetricTypeCounter
case "gauge":
p.mtype = MetricTypeGauge
case "histogram":
p.mtype = MetricTypeHistogram
case "summary":
p.mtype = MetricTypeSummary
case "untyped":
p.mtype = MetricTypeUnknown
default:
return EntryInvalid, errors.Errorf("invalid metric type %q", s)
if err := p.mtype.ParseForProm(yoloString(p.text)); err != nil {
return EntryInvalid, err
}
case tHelp:
if !utf8.Valid(p.text) {


@@ -34,11 +34,15 @@ import (
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/importer"
"github.com/prometheus/prometheus/tsdb/importer/blocks"
"github.com/prometheus/prometheus/tsdb/importer/csv"
"github.com/prometheus/prometheus/tsdb/importer/openmetrics"
"gopkg.in/alecthomas/kingpin.v2"
)
@@ -53,34 +57,43 @@ func execute() (err error) {
var (
defaultDBPath = filepath.Join("benchout", "storage")
cli = kingpin.New(filepath.Base(os.Args[0]), "CLI tool for tsdb")
benchCmd = cli.Command("bench", "run benchmarks")
benchWriteCmd = benchCmd.Command("write", "run a write performance benchmark")
benchWriteOutPath = benchWriteCmd.Flag("out", "set the output path").Default("benchout").String()
benchWriteNumMetrics = benchWriteCmd.Flag("metrics", "number of metrics to read").Default("10000").Int()
benchSamplesFile = benchWriteCmd.Arg("file", "input file with samples data, default is ("+filepath.Join("..", "..", "testdata", "20kseries.json")+")").Default(filepath.Join("..", "..", "testdata", "20kseries.json")).String()
listCmd = cli.Command("ls", "list db blocks")
listCmdHumanReadable = listCmd.Flag("human-readable", "print human readable values").Short('h').Bool()
listPath = listCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
analyzeCmd = cli.Command("analyze", "analyze churn, label pair cardinality.")
analyzePath = analyzeCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
analyzeBlockID = analyzeCmd.Arg("block id", "block to analyze (default is the last block)").String()
analyzeLimit = analyzeCmd.Flag("limit", "how many items to show in each list").Default("20").Int()
dumpCmd = cli.Command("dump", "dump samples from a TSDB")
dumpPath = dumpCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
dumpMinTime = dumpCmd.Flag("min-time", "minimum timestamp to dump").Default(strconv.FormatInt(math.MinInt64, 10)).Int64()
dumpMaxTime = dumpCmd.Flag("max-time", "maximum timestamp to dump").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
importCmd = cli.Command("import", "import samples from file containing information formatted in the Open Metrics format. Please refer to the storage docs for more details.")
importFilePath = importCmd.Arg("file path", "file to import samples from (must be in Open Metrics format)").Required().String()
importDbPath = importCmd.Arg("db path", "database path").Required().String()
importMaxSamplesInMemory = importCmd.Flag("max-samples-in-mem", "maximum number of samples to process in a cycle").Default("10000").Int()
importMaxBlockChildren = importCmd.Flag("max-block-children", "maximum number of children a block can have at a given time").Default("20").Int()
cli = kingpin.New(filepath.Base(os.Args[0]), "CLI tool for tsdb")
benchCmd = cli.Command("bench", "run benchmarks")
benchWriteCmd = benchCmd.Command("write", "run a write performance benchmark")
benchWriteOutPath = benchWriteCmd.Flag("out", "set the output path").Default("benchout").String()
benchWriteNumMetrics = benchWriteCmd.Flag("metrics", "number of metrics to read").Default("10000").Int()
benchSamplesFile = benchWriteCmd.Arg("file", "input file with samples data, default is ("+filepath.Join("..", "..", "testdata", "20kseries.json")+")").Default(filepath.Join("..", "..", "testdata", "20kseries.json")).String()
listCmd = cli.Command("ls", "list db blocks")
listCmdHumanReadable = listCmd.Flag("human-readable", "print human readable values").Short('h').Bool()
listPath = listCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
analyzeCmd = cli.Command("analyze", "analyze churn, label pair cardinality.")
analyzePath = analyzeCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
analyzeBlockID = analyzeCmd.Arg("block id", "block to analyze (default is the last block)").String()
analyzeLimit = analyzeCmd.Flag("limit", "how many items to show in each list").Default("20").Int()
dumpCmd = cli.Command("dump", "dump samples from a TSDB")
dumpPath = dumpCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
dumpMinTime = dumpCmd.Flag("min-time", "minimum timestamp to dump").Default(strconv.FormatInt(math.MinInt64, 10)).Int64()
dumpMaxTime = dumpCmd.Flag("max-time", "maximum timestamp to dump").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
importCmd = cli.Command("import", "[Experimental] import samples from input and produce TSDB block. Please refer to the storage docs for more details.")
importDbPath = importCmd.Flag("output", "output directory for generated block").Default(".").String()
importFilePath = importCmd.Flag("input-file", "import samples from the given file instead of reading from stdin. If empty, stdin input is required").String()
importBlockSize = importCmd.Flag("block-size", "The maximum block size. The actual block timestamps will be aligned with Prometheus time ranges.").Default("2h").Hidden().Duration()
omImportCmd = importCmd.Command("openmetrics", "import samples from OpenMetrics input and produce TSDB block. Please refer to the storage docs for more details.")
csvImportCmd = importCmd.Command("csv", "import samples from CSV input and produce TSDB block. Please refer to the storage docs for more details.")
csvImportDelimiter = csvImportCmd.Flag("delimiter", "single character used as the CSV field delimiter").Default(",").String()
)
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
var merr tsdb_errors.MultiError
switch kingpin.MustParse(cli.Parse(os.Args[1:])) {
switch cmd := kingpin.MustParse(cli.Parse(os.Args[1:])); cmd {
case benchWriteCmd.FullCommand():
wb := &writeBenchmark{
outPath: *benchWriteOutPath,
@@ -144,16 +157,39 @@ func execute() (err error) {
err = merr.Err()
}()
return dumpSamples(db, *dumpMinTime, *dumpMaxTime)
case importCmd.FullCommand():
f, err := os.Open(*importFilePath)
if err != nil {
return err
case omImportCmd.FullCommand(), csvImportCmd.FullCommand():
input := os.Stdin
if *importFilePath != "" {
input, err = os.Open(*importFilePath)
if err != nil {
return err
}
defer func() {
merr.Add(err)
merr.Add(input.Close())
err = merr.Err()
}()
}
return importer.ImportFromFile(f, *importDbPath, *importMaxSamplesInMemory, *importMaxBlockChildren, logger)
var p textparse.Parser
if cmd == omImportCmd.FullCommand() {
p = openmetrics.NewParser(input)
} else {
if len(*csvImportDelimiter) != 1 {
return errors.Errorf("wrong format of delimiter flag, expected single character, got %q", *csvImportDelimiter)
}
p = csv.NewParser(input, []rune(*csvImportDelimiter)[0])
}
return importer.Import(logger, p, blocks.NewMultiWriter(logger, *importDbPath, durToMillis(*importBlockSize)))
}
return nil
}
func durToMillis(t time.Duration) int64 {
return int64(t.Seconds() * 1000)
}
type writeBenchmark struct {
outPath string
samplesFile string


@@ -0,0 +1,120 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package blocks
import (
"github.com/go-kit/kit/log"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/index"
)
type errAppender struct{ err error }
func (a errAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) { return 0, a.err }
func (a errAppender) AddFast(ref uint64, t int64, v float64) error { return a.err }
func (a errAppender) Commit() error { return a.err }
func (a errAppender) Rollback() error { return a.err }
func rangeForTimestamp(t int64, width int64) (maxt int64) {
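// For example, with width=7200000 (2h) and t=3600000 this returns 7200000,
// i.e. the sample falls into the aligned half-open range [0, 7200000).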
return (t/width)*width + width
}
type MultiWriter struct {
blocks map[index.Range]Writer
activeAppenders map[index.Range]storage.Appender
logger log.Logger
dir string
// TODO(bwplotka): Allow more complex compaction levels.
sizeMillis int64
}
func NewMultiWriter(logger log.Logger, dir string, sizeMillis int64) *MultiWriter {
return &MultiWriter{
logger: logger,
dir: dir,
sizeMillis: sizeMillis,
blocks: map[index.Range]Writer{},
activeAppenders: map[index.Range]storage.Appender{},
}
}
// Appender is not thread-safe. The returned Appender is not thread-safe either.
// TODO(bwplotka): Consider making it thread safe.
func (w *MultiWriter) Appender() storage.Appender { return w }
func (w *MultiWriter) getOrCreate(t int64) storage.Appender {
maxt := rangeForTimestamp(t, w.sizeMillis)
hash := index.Range{Start: maxt - w.sizeMillis, End: maxt}
if a, ok := w.activeAppenders[hash]; ok {
return a
}
nw, err := NewTSDBWriter(w.logger, w.dir)
if err != nil {
return errAppender{err: errors.Wrap(err, "new tsdb writer")}
}
w.blocks[hash] = nw
w.activeAppenders[hash] = nw.Appender()
return w.activeAppenders[hash]
}
func (w *MultiWriter) Add(l labels.Labels, t int64, v float64) (uint64, error) {
return w.getOrCreate(t).Add(l, t, v)
}
func (w *MultiWriter) AddFast(ref uint64, t int64, v float64) error {
return w.getOrCreate(t).AddFast(ref, t, v)
}
func (w *MultiWriter) Commit() error {
var merr tsdb_errors.MultiError
for _, a := range w.activeAppenders {
merr.Add(a.Commit())
}
return merr.Err()
}
func (w *MultiWriter) Rollback() error {
var merr tsdb_errors.MultiError
for _, a := range w.activeAppenders {
merr.Add(a.Rollback())
}
return merr.Err()
}
func (w *MultiWriter) Flush() ([]ulid.ULID, error) {
ids := make([]ulid.ULID, 0, len(w.blocks))
for _, b := range w.blocks {
id, err := b.Flush()
if err != nil {
return nil, err
}
ids = append(ids, id...)
}
return ids, nil
}
func (w *MultiWriter) Close() error {
var merr tsdb_errors.MultiError
for _, b := range w.blocks {
merr.Add(b.Close())
}
return merr.Err()
}


@@ -0,0 +1,139 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package blocks
import (
"context"
"io/ioutil"
"math"
"os"
"time"
"github.com/oklog/ulid"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/timestamp"
"github.com/prometheus/prometheus/tsdb/chunkenc"
)
// Writer is an interface for writing time series into Prometheus blocks.
type Writer interface {
storage.Appendable
// Flush writes current data to disk.
// The block or blocks will contain the values accumulated via the returned Appender.
Flush() ([]ulid.ULID, error)
// Close releases all resources. No appends are allowed to the writer afterwards.
Close() error
}
var _ Writer = &TSDBWriter{}
// TSDBWriter is a block writer that allows appending and flushing to disk.
type TSDBWriter struct {
logger log.Logger
dir string
head *tsdb.Head
tmpDir string
}
func durToMillis(t time.Duration) int64 {
return int64(t.Seconds() * 1000)
}
// NewTSDBWriter creates a new block writer.
//
// The returned writer accumulates all series in memory until `Flush` is called.
//
// Note that the writer will not check if the target directory exists or
// contains anything at all. It is the caller's responsibility to
// ensure that the resulting blocks do not overlap etc.
// Writer ensures the block flush is atomic (via rename).
func NewTSDBWriter(logger log.Logger, dir string) (*TSDBWriter, error) {
res := &TSDBWriter{
logger: logger,
dir: dir,
}
return res, res.initHead()
}
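
A rough usage sketch of the writer (illustrative only; the "out" directory, labels and sample values are made up):

package main

import (
	"fmt"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/tsdb/importer/blocks"
)

func main() {
	w, err := blocks.NewTSDBWriter(log.NewNopLogger(), "out") // placeholder output dir
	if err != nil {
		panic(err)
	}
	defer w.Close()

	// Append a single made-up sample, commit it, then flush the head as a block.
	app := w.Appender()
	if _, err := app.Add(labels.FromStrings("__name__", "up", "job", "demo"), 1594885435000, 1); err != nil {
		panic(err)
	}
	if err := app.Commit(); err != nil {
		panic(err)
	}
	ids, err := w.Flush()
	if err != nil {
		panic(err)
	}
	fmt.Println("wrote block(s):", ids)
}
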
// initHead creates and initialises a new head.
func (w *TSDBWriter) initHead() error {
logger := w.logger
// Keep Registerer and WAL nil as we don't use them.
// Use a huge chunkRange; it has to be at least as large as the expected block size.
// Since we don't have info about the block size here, set it to a large number.
tmpDir, err := ioutil.TempDir(os.TempDir(), "head")
if err != nil {
return errors.Wrap(err, "create temp dir")
}
w.tmpDir = tmpDir
h, err := tsdb.NewHead(nil, logger, nil, durToMillis(9999*time.Hour), w.tmpDir, nil, tsdb.DefaultStripeSize, nil)
if err != nil {
return errors.Wrap(err, "tsdb.NewHead")
}
w.head = h
return w.head.Init(math.MinInt64)
}
// Appender is not thread-safe. The returned Appender, however, is thread-safe.
func (w *TSDBWriter) Appender() storage.Appender {
return w.head.Appender()
}
// Flush implements the Writer interface. This is where the actual block writing
// happens. After Flush completes, no further writes can be done.
func (w *TSDBWriter) Flush() ([]ulid.ULID, error) {
seriesCount := w.head.NumSeries()
if w.head.NumSeries() == 0 {
return nil, errors.New("no series appended; aborting.")
}
mint := w.head.MinTime()
maxt := w.head.MaxTime() + 1
level.Info(w.logger).Log("msg", "flushing", "series_count", seriesCount, "mint", timestamp.Time(mint), "maxt", timestamp.Time(maxt))
// Flush head to disk as a block.
compactor, err := tsdb.NewLeveledCompactor(
context.Background(),
nil,
w.logger,
[]int64{durToMillis(2 * time.Hour)}, // Does not matter, used only for planning.
chunkenc.NewPool())
if err != nil {
return nil, errors.Wrap(err, "create leveled compactor")
}
id, err := compactor.Write(w.dir, w.head, mint, maxt, nil)
if err != nil {
return nil, errors.Wrap(err, "compactor write")
}
// TODO(bwplotka): Potential truncate head, and allow writer reuse. Currently truncating fails with
// truncate chunks.HeadReadWriter: maxt of the files are not set.
return []ulid.ULID{id}, nil
}
func (w *TSDBWriter) Close() error {
_ = os.RemoveAll(w.tmpDir)
return w.head.Close()
}

tsdb/importer/csv/csv.go

@@ -0,0 +1,271 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package csv
import (
"encoding/csv"
"fmt"
"io"
"sort"
"strconv"
"strings"
"unsafe"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/exemplar"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
)
type HeaderType string
const (
// TODO(bwplotka): Consider accepting different ones like `values` or `timestamps` and pairs of value - timestamp, similar to label name - value.
Value HeaderType = "value"
TimestampMs HeaderType = "timestamp_ms" // Milliseconds since 1970-01-01 00:00:00 UTC.
MetricName HeaderType = "metric_name" // Value of "__name__" label.
LabelName HeaderType = "label_name" // If both labels and metric_name are specified labels are merged. This header can repeat, but label_value is expected as next column.
LabelValue HeaderType = "label_value" // If both labels and metric_name are specified labels are merged. This header can repeat but label_name is expected to be in previous column.
Help HeaderType = "help"
Type HeaderType = "type"
Unit HeaderType = "unit"
ExemplarValue HeaderType = "exemplar_value"
ExemplarTimestampMs HeaderType = "exemplar_timestamp_ms"
)
type recordEntry struct {
labels labels.Labels
help, unit []byte
typ textparse.MetricType
value float64
timestampMs int64
exemplar *exemplar.Exemplar
}
type Parser struct {
cr *csv.Reader
mapping []HeaderType
record recordEntry
}
// NewParser returns a new CSV parser that is able to parse input in the format described in RFC 4180 and expose it
// as a textparse.Parser. Read https://golang.org/pkg/encoding/csv/ for more details about parsing CSV.
func NewParser(r io.Reader, delimiter rune) textparse.Parser {
cr := csv.NewReader(r)
cr.ReuseRecord = true
cr.Comma = delimiter
return &Parser{cr: cr}
}
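
To illustrate the expected header and row layout, a minimal sketch that drives the parser directly (the data row and label pair are made up):

package main

import (
	"fmt"
	"io"
	"strings"

	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/pkg/textparse"
	"github.com/prometheus/prometheus/tsdb/importer/csv"
)

func main() {
	// Headers map each column to its meaning; one label_name/label_value pair here.
	input := "metric_name,label_name,label_value,timestamp_ms,value\nmetric1,pod,abc-1,1594885435,42.5\n"
	p := csv.NewParser(strings.NewReader(input), ',')
	for {
		e, err := p.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			panic(err)
		}
		if e != textparse.EntrySeries {
			continue
		}
		var lbls labels.Labels
		p.Metric(&lbls)
		_, ts, v := p.Series()
		fmt.Println(lbls, *ts, v) // parsed labels, timestamp (ms) and value
	}
}
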
// Next advances the parser to the next sample. It returns io.EOF if no
// more samples were read.
func (p *Parser) Next() (textparse.Entry, error) {
for {
record, err := p.cr.Read()
if err != nil {
// io.EOF is returned at the end of input; io.EOF also controls textparse.Parser.Next.
return textparse.EntryInvalid, err
}
if p.mapping == nil {
p.mapping, err = parseHeaders(record)
if err != nil {
return textparse.EntryInvalid, err
}
continue
}
if len(record) != len(p.mapping) {
return textparse.EntryInvalid, errors.Errorf("got different number of fields than defined by headers. Got %v, expected %v", len(record), len(p.mapping))
}
r := recordEntry{}
var l labels.Label
for i, field := range record {
switch p.mapping[i] {
case Value:
r.value, err = parseFloat(field)
if err != nil {
return textparse.EntryInvalid, errors.Wrapf(err, "parse %q as float64", Value)
}
case TimestampMs:
r.timestampMs, err = strconv.ParseInt(field, 10, 64)
if err != nil {
return textparse.EntryInvalid, errors.Wrapf(err, "parse %q as int64", TimestampMs)
}
case MetricName:
if r.labels.Has(labels.MetricName) {
return textparse.EntryInvalid, errors.Errorf("%q (or %q label name) already was specified", MetricName, labels.MetricName)
}
r.labels = append(r.labels, labels.Label{Name: labels.MetricName, Value: field})
case Help:
r.help = yoloBytes(field)
case Type:
if err := r.typ.ParseForOpenMetrics(field); err != nil {
return textparse.EntryInvalid, err
}
case Unit:
r.unit = yoloBytes(field)
case ExemplarValue, ExemplarTimestampMs:
if field == "" {
continue
}
if r.exemplar == nil {
r.exemplar = &exemplar.Exemplar{}
}
if p.mapping[i] == ExemplarValue {
r.exemplar.Value, err = parseFloat(field)
if err != nil {
return textparse.EntryInvalid, errors.Wrapf(err, "parse %q as float64", ExemplarValue)
}
} else {
r.exemplar.Ts, err = strconv.ParseInt(field, 10, 64)
if err != nil {
return textparse.EntryInvalid, errors.Wrapf(err, "parse %q as int64", ExemplarTimestampMs)
}
r.exemplar.HasTs = true
}
// The label name is required to come before its value. Such name/value pairs can also repeat.
case LabelName:
if field != "" && r.labels.Has(field) {
return textparse.EntryInvalid, errors.Errorf("label name %q already was specified", field)
}
l.Name = field
case LabelValue:
if l.Name == "" {
if field == "" {
continue
}
return textparse.EntryInvalid, errors.Errorf("label value is specified (%q) when previous name was empty", field)
}
if field == "" {
return textparse.EntryInvalid, errors.New("label value cannot be empty if previous name is not empty")
}
l.Value = field
r.labels = append(r.labels, l)
default:
panic(fmt.Sprintf("unknown mapping %q", p.mapping[i]))
}
}
sort.Sort(r.labels)
if r.exemplar != nil {
r.exemplar.Labels = r.labels
}
// TODO(bwplotka): Implement iterating over all types if we have any (help, unit, etc.).
p.record = r
return textparse.EntrySeries, nil
}
}
func yoloBytes(s string) []byte {
return *((*[]byte)(unsafe.Pointer(&s)))
}
func parseFloat(s string) (float64, error) {
// Keep to pre-Go 1.13 float formats.
if strings.ContainsAny(s, "pP_") {
return 0, errors.Errorf("unsupported characters in float %v", s)
}
return strconv.ParseFloat(s, 64)
}
func parseHeaders(record []string) (mapping []HeaderType, _ error) {
dedup := map[HeaderType]struct{}{}
expectLabelValue := false
for _, field := range record {
switch f := HeaderType(strings.ToLower(field)); f {
case Value, TimestampMs, MetricName, Help, Type, Unit, ExemplarValue, ExemplarTimestampMs:
if expectLabelValue {
return nil, errors.Errorf("matching %s header expected right after %s", LabelValue, LabelName)
}
if _, ok := dedup[f]; ok {
return nil, errors.Errorf("only one header type of %s is allowed", f)
}
dedup[f] = struct{}{}
mapping = append(mapping, f)
// The label name is required to come before its value. Such name/value pairs can also repeat.
case LabelName:
if expectLabelValue {
return nil, errors.Errorf("matching %s header expected right after %s", LabelValue, LabelName)
}
mapping = append(mapping, f)
expectLabelValue = true
case LabelValue:
if !expectLabelValue {
return nil, errors.Errorf("matching %s header expected before %s", LabelName, LabelValue)
}
mapping = append(mapping, f)
expectLabelValue = false
default:
return nil, errors.Errorf("%q header is not supported", field)
}
}
return mapping, nil
}
// Series returns the bytes of the series, the timestamp if set, and the value
// of the current sample. The timestamp is in milliseconds.
func (p *Parser) Series() ([]byte, *int64, float64) {
return []byte("not-implemented"), &p.record.timestampMs, p.record.value
}
// Help returns the metric name and help text in the current entry.
// Must only be called after Next returned a help entry.
// The returned byte slices become invalid after the next call to Next.
func (p *Parser) Help() ([]byte, []byte) {
return []byte(p.record.labels.Get("__name__")), p.record.help
}
// Type returns the metric name and type in the current entry.
// Must only be called after Next returned a type entry.
// The returned byte slices become invalid after the next call to Next.
func (p *Parser) Type() ([]byte, textparse.MetricType) {
return []byte(p.record.labels.Get("__name__")), p.record.typ
}
// Unit returns the metric name and unit in the current entry.
// Must only be called after Next returned a unit entry.
// The returned byte slices become invalid after the next call to Next.
func (p *Parser) Unit() ([]byte, []byte) {
return []byte(p.record.labels.Get("__name__")), p.record.unit
}
// Comment returns the text of the current comment.
// Must only be called after Next returned a comment entry.
// The returned byte slice becomes invalid after the next call to Next.
func (p *Parser) Comment() []byte { return nil }
// Metric writes the labels of the current sample into the passed labels.
// It returns the string from which the metric was parsed.
func (p *Parser) Metric(l *labels.Labels) string {
*l = append(*l, p.record.labels...)
// Sort labels to maintain the sorted labels invariant.
sort.Sort(*l)
return "not-implemented"
}
// Exemplar writes the exemplar of the current sample into the passed
// exemplar. It returns if an exemplar exists or not.
func (p *Parser) Exemplar(l *exemplar.Exemplar) bool {
if p.record.exemplar == nil {
return false
}
*l = *p.record.exemplar
return true
}


@@ -0,0 +1,98 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package csv
import (
"io"
"strings"
"testing"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/util/testutil"
)
type series struct {
// TODO(bwplotka) Assert more stuff once supported.
lbls labels.Labels
samples []sample
}
type sample struct {
t int64
v float64
}
func TestParser(t *testing.T) {
for _, tcase := range []struct {
name string
input string
expectedSeries []series
}{
// TODO(bwplotka): Test all edge cases and all entries.
{
name: "empty",
},
{
name: "just header",
input: `metric_name,label_name,label_value,timestamp_ms,label_name,label_value,help,value,type,exemplar_value,unit,exemplar_timestamp_ms`,
},
{
name: "mixed header with data",
input: `metric_name,label_name,label_value,timestamp_ms,label_name,label_value,help,value,type,exemplar_value,unit,exemplar_timestamp_ms
metric1,pod,abc-1,1594885435,instance,1,some help,1245214.23423,counter,-0.12,bytes because why not,1
metric1,pod,abc-1,1594885436,instance,1,some help,1.23423,counter,-0.12,bytes because why not,1
metric1,pod,abc-2,1594885432,,,some help2,1245214.23421,gauge,,bytes,
`,
expectedSeries: []series{
{
lbls: labels.FromStrings(labels.MetricName, "metric1", "pod", "abc-1", "instance", "1"),
samples: []sample{{v: 1245214.23423, t: 1594885435}, {v: 1.23423, t: 1594885436}},
},
{
lbls: labels.FromStrings(labels.MetricName, "metric1", "pod", "abc-1"),
samples: []sample{{v: 1245214.23421, t: 1594885432}},
},
},
},
} {
t.Run(tcase.name, func(t *testing.T) {
p := NewParser(strings.NewReader(tcase.input), ',')
got := map[uint64]series{}
for {
e, err := p.Next()
if err == io.EOF {
break
}
testutil.Ok(t, err)
// For now expects only series.
testutil.Equals(t, textparse.EntrySeries, e)
l := labels.Labels{}
p.Metric(&l)
s, ok := got[l.Hash()]
if !ok {
s = series{lbls: l}
}
_, ts, v := p.Series()
if ts == nil {
t.Fatal("got no timestamps")
}
s.samples = append(s.samples, sample{t: *ts, v: v})
// s is a copy of the map value, so write the accumulated samples back.
got[l.Hash()] = s
}
})
}
}

tsdb/importer/import.go

@@ -0,0 +1,86 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importer
import (
"fmt"
"io"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/importer/blocks"
)
// Import imports data from a textparse Parser into a block Writer.
// TODO(bwplotka): The textparse interface potentially limits the format to never give multiple samples. Fix this as some formats
// (e.g. JSON) might allow that.
// Import takes ownership of the given block writer.
func Import(logger log.Logger, p textparse.Parser, w blocks.Writer) (err error) {
if logger == nil {
logger = log.NewNopLogger()
}
level.Info(logger).Log("msg", "started importing input data.")
app := w.Appender()
defer func() {
var merr tsdb_errors.MultiError
merr.Add(err)
merr.Add(w.Close())
err = merr.Err()
}()
var e textparse.Entry
for {
e, err = p.Next()
if err == io.EOF {
break
}
if err != nil {
return errors.Wrap(err, "parse")
}
// For now care about series only.
if e != textparse.EntrySeries {
continue
}
// TODO(bwplotka): Avoid allocations using AddFast method and maintaining refs.
l := labels.Labels{}
p.Metric(&l)
_, ts, v := p.Series()
if ts == nil {
return errors.Errorf("expected timestamp for series %v, got none", l.String())
}
if _, err := app.Add(l, *ts, v); err != nil {
return errors.Wrap(err, "add sample")
}
}
level.Info(logger).Log("msg", "no more input data, committing appenders and flushing block(s)")
if err := app.Commit(); err != nil {
return errors.Wrap(err, "commit")
}
ids, err := w.Flush()
if err != nil {
return errors.Wrap(err, "flush")
}
level.Info(logger).Log("msg", "blocks flushed", "ids", fmt.Sprintf("%v", ids))
return nil
}
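
A minimal sketch of calling Import directly (assuming openmetrics.NewParser accepts an io.Reader, as it is invoked with a file or stdin in the CLI change above; the OpenMetrics text and output directory are made up):

package main

import (
	"strings"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/prometheus/tsdb/importer"
	"github.com/prometheus/prometheus/tsdb/importer/blocks"
	"github.com/prometheus/prometheus/tsdb/importer/openmetrics"
)

func main() {
	logger := log.NewNopLogger()
	input := strings.NewReader("up{job=\"demo\"} 1 1594885435\n# EOF\n")
	w := blocks.NewMultiWriter(logger, "out", int64(2*time.Hour/time.Millisecond))
	// Import takes ownership of w: it commits appenders, flushes blocks and closes the writer.
	if err := importer.Import(logger, openmetrics.NewParser(input), w); err != nil {
		panic(err)
	}
}
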


@@ -1,753 +0,0 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importer
import (
"fmt"
"io/ioutil"
"math"
"math/rand"
"os"
"sort"
"strconv"
"strings"
"testing"
labels2 "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/util/testutil"
)
const (
// We use a lower value for this than the default to test block compaction implicitly.
maxSamplesInMemory = 2000
maxBlockChildren = 10
)
func testBlocks(t *testing.T, blocks []tsdb.BlockReader, metricLabels []string, expectedMint, expectedMaxt int64, expectedSamples []tsdb.MetricSample, expectedSymbols []string, expectedNumBlocks int) {
// Assert we have expected number of blocks.
testutil.Equals(t, expectedNumBlocks, len(blocks))
allSymbols := make(map[string]struct{})
allSamples := make([]tsdb.MetricSample, 0)
maxt, mint := int64(math.MinInt64), int64(math.MaxInt64)
for _, block := range blocks {
maxt, mint = value.MaxInt64(maxt, block.Meta().MaxTime), value.MinInt64(mint, block.Meta().MinTime)
indexr, err := block.Index()
testutil.Ok(t, err)
symbols := indexr.Symbols()
for symbols.Next() {
key := symbols.At()
if _, ok := allSymbols[key]; !ok {
allSymbols[key] = struct{}{}
}
}
blockSamples, err := readSeries(block, labels2.FromStrings(metricLabels...))
testutil.Ok(t, err)
allSamples = append(allSamples, blockSamples...)
_ = indexr.Close()
}
allSymbolsSlice := make([]string, 0)
for key := range allSymbols {
allSymbolsSlice = append(allSymbolsSlice, key)
}
sort.Strings(allSymbolsSlice)
sort.Strings(expectedSymbols)
// Assert that all symbols that we expect to find are present.
testutil.Equals(t, expectedSymbols, allSymbolsSlice)
sortSamples(allSamples)
sortSamples(expectedSamples)
// Assert that all samples that we imported + what existed already are present.
testutil.Assert(t, len(allSamples) == len(expectedSamples), "number of expected samples is different from actual samples")
testutil.Equals(t, expectedSamples, allSamples, "actual samples are different from expected samples")
// Assert that DB's time ranges are as specified.
testutil.Equals(t, expectedMaxt, maxt)
testutil.Equals(t, expectedMint, mint)
}
func sortSamples(samples []tsdb.MetricSample) {
sort.Slice(samples, func(x, y int) bool {
sx, sy := samples[x], samples[y]
// If timestamps are equal, sort based on values.
if sx.Timestamp != sy.Timestamp {
return sx.Timestamp < sy.Timestamp
}
return sx.Value < sy.Value
})
}
// readSeries returns all series present in the block, that contain the labels we want.
// The labels do not have to be exhaustive, i.e. if we have metrics with common labels b/w them,
// we just need to pass the common labels, and we will get all the metrics that have them,
// even if the other labels b/w the metrics are different.
func readSeries(block tsdb.BlockReader, lbls labels2.Labels) ([]tsdb.MetricSample, error) {
series := make([]tsdb.MetricSample, 0)
ir, err := block.Index()
if err != nil {
return series, err
}
defer ir.Close()
tsr, err := block.Tombstones()
if err != nil {
return series, err
}
defer tsr.Close()
chunkr, err := block.Chunks()
if err != nil {
return series, err
}
defer chunkr.Close()
for _, lbl := range lbls {
css, err := tsdb.LookupChunkSeries(ir, tsr, labels2.MustNewMatcher(labels2.MatchEqual, lbl.Name, lbl.Value))
if err != nil {
return series, err
}
for css.Next() {
actLabels, chkMetas, _ := css.At()
var chunkIter chunkenc.Iterator
for _, meta := range chkMetas {
chk, err := chunkr.Chunk(meta.Ref)
if err != nil {
return series, err
}
chunkIter = chk.Iterator(chunkIter)
for chunkIter.Next() {
t, v := chunkIter.At()
sample := tsdb.MetricSample{
Timestamp: t,
Value: v,
Labels: actLabels,
}
series = append(series, sample)
}
}
}
}
return series, nil
}
// genSeries generates series from mint to maxt, with a step value.
func genSeries(labels []string, mint, maxt int64, step int) []tsdb.MetricSample {
series := make([]tsdb.MetricSample, 0)
for idx := mint; idx < maxt; idx += int64(step) {
// Round to 3 places.
val := math.Floor(rand.Float64()*1000) / 1000
sample := tsdb.MetricSample{Timestamp: idx, Value: val, Labels: labels2.FromStrings(labels...)}
series = append(series, sample)
}
return series
}
// labelsToStr converts the given labels to a string representation that is compliant with
// the OpenMetrics parser.
func labelsToStr(labels labels2.Labels) string {
str := "{"
for idx, l := range labels {
str += fmt.Sprintf("%s=%s", l.Name, strconv.Quote(l.Value))
if idx < len(labels)-1 {
str += ","
}
}
str += "}"
return str
}
// genOpenMetricsText formats the given series data into OpenMetrics compliant format.
func genOpenMetricsText(metricName, metricType string, series []tsdb.MetricSample) string {
str := fmt.Sprintf("# HELP %s This is a metric\n# TYPE %s %s", metricName, metricName, metricType)
for _, s := range series {
str += fmt.Sprintf("\n%s%s %f %d", metricName, labelsToStr(s.Labels), s.Value, s.Timestamp)
}
str += fmt.Sprintf("\n# EOF")
return str
}
func TestImport(t *testing.T) {
tests := []struct {
ToParse string
IsOk bool
MetricLabels []string
Expected struct {
MinTime int64
MaxTime int64
NumBlocks int
Symbols []string
Samples []tsdb.MetricSample
}
}{
{
ToParse: `# EOF`,
IsOk: true,
},
{
ToParse: `# HELP http_requests_total The total number of HTTP requests.
# TYPE http_requests_total counter
http_requests_total{code="200"} 1021 1565133713989
http_requests_total{code="400"} 1 1565133713990
# EOF
`,
IsOk: true,
MetricLabels: []string{"__name__", "http_requests_total"},
Expected: struct {
MinTime int64
MaxTime int64
NumBlocks int
Symbols []string
Samples []tsdb.MetricSample
}{
MinTime: 1565133713989,
MaxTime: 1565133713991,
NumBlocks: 1,
Symbols: []string{"__name__", "http_requests_total", "code", "200", "400"},
Samples: []tsdb.MetricSample{
{
Timestamp: 1565133713989,
Value: 1021,
Labels: labels2.FromStrings("__name__", "http_requests_total", "code", "200"),
},
{
Timestamp: 1565133713990,
Value: 1,
Labels: labels2.FromStrings("__name__", "http_requests_total", "code", "400"),
},
},
},
},
{
ToParse: `# HELP http_requests_total The total number of HTTP requests.
# TYPE http_requests_total counter
http_requests_total{code="200"} 1022 1565133713989
http_requests_total{code="400"} 2 1575133713990
# EOF
`,
IsOk: true,
MetricLabels: []string{"__name__", "http_requests_total"},
Expected: struct {
MinTime int64
MaxTime int64
NumBlocks int
Symbols []string
Samples []tsdb.MetricSample
}{
MinTime: 1565133713989,
MaxTime: 1575133713991,
NumBlocks: 2,
Symbols: []string{"http_requests_total", "code", "200", "400", "__name__"},
Samples: []tsdb.MetricSample{
{
Timestamp: 1565133713989,
Value: 1022,
Labels: labels2.FromStrings("__name__", "http_requests_total", "code", "200"),
},
{
Timestamp: 1575133713990,
Value: 2,
Labels: labels2.FromStrings("__name__", "http_requests_total", "code", "400"),
},
},
},
},
{
ToParse: `# HELP http_requests_total The total number of HTTP requests.
# TYPE http_requests_total counter
http_requests_total{code="200"} 1023 1395066363000
http_requests_total{code="400"} 3 1395066363000
# EOF
`,
IsOk: true,
MetricLabels: []string{"__name__", "http_requests_total"},
Expected: struct {
MinTime int64
MaxTime int64
NumBlocks int
Symbols []string
Samples []tsdb.MetricSample
}{
MinTime: 1395066363000,
MaxTime: 1395066363001,
NumBlocks: 1,
Symbols: []string{"http_requests_total", "code", "200", "400", "__name__"},
Samples: []tsdb.MetricSample{
{
Timestamp: 1395066363000,
Value: 1023,
Labels: labels2.FromStrings("__name__", "http_requests_total", "code", "200"),
},
{
Timestamp: 1395066363000,
Value: 3,
Labels: labels2.FromStrings("__name__", "http_requests_total", "code", "400"),
},
},
},
},
{
ToParse: `# HELP something_weird Something weird
# TYPE something_weird gauge
something_weird{problem="infinite timestamp"} +Inf -3982045
# EOF
`,
IsOk: false,
},
{
// Metric has no timestamp, hence invalid
ToParse: `# HELP rpc_duration_seconds A summary of the RPC duration in seconds.
# TYPE rpc_duration_seconds summary
rpc_duration_seconds{quantile="0.01"} 3102
rpc_duration_seconds{quantile="0.05"} 3272
# EOF
`,
IsOk: false,
},
{
ToParse: `# HELP no_type_metric This is a metric with no TYPE string
no_type_metric{type="bad_news_bears"} 0.0 111
# EOF
`,
IsOk: true,
MetricLabels: []string{"__name__", "no_type_metric"},
Expected: struct {
MinTime int64
MaxTime int64
NumBlocks int
Symbols []string
Samples []tsdb.MetricSample
}{
MinTime: 111,
MaxTime: 112,
NumBlocks: 1,
Symbols: []string{"no_type_metric", "type", "bad_news_bears", "__name__"},
Samples: []tsdb.MetricSample{
{
Timestamp: 111,
Value: 0.0,
Labels: labels2.FromStrings("__name__", "no_type_metric", "type", "bad_news_bears"),
},
},
},
},
{
ToParse: `# HELP bad_ts This is a metric with an extreme timestamp
# TYPE bad_ts gauge
bad_ts{type="bad_timestamp"} 420 -1e99
# EOF
`,
IsOk: false,
},
{
ToParse: `# HELP bad_ts This is a metric with an extreme timestamp
# TYPE bad_ts gauge
bad_ts{type="bad_timestamp"} 420 1e99
# EOF
`,
IsOk: false,
},
{
ToParse: `no_help_no_type{foo="bar"} 42 6900
# EOF
`,
IsOk: true,
MetricLabels: []string{"__name__", "no_help_no_type"},
Expected: struct {
MinTime int64
MaxTime int64
NumBlocks int
Symbols []string
Samples []tsdb.MetricSample
}{
MinTime: 6900,
MaxTime: 6901,
NumBlocks: 1,
Symbols: []string{"no_help_no_type", "foo", "bar", "__name__"},
Samples: []tsdb.MetricSample{
{
Timestamp: 6900,
Value: 42,
Labels: labels2.FromStrings("__name__", "no_help_no_type", "foo", "bar"),
},
},
},
},
{
ToParse: `bare_metric 42.24 1001
# EOF
`,
IsOk: true,
MetricLabels: []string{"__name__", "bare_metric"},
Expected: struct {
MinTime int64
MaxTime int64
NumBlocks int
Symbols []string
Samples []tsdb.MetricSample
}{
MinTime: 1001,
MaxTime: 1002,
NumBlocks: 1,
Symbols: []string{"bare_metric", "__name__"},
Samples: []tsdb.MetricSample{
{
Timestamp: 1001,
Value: 42.24,
Labels: labels2.FromStrings("__name__", "bare_metric"),
},
},
},
},
{
ToParse: `# HELP bad_metric This a bad metric
# TYPE bad_metric bad_type
bad_metric{type="has no type information"} 0.0 111
# EOF
`,
IsOk: false,
},
{
ToParse: `# HELP no_nl This test has no newline so will fail
# TYPE no_nl gauge
no_nl{type="no newline"}
# EOF`,
IsOk: false,
},
}
for _, test := range tests {
tmpDbDir, err := ioutil.TempDir("", "importer")
testutil.Ok(t, err)
err = ImportFromFile(strings.NewReader(test.ToParse), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil)
if test.IsOk {
testutil.Ok(t, err)
if len(test.Expected.Symbols) > 0 {
db, err := tsdb.OpenDBReadOnly(tmpDbDir, nil)
testutil.Ok(t, err)
blocks, err := db.Blocks()
testutil.Ok(t, err)
testBlocks(t, blocks, test.MetricLabels, test.Expected.MinTime, test.Expected.MaxTime, test.Expected.Samples, test.Expected.Symbols, test.Expected.NumBlocks)
}
} else {
testutil.NotOk(t, err)
}
_ = os.RemoveAll(tmpDbDir)
}
}
func TestImportBadFile(t *testing.T) {
// No file found case.
err := ImportFromFile((*os.File)(nil), "/buzz/baz/bar/foo", maxSamplesInMemory, maxBlockChildren, nil)
testutil.NotOk(t, err)
}
func TestImportIntoExistingDB(t *testing.T) {
type importTest struct {
MetricName string
MetricType string
MetricLabels []string
GeneratorStep int
DBMint, DBMaxt int64
ImportShuffle bool
ImportMint, ImportMaxt int64
ExpectedMint, ExpectedMaxt int64
ExpectedSymbols []string
ExpectedNumBlocks int
}
tests := []importTest{
{
// Import overlapping data only.
MetricName: "test_metric",
MetricType: "gauge",
MetricLabels: []string{"foo", "bar"},
GeneratorStep: 1,
DBMint: 1000,
DBMaxt: 2000,
ImportShuffle: false,
ImportMint: 1000,
ImportMaxt: 1500,
ExpectedMint: 1000,
ExpectedMaxt: 2000,
ExpectedSymbols: []string{"__name__", "test_metric", "foo", "bar"},
ExpectedNumBlocks: 2,
},
{
// Test overlapping, and non-overlapping data, with a lot of samples.
MetricName: "test_metric_2",
MetricType: "gauge",
MetricLabels: []string{"foo", "bar"},
GeneratorStep: 1,
DBMint: 1000,
DBMaxt: 4000,
ImportShuffle: false,
ImportMint: 0,
ImportMaxt: 5000,
ExpectedSymbols: []string{"__name__", "test_metric_2", "foo", "bar"},
ExpectedMint: 0,
ExpectedMaxt: 5000,
ExpectedNumBlocks: 4,
},
{
// Test overlapping, and non-overlapping data, aligning the blocks.
// This test also creates a large number of samples, across a much wider time range, to test
// if the importer will divvy samples beyond DB time limits.
MetricName: "test_metric_3",
MetricType: "gauge",
MetricLabels: []string{"foo", "bar"},
GeneratorStep: 20000,
DBMint: 1000,
DBMaxt: tsdb.DefaultBlockDuration,
ImportShuffle: false,
ImportMint: 0,
ImportMaxt: tsdb.DefaultBlockDuration * 2,
ExpectedSymbols: []string{"__name__", "test_metric_3", "foo", "bar"},
ExpectedMint: 0,
ExpectedMaxt: (tsdb.DefaultBlockDuration * 2) - 20000 + 1,
ExpectedNumBlocks: 4,
},
{
// Import partially overlapping data only.
MetricName: "test_metric_4",
MetricType: "gauge",
MetricLabels: []string{"foo", "bar"},
GeneratorStep: 1,
DBMint: 1000,
DBMaxt: 2000,
ImportShuffle: false,
ImportMint: 500,
ImportMaxt: 1500,
ExpectedMint: 500,
ExpectedMaxt: 2000,
ExpectedSymbols: []string{"__name__", "test_metric_4", "foo", "bar"},
ExpectedNumBlocks: 3,
},
{
// Import that never overlaps, after the DB max time.
MetricName: "test_metric_5",
MetricType: "gauge",
MetricLabels: []string{"foo", "bar"},
GeneratorStep: 1,
DBMint: 0,
DBMaxt: 1000,
ImportShuffle: false,
ImportMint: tsdb.DefaultBlockDuration + 1000,
ImportMaxt: tsdb.DefaultBlockDuration + 2000,
ExpectedMint: 0,
ExpectedMaxt: tsdb.DefaultBlockDuration + 2000,
ExpectedSymbols: []string{"__name__", "test_metric_5", "foo", "bar"},
ExpectedNumBlocks: 2,
},
}
for _, test := range tests {
initSeries := genSeries(test.MetricLabels, test.DBMint, test.DBMaxt, test.GeneratorStep)
initText := genOpenMetricsText(test.MetricName, test.MetricType, initSeries)
tmpDbDir, err := ioutil.TempDir("", "importer")
testutil.Ok(t, err)
err = ImportFromFile(strings.NewReader(initText), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil)
testutil.Ok(t, err)
importSeries := genSeries(test.MetricLabels, test.ImportMint, test.ImportMaxt, test.GeneratorStep)
importText := genOpenMetricsText(test.MetricName, test.MetricType, importSeries)
err = ImportFromFile(strings.NewReader(importText), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil)
testutil.Ok(t, err)
expectedSamples := make([]tsdb.MetricSample, 0)
for _, exp := range [][]tsdb.MetricSample{initSeries, importSeries} {
for _, sample := range exp {
lbls := labels2.FromStrings("__name__", test.MetricName)
s := tsdb.MetricSample{
Timestamp: sample.Timestamp,
Value: sample.Value,
Labels: append(lbls, sample.Labels...),
}
expectedSamples = append(expectedSamples, s)
}
}
db, err := tsdb.Open(tmpDbDir, nil, nil, &tsdb.Options{
RetentionDuration: tsdb.DefaultOptions().RetentionDuration,
AllowOverlappingBlocks: true,
})
testutil.Ok(t, err)
blocks := make([]tsdb.BlockReader, 0)
for _, b := range db.Blocks() {
blocks = append(blocks, b)
}
testBlocks(t, blocks, test.MetricLabels, test.ExpectedMint, test.ExpectedMaxt, expectedSamples, test.ExpectedSymbols, test.ExpectedNumBlocks)
// Close and remove all temp files and folders.
_ = db.Close()
_ = os.RemoveAll(tmpDbDir)
}
}
// Test to see if multiple series are imported correctly.
func TestMixedSeries(t *testing.T) {
partialOMText := func(metricName, metricType string, series []tsdb.MetricSample) string {
str := fmt.Sprintf("# HELP %s This is a metric\n# TYPE %s %s", metricName, metricName, metricType)
for _, s := range series {
str += fmt.Sprintf("\n%s%s %f %d", metricName, s.Labels.String(), s.Value, s.Timestamp)
}
return str
}
metricA := "metricA"
metricLabels := []string{"foo", "bar"}
mint, maxt := int64(0), int64(1000)
step := 5
seriesA := genSeries(metricLabels, mint, maxt, step)
textA := partialOMText(metricA, "gauge", seriesA)
metricB := "metricB"
seriesB := genSeries(metricLabels, mint, maxt, step)
textB := partialOMText(metricB, "gauge", seriesB)
text := fmt.Sprintf("%s\n%s\n# EOF", textA, textB)
tmpDbDir, err := ioutil.TempDir("", "importer")
testutil.Ok(t, err)
err = ImportFromFile(strings.NewReader(text), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil)
testutil.Ok(t, err)
addMetricLabel := func(series []tsdb.MetricSample, metricName string) []tsdb.MetricSample {
augSamples := make([]tsdb.MetricSample, 0)
for _, sample := range series {
lbls := labels2.FromStrings("__name__", metricName)
s := tsdb.MetricSample{
Timestamp: sample.Timestamp,
Value: sample.Value,
Labels: append(lbls, sample.Labels...),
}
augSamples = append(augSamples, s)
}
return augSamples
}
expectedSamples := append(addMetricLabel(seriesA, metricA), addMetricLabel(seriesB, metricB)...)
expectedSymbols := append([]string{"__name__", metricA, metricB}, metricLabels...)
db, err := tsdb.Open(tmpDbDir, nil, nil, &tsdb.Options{
RetentionDuration: tsdb.DefaultOptions().RetentionDuration,
AllowOverlappingBlocks: true,
})
testutil.Ok(t, err)
blocks := make([]tsdb.BlockReader, 0)
for _, b := range db.Blocks() {
blocks = append(blocks, b)
}
testBlocks(t, blocks, metricLabels, mint, maxt-int64(step-1), expectedSamples, expectedSymbols, 1)
_ = os.RemoveAll(tmpDbDir)
}
// Test to check if we can detect an error with the text,
// without having to read till EOF.
func TestInvalidSyntaxQuickAbort(t *testing.T) {
lbls := []string{"foo", "bar"}
firstHalf := genSeries(lbls, 0, 10, 1)
invalidMetrics := []tsdb.MetricSample{
{
Timestamp: -1,
Value: 0,
Labels: labels2.FromStrings(" INVALID", "INVALID"),
},
}
secondHalf := genSeries(lbls, 10, 200, 1)
combined := append(firstHalf, append(invalidMetrics, secondHalf...)...)
importText := genOpenMetricsText("invalid_test", "gauge", combined)
tmpDbDir, err := ioutil.TempDir("", "importer")
testutil.Ok(t, err)
r := strings.NewReader(importText)
err = ImportFromFile(r, tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil)
testutil.NotOk(t, err)
// We should abort and return the error with the text, without reading till EOF.
testutil.Assert(t, r.Len() > 0, "import text has been completely read")
}
// func benchVirtual(vThresh int, b *testing.B) {
// for n := 0; n < b.N; n++ {
// type importTest struct {
// MetricName string
// MetricType string
// MetricLabels []string
// GeneratorStep int
// DBMint, DBMaxt int64
// ImportShuffle bool
// ImportMint, ImportMaxt int64
// ExpectedMint, ExpectedMaxt int64
// ExpectedSymbols []string
// ExpectedNumBlocks int
// }
//
// tests := []importTest{
// {
// // Test overlapping, and non-overlapping data, aligning the blocks.
// // This test also creates a large number of samples, across a much wider time range, to test
// // if the importer will divvy samples beyond DB time limits into progressively smaller blocks
// // using the ranges in tsdb.DefaultOptions.BlockRanges.
// MetricName: "test_metric_3",
// MetricType: "gauge",
// MetricLabels: []string{"foo", "bar"},
// GeneratorStep: 5000,
// DBMint: 1000,
// DBMaxt: tsdb.DefaultOptions.BlockRanges[2],
// ImportShuffle: false,
// ImportMint: 0,
// ImportMaxt: tsdb.DefaultOptions.BlockRanges[2]*2 + 10000,
// ExpectedSymbols: []string{"__name__", "test_metric_3", "foo", "bar"},
// ExpectedMint: 0,
// ExpectedMaxt: (tsdb.DefaultOptions.BlockRanges[2] * 2) + 1,
// ExpectedNumBlocks: 5,
// },
// }
//
// for _, test := range tests {
// initSeries := genSeries(test.MetricLabels, test.DBMint, test.DBMaxt, test.GeneratorStep)
// initText := genOpenMetricsText(test.MetricName, test.MetricType, initSeries)
//
// tmpDbDir, err := ioutil.TempDir("", "importer")
// testutil.Ok(b, err)
// err = ImportFromFile(strings.NewReader(initText), tmpDbDir, maxSamplesInMemory, vThresh, nil)
// testutil.Ok(b, err)
//
// importSeries := genSeries(test.MetricLabels, test.ImportMint, test.ImportMaxt, test.GeneratorStep)
// importText := genOpenMetricsText(test.MetricName, test.MetricType, importSeries)
//
// err = ImportFromFile(strings.NewReader(importText), tmpDbDir, maxSamplesInMemory, vThresh, nil)
// testutil.Ok(b, err)
// }
// }
// }
//
// func Benchmark_Vertical0(b *testing.B) {
// benchVirtual(0, b)
// }
// func Benchmark_Vertical5(b *testing.B) {
// benchVirtual(5, b)
// }
// func Benchmark_Vertical10(b *testing.B) {
// benchVirtual(10, b)
// }
// func Benchmark_Vertical20(b *testing.B) {
// benchVirtual(20, b)
// }


@@ -1,602 +0,0 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importer
import (
"bufio"
"bytes"
"context"
"encoding/gob"
"io"
"io/ioutil"
"math"
"os"
"path/filepath"
"sort"
"strings"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/fileutil"
)
// Implementing the error interface to create a
// constant, which cannot be overridden.
// https://dave.cheney.net/2016/04/07/constant-errors
type Error string
func (e Error) Error() string {
return string(e)
}
const (
// This error is thrown when the current entry does not have a timestamp associated.
NoTimestampError = Error("expected timestamp with metric")
// This error is thrown when the sample being parsed currently has a corresponding block
// in the database, but has no metadata collected for it, curiously enough.
NoBlockMetaFoundError = Error("no metadata found for current samples' block")
)
type blockTimestampPair struct {
start, end int64
}
type newBlockMeta struct {
index int
count int
mint, maxt int64
isAligned bool
dir string
children []*newBlockMeta
}
// Content Type for the Open Metrics Parser.
// Needed to init the text parser.
const contentType = "application/openmetrics-text;"
// ImportFromFile imports data from a file formatted according to the Open Metrics format,
// converts it into block(s), and places the newly created block(s) in the
// TSDB DB directory, where it is treated like any other block.
func ImportFromFile(
r io.Reader,
dbPath string,
maxSamplesInMemory int,
maxBlockChildren int,
logger log.Logger,
) error {
if logger == nil {
logger = log.NewNopLogger()
}
level.Debug(logger).Log("msg", "creating temp directory")
tmpDbDir, err := ioutil.TempDir("", "importer")
if err != nil {
return err
}
defer os.RemoveAll(tmpDbDir)
level.Debug(logger).Log("msg", "reading existing block metadata to infer time ranges")
blockMetas, err := constructBlockMetadata(dbPath)
if err != nil {
return err
}
level.Info(logger).Log("msg", "importing input data and aligning with existing DB")
if err = writeSamples(r, tmpDbDir, blockMetas, maxSamplesInMemory, maxBlockChildren, logger); err != nil {
return err
}
level.Info(logger).Log("msg", "merging fully overlapping blocks")
if err = mergeBlocks(tmpDbDir, blockMetas, logger); err != nil {
return err
}
level.Debug(logger).Log("msg", "copying newly created blocks from temp location to actual DB location")
if err = fileutil.CopyDirs(tmpDbDir, dbPath); err != nil {
return err
}
return nil
}
// writeSamples parses each metric sample, and writes the samples to the correspondingly aligned block
// to the given directory.
func writeSamples(
r io.Reader,
dir string,
metas []*newBlockMeta,
maxSamplesInMemory int,
maxBlockChildren int,
logger log.Logger,
) error {
// First block represents everything before DB start.
// Last block represents everything after DB end.
dbMint, dbMaxt := metas[0].maxt, metas[len(metas)-1].mint
blocks := make([][]*tsdb.MetricSample, len(metas))
currentPassCount := 0
// TODO: Maybe use bufio.Reader.Readline instead?
// https://stackoverflow.com/questions/21124327/how-to-read-a-text-file-line-by-line-in-go-when-some-lines-are-long-enough-to-ca
// Use a streaming approach to avoid loading too much data at once.
scanner := bufio.NewScanner(r)
buf := new(bytes.Buffer)
scanner.Split(sampleStreamer(buf))
for scanner.Scan() {
if currentPassCount == 0 {
blocks = make([][]*tsdb.MetricSample, len(metas))
}
encSample := scanner.Bytes()
decBuf := bytes.NewBuffer(encSample)
sample := tsdb.MetricSample{}
if err := gob.NewDecoder(decBuf).Decode(&sample); err != nil {
level.Error(logger).Log("msg", "failed to decode current entry returned by file scanner", "err", err)
return err
}
// Find what block this sample belongs to.
var blockIndex int
if len(metas) == 1 || sample.Timestamp < dbMint {
blockIndex = 0
} else if sample.Timestamp >= dbMaxt {
blockIndex = len(metas) - 1
} else {
blockIndex = getBlockIndex(sample.Timestamp, metas)
if blockIndex == -1 {
return NoBlockMetaFoundError
}
}
meta := metas[blockIndex]
meta.mint = value.MinInt64(meta.mint, sample.Timestamp)
meta.maxt = value.MaxInt64(meta.maxt, sample.Timestamp)
meta.count++
blocks[blockIndex] = append(blocks[blockIndex], &sample)
currentPassCount++
// Have enough samples to write to disk.
if currentPassCount == maxSamplesInMemory {
if err := flushBlocks(dir, blocks, metas, maxBlockChildren, logger); err != nil {
return err
}
// Reset current pass count.
currentPassCount = 0
}
}
if err := scanner.Err(); err != nil {
return err
}
// Flush any remaining samples.
if err := flushBlocks(dir, blocks, metas, maxBlockChildren, logger); err != nil {
return err
}
return nil
}
// flushBlocks writes the given blocks of samples to disk, and updates block metadata information.
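// Each non-empty buffer is split into duration-sized bins, every bin is written
// as its own child block via tsdb.CreateBlock, and once a parent accumulates
// maxBlockChildren children they are compacted early via compactMeta.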
func flushBlocks(
dir string,
toFlush [][]*tsdb.MetricSample,
metas []*newBlockMeta,
maxBlockChildren int,
logger log.Logger,
) error {
for blockIdx, block := range toFlush {
// If current block is empty, nothing to write.
if len(block) == 0 {
continue
}
meta := metas[blockIdx]
// Put each sample into the appropriate block.
bins, binTimes := binSamples(block, meta, tsdb.DefaultBlockDuration)
for binIdx, bin := range bins {
if len(bin) == 0 {
continue
}
start, end := binTimes[binIdx].start, binTimes[binIdx].end
path, err := tsdb.CreateBlock(bin, dir, start, end, logger)
if err != nil {
return err
}
child := &newBlockMeta{
index: meta.index,
count: 0,
mint: start,
maxt: end,
isAligned: meta.isAligned,
dir: path,
}
meta.children = append(meta.children, child)
}
if maxBlockChildren > 0 && len(meta.children) >= maxBlockChildren {
if err := compactMeta(dir, meta, logger); err != nil {
return err
}
}
}
return nil
}
// sampleStreamer returns a bufio.SplitFunc that parses OpenMetrics-compliant input
// and returns each sample as a token, encoded into the user-provided byte buffer.
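// Each token handed back is a gob-encoded tsdb.MetricSample, so a read loop
// looks roughly like this (a sketch mirroring the use in writeSamples):
//
//	scanner := bufio.NewScanner(r)
//	scanner.Split(sampleStreamer(buf))
//	for scanner.Scan() {
//		var s tsdb.MetricSample
//		if err := gob.NewDecoder(bytes.NewBuffer(scanner.Bytes())).Decode(&s); err != nil {
//			return err
//		}
//	}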
func sampleStreamer(buf *bytes.Buffer) func([]byte, bool) (int, []byte, error) {
currentEntryInvalid := false
return func(data []byte, atEOF bool) (int, []byte, error) {
var err error
advance := 0
lineIndex := 0
lines := strings.Split(string(data), "\n")
parser := textparse.New(data, contentType)
for {
var et textparse.Entry
if et, err = parser.Next(); err != nil {
if !atEOF && et == textparse.EntryInvalid && !currentEntryInvalid {
currentEntryInvalid = true
// Fetch more data for the scanner to see if the entry is really invalid.
return 0, nil, nil
}
if err == io.EOF {
err = nil
}
return 0, nil, err
}
// Reset the invalid-entry flag; it only meant that we had to fetch more data
// into the scanner.
if currentEntryInvalid {
currentEntryInvalid = false
}
// Add 1 to account for newline character.
lineLength := len(lines[lineIndex]) + 1
lineIndex++
if et != textparse.EntrySeries {
advance = advance + lineLength
continue
}
_, ctime, cvalue := parser.Series()
if ctime == nil {
return 0, nil, NoTimestampError
}
// The OpenMetrics parser multiplies timestamps by 1000; undo that here.
ctimeCorrected := *ctime / 1000
var clabels labels.Labels
_ = parser.Metric(&clabels)
sample := tsdb.MetricSample{
Timestamp: ctimeCorrected,
Value: cvalue,
Labels: labels.FromMap(clabels.Map()),
}
buf.Reset()
enc := gob.NewEncoder(buf)
if err = enc.Encode(sample); err != nil {
return 0, nil, err
}
advance += lineLength
return advance, buf.Bytes(), nil
}
}
}
// makeRange returns a series of block time ranges between start and stop,
// in step-sized divisions.
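// For example (illustrative): makeRange(0, 10, 5) yields {0, 5} and {5, 11};
// the final pair is extended by one so that a sample exactly at stop is still
// covered by the half-open [start, end) range.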
func makeRange(start, stop, step int64) []blockTimestampPair {
if step <= 0 || stop < start {
return []blockTimestampPair{}
}
r := make([]blockTimestampPair, 0)
// In case we only have samples with the same timestamp.
if start == stop {
r = append(r, blockTimestampPair{
start: start,
end: stop + 1,
})
return r
}
for s := start; s < stop; s += step {
pair := blockTimestampPair{
start: s,
}
if (s + step) >= stop {
pair.end = stop + 1
} else {
pair.end = s + step
}
r = append(r, pair)
}
return r
}
// mergeBlocks looks for blocks that have overlapping time intervals, and compacts them.
func mergeBlocks(dest string, metas []*newBlockMeta, logger log.Logger) error {
for _, m := range metas {
// If no children, there's nothing to merge.
if len(m.children) == 0 {
continue
}
if err := compactMeta(dest, m, logger); err != nil {
return err
}
}
return nil
}
// binBlocks groups blocks that have fully intersecting intervals of the given durations.
// It iterates through the given durations, creating progressive blocks, and once it reaches
// the last given duration, it divvies all the remaining samples into blocks of that duration.
// Input blocks are assumed sorted by time.
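// For example (illustrative, with a single duration of 2h): blocks covering
// 00:00-00:30, 00:40-00:50 and 03:00-03:30 are binned as
// {00:00-00:30, 00:40-00:50} and {03:00-03:30}, since the first two fall
// within 2h of the first block's start time.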
func binBlocks(blocks []*newBlockMeta, durations []int64) [][]*newBlockMeta {
bins := make([][]*newBlockMeta, 0)
if len(blocks) == 0 || len(durations) == 0 {
return bins
}
bin := make([]*newBlockMeta, 0)
start := blocks[0].mint
blockIdx := 0
Outer:
for dIdx, duration := range durations {
for bIdx, block := range blocks[blockIdx:] {
blockIdx = bIdx
if block.maxt-block.mint >= duration {
if block.mint <= start {
bin = append(bin, block)
bins = append(bins, bin)
} else {
bins = append(bins, bin)
bins = append(bins, []*newBlockMeta{block})
}
bin = []*newBlockMeta{}
start = block.maxt
if dIdx == len(durations)-1 {
continue
} else {
blockIdx++
continue Outer
}
}
if block.mint-start < duration && block.maxt-start < duration {
// If current block is within the duration window, place it in the current bin.
bin = append(bin, block)
blockIdx++
} else {
// Else create a new block, starting with this block.
bins = append(bins, bin)
bin = []*newBlockMeta{block}
start = block.mint
if dIdx < len(durations)-1 {
blockIdx++
continue Outer
}
}
}
}
bins = append(bins, bin)
return bins
}
// binSamples divvies the samples into bins corresponding to the given block.
// If an aligned block is given, a single bin is created;
// otherwise the samples are divided into bins of the given duration.
// It returns the binned samples, and the time limits for each bin.
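// For example (illustrative, unaligned block with mint=0, maxt=10 and
// duration=5): samples at timestamps 1 and 6 land in the bins for the
// [0, 5) and [5, 11) tranches respectively.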
func binSamples(samples []*tsdb.MetricSample, meta *newBlockMeta, duration int64) ([][]*tsdb.MetricSample, []blockTimestampPair) {
findBlock := func(ts int64, blocks []blockTimestampPair) int {
for idx, block := range blocks {
if block.start <= ts && ts < block.end {
return idx
}
}
return -1
}
bins := make([][]*tsdb.MetricSample, 0)
times := make([]blockTimestampPair, 0)
if len(samples) == 0 {
return bins, times
}
if meta.isAligned {
bins = append(bins, samples)
times = []blockTimestampPair{{start: meta.mint, end: meta.maxt}}
} else {
timeTranches := makeRange(meta.mint, meta.maxt, duration)
bins = make([][]*tsdb.MetricSample, len(timeTranches))
binIdx := -1
for _, sample := range samples {
ts := sample.Timestamp
if binIdx == -1 {
binIdx = findBlock(ts, timeTranches)
}
block := timeTranches[binIdx]
if block.start <= ts && ts < block.end {
bins[binIdx] = append(bins[binIdx], sample)
} else {
binIdx = findBlock(ts, timeTranches)
bins[binIdx] = append(bins[binIdx], sample)
}
}
times = timeTranches
}
return bins, times
}
// constructBlockMetadata returns the metadata for the new blocks.
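// The returned slice always starts with a catch-all meta for samples before the
// DB's mint, followed by one meta per existing block, and, if any existing
// blocks were found, ends with a catch-all meta for samples at or after the
// DB's maxt.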
func constructBlockMetadata(path string) ([]*newBlockMeta, error) {
dbMint, dbMaxt := int64(math.MinInt64), int64(math.MaxInt64)
// If we try to open a regular RW handle on an active TSDB instance,
// it will fail. Hence, we open a RO handle.
db, err := tsdb.OpenDBReadOnly(path, nil)
if err != nil {
return nil, err
}
defer db.Close()
blocks, err := db.Blocks()
if err != nil {
return nil, err
}
metas := []*newBlockMeta{
&newBlockMeta{
index: 0,
count: 0,
mint: math.MaxInt64,
maxt: dbMint,
isAligned: false,
},
}
for idx, block := range blocks {
start, end := block.Meta().MinTime, block.Meta().MaxTime
metas = append(metas, &newBlockMeta{
index: idx + 1,
count: 0,
mint: start,
maxt: end,
isAligned: true,
})
if idx == 0 {
dbMint, dbMaxt = start, end
} else {
dbMint = value.MinInt64(dbMint, start)
dbMaxt = value.MaxInt64(dbMaxt, end)
}
}
// If there are existing blocks in the import location,
// we need to add a block for samples that fall over DB maxt.
if len(metas) > 1 {
// Update the first block with the correct dbMint as its maxt.
metas[0].maxt = dbMint
metas = append(metas, &newBlockMeta{
index: len(metas),
count: 0,
mint: dbMaxt,
maxt: math.MinInt64,
isAligned: false,
})
}
return metas, nil
}
// getBlockIndex returns the index of the block that the given timestamp corresponds to.
// The block metas are assumed to be sorted in ascending time order.
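// For example (illustrative): with metas whose maxt values are 100, 200 and
// 300, a timestamp of 150 maps to index 1, while a timestamp of 350 returns -1.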
func getBlockIndex(current int64, metas []*newBlockMeta) int {
for idx, m := range metas {
if current < m.maxt {
return idx
}
}
return -1
}
// compactBlocks compacts all the given blocks and places them in the destination directory.
func compactBlocks(dest string, blocks []*newBlockMeta, logger log.Logger) (string, error) {
dirs := make([]string, 0)
for _, meta := range blocks {
dirs = append(dirs, meta.dir)
}
path, err := compactDirs(dest, dirs, logger)
if err != nil {
return "", err
}
for _, meta := range blocks {
// Remove existing blocks directory as it is no longer needed.
if err = os.RemoveAll(meta.dir); err != nil {
return "", err
}
// Update child with new blocks path.
meta.dir = path
}
return path, err
}
// compactMeta compacts the children of a given block, dividing them into the
// Prometheus-recommended block sizes if needed.
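// For aligned blocks all children are compacted into a single block; for
// unaligned blocks the children are sorted, binned to tsdb.DefaultBlockDuration,
// and each bin with more than one block is compacted.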
func compactMeta(dest string, m *newBlockMeta, logger log.Logger) error {
children := make([]*newBlockMeta, 0)
if m.isAligned {
path, err := compactBlocks(dest, m.children, logger)
if err != nil {
return err
}
m.dir = path
} else {
// Sort blocks to ensure we bin properly.
sortBlocks(m.children)
binned := binBlocks(m.children, []int64{tsdb.DefaultBlockDuration})
for _, bin := range binned {
if len(bin) <= 1 {
children = append(children, bin...)
continue
}
path, err := compactBlocks(dest, bin, logger)
if err != nil {
return err
}
children = append(children, &newBlockMeta{
index: m.index,
count: 0,
mint: bin[0].mint,
maxt: bin[len(bin)-1].maxt,
isAligned: m.isAligned,
dir: path,
})
}
}
m.children = children
return nil
}
// compactDirs compacts the block dirs and places the result in the destination directory.
func compactDirs(dest string, dirs []string, logger log.Logger) (string, error) {
path := ""
compactor, err := tsdb.NewLeveledCompactor(context.Background(), nil, logger, []int64{tsdb.DefaultBlockDuration}, nil)
if err != nil {
return path, err
}
ulid, err := compactor.Compact(dest, dirs, nil)
if err != nil {
return path, err
}
path = filepath.Join(dest, ulid.String())
return path, nil
}
// sortBlocks sorts block metadata according to the time limits.
func sortBlocks(blocks []*newBlockMeta) {
sort.Slice(blocks, func(x, y int) bool {
bx, by := blocks[x], blocks[y]
if bx.mint != by.mint {
return bx.mint < by.mint
}
return bx.maxt < by.maxt
})
}

View file

@ -0,0 +1,70 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package openmetrics
import (
"bufio"
"io"
"github.com/prometheus/prometheus/pkg/textparse"
)
// Content Type for the Open Metrics Parser.
// Needed to init the text parser.
const contentType = "application/openmetrics-text;"
type Parser struct {
textparse.Parser
s *bufio.Scanner
}
// NewParser is a tiny layer between textparse.Parser and importer.Parser that works on an io.Reader.
// TODO(bwplotka): This is hacky, Parser should allow passing a reader from scratch.
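// A rough usage sketch (the input source is illustrative):
//
//	p := NewParser(strings.NewReader(input))
//	for {
//		et, err := p.Next()
//		if err == io.EOF {
//			break
//		}
//		if err != nil {
//			return err
//		}
//		_ = et // inspect the entry, call Series(), etc.
//	}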
func NewParser(r io.Reader) textparse.Parser {
// TODO(dipack95): Maybe use bufio.Reader.ReadLine instead?
// https://stackoverflow.com/questions/21124327/how-to-read-a-text-file-line-by-line-in-go-when-some-lines-are-long-enough-to-ca
return &Parser{s: bufio.NewScanner(r)}
}
// Next advances the parser to the next sample. It returns io.EOF if no
// more samples were read.
// TODO(bwplotka): Rough implementation, not tested, please help dipack95! (:
func (p *Parser) Next() (textparse.Entry, error) {
for p.s.Scan() {
// TODO(bwplotka): Assuming input is one sample per line. If not, refetch more lines as in the previous version.
line := p.s.Bytes()
p.Parser = textparse.New(line, contentType)
if et, err := p.Parser.Next(); err != io.EOF {
return et, err
}
// EOF from parser, continue scanning.
}
if err := p.s.Err(); err != nil {
return 0, err
}
return 0, io.EOF
}
// Series returns the bytes of the series, the timestamp if set, and the value
// of the current sample. The timestamp is in milliseconds.
func (p *Parser) Series() ([]byte, *int64, float64) {
b, ts, v := p.Parser.Series()
if ts != nil {
// The OpenMetrics parser multiplies timestamps by 1000; undo that here.
*ts = *ts / 1000
}
return b, ts, v
}