mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-11 05:47:27 -08:00
promtool: add a "tsdb dump-openmetrics" to dump in OpemMetrics format.
This closes the loop, as the output can be fed into "tsdb create-blocks-from openmetrics" Native histograms are not supported. Signed-off-by: machine424 <ayoubmrini424@gmail.com>
This commit is contained in:
parent
0474b0bc39
commit
4b71f6ffc2
|
@ -239,6 +239,12 @@ func main() {
|
||||||
dumpMaxTime := tsdbDumpCmd.Flag("max-time", "Maximum timestamp to dump.").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
|
dumpMaxTime := tsdbDumpCmd.Flag("max-time", "Maximum timestamp to dump.").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
|
||||||
dumpMatch := tsdbDumpCmd.Flag("match", "Series selector. Can be specified multiple times.").Default("{__name__=~'(?s:.*)'}").Strings()
|
dumpMatch := tsdbDumpCmd.Flag("match", "Series selector. Can be specified multiple times.").Default("{__name__=~'(?s:.*)'}").Strings()
|
||||||
|
|
||||||
|
tsdbDumpOpenMetricsCmd := tsdbCmd.Command("dump-openmetrics", "[Experimental] Dump samples from a TSDB into OpenMetrics format. Native histograms are not dumped.")
|
||||||
|
dumpOpenMetricsPath := tsdbDumpOpenMetricsCmd.Arg("db path", "Database path (default is "+defaultDBPath+").").Default(defaultDBPath).String()
|
||||||
|
dumpOpenMetricsMinTime := tsdbDumpOpenMetricsCmd.Flag("min-time", "Minimum timestamp to dump.").Default(strconv.FormatInt(math.MinInt64, 10)).Int64()
|
||||||
|
dumpOpenMetricsMaxTime := tsdbDumpOpenMetricsCmd.Flag("max-time", "Maximum timestamp to dump.").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
|
||||||
|
dumpOpenMetricsMatch := tsdbDumpOpenMetricsCmd.Flag("match", "Series selector. Can be specified multiple times.").Default("{__name__=~'(?s:.*)'}").Strings()
|
||||||
|
|
||||||
importCmd := tsdbCmd.Command("create-blocks-from", "[Experimental] Import samples from input and produce TSDB blocks. Please refer to the storage docs for more details.")
|
importCmd := tsdbCmd.Command("create-blocks-from", "[Experimental] Import samples from input and produce TSDB blocks. Please refer to the storage docs for more details.")
|
||||||
importHumanReadable := importCmd.Flag("human-readable", "Print human readable values.").Short('r').Bool()
|
importHumanReadable := importCmd.Flag("human-readable", "Print human readable values.").Short('r').Bool()
|
||||||
importQuiet := importCmd.Flag("quiet", "Do not print created blocks.").Short('q').Bool()
|
importQuiet := importCmd.Flag("quiet", "Do not print created blocks.").Short('q').Bool()
|
||||||
|
@ -390,7 +396,9 @@ func main() {
|
||||||
os.Exit(checkErr(listBlocks(*listPath, *listHumanReadable)))
|
os.Exit(checkErr(listBlocks(*listPath, *listHumanReadable)))
|
||||||
|
|
||||||
case tsdbDumpCmd.FullCommand():
|
case tsdbDumpCmd.FullCommand():
|
||||||
os.Exit(checkErr(dumpSamples(ctx, *dumpPath, *dumpMinTime, *dumpMaxTime, *dumpMatch)))
|
os.Exit(checkErr(dumpSamples(ctx, *dumpPath, *dumpMinTime, *dumpMaxTime, *dumpMatch, formatSeriesSet)))
|
||||||
|
case tsdbDumpOpenMetricsCmd.FullCommand():
|
||||||
|
os.Exit(checkErr(dumpSamples(ctx, *dumpOpenMetricsPath, *dumpOpenMetricsMinTime, *dumpOpenMetricsMaxTime, *dumpOpenMetricsMatch, formatSeriesSetOpenMetrics)))
|
||||||
// TODO(aSquare14): Work on adding support for custom block size.
|
// TODO(aSquare14): Work on adding support for custom block size.
|
||||||
case openMetricsImportCmd.FullCommand():
|
case openMetricsImportCmd.FullCommand():
|
||||||
os.Exit(backfillOpenMetrics(*importFilePath, *importDBPath, *importHumanReadable, *importQuiet, *maxBlockDuration))
|
os.Exit(backfillOpenMetrics(*importFilePath, *importDBPath, *importHumanReadable, *importQuiet, *maxBlockDuration))
|
||||||
|
|
15
cmd/promtool/testdata/dump-openmetrics-roundtrip-test.prom
vendored
Normal file
15
cmd/promtool/testdata/dump-openmetrics-roundtrip-test.prom
vendored
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
my_histogram_bucket{instance="localhost:8000",job="example2",le="+Inf"} 1.0267820369e+10 1700215884.373
|
||||||
|
my_histogram_bucket{instance="localhost:8000",job="example2",le="+Inf"} 1.026872507e+10 1700215889.373
|
||||||
|
my_histogram_bucket{instance="localhost:8000",job="example2",le="0.01"} 0 1700215884.373
|
||||||
|
my_histogram_bucket{instance="localhost:8000",job="example2",le="0.01"} 0 1700215889.373
|
||||||
|
my_histogram_count{instance="localhost:8000",job="example2"} 1.0267820369e+10 1700215884.373
|
||||||
|
my_histogram_count{instance="localhost:8000",job="example2"} 1.026872507e+10 1700215889.373
|
||||||
|
my_summary_count{instance="localhost:8000",job="example5"} 9.518161497e+09 1700211684.981
|
||||||
|
my_summary_count{instance="localhost:8000",job="example5"} 9.519048034e+09 1700211689.984
|
||||||
|
my_summary_sum{instance="localhost:8000",job="example5"} 5.2349889185e+10 1700211684.981
|
||||||
|
my_summary_sum{instance="localhost:8000",job="example5"} 5.2354761848e+10 1700211689.984
|
||||||
|
up{instance="localhost:8000",job="example2"} 1 1700226034.330
|
||||||
|
up{instance="localhost:8000",job="example2"} 1 1700226094.329
|
||||||
|
up{instance="localhost:8000",job="example3"} 1 1700210681.366
|
||||||
|
up{instance="localhost:8000",job="example3"} 1 1700210686.366
|
||||||
|
# EOF
|
11
cmd/promtool/testdata/dump-openmetrics-test.prom
vendored
Normal file
11
cmd/promtool/testdata/dump-openmetrics-test.prom
vendored
Normal file
|
@ -0,0 +1,11 @@
|
||||||
|
my_counter{baz="abc",foo="bar"} 1 0.000
|
||||||
|
my_counter{baz="abc",foo="bar"} 2 60.000
|
||||||
|
my_counter{baz="abc",foo="bar"} 3 120.000
|
||||||
|
my_counter{baz="abc",foo="bar"} 4 180.000
|
||||||
|
my_counter{baz="abc",foo="bar"} 5 240.000
|
||||||
|
my_gauge{abc="baz",bar="foo"} 9 0.000
|
||||||
|
my_gauge{abc="baz",bar="foo"} 8 60.000
|
||||||
|
my_gauge{abc="baz",bar="foo"} 0 120.000
|
||||||
|
my_gauge{abc="baz",bar="foo"} 4 180.000
|
||||||
|
my_gauge{abc="baz",bar="foo"} 7 240.000
|
||||||
|
# EOF
|
|
@ -15,6 +15,7 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -706,7 +707,9 @@ func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb.
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func dumpSamples(ctx context.Context, path string, mint, maxt int64, match []string) (err error) {
|
type SeriesSetFormatter func(series storage.SeriesSet) error
|
||||||
|
|
||||||
|
func dumpSamples(ctx context.Context, path string, mint, maxt int64, match []string, formatter SeriesSetFormatter) (err error) {
|
||||||
db, err := tsdb.OpenDBReadOnly(path, nil)
|
db, err := tsdb.OpenDBReadOnly(path, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -736,6 +739,22 @@ func dumpSamples(ctx context.Context, path string, mint, maxt int64, match []str
|
||||||
ss = q.Select(ctx, false, nil, matcherSets[0]...)
|
ss = q.Select(ctx, false, nil, matcherSets[0]...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
err = formatter(ss)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if ws := ss.Warnings(); len(ws) > 0 {
|
||||||
|
return tsdb_errors.NewMulti(ws.AsErrors()...).Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
if ss.Err() != nil {
|
||||||
|
return ss.Err()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func formatSeriesSet(ss storage.SeriesSet) error {
|
||||||
for ss.Next() {
|
for ss.Next() {
|
||||||
series := ss.At()
|
series := ss.At()
|
||||||
lbs := series.Labels()
|
lbs := series.Labels()
|
||||||
|
@ -756,14 +775,44 @@ func dumpSamples(ctx context.Context, path string, mint, maxt int64, match []str
|
||||||
return ss.Err()
|
return ss.Err()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
if ws := ss.Warnings(); len(ws) > 0 {
|
// CondensedString is labels.Labels.String() without spaces after the commas.
|
||||||
return tsdb_errors.NewMulti(ws.AsErrors()...).Err()
|
func CondensedString(ls labels.Labels) string {
|
||||||
|
var b bytes.Buffer
|
||||||
|
|
||||||
|
b.WriteByte('{')
|
||||||
|
i := 0
|
||||||
|
ls.Range(func(l labels.Label) {
|
||||||
|
if i > 0 {
|
||||||
|
b.WriteByte(',')
|
||||||
}
|
}
|
||||||
|
b.WriteString(l.Name)
|
||||||
|
b.WriteByte('=')
|
||||||
|
b.WriteString(strconv.Quote(l.Value))
|
||||||
|
i++
|
||||||
|
})
|
||||||
|
b.WriteByte('}')
|
||||||
|
return b.String()
|
||||||
|
}
|
||||||
|
|
||||||
if ss.Err() != nil {
|
func formatSeriesSetOpenMetrics(ss storage.SeriesSet) error {
|
||||||
|
for ss.Next() {
|
||||||
|
series := ss.At()
|
||||||
|
lbs := series.Labels()
|
||||||
|
metricName := lbs.Get(labels.MetricName)
|
||||||
|
lbs = lbs.DropMetricName()
|
||||||
|
it := series.Iterator(nil)
|
||||||
|
for it.Next() == chunkenc.ValFloat {
|
||||||
|
ts, val := it.At()
|
||||||
|
fmt.Printf("%s%s %g %.3f\n", metricName, CondensedString(lbs), val, float64(ts)/1000)
|
||||||
|
}
|
||||||
|
if it.Err() != nil {
|
||||||
return ss.Err()
|
return ss.Err()
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
fmt.Println("# EOF")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,10 +22,12 @@ import (
|
||||||
"runtime"
|
"runtime"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/promql"
|
"github.com/prometheus/prometheus/promql"
|
||||||
|
"github.com/prometheus/prometheus/tsdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestGenerateBucket(t *testing.T) {
|
func TestGenerateBucket(t *testing.T) {
|
||||||
|
@ -52,7 +54,7 @@ func TestGenerateBucket(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// getDumpedSamples dumps samples and returns them.
|
// getDumpedSamples dumps samples and returns them.
|
||||||
func getDumpedSamples(t *testing.T, path string, mint, maxt int64, match []string) string {
|
func getDumpedSamples(t *testing.T, path string, mint, maxt int64, match []string, formatter SeriesSetFormatter) string {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
oldStdout := os.Stdout
|
oldStdout := os.Stdout
|
||||||
|
@ -65,6 +67,7 @@ func getDumpedSamples(t *testing.T, path string, mint, maxt int64, match []strin
|
||||||
mint,
|
mint,
|
||||||
maxt,
|
maxt,
|
||||||
match,
|
match,
|
||||||
|
formatter,
|
||||||
)
|
)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -76,6 +79,14 @@ func getDumpedSamples(t *testing.T, path string, mint, maxt int64, match []strin
|
||||||
return buf.String()
|
return buf.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func normalizeNewLine(b []byte) []byte {
|
||||||
|
if strings.Contains(runtime.GOOS, "windows") {
|
||||||
|
// We use "/n" while dumping on windows as well.
|
||||||
|
return bytes.ReplaceAll(b, []byte("\r\n"), []byte("\n"))
|
||||||
|
}
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
|
||||||
func TestTSDBDump(t *testing.T) {
|
func TestTSDBDump(t *testing.T) {
|
||||||
storage := promql.LoadedStorage(t, `
|
storage := promql.LoadedStorage(t, `
|
||||||
load 1m
|
load 1m
|
||||||
|
@ -136,15 +147,48 @@ func TestTSDBDump(t *testing.T) {
|
||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
dumpedMetrics := getDumpedSamples(t, storage.Dir(), tt.mint, tt.maxt, tt.match)
|
dumpedMetrics := getDumpedSamples(t, storage.Dir(), tt.mint, tt.maxt, tt.match, formatSeriesSet)
|
||||||
expectedMetrics, err := os.ReadFile(tt.expectedDump)
|
expectedMetrics, err := os.ReadFile(tt.expectedDump)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
if strings.Contains(runtime.GOOS, "windows") {
|
expectedMetrics = normalizeNewLine(expectedMetrics)
|
||||||
// We use "/n" while dumping on windows as well.
|
|
||||||
expectedMetrics = bytes.ReplaceAll(expectedMetrics, []byte("\r\n"), []byte("\n"))
|
|
||||||
}
|
|
||||||
// even though in case of one matcher samples are not sorted, the order in the cases above should stay the same.
|
// even though in case of one matcher samples are not sorted, the order in the cases above should stay the same.
|
||||||
require.Equal(t, string(expectedMetrics), dumpedMetrics)
|
require.Equal(t, string(expectedMetrics), dumpedMetrics)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTSDBDumpOpenMetrics(t *testing.T) {
|
||||||
|
storage := promql.LoadedStorage(t, `
|
||||||
|
load 1m
|
||||||
|
my_counter{foo="bar", baz="abc"} 1 2 3 4 5
|
||||||
|
my_gauge{bar="foo", abc="baz"} 9 8 0 4 7
|
||||||
|
`)
|
||||||
|
|
||||||
|
expectedMetrics, err := os.ReadFile("testdata/dump-openmetrics-test.prom")
|
||||||
|
require.NoError(t, err)
|
||||||
|
expectedMetrics = normalizeNewLine(expectedMetrics)
|
||||||
|
dumpedMetrics := getDumpedSamples(t, storage.Dir(), math.MinInt64, math.MaxInt64, []string{"{__name__=~'(?s:.*)'}"}, formatSeriesSetOpenMetrics)
|
||||||
|
require.Equal(t, string(expectedMetrics), dumpedMetrics)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTSDBDumpOpenMetricsRoundTrip(t *testing.T) {
|
||||||
|
initialMetrics, err := os.ReadFile("testdata/dump-openmetrics-roundtrip-test.prom")
|
||||||
|
require.NoError(t, err)
|
||||||
|
initialMetrics = normalizeNewLine(initialMetrics)
|
||||||
|
|
||||||
|
dbDir := t.TempDir()
|
||||||
|
// Import samples from OM format
|
||||||
|
err = backfill(5000, initialMetrics, dbDir, false, false, 2*time.Hour)
|
||||||
|
require.NoError(t, err)
|
||||||
|
db, err := tsdb.Open(dbDir, nil, nil, tsdb.DefaultOptions(), nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
t.Cleanup(func() {
|
||||||
|
require.NoError(t, db.Close())
|
||||||
|
})
|
||||||
|
|
||||||
|
// Dump the blocks into OM format
|
||||||
|
dumpedMetrics := getDumpedSamples(t, dbDir, math.MinInt64, math.MaxInt64, []string{"{__name__=~'(?s:.*)'}"}, formatSeriesSetOpenMetrics)
|
||||||
|
|
||||||
|
// Should get back the initial metrics.
|
||||||
|
require.Equal(t, string(initialMetrics), dumpedMetrics)
|
||||||
|
}
|
||||||
|
|
|
@ -582,6 +582,32 @@ Dump samples from a TSDB.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
##### `promtool tsdb dump-openmetrics`
|
||||||
|
|
||||||
|
[Experimental] Dump samples from a TSDB into OpenMetrics format. Native histograms are not dumped.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
###### Flags
|
||||||
|
|
||||||
|
| Flag | Description | Default |
|
||||||
|
| --- | --- | --- |
|
||||||
|
| <code class="text-nowrap">--min-time</code> | Minimum timestamp to dump. | `-9223372036854775808` |
|
||||||
|
| <code class="text-nowrap">--max-time</code> | Maximum timestamp to dump. | `9223372036854775807` |
|
||||||
|
| <code class="text-nowrap">--match</code> | Series selector. Can be specified multiple times. | `{__name__=~'(?s:.*)'}` |
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
###### Arguments
|
||||||
|
|
||||||
|
| Argument | Description | Default |
|
||||||
|
| --- | --- | --- |
|
||||||
|
| db path | Database path (default is data/). | `data/` |
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
##### `promtool tsdb create-blocks-from`
|
##### `promtool tsdb create-blocks-from`
|
||||||
|
|
||||||
[Experimental] Import samples from input and produce TSDB blocks. Please refer to the storage docs for more details.
|
[Experimental] Import samples from input and produce TSDB blocks. Please refer to the storage docs for more details.
|
||||||
|
|
Loading…
Reference in a new issue