diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 14c738d672..eaa97fe32e 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -173,8 +173,8 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error { c.enablePromQLNegativeOffset = true level.Info(logger).Log("msg", "Experimental promql-negative-offset enabled") case "remote-write-receiver": - c.web.RemoteWriteReceiver = true - level.Info(logger).Log("msg", "Experimental remote-write-receiver enabled") + c.web.EnableRemoteWriteReceiver = true + level.Warn(logger).Log("msg", "Remote write receiver enabled via feature flag remote-write-receiver. This is DEPRECATED. Use --web.enable-remote-write-receiver.") case "expand-external-labels": c.enableExpandExternalLabels = true level.Info(logger).Log("msg", "Experimental expand-external-labels enabled") @@ -263,6 +263,9 @@ func main() { a.Flag("web.enable-admin-api", "Enable API endpoints for admin control actions."). Default("false").BoolVar(&cfg.web.EnableAdminAPI) + a.Flag("web.enable-remote-write-receiver", "Enable API endpoint accepting remote write requests."). + Default("false").BoolVar(&cfg.web.EnableRemoteWriteReceiver) + a.Flag("web.console.templates", "Path to the console template directory, available at /consoles."). Default("consoles").StringVar(&cfg.web.ConsoleTemplatesPath) @@ -381,7 +384,7 @@ func main() { serverOnlyFlag(a, "query.max-samples", "Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return."). Default("50000000").IntVar(&cfg.queryMaxSamples) - a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, remote-write-receiver, extra-scrape-metrics, new-service-discovery-manager. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details."). + a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details."). 
Default("").StringsVar(&cfg.featureList) promlogflag.AddFlags(a, &cfg.promlogConfig) diff --git a/cmd/promtool/main.go b/cmd/promtool/main.go index 96536e467b..a8098bea1d 100644 --- a/cmd/promtool/main.go +++ b/cmd/promtool/main.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "fmt" + "io" "io/ioutil" "math" "net/http" @@ -27,6 +28,7 @@ import ( "sort" "strconv" "strings" + "text/tabwriter" "time" "github.com/go-kit/log" @@ -43,6 +45,9 @@ import ( "gopkg.in/alecthomas/kingpin.v2" yaml "gopkg.in/yaml.v2" + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" "github.com/prometheus/prometheus/discovery/file" @@ -95,6 +100,7 @@ func main() { ).Required().ExistingFiles() checkMetricsCmd := checkCmd.Command("metrics", checkMetricsUsage) + checkMetricsExtended := checkCmd.Flag("extended", "Print extended information related to the cardinality of the metrics.").Bool() agentMode := checkConfigCmd.Flag("agent", "Check config file for Prometheus in Agent mode.").Bool() queryCmd := app.Command("query", "Run query against a Prometheus server.") @@ -228,7 +234,7 @@ func main() { os.Exit(CheckRules(*ruleFiles...)) case checkMetricsCmd.FullCommand(): - os.Exit(CheckMetrics()) + os.Exit(CheckMetrics(*checkMetricsExtended)) case queryInstantCmd.FullCommand(): os.Exit(QueryInstant(*queryInstantServer, *queryInstantExpr, *queryInstantTime, p)) @@ -629,8 +635,10 @@ $ curl -s http://localhost:9090/metrics | promtool check metrics `) // CheckMetrics performs a linting pass on input metrics. -func CheckMetrics() int { - l := promlint.New(os.Stdin) +func CheckMetrics(extended bool) int { + var buf bytes.Buffer + tee := io.TeeReader(os.Stdin, &buf) + l := promlint.New(tee) problems, err := l.Lint() if err != nil { fmt.Fprintln(os.Stderr, "error while linting:", err) @@ -645,9 +653,70 @@ return lintErrExitCode } + if extended { + stats, total, err := checkMetricsExtended(&buf) + if err != nil { + fmt.Fprintln(os.Stderr, err) + return failureExitCode + } + w := tabwriter.NewWriter(os.Stdout, 4, 4, 4, ' ', tabwriter.TabIndent) + fmt.Fprintf(w, "Metric\tCardinality\tPercentage\t\n") + for _, stat := range stats { + fmt.Fprintf(w, "%s\t%d\t%.2f%%\t\n", stat.name, stat.cardinality, stat.percentage*100) + } + fmt.Fprintf(w, "Total\t%d\t%.f%%\t\n", total, 100.) + w.Flush() + } + return successExitCode } +type metricStat struct { + name string + cardinality int + percentage float64 +} + +func checkMetricsExtended(r io.Reader) ([]metricStat, int, error) { + p := expfmt.TextParser{} + metricFamilies, err := p.TextToMetricFamilies(r) + if err != nil { + return nil, 0, fmt.Errorf("error while parsing text to metric families: %w", err) + } + + var total int + stats := make([]metricStat, 0, len(metricFamilies)) + for _, mf := range metricFamilies { + var cardinality int + switch mf.GetType() { + case dto.MetricType_COUNTER, dto.MetricType_GAUGE, dto.MetricType_UNTYPED: + cardinality = len(mf.Metric) + case dto.MetricType_HISTOGRAM: + // Histogram metrics include sum, count, and buckets. + buckets := len(mf.Metric[0].Histogram.Bucket) + cardinality = len(mf.Metric) * (2 + buckets) + case dto.MetricType_SUMMARY: + // Summary metrics include sum, count, and quantiles.
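+ // For example, a summary with one series exporting 5 quantiles has a cardinality of 1 * (2 + 5) = 7, + // matching go_gc_duration_seconds in the test data below.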
+ quantiles := len(mf.Metric[0].Summary.Quantile) + cardinality = len(mf.Metric) * (2 + quantiles) + default: + cardinality = len(mf.Metric) + } + stats = append(stats, metricStat{name: mf.GetName(), cardinality: cardinality}) + total += cardinality + } + + for i := range stats { + stats[i].percentage = float64(stats[i].cardinality) / float64(total) + } + + sort.SliceStable(stats, func(i, j int) bool { + return stats[i].cardinality > stats[j].cardinality + }) + + return stats, total, nil +} + // QueryInstant performs an instant query against a Prometheus server. func QueryInstant(url *url.URL, query, evalTime string, p printer) int { if url.Scheme == "" { diff --git a/cmd/promtool/main_test.go b/cmd/promtool/main_test.go index 82b5323c6d..d4773fc918 100644 --- a/cmd/promtool/main_test.go +++ b/cmd/promtool/main_test.go @@ -18,6 +18,7 @@ import ( "net/http" "net/http/httptest" "net/url" + "os" "runtime" "strings" "testing" @@ -322,3 +323,39 @@ func TestAuthorizationConfig(t *testing.T) { }) } } + +func TestCheckMetricsExtended(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("Skipping on windows") + } + + f, err := os.Open("testdata/metrics-test.prom") + require.NoError(t, err) + defer f.Close() + + stats, total, err := checkMetricsExtended(f) + require.NoError(t, err) + require.Equal(t, 27, total) + require.Equal(t, []metricStat{ + { + name: "prometheus_tsdb_compaction_chunk_size_bytes", + cardinality: 15, + percentage: float64(15) / float64(27), + }, + { + name: "go_gc_duration_seconds", + cardinality: 7, + percentage: float64(7) / float64(27), + }, + { + name: "net_conntrack_dialer_conn_attempted_total", + cardinality: 4, + percentage: float64(4) / float64(27), + }, + { + name: "go_info", + cardinality: 1, + percentage: float64(1) / float64(27), + }, + }, stats) +} diff --git a/cmd/promtool/testdata/metrics-test.prom b/cmd/promtool/testdata/metrics-test.prom new file mode 100644 index 0000000000..bb4c81afe6 --- /dev/null +++ b/cmd/promtool/testdata/metrics-test.prom @@ -0,0 +1,35 @@ +# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles. 
+# TYPE go_gc_duration_seconds summary +go_gc_duration_seconds{quantile="0"} 2.391e-05 +go_gc_duration_seconds{quantile="0.25"} 9.4402e-05 +go_gc_duration_seconds{quantile="0.5"} 0.000118953 +go_gc_duration_seconds{quantile="0.75"} 0.000145884 +go_gc_duration_seconds{quantile="1"} 0.005201208 +go_gc_duration_seconds_sum 0.036134048 +go_gc_duration_seconds_count 232 +# HELP prometheus_tsdb_compaction_chunk_size_bytes Final size of chunks on their first compaction +# TYPE prometheus_tsdb_compaction_chunk_size_bytes histogram +prometheus_tsdb_compaction_chunk_size_bytes_bucket{le="32"} 662 +prometheus_tsdb_compaction_chunk_size_bytes_bucket{le="48"} 1460 +prometheus_tsdb_compaction_chunk_size_bytes_bucket{le="72"} 2266 +prometheus_tsdb_compaction_chunk_size_bytes_bucket{le="108"} 3958 +prometheus_tsdb_compaction_chunk_size_bytes_bucket{le="162"} 4861 +prometheus_tsdb_compaction_chunk_size_bytes_bucket{le="243"} 5721 +prometheus_tsdb_compaction_chunk_size_bytes_bucket{le="364.5"} 10493 +prometheus_tsdb_compaction_chunk_size_bytes_bucket{le="546.75"} 12464 +prometheus_tsdb_compaction_chunk_size_bytes_bucket{le="820.125"} 13254 +prometheus_tsdb_compaction_chunk_size_bytes_bucket{le="1230.1875"} 13699 +prometheus_tsdb_compaction_chunk_size_bytes_bucket{le="1845.28125"} 13806 +prometheus_tsdb_compaction_chunk_size_bytes_bucket{le="2767.921875"} 13852 +prometheus_tsdb_compaction_chunk_size_bytes_bucket{le="+Inf"} 13867 +prometheus_tsdb_compaction_chunk_size_bytes_sum 3.886707e+06 +prometheus_tsdb_compaction_chunk_size_bytes_count 13867 +# HELP net_conntrack_dialer_conn_attempted_total Total number of connections attempted by the given dialer a given name. +# TYPE net_conntrack_dialer_conn_attempted_total counter +net_conntrack_dialer_conn_attempted_total{dialer_name="blackbox"} 5210 +net_conntrack_dialer_conn_attempted_total{dialer_name="default"} 0 +net_conntrack_dialer_conn_attempted_total{dialer_name="node"} 21 +net_conntrack_dialer_conn_attempted_total{dialer_name="prometheus"} 21 +# HELP go_info Information about the Go environment. +# TYPE go_info gauge +go_info{version="go1.17"} 1 diff --git a/docs/feature_flags.md b/docs/feature_flags.md index c175e61985..fd09c40886 100644 --- a/docs/feature_flags.md +++ b/docs/feature_flags.md @@ -46,6 +46,8 @@ More details can be found [here](querying/basics.md#offset-modifier). The remote write receiver allows Prometheus to accept remote write requests from other Prometheus servers. More details can be found [here](storage.md#overview). +Activating the remote write receiver via a feature flag is deprecated. Use `--web.enable-remote-write-receiver` instead. This feature flag will be ignored in future versions of Prometheus. + ## Exemplars storage `--enable-feature=exemplar-storage` diff --git a/docs/querying/api.md b/docs/querying/api.md index 6fa21cc7d6..84d8c8f962 100644 --- a/docs/querying/api.md +++ b/docs/querying/api.md @@ -1145,3 +1145,17 @@ $ curl -XPOST http://localhost:9090/api/v1/admin/tsdb/clean_tombstones ``` *New in v2.1 and supports PUT from v2.9* + +## Remote Write Receiver + +Prometheus can be configured as a receiver for the Prometheus remote write +protocol. This is not considered an efficient way of ingesting samples. Use it +with caution for specific low-volume use cases. It is not suitable for +replacing the ingestion via scraping and turning Prometheus into a push-based +metrics collection system. + +Enable the remote write receiver by setting +`--web.enable-remote-write-receiver`. 
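+ +For example, another Prometheus server can forward its samples to this receiver with an ordinary `remote_write` configuration block (a minimal sketch; the URL assumes the receiver listens on `localhost:9090`): + +```yaml +remote_write: + - url: "http://localhost:9090/api/v1/write" +``` +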
When enabled, the remote write receiver +endpoint is `/api/v1/write`. Find more details [here](../storage.md#overview). + +*New in v2.33* diff --git a/docs/storage.md b/docs/storage.md index a6ad5e7980..c4526518a3 100644 --- a/docs/storage.md +++ b/docs/storage.md @@ -129,7 +129,7 @@ The read and write protocols both use a snappy-compressed protocol buffer encodi For details on configuring remote storage integrations in Prometheus, see the [remote write](configuration/configuration.md#remote_write) and [remote read](configuration/configuration.md#remote_read) sections of the Prometheus configuration documentation. -The built-in remote write receiver can be enabled by setting the `--enable-feature=remote-write-receiver` command line flag. When enabled, the remote write receiver endpoint is `/api/v1/write`. +The built-in remote write receiver can be enabled by setting the `--web.enable-remote-write-receiver` command line flag. When enabled, the remote write receiver endpoint is `/api/v1/write`. For details on the request and response messages, see the [remote storage protocol buffer definitions](https://github.com/prometheus/prometheus/blob/main/prompb/remote.proto). diff --git a/tsdb/chunks/chunk_write_queue.go b/tsdb/chunks/chunk_write_queue.go index 307ccfcdd9..5cdd2e81f0 100644 --- a/tsdb/chunks/chunk_write_queue.go +++ b/tsdb/chunks/chunk_write_queue.go @@ -32,60 +32,53 @@ type chunkWriteJob struct { callback func(error) } -var ( - queueOperationAdd = "add" - queueOperationGet = "get" - queueOperationComplete = "complete" - queueOperations = []string{queueOperationAdd, queueOperationGet, queueOperationComplete} -) - // chunkWriteQueue is a queue for writing chunks to disk in a non-blocking fashion. // Chunks that shall be written get added to the queue, which is consumed asynchronously. // Adding jobs to the queue is non-blocking as long as the queue isn't full. type chunkWriteQueue struct { - size int - jobCh chan chunkWriteJob + jobs chan chunkWriteJob - chunkRefMapMtx sync.RWMutex - chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk - chunkRefMapOversized bool // indicates whether more than <size> chunks were put into the chunkRefMap. + chunkRefMapMtx sync.RWMutex + chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk - isRunningMtx sync.RWMutex - isRunning bool + isRunningMtx sync.Mutex // Protects the isRunning property. + isRunning bool // Used to prevent new jobs from being added to the queue when the chan is already closed. workerWg sync.WaitGroup writeChunk writeChunkF - operationsMetric *prometheus.CounterVec + // Keeping three separate counters instead of only a single CounterVec to improve the performance of the critical + // addJob() method, which otherwise would need to perform a WithLabelValues call on the CounterVec. + adds prometheus.Counter + gets prometheus.Counter + completed prometheus.Counter } // writeChunkF is a function which writes chunks; it is dynamic to allow mocking in tests.
type writeChunkF func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool) error func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChunkF) *chunkWriteQueue { + counters := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "prometheus_tsdb_chunk_write_queue_operations_total", + Help: "Number of operations on the chunk_write_queue.", + }, + []string{"operation"}, + ) + q := &chunkWriteQueue{ - size: size, - jobCh: make(chan chunkWriteJob, size), + jobs: make(chan chunkWriteJob, size), chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk, size), writeChunk: writeChunk, - operationsMetric: prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "prometheus_tsdb_chunk_write_queue_operations_total", - Help: "Number of operations on the chunk_write_queue.", - }, - []string{"operation"}, - ), + adds: counters.WithLabelValues("add"), + gets: counters.WithLabelValues("get"), + completed: counters.WithLabelValues("complete"), } if reg != nil { - reg.MustRegister(q.operationsMetric) - - // Initialize series for all the possible labels. - for _, op := range queueOperations { - q.operationsMetric.WithLabelValues(op).Add(0) - } + reg.MustRegister(counters) } q.start() @@ -97,7 +90,7 @@ func (c *chunkWriteQueue) start() { go func() { defer c.workerWg.Done() - for job := range c.jobCh { + for job := range c.jobs { c.processJob(job) } }() @@ -118,36 +111,28 @@ func (c *chunkWriteQueue) processJob(job chunkWriteJob) { delete(c.chunkRefMap, job.ref) - if len(c.chunkRefMap) == 0 { - // If the map had to be grown beyond its allocated size, then we recreate it to free memory. - if c.chunkRefMapOversized { - c.chunkRefMap = make(map[ChunkDiskMapperRef]chunkenc.Chunk, c.size) - c.chunkRefMapOversized = false - } - } - - c.operationsMetric.WithLabelValues(queueOperationComplete).Inc() + c.completed.Inc() } -func (c *chunkWriteQueue) addJob(job chunkWriteJob) error { - c.isRunningMtx.RLock() - defer c.isRunningMtx.RUnlock() +func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) { + defer func() { + if err == nil { + c.adds.Inc() + } + }() + + c.isRunningMtx.Lock() + defer c.isRunningMtx.Unlock() if !c.isRunning { return errors.New("queue is not started") } c.chunkRefMapMtx.Lock() - // The map might grow beyond the allocated size here, in which case we'll recreate it as soon as it is drained. c.chunkRefMap[job.ref] = job.chk - if len(c.chunkRefMap) > c.size { - c.chunkRefMapOversized = true - } c.chunkRefMapMtx.Unlock() - c.jobCh <- job - - c.operationsMetric.WithLabelValues(queueOperationAdd).Inc() + c.jobs <- job return nil } @@ -158,7 +143,7 @@ func (c *chunkWriteQueue) get(ref ChunkDiskMapperRef) chunkenc.Chunk { chk, ok := c.chunkRefMap[ref] if ok { - c.operationsMetric.WithLabelValues(queueOperationGet).Inc() + c.gets.Inc() } return chk @@ -174,7 +159,26 @@ func (c *chunkWriteQueue) stop() { c.isRunning = false - close(c.jobCh) + close(c.jobs) c.workerWg.Wait() } + +func (c *chunkWriteQueue) queueIsEmpty() bool { + return c.queueSize() == 0 +} + +func (c *chunkWriteQueue) queueIsFull() bool { + // When the queue is full and blocked on the writer, the chunkRefMap has one more job than the capacity of the jobs channel + // because one job is currently being processed and blocked in the writer.
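+ // For example, with a queue size of 10, a full queue has 10 jobs buffered in the jobs channel plus one job held by the blocked writer, + // so the chunkRefMap contains 11 entries and queueSize() returns cap(c.jobs)+1.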
+ return c.queueSize() == cap(c.jobs)+1 +} + +func (c *chunkWriteQueue) queueSize() int { + c.chunkRefMapMtx.Lock() + defer c.chunkRefMapMtx.Unlock() + + // Looking at chunkRefMap instead of the jobs channel because a job is popped from the chan before it has + // been fully processed; it remains in the chunkRefMap until the processing is complete. + return len(c.chunkRefMap) +} diff --git a/tsdb/chunks/chunk_write_queue_test.go b/tsdb/chunks/chunk_write_queue_test.go index d6d2e3cb90..0e971a336b 100644 --- a/tsdb/chunks/chunk_write_queue_test.go +++ b/tsdb/chunks/chunk_write_queue_test.go @@ -15,6 +15,7 @@ package chunks import ( "errors" + "fmt" "sync" "testing" "time" @@ -80,16 +81,13 @@ func TestChunkWriteQueue_WritingThroughQueue(t *testing.T) { chunk := chunkenc.NewXORChunk() ref := newChunkDiskMapperRef(321, 123) cutFile := true - var callbackWg sync.WaitGroup - callbackWg.Add(1) + awaitCb := make(chan struct{}) require.NoError(t, q.addJob(chunkWriteJob{seriesRef: seriesRef, mint: mint, maxt: maxt, chk: chunk, ref: ref, cutFile: cutFile, callback: func(err error) { - callbackWg.Done() + close(awaitCb) }})) + <-awaitCb - // Wait until job has been consumed. - callbackWg.Wait() - - // compare whether the write function has received all job attributes correctly + // Compare whether the write function has received all job attributes correctly. require.Equal(t, seriesRef, gotSeriesRef) require.Equal(t, mint, gotMint) require.Equal(t, maxt, gotMaxt) @@ -148,7 +146,7 @@ func TestChunkWriteQueue_WrappingAroundSizeLimit(t *testing.T) { } // The queue should be full. - require.True(t, queueIsFull(q)) + require.True(t, q.queueIsFull()) // Adding another job should block as long as no job from the queue gets consumed. addedJob := atomic.NewBool(false) @@ -168,19 +166,19 @@ func TestChunkWriteQueue_WrappingAroundSizeLimit(t *testing.T) { require.Eventually(t, func() bool { return addedJob.Load() }, time.Second, time.Millisecond*10) // The queue should be full again. - require.True(t, queueIsFull(q)) + require.True(t, q.queueIsFull()) // Consume <sizeLimit>+1 jobs from the queue. // To drain the queue we need to consume <sizeLimit>+1 jobs because 1 job // is already in the state of being processed. for job := 0; job < sizeLimit+1; job++ { - require.False(t, queueIsEmpty(q)) + require.False(t, q.queueIsEmpty()) unblockChunkWriter() } // Wait until all jobs have been processed.
callbackWg.Wait() - require.True(t, queueIsEmpty(q)) + require.True(t, q.queueIsEmpty()) } func TestChunkWriteQueue_HandlerErrorViaCallback(t *testing.T) { @@ -189,12 +187,11 @@ func TestChunkWriteQueue_HandlerErrorViaCallback(t *testing.T) { return testError } - var callbackWg sync.WaitGroup - callbackWg.Add(1) + awaitCb := make(chan struct{}) var gotError error callback := func(err error) { gotError = err - callbackWg.Done() + close(awaitCb) } q := newChunkWriteQueue(nil, 1, chunkWriter) @@ -203,26 +200,69 @@ job := chunkWriteJob{callback: callback} require.NoError(t, q.addJob(job)) - callbackWg.Wait() + <-awaitCb require.Equal(t, testError, gotError) } -func queueIsEmpty(q *chunkWriteQueue) bool { - return queueSize(q) == 0 -} +func BenchmarkChunkWriteQueue_addJob(b *testing.B) { + for _, withReads := range []bool{false, true} { + b.Run(fmt.Sprintf("with reads %t", withReads), func(b *testing.B) { + for _, concurrentWrites := range []int{1, 10, 100, 1000} { + b.Run(fmt.Sprintf("%d concurrent writes", concurrentWrites), func(b *testing.B) { + issueReadSignal := make(chan struct{}) + q := newChunkWriteQueue(nil, 1000, func(ref HeadSeriesRef, i, i2 int64, chunk chunkenc.Chunk, ref2 ChunkDiskMapperRef, b bool) error { + if withReads { + select { + case issueReadSignal <- struct{}{}: + default: + // Can't write to issueReadSignal; don't block, but skip the read instead. + } + } + return nil + }) + b.Cleanup(func() { + // Stopped already, so no more writes will happen. + close(issueReadSignal) + }) + b.Cleanup(q.stop) -func queueIsFull(q *chunkWriteQueue) bool { - // When the queue is full and blocked on the writer the chunkRefMap has one more job than the cap of the jobCh - because one job is currently being processed and blocked in the writer. - return queueSize(q) == cap(q.jobCh)+1 -} + start := sync.WaitGroup{} + start.Add(1) -func queueSize(q *chunkWriteQueue) int { - q.chunkRefMapMtx.Lock() - defer q.chunkRefMapMtx.Unlock() + jobs := make(chan chunkWriteJob, b.N) + for i := 0; i < b.N; i++ { + jobs <- chunkWriteJob{ + seriesRef: HeadSeriesRef(i), + ref: ChunkDiskMapperRef(i), + } + } + close(jobs) - // Looking at chunkRefMap instead of jobCh because the job is popped from the chan before it has - // been fully processed, it remains in the chunkRefMap until the processing is complete. - return len(q.chunkRefMap) + go func() { + for range issueReadSignal { + // We don't care about the ID we're getting; we just want to grab the lock. + _ = q.get(ChunkDiskMapperRef(0)) + } + }() + + done := sync.WaitGroup{} + done.Add(concurrentWrites) + for w := 0; w < concurrentWrites; w++ { + go func() { + start.Wait() + for j := range jobs { + _ = q.addJob(j) + } + done.Done() + }() + } + + b.ResetTimer() + start.Done() + done.Wait() + }) + } + }) + } } diff --git a/tsdb/chunks/head_chunks.go b/tsdb/chunks/head_chunks.go index bb1730722e..4ea454de62 100644 --- a/tsdb/chunks/head_chunks.go +++ b/tsdb/chunks/head_chunks.go @@ -113,7 +113,7 @@ func (f *chunkPos) getNextChunkRef(chk chunkenc.Chunk) (chkRef ChunkDiskMapperRe chkLen := uint64(len(chk.Bytes())) bytesToWrite := f.bytesToWriteForChunk(chkLen) - if f.shouldCutNewFile(bytesToWrite) { + if f.shouldCutNewFile(chkLen) { f.toNewFile() f.cutFile = false cutFile = true @@ -137,37 +137,37 @@ func (f *chunkPos) cutFileOnNextChunk() { f.cutFile = true } -// setSeq sets the sequence number of the head chunk file. +// initSeq sets the sequence number of the head chunk file.
// Should only be used for initialization, after that the sequence number will be managed by chunkPos. -func (f *chunkPos) setSeq(seq uint64) { +func (f *chunkPos) initSeq(seq uint64) { f.seq = seq } // shouldCutNewFile returns whether a new file should be cut based on the file size. -// Not thread safe, a lock must be held when calling this. -func (f *chunkPos) shouldCutNewFile(bytesToWrite uint64) bool { +// The read or write lock on chunkPos must be held when calling this. +func (f *chunkPos) shouldCutNewFile(chunkSize uint64) bool { if f.cutFile { return true } return f.offset == 0 || // First head chunk file. - f.offset+bytesToWrite > MaxHeadChunkFileSize // Exceeds the max head chunk file size. + f.offset+chunkSize+MaxHeadChunkMetaSize > MaxHeadChunkFileSize // Exceeds the max head chunk file size. } // bytesToWriteForChunk returns the number of bytes that will need to be written for the given chunk size, // including all meta data before and after the chunk data. // Head chunk format: https://github.com/prometheus/prometheus/blob/main/tsdb/docs/format/head_chunks.md#chunk func (f *chunkPos) bytesToWriteForChunk(chkLen uint64) uint64 { - // headers + // Headers. bytes := uint64(SeriesRefSize) + 2*MintMaxtSize + ChunkEncodingSize - // size of chunk length encoded as uvarint + // Size of chunk length encoded as uvarint. bytes += uint64(varint.UvarintSize(chkLen)) - // chunk length + // Chunk length. bytes += chkLen - // crc32 + // crc32. bytes += CRCSize return bytes @@ -321,7 +321,7 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) { } } - cdm.evtlPos.setSeq(uint64(lastSeq)) + cdm.evtlPos.initSeq(uint64(lastSeq)) return nil } @@ -410,7 +410,7 @@ func (cdm *ChunkDiskMapper) writeChunk(seriesRef HeadSeriesRef, mint, maxt int64 } if cutFile { - err := cdm.cutExpectRef(ref) + err := cdm.cutAndExpectRef(ref) if err != nil { return err } @@ -466,18 +466,21 @@ func (cdm *ChunkDiskMapper) writeChunk(seriesRef HeadSeriesRef, mint, maxt int64 } // CutNewFile makes that a new file will be created the next time a chunk is written. -func (cdm *ChunkDiskMapper) CutNewFile() error { +func (cdm *ChunkDiskMapper) CutNewFile() { cdm.evtlPosMtx.Lock() defer cdm.evtlPosMtx.Unlock() cdm.evtlPos.cutFileOnNextChunk() - return nil } -// cutExpectRef creates a new m-mapped file. +func (cdm *ChunkDiskMapper) IsQueueEmpty() bool { + return cdm.writeQueue.queueIsEmpty() +} + +// cutAndExpectRef creates a new m-mapped file. // The write lock should be held before calling this. // It ensures that the position in the new file matches the given chunk reference, if not then it errors. -func (cdm *ChunkDiskMapper) cutExpectRef(chkRef ChunkDiskMapperRef) (err error) { +func (cdm *ChunkDiskMapper) cutAndExpectRef(chkRef ChunkDiskMapperRef) (err error) { seq, offset, err := cdm.cut() if err != nil { return err @@ -864,7 +867,7 @@ func (cdm *ChunkDiskMapper) Truncate(mint int64) error { // There is a known race condition here because between the check of curFileSize() and the call to CutNewFile() // a new file could already be cut, this is acceptable because it will simply result in an empty file which // won't do any harm. 
- errs.Add(cdm.CutNewFile()) + cdm.CutNewFile() } errs.Add(cdm.deleteFiles(removedFiles)) return errs.Err() diff --git a/tsdb/chunks/head_chunks_test.go b/tsdb/chunks/head_chunks_test.go index aefc60f03a..be1784137e 100644 --- a/tsdb/chunks/head_chunks_test.go +++ b/tsdb/chunks/head_chunks_test.go @@ -20,7 +20,6 @@ import ( "math/rand" "os" "strconv" - "sync" "testing" "github.com/stretchr/testify/require" @@ -101,7 +100,7 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) { } } addChunks(100) - require.NoError(t, hrw.CutNewFile()) + hrw.CutNewFile() addChunks(10) // For chunks in in-memory buffer. } @@ -166,22 +165,20 @@ func TestChunkDiskMapper_Truncate(t *testing.T) { timeRange := 0 fileTimeStep := 100 var thirdFileMinT, sixthFileMinT int64 - var callbackWg sync.WaitGroup addChunk := func() int { t.Helper() - callbackWg.Add(1) - - mint := timeRange + 1 // Just after the new file cut. - maxt := timeRange + fileTimeStep - 1 // Just before the next file. - - // Write a chunks to set maxt for the segment. - hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) { - callbackWg.Done() - require.NoError(t, err) + step := 100 + mint, maxt := timeRange+1, timeRange+step-1 + var err error + awaitCb := make(chan struct{}) + hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(cbErr error) { + err = cbErr + close(awaitCb) }) - - timeRange += fileTimeStep + <-awaitCb + require.NoError(t, err) + timeRange += step return mint } @@ -189,9 +186,6 @@ func TestChunkDiskMapper_Truncate(t *testing.T) { verifyFiles := func(remainingFiles []int) { t.Helper() - // Wait until all chunk write jobs have been processed. - callbackWg.Wait() - files, err := ioutil.ReadDir(hrw.dir.Name()) require.NoError(t, err) require.Equal(t, len(remainingFiles), len(files), "files on disk") @@ -206,7 +200,7 @@ func TestChunkDiskMapper_Truncate(t *testing.T) { // Create segments 1 to 7. for i := 1; i <= 7; i++ { - require.NoError(t, hrw.CutNewFile()) + hrw.CutNewFile() mint := int64(addChunk()) if i == 3 { thirdFileMinT = mint @@ -219,7 +213,7 @@ func TestChunkDiskMapper_Truncate(t *testing.T) { // Truncating files. require.NoError(t, hrw.Truncate(thirdFileMinT)) - // Add a chunk to trigger truncation. + // Add a chunk to trigger cutting of a new file. addChunk() verifyFiles([]int{3, 4, 5, 6, 7, 8}) @@ -237,8 +231,13 @@ func TestChunkDiskMapper_Truncate(t *testing.T) { // Truncating files after restart. require.NoError(t, hrw.Truncate(sixthFileMinT)) + verifyFiles([]int{6, 7, 8, 9}) - // Add a chunk to trigger truncation. + // Truncating a second time without adding a chunk shouldn't create a new file. + require.NoError(t, hrw.Truncate(sixthFileMinT+1)) + verifyFiles([]int{6, 7, 8, 9}) + + // Add a chunk to trigger cutting of a new file. addChunk() verifyFiles([]int{6, 7, 8, 9, 10}) @@ -246,7 +245,7 @@ func TestChunkDiskMapper_Truncate(t *testing.T) { // Truncating till current time should not delete the current active file. require.NoError(t, hrw.Truncate(int64(timeRange+(2*fileTimeStep)))) - // Add a chunk to trigger truncation. + // Add a chunk to trigger cutting of a new file. addChunk() verifyFiles([]int{10, 11}) // One file is the previously active file and one currently created.
@@ -263,26 +262,24 @@ func TestChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) { }() timeRange := 0 - var callbackWg sync.WaitGroup addChunk := func() { t.Helper() - callbackWg.Add(1) + awaitCb := make(chan struct{}) step := 100 mint, maxt := timeRange+1, timeRange+step-1 hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) { - callbackWg.Done() + close(awaitCb) require.NoError(t, err) }) + <-awaitCb timeRange += step } emptyFile := func() { t.Helper() - callbackWg.Wait() - _, _, err := hrw.cut() require.NoError(t, err) hrw.evtlPosMtx.Lock() @@ -307,7 +304,6 @@ func TestChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) { verifyFiles := func(remainingFiles []int) { t.Helper() - callbackWg.Wait() files, err := ioutil.ReadDir(hrw.dir.Name()) require.NoError(t, err) require.Equal(t, len(remainingFiles), len(files), "files on disk") @@ -328,11 +324,21 @@ func TestChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) { require.NoError(t, hrw.Truncate(file2Maxt+1)) verifyFiles([]int{3, 4, 5, 6}) + // Add chunk, so file 6 is not empty anymore. + addChunk() + verifyFiles([]int{3, 4, 5, 6}) + + // Truncating till file 3 should also delete file 4, because it is empty. + file3Maxt := hrw.mmappedChunkFiles[3].maxt + require.NoError(t, hrw.Truncate(file3Maxt+1)) + addChunk() + verifyFiles([]int{5, 6, 7}) + dir := hrw.dir.Name() require.NoError(t, hrw.Close()) // Restarting checks for unsequential files. hrw = createChunkDiskMapper(t, dir) - verifyFiles([]int{3, 4, 5, 6}) + verifyFiles([]int{5, 6, 7}) } // TestHeadReadWriter_TruncateAfterIterateChunksError tests for @@ -345,13 +351,12 @@ func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) { // Write a chunks to iterate on it later. var err error - var callbackWg sync.WaitGroup - callbackWg.Add(1) + awaitCb := make(chan struct{}) hrw.WriteChunk(1, 0, 1000, randomChunk(t), func(cbErr error) { err = cbErr - callbackWg.Done() + close(awaitCb) }) - callbackWg.Wait() + <-awaitCb require.NoError(t, err) dir := hrw.dir.Name() @@ -377,6 +382,8 @@ func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) { timeRange := 0 addChunk := func() { + t.Helper() + step := 100 mint, maxt := timeRange+1, timeRange+step-1 var err error @@ -390,7 +397,9 @@ func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) { timeRange += step } nonEmptyFile := func() { - require.NoError(t, hrw.CutNewFile()) + t.Helper() + + hrw.CutNewFile() addChunk() } @@ -475,12 +484,11 @@ func createChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef HeadSer mint = int64((idx)*1000 + 1) maxt = int64((idx + 1) * 1000) chunk = randomChunk(t) - var callbackWg sync.WaitGroup - callbackWg.Add(1) + awaitCb := make(chan struct{}) chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, func(cbErr error) { require.NoError(t, err) - callbackWg.Done() + close(awaitCb) }) - callbackWg.Wait() + <-awaitCb return } diff --git a/tsdb/chunks/old_head_chunks.go b/tsdb/chunks/old_head_chunks.go deleted file mode 100644 index 194d5be95f..0000000000 --- a/tsdb/chunks/old_head_chunks.go +++ /dev/null @@ -1,710 +0,0 @@ -// Copyright 2020 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package chunks - -import ( - "bufio" - "bytes" - "encoding/binary" - "hash" - "io" - "os" - "sort" - "sync" - - "github.com/pkg/errors" - "go.uber.org/atomic" - - "github.com/prometheus/prometheus/tsdb/chunkenc" - tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" - "github.com/prometheus/prometheus/tsdb/fileutil" -) - -// OldChunkDiskMapper is for writing the Head block chunks to the disk -// and access chunks via mmapped file. -type OldChunkDiskMapper struct { - curFileNumBytes atomic.Int64 // Bytes written in current open file. - - /// Writer. - dir *os.File - writeBufferSize int - - curFile *os.File // File being written to. - curFileSequence int // Index of current open file being appended to. - curFileMaxt int64 // Used for the size retention. - - byteBuf [MaxHeadChunkMetaSize]byte // Buffer used to write the header of the chunk. - chkWriter *bufio.Writer // Writer for the current open file. - crc32 hash.Hash - writePathMtx sync.Mutex - - /// Reader. - // The int key in the map is the file number on the disk. - mmappedChunkFiles map[int]*mmappedChunkFile // Contains the m-mapped files for each chunk file mapped with its index. - closers map[int]io.Closer // Closers for resources behind the byte slices. - readPathMtx sync.RWMutex // Mutex used to protect the above 2 maps. - pool chunkenc.Pool // This is used when fetching a chunk from the disk to allocate a chunk. - - // Writer and Reader. - // We flush chunks to disk in batches. Hence, we store them in this buffer - // from which chunks are served till they are flushed and are ready for m-mapping. - chunkBuffer *chunkBuffer - - // Whether the maxt field is set for all mmapped chunk files tracked within the mmappedChunkFiles map. - // This is done after iterating through all the chunks in those files using the IterateAllChunks method. - fileMaxtSet bool - - closed bool -} - -// NewOldChunkDiskMapper returns a new ChunkDiskMapper against the given directory -// using the default head chunk file duration. -// NOTE: 'IterateAllChunks' method needs to be called at least once after creating ChunkDiskMapper -// to set the maxt of all the file. -func NewOldChunkDiskMapper(dir string, pool chunkenc.Pool, writeBufferSize int) (*OldChunkDiskMapper, error) { - // Validate write buffer size. 
- if writeBufferSize < MinWriteBufferSize || writeBufferSize > MaxWriteBufferSize { - return nil, errors.Errorf("ChunkDiskMapper write buffer size should be between %d and %d (actual: %d)", MinWriteBufferSize, MaxWriteBufferSize, writeBufferSize) - } - if writeBufferSize%1024 != 0 { - return nil, errors.Errorf("ChunkDiskMapper write buffer size should be a multiple of 1024 (actual: %d)", writeBufferSize) - } - - if err := os.MkdirAll(dir, 0o777); err != nil { - return nil, err - } - dirFile, err := fileutil.OpenDir(dir) - if err != nil { - return nil, err - } - - m := &OldChunkDiskMapper{ - dir: dirFile, - pool: pool, - writeBufferSize: writeBufferSize, - crc32: newCRC32(), - chunkBuffer: newChunkBuffer(), - } - - if m.pool == nil { - m.pool = chunkenc.NewPool() - } - - return m, m.openMMapFiles() -} - -// openMMapFiles opens all files within dir for mmapping. -func (cdm *OldChunkDiskMapper) openMMapFiles() (returnErr error) { - cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{} - cdm.closers = map[int]io.Closer{} - defer func() { - if returnErr != nil { - returnErr = tsdb_errors.NewMulti(returnErr, closeAllFromMap(cdm.closers)).Err() - - cdm.mmappedChunkFiles = nil - cdm.closers = nil - } - }() - - files, err := listChunkFiles(cdm.dir.Name()) - if err != nil { - return err - } - - files, err = repairLastChunkFile(files) - if err != nil { - return err - } - - chkFileIndices := make([]int, 0, len(files)) - for seq, fn := range files { - f, err := fileutil.OpenMmapFile(fn) - if err != nil { - return errors.Wrapf(err, "mmap files, file: %s", fn) - } - cdm.closers[seq] = f - cdm.mmappedChunkFiles[seq] = &mmappedChunkFile{byteSlice: realByteSlice(f.Bytes())} - chkFileIndices = append(chkFileIndices, seq) - } - - // Check for gaps in the files. - sort.Ints(chkFileIndices) - if len(chkFileIndices) == 0 { - return nil - } - lastSeq := chkFileIndices[0] - for _, seq := range chkFileIndices[1:] { - if seq != lastSeq+1 { - return errors.Errorf("found unsequential head chunk files %s (index: %d) and %s (index: %d)", files[lastSeq], lastSeq, files[seq], seq) - } - lastSeq = seq - } - - for i, b := range cdm.mmappedChunkFiles { - if b.byteSlice.Len() < HeadChunkFileHeaderSize { - return errors.Wrapf(errInvalidSize, "%s: invalid head chunk file header", files[i]) - } - // Verify magic number. - if m := binary.BigEndian.Uint32(b.byteSlice.Range(0, MagicChunksSize)); m != MagicHeadChunks { - return errors.Errorf("%s: invalid magic number %x", files[i], m) - } - - // Verify chunk format version. - if v := int(b.byteSlice.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 { - return errors.Errorf("%s: invalid chunk format version %d", files[i], v) - } - } - - return nil -} - -// WriteChunk writes the chunk to the disk. -// The returned chunk ref is the reference from where the chunk encoding starts for the chunk. -func (cdm *OldChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef ChunkDiskMapperRef) { - chkRef, err := func() (ChunkDiskMapperRef, error) { - cdm.writePathMtx.Lock() - defer cdm.writePathMtx.Unlock() - - if cdm.closed { - return 0, ErrChunkDiskMapperClosed - } - - if cdm.shouldCutNewFile(len(chk.Bytes())) { - if err := cdm.cut(); err != nil { - return 0, err - } - } - - // if len(chk.Bytes())+MaxHeadChunkMetaSize >= writeBufferSize, it means that chunk >= the buffer size; - // so no need to flush here, as we have to flush at the end (to not keep partial chunks in buffer). 
- if len(chk.Bytes())+MaxHeadChunkMetaSize < cdm.writeBufferSize && cdm.chkWriter.Available() < MaxHeadChunkMetaSize+len(chk.Bytes()) { - if err := cdm.flushBuffer(); err != nil { - return 0, err - } - } - - cdm.crc32.Reset() - bytesWritten := 0 - - chkRef = newChunkDiskMapperRef(uint64(cdm.curFileSequence), uint64(cdm.curFileSize())) - - binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(seriesRef)) - bytesWritten += SeriesRefSize - binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(mint)) - bytesWritten += MintMaxtSize - binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(maxt)) - bytesWritten += MintMaxtSize - cdm.byteBuf[bytesWritten] = byte(chk.Encoding()) - bytesWritten += ChunkEncodingSize - n := binary.PutUvarint(cdm.byteBuf[bytesWritten:], uint64(len(chk.Bytes()))) - bytesWritten += n - - if err := cdm.writeAndAppendToCRC32(cdm.byteBuf[:bytesWritten]); err != nil { - return 0, err - } - if err := cdm.writeAndAppendToCRC32(chk.Bytes()); err != nil { - return 0, err - } - if err := cdm.writeCRC32(); err != nil { - return 0, err - } - - if maxt > cdm.curFileMaxt { - cdm.curFileMaxt = maxt - } - - cdm.chunkBuffer.put(chkRef, chk) - - if len(chk.Bytes())+MaxHeadChunkMetaSize >= cdm.writeBufferSize { - // The chunk was bigger than the buffer itself. - // Flushing to not keep partial chunks in buffer. - if err := cdm.flushBuffer(); err != nil { - return 0, err - } - } - - return chkRef, nil - }() - - if err != nil && callback != nil { - callback(err) - } - - return chkRef -} - -// shouldCutNewFile returns whether a new file should be cut, based on time and size retention. -// Size retention: because depending on the system architecture, there is a limit on how big of a file we can m-map. -// Time retention: so that we can delete old chunks with some time guarantee in low load environments. -func (cdm *OldChunkDiskMapper) shouldCutNewFile(chunkSize int) bool { - return cdm.curFileSize() == 0 || // First head chunk file. - cdm.curFileSize()+int64(chunkSize+MaxHeadChunkMetaSize) > MaxHeadChunkFileSize // Exceeds the max head chunk file size. -} - -// CutNewFile creates a new m-mapped file. -func (cdm *OldChunkDiskMapper) CutNewFile() (returnErr error) { - cdm.writePathMtx.Lock() - defer cdm.writePathMtx.Unlock() - - return cdm.cut() -} - -// cut creates a new m-mapped file. The write lock should be held before calling this. -func (cdm *OldChunkDiskMapper) cut() (returnErr error) { - // Sync current tail to disk and close. - if err := cdm.finalizeCurFile(); err != nil { - return err - } - - n, newFile, seq, err := cutSegmentFile(cdm.dir, MagicHeadChunks, headChunksFormatV1, HeadChunkFilePreallocationSize) - if err != nil { - return err - } - defer func() { - // The file should not be closed if there is no error, - // its kept open in the ChunkDiskMapper. 
- if returnErr != nil { - returnErr = tsdb_errors.NewMulti(returnErr, newFile.Close()).Err() - } - }() - - cdm.curFileNumBytes.Store(int64(n)) - - if cdm.curFile != nil { - cdm.readPathMtx.Lock() - cdm.mmappedChunkFiles[cdm.curFileSequence].maxt = cdm.curFileMaxt - cdm.readPathMtx.Unlock() - } - - mmapFile, err := fileutil.OpenMmapFileWithSize(newFile.Name(), MaxHeadChunkFileSize) - if err != nil { - return err - } - - cdm.readPathMtx.Lock() - cdm.curFileSequence = seq - cdm.curFile = newFile - if cdm.chkWriter != nil { - cdm.chkWriter.Reset(newFile) - } else { - cdm.chkWriter = bufio.NewWriterSize(newFile, cdm.writeBufferSize) - } - - cdm.closers[cdm.curFileSequence] = mmapFile - cdm.mmappedChunkFiles[cdm.curFileSequence] = &mmappedChunkFile{byteSlice: realByteSlice(mmapFile.Bytes())} - cdm.readPathMtx.Unlock() - - cdm.curFileMaxt = 0 - - return nil -} - -// finalizeCurFile writes all pending data to the current tail file, -// truncates its size, and closes it. -func (cdm *OldChunkDiskMapper) finalizeCurFile() error { - if cdm.curFile == nil { - return nil - } - - if err := cdm.flushBuffer(); err != nil { - return err - } - - if err := cdm.curFile.Sync(); err != nil { - return err - } - - return cdm.curFile.Close() -} - -func (cdm *OldChunkDiskMapper) write(b []byte) error { - n, err := cdm.chkWriter.Write(b) - cdm.curFileNumBytes.Add(int64(n)) - return err -} - -func (cdm *OldChunkDiskMapper) writeAndAppendToCRC32(b []byte) error { - if err := cdm.write(b); err != nil { - return err - } - _, err := cdm.crc32.Write(b) - return err -} - -func (cdm *OldChunkDiskMapper) writeCRC32() error { - return cdm.write(cdm.crc32.Sum(cdm.byteBuf[:0])) -} - -// flushBuffer flushes the current in-memory chunks. -// Assumes that writePathMtx is _write_ locked before calling this method. -func (cdm *OldChunkDiskMapper) flushBuffer() error { - if err := cdm.chkWriter.Flush(); err != nil { - return err - } - cdm.chunkBuffer.clear() - return nil -} - -// Chunk returns a chunk from a given reference. -func (cdm *OldChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error) { - cdm.readPathMtx.RLock() - // We hold this read lock for the entire duration because if Close() - // is called, the data in the byte slice will get corrupted as the mmapped - // file will be closed. - defer cdm.readPathMtx.RUnlock() - - if cdm.closed { - return nil, ErrChunkDiskMapperClosed - } - - sgmIndex, chkStart := ref.Unpack() - // We skip the series ref and the mint/maxt beforehand. - chkStart += SeriesRefSize + (2 * MintMaxtSize) - chkCRC32 := newCRC32() - - // If it is the current open file, then the chunks can be in the buffer too. 
- if sgmIndex == cdm.curFileSequence { - chunk := cdm.chunkBuffer.get(ref) - if chunk != nil { - return chunk, nil - } - } - - mmapFile, ok := cdm.mmappedChunkFiles[sgmIndex] - if !ok { - if sgmIndex > cdm.curFileSequence { - return nil, &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: -1, - Err: errors.Errorf("head chunk file index %d more than current open file", sgmIndex), - } - } - return nil, &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: sgmIndex, - Err: errors.New("head chunk file index %d does not exist on disk"), - } - } - - if chkStart+MaxChunkLengthFieldSize > mmapFile.byteSlice.Len() { - return nil, &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: sgmIndex, - Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, mmapFile.byteSlice.Len()), - } - } - - // Encoding. - chkEnc := mmapFile.byteSlice.Range(chkStart, chkStart+ChunkEncodingSize)[0] - - // Data length. - // With the minimum chunk length this should never cause us reading - // over the end of the slice. - chkDataLenStart := chkStart + ChunkEncodingSize - c := mmapFile.byteSlice.Range(chkDataLenStart, chkDataLenStart+MaxChunkLengthFieldSize) - chkDataLen, n := binary.Uvarint(c) - if n <= 0 { - return nil, &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: sgmIndex, - Err: errors.Errorf("reading chunk length failed with %d", n), - } - } - - // Verify the chunk data end. - chkDataEnd := chkDataLenStart + n + int(chkDataLen) - if chkDataEnd > mmapFile.byteSlice.Len() { - return nil, &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: sgmIndex, - Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk - required:%v, available:%v", chkDataEnd, mmapFile.byteSlice.Len()), - } - } - - // Check the CRC. - sum := mmapFile.byteSlice.Range(chkDataEnd, chkDataEnd+CRCSize) - if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(chkStart-(SeriesRefSize+2*MintMaxtSize), chkDataEnd)); err != nil { - return nil, &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: sgmIndex, - Err: err, - } - } - if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) { - return nil, &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: sgmIndex, - Err: errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act), - } - } - - // The chunk data itself. - chkData := mmapFile.byteSlice.Range(chkDataEnd-int(chkDataLen), chkDataEnd) - - // Make a copy of the chunk data to prevent a panic occurring because the returned - // chunk data slice references an mmap-ed file which could be closed after the - // function returns but while the chunk is still in use. - chkDataCopy := make([]byte, len(chkData)) - copy(chkDataCopy, chkData) - - chk, err := cdm.pool.Get(chunkenc.Encoding(chkEnc), chkDataCopy) - if err != nil { - return nil, &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: sgmIndex, - Err: err, - } - } - return chk, nil -} - -// IterateAllChunks iterates all mmappedChunkFiles (in order of head chunk file name/number) and all the chunks within it -// and runs the provided function with information about each chunk. It returns on the first error encountered. -// NOTE: This method needs to be called at least once after creating ChunkDiskMapper -// to set the maxt of all the file. 
-func (cdm *OldChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error) (err error) { - cdm.writePathMtx.Lock() - defer cdm.writePathMtx.Unlock() - - defer func() { - cdm.fileMaxtSet = true - }() - - chkCRC32 := newCRC32() - - // Iterate files in ascending order. - segIDs := make([]int, 0, len(cdm.mmappedChunkFiles)) - for seg := range cdm.mmappedChunkFiles { - segIDs = append(segIDs, seg) - } - sort.Ints(segIDs) - for _, segID := range segIDs { - mmapFile := cdm.mmappedChunkFiles[segID] - fileEnd := mmapFile.byteSlice.Len() - if segID == cdm.curFileSequence { - fileEnd = int(cdm.curFileSize()) - } - idx := HeadChunkFileHeaderSize - for idx < fileEnd { - if fileEnd-idx < MaxHeadChunkMetaSize { - // Check for all 0s which marks the end of the file. - allZeros := true - for _, b := range mmapFile.byteSlice.Range(idx, fileEnd) { - if b != byte(0) { - allZeros = false - break - } - } - if allZeros { - // End of segment chunk file content. - break - } - return &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: segID, - Err: errors.Errorf("head chunk file has some unread data, but doesn't include enough bytes to read the chunk header"+ - " - required:%v, available:%v, file:%d", idx+MaxHeadChunkMetaSize, fileEnd, segID), - } - } - chkCRC32.Reset() - chunkRef := newChunkDiskMapperRef(uint64(segID), uint64(idx)) - - startIdx := idx - seriesRef := HeadSeriesRef(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+SeriesRefSize))) - idx += SeriesRefSize - mint := int64(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+MintMaxtSize))) - idx += MintMaxtSize - maxt := int64(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+MintMaxtSize))) - idx += MintMaxtSize - - // We preallocate file to help with m-mapping (especially windows systems). - // As series ref always starts from 1, we assume it being 0 to be the end of the actual file data. - // We are not considering possible file corruption that can cause it to be 0. - // Additionally we are checking mint and maxt just to be sure. - if seriesRef == 0 && mint == 0 && maxt == 0 { - break - } - - idx += ChunkEncodingSize // Skip encoding. - dataLen, n := binary.Uvarint(mmapFile.byteSlice.Range(idx, idx+MaxChunkLengthFieldSize)) - idx += n - - numSamples := binary.BigEndian.Uint16(mmapFile.byteSlice.Range(idx, idx+2)) - idx += int(dataLen) // Skip the data. - - // In the beginning we only checked for the chunk meta size. - // Now that we have added the chunk data length, we check for sufficient bytes again. - if idx+CRCSize > fileEnd { - return &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: segID, - Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d", idx+CRCSize, fileEnd, segID), - } - } - - // Check CRC. 
- sum := mmapFile.byteSlice.Range(idx, idx+CRCSize) - if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(startIdx, idx)); err != nil { - return err - } - if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) { - return &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: segID, - Err: errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act), - } - } - idx += CRCSize - - if maxt > mmapFile.maxt { - mmapFile.maxt = maxt - } - - if err := f(seriesRef, chunkRef, mint, maxt, numSamples); err != nil { - if cerr, ok := err.(*CorruptionErr); ok { - cerr.Dir = cdm.dir.Name() - cerr.FileIndex = segID - return cerr - } - return err - } - } - - if idx > fileEnd { - // It should be equal to the slice length. - return &CorruptionErr{ - Dir: cdm.dir.Name(), - FileIndex: segID, - Err: errors.Errorf("head chunk file doesn't include enough bytes to read the last chunk data - required:%v, available:%v, file:%d", idx, fileEnd, segID), - } - } - } - - return nil -} - -// Truncate deletes the head chunk files which are strictly below the mint. -// mint should be in milliseconds. -func (cdm *OldChunkDiskMapper) Truncate(mint int64) error { - if !cdm.fileMaxtSet { - return errors.New("maxt of the files are not set") - } - cdm.readPathMtx.RLock() - - // Sort the file indices, else if files deletion fails in between, - // it can lead to unsequential files as the map is not sorted. - chkFileIndices := make([]int, 0, len(cdm.mmappedChunkFiles)) - for seq := range cdm.mmappedChunkFiles { - chkFileIndices = append(chkFileIndices, seq) - } - sort.Ints(chkFileIndices) - - var removedFiles []int - for _, seq := range chkFileIndices { - if seq == cdm.curFileSequence || cdm.mmappedChunkFiles[seq].maxt >= mint { - break - } - if cdm.mmappedChunkFiles[seq].maxt < mint { - removedFiles = append(removedFiles, seq) - } - } - cdm.readPathMtx.RUnlock() - - errs := tsdb_errors.NewMulti() - // Cut a new file only if the current file has some chunks. - if cdm.curFileSize() > HeadChunkFileHeaderSize { - errs.Add(cdm.CutNewFile()) - } - errs.Add(cdm.deleteFiles(removedFiles)) - return errs.Err() -} - -func (cdm *OldChunkDiskMapper) deleteFiles(removedFiles []int) error { - cdm.readPathMtx.Lock() - for _, seq := range removedFiles { - if err := cdm.closers[seq].Close(); err != nil { - cdm.readPathMtx.Unlock() - return err - } - delete(cdm.mmappedChunkFiles, seq) - delete(cdm.closers, seq) - } - cdm.readPathMtx.Unlock() - - // We actually delete the files separately to not block the readPathMtx for long. - for _, seq := range removedFiles { - if err := os.Remove(segmentFile(cdm.dir.Name(), seq)); err != nil { - return err - } - } - - return nil -} - -// DeleteCorrupted deletes all the head chunk files after the one which had the corruption -// (including the corrupt file). -func (cdm *OldChunkDiskMapper) DeleteCorrupted(originalErr error) error { - err := errors.Cause(originalErr) // So that we can pick up errors even if wrapped. - cerr, ok := err.(*CorruptionErr) - if !ok { - return errors.Wrap(originalErr, "cannot handle error") - } - - // Delete all the head chunk files following the corrupt head chunk file. - segs := []int{} - cdm.readPathMtx.RLock() - for seg := range cdm.mmappedChunkFiles { - if seg >= cerr.FileIndex { - segs = append(segs, seg) - } - } - cdm.readPathMtx.RUnlock() - - return cdm.deleteFiles(segs) -} - -// Size returns the size of the chunk files. 
-func (cdm *OldChunkDiskMapper) Size() (int64, error) { - return fileutil.DirSize(cdm.dir.Name()) -} - -func (cdm *OldChunkDiskMapper) curFileSize() int64 { - return cdm.curFileNumBytes.Load() -} - -// Close closes all the open files in ChunkDiskMapper. -// It is not longer safe to access chunks from this struct after calling Close. -func (cdm *OldChunkDiskMapper) Close() error { - // 'WriteChunk' locks writePathMtx first and then readPathMtx for cutting head chunk file. - // The lock order should not be reversed here else it can cause deadlocks. - cdm.writePathMtx.Lock() - defer cdm.writePathMtx.Unlock() - cdm.readPathMtx.Lock() - defer cdm.readPathMtx.Unlock() - - if cdm.closed { - return nil - } - cdm.closed = true - - errs := tsdb_errors.NewMulti( - closeAllFromMap(cdm.closers), - cdm.finalizeCurFile(), - cdm.dir.Close(), - ) - cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{} - cdm.closers = map[int]io.Closer{} - - return errs.Err() -} diff --git a/tsdb/chunks/old_head_chunks_test.go b/tsdb/chunks/old_head_chunks_test.go deleted file mode 100644 index 4a1943bf05..0000000000 --- a/tsdb/chunks/old_head_chunks_test.go +++ /dev/null @@ -1,442 +0,0 @@ -// Copyright 2020 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package chunks - -import ( - "encoding/binary" - "errors" - "io/ioutil" - "math/rand" - "os" - "strconv" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/prometheus/prometheus/tsdb/chunkenc" -) - -func TestOldChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) { - hrw := testOldChunkDiskMapper(t) - defer func() { - require.NoError(t, hrw.Close()) - }() - - expectedBytes := []byte{} - nextChunkOffset := uint64(HeadChunkFileHeaderSize) - chkCRC32 := newCRC32() - - type expectedDataType struct { - seriesRef HeadSeriesRef - chunkRef ChunkDiskMapperRef - mint, maxt int64 - numSamples uint16 - chunk chunkenc.Chunk - } - expectedData := []expectedDataType{} - - var buf [MaxHeadChunkMetaSize]byte - totalChunks := 0 - var firstFileName string - for hrw.curFileSequence < 3 || hrw.chkWriter.Buffered() == 0 { - addChunks := func(numChunks int) { - for i := 0; i < numChunks; i++ { - seriesRef, chkRef, mint, maxt, chunk := createChunkForOld(t, totalChunks, hrw) - totalChunks++ - expectedData = append(expectedData, expectedDataType{ - seriesRef: seriesRef, - mint: mint, - maxt: maxt, - chunkRef: chkRef, - chunk: chunk, - numSamples: uint16(chunk.NumSamples()), - }) - - if hrw.curFileSequence != 1 { - // We are checking for bytes written only for the first file. - continue - } - - // Calculating expected bytes written on disk for first file. 
- firstFileName = hrw.curFile.Name() - require.Equal(t, newChunkDiskMapperRef(1, nextChunkOffset), chkRef) - - bytesWritten := 0 - chkCRC32.Reset() - - binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(seriesRef)) - bytesWritten += SeriesRefSize - binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(mint)) - bytesWritten += MintMaxtSize - binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(maxt)) - bytesWritten += MintMaxtSize - buf[bytesWritten] = byte(chunk.Encoding()) - bytesWritten += ChunkEncodingSize - n := binary.PutUvarint(buf[bytesWritten:], uint64(len(chunk.Bytes()))) - bytesWritten += n - - expectedBytes = append(expectedBytes, buf[:bytesWritten]...) - _, err := chkCRC32.Write(buf[:bytesWritten]) - require.NoError(t, err) - expectedBytes = append(expectedBytes, chunk.Bytes()...) - _, err = chkCRC32.Write(chunk.Bytes()) - require.NoError(t, err) - - expectedBytes = append(expectedBytes, chkCRC32.Sum(nil)...) - - // += seriesRef, mint, maxt, encoding, chunk data len, chunk data, CRC. - nextChunkOffset += SeriesRefSize + 2*MintMaxtSize + ChunkEncodingSize + uint64(n) + uint64(len(chunk.Bytes())) + CRCSize - } - } - addChunks(100) - hrw.CutNewFile() - addChunks(10) // For chunks in in-memory buffer. - } - - // Checking on-disk bytes for the first file. - require.Equal(t, 3, len(hrw.mmappedChunkFiles), "expected 3 mmapped files, got %d", len(hrw.mmappedChunkFiles)) - require.Equal(t, len(hrw.mmappedChunkFiles), len(hrw.closers)) - - actualBytes, err := ioutil.ReadFile(firstFileName) - require.NoError(t, err) - - // Check header of the segment file. - require.Equal(t, MagicHeadChunks, int(binary.BigEndian.Uint32(actualBytes[0:MagicChunksSize]))) - require.Equal(t, chunksFormatV1, int(actualBytes[MagicChunksSize])) - - // Remaining chunk data. - fileEnd := HeadChunkFileHeaderSize + len(expectedBytes) - require.Equal(t, expectedBytes, actualBytes[HeadChunkFileHeaderSize:fileEnd]) - - // Testing reading of chunks. - for _, exp := range expectedData { - actChunk, err := hrw.Chunk(exp.chunkRef) - require.NoError(t, err) - require.Equal(t, exp.chunk.Bytes(), actChunk.Bytes()) - } - - // Testing IterateAllChunks method. - dir := hrw.dir.Name() - require.NoError(t, hrw.Close()) - hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) - require.NoError(t, err) - - idx := 0 - require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error { - t.Helper() - - expData := expectedData[idx] - require.Equal(t, expData.seriesRef, seriesRef) - require.Equal(t, expData.chunkRef, chunkRef) - require.Equal(t, expData.maxt, maxt) - require.Equal(t, expData.maxt, maxt) - require.Equal(t, expData.numSamples, numSamples) - - actChunk, err := hrw.Chunk(expData.chunkRef) - require.NoError(t, err) - require.Equal(t, expData.chunk.Bytes(), actChunk.Bytes()) - - idx++ - return nil - })) - require.Equal(t, len(expectedData), idx) -} - -// TestOldChunkDiskMapper_Truncate tests -// * If truncation is happening properly based on the time passed. -// * The active file is not deleted even if the passed time makes it eligible to be deleted. -// * Empty current file does not lead to creation of another file after truncation. -// * Non-empty current file leads to creation of another file after truncation. 
-func TestOldChunkDiskMapper_Truncate(t *testing.T) { - hrw := testOldChunkDiskMapper(t) - defer func() { - require.NoError(t, hrw.Close()) - }() - - timeRange := 0 - fileTimeStep := 100 - var thirdFileMinT, sixthFileMinT int64 - - addChunk := func() int { - mint := timeRange + 1 // Just after the new file cut. - maxt := timeRange + fileTimeStep - 1 // Just before the next file. - - // Write a chunks to set maxt for the segment. - _ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) { - require.NoError(t, err) - }) - - timeRange += fileTimeStep - - return mint - } - - verifyFiles := func(remainingFiles []int) { - t.Helper() - - files, err := ioutil.ReadDir(hrw.dir.Name()) - require.NoError(t, err) - require.Equal(t, len(remainingFiles), len(files), "files on disk") - require.Equal(t, len(remainingFiles), len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles") - require.Equal(t, len(remainingFiles), len(hrw.closers), "closers") - - for _, i := range remainingFiles { - _, ok := hrw.mmappedChunkFiles[i] - require.Equal(t, true, ok) - } - } - - // Create segments 1 to 7. - for i := 1; i <= 7; i++ { - require.NoError(t, hrw.CutNewFile()) - mint := int64(addChunk()) - if i == 3 { - thirdFileMinT = mint - } else if i == 6 { - sixthFileMinT = mint - } - } - verifyFiles([]int{1, 2, 3, 4, 5, 6, 7}) - - // Truncating files. - require.NoError(t, hrw.Truncate(thirdFileMinT)) - verifyFiles([]int{3, 4, 5, 6, 7, 8}) - - dir := hrw.dir.Name() - require.NoError(t, hrw.Close()) - - // Restarted. - var err error - hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) - require.NoError(t, err) - - require.False(t, hrw.fileMaxtSet) - require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil })) - require.True(t, hrw.fileMaxtSet) - - verifyFiles([]int{3, 4, 5, 6, 7, 8}) - // New file is created after restart even if last file was empty. - addChunk() - verifyFiles([]int{3, 4, 5, 6, 7, 8, 9}) - - // Truncating files after restart. - require.NoError(t, hrw.Truncate(sixthFileMinT)) - verifyFiles([]int{6, 7, 8, 9, 10}) - - // As the last file was empty, this creates no new files. - require.NoError(t, hrw.Truncate(sixthFileMinT+1)) - verifyFiles([]int{6, 7, 8, 9, 10}) - addChunk() - - // Truncating till current time should not delete the current active file. - require.NoError(t, hrw.Truncate(int64(timeRange+(2*fileTimeStep)))) - verifyFiles([]int{10, 11}) // One file is the previously active file and one currently created. -} - -// TestOldChunkDiskMapper_Truncate_PreservesFileSequence tests that truncation doesn't poke -// holes into the file sequence, even if there are empty files in between non-empty files. -// This test exposes https://github.com/prometheus/prometheus/issues/7412 where the truncation -// simply deleted all empty files instead of stopping once it encountered a non-empty file. -func TestOldChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) { - hrw := testOldChunkDiskMapper(t) - defer func() { - require.NoError(t, hrw.Close()) - }() - - timeRange := 0 - addChunk := func() { - step := 100 - mint, maxt := timeRange+1, timeRange+step-1 - _ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) { - require.NoError(t, err) - }) - timeRange += step - } - emptyFile := func() { - require.NoError(t, hrw.CutNewFile()) - } - nonEmptyFile := func() { - emptyFile() - addChunk() - } - - addChunk() // 1. Created with the first chunk. - nonEmptyFile() // 2. 
- nonEmptyFile() // 3. - emptyFile() // 4. - nonEmptyFile() // 5. - emptyFile() // 6. - - verifyFiles := func(remainingFiles []int) { - t.Helper() - - files, err := ioutil.ReadDir(hrw.dir.Name()) - require.NoError(t, err) - require.Equal(t, len(remainingFiles), len(files), "files on disk") - require.Equal(t, len(remainingFiles), len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles") - require.Equal(t, len(remainingFiles), len(hrw.closers), "closers") - - for _, i := range remainingFiles { - _, ok := hrw.mmappedChunkFiles[i] - require.True(t, ok, "remaining file %d not in hrw.mmappedChunkFiles", i) - } - } - - verifyFiles([]int{1, 2, 3, 4, 5, 6}) - - // Truncating files till 2. It should not delete anything after 3 (inclusive) - // though files 4 and 6 are empty. - file2Maxt := hrw.mmappedChunkFiles[2].maxt - require.NoError(t, hrw.Truncate(file2Maxt+1)) - // As 6 was empty, it should not create another file. - verifyFiles([]int{3, 4, 5, 6}) - - addChunk() - // Truncate creates another file as 6 is not empty now. - require.NoError(t, hrw.Truncate(file2Maxt+1)) - verifyFiles([]int{3, 4, 5, 6, 7}) - - dir := hrw.dir.Name() - require.NoError(t, hrw.Close()) - - // Restarting checks for unsequential files. - var err error - hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) - require.NoError(t, err) - verifyFiles([]int{3, 4, 5, 6, 7}) -} - -// TestOldChunkDiskMapper_TruncateAfterFailedIterateChunks tests for -// https://github.com/prometheus/prometheus/issues/7753 -func TestOldChunkDiskMapper_TruncateAfterFailedIterateChunks(t *testing.T) { - hrw := testOldChunkDiskMapper(t) - defer func() { - require.NoError(t, hrw.Close()) - }() - - // Write a chunks to iterate on it later. - _ = hrw.WriteChunk(1, 0, 1000, randomChunk(t), func(err error) { - require.NoError(t, err) - }) - - dir := hrw.dir.Name() - require.NoError(t, hrw.Close()) - - // Restarting to recreate https://github.com/prometheus/prometheus/issues/7753. - hrw, err := NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) - require.NoError(t, err) - - // Forcefully failing IterateAllChunks. - require.Error(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { - return errors.New("random error") - })) - - // Truncation call should not return error after IterateAllChunks fails. - require.NoError(t, hrw.Truncate(2000)) -} - -func TestOldChunkDiskMapper_ReadRepairOnEmptyLastFile(t *testing.T) { - hrw := testOldChunkDiskMapper(t) - defer func() { - require.NoError(t, hrw.Close()) - }() - - timeRange := 0 - addChunk := func() { - step := 100 - mint, maxt := timeRange+1, timeRange+step-1 - _ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) { - require.NoError(t, err) - }) - timeRange += step - } - nonEmptyFile := func() { - require.NoError(t, hrw.CutNewFile()) - addChunk() - } - - addChunk() // 1. Created with the first chunk. - nonEmptyFile() // 2. - nonEmptyFile() // 3. - - require.Equal(t, 3, len(hrw.mmappedChunkFiles)) - lastFile := 0 - for idx := range hrw.mmappedChunkFiles { - if idx > lastFile { - lastFile = idx - } - } - require.Equal(t, 3, lastFile) - dir := hrw.dir.Name() - require.NoError(t, hrw.Close()) - - // Write an empty last file mimicking an abrupt shutdown on file creation. 
- emptyFileName := segmentFile(dir, lastFile+1) - f, err := os.OpenFile(emptyFileName, os.O_WRONLY|os.O_CREATE, 0o666) - require.NoError(t, err) - require.NoError(t, f.Sync()) - stat, err := f.Stat() - require.NoError(t, err) - require.Equal(t, int64(0), stat.Size()) - require.NoError(t, f.Close()) - - // Open chunk disk mapper again, corrupt file should be removed. - hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize) - require.NoError(t, err) - require.False(t, hrw.fileMaxtSet) - require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil })) - require.True(t, hrw.fileMaxtSet) - - // Removed from memory. - require.Equal(t, 3, len(hrw.mmappedChunkFiles)) - for idx := range hrw.mmappedChunkFiles { - require.LessOrEqual(t, idx, lastFile, "file index is bigger than previous last file") - } - - // Removed even from disk. - files, err := ioutil.ReadDir(dir) - require.NoError(t, err) - require.Equal(t, 3, len(files)) - for _, fi := range files { - seq, err := strconv.ParseUint(fi.Name(), 10, 64) - require.NoError(t, err) - require.LessOrEqual(t, seq, uint64(lastFile), "file index on disk is bigger than previous last file") - } -} - -func testOldChunkDiskMapper(t *testing.T) *OldChunkDiskMapper { - tmpdir, err := ioutil.TempDir("", "data") - require.NoError(t, err) - t.Cleanup(func() { - require.NoError(t, os.RemoveAll(tmpdir)) - }) - - hrw, err := NewOldChunkDiskMapper(tmpdir, chunkenc.NewPool(), DefaultWriteBufferSize) - require.NoError(t, err) - require.False(t, hrw.fileMaxtSet) - require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil })) - require.True(t, hrw.fileMaxtSet) - return hrw -} - -func createChunkForOld(t *testing.T, idx int, hrw *OldChunkDiskMapper) (seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk) { - seriesRef = HeadSeriesRef(rand.Int63()) - mint = int64((idx)*1000 + 1) - maxt = int64((idx + 1) * 1000) - chunk = randomChunk(t) - chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, func(err error) { - require.NoError(t, err) - }) - return -} diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 45d2325bae..d94c387b34 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -1461,6 +1461,10 @@ func TestSizeRetention(t *testing.T) { } require.NoError(t, headApp.Commit()) + require.Eventually(t, func() bool { + return db.Head().chunkDiskMapper.IsQueueEmpty() + }, 2*time.Second, 100*time.Millisecond) + // Test that registered size matches the actual disk size. require.NoError(t, db.reloadBlocks()) // Reload the db to register the new db size. require.Equal(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered. diff --git a/tsdb/head.go b/tsdb/head.go index 104b02e942..dcd732a2e0 100644 --- a/tsdb/head.go +++ b/tsdb/head.go @@ -57,19 +57,6 @@ var ( defaultIsolationDisabled = false ) -// chunkDiskMapper is a temporary interface while we transition from -// 0 size queue to queue based chunk disk mapper. 
-type chunkDiskMapper interface { - CutNewFile() (returnErr error) - IterateAllChunks(f func(seriesRef chunks.HeadSeriesRef, chunkRef chunks.ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error) (err error) - Truncate(mint int64) error - DeleteCorrupted(originalErr error) error - Size() (int64, error) - Close() error - Chunk(ref chunks.ChunkDiskMapperRef) (chunkenc.Chunk, error) - WriteChunk(seriesRef chunks.HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef chunks.ChunkDiskMapperRef) -} - // Head handles reads and writes of time series data within a time window. type Head struct { chunkRange atomic.Int64 @@ -110,7 +97,7 @@ type Head struct { lastPostingsStatsCall time.Duration // Last posting stats call (PostingsCardinalityStats()) time for caching. // chunkDiskMapper is used to write and read Head chunks to/from disk. - chunkDiskMapper chunkDiskMapper + chunkDiskMapper *chunks.ChunkDiskMapper chunkSnapshotMtx sync.Mutex @@ -228,21 +215,13 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti opts.ChunkPool = chunkenc.NewPool() } - if opts.ChunkWriteQueueSize > 0 { - h.chunkDiskMapper, err = chunks.NewChunkDiskMapper( - r, - mmappedChunksDir(opts.ChunkDirRoot), - opts.ChunkPool, - opts.ChunkWriteBufferSize, - opts.ChunkWriteQueueSize, - ) - } else { - h.chunkDiskMapper, err = chunks.NewOldChunkDiskMapper( - mmappedChunksDir(opts.ChunkDirRoot), - opts.ChunkPool, - opts.ChunkWriteBufferSize, - ) - } + h.chunkDiskMapper, err = chunks.NewChunkDiskMapper( + r, + mmappedChunksDir(opts.ChunkDirRoot), + opts.ChunkPool, + opts.ChunkWriteBufferSize, + opts.ChunkWriteQueueSize, + ) if err != nil { return nil, err } diff --git a/tsdb/head_append.go b/tsdb/head_append.go index 7247afa3f5..011904c4ca 100644 --- a/tsdb/head_append.go +++ b/tsdb/head_append.go @@ -481,7 +481,7 @@ func (a *headAppender) Commit() (err error) { // the appendID for isolation. (The appendID can be zero, which results in no // isolation for this append.) // It is unsafe to call this concurrently with s.iterator(...) without holding the series lock. -func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper chunkDiskMapper) (delta int64, sampleInOrder, chunkCreated bool) { +func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (delta int64, sampleInOrder, chunkCreated bool) { // Based on Gorilla white papers this offers near-optimal compression ratio // so anything bigger that this has diminishing returns and increases // the time range within which we have to decompress all samples. @@ -579,7 +579,7 @@ func addJitterToChunkEndTime(seriesHash uint64, chunkMinTime, nextAt, maxNextAt return min(maxNextAt, nextAt+chunkDurationVariance-(chunkDurationMaxVariance/2)) } -func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper chunkDiskMapper) *memChunk { +func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk { s.mmapCurrentHeadChunk(chunkDiskMapper) s.headChunk = &memChunk{ @@ -600,11 +600,12 @@ func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper chunkDiskMapper) return s.headChunk } -func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper chunkDiskMapper) { +func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) { if s.headChunk == nil { // There is no head chunk, so nothing to m-map here. 
return } + chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.headChunk.minTime, s.headChunk.maxTime, s.headChunk.chunk, handleChunkWriteError) s.mmappedChunks = append(s.mmappedChunks, &mmappedChunk{ ref: chunkRef, diff --git a/tsdb/head_read.go b/tsdb/head_read.go index 0209a6a15c..fcbd08ca1e 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -329,7 +329,7 @@ func (h *headChunkReader) Chunk(ref chunks.ChunkRef) (chunkenc.Chunk, error) { // chunk returns the chunk for the HeadChunkID from memory or by m-mapping it from the disk. // If garbageCollect is true, it means that the returned *memChunk // (and not the chunkenc.Chunk inside it) can be garbage collected after its usage. -func (s *memSeries) chunk(id chunks.HeadChunkID, cdm chunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) { +func (s *memSeries) chunk(id chunks.HeadChunkID, cdm *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) { // ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are // incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index. // The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix @@ -363,7 +363,7 @@ type safeChunk struct { s *memSeries cid chunks.HeadChunkID isoState *isolationState - chunkDiskMapper chunkDiskMapper + chunkDiskMapper *chunks.ChunkDiskMapper } func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator { @@ -375,7 +375,7 @@ func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator { // iterator returns a chunk iterator for the requested chunkID, or a NopIterator if the requested ID is out of range. // It is unsafe to call this concurrently with s.append(...) without holding the series lock. -func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, cdm chunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator { +func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, cdm *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator { c, garbageCollect, err := s.chunk(id, cdm) // TODO(fabxc): Work around! An error will be returns when a querier have retrieved a pointer to a // series's chunk, which got then garbage collected before it got diff --git a/tsdb/head_test.go b/tsdb/head_test.go index 357e381279..774b764917 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -295,7 +295,7 @@ func TestHead_HighConcurrencyReadAndWrite(t *testing.T) { labelSets[i] = labels.FromStrings("seriesId", strconv.Itoa(i)) } - head.initTime(0) + head.Init(0) g, ctx := errgroup.WithContext(context.Background()) whileNotCanceled := func(f func() (bool, error)) error { @@ -324,9 +324,9 @@ func TestHead_HighConcurrencyReadAndWrite(t *testing.T) { workerReadyWg.Add(writeConcurrency + readConcurrency) // Start the write workers. - for workerID := 0; workerID < writeConcurrency; workerID++ { + for wid := 0; wid < writeConcurrency; wid++ { // Create copy of workerID to be used by worker routine. - workerID := workerID + workerID := wid g.Go(func() error { // The label sets which this worker will write. @@ -368,9 +368,9 @@ func TestHead_HighConcurrencyReadAndWrite(t *testing.T) { readerTsCh := make(chan uint64) // Start the read workers. - for workerID := 0; workerID < readConcurrency; workerID++ { + for wid := 0; wid < readConcurrency; wid++ { // Create copy of threadID to be used by worker routine. 
- workerID := workerID + workerID := wid g.Go(func() error { querySeriesRef := (seriesCnt / readConcurrency) * workerID @@ -392,7 +392,7 @@ func TestHead_HighConcurrencyReadAndWrite(t *testing.T) { } if len(samples) != 1 { - return false, fmt.Errorf("expected 1 sample, got %d", len(samples)) + return false, fmt.Errorf("expected 1 series, got %d", len(samples)) } series := lbls.String() @@ -1655,7 +1655,7 @@ func TestHeadReadWriterRepair(t *testing.T) { _, ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, h.chunkDiskMapper) require.True(t, ok, "series append failed") require.False(t, chunkCreated, "chunk was created") - require.NoError(t, h.chunkDiskMapper.CutNewFile()) + h.chunkDiskMapper.CutNewFile() } require.NoError(t, h.Close()) diff --git a/web/ui/package-lock.json b/web/ui/package-lock.json index 7bce79f191..7267da9027 100644 --- a/web/ui/package-lock.json +++ b/web/ui/package-lock.json @@ -1592,6 +1592,19 @@ "@lezer/common": "^0.15.0" } }, + "node_modules/@nexucis/fuzzy": { + "version": "0.3.0", + "resolved": "https://registry.npmjs.org/@nexucis/fuzzy/-/fuzzy-0.3.0.tgz", + "integrity": "sha512-Z1+ADKY0fxdBE28REraWhUCNy+Bp5UmpK3Tc/5wdCDpY+6fXh8l2csMtbPGaqEBsyGLxJz9wUYGCf+CW9unyvQ==" + }, + "node_modules/@nexucis/kvsearch": { + "version": "0.3.0", + "resolved": "https://registry.npmjs.org/@nexucis/kvsearch/-/kvsearch-0.3.0.tgz", + "integrity": "sha512-tHIH6W/mRUZZ0ZQyRbgp2uhat+2O1c1jX1EC6NHv7/8OIeHx1HBZ5ZZb0KSUVWl4jkNzYw6AO39OoTELtrjaQw==", + "dependencies": { + "@nexucis/fuzzy": "^0.3.0" + } + }, "node_modules/@nodelib/fs.scandir": { "version": "2.1.5", "dev": true, @@ -5952,6 +5965,17 @@ "react": "17.0.2" } }, + "node_modules/react-infinite-scroll-component": { + "version": "6.1.0", + "resolved": "https://registry.npmjs.org/react-infinite-scroll-component/-/react-infinite-scroll-component-6.1.0.tgz", + "integrity": "sha512-SQu5nCqy8DxQWpnUVLx7V7b7LcA37aM7tvoWjTLZp1dk6EJibM5/4EJKzOnl07/BsM1Y40sKLuqjCwwH/xV0TQ==", + "dependencies": { + "throttle-debounce": "^2.1.0" + }, + "peerDependencies": { + "react": ">=16.0.0" + } + }, "node_modules/react-is": { "version": "17.0.2", "license": "MIT" @@ -6603,6 +6627,14 @@ "dev": true, "license": "MIT" }, + "node_modules/throttle-debounce": { + "version": "2.3.0", + "resolved": "https://registry.npmjs.org/throttle-debounce/-/throttle-debounce-2.3.0.tgz", + "integrity": "sha512-H7oLPV0P7+jgvrk+6mwwwBDmxTaxnu9HMXmloNLXwnNO0ZxZ31Orah2n8lU1eMPvsaowP2CX+USCgyovXfdOFQ==", + "engines": { + "node": ">=8" + } + }, "node_modules/to-fast-properties": { "version": "2.0.0", "dev": true, @@ -7240,6 +7272,7 @@ "@fortawesome/free-solid-svg-icons": "^5.7.2", "@fortawesome/react-fontawesome": "^0.1.16", "@nexucis/fuzzy": "^0.3.0", + "@nexucis/kvsearch": "^0.3.0", "bootstrap": "^4.6.1", "codemirror-promql": "0.19.0", "css.escape": "^1.5.1", @@ -7252,6 +7285,7 @@ "react": "^17.0.2", "react-copy-to-clipboard": "^5.0.4", "react-dom": "^17.0.2", + "react-infinite-scroll-component": "^6.1.0", "react-resize-detector": "^6.7.6", "react-router-dom": "^5.2.1", "react-test-renderer": "^17.0.2", @@ -9920,10 +9954,6 @@ "node": ">=8" } }, - "react-app/node_modules/@nexucis/fuzzy": { - "version": "0.3.0", - "license": "MIT" - }, "react-app/node_modules/@npmcli/fs": { "version": "1.0.0", "dev": true, @@ -27660,6 +27690,19 @@ "@lezer/common": "^0.15.0" } }, + "@nexucis/fuzzy": { + "version": "0.3.0", + "resolved": "https://registry.npmjs.org/@nexucis/fuzzy/-/fuzzy-0.3.0.tgz", + "integrity": 
"sha512-Z1+ADKY0fxdBE28REraWhUCNy+Bp5UmpK3Tc/5wdCDpY+6fXh8l2csMtbPGaqEBsyGLxJz9wUYGCf+CW9unyvQ==" + }, + "@nexucis/kvsearch": { + "version": "0.3.0", + "resolved": "https://registry.npmjs.org/@nexucis/kvsearch/-/kvsearch-0.3.0.tgz", + "integrity": "sha512-tHIH6W/mRUZZ0ZQyRbgp2uhat+2O1c1jX1EC6NHv7/8OIeHx1HBZ5ZZb0KSUVWl4jkNzYw6AO39OoTELtrjaQw==", + "requires": { + "@nexucis/fuzzy": "^0.3.0" + } + }, "@nodelib/fs.scandir": { "version": "2.1.5", "dev": true, @@ -29682,6 +29725,7 @@ "@fortawesome/free-solid-svg-icons": "^5.7.2", "@fortawesome/react-fontawesome": "^0.1.16", "@nexucis/fuzzy": "^0.3.0", + "@nexucis/kvsearch": "^0.3.0", "@testing-library/react-hooks": "^7.0.1", "@types/enzyme": "^3.10.10", "@types/flot": "0.0.32", @@ -29718,6 +29762,7 @@ "react": "^17.0.2", "react-copy-to-clipboard": "^5.0.4", "react-dom": "^17.0.2", + "react-infinite-scroll-component": "^6.1.0", "react-resize-detector": "^6.7.6", "react-router-dom": "^5.2.1", "react-scripts": "4.0.3", @@ -31395,9 +31440,6 @@ } } }, - "@nexucis/fuzzy": { - "version": "0.3.0" - }, "@npmcli/fs": { "version": "1.0.0", "dev": true, @@ -44490,6 +44532,14 @@ "scheduler": "^0.20.2" } }, + "react-infinite-scroll-component": { + "version": "6.1.0", + "resolved": "https://registry.npmjs.org/react-infinite-scroll-component/-/react-infinite-scroll-component-6.1.0.tgz", + "integrity": "sha512-SQu5nCqy8DxQWpnUVLx7V7b7LcA37aM7tvoWjTLZp1dk6EJibM5/4EJKzOnl07/BsM1Y40sKLuqjCwwH/xV0TQ==", + "requires": { + "throttle-debounce": "^2.1.0" + } + }, "react-is": { "version": "17.0.2" }, @@ -44937,6 +44987,11 @@ "version": "0.2.0", "dev": true }, + "throttle-debounce": { + "version": "2.3.0", + "resolved": "https://registry.npmjs.org/throttle-debounce/-/throttle-debounce-2.3.0.tgz", + "integrity": "sha512-H7oLPV0P7+jgvrk+6mwwwBDmxTaxnu9HMXmloNLXwnNO0ZxZ31Orah2n8lU1eMPvsaowP2CX+USCgyovXfdOFQ==" + }, "to-fast-properties": { "version": "2.0.0", "dev": true diff --git a/web/ui/react-app/package.json b/web/ui/react-app/package.json index 3369b93325..5eed84df1d 100644 --- a/web/ui/react-app/package.json +++ b/web/ui/react-app/package.json @@ -20,6 +20,7 @@ "@fortawesome/free-solid-svg-icons": "^5.7.2", "@fortawesome/react-fontawesome": "^0.1.16", "@nexucis/fuzzy": "^0.3.0", + "@nexucis/kvsearch": "^0.3.0", "bootstrap": "^4.6.1", "codemirror-promql": "0.19.0", "css.escape": "^1.5.1", @@ -32,6 +33,7 @@ "react": "^17.0.2", "react-copy-to-clipboard": "^5.0.4", "react-dom": "^17.0.2", + "react-infinite-scroll-component": "^6.1.0", "react-resize-detector": "^6.7.6", "react-router-dom": "^5.2.1", "react-test-renderer": "^17.0.2", diff --git a/web/ui/react-app/src/pages/targets/ScrapePoolContent.tsx b/web/ui/react-app/src/pages/targets/ScrapePoolContent.tsx new file mode 100644 index 0000000000..8a55a09b16 --- /dev/null +++ b/web/ui/react-app/src/pages/targets/ScrapePoolContent.tsx @@ -0,0 +1,91 @@ +import React, { FC, useEffect, useState } from 'react'; +import { getColor, Target } from './target'; +import InfiniteScroll from 'react-infinite-scroll-component'; +import { Badge, Table } from 'reactstrap'; +import TargetLabels from './TargetLabels'; +import styles from './ScrapePoolPanel.module.css'; +import { formatRelative } from '../../utils'; +import { now } from 'moment'; +import TargetScrapeDuration from './TargetScrapeDuration'; +import EndpointLink from './EndpointLink'; + +const columns = ['Endpoint', 'State', 'Labels', 'Last Scrape', 'Scrape Duration', 'Error']; +const initialNumberOfTargetsDisplayed = 50; + +interface ScrapePoolContentProps { + targets: Target[]; 
+} + +export const ScrapePoolContent: FC = ({ targets }) => { + const [items, setItems] = useState(targets.slice(0, 50)); + const [index, setIndex] = useState(initialNumberOfTargetsDisplayed); + const [hasMore, setHasMore] = useState(targets.length > initialNumberOfTargetsDisplayed); + + useEffect(() => { + setItems(targets.slice(0, initialNumberOfTargetsDisplayed)); + setHasMore(targets.length > initialNumberOfTargetsDisplayed); + }, [targets]); + + const fetchMoreData = () => { + if (items.length === targets.length) { + setHasMore(false); + } else { + const newIndex = index + initialNumberOfTargetsDisplayed; + setIndex(newIndex); + setItems(targets.slice(0, newIndex)); + } + }; + + return ( + loading...} + dataLength={items.length} + height={items.length > 25 ? '75vh' : ''} + > + + + + {columns.map((column) => ( + + ))} + + + + {items.map((target, index) => ( + + + + + + + + + ))} + +
{column}
+ + + {target.health.toUpperCase()} + + + {formatRelative(target.lastScrape, now())} + + + {target.lastError ? {target.lastError} : null} +
+
+ ); +}; diff --git a/web/ui/react-app/src/pages/targets/ScrapePoolList.test.tsx b/web/ui/react-app/src/pages/targets/ScrapePoolList.test.tsx index dc400de994..867d1d3bed 100644 --- a/web/ui/react-app/src/pages/targets/ScrapePoolList.test.tsx +++ b/web/ui/react-app/src/pages/targets/ScrapePoolList.test.tsx @@ -3,8 +3,7 @@ import { mount, ReactWrapper } from 'enzyme'; import { act } from 'react-dom/test-utils'; import { Alert } from 'reactstrap'; import { sampleApiResponse } from './__testdata__/testdata'; -import ScrapePoolList from './ScrapePoolList'; -import ScrapePoolPanel from './ScrapePoolPanel'; +import ScrapePoolList, { ScrapePoolPanel } from './ScrapePoolList'; import { Target } from './target'; import { FetchMock } from 'jest-fetch-mock/types'; import { PathPrefixContext } from '../../contexts/PathPrefixContext'; @@ -48,7 +47,7 @@ describe('ScrapePoolList', () => { }); const panels = scrapePoolList.find(ScrapePoolPanel); expect(panels).toHaveLength(3); - const activeTargets: Target[] = sampleApiResponse.data.activeTargets as Target[]; + const activeTargets: Target[] = sampleApiResponse.data.activeTargets as unknown as Target[]; activeTargets.forEach(({ scrapePool }: Target) => { const panel = scrapePoolList.find(ScrapePoolPanel).filterWhere((panel) => panel.prop('scrapePool') === scrapePool); expect(panel).toHaveLength(1); diff --git a/web/ui/react-app/src/pages/targets/ScrapePoolList.tsx b/web/ui/react-app/src/pages/targets/ScrapePoolList.tsx index a5f2183632..f3f42da75a 100644 --- a/web/ui/react-app/src/pages/targets/ScrapePoolList.tsx +++ b/web/ui/react-app/src/pages/targets/ScrapePoolList.tsx @@ -1,26 +1,69 @@ -import React, { FC } from 'react'; -import Filter, { Expanded, FilterData } from './Filter'; -import { useFetch } from '../../hooks/useFetch'; -import { groupTargets, Target } from './target'; -import ScrapePoolPanel from './ScrapePoolPanel'; -import { withStatusIndicator } from '../../components/withStatusIndicator'; +import { KVSearch } from '@nexucis/kvsearch'; import { usePathPrefix } from '../../contexts/PathPrefixContext'; +import { useFetch } from '../../hooks/useFetch'; import { API_PATH } from '../../constants/constants'; +import { groupTargets, ScrapePool, ScrapePools, Target } from './target'; +import { withStatusIndicator } from '../../components/withStatusIndicator'; +import { ChangeEvent, FC, useEffect, useState } from 'react'; +import { Col, Collapse, Input, InputGroup, InputGroupAddon, InputGroupText, Row } from 'reactstrap'; +import { FontAwesomeIcon } from '@fortawesome/react-fontawesome'; +import { faSearch } from '@fortawesome/free-solid-svg-icons'; +import { ScrapePoolContent } from './ScrapePoolContent'; +import Filter, { Expanded, FilterData } from './Filter'; import { useLocalStorage } from '../../hooks/useLocalStorage'; +import styles from './ScrapePoolPanel.module.css'; +import { ToggleMoreLess } from '../../components/ToggleMoreLess'; interface ScrapePoolListProps { activeTargets: Target[]; } -export const ScrapePoolContent: FC = ({ activeTargets }) => { - const targetGroups = groupTargets(activeTargets); +const kvSearch = new KVSearch({ + shouldSort: true, + indexedKeys: ['labels', 'scrapePool', ['labels', /.*/]], +}); + +interface PanelProps { + scrapePool: string; + targetGroup: ScrapePool; + expanded: boolean; + toggleExpanded: () => void; +} + +export const ScrapePoolPanel: FC = (props: PanelProps) => { + const modifier = props.targetGroup.upCount < props.targetGroup.targets.length ? 
'danger' : 'normal'; + const id = `pool-${props.scrapePool}`; + const anchorProps = { + href: `#${id}`, + id, + }; + return ( + + ); +}; + +// ScrapePoolListContent is taking care of every possible filter +const ScrapePoolListContent: FC = ({ activeTargets }) => { + const initialPoolList = groupTargets(activeTargets); + const [poolList, setPoolList] = useState(initialPoolList); + const [targetList, setTargetList] = useState(activeTargets); + const initialFilter: FilterData = { showHealthy: true, showUnhealthy: true, }; const [filter, setFilter] = useLocalStorage('targets-page-filter', initialFilter); - const initialExpanded: Expanded = Object.keys(targetGroups).reduce( + const initialExpanded: Expanded = Object.keys(initialPoolList).reduce( (acc: { [scrapePool: string]: boolean }, scrapePool: string) => ({ ...acc, [scrapePool]: true, @@ -28,14 +71,44 @@ export const ScrapePoolContent: FC = ({ activeTargets }) => {} ); const [expanded, setExpanded] = useLocalStorage('targets-page-expansion-state', initialExpanded); - const { showHealthy, showUnhealthy } = filter; + + const handleSearchChange = (e: ChangeEvent) => { + if (e.target.value !== '') { + const result = kvSearch.filter(e.target.value.trim(), activeTargets); + setTargetList( + result.map((value) => { + return value.original as unknown as Target; + }) + ); + } else { + setTargetList(activeTargets); + } + }; + + useEffect(() => { + const list = targetList.filter((t) => showHealthy || t.health.toLowerCase() !== 'up'); + setPoolList(groupTargets(list)); + }, [showHealthy, targetList]); + return ( <> - - {Object.keys(targetGroups) + + + + + + + + {} + + + + + + {Object.keys(poolList) .filter((scrapePool) => { - const targetGroup = targetGroups[scrapePool]; + const targetGroup = poolList[scrapePool]; const isHealthy = targetGroup.upCount === targetGroup.targets.length; return (isHealthy && showHealthy) || (!isHealthy && showUnhealthy); }) @@ -43,7 +116,7 @@ export const ScrapePoolContent: FC = ({ activeTargets }) => setExpanded({ ...expanded, [scrapePool]: !expanded[scrapePool] })} /> @@ -51,11 +124,10 @@ export const ScrapePoolContent: FC = ({ activeTargets }) => ); }; -ScrapePoolContent.displayName = 'ScrapePoolContent'; -const ScrapePoolListWithStatusIndicator = withStatusIndicator(ScrapePoolContent); +const ScrapePoolListWithStatusIndicator = withStatusIndicator(ScrapePoolListContent); -const ScrapePoolList: FC = () => { +export const ScrapePoolList: FC = () => { const pathPrefix = usePathPrefix(); const { response, error, isLoading } = useFetch(`${pathPrefix}/${API_PATH}/targets?state=active`); const { status: responseStatus } = response; diff --git a/web/ui/react-app/src/pages/targets/ScrapePoolPanel.test.tsx b/web/ui/react-app/src/pages/targets/ScrapePoolPanel.test.tsx deleted file mode 100644 index 5facf7b76e..0000000000 --- a/web/ui/react-app/src/pages/targets/ScrapePoolPanel.test.tsx +++ /dev/null @@ -1,137 +0,0 @@ -import React from 'react'; -import { mount, shallow } from 'enzyme'; -import { targetGroups } from './__testdata__/testdata'; -import ScrapePoolPanel, { columns } from './ScrapePoolPanel'; -import { Button, Collapse, Table, Badge } from 'reactstrap'; -import { Target, getColor } from './target'; -import EndpointLink from './EndpointLink'; -import TargetLabels from './TargetLabels'; -import sinon from 'sinon'; - -describe('ScrapePoolPanel', () => { - const defaultProps = { - scrapePool: 'blackbox', - targetGroup: targetGroups.blackbox, - expanded: true, - toggleExpanded: sinon.spy(), - }; - const scrapePoolPanel = 
shallow(); - - it('renders a container', () => { - const div = scrapePoolPanel.find('div').filterWhere((elem) => elem.hasClass('container')); - expect(div).toHaveLength(1); - }); - - describe('Header', () => { - it('renders an anchor with up count and danger color if upCount < targetsCount', () => { - const anchor = scrapePoolPanel.find('a'); - expect(anchor).toHaveLength(1); - expect(anchor.prop('id')).toEqual('pool-blackbox'); - expect(anchor.prop('href')).toEqual('#pool-blackbox'); - expect(anchor.text()).toEqual('blackbox (2/3 up)'); - expect(anchor.prop('className')).toEqual('danger'); - }); - - it('renders an anchor with up count and normal color if upCount == targetsCount', () => { - const props = { - ...defaultProps, - scrapePool: 'prometheus', - targetGroup: targetGroups.prometheus, - }; - const scrapePoolPanel = shallow(); - const anchor = scrapePoolPanel.find('a'); - expect(anchor).toHaveLength(1); - expect(anchor.prop('id')).toEqual('pool-prometheus'); - expect(anchor.prop('href')).toEqual('#pool-prometheus'); - expect(anchor.text()).toEqual('prometheus (1/1 up)'); - expect(anchor.prop('className')).toEqual('normal'); - }); - - it('renders a show more btn if collapsed', () => { - const props = { - ...defaultProps, - scrapePool: 'prometheus', - targetGroup: targetGroups.prometheus, - toggleExpanded: sinon.spy(), - }; - const div = document.createElement('div'); - div.id = `series-labels-prometheus-0`; - document.body.appendChild(div); - const div2 = document.createElement('div'); - div2.id = `scrape-duration-prometheus-0`; - document.body.appendChild(div2); - const scrapePoolPanel = mount(); - - const btn = scrapePoolPanel.find(Button); - btn.simulate('click'); - expect(props.toggleExpanded.calledOnce).toBe(true); - }); - }); - - it('renders a Collapse component', () => { - const collapse = scrapePoolPanel.find(Collapse); - expect(collapse.prop('isOpen')).toBe(true); - }); - - describe('Table', () => { - it('renders a table', () => { - const table = scrapePoolPanel.find(Table); - const headers = table.find('th'); - expect(table).toHaveLength(1); - expect(headers).toHaveLength(6); - columns.forEach((col) => { - expect(headers.contains(col)); - }); - }); - - describe('for each target', () => { - const table = scrapePoolPanel.find(Table); - defaultProps.targetGroup.targets.forEach( - ({ discoveredLabels, labels, scrapeUrl, lastError, health }: Target, idx: number) => { - const row = table.find('tr').at(idx + 1); - - it('renders an EndpointLink with the scrapeUrl', () => { - const link = row.find(EndpointLink); - expect(link).toHaveLength(1); - expect(link.prop('endpoint')).toEqual(scrapeUrl); - }); - - it('renders a badge for health', () => { - const td = row.find('td').filterWhere((elem) => Boolean(elem.hasClass('state'))); - const badge = td.find(Badge); - expect(badge).toHaveLength(1); - expect(badge.prop('color')).toEqual(getColor(health)); - expect(badge.children().text()).toEqual(health.toUpperCase()); - }); - - it('renders series labels', () => { - const targetLabels = row.find(TargetLabels); - expect(targetLabels).toHaveLength(1); - expect(targetLabels.prop('discoveredLabels')).toEqual(discoveredLabels); - expect(targetLabels.prop('labels')).toEqual(labels); - }); - - it('renders last scrape time', () => { - const lastScrapeCell = row.find('td').filterWhere((elem) => Boolean(elem.hasClass('last-scrape'))); - expect(lastScrapeCell).toHaveLength(1); - }); - - it('renders last scrape duration', () => { - const lastScrapeCell = row.find('td').filterWhere((elem) => 
Boolean(elem.hasClass('scrape-duration'))); - expect(lastScrapeCell).toHaveLength(1); - }); - - it('renders a badge for Errors', () => { - const td = row.find('td').filterWhere((elem) => Boolean(elem.hasClass('errors'))); - const badge = td.find(Badge); - expect(badge).toHaveLength(lastError ? 1 : 0); - if (lastError) { - expect(badge.prop('color')).toEqual('danger'); - expect(badge.children().text()).toEqual(lastError); - } - }); - } - ); - }); - }); -}); diff --git a/web/ui/react-app/src/pages/targets/ScrapePoolPanel.tsx b/web/ui/react-app/src/pages/targets/ScrapePoolPanel.tsx deleted file mode 100644 index 35ff99eb26..0000000000 --- a/web/ui/react-app/src/pages/targets/ScrapePoolPanel.tsx +++ /dev/null @@ -1,95 +0,0 @@ -import React, { FC } from 'react'; -import { ScrapePool, getColor } from './target'; -import { Collapse, Table, Badge } from 'reactstrap'; -import styles from './ScrapePoolPanel.module.css'; -import { Target } from './target'; -import EndpointLink from './EndpointLink'; -import TargetLabels from './TargetLabels'; -import TargetScrapeDuration from './TargetScrapeDuration'; -import { now } from 'moment'; -import { ToggleMoreLess } from '../../components/ToggleMoreLess'; -import { formatRelative } from '../../utils'; - -interface PanelProps { - scrapePool: string; - targetGroup: ScrapePool; - expanded: boolean; - toggleExpanded: () => void; -} - -export const columns = ['Endpoint', 'State', 'Labels', 'Last Scrape', 'Scrape Duration', 'Error']; - -const ScrapePoolPanel: FC = ({ scrapePool, targetGroup, expanded, toggleExpanded }) => { - const modifier = targetGroup.upCount < targetGroup.targets.length ? 'danger' : 'normal'; - const id = `pool-${scrapePool}`; - const anchorProps = { - href: `#${id}`, - id, - }; - - return ( -
- - - {`${scrapePool} (${targetGroup.upCount}/${targetGroup.targets.length} up)`} - - - - - - - {columns.map((column) => ( - - ))} - - - - {targetGroup.targets.map((target: Target, idx: number) => { - const { - discoveredLabels, - labels, - scrapePool, - scrapeUrl, - globalUrl, - lastError, - lastScrape, - lastScrapeDuration, - health, - scrapeInterval, - scrapeTimeout, - } = target; - const color = getColor(health); - - return ( - - - - - - - - - ); - })} - -
{column}
- - - {health.toUpperCase()} - - - {formatRelative(lastScrape, now())} - - {lastError ? {lastError} : null}
-
-
- ); -}; - -export default ScrapePoolPanel; diff --git a/web/ui/react-app/src/pages/targets/TargetLabels.tsx b/web/ui/react-app/src/pages/targets/TargetLabels.tsx index 2664cda96a..d85c58304e 100644 --- a/web/ui/react-app/src/pages/targets/TargetLabels.tsx +++ b/web/ui/react-app/src/pages/targets/TargetLabels.tsx @@ -33,10 +33,16 @@ const TargetLabels: FC = ({ discoveredLabels, labels, idx, sc ); })} - + Before relabeling: - {formatLabels(discoveredLabels).map((s: string, idx: number) => ( - + {formatLabels(discoveredLabels).map((s: string, labelIndex: number) => ( +
{s}
diff --git a/web/ui/react-app/src/pages/targets/__snapshots__/TargetLabels.test.tsx.snap b/web/ui/react-app/src/pages/targets/__snapshots__/TargetLabels.test.tsx.snap index 76c139feb2..3c5c856f00 100644 --- a/web/ui/react-app/src/pages/targets/__snapshots__/TargetLabels.test.tsx.snap +++ b/web/ui/react-app/src/pages/targets/__snapshots__/TargetLabels.test.tsx.snap @@ -37,7 +37,7 @@ exports[`targetLabels renders discovered labels 1`] = `