Merge branch 'review'

beorn7 committed 2020-12-28 16:18:32 +01:00
commit 97f9bcb243
4 changed files with 37 additions and 20 deletions

View file

@@ -65,7 +65,7 @@ func getMinAndMaxTimestamps(p textparse.Parser) (int64, int64, error) {
 	return maxt, mint, nil
 }
 
-func createBlocks(input []byte, mint, maxt int64, maxSamplesInAppender int, outputDir string) (returnErr error) {
+func createBlocks(input []byte, mint, maxt int64, maxSamplesInAppender int, outputDir string, humanReadable bool) (returnErr error) {
 	blockDuration := tsdb.DefaultBlockDuration
 	mint = blockDuration * (mint / blockDuration)
 
@@ -77,6 +77,8 @@ func createBlocks(input []byte, mint, maxt int64, maxSamplesInAppender int, outp
 		returnErr = tsdb_errors.NewMulti(returnErr, db.Close()).Err()
 	}()
 
+	var wroteHeader bool
+
 	for t := mint; t <= maxt; t = t + blockDuration {
 		err := func() error {
 			w, err := tsdb.NewBlockWriter(log.NewNopLogger(), outputDir, blockDuration)
@@ -133,36 +135,46 @@ func createBlocks(input []byte, mint, maxt int64, maxSamplesInAppender int, outp
 				app = w.Appender(ctx)
 				samplesCount = 0
 			}
 
 			if err := app.Commit(); err != nil {
 				return errors.Wrap(err, "commit")
 			}
 
-			if _, err := w.Flush(ctx); err != nil && err != tsdb.ErrNoSeriesAppended {
+			block, err := w.Flush(ctx)
+			switch err {
+			case nil:
+				blocks, err := db.Blocks()
+				if err != nil {
+					return errors.Wrap(err, "get blocks")
+				}
+				for _, b := range blocks {
+					if b.Meta().ULID == block {
+						printBlocks([]tsdb.BlockReader{b}, !wroteHeader, humanReadable)
+						wroteHeader = true
+						break
+					}
+				}
+			case tsdb.ErrNoSeriesAppended:
+			default:
 				return errors.Wrap(err, "flush")
 			}
 
 			return nil
 		}()
 		if err != nil {
 			return errors.Wrap(err, "process blocks")
 		}
-
-		blocks, err := db.Blocks()
-		if err != nil {
-			return errors.Wrap(err, "get blocks")
-		}
-		if len(blocks) <= 0 {
-			continue
-		}
-		printBlocks(blocks[len(blocks)-1:], true)
 	}
 	return nil
 }
 
-func backfill(maxSamplesInAppender int, input []byte, outputDir string) (err error) {
+func backfill(maxSamplesInAppender int, input []byte, outputDir string, humanReadable bool) (err error) {
 	p := textparse.NewOpenMetricsParser(input)
 	maxt, mint, err := getMinAndMaxTimestamps(p)
 	if err != nil {
 		return errors.Wrap(err, "getting min and max timestamp")
 	}
-	return errors.Wrap(createBlocks(input, mint, maxt, maxSamplesInAppender, outputDir), "block creation")
+	return errors.Wrap(createBlocks(input, mint, maxt, maxSamplesInAppender, outputDir, humanReadable), "block creation")
 }
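
Note on the flow above: each block-duration range is flushed individually; on success the freshly written block is looked up among db.Blocks() and printed immediately (writing the table header only the first time), tsdb.ErrNoSeriesAppended is silently skipped, and any other error aborts the import. A minimal, self-contained sketch of that pattern, with w.Flush and the block lookup replaced by an illustrative stand-in (flushRange and errNoSeriesAppended are not real promtool or tsdb identifiers):

package main

import (
	"errors"
	"fmt"
)

// errNoSeriesAppended stands in for tsdb.ErrNoSeriesAppended.
var errNoSeriesAppended = errors.New("no series appended")

// flushRange stands in for w.Flush(ctx): it returns a block identifier,
// or the sentinel error when the range contained no samples.
func flushRange(t int64) (string, error) {
	if t%2 != 0 {
		return "", errNoSeriesAppended
	}
	return fmt.Sprintf("block-at-%d", t), nil
}

func main() {
	var wroteHeader bool
	for t := int64(0); t < 6; t++ {
		block, err := flushRange(t)
		switch err {
		case nil:
			if !wroteHeader {
				fmt.Println("BLOCK ULID") // the header is written at most once
				wroteHeader = true
			}
			fmt.Println(block)
		case errNoSeriesAppended:
			// Nothing was written for this time range; move on silently.
		default:
			fmt.Println("flush:", err) // any other flush error aborts the import
			return
		}
	}
}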

View file

@@ -423,7 +423,7 @@ after_eof 1 2
 				require.NoError(t, os.RemoveAll(outputDir))
 			}()
 
-			err = backfill(test.MaxSamplesInAppender, []byte(test.ToParse), outputDir)
+			err = backfill(test.MaxSamplesInAppender, []byte(test.ToParse), outputDir, false)
 			if !test.IsOk {
 				require.Error(t, err, test.Description)

View file

@@ -134,6 +134,7 @@ func main() {
 	dumpMaxTime := tsdbDumpCmd.Flag("max-time", "Maximum timestamp to dump.").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
 
 	importCmd := tsdbCmd.Command("create-blocks-from", "[Experimental] Import samples from input and produce TSDB blocks. Please refer to the storage docs for more details.")
+	importHumanReadable := importCmd.Flag("human-readable", "Print human readable values.").Short('r').Bool()
 	openMetricsImportCmd := importCmd.Command("openmetrics", "Import samples from OpenMetrics input and produce TSDB blocks. Please refer to the storage docs for more details.")
 	// TODO(aSquare14): add flag to set default block duration
 	importFilePath := openMetricsImportCmd.Arg("input file", "OpenMetrics file to read samples from.").Required().String()
@@ -196,7 +197,7 @@ func main() {
 		os.Exit(checkErr(dumpSamples(*dumpPath, *dumpMinTime, *dumpMaxTime)))
 	//TODO(aSquare14): Work on adding support for custom block size.
 	case openMetricsImportCmd.FullCommand():
-		os.Exit(checkErr(backfillOpenMetrics(*importFilePath, *importDBPath)))
+		os.Exit(checkErr(backfillOpenMetrics(*importFilePath, *importDBPath, *importHumanReadable)))
 	}
 }
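
The new -r/--human-readable flag is declared on the parent create-blocks-from command rather than on openmetrics itself, so it can apply to any import subcommand and is passed down to backfillOpenMetrics. A simplified, illustrative sketch of that kingpin wiring (command and flag names taken from the diff, everything else trimmed):

package main

import (
	"fmt"
	"os"

	"gopkg.in/alecthomas/kingpin.v2"
)

func main() {
	app := kingpin.New("promtool-sketch", "Simplified sketch of the flag wiring.")
	tsdbCmd := app.Command("tsdb", "Run tsdb commands.")
	importCmd := tsdbCmd.Command("create-blocks-from", "Import samples from input and produce TSDB blocks.")
	importHumanReadable := importCmd.Flag("human-readable", "Print human readable values.").Short('r').Bool()
	openMetricsImportCmd := importCmd.Command("openmetrics", "Import samples from OpenMetrics input.")
	importFilePath := openMetricsImportCmd.Arg("input file", "OpenMetrics file to read samples from.").Required().String()

	switch kingpin.MustParse(app.Parse(os.Args[1:])) {
	case openMetricsImportCmd.FullCommand():
		// promtool dispatches to backfillOpenMetrics here; the sketch just echoes the inputs.
		fmt.Printf("would backfill %q (human readable: %v)\n", *importFilePath, *importHumanReadable)
	}
}

An import would then presumably be invoked along the lines of "promtool tsdb create-blocks-from -r openmetrics <input file> <output dir>"; the output-directory argument (*importDBPath above) is declared elsewhere in main.go and is not part of this diff.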

View file

@@ -354,15 +354,18 @@ func listBlocks(path string, humanReadable bool) error {
 	if err != nil {
 		return err
 	}
-	printBlocks(blocks, humanReadable)
+	printBlocks(blocks, true, humanReadable)
 	return nil
 }
 
-func printBlocks(blocks []tsdb.BlockReader, humanReadable bool) {
+func printBlocks(blocks []tsdb.BlockReader, writeHeader, humanReadable bool) {
 	tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
 	defer tw.Flush()
 
-	fmt.Fprintln(tw, "BLOCK ULID\tMIN TIME\tMAX TIME\tDURATION\tNUM SAMPLES\tNUM CHUNKS\tNUM SERIES\tSIZE")
+	if writeHeader {
+		fmt.Fprintln(tw, "BLOCK ULID\tMIN TIME\tMAX TIME\tDURATION\tNUM SAMPLES\tNUM CHUNKS\tNUM SERIES\tSIZE")
+	}
 
 	for _, b := range blocks {
 		meta := b.Meta()
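
The new writeHeader parameter exists because createBlocks now calls printBlocks once per flushed block rather than once for the whole run, and only the first of those calls should emit the column header; listBlocks keeps its old behavior by passing true. A small standalone sketch of the pattern using only text/tabwriter (names and columns abbreviated, not the actual promtool code):

package main

import (
	"fmt"
	"os"
	"text/tabwriter"
)

// printBlockRow mirrors the shape of printBlocks: a fresh tabwriter per call,
// with the header emitted only when writeHeader is true.
func printBlockRow(ulid string, samples int, writeHeader bool) {
	tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
	defer tw.Flush()

	if writeHeader {
		fmt.Fprintln(tw, "BLOCK ULID\tNUM SAMPLES")
	}
	fmt.Fprintf(tw, "%v\t%v\n", ulid, samples)
}

func main() {
	var wroteHeader bool
	for _, b := range []struct {
		ulid    string
		samples int
	}{{"01ABC", 100}, {"01DEF", 200}} {
		printBlockRow(b.ulid, b.samples, !wroteHeader)
		wroteHeader = true
	}
}

Tracking wroteHeader in the caller keeps printBlocks stateless; the trade-off is that each call gets its own tabwriter, so column widths are computed per call rather than across the whole table.
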
@@ -615,7 +618,7 @@ func checkErr(err error) int {
 	return 0
 }
 
-func backfillOpenMetrics(path string, outputDir string) (err error) {
+func backfillOpenMetrics(path string, outputDir string, humanReadable bool) (err error) {
 	inputFile, err := fileutil.OpenMmapFile(path)
 	if err != nil {
 		return err
@@ -625,5 +628,6 @@ func backfillOpenMetrics(path string, outputDir string) (err error) {
 	if err := os.MkdirAll(outputDir, 0777); err != nil {
 		return errors.Wrap(err, "create output dir")
 	}
-	return backfill(5000, inputFile.Bytes(), outputDir)
+
+	return backfill(5000, inputFile.Bytes(), outputDir, humanReadable)
 }