mirror of
synced 2025-03-05 20:59:13 -08:00
resolving merge conflicts
This commit is contained in:
@ -173,8 +173,8 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
c.enablePromQLNegativeOffset = true
level.Info(logger).Log("msg", "Experimental promql-negative-offset enabled")
case "remote-write-receiver":
c.web.RemoteWriteReceiver = true
level.Info(logger).Log("msg", "Experimental remote-write-receiver enabled")
c.web.EnableRemoteWriteReceiver = true
level.Warn(logger).Log("msg", "Remote write receiver enabled via feature flag remote-write-receiver. This is DEPRECATED. Use --web.enable-remote-write-receiver.")
case "expand-external-labels":
c.enableExpandExternalLabels = true
level.Info(logger).Log("msg", "Experimental expand-external-labels enabled")
@ -263,6 +263,9 @@ func main() {
a.Flag("web.enable-admin-api", "Enable API endpoints for admin control actions.").
a.Flag("web.enable-remote-write-receiver", "Enable API endpoint accepting remote write requests.").
a.Flag("web.console.templates", "Path to the console template directory, available at /consoles.").
@ -381,7 +384,7 @@ func main() {
serverOnlyFlag(a, "query.max-samples", "Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return.").
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, remote-write-receiver, extra-scrape-metrics, new-service-discovery-manager. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: agent, exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, remote-write-receiver (DEPRECATED), extra-scrape-metrics, new-service-discovery-manager. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
promlogflag.AddFlags(a, &cfg.promlogConfig)
@ -18,6 +18,7 @@ import (
@ -27,6 +28,7 @@ import (
@ -43,6 +45,9 @@ import (
yaml "gopkg.in/yaml.v2"
dto "github.com/prometheus/client_model/go"
@ -95,6 +100,7 @@ func main() {
checkMetricsCmd := checkCmd.Command("metrics", checkMetricsUsage)
checkMetricsExtended := checkCmd.Flag("extended", "Print extended information related to the cardinality of the metrics.").Bool()
agentMode := checkConfigCmd.Flag("agent", "Check config file for Prometheus in Agent mode.").Bool()
queryCmd := app.Command("query", "Run query against a Prometheus server.")
@ -228,7 +234,7 @@ func main() {
case checkMetricsCmd.FullCommand():
case queryInstantCmd.FullCommand():
os.Exit(QueryInstant(*queryInstantServer, *queryInstantExpr, *queryInstantTime, p))
@ -629,8 +635,10 @@ $ curl -s http://localhost:9090/metrics | promtool check metrics
// CheckMetrics performs a linting pass on input metrics.
func CheckMetrics() int {
l := promlint.New(os.Stdin)
func CheckMetrics(extended bool) int {
var buf bytes.Buffer
tee := io.TeeReader(os.Stdin, &buf)
l := promlint.New(tee)
problems, err := l.Lint()
if err != nil {
fmt.Fprintln(os.Stderr, "error while linting:", err)
@ -645,9 +653,70 @@ func CheckMetrics() int {
return lintErrExitCode
if extended {
stats, total, err := checkMetricsExtended(&buf)
if err != nil {
fmt.Fprintln(os.Stderr, err)
return failureExitCode
w := tabwriter.NewWriter(os.Stdout, 4, 4, 4, ' ', tabwriter.TabIndent)
fmt.Fprintf(w, "Metric\tCardinality\tPercentage\t\n")
for _, stat := range stats {
fmt.Fprintf(w, "%s\t%d\t%.2f%%\t\n", stat.name, stat.cardinality, stat.percentage*100)
fmt.Fprintf(w, "Total\t%d\t%.f%%\t\n", total, 100.)
return successExitCode
type metricStat struct {
name string
cardinality int
percentage float64
func checkMetricsExtended(r io.Reader) ([]metricStat, int, error) {
p := expfmt.TextParser{}
metricFamilies, err := p.TextToMetricFamilies(r)
if err != nil {
return nil, 0, fmt.Errorf("error while parsing text to metric families: %w", err)
var total int
stats := make([]metricStat, 0, len(metricFamilies))
for _, mf := range metricFamilies {
var cardinality int
switch mf.GetType() {
case dto.MetricType_COUNTER, dto.MetricType_GAUGE, dto.MetricType_UNTYPED:
cardinality = len(mf.Metric)
case dto.MetricType_HISTOGRAM:
// Histogram metrics includes sum, count, buckets.
buckets := len(mf.Metric[0].Histogram.Bucket)
cardinality = len(mf.Metric) * (2 + buckets)
case dto.MetricType_SUMMARY:
// Summary metrics includes sum, count, quantiles.
quantiles := len(mf.Metric[0].Summary.Quantile)
cardinality = len(mf.Metric) * (2 + quantiles)
cardinality = len(mf.Metric)
stats = append(stats, metricStat{name: mf.GetName(), cardinality: cardinality})
total += cardinality
for i := range stats {
stats[i].percentage = float64(stats[i].cardinality) / float64(total)
sort.SliceStable(stats, func(i, j int) bool {
return stats[i].cardinality > stats[j].cardinality
return stats, total, nil
// QueryInstant performs an instant query against a Prometheus server.
func QueryInstant(url *url.URL, query, evalTime string, p printer) int {
if url.Scheme == "" {
@ -18,6 +18,7 @@ import (
@ -322,3 +323,39 @@ func TestAuthorizationConfig(t *testing.T) {
func TestCheckMetricsExtended(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("Skipping on windows")
f, err := os.Open("testdata/metrics-test.prom")
require.NoError(t, err)
defer f.Close()
stats, total, err := checkMetricsExtended(f)
require.NoError(t, err)
require.Equal(t, 27, total)
require.Equal(t, []metricStat{
name: "prometheus_tsdb_compaction_chunk_size_bytes",
cardinality: 15,
percentage: float64(15) / float64(27),
name: "go_gc_duration_seconds",
cardinality: 7,
percentage: float64(7) / float64(27),
name: "net_conntrack_dialer_conn_attempted_total",
cardinality: 4,
percentage: float64(4) / float64(27),
name: "go_info",
cardinality: 1,
percentage: float64(1) / float64(27),
}, stats)
Normal file
Normal file
@ -0,0 +1,35 @@
# HELP go_gc_duration_seconds A summary of the pause duration of garbage collection cycles.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 2.391e-05
go_gc_duration_seconds{quantile="0.25"} 9.4402e-05
go_gc_duration_seconds{quantile="0.5"} 0.000118953
go_gc_duration_seconds{quantile="0.75"} 0.000145884
go_gc_duration_seconds{quantile="1"} 0.005201208
go_gc_duration_seconds_sum 0.036134048
go_gc_duration_seconds_count 232
# HELP prometheus_tsdb_compaction_chunk_size_bytes Final size of chunks on their first compaction
# TYPE prometheus_tsdb_compaction_chunk_size_bytes histogram
prometheus_tsdb_compaction_chunk_size_bytes_bucket{le="32"} 662
prometheus_tsdb_compaction_chunk_size_bytes_bucket{le="48"} 1460
prometheus_tsdb_compaction_chunk_size_bytes_bucket{le="72"} 2266
prometheus_tsdb_compaction_chunk_size_bytes_bucket{le="108"} 3958
prometheus_tsdb_compaction_chunk_size_bytes_bucket{le="162"} 4861
prometheus_tsdb_compaction_chunk_size_bytes_bucket{le="243"} 5721
prometheus_tsdb_compaction_chunk_size_bytes_bucket{le="364.5"} 10493
prometheus_tsdb_compaction_chunk_size_bytes_bucket{le="546.75"} 12464
prometheus_tsdb_compaction_chunk_size_bytes_bucket{le="820.125"} 13254
prometheus_tsdb_compaction_chunk_size_bytes_bucket{le="1230.1875"} 13699
prometheus_tsdb_compaction_chunk_size_bytes_bucket{le="1845.28125"} 13806
prometheus_tsdb_compaction_chunk_size_bytes_bucket{le="2767.921875"} 13852
prometheus_tsdb_compaction_chunk_size_bytes_bucket{le="+Inf"} 13867
prometheus_tsdb_compaction_chunk_size_bytes_sum 3.886707e+06
prometheus_tsdb_compaction_chunk_size_bytes_count 13867
# HELP net_conntrack_dialer_conn_attempted_total Total number of connections attempted by the given dialer a given name.
# TYPE net_conntrack_dialer_conn_attempted_total counter
net_conntrack_dialer_conn_attempted_total{dialer_name="blackbox"} 5210
net_conntrack_dialer_conn_attempted_total{dialer_name="default"} 0
net_conntrack_dialer_conn_attempted_total{dialer_name="node"} 21
net_conntrack_dialer_conn_attempted_total{dialer_name="prometheus"} 21
# HELP go_info Information about the Go environment.
# TYPE go_info gauge
go_info{version="go1.17"} 1
@ -46,6 +46,8 @@ More details can be found [here](querying/basics.md#offset-modifier).
The remote write receiver allows Prometheus to accept remote write requests from other Prometheus servers. More details can be found [here](storage.md#overview).
Activating the remote write receiver via a feature flag is deprecated. Use `--web.enable-remote-write-receiver` instead. This feature flag will be ignored in future versions of Prometheus.
## Exemplars storage
@ -1145,3 +1145,17 @@ $ curl -XPOST http://localhost:9090/api/v1/admin/tsdb/clean_tombstones
*New in v2.1 and supports PUT from v2.9*
## Remote Write Receiver
Prometheus can be configured as a receiver for the Prometheus remote write
protocol. This is not considered an efficient way of ingesting samples. Use it
with caution for specific low-volume use cases. It is not suitable for
replacing the ingestion via scraping and turning Prometheus into a push-based
metrics collection system.
Enable the remote write receiver by setting
`--web.enable-remote-write-receiver`. When enabled, the remote write receiver
endpoint is `/api/v1/write`. Find more details [here](../storage.md#overview).
*New in v2.33*
@ -129,7 +129,7 @@ The read and write protocols both use a snappy-compressed protocol buffer encodi
For details on configuring remote storage integrations in Prometheus, see the [remote write](configuration/configuration.md#remote_write) and [remote read](configuration/configuration.md#remote_read) sections of the Prometheus configuration documentation.
The built-in remote write receiver can be enabled by setting the `--enable-feature=remote-write-receiver` command line flag. When enabled, the remote write receiver endpoint is `/api/v1/write`.
The built-in remote write receiver can be enabled by setting the `--web.enable-remote-write-receiver` command line flag. When enabled, the remote write receiver endpoint is `/api/v1/write`.
For details on the request and response messages, see the [remote storage protocol buffer definitions](https://github.com/prometheus/prometheus/blob/main/prompb/remote.proto).
@ -32,60 +32,53 @@ type chunkWriteJob struct {
callback func(error)
var (
queueOperationAdd = "add"
queueOperationGet = "get"
queueOperationComplete = "complete"
queueOperations = []string{queueOperationAdd, queueOperationGet, queueOperationComplete}
// chunkWriteQueue is a queue for writing chunks to disk in a non-blocking fashion.
// Chunks that shall be written get added to the queue, which is consumed asynchronously.
// Adding jobs to the queue is non-blocking as long as the queue isn't full.
// Adding jobs to the job is non-blocking as long as the queue isn't full.
type chunkWriteQueue struct {
size int
jobCh chan chunkWriteJob
jobs chan chunkWriteJob
chunkRefMapMtx sync.RWMutex
chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk
chunkRefMapOversized bool // indicates whether more than <size> chunks were put into the chunkRefMap.
chunkRefMapMtx sync.RWMutex
chunkRefMap map[ChunkDiskMapperRef]chunkenc.Chunk
isRunningMtx sync.RWMutex
isRunning bool
isRunningMtx sync.Mutex // Protects the isRunning property.
isRunning bool // Used to prevent that new jobs get added to the queue when the chan is already closed.
workerWg sync.WaitGroup
writeChunk writeChunkF
operationsMetric *prometheus.CounterVec
// Keeping three separate counters instead of only a single CounterVec to improve the performance of the critical
// addJob() method which otherwise would need to perform a WithLabelValues call on the CounterVec.
adds prometheus.Counter
gets prometheus.Counter
completed prometheus.Counter
// writeChunkF is a function which writes chunks, it is dynamic to allow mocking in tests.
type writeChunkF func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool) error
func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChunkF) *chunkWriteQueue {
counters := prometheus.NewCounterVec(
Name: "prometheus_tsdb_chunk_write_queue_operations_total",
Help: "Number of operations on the chunk_write_queue.",
q := &chunkWriteQueue{
size: size,
jobCh: make(chan chunkWriteJob, size),
jobs: make(chan chunkWriteJob, size),
chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk, size),
writeChunk: writeChunk,
operationsMetric: prometheus.NewCounterVec(
Name: "prometheus_tsdb_chunk_write_queue_operations_total",
Help: "Number of operations on the chunk_write_queue.",
adds: counters.WithLabelValues("add"),
gets: counters.WithLabelValues("get"),
completed: counters.WithLabelValues("complete"),
if reg != nil {
// Initialize series for all the possible labels.
for _, op := range queueOperations {
@ -97,7 +90,7 @@ func (c *chunkWriteQueue) start() {
go func() {
defer c.workerWg.Done()
for job := range c.jobCh {
for job := range c.jobs {
@ -118,36 +111,28 @@ func (c *chunkWriteQueue) processJob(job chunkWriteJob) {
delete(c.chunkRefMap, job.ref)
if len(c.chunkRefMap) == 0 {
// If the map had to be grown beyond its allocated size, then we recreate it to free memory.
if c.chunkRefMapOversized {
c.chunkRefMap = make(map[ChunkDiskMapperRef]chunkenc.Chunk, c.size)
c.chunkRefMapOversized = false
func (c *chunkWriteQueue) addJob(job chunkWriteJob) error {
defer c.isRunningMtx.RUnlock()
func (c *chunkWriteQueue) addJob(job chunkWriteJob) (err error) {
defer func() {
if err == nil {
defer c.isRunningMtx.Unlock()
if !c.isRunning {
return errors.New("queue is not started")
// The map might grow beyond the allocated size here, in which case we'll recreate it as soon as it is drained.
c.chunkRefMap[job.ref] = job.chk
if len(c.chunkRefMap) > c.size {
c.chunkRefMapOversized = true
c.jobCh <- job
c.jobs <- job
return nil
@ -158,7 +143,7 @@ func (c *chunkWriteQueue) get(ref ChunkDiskMapperRef) chunkenc.Chunk {
chk, ok := c.chunkRefMap[ref]
if ok {
return chk
@ -174,7 +159,7 @@ func (c *chunkWriteQueue) stop() {
c.isRunning = false
@ -15,6 +15,7 @@ package chunks
import (
@ -80,16 +81,13 @@ func TestChunkWriteQueue_WritingThroughQueue(t *testing.T) {
chunk := chunkenc.NewXORChunk()
ref := newChunkDiskMapperRef(321, 123)
cutFile := true
var callbackWg sync.WaitGroup
awaitCb := make(chan struct{})
require.NoError(t, q.addJob(chunkWriteJob{seriesRef: seriesRef, mint: mint, maxt: maxt, chk: chunk, ref: ref, cutFile: cutFile, callback: func(err error) {
// Wait until job has been consumed.
// compare whether the write function has received all job attributes correctly
// Compare whether the write function has received all job attributes correctly.
require.Equal(t, seriesRef, gotSeriesRef)
require.Equal(t, mint, gotMint)
require.Equal(t, maxt, gotMaxt)
@ -189,12 +187,11 @@ func TestChunkWriteQueue_HandlerErrorViaCallback(t *testing.T) {
return testError
var callbackWg sync.WaitGroup
awaitCb := make(chan struct{})
var gotError error
callback := func(err error) {
gotError = err
q := newChunkWriteQueue(nil, 1, chunkWriter)
@ -203,11 +200,73 @@ func TestChunkWriteQueue_HandlerErrorViaCallback(t *testing.T) {
job := chunkWriteJob{callback: callback}
require.NoError(t, q.addJob(job))
require.Equal(t, testError, gotError)
func BenchmarkChunkWriteQueue_addJob(b *testing.B) {
for _, withReads := range []bool{false, true} {
b.Run(fmt.Sprintf("with reads %t", withReads), func(b *testing.B) {
for _, concurrentWrites := range []int{1, 10, 100, 1000} {
b.Run(fmt.Sprintf("%d concurrent writes", concurrentWrites), func(b *testing.B) {
issueReadSignal := make(chan struct{})
q := newChunkWriteQueue(nil, 1000, func(ref HeadSeriesRef, i, i2 int64, chunk chunkenc.Chunk, ref2 ChunkDiskMapperRef, b bool) error {
if withReads {
select {
case issueReadSignal <- struct{}{}:
// Can't write to issueReadSignal, don't block but omit read instead.
return nil
b.Cleanup(func() {
// Stopped already, so no more writes will happen.
start := sync.WaitGroup{}
jobs := make(chan chunkWriteJob, b.N)
for i := 0; i < b.N; i++ {
jobs <- chunkWriteJob{
seriesRef: HeadSeriesRef(i),
ref: ChunkDiskMapperRef(i),
go func() {
for range issueReadSignal {
// We don't care about the ID we're getting, we just want to grab the lock.
_ = q.get(ChunkDiskMapperRef(0))
done := sync.WaitGroup{}
for w := 0; w < concurrentWrites; w++ {
go func() {
for j := range jobs {
_ = q.addJob(j)
func queueIsEmpty(q *chunkWriteQueue) bool {
return queueSize(q) == 0
@ -215,7 +274,7 @@ func queueIsEmpty(q *chunkWriteQueue) bool {
func queueIsFull(q *chunkWriteQueue) bool {
// When the queue is full and blocked on the writer the chunkRefMap has one more job than the cap of the jobCh
// because one job is currently being processed and blocked in the writer.
return queueSize(q) == cap(q.jobCh)+1
return queueSize(q) == cap(q.jobs)+1
func queueSize(q *chunkWriteQueue) int {
@ -113,7 +113,7 @@ func (f *chunkPos) getNextChunkRef(chk chunkenc.Chunk) (chkRef ChunkDiskMapperRe
chkLen := uint64(len(chk.Bytes()))
bytesToWrite := f.bytesToWriteForChunk(chkLen)
if f.shouldCutNewFile(bytesToWrite) {
if f.shouldCutNewFile(chkLen) {
f.cutFile = false
cutFile = true
@ -137,37 +137,37 @@ func (f *chunkPos) cutFileOnNextChunk() {
f.cutFile = true
// setSeq sets the sequence number of the head chunk file.
// initSeq sets the sequence number of the head chunk file.
// Should only be used for initialization, after that the sequence number will be managed by chunkPos.
func (f *chunkPos) setSeq(seq uint64) {
func (f *chunkPos) initSeq(seq uint64) {
f.seq = seq
// shouldCutNewFile returns whether a new file should be cut based on the file size.
// Not thread safe, a lock must be held when calling this.
func (f *chunkPos) shouldCutNewFile(bytesToWrite uint64) bool {
// The read or write lock on chunkPos must be held when calling this.
func (f *chunkPos) shouldCutNewFile(chunkSize uint64) bool {
if f.cutFile {
return true
return f.offset == 0 || // First head chunk file.
f.offset+bytesToWrite > MaxHeadChunkFileSize // Exceeds the max head chunk file size.
f.offset+chunkSize+MaxHeadChunkMetaSize > MaxHeadChunkFileSize // Exceeds the max head chunk file size.
// bytesToWriteForChunk returns the number of bytes that will need to be written for the given chunk size,
// including all meta data before and after the chunk data.
// Head chunk format: https://github.com/prometheus/prometheus/blob/main/tsdb/docs/format/head_chunks.md#chunk
func (f *chunkPos) bytesToWriteForChunk(chkLen uint64) uint64 {
// headers
// Headers.
bytes := uint64(SeriesRefSize) + 2*MintMaxtSize + ChunkEncodingSize
// size of chunk length encoded as uvarint
// Size of chunk length encoded as uvarint.
bytes += uint64(varint.UvarintSize(chkLen))
// chunk length
// Chunk length.
bytes += chkLen
// crc32
// crc32.
bytes += CRCSize
return bytes
@ -321,7 +321,7 @@ func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) {
return nil
@ -410,7 +410,7 @@ func (cdm *ChunkDiskMapper) writeChunk(seriesRef HeadSeriesRef, mint, maxt int64
if cutFile {
err := cdm.cutExpectRef(ref)
err := cdm.cutAndExpectRef(ref)
if err != nil {
return err
@ -466,18 +466,17 @@ func (cdm *ChunkDiskMapper) writeChunk(seriesRef HeadSeriesRef, mint, maxt int64
// CutNewFile makes that a new file will be created the next time a chunk is written.
func (cdm *ChunkDiskMapper) CutNewFile() error {
func (cdm *ChunkDiskMapper) CutNewFile() {
defer cdm.evtlPosMtx.Unlock()
return nil
// cutExpectRef creates a new m-mapped file.
// cutAndExpectRef creates a new m-mapped file.
// The write lock should be held before calling this.
// It ensures that the position in the new file matches the given chunk reference, if not then it errors.
func (cdm *ChunkDiskMapper) cutExpectRef(chkRef ChunkDiskMapperRef) (err error) {
func (cdm *ChunkDiskMapper) cutAndExpectRef(chkRef ChunkDiskMapperRef) (err error) {
seq, offset, err := cdm.cut()
if err != nil {
return err
@ -864,7 +863,7 @@ func (cdm *ChunkDiskMapper) Truncate(mint int64) error {
// There is a known race condition here because between the check of curFileSize() and the call to CutNewFile()
// a new file could already be cut, this is acceptable because it will simply result in an empty file which
// won't do any harm.
return errs.Err()
@ -20,7 +20,6 @@ import (
@ -101,7 +100,7 @@ func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
require.NoError(t, hrw.CutNewFile())
addChunks(10) // For chunks in in-memory buffer.
@ -166,22 +165,20 @@ func TestChunkDiskMapper_Truncate(t *testing.T) {
timeRange := 0
fileTimeStep := 100
var thirdFileMinT, sixthFileMinT int64
var callbackWg sync.WaitGroup
addChunk := func() int {
mint := timeRange + 1 // Just after the new file cut.
maxt := timeRange + fileTimeStep - 1 // Just before the next file.
// Write a chunks to set maxt for the segment.
hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) {
require.NoError(t, err)
step := 100
mint, maxt := timeRange+1, timeRange+step-1
var err error
awaitCb := make(chan struct{})
hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(cbErr error) {
err = cbErr
timeRange += fileTimeStep
require.NoError(t, err)
timeRange += step
return mint
@ -189,9 +186,6 @@ func TestChunkDiskMapper_Truncate(t *testing.T) {
verifyFiles := func(remainingFiles []int) {
// Wait until all chunk write jobs have been processed.
files, err := ioutil.ReadDir(hrw.dir.Name())
require.NoError(t, err)
require.Equal(t, len(remainingFiles), len(files), "files on disk")
@ -206,7 +200,7 @@ func TestChunkDiskMapper_Truncate(t *testing.T) {
// Create segments 1 to 7.
for i := 1; i <= 7; i++ {
require.NoError(t, hrw.CutNewFile())
mint := int64(addChunk())
if i == 3 {
thirdFileMinT = mint
@ -219,7 +213,7 @@ func TestChunkDiskMapper_Truncate(t *testing.T) {
// Truncating files.
require.NoError(t, hrw.Truncate(thirdFileMinT))
// Add a chunk to trigger truncation.
// Add a chunk to trigger cutting of new file.
verifyFiles([]int{3, 4, 5, 6, 7, 8})
@ -237,8 +231,13 @@ func TestChunkDiskMapper_Truncate(t *testing.T) {
// Truncating files after restart.
require.NoError(t, hrw.Truncate(sixthFileMinT))
verifyFiles([]int{6, 7, 8, 9})
// Add a chunk to trigger truncation.
// Truncating a second time without adding a chunk shouldn't create a new file.
require.NoError(t, hrw.Truncate(sixthFileMinT+1))
verifyFiles([]int{6, 7, 8, 9})
// Add a chunk to trigger cutting of new file.
verifyFiles([]int{6, 7, 8, 9, 10})
@ -246,7 +245,7 @@ func TestChunkDiskMapper_Truncate(t *testing.T) {
// Truncating till current time should not delete the current active file.
require.NoError(t, hrw.Truncate(int64(timeRange+(2*fileTimeStep))))
// Add a chunk to trigger truncation.
// Add a chunk to trigger cutting of new file.
verifyFiles([]int{10, 11}) // One file is the previously active file and one currently created.
@ -263,26 +262,24 @@ func TestChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) {
timeRange := 0
var callbackWg sync.WaitGroup
addChunk := func() {
awaitCb := make(chan struct{})
step := 100
mint, maxt := timeRange+1, timeRange+step-1
hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) {
require.NoError(t, err)
timeRange += step
emptyFile := func() {
_, _, err := hrw.cut()
require.NoError(t, err)
@ -307,7 +304,6 @@ func TestChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) {
verifyFiles := func(remainingFiles []int) {
files, err := ioutil.ReadDir(hrw.dir.Name())
require.NoError(t, err)
require.Equal(t, len(remainingFiles), len(files), "files on disk")
@ -328,11 +324,21 @@ func TestChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) {
require.NoError(t, hrw.Truncate(file2Maxt+1))
verifyFiles([]int{3, 4, 5, 6})
// Add chunk, so file 6 is not empty anymore.
verifyFiles([]int{3, 4, 5, 6})
// Truncating till file 3 should also delete file 4, because it is empty.
file3Maxt := hrw.mmappedChunkFiles[3].maxt
require.NoError(t, hrw.Truncate(file3Maxt+1))
verifyFiles([]int{5, 6, 7})
dir := hrw.dir.Name()
require.NoError(t, hrw.Close())
// Restarting checks for unsequential files.
hrw = createChunkDiskMapper(t, dir)
verifyFiles([]int{3, 4, 5, 6})
verifyFiles([]int{5, 6, 7})
// TestHeadReadWriter_TruncateAfterIterateChunksError tests for
@ -345,13 +351,12 @@ func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) {
// Write a chunks to iterate on it later.
var err error
var callbackWg sync.WaitGroup
awaitCb := make(chan struct{})
hrw.WriteChunk(1, 0, 1000, randomChunk(t), func(cbErr error) {
err = cbErr
require.NoError(t, err)
dir := hrw.dir.Name()
@ -377,6 +382,8 @@ func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) {
timeRange := 0
addChunk := func() {
step := 100
mint, maxt := timeRange+1, timeRange+step-1
var err error
@ -390,7 +397,9 @@ func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) {
timeRange += step
nonEmptyFile := func() {
require.NoError(t, hrw.CutNewFile())
@ -475,12 +484,11 @@ func createChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef HeadSer
mint = int64((idx)*1000 + 1)
maxt = int64((idx + 1) * 1000)
chunk = randomChunk(t)
var callbackWg sync.WaitGroup
awaitCb := make(chan struct{})
chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, func(cbErr error) {
require.NoError(t, err)
@ -1,710 +0,0 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package chunks
import (
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
// OldChunkDiskMapper is for writing the Head block chunks to the disk
// and access chunks via mmapped file.
type OldChunkDiskMapper struct {
curFileNumBytes atomic.Int64 // Bytes written in current open file.
/// Writer.
dir *os.File
writeBufferSize int
curFile *os.File // File being written to.
curFileSequence int // Index of current open file being appended to.
curFileMaxt int64 // Used for the size retention.
byteBuf [MaxHeadChunkMetaSize]byte // Buffer used to write the header of the chunk.
chkWriter *bufio.Writer // Writer for the current open file.
crc32 hash.Hash
writePathMtx sync.Mutex
/// Reader.
// The int key in the map is the file number on the disk.
mmappedChunkFiles map[int]*mmappedChunkFile // Contains the m-mapped files for each chunk file mapped with its index.
closers map[int]io.Closer // Closers for resources behind the byte slices.
readPathMtx sync.RWMutex // Mutex used to protect the above 2 maps.
pool chunkenc.Pool // This is used when fetching a chunk from the disk to allocate a chunk.
// Writer and Reader.
// We flush chunks to disk in batches. Hence, we store them in this buffer
// from which chunks are served till they are flushed and are ready for m-mapping.
chunkBuffer *chunkBuffer
// Whether the maxt field is set for all mmapped chunk files tracked within the mmappedChunkFiles map.
// This is done after iterating through all the chunks in those files using the IterateAllChunks method.
fileMaxtSet bool
closed bool
// NewOldChunkDiskMapper returns a new ChunkDiskMapper against the given directory
// using the default head chunk file duration.
// NOTE: 'IterateAllChunks' method needs to be called at least once after creating ChunkDiskMapper
// to set the maxt of all the file.
func NewOldChunkDiskMapper(dir string, pool chunkenc.Pool, writeBufferSize int) (*OldChunkDiskMapper, error) {
// Validate write buffer size.
if writeBufferSize < MinWriteBufferSize || writeBufferSize > MaxWriteBufferSize {
return nil, errors.Errorf("ChunkDiskMapper write buffer size should be between %d and %d (actual: %d)", MinWriteBufferSize, MaxWriteBufferSize, writeBufferSize)
if writeBufferSize%1024 != 0 {
return nil, errors.Errorf("ChunkDiskMapper write buffer size should be a multiple of 1024 (actual: %d)", writeBufferSize)
if err := os.MkdirAll(dir, 0o777); err != nil {
return nil, err
dirFile, err := fileutil.OpenDir(dir)
if err != nil {
return nil, err
m := &OldChunkDiskMapper{
dir: dirFile,
pool: pool,
writeBufferSize: writeBufferSize,
crc32: newCRC32(),
chunkBuffer: newChunkBuffer(),
if m.pool == nil {
m.pool = chunkenc.NewPool()
return m, m.openMMapFiles()
// openMMapFiles opens all files within dir for mmapping.
func (cdm *OldChunkDiskMapper) openMMapFiles() (returnErr error) {
cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{}
cdm.closers = map[int]io.Closer{}
defer func() {
if returnErr != nil {
returnErr = tsdb_errors.NewMulti(returnErr, closeAllFromMap(cdm.closers)).Err()
cdm.mmappedChunkFiles = nil
cdm.closers = nil
files, err := listChunkFiles(cdm.dir.Name())
if err != nil {
return err
files, err = repairLastChunkFile(files)
if err != nil {
return err
chkFileIndices := make([]int, 0, len(files))
for seq, fn := range files {
f, err := fileutil.OpenMmapFile(fn)
if err != nil {
return errors.Wrapf(err, "mmap files, file: %s", fn)
cdm.closers[seq] = f
cdm.mmappedChunkFiles[seq] = &mmappedChunkFile{byteSlice: realByteSlice(f.Bytes())}
chkFileIndices = append(chkFileIndices, seq)
// Check for gaps in the files.
if len(chkFileIndices) == 0 {
return nil
lastSeq := chkFileIndices[0]
for _, seq := range chkFileIndices[1:] {
if seq != lastSeq+1 {
return errors.Errorf("found unsequential head chunk files %s (index: %d) and %s (index: %d)", files[lastSeq], lastSeq, files[seq], seq)
lastSeq = seq
for i, b := range cdm.mmappedChunkFiles {
if b.byteSlice.Len() < HeadChunkFileHeaderSize {
return errors.Wrapf(errInvalidSize, "%s: invalid head chunk file header", files[i])
// Verify magic number.
if m := binary.BigEndian.Uint32(b.byteSlice.Range(0, MagicChunksSize)); m != MagicHeadChunks {
return errors.Errorf("%s: invalid magic number %x", files[i], m)
// Verify chunk format version.
if v := int(b.byteSlice.Range(MagicChunksSize, MagicChunksSize+ChunksFormatVersionSize)[0]); v != chunksFormatV1 {
return errors.Errorf("%s: invalid chunk format version %d", files[i], v)
return nil
// WriteChunk writes the chunk to the disk.
// The returned chunk ref is the reference from where the chunk encoding starts for the chunk.
func (cdm *OldChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef ChunkDiskMapperRef) {
chkRef, err := func() (ChunkDiskMapperRef, error) {
defer cdm.writePathMtx.Unlock()
if cdm.closed {
return 0, ErrChunkDiskMapperClosed
if cdm.shouldCutNewFile(len(chk.Bytes())) {
if err := cdm.cut(); err != nil {
return 0, err
// if len(chk.Bytes())+MaxHeadChunkMetaSize >= writeBufferSize, it means that chunk >= the buffer size;
// so no need to flush here, as we have to flush at the end (to not keep partial chunks in buffer).
if len(chk.Bytes())+MaxHeadChunkMetaSize < cdm.writeBufferSize && cdm.chkWriter.Available() < MaxHeadChunkMetaSize+len(chk.Bytes()) {
if err := cdm.flushBuffer(); err != nil {
return 0, err
bytesWritten := 0
chkRef = newChunkDiskMapperRef(uint64(cdm.curFileSequence), uint64(cdm.curFileSize()))
binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(seriesRef))
bytesWritten += SeriesRefSize
binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(mint))
bytesWritten += MintMaxtSize
binary.BigEndian.PutUint64(cdm.byteBuf[bytesWritten:], uint64(maxt))
bytesWritten += MintMaxtSize
cdm.byteBuf[bytesWritten] = byte(chk.Encoding())
bytesWritten += ChunkEncodingSize
n := binary.PutUvarint(cdm.byteBuf[bytesWritten:], uint64(len(chk.Bytes())))
bytesWritten += n
if err := cdm.writeAndAppendToCRC32(cdm.byteBuf[:bytesWritten]); err != nil {
return 0, err
if err := cdm.writeAndAppendToCRC32(chk.Bytes()); err != nil {
return 0, err
if err := cdm.writeCRC32(); err != nil {
return 0, err
if maxt > cdm.curFileMaxt {
cdm.curFileMaxt = maxt
cdm.chunkBuffer.put(chkRef, chk)
if len(chk.Bytes())+MaxHeadChunkMetaSize >= cdm.writeBufferSize {
// The chunk was bigger than the buffer itself.
// Flushing to not keep partial chunks in buffer.
if err := cdm.flushBuffer(); err != nil {
return 0, err
return chkRef, nil
if err != nil && callback != nil {
return chkRef
// shouldCutNewFile returns whether a new file should be cut, based on time and size retention.
// Size retention: because depending on the system architecture, there is a limit on how big of a file we can m-map.
// Time retention: so that we can delete old chunks with some time guarantee in low load environments.
func (cdm *OldChunkDiskMapper) shouldCutNewFile(chunkSize int) bool {
return cdm.curFileSize() == 0 || // First head chunk file.
cdm.curFileSize()+int64(chunkSize+MaxHeadChunkMetaSize) > MaxHeadChunkFileSize // Exceeds the max head chunk file size.
// CutNewFile creates a new m-mapped file.
func (cdm *OldChunkDiskMapper) CutNewFile() (returnErr error) {
defer cdm.writePathMtx.Unlock()
return cdm.cut()
// cut creates a new m-mapped file. The write lock should be held before calling this.
func (cdm *OldChunkDiskMapper) cut() (returnErr error) {
// Sync current tail to disk and close.
if err := cdm.finalizeCurFile(); err != nil {
return err
n, newFile, seq, err := cutSegmentFile(cdm.dir, MagicHeadChunks, headChunksFormatV1, HeadChunkFilePreallocationSize)
if err != nil {
return err
defer func() {
// The file should not be closed if there is no error,
// its kept open in the ChunkDiskMapper.
if returnErr != nil {
returnErr = tsdb_errors.NewMulti(returnErr, newFile.Close()).Err()
if cdm.curFile != nil {
cdm.mmappedChunkFiles[cdm.curFileSequence].maxt = cdm.curFileMaxt
mmapFile, err := fileutil.OpenMmapFileWithSize(newFile.Name(), MaxHeadChunkFileSize)
if err != nil {
return err
cdm.curFileSequence = seq
cdm.curFile = newFile
if cdm.chkWriter != nil {
} else {
cdm.chkWriter = bufio.NewWriterSize(newFile, cdm.writeBufferSize)
cdm.closers[cdm.curFileSequence] = mmapFile
cdm.mmappedChunkFiles[cdm.curFileSequence] = &mmappedChunkFile{byteSlice: realByteSlice(mmapFile.Bytes())}
cdm.curFileMaxt = 0
return nil
// finalizeCurFile writes all pending data to the current tail file,
// truncates its size, and closes it.
func (cdm *OldChunkDiskMapper) finalizeCurFile() error {
if cdm.curFile == nil {
return nil
if err := cdm.flushBuffer(); err != nil {
return err
if err := cdm.curFile.Sync(); err != nil {
return err
return cdm.curFile.Close()
func (cdm *OldChunkDiskMapper) write(b []byte) error {
n, err := cdm.chkWriter.Write(b)
return err
func (cdm *OldChunkDiskMapper) writeAndAppendToCRC32(b []byte) error {
if err := cdm.write(b); err != nil {
return err
_, err := cdm.crc32.Write(b)
return err
func (cdm *OldChunkDiskMapper) writeCRC32() error {
return cdm.write(cdm.crc32.Sum(cdm.byteBuf[:0]))
// flushBuffer flushes the current in-memory chunks.
// Assumes that writePathMtx is _write_ locked before calling this method.
func (cdm *OldChunkDiskMapper) flushBuffer() error {
if err := cdm.chkWriter.Flush(); err != nil {
return err
return nil
// Chunk returns a chunk from a given reference.
func (cdm *OldChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error) {
// We hold this read lock for the entire duration because if Close()
// is called, the data in the byte slice will get corrupted as the mmapped
// file will be closed.
defer cdm.readPathMtx.RUnlock()
if cdm.closed {
return nil, ErrChunkDiskMapperClosed
sgmIndex, chkStart := ref.Unpack()
// We skip the series ref and the mint/maxt beforehand.
chkStart += SeriesRefSize + (2 * MintMaxtSize)
chkCRC32 := newCRC32()
// If it is the current open file, then the chunks can be in the buffer too.
if sgmIndex == cdm.curFileSequence {
chunk := cdm.chunkBuffer.get(ref)
if chunk != nil {
return chunk, nil
mmapFile, ok := cdm.mmappedChunkFiles[sgmIndex]
if !ok {
if sgmIndex > cdm.curFileSequence {
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: -1,
Err: errors.Errorf("head chunk file index %d more than current open file", sgmIndex),
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: errors.New("head chunk file index %d does not exist on disk"),
if chkStart+MaxChunkLengthFieldSize > mmapFile.byteSlice.Len() {
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk size data field - required:%v, available:%v", chkStart+MaxChunkLengthFieldSize, mmapFile.byteSlice.Len()),
// Encoding.
chkEnc := mmapFile.byteSlice.Range(chkStart, chkStart+ChunkEncodingSize)[0]
// Data length.
// With the minimum chunk length this should never cause us reading
// over the end of the slice.
chkDataLenStart := chkStart + ChunkEncodingSize
c := mmapFile.byteSlice.Range(chkDataLenStart, chkDataLenStart+MaxChunkLengthFieldSize)
chkDataLen, n := binary.Uvarint(c)
if n <= 0 {
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: errors.Errorf("reading chunk length failed with %d", n),
// Verify the chunk data end.
chkDataEnd := chkDataLenStart + n + int(chkDataLen)
if chkDataEnd > mmapFile.byteSlice.Len() {
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk - required:%v, available:%v", chkDataEnd, mmapFile.byteSlice.Len()),
// Check the CRC.
sum := mmapFile.byteSlice.Range(chkDataEnd, chkDataEnd+CRCSize)
if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(chkStart-(SeriesRefSize+2*MintMaxtSize), chkDataEnd)); err != nil {
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: err,
if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) {
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act),
// The chunk data itself.
chkData := mmapFile.byteSlice.Range(chkDataEnd-int(chkDataLen), chkDataEnd)
// Make a copy of the chunk data to prevent a panic occurring because the returned
// chunk data slice references an mmap-ed file which could be closed after the
// function returns but while the chunk is still in use.
chkDataCopy := make([]byte, len(chkData))
copy(chkDataCopy, chkData)
chk, err := cdm.pool.Get(chunkenc.Encoding(chkEnc), chkDataCopy)
if err != nil {
return nil, &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: sgmIndex,
Err: err,
return chk, nil
// IterateAllChunks iterates all mmappedChunkFiles (in order of head chunk file name/number) and all the chunks within it
// and runs the provided function with information about each chunk. It returns on the first error encountered.
// NOTE: This method needs to be called at least once after creating ChunkDiskMapper
// to set the maxt of all the file.
func (cdm *OldChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error) (err error) {
defer cdm.writePathMtx.Unlock()
defer func() {
cdm.fileMaxtSet = true
chkCRC32 := newCRC32()
// Iterate files in ascending order.
segIDs := make([]int, 0, len(cdm.mmappedChunkFiles))
for seg := range cdm.mmappedChunkFiles {
segIDs = append(segIDs, seg)
for _, segID := range segIDs {
mmapFile := cdm.mmappedChunkFiles[segID]
fileEnd := mmapFile.byteSlice.Len()
if segID == cdm.curFileSequence {
fileEnd = int(cdm.curFileSize())
idx := HeadChunkFileHeaderSize
for idx < fileEnd {
if fileEnd-idx < MaxHeadChunkMetaSize {
// Check for all 0s which marks the end of the file.
allZeros := true
for _, b := range mmapFile.byteSlice.Range(idx, fileEnd) {
if b != byte(0) {
allZeros = false
if allZeros {
// End of segment chunk file content.
return &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: segID,
Err: errors.Errorf("head chunk file has some unread data, but doesn't include enough bytes to read the chunk header"+
" - required:%v, available:%v, file:%d", idx+MaxHeadChunkMetaSize, fileEnd, segID),
chunkRef := newChunkDiskMapperRef(uint64(segID), uint64(idx))
startIdx := idx
seriesRef := HeadSeriesRef(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+SeriesRefSize)))
idx += SeriesRefSize
mint := int64(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+MintMaxtSize)))
idx += MintMaxtSize
maxt := int64(binary.BigEndian.Uint64(mmapFile.byteSlice.Range(idx, idx+MintMaxtSize)))
idx += MintMaxtSize
// We preallocate file to help with m-mapping (especially windows systems).
// As series ref always starts from 1, we assume it being 0 to be the end of the actual file data.
// We are not considering possible file corruption that can cause it to be 0.
// Additionally we are checking mint and maxt just to be sure.
if seriesRef == 0 && mint == 0 && maxt == 0 {
idx += ChunkEncodingSize // Skip encoding.
dataLen, n := binary.Uvarint(mmapFile.byteSlice.Range(idx, idx+MaxChunkLengthFieldSize))
idx += n
numSamples := binary.BigEndian.Uint16(mmapFile.byteSlice.Range(idx, idx+2))
idx += int(dataLen) // Skip the data.
// In the beginning we only checked for the chunk meta size.
// Now that we have added the chunk data length, we check for sufficient bytes again.
if idx+CRCSize > fileEnd {
return &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: segID,
Err: errors.Errorf("head chunk file doesn't include enough bytes to read the chunk header - required:%v, available:%v, file:%d", idx+CRCSize, fileEnd, segID),
// Check CRC.
sum := mmapFile.byteSlice.Range(idx, idx+CRCSize)
if _, err := chkCRC32.Write(mmapFile.byteSlice.Range(startIdx, idx)); err != nil {
return err
if act := chkCRC32.Sum(nil); !bytes.Equal(act, sum) {
return &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: segID,
Err: errors.Errorf("checksum mismatch expected:%x, actual:%x", sum, act),
idx += CRCSize
if maxt > mmapFile.maxt {
mmapFile.maxt = maxt
if err := f(seriesRef, chunkRef, mint, maxt, numSamples); err != nil {
if cerr, ok := err.(*CorruptionErr); ok {
cerr.Dir = cdm.dir.Name()
cerr.FileIndex = segID
return cerr
return err
if idx > fileEnd {
// It should be equal to the slice length.
return &CorruptionErr{
Dir: cdm.dir.Name(),
FileIndex: segID,
Err: errors.Errorf("head chunk file doesn't include enough bytes to read the last chunk data - required:%v, available:%v, file:%d", idx, fileEnd, segID),
return nil
// Truncate deletes the head chunk files which are strictly below the mint.
// mint should be in milliseconds.
func (cdm *OldChunkDiskMapper) Truncate(mint int64) error {
if !cdm.fileMaxtSet {
return errors.New("maxt of the files are not set")
// Sort the file indices, else if files deletion fails in between,
// it can lead to unsequential files as the map is not sorted.
chkFileIndices := make([]int, 0, len(cdm.mmappedChunkFiles))
for seq := range cdm.mmappedChunkFiles {
chkFileIndices = append(chkFileIndices, seq)
var removedFiles []int
for _, seq := range chkFileIndices {
if seq == cdm.curFileSequence || cdm.mmappedChunkFiles[seq].maxt >= mint {
if cdm.mmappedChunkFiles[seq].maxt < mint {
removedFiles = append(removedFiles, seq)
errs := tsdb_errors.NewMulti()
// Cut a new file only if the current file has some chunks.
if cdm.curFileSize() > HeadChunkFileHeaderSize {
return errs.Err()
func (cdm *OldChunkDiskMapper) deleteFiles(removedFiles []int) error {
for _, seq := range removedFiles {
if err := cdm.closers[seq].Close(); err != nil {
return err
delete(cdm.mmappedChunkFiles, seq)
delete(cdm.closers, seq)
// We actually delete the files separately to not block the readPathMtx for long.
for _, seq := range removedFiles {
if err := os.Remove(segmentFile(cdm.dir.Name(), seq)); err != nil {
return err
return nil
// DeleteCorrupted deletes all the head chunk files after the one which had the corruption
// (including the corrupt file).
func (cdm *OldChunkDiskMapper) DeleteCorrupted(originalErr error) error {
err := errors.Cause(originalErr) // So that we can pick up errors even if wrapped.
cerr, ok := err.(*CorruptionErr)
if !ok {
return errors.Wrap(originalErr, "cannot handle error")
// Delete all the head chunk files following the corrupt head chunk file.
segs := []int{}
for seg := range cdm.mmappedChunkFiles {
if seg >= cerr.FileIndex {
segs = append(segs, seg)
return cdm.deleteFiles(segs)
// Size returns the size of the chunk files.
func (cdm *OldChunkDiskMapper) Size() (int64, error) {
return fileutil.DirSize(cdm.dir.Name())
func (cdm *OldChunkDiskMapper) curFileSize() int64 {
return cdm.curFileNumBytes.Load()
// Close closes all the open files in ChunkDiskMapper.
// It is not longer safe to access chunks from this struct after calling Close.
func (cdm *OldChunkDiskMapper) Close() error {
// 'WriteChunk' locks writePathMtx first and then readPathMtx for cutting head chunk file.
// The lock order should not be reversed here else it can cause deadlocks.
defer cdm.writePathMtx.Unlock()
defer cdm.readPathMtx.Unlock()
if cdm.closed {
return nil
cdm.closed = true
errs := tsdb_errors.NewMulti(
cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{}
cdm.closers = map[int]io.Closer{}
return errs.Err()
@ -1,442 +0,0 @@
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package chunks
import (
func TestOldChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
hrw := testOldChunkDiskMapper(t)
defer func() {
require.NoError(t, hrw.Close())
expectedBytes := []byte{}
nextChunkOffset := uint64(HeadChunkFileHeaderSize)
chkCRC32 := newCRC32()
type expectedDataType struct {
seriesRef HeadSeriesRef
chunkRef ChunkDiskMapperRef
mint, maxt int64
numSamples uint16
chunk chunkenc.Chunk
expectedData := []expectedDataType{}
var buf [MaxHeadChunkMetaSize]byte
totalChunks := 0
var firstFileName string
for hrw.curFileSequence < 3 || hrw.chkWriter.Buffered() == 0 {
addChunks := func(numChunks int) {
for i := 0; i < numChunks; i++ {
seriesRef, chkRef, mint, maxt, chunk := createChunkForOld(t, totalChunks, hrw)
expectedData = append(expectedData, expectedDataType{
seriesRef: seriesRef,
mint: mint,
maxt: maxt,
chunkRef: chkRef,
chunk: chunk,
numSamples: uint16(chunk.NumSamples()),
if hrw.curFileSequence != 1 {
// We are checking for bytes written only for the first file.
// Calculating expected bytes written on disk for first file.
firstFileName = hrw.curFile.Name()
require.Equal(t, newChunkDiskMapperRef(1, nextChunkOffset), chkRef)
bytesWritten := 0
binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(seriesRef))
bytesWritten += SeriesRefSize
binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(mint))
bytesWritten += MintMaxtSize
binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(maxt))
bytesWritten += MintMaxtSize
buf[bytesWritten] = byte(chunk.Encoding())
bytesWritten += ChunkEncodingSize
n := binary.PutUvarint(buf[bytesWritten:], uint64(len(chunk.Bytes())))
bytesWritten += n
expectedBytes = append(expectedBytes, buf[:bytesWritten]...)
_, err := chkCRC32.Write(buf[:bytesWritten])
require.NoError(t, err)
expectedBytes = append(expectedBytes, chunk.Bytes()...)
_, err = chkCRC32.Write(chunk.Bytes())
require.NoError(t, err)
expectedBytes = append(expectedBytes, chkCRC32.Sum(nil)...)
// += seriesRef, mint, maxt, encoding, chunk data len, chunk data, CRC.
nextChunkOffset += SeriesRefSize + 2*MintMaxtSize + ChunkEncodingSize + uint64(n) + uint64(len(chunk.Bytes())) + CRCSize
addChunks(10) // For chunks in in-memory buffer.
// Checking on-disk bytes for the first file.
require.Equal(t, 3, len(hrw.mmappedChunkFiles), "expected 3 mmapped files, got %d", len(hrw.mmappedChunkFiles))
require.Equal(t, len(hrw.mmappedChunkFiles), len(hrw.closers))
actualBytes, err := ioutil.ReadFile(firstFileName)
require.NoError(t, err)
// Check header of the segment file.
require.Equal(t, MagicHeadChunks, int(binary.BigEndian.Uint32(actualBytes[0:MagicChunksSize])))
require.Equal(t, chunksFormatV1, int(actualBytes[MagicChunksSize]))
// Remaining chunk data.
fileEnd := HeadChunkFileHeaderSize + len(expectedBytes)
require.Equal(t, expectedBytes, actualBytes[HeadChunkFileHeaderSize:fileEnd])
// Testing reading of chunks.
for _, exp := range expectedData {
actChunk, err := hrw.Chunk(exp.chunkRef)
require.NoError(t, err)
require.Equal(t, exp.chunk.Bytes(), actChunk.Bytes())
// Testing IterateAllChunks method.
dir := hrw.dir.Name()
require.NoError(t, hrw.Close())
hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
require.NoError(t, err)
idx := 0
require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error {
expData := expectedData[idx]
require.Equal(t, expData.seriesRef, seriesRef)
require.Equal(t, expData.chunkRef, chunkRef)
require.Equal(t, expData.maxt, maxt)
require.Equal(t, expData.maxt, maxt)
require.Equal(t, expData.numSamples, numSamples)
actChunk, err := hrw.Chunk(expData.chunkRef)
require.NoError(t, err)
require.Equal(t, expData.chunk.Bytes(), actChunk.Bytes())
return nil
require.Equal(t, len(expectedData), idx)
// TestOldChunkDiskMapper_Truncate tests
// * If truncation is happening properly based on the time passed.
// * The active file is not deleted even if the passed time makes it eligible to be deleted.
// * Empty current file does not lead to creation of another file after truncation.
// * Non-empty current file leads to creation of another file after truncation.
func TestOldChunkDiskMapper_Truncate(t *testing.T) {
hrw := testOldChunkDiskMapper(t)
defer func() {
require.NoError(t, hrw.Close())
timeRange := 0
fileTimeStep := 100
var thirdFileMinT, sixthFileMinT int64
addChunk := func() int {
mint := timeRange + 1 // Just after the new file cut.
maxt := timeRange + fileTimeStep - 1 // Just before the next file.
// Write a chunks to set maxt for the segment.
_ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) {
require.NoError(t, err)
timeRange += fileTimeStep
return mint
verifyFiles := func(remainingFiles []int) {
files, err := ioutil.ReadDir(hrw.dir.Name())
require.NoError(t, err)
require.Equal(t, len(remainingFiles), len(files), "files on disk")
require.Equal(t, len(remainingFiles), len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles")
require.Equal(t, len(remainingFiles), len(hrw.closers), "closers")
for _, i := range remainingFiles {
_, ok := hrw.mmappedChunkFiles[i]
require.Equal(t, true, ok)
// Create segments 1 to 7.
for i := 1; i <= 7; i++ {
require.NoError(t, hrw.CutNewFile())
mint := int64(addChunk())
if i == 3 {
thirdFileMinT = mint
} else if i == 6 {
sixthFileMinT = mint
verifyFiles([]int{1, 2, 3, 4, 5, 6, 7})
// Truncating files.
require.NoError(t, hrw.Truncate(thirdFileMinT))
verifyFiles([]int{3, 4, 5, 6, 7, 8})
dir := hrw.dir.Name()
require.NoError(t, hrw.Close())
// Restarted.
var err error
hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
require.NoError(t, err)
require.False(t, hrw.fileMaxtSet)
require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil }))
require.True(t, hrw.fileMaxtSet)
verifyFiles([]int{3, 4, 5, 6, 7, 8})
// New file is created after restart even if last file was empty.
verifyFiles([]int{3, 4, 5, 6, 7, 8, 9})
// Truncating files after restart.
require.NoError(t, hrw.Truncate(sixthFileMinT))
verifyFiles([]int{6, 7, 8, 9, 10})
// As the last file was empty, this creates no new files.
require.NoError(t, hrw.Truncate(sixthFileMinT+1))
verifyFiles([]int{6, 7, 8, 9, 10})
// Truncating till current time should not delete the current active file.
require.NoError(t, hrw.Truncate(int64(timeRange+(2*fileTimeStep))))
verifyFiles([]int{10, 11}) // One file is the previously active file and one currently created.
// TestOldChunkDiskMapper_Truncate_PreservesFileSequence tests that truncation doesn't poke
// holes into the file sequence, even if there are empty files in between non-empty files.
// This test exposes https://github.com/prometheus/prometheus/issues/7412 where the truncation
// simply deleted all empty files instead of stopping once it encountered a non-empty file.
func TestOldChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) {
hrw := testOldChunkDiskMapper(t)
defer func() {
require.NoError(t, hrw.Close())
timeRange := 0
addChunk := func() {
step := 100
mint, maxt := timeRange+1, timeRange+step-1
_ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) {
require.NoError(t, err)
timeRange += step
emptyFile := func() {
require.NoError(t, hrw.CutNewFile())
nonEmptyFile := func() {
addChunk() // 1. Created with the first chunk.
nonEmptyFile() // 2.
nonEmptyFile() // 3.
emptyFile() // 4.
nonEmptyFile() // 5.
emptyFile() // 6.
verifyFiles := func(remainingFiles []int) {
files, err := ioutil.ReadDir(hrw.dir.Name())
require.NoError(t, err)
require.Equal(t, len(remainingFiles), len(files), "files on disk")
require.Equal(t, len(remainingFiles), len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles")
require.Equal(t, len(remainingFiles), len(hrw.closers), "closers")
for _, i := range remainingFiles {
_, ok := hrw.mmappedChunkFiles[i]
require.True(t, ok, "remaining file %d not in hrw.mmappedChunkFiles", i)
verifyFiles([]int{1, 2, 3, 4, 5, 6})
// Truncating files till 2. It should not delete anything after 3 (inclusive)
// though files 4 and 6 are empty.
file2Maxt := hrw.mmappedChunkFiles[2].maxt
require.NoError(t, hrw.Truncate(file2Maxt+1))
// As 6 was empty, it should not create another file.
verifyFiles([]int{3, 4, 5, 6})
// Truncate creates another file as 6 is not empty now.
require.NoError(t, hrw.Truncate(file2Maxt+1))
verifyFiles([]int{3, 4, 5, 6, 7})
dir := hrw.dir.Name()
require.NoError(t, hrw.Close())
// Restarting checks for unsequential files.
var err error
hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
require.NoError(t, err)
verifyFiles([]int{3, 4, 5, 6, 7})
// TestOldChunkDiskMapper_TruncateAfterFailedIterateChunks tests for
// https://github.com/prometheus/prometheus/issues/7753
func TestOldChunkDiskMapper_TruncateAfterFailedIterateChunks(t *testing.T) {
hrw := testOldChunkDiskMapper(t)
defer func() {
require.NoError(t, hrw.Close())
// Write a chunks to iterate on it later.
_ = hrw.WriteChunk(1, 0, 1000, randomChunk(t), func(err error) {
require.NoError(t, err)
dir := hrw.dir.Name()
require.NoError(t, hrw.Close())
// Restarting to recreate https://github.com/prometheus/prometheus/issues/7753.
hrw, err := NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
require.NoError(t, err)
// Forcefully failing IterateAllChunks.
require.Error(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error {
return errors.New("random error")
// Truncation call should not return error after IterateAllChunks fails.
require.NoError(t, hrw.Truncate(2000))
func TestOldChunkDiskMapper_ReadRepairOnEmptyLastFile(t *testing.T) {
hrw := testOldChunkDiskMapper(t)
defer func() {
require.NoError(t, hrw.Close())
timeRange := 0
addChunk := func() {
step := 100
mint, maxt := timeRange+1, timeRange+step-1
_ = hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t), func(err error) {
require.NoError(t, err)
timeRange += step
nonEmptyFile := func() {
require.NoError(t, hrw.CutNewFile())
addChunk() // 1. Created with the first chunk.
nonEmptyFile() // 2.
nonEmptyFile() // 3.
require.Equal(t, 3, len(hrw.mmappedChunkFiles))
lastFile := 0
for idx := range hrw.mmappedChunkFiles {
if idx > lastFile {
lastFile = idx
require.Equal(t, 3, lastFile)
dir := hrw.dir.Name()
require.NoError(t, hrw.Close())
// Write an empty last file mimicking an abrupt shutdown on file creation.
emptyFileName := segmentFile(dir, lastFile+1)
f, err := os.OpenFile(emptyFileName, os.O_WRONLY|os.O_CREATE, 0o666)
require.NoError(t, err)
require.NoError(t, f.Sync())
stat, err := f.Stat()
require.NoError(t, err)
require.Equal(t, int64(0), stat.Size())
require.NoError(t, f.Close())
// Open chunk disk mapper again, corrupt file should be removed.
hrw, err = NewOldChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
require.NoError(t, err)
require.False(t, hrw.fileMaxtSet)
require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil }))
require.True(t, hrw.fileMaxtSet)
// Removed from memory.
require.Equal(t, 3, len(hrw.mmappedChunkFiles))
for idx := range hrw.mmappedChunkFiles {
require.LessOrEqual(t, idx, lastFile, "file index is bigger than previous last file")
// Removed even from disk.
files, err := ioutil.ReadDir(dir)
require.NoError(t, err)
require.Equal(t, 3, len(files))
for _, fi := range files {
seq, err := strconv.ParseUint(fi.Name(), 10, 64)
require.NoError(t, err)
require.LessOrEqual(t, seq, uint64(lastFile), "file index on disk is bigger than previous last file")
func testOldChunkDiskMapper(t *testing.T) *OldChunkDiskMapper {
tmpdir, err := ioutil.TempDir("", "data")
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, os.RemoveAll(tmpdir))
hrw, err := NewOldChunkDiskMapper(tmpdir, chunkenc.NewPool(), DefaultWriteBufferSize)
require.NoError(t, err)
require.False(t, hrw.fileMaxtSet)
require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil }))
require.True(t, hrw.fileMaxtSet)
return hrw
func createChunkForOld(t *testing.T, idx int, hrw *OldChunkDiskMapper) (seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk) {
seriesRef = HeadSeriesRef(rand.Int63())
mint = int64((idx)*1000 + 1)
maxt = int64((idx + 1) * 1000)
chunk = randomChunk(t)
chunkRef = hrw.WriteChunk(seriesRef, mint, maxt, chunk, func(err error) {
require.NoError(t, err)
@ -57,19 +57,6 @@ var (
defaultIsolationDisabled = false
// chunkDiskMapper is a temporary interface while we transition from
// 0 size queue to queue based chunk disk mapper.
type chunkDiskMapper interface {
CutNewFile() (returnErr error)
IterateAllChunks(f func(seriesRef chunks.HeadSeriesRef, chunkRef chunks.ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error) (err error)
Truncate(mint int64) error
DeleteCorrupted(originalErr error) error
Size() (int64, error)
Close() error
Chunk(ref chunks.ChunkDiskMapperRef) (chunkenc.Chunk, error)
WriteChunk(seriesRef chunks.HeadSeriesRef, mint, maxt int64, chk chunkenc.Chunk, callback func(err error)) (chkRef chunks.ChunkDiskMapperRef)
// Head handles reads and writes of time series data within a time window.
type Head struct {
chunkRange atomic.Int64
@ -110,7 +97,7 @@ type Head struct {
lastPostingsStatsCall time.Duration // Last posting stats call (PostingsCardinalityStats()) time for caching.
// chunkDiskMapper is used to write and read Head chunks to/from disk.
chunkDiskMapper chunkDiskMapper
chunkDiskMapper *chunks.ChunkDiskMapper
chunkSnapshotMtx sync.Mutex
@ -228,21 +215,13 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, opts *HeadOpti
opts.ChunkPool = chunkenc.NewPool()
if opts.ChunkWriteQueueSize > 0 {
h.chunkDiskMapper, err = chunks.NewChunkDiskMapper(
} else {
h.chunkDiskMapper, err = chunks.NewOldChunkDiskMapper(
h.chunkDiskMapper, err = chunks.NewChunkDiskMapper(
if err != nil {
return nil, err
@ -481,7 +481,7 @@ func (a *headAppender) Commit() (err error) {
// the appendID for isolation. (The appendID can be zero, which results in no
// isolation for this append.)
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper chunkDiskMapper) (delta int64, sampleInOrder, chunkCreated bool) {
func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (delta int64, sampleInOrder, chunkCreated bool) {
// Based on Gorilla white papers this offers near-optimal compression ratio
// so anything bigger that this has diminishing returns and increases
// the time range within which we have to decompress all samples.
@ -579,7 +579,7 @@ func addJitterToChunkEndTime(seriesHash uint64, chunkMinTime, nextAt, maxNextAt
return min(maxNextAt, nextAt+chunkDurationVariance-(chunkDurationMaxVariance/2))
func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper chunkDiskMapper) *memChunk {
func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk {
s.headChunk = &memChunk{
@ -600,11 +600,12 @@ func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper chunkDiskMapper)
return s.headChunk
func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper chunkDiskMapper) {
func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) {
if s.headChunk == nil {
// There is no head chunk, so nothing to m-map here.
chunkRef := chunkDiskMapper.WriteChunk(s.ref, s.headChunk.minTime, s.headChunk.maxTime, s.headChunk.chunk, handleChunkWriteError)
s.mmappedChunks = append(s.mmappedChunks, &mmappedChunk{
ref: chunkRef,
@ -329,7 +329,7 @@ func (h *headChunkReader) Chunk(ref chunks.ChunkRef) (chunkenc.Chunk, error) {
// chunk returns the chunk for the HeadChunkID from memory or by m-mapping it from the disk.
// If garbageCollect is true, it means that the returned *memChunk
// (and not the chunkenc.Chunk inside it) can be garbage collected after its usage.
func (s *memSeries) chunk(id chunks.HeadChunkID, cdm chunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) {
func (s *memSeries) chunk(id chunks.HeadChunkID, cdm *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) {
// ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are
// incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index.
// The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix
@ -363,7 +363,7 @@ type safeChunk struct {
s *memSeries
cid chunks.HeadChunkID
isoState *isolationState
chunkDiskMapper chunkDiskMapper
chunkDiskMapper *chunks.ChunkDiskMapper
func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator {
@ -375,7 +375,7 @@ func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator {
// iterator returns a chunk iterator for the requested chunkID, or a NopIterator if the requested ID is out of range.
// It is unsafe to call this concurrently with s.append(...) without holding the series lock.
func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, cdm chunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator {
func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, cdm *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator {
c, garbageCollect, err := s.chunk(id, cdm)
// TODO(fabxc): Work around! An error will be returns when a querier have retrieved a pointer to a
// series's chunk, which got then garbage collected before it got
@ -295,7 +295,7 @@ func TestHead_HighConcurrencyReadAndWrite(t *testing.T) {
labelSets[i] = labels.FromStrings("seriesId", strconv.Itoa(i))
g, ctx := errgroup.WithContext(context.Background())
whileNotCanceled := func(f func() (bool, error)) error {
@ -324,9 +324,9 @@ func TestHead_HighConcurrencyReadAndWrite(t *testing.T) {
workerReadyWg.Add(writeConcurrency + readConcurrency)
// Start the write workers.
for workerID := 0; workerID < writeConcurrency; workerID++ {
for wid := 0; wid < writeConcurrency; wid++ {
// Create copy of workerID to be used by worker routine.
workerID := workerID
workerID := wid
g.Go(func() error {
// The label sets which this worker will write.
@ -368,9 +368,9 @@ func TestHead_HighConcurrencyReadAndWrite(t *testing.T) {
readerTsCh := make(chan uint64)
// Start the read workers.
for workerID := 0; workerID < readConcurrency; workerID++ {
for wid := 0; wid < readConcurrency; wid++ {
// Create copy of threadID to be used by worker routine.
workerID := workerID
workerID := wid
g.Go(func() error {
querySeriesRef := (seriesCnt / readConcurrency) * workerID
@ -392,7 +392,7 @@ func TestHead_HighConcurrencyReadAndWrite(t *testing.T) {
if len(samples) != 1 {
return false, fmt.Errorf("expected 1 sample, got %d", len(samples))
return false, fmt.Errorf("expected 1 series, got %d", len(samples))
series := lbls.String()
@ -1655,7 +1655,7 @@ func TestHeadReadWriterRepair(t *testing.T) {
_, ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, h.chunkDiskMapper)
require.True(t, ok, "series append failed")
require.False(t, chunkCreated, "chunk was created")
require.NoError(t, h.chunkDiskMapper.CutNewFile())
require.NoError(t, h.Close())
@ -1592,6 +1592,19 @@
"@lezer/common": "^0.15.0"
"node_modules/@nexucis/fuzzy": {
"version": "0.3.0",
"resolved": "https://registry.npmjs.org/@nexucis/fuzzy/-/fuzzy-0.3.0.tgz",
"integrity": "sha512-Z1+ADKY0fxdBE28REraWhUCNy+Bp5UmpK3Tc/5wdCDpY+6fXh8l2csMtbPGaqEBsyGLxJz9wUYGCf+CW9unyvQ=="
"node_modules/@nexucis/kvsearch": {
"version": "0.3.0",
"resolved": "https://registry.npmjs.org/@nexucis/kvsearch/-/kvsearch-0.3.0.tgz",
"integrity": "sha512-tHIH6W/mRUZZ0ZQyRbgp2uhat+2O1c1jX1EC6NHv7/8OIeHx1HBZ5ZZb0KSUVWl4jkNzYw6AO39OoTELtrjaQw==",
"dependencies": {
"@nexucis/fuzzy": "^0.3.0"
"node_modules/@nodelib/fs.scandir": {
"version": "2.1.5",
"dev": true,
@ -5952,6 +5965,17 @@
"react": "17.0.2"
"node_modules/react-infinite-scroll-component": {
"version": "6.1.0",
"resolved": "https://registry.npmjs.org/react-infinite-scroll-component/-/react-infinite-scroll-component-6.1.0.tgz",
"integrity": "sha512-SQu5nCqy8DxQWpnUVLx7V7b7LcA37aM7tvoWjTLZp1dk6EJibM5/4EJKzOnl07/BsM1Y40sKLuqjCwwH/xV0TQ==",
"dependencies": {
"throttle-debounce": "^2.1.0"
"peerDependencies": {
"react": ">=16.0.0"
"node_modules/react-is": {
"version": "17.0.2",
"license": "MIT"
@ -6603,6 +6627,14 @@
"dev": true,
"license": "MIT"
"node_modules/throttle-debounce": {
"version": "2.3.0",
"resolved": "https://registry.npmjs.org/throttle-debounce/-/throttle-debounce-2.3.0.tgz",
"integrity": "sha512-H7oLPV0P7+jgvrk+6mwwwBDmxTaxnu9HMXmloNLXwnNO0ZxZ31Orah2n8lU1eMPvsaowP2CX+USCgyovXfdOFQ==",
"engines": {
"node": ">=8"
"node_modules/to-fast-properties": {
"version": "2.0.0",
"dev": true,
@ -7240,6 +7272,7 @@
"@fortawesome/free-solid-svg-icons": "^5.7.2",
"@fortawesome/react-fontawesome": "^0.1.16",
"@nexucis/fuzzy": "^0.3.0",
"@nexucis/kvsearch": "^0.3.0",
"bootstrap": "^4.6.1",
"codemirror-promql": "0.19.0",
"css.escape": "^1.5.1",
@ -7252,6 +7285,7 @@
"react": "^17.0.2",
"react-copy-to-clipboard": "^5.0.4",
"react-dom": "^17.0.2",
"react-infinite-scroll-component": "^6.1.0",
"react-resize-detector": "^6.7.6",
"react-router-dom": "^5.2.1",
"react-test-renderer": "^17.0.2",
@ -9920,10 +9954,6 @@
"node": ">=8"
"react-app/node_modules/@nexucis/fuzzy": {
"version": "0.3.0",
"license": "MIT"
"react-app/node_modules/@npmcli/fs": {
"version": "1.0.0",
"dev": true,
@ -27660,6 +27690,19 @@
"@lezer/common": "^0.15.0"
"@nexucis/fuzzy": {
"version": "0.3.0",
"resolved": "https://registry.npmjs.org/@nexucis/fuzzy/-/fuzzy-0.3.0.tgz",
"integrity": "sha512-Z1+ADKY0fxdBE28REraWhUCNy+Bp5UmpK3Tc/5wdCDpY+6fXh8l2csMtbPGaqEBsyGLxJz9wUYGCf+CW9unyvQ=="
"@nexucis/kvsearch": {
"version": "0.3.0",
"resolved": "https://registry.npmjs.org/@nexucis/kvsearch/-/kvsearch-0.3.0.tgz",
"integrity": "sha512-tHIH6W/mRUZZ0ZQyRbgp2uhat+2O1c1jX1EC6NHv7/8OIeHx1HBZ5ZZb0KSUVWl4jkNzYw6AO39OoTELtrjaQw==",
"requires": {
"@nexucis/fuzzy": "^0.3.0"
"@nodelib/fs.scandir": {
"version": "2.1.5",
"dev": true,
@ -29682,6 +29725,7 @@
"@fortawesome/free-solid-svg-icons": "^5.7.2",
"@fortawesome/react-fontawesome": "^0.1.16",
"@nexucis/fuzzy": "^0.3.0",
"@nexucis/kvsearch": "^0.3.0",
"@testing-library/react-hooks": "^7.0.1",
"@types/enzyme": "^3.10.10",
"@types/flot": "0.0.32",
@ -29718,6 +29762,7 @@
"react": "^17.0.2",
"react-copy-to-clipboard": "^5.0.4",
"react-dom": "^17.0.2",
"react-infinite-scroll-component": "^6.1.0",
"react-resize-detector": "^6.7.6",
"react-router-dom": "^5.2.1",
"react-scripts": "4.0.3",
@ -31395,9 +31440,6 @@
"@nexucis/fuzzy": {
"version": "0.3.0"
"@npmcli/fs": {
"version": "1.0.0",
"dev": true,
@ -44490,6 +44532,14 @@
"scheduler": "^0.20.2"
"react-infinite-scroll-component": {
"version": "6.1.0",
"resolved": "https://registry.npmjs.org/react-infinite-scroll-component/-/react-infinite-scroll-component-6.1.0.tgz",
"integrity": "sha512-SQu5nCqy8DxQWpnUVLx7V7b7LcA37aM7tvoWjTLZp1dk6EJibM5/4EJKzOnl07/BsM1Y40sKLuqjCwwH/xV0TQ==",
"requires": {
"throttle-debounce": "^2.1.0"
"react-is": {
"version": "17.0.2"
@ -44937,6 +44987,11 @@
"version": "0.2.0",
"dev": true
"throttle-debounce": {
"version": "2.3.0",
"resolved": "https://registry.npmjs.org/throttle-debounce/-/throttle-debounce-2.3.0.tgz",
"integrity": "sha512-H7oLPV0P7+jgvrk+6mwwwBDmxTaxnu9HMXmloNLXwnNO0ZxZ31Orah2n8lU1eMPvsaowP2CX+USCgyovXfdOFQ=="
"to-fast-properties": {
"version": "2.0.0",
"dev": true
@ -20,6 +20,7 @@
"@fortawesome/free-solid-svg-icons": "^5.7.2",
"@fortawesome/react-fontawesome": "^0.1.16",
"@nexucis/fuzzy": "^0.3.0",
"@nexucis/kvsearch": "^0.3.0",
"bootstrap": "^4.6.1",
"codemirror-promql": "0.19.0",
"css.escape": "^1.5.1",
@ -32,6 +33,7 @@
"react": "^17.0.2",
"react-copy-to-clipboard": "^5.0.4",
"react-dom": "^17.0.2",
"react-infinite-scroll-component": "^6.1.0",
"react-resize-detector": "^6.7.6",
"react-router-dom": "^5.2.1",
"react-test-renderer": "^17.0.2",
Normal file
Normal file
@ -0,0 +1,91 @@
import React, { FC, useEffect, useState } from 'react';
import { getColor, Target } from './target';
import InfiniteScroll from 'react-infinite-scroll-component';
import { Badge, Table } from 'reactstrap';
import TargetLabels from './TargetLabels';
import styles from './ScrapePoolPanel.module.css';
import { formatRelative } from '../../utils';
import { now } from 'moment';
import TargetScrapeDuration from './TargetScrapeDuration';
import EndpointLink from './EndpointLink';
const columns = ['Endpoint', 'State', 'Labels', 'Last Scrape', 'Scrape Duration', 'Error'];
const initialNumberOfTargetsDisplayed = 50;
interface ScrapePoolContentProps {
targets: Target[];
export const ScrapePoolContent: FC<ScrapePoolContentProps> = ({ targets }) => {
const [items, setItems] = useState<Target[]>(targets.slice(0, 50));
const [index, setIndex] = useState<number>(initialNumberOfTargetsDisplayed);
const [hasMore, setHasMore] = useState<boolean>(targets.length > initialNumberOfTargetsDisplayed);
useEffect(() => {
setItems(targets.slice(0, initialNumberOfTargetsDisplayed));
setHasMore(targets.length > initialNumberOfTargetsDisplayed);
}, [targets]);
const fetchMoreData = () => {
if (items.length === targets.length) {
} else {
const newIndex = index + initialNumberOfTargetsDisplayed;
setItems(targets.slice(0, newIndex));
return (
height={items.length > 25 ? '75vh' : ''}
<Table className={styles.table} size="sm" bordered hover striped>
<tr key="header">
{columns.map((column) => (
<th key={column}>{column}</th>
{items.map((target, index) => (
<tr key={index}>
<td className={styles.endpoint}>
<EndpointLink endpoint={target.scrapeUrl} globalUrl={target.globalUrl} />
<td className={styles.state}>
<Badge color={getColor(target.health)}>{target.health.toUpperCase()}</Badge>
<td className={styles.labels}>
<td className={styles['last-scrape']}>{formatRelative(target.lastScrape, now())}</td>
<td className={styles['scrape-duration']}>
<td className={styles.errors}>
{target.lastError ? <span className="text-danger">{target.lastError}</span> : null}
@ -3,8 +3,7 @@ import { mount, ReactWrapper } from 'enzyme';
import { act } from 'react-dom/test-utils';
import { Alert } from 'reactstrap';
import { sampleApiResponse } from './__testdata__/testdata';
import ScrapePoolList from './ScrapePoolList';
import ScrapePoolPanel from './ScrapePoolPanel';
import ScrapePoolList, { ScrapePoolPanel } from './ScrapePoolList';
import { Target } from './target';
import { FetchMock } from 'jest-fetch-mock/types';
import { PathPrefixContext } from '../../contexts/PathPrefixContext';
@ -48,7 +47,7 @@ describe('ScrapePoolList', () => {
const panels = scrapePoolList.find(ScrapePoolPanel);
const activeTargets: Target[] = sampleApiResponse.data.activeTargets as Target[];
const activeTargets: Target[] = sampleApiResponse.data.activeTargets as unknown as Target[];
activeTargets.forEach(({ scrapePool }: Target) => {
const panel = scrapePoolList.find(ScrapePoolPanel).filterWhere((panel) => panel.prop('scrapePool') === scrapePool);
@ -1,26 +1,69 @@
import React, { FC } from 'react';
import Filter, { Expanded, FilterData } from './Filter';
import { useFetch } from '../../hooks/useFetch';
import { groupTargets, Target } from './target';
import ScrapePoolPanel from './ScrapePoolPanel';
import { withStatusIndicator } from '../../components/withStatusIndicator';
import { KVSearch } from '@nexucis/kvsearch';
import { usePathPrefix } from '../../contexts/PathPrefixContext';
import { useFetch } from '../../hooks/useFetch';
import { API_PATH } from '../../constants/constants';
import { groupTargets, ScrapePool, ScrapePools, Target } from './target';
import { withStatusIndicator } from '../../components/withStatusIndicator';
import { ChangeEvent, FC, useEffect, useState } from 'react';
import { Col, Collapse, Input, InputGroup, InputGroupAddon, InputGroupText, Row } from 'reactstrap';
import { FontAwesomeIcon } from '@fortawesome/react-fontawesome';
import { faSearch } from '@fortawesome/free-solid-svg-icons';
import { ScrapePoolContent } from './ScrapePoolContent';
import Filter, { Expanded, FilterData } from './Filter';
import { useLocalStorage } from '../../hooks/useLocalStorage';
import styles from './ScrapePoolPanel.module.css';
import { ToggleMoreLess } from '../../components/ToggleMoreLess';
interface ScrapePoolListProps {
activeTargets: Target[];
export const ScrapePoolContent: FC<ScrapePoolListProps> = ({ activeTargets }) => {
const targetGroups = groupTargets(activeTargets);
const kvSearch = new KVSearch({
shouldSort: true,
indexedKeys: ['labels', 'scrapePool', ['labels', /.*/]],
interface PanelProps {
scrapePool: string;
targetGroup: ScrapePool;
expanded: boolean;
toggleExpanded: () => void;
export const ScrapePoolPanel: FC<PanelProps> = (props: PanelProps) => {
const modifier = props.targetGroup.upCount < props.targetGroup.targets.length ? 'danger' : 'normal';
const id = `pool-${props.scrapePool}`;
const anchorProps = {
href: `#${id}`,
return (
<ToggleMoreLess event={props.toggleExpanded} showMore={props.expanded}>
<a className={styles[modifier]} {...anchorProps}>
{`${props.scrapePool} (${props.targetGroup.upCount}/${props.targetGroup.targets.length} up)`}
<Collapse isOpen={props.expanded}>
<ScrapePoolContent targets={props.targetGroup.targets} />
// ScrapePoolListContent is taking care of every possible filter
const ScrapePoolListContent: FC<ScrapePoolListProps> = ({ activeTargets }) => {
const initialPoolList = groupTargets(activeTargets);
const [poolList, setPoolList] = useState<ScrapePools>(initialPoolList);
const [targetList, setTargetList] = useState(activeTargets);
const initialFilter: FilterData = {
showHealthy: true,
showUnhealthy: true,
const [filter, setFilter] = useLocalStorage('targets-page-filter', initialFilter);
const initialExpanded: Expanded = Object.keys(targetGroups).reduce(
const initialExpanded: Expanded = Object.keys(initialPoolList).reduce(
(acc: { [scrapePool: string]: boolean }, scrapePool: string) => ({
[scrapePool]: true,
@ -28,14 +71,44 @@ export const ScrapePoolContent: FC<ScrapePoolListProps> = ({ activeTargets }) =>
const [expanded, setExpanded] = useLocalStorage('targets-page-expansion-state', initialExpanded);
const { showHealthy, showUnhealthy } = filter;
const handleSearchChange = (e: ChangeEvent<HTMLTextAreaElement | HTMLInputElement>) => {
if (e.target.value !== '') {
const result = kvSearch.filter(e.target.value.trim(), activeTargets);
result.map((value) => {
return value.original as unknown as Target;
} else {
useEffect(() => {
const list = targetList.filter((t) => showHealthy || t.health.toLowerCase() !== 'up');
}, [showHealthy, targetList]);
return (
<Filter filter={filter} setFilter={setFilter} expanded={expanded} setExpanded={setExpanded} />
<Row xs="4" className="align-items-center">
<Filter filter={filter} setFilter={setFilter} expanded={expanded} setExpanded={setExpanded} />
<Col xs="6">
<InputGroupAddon addonType="prepend">
<InputGroupText>{<FontAwesomeIcon icon={faSearch} />}</InputGroupText>
<Input autoFocus onChange={handleSearchChange} placeholder="Filter by endpoint or labels" />
.filter((scrapePool) => {
const targetGroup = targetGroups[scrapePool];
const targetGroup = poolList[scrapePool];
const isHealthy = targetGroup.upCount === targetGroup.targets.length;
return (isHealthy && showHealthy) || (!isHealthy && showUnhealthy);
@ -43,7 +116,7 @@ export const ScrapePoolContent: FC<ScrapePoolListProps> = ({ activeTargets }) =>
toggleExpanded={(): void => setExpanded({ ...expanded, [scrapePool]: !expanded[scrapePool] })}
@ -51,11 +124,10 @@ export const ScrapePoolContent: FC<ScrapePoolListProps> = ({ activeTargets }) =>
ScrapePoolContent.displayName = 'ScrapePoolContent';
const ScrapePoolListWithStatusIndicator = withStatusIndicator(ScrapePoolContent);
const ScrapePoolListWithStatusIndicator = withStatusIndicator(ScrapePoolListContent);
const ScrapePoolList: FC = () => {
export const ScrapePoolList: FC = () => {
const pathPrefix = usePathPrefix();
const { response, error, isLoading } = useFetch<ScrapePoolListProps>(`${pathPrefix}/${API_PATH}/targets?state=active`);
const { status: responseStatus } = response;
@ -1,137 +0,0 @@
import React from 'react';
import { mount, shallow } from 'enzyme';
import { targetGroups } from './__testdata__/testdata';
import ScrapePoolPanel, { columns } from './ScrapePoolPanel';
import { Button, Collapse, Table, Badge } from 'reactstrap';
import { Target, getColor } from './target';
import EndpointLink from './EndpointLink';
import TargetLabels from './TargetLabels';
import sinon from 'sinon';
describe('ScrapePoolPanel', () => {
const defaultProps = {
scrapePool: 'blackbox',
targetGroup: targetGroups.blackbox,
expanded: true,
toggleExpanded: sinon.spy(),
const scrapePoolPanel = shallow(<ScrapePoolPanel {...defaultProps} />);
it('renders a container', () => {
const div = scrapePoolPanel.find('div').filterWhere((elem) => elem.hasClass('container'));
describe('Header', () => {
it('renders an anchor with up count and danger color if upCount < targetsCount', () => {
const anchor = scrapePoolPanel.find('a');
expect(anchor.text()).toEqual('blackbox (2/3 up)');
it('renders an anchor with up count and normal color if upCount == targetsCount', () => {
const props = {
scrapePool: 'prometheus',
targetGroup: targetGroups.prometheus,
const scrapePoolPanel = shallow(<ScrapePoolPanel {...props} />);
const anchor = scrapePoolPanel.find('a');
expect(anchor.text()).toEqual('prometheus (1/1 up)');
it('renders a show more btn if collapsed', () => {
const props = {
scrapePool: 'prometheus',
targetGroup: targetGroups.prometheus,
toggleExpanded: sinon.spy(),
const div = document.createElement('div');
div.id = `series-labels-prometheus-0`;
const div2 = document.createElement('div');
div2.id = `scrape-duration-prometheus-0`;
const scrapePoolPanel = mount(<ScrapePoolPanel {...props} />);
const btn = scrapePoolPanel.find(Button);
it('renders a Collapse component', () => {
const collapse = scrapePoolPanel.find(Collapse);
describe('Table', () => {
it('renders a table', () => {
const table = scrapePoolPanel.find(Table);
const headers = table.find('th');
columns.forEach((col) => {
describe('for each target', () => {
const table = scrapePoolPanel.find(Table);
({ discoveredLabels, labels, scrapeUrl, lastError, health }: Target, idx: number) => {
const row = table.find('tr').at(idx + 1);
it('renders an EndpointLink with the scrapeUrl', () => {
const link = row.find(EndpointLink);
it('renders a badge for health', () => {
const td = row.find('td').filterWhere((elem) => Boolean(elem.hasClass('state')));
const badge = td.find(Badge);
it('renders series labels', () => {
const targetLabels = row.find(TargetLabels);
it('renders last scrape time', () => {
const lastScrapeCell = row.find('td').filterWhere((elem) => Boolean(elem.hasClass('last-scrape')));
it('renders last scrape duration', () => {
const lastScrapeCell = row.find('td').filterWhere((elem) => Boolean(elem.hasClass('scrape-duration')));
it('renders a badge for Errors', () => {
const td = row.find('td').filterWhere((elem) => Boolean(elem.hasClass('errors')));
const badge = td.find(Badge);
expect(badge).toHaveLength(lastError ? 1 : 0);
if (lastError) {
@ -1,95 +0,0 @@
import React, { FC } from 'react';
import { ScrapePool, getColor } from './target';
import { Collapse, Table, Badge } from 'reactstrap';
import styles from './ScrapePoolPanel.module.css';
import { Target } from './target';
import EndpointLink from './EndpointLink';
import TargetLabels from './TargetLabels';
import TargetScrapeDuration from './TargetScrapeDuration';
import { now } from 'moment';
import { ToggleMoreLess } from '../../components/ToggleMoreLess';
import { formatRelative } from '../../utils';
interface PanelProps {
scrapePool: string;
targetGroup: ScrapePool;
expanded: boolean;
toggleExpanded: () => void;
export const columns = ['Endpoint', 'State', 'Labels', 'Last Scrape', 'Scrape Duration', 'Error'];
const ScrapePoolPanel: FC<PanelProps> = ({ scrapePool, targetGroup, expanded, toggleExpanded }) => {
const modifier = targetGroup.upCount < targetGroup.targets.length ? 'danger' : 'normal';
const id = `pool-${scrapePool}`;
const anchorProps = {
href: `#${id}`,
return (
<div className={styles.container}>
<ToggleMoreLess event={toggleExpanded} showMore={expanded}>
<a className={styles[modifier]} {...anchorProps}>
{`${scrapePool} (${targetGroup.upCount}/${targetGroup.targets.length} up)`}
<Collapse isOpen={expanded}>
<Table className={styles.table} size="sm" bordered hover striped>
<tr key="header">
{columns.map((column) => (
<th key={column}>{column}</th>
{targetGroup.targets.map((target: Target, idx: number) => {
const {
} = target;
const color = getColor(health);
return (
<tr key={scrapeUrl}>
<td className={styles.endpoint}>
<EndpointLink endpoint={scrapeUrl} globalUrl={globalUrl} />
<td className={styles.state}>
<Badge color={color}>{health.toUpperCase()}</Badge>
<td className={styles.labels}>
<TargetLabels discoveredLabels={discoveredLabels} labels={labels} scrapePool={scrapePool} idx={idx} />
<td className={styles['last-scrape']}>{formatRelative(lastScrape, now())}</td>
<td className={styles['scrape-duration']}>
<td className={styles.errors}>{lastError ? <span className="text-danger">{lastError}</span> : null}</td>
export default ScrapePoolPanel;
@ -33,10 +33,16 @@ const TargetLabels: FC<TargetLabelsProps> = ({ discoveredLabels, labels, idx, sc
<Tooltip isOpen={tooltipOpen} target={CSS.escape(id)} toggle={toggle} style={{ maxWidth: 'none', textAlign: 'left' }}>
style={{ maxWidth: 'none', textAlign: 'left' }}
<b>Before relabeling:</b>
{formatLabels(discoveredLabels).map((s: string, idx: number) => (
<Fragment key={idx}>
{formatLabels(discoveredLabels).map((s: string, labelIndex: number) => (
<Fragment key={labelIndex}>
<br />
<span className={styles.discovered}>{s}</span>
@ -37,7 +37,7 @@ exports[`targetLabels renders discovered labels 1`] = `
Object {
@ -2,7 +2,7 @@ export interface Labels {
[key: string]: string;
export interface Target {
export type Target = {
discoveredLabels: Labels;
labels: Labels;
scrapePool: string;
@ -14,7 +14,7 @@ export interface Target {
health: string;
scrapeInterval: string;
scrapeTimeout: string;
export interface DroppedTarget {
discoveredLabels: Labels;
@ -257,7 +257,7 @@ type Options struct {
RemoteReadSampleLimit int
RemoteReadConcurrencyLimit int
RemoteReadBytesInFrame int
RemoteWriteReceiver bool
EnableRemoteWriteReceiver bool
IsAgent bool
Gatherer prometheus.Gatherer
@ -314,7 +314,7 @@ func New(logger log.Logger, o *Options) *Handler {
FactoryRr := func(_ context.Context) api_v1.RulesRetriever { return h.ruleManager }
var app storage.Appendable
if o.RemoteWriteReceiver {
if o.EnableRemoteWriteReceiver {
app = h.storage
Reference in a new issue