mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-25 05:34:05 -08:00
Backfill: print created blocks only, add human-readable option
Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>
This commit is contained in:
parent
c08009139a
commit
53480c168d
|
@ -65,7 +65,7 @@ func getMinAndMaxTimestamps(p textparse.Parser) (int64, int64, error) {
|
|||
return maxt, mint, nil
|
||||
}
|
||||
|
||||
func createBlocks(input []byte, mint, maxt int64, maxSamplesInAppender int, outputDir string) (returnErr error) {
|
||||
func createBlocks(input []byte, mint, maxt int64, maxSamplesInAppender int, outputDir string, humanReadable bool) (returnErr error) {
|
||||
blockDuration := tsdb.DefaultBlockDuration
|
||||
mint = blockDuration * (mint / blockDuration)
|
||||
|
||||
|
@ -77,6 +77,8 @@ func createBlocks(input []byte, mint, maxt int64, maxSamplesInAppender int, outp
|
|||
returnErr = tsdb_errors.NewMulti(returnErr, db.Close()).Err()
|
||||
}()
|
||||
|
||||
var wroteHeader bool
|
||||
|
||||
for t := mint; t <= maxt; t = t + blockDuration {
|
||||
err := func() error {
|
||||
w, err := tsdb.NewBlockWriter(log.NewNopLogger(), outputDir, blockDuration)
|
||||
|
@ -133,36 +135,46 @@ func createBlocks(input []byte, mint, maxt int64, maxSamplesInAppender int, outp
|
|||
app = w.Appender(ctx)
|
||||
samplesCount = 0
|
||||
}
|
||||
|
||||
if err := app.Commit(); err != nil {
|
||||
return errors.Wrap(err, "commit")
|
||||
}
|
||||
if _, err := w.Flush(ctx); err != nil && err != tsdb.ErrNoSeriesAppended {
|
||||
|
||||
block, err := w.Flush(ctx)
|
||||
switch err {
|
||||
case nil:
|
||||
blocks, err := db.Blocks()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "get blocks")
|
||||
}
|
||||
for _, b := range blocks {
|
||||
if b.Meta().ULID == block {
|
||||
printBlocks([]tsdb.BlockReader{b}, !wroteHeader, humanReadable)
|
||||
wroteHeader = true
|
||||
break
|
||||
}
|
||||
}
|
||||
case tsdb.ErrNoSeriesAppended:
|
||||
default:
|
||||
return errors.Wrap(err, "flush")
|
||||
}
|
||||
|
||||
return nil
|
||||
}()
|
||||
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "process blocks")
|
||||
}
|
||||
|
||||
blocks, err := db.Blocks()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "get blocks")
|
||||
}
|
||||
if len(blocks) <= 0 {
|
||||
continue
|
||||
}
|
||||
printBlocks(blocks[len(blocks)-1:], true)
|
||||
}
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
func backfill(maxSamplesInAppender int, input []byte, outputDir string) (err error) {
|
||||
func backfill(maxSamplesInAppender int, input []byte, outputDir string, humanReadable bool) (err error) {
|
||||
p := textparse.NewOpenMetricsParser(input)
|
||||
maxt, mint, err := getMinAndMaxTimestamps(p)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "getting min and max timestamp")
|
||||
}
|
||||
return errors.Wrap(createBlocks(input, mint, maxt, maxSamplesInAppender, outputDir), "block creation")
|
||||
return errors.Wrap(createBlocks(input, mint, maxt, maxSamplesInAppender, outputDir, humanReadable), "block creation")
|
||||
}
|
||||
|
|
|
@ -423,7 +423,7 @@ after_eof 1 2
|
|||
require.NoError(t, os.RemoveAll(outputDir))
|
||||
}()
|
||||
|
||||
err = backfill(test.MaxSamplesInAppender, []byte(test.ToParse), outputDir)
|
||||
err = backfill(test.MaxSamplesInAppender, []byte(test.ToParse), outputDir, false)
|
||||
|
||||
if !test.IsOk {
|
||||
require.Error(t, err, test.Description)
|
||||
|
|
|
@ -134,6 +134,7 @@ func main() {
|
|||
dumpMaxTime := tsdbDumpCmd.Flag("max-time", "Maximum timestamp to dump.").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
|
||||
|
||||
importCmd := tsdbCmd.Command("create-blocks-from", "[Experimental] Import samples from input and produce TSDB blocks. Please refer to the storage docs for more details.")
|
||||
importHumanReadable := importCmd.Flag("human-readable", "Print human readable values.").Short('r').Bool()
|
||||
openMetricsImportCmd := importCmd.Command("openmetrics", "Import samples from OpenMetrics input and produce TSDB blocks. Please refer to the storage docs for more details.")
|
||||
// TODO(aSquare14): add flag to set default block duration
|
||||
importFilePath := openMetricsImportCmd.Arg("input file", "OpenMetrics file to read samples from.").Required().String()
|
||||
|
@ -196,7 +197,7 @@ func main() {
|
|||
os.Exit(checkErr(dumpSamples(*dumpPath, *dumpMinTime, *dumpMaxTime)))
|
||||
//TODO(aSquare14): Work on adding support for custom block size.
|
||||
case openMetricsImportCmd.FullCommand():
|
||||
os.Exit(checkErr(backfillOpenMetrics(*importFilePath, *importDBPath)))
|
||||
os.Exit(checkErr(backfillOpenMetrics(*importFilePath, *importDBPath, *importHumanReadable)))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -354,15 +354,18 @@ func listBlocks(path string, humanReadable bool) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
printBlocks(blocks, humanReadable)
|
||||
printBlocks(blocks, true, humanReadable)
|
||||
return nil
|
||||
}
|
||||
|
||||
func printBlocks(blocks []tsdb.BlockReader, humanReadable bool) {
|
||||
func printBlocks(blocks []tsdb.BlockReader, writeHeader, humanReadable bool) {
|
||||
tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
|
||||
defer tw.Flush()
|
||||
|
||||
fmt.Fprintln(tw, "BLOCK ULID\tMIN TIME\tMAX TIME\tDURATION\tNUM SAMPLES\tNUM CHUNKS\tNUM SERIES\tSIZE")
|
||||
if writeHeader {
|
||||
fmt.Fprintln(tw, "BLOCK ULID\tMIN TIME\tMAX TIME\tDURATION\tNUM SAMPLES\tNUM CHUNKS\tNUM SERIES\tSIZE")
|
||||
}
|
||||
|
||||
for _, b := range blocks {
|
||||
meta := b.Meta()
|
||||
|
||||
|
@ -615,11 +618,11 @@ func checkErr(err error) int {
|
|||
return 0
|
||||
}
|
||||
|
||||
func backfillOpenMetrics(path string, outputDir string) (err error) {
|
||||
func backfillOpenMetrics(path string, outputDir string, humanReadable bool) (err error) {
|
||||
inputFile, err := fileutil.OpenMmapFile(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer inputFile.Close()
|
||||
return backfill(5000, inputFile.Bytes(), outputDir)
|
||||
return backfill(5000, inputFile.Bytes(), outputDir, humanReadable)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue