From 2911e78fcebd8bab3db00af23a08894629bddcdb Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Wed, 15 Jul 2020 00:39:00 +0100 Subject: [PATCH] Refactored TSDB import and added CSV file support. Signed-off-by: Bartlomiej Plotka --- pkg/textparse/interface.go | 47 +- pkg/textparse/openmetricsparse.go | 21 +- pkg/textparse/promparse.go | 15 +- tsdb/cmd/tsdb/main.go | 74 ++- tsdb/head.go | 26 +- tsdb/importer/blocks/multi.go | 107 ++++ tsdb/importer/blocks/writer.go | 116 +++++ tsdb/importer/csv.go | 1 - tsdb/importer/csv/csv.go | 258 ++++++++++ tsdb/importer/csv/csv_test.go | 85 ++++ tsdb/importer/csv_test.go | 24 - tsdb/importer/import.go | 86 ++++ tsdb/importer/import_test.go | 58 +-- tsdb/importer/importer.go | 601 ----------------------- tsdb/importer/openmetrics.go | 18 - tsdb/importer/openmetrics/openmetrics.go | 57 +++ 16 files changed, 846 insertions(+), 748 deletions(-) create mode 100644 tsdb/importer/blocks/multi.go create mode 100644 tsdb/importer/blocks/writer.go delete mode 100644 tsdb/importer/csv.go create mode 100644 tsdb/importer/csv/csv.go create mode 100644 tsdb/importer/csv/csv_test.go delete mode 100644 tsdb/importer/csv_test.go create mode 100644 tsdb/importer/import.go delete mode 100644 tsdb/importer/importer.go delete mode 100644 tsdb/importer/openmetrics.go create mode 100644 tsdb/importer/openmetrics/openmetrics.go diff --git a/pkg/textparse/interface.go b/pkg/textparse/interface.go index cfcd05e210..4719232750 100644 --- a/pkg/textparse/interface.go +++ b/pkg/textparse/interface.go @@ -16,6 +16,7 @@ package textparse import ( "mime" + "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/exemplar" "github.com/prometheus/prometheus/pkg/labels" ) @@ -55,8 +56,8 @@ type Parser interface { // exemplar. It returns if an exemplar exists or not. Exemplar(l *exemplar.Exemplar) bool - // Next advances the parser to the next sample. It returns false if no - // more samples were read or an error occurred. + // Next advances the parser to the next sample. It returns io.EOF if no + // more samples were read. 
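+	// Callers should stop on io.EOF and treat any other error as a parse failure.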
Next() (Entry, error) } @@ -94,3 +95,45 @@ const ( MetricTypeStateset = "stateset" MetricTypeUnknown = "unknown" ) + +func (m *MetricType) ParseForOpenMetrics(mtyp string) error { + switch mtyp { + case "counter": + *m = MetricTypeCounter + case "gauge": + *m = MetricTypeGauge + case "histogram": + *m = MetricTypeHistogram + case "gaugehistogram": + *m = MetricTypeGaugeHistogram + case "summary": + *m = MetricTypeSummary + case "info": + *m = MetricTypeInfo + case "stateset": + *m = MetricTypeStateset + case "unknown": + *m = MetricTypeUnknown + default: + return errors.Errorf("invalid metric type %q", mtyp) + } + return nil +} + +func (m *MetricType) ParseForProm(mtyp string) error { + switch mtyp { + case "counter": + *m = MetricTypeCounter + case "gauge": + *m = MetricTypeGauge + case "histogram": + *m = MetricTypeHistogram + case "summary": + *m = MetricTypeSummary + case "unknown": + *m = MetricTypeUnknown + default: + return errors.Errorf("invalid metric type %q", mtyp) + } + return nil +} diff --git a/pkg/textparse/openmetricsparse.go b/pkg/textparse/openmetricsparse.go index 6cfdd8391f..eeb7a4add4 100644 --- a/pkg/textparse/openmetricsparse.go +++ b/pkg/textparse/openmetricsparse.go @@ -259,25 +259,8 @@ func (p *OpenMetricsParser) Next() (Entry, error) { } switch t { case tType: - switch s := yoloString(p.text); s { - case "counter": - p.mtype = MetricTypeCounter - case "gauge": - p.mtype = MetricTypeGauge - case "histogram": - p.mtype = MetricTypeHistogram - case "gaugehistogram": - p.mtype = MetricTypeGaugeHistogram - case "summary": - p.mtype = MetricTypeSummary - case "info": - p.mtype = MetricTypeInfo - case "stateset": - p.mtype = MetricTypeStateset - case "unknown": - p.mtype = MetricTypeUnknown - default: - return EntryInvalid, errors.Errorf("invalid metric type %q", s) + if err := p.mtype.ParseForOpenMetrics(yoloString(p.text)); err != nil { + return EntryInvalid, err } case tHelp: if !utf8.Valid(p.text) { diff --git a/pkg/textparse/promparse.go b/pkg/textparse/promparse.go index 3c885af0ba..3731b1d22c 100644 --- a/pkg/textparse/promparse.go +++ b/pkg/textparse/promparse.go @@ -289,19 +289,8 @@ func (p *PromParser) Next() (Entry, error) { } switch t { case tType: - switch s := yoloString(p.text); s { - case "counter": - p.mtype = MetricTypeCounter - case "gauge": - p.mtype = MetricTypeGauge - case "histogram": - p.mtype = MetricTypeHistogram - case "summary": - p.mtype = MetricTypeSummary - case "untyped": - p.mtype = MetricTypeUnknown - default: - return EntryInvalid, errors.Errorf("invalid metric type %q", s) + if err := p.mtype.ParseForProm(yoloString(p.text)); err != nil { + return EntryInvalid, err } case tHelp: if !utf8.Valid(p.text) { diff --git a/tsdb/cmd/tsdb/main.go b/tsdb/cmd/tsdb/main.go index 93a60afcda..95ab6fea0d 100644 --- a/tsdb/cmd/tsdb/main.go +++ b/tsdb/cmd/tsdb/main.go @@ -34,11 +34,15 @@ import ( "github.com/go-kit/kit/log" "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/textparse" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunks" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/tsdb/importer" + "github.com/prometheus/prometheus/tsdb/importer/blocks" + "github.com/prometheus/prometheus/tsdb/importer/csv" + "github.com/prometheus/prometheus/tsdb/importer/openmetrics" "gopkg.in/alecthomas/kingpin.v2" ) @@ -53,40 +57,43 @@ func execute() (err error) { var ( defaultDBPath 
= filepath.Join("benchout", "storage") - cli = kingpin.New(filepath.Base(os.Args[0]), "CLI tool for tsdb") + cli = kingpin.New(filepath.Base(os.Args[0]), "CLI tool for tsdb") - benchCmd = cli.Command("bench", "run benchmarks") - benchWriteCmd = benchCmd.Command("write", "run a write performance benchmark") - benchWriteOutPath = benchWriteCmd.Flag("out", "set the output path").Default("benchout").String() - benchWriteNumMetrics = benchWriteCmd.Flag("metrics", "number of metrics to read").Default("10000").Int() - benchSamplesFile = benchWriteCmd.Arg("file", "input file with samples data, default is ("+filepath.Join("..", "..", "testdata", "20kseries.json")+")").Default(filepath.Join("..", "..", "testdata", "20kseries.json")).String() + benchCmd = cli.Command("bench", "run benchmarks") + benchWriteCmd = benchCmd.Command("write", "run a write performance benchmark") + benchWriteOutPath = benchWriteCmd.Flag("out", "set the output path").Default("benchout").String() + benchWriteNumMetrics = benchWriteCmd.Flag("metrics", "number of metrics to read").Default("10000").Int() + benchSamplesFile = benchWriteCmd.Arg("file", "input file with samples data, default is ("+filepath.Join("..", "..", "testdata", "20kseries.json")+")").Default(filepath.Join("..", "..", "testdata", "20kseries.json")).String() - listCmd = cli.Command("ls", "list db blocks") - listCmdHumanReadable = listCmd.Flag("human-readable", "print human readable values").Short('h').Bool() - listPath = listCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String() + listCmd = cli.Command("ls", "list db blocks") + listCmdHumanReadable = listCmd.Flag("human-readable", "print human readable values").Short('h').Bool() + listPath = listCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String() - analyzeCmd = cli.Command("analyze", "analyze churn, label pair cardinality.") - analyzePath = analyzeCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String() - analyzeBlockID = analyzeCmd.Arg("block id", "block to analyze (default is the last block)").String() - analyzeLimit = analyzeCmd.Flag("limit", "how many items to show in each list").Default("20").Int() + analyzeCmd = cli.Command("analyze", "analyze churn, label pair cardinality.") + analyzePath = analyzeCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String() + analyzeBlockID = analyzeCmd.Arg("block id", "block to analyze (default is the last block)").String() + analyzeLimit = analyzeCmd.Flag("limit", "how many items to show in each list").Default("20").Int() - dumpCmd = cli.Command("dump", "dump samples from a TSDB") - dumpPath = dumpCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String() - dumpMinTime = dumpCmd.Flag("min-time", "minimum timestamp to dump").Default(strconv.FormatInt(math.MinInt64, 10)).Int64() - dumpMaxTime = dumpCmd.Flag("max-time", "maximum timestamp to dump").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64() + dumpCmd = cli.Command("dump", "dump samples from a TSDB") + dumpPath = dumpCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String() + dumpMinTime = dumpCmd.Flag("min-time", "minimum timestamp to dump").Default(strconv.FormatInt(math.MinInt64, 10)).Int64() + dumpMaxTime = dumpCmd.Flag("max-time", "maximum timestamp to dump").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64() - importCmd = cli.Command("import", 
fmt.Sprintf("import samples from input and produce TSDB block. Currently supported input formats: %v. Please refer to the storage docs for more details.", importer.) - importDbPath = importCmd.Flag("output", "output directory for generated block").Default(".").String() - importFilePath = importCmd.Flag("input-file", "disables reading from input and using file to import samples from. If empty input is required").String() - importCSVDelimit = importCmd.Flag("csv-delimiter", "force input parsing to CSV format with given delimiter. Default is coma delimiter.").String() - importMaxSamplesInMemory = importCmd.Flag("max-samples-in-mem", "maximum number of samples to process at once.").Default("10000").Int() - importMaxBlockChildren = importCmd.Flag("max-block-children", "maximum number of children a block can have at a given time.").Default("20").Int() + importCmd = cli.Command("import", "[Experimental] import samples from input and produce TSDB block. Please refer to the storage docs for more details.") + importDbPath = importCmd.Flag("output", "output directory for generated block").Default(".").String() + importFilePath = importCmd.Flag("input-file", "disables reading from input and using file to import samples from. If empty input is required").String() + importBlockSize = importCmd.Flag("block-size", "The maximum block size. The actual block timestamps will be aligned with Prometheus time ranges.").Default("2h").Hidden().Duration() + + omImportCmd = importCmd.Command("openmetrics", "import samples from OpenMetrics input and produce TSDB block. Please refer to the storage docs for more details.") + + csvImportCmd = importCmd.Command("csv", "import samples from CSV input and produce TSDB block. Please refer to the storage docs for more details.") + csvImportDelimiter = csvImportCmd.Flag("delimiter", "CSV single character for fields delimiting").Default(",").String() ) logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) var merr tsdb_errors.MultiError - switch kingpin.MustParse(cli.Parse(os.Args[1:])) { + switch cmd := kingpin.MustParse(cli.Parse(os.Args[1:])); cmd { case benchWriteCmd.FullCommand(): wb := &writeBenchmark{ outPath: *benchWriteOutPath, @@ -150,7 +157,7 @@ func execute() (err error) { err = merr.Err() }() return dumpSamples(db, *dumpMinTime, *dumpMaxTime) - case importCmd.FullCommand(): + case omImportCmd.FullCommand(), csvImportCmd.FullCommand(): input := os.Stdin if importFilePath != nil { input, err = os.Open(*importFilePath) @@ -163,11 +170,26 @@ func execute() (err error) { err = merr.Err() }() } - return importer.ImportFromFile(logger, input, *importDbPath, *importMaxSamplesInMemory, *importMaxBlockChildren) + + var p textparse.Parser + if cmd == omImportCmd.FullCommand() { + p = openmetrics.NewParser(input) + } else { + if len(*csvImportDelimiter) != 1 { + return errors.Errorf("wrong format of delimiter flag, expected single character, got %q", *csvImportDelimiter) + } + + p = csv.NewParser(input, []rune(*csvImportDelimiter)[0]) + } + return importer.Import(logger, p, blocks.NewMultiWriter(logger, *importDbPath, durToMillis(*importBlockSize))) } return nil } +func durToMillis(t time.Duration) int64 { + return int64(t.Seconds() * 1000) +} + type writeBenchmark struct { outPath string samplesFile string diff --git a/tsdb/head.go b/tsdb/head.go index ce3e60b927..cb92159607 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -1644,6 +1644,19 @@ func (s *stripeSeries) getByHash(hash uint64, lset labels.Labels) *memSeries { return series } +type sample struct { + t int64 + v 
float64 +} + +func (s sample) T() int64 { + return s.t +} + +func (s sample) V() float64 { + return s.v +} + func (s *stripeSeries) getOrSet(hash uint64, series *memSeries) (*memSeries, bool) { i := hash & uint64(s.size-1) @@ -1665,19 +1678,6 @@ func (s *stripeSeries) getOrSet(hash uint64, series *memSeries) (*memSeries, boo return series, true } -type sample struct { - t int64 - v float64 -} - -func (s sample) T() int64 { - return s.t -} - -func (s sample) V() float64 { - return s.v -} - // memSeries is the in-memory representation of a series. None of its methods // are goroutine safe and it is the caller's responsibility to lock it. type memSeries struct { diff --git a/tsdb/importer/blocks/multi.go b/tsdb/importer/blocks/multi.go new file mode 100644 index 0000000000..b9719d1a24 --- /dev/null +++ b/tsdb/importer/blocks/multi.go @@ -0,0 +1,107 @@ +package blocks + +import ( + "github.com/go-kit/kit/log" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/prometheus/prometheus/tsdb/index" +) + +type errAppender struct{ err error } + +func (a errAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) { return 0, a.err } +func (a errAppender) AddFast(ref uint64, t int64, v float64) error { return a.err } +func (a errAppender) Commit() error { return a.err } +func (a errAppender) Rollback() error { return a.err } + +func rangeForTimestamp(t int64, width int64) (maxt int64) { + return (t/width)*width + width +} + +type MultiWriter struct { + blocks map[index.Range]Writer + activeAppenders map[index.Range]storage.Appender + + logger log.Logger + dir string + // TODO(bwplotka): Allow more complex compaction levels. + sizeMillis int64 +} + +func NewMultiWriter(logger log.Logger, dir string, sizeMillis int64) *MultiWriter { + return &MultiWriter{ + logger: logger, + dir: dir, + sizeMillis: sizeMillis, + blocks: map[index.Range]Writer{}, + activeAppenders: map[index.Range]storage.Appender{}, + } +} + +// Appender is not thread-safe. Returned Appender is not thread-save as well. +// TODO(bwplotka): Consider making it thread safe. 
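+// The returned Appender routes each sample to a TSDBWriter for its time range:
+// e.g. with a 2h (7200000 ms) block size, a sample at t=1594885440000 falls into
+// the range [1594879200000, 1594886400000) computed by rangeForTimestamp
+// (illustrative values only).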
+func (w *MultiWriter) Appender() storage.Appender { return w }
+
+func (w *MultiWriter) getOrCreate(t int64) storage.Appender {
+	maxt := rangeForTimestamp(t, w.sizeMillis)
+	hash := index.Range{Start: maxt - w.sizeMillis, End: maxt}
+	if a, ok := w.activeAppenders[hash]; ok {
+		return a
+	}
+
+	nw, err := NewTSDBWriter(w.logger, w.dir)
+	if err != nil {
+		return errAppender{err: errors.Wrap(err, "new tsdb writer")}
+	}
+
+	w.blocks[hash] = nw
+	w.activeAppenders[hash] = nw.Appender()
+	return w.activeAppenders[hash]
+}
+
+func (w *MultiWriter) Add(l labels.Labels, t int64, v float64) (uint64, error) {
+	return w.getOrCreate(t).Add(l, t, v)
+}
+
+func (w *MultiWriter) AddFast(ref uint64, t int64, v float64) error {
+	return w.getOrCreate(t).AddFast(ref, t, v)
+}
+
+func (w *MultiWriter) Commit() error {
+	var merr tsdb_errors.MultiError
+	for _, a := range w.activeAppenders {
+		merr.Add(a.Commit())
+	}
+	return merr.Err()
+}
+
+func (w *MultiWriter) Rollback() error {
+	var merr tsdb_errors.MultiError
+	for _, a := range w.activeAppenders {
+		merr.Add(a.Rollback())
+	}
+	return merr.Err()
+}
+
+func (w *MultiWriter) Flush() ([]ulid.ULID, error) {
+	ids := make([]ulid.ULID, 0, len(w.blocks))
+	for _, b := range w.blocks {
+		id, err := b.Flush()
+		if err != nil {
+			return nil, err
+		}
+		ids = append(ids, id...)
+	}
+	return ids, nil
+}
+
+func (w *MultiWriter) Close() error {
+	var merr tsdb_errors.MultiError
+	for _, b := range w.blocks {
+		merr.Add(b.Close())
+	}
+	return merr.Err()
+}
diff --git a/tsdb/importer/blocks/writer.go b/tsdb/importer/blocks/writer.go
new file mode 100644
index 0000000000..df381afd66
--- /dev/null
+++ b/tsdb/importer/blocks/writer.go
@@ -0,0 +1,116 @@
+package blocks
+
+import (
+	"context"
+	"math"
+	"time"
+
+	"github.com/oklog/ulid"
+	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/tsdb"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/pkg/errors"
+	"github.com/prometheus/prometheus/pkg/timestamp"
+	"github.com/prometheus/prometheus/tsdb/chunkenc"
+)
+
+// Writer is an interface for writing time series into Prometheus blocks.
+type Writer interface {
+	storage.Appendable
+
+	// Flush writes current data to disk.
+	// The block or blocks will contain the values accumulated via the Appender.
+	Flush() ([]ulid.ULID, error)
+
+	// Close releases all resources. No appends are allowed after closing the writer.
+	Close() error
+}
+
+var _ Writer = &TSDBWriter{}
+
+// TSDBWriter is a block writer that allows appending and flushing series to disk.
+type TSDBWriter struct {
+	logger log.Logger
+	dir    string
+
+	head *tsdb.Head
+}
+
+func durToMillis(t time.Duration) int64 {
+	return int64(t.Seconds() * 1000)
+}
+
+// NewTSDBWriter creates a new block writer.
+//
+// The returned writer accumulates all series in memory until `Flush` is called.
+//
+// Note that the writer will not check if the target directory exists or
+// contains anything at all. It is the caller's responsibility to
+// ensure that the resulting blocks do not overlap etc.
+// Writer ensures the block flush is atomic (via rename).
+func NewTSDBWriter(logger log.Logger, dir string) (*TSDBWriter, error) {
+	res := &TSDBWriter{
+		logger: logger,
+		dir:    dir,
+	}
+	return res, res.initHead()
+}
+
+// initHead creates and initialises a new head.
+func (w *TSDBWriter) initHead() error {
+	logger := w.logger
+
+	// Keep Registerer and WAL nil as we don't use them.
+	// Use a huge chunkRange; it has to be at least as large as the expected block size.
+	// Since we don't have info about the block size here, set it to a large number.
+	h, err := tsdb.NewHead(nil, logger, nil, durToMillis(9999*time.Hour), tsdb.DefaultStripeSize)
+	if err != nil {
+		return errors.Wrap(err, "tsdb.NewHead")
+	}
+
+	w.head = h
+	return w.head.Init(math.MinInt64)
+}
+
+// Appender is not thread-safe. The returned Appender, however, is thread-safe.
+func (w *TSDBWriter) Appender() storage.Appender {
+	return w.head.Appender()
+}
+
+// Flush implements the Writer interface. This is where the actual block writing
+// happens. After Flush completes, no further writes can be done.
+func (w *TSDBWriter) Flush() ([]ulid.ULID, error) {
+	seriesCount := w.head.NumSeries()
+	if w.head.NumSeries() == 0 {
+		return nil, errors.New("no series appended; aborting.")
+	}
+
+	mint := w.head.MinTime()
+	maxt := w.head.MaxTime()
+	level.Info(w.logger).Log("msg", "flushing", "series_count", seriesCount, "mint", timestamp.Time(mint), "maxt", timestamp.Time(maxt))
+
+	// Flush head to disk as a block.
+	compactor, err := tsdb.NewLeveledCompactor(
+		context.Background(),
+		nil,
+		w.logger,
+		[]int64{durToMillis(2 * time.Hour)}, // Does not matter, used only for planning.
+		chunkenc.NewPool())
+	if err != nil {
+		return nil, errors.Wrap(err, "create leveled compactor")
+	}
+	id, err := compactor.Write(w.dir, w.head, mint, maxt+1, nil)
+	if err != nil {
+		return nil, errors.Wrap(err, "compactor write")
+	}
+	if err := w.head.Truncate(maxt + 1); err != nil {
+		return nil, errors.Wrap(err, "truncate head")
+	}
+	return []ulid.ULID{id}, nil
+}
+
+func (w *TSDBWriter) Close() error {
+	return w.head.Close()
+}
diff --git a/tsdb/importer/csv.go b/tsdb/importer/csv.go
deleted file mode 100644
index ead7ecac42..0000000000
--- a/tsdb/importer/csv.go
+++ /dev/null
@@ -1 +0,0 @@
-package importer
diff --git a/tsdb/importer/csv/csv.go b/tsdb/importer/csv/csv.go
new file mode 100644
index 0000000000..e3a5b7e9f3
--- /dev/null
+++ b/tsdb/importer/csv/csv.go
@@ -0,0 +1,258 @@
+package csv
+
+import (
+	"encoding/csv"
+	"fmt"
+	"io"
+	"sort"
+	"strconv"
+	"strings"
+	"unsafe"
+
+	"github.com/pkg/errors"
+	"github.com/prometheus/prometheus/pkg/exemplar"
+	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/pkg/textparse"
+)
+
+type HeaderType string
+
+const (
+	// TODO(bwplotka): Consider accepting different ones like `values` or `timestamps` and pairs of value - timestamp, similar to label name - value.
+	Value       HeaderType = "value"
+	TimestampMs HeaderType = "timestamp_ms" // Milliseconds since the Unix epoch (1970-01-01 00:00:00 UTC).
+	MetricName  HeaderType = "metric_name"  // Value of the "__name__" label.
+	LabelName   HeaderType = "label_name"   // If both labels and metric_name are specified, they are merged. This header can repeat, but a label_value column is expected right after it.
+	LabelValue  HeaderType = "label_value"  // If both labels and metric_name are specified, they are merged. This header can repeat, but a label_name column is expected right before it.
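+	// For illustration (assuming the default ',' delimiter), an input such as:
+	//
+	//   metric_name,label_name,label_value,timestamp_ms,value
+	//   http_requests_total,code,200,1594885435000,1027
+	//
+	// parses into the series {__name__="http_requests_total", code="200"} with a
+	// single sample (t=1594885435000 ms, v=1027). The data here is made up.
+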
+ Help HeaderType = "help" + Type HeaderType = "type" + Unit HeaderType = "unit" + ExemplarValue HeaderType = "exemplar_value" + ExemplarTimestamp HeaderType = "exemplar_timestamp" +) + +type recordEntry struct { + labels labels.Labels + help, unit []byte + typ textparse.MetricType + value float64 + timestamp int64 + exemplar *exemplar.Exemplar +} + +type Parser struct { + cr *csv.Reader + + mapping []HeaderType + record recordEntry +} + +// NewParser returns new CSV parsers that is able to parse input in the format described in RFC 4180 and expose +// as textparse.Parser. Read https://golang.org/pkg/encoding/csv/ for more details about parsing CSV. +func NewParser(r io.Reader, delimiter rune) textparse.Parser { + cr := csv.NewReader(r) + cr.ReuseRecord = true + cr.Comma = delimiter + return &Parser{cr: cr} +} + +// Next advances the parser to the next sample. It returns io.EOF if no +// more samples were read. +func (p *Parser) Next() (textparse.Entry, error) { + for { + record, err := p.cr.Read() + if err != nil { + // io.EOF is returned on the end, and io.EOF is also controlling textparse.Parser.Next + return textparse.EntryInvalid, err + } + + if p.mapping == nil { + p.mapping, err = parseHeaders(record) + if err != nil { + return textparse.EntryInvalid, err + } + continue + } + + if len(record) != len(p.mapping) { + return textparse.EntryInvalid, errors.Errorf("got different number of fields than defined by headers. Got %v, expected %v", len(record), len(p.mapping)) + } + + r := recordEntry{} + var l labels.Label + for i, field := range record { + switch p.mapping[i] { + case Value: + r.value, err = parseFloat(field) + if err != nil { + return textparse.EntryInvalid, errors.Wrapf(err, "parse %q as float64", Value) + } + case TimestampMs: + r.timestamp, err = strconv.ParseInt(field, 10, 64) + if err != nil { + return textparse.EntryInvalid, errors.Wrapf(err, "parse %q as int64", TimestampMs) + } + case MetricName: + if r.labels.Has(labels.MetricName) { + return textparse.EntryInvalid, errors.Errorf("%q (or %q label name) already was specified", MetricName, labels.MetricName) + } + r.labels = append(r.labels, labels.Label{Name: labels.MetricName, Value: field}) + case Help: + r.help = yoloBytes(field) + case Type: + if err := r.typ.ParseForOpenMetrics(field); err != nil { + return textparse.EntryInvalid, err + } + case Unit: + r.unit = yoloBytes(field) + case ExemplarValue, ExemplarTimestamp: + if field == "" { + continue + } + if r.exemplar == nil { + r.exemplar = &exemplar.Exemplar{} + } + if p.mapping[i] == ExemplarValue { + r.exemplar.Value, err = parseFloat(field) + if err != nil { + return textparse.EntryInvalid, errors.Wrapf(err, "parse %q as float64", ExemplarValue) + } + } else { + r.exemplar.Ts, err = strconv.ParseInt(field, 10, 64) + if err != nil { + return textparse.EntryInvalid, errors.Wrapf(err, "parse %q as int64", ExemplarTimestamp) + } + r.exemplar.HasTs = true + } + // Name is required to be before value. On top of that such pair can repeat. 
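+			// For example, the headers "label_name,label_value,label_name,label_value"
+			// with the row "pod,abc-1,instance,1" yield the labels {pod="abc-1", instance="1"}.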
+ case LabelName: + if field != "" && r.labels.Has(field) { + return textparse.EntryInvalid, errors.Errorf("label name %q already was specified", field) + } + l.Name = field + case LabelValue: + if l.Name == "" { + if field == "" { + continue + } + return textparse.EntryInvalid, errors.Errorf("label value is specified (%q) when previous name was empty", field) + } + if field == "" { + return textparse.EntryInvalid, errors.New("label value cannot be empty if previous name is not empty") + } + l.Value = field + r.labels = append(r.labels, l) + default: + panic(fmt.Sprintf("unknown mapping %q", p.mapping[i])) + } + } + sort.Sort(r.labels) + + if r.exemplar != nil { + r.exemplar.Labels = r.labels + } + + // TODO(bwplotka): Implement iterating over all type if we have any (help, unit, etc). + p.record = r + return textparse.EntrySeries, nil + } +} + +func yoloBytes(s string) []byte { + return *((*[]byte)(unsafe.Pointer(&s))) +} + +func parseFloat(s string) (float64, error) { + // Keep to pre-Go 1.13 float formats. + if strings.ContainsAny(s, "pP_") { + return 0, errors.Errorf("unsupported characters in float %v", s) + } + return strconv.ParseFloat(s, 64) +} + +func parseHeaders(record []string) (mapping []HeaderType, _ error) { + dedup := map[HeaderType]struct{}{} + expectLabelValue := false + for _, field := range record { + switch f := HeaderType(strings.ToLower(field)); f { + case Value, TimestampMs, MetricName, Help, Type, Unit, ExemplarValue, ExemplarTimestamp: + if expectLabelValue { + return nil, errors.Errorf("matching %s header expected right after %s", LabelValue, LabelName) + } + if _, ok := dedup[f]; ok { + return nil, errors.Errorf("only one header type of %s is allowed", f) + } + dedup[f] = struct{}{} + mapping = append(mapping, f) + + // Name is required to be before value. On top of that such pair can repeat. + case LabelName: + if expectLabelValue { + return nil, errors.Errorf("matching %s header expected right after %s", LabelValue, LabelName) + } + mapping = append(mapping, f) + expectLabelValue = true + case LabelValue: + if !expectLabelValue { + return nil, errors.Errorf("matching %s header expected before %s", LabelName, LabelValue) + } + mapping = append(mapping, f) + expectLabelValue = false + default: + return nil, errors.Errorf("%q header is not supported", field) + } + } + return mapping, nil +} + +// Series returns the bytes of the series, the timestamp if set, and the value +// of the current sample. Timestamps is in milliseconds. +func (p *Parser) Series() ([]byte, *int64, float64) { + return []byte("not-implemented"), &p.record.timestamp, p.record.value +} + +// Help returns the metric name and help text in the current entry. +// Must only be called after Next returned a help entry. +// The returned byte slices become invalid after the next call to Next. +func (p *Parser) Help() ([]byte, []byte) { + return []byte(p.record.labels.Get("__name__")), p.record.help +} + +// Type returns the metric name and type in the current entry. +// Must only be called after Next returned a type entry. +// The returned byte slices become invalid after the next call to Next. +func (p *Parser) Type() ([]byte, textparse.MetricType) { + return []byte(p.record.labels.Get("__name__")), p.record.typ +} + +// Unit returns the metric name and unit in the current entry. +// Must only be called after Next returned a unit entry. +// The returned byte slices become invalid after the next call to Next. 
+func (p *Parser) Unit() ([]byte, []byte) { + return []byte(p.record.labels.Get("__name__")), p.record.unit +} + +// Comment returns the text of the current comment. +// Must only be called after Next returned a comment entry. +// The returned byte slice becomes invalid after the next call to Next. +func (p *Parser) Comment() []byte { return nil } + +// Metric writes the labels of the current sample into the passed labels. +// It returns the string from which the metric was parsed. +func (p *Parser) Metric(l *labels.Labels) string { + *l = append(*l, p.record.labels...) + // Sort labels to maintain the sorted labels invariant. + sort.Sort(*l) + + return "not-implemented" +} + +// Exemplar writes the exemplar of the current sample into the passed +// exemplar. It returns if an exemplar exists or not. +func (p *Parser) Exemplar(l *exemplar.Exemplar) bool { + if p.record.exemplar == nil { + return false + } + *l = *p.record.exemplar + return true +} diff --git a/tsdb/importer/csv/csv_test.go b/tsdb/importer/csv/csv_test.go new file mode 100644 index 0000000000..bcc23d7848 --- /dev/null +++ b/tsdb/importer/csv/csv_test.go @@ -0,0 +1,85 @@ +package csv + +import ( + "io" + "strings" + "testing" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/textparse" + "github.com/prometheus/prometheus/util/testutil" +) + +type series struct { + // TODO(bwplotka) Assert more stuff once supported. + lbls labels.Labels + samples []sample +} + +type sample struct { + t int64 + v float64 +} + +func TestParser(t *testing.T) { + for _, tcase := range []struct { + name string + input string + + expectedSeries []series + }{ + // TODO(bwplotka): Test all edge cases and all entries. + { + name: "empty", + }, + { + name: "just header", + input: `metric_name,label_name,label_value,timestamp,label_name,label_value,help,value,type,exemplar_value,unit,exemplar_timestamp`, + }, + { + name: "mixed header with data", + input: `metric_name,label_name,label_value,timestamp,label_name,label_value,help,value,type,exemplar_value,unit,exemplar_timestamp +metric1,pod,abc-1,1594885435,instance,1,some help,1245214.23423,counter,-0.12,bytes because why not,1 +metric1,pod,abc-1,1594885436,instance,1,some help,1.23423,counter,-0.12,bytes because why not,1 +metric1,pod,abc-2,1594885432,,,some help2,1245214.23421,gauge,,bytes, +`, + expectedSeries: []series{ + { + lbls: labels.FromStrings(labels.MetricName, "metric1", "pod", "abc-1", "instance", "1"), + samples: []sample{{v: 1245214.23423, t: 1594885435}, {v: 1.23423, t: 1594885436}}, + }, + { + lbls: labels.FromStrings(labels.MetricName, "metric1", "pod", "abc-1"), + samples: []sample{{v: 1245214.23421, t: 1594885432}}, + }, + }, + }, + } { + t.Run(tcase.name, func(t *testing.T) { + p := NewParser(strings.NewReader(tcase.input), ',') + got := map[uint64]series{} + for { + e, err := p.Next() + if err == io.EOF { + break + } + testutil.Ok(t, err) + // For now expects only series. 
+ testutil.Equals(t, textparse.EntrySeries, e) + l := labels.Labels{} + p.Metric(&l) + + s, ok := got[l.Hash()] + if !ok { + got[l.Hash()] = series{lbls: l} + } + + _, ts, v := p.Series() + if ts == nil { + t.Fatal("got no timestamps") + } + s.samples = append(s.samples, sample{t: *ts, v: v}) + } + }) + } +} diff --git a/tsdb/importer/csv_test.go b/tsdb/importer/csv_test.go deleted file mode 100644 index 9a36639e55..0000000000 --- a/tsdb/importer/csv_test.go +++ /dev/null @@ -1,24 +0,0 @@ -package importer - -import ( - "testing" -) - -func TestCSVParser(t *testing.T) { - for _, tcase := range []struct { - name string - input string - }{ - { - name: "empty", - }, - { - name: "just header", - input: `metric,label,value,timestamp`, - }, - } { - t.Run("", func(t *testing.T) { - - }) - } -} diff --git a/tsdb/importer/import.go b/tsdb/importer/import.go new file mode 100644 index 0000000000..45823744cf --- /dev/null +++ b/tsdb/importer/import.go @@ -0,0 +1,86 @@ +// Copyright 2020 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package importer + +import ( + "fmt" + "io" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/textparse" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/prometheus/prometheus/tsdb/importer/blocks" +) + +// Import imports data from a textparse Parser into block Writer. +// TODO(bwplotka): textparse interface potentially limits the format to never give multiple samples. Fix this as some formats +// (e.g JSON) might allow that. +// Import takes ownership of given block writer. +func Import(logger log.Logger, p textparse.Parser, w blocks.Writer) (err error) { + if logger == nil { + logger = log.NewNopLogger() + } + + level.Info(logger).Log("msg", "started importing input data.") + app := w.Appender() + + l := labels.Labels{} + + defer func() { + var merr tsdb_errors.MultiError + merr.Add(err) + merr.Add(w.Close()) + err = merr.Err() + }() + + var e textparse.Entry + for { + e, err = p.Next() + if err == io.EOF { + break + } + if err != nil { + return errors.Wrap(err, "parse") + } + + // For now care about series only. 
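+		// Help, type, unit and comment entries are skipped for now; only
+		// EntrySeries contributes samples to the output block(s).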
+ if e != textparse.EntrySeries { + continue + } + + p.Metric(&l) + _, ts, v := p.Series() + if ts == nil { + return errors.Errorf("expected timestamp for series %v, got none", l.String()) + } + if _, err := app.Add(l, *ts, v); err != nil { + return errors.Wrap(err, "add sample") + } + } + + level.Info(logger).Log("msg", "no more input data, committing appenders and flushing block(s)") + if err := app.Commit(); err != nil { + return errors.Wrap(err, "commit") + } + + ids, err := w.Flush() + if err != nil { + return errors.Wrap(err, "flush") + } + level.Info(logger).Log("msg", "blocks flushed", "ids", fmt.Sprintf("%v", ids)) + return nil +} diff --git a/tsdb/importer/import_test.go b/tsdb/importer/import_test.go index e447cf9aac..284af3e3df 100644 --- a/tsdb/importer/import_test.go +++ b/tsdb/importer/import_test.go @@ -24,18 +24,14 @@ import ( "strings" "testing" - labels2 "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/value" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/util/testutil" ) -const ( - // We use a lower value for this than the default to test block compaction implicitly. - maxSamplesInMemory = 2000 - maxBlockChildren = 10 -) +// TODO(bwplotka): Did not touch this file, has to updated. Some of this can be simplified and moved to importer/openmetrics package (: func testBlocks(t *testing.T, blocks []tsdb.BlockReader, metricLabels []string, expectedMint, expectedMaxt int64, expectedSamples []tsdb.MetricSample, expectedSymbols []string, expectedNumBlocks int) { // Assert we have expected number of blocks. @@ -55,7 +51,7 @@ func testBlocks(t *testing.T, blocks []tsdb.BlockReader, metricLabels []string, allSymbols[key] = struct{}{} } } - blockSamples, err := readSeries(block, labels2.FromStrings(metricLabels...)) + blockSamples, err := readSeries(block, labels.FromStrings(metricLabels...)) testutil.Ok(t, err) allSamples = append(allSamples, blockSamples...) _ = indexr.Close() @@ -97,7 +93,7 @@ func sortSamples(samples []tsdb.MetricSample) { // The labels do not have to be exhaustive, i.e. if we have metrics with common labels b/w them, // we just need to pass the common labels, and we will get all the metrics that have them, // even if the other labels b/w the metrics are different. -func readSeries(block tsdb.BlockReader, lbls labels2.Labels) ([]tsdb.MetricSample, error) { +func readSeries(block tsdb.BlockReader, lbls labels.Labels) ([]tsdb.MetricSample, error) { series := make([]tsdb.MetricSample, 0) ir, err := block.Index() if err != nil { @@ -115,7 +111,7 @@ func readSeries(block tsdb.BlockReader, lbls labels2.Labels) ([]tsdb.MetricSampl } defer chunkr.Close() for _, lbl := range lbls { - css, err := tsdb.LookupChunkSeries(ir, tsr, labels2.MustNewMatcher(labels2.MatchEqual, lbl.Name, lbl.Value)) + css, err := tsdb.LookupChunkSeries(ir, tsr, labels.MustNewMatcher(labels.MatchEqual, lbl.Name, lbl.Value)) if err != nil { return series, err } @@ -149,7 +145,7 @@ func genSeries(labels []string, mint, maxt int64, step int) []tsdb.MetricSample for idx := mint; idx < maxt; idx += int64(step) { // Round to 3 places. 
val := math.Floor(rand.Float64()*1000) / 1000 - sample := tsdb.MetricSample{Timestamp: idx, Value: val, Labels: labels2.FromStrings(labels...)} + sample := tsdb.MetricSample{Timestamp: idx, Value: val, Labels: labels.FromStrings(labels...)} series = append(series, sample) } return series @@ -157,7 +153,7 @@ func genSeries(labels []string, mint, maxt int64, step int) []tsdb.MetricSample // labelsToStr converts the given labels to a string representation that is compliant with // the OpenMetrics parser. -func labelsToStr(labels labels2.Labels) string { +func labelsToStr(labels labels.Labels) string { str := "{" for idx, l := range labels { str += fmt.Sprintf("%s=%s", l.Name, strconv.Quote(l.Value)) @@ -220,12 +216,12 @@ http_requests_total{code="400"} 1 1565133713990 { Timestamp: 1565133713989, Value: 1021, - Labels: labels2.FromStrings("__name__", "http_requests_total", "code", "200"), + Labels: labels.FromStrings("__name__", "http_requests_total", "code", "200"), }, { Timestamp: 1565133713990, Value: 1, - Labels: labels2.FromStrings("__name__", "http_requests_total", "code", "400"), + Labels: labels.FromStrings("__name__", "http_requests_total", "code", "400"), }, }, }, @@ -254,12 +250,12 @@ http_requests_total{code="400"} 2 1575133713990 { Timestamp: 1565133713989, Value: 1022, - Labels: labels2.FromStrings("__name__", "http_requests_total", "code", "200"), + Labels: labels.FromStrings("__name__", "http_requests_total", "code", "200"), }, { Timestamp: 1575133713990, Value: 2, - Labels: labels2.FromStrings("__name__", "http_requests_total", "code", "400"), + Labels: labels.FromStrings("__name__", "http_requests_total", "code", "400"), }, }, }, @@ -288,12 +284,12 @@ http_requests_total{code="400"} 3 1395066363000 { Timestamp: 1395066363000, Value: 1023, - Labels: labels2.FromStrings("__name__", "http_requests_total", "code", "200"), + Labels: labels.FromStrings("__name__", "http_requests_total", "code", "200"), }, { Timestamp: 1395066363000, Value: 3, - Labels: labels2.FromStrings("__name__", "http_requests_total", "code", "400"), + Labels: labels.FromStrings("__name__", "http_requests_total", "code", "400"), }, }, }, @@ -338,7 +334,7 @@ no_type_metric{type="bad_news_bears"} 0.0 111 { Timestamp: 111, Value: 0.0, - Labels: labels2.FromStrings("__name__", "no_type_metric", "type", "bad_news_bears"), + Labels: labels.FromStrings("__name__", "no_type_metric", "type", "bad_news_bears"), }, }, }, @@ -380,7 +376,7 @@ bad_ts{type="bad_timestamp"} 420 1e99 { Timestamp: 6900, Value: 42, - Labels: labels2.FromStrings("__name__", "no_help_no_type", "foo", "bar"), + Labels: labels.FromStrings("__name__", "no_help_no_type", "foo", "bar"), }, }, }, @@ -406,7 +402,7 @@ bad_ts{type="bad_timestamp"} 420 1e99 { Timestamp: 1001, Value: 42.24, - Labels: labels2.FromStrings("__name__", "bare_metric"), + Labels: labels.FromStrings("__name__", "bare_metric"), }, }, }, @@ -430,7 +426,7 @@ no_nl{type="no newline"} for _, test := range tests { tmpDbDir, err := ioutil.TempDir("", "importer") testutil.Ok(t, err) - err = ImportFromFile(strings.NewReader(test.ToParse), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil) + err = Import(strings.NewReader(test.ToParse), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil) if test.IsOk { testutil.Ok(t, err) if len(test.Expected.Symbols) > 0 { @@ -449,7 +445,7 @@ no_nl{type="no newline"} func TestImportBadFile(t *testing.T) { // No file found case. 
- err := ImportFromFile((*os.File)(nil), "/buzz/baz/bar/foo", maxSamplesInMemory, maxBlockChildren, nil) + err := Import((*os.File)(nil), "/buzz/baz/bar/foo", maxSamplesInMemory, maxBlockChildren, nil) testutil.NotOk(t, err) } @@ -558,19 +554,19 @@ func TestImportIntoExistingDB(t *testing.T) { tmpDbDir, err := ioutil.TempDir("", "importer") testutil.Ok(t, err) - err = ImportFromFile(strings.NewReader(initText), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil) + err = Import(strings.NewReader(initText), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil) testutil.Ok(t, err) importSeries := genSeries(test.MetricLabels, test.ImportMint, test.ImportMaxt, test.GeneratorStep) importText := genOpenMetricsText(test.MetricName, test.MetricType, importSeries) - err = ImportFromFile(strings.NewReader(importText), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil) + err = Import(strings.NewReader(importText), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil) testutil.Ok(t, err) expectedSamples := make([]tsdb.MetricSample, 0) for _, exp := range [][]tsdb.MetricSample{initSeries, importSeries} { for _, sample := range exp { - lbls := labels2.FromStrings("__name__", test.MetricName) + lbls := labels.FromStrings("__name__", test.MetricName) s := tsdb.MetricSample{ Timestamp: sample.Timestamp, Value: sample.Value, @@ -621,13 +617,13 @@ func TestMixedSeries(t *testing.T) { tmpDbDir, err := ioutil.TempDir("", "importer") testutil.Ok(t, err) - err = ImportFromFile(strings.NewReader(text), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil) + err = Import(strings.NewReader(text), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil) testutil.Ok(t, err) addMetricLabel := func(series []tsdb.MetricSample, metricName string) []tsdb.MetricSample { augSamples := make([]tsdb.MetricSample, 0) for _, sample := range series { - lbls := labels2.FromStrings("__name__", metricName) + lbls := labels.FromStrings("__name__", metricName) s := tsdb.MetricSample{ Timestamp: sample.Timestamp, Value: sample.Value, @@ -666,7 +662,7 @@ func TestInvalidSyntaxQuickAbort(t *testing.T) { { Timestamp: -1, Value: 0, - Labels: labels2.FromStrings(" INVALID", "INVALID"), + Labels: labels.FromStrings(" INVALID", "INVALID"), }, } secondHalf := genSeries(lbls, 10, 200, 1) @@ -677,7 +673,7 @@ func TestInvalidSyntaxQuickAbort(t *testing.T) { testutil.Ok(t, err) r := strings.NewReader(importText) - err = ImportFromFile(r, tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil) + err = Import(r, tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil) testutil.NotOk(t, err) // We should abort and return the error with the text, without reading till EOF. 
@@ -727,13 +723,13 @@ func TestInvalidSyntaxQuickAbort(t *testing.T) { // // tmpDbDir, err := ioutil.TempDir("", "importer") // testutil.Ok(b, err) -// err = ImportFromFile(strings.NewReader(initText), tmpDbDir, maxSamplesInMemory, vThresh, nil) +// err = Import(strings.NewReader(initText), tmpDbDir, maxSamplesInMemory, vThresh, nil) // testutil.Ok(b, err) // // importSeries := genSeries(test.MetricLabels, test.ImportMint, test.ImportMaxt, test.GeneratorStep) // importText := genOpenMetricsText(test.MetricName, test.MetricType, importSeries) // -// err = ImportFromFile(strings.NewReader(importText), tmpDbDir, maxSamplesInMemory, vThresh, nil) +// err = Import(strings.NewReader(importText), tmpDbDir, maxSamplesInMemory, vThresh, nil) // testutil.Ok(b, err) // } // } diff --git a/tsdb/importer/importer.go b/tsdb/importer/importer.go deleted file mode 100644 index ead9b38494..0000000000 --- a/tsdb/importer/importer.go +++ /dev/null @@ -1,601 +0,0 @@ -// Copyright 2020 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package importer - -import ( - "bufio" - "bytes" - "context" - "encoding/gob" - "io" - "io/ioutil" - "math" - "os" - "path/filepath" - "sort" - "strings" - - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" - "github.com/pkg/errors" - "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/pkg/textparse" - "github.com/prometheus/prometheus/pkg/value" - "github.com/prometheus/prometheus/tsdb" - "github.com/prometheus/prometheus/tsdb/fileutil" -) - -type Parser interface { - // IsParsable returns true if given parser can be used for given input. - // It's up to parser how much bytes it will fetch from reader to detect format. - IsParsable(io.Reader) error - // Parse parses the input. - Parse(io.Reader) error -} - -var ( - // This error is thrown when the current entry does not have a timestamp associated. - NoTimestampError = errors.New("expected timestamp with metric") - // This error is thrown when the sample being parsed currently has a corresponding block - // in the database, but has no metadata collected for it, curiously enough. - NoBlockMetaFoundError = errors.New("no metadata found for current samples' block") -) - -type blockTimestampPair struct { - start, end int64 -} - -type newBlockMeta struct { - index int - count int - mint, maxt int64 - isAligned bool - dir string - children []*newBlockMeta -} - -// Content Type for the Open Metrics Parser. -// Needed to init the text parser. -const contentType = "application/openmetrics-text;" - -// ImportFromFile imports data from a file formatted according to the Open Metrics format, -// converts it into block(s), and places the newly created block(s) in the -// TSDB DB directory, where it is treated like any other block. 
-func ImportFromFile( - r io.Reader, - outputPath string, - maxSamplesInMemory int, - maxBlockChildren int, -) error { - if logger == nil { - logger = log.NewNopLogger() - } - - level.Debug(logger).Log("msg", "creating temp directory") - tmpDbDir, err := ioutil.TempDir("", "importer") - if err != nil { - return err - } - defer os.RemoveAll(tmpDbDir) - - level.Debug(logger).Log("msg", "reading existing block metadata to infer time ranges") - blockMetas, err := constructBlockMetadata(dbPath) - if err != nil { - return err - } - - level.Info(logger).Log("msg", "importing input data and aligning with existing DB") - if err = writeSamples(r, tmpDbDir, blockMetas, maxSamplesInMemory, maxBlockChildren, logger); err != nil { - return err - } - - level.Info(logger).Log("msg", "merging fully overlapping blocks") - if err = mergeBlocks(tmpDbDir, blockMetas, logger); err != nil { - return err - } - - level.Debug(logger).Log("msg", "copying newly created blocks from temp location to actual DB location") - if err = fileutil.CopyDirs(tmpDbDir, dbPath); err != nil { - return err - } - - return nil -} - -// writeSamples parses each metric sample, and writes the samples to the correspondingly aligned block -// to the given directory. -func writeSamples( - r io.Reader, - dir string, - metas []*newBlockMeta, - maxSamplesInMemory int, - maxBlockChildren int, - logger log.Logger, -) error { - // First block represents everything before DB start. - // Last block represents everything after DB end. - dbMint, dbMaxt := metas[0].maxt, metas[len(metas)-1].mint - - blocks := make([][]*tsdb.MetricSample, len(metas)) - currentPassCount := 0 - - // TODO: Maybe use bufio.Reader.Readline instead? - // https://stackoverflow.com/questions/21124327/how-to-read-a-text-file-line-by-line-in-go-when-some-lines-are-long-enough-to-ca - // Use a streaming approach to avoid loading too much data at once. - scanner := bufio.NewScanner(r) - buf := new(bytes.Buffer) - scanner.Split(sampleStreamer(buf)) - - for scanner.Scan() { - if currentPassCount == 0 { - blocks = make([][]*tsdb.MetricSample, len(metas)) - } - - encSample := scanner.Bytes() - decBuf := bytes.NewBuffer(encSample) - sample := tsdb.MetricSample{} - if err := gob.NewDecoder(decBuf).Decode(&sample); err != nil { - level.Error(logger).Log("msg", "failed to decode current entry returned by file scanner", "err", err) - return err - } - - // Find what block this sample belongs to. - var blockIndex int - if len(metas) == 1 || sample.Timestamp < dbMint { - blockIndex = 0 - } else if sample.Timestamp >= dbMaxt { - blockIndex = len(metas) - 1 - } else { - blockIndex = getBlockIndex(sample.Timestamp, metas) - if blockIndex == -1 { - return NoBlockMetaFoundError - } - } - - meta := metas[blockIndex] - meta.mint = value.MinInt64(meta.mint, sample.Timestamp) - meta.maxt = value.MaxInt64(meta.maxt, sample.Timestamp) - meta.count++ - - blocks[blockIndex] = append(blocks[blockIndex], &sample) - - currentPassCount++ - // Have enough samples to write to disk. - if currentPassCount == maxSamplesInMemory { - if err := flushBlocks(dir, blocks, metas, maxBlockChildren, logger); err != nil { - return err - } - // Reset current pass count. - currentPassCount = 0 - } - } - - if err := scanner.Err(); err != nil { - return err - } - - // Flush any remaining samples. - if err := flushBlocks(dir, blocks, metas, maxBlockChildren, logger); err != nil { - return err - } - - return nil -} - -// flushBlocks writes the given blocks of samples to disk, and updates block metadata information. 
-func flushBlocks( - dir string, - toFlush [][]*tsdb.MetricSample, - metas []*newBlockMeta, - maxBlockChildren int, - logger log.Logger, -) error { - for blockIdx, block := range toFlush { - // If current block is empty, nothing to write. - if len(block) == 0 { - continue - } - meta := metas[blockIdx] - - // Put each sample into the appropriate block. - bins, binTimes := binSamples(block, meta, tsdb.DefaultBlockDuration) - for binIdx, bin := range bins { - if len(bin) == 0 { - continue - } - start, end := binTimes[binIdx].start, binTimes[binIdx].end - path, err := tsdb.CreateBlock(bin, dir, start, end, logger) - if err != nil { - return err - } - child := &newBlockMeta{ - index: meta.index, - count: 0, - mint: start, - maxt: end, - isAligned: meta.isAligned, - dir: path, - } - meta.children = append(meta.children, child) - } - if maxBlockChildren > 0 && len(meta.children) >= maxBlockChildren { - if err := compactMeta(dir, meta, logger); err != nil { - return err - } - } - } - return nil -} - -// sampleStreamer returns a function that can be used to parse an OpenMetrics compliant -// byte array, and return each token in the user-provided byte buffer. -func sampleStreamer(buf *bytes.Buffer) func([]byte, bool) (int, []byte, error) { - currentEntryInvalid := false - return func(data []byte, atEOF bool) (int, []byte, error) { - var err error - advance := 0 - lineIndex := 0 - lines := strings.Split(string(data), "\n") - parser := textparse.New(data, contentType) - for { - var et textparse.Entry - if et, err = parser.Next(); err != nil { - if !atEOF && et == textparse.EntryInvalid && !currentEntryInvalid { - currentEntryInvalid = true - // Fetch more data for the scanner to see if the entry is really invalid. - return 0, nil, nil - } - if err == io.EOF { - err = nil - } - return 0, nil, err - } - - // Reset invalid entry flag as it meant that we just had to get more data - // into the scanner. - if currentEntryInvalid { - currentEntryInvalid = false - } - - // Add 1 to account for newline character. - lineLength := len(lines[lineIndex]) + 1 - lineIndex++ - - if et != textparse.EntrySeries { - advance = advance + lineLength - continue - } - - _, ctime, cvalue := parser.Series() - if ctime == nil { - return 0, nil, NoTimestampError - } - // OpenMetrics parser multiples times by 1000 - undoing that. - ctimeCorrected := *ctime / 1000 - - var clabels labels.Labels - _ = parser.Metric(&clabels) - - sample := tsdb.MetricSample{ - Timestamp: ctimeCorrected, - Value: cvalue, - Labels: labels.FromMap(clabels.Map()), - } - - buf.Reset() - enc := gob.NewEncoder(buf) - if err = enc.Encode(sample); err != nil { - return 0, nil, err - } - - advance += lineLength - return advance, buf.Bytes(), nil - } - } -} - -// makeRange returns a series of block times between start and stop, -// with step divisions. -func makeRange(start, stop, step int64) []blockTimestampPair { - if step <= 0 || stop < start { - return []blockTimestampPair{} - } - r := make([]blockTimestampPair, 0) - // In case we only have samples with the same timestamp. - if start == stop { - r = append(r, blockTimestampPair{ - start: start, - end: stop + 1, - }) - return r - } - for s := start; s < stop; s += step { - pair := blockTimestampPair{ - start: s, - } - if (s + step) >= stop { - pair.end = stop + 1 - } else { - pair.end = s + step - } - r = append(r, pair) - } - return r -} - -// mergeBlocks looks for blocks that have overlapping time intervals, and compacts them. 
-func mergeBlocks(dest string, metas []*newBlockMeta, logger log.Logger) error { - for _, m := range metas { - // If no children, there's nothing to merge. - if len(m.children) == 0 { - continue - } - if err := compactMeta(dest, m, logger); err != nil { - return err - } - } - return nil -} - -// binBlocks groups blocks that have fully intersecting intervals of the given durations. -// It iterates through the given durations, creating progressive blocks, and once it reaches -// the last given duration, it divvies all the remaining samples into blocks of that duration. -// Input blocks are assumed sorted by time. -func binBlocks(blocks []*newBlockMeta, durations []int64) [][]*newBlockMeta { - bins := make([][]*newBlockMeta, 0) - if len(blocks) == 0 || len(durations) == 0 { - return bins - } - - bin := make([]*newBlockMeta, 0) - - start := blocks[0].mint - blockIdx := 0 -Outer: - for dIdx, duration := range durations { - for bIdx, block := range blocks[blockIdx:] { - blockIdx = bIdx - if block.maxt-block.mint >= duration { - if block.mint <= start { - bin = append(bin, block) - bins = append(bins, bin) - } else { - bins = append(bins, bin) - bins = append(bins, []*newBlockMeta{block}) - } - bin = []*newBlockMeta{} - start = block.maxt - if dIdx == len(durations)-1 { - continue - } else { - blockIdx++ - continue Outer - } - } - if block.mint-start < duration && block.maxt-start < duration { - // If current block is within the duration window, place it in the current bin. - bin = append(bin, block) - blockIdx++ - } else { - // Else create a new block, starting with this block. - bins = append(bins, bin) - bin = []*newBlockMeta{block} - start = block.mint - if dIdx < len(durations)-1 { - blockIdx++ - continue Outer - } - } - } - } - bins = append(bins, bin) - return bins -} - -// binSamples divvies the samples into bins corresponding to the given block. -// If an aligned block is given to it, then a single bin is created, -// else, it divides the samples into blocks of duration. -// It returns the binned samples, and the time limits for each bin. -func binSamples(samples []*tsdb.MetricSample, meta *newBlockMeta, duration int64) ([][]*tsdb.MetricSample, []blockTimestampPair) { - findBlock := func(ts int64, blocks []blockTimestampPair) int { - for idx, block := range blocks { - if block.start <= ts && ts < block.end { - return idx - } - } - return -1 - } - bins := make([][]*tsdb.MetricSample, 0) - times := make([]blockTimestampPair, 0) - if len(samples) == 0 { - return bins, times - } - if meta.isAligned { - bins = append(bins, samples) - times = []blockTimestampPair{{start: meta.mint, end: meta.maxt}} - } else { - timeTranches := makeRange(meta.mint, meta.maxt, duration) - bins = make([][]*tsdb.MetricSample, len(timeTranches)) - binIdx := -1 - for _, sample := range samples { - ts := sample.Timestamp - if binIdx == -1 { - binIdx = findBlock(ts, timeTranches) - } - block := timeTranches[binIdx] - if block.start <= ts && ts < block.end { - bins[binIdx] = append(bins[binIdx], sample) - } else { - binIdx = findBlock(ts, timeTranches) - bins[binIdx] = append(bins[binIdx], sample) - } - } - times = timeTranches - } - return bins, times -} - -// constructBlockMetadata gives us the new blocks' metadatas. -func constructBlockMetadata(path string) ([]*newBlockMeta, error) { - dbMint, dbMaxt := int64(math.MinInt64), int64(math.MaxInt64) - - // If we try to open a regular RW handle on an active TSDB instance, - // it will fail. Hence, we open a RO handle. 
- db, err := tsdb.OpenDBReadOnly(path, nil) - if err != nil { - return nil, err - } - defer db.Close() - - blocks, err := db.Blocks() - if err != nil { - return nil, err - } - - metas := []*newBlockMeta{ - &newBlockMeta{ - index: 0, - count: 0, - mint: math.MaxInt64, - maxt: dbMint, - isAligned: false, - }, - } - - for idx, block := range blocks { - start, end := block.Meta().MinTime, block.Meta().MaxTime - metas = append(metas, &newBlockMeta{ - index: idx + 1, - count: 0, - mint: start, - maxt: end, - isAligned: true, - }) - if idx == 0 { - dbMint, dbMaxt = start, end - } else { - dbMint = value.MinInt64(dbMint, start) - dbMaxt = value.MaxInt64(dbMaxt, end) - } - } - // If there are existing blocks in the import location, - // we need to add a block for samples that fall over DB maxt. - if len(metas) > 1 { - // Update first block with the right dbMint as it's maxt. - metas[0].maxt = dbMint - metas = append(metas, &newBlockMeta{ - index: len(metas), - count: 0, - mint: dbMaxt, - maxt: math.MinInt64, - isAligned: false, - }) - } - return metas, nil -} - -// getBlockIndex returns the index of the block the current timestamp corresponds to. -// User-provided times are assumed sorted in ascending order. -func getBlockIndex(current int64, metas []*newBlockMeta) int { - for idx, m := range metas { - if current < m.maxt { - return idx - } - } - return -1 -} - -// compactBlocks compacts all the given blocks and places them in the destination directory. -func compactBlocks(dest string, blocks []*newBlockMeta, logger log.Logger) (string, error) { - dirs := make([]string, 0) - for _, meta := range blocks { - dirs = append(dirs, meta.dir) - } - path, err := compactDirs(dest, dirs, logger) - if err != nil { - return "", err - } - for _, meta := range blocks { - // Remove existing blocks directory as it is no longer needed. - if err = os.RemoveAll(meta.dir); err != nil { - return "", err - } - // Update child with new blocks path. - meta.dir = path - } - return path, err -} - -// compactMeta compacts the children of a given block, dividing them into the Prometheus recommended -// block sizes, if it has to. -func compactMeta(dest string, m *newBlockMeta, logger log.Logger) error { - children := make([]*newBlockMeta, 0) - if m.isAligned { - path, err := compactBlocks(dest, m.children, logger) - if err != nil { - return err - } - m.dir = path - } else { - // Sort blocks to ensure we bin properly. - sortBlocks(m.children) - binned := binBlocks(m.children, []int64{tsdb.DefaultBlockDuration}) - for _, bin := range binned { - if len(bin) <= 1 { - children = append(children, bin...) - continue - } - path, err := compactBlocks(dest, bin, logger) - if err != nil { - return err - } - children = append(children, &newBlockMeta{ - index: m.index, - count: 0, - mint: bin[0].mint, - maxt: bin[len(bin)-1].maxt, - isAligned: m.isAligned, - dir: path, - }) - } - } - m.children = children - return nil -} - -// compactDirs compacts the block dirs and places them in destination directory. -func compactDirs(dest string, dirs []string, logger log.Logger) (string, error) { - path := "" - compactor, err := tsdb.NewLeveledCompactor(context.Background(), nil, logger, []int64{tsdb.DefaultBlockDuration}, nil) - if err != nil { - return path, err - } - ulid, err := compactor.Compact(dest, dirs, nil) - if err != nil { - return path, err - } - path = filepath.Join(dest, ulid.String()) - return path, nil -} - -// sortBlocks sorts block metadata according to the time limits. 
-func sortBlocks(blocks []*newBlockMeta) {
-	sort.Slice(blocks, func(x, y int) bool {
-		bx, by := blocks[x], blocks[y]
-		if bx.mint != by.mint {
-			return bx.mint < by.mint
-		}
-		return bx.maxt < by.maxt
-	})
-}
diff --git a/tsdb/importer/openmetrics.go b/tsdb/importer/openmetrics.go
deleted file mode 100644
index 4d2e04a10a..0000000000
--- a/tsdb/importer/openmetrics.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package importer
-
-import (
-	"io"
-
-	"github.com/prometheus/prometheus/pkg/textparse"
-)
-
-type OpenMetricsParser struct {
-}
-
-func (o OpenMetricsParser) IsParsable(reader io.Reader) error {
-	panic("implement me")
-}
-
-func (o OpenMetricsParser) Parse(reader io.Reader) textparse.Parser {
-	panic("implement me")
-}
diff --git a/tsdb/importer/openmetrics/openmetrics.go b/tsdb/importer/openmetrics/openmetrics.go
new file mode 100644
index 0000000000..e52cfb4dc8
--- /dev/null
+++ b/tsdb/importer/openmetrics/openmetrics.go
@@ -0,0 +1,57 @@
+package openmetrics
+
+import (
+	"bufio"
+	"io"
+
+	"github.com/prometheus/prometheus/pkg/textparse"
+)
+
+// Content type for the OpenMetrics parser.
+// Needed to init the text parser.
+const contentType = "application/openmetrics-text;"
+
+type Parser struct {
+	textparse.Parser
+
+	s *bufio.Scanner
+}
+
+// NewParser is a tiny layer on top of textparse.Parser that works on an io.Reader.
+// TODO(bwplotka): This is hacky; textparse.Parser should allow passing a reader from scratch.
+func NewParser(r io.Reader) textparse.Parser {
+	// TODO(dipack95): Maybe use bufio.Reader.Readline instead?
+	// https://stackoverflow.com/questions/21124327/how-to-read-a-text-file-line-by-line-in-go-when-some-lines-are-long-enough-to-ca
+	return &Parser{s: bufio.NewScanner(r)}
+}
+
+// Next advances the parser to the next sample. It returns io.EOF if no
+// more samples were read.
+// TODO(bwplotka): Rough implementation, not tested, please help dipack95! (:
+func (p *Parser) Next() (textparse.Entry, error) {
+	for p.s.Scan() {
+		// TODO(bwplotka): Assumes each entry fits on a single line. If not, refetch more lines as the previous version did.
+		line := p.s.Bytes()
+
+		p.Parser = textparse.New(line, contentType)
+		if et, err := p.Parser.Next(); err != io.EOF {
+			return et, err
+		}
+		// EOF from parser, continue scanning.
+	}
+	if err := p.s.Err(); err != nil {
+		return 0, err
+	}
+	return 0, io.EOF
+}
+
+// Series returns the bytes of the series, the timestamp if set, and the value
+// of the current sample. Timestamps are in milliseconds.
+func (p *Parser) Series() ([]byte, *int64, float64) {
+	b, ts, v := p.Parser.Series()
+	if ts != nil {
+		// The OpenMetrics parser multiplies timestamps by 1000; undo that here.
+		*ts = *ts / 1000
+	}
+	return b, ts, v
+}
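For reviewers, here is a rough sketch of how the new pieces are meant to fit together, mirroring the wiring in tsdb/cmd/tsdb/main.go above. The input file name and output directory are placeholders, error handling is minimal, and Import takes ownership of the writer and closes it. On the CLI side the same flow would be invoked roughly as `tsdb import csv --delimiter=',' --input-file=samples.csv --output=./data` (or `tsdb import openmetrics ...`).

package main

import (
	"log"
	"os"

	kitlog "github.com/go-kit/kit/log"

	"github.com/prometheus/prometheus/tsdb/importer"
	"github.com/prometheus/prometheus/tsdb/importer/blocks"
	"github.com/prometheus/prometheus/tsdb/importer/csv"
)

func main() {
	// samples.csv and ./data are made-up paths.
	f, err := os.Open("samples.csv")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	logger := kitlog.NewLogfmtLogger(kitlog.NewSyncWriter(os.Stderr))

	// Parse CSV input and write 2h-aligned blocks into ./data.
	p := csv.NewParser(f, ',')
	w := blocks.NewMultiWriter(logger, "./data", 2*60*60*1000)

	// Import drains the parser, commits the appenders and flushes the block(s).
	if err := importer.Import(logger, p, w); err != nil {
		log.Fatal(err)
	}
}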