mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
The WALFlushInterval is not used anywhere in the code base. The WAL is not an interface anymore to save some lookup time so can't use NopWAL in the tests. Instead can just pass nil as the code checks for that and it is essentially a noop. Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>
538 lines
14 KiB
Go
538 lines
14 KiB
Go
// Copyright 2017 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime"
|
|
"runtime/pprof"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"text/tabwriter"
|
|
"time"
|
|
|
|
"github.com/go-kit/kit/log"
|
|
"github.com/pkg/errors"
|
|
"github.com/prometheus/tsdb"
|
|
"github.com/prometheus/tsdb/chunks"
|
|
"github.com/prometheus/tsdb/labels"
|
|
"gopkg.in/alecthomas/kingpin.v2"
|
|
)
|
|
|
|
func main() {
|
|
var (
|
|
cli = kingpin.New(filepath.Base(os.Args[0]), "CLI tool for tsdb")
|
|
benchCmd = cli.Command("bench", "run benchmarks")
|
|
benchWriteCmd = benchCmd.Command("write", "run a write performance benchmark")
|
|
benchWriteOutPath = benchWriteCmd.Flag("out", "set the output path").Default("benchout").String()
|
|
benchWriteNumMetrics = benchWriteCmd.Flag("metrics", "number of metrics to read").Default("10000").Int()
|
|
benchSamplesFile = benchWriteCmd.Arg("file", "input file with samples data, default is ("+filepath.Join("..", "testdata", "20kseries.json")+")").Default(filepath.Join("..", "testdata", "20kseries.json")).String()
|
|
listCmd = cli.Command("ls", "list db blocks")
|
|
listCmdHumanReadable = listCmd.Flag("human-readable", "print human readable values").Short('h').Bool()
|
|
listPath = listCmd.Arg("db path", "database path (default is "+filepath.Join("benchout", "storage")+")").Default(filepath.Join("benchout", "storage")).String()
|
|
analyzeCmd = cli.Command("analyze", "analyze churn, label pair cardinality.")
|
|
analyzePath = analyzeCmd.Arg("db path", "database path (default is "+filepath.Join("benchout", "storage")+")").Default(filepath.Join("benchout", "storage")).String()
|
|
analyzeBlockId = analyzeCmd.Arg("block id", "block to analyze (default is the last block)").String()
|
|
analyzeLimit = analyzeCmd.Flag("limit", "how many items to show in each list").Default("20").Int()
|
|
)
|
|
|
|
switch kingpin.MustParse(cli.Parse(os.Args[1:])) {
|
|
case benchWriteCmd.FullCommand():
|
|
wb := &writeBenchmark{
|
|
outPath: *benchWriteOutPath,
|
|
numMetrics: *benchWriteNumMetrics,
|
|
samplesFile: *benchSamplesFile,
|
|
}
|
|
wb.run()
|
|
case listCmd.FullCommand():
|
|
db, err := tsdb.Open(*listPath, nil, nil, nil)
|
|
if err != nil {
|
|
exitWithError(err)
|
|
}
|
|
printBlocks(db.Blocks(), listCmdHumanReadable)
|
|
case analyzeCmd.FullCommand():
|
|
db, err := tsdb.Open(*analyzePath, nil, nil, nil)
|
|
if err != nil {
|
|
exitWithError(err)
|
|
}
|
|
blocks := db.Blocks()
|
|
var block *tsdb.Block
|
|
if *analyzeBlockId != "" {
|
|
for _, b := range blocks {
|
|
if b.Meta().ULID.String() == *analyzeBlockId {
|
|
block = b
|
|
break
|
|
}
|
|
}
|
|
} else if len(blocks) > 0 {
|
|
block = blocks[len(blocks)-1]
|
|
}
|
|
if block == nil {
|
|
exitWithError(fmt.Errorf("Block not found"))
|
|
}
|
|
analyzeBlock(block, *analyzeLimit)
|
|
}
|
|
flag.CommandLine.Set("log.level", "debug")
|
|
}
|
|
|
|
type writeBenchmark struct {
|
|
outPath string
|
|
samplesFile string
|
|
cleanup bool
|
|
numMetrics int
|
|
|
|
storage *tsdb.DB
|
|
|
|
cpuprof *os.File
|
|
memprof *os.File
|
|
blockprof *os.File
|
|
mtxprof *os.File
|
|
}
|
|
|
|
func (b *writeBenchmark) run() {
|
|
if b.outPath == "" {
|
|
dir, err := ioutil.TempDir("", "tsdb_bench")
|
|
if err != nil {
|
|
exitWithError(err)
|
|
}
|
|
b.outPath = dir
|
|
b.cleanup = true
|
|
}
|
|
if err := os.RemoveAll(b.outPath); err != nil {
|
|
exitWithError(err)
|
|
}
|
|
if err := os.MkdirAll(b.outPath, 0777); err != nil {
|
|
exitWithError(err)
|
|
}
|
|
|
|
dir := filepath.Join(b.outPath, "storage")
|
|
|
|
l := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
|
|
l = log.With(l, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
|
|
|
|
st, err := tsdb.Open(dir, l, nil, &tsdb.Options{
|
|
RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
|
|
BlockRanges: tsdb.ExponentialBlockRanges(2*60*60*1000, 5, 3),
|
|
})
|
|
if err != nil {
|
|
exitWithError(err)
|
|
}
|
|
b.storage = st
|
|
|
|
var metrics []labels.Labels
|
|
|
|
measureTime("readData", func() {
|
|
f, err := os.Open(b.samplesFile)
|
|
if err != nil {
|
|
exitWithError(err)
|
|
}
|
|
defer f.Close()
|
|
|
|
metrics, err = readPrometheusLabels(f, b.numMetrics)
|
|
if err != nil {
|
|
exitWithError(err)
|
|
}
|
|
})
|
|
|
|
var total uint64
|
|
|
|
dur := measureTime("ingestScrapes", func() {
|
|
b.startProfiling()
|
|
total, err = b.ingestScrapes(metrics, 3000)
|
|
if err != nil {
|
|
exitWithError(err)
|
|
}
|
|
})
|
|
|
|
fmt.Println(" > total samples:", total)
|
|
fmt.Println(" > samples/sec:", float64(total)/dur.Seconds())
|
|
|
|
measureTime("stopStorage", func() {
|
|
if err := b.storage.Close(); err != nil {
|
|
exitWithError(err)
|
|
}
|
|
if err := b.stopProfiling(); err != nil {
|
|
exitWithError(err)
|
|
}
|
|
})
|
|
}
|
|
|
|
// timeDelta is the gap between consecutive synthetic scrapes. Timestamps in
// this tool are presumably milliseconds, making this 30 seconds.
const timeDelta = 30 * 1000
|
|
|
|
func (b *writeBenchmark) ingestScrapes(lbls []labels.Labels, scrapeCount int) (uint64, error) {
|
|
var mu sync.Mutex
|
|
var total uint64
|
|
|
|
for i := 0; i < scrapeCount; i += 100 {
|
|
var wg sync.WaitGroup
|
|
lbls := lbls
|
|
for len(lbls) > 0 {
|
|
l := 1000
|
|
if len(lbls) < 1000 {
|
|
l = len(lbls)
|
|
}
|
|
batch := lbls[:l]
|
|
lbls = lbls[l:]
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
n, err := b.ingestScrapesShard(batch, 100, int64(timeDelta*i))
|
|
if err != nil {
|
|
// exitWithError(err)
|
|
fmt.Println(" err", err)
|
|
}
|
|
mu.Lock()
|
|
total += n
|
|
mu.Unlock()
|
|
wg.Done()
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
}
|
|
fmt.Println("ingestion completed")
|
|
|
|
return total, nil
|
|
}
|
|
|
|
func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount int, baset int64) (uint64, error) {
|
|
ts := baset
|
|
|
|
type sample struct {
|
|
labels labels.Labels
|
|
value int64
|
|
ref *uint64
|
|
}
|
|
|
|
scrape := make([]*sample, 0, len(metrics))
|
|
|
|
for _, m := range metrics {
|
|
scrape = append(scrape, &sample{
|
|
labels: m,
|
|
value: 123456789,
|
|
})
|
|
}
|
|
total := uint64(0)
|
|
|
|
for i := 0; i < scrapeCount; i++ {
|
|
app := b.storage.Appender()
|
|
ts += timeDelta
|
|
|
|
for _, s := range scrape {
|
|
s.value += 1000
|
|
|
|
if s.ref == nil {
|
|
ref, err := app.Add(s.labels, ts, float64(s.value))
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
s.ref = &ref
|
|
} else if err := app.AddFast(*s.ref, ts, float64(s.value)); err != nil {
|
|
|
|
if errors.Cause(err) != tsdb.ErrNotFound {
|
|
panic(err)
|
|
}
|
|
|
|
ref, err := app.Add(s.labels, ts, float64(s.value))
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
s.ref = &ref
|
|
}
|
|
|
|
total++
|
|
}
|
|
if err := app.Commit(); err != nil {
|
|
return total, err
|
|
}
|
|
}
|
|
return total, nil
|
|
}
|
|
|
|
func (b *writeBenchmark) startProfiling() {
|
|
var err error
|
|
|
|
// Start CPU profiling.
|
|
b.cpuprof, err = os.Create(filepath.Join(b.outPath, "cpu.prof"))
|
|
if err != nil {
|
|
exitWithError(fmt.Errorf("bench: could not create cpu profile: %v", err))
|
|
}
|
|
if err := pprof.StartCPUProfile(b.cpuprof); err != nil {
|
|
exitWithError(fmt.Errorf("bench: could not start CPU profile: %v", err))
|
|
}
|
|
|
|
// Start memory profiling.
|
|
b.memprof, err = os.Create(filepath.Join(b.outPath, "mem.prof"))
|
|
if err != nil {
|
|
exitWithError(fmt.Errorf("bench: could not create memory profile: %v", err))
|
|
}
|
|
runtime.MemProfileRate = 64 * 1024
|
|
|
|
// Start fatal profiling.
|
|
b.blockprof, err = os.Create(filepath.Join(b.outPath, "block.prof"))
|
|
if err != nil {
|
|
exitWithError(fmt.Errorf("bench: could not create block profile: %v", err))
|
|
}
|
|
runtime.SetBlockProfileRate(20)
|
|
|
|
b.mtxprof, err = os.Create(filepath.Join(b.outPath, "mutex.prof"))
|
|
if err != nil {
|
|
exitWithError(fmt.Errorf("bench: could not create mutex profile: %v", err))
|
|
}
|
|
runtime.SetMutexProfileFraction(20)
|
|
}
|
|
|
|
func (b *writeBenchmark) stopProfiling() error {
|
|
if b.cpuprof != nil {
|
|
pprof.StopCPUProfile()
|
|
b.cpuprof.Close()
|
|
b.cpuprof = nil
|
|
}
|
|
if b.memprof != nil {
|
|
if err := pprof.Lookup("heap").WriteTo(b.memprof, 0); err != nil {
|
|
return fmt.Errorf("error writing mem profile: %v", err)
|
|
}
|
|
b.memprof.Close()
|
|
b.memprof = nil
|
|
}
|
|
if b.blockprof != nil {
|
|
if err := pprof.Lookup("block").WriteTo(b.blockprof, 0); err != nil {
|
|
return fmt.Errorf("error writing block profile: %v", err)
|
|
}
|
|
b.blockprof.Close()
|
|
b.blockprof = nil
|
|
runtime.SetBlockProfileRate(0)
|
|
}
|
|
if b.mtxprof != nil {
|
|
if err := pprof.Lookup("mutex").WriteTo(b.mtxprof, 0); err != nil {
|
|
return fmt.Errorf("error writing mutex profile: %v", err)
|
|
}
|
|
b.mtxprof.Close()
|
|
b.mtxprof = nil
|
|
runtime.SetMutexProfileFraction(0)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// measureTime runs f as the named benchmark stage. It prints a start line
// before f and a completion line with the elapsed wall-clock time after it,
// then returns that elapsed time. The printed and the returned duration come
// from the same measurement.
func measureTime(stage string, f func()) time.Duration {
	fmt.Printf(">> start stage=%s\n", stage)
	start := time.Now()
	f()
	elapsed := time.Since(start)
	fmt.Printf(">> completed stage=%s duration=%s\n", stage, elapsed)
	return elapsed
}
|
|
|
|
func mapToLabels(m map[string]interface{}, l *labels.Labels) {
|
|
for k, v := range m {
|
|
*l = append(*l, labels.Label{Name: k, Value: v.(string)})
|
|
}
|
|
}
|
|
|
|
func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) {
|
|
scanner := bufio.NewScanner(r)
|
|
|
|
var mets []labels.Labels
|
|
hashes := map[uint64]struct{}{}
|
|
i := 0
|
|
|
|
for scanner.Scan() && i < n {
|
|
m := make(labels.Labels, 0, 10)
|
|
|
|
r := strings.NewReplacer("\"", "", "{", "", "}", "")
|
|
s := r.Replace(scanner.Text())
|
|
|
|
labelChunks := strings.Split(s, ",")
|
|
for _, labelChunk := range labelChunks {
|
|
split := strings.Split(labelChunk, ":")
|
|
m = append(m, labels.Label{Name: split[0], Value: split[1]})
|
|
}
|
|
// Order of the k/v labels matters, don't assume we'll always receive them already sorted.
|
|
sort.Sort(m)
|
|
h := m.Hash()
|
|
if _, ok := hashes[h]; ok {
|
|
continue
|
|
}
|
|
mets = append(mets, m)
|
|
hashes[h] = struct{}{}
|
|
i++
|
|
}
|
|
return mets, nil
|
|
}
|
|
|
|
// exitWithError prints err on its own line to standard error and terminates
// the process with exit status 1 via os.Exit; it never returns, and deferred
// calls in its callers do not run.
func exitWithError(err error) {
	const exitCode = 1
	fmt.Fprintf(os.Stderr, "%v\n", err)
	os.Exit(exitCode)
}
|
|
|
|
func printBlocks(blocks []*tsdb.Block, humanReadable *bool) {
|
|
tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
|
|
defer tw.Flush()
|
|
|
|
fmt.Fprintln(tw, "BLOCK ULID\tMIN TIME\tMAX TIME\tNUM SAMPLES\tNUM CHUNKS\tNUM SERIES")
|
|
for _, b := range blocks {
|
|
meta := b.Meta()
|
|
|
|
fmt.Fprintf(tw,
|
|
"%v\t%v\t%v\t%v\t%v\t%v\n",
|
|
meta.ULID,
|
|
getFormatedTime(meta.MinTime, humanReadable),
|
|
getFormatedTime(meta.MaxTime, humanReadable),
|
|
meta.Stats.NumSamples,
|
|
meta.Stats.NumChunks,
|
|
meta.Stats.NumSeries,
|
|
)
|
|
}
|
|
}
|
|
|
|
// getFormatedTime renders a millisecond Unix timestamp for the block listing.
//
// When humanReadable points to true, the timestamp is converted to a
// time.Time in the local zone and printed with its String form, keeping
// millisecond precision. Otherwise, including when humanReadable is nil, the
// raw millisecond value is printed in base 10.
func getFormatedTime(timestamp int64, humanReadable *bool) string {
	if humanReadable != nil && *humanReadable {
		// time.Unix normalizes a negative remainder, so negative timestamps work too.
		return time.Unix(timestamp/1000, (timestamp%1000)*int64(time.Millisecond)).String()
	}
	return strconv.FormatInt(timestamp, 10)
}
|
|
|
|
func analyzeBlock(b *tsdb.Block, limit int) {
|
|
fmt.Printf("Block path: %s\n", b.Dir())
|
|
meta := b.Meta()
|
|
// Presume 1ms resolution that Prometheus uses.
|
|
fmt.Printf("Duration: %s\n", (time.Duration(meta.MaxTime-meta.MinTime) * 1e6).String())
|
|
fmt.Printf("Series: %d\n", meta.Stats.NumSeries)
|
|
ir, err := b.Index()
|
|
if err != nil {
|
|
exitWithError(err)
|
|
}
|
|
defer ir.Close()
|
|
|
|
allLabelNames, err := ir.LabelNames()
|
|
if err != nil {
|
|
exitWithError(err)
|
|
}
|
|
fmt.Printf("Label names: %d\n", len(allLabelNames))
|
|
|
|
type postingInfo struct {
|
|
key string
|
|
metric uint64
|
|
}
|
|
postingInfos := []postingInfo{}
|
|
|
|
printInfo := func(postingInfos []postingInfo) {
|
|
sort.Slice(postingInfos, func(i, j int) bool { return postingInfos[i].metric > postingInfos[j].metric })
|
|
|
|
for i, pc := range postingInfos {
|
|
fmt.Printf("%d %s\n", pc.metric, pc.key)
|
|
if i >= limit {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
labelsUncovered := map[string]uint64{}
|
|
labelpairsUncovered := map[string]uint64{}
|
|
labelpairsCount := map[string]uint64{}
|
|
entries := 0
|
|
p, err := ir.Postings("", "") // The special all key.
|
|
if err != nil {
|
|
exitWithError(err)
|
|
}
|
|
lbls := labels.Labels{}
|
|
chks := []chunks.Meta{}
|
|
for p.Next() {
|
|
err = ir.Series(p.At(), &lbls, &chks)
|
|
// Amount of the block time range not covered by this series.
|
|
uncovered := uint64(meta.MaxTime-meta.MinTime) - uint64(chks[len(chks)-1].MaxTime-chks[0].MinTime)
|
|
for _, lbl := range lbls {
|
|
key := lbl.Name + "=" + lbl.Value
|
|
labelsUncovered[lbl.Name] += uncovered
|
|
labelpairsUncovered[key] += uncovered
|
|
labelpairsCount[key] += 1
|
|
entries += 1
|
|
}
|
|
}
|
|
if p.Err() != nil {
|
|
exitWithError(p.Err())
|
|
}
|
|
fmt.Printf("Postings (unique label pairs): %d\n", len(labelpairsUncovered))
|
|
fmt.Printf("Postings entries (total label pairs): %d\n", entries)
|
|
|
|
postingInfos = postingInfos[:0]
|
|
for k, m := range labelpairsUncovered {
|
|
postingInfos = append(postingInfos, postingInfo{k, uint64(float64(m) / float64(meta.MaxTime-meta.MinTime))})
|
|
}
|
|
|
|
fmt.Printf("\nLabel pairs most involved in churning:\n")
|
|
printInfo(postingInfos)
|
|
|
|
postingInfos = postingInfos[:0]
|
|
for k, m := range labelsUncovered {
|
|
postingInfos = append(postingInfos, postingInfo{k, uint64(float64(m) / float64(meta.MaxTime-meta.MinTime))})
|
|
}
|
|
|
|
fmt.Printf("\nLabel names most involved in churning:\n")
|
|
printInfo(postingInfos)
|
|
|
|
postingInfos = postingInfos[:0]
|
|
for k, m := range labelpairsCount {
|
|
postingInfos = append(postingInfos, postingInfo{k, m})
|
|
}
|
|
|
|
fmt.Printf("\nMost common label pairs:\n")
|
|
printInfo(postingInfos)
|
|
|
|
postingInfos = postingInfos[:0]
|
|
for _, n := range allLabelNames {
|
|
lv, err := ir.LabelValues(n)
|
|
if err != nil {
|
|
exitWithError(err)
|
|
}
|
|
postingInfos = append(postingInfos, postingInfo{n, uint64(lv.Len())})
|
|
}
|
|
fmt.Printf("\nHighest cardinality labels:\n")
|
|
printInfo(postingInfos)
|
|
|
|
postingInfos = postingInfos[:0]
|
|
lv, err := ir.LabelValues("__name__")
|
|
if err != nil {
|
|
exitWithError(err)
|
|
}
|
|
for i := 0; i < lv.Len(); i++ {
|
|
names, err := lv.At(i)
|
|
if err != nil {
|
|
exitWithError(err)
|
|
}
|
|
for _, n := range names {
|
|
postings, err := ir.Postings("__name__", n)
|
|
if err != nil {
|
|
exitWithError(err)
|
|
}
|
|
count := 0
|
|
for postings.Next() {
|
|
count++
|
|
}
|
|
if postings.Err() != nil {
|
|
exitWithError(postings.Err())
|
|
}
|
|
postingInfos = append(postingInfos, postingInfo{n, uint64(count)})
|
|
}
|
|
}
|
|
fmt.Printf("\nHighest cardinality metric names:\n")
|
|
printInfo(postingInfos)
|
|
}
|