2017-04-10 11:59:45 -07:00
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2016-12-07 08:30:10 -08:00
package main
import (
2016-12-09 04:41:38 -08:00
"flag"
2016-12-07 08:30:10 -08:00
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"runtime/pprof"
"sync"
"time"
2017-01-16 12:29:53 -08:00
"unsafe"
2016-12-07 08:30:10 -08:00
2017-09-19 01:20:19 -07:00
"github.com/go-kit/kit/log"
2017-05-18 07:09:30 -07:00
"github.com/pkg/errors"
2017-01-16 12:29:53 -08:00
promlabels "github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
2017-04-04 02:27:26 -07:00
"github.com/prometheus/tsdb"
"github.com/prometheus/tsdb/labels"
2017-09-09 04:11:12 -07:00
"gopkg.in/alecthomas/kingpin.v2"
2016-12-07 08:30:10 -08:00
)
func main ( ) {
2017-09-09 04:11:12 -07:00
var (
cli = kingpin . New ( filepath . Base ( os . Args [ 0 ] ) , "CLI tool for tsdb" )
benchCmd = cli . Command ( "bench" , "run benchmarks" )
benchWriteCmd = benchCmd . Command ( "write" , "run a write performance benchmark" )
benchWriteOutPath = benchWriteCmd . Flag ( "out" , "set the output path" ) . Default ( "benchout/" ) . String ( )
benchWriteNumMetrics = benchWriteCmd . Flag ( "metrics" , "number of metrics to read" ) . Default ( "10000" ) . Int ( )
benchSamplesFile = benchWriteCmd . Arg ( "file" , "input file with samples data, default is (../../testdata/20k.series)" ) . Default ( "../../testdata/20k.series" ) . String ( )
2016-12-07 08:30:10 -08:00
)
2017-09-09 04:11:12 -07:00
switch kingpin . MustParse ( cli . Parse ( os . Args [ 1 : ] ) ) {
case benchWriteCmd . FullCommand ( ) :
wb := & writeBenchmark {
outPath : * benchWriteOutPath ,
numMetrics : * benchWriteNumMetrics ,
samplesFile : * benchSamplesFile ,
}
wb . run ( )
2016-12-07 08:30:10 -08:00
}
2017-09-09 04:11:12 -07:00
flag . CommandLine . Set ( "log.level" , "debug" )
2016-12-07 08:30:10 -08:00
}
type writeBenchmark struct {
2017-09-09 04:11:12 -07:00
outPath string
samplesFile string
cleanup bool
numMetrics int
2016-12-07 08:30:10 -08:00
2017-02-19 07:04:37 -08:00
storage * tsdb . DB
2016-12-07 08:30:10 -08:00
cpuprof * os . File
memprof * os . File
blockprof * os . File
2017-05-14 02:51:56 -07:00
mtxprof * os . File
2016-12-07 08:30:10 -08:00
}
2017-09-09 04:11:12 -07:00
func ( b * writeBenchmark ) run ( ) {
2016-12-07 08:30:10 -08:00
if b . outPath == "" {
dir , err := ioutil . TempDir ( "" , "tsdb_bench" )
if err != nil {
exitWithError ( err )
}
b . outPath = dir
b . cleanup = true
}
if err := os . RemoveAll ( b . outPath ) ; err != nil {
exitWithError ( err )
}
if err := os . MkdirAll ( b . outPath , 0777 ) ; err != nil {
exitWithError ( err )
}
dir := filepath . Join ( b . outPath , "storage" )
2017-09-19 01:20:19 -07:00
l := log . NewLogfmtLogger ( log . NewSyncWriter ( os . Stderr ) )
l = log . With ( l , "ts" , log . DefaultTimestampUTC , "caller" , log . DefaultCaller )
st , err := tsdb . Open ( dir , l , nil , & tsdb . Options {
2017-02-13 23:53:19 -08:00
WALFlushInterval : 200 * time . Millisecond ,
2017-07-13 07:15:13 -07:00
RetentionDuration : 15 * 24 * 60 * 60 * 1000 , // 15 days in milliseconds
2017-08-03 09:33:13 -07:00
BlockRanges : tsdb . ExponentialBlockRanges ( 2 * 60 * 60 * 1000 , 5 , 3 ) ,
2017-02-09 17:54:26 -08:00
} )
2016-12-20 15:02:37 -08:00
if err != nil {
exitWithError ( err )
2016-12-07 08:30:10 -08:00
}
2016-12-20 15:02:37 -08:00
b . storage = st
2017-01-16 12:29:53 -08:00
var metrics [ ] labels . Labels
2016-12-07 08:30:10 -08:00
measureTime ( "readData" , func ( ) {
2017-09-09 04:11:12 -07:00
f , err := os . Open ( b . samplesFile )
2016-12-07 08:30:10 -08:00
if err != nil {
exitWithError ( err )
}
defer f . Close ( )
metrics , err = readPrometheusLabels ( f , b . numMetrics )
if err != nil {
exitWithError ( err )
}
} )
2017-02-01 06:29:48 -08:00
var total uint64
dur := measureTime ( "ingestScrapes" , func ( ) {
2016-12-07 08:30:10 -08:00
b . startProfiling ( )
2017-09-18 02:20:25 -07:00
total , err = b . ingestScrapes ( metrics , 2000 )
2017-02-01 06:29:48 -08:00
if err != nil {
2016-12-07 08:30:10 -08:00
exitWithError ( err )
}
} )
2017-02-01 06:29:48 -08:00
fmt . Println ( " > total samples:" , total )
fmt . Println ( " > samples/sec:" , float64 ( total ) / dur . Seconds ( ) )
2016-12-07 08:30:10 -08:00
measureTime ( "stopStorage" , func ( ) {
2016-12-20 15:02:37 -08:00
if err := b . storage . Close ( ) ; err != nil {
2016-12-07 08:30:10 -08:00
exitWithError ( err )
}
b . stopProfiling ( )
} )
}
2017-06-07 04:42:53 -07:00
// timeDelta is the spacing between two consecutive scrapes of a series, in
// timestamp units. Presumably milliseconds (i.e. 30s), matching the
// millisecond-based RetentionDuration and BlockRanges given to tsdb.Open —
// confirm.
const timeDelta = 30 * 1000
2017-02-01 06:29:48 -08:00
func ( b * writeBenchmark ) ingestScrapes ( lbls [ ] labels . Labels , scrapeCount int ) ( uint64 , error ) {
2017-01-10 02:17:37 -08:00
var mu sync . Mutex
var total uint64
2016-12-07 08:30:10 -08:00
2017-02-02 02:09:19 -08:00
for i := 0 ; i < scrapeCount ; i += 100 {
var wg sync . WaitGroup
2017-01-11 03:54:18 -08:00
lbls := lbls
for len ( lbls ) > 0 {
l := 1000
if len ( lbls ) < 1000 {
l = len ( lbls )
2016-12-07 08:30:10 -08:00
}
2017-01-11 03:54:18 -08:00
batch := lbls [ : l ]
lbls = lbls [ l : ]
wg . Add ( 1 )
go func ( ) {
2017-06-07 04:42:53 -07:00
n , err := b . ingestScrapesShard ( batch , 100 , int64 ( timeDelta * i ) )
2017-01-11 03:54:18 -08:00
if err != nil {
// exitWithError(err)
fmt . Println ( " err" , err )
}
mu . Lock ( )
total += n
mu . Unlock ( )
wg . Done ( )
} ( )
}
wg . Wait ( )
2016-12-07 08:30:10 -08:00
}
2017-07-13 07:15:13 -07:00
fmt . Println ( "ingestion completed" )
2017-01-10 02:17:37 -08:00
2017-02-01 06:29:48 -08:00
return total , nil
2016-12-07 08:30:10 -08:00
}
2017-01-11 03:54:18 -08:00
func ( b * writeBenchmark ) ingestScrapesShard ( metrics [ ] labels . Labels , scrapeCount int , baset int64 ) ( uint64 , error ) {
ts := baset
2016-12-07 08:30:10 -08:00
2016-12-08 01:04:24 -08:00
type sample struct {
2016-12-21 00:39:01 -08:00
labels labels . Labels
2016-12-08 01:04:24 -08:00
value int64
2017-09-05 02:45:18 -07:00
ref * uint64
2016-12-08 01:04:24 -08:00
}
2017-01-11 03:54:18 -08:00
scrape := make ( [ ] * sample , 0 , len ( metrics ) )
2016-12-08 01:04:24 -08:00
for _ , m := range metrics {
2017-01-11 03:54:18 -08:00
scrape = append ( scrape , & sample {
labels : m ,
2016-12-08 01:04:24 -08:00
value : 123456789 ,
2017-01-11 03:54:18 -08:00
} )
2016-12-08 01:04:24 -08:00
}
2017-01-10 02:17:37 -08:00
total := uint64 ( 0 )
2016-12-08 01:04:24 -08:00
2016-12-07 08:30:10 -08:00
for i := 0 ; i < scrapeCount ; i ++ {
2017-01-12 10:18:51 -08:00
app := b . storage . Appender ( )
2017-06-07 04:42:53 -07:00
ts += timeDelta
2016-12-07 08:30:10 -08:00
2016-12-08 01:04:24 -08:00
for _ , s := range scrape {
2016-12-09 01:00:14 -08:00
s . value += 1000
2017-01-12 10:18:51 -08:00
if s . ref == nil {
2017-02-01 06:29:48 -08:00
ref , err := app . Add ( s . labels , ts , float64 ( s . value ) )
2017-01-12 10:18:51 -08:00
if err != nil {
panic ( err )
}
s . ref = & ref
2017-02-01 06:29:48 -08:00
} else if err := app . AddFast ( * s . ref , ts , float64 ( s . value ) ) ; err != nil {
2017-02-02 02:09:19 -08:00
2017-05-18 07:09:30 -07:00
if errors . Cause ( err ) != tsdb . ErrNotFound {
2017-01-12 10:18:51 -08:00
panic ( err )
}
2017-02-01 06:29:48 -08:00
ref , err := app . Add ( s . labels , ts , float64 ( s . value ) )
2017-01-12 10:18:51 -08:00
if err != nil {
panic ( err )
}
s . ref = & ref
}
2017-01-10 02:17:37 -08:00
total ++
2016-12-07 08:30:10 -08:00
}
2016-12-20 15:02:37 -08:00
if err := app . Commit ( ) ; err != nil {
2017-01-10 02:17:37 -08:00
return total , err
2016-12-07 08:30:10 -08:00
}
}
2017-01-10 02:17:37 -08:00
return total , nil
2016-12-07 08:30:10 -08:00
}
func ( b * writeBenchmark ) startProfiling ( ) {
var err error
// Start CPU profiling.
b . cpuprof , err = os . Create ( filepath . Join ( b . outPath , "cpu.prof" ) )
if err != nil {
2017-03-19 09:05:01 -07:00
exitWithError ( fmt . Errorf ( "bench: could not create cpu profile: %v" , err ) )
2016-12-07 08:30:10 -08:00
}
pprof . StartCPUProfile ( b . cpuprof )
// Start memory profiling.
b . memprof , err = os . Create ( filepath . Join ( b . outPath , "mem.prof" ) )
if err != nil {
2017-03-19 09:05:01 -07:00
exitWithError ( fmt . Errorf ( "bench: could not create memory profile: %v" , err ) )
2016-12-07 08:30:10 -08:00
}
2017-05-14 02:51:56 -07:00
runtime . MemProfileRate = 64 * 1024
2016-12-07 08:30:10 -08:00
// Start fatal profiling.
b . blockprof , err = os . Create ( filepath . Join ( b . outPath , "block.prof" ) )
if err != nil {
2017-03-19 09:05:01 -07:00
exitWithError ( fmt . Errorf ( "bench: could not create block profile: %v" , err ) )
2016-12-07 08:30:10 -08:00
}
2017-05-14 02:51:56 -07:00
runtime . SetBlockProfileRate ( 20 )
b . mtxprof , err = os . Create ( filepath . Join ( b . outPath , "mutex.prof" ) )
if err != nil {
exitWithError ( fmt . Errorf ( "bench: could not create mutex profile: %v" , err ) )
}
runtime . SetMutexProfileFraction ( 20 )
2016-12-07 08:30:10 -08:00
}
func ( b * writeBenchmark ) stopProfiling ( ) {
if b . cpuprof != nil {
pprof . StopCPUProfile ( )
b . cpuprof . Close ( )
b . cpuprof = nil
}
if b . memprof != nil {
pprof . Lookup ( "heap" ) . WriteTo ( b . memprof , 0 )
b . memprof . Close ( )
b . memprof = nil
}
if b . blockprof != nil {
pprof . Lookup ( "block" ) . WriteTo ( b . blockprof , 0 )
b . blockprof . Close ( )
b . blockprof = nil
runtime . SetBlockProfileRate ( 0 )
}
2017-05-14 02:51:56 -07:00
if b . mtxprof != nil {
pprof . Lookup ( "mutex" ) . WriteTo ( b . mtxprof , 0 )
b . mtxprof . Close ( )
b . mtxprof = nil
runtime . SetMutexProfileFraction ( 0 )
}
2016-12-07 08:30:10 -08:00
}
2017-02-01 06:29:48 -08:00
// measureTime runs f between a start marker and a completion marker printed
// to stdout for the given stage. It returns the wall-clock time f took.
//
// The elapsed time is measured once, so the printed duration and the returned
// one are identical. The returned value is used for throughput figures, so it
// must not include the time spent printing.
func measureTime(stage string, f func()) time.Duration {
	fmt.Printf(">> start stage=%s\n", stage)
	start := time.Now()
	f()
	elapsed := time.Since(start)
	fmt.Printf(">> completed stage=%s duration=%s\n", stage, elapsed)
	return elapsed
}
2017-01-16 12:29:53 -08:00
func readPrometheusLabels ( r io . Reader , n int ) ( [ ] labels . Labels , error ) {
b , err := ioutil . ReadAll ( r )
if err != nil {
return nil , err
}
2016-12-07 08:30:10 -08:00
2017-01-16 12:29:53 -08:00
p := textparse . New ( b )
i := 0
var mets [ ] labels . Labels
hashes := map [ uint64 ] struct { } { }
2016-12-07 08:30:10 -08:00
2017-01-16 12:29:53 -08:00
for p . Next ( ) && i < n {
m := make ( labels . Labels , 0 , 10 )
p . Metric ( ( * promlabels . Labels ) ( unsafe . Pointer ( & m ) ) )
2016-12-07 08:30:10 -08:00
2017-01-16 12:29:53 -08:00
h := m . Hash ( )
if _ , ok := hashes [ h ] ; ok {
continue
2016-12-07 08:30:10 -08:00
}
2017-01-16 12:29:53 -08:00
mets = append ( mets , m )
hashes [ h ] = struct { } { }
i ++
2016-12-07 08:30:10 -08:00
}
2017-01-16 12:29:53 -08:00
return mets , p . Err ( )
2016-12-07 08:30:10 -08:00
}
// exitWithError prints err followed by a newline to standard error, then
// terminates the process with exit status 1. Because os.Exit is used,
// deferred functions do not run.
func exitWithError(err error) {
	fmt.Fprintf(os.Stderr, "%v\n", err)
	os.Exit(1)
}