From 3ac96c7841ed2d81a9611bd3c158007a85559c98 Mon Sep 17 00:00:00 2001
From: Bartlomiej Plotka
Date: Mon, 13 Jul 2020 18:33:27 +0100
Subject: [PATCH] Added TSDB import with OpenMetrics and CSV file support.

Based on https://github.com/prometheus/prometheus/pull/5887

Thanks for your work so far @dipack95, it helped a lot!

Changes on top of @dipack95:

* Addressed all review comments
* Use subcommands for different formats
* Simplified block creation; it does not need to be so complex for the first iteration.
* Simplified and separated concerns. There is no need to have access to the DB. Block
  writing is separated as well, for ease of benchmarking and testing. This will
  also be needed by @JessicaGreben
* Added import support for different formats.
* Removed all tests - those had to be pulled over and adjusted ):

Signed-off-by: Bartlomiej Plotka
---
 pkg/textparse/interface.go               |  47 +-
 pkg/textparse/openmetricsparse.go        |  21 +-
 pkg/textparse/promparse.go               |  15 +-
 tsdb/cmd/tsdb/main.go                    |  92 ++-
 tsdb/importer/blocks/multi.go            | 120 ++++
 tsdb/importer/blocks/writer.go           | 139 +++++
 tsdb/importer/csv/csv.go                 | 271 ++++++++
 tsdb/importer/csv/csv_test.go            |  98 +++
 tsdb/importer/import.go                  |  86 +++
 tsdb/importer/import_test.go             | 753 -----------------------
 tsdb/importer/importer.go                | 602 ------------------
 tsdb/importer/openmetrics/openmetrics.go |  70 +++
 12 files changed, 897 insertions(+), 1417 deletions(-)
 create mode 100644 tsdb/importer/blocks/multi.go
 create mode 100644 tsdb/importer/blocks/writer.go
 create mode 100644 tsdb/importer/csv/csv.go
 create mode 100644 tsdb/importer/csv/csv_test.go
 create mode 100644 tsdb/importer/import.go
 delete mode 100644 tsdb/importer/import_test.go
 delete mode 100644 tsdb/importer/importer.go
 create mode 100644 tsdb/importer/openmetrics/openmetrics.go

diff --git a/pkg/textparse/interface.go b/pkg/textparse/interface.go
index cfcd05e21..471923275 100644
--- a/pkg/textparse/interface.go
+++ b/pkg/textparse/interface.go
@@ -16,6 +16,7 @@ package textparse
 import (
 	"mime"

+	"github.com/pkg/errors"
 	"github.com/prometheus/prometheus/pkg/exemplar"
 	"github.com/prometheus/prometheus/pkg/labels"
 )
@@ -55,8 +56,8 @@ type Parser interface {
 	// exemplar. It returns if an exemplar exists or not.
 	Exemplar(l *exemplar.Exemplar) bool

-	// Next advances the parser to the next sample. It returns false if no
-	// more samples were read or an error occurred.
+	// Next advances the parser to the next sample. It returns io.EOF if no
+	// more samples were read.
 	Next() (Entry, error)
 }
@@ -94,3 +95,45 @@ const (
 	MetricTypeStateset = "stateset"
 	MetricTypeUnknown  = "unknown"
 )
+
+func (m *MetricType) ParseForOpenMetrics(mtyp string) error {
+	switch mtyp {
+	case "counter":
+		*m = MetricTypeCounter
+	case "gauge":
+		*m = MetricTypeGauge
+	case "histogram":
+		*m = MetricTypeHistogram
+	case "gaugehistogram":
+		*m = MetricTypeGaugeHistogram
+	case "summary":
+		*m = MetricTypeSummary
+	case "info":
+		*m = MetricTypeInfo
+	case "stateset":
+		*m = MetricTypeStateset
+	case "unknown":
+		*m = MetricTypeUnknown
+	default:
+		return errors.Errorf("invalid metric type %q", mtyp)
+	}
+	return nil
+}
+
+func (m *MetricType) ParseForProm(mtyp string) error {
+	switch mtyp {
+	case "counter":
+		*m = MetricTypeCounter
+	case "gauge":
+		*m = MetricTypeGauge
+	case "histogram":
+		*m = MetricTypeHistogram
+	case "summary":
+		*m = MetricTypeSummary
+	case "untyped":
+		*m = MetricTypeUnknown
+	default:
+		return errors.Errorf("invalid metric type %q", mtyp)
+	}
+	return nil
+}
diff --git a/pkg/textparse/openmetricsparse.go b/pkg/textparse/openmetricsparse.go
index 6cfdd8391..eeb7a4add 100644
--- a/pkg/textparse/openmetricsparse.go
+++ b/pkg/textparse/openmetricsparse.go
@@ -259,25 +259,8 @@ func (p *OpenMetricsParser) Next() (Entry, error) {
 	}
 	switch t {
 	case tType:
-		switch s := yoloString(p.text); s {
-		case "counter":
-			p.mtype = MetricTypeCounter
-		case "gauge":
-			p.mtype = MetricTypeGauge
-		case "histogram":
-			p.mtype = MetricTypeHistogram
-		case "gaugehistogram":
-			p.mtype = MetricTypeGaugeHistogram
-		case "summary":
-			p.mtype = MetricTypeSummary
-		case "info":
-			p.mtype = MetricTypeInfo
-		case "stateset":
-			p.mtype = MetricTypeStateset
-		case "unknown":
-			p.mtype = MetricTypeUnknown
-		default:
-			return EntryInvalid, errors.Errorf("invalid metric type %q", s)
+		if err := p.mtype.ParseForOpenMetrics(yoloString(p.text)); err != nil {
+			return EntryInvalid, err
 		}
 	case tHelp:
 		if !utf8.Valid(p.text) {
diff --git a/pkg/textparse/promparse.go b/pkg/textparse/promparse.go
index 3c885af0b..3731b1d22 100644
--- a/pkg/textparse/promparse.go
+++ b/pkg/textparse/promparse.go
@@ -289,19 +289,8 @@ func (p *PromParser) Next() (Entry, error) {
 	}
 	switch t {
 	case tType:
-		switch s := yoloString(p.text); s {
-		case "counter":
-			p.mtype = MetricTypeCounter
-		case "gauge":
-			p.mtype = MetricTypeGauge
-		case "histogram":
-			p.mtype = MetricTypeHistogram
-		case "summary":
-			p.mtype = MetricTypeSummary
-		case "untyped":
-			p.mtype = MetricTypeUnknown
-		default:
-			return EntryInvalid, errors.Errorf("invalid metric type %q", s)
+		if err := p.mtype.ParseForProm(yoloString(p.text)); err != nil {
+			return EntryInvalid, err
 		}
 	case tHelp:
 		if !utf8.Valid(p.text) {
diff --git a/tsdb/cmd/tsdb/main.go b/tsdb/cmd/tsdb/main.go
index 643c0e54e..ef514c93e 100644
--- a/tsdb/cmd/tsdb/main.go
+++ b/tsdb/cmd/tsdb/main.go
@@ -34,11 +34,15 @@ import (
 	"github.com/go-kit/kit/log"
 	"github.com/pkg/errors"
 	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/pkg/textparse"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb"
 	"github.com/prometheus/prometheus/tsdb/chunks"
 	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
 	"github.com/prometheus/prometheus/tsdb/importer"
+	"github.com/prometheus/prometheus/tsdb/importer/blocks"
+	"github.com/prometheus/prometheus/tsdb/importer/csv"
+	"github.com/prometheus/prometheus/tsdb/importer/openmetrics"
 	"gopkg.in/alecthomas/kingpin.v2"
 )
@@ -53,34 +57,43 @@ func execute() (err error) {
 	var (
 		defaultDBPath =
filepath.Join("benchout", "storage") - cli = kingpin.New(filepath.Base(os.Args[0]), "CLI tool for tsdb") - benchCmd = cli.Command("bench", "run benchmarks") - benchWriteCmd = benchCmd.Command("write", "run a write performance benchmark") - benchWriteOutPath = benchWriteCmd.Flag("out", "set the output path").Default("benchout").String() - benchWriteNumMetrics = benchWriteCmd.Flag("metrics", "number of metrics to read").Default("10000").Int() - benchSamplesFile = benchWriteCmd.Arg("file", "input file with samples data, default is ("+filepath.Join("..", "..", "testdata", "20kseries.json")+")").Default(filepath.Join("..", "..", "testdata", "20kseries.json")).String() - listCmd = cli.Command("ls", "list db blocks") - listCmdHumanReadable = listCmd.Flag("human-readable", "print human readable values").Short('h').Bool() - listPath = listCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String() - analyzeCmd = cli.Command("analyze", "analyze churn, label pair cardinality.") - analyzePath = analyzeCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String() - analyzeBlockID = analyzeCmd.Arg("block id", "block to analyze (default is the last block)").String() - analyzeLimit = analyzeCmd.Flag("limit", "how many items to show in each list").Default("20").Int() - dumpCmd = cli.Command("dump", "dump samples from a TSDB") - dumpPath = dumpCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String() - dumpMinTime = dumpCmd.Flag("min-time", "minimum timestamp to dump").Default(strconv.FormatInt(math.MinInt64, 10)).Int64() - dumpMaxTime = dumpCmd.Flag("max-time", "maximum timestamp to dump").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64() - importCmd = cli.Command("import", "import samples from file containing information formatted in the Open Metrics format. 
Please refer to the storage docs for more details.")
-		importFilePath           = importCmd.Arg("file path", "file to import samples from (must be in Open Metrics format)").Required().String()
-		importDbPath             = importCmd.Arg("db path", "database path").Required().String()
-		importMaxSamplesInMemory = importCmd.Flag("max-samples-in-mem", "maximum number of samples to process in a cycle").Default("10000").Int()
-		importMaxBlockChildren   = importCmd.Flag("max-block-children", "maximum number of children a block can have at a given time").Default("20").Int()
+		cli = kingpin.New(filepath.Base(os.Args[0]), "CLI tool for tsdb")
+
+		benchCmd             = cli.Command("bench", "run benchmarks")
+		benchWriteCmd        = benchCmd.Command("write", "run a write performance benchmark")
+		benchWriteOutPath    = benchWriteCmd.Flag("out", "set the output path").Default("benchout").String()
+		benchWriteNumMetrics = benchWriteCmd.Flag("metrics", "number of metrics to read").Default("10000").Int()
+		benchSamplesFile     = benchWriteCmd.Arg("file", "input file with samples data, default is ("+filepath.Join("..", "..", "testdata", "20kseries.json")+")").Default(filepath.Join("..", "..", "testdata", "20kseries.json")).String()
+
+		listCmd              = cli.Command("ls", "list db blocks")
+		listCmdHumanReadable = listCmd.Flag("human-readable", "print human readable values").Short('h').Bool()
+		listPath             = listCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
+
+		analyzeCmd     = cli.Command("analyze", "analyze churn, label pair cardinality.")
+		analyzePath    = analyzeCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
+		analyzeBlockID = analyzeCmd.Arg("block id", "block to analyze (default is the last block)").String()
+		analyzeLimit   = analyzeCmd.Flag("limit", "how many items to show in each list").Default("20").Int()
+
+		dumpCmd     = cli.Command("dump", "dump samples from a TSDB")
+		dumpPath    = dumpCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
+		dumpMinTime = dumpCmd.Flag("min-time", "minimum timestamp to dump").Default(strconv.FormatInt(math.MinInt64, 10)).Int64()
+		dumpMaxTime = dumpCmd.Flag("max-time", "maximum timestamp to dump").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
+
+		importCmd       = cli.Command("import", "[Experimental] import samples from input and produce a TSDB block. Please refer to the storage docs for more details.")
+		importDbPath    = importCmd.Flag("output", "output directory for the generated block").Default(".").String()
+		importFilePath  = importCmd.Flag("input-file", "read samples from this file instead of standard input; if empty, samples are read from stdin").String()
+		importBlockSize = importCmd.Flag("block-size", "The maximum block size. The actual block timestamps will be aligned with Prometheus time ranges.").Default("2h").Hidden().Duration()
+
+		omImportCmd = importCmd.Command("openmetrics", "import samples from OpenMetrics input and produce a TSDB block. Please refer to the storage docs for more details.")
+
+		csvImportCmd       = importCmd.Command("csv", "import samples from CSV input and produce a TSDB block. Please refer to the storage docs for more details.")
+		csvImportDelimiter = csvImportCmd.Flag("delimiter", "single character used as the CSV field delimiter").Default(",").String()
 	)

 	logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
 	var merr tsdb_errors.MultiError

-	switch kingpin.MustParse(cli.Parse(os.Args[1:])) {
+	switch cmd := kingpin.MustParse(cli.Parse(os.Args[1:])); cmd {
 	case benchWriteCmd.FullCommand():
 		wb := &writeBenchmark{
 			outPath: *benchWriteOutPath,
@@ -144,16 +157,39 @@ func execute() (err error) {
 			err = merr.Err()
 		}()
 		return dumpSamples(db, *dumpMinTime, *dumpMaxTime)
-	case importCmd.FullCommand():
-		f, err := os.Open(*importFilePath)
-		if err != nil {
-			return err
+	case omImportCmd.FullCommand(), csvImportCmd.FullCommand():
+		input := os.Stdin
+		if *importFilePath != "" {
+			input, err = os.Open(*importFilePath)
+			if err != nil {
+				return err
+			}
+			defer func() {
+				merr.Add(err)
+				merr.Add(input.Close())
+				err = merr.Err()
+			}()
 		}
-		return importer.ImportFromFile(f, *importDbPath, *importMaxSamplesInMemory, *importMaxBlockChildren, logger)
+
+		var p textparse.Parser
+		if cmd == omImportCmd.FullCommand() {
+			p = openmetrics.NewParser(input)
+		} else {
+			if len(*csvImportDelimiter) != 1 {
+				return errors.Errorf("wrong format of delimiter flag, expected a single character, got %q", *csvImportDelimiter)
+			}
+
+			p = csv.NewParser(input, []rune(*csvImportDelimiter)[0])
+		}
+		return importer.Import(logger, p, blocks.NewMultiWriter(logger, *importDbPath, durToMillis(*importBlockSize)))
 	}
 	return nil
 }

+func durToMillis(t time.Duration) int64 {
+	return int64(t.Seconds() * 1000)
+}
+
 type writeBenchmark struct {
 	outPath     string
 	samplesFile string
diff --git a/tsdb/importer/blocks/multi.go b/tsdb/importer/blocks/multi.go
new file mode 100644
index 000000000..4007860d8
--- /dev/null
+++ b/tsdb/importer/blocks/multi.go
@@ -0,0 +1,120 @@
+// Copyright 2020 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package blocks
+
+import (
+	"github.com/go-kit/kit/log"
+	"github.com/oklog/ulid"
+	"github.com/pkg/errors"
+	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/storage"
+	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
+	"github.com/prometheus/prometheus/tsdb/index"
+)
+
+type errAppender struct{ err error }
+
+func (a errAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) { return 0, a.err }
+func (a errAppender) AddFast(ref uint64, t int64, v float64) error            { return a.err }
+func (a errAppender) Commit() error                                           { return a.err }
+func (a errAppender) Rollback() error                                         { return a.err }
+
+func rangeForTimestamp(t int64, width int64) (maxt int64) {
+	return (t/width)*width + width
+}
+
+type MultiWriter struct {
+	blocks          map[index.Range]Writer
+	activeAppenders map[index.Range]storage.Appender
+
+	logger log.Logger
+	dir    string
+	// TODO(bwplotka): Allow more complex compaction levels.
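+	// sizeMillis is the width in milliseconds of the aligned time range that
+	// each underlying block writer covers (see rangeForTimestamp); the CLI
+	// defaults this to 2h, matching Prometheus block alignment.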
+	sizeMillis int64
+}
+
+func NewMultiWriter(logger log.Logger, dir string, sizeMillis int64) *MultiWriter {
+	return &MultiWriter{
+		logger:          logger,
+		dir:             dir,
+		sizeMillis:      sizeMillis,
+		blocks:          map[index.Range]Writer{},
+		activeAppenders: map[index.Range]storage.Appender{},
+	}
+}
+
+// Appender is not thread-safe. The returned Appender is not thread-safe either.
+// TODO(bwplotka): Consider making it thread safe.
+func (w *MultiWriter) Appender() storage.Appender { return w }
+
+func (w *MultiWriter) getOrCreate(t int64) storage.Appender {
+	maxt := rangeForTimestamp(t, w.sizeMillis)
+	hash := index.Range{Start: maxt - w.sizeMillis, End: maxt}
+	if a, ok := w.activeAppenders[hash]; ok {
+		return a
+	}
+
+	nw, err := NewTSDBWriter(w.logger, w.dir)
+	if err != nil {
+		return errAppender{err: errors.Wrap(err, "new tsdb writer")}
+	}
+
+	w.blocks[hash] = nw
+	w.activeAppenders[hash] = nw.Appender()
+	return w.activeAppenders[hash]
+}
+
+func (w *MultiWriter) Add(l labels.Labels, t int64, v float64) (uint64, error) {
+	return w.getOrCreate(t).Add(l, t, v)
+}
+
+func (w *MultiWriter) AddFast(ref uint64, t int64, v float64) error {
+	return w.getOrCreate(t).AddFast(ref, t, v)
+}
+
+func (w *MultiWriter) Commit() error {
+	var merr tsdb_errors.MultiError
+	for _, a := range w.activeAppenders {
+		merr.Add(a.Commit())
+	}
+	return merr.Err()
+}
+
+func (w *MultiWriter) Rollback() error {
+	var merr tsdb_errors.MultiError
+	for _, a := range w.activeAppenders {
+		merr.Add(a.Rollback())
+	}
+	return merr.Err()
+}
+
+func (w *MultiWriter) Flush() ([]ulid.ULID, error) {
+	ids := make([]ulid.ULID, 0, len(w.blocks))
+	for _, b := range w.blocks {
+		id, err := b.Flush()
+		if err != nil {
+			return nil, err
+		}
+		ids = append(ids, id...)
+	}
+	return ids, nil
+}
+
+func (w *MultiWriter) Close() error {
+	var merr tsdb_errors.MultiError
+	for _, b := range w.blocks {
+		merr.Add(b.Close())
+	}
+	return merr.Err()
+}
diff --git a/tsdb/importer/blocks/writer.go b/tsdb/importer/blocks/writer.go
new file mode 100644
index 000000000..3af869e06
--- /dev/null
+++ b/tsdb/importer/blocks/writer.go
@@ -0,0 +1,139 @@
+// Copyright 2020 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package blocks
+
+import (
+	"context"
+	"io/ioutil"
+	"math"
+	"os"
+	"time"
+
+	"github.com/oklog/ulid"
+	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/tsdb"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/pkg/errors"
+	"github.com/prometheus/prometheus/pkg/timestamp"
+	"github.com/prometheus/prometheus/tsdb/chunkenc"
+)
+
+// Writer is an interface to write time series into Prometheus blocks.
+type Writer interface {
+	storage.Appendable
+
+	// Flush writes the current data to disk.
+	// The block or blocks will contain the values accumulated via the Appender.
+	Flush() ([]ulid.ULID, error)
+
+	// Close releases all resources. No appends are allowed after closing.
+	Close() error
+}
+
+var _ Writer = &TSDBWriter{}
+
+// TSDBWriter is a block writer that allows appending and flushing blocks to disk.
+type TSDBWriter struct {
+	logger log.Logger
+	dir    string
+
+	head   *tsdb.Head
+	tmpDir string
+}
+
+func durToMillis(t time.Duration) int64 {
+	return int64(t.Seconds() * 1000)
+}
+
+// NewTSDBWriter creates a new block writer.
+//
+// The returned writer accumulates all series in memory until `Flush` is called.
+//
+// Note that the writer will not check if the target directory exists or
+// contains anything at all. It is the caller's responsibility to
+// ensure that the resulting blocks do not overlap etc.
+// The writer ensures the block flush is atomic (via rename).
+func NewTSDBWriter(logger log.Logger, dir string) (*TSDBWriter, error) {
+	res := &TSDBWriter{
+		logger: logger,
+		dir:    dir,
+	}
+	return res, res.initHead()
+}
+
+// initHead creates and initialises a new head.
+func (w *TSDBWriter) initHead() error {
+	logger := w.logger
+
+	// Keep Registerer and WAL nil as we don't use them.
+	// Use a huge chunkRange: it has to be at least as large as the expected
+	// block size. Since we don't have information about the block size here,
+	// set it to a large number.
+
+	tmpDir, err := ioutil.TempDir(os.TempDir(), "head")
+	if err != nil {
+		return errors.Wrap(err, "create temp dir")
+	}
+	w.tmpDir = tmpDir
+
+	h, err := tsdb.NewHead(nil, logger, nil, durToMillis(9999*time.Hour), w.tmpDir, nil, tsdb.DefaultStripeSize, nil)
+	if err != nil {
+		return errors.Wrap(err, "tsdb.NewHead")
+	}
+
+	w.head = h
+	return w.head.Init(math.MinInt64)
+}
+
+// Appender is not thread-safe. The returned Appender is thread-safe, however.
+func (w *TSDBWriter) Appender() storage.Appender {
+	return w.head.Appender()
+}
+
+// Flush implements the Writer interface. This is where the actual block writing
+// happens. After Flush completes, no further writes can be done.
+func (w *TSDBWriter) Flush() ([]ulid.ULID, error) {
+	seriesCount := w.head.NumSeries()
+	if w.head.NumSeries() == 0 {
+		return nil, errors.New("no series appended; aborting")
+	}
+
+	mint := w.head.MinTime()
+	maxt := w.head.MaxTime() + 1
+	level.Info(w.logger).Log("msg", "flushing", "series_count", seriesCount, "mint", timestamp.Time(mint), "maxt", timestamp.Time(maxt))
+
+	// Flush head to disk as a block.
+	compactor, err := tsdb.NewLeveledCompactor(
+		context.Background(),
+		nil,
+		w.logger,
+		[]int64{durToMillis(2 * time.Hour)}, // Does not matter, used only for planning.
+		chunkenc.NewPool())
+	if err != nil {
+		return nil, errors.Wrap(err, "create leveled compactor")
+	}
+	id, err := compactor.Write(w.dir, w.head, mint, maxt, nil)
+	if err != nil {
+		return nil, errors.Wrap(err, "compactor write")
+	}
+	// TODO(bwplotka): Potentially truncate the head and allow writer reuse. Currently truncating fails with
+	// truncate chunks.HeadReadWriter: maxt of the files are not set.
+	return []ulid.ULID{id}, nil
+}
+
+func (w *TSDBWriter) Close() error {
+	_ = os.RemoveAll(w.tmpDir)
+	return w.head.Close()
+}
diff --git a/tsdb/importer/csv/csv.go b/tsdb/importer/csv/csv.go
new file mode 100644
index 000000000..ffedfef46
--- /dev/null
+++ b/tsdb/importer/csv/csv.go
@@ -0,0 +1,271 @@
+// Copyright 2020 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package csv
+
+import (
+	"encoding/csv"
+	"fmt"
+	"io"
+	"sort"
+	"strconv"
+	"strings"
+	"unsafe"
+
+	"github.com/pkg/errors"
+	"github.com/prometheus/prometheus/pkg/exemplar"
+	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/pkg/textparse"
+)
+
+type HeaderType string
+
+const (
+	// TODO(bwplotka): Consider accepting different ones like `values` or `timestamps` and pairs of value - timestamp, similar to label name - value.
+	Value               HeaderType = "value"
+	TimestampMs         HeaderType = "timestamp_ms"          // Milliseconds since 1970-01-01 00:00:00 UTC.
+	MetricName          HeaderType = "metric_name"           // Value of the "__name__" label.
+	LabelName           HeaderType = "label_name"            // If both labels and metric_name are specified, labels are merged. This header can repeat, but label_value is expected as the next column.
+	LabelValue          HeaderType = "label_value"           // If both labels and metric_name are specified, labels are merged. This header can repeat, but label_name is expected in the previous column.
+	Help                HeaderType = "help"
+	Type                HeaderType = "type"
+	Unit                HeaderType = "unit"
+	ExemplarValue       HeaderType = "exemplar_value"
+	ExemplarTimestampMs HeaderType = "exemplar_timestamp_ms"
+)
+
+type recordEntry struct {
+	labels      labels.Labels
+	help, unit  []byte
+	typ         textparse.MetricType
+	value       float64
+	timestampMs int64
+	exemplar    *exemplar.Exemplar
+}
+
+type Parser struct {
+	cr *csv.Reader
+
+	mapping []HeaderType
+	record  recordEntry
+}
+
+// NewParser returns a new CSV parser that parses input in the format described
+// in RFC 4180 and exposes it as a textparse.Parser. Read
+// https://golang.org/pkg/encoding/csv/ for more details about parsing CSV.
+func NewParser(r io.Reader, delimiter rune) textparse.Parser {
+	cr := csv.NewReader(r)
+	cr.ReuseRecord = true
+	cr.Comma = delimiter
+	return &Parser{cr: cr}
+}
+
+// Next advances the parser to the next sample. It returns io.EOF if no
+// more samples were read.
+func (p *Parser) Next() (textparse.Entry, error) {
+	for {
+		record, err := p.cr.Read()
+		if err != nil {
+			// io.EOF is returned at the end of the input; io.EOF also controls
+			// the textparse.Parser.Next loop, so pass it through unchanged.
+			return textparse.EntryInvalid, err
+		}
+
+		if p.mapping == nil {
+			p.mapping, err = parseHeaders(record)
+			if err != nil {
+				return textparse.EntryInvalid, err
+			}
+			continue
+		}
+
+		if len(record) != len(p.mapping) {
+			return textparse.EntryInvalid, errors.Errorf("got a different number of fields than defined by the headers. Got %v, expected %v", len(record), len(p.mapping))
+		}
+
+		r := recordEntry{}
+		var l labels.Label
+		for i, field := range record {
+			switch p.mapping[i] {
+			case Value:
+				r.value, err = parseFloat(field)
+				if err != nil {
+					return textparse.EntryInvalid, errors.Wrapf(err, "parse %q as float64", Value)
+				}
+			case TimestampMs:
+				r.timestampMs, err = strconv.ParseInt(field, 10, 64)
+				if err != nil {
+					return textparse.EntryInvalid, errors.Wrapf(err, "parse %q as int64", TimestampMs)
+				}
+			case MetricName:
+				if r.labels.Has(labels.MetricName) {
+					return textparse.EntryInvalid, errors.Errorf("%q (or the %q label name) was already specified", MetricName, labels.MetricName)
+				}
+				r.labels = append(r.labels, labels.Label{Name: labels.MetricName, Value: field})
+			case Help:
+				r.help = yoloBytes(field)
+			case Type:
+				if err := r.typ.ParseForOpenMetrics(field); err != nil {
+					return textparse.EntryInvalid, err
+				}
+			case Unit:
+				r.unit = yoloBytes(field)
+			case ExemplarValue, ExemplarTimestampMs:
+				if field == "" {
+					continue
+				}
+				if r.exemplar == nil {
+					r.exemplar = &exemplar.Exemplar{}
+				}
+				if p.mapping[i] == ExemplarValue {
+					r.exemplar.Value, err = parseFloat(field)
+					if err != nil {
+						return textparse.EntryInvalid, errors.Wrapf(err, "parse %q as float64", ExemplarValue)
+					}
+				} else {
+					r.exemplar.Ts, err = strconv.ParseInt(field, 10, 64)
+					if err != nil {
+						return textparse.EntryInvalid, errors.Wrapf(err, "parse %q as int64", ExemplarTimestampMs)
+					}
+					r.exemplar.HasTs = true
+				}
+			// A label name is required to come before its value. Such name/value pairs can repeat.
+			case LabelName:
+				if field != "" && r.labels.Has(field) {
+					return textparse.EntryInvalid, errors.Errorf("label name %q was already specified", field)
+				}
+				l.Name = field
+			case LabelValue:
+				if l.Name == "" {
+					if field == "" {
+						continue
+					}
+					return textparse.EntryInvalid, errors.Errorf("label value %q is specified while the preceding label name is empty", field)
+				}
+				if field == "" {
+					return textparse.EntryInvalid, errors.New("label value cannot be empty when the preceding label name is not empty")
+				}
+				l.Value = field
+				r.labels = append(r.labels, l)
+			default:
+				panic(fmt.Sprintf("unknown mapping %q", p.mapping[i]))
+			}
+		}
+		sort.Sort(r.labels)
+
+		if r.exemplar != nil {
+			r.exemplar.Labels = r.labels
+		}
+
+		// TODO(bwplotka): Implement iterating over all entry types if we have any (help, unit, etc.).
+		p.record = r
+		return textparse.EntrySeries, nil
+	}
+}
+
+func yoloBytes(s string) []byte {
+	return *((*[]byte)(unsafe.Pointer(&s)))
+}
+
+func parseFloat(s string) (float64, error) {
+	// Keep to pre-Go 1.13 float formats.
+	if strings.ContainsAny(s, "pP_") {
+		return 0, errors.Errorf("unsupported characters in float %v", s)
+	}
+	return strconv.ParseFloat(s, 64)
+}
+
+func parseHeaders(record []string) (mapping []HeaderType, _ error) {
+	dedup := map[HeaderType]struct{}{}
+	expectLabelValue := false
+	for _, field := range record {
+		switch f := HeaderType(strings.ToLower(field)); f {
+		case Value, TimestampMs, MetricName, Help, Type, Unit, ExemplarValue, ExemplarTimestampMs:
+			if expectLabelValue {
+				return nil, errors.Errorf("matching %s header expected right after %s", LabelValue, LabelName)
+			}
+			if _, ok := dedup[f]; ok {
+				return nil, errors.Errorf("only one header of type %s is allowed", f)
+			}
+			dedup[f] = struct{}{}
+			mapping = append(mapping, f)

+		// A label name is required to come before its value. Such name/value pairs can repeat.
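+		// For example, an illustrative header row (not taken from this patch):
+		//   metric_name,label_name,label_value,label_name,label_value,timestamp_ms,value
+		// declares two label name/value pairs for every record.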
+		case LabelName:
+			if expectLabelValue {
+				return nil, errors.Errorf("matching %s header expected right after %s", LabelValue, LabelName)
+			}
+			mapping = append(mapping, f)
+			expectLabelValue = true
+		case LabelValue:
+			if !expectLabelValue {
+				return nil, errors.Errorf("matching %s header expected before %s", LabelName, LabelValue)
+			}
+			mapping = append(mapping, f)
+			expectLabelValue = false
+		default:
+			return nil, errors.Errorf("%q header is not supported", field)
+		}
+	}
+	return mapping, nil
+}
+
+// Series returns the bytes of the series (currently a placeholder), the
+// timestamp if set, and the value of the current sample. The timestamp is in
+// milliseconds.
+func (p *Parser) Series() ([]byte, *int64, float64) {
+	return []byte("not-implemented"), &p.record.timestampMs, p.record.value
+}
+
+// Help returns the metric name and help text in the current entry.
+// Must only be called after Next returned a help entry.
+// The returned byte slices become invalid after the next call to Next.
+func (p *Parser) Help() ([]byte, []byte) {
+	return []byte(p.record.labels.Get("__name__")), p.record.help
+}
+
+// Type returns the metric name and type in the current entry.
+// Must only be called after Next returned a type entry.
+// The returned byte slices become invalid after the next call to Next.
+func (p *Parser) Type() ([]byte, textparse.MetricType) {
+	return []byte(p.record.labels.Get("__name__")), p.record.typ
+}
+
+// Unit returns the metric name and unit in the current entry.
+// Must only be called after Next returned a unit entry.
+// The returned byte slices become invalid after the next call to Next.
+func (p *Parser) Unit() ([]byte, []byte) {
+	return []byte(p.record.labels.Get("__name__")), p.record.unit
+}
+
+// Comment returns the text of the current comment.
+// Must only be called after Next returned a comment entry.
+// The returned byte slice becomes invalid after the next call to Next.
+func (p *Parser) Comment() []byte { return nil }
+
+// Metric writes the labels of the current sample into the passed labels.
+// It returns the string from which the metric was parsed (currently a placeholder).
+func (p *Parser) Metric(l *labels.Labels) string {
+	*l = append(*l, p.record.labels...)
+	// Sort labels to maintain the sorted-labels invariant.
+	sort.Sort(*l)
+
+	return "not-implemented"
+}
+
+// Exemplar writes the exemplar of the current sample into the passed
+// exemplar. It returns whether an exemplar exists or not.
+func (p *Parser) Exemplar(l *exemplar.Exemplar) bool {
+	if p.record.exemplar == nil {
+		return false
+	}
+	*l = *p.record.exemplar
+	return true
+}
diff --git a/tsdb/importer/csv/csv_test.go b/tsdb/importer/csv/csv_test.go
new file mode 100644
index 000000000..9ff3bfeda
--- /dev/null
+++ b/tsdb/importer/csv/csv_test.go
@@ -0,0 +1,98 @@
+// Copyright 2020 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package csv
+
+import (
+	"io"
+	"strings"
+	"testing"
+
+	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/pkg/textparse"
+	"github.com/prometheus/prometheus/util/testutil"
+)
+
+type series struct {
+	// TODO(bwplotka): Assert more stuff once supported.
+	lbls    labels.Labels
+	samples []sample
+}
+
+type sample struct {
+	t int64
+	v float64
+}
+
+func TestParser(t *testing.T) {
+	for _, tcase := range []struct {
+		name  string
+		input string
+
+		expectedSeries []series
+	}{
+		// TODO(bwplotka): Test all edge cases and all entries.
+		{
+			name: "empty",
+		},
+		{
+			name:  "just header",
+			input: `metric_name,label_name,label_value,timestamp_ms,label_name,label_value,help,value,type,exemplar_value,unit,exemplar_timestamp_ms`,
+		},
+		{
+			name: "mixed header with data",
+			input: `metric_name,label_name,label_value,timestamp_ms,label_name,label_value,help,value,type,exemplar_value,unit,exemplar_timestamp_ms
+metric1,pod,abc-1,1594885435,instance,1,some help,1245214.23423,counter,-0.12,bytes because why not,1
+metric1,pod,abc-1,1594885436,instance,1,some help,1.23423,counter,-0.12,bytes because why not,1
+metric1,pod,abc-2,1594885432,,,some help2,1245214.23421,gauge,,bytes,
+`,
+			expectedSeries: []series{
+				{
+					lbls:    labels.FromStrings(labels.MetricName, "metric1", "pod", "abc-1", "instance", "1"),
+					samples: []sample{{v: 1245214.23423, t: 1594885435}, {v: 1.23423, t: 1594885436}},
+				},
+				{
+					lbls:    labels.FromStrings(labels.MetricName, "metric1", "pod", "abc-2"),
+					samples: []sample{{v: 1245214.23421, t: 1594885432}},
+				},
+			},
+		},
+	} {
+		t.Run(tcase.name, func(t *testing.T) {
+			p := NewParser(strings.NewReader(tcase.input), ',')
+			got := map[uint64]series{}
+			for {
+				e, err := p.Next()
+				if err == io.EOF {
+					break
+				}
+				testutil.Ok(t, err)
+				// For now we expect series entries only.
+				testutil.Equals(t, textparse.EntrySeries, e)
+				l := labels.Labels{}
+				p.Metric(&l)
+
+				s, ok := got[l.Hash()]
+				if !ok {
+					s = series{lbls: l}
+				}
+
+				_, ts, v := p.Series()
+				if ts == nil {
+					t.Fatal("got no timestamps")
+				}
+				s.samples = append(s.samples, sample{t: *ts, v: v})
+				// Store the updated copy back, otherwise the samples are lost.
+				got[l.Hash()] = s
+			}
+			testutil.Equals(t, len(tcase.expectedSeries), len(got))
+			for _, e := range tcase.expectedSeries {
+				testutil.Equals(t, e, got[e.lbls.Hash()])
+			}
+		})
+	}
+}
diff --git a/tsdb/importer/import.go b/tsdb/importer/import.go
new file mode 100644
index 000000000..3e89f4821
--- /dev/null
+++ b/tsdb/importer/import.go
@@ -0,0 +1,86 @@
+// Copyright 2020 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package importer
+
+import (
+	"fmt"
+	"io"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/pkg/errors"
+	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/prometheus/prometheus/pkg/textparse"
+	tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
+	"github.com/prometheus/prometheus/tsdb/importer/blocks"
+)
+
+// Import imports data from a textparse Parser into a block Writer.
+// TODO(bwplotka): The textparse interface potentially limits the format to never give multiple samples. Fix this, as some formats
+// (e.g., JSON) might allow that.
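+//
+// Illustrative usage sketch (error handling elided; `logger` is any go-kit
+// logger, and "./out" and the 2h block range are assumed example values):
+//
+//	p := openmetrics.NewParser(os.Stdin)
+//	w := blocks.NewMultiWriter(logger, "./out", int64((2*time.Hour)/time.Millisecond))
+//	err := Import(logger, p, w)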
+// Import takes ownership of the given block writer.
+func Import(logger log.Logger, p textparse.Parser, w blocks.Writer) (err error) {
+	if logger == nil {
+		logger = log.NewNopLogger()
+	}
+
+	level.Info(logger).Log("msg", "started importing input data")
+	app := w.Appender()
+
+	defer func() {
+		var merr tsdb_errors.MultiError
+		merr.Add(err)
+		merr.Add(w.Close())
+		err = merr.Err()
+	}()
+
+	var e textparse.Entry
+	for {
+		e, err = p.Next()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			return errors.Wrap(err, "parse")
+		}
+
+		// For now we care about series entries only.
+		if e != textparse.EntrySeries {
+			continue
+		}
+
+		// TODO(bwplotka): Avoid allocations using the AddFast method and maintaining refs.
+		l := labels.Labels{}
+		p.Metric(&l)
+		_, ts, v := p.Series()
+		if ts == nil {
+			return errors.Errorf("expected timestamp for series %v, got none", l.String())
+		}
+		if _, err := app.Add(l, *ts, v); err != nil {
+			return errors.Wrap(err, "add sample")
+		}
+	}
+
+	level.Info(logger).Log("msg", "no more input data, committing appenders and flushing block(s)")
+	if err := app.Commit(); err != nil {
+		return errors.Wrap(err, "commit")
+	}
+
+	ids, err := w.Flush()
+	if err != nil {
+		return errors.Wrap(err, "flush")
+	}
+	level.Info(logger).Log("msg", "blocks flushed", "ids", fmt.Sprintf("%v", ids))
+	return nil
+}
diff --git a/tsdb/importer/import_test.go b/tsdb/importer/import_test.go
deleted file mode 100644
index e447cf9aa..000000000
--- a/tsdb/importer/import_test.go
+++ /dev/null
@@ -1,753 +0,0 @@
-// Copyright 2020 The Prometheus Authors
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package importer
-
-import (
-	"fmt"
-	"io/ioutil"
-	"math"
-	"math/rand"
-	"os"
-	"sort"
-	"strconv"
-	"strings"
-	"testing"
-
-	labels2 "github.com/prometheus/prometheus/pkg/labels"
-	"github.com/prometheus/prometheus/pkg/value"
-	"github.com/prometheus/prometheus/tsdb"
-	"github.com/prometheus/prometheus/tsdb/chunkenc"
-	"github.com/prometheus/prometheus/util/testutil"
-)
-
-const (
-	// We use a lower value for this than the default to test block compaction implicitly.
-	maxSamplesInMemory = 2000
-	maxBlockChildren   = 10
-)
-
-func testBlocks(t *testing.T, blocks []tsdb.BlockReader, metricLabels []string, expectedMint, expectedMaxt int64, expectedSamples []tsdb.MetricSample, expectedSymbols []string, expectedNumBlocks int) {
-	// Assert we have expected number of blocks.
- testutil.Equals(t, expectedNumBlocks, len(blocks)) - - allSymbols := make(map[string]struct{}) - allSamples := make([]tsdb.MetricSample, 0) - maxt, mint := int64(math.MinInt64), int64(math.MaxInt64) - for _, block := range blocks { - maxt, mint = value.MaxInt64(maxt, block.Meta().MaxTime), value.MinInt64(mint, block.Meta().MinTime) - indexr, err := block.Index() - testutil.Ok(t, err) - symbols := indexr.Symbols() - for symbols.Next() { - key := symbols.At() - if _, ok := allSymbols[key]; !ok { - allSymbols[key] = struct{}{} - } - } - blockSamples, err := readSeries(block, labels2.FromStrings(metricLabels...)) - testutil.Ok(t, err) - allSamples = append(allSamples, blockSamples...) - _ = indexr.Close() - } - - allSymbolsSlice := make([]string, 0) - for key := range allSymbols { - allSymbolsSlice = append(allSymbolsSlice, key) - } - - sort.Strings(allSymbolsSlice) - sort.Strings(expectedSymbols) - // Assert that all symbols that we expect to find are present. - testutil.Equals(t, expectedSymbols, allSymbolsSlice) - - sortSamples(allSamples) - sortSamples(expectedSamples) - // Assert that all samples that we imported + what existed already are present. - testutil.Assert(t, len(allSamples) == len(expectedSamples), "number of expected samples is different from actual samples") - testutil.Equals(t, expectedSamples, allSamples, "actual samples are different from expected samples") - - // Assert that DB's time ranges are as specified. - testutil.Equals(t, expectedMaxt, maxt) - testutil.Equals(t, expectedMint, mint) -} - -func sortSamples(samples []tsdb.MetricSample) { - sort.Slice(samples, func(x, y int) bool { - sx, sy := samples[x], samples[y] - // If timestamps are equal, sort based on values. - if sx.Timestamp != sy.Timestamp { - return sx.Timestamp < sy.Timestamp - } - return sx.Value < sy.Value - }) -} - -// readSeries returns all series present in the block, that contain the labels we want. -// The labels do not have to be exhaustive, i.e. if we have metrics with common labels b/w them, -// we just need to pass the common labels, and we will get all the metrics that have them, -// even if the other labels b/w the metrics are different. -func readSeries(block tsdb.BlockReader, lbls labels2.Labels) ([]tsdb.MetricSample, error) { - series := make([]tsdb.MetricSample, 0) - ir, err := block.Index() - if err != nil { - return series, err - } - defer ir.Close() - tsr, err := block.Tombstones() - if err != nil { - return series, err - } - defer tsr.Close() - chunkr, err := block.Chunks() - if err != nil { - return series, err - } - defer chunkr.Close() - for _, lbl := range lbls { - css, err := tsdb.LookupChunkSeries(ir, tsr, labels2.MustNewMatcher(labels2.MatchEqual, lbl.Name, lbl.Value)) - if err != nil { - return series, err - } - for css.Next() { - actLabels, chkMetas, _ := css.At() - var chunkIter chunkenc.Iterator - for _, meta := range chkMetas { - chk, err := chunkr.Chunk(meta.Ref) - if err != nil { - return series, err - } - chunkIter = chk.Iterator(chunkIter) - for chunkIter.Next() { - t, v := chunkIter.At() - sample := tsdb.MetricSample{ - Timestamp: t, - Value: v, - Labels: actLabels, - } - series = append(series, sample) - } - } - } - } - return series, nil -} - -// genSeries generates series from mint to maxt, with a step value. -func genSeries(labels []string, mint, maxt int64, step int) []tsdb.MetricSample { - series := make([]tsdb.MetricSample, 0) - for idx := mint; idx < maxt; idx += int64(step) { - // Round to 3 places. 
- val := math.Floor(rand.Float64()*1000) / 1000 - sample := tsdb.MetricSample{Timestamp: idx, Value: val, Labels: labels2.FromStrings(labels...)} - series = append(series, sample) - } - return series -} - -// labelsToStr converts the given labels to a string representation that is compliant with -// the OpenMetrics parser. -func labelsToStr(labels labels2.Labels) string { - str := "{" - for idx, l := range labels { - str += fmt.Sprintf("%s=%s", l.Name, strconv.Quote(l.Value)) - if idx < len(labels)-1 { - str += "," - } - } - str += "}" - return str -} - -// genOpenMetricsText formats the given series data into OpenMetrics compliant format. -func genOpenMetricsText(metricName, metricType string, series []tsdb.MetricSample) string { - str := fmt.Sprintf("# HELP %s This is a metric\n# TYPE %s %s", metricName, metricName, metricType) - for _, s := range series { - str += fmt.Sprintf("\n%s%s %f %d", metricName, labelsToStr(s.Labels), s.Value, s.Timestamp) - } - str += fmt.Sprintf("\n# EOF") - return str -} - -func TestImport(t *testing.T) { - tests := []struct { - ToParse string - IsOk bool - MetricLabels []string - Expected struct { - MinTime int64 - MaxTime int64 - NumBlocks int - Symbols []string - Samples []tsdb.MetricSample - } - }{ - { - ToParse: `# EOF`, - IsOk: true, - }, - { - ToParse: `# HELP http_requests_total The total number of HTTP requests. -# TYPE http_requests_total counter -http_requests_total{code="200"} 1021 1565133713989 -http_requests_total{code="400"} 1 1565133713990 -# EOF -`, - IsOk: true, - MetricLabels: []string{"__name__", "http_requests_total"}, - Expected: struct { - MinTime int64 - MaxTime int64 - NumBlocks int - Symbols []string - Samples []tsdb.MetricSample - }{ - MinTime: 1565133713989, - MaxTime: 1565133713991, - NumBlocks: 1, - Symbols: []string{"__name__", "http_requests_total", "code", "200", "400"}, - Samples: []tsdb.MetricSample{ - { - Timestamp: 1565133713989, - Value: 1021, - Labels: labels2.FromStrings("__name__", "http_requests_total", "code", "200"), - }, - { - Timestamp: 1565133713990, - Value: 1, - Labels: labels2.FromStrings("__name__", "http_requests_total", "code", "400"), - }, - }, - }, - }, - { - ToParse: `# HELP http_requests_total The total number of HTTP requests. -# TYPE http_requests_total counter -http_requests_total{code="200"} 1022 1565133713989 -http_requests_total{code="400"} 2 1575133713990 -# EOF -`, - IsOk: true, - MetricLabels: []string{"__name__", "http_requests_total"}, - Expected: struct { - MinTime int64 - MaxTime int64 - NumBlocks int - Symbols []string - Samples []tsdb.MetricSample - }{ - MinTime: 1565133713989, - MaxTime: 1575133713991, - NumBlocks: 2, - Symbols: []string{"http_requests_total", "code", "200", "400", "__name__"}, - Samples: []tsdb.MetricSample{ - { - Timestamp: 1565133713989, - Value: 1022, - Labels: labels2.FromStrings("__name__", "http_requests_total", "code", "200"), - }, - { - Timestamp: 1575133713990, - Value: 2, - Labels: labels2.FromStrings("__name__", "http_requests_total", "code", "400"), - }, - }, - }, - }, - { - ToParse: `# HELP http_requests_total The total number of HTTP requests. 
-# TYPE http_requests_total counter -http_requests_total{code="200"} 1023 1395066363000 -http_requests_total{code="400"} 3 1395066363000 -# EOF -`, - IsOk: true, - MetricLabels: []string{"__name__", "http_requests_total"}, - Expected: struct { - MinTime int64 - MaxTime int64 - NumBlocks int - Symbols []string - Samples []tsdb.MetricSample - }{ - MinTime: 1395066363000, - MaxTime: 1395066363001, - NumBlocks: 1, - Symbols: []string{"http_requests_total", "code", "200", "400", "__name__"}, - Samples: []tsdb.MetricSample{ - { - Timestamp: 1395066363000, - Value: 1023, - Labels: labels2.FromStrings("__name__", "http_requests_total", "code", "200"), - }, - { - Timestamp: 1395066363000, - Value: 3, - Labels: labels2.FromStrings("__name__", "http_requests_total", "code", "400"), - }, - }, - }, - }, - { - ToParse: `# HELP something_weird Something weird -# TYPE something_weird gauge -something_weird{problem="infinite timestamp"} +Inf -3982045 -# EOF -`, - IsOk: false, - }, - { - // Metric has no timestamp, hence invalid - ToParse: `# HELP rpc_duration_seconds A summary of the RPC duration in seconds. -# TYPE rpc_duration_seconds summary -rpc_duration_seconds{quantile="0.01"} 3102 -rpc_duration_seconds{quantile="0.05"} 3272 -# EOF -`, - IsOk: false, - }, - { - ToParse: `# HELP no_type_metric This is a metric with no TYPE string -no_type_metric{type="bad_news_bears"} 0.0 111 -# EOF -`, - IsOk: true, - MetricLabels: []string{"__name__", "no_type_metric"}, - Expected: struct { - MinTime int64 - MaxTime int64 - NumBlocks int - Symbols []string - Samples []tsdb.MetricSample - }{ - MinTime: 111, - MaxTime: 112, - NumBlocks: 1, - Symbols: []string{"no_type_metric", "type", "bad_news_bears", "__name__"}, - Samples: []tsdb.MetricSample{ - { - Timestamp: 111, - Value: 0.0, - Labels: labels2.FromStrings("__name__", "no_type_metric", "type", "bad_news_bears"), - }, - }, - }, - }, - { - ToParse: `# HELP bad_ts This is a metric with an extreme timestamp -# TYPE bad_ts gauge -bad_ts{type="bad_timestamp"} 420 -1e99 -# EOF -`, - IsOk: false, - }, - { - ToParse: `# HELP bad_ts This is a metric with an extreme timestamp -# TYPE bad_ts gauge -bad_ts{type="bad_timestamp"} 420 1e99 -# EOF -`, - IsOk: false, - }, - { - ToParse: `no_help_no_type{foo="bar"} 42 6900 -# EOF -`, - IsOk: true, - MetricLabels: []string{"__name__", "no_help_no_type"}, - Expected: struct { - MinTime int64 - MaxTime int64 - NumBlocks int - Symbols []string - Samples []tsdb.MetricSample - }{ - MinTime: 6900, - MaxTime: 6901, - NumBlocks: 1, - Symbols: []string{"no_help_no_type", "foo", "bar", "__name__"}, - Samples: []tsdb.MetricSample{ - { - Timestamp: 6900, - Value: 42, - Labels: labels2.FromStrings("__name__", "no_help_no_type", "foo", "bar"), - }, - }, - }, - }, - { - ToParse: `bare_metric 42.24 1001 -# EOF -`, - IsOk: true, - MetricLabels: []string{"__name__", "bare_metric"}, - Expected: struct { - MinTime int64 - MaxTime int64 - NumBlocks int - Symbols []string - Samples []tsdb.MetricSample - }{ - MinTime: 1001, - MaxTime: 1002, - NumBlocks: 1, - Symbols: []string{"bare_metric", "__name__"}, - Samples: []tsdb.MetricSample{ - { - Timestamp: 1001, - Value: 42.24, - Labels: labels2.FromStrings("__name__", "bare_metric"), - }, - }, - }, - }, - { - ToParse: `# HELP bad_metric This a bad metric -# TYPE bad_metric bad_type -bad_metric{type="has no type information"} 0.0 111 -# EOF -`, - IsOk: false, - }, - { - ToParse: `# HELP no_nl This test has no newline so will fail -# TYPE no_nl gauge -no_nl{type="no newline"} -# EOF`, - IsOk: false, - }, - } - 
for _, test := range tests { - tmpDbDir, err := ioutil.TempDir("", "importer") - testutil.Ok(t, err) - err = ImportFromFile(strings.NewReader(test.ToParse), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil) - if test.IsOk { - testutil.Ok(t, err) - if len(test.Expected.Symbols) > 0 { - db, err := tsdb.OpenDBReadOnly(tmpDbDir, nil) - testutil.Ok(t, err) - blocks, err := db.Blocks() - testutil.Ok(t, err) - testBlocks(t, blocks, test.MetricLabels, test.Expected.MinTime, test.Expected.MaxTime, test.Expected.Samples, test.Expected.Symbols, test.Expected.NumBlocks) - } - } else { - testutil.NotOk(t, err) - } - _ = os.RemoveAll(tmpDbDir) - } -} - -func TestImportBadFile(t *testing.T) { - // No file found case. - err := ImportFromFile((*os.File)(nil), "/buzz/baz/bar/foo", maxSamplesInMemory, maxBlockChildren, nil) - testutil.NotOk(t, err) -} - -func TestImportIntoExistingDB(t *testing.T) { - type importTest struct { - MetricName string - MetricType string - MetricLabels []string - GeneratorStep int - DBMint, DBMaxt int64 - ImportShuffle bool - ImportMint, ImportMaxt int64 - ExpectedMint, ExpectedMaxt int64 - ExpectedSymbols []string - ExpectedNumBlocks int - } - - tests := []importTest{ - { - // Import overlapping data only. - MetricName: "test_metric", - MetricType: "gauge", - MetricLabels: []string{"foo", "bar"}, - GeneratorStep: 1, - DBMint: 1000, - DBMaxt: 2000, - ImportShuffle: false, - ImportMint: 1000, - ImportMaxt: 1500, - ExpectedMint: 1000, - ExpectedMaxt: 2000, - ExpectedSymbols: []string{"__name__", "test_metric", "foo", "bar"}, - ExpectedNumBlocks: 2, - }, - { - // Test overlapping, and non-overlapping data, with a lot of samples. - MetricName: "test_metric_2", - MetricType: "gauge", - MetricLabels: []string{"foo", "bar"}, - GeneratorStep: 1, - DBMint: 1000, - DBMaxt: 4000, - ImportShuffle: false, - ImportMint: 0, - ImportMaxt: 5000, - ExpectedSymbols: []string{"__name__", "test_metric_2", "foo", "bar"}, - ExpectedMint: 0, - ExpectedMaxt: 5000, - ExpectedNumBlocks: 4, - }, - { - // Test overlapping, and non-overlapping data, aligning the blocks. - // This test also creates a large number of samples, across a much wider time range, to test - // if the importer will divvy samples beyond DB time limits. - MetricName: "test_metric_3", - MetricType: "gauge", - MetricLabels: []string{"foo", "bar"}, - GeneratorStep: 20000, - DBMint: 1000, - DBMaxt: tsdb.DefaultBlockDuration, - ImportShuffle: false, - ImportMint: 0, - ImportMaxt: tsdb.DefaultBlockDuration * 2, - ExpectedSymbols: []string{"__name__", "test_metric_3", "foo", "bar"}, - ExpectedMint: 0, - ExpectedMaxt: (tsdb.DefaultBlockDuration * 2) - 20000 + 1, - ExpectedNumBlocks: 4, - }, - { - // Import partially overlapping data only. - MetricName: "test_metric_4", - MetricType: "gauge", - MetricLabels: []string{"foo", "bar"}, - GeneratorStep: 1, - DBMint: 1000, - DBMaxt: 2000, - ImportShuffle: false, - ImportMint: 500, - ImportMaxt: 1500, - ExpectedMint: 500, - ExpectedMaxt: 2000, - ExpectedSymbols: []string{"__name__", "test_metric_4", "foo", "bar"}, - ExpectedNumBlocks: 3, - }, - { - // Import that never overlaps, after the DB max time. 
- MetricName: "test_metric_5", - MetricType: "gauge", - MetricLabels: []string{"foo", "bar"}, - GeneratorStep: 1, - DBMint: 0, - DBMaxt: 1000, - ImportShuffle: false, - ImportMint: tsdb.DefaultBlockDuration + 1000, - ImportMaxt: tsdb.DefaultBlockDuration + 2000, - ExpectedMint: 0, - ExpectedMaxt: tsdb.DefaultBlockDuration + 2000, - ExpectedSymbols: []string{"__name__", "test_metric_5", "foo", "bar"}, - ExpectedNumBlocks: 2, - }, - } - - for _, test := range tests { - initSeries := genSeries(test.MetricLabels, test.DBMint, test.DBMaxt, test.GeneratorStep) - initText := genOpenMetricsText(test.MetricName, test.MetricType, initSeries) - - tmpDbDir, err := ioutil.TempDir("", "importer") - testutil.Ok(t, err) - err = ImportFromFile(strings.NewReader(initText), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil) - testutil.Ok(t, err) - - importSeries := genSeries(test.MetricLabels, test.ImportMint, test.ImportMaxt, test.GeneratorStep) - importText := genOpenMetricsText(test.MetricName, test.MetricType, importSeries) - - err = ImportFromFile(strings.NewReader(importText), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil) - testutil.Ok(t, err) - - expectedSamples := make([]tsdb.MetricSample, 0) - for _, exp := range [][]tsdb.MetricSample{initSeries, importSeries} { - for _, sample := range exp { - lbls := labels2.FromStrings("__name__", test.MetricName) - s := tsdb.MetricSample{ - Timestamp: sample.Timestamp, - Value: sample.Value, - Labels: append(lbls, sample.Labels...), - } - expectedSamples = append(expectedSamples, s) - } - } - - db, err := tsdb.Open(tmpDbDir, nil, nil, &tsdb.Options{ - RetentionDuration: tsdb.DefaultOptions().RetentionDuration, - AllowOverlappingBlocks: true, - }) - testutil.Ok(t, err) - - blocks := make([]tsdb.BlockReader, 0) - for _, b := range db.Blocks() { - blocks = append(blocks, b) - } - testBlocks(t, blocks, test.MetricLabels, test.ExpectedMint, test.ExpectedMaxt, expectedSamples, test.ExpectedSymbols, test.ExpectedNumBlocks) - - // Close and remove all temp files and folders. - _ = db.Close() - _ = os.RemoveAll(tmpDbDir) - } -} - -// Test to see if multiple series are imported correctly. 
-func TestMixedSeries(t *testing.T) { - partialOMText := func(metricName, metricType string, series []tsdb.MetricSample) string { - str := fmt.Sprintf("# HELP %s This is a metric\n# TYPE %s %s", metricName, metricName, metricType) - for _, s := range series { - str += fmt.Sprintf("\n%s%s %f %d", metricName, s.Labels.String(), s.Value, s.Timestamp) - } - return str - } - metricA := "metricA" - metricLabels := []string{"foo", "bar"} - mint, maxt := int64(0), int64(1000) - step := 5 - seriesA := genSeries(metricLabels, mint, maxt, step) - textA := partialOMText(metricA, "gauge", seriesA) - metricB := "metricB" - seriesB := genSeries(metricLabels, mint, maxt, step) - textB := partialOMText(metricB, "gauge", seriesB) - - text := fmt.Sprintf("%s\n%s\n# EOF", textA, textB) - - tmpDbDir, err := ioutil.TempDir("", "importer") - testutil.Ok(t, err) - err = ImportFromFile(strings.NewReader(text), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil) - testutil.Ok(t, err) - - addMetricLabel := func(series []tsdb.MetricSample, metricName string) []tsdb.MetricSample { - augSamples := make([]tsdb.MetricSample, 0) - for _, sample := range series { - lbls := labels2.FromStrings("__name__", metricName) - s := tsdb.MetricSample{ - Timestamp: sample.Timestamp, - Value: sample.Value, - Labels: append(lbls, sample.Labels...), - } - augSamples = append(augSamples, s) - } - return augSamples - } - - expectedSamples := append(addMetricLabel(seriesA, metricA), addMetricLabel(seriesB, metricB)...) - expectedSymbols := append([]string{"__name__", metricA, metricB}, metricLabels...) - - db, err := tsdb.Open(tmpDbDir, nil, nil, &tsdb.Options{ - RetentionDuration: tsdb.DefaultOptions().RetentionDuration, - AllowOverlappingBlocks: true, - }) - testutil.Ok(t, err) - - blocks := make([]tsdb.BlockReader, 0) - for _, b := range db.Blocks() { - blocks = append(blocks, b) - } - - testBlocks(t, blocks, metricLabels, mint, maxt-int64(step-1), expectedSamples, expectedSymbols, 1) - - _ = os.RemoveAll(tmpDbDir) -} - -// Test to check if we can detect an error with the text, -// without having to read till EOF. -func TestInvalidSyntaxQuickAbort(t *testing.T) { - lbls := []string{"foo", "bar"} - firstHalf := genSeries(lbls, 0, 10, 1) - invalidMetrics := []tsdb.MetricSample{ - { - Timestamp: -1, - Value: 0, - Labels: labels2.FromStrings(" INVALID", "INVALID"), - }, - } - secondHalf := genSeries(lbls, 10, 200, 1) - combined := append(firstHalf, append(invalidMetrics, secondHalf...)...) - importText := genOpenMetricsText("invalid_test", "gauge", combined) - - tmpDbDir, err := ioutil.TempDir("", "importer") - testutil.Ok(t, err) - - r := strings.NewReader(importText) - err = ImportFromFile(r, tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil) - - testutil.NotOk(t, err) - // We should abort and return the error with the text, without reading till EOF. - testutil.Assert(t, r.Len() > 0, "import text has been completely read") -} - -// func benchVirtual(vThresh int, b *testing.B) { -// for n := 0; n < b.N; n++ { -// type importTest struct { -// MetricName string -// MetricType string -// MetricLabels []string -// GeneratorStep int -// DBMint, DBMaxt int64 -// ImportShuffle bool -// ImportMint, ImportMaxt int64 -// ExpectedMint, ExpectedMaxt int64 -// ExpectedSymbols []string -// ExpectedNumBlocks int -// } -// -// tests := []importTest{ -// { -// // Test overlapping, and non-overlapping data, aligning the blocks. 
-// // This test also creates a large number of samples, across a much wider time range, to test -// // if the importer will divvy samples beyond DB time limits into progressively smaller blocks -// // using the ranges in tsdb.DefaultOptions.BlockRanges. -// MetricName: "test_metric_3", -// MetricType: "gauge", -// MetricLabels: []string{"foo", "bar"}, -// GeneratorStep: 5000, -// DBMint: 1000, -// DBMaxt: tsdb.DefaultOptions.BlockRanges[2], -// ImportShuffle: false, -// ImportMint: 0, -// ImportMaxt: tsdb.DefaultOptions.BlockRanges[2]*2 + 10000, -// ExpectedSymbols: []string{"__name__", "test_metric_3", "foo", "bar"}, -// ExpectedMint: 0, -// ExpectedMaxt: (tsdb.DefaultOptions.BlockRanges[2] * 2) + 1, -// ExpectedNumBlocks: 5, -// }, -// } -// -// for _, test := range tests { -// initSeries := genSeries(test.MetricLabels, test.DBMint, test.DBMaxt, test.GeneratorStep) -// initText := genOpenMetricsText(test.MetricName, test.MetricType, initSeries) -// -// tmpDbDir, err := ioutil.TempDir("", "importer") -// testutil.Ok(b, err) -// err = ImportFromFile(strings.NewReader(initText), tmpDbDir, maxSamplesInMemory, vThresh, nil) -// testutil.Ok(b, err) -// -// importSeries := genSeries(test.MetricLabels, test.ImportMint, test.ImportMaxt, test.GeneratorStep) -// importText := genOpenMetricsText(test.MetricName, test.MetricType, importSeries) -// -// err = ImportFromFile(strings.NewReader(importText), tmpDbDir, maxSamplesInMemory, vThresh, nil) -// testutil.Ok(b, err) -// } -// } -// } -// -// func Benchmark_Vertical0(b *testing.B) { -// benchVirtual(0, b) -// } -// func Benchmark_Vertical5(b *testing.B) { -// benchVirtual(5, b) -// } -// func Benchmark_Vertical10(b *testing.B) { -// benchVirtual(10, b) -// } -// func Benchmark_Vertical20(b *testing.B) { -// benchVirtual(20, b) -// } diff --git a/tsdb/importer/importer.go b/tsdb/importer/importer.go deleted file mode 100644 index ef001b3d5..000000000 --- a/tsdb/importer/importer.go +++ /dev/null @@ -1,602 +0,0 @@ -// Copyright 2020 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package importer - -import ( - "bufio" - "bytes" - "context" - "encoding/gob" - "io" - "io/ioutil" - "math" - "os" - "path/filepath" - "sort" - "strings" - - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" - "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/pkg/textparse" - "github.com/prometheus/prometheus/pkg/value" - "github.com/prometheus/prometheus/tsdb" - "github.com/prometheus/prometheus/tsdb/fileutil" -) - -// Implementing the error interface to create a -// constant, which cannot be overridden. -// https://dave.cheney.net/2016/04/07/constant-errors -type Error string - -func (e Error) Error() string { - return string(e) -} - -const ( - // This error is thrown when the current entry does not have a timestamp associated. 
- NoTimestampError = Error("expected timestamp with metric") - // This error is thrown when the sample being parsed currently has a corresponding block - // in the database, but has no metadata collected for it, curiously enough. - NoBlockMetaFoundError = Error("no metadata found for current samples' block") -) - -type blockTimestampPair struct { - start, end int64 -} - -type newBlockMeta struct { - index int - count int - mint, maxt int64 - isAligned bool - dir string - children []*newBlockMeta -} - -// Content Type for the Open Metrics Parser. -// Needed to init the text parser. -const contentType = "application/openmetrics-text;" - -// ImportFromFile imports data from a file formatted according to the Open Metrics format, -// converts it into block(s), and places the newly created block(s) in the -// TSDB DB directory, where it is treated like any other block. -func ImportFromFile( - r io.Reader, - dbPath string, - maxSamplesInMemory int, - maxBlockChildren int, - logger log.Logger, -) error { - if logger == nil { - logger = log.NewNopLogger() - } - - level.Debug(logger).Log("msg", "creating temp directory") - tmpDbDir, err := ioutil.TempDir("", "importer") - if err != nil { - return err - } - defer os.RemoveAll(tmpDbDir) - - level.Debug(logger).Log("msg", "reading existing block metadata to infer time ranges") - blockMetas, err := constructBlockMetadata(dbPath) - if err != nil { - return err - } - - level.Info(logger).Log("msg", "importing input data and aligning with existing DB") - if err = writeSamples(r, tmpDbDir, blockMetas, maxSamplesInMemory, maxBlockChildren, logger); err != nil { - return err - } - - level.Info(logger).Log("msg", "merging fully overlapping blocks") - if err = mergeBlocks(tmpDbDir, blockMetas, logger); err != nil { - return err - } - - level.Debug(logger).Log("msg", "copying newly created blocks from temp location to actual DB location") - if err = fileutil.CopyDirs(tmpDbDir, dbPath); err != nil { - return err - } - - return nil -} - -// writeSamples parses each metric sample, and writes the samples to the correspondingly aligned block -// to the given directory. -func writeSamples( - r io.Reader, - dir string, - metas []*newBlockMeta, - maxSamplesInMemory int, - maxBlockChildren int, - logger log.Logger, -) error { - // First block represents everything before DB start. - // Last block represents everything after DB end. - dbMint, dbMaxt := metas[0].maxt, metas[len(metas)-1].mint - - blocks := make([][]*tsdb.MetricSample, len(metas)) - currentPassCount := 0 - - // TODO: Maybe use bufio.Reader.Readline instead? - // https://stackoverflow.com/questions/21124327/how-to-read-a-text-file-line-by-line-in-go-when-some-lines-are-long-enough-to-ca - // Use a streaming approach to avoid loading too much data at once. - scanner := bufio.NewScanner(r) - buf := new(bytes.Buffer) - scanner.Split(sampleStreamer(buf)) - - for scanner.Scan() { - if currentPassCount == 0 { - blocks = make([][]*tsdb.MetricSample, len(metas)) - } - - encSample := scanner.Bytes() - decBuf := bytes.NewBuffer(encSample) - sample := tsdb.MetricSample{} - if err := gob.NewDecoder(decBuf).Decode(&sample); err != nil { - level.Error(logger).Log("msg", "failed to decode current entry returned by file scanner", "err", err) - return err - } - - // Find what block this sample belongs to. 
- var blockIndex int - if len(metas) == 1 || sample.Timestamp < dbMint { - blockIndex = 0 - } else if sample.Timestamp >= dbMaxt { - blockIndex = len(metas) - 1 - } else { - blockIndex = getBlockIndex(sample.Timestamp, metas) - if blockIndex == -1 { - return NoBlockMetaFoundError - } - } - - meta := metas[blockIndex] - meta.mint = value.MinInt64(meta.mint, sample.Timestamp) - meta.maxt = value.MaxInt64(meta.maxt, sample.Timestamp) - meta.count++ - - blocks[blockIndex] = append(blocks[blockIndex], &sample) - - currentPassCount++ - // Have enough samples to write to disk. - if currentPassCount == maxSamplesInMemory { - if err := flushBlocks(dir, blocks, metas, maxBlockChildren, logger); err != nil { - return err - } - // Reset current pass count. - currentPassCount = 0 - } - } - - if err := scanner.Err(); err != nil { - return err - } - - // Flush any remaining samples. - if err := flushBlocks(dir, blocks, metas, maxBlockChildren, logger); err != nil { - return err - } - - return nil -} - -// flushBlocks writes the given blocks of samples to disk, and updates block metadata information. -func flushBlocks( - dir string, - toFlush [][]*tsdb.MetricSample, - metas []*newBlockMeta, - maxBlockChildren int, - logger log.Logger, -) error { - for blockIdx, block := range toFlush { - // If current block is empty, nothing to write. - if len(block) == 0 { - continue - } - meta := metas[blockIdx] - - // Put each sample into the appropriate block. - bins, binTimes := binSamples(block, meta, tsdb.DefaultBlockDuration) - for binIdx, bin := range bins { - if len(bin) == 0 { - continue - } - start, end := binTimes[binIdx].start, binTimes[binIdx].end - path, err := tsdb.CreateBlock(bin, dir, start, end, logger) - if err != nil { - return err - } - child := &newBlockMeta{ - index: meta.index, - count: 0, - mint: start, - maxt: end, - isAligned: meta.isAligned, - dir: path, - } - meta.children = append(meta.children, child) - } - if maxBlockChildren > 0 && len(meta.children) >= maxBlockChildren { - if err := compactMeta(dir, meta, logger); err != nil { - return err - } - } - } - return nil -} - -// sampleStreamer returns a function that can be used to parse an OpenMetrics compliant -// byte array, and return each token in the user-provided byte buffer. -func sampleStreamer(buf *bytes.Buffer) func([]byte, bool) (int, []byte, error) { - currentEntryInvalid := false - return func(data []byte, atEOF bool) (int, []byte, error) { - var err error - advance := 0 - lineIndex := 0 - lines := strings.Split(string(data), "\n") - parser := textparse.New(data, contentType) - for { - var et textparse.Entry - if et, err = parser.Next(); err != nil { - if !atEOF && et == textparse.EntryInvalid && !currentEntryInvalid { - currentEntryInvalid = true - // Fetch more data for the scanner to see if the entry is really invalid. - return 0, nil, nil - } - if err == io.EOF { - err = nil - } - return 0, nil, err - } - - // Reset invalid entry flag as it meant that we just had to get more data - // into the scanner. - if currentEntryInvalid { - currentEntryInvalid = false - } - - // Add 1 to account for newline character. - lineLength := len(lines[lineIndex]) + 1 - lineIndex++ - - if et != textparse.EntrySeries { - advance = advance + lineLength - continue - } - - _, ctime, cvalue := parser.Series() - if ctime == nil { - return 0, nil, NoTimestampError - } - // OpenMetrics parser multiples times by 1000 - undoing that. 
- ctimeCorrected := *ctime / 1000 - - var clabels labels.Labels - _ = parser.Metric(&clabels) - - sample := tsdb.MetricSample{ - Timestamp: ctimeCorrected, - Value: cvalue, - Labels: labels.FromMap(clabels.Map()), - } - - buf.Reset() - enc := gob.NewEncoder(buf) - if err = enc.Encode(sample); err != nil { - return 0, nil, err - } - - advance += lineLength - return advance, buf.Bytes(), nil - } - } -} - -// makeRange returns a series of block times between start and stop, -// with step divisions. -func makeRange(start, stop, step int64) []blockTimestampPair { - if step <= 0 || stop < start { - return []blockTimestampPair{} - } - r := make([]blockTimestampPair, 0) - // In case we only have samples with the same timestamp. - if start == stop { - r = append(r, blockTimestampPair{ - start: start, - end: stop + 1, - }) - return r - } - for s := start; s < stop; s += step { - pair := blockTimestampPair{ - start: s, - } - if (s + step) >= stop { - pair.end = stop + 1 - } else { - pair.end = s + step - } - r = append(r, pair) - } - return r -} - -// mergeBlocks looks for blocks that have overlapping time intervals, and compacts them. -func mergeBlocks(dest string, metas []*newBlockMeta, logger log.Logger) error { - for _, m := range metas { - // If no children, there's nothing to merge. - if len(m.children) == 0 { - continue - } - if err := compactMeta(dest, m, logger); err != nil { - return err - } - } - return nil -} - -// binBlocks groups blocks that have fully intersecting intervals of the given durations. -// It iterates through the given durations, creating progressive blocks, and once it reaches -// the last given duration, it divvies all the remaining samples into blocks of that duration. -// Input blocks are assumed sorted by time. -func binBlocks(blocks []*newBlockMeta, durations []int64) [][]*newBlockMeta { - bins := make([][]*newBlockMeta, 0) - if len(blocks) == 0 || len(durations) == 0 { - return bins - } - - bin := make([]*newBlockMeta, 0) - - start := blocks[0].mint - blockIdx := 0 -Outer: - for dIdx, duration := range durations { - for bIdx, block := range blocks[blockIdx:] { - blockIdx = bIdx - if block.maxt-block.mint >= duration { - if block.mint <= start { - bin = append(bin, block) - bins = append(bins, bin) - } else { - bins = append(bins, bin) - bins = append(bins, []*newBlockMeta{block}) - } - bin = []*newBlockMeta{} - start = block.maxt - if dIdx == len(durations)-1 { - continue - } else { - blockIdx++ - continue Outer - } - } - if block.mint-start < duration && block.maxt-start < duration { - // If current block is within the duration window, place it in the current bin. - bin = append(bin, block) - blockIdx++ - } else { - // Else create a new block, starting with this block. - bins = append(bins, bin) - bin = []*newBlockMeta{block} - start = block.mint - if dIdx < len(durations)-1 { - blockIdx++ - continue Outer - } - } - } - } - bins = append(bins, bin) - return bins -} - -// binSamples divvies the samples into bins corresponding to the given block. -// If an aligned block is given to it, then a single bin is created, -// else, it divides the samples into blocks of duration. -// It returns the binned samples, and the time limits for each bin. 
-func binSamples(samples []*tsdb.MetricSample, meta *newBlockMeta, duration int64) ([][]*tsdb.MetricSample, []blockTimestampPair) { - findBlock := func(ts int64, blocks []blockTimestampPair) int { - for idx, block := range blocks { - if block.start <= ts && ts < block.end { - return idx - } - } - return -1 - } - bins := make([][]*tsdb.MetricSample, 0) - times := make([]blockTimestampPair, 0) - if len(samples) == 0 { - return bins, times - } - if meta.isAligned { - bins = append(bins, samples) - times = []blockTimestampPair{{start: meta.mint, end: meta.maxt}} - } else { - timeTranches := makeRange(meta.mint, meta.maxt, duration) - bins = make([][]*tsdb.MetricSample, len(timeTranches)) - binIdx := -1 - for _, sample := range samples { - ts := sample.Timestamp - if binIdx == -1 { - binIdx = findBlock(ts, timeTranches) - } - block := timeTranches[binIdx] - if block.start <= ts && ts < block.end { - bins[binIdx] = append(bins[binIdx], sample) - } else { - binIdx = findBlock(ts, timeTranches) - bins[binIdx] = append(bins[binIdx], sample) - } - } - times = timeTranches - } - return bins, times -} - -// constructBlockMetadata gives us the new blocks' metadatas. -func constructBlockMetadata(path string) ([]*newBlockMeta, error) { - dbMint, dbMaxt := int64(math.MinInt64), int64(math.MaxInt64) - - // If we try to open a regular RW handle on an active TSDB instance, - // it will fail. Hence, we open a RO handle. - db, err := tsdb.OpenDBReadOnly(path, nil) - if err != nil { - return nil, err - } - defer db.Close() - - blocks, err := db.Blocks() - if err != nil { - return nil, err - } - - metas := []*newBlockMeta{ - &newBlockMeta{ - index: 0, - count: 0, - mint: math.MaxInt64, - maxt: dbMint, - isAligned: false, - }, - } - - for idx, block := range blocks { - start, end := block.Meta().MinTime, block.Meta().MaxTime - metas = append(metas, &newBlockMeta{ - index: idx + 1, - count: 0, - mint: start, - maxt: end, - isAligned: true, - }) - if idx == 0 { - dbMint, dbMaxt = start, end - } else { - dbMint = value.MinInt64(dbMint, start) - dbMaxt = value.MaxInt64(dbMaxt, end) - } - } - // If there are existing blocks in the import location, - // we need to add a block for samples that fall over DB maxt. - if len(metas) > 1 { - // Update first block with the right dbMint as it's maxt. - metas[0].maxt = dbMint - metas = append(metas, &newBlockMeta{ - index: len(metas), - count: 0, - mint: dbMaxt, - maxt: math.MinInt64, - isAligned: false, - }) - } - return metas, nil -} - -// getBlockIndex returns the index of the block the current timestamp corresponds to. -// User-provided times are assumed sorted in ascending order. -func getBlockIndex(current int64, metas []*newBlockMeta) int { - for idx, m := range metas { - if current < m.maxt { - return idx - } - } - return -1 -} - -// compactBlocks compacts all the given blocks and places them in the destination directory. -func compactBlocks(dest string, blocks []*newBlockMeta, logger log.Logger) (string, error) { - dirs := make([]string, 0) - for _, meta := range blocks { - dirs = append(dirs, meta.dir) - } - path, err := compactDirs(dest, dirs, logger) - if err != nil { - return "", err - } - for _, meta := range blocks { - // Remove existing blocks directory as it is no longer needed. - if err = os.RemoveAll(meta.dir); err != nil { - return "", err - } - // Update child with new blocks path. - meta.dir = path - } - return path, err -} - -// compactMeta compacts the children of a given block, dividing them into the Prometheus recommended -// block sizes, if it has to. 
-func compactMeta(dest string, m *newBlockMeta, logger log.Logger) error {
-	children := make([]*newBlockMeta, 0)
-	if m.isAligned {
-		path, err := compactBlocks(dest, m.children, logger)
-		if err != nil {
-			return err
-		}
-		m.dir = path
-	} else {
-		// Sort blocks to ensure we bin properly.
-		sortBlocks(m.children)
-		binned := binBlocks(m.children, []int64{tsdb.DefaultBlockDuration})
-		for _, bin := range binned {
-			if len(bin) <= 1 {
-				children = append(children, bin...)
-				continue
-			}
-			path, err := compactBlocks(dest, bin, logger)
-			if err != nil {
-				return err
-			}
-			children = append(children, &newBlockMeta{
-				index:     m.index,
-				count:     0,
-				mint:      bin[0].mint,
-				maxt:      bin[len(bin)-1].maxt,
-				isAligned: m.isAligned,
-				dir:       path,
-			})
-		}
-	}
-	m.children = children
-	return nil
-}
-
-// compactDirs compacts the block dirs and places them in destination directory.
-func compactDirs(dest string, dirs []string, logger log.Logger) (string, error) {
-	path := ""
-	compactor, err := tsdb.NewLeveledCompactor(context.Background(), nil, logger, []int64{tsdb.DefaultBlockDuration}, nil)
-	if err != nil {
-		return path, err
-	}
-	ulid, err := compactor.Compact(dest, dirs, nil)
-	if err != nil {
-		return path, err
-	}
-	path = filepath.Join(dest, ulid.String())
-	return path, nil
-}
-
-// sortBlocks sorts block metadata according to the time limits.
-func sortBlocks(blocks []*newBlockMeta) {
-	sort.Slice(blocks, func(x, y int) bool {
-		bx, by := blocks[x], blocks[y]
-		if bx.mint != by.mint {
-			return bx.mint < by.mint
-		}
-		return bx.maxt < by.maxt
-	})
-}
diff --git a/tsdb/importer/openmetrics/openmetrics.go b/tsdb/importer/openmetrics/openmetrics.go
new file mode 100644
index 000000000..8b3ce6116
--- /dev/null
+++ b/tsdb/importer/openmetrics/openmetrics.go
@@ -0,0 +1,70 @@
+// Copyright 2020 The Prometheus Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package openmetrics
+
+import (
+	"bufio"
+	"io"
+
+	"github.com/prometheus/prometheus/pkg/textparse"
+)
+
+// Content type for the OpenMetrics parser.
+// Needed to initialize the text parser.
+const contentType = "application/openmetrics-text;"
+
+type Parser struct {
+	textparse.Parser
+
+	s *bufio.Scanner
+}
+
+// NewParser returns a tiny layer between textparse.Parser and importer.Parser that works on an io.Reader.
+// TODO(bwplotka): This is hacky; Parser should allow passing a reader from scratch.
+func NewParser(r io.Reader) textparse.Parser {
+	// TODO(dipack95): Maybe use bufio.Reader.ReadLine instead?
+	// https://stackoverflow.com/questions/21124327/how-to-read-a-text-file-line-by-line-in-go-when-some-lines-are-long-enough-to-ca
+	return &Parser{s: bufio.NewScanner(r)}
+}
+
+// Next advances the parser to the next sample. It returns io.EOF if no
+// more samples were read.
+// TODO(bwplotka): Rough implementation, not tested, please help dipack95! (:
+func (p *Parser) Next() (textparse.Entry, error) {
+	for p.s.Scan() {
+		// TODO(bwplotka): Assumes one entry per line. If that does not hold, refetch more lines as the previous version did.
+		line := p.s.Bytes()
+
+		// Parse the current line in isolation with a fresh per-line parser.
+		p.Parser = textparse.New(line, contentType)
+		if et, err := p.Parser.Next(); err != io.EOF {
+			return et, err
+		}
+		// io.EOF from the per-line parser; continue scanning for the next line.
+	}
+	if err := p.s.Err(); err != nil {
+		return textparse.EntryInvalid, err
+	}
+	return textparse.EntryInvalid, io.EOF
+}
+
+// Series returns the bytes of the series, the timestamp if set, and the value
+// of the current sample. The timestamp is in milliseconds.
+func (p *Parser) Series() ([]byte, *int64, float64) {
+	b, ts, v := p.Parser.Series()
+	if ts != nil {
+		// The OpenMetrics parser multiplies timestamps by 1000 (it assumes seconds); input timestamps are already in milliseconds, so undo that scaling.
+		*ts = *ts / 1000
+	}
+	return b, ts, v
+}
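
For reference, a minimal usage sketch of the new parser. This is illustrative only and not part of the patch: the wrapper above is explicitly marked as rough and untested, it assumes one OpenMetrics entry per input line, and the sample input literal below is made up. Only methods of the existing textparse.Parser interface are used.

package main

import (
	"fmt"
	"io"
	"strings"

	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/prometheus/prometheus/pkg/textparse"
	"github.com/prometheus/prometheus/tsdb/importer/openmetrics"
)

func main() {
	// Hypothetical input; the importer treats timestamps as milliseconds.
	input := "# TYPE foo gauge\nfoo{bar=\"baz\"} 2.5 1000\n# EOF\n"

	p := openmetrics.NewParser(strings.NewReader(input))
	for {
		et, err := p.Next()
		if err == io.EOF {
			break // No more samples.
		}
		if err != nil {
			panic(err)
		}
		if et != textparse.EntrySeries {
			continue // Skip TYPE, HELP, and comment entries.
		}
		var lbls labels.Labels
		p.Metric(&lbls)
		_, ts, v := p.Series()
		if ts != nil {
			fmt.Println(lbls.String(), *ts, v)
		}
	}
}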