TSDB data import tool

Created a tool to import data formatted according to the Prometheus exposition format. The tool can be accessed via the TSDB CLI.

Addresses prometheus/prometheus#535

Signed-off-by: Dipack P Panjabi <dipack.panjabi@gmail.com>

Update PR to comply with changes on master.

Signed-off-by: Dipack P Panjabi <dipack.panjabi@gmail.com>

Add Importer tool minimal documentation (#1)

* Add Importer tool minimal documentation

* Reduce a little import file example

Update docs. Courtesy of @gillg

Signed-off-by: Dipack P Panjabi <dipack.panjabi@gmail.com>

Fix stuff because of rebase

Signed-off-by: Dipack P Panjabi <dipack.panjabi@gmail.com>
Dipack P Panjabi 2019-08-13 11:49:35 -04:00 committed by Bartlomiej Plotka
parent 9c599f1ee2
commit 575cdf2886
4 changed files with 1438 additions and 17 deletions


@@ -103,3 +103,57 @@ Note that on the read path, Prometheus only fetches raw series data for a set of
### Existing integrations
To learn more about existing integrations with remote storage systems, see the [Integrations documentation](https://prometheus.io/docs/operating/integrations/#remote-endpoints-and-storage).
## Database management
### Import old metrics from file
The command-line tool `tsdb` contains an `import` command. This tool is not meant for regular backfilling; keep in mind that Prometheus is a pull-based system by deliberate design.
You can use it to restore a partial dump from another Prometheus instance, or to load metrics exported from any other system. The only supported input format is [OpenMetrics](https://openmetrics.io/), which is the same format that Prometheus exporters expose.
You are free to export or convert your existing data to this format, as a single time-sorted text file. The file MUST end with the line `# EOF`.
Sample file `rrd_exported_data.txt` (`[metric]{[labels]} [number value] [timestamp ms]`):
```
collectd_df_complex{host="myserver.fqdn.com",df="var-log",dimension="free"} 5.8093906125e+10 1582226100000
collectd_varnish_derive{host="myserver.fqdn.com",varnish="request_rate",dimension="MAIN.client_req"} 2.3021666667e+01 1582226100000
collectd_df_complex{host="myserver.fqdn.com",df="var-log",dimension="free"} 5.8093906125e+10 1582226100000
collectd_load{host="myserver.fqdn.com",type="midterm"} 0.0155 1582226100000
collectd_varnish_derive{host="myserver.fqdn.com",varnish="request_rate",dimension="MAIN.client_req"} 2.3021666667e+01 1582226100000
collectd_load{host="myserver.fqdn.com",type="midterm"} 1.5500000000e-02 1582226100000
collectd_varnish_derive{host="myserver.fqdn.com",varnish="request_rate",dimension="MAIN.cache_hit"} 3.8054166667e+01 1582226100000
collectd_df_complex{host="myserver.fqdn.com",df="var-log",dimension="free"} 5.8093906125e+10 1582226100000
collectd_varnish_derive{host="myserver.fqdn.com",varnish="request_rate",dimension="MAIN.s_pipe"} 0 1582226100000
collectd_varnish_derive{host="myserver.fqdn.com",varnish="request_rate",dimension="MAIN.cache_hit"} 3.8054166667e+01 1582226100000
collectd_load{host="myserver.fqdn.com",type="shortterm"} 1.1000000000e-02 1582226100000
collectd_varnish_derive{host="myserver.fqdn.com",varnish="request_rate",dimension="MAIN.client_req"} 2.3021666667e+01 1582226100000
collectd_load{host="myserver.fqdn.com",type="shortterm"} 1.1000000000e-02 1582226100000
collectd_load{host="myserver.fqdn.com",type="shortterm"} 1.1000000000e-02 1582226100000
collectd_load{host="myserver.fqdn.com",type="longterm"} 2.5500000000e-02 1582226100000
collectd_varnish_derive{host="myserver.fqdn.com",varnish="request_rate",dimension="MAIN.s_pipe"} 0 1582226100000
collectd_load{host="myserver.fqdn.com"type="longterm"} 2.5500000000e-02 1582226100000
collectd_varnish_derive{host="myserver.fqdn.com",varnish="request_rate",dimension="MAIN.cache_hit"} 3.8054166667e+01 1582226100000
collectd_load{host="myserver.fqdn.com",type="midterm"} 1.5500000000e-02 1582226100000
# EOF
```
Note that `[number value]` can be written in plain or scientific notation, whichever you prefer. You are free to put custom labels on each metric, but remember that relabelling rules defined in Prometheus will not be applied to them: produce the final labels in your import file.
This format is simple to produce, but it is neither optimized nor compressed, so it is normal for the data file to be large.
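If your data lives in another system (an RRD export, a CSV dump, another TSDB), a small one-off program is usually enough to emit this format. Below is a minimal sketch in Go; the `Sample` struct, the example values, and the output file name are hypothetical and only illustrate the expected output format, they are not part of the `tsdb import` tool.
```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"sort"
)

// Sample is a hypothetical intermediate representation of one exported data point.
type Sample struct {
	Metric    string
	Labels    map[string]string
	Value     float64
	Timestamp int64 // milliseconds since epoch
}

func main() {
	// Illustrative data; in practice this comes from your own export.
	samples := []Sample{
		{"collectd_load", map[string]string{"host": "myserver.fqdn.com", "type": "midterm"}, 0.0155, 1582226100000},
		{"collectd_load", map[string]string{"host": "myserver.fqdn.com", "type": "shortterm"}, 0.011, 1582226100000},
	}
	// The import file must be sorted by time.
	sort.Slice(samples, func(i, j int) bool { return samples[i].Timestamp < samples[j].Timestamp })

	f, err := os.Create("rrd_exported_data.txt")
	if err != nil {
		panic(err)
	}
	defer f.Close()
	w := bufio.NewWriter(f)
	defer w.Flush()

	for _, s := range samples {
		// Emit the final labels here: Prometheus relabelling rules are not applied on import.
		labelStr := ""
		for k, v := range s.Labels {
			if labelStr != "" {
				labelStr += ","
			}
			labelStr += fmt.Sprintf("%s=%q", k, v)
		}
		fmt.Fprintf(w, "%s{%s} %g %d\n", s.Metric, labelStr, s.Value, s.Timestamp)
	}
	// The file must end with this exact line.
	fmt.Fprintln(w, "# EOF")
}
```
Any tool or language works, as long as the output matches the sample above and ends with `# EOF`.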
Before starting the import, check whether the data you are importing overlaps with what already exists in the TSDB.
If there is an overlap, you need to shut down Prometheus and restart it with `--storage.tsdb.allow-overlapping-blocks`; if there is no overlap, shutting it down is not strictly required, but it is recommended anyway.
Also do not forget the `--storage.tsdb.retention.time=X` flag if you do not want to lose any imported data points.
An example import command is: `tsdb import rrd_exported_data.txt /var/lib/prometheus/ --max-samples-in-mem=10000`
You can increase `max-samples-in-mem` to speed up the process, but the default value of 10000 is a good balance.
This tool creates all the Prometheus blocks (see [On-disk layout][On-disk layout] above) in a temporary workspace. The workspace defaults to `/tmp`, or to the `$TMPDIR` environment variable if it is set, so you can change it if you run into disk space issues (`TMPDIR=/new/path tsdb import [...]`).
### Prometheus TSDB imports feedback
As an example, consider a 19 GB OpenMetrics file with ~20k time series and 200M data points (samples) over a 2-year period; the overall resolution is therefore very low.
The import takes around 2 hours, and the uncompacted new TSDB blocks amount to around 2.1 GB spread over 7600 blocks. When Prometheus scans them, it automatically starts compacting them in the background. Once compaction completes (~30 min), the TSDB blocks are down to around 970 MB in 80 blocks (without any loss of data points).
The size and number of blocks depend on the number of time series and their resolution, but this gives you an order of magnitude.


@@ -38,6 +38,7 @@ import (
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/importer"
"gopkg.in/alecthomas/kingpin.v2"
)
@@ -52,23 +53,28 @@ func execute() (err error) {
var (
defaultDBPath = filepath.Join("benchout", "storage")
cli = kingpin.New(filepath.Base(os.Args[0]), "CLI tool for tsdb")
benchCmd = cli.Command("bench", "run benchmarks")
benchWriteCmd = benchCmd.Command("write", "run a write performance benchmark")
benchWriteOutPath = benchWriteCmd.Flag("out", "set the output path").Default("benchout").String()
benchWriteNumMetrics = benchWriteCmd.Flag("metrics", "number of metrics to read").Default("10000").Int()
benchSamplesFile = benchWriteCmd.Arg("file", "input file with samples data, default is ("+filepath.Join("..", "..", "testdata", "20kseries.json")+")").Default(filepath.Join("..", "..", "testdata", "20kseries.json")).String()
listCmd = cli.Command("ls", "list db blocks")
listCmdHumanReadable = listCmd.Flag("human-readable", "print human readable values").Short('h').Bool()
listPath = listCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
analyzeCmd = cli.Command("analyze", "analyze churn, label pair cardinality.")
analyzePath = analyzeCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
analyzeBlockID = analyzeCmd.Arg("block id", "block to analyze (default is the last block)").String()
analyzeLimit = analyzeCmd.Flag("limit", "how many items to show in each list").Default("20").Int()
dumpCmd = cli.Command("dump", "dump samples from a TSDB")
dumpPath = dumpCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
dumpMinTime = dumpCmd.Flag("min-time", "minimum timestamp to dump").Default(strconv.FormatInt(math.MinInt64, 10)).Int64()
dumpMaxTime = dumpCmd.Flag("max-time", "maximum timestamp to dump").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
cli = kingpin.New(filepath.Base(os.Args[0]), "CLI tool for tsdb")
benchCmd = cli.Command("bench", "run benchmarks")
benchWriteCmd = benchCmd.Command("write", "run a write performance benchmark")
benchWriteOutPath = benchWriteCmd.Flag("out", "set the output path").Default("benchout").String()
benchWriteNumMetrics = benchWriteCmd.Flag("metrics", "number of metrics to read").Default("10000").Int()
benchSamplesFile = benchWriteCmd.Arg("file", "input file with samples data, default is ("+filepath.Join("..", "..", "testdata", "20kseries.json")+")").Default(filepath.Join("..", "..", "testdata", "20kseries.json")).String()
listCmd = cli.Command("ls", "list db blocks")
listCmdHumanReadable = listCmd.Flag("human-readable", "print human readable values").Short('h').Bool()
listPath = listCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
analyzeCmd = cli.Command("analyze", "analyze churn, label pair cardinality.")
analyzePath = analyzeCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
analyzeBlockID = analyzeCmd.Arg("block id", "block to analyze (default is the last block)").String()
analyzeLimit = analyzeCmd.Flag("limit", "how many items to show in each list").Default("20").Int()
dumpCmd = cli.Command("dump", "dump samples from a TSDB")
dumpPath = dumpCmd.Arg("db path", "database path (default is "+defaultDBPath+")").Default(defaultDBPath).String()
dumpMinTime = dumpCmd.Flag("min-time", "minimum timestamp to dump").Default(strconv.FormatInt(math.MinInt64, 10)).Int64()
dumpMaxTime = dumpCmd.Flag("max-time", "maximum timestamp to dump").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
importCmd = cli.Command("import", "import samples from file containing information formatted in the Open Metrics format. Please refer to the storage docs for more details.")
importFilePath = importCmd.Arg("file path", "file to import samples from (must be in Open Metrics format)").Required().String()
importDbPath = importCmd.Arg("db path", "database path").Required().String()
importMaxSamplesInMemory = importCmd.Flag("max-samples-in-mem", "maximum number of samples to process in a cycle").Default("10000").Int()
importMaxBlockChildren = importCmd.Flag("max-block-children", "maximum number of children a block can have at a given time").Default("20").Int()
)
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
@@ -138,6 +144,12 @@ func execute() (err error) {
err = merr.Err()
}()
return dumpSamples(db, *dumpMinTime, *dumpMaxTime)
case importCmd.FullCommand():
f, err := os.Open(*importFilePath)
if err != nil {
return err
}
return importer.ImportFromFile(f, *importDbPath, *importMaxSamplesInMemory, *importMaxBlockChildren, logger)
}
return nil
}


@@ -0,0 +1,753 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importer
import (
"fmt"
"io/ioutil"
"math"
"math/rand"
"os"
"sort"
"strconv"
"strings"
"testing"
labels2 "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/util/testutil"
)
const (
// We use a lower value for this than the default to test block compaction implicitly.
maxSamplesInMemory = 2000
maxBlockChildren = 10
)
func testBlocks(t *testing.T, blocks []tsdb.BlockReader, metricLabels []string, expectedMint, expectedMaxt int64, expectedSamples []tsdb.MetricSample, expectedSymbols []string, expectedNumBlocks int) {
// Assert we have expected number of blocks.
testutil.Equals(t, expectedNumBlocks, len(blocks))
allSymbols := make(map[string]struct{})
allSamples := make([]tsdb.MetricSample, 0)
maxt, mint := int64(math.MinInt64), int64(math.MaxInt64)
for _, block := range blocks {
maxt, mint = value.MaxInt64(maxt, block.Meta().MaxTime), value.MinInt64(mint, block.Meta().MinTime)
indexr, err := block.Index()
testutil.Ok(t, err)
symbols := indexr.Symbols()
for symbols.Next() {
key := symbols.At()
if _, ok := allSymbols[key]; !ok {
allSymbols[key] = struct{}{}
}
}
blockSamples, err := readSeries(block, labels2.FromStrings(metricLabels...))
testutil.Ok(t, err)
allSamples = append(allSamples, blockSamples...)
_ = indexr.Close()
}
allSymbolsSlice := make([]string, 0)
for key := range allSymbols {
allSymbolsSlice = append(allSymbolsSlice, key)
}
sort.Strings(allSymbolsSlice)
sort.Strings(expectedSymbols)
// Assert that all symbols that we expect to find are present.
testutil.Equals(t, expectedSymbols, allSymbolsSlice)
sortSamples(allSamples)
sortSamples(expectedSamples)
// Assert that all samples that we imported + what existed already are present.
testutil.Assert(t, len(allSamples) == len(expectedSamples), "number of expected samples is different from actual samples")
testutil.Equals(t, expectedSamples, allSamples, "actual samples are different from expected samples")
// Assert that DB's time ranges are as specified.
testutil.Equals(t, expectedMaxt, maxt)
testutil.Equals(t, expectedMint, mint)
}
func sortSamples(samples []tsdb.MetricSample) {
sort.Slice(samples, func(x, y int) bool {
sx, sy := samples[x], samples[y]
// If timestamps are equal, sort based on values.
if sx.Timestamp != sy.Timestamp {
return sx.Timestamp < sy.Timestamp
}
return sx.Value < sy.Value
})
}
// readSeries returns all the series present in the block that contain the labels we want.
// The labels do not have to be exhaustive, i.e. if metrics share common labels,
// we only need to pass those common labels, and we will get all the metrics that have them,
// even if their other labels differ.
func readSeries(block tsdb.BlockReader, lbls labels2.Labels) ([]tsdb.MetricSample, error) {
series := make([]tsdb.MetricSample, 0)
ir, err := block.Index()
if err != nil {
return series, err
}
defer ir.Close()
tsr, err := block.Tombstones()
if err != nil {
return series, err
}
defer tsr.Close()
chunkr, err := block.Chunks()
if err != nil {
return series, err
}
defer chunkr.Close()
for _, lbl := range lbls {
css, err := tsdb.LookupChunkSeries(ir, tsr, labels2.MustNewMatcher(labels2.MatchEqual, lbl.Name, lbl.Value))
if err != nil {
return series, err
}
for css.Next() {
actLabels, chkMetas, _ := css.At()
var chunkIter chunkenc.Iterator
for _, meta := range chkMetas {
chk, err := chunkr.Chunk(meta.Ref)
if err != nil {
return series, err
}
chunkIter = chk.Iterator(chunkIter)
for chunkIter.Next() {
t, v := chunkIter.At()
sample := tsdb.MetricSample{
Timestamp: t,
Value: v,
Labels: actLabels,
}
series = append(series, sample)
}
}
}
}
return series, nil
}
// genSeries generates series from mint to maxt, with a step value.
func genSeries(labels []string, mint, maxt int64, step int) []tsdb.MetricSample {
series := make([]tsdb.MetricSample, 0)
for idx := mint; idx < maxt; idx += int64(step) {
// Round to 3 places.
val := math.Floor(rand.Float64()*1000) / 1000
sample := tsdb.MetricSample{Timestamp: idx, Value: val, Labels: labels2.FromStrings(labels...)}
series = append(series, sample)
}
return series
}
// labelsToStr converts the given labels to a string representation that is compliant with
// the OpenMetrics parser.
func labelsToStr(labels labels2.Labels) string {
str := "{"
for idx, l := range labels {
str += fmt.Sprintf("%s=%s", l.Name, strconv.Quote(l.Value))
if idx < len(labels)-1 {
str += ","
}
}
str += "}"
return str
}
// genOpenMetricsText formats the given series data into OpenMetrics compliant format.
func genOpenMetricsText(metricName, metricType string, series []tsdb.MetricSample) string {
str := fmt.Sprintf("# HELP %s This is a metric\n# TYPE %s %s", metricName, metricName, metricType)
for _, s := range series {
str += fmt.Sprintf("\n%s%s %f %d", metricName, labelsToStr(s.Labels), s.Value, s.Timestamp)
}
str += fmt.Sprintf("\n# EOF")
return str
}
func TestImport(t *testing.T) {
tests := []struct {
ToParse string
IsOk bool
MetricLabels []string
Expected struct {
MinTime int64
MaxTime int64
NumBlocks int
Symbols []string
Samples []tsdb.MetricSample
}
}{
{
ToParse: `# EOF`,
IsOk: true,
},
{
ToParse: `# HELP http_requests_total The total number of HTTP requests.
# TYPE http_requests_total counter
http_requests_total{code="200"} 1021 1565133713989
http_requests_total{code="400"} 1 1565133713990
# EOF
`,
IsOk: true,
MetricLabels: []string{"__name__", "http_requests_total"},
Expected: struct {
MinTime int64
MaxTime int64
NumBlocks int
Symbols []string
Samples []tsdb.MetricSample
}{
MinTime: 1565133713989,
MaxTime: 1565133713991,
NumBlocks: 1,
Symbols: []string{"__name__", "http_requests_total", "code", "200", "400"},
Samples: []tsdb.MetricSample{
{
Timestamp: 1565133713989,
Value: 1021,
Labels: labels2.FromStrings("__name__", "http_requests_total", "code", "200"),
},
{
Timestamp: 1565133713990,
Value: 1,
Labels: labels2.FromStrings("__name__", "http_requests_total", "code", "400"),
},
},
},
},
{
ToParse: `# HELP http_requests_total The total number of HTTP requests.
# TYPE http_requests_total counter
http_requests_total{code="200"} 1022 1565133713989
http_requests_total{code="400"} 2 1575133713990
# EOF
`,
IsOk: true,
MetricLabels: []string{"__name__", "http_requests_total"},
Expected: struct {
MinTime int64
MaxTime int64
NumBlocks int
Symbols []string
Samples []tsdb.MetricSample
}{
MinTime: 1565133713989,
MaxTime: 1575133713991,
NumBlocks: 2,
Symbols: []string{"http_requests_total", "code", "200", "400", "__name__"},
Samples: []tsdb.MetricSample{
{
Timestamp: 1565133713989,
Value: 1022,
Labels: labels2.FromStrings("__name__", "http_requests_total", "code", "200"),
},
{
Timestamp: 1575133713990,
Value: 2,
Labels: labels2.FromStrings("__name__", "http_requests_total", "code", "400"),
},
},
},
},
{
ToParse: `# HELP http_requests_total The total number of HTTP requests.
# TYPE http_requests_total counter
http_requests_total{code="200"} 1023 1395066363000
http_requests_total{code="400"} 3 1395066363000
# EOF
`,
IsOk: true,
MetricLabels: []string{"__name__", "http_requests_total"},
Expected: struct {
MinTime int64
MaxTime int64
NumBlocks int
Symbols []string
Samples []tsdb.MetricSample
}{
MinTime: 1395066363000,
MaxTime: 1395066363001,
NumBlocks: 1,
Symbols: []string{"http_requests_total", "code", "200", "400", "__name__"},
Samples: []tsdb.MetricSample{
{
Timestamp: 1395066363000,
Value: 1023,
Labels: labels2.FromStrings("__name__", "http_requests_total", "code", "200"),
},
{
Timestamp: 1395066363000,
Value: 3,
Labels: labels2.FromStrings("__name__", "http_requests_total", "code", "400"),
},
},
},
},
{
ToParse: `# HELP something_weird Something weird
# TYPE something_weird gauge
something_weird{problem="infinite timestamp"} +Inf -3982045
# EOF
`,
IsOk: false,
},
{
// Metric has no timestamp, hence invalid
ToParse: `# HELP rpc_duration_seconds A summary of the RPC duration in seconds.
# TYPE rpc_duration_seconds summary
rpc_duration_seconds{quantile="0.01"} 3102
rpc_duration_seconds{quantile="0.05"} 3272
# EOF
`,
IsOk: false,
},
{
ToParse: `# HELP no_type_metric This is a metric with no TYPE string
no_type_metric{type="bad_news_bears"} 0.0 111
# EOF
`,
IsOk: true,
MetricLabels: []string{"__name__", "no_type_metric"},
Expected: struct {
MinTime int64
MaxTime int64
NumBlocks int
Symbols []string
Samples []tsdb.MetricSample
}{
MinTime: 111,
MaxTime: 112,
NumBlocks: 1,
Symbols: []string{"no_type_metric", "type", "bad_news_bears", "__name__"},
Samples: []tsdb.MetricSample{
{
Timestamp: 111,
Value: 0.0,
Labels: labels2.FromStrings("__name__", "no_type_metric", "type", "bad_news_bears"),
},
},
},
},
{
ToParse: `# HELP bad_ts This is a metric with an extreme timestamp
# TYPE bad_ts gauge
bad_ts{type="bad_timestamp"} 420 -1e99
# EOF
`,
IsOk: false,
},
{
ToParse: `# HELP bad_ts This is a metric with an extreme timestamp
# TYPE bad_ts gauge
bad_ts{type="bad_timestamp"} 420 1e99
# EOF
`,
IsOk: false,
},
{
ToParse: `no_help_no_type{foo="bar"} 42 6900
# EOF
`,
IsOk: true,
MetricLabels: []string{"__name__", "no_help_no_type"},
Expected: struct {
MinTime int64
MaxTime int64
NumBlocks int
Symbols []string
Samples []tsdb.MetricSample
}{
MinTime: 6900,
MaxTime: 6901,
NumBlocks: 1,
Symbols: []string{"no_help_no_type", "foo", "bar", "__name__"},
Samples: []tsdb.MetricSample{
{
Timestamp: 6900,
Value: 42,
Labels: labels2.FromStrings("__name__", "no_help_no_type", "foo", "bar"),
},
},
},
},
{
ToParse: `bare_metric 42.24 1001
# EOF
`,
IsOk: true,
MetricLabels: []string{"__name__", "bare_metric"},
Expected: struct {
MinTime int64
MaxTime int64
NumBlocks int
Symbols []string
Samples []tsdb.MetricSample
}{
MinTime: 1001,
MaxTime: 1002,
NumBlocks: 1,
Symbols: []string{"bare_metric", "__name__"},
Samples: []tsdb.MetricSample{
{
Timestamp: 1001,
Value: 42.24,
Labels: labels2.FromStrings("__name__", "bare_metric"),
},
},
},
},
{
ToParse: `# HELP bad_metric This a bad metric
# TYPE bad_metric bad_type
bad_metric{type="has no type information"} 0.0 111
# EOF
`,
IsOk: false,
},
{
ToParse: `# HELP no_nl This test has no newline so will fail
# TYPE no_nl gauge
no_nl{type="no newline"}
# EOF`,
IsOk: false,
},
}
for _, test := range tests {
tmpDbDir, err := ioutil.TempDir("", "importer")
testutil.Ok(t, err)
err = ImportFromFile(strings.NewReader(test.ToParse), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil)
if test.IsOk {
testutil.Ok(t, err)
if len(test.Expected.Symbols) > 0 {
db, err := tsdb.OpenDBReadOnly(tmpDbDir, nil)
testutil.Ok(t, err)
blocks, err := db.Blocks()
testutil.Ok(t, err)
testBlocks(t, blocks, test.MetricLabels, test.Expected.MinTime, test.Expected.MaxTime, test.Expected.Samples, test.Expected.Symbols, test.Expected.NumBlocks)
}
} else {
testutil.NotOk(t, err)
}
_ = os.RemoveAll(tmpDbDir)
}
}
func TestImportBadFile(t *testing.T) {
// No file found case.
err := ImportFromFile((*os.File)(nil), "/buzz/baz/bar/foo", maxSamplesInMemory, maxBlockChildren, nil)
testutil.NotOk(t, err)
}
func TestImportIntoExistingDB(t *testing.T) {
type importTest struct {
MetricName string
MetricType string
MetricLabels []string
GeneratorStep int
DBMint, DBMaxt int64
ImportShuffle bool
ImportMint, ImportMaxt int64
ExpectedMint, ExpectedMaxt int64
ExpectedSymbols []string
ExpectedNumBlocks int
}
tests := []importTest{
{
// Import overlapping data only.
MetricName: "test_metric",
MetricType: "gauge",
MetricLabels: []string{"foo", "bar"},
GeneratorStep: 1,
DBMint: 1000,
DBMaxt: 2000,
ImportShuffle: false,
ImportMint: 1000,
ImportMaxt: 1500,
ExpectedMint: 1000,
ExpectedMaxt: 2000,
ExpectedSymbols: []string{"__name__", "test_metric", "foo", "bar"},
ExpectedNumBlocks: 2,
},
{
// Test overlapping, and non-overlapping data, with a lot of samples.
MetricName: "test_metric_2",
MetricType: "gauge",
MetricLabels: []string{"foo", "bar"},
GeneratorStep: 1,
DBMint: 1000,
DBMaxt: 4000,
ImportShuffle: false,
ImportMint: 0,
ImportMaxt: 5000,
ExpectedSymbols: []string{"__name__", "test_metric_2", "foo", "bar"},
ExpectedMint: 0,
ExpectedMaxt: 5000,
ExpectedNumBlocks: 4,
},
{
// Test overlapping, and non-overlapping data, aligning the blocks.
// This test also creates a large number of samples, across a much wider time range, to test
// if the importer will divvy samples beyond DB time limits.
MetricName: "test_metric_3",
MetricType: "gauge",
MetricLabels: []string{"foo", "bar"},
GeneratorStep: 20000,
DBMint: 1000,
DBMaxt: tsdb.DefaultBlockDuration,
ImportShuffle: false,
ImportMint: 0,
ImportMaxt: tsdb.DefaultBlockDuration * 2,
ExpectedSymbols: []string{"__name__", "test_metric_3", "foo", "bar"},
ExpectedMint: 0,
ExpectedMaxt: (tsdb.DefaultBlockDuration * 2) - 20000 + 1,
ExpectedNumBlocks: 4,
},
{
// Import partially overlapping data only.
MetricName: "test_metric_4",
MetricType: "gauge",
MetricLabels: []string{"foo", "bar"},
GeneratorStep: 1,
DBMint: 1000,
DBMaxt: 2000,
ImportShuffle: false,
ImportMint: 500,
ImportMaxt: 1500,
ExpectedMint: 500,
ExpectedMaxt: 2000,
ExpectedSymbols: []string{"__name__", "test_metric_4", "foo", "bar"},
ExpectedNumBlocks: 3,
},
{
// Import that never overlaps, after the DB max time.
MetricName: "test_metric_5",
MetricType: "gauge",
MetricLabels: []string{"foo", "bar"},
GeneratorStep: 1,
DBMint: 0,
DBMaxt: 1000,
ImportShuffle: false,
ImportMint: tsdb.DefaultBlockDuration + 1000,
ImportMaxt: tsdb.DefaultBlockDuration + 2000,
ExpectedMint: 0,
ExpectedMaxt: tsdb.DefaultBlockDuration + 2000,
ExpectedSymbols: []string{"__name__", "test_metric_5", "foo", "bar"},
ExpectedNumBlocks: 2,
},
}
for _, test := range tests {
initSeries := genSeries(test.MetricLabels, test.DBMint, test.DBMaxt, test.GeneratorStep)
initText := genOpenMetricsText(test.MetricName, test.MetricType, initSeries)
tmpDbDir, err := ioutil.TempDir("", "importer")
testutil.Ok(t, err)
err = ImportFromFile(strings.NewReader(initText), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil)
testutil.Ok(t, err)
importSeries := genSeries(test.MetricLabels, test.ImportMint, test.ImportMaxt, test.GeneratorStep)
importText := genOpenMetricsText(test.MetricName, test.MetricType, importSeries)
err = ImportFromFile(strings.NewReader(importText), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil)
testutil.Ok(t, err)
expectedSamples := make([]tsdb.MetricSample, 0)
for _, exp := range [][]tsdb.MetricSample{initSeries, importSeries} {
for _, sample := range exp {
lbls := labels2.FromStrings("__name__", test.MetricName)
s := tsdb.MetricSample{
Timestamp: sample.Timestamp,
Value: sample.Value,
Labels: append(lbls, sample.Labels...),
}
expectedSamples = append(expectedSamples, s)
}
}
db, err := tsdb.Open(tmpDbDir, nil, nil, &tsdb.Options{
RetentionDuration: tsdb.DefaultOptions().RetentionDuration,
AllowOverlappingBlocks: true,
})
testutil.Ok(t, err)
blocks := make([]tsdb.BlockReader, 0)
for _, b := range db.Blocks() {
blocks = append(blocks, b)
}
testBlocks(t, blocks, test.MetricLabels, test.ExpectedMint, test.ExpectedMaxt, expectedSamples, test.ExpectedSymbols, test.ExpectedNumBlocks)
// Close and remove all temp files and folders.
_ = db.Close()
_ = os.RemoveAll(tmpDbDir)
}
}
// Test to see if multiple series are imported correctly.
func TestMixedSeries(t *testing.T) {
partialOMText := func(metricName, metricType string, series []tsdb.MetricSample) string {
str := fmt.Sprintf("# HELP %s This is a metric\n# TYPE %s %s", metricName, metricName, metricType)
for _, s := range series {
str += fmt.Sprintf("\n%s%s %f %d", metricName, s.Labels.String(), s.Value, s.Timestamp)
}
return str
}
metricA := "metricA"
metricLabels := []string{"foo", "bar"}
mint, maxt := int64(0), int64(1000)
step := 5
seriesA := genSeries(metricLabels, mint, maxt, step)
textA := partialOMText(metricA, "gauge", seriesA)
metricB := "metricB"
seriesB := genSeries(metricLabels, mint, maxt, step)
textB := partialOMText(metricB, "gauge", seriesB)
text := fmt.Sprintf("%s\n%s\n# EOF", textA, textB)
tmpDbDir, err := ioutil.TempDir("", "importer")
testutil.Ok(t, err)
err = ImportFromFile(strings.NewReader(text), tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil)
testutil.Ok(t, err)
addMetricLabel := func(series []tsdb.MetricSample, metricName string) []tsdb.MetricSample {
augSamples := make([]tsdb.MetricSample, 0)
for _, sample := range series {
lbls := labels2.FromStrings("__name__", metricName)
s := tsdb.MetricSample{
Timestamp: sample.Timestamp,
Value: sample.Value,
Labels: append(lbls, sample.Labels...),
}
augSamples = append(augSamples, s)
}
return augSamples
}
expectedSamples := append(addMetricLabel(seriesA, metricA), addMetricLabel(seriesB, metricB)...)
expectedSymbols := append([]string{"__name__", metricA, metricB}, metricLabels...)
db, err := tsdb.Open(tmpDbDir, nil, nil, &tsdb.Options{
RetentionDuration: tsdb.DefaultOptions().RetentionDuration,
AllowOverlappingBlocks: true,
})
testutil.Ok(t, err)
blocks := make([]tsdb.BlockReader, 0)
for _, b := range db.Blocks() {
blocks = append(blocks, b)
}
testBlocks(t, blocks, metricLabels, mint, maxt-int64(step-1), expectedSamples, expectedSymbols, 1)
_ = os.RemoveAll(tmpDbDir)
}
// Test to check if we can detect an error with the text,
// without having to read till EOF.
func TestInvalidSyntaxQuickAbort(t *testing.T) {
lbls := []string{"foo", "bar"}
firstHalf := genSeries(lbls, 0, 10, 1)
invalidMetrics := []tsdb.MetricSample{
{
Timestamp: -1,
Value: 0,
Labels: labels2.FromStrings(" INVALID", "INVALID"),
},
}
secondHalf := genSeries(lbls, 10, 200, 1)
combined := append(firstHalf, append(invalidMetrics, secondHalf...)...)
importText := genOpenMetricsText("invalid_test", "gauge", combined)
tmpDbDir, err := ioutil.TempDir("", "importer")
testutil.Ok(t, err)
r := strings.NewReader(importText)
err = ImportFromFile(r, tmpDbDir, maxSamplesInMemory, maxBlockChildren, nil)
testutil.NotOk(t, err)
// We should abort and return the error with the text, without reading till EOF.
testutil.Assert(t, r.Len() > 0, "import text has been completely read")
}
// func benchVirtual(vThresh int, b *testing.B) {
// for n := 0; n < b.N; n++ {
// type importTest struct {
// MetricName string
// MetricType string
// MetricLabels []string
// GeneratorStep int
// DBMint, DBMaxt int64
// ImportShuffle bool
// ImportMint, ImportMaxt int64
// ExpectedMint, ExpectedMaxt int64
// ExpectedSymbols []string
// ExpectedNumBlocks int
// }
//
// tests := []importTest{
// {
// // Test overlapping, and non-overlapping data, aligning the blocks.
// // This test also creates a large number of samples, across a much wider time range, to test
// // if the importer will divvy samples beyond DB time limits into progressively smaller blocks
// // using the ranges in tsdb.DefaultOptions.BlockRanges.
// MetricName: "test_metric_3",
// MetricType: "gauge",
// MetricLabels: []string{"foo", "bar"},
// GeneratorStep: 5000,
// DBMint: 1000,
// DBMaxt: tsdb.DefaultOptions.BlockRanges[2],
// ImportShuffle: false,
// ImportMint: 0,
// ImportMaxt: tsdb.DefaultOptions.BlockRanges[2]*2 + 10000,
// ExpectedSymbols: []string{"__name__", "test_metric_3", "foo", "bar"},
// ExpectedMint: 0,
// ExpectedMaxt: (tsdb.DefaultOptions.BlockRanges[2] * 2) + 1,
// ExpectedNumBlocks: 5,
// },
// }
//
// for _, test := range tests {
// initSeries := genSeries(test.MetricLabels, test.DBMint, test.DBMaxt, test.GeneratorStep)
// initText := genOpenMetricsText(test.MetricName, test.MetricType, initSeries)
//
// tmpDbDir, err := ioutil.TempDir("", "importer")
// testutil.Ok(b, err)
// err = ImportFromFile(strings.NewReader(initText), tmpDbDir, maxSamplesInMemory, vThresh, nil)
// testutil.Ok(b, err)
//
// importSeries := genSeries(test.MetricLabels, test.ImportMint, test.ImportMaxt, test.GeneratorStep)
// importText := genOpenMetricsText(test.MetricName, test.MetricType, importSeries)
//
// err = ImportFromFile(strings.NewReader(importText), tmpDbDir, maxSamplesInMemory, vThresh, nil)
// testutil.Ok(b, err)
// }
// }
// }
//
// func Benchmark_Vertical0(b *testing.B) {
// benchVirtual(0, b)
// }
// func Benchmark_Vertical5(b *testing.B) {
// benchVirtual(5, b)
// }
// func Benchmark_Vertical10(b *testing.B) {
// benchVirtual(10, b)
// }
// func Benchmark_Vertical20(b *testing.B) {
// benchVirtual(20, b)
// }

tsdb/importer/importer.go (new file, 602 lines)

@@ -0,0 +1,602 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package importer
import (
"bufio"
"bytes"
"context"
"encoding/gob"
"io"
"io/ioutil"
"math"
"os"
"path/filepath"
"sort"
"strings"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/fileutil"
)
// Implementing the error interface to create a
// constant, which cannot be overridden.
// https://dave.cheney.net/2016/04/07/constant-errors
type Error string
func (e Error) Error() string {
return string(e)
}
const (
// This error is returned when the current entry does not have an associated timestamp.
NoTimestampError = Error("expected timestamp with metric")
// This error is returned when the sample currently being parsed has a corresponding block
// in the database but, curiously enough, no metadata collected for it.
NoBlockMetaFoundError = Error("no metadata found for current samples' block")
)
type blockTimestampPair struct {
start, end int64
}
type newBlockMeta struct {
index int
count int
mint, maxt int64
isAligned bool
dir string
children []*newBlockMeta
}
// Content Type for the Open Metrics Parser.
// Needed to init the text parser.
const contentType = "application/openmetrics-text;"
// ImportFromFile imports data from a file formatted according to the Open Metrics format,
// converts it into block(s), and places the newly created block(s) in the
// TSDB DB directory, where it is treated like any other block.
func ImportFromFile(
r io.Reader,
dbPath string,
maxSamplesInMemory int,
maxBlockChildren int,
logger log.Logger,
) error {
if logger == nil {
logger = log.NewNopLogger()
}
level.Debug(logger).Log("msg", "creating temp directory")
tmpDbDir, err := ioutil.TempDir("", "importer")
if err != nil {
return err
}
defer os.RemoveAll(tmpDbDir)
level.Debug(logger).Log("msg", "reading existing block metadata to infer time ranges")
blockMetas, err := constructBlockMetadata(dbPath)
if err != nil {
return err
}
level.Info(logger).Log("msg", "importing input data and aligning with existing DB")
if err = writeSamples(r, tmpDbDir, blockMetas, maxSamplesInMemory, maxBlockChildren, logger); err != nil {
return err
}
level.Info(logger).Log("msg", "merging fully overlapping blocks")
if err = mergeBlocks(tmpDbDir, blockMetas, logger); err != nil {
return err
}
level.Debug(logger).Log("msg", "copying newly created blocks from temp location to actual DB location")
if err = fileutil.CopyDirs(tmpDbDir, dbPath); err != nil {
return err
}
return nil
}
// writeSamples parses each metric sample, and writes the samples to the correspondingly aligned block
// to the given directory.
func writeSamples(
r io.Reader,
dir string,
metas []*newBlockMeta,
maxSamplesInMemory int,
maxBlockChildren int,
logger log.Logger,
) error {
// First block represents everything before DB start.
// Last block represents everything after DB end.
dbMint, dbMaxt := metas[0].maxt, metas[len(metas)-1].mint
blocks := make([][]*tsdb.MetricSample, len(metas))
currentPassCount := 0
// TODO: Maybe use bufio.Reader.Readline instead?
// https://stackoverflow.com/questions/21124327/how-to-read-a-text-file-line-by-line-in-go-when-some-lines-are-long-enough-to-ca
// Use a streaming approach to avoid loading too much data at once.
scanner := bufio.NewScanner(r)
buf := new(bytes.Buffer)
scanner.Split(sampleStreamer(buf))
for scanner.Scan() {
if currentPassCount == 0 {
blocks = make([][]*tsdb.MetricSample, len(metas))
}
encSample := scanner.Bytes()
decBuf := bytes.NewBuffer(encSample)
sample := tsdb.MetricSample{}
if err := gob.NewDecoder(decBuf).Decode(&sample); err != nil {
level.Error(logger).Log("msg", "failed to decode current entry returned by file scanner", "err", err)
return err
}
// Find what block this sample belongs to.
var blockIndex int
if len(metas) == 1 || sample.Timestamp < dbMint {
blockIndex = 0
} else if sample.Timestamp >= dbMaxt {
blockIndex = len(metas) - 1
} else {
blockIndex = getBlockIndex(sample.Timestamp, metas)
if blockIndex == -1 {
return NoBlockMetaFoundError
}
}
meta := metas[blockIndex]
meta.mint = value.MinInt64(meta.mint, sample.Timestamp)
meta.maxt = value.MaxInt64(meta.maxt, sample.Timestamp)
meta.count++
blocks[blockIndex] = append(blocks[blockIndex], &sample)
currentPassCount++
// Have enough samples to write to disk.
if currentPassCount == maxSamplesInMemory {
if err := flushBlocks(dir, blocks, metas, maxBlockChildren, logger); err != nil {
return err
}
// Reset current pass count.
currentPassCount = 0
}
}
if err := scanner.Err(); err != nil {
return err
}
// Flush any remaining samples.
if err := flushBlocks(dir, blocks, metas, maxBlockChildren, logger); err != nil {
return err
}
return nil
}
// flushBlocks writes the given blocks of samples to disk, and updates block metadata information.
func flushBlocks(
dir string,
toFlush [][]*tsdb.MetricSample,
metas []*newBlockMeta,
maxBlockChildren int,
logger log.Logger,
) error {
for blockIdx, block := range toFlush {
// If current block is empty, nothing to write.
if len(block) == 0 {
continue
}
meta := metas[blockIdx]
// Put each sample into the appropriate block.
bins, binTimes := binSamples(block, meta, tsdb.DefaultBlockDuration)
for binIdx, bin := range bins {
if len(bin) == 0 {
continue
}
start, end := binTimes[binIdx].start, binTimes[binIdx].end
path, err := tsdb.CreateBlock(bin, dir, start, end, logger)
if err != nil {
return err
}
child := &newBlockMeta{
index: meta.index,
count: 0,
mint: start,
maxt: end,
isAligned: meta.isAligned,
dir: path,
}
meta.children = append(meta.children, child)
}
if maxBlockChildren > 0 && len(meta.children) >= maxBlockChildren {
if err := compactMeta(dir, meta, logger); err != nil {
return err
}
}
}
return nil
}
// sampleStreamer returns a function that can be used to parse an OpenMetrics compliant
// byte array, and return each token in the user-provided byte buffer.
func sampleStreamer(buf *bytes.Buffer) func([]byte, bool) (int, []byte, error) {
currentEntryInvalid := false
return func(data []byte, atEOF bool) (int, []byte, error) {
var err error
advance := 0
lineIndex := 0
lines := strings.Split(string(data), "\n")
parser := textparse.New(data, contentType)
for {
var et textparse.Entry
if et, err = parser.Next(); err != nil {
if !atEOF && et == textparse.EntryInvalid && !currentEntryInvalid {
currentEntryInvalid = true
// Fetch more data for the scanner to see if the entry is really invalid.
return 0, nil, nil
}
if err == io.EOF {
err = nil
}
return 0, nil, err
}
// Reset invalid entry flag as it meant that we just had to get more data
// into the scanner.
if currentEntryInvalid {
currentEntryInvalid = false
}
// Add 1 to account for newline character.
lineLength := len(lines[lineIndex]) + 1
lineIndex++
if et != textparse.EntrySeries {
advance = advance + lineLength
continue
}
_, ctime, cvalue := parser.Series()
if ctime == nil {
return 0, nil, NoTimestampError
}
// The OpenMetrics parser multiplies timestamps by 1000 - undoing that here.
ctimeCorrected := *ctime / 1000
var clabels labels.Labels
_ = parser.Metric(&clabels)
sample := tsdb.MetricSample{
Timestamp: ctimeCorrected,
Value: cvalue,
Labels: labels.FromMap(clabels.Map()),
}
buf.Reset()
enc := gob.NewEncoder(buf)
if err = enc.Encode(sample); err != nil {
return 0, nil, err
}
advance += lineLength
return advance, buf.Bytes(), nil
}
}
}
// makeRange returns a series of block times between start and stop,
// with step divisions.
func makeRange(start, stop, step int64) []blockTimestampPair {
if step <= 0 || stop < start {
return []blockTimestampPair{}
}
r := make([]blockTimestampPair, 0)
// In case we only have samples with the same timestamp.
if start == stop {
r = append(r, blockTimestampPair{
start: start,
end: stop + 1,
})
return r
}
for s := start; s < stop; s += step {
pair := blockTimestampPair{
start: s,
}
if (s + step) >= stop {
pair.end = stop + 1
} else {
pair.end = s + step
}
r = append(r, pair)
}
return r
}
// mergeBlocks looks for blocks that have overlapping time intervals, and compacts them.
func mergeBlocks(dest string, metas []*newBlockMeta, logger log.Logger) error {
for _, m := range metas {
// If no children, there's nothing to merge.
if len(m.children) == 0 {
continue
}
if err := compactMeta(dest, m, logger); err != nil {
return err
}
}
return nil
}
// binBlocks groups blocks that have fully intersecting intervals of the given durations.
// It iterates through the given durations, creating progressive blocks, and once it reaches
// the last given duration, it divvies all the remaining samples into blocks of that duration.
// Input blocks are assumed sorted by time.
func binBlocks(blocks []*newBlockMeta, durations []int64) [][]*newBlockMeta {
bins := make([][]*newBlockMeta, 0)
if len(blocks) == 0 || len(durations) == 0 {
return bins
}
bin := make([]*newBlockMeta, 0)
start := blocks[0].mint
blockIdx := 0
Outer:
for dIdx, duration := range durations {
for bIdx, block := range blocks[blockIdx:] {
blockIdx = bIdx
if block.maxt-block.mint >= duration {
if block.mint <= start {
bin = append(bin, block)
bins = append(bins, bin)
} else {
bins = append(bins, bin)
bins = append(bins, []*newBlockMeta{block})
}
bin = []*newBlockMeta{}
start = block.maxt
if dIdx == len(durations)-1 {
continue
} else {
blockIdx++
continue Outer
}
}
if block.mint-start < duration && block.maxt-start < duration {
// If current block is within the duration window, place it in the current bin.
bin = append(bin, block)
blockIdx++
} else {
// Else create a new block, starting with this block.
bins = append(bins, bin)
bin = []*newBlockMeta{block}
start = block.mint
if dIdx < len(durations)-1 {
blockIdx++
continue Outer
}
}
}
}
bins = append(bins, bin)
return bins
}
// binSamples divvies the samples into bins corresponding to the given block.
// If an aligned block is given to it, then a single bin is created,
// else, it divides the samples into blocks of duration.
// It returns the binned samples, and the time limits for each bin.
func binSamples(samples []*tsdb.MetricSample, meta *newBlockMeta, duration int64) ([][]*tsdb.MetricSample, []blockTimestampPair) {
findBlock := func(ts int64, blocks []blockTimestampPair) int {
for idx, block := range blocks {
if block.start <= ts && ts < block.end {
return idx
}
}
return -1
}
bins := make([][]*tsdb.MetricSample, 0)
times := make([]blockTimestampPair, 0)
if len(samples) == 0 {
return bins, times
}
if meta.isAligned {
bins = append(bins, samples)
times = []blockTimestampPair{{start: meta.mint, end: meta.maxt}}
} else {
timeTranches := makeRange(meta.mint, meta.maxt, duration)
bins = make([][]*tsdb.MetricSample, len(timeTranches))
binIdx := -1
for _, sample := range samples {
ts := sample.Timestamp
if binIdx == -1 {
binIdx = findBlock(ts, timeTranches)
}
block := timeTranches[binIdx]
if block.start <= ts && ts < block.end {
bins[binIdx] = append(bins[binIdx], sample)
} else {
binIdx = findBlock(ts, timeTranches)
bins[binIdx] = append(bins[binIdx], sample)
}
}
times = timeTranches
}
return bins, times
}
// constructBlockMetadata gives us the new blocks' metadatas.
func constructBlockMetadata(path string) ([]*newBlockMeta, error) {
dbMint, dbMaxt := int64(math.MinInt64), int64(math.MaxInt64)
// If we try to open a regular RW handle on an active TSDB instance,
// it will fail. Hence, we open a RO handle.
db, err := tsdb.OpenDBReadOnly(path, nil)
if err != nil {
return nil, err
}
defer db.Close()
blocks, err := db.Blocks()
if err != nil {
return nil, err
}
metas := []*newBlockMeta{
&newBlockMeta{
index: 0,
count: 0,
mint: math.MaxInt64,
maxt: dbMint,
isAligned: false,
},
}
for idx, block := range blocks {
start, end := block.Meta().MinTime, block.Meta().MaxTime
metas = append(metas, &newBlockMeta{
index: idx + 1,
count: 0,
mint: start,
maxt: end,
isAligned: true,
})
if idx == 0 {
dbMint, dbMaxt = start, end
} else {
dbMint = value.MinInt64(dbMint, start)
dbMaxt = value.MaxInt64(dbMaxt, end)
}
}
// If there are existing blocks in the import location,
// we need to add a block for samples that fall over DB maxt.
if len(metas) > 1 {
// Update the first block with the right dbMint as its maxt.
metas[0].maxt = dbMint
metas = append(metas, &newBlockMeta{
index: len(metas),
count: 0,
mint: dbMaxt,
maxt: math.MinInt64,
isAligned: false,
})
}
return metas, nil
}
// getBlockIndex returns the index of the block the current timestamp corresponds to.
// User-provided times are assumed sorted in ascending order.
func getBlockIndex(current int64, metas []*newBlockMeta) int {
for idx, m := range metas {
if current < m.maxt {
return idx
}
}
return -1
}
// compactBlocks compacts all the given blocks and places them in the destination directory.
func compactBlocks(dest string, blocks []*newBlockMeta, logger log.Logger) (string, error) {
dirs := make([]string, 0)
for _, meta := range blocks {
dirs = append(dirs, meta.dir)
}
path, err := compactDirs(dest, dirs, logger)
if err != nil {
return "", err
}
for _, meta := range blocks {
// Remove existing blocks directory as it is no longer needed.
if err = os.RemoveAll(meta.dir); err != nil {
return "", err
}
// Update child with new blocks path.
meta.dir = path
}
return path, err
}
// compactMeta compacts the children of a given block, dividing them into the Prometheus recommended
// block sizes, if it has to.
func compactMeta(dest string, m *newBlockMeta, logger log.Logger) error {
children := make([]*newBlockMeta, 0)
if m.isAligned {
path, err := compactBlocks(dest, m.children, logger)
if err != nil {
return err
}
m.dir = path
} else {
// Sort blocks to ensure we bin properly.
sortBlocks(m.children)
binned := binBlocks(m.children, []int64{tsdb.DefaultBlockDuration})
for _, bin := range binned {
if len(bin) <= 1 {
children = append(children, bin...)
continue
}
path, err := compactBlocks(dest, bin, logger)
if err != nil {
return err
}
children = append(children, &newBlockMeta{
index: m.index,
count: 0,
mint: bin[0].mint,
maxt: bin[len(bin)-1].maxt,
isAligned: m.isAligned,
dir: path,
})
}
}
m.children = children
return nil
}
// compactDirs compacts the block dirs and places them in destination directory.
func compactDirs(dest string, dirs []string, logger log.Logger) (string, error) {
path := ""
compactor, err := tsdb.NewLeveledCompactor(context.Background(), nil, logger, []int64{tsdb.DefaultBlockDuration}, nil)
if err != nil {
return path, err
}
ulid, err := compactor.Compact(dest, dirs, nil)
if err != nil {
return path, err
}
path = filepath.Join(dest, ulid.String())
return path, nil
}
// sortBlocks sorts block metadata according to the time limits.
func sortBlocks(blocks []*newBlockMeta) {
sort.Slice(blocks, func(x, y int) bool {
bx, by := blocks[x], blocks[y]
if bx.mint != by.mint {
return bx.mint < by.mint
}
return bx.maxt < by.maxt
})
}