From 898af6102d19213199a9015bc4d97d109b35616f Mon Sep 17 00:00:00 2001 From: Nicolas Peugnet Date: Thu, 13 Feb 2025 03:03:31 +0100 Subject: [PATCH] promtool: support creating tsdb blocks from a pipe (#16011) This is very useful when piping the input file to stdin and then using /dev/stdin as the input file. e.g. xzcat dump.xz | promtool tsdb create-blocks-from openmetrics /dev/stdin /tmp/data Signed-off-by: Nicolas Peugnet --- cmd/promtool/tsdb.go | 20 ++++++++-- cmd/promtool/tsdb_posix_test.go | 69 +++++++++++++++++++++++++++++++++ 2 files changed, 86 insertions(+), 3 deletions(-) create mode 100644 cmd/promtool/tsdb_posix_test.go diff --git a/cmd/promtool/tsdb.go b/cmd/promtool/tsdb.go index b291d29bf7..6a62e2e8bc 100644 --- a/cmd/promtool/tsdb.go +++ b/cmd/promtool/tsdb.go @@ -826,17 +826,31 @@ func checkErr(err error) int { } func backfillOpenMetrics(path, outputDir string, humanReadable, quiet bool, maxBlockDuration time.Duration, customLabels map[string]string) int { - inputFile, err := fileutil.OpenMmapFile(path) + var buf []byte + info, err := os.Stat(path) if err != nil { return checkErr(err) } - defer inputFile.Close() + if info.Mode()&(os.ModeNamedPipe|os.ModeCharDevice) != 0 { + // Read the pipe chunks by chunks as it cannot be mmap-ed + buf, err = os.ReadFile(path) + if err != nil { + return checkErr(err) + } + } else { + inputFile, err := fileutil.OpenMmapFile(path) + if err != nil { + return checkErr(err) + } + defer inputFile.Close() + buf = inputFile.Bytes() + } if err := os.MkdirAll(outputDir, 0o777); err != nil { return checkErr(fmt.Errorf("create output dir: %w", err)) } - return checkErr(backfill(5000, inputFile.Bytes(), outputDir, humanReadable, quiet, maxBlockDuration, customLabels)) + return checkErr(backfill(5000, buf, outputDir, humanReadable, quiet, maxBlockDuration, customLabels)) } func displayHistogram(dataType string, datas []int, total int) { diff --git a/cmd/promtool/tsdb_posix_test.go b/cmd/promtool/tsdb_posix_test.go new file mode 100644 index 0000000000..8a83aead70 --- /dev/null +++ b/cmd/promtool/tsdb_posix_test.go @@ -0,0 +1,69 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !windows + +package main + +import ( + "bytes" + "io" + "math" + "os" + "path" + "syscall" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/prometheus/prometheus/tsdb" +) + +func TestTSDBDumpOpenMetricsRoundTripPipe(t *testing.T) { + initialMetrics, err := os.ReadFile("testdata/dump-openmetrics-roundtrip-test.prom") + require.NoError(t, err) + initialMetrics = normalizeNewLine(initialMetrics) + + pipeDir := t.TempDir() + dbDir := t.TempDir() + + // create pipe + pipe := path.Join(pipeDir, "pipe") + err = syscall.Mkfifo(pipe, 0o666) + require.NoError(t, err) + + go func() { + // open pipe to write + in, err := os.OpenFile(pipe, os.O_WRONLY, os.ModeNamedPipe) + require.NoError(t, err) + defer func() { require.NoError(t, in.Close()) }() + _, err = io.Copy(in, bytes.NewReader(initialMetrics)) + require.NoError(t, err) + }() + + // Import samples from OM format + code := backfillOpenMetrics(pipe, dbDir, false, false, 2*time.Hour, map[string]string{}) + require.Equal(t, 0, code) + db, err := tsdb.Open(dbDir, nil, nil, tsdb.DefaultOptions(), nil) + require.NoError(t, err) + t.Cleanup(func() { + require.NoError(t, db.Close()) + }) + + // Dump the blocks into OM format + dumpedMetrics := getDumpedSamples(t, dbDir, "", math.MinInt64, math.MaxInt64, []string{"{__name__=~'(?s:.*)'}"}, formatSeriesSetOpenMetrics) + + // Should get back the initial metrics. + require.Equal(t, string(initialMetrics), dumpedMetrics) +}