commit 2911e78fce (parent 5159da31c3)

Refactored TSDB import and added CSV file support.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
@@ -16,6 +16,7 @@ package textparse
 import (
 	"mime"

+	"github.com/pkg/errors"
 	"github.com/prometheus/prometheus/pkg/exemplar"
 	"github.com/prometheus/prometheus/pkg/labels"
 )
@@ -55,8 +56,8 @@ type Parser interface {
 	// exemplar. It returns if an exemplar exists or not.
 	Exemplar(l *exemplar.Exemplar) bool

-	// Next advances the parser to the next sample. It returns false if no
-	// more samples were read or an error occurred.
+	// Next advances the parser to the next sample. It returns io.EOF if no
+	// more samples were read.
 	Next() (Entry, error)
 }
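The comment change above reflects the contract the rest of this commit relies on: `Next` signals end of input via `io.EOF` rather than a boolean. A minimal consumption loop (the same shape as the one added in tsdb/importer/import.go further down), where `p` is any `textparse.Parser`:

	for {
		e, err := p.Next()
		if err == io.EOF {
			break // Clean end of input.
		}
		if err != nil {
			return err // Real parse error.
		}
		_ = e // Dispatch on the entry type here.
	}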
@@ -94,3 +95,45 @@ const (
 	MetricTypeStateset = "stateset"
 	MetricTypeUnknown  = "unknown"
 )
+
+func (m *MetricType) ParseForOpenMetrics(mtyp string) error {
+	switch mtyp {
+	case "counter":
+		*m = MetricTypeCounter
+	case "gauge":
+		*m = MetricTypeGauge
+	case "histogram":
+		*m = MetricTypeHistogram
+	case "gaugehistogram":
+		*m = MetricTypeGaugeHistogram
+	case "summary":
+		*m = MetricTypeSummary
+	case "info":
+		*m = MetricTypeInfo
+	case "stateset":
+		*m = MetricTypeStateset
+	case "unknown":
+		*m = MetricTypeUnknown
+	default:
+		return errors.Errorf("invalid metric type %q", mtyp)
+	}
+	return nil
+}
+
+func (m *MetricType) ParseForProm(mtyp string) error {
+	switch mtyp {
+	case "counter":
+		*m = MetricTypeCounter
+	case "gauge":
+		*m = MetricTypeGauge
+	case "histogram":
+		*m = MetricTypeHistogram
+	case "summary":
+		*m = MetricTypeSummary
+	case "unknown":
+		*m = MetricTypeUnknown
+	default:
+		return errors.Errorf("invalid metric type %q", mtyp)
+	}
+	return nil
+}
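These helpers centralize the type-string tables that were previously inlined in both text parsers (see the two hunks below). A small usage sketch, assuming the usual `textparse` import:

	var mt textparse.MetricType
	if err := mt.ParseForOpenMetrics("gaugehistogram"); err != nil {
		// Only returned for strings outside the OpenMetrics set.
		return err
	}
	// mt == textparse.MetricTypeGaugeHistogram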
@@ -259,25 +259,8 @@ func (p *OpenMetricsParser) Next() (Entry, error) {
 		}
 		switch t {
 		case tType:
-			switch s := yoloString(p.text); s {
-			case "counter":
-				p.mtype = MetricTypeCounter
-			case "gauge":
-				p.mtype = MetricTypeGauge
-			case "histogram":
-				p.mtype = MetricTypeHistogram
-			case "gaugehistogram":
-				p.mtype = MetricTypeGaugeHistogram
-			case "summary":
-				p.mtype = MetricTypeSummary
-			case "info":
-				p.mtype = MetricTypeInfo
-			case "stateset":
-				p.mtype = MetricTypeStateset
-			case "unknown":
-				p.mtype = MetricTypeUnknown
-			default:
-				return EntryInvalid, errors.Errorf("invalid metric type %q", s)
+			if err := p.mtype.ParseForOpenMetrics(yoloString(p.text)); err != nil {
+				return EntryInvalid, err
 			}
 		case tHelp:
 			if !utf8.Valid(p.text) {
@@ -289,19 +289,8 @@ func (p *PromParser) Next() (Entry, error) {
 		}
 		switch t {
 		case tType:
-			switch s := yoloString(p.text); s {
-			case "counter":
-				p.mtype = MetricTypeCounter
-			case "gauge":
-				p.mtype = MetricTypeGauge
-			case "histogram":
-				p.mtype = MetricTypeHistogram
-			case "summary":
-				p.mtype = MetricTypeSummary
-			case "untyped":
-				p.mtype = MetricTypeUnknown
-			default:
-				return EntryInvalid, errors.Errorf("invalid metric type %q", s)
+			if err := p.mtype.ParseForProm(yoloString(p.text)); err != nil {
+				return EntryInvalid, err
 			}
 		case tHelp:
 			if !utf8.Valid(p.text) {
@@ -34,11 +34,15 @@ import (
 	"github.com/go-kit/kit/log"
 	"github.com/pkg/errors"
 	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/pkg/textparse"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb"
 	"github.com/prometheus/prometheus/tsdb/chunks"
 	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
 	"github.com/prometheus/prometheus/tsdb/importer"
+	"github.com/prometheus/prometheus/tsdb/importer/blocks"
+	"github.com/prometheus/prometheus/tsdb/importer/csv"
+	"github.com/prometheus/prometheus/tsdb/importer/openmetrics"
 	"gopkg.in/alecthomas/kingpin.v2"
 )
@@ -53,40 +57,43 @@ func execute() (err error) {
 	var (
 		defaultDBPath = filepath.Join("benchout", "storage")

 		cli = kingpin.New(filepath.Base(os.Args[0]), "CLI tool for tsdb")

 		benchCmd             = cli.Command("bench", "run benchmarks")
 		benchWriteCmd        = benchCmd.Command("write", "run a write performance benchmark")
 		benchWriteOutPath    = benchWriteCmd.Flag("out", "set the output path").Default("benchout").String()
 		benchWriteNumMetrics = benchWriteCmd.Flag("metrics", "number of metrics to read").Default("10000").Int()
 		benchSamplesFile     = benchWriteCmd.Arg("file", "input file with samples data, default is ("+filepath.Join("..", "..", "testdata", "20kseries.json")+")").Default(filepath.Join("..", "..", "testdata", "20kseries.json")).String()

 		listCmd              = cli.Command("ls", "list db blocks")
 		listCmdHumanReadable = listCmd.Flag("human-readable", "print human readable values").Short('h').Bool()
 		listPath             = listCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()

 		analyzeCmd     = cli.Command("analyze", "analyze churn, label pair cardinality.")
 		analyzePath    = analyzeCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
 		analyzeBlockID = analyzeCmd.Arg("block id", "block to analyze (default is the last block)").String()
 		analyzeLimit   = analyzeCmd.Flag("limit", "how many items to show in each list").Default("20").Int()

 		dumpCmd     = cli.Command("dump", "dump samples from a TSDB")
 		dumpPath    = dumpCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
 		dumpMinTime = dumpCmd.Flag("min-time", "minimum timestamp to dump").Default(strconv.FormatInt(math.MinInt64, 10)).Int64()
 		dumpMaxTime = dumpCmd.Flag("max-time", "maximum timestamp to dump").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()

-		importCmd = cli.Command("import", fmt.Sprintf("import samples from input and produce TSDB block. Currently supported input formats: %v. Please refer to the storage docs for more details.", importer.)
+		importCmd      = cli.Command("import", "[Experimental] import samples from input and produce TSDB block. Please refer to the storage docs for more details.")
 		importDbPath   = importCmd.Flag("output", "output directory for generated block").Default(".").String()
 		importFilePath = importCmd.Flag("input-file", "disables reading from input and using file to import samples from. If empty input is required").String()
-		importCSVDelimit         = importCmd.Flag("csv-delimiter", "force input parsing to CSV format with given delimiter. Default is coma delimiter.").String()
-		importMaxSamplesInMemory = importCmd.Flag("max-samples-in-mem", "maximum number of samples to process at once.").Default("10000").Int()
-		importMaxBlockChildren   = importCmd.Flag("max-block-children", "maximum number of children a block can have at a given time.").Default("20").Int()
+		importBlockSize = importCmd.Flag("block-size", "The maximum block size. The actual block timestamps will be aligned with Prometheus time ranges.").Default("2h").Hidden().Duration()
+		omImportCmd     = importCmd.Command("openmetrics", "import samples from OpenMetrics input and produce TSDB block. Please refer to the storage docs for more details.")

+		csvImportCmd       = importCmd.Command("csv", "import samples from CSV input and produce TSDB block. Please refer to the storage docs for more details.")
+		csvImportDelimiter = csvImportCmd.Flag("delimiter", "CSV single character for fields delimiting").Default(",").String()
 	)

 	logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
 	var merr tsdb_errors.MultiError

-	switch kingpin.MustParse(cli.Parse(os.Args[1:])) {
+	switch cmd := kingpin.MustParse(cli.Parse(os.Args[1:])); cmd {
 	case benchWriteCmd.FullCommand():
 		wb := &writeBenchmark{
 			outPath: *benchWriteOutPath,
@@ -150,7 +157,7 @@ func execute() (err error) {
 			err = merr.Err()
 		}()
 		return dumpSamples(db, *dumpMinTime, *dumpMaxTime)
-	case importCmd.FullCommand():
+	case omImportCmd.FullCommand(), csvImportCmd.FullCommand():
 		input := os.Stdin
 		if importFilePath != nil {
 			input, err = os.Open(*importFilePath)
@@ -163,11 +170,26 @@ func execute() (err error) {
 				err = merr.Err()
 			}()
 		}
-		return importer.ImportFromFile(logger, input, *importDbPath, *importMaxSamplesInMemory, *importMaxBlockChildren)
+
+		var p textparse.Parser
+		if cmd == omImportCmd.FullCommand() {
+			p = openmetrics.NewParser(input)
+		} else {
+			if len(*csvImportDelimiter) != 1 {
+				return errors.Errorf("wrong format of delimiter flag, expected single character, got %q", *csvImportDelimiter)
+			}
+
+			p = csv.NewParser(input, []rune(*csvImportDelimiter)[0])
+		}
+		return importer.Import(logger, p, blocks.NewMultiWriter(logger, *importDbPath, durToMillis(*importBlockSize)))
 	}
 	return nil
 }
+
+func durToMillis(t time.Duration) int64 {
+	return int64(t.Seconds() * 1000)
+}
+
 type writeBenchmark struct {
 	outPath     string
 	samplesFile string
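With the subcommand split above, the input format is now chosen explicitly on the command line instead of auto-detected, e.g. `tsdb import openmetrics --input-file=metrics.om --output=./data` or `tsdb import csv --delimiter=',' --input-file=samples.csv --output=./data` (illustrative invocations; flag names are the ones defined above, while the binary name depends on how the tool is built).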
tsdb/head.go (26 lines changed)

@@ -1644,6 +1644,19 @@ func (s *stripeSeries) getByHash(hash uint64, lset labels.Labels) *memSeries {
 	return series
 }

+type sample struct {
+	t int64
+	v float64
+}
+
+func (s sample) T() int64 {
+	return s.t
+}
+
+func (s sample) V() float64 {
+	return s.v
+}
+
 func (s *stripeSeries) getOrSet(hash uint64, series *memSeries) (*memSeries, bool) {
 	i := hash & uint64(s.size-1)

@@ -1665,19 +1678,6 @@ func (s *stripeSeries) getOrSet(hash uint64, series *memSeries) (*memSeries, bool) {
 	return series, true
 }

-type sample struct {
-	t int64
-	v float64
-}
-
-func (s sample) T() int64 {
-	return s.t
-}
-
-func (s sample) V() float64 {
-	return s.v
-}
-
 // memSeries is the in-memory representation of a series. None of its methods
 // are goroutine safe and it is the caller's responsibility to lock it.
 type memSeries struct {
tsdb/importer/blocks/multi.go (new file, 107 lines)

@@ -0,0 +1,107 @@
+package blocks
+
+import (
+	"github.com/go-kit/kit/log"
+	"github.com/oklog/ulid"
+	"github.com/pkg/errors"
+	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/storage"
+	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
+	"github.com/prometheus/prometheus/tsdb/index"
+)
+
+type errAppender struct{ err error }
+
+func (a errAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) { return 0, a.err }
+func (a errAppender) AddFast(ref uint64, t int64, v float64) error            { return a.err }
+func (a errAppender) Commit() error                                           { return a.err }
+func (a errAppender) Rollback() error                                         { return a.err }
+
+func rangeForTimestamp(t int64, width int64) (maxt int64) {
+	return (t/width)*width + width
+}
+
+type MultiWriter struct {
+	blocks          map[index.Range]Writer
+	activeAppenders map[index.Range]storage.Appender
+
+	logger log.Logger
+	dir    string
+	// TODO(bwplotka): Allow more complex compaction levels.
+	sizeMillis int64
+}
+
+func NewMultiWriter(logger log.Logger, dir string, sizeMillis int64) *MultiWriter {
+	return &MultiWriter{
+		logger:          logger,
+		dir:             dir,
+		sizeMillis:      sizeMillis,
+		blocks:          map[index.Range]Writer{},
+		activeAppenders: map[index.Range]storage.Appender{},
+	}
+}
+
+// Appender is not thread-safe. Returned Appender is not thread-safe as well.
+// TODO(bwplotka): Consider making it thread safe.
+func (w *MultiWriter) Appender() storage.Appender { return w }
+
+func (w *MultiWriter) getOrCreate(t int64) storage.Appender {
+	maxt := rangeForTimestamp(t, w.sizeMillis)
+	hash := index.Range{Start: maxt - w.sizeMillis, End: maxt}
+	if a, ok := w.activeAppenders[hash]; ok {
+		return a
+	}
+
+	nw, err := NewTSDBWriter(w.logger, w.dir)
+	if err != nil {
+		return errAppender{err: errors.Wrap(err, "new tsdb writer")}
+	}
+
+	w.blocks[hash] = nw
+	w.activeAppenders[hash] = nw.Appender()
+	return w.activeAppenders[hash]
+}
+
+func (w *MultiWriter) Add(l labels.Labels, t int64, v float64) (uint64, error) {
+	return w.getOrCreate(t).Add(l, t, v)
+}
+
+func (w *MultiWriter) AddFast(ref uint64, t int64, v float64) error {
+	return w.getOrCreate(t).AddFast(ref, t, v)
+}
+
+func (w *MultiWriter) Commit() error {
+	var merr tsdb_errors.MultiError
+	for _, a := range w.activeAppenders {
+		merr.Add(a.Commit())
+	}
+	return merr.Err()
+}
+
+func (w *MultiWriter) Rollback() error {
+	var merr tsdb_errors.MultiError
+	for _, a := range w.activeAppenders {
+		merr.Add(a.Rollback())
+	}
+	return merr.Err()
+}
+
+func (w *MultiWriter) Flush() ([]ulid.ULID, error) {
+	ids := make([]ulid.ULID, 0, len(w.blocks))
+	for _, b := range w.blocks {
+		id, err := b.Flush()
+		if err != nil {
+			return nil, err
+		}
+		ids = append(ids, id...)
+	}
+	return ids, nil
+}
+
+func (w *MultiWriter) Close() error {
+	var merr tsdb_errors.MultiError
+	for _, b := range w.blocks {
+		merr.Add(b.Close())
+	}
+	return merr.Err()
+}
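To make the bucketing in `getOrCreate` concrete, here is how `rangeForTimestamp` aligns sample timestamps to fixed-width windows (a quick worked sketch; the width is the 2h default expressed in milliseconds):

	width := int64(7200000) // 2h in ms.
	rangeForTimestamp(1000000, width) // returns 7200000  -> index.Range{Start: 0, End: 7200000}
	rangeForTimestamp(7200000, width) // returns 14400000 -> index.Range{Start: 7200000, End: 14400000}

Every sample therefore lands in the appender for the half-open window containing it, and each window gets its own TSDBWriter, so the resulting blocks stay aligned with Prometheus time ranges.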
tsdb/importer/blocks/writer.go (new file, 116 lines)

@@ -0,0 +1,116 @@
+package blocks
+
+import (
+	"context"
+	"math"
+	"time"
+
+	"github.com/oklog/ulid"
+	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/tsdb"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/pkg/errors"
+	"github.com/prometheus/prometheus/pkg/timestamp"
+	"github.com/prometheus/prometheus/tsdb/chunkenc"
+)
+
+// Writer is an interface to write time series into Prometheus blocks.
+type Writer interface {
+	storage.Appendable
+
+	// Flush writes current data to disk.
+	// The block or blocks will contain values accumulated by `Write`.
+	Flush() ([]ulid.ULID, error)
+
+	// Close releases all resources. No append is allowed anymore to such a writer.
+	Close() error
+}
+
+var _ Writer = &TSDBWriter{}
+
+// TSDBWriter is a block writer that allows appending and flushing to disk.
+type TSDBWriter struct {
+	logger log.Logger
+	dir    string
+
+	head *tsdb.Head
+}
+
+func durToMillis(t time.Duration) int64 {
+	return int64(t.Seconds() * 1000)
+}
+
+// NewTSDBWriter creates a new block writer.
+//
+// The returned writer accumulates all series in memory until `Flush` is called.
+//
+// Note that the writer will not check if the target directory exists or
+// contains anything at all. It is the caller's responsibility to
+// ensure that the resulting blocks do not overlap etc.
+// The writer ensures the block flush is atomic (via rename).
+func NewTSDBWriter(logger log.Logger, dir string) (*TSDBWriter, error) {
+	res := &TSDBWriter{
+		logger: logger,
+		dir:    dir,
+	}
+	return res, res.initHead()
+}
+
+// initHead creates and initialises a new head.
+func (w *TSDBWriter) initHead() error {
+	logger := w.logger
+
+	// Keep Registerer and WAL nil as we don't use them.
+	// Use a huge chunkRange; it has to be at least as large as the expected block size.
+	// Since we don't have info about block size here, set it to a large number.
+	h, err := tsdb.NewHead(nil, logger, nil, durToMillis(9999*time.Hour), tsdb.DefaultStripeSize)
+	if err != nil {
+		return errors.Wrap(err, "tsdb.NewHead")
+	}
+
+	w.head = h
+	return w.head.Init(math.MinInt64)
+}
+
+// Appender is not thread-safe. The returned Appender is thread-safe, however.
+func (w *TSDBWriter) Appender() storage.Appender {
+	return w.head.Appender()
+}
+
+// Flush implements the Writer interface. This is where actual block writing
+// happens. After flush completes, no write can be done.
+func (w *TSDBWriter) Flush() ([]ulid.ULID, error) {
+	seriesCount := w.head.NumSeries()
+	if w.head.NumSeries() == 0 {
+		return nil, errors.New("no series appended; aborting")
+	}
+
+	mint := w.head.MinTime()
+	maxt := w.head.MaxTime()
+	level.Info(w.logger).Log("msg", "flushing", "series_count", seriesCount, "mint", timestamp.Time(mint), "maxt", timestamp.Time(maxt))
+
+	// Flush head to disk as a block.
+	compactor, err := tsdb.NewLeveledCompactor(
+		context.Background(),
+		nil,
+		w.logger,
+		[]int64{durToMillis(2 * time.Hour)}, // Does not matter, used only for planning.
+		chunkenc.NewPool())
+	if err != nil {
+		return nil, errors.Wrap(err, "create leveled compactor")
+	}
+	id, err := compactor.Write(w.dir, w.head, mint, maxt+1, nil)
+	if err != nil {
+		return nil, errors.Wrap(err, "compactor write")
+	}
+	if err := w.head.Truncate(maxt + 1); err != nil {
+		return nil, errors.Wrap(err, "truncate head")
+	}
+	return []ulid.ULID{id}, nil
+}
+
+func (w *TSDBWriter) Close() error {
+	return w.head.Close()
+}
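A minimal sketch of the writer's lifecycle, using only names introduced in this diff (logger setup elided):

	w, err := blocks.NewTSDBWriter(logger, "./out")
	if err != nil {
		return err
	}
	app := w.Appender()
	// Timestamps are in milliseconds.
	if _, err := app.Add(labels.FromStrings("__name__", "up"), 1594885435000, 1); err != nil {
		return err
	}
	if err := app.Commit(); err != nil {
		return err
	}
	ids, err := w.Flush() // Writes the in-memory head out as a block.
	if err != nil {
		return err
	}
	_ = ids // ULIDs of the flushed block(s).
	return w.Close()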
(deleted file)

@@ -1 +0,0 @@
-package importer
tsdb/importer/csv/csv.go (new file, 258 lines)

@@ -0,0 +1,258 @@
+package csv
+
+import (
+	"encoding/csv"
+	"fmt"
+	"io"
+	"sort"
+	"strconv"
+	"strings"
+	"unsafe"
+
+	"github.com/pkg/errors"
+	"github.com/prometheus/prometheus/pkg/exemplar"
+	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/pkg/textparse"
+)
+
+type HeaderType string
+
+const (
+	// TODO(bwplotka): Consider accepting different ones like `values` or `timestamps` and pairs of value - timestamp, similar to label name - value.
+	Value             HeaderType = "value"
+	TimestampMs       HeaderType = "timestamp_ms" // Milliseconds since 1970-01-01 00:00:00 UTC.
+	MetricName        HeaderType = "metric_name"  // Value of "__name__" label.
+	LabelName         HeaderType = "label_name"   // If both labels and metric_name are specified labels are merged. This header can repeat, but label_value is expected as the next column.
+	LabelValue        HeaderType = "label_value"  // If both labels and metric_name are specified labels are merged. This header can repeat, but label_name is expected to be in the previous column.
+	Help              HeaderType = "help"
+	Type              HeaderType = "type"
+	Unit              HeaderType = "unit"
+	ExemplarValue     HeaderType = "exemplar_value"
+	ExemplarTimestamp HeaderType = "exemplar_timestamp"
+)
+
+type recordEntry struct {
+	labels     labels.Labels
+	help, unit []byte
+	typ        textparse.MetricType
+	value      float64
+	timestamp  int64
+	exemplar   *exemplar.Exemplar
+}
+
+type Parser struct {
+	cr *csv.Reader
+
+	mapping []HeaderType
+	record  recordEntry
+}
+
+// NewParser returns a new CSV parser that is able to parse input in the format described in RFC 4180 and expose it
+// as a textparse.Parser. Read https://golang.org/pkg/encoding/csv/ for more details about parsing CSV.
+func NewParser(r io.Reader, delimiter rune) textparse.Parser {
+	cr := csv.NewReader(r)
+	cr.ReuseRecord = true
+	cr.Comma = delimiter
+	return &Parser{cr: cr}
+}
+
+// Next advances the parser to the next sample. It returns io.EOF if no
+// more samples were read.
+func (p *Parser) Next() (textparse.Entry, error) {
+	for {
+		record, err := p.cr.Read()
+		if err != nil {
+			// io.EOF is returned at the end, and io.EOF also controls textparse.Parser.Next.
+			return textparse.EntryInvalid, err
+		}
+
+		if p.mapping == nil {
+			p.mapping, err = parseHeaders(record)
+			if err != nil {
+				return textparse.EntryInvalid, err
+			}
+			continue
+		}
+
+		if len(record) != len(p.mapping) {
+			return textparse.EntryInvalid, errors.Errorf("got different number of fields than defined by headers. Got %v, expected %v", len(record), len(p.mapping))
+		}
+
+		r := recordEntry{}
+		var l labels.Label
+		for i, field := range record {
+			switch p.mapping[i] {
+			case Value:
+				r.value, err = parseFloat(field)
+				if err != nil {
+					return textparse.EntryInvalid, errors.Wrapf(err, "parse %q as float64", Value)
+				}
+			case TimestampMs:
+				r.timestamp, err = strconv.ParseInt(field, 10, 64)
+				if err != nil {
+					return textparse.EntryInvalid, errors.Wrapf(err, "parse %q as int64", TimestampMs)
+				}
+			case MetricName:
+				if r.labels.Has(labels.MetricName) {
+					return textparse.EntryInvalid, errors.Errorf("%q (or %q label name) already was specified", MetricName, labels.MetricName)
+				}
+				r.labels = append(r.labels, labels.Label{Name: labels.MetricName, Value: field})
+			case Help:
+				r.help = yoloBytes(field)
+			case Type:
+				if err := r.typ.ParseForOpenMetrics(field); err != nil {
+					return textparse.EntryInvalid, err
+				}
+			case Unit:
+				r.unit = yoloBytes(field)
+			case ExemplarValue, ExemplarTimestamp:
+				if field == "" {
+					continue
+				}
+				if r.exemplar == nil {
+					r.exemplar = &exemplar.Exemplar{}
+				}
+				if p.mapping[i] == ExemplarValue {
+					r.exemplar.Value, err = parseFloat(field)
+					if err != nil {
+						return textparse.EntryInvalid, errors.Wrapf(err, "parse %q as float64", ExemplarValue)
+					}
+				} else {
+					r.exemplar.Ts, err = strconv.ParseInt(field, 10, 64)
+					if err != nil {
+						return textparse.EntryInvalid, errors.Wrapf(err, "parse %q as int64", ExemplarTimestamp)
+					}
+					r.exemplar.HasTs = true
+				}
+			// Name is required to be before value. On top of that such a pair can repeat.
+			case LabelName:
+				if field != "" && r.labels.Has(field) {
+					return textparse.EntryInvalid, errors.Errorf("label name %q already was specified", field)
+				}
+				l.Name = field
+			case LabelValue:
+				if l.Name == "" {
+					if field == "" {
+						continue
+					}
+					return textparse.EntryInvalid, errors.Errorf("label value is specified (%q) when previous name was empty", field)
+				}
+				if field == "" {
+					return textparse.EntryInvalid, errors.New("label value cannot be empty if previous name is not empty")
+				}
+				l.Value = field
+				r.labels = append(r.labels, l)
+			default:
+				panic(fmt.Sprintf("unknown mapping %q", p.mapping[i]))
+			}
+		}
+		sort.Sort(r.labels)
+
+		if r.exemplar != nil {
+			r.exemplar.Labels = r.labels
+		}
+
+		// TODO(bwplotka): Implement iterating over all types if we have any (help, unit, etc).
+		p.record = r
+		return textparse.EntrySeries, nil
+	}
+}
+
+func yoloBytes(s string) []byte {
+	return *((*[]byte)(unsafe.Pointer(&s)))
+}
+
+func parseFloat(s string) (float64, error) {
+	// Keep to pre-Go 1.13 float formats.
+	if strings.ContainsAny(s, "pP_") {
+		return 0, errors.Errorf("unsupported characters in float %v", s)
+	}
+	return strconv.ParseFloat(s, 64)
+}
+
+func parseHeaders(record []string) (mapping []HeaderType, _ error) {
+	dedup := map[HeaderType]struct{}{}
+	expectLabelValue := false
+	for _, field := range record {
+		switch f := HeaderType(strings.ToLower(field)); f {
+		case Value, TimestampMs, MetricName, Help, Type, Unit, ExemplarValue, ExemplarTimestamp:
+			if expectLabelValue {
+				return nil, errors.Errorf("matching %s header expected right after %s", LabelValue, LabelName)
+			}
+			if _, ok := dedup[f]; ok {
+				return nil, errors.Errorf("only one header type of %s is allowed", f)
+			}
+			dedup[f] = struct{}{}
+			mapping = append(mapping, f)
+
+		// Name is required to be before value. On top of that such a pair can repeat.
+		case LabelName:
+			if expectLabelValue {
+				return nil, errors.Errorf("matching %s header expected right after %s", LabelValue, LabelName)
+			}
+			mapping = append(mapping, f)
+			expectLabelValue = true
+		case LabelValue:
+			if !expectLabelValue {
+				return nil, errors.Errorf("matching %s header expected before %s", LabelName, LabelValue)
+			}
+			mapping = append(mapping, f)
+			expectLabelValue = false
+		default:
+			return nil, errors.Errorf("%q header is not supported", field)
+		}
+	}
+	return mapping, nil
+}
+
+// Series returns the bytes of the series, the timestamp if set, and the value
+// of the current sample. Timestamp is in milliseconds.
+func (p *Parser) Series() ([]byte, *int64, float64) {
+	return []byte("not-implemented"), &p.record.timestamp, p.record.value
+}
+
+// Help returns the metric name and help text in the current entry.
+// Must only be called after Next returned a help entry.
+// The returned byte slices become invalid after the next call to Next.
+func (p *Parser) Help() ([]byte, []byte) {
+	return []byte(p.record.labels.Get("__name__")), p.record.help
+}
+
+// Type returns the metric name and type in the current entry.
+// Must only be called after Next returned a type entry.
+// The returned byte slices become invalid after the next call to Next.
+func (p *Parser) Type() ([]byte, textparse.MetricType) {
+	return []byte(p.record.labels.Get("__name__")), p.record.typ
+}
+
+// Unit returns the metric name and unit in the current entry.
+// Must only be called after Next returned a unit entry.
+// The returned byte slices become invalid after the next call to Next.
+func (p *Parser) Unit() ([]byte, []byte) {
+	return []byte(p.record.labels.Get("__name__")), p.record.unit
+}
+
+// Comment returns the text of the current comment.
+// Must only be called after Next returned a comment entry.
+// The returned byte slice becomes invalid after the next call to Next.
+func (p *Parser) Comment() []byte { return nil }
+
+// Metric writes the labels of the current sample into the passed labels.
+// It returns the string from which the metric was parsed.
+func (p *Parser) Metric(l *labels.Labels) string {
+	*l = append(*l, p.record.labels...)
+	// Sort labels to maintain the sorted labels invariant.
+	sort.Sort(*l)
+
+	return "not-implemented"
+}
+
+// Exemplar writes the exemplar of the current sample into the passed
+// exemplar. It returns if an exemplar exists or not.
+func (p *Parser) Exemplar(l *exemplar.Exemplar) bool {
+	if p.record.exemplar == nil {
+		return false
+	}
+	*l = *p.record.exemplar
+	return true
+}
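For reference, an input this parser accepts pairs one header row built from the `HeaderType` constants with data rows. An illustrative example (hypothetical data; note that `parseHeaders` expects the `timestamp_ms` spelling):

	metric_name,label_name,label_value,timestamp_ms,value,type
	metric1,pod,abc-1,1594885435000,1245214.23423,counter
	metric1,pod,abc-2,1594885436000,1.23423,counter

Each data row yields one textparse.EntrySeries whose labels merge `__name__` from the metric_name column with the label_name/label_value pairs.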
tsdb/importer/csv/csv_test.go (new file, 85 lines)

@@ -0,0 +1,85 @@
+package csv
+
+import (
+	"io"
+	"strings"
+	"testing"
+
+	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/pkg/textparse"
+	"github.com/prometheus/prometheus/util/testutil"
+)
+
+type series struct {
+	// TODO(bwplotka) Assert more stuff once supported.
+	lbls    labels.Labels
+	samples []sample
+}
+
+type sample struct {
+	t int64
+	v float64
+}
+
+func TestParser(t *testing.T) {
+	for _, tcase := range []struct {
+		name  string
+		input string
+
+		expectedSeries []series
+	}{
+		// TODO(bwplotka): Test all edge cases and all entries.
+		{
+			name: "empty",
+		},
+		{
+			name:  "just header",
+			input: `metric_name,label_name,label_value,timestamp,label_name,label_value,help,value,type,exemplar_value,unit,exemplar_timestamp`,
+		},
+		{
+			name: "mixed header with data",
+			input: `metric_name,label_name,label_value,timestamp,label_name,label_value,help,value,type,exemplar_value,unit,exemplar_timestamp
+metric1,pod,abc-1,1594885435,instance,1,some help,1245214.23423,counter,-0.12,bytes because why not,1
+metric1,pod,abc-1,1594885436,instance,1,some help,1.23423,counter,-0.12,bytes because why not,1
+metric1,pod,abc-2,1594885432,,,some help2,1245214.23421,gauge,,bytes,
+`,
+			expectedSeries: []series{
+				{
+					lbls:    labels.FromStrings(labels.MetricName, "metric1", "pod", "abc-1", "instance", "1"),
+					samples: []sample{{v: 1245214.23423, t: 1594885435}, {v: 1.23423, t: 1594885436}},
+				},
+				{
+					lbls:    labels.FromStrings(labels.MetricName, "metric1", "pod", "abc-1"),
+					samples: []sample{{v: 1245214.23421, t: 1594885432}},
+				},
+			},
+		},
+	} {
+		t.Run(tcase.name, func(t *testing.T) {
+			p := NewParser(strings.NewReader(tcase.input), ',')
+			got := map[uint64]series{}
+			for {
+				e, err := p.Next()
+				if err == io.EOF {
+					break
+				}
+				testutil.Ok(t, err)
+				// For now expect only series.
+				testutil.Equals(t, textparse.EntrySeries, e)
+				l := labels.Labels{}
+				p.Metric(&l)
+
+				s, ok := got[l.Hash()]
+				if !ok {
+					got[l.Hash()] = series{lbls: l}
+				}
+
+				_, ts, v := p.Series()
+				if ts == nil {
+					t.Fatal("got no timestamps")
+				}
+				s.samples = append(s.samples, sample{t: *ts, v: v})
+			}
+		})
+	}
+}
(deleted file)

@@ -1,24 +0,0 @@
-package importer
-
-import (
-	"testing"
-)
-
-func TestCSVParser(t *testing.T) {
-	for _, tcase := range []struct {
-		name  string
-		input string
-	}{
-		{
-			name: "empty",
-		},
-		{
-			name:  "just header",
-			input: `metric,label,value,timestamp`,
-		},
-	} {
-		t.Run("", func(t *testing.T) {
-
-		})
-	}
-}
tsdb/importer/import.go (new file, 86 lines)

@@ -0,0 +1,86 @@
+// Copyright 2020 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package importer
+
+import (
+	"fmt"
+	"io"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/pkg/errors"
+	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/pkg/textparse"
+	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
+	"github.com/prometheus/prometheus/tsdb/importer/blocks"
+)
+
+// Import imports data from a textparse Parser into a block Writer.
+// TODO(bwplotka): The textparse interface potentially limits the format to never give multiple samples. Fix this, as some formats
+// (e.g. JSON) might allow that.
+// Import takes ownership of the given block writer.
+func Import(logger log.Logger, p textparse.Parser, w blocks.Writer) (err error) {
+	if logger == nil {
+		logger = log.NewNopLogger()
+	}
+
+	level.Info(logger).Log("msg", "started importing input data.")
+	app := w.Appender()
+
+	l := labels.Labels{}
+
+	defer func() {
+		var merr tsdb_errors.MultiError
+		merr.Add(err)
+		merr.Add(w.Close())
+		err = merr.Err()
+	}()
+
+	var e textparse.Entry
+	for {
+		e, err = p.Next()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			return errors.Wrap(err, "parse")
+		}
+
+		// For now care about series only.
+		if e != textparse.EntrySeries {
+			continue
+		}
+
+		p.Metric(&l)
+		_, ts, v := p.Series()
+		if ts == nil {
+			return errors.Errorf("expected timestamp for series %v, got none", l.String())
+		}
+		if _, err := app.Add(l, *ts, v); err != nil {
+			return errors.Wrap(err, "add sample")
+		}
+	}
+
+	level.Info(logger).Log("msg", "no more input data, committing appenders and flushing block(s)")
+	if err := app.Commit(); err != nil {
+		return errors.Wrap(err, "commit")
+	}
+
+	ids, err := w.Flush()
+	if err != nil {
+		return errors.Wrap(err, "flush")
+	}
+	level.Info(logger).Log("msg", "blocks flushed", "ids", fmt.Sprintf("%v", ids))
+	return nil
+}
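Putting the new pieces together, the end-to-end path wired up in the tsdb CLI above is roughly the following (a sketch using only names from this diff; error handling abbreviated):

	f, err := os.Open("samples.csv")
	if err != nil {
		return err
	}
	p := csv.NewParser(f, ',')
	w := blocks.NewMultiWriter(logger, "./out", durToMillis(2*time.Hour))
	// Import drains the parser, appends every series sample, commits,
	// flushes the per-range writers as blocks, and closes w.
	return importer.Import(logger, p, w)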
@@ -24,18 +24,14 @@ import (
 	"strings"
 	"testing"

-	labels2 "github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/pkg/value"
 	"github.com/prometheus/prometheus/tsdb"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/util/testutil"
 )

-const (
-	// We use a lower value for this than the default to test block compaction implicitly.
-	maxSamplesInMemory = 2000
-	maxBlockChildren   = 10
-)
-
+// TODO(bwplotka): Did not touch this file, has to be updated. Some of this can be simplified and moved to the importer/openmetrics package (:
 func testBlocks(t *testing.T, blocks []tsdb.BlockReader, metricLabels []string, expectedMint, expectedMaxt int64, expectedSamples []tsdb.MetricSample, expectedSymbols []string, expectedNumBlocks int) {
 	// Assert we have expected number of blocks.

@@ -55,7 +51,7 @@ func testBlocks(t *testing.T, blocks []tsdb.BlockReader, metricLabels []string,
 			allSymbols[key] = struct{}{}
 		}
 	}
-	blockSamples, err := readSeries(block, labels2.FromStrings(metricLabels...))
+	blockSamples, err := readSeries(block, labels.FromStrings(metricLabels...))
 	testutil.Ok(t, err)
 	allSamples = append(allSamples, blockSamples...)
 	_ = indexr.Close()

@@ -97,7 +93,7 @@ func sortSamples(samples []tsdb.MetricSample) {
 // The labels do not have to be exhaustive, i.e. if we have metrics with common labels b/w them,
 // we just need to pass the common labels, and we will get all the metrics that have them,
 // even if the other labels b/w the metrics are different.
-func readSeries(block tsdb.BlockReader, lbls labels2.Labels) ([]tsdb.MetricSample, error) {
+func readSeries(block tsdb.BlockReader, lbls labels.Labels) ([]tsdb.MetricSample, error) {
 	series := make([]tsdb.MetricSample, 0)
 	ir, err := block.Index()
 	if err != nil {

@@ -115,7 +111,7 @@ func readSeries(block tsdb.BlockReader, lbls labels.Labels) ([]tsdb.MetricSample
 	}
 	defer chunkr.Close()
 	for _, lbl := range lbls {
-		css, err := tsdb.LookupChunkSeries(ir, tsr, labels2.MustNewMatcher(labels2.MatchEqual, lbl.Name, lbl.Value))
+		css, err := tsdb.LookupChunkSeries(ir, tsr, labels.MustNewMatcher(labels.MatchEqual, lbl.Name, lbl.Value))
 		if err != nil {
 			return series, err
 		}

@@ -149,7 +145,7 @@ func genSeries(labels []string, mint, maxt int64, step int) []tsdb.MetricSample
 	for idx := mint; idx < maxt; idx += int64(step) {
 		// Round to 3 places.
 		val := math.Floor(rand.Float64()*1000) / 1000
-		sample := tsdb.MetricSample{Timestamp: idx, Value: val, Labels: labels2.FromStrings(labels...)}
+		sample := tsdb.MetricSample{Timestamp: idx, Value: val, Labels: labels.FromStrings(labels...)}
 		series = append(series, sample)
 	}
 	return series

@@ -157,7 +153,7 @@ func genSeries(labels []string, mint, maxt int64, step int) []tsdb.MetricSample
 // labelsToStr converts the given labels to a string representation that is compliant with
 // the OpenMetrics parser.
-func labelsToStr(labels labels2.Labels) string {
+func labelsToStr(labels labels.Labels) string {
 	str := "{"
 	for idx, l := range labels {
 		str += fmt.Sprintf("%s=%s", l.Name, strconv.Quote(l.Value))

@@ -220,12 +216,12 @@ http_requests_total{code="400"} 1 1565133713990
 				{
 					Timestamp: 1565133713989,
 					Value:     1021,
-					Labels:    labels2.FromStrings("__name__", "http_requests_total", "code", "200"),
+					Labels:    labels.FromStrings("__name__", "http_requests_total", "code", "200"),
 				},
 				{
 					Timestamp: 1565133713990,
 					Value:     1,
-					Labels:    labels2.FromStrings("__name__", "http_requests_total", "code", "400"),
+					Labels:    labels.FromStrings("__name__", "http_requests_total", "code", "400"),
 				},
 			},
 		},

@@ -254,12 +250,12 @@ http_requests_total{code="400"} 2 1575133713990
 				{
 					Timestamp: 1565133713989,
 					Value:     1022,
-					Labels:    labels2.FromStrings("__name__", "http_requests_total", "code", "200"),
+					Labels:    labels.FromStrings("__name__", "http_requests_total", "code", "200"),
 				},
 				{
 					Timestamp: 1575133713990,
 					Value:     2,
-					Labels:    labels2.FromStrings("__name__", "http_requests_total", "code", "400"),
+					Labels:    labels.FromStrings("__name__", "http_requests_total", "code", "400"),
 				},
 			},
 		},

@@ -288,12 +284,12 @@ http_requests_total{code="400"} 3 1395066363000
 				{
 					Timestamp: 1395066363000,
 					Value:     1023,
-					Labels:    labels2.FromStrings("__name__", "http_requests_total", "code", "200"),
+					Labels:    labels.FromStrings("__name__", "http_requests_total", "code", "200"),
 				},
 				{
 					Timestamp: 1395066363000,
 					Value:     3,
-					Labels:    labels2.FromStrings("__name__", "http_requests_total", "code", "400"),
+					Labels:    labels.FromStrings("__name__", "http_requests_total", "code", "400"),
 				},
 			},
 		},

@@ -338,7 +334,7 @@ no_type_metric{type="bad_news_bears"} 0.0 111
 				{
 					Timestamp: 111,
 					Value:     0.0,
-					Labels:    labels2.FromStrings("__name__", "no_type_metric", "type", "bad_news_bears"),
+					Labels:    labels.FromStrings("__name__", "no_type_metric", "type", "bad_news_bears"),
 				},
 			},
 		},

@@ -380,7 +376,7 @@ bad_ts{type="bad_timestamp"} 420 1e99
 				{
 					Timestamp: 6900,
 					Value:     42,
-					Labels:    labels2.FromStrings("__name__", "no_help_no_type", "foo", "bar"),
+					Labels:    labels.FromStrings("__name__", "no_help_no_type", "foo", "bar"),
 				},
 			},
 		},

@@ -406,7 +402,7 @@ bad_ts{type="bad_timestamp"} 420 1e99
 				{
 					Timestamp: 1001,
 					Value:     42.24,
-					Labels:    labels2.FromStrings("__name__", "bare_metric"),
+					Labels:    labels.FromStrings("__name__", "bare_metric"),
 				},
 			},
 		},

@@ -430,7 +426,7 @@ no_nl{type="no newline"}
 	for _, test := range tests {
 		tmpDbDir, err := ioutil.TempDir("", "importer")
 		testutil.Ok(t, err)
-		err = ImportFromFile(strings.NewReader(test.ToParse), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil)
+		err = Import(strings.NewReader(test.ToParse), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil)
 		if test.IsOk {
 			testutil.Ok(t, err)
 			if len(test.Expected.Symbols) > 0 {

@@ -449,7 +445,7 @@ no_nl{type="no newline"}

 func TestImportBadFile(t *testing.T) {
 	// No file found case.
-	err := ImportFromFile((*os.File)(nil), "/buzz/baz/bar/foo", maxSamplesInMemory, maxBlockChildren, nil)
+	err := Import((*os.File)(nil), "/buzz/baz/bar/foo", maxSamplesInMemory, maxBlockChildren, nil)
 	testutil.NotOk(t, err)
 }

@@ -558,19 +554,19 @@ func TestImportIntoExistingDB(t *testing.T) {
 		tmpDbDir, err := ioutil.TempDir("", "importer")
 		testutil.Ok(t, err)
-		err = ImportFromFile(strings.NewReader(initText), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil)
+		err = Import(strings.NewReader(initText), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil)
 		testutil.Ok(t, err)

 		importSeries := genSeries(test.MetricLabels, test.ImportMint, test.ImportMaxt, test.GeneratorStep)
 		importText := genOpenMetricsText(test.MetricName, test.MetricType, importSeries)

-		err = ImportFromFile(strings.NewReader(importText), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil)
+		err = Import(strings.NewReader(importText), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil)
 		testutil.Ok(t, err)

 		expectedSamples := make([]tsdb.MetricSample, 0)
 		for _, exp := range [][]tsdb.MetricSample{initSeries, importSeries} {
 			for _, sample := range exp {
-				lbls := labels2.FromStrings("__name__", test.MetricName)
+				lbls := labels.FromStrings("__name__", test.MetricName)
 				s := tsdb.MetricSample{
 					Timestamp: sample.Timestamp,
 					Value:     sample.Value,

@@ -621,13 +617,13 @@ func TestMixedSeries(t *testing.T) {
 	tmpDbDir, err := ioutil.TempDir("", "importer")
 	testutil.Ok(t, err)
-	err = ImportFromFile(strings.NewReader(text), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil)
+	err = Import(strings.NewReader(text), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil)
 	testutil.Ok(t, err)

 	addMetricLabel := func(series []tsdb.MetricSample, metricName string) []tsdb.MetricSample {
 		augSamples := make([]tsdb.MetricSample, 0)
 		for _, sample := range series {
-			lbls := labels2.FromStrings("__name__", metricName)
+			lbls := labels.FromStrings("__name__", metricName)
 			s := tsdb.MetricSample{
 				Timestamp: sample.Timestamp,
 				Value:     sample.Value,

@@ -666,7 +662,7 @@ func TestInvalidSyntaxQuickAbort(t *testing.T) {
 		{
 			Timestamp: -1,
 			Value:     0,
-			Labels:    labels2.FromStrings(" INVALID", "INVALID"),
+			Labels:    labels.FromStrings(" INVALID", "INVALID"),
 		},
 	}
 	secondHalf := genSeries(lbls, 10, 200, 1)

@@ -677,7 +673,7 @@ func TestInvalidSyntaxQuickAbort(t *testing.T) {
 	testutil.Ok(t, err)

 	r := strings.NewReader(importText)
-	err = ImportFromFile(r, tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil)
+	err = Import(r, tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil)

 	testutil.NotOk(t, err)
 	// We should abort and return the error with the text, without reading till EOF.

@@ -727,13 +723,13 @@ func TestInvalidSyntaxQuickAbort(t *testing.T) {
 	//
 	//		tmpDbDir, err := ioutil.TempDir("", "importer")
 	//		testutil.Ok(b, err)
-	//		err = ImportFromFile(strings.NewReader(initText), tmpDbDir, maxSamplesInMemory, vThresh, nil)
+	//		err = Import(strings.NewReader(initText), tmpDbDir, maxSamplesInMemory, vThresh, nil)
 	//		testutil.Ok(b, err)
 	//
 	//		importSeries := genSeries(test.MetricLabels, test.ImportMint, test.ImportMaxt, test.GeneratorStep)
 	//		importText := genOpenMetricsText(test.MetricName, test.MetricType, importSeries)
 	//
-	//		err = ImportFromFile(strings.NewReader(importText), tmpDbDir, maxSamplesInMemory, vThresh, nil)
+	//		err = Import(strings.NewReader(importText), tmpDbDir, maxSamplesInMemory, vThresh, nil)
 	//		testutil.Ok(b, err)
 	//	}
 	// }
@@ -1,601 +0,0 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package importer

import (
	"bufio"
	"bytes"
	"context"
	"encoding/gob"
	"io"
	"io/ioutil"
	"math"
	"os"
	"path/filepath"
	"sort"
	"strings"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/pkg/errors"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/pkg/textparse"
	"github.com/prometheus/prometheus/pkg/value"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/fileutil"
)

type Parser interface {
	// IsParsable returns nil if the given input can be handled by this parser.
	// It is up to the parser how many bytes it fetches from the reader to detect the format.
	IsParsable(io.Reader) error
	// Parse parses the input.
	Parse(io.Reader) error
}

var (
	// NoTimestampError is returned when the current entry has no timestamp associated with it.
	NoTimestampError = errors.New("expected timestamp with metric")
	// NoBlockMetaFoundError is returned when the sample currently being parsed has a
	// corresponding block in the database but, curiously enough, no metadata collected for it.
	NoBlockMetaFoundError = errors.New("no metadata found for current samples' block")
)

type blockTimestampPair struct {
	start, end int64
}

type newBlockMeta struct {
	index      int
	count      int
	mint, maxt int64
	isAligned  bool
	dir        string
	children   []*newBlockMeta
}

// contentType is the content type for the OpenMetrics parser.
// It is needed to initialize the text parser.
const contentType = "application/openmetrics-text;"

// ImportFromFile imports data from a file formatted according to the OpenMetrics format,
// converts it into block(s), and places the newly created block(s) in the
// TSDB DB directory, where they are treated like any other block.
func ImportFromFile(
	r io.Reader,
	dbPath string,
	maxSamplesInMemory int,
	maxBlockChildren int,
	logger log.Logger,
) error {
	if logger == nil {
		logger = log.NewNopLogger()
	}

	level.Debug(logger).Log("msg", "creating temp directory")
	tmpDbDir, err := ioutil.TempDir("", "importer")
	if err != nil {
		return err
	}
	defer os.RemoveAll(tmpDbDir)

	level.Debug(logger).Log("msg", "reading existing block metadata to infer time ranges")
	blockMetas, err := constructBlockMetadata(dbPath)
	if err != nil {
		return err
	}

	level.Info(logger).Log("msg", "importing input data and aligning with existing DB")
	if err = writeSamples(r, tmpDbDir, blockMetas, maxSamplesInMemory, maxBlockChildren, logger); err != nil {
		return err
	}

	level.Info(logger).Log("msg", "merging fully overlapping blocks")
	if err = mergeBlocks(tmpDbDir, blockMetas, logger); err != nil {
		return err
	}

	level.Debug(logger).Log("msg", "copying newly created blocks from temp location to actual DB location")
	if err = fileutil.CopyDirs(tmpDbDir, dbPath); err != nil {
		return err
	}

	return nil
}
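
For orientation, a minimal sketch of how this entry point is driven, mirroring the tests above; the input file name, the dbDir variable and the two limits are illustrative assumptions, not taken from the source:

	f, err := os.Open("metrics.om") // hypothetical OpenMetrics input file
	if err != nil {
		return err
	}
	defer f.Close()
	// 10000 samples per pass and 8 children per block are illustrative values;
	// a nil logger falls back to log.NewNopLogger() inside ImportFromFile.
	if err := ImportFromFile(f, dbDir, 10000, 8, nil); err != nil {
		return err
	}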

// writeSamples parses each metric sample, and writes the samples to the correspondingly
// aligned block in the given directory.
func writeSamples(
	r io.Reader,
	dir string,
	metas []*newBlockMeta,
	maxSamplesInMemory int,
	maxBlockChildren int,
	logger log.Logger,
) error {
	// The first block represents everything before the DB start.
	// The last block represents everything after the DB end.
	dbMint, dbMaxt := metas[0].maxt, metas[len(metas)-1].mint

	blocks := make([][]*tsdb.MetricSample, len(metas))
	currentPassCount := 0

	// TODO: Maybe use bufio.Reader.ReadLine instead?
	// https://stackoverflow.com/questions/21124327/how-to-read-a-text-file-line-by-line-in-go-when-some-lines-are-long-enough-to-ca
	// Use a streaming approach to avoid loading too much data at once.
	scanner := bufio.NewScanner(r)
	buf := new(bytes.Buffer)
	scanner.Split(sampleStreamer(buf))

	for scanner.Scan() {
		if currentPassCount == 0 {
			blocks = make([][]*tsdb.MetricSample, len(metas))
		}

		encSample := scanner.Bytes()
		decBuf := bytes.NewBuffer(encSample)
		sample := tsdb.MetricSample{}
		if err := gob.NewDecoder(decBuf).Decode(&sample); err != nil {
			level.Error(logger).Log("msg", "failed to decode current entry returned by file scanner", "err", err)
			return err
		}

		// Find which block this sample belongs to.
		var blockIndex int
		if len(metas) == 1 || sample.Timestamp < dbMint {
			blockIndex = 0
		} else if sample.Timestamp >= dbMaxt {
			blockIndex = len(metas) - 1
		} else {
			blockIndex = getBlockIndex(sample.Timestamp, metas)
			if blockIndex == -1 {
				return NoBlockMetaFoundError
			}
		}

		meta := metas[blockIndex]
		meta.mint = value.MinInt64(meta.mint, sample.Timestamp)
		meta.maxt = value.MaxInt64(meta.maxt, sample.Timestamp)
		meta.count++

		blocks[blockIndex] = append(blocks[blockIndex], &sample)

		currentPassCount++
		// We have enough samples to write to disk.
		if currentPassCount == maxSamplesInMemory {
			if err := flushBlocks(dir, blocks, metas, maxBlockChildren, logger); err != nil {
				return err
			}
			// Reset the current pass count.
			currentPassCount = 0
		}
	}

	if err := scanner.Err(); err != nil {
		return err
	}

	// Flush any remaining samples.
	if err := flushBlocks(dir, blocks, metas, maxBlockChildren, logger); err != nil {
		return err
	}

	return nil
}
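
The scanner tokens above are gob-encoded samples; a small sketch of the encode/decode round trip, assuming the tsdb.MetricSample type from this branch:

	var buf bytes.Buffer
	in := tsdb.MetricSample{
		Timestamp: 1000,
		Value:     1.5,
		Labels:    labels.FromStrings("__name__", "up"),
	}
	if err := gob.NewEncoder(&buf).Encode(in); err != nil {
		return err
	}
	var out tsdb.MetricSample
	if err := gob.NewDecoder(&buf).Decode(&out); err != nil {
		return err
	}
	// out now holds the same timestamp, value and labels as in.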

// flushBlocks writes the given blocks of samples to disk, and updates block metadata information.
func flushBlocks(
	dir string,
	toFlush [][]*tsdb.MetricSample,
	metas []*newBlockMeta,
	maxBlockChildren int,
	logger log.Logger,
) error {
	for blockIdx, block := range toFlush {
		// If the current block is empty, there is nothing to write.
		if len(block) == 0 {
			continue
		}
		meta := metas[blockIdx]

		// Put each sample into the appropriate block.
		bins, binTimes := binSamples(block, meta, tsdb.DefaultBlockDuration)
		for binIdx, bin := range bins {
			if len(bin) == 0 {
				continue
			}
			start, end := binTimes[binIdx].start, binTimes[binIdx].end
			path, err := tsdb.CreateBlock(bin, dir, start, end, logger)
			if err != nil {
				return err
			}
			child := &newBlockMeta{
				index:     meta.index,
				count:     0,
				mint:      start,
				maxt:      end,
				isAligned: meta.isAligned,
				dir:       path,
			}
			meta.children = append(meta.children, child)
		}
		if maxBlockChildren > 0 && len(meta.children) >= maxBlockChildren {
			if err := compactMeta(dir, meta, logger); err != nil {
				return err
			}
		}
	}
	return nil
}

// sampleStreamer returns a split function that parses an OpenMetrics-compliant
// byte stream, and returns each sample as a token in the user-provided byte buffer.
func sampleStreamer(buf *bytes.Buffer) func([]byte, bool) (int, []byte, error) {
	currentEntryInvalid := false
	return func(data []byte, atEOF bool) (int, []byte, error) {
		var err error
		advance := 0
		lineIndex := 0
		lines := strings.Split(string(data), "\n")
		parser := textparse.New(data, contentType)
		for {
			var et textparse.Entry
			if et, err = parser.Next(); err != nil {
				if !atEOF && et == textparse.EntryInvalid && !currentEntryInvalid {
					currentEntryInvalid = true
					// Fetch more data for the scanner to see if the entry is really invalid.
					return 0, nil, nil
				}
				if err == io.EOF {
					err = nil
				}
				return 0, nil, err
			}

			// Reset the invalid-entry flag, as it only meant that we had to get more data
			// into the scanner.
			if currentEntryInvalid {
				currentEntryInvalid = false
			}

			// Add 1 to account for the newline character.
			lineLength := len(lines[lineIndex]) + 1
			lineIndex++

			if et != textparse.EntrySeries {
				advance += lineLength
				continue
			}

			_, ctime, cvalue := parser.Series()
			if ctime == nil {
				return 0, nil, NoTimestampError
			}
			// The OpenMetrics parser multiplies timestamps by 1000 - undo that.
			ctimeCorrected := *ctime / 1000

			var clabels labels.Labels
			_ = parser.Metric(&clabels)

			sample := tsdb.MetricSample{
				Timestamp: ctimeCorrected,
				Value:     cvalue,
				Labels:    labels.FromMap(clabels.Map()),
			}

			buf.Reset()
			enc := gob.NewEncoder(buf)
			if err = enc.Encode(sample); err != nil {
				return 0, nil, err
			}

			advance += lineLength
			return advance, buf.Bytes(), nil
		}
	}
}
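
The (0, nil, nil) return above is the bufio.SplitFunc convention for requesting more data; a toy split function showing the full contract:

	// splitLines is a toy bufio.SplitFunc: it emits one line per token.
	splitLines := func(data []byte, atEOF bool) (int, []byte, error) {
		if i := bytes.IndexByte(data, '\n'); i >= 0 {
			return i + 1, data[:i], nil // consume the newline, emit the line
		}
		if atEOF && len(data) > 0 {
			return len(data), data, nil // final, unterminated line
		}
		return 0, nil, nil // request more data from the scanner
	}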

// makeRange returns a series of block times between start and stop,
// with step divisions.
func makeRange(start, stop, step int64) []blockTimestampPair {
	if step <= 0 || stop < start {
		return []blockTimestampPair{}
	}
	r := make([]blockTimestampPair, 0)
	// In case we only have samples with the same timestamp.
	if start == stop {
		r = append(r, blockTimestampPair{
			start: start,
			end:   stop + 1,
		})
		return r
	}
	for s := start; s < stop; s += step {
		pair := blockTimestampPair{
			start: s,
		}
		if (s + step) >= stop {
			pair.end = stop + 1
		} else {
			pair.end = s + step
		}
		r = append(r, pair)
	}
	return r
}
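
A worked example of the ranges this helper yields:

	// makeRange(0, 25, 10) produces three half-open ranges; the last one is
	// widened to stop+1 so the final sample is still covered:
	//   [0, 10), [10, 20), [20, 26)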

// mergeBlocks looks for blocks that have overlapping time intervals, and compacts them.
func mergeBlocks(dest string, metas []*newBlockMeta, logger log.Logger) error {
	for _, m := range metas {
		// If there are no children, there is nothing to merge.
		if len(m.children) == 0 {
			continue
		}
		if err := compactMeta(dest, m, logger); err != nil {
			return err
		}
	}
	return nil
}

// binBlocks groups blocks that have fully intersecting intervals of the given durations.
// It iterates through the given durations, creating progressively larger blocks, and once
// it reaches the last given duration, it divvies all the remaining samples into blocks of
// that duration. Input blocks are assumed to be sorted by time.
func binBlocks(blocks []*newBlockMeta, durations []int64) [][]*newBlockMeta {
	bins := make([][]*newBlockMeta, 0)
	if len(blocks) == 0 || len(durations) == 0 {
		return bins
	}

	bin := make([]*newBlockMeta, 0)

	start := blocks[0].mint
	blockIdx := 0
Outer:
	for dIdx, duration := range durations {
		for bIdx, block := range blocks[blockIdx:] {
			blockIdx = bIdx
			if block.maxt-block.mint >= duration {
				if block.mint <= start {
					bin = append(bin, block)
					bins = append(bins, bin)
				} else {
					bins = append(bins, bin)
					bins = append(bins, []*newBlockMeta{block})
				}
				bin = []*newBlockMeta{}
				start = block.maxt
				if dIdx == len(durations)-1 {
					continue
				} else {
					blockIdx++
					continue Outer
				}
			}
			if block.mint-start < duration && block.maxt-start < duration {
				// If the current block is within the duration window, place it in the current bin.
				bin = append(bin, block)
				blockIdx++
			} else {
				// Else create a new bin, starting with this block.
				bins = append(bins, bin)
				bin = []*newBlockMeta{block}
				start = block.mint
				if dIdx < len(durations)-1 {
					blockIdx++
					continue Outer
				}
			}
		}
	}
	bins = append(bins, bin)
	return bins
}
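
To make the grouping concrete, a traced example with the single default duration this importer uses:

	// With durations = []int64{10} and sorted blocks spanning [0,3], [4,7] and [12,15],
	// the first two blocks fall inside the window starting at 0 and share a bin, while
	// the third starts a fresh bin: the result is [[b1, b2], [b3]].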

// binSamples divvies the samples into bins corresponding to the given block metadata.
// If an aligned block is given, a single bin is created; otherwise, the samples are
// divided into blocks of the given duration.
// It returns the binned samples, and the time limits for each bin.
func binSamples(samples []*tsdb.MetricSample, meta *newBlockMeta, duration int64) ([][]*tsdb.MetricSample, []blockTimestampPair) {
	findBlock := func(ts int64, blocks []blockTimestampPair) int {
		for idx, block := range blocks {
			if block.start <= ts && ts < block.end {
				return idx
			}
		}
		return -1
	}
	bins := make([][]*tsdb.MetricSample, 0)
	times := make([]blockTimestampPair, 0)
	if len(samples) == 0 {
		return bins, times
	}
	if meta.isAligned {
		bins = append(bins, samples)
		times = []blockTimestampPair{{start: meta.mint, end: meta.maxt}}
	} else {
		timeTranches := makeRange(meta.mint, meta.maxt, duration)
		bins = make([][]*tsdb.MetricSample, len(timeTranches))
		binIdx := -1
		for _, sample := range samples {
			ts := sample.Timestamp
			if binIdx == -1 {
				binIdx = findBlock(ts, timeTranches)
			}
			block := timeTranches[binIdx]
			if block.start <= ts && ts < block.end {
				bins[binIdx] = append(bins[binIdx], sample)
			} else {
				binIdx = findBlock(ts, timeTranches)
				bins[binIdx] = append(bins[binIdx], sample)
			}
		}
		times = timeTranches
	}
	return bins, times
}
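
Tying makeRange and binSamples together, a small worked example:

	// For an unaligned meta spanning [0, 25] with duration 10 and samples at
	// t=3, t=12 and t=24, the tranches are [0,10), [10,20) and [20,26), and
	// the samples land in bins [[t=3], [t=12], [t=24]] respectively.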

// constructBlockMetadata gives us the new blocks' metadata.
func constructBlockMetadata(path string) ([]*newBlockMeta, error) {
	dbMint, dbMaxt := int64(math.MinInt64), int64(math.MaxInt64)

	// If we try to open a regular RW handle on an active TSDB instance,
	// it will fail. Hence, we open a RO handle.
	db, err := tsdb.OpenDBReadOnly(path, nil)
	if err != nil {
		return nil, err
	}
	defer db.Close()

	blocks, err := db.Blocks()
	if err != nil {
		return nil, err
	}

	metas := []*newBlockMeta{
		{
			index:     0,
			count:     0,
			mint:      math.MaxInt64,
			maxt:      dbMint,
			isAligned: false,
		},
	}

	for idx, block := range blocks {
		start, end := block.Meta().MinTime, block.Meta().MaxTime
		metas = append(metas, &newBlockMeta{
			index:     idx + 1,
			count:     0,
			mint:      start,
			maxt:      end,
			isAligned: true,
		})
		if idx == 0 {
			dbMint, dbMaxt = start, end
		} else {
			dbMint = value.MinInt64(dbMint, start)
			dbMaxt = value.MaxInt64(dbMaxt, end)
		}
	}
	// If there are existing blocks in the import location,
	// we need to add a block for samples that fall beyond the DB maxt.
	if len(metas) > 1 {
		// Update the first block with the right dbMint as its maxt.
		metas[0].maxt = dbMint
		metas = append(metas, &newBlockMeta{
			index:     len(metas),
			count:     0,
			mint:      dbMaxt,
			maxt:      math.MinInt64,
			isAligned: false,
		})
	}
	return metas, nil
}
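
A concrete example of the sentinel metas this produces:

	// With existing blocks covering [100,200) and [200,300), four metas come back:
	// a sentinel for everything before the DB (maxt=100), the two aligned block
	// metas, and a sentinel for everything at or after the DB end (mint=300).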

// getBlockIndex returns the index of the block the given timestamp corresponds to.
// The metas are assumed to be sorted by time, in ascending order.
func getBlockIndex(current int64, metas []*newBlockMeta) int {
	for idx, m := range metas {
		if current < m.maxt {
			return idx
		}
	}
	return -1
}
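
For example:

	// For metas with maxt values 10, 20 and 30, getBlockIndex(15, metas) returns 1:
	// 15 is not below the first meta's maxt (10), but is below the second's (20).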

// compactBlocks compacts all the given blocks and places them in the destination directory.
func compactBlocks(dest string, blocks []*newBlockMeta, logger log.Logger) (string, error) {
	dirs := make([]string, 0)
	for _, meta := range blocks {
		dirs = append(dirs, meta.dir)
	}
	path, err := compactDirs(dest, dirs, logger)
	if err != nil {
		return "", err
	}
	for _, meta := range blocks {
		// Remove the existing block directory as it is no longer needed.
		if err = os.RemoveAll(meta.dir); err != nil {
			return "", err
		}
		// Update the child with the new block's path.
		meta.dir = path
	}
	return path, err
}

// compactMeta compacts the children of a given block, dividing them into the
// Prometheus-recommended block sizes, if it has to.
func compactMeta(dest string, m *newBlockMeta, logger log.Logger) error {
	children := make([]*newBlockMeta, 0)
	if m.isAligned {
		path, err := compactBlocks(dest, m.children, logger)
		if err != nil {
			return err
		}
		m.dir = path
	} else {
		// Sort the blocks to ensure we bin properly.
		sortBlocks(m.children)
		binned := binBlocks(m.children, []int64{tsdb.DefaultBlockDuration})
		for _, bin := range binned {
			if len(bin) <= 1 {
				children = append(children, bin...)
				continue
			}
			path, err := compactBlocks(dest, bin, logger)
			if err != nil {
				return err
			}
			children = append(children, &newBlockMeta{
				index:     m.index,
				count:     0,
				mint:      bin[0].mint,
				maxt:      bin[len(bin)-1].maxt,
				isAligned: m.isAligned,
				dir:       path,
			})
		}
	}
	m.children = children
	return nil
}

// compactDirs compacts the block directories and places the result in the destination directory.
func compactDirs(dest string, dirs []string, logger log.Logger) (string, error) {
	path := ""
	compactor, err := tsdb.NewLeveledCompactor(context.Background(), nil, logger, []int64{tsdb.DefaultBlockDuration}, nil)
	if err != nil {
		return path, err
	}
	ulid, err := compactor.Compact(dest, dirs, nil)
	if err != nil {
		return path, err
	}
	path = filepath.Join(dest, ulid.String())
	return path, nil
}

// sortBlocks sorts the block metadata according to the time limits.
func sortBlocks(blocks []*newBlockMeta) {
	sort.Slice(blocks, func(x, y int) bool {
		bx, by := blocks[x], blocks[y]
		if bx.mint != by.mint {
			return bx.mint < by.mint
		}
		return bx.maxt < by.maxt
	})
}
@@ -1,18 +0,0 @@
package importer

import (
	"io"

	"github.com/prometheus/prometheus/pkg/textparse"
)

type OpenMetricsParser struct {
}

func (o OpenMetricsParser) IsParsable(reader io.Reader) error {
	panic("implement me")
}

func (o OpenMetricsParser) Parse(reader io.Reader) textparse.Parser {
	panic("implement me")
}
tsdb/importer/openmetrics/openmetrics.go (new file, 57 lines)
@@ -0,0 +1,57 @@
package openmetrics

import (
	"bufio"
	"io"

	"github.com/prometheus/prometheus/pkg/textparse"
)

// contentType is the content type for the OpenMetrics parser.
// It is needed to initialize the text parser.
const contentType = "application/openmetrics-text;"

type Parser struct {
	textparse.Parser

	s *bufio.Scanner
}

// NewParser is a tiny layer between textparse.Parser and importer.Parser which works on an io.Reader.
// TODO(bwplotka): This is hacky; Parser should allow passing a reader from scratch.
func NewParser(r io.Reader) textparse.Parser {
	// TODO(dipack95): Maybe use bufio.Reader.ReadLine instead?
	// https://stackoverflow.com/questions/21124327/how-to-read-a-text-file-line-by-line-in-go-when-some-lines-are-long-enough-to-ca
	return &Parser{s: bufio.NewScanner(r)}
}

// Next advances the parser to the next sample. It returns io.EOF if no
// more samples were read.
// TODO(bwplotka): Rough implementation, not tested, please help dipack95! (:
func (p *Parser) Next() (textparse.Entry, error) {
	for p.s.Scan() {
		// TODO(bwplotka): Assuming input comes line by line. If not, refetch more lines as in the previous version.
		line := p.s.Bytes()

		p.Parser = textparse.New(line, contentType)
		if et, err := p.Parser.Next(); err != io.EOF {
			return et, err
		}
		// EOF from the parser; continue scanning.
	}
	if err := p.s.Err(); err != nil {
		return textparse.EntryInvalid, err
	}
	return textparse.EntryInvalid, io.EOF
}

// Series returns the bytes of the series, the timestamp if set, and the value
// of the current sample. Timestamps are in milliseconds.
func (p *Parser) Series() ([]byte, *int64, float64) {
	b, ts, v := p.Parser.Series()
	if ts != nil {
		// The OpenMetrics parser multiplies timestamps by 1000; undo that.
		*ts = *ts / 1000
	}
	return b, ts, v
}
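
A minimal sketch of driving the new parser; the input line is illustrative, and only methods shown here or declared by textparse.Parser are used (assumes the io, strings and textparse imports):

	p := NewParser(strings.NewReader("a_total 1 1595734000\n"))
	for {
		et, err := p.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		if et == textparse.EntrySeries {
			_, ts, v := p.Series() // ts is nil if the sample carries no timestamp
			_, _ = ts, v
		}
	}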