diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go index 0332c33eaa..47bf02c104 100644 --- a/cmd/promtool/main.go +++ b/cmd/promtool/main.go @@ -239,6 +239,12 @@ func main() { dumpMaxTime := tsdbDumpCmd.Flag("max-time", "Maximum timestamp to dump.").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64() dumpMatch := tsdbDumpCmd.Flag("match", "Series selector. Can be specified multiple times.").Default("{__name__=~'(?s:.*)'}").Strings() + tsdbDumpOpenMetricsCmd := tsdbCmd.Command("dump-openmetrics", "[Experimental] Dump samples from a TSDB into OpenMetrics format. Native histograms are not dumped.") + dumpOpenMetricsPath := tsdbDumpOpenMetricsCmd.Arg("db path", "Database path (default is "+defaultDBPath+").").Default(defaultDBPath).String() + dumpOpenMetricsMinTime := tsdbDumpOpenMetricsCmd.Flag("min-time", "Minimum timestamp to dump.").Default(strconv.FormatInt(math.MinInt64, 10)).Int64() + dumpOpenMetricsMaxTime := tsdbDumpOpenMetricsCmd.Flag("max-time", "Maximum timestamp to dump.").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64() + dumpOpenMetricsMatch := tsdbDumpOpenMetricsCmd.Flag("match", "Series selector. Can be specified multiple times.").Default("{__name__=~'(?s:.*)'}").Strings() + importCmd := tsdbCmd.Command("create-blocks-from", "[Experimental] Import samples from input and produce TSDB blocks. Please refer to the storage docs for more details.") importHumanReadable := importCmd.Flag("human-readable", "Print human readable values.").Short('r').Bool() importQuiet := importCmd.Flag("quiet", "Do not print created blocks.").Short('q').Bool() @@ -390,7 +396,9 @@ func main() { os.Exit(checkErr(listBlocks(*listPath, *listHumanReadable))) case tsdbDumpCmd.FullCommand(): - os.Exit(checkErr(dumpSamples(ctx, *dumpPath, *dumpMinTime, *dumpMaxTime, *dumpMatch))) + os.Exit(checkErr(dumpSamples(ctx, *dumpPath, *dumpMinTime, *dumpMaxTime, *dumpMatch, formatSeriesSet))) + case tsdbDumpOpenMetricsCmd.FullCommand(): + os.Exit(checkErr(dumpSamples(ctx, *dumpOpenMetricsPath, *dumpOpenMetricsMinTime, *dumpOpenMetricsMaxTime, *dumpOpenMetricsMatch, formatSeriesSetOpenMetrics))) // TODO(aSquare14): Work on adding support for custom block size. case openMetricsImportCmd.FullCommand(): os.Exit(backfillOpenMetrics(*importFilePath, *importDBPath, *importHumanReadable, *importQuiet, *maxBlockDuration)) diff --git a/cmd/promtool/testdata/dump-openmetrics-roundtrip-test.prom b/cmd/promtool/testdata/dump-openmetrics-roundtrip-test.prom new file mode 100644 index 0000000000..c2318e94e6 --- /dev/null +++ b/cmd/promtool/testdata/dump-openmetrics-roundtrip-test.prom @@ -0,0 +1,15 @@ +my_histogram_bucket{instance="localhost:8000",job="example2",le="+Inf"} 1.0267820369e+10 1700215884.373 +my_histogram_bucket{instance="localhost:8000",job="example2",le="+Inf"} 1.026872507e+10 1700215889.373 +my_histogram_bucket{instance="localhost:8000",job="example2",le="0.01"} 0 1700215884.373 +my_histogram_bucket{instance="localhost:8000",job="example2",le="0.01"} 0 1700215889.373 +my_histogram_count{instance="localhost:8000",job="example2"} 1.0267820369e+10 1700215884.373 +my_histogram_count{instance="localhost:8000",job="example2"} 1.026872507e+10 1700215889.373 +my_summary_count{instance="localhost:8000",job="example5"} 9.518161497e+09 1700211684.981 +my_summary_count{instance="localhost:8000",job="example5"} 9.519048034e+09 1700211689.984 +my_summary_sum{instance="localhost:8000",job="example5"} 5.2349889185e+10 1700211684.981 +my_summary_sum{instance="localhost:8000",job="example5"} 5.2354761848e+10 1700211689.984 +up{instance="localhost:8000",job="example2"} 1 1700226034.330 +up{instance="localhost:8000",job="example2"} 1 1700226094.329 +up{instance="localhost:8000",job="example3"} 1 1700210681.366 +up{instance="localhost:8000",job="example3"} 1 1700210686.366 +# EOF diff --git a/cmd/promtool/testdata/dump-openmetrics-test.prom b/cmd/promtool/testdata/dump-openmetrics-test.prom new file mode 100644 index 0000000000..c027b8c270 --- /dev/null +++ b/cmd/promtool/testdata/dump-openmetrics-test.prom @@ -0,0 +1,11 @@ +my_counter{baz="abc",foo="bar"} 1 0.000 +my_counter{baz="abc",foo="bar"} 2 60.000 +my_counter{baz="abc",foo="bar"} 3 120.000 +my_counter{baz="abc",foo="bar"} 4 180.000 +my_counter{baz="abc",foo="bar"} 5 240.000 +my_gauge{abc="baz",bar="foo"} 9 0.000 +my_gauge{abc="baz",bar="foo"} 8 60.000 +my_gauge{abc="baz",bar="foo"} 0 120.000 +my_gauge{abc="baz",bar="foo"} 4 180.000 +my_gauge{abc="baz",bar="foo"} 7 240.000 +# EOF diff --git a/cmd/promtool/tsdb.go b/cmd/promtool/tsdb.go index 4bba8421c2..519f735102 100644 --- a/cmd/promtool/tsdb.go +++ b/cmd/promtool/tsdb.go @@ -15,6 +15,7 @@ package main import ( "bufio" + "bytes" "context" "errors" "fmt" @@ -706,7 +707,9 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb. return nil } -func dumpSamples(ctx context.Context, path string, mint, maxt int64, match []string) (err error) { +type SeriesSetFormatter func(series storage.SeriesSet) error + +func dumpSamples(ctx context.Context, path string, mint, maxt int64, match []string, formatter SeriesSetFormatter) (err error) { db, err := tsdb.OpenDBReadOnly(path, nil) if err != nil { return err @@ -736,6 +739,22 @@ func dumpSamples(ctx context.Context, path string, mint, maxt int64, match []str ss = q.Select(ctx, false, nil, matcherSets[0]...) } + err = formatter(ss) + if err != nil { + return err + } + + if ws := ss.Warnings(); len(ws) > 0 { + return tsdb_errors.NewMulti(ws.AsErrors()...).Err() + } + + if ss.Err() != nil { + return ss.Err() + } + return nil +} + +func formatSeriesSet(ss storage.SeriesSet) error { for ss.Next() { series := ss.At() lbs := series.Labels() @@ -756,14 +775,44 @@ func dumpSamples(ctx context.Context, path string, mint, maxt int64, match []str return ss.Err() } } + return nil +} - if ws := ss.Warnings(); len(ws) > 0 { - return tsdb_errors.NewMulti(ws.AsErrors()...).Err() - } +// CondensedString is labels.Labels.String() without spaces after the commas. +func CondensedString(ls labels.Labels) string { + var b bytes.Buffer - if ss.Err() != nil { - return ss.Err() + b.WriteByte('{') + i := 0 + ls.Range(func(l labels.Label) { + if i > 0 { + b.WriteByte(',') + } + b.WriteString(l.Name) + b.WriteByte('=') + b.WriteString(strconv.Quote(l.Value)) + i++ + }) + b.WriteByte('}') + return b.String() +} + +func formatSeriesSetOpenMetrics(ss storage.SeriesSet) error { + for ss.Next() { + series := ss.At() + lbs := series.Labels() + metricName := lbs.Get(labels.MetricName) + lbs = lbs.DropMetricName() + it := series.Iterator(nil) + for it.Next() == chunkenc.ValFloat { + ts, val := it.At() + fmt.Printf("%s%s %g %.3f\n", metricName, CondensedString(lbs), val, float64(ts)/1000) + } + if it.Err() != nil { + return ss.Err() + } } + fmt.Println("# EOF") return nil } diff --git a/cmd/promtool/tsdb_test.go b/cmd/promtool/tsdb_test.go index aeb51a07e0..36a65d73e4 100644 --- a/cmd/promtool/tsdb_test.go +++ b/cmd/promtool/tsdb_test.go @@ -22,10 +22,12 @@ import ( "runtime" "strings" "testing" + "time" "github.com/stretchr/testify/require" "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/tsdb" ) func TestGenerateBucket(t *testing.T) { @@ -52,7 +54,7 @@ func TestGenerateBucket(t *testing.T) { } // getDumpedSamples dumps samples and returns them. -func getDumpedSamples(t *testing.T, path string, mint, maxt int64, match []string) string { +func getDumpedSamples(t *testing.T, path string, mint, maxt int64, match []string, formatter SeriesSetFormatter) string { t.Helper() oldStdout := os.Stdout @@ -65,6 +67,7 @@ func getDumpedSamples(t *testing.T, path string, mint, maxt int64, match []strin mint, maxt, match, + formatter, ) require.NoError(t, err) @@ -76,6 +79,14 @@ func getDumpedSamples(t *testing.T, path string, mint, maxt int64, match []strin return buf.String() } +func normalizeNewLine(b []byte) []byte { + if strings.Contains(runtime.GOOS, "windows") { + // We use "/n" while dumping on windows as well. + return bytes.ReplaceAll(b, []byte("\r\n"), []byte("\n")) + } + return b +} + func TestTSDBDump(t *testing.T) { storage := promql.LoadedStorage(t, ` load 1m @@ -136,15 +147,48 @@ func TestTSDBDump(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - dumpedMetrics := getDumpedSamples(t, storage.Dir(), tt.mint, tt.maxt, tt.match) + dumpedMetrics := getDumpedSamples(t, storage.Dir(), tt.mint, tt.maxt, tt.match, formatSeriesSet) expectedMetrics, err := os.ReadFile(tt.expectedDump) require.NoError(t, err) - if strings.Contains(runtime.GOOS, "windows") { - // We use "/n" while dumping on windows as well. - expectedMetrics = bytes.ReplaceAll(expectedMetrics, []byte("\r\n"), []byte("\n")) - } + expectedMetrics = normalizeNewLine(expectedMetrics) // even though in case of one matcher samples are not sorted, the order in the cases above should stay the same. require.Equal(t, string(expectedMetrics), dumpedMetrics) }) } } + +func TestTSDBDumpOpenMetrics(t *testing.T) { + storage := promql.LoadedStorage(t, ` + load 1m + my_counter{foo="bar", baz="abc"} 1 2 3 4 5 + my_gauge{bar="foo", abc="baz"} 9 8 0 4 7 + `) + + expectedMetrics, err := os.ReadFile("testdata/dump-openmetrics-test.prom") + require.NoError(t, err) + expectedMetrics = normalizeNewLine(expectedMetrics) + dumpedMetrics := getDumpedSamples(t, storage.Dir(), math.MinInt64, math.MaxInt64, []string{"{__name__=~'(?s:.*)'}"}, formatSeriesSetOpenMetrics) + require.Equal(t, string(expectedMetrics), dumpedMetrics) +} + +func TestTSDBDumpOpenMetricsRoundTrip(t *testing.T) { + initialMetrics, err := os.ReadFile("testdata/dump-openmetrics-roundtrip-test.prom") + require.NoError(t, err) + initialMetrics = normalizeNewLine(initialMetrics) + + dbDir := t.TempDir() + // Import samples from OM format + err = backfill(5000, initialMetrics, dbDir, false, false, 2*time.Hour) + require.NoError(t, err) + db, err := tsdb.Open(dbDir, nil, nil, tsdb.DefaultOptions(), nil) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, db.Close()) + }) + + // Dump the blocks into OM format + dumpedMetrics := getDumpedSamples(t, dbDir, math.MinInt64, math.MaxInt64, []string{"{__name__=~'(?s:.*)'}"}, formatSeriesSetOpenMetrics) + + // Should get back the initial metrics. + require.Equal(t, string(initialMetrics), dumpedMetrics) +} diff --git a/docs/command-line/promtool.md b/docs/command-line/promtool.md index 863bc068c3..3eceed48f2 100644 --- a/docs/command-line/promtool.md +++ b/docs/command-line/promtool.md @@ -582,6 +582,32 @@ Dump samples from a TSDB. +##### `promtool tsdb dump-openmetrics` + +[Experimental] Dump samples from a TSDB into OpenMetrics format. Native histograms are not dumped. + + + +###### Flags + +| Flag | Description | Default | +| --- | --- | --- | +| --min-time | Minimum timestamp to dump. | `-9223372036854775808` | +| --max-time | Maximum timestamp to dump. | `9223372036854775807` | +| --match | Series selector. Can be specified multiple times. | `{__name__=~'(?s:.*)'}` | + + + + +###### Arguments + +| Argument | Description | Default | +| --- | --- | --- | +| db path | Database path (default is data/). | `data/` | + + + + ##### `promtool tsdb create-blocks-from` [Experimental] Import samples from input and produce TSDB blocks. Please refer to the storage docs for more details.