Add basic initial developer docs for TSDB (#9451)

* Add basic initial developer docs for TSDB

There's a decent amount of content already out there (blog posts,
conference talks, etc), but:
* when they get stale, they don't tend to get updated
* they still leave me with questions that I'd like to answer
  for developers (like me) who want to use, or work with, TSDB

What I propose is developer docs inside the Prometheus
repository: easy to find, and able to harness the power of the
community to expand them and keep them up to date.

* Perfect is the enemy of good.  Let's have a base and incrementally improve it
* Markdown docs should be broad but not too deep.  Source code comments
  can complement them, and are the ideal place for implementation details.

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

* use example code that works out of the box

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

* Apply suggestions from code review

Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

* PR feedback

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

* more docs

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

* PR feedback

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

* Apply suggestions from code review

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Apply suggestions from code review

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>

* feedback

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

* Update tsdb/docs/usage.md

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>

* final tweaks

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

* workaround docs versioning issue

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

* Move example code to real executable, testable example.

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

* cleanup example test and make sure it always reproduces

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

* obtain temp dir in a way that works with older Go versions

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

* Fix Ganesh's comments

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com>
Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com>
commit 0fac9bb859 (parent 49d8f02c1f)
Dieter Plaetinck, 2021-11-17 05:21:27 -05:00, committed by GitHub
13 changed files with 261 additions and 42 deletions


@@ -111,7 +111,7 @@ For more information on building, running, and developing on the new React-based
 ## More information
-* The source code is periodically indexed: [Prometheus Core](https://pkg.go.dev/github.com/prometheus/prometheus).
+* The source code is periodically indexed, but due to an issue with versioning, the "latest" docs shown on Godoc are outdated. Instead, you can use [the docs for v2.31.1](https://pkg.go.dev/github.com/prometheus/prometheus@v1.8.2-0.20211105201321-411021ada9ab).
 * You will find a CircleCI configuration in [`.circleci/config.yml`](.circleci/config.yml).
 * See the [Community page](https://prometheus.io/community) for how to reach the Prometheus developers and users on various communication channels.


@@ -131,7 +131,7 @@ the Prometheus server will be able to see them.
 ### The SD interface
-A Service Discovery (SD) mechanism has to discover targets and provide them to Prometheus. We expect similar targets to be grouped together, in the form of a [target group](https://pkg.go.dev/github.com/prometheus/prometheus/discovery/targetgroup#Group). The SD mechanism sends the targets down to prometheus as list of target groups.
+A Service Discovery (SD) mechanism has to discover targets and provide them to Prometheus. We expect similar targets to be grouped together, in the form of a [target group](https://pkg.go.dev/github.com/prometheus/prometheus@v1.8.2-0.20211105201321-411021ada9ab/discovery/targetgroup#Group). The SD mechanism sends the targets down to prometheus as list of target groups.
 An SD mechanism has to implement the `Discoverer` Interface:
 ```go


@@ -173,9 +173,9 @@ func (f QueryableFunc) Querier(ctx context.Context, mint, maxt int64) (Querier,
 // Operations on the Appender interface are not goroutine-safe.
 type Appender interface {
 	// Append adds a sample pair for the given series.
-	// An optional reference number can be provided to accelerate calls.
-	// A reference number is returned which can be used to add further
-	// samples in the same or later transactions.
+	// An optional series reference can be provided to accelerate calls.
+	// A series reference number is returned which can be used to add further
+	// samples to the given series in the same or later transactions.
 	// Returned reference numbers are ephemeral and may be rejected in calls
 	// to Append() at any point. Adding the sample via Append() returns a new
 	// reference number.
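To make the reference-reuse pattern described above concrete, here is a minimal sketch (not part of this change) of how a caller might use the returned series reference. The function name `appendTwo` and the label values are illustrative; `a` can be anything implementing `storage.Appendable`, such as a `*tsdb.DB`, and the call shapes mirror the `tsdb/example_test.go` added in this PR.

```go
package example

import (
	"context"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
)

// appendTwo appends two samples to the same series, reusing the series
// reference returned by the first Append to speed up the second one.
func appendTwo(a storage.Appendable, t0, t1 int64) error {
	app := a.Appender(context.Background())
	lbls := labels.FromStrings("job", "example")

	// First append: pass 0 because the series reference is not yet known.
	ref, err := app.Append(0, lbls, t0, 1.0)
	if err != nil {
		return err
	}

	// Later appends to the same series can pass ref to skip the label lookup.
	// Refs are ephemeral: if one is rejected, use the newly returned ref instead.
	if _, err := app.Append(ref, lbls, t1, 2.0); err != nil {
		return err
	}

	return app.Commit()
}
```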


@@ -1,16 +1,25 @@
 # TSDB
-[![GoPkg](https://pkg.go.dev/badge/github.com/prometheus/prometheus/tsdb.svg)](https://pkg.go.dev/github.com/prometheus/prometheus/tsdb)
+[![GoPkg](https://pkg.go.dev/badge/github.com/prometheus/prometheus/tsdb.svg)](https://pkg.go.dev/github.com/prometheus/prometheus@v1.8.2-0.20211105201321-411021ada9ab/tsdb)
-This directory contains the Prometheus storage layer that is used in its 2.x releases.
+This directory contains the Prometheus TSDB (Time Series DataBase) library,
+which handles storage and querying of all Prometheus v2 data.
-A writeup of its design can be found [here](https://fabxc.org/blog/2017-04-10-writing-a-tsdb/).
+Due to an issue with versioning, the "latest" docs shown on Godoc are outdated.
+Instead you may use [the docs for v2.31.1](https://pkg.go.dev/github.com/prometheus/prometheus@v1.8.2-0.20211105201321-411021ada9ab)
-Based on the Gorilla TSDB [white papers](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf).
+## Documentation
-Video: [Storing 16 Bytes at Scale](https://youtu.be/b_pEevMAC3I) from [PromCon 2017](https://promcon.io/2017-munich/).
+* [Data format](docs/format/README.md).
+* [Usage](docs/usage.md).
+* [Bstream details](docs/bstream.md).
+## External resources
+* A writeup of the original design can be found [here](https://fabxc.org/blog/2017-04-10-writing-a-tsdb/).
+* Video: [Storing 16 Bytes at Scale](https://youtu.be/b_pEevMAC3I) from [PromCon 2017](https://promcon.io/2017-munich/).
+* Compression is based on the Gorilla TSDB [white paper](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf).
-See also the [format documentation](docs/format/README.md) and [bstream details](docs/bstream.md).
 A series of blog posts explaining different components of TSDB:
 * [The Head Block](https://ganeshvernekar.com/blog/prometheus-tsdb-the-head-block/)


@@ -109,7 +109,7 @@ type Meta struct {
 	MinTime, MaxTime int64
 }
-// Iterator iterates over the chunk of a time series.
+// Iterator iterates over the chunks of a single time series.
 type Iterator interface {
 	// At returns the current meta.
 	// It depends on implementation if the chunk is populated or not.


@@ -51,7 +51,7 @@ const (
 	MintMaxtSize = 8
 	// SeriesRefSize is the size of series reference on disk.
 	SeriesRefSize = 8
-	// HeadChunkFileHeaderSize is the total size of the header for the head chunk file.
+	// HeadChunkFileHeaderSize is the total size of the header for a head chunk file.
 	HeadChunkFileHeaderSize = SegmentHeaderSize
 	// MaxHeadChunkFileSize is the max size of a head chunk file.
 	MaxHeadChunkFileSize = 128 * 1024 * 1024 // 128 MiB.
@@ -124,19 +124,20 @@ type ChunkDiskMapper struct {
 	// from which chunks are served till they are flushed and are ready for m-mapping.
 	chunkBuffer *chunkBuffer
-	// If 'true', it indicated that the maxt of all the on-disk files were set
-	// after iterating through all the chunks in those files.
+	// Whether the maxt field is set for all mmapped chunk files tracked within the mmappedChunkFiles map.
+	// This is done after iterating through all the chunks in those files using the IterateAllChunks method.
 	fileMaxtSet bool
 	closed bool
 }
+// mmappedChunkFile provides mmapp access to an entire head chunks file that holds many chunks.
 type mmappedChunkFile struct {
 	byteSlice ByteSlice
-	maxt int64
+	maxt int64 // Max timestamp among all of this file's chunks.
 }
-// NewChunkDiskMapper returns a new writer against the given directory
+// NewChunkDiskMapper returns a new ChunkDiskMapper against the given directory
 // using the default head chunk file duration.
 // NOTE: 'IterateAllChunks' method needs to be called at least once after creating ChunkDiskMapper
 // to set the maxt of all the file.
@@ -172,6 +173,7 @@ func NewChunkDiskMapper(dir string, pool chunkenc.Pool, writeBufferSize int) (*C
 	return m, m.openMMapFiles()
 }
+// openMMapFiles opens all files within dir for mmapping.
 func (cdm *ChunkDiskMapper) openMMapFiles() (returnErr error) {
 	cdm.mmappedChunkFiles = map[int]*mmappedChunkFile{}
 	cdm.closers = map[int]io.Closer{}
@@ -254,7 +256,7 @@ func listChunkFiles(dir string) (map[int]string, error) {
 }
 // repairLastChunkFile deletes the last file if it's empty.
-// Because we don't fsync when creating these file, we could end
+// Because we don't fsync when creating these files, we could end
 // up with an empty file at the end during an abrupt shutdown.
 func repairLastChunkFile(files map[int]string) (_ map[int]string, returnErr error) {
 	lastFile := -1
@@ -350,7 +352,7 @@ func (cdm *ChunkDiskMapper) WriteChunk(seriesRef HeadSeriesRef, mint, maxt int64
 	return chkRef, nil
 }
-// shouldCutNewFile decides the cutting of a new file based on time and size retention.
+// shouldCutNewFile returns whether a new file should be cut, based on time and size retention.
 // Size retention: because depending on the system architecture, there is a limit on how big of a file we can m-map.
 // Time retention: so that we can delete old chunks with some time guarantee in low load environments.
 func (cdm *ChunkDiskMapper) shouldCutNewFile(chunkSize int) bool {
@@ -465,7 +467,7 @@ func (cdm *ChunkDiskMapper) flushBuffer() error {
 // Chunk returns a chunk from a given reference.
 func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error) {
 	cdm.readPathMtx.RLock()
-	// We hold this read lock for the entire duration because if the Close()
+	// We hold this read lock for the entire duration because if Close()
 	// is called, the data in the byte slice will get corrupted as the mmapped
 	// file will be closed.
 	defer cdm.readPathMtx.RUnlock()
@@ -575,8 +577,8 @@ func (cdm *ChunkDiskMapper) Chunk(ref ChunkDiskMapperRef) (chunkenc.Chunk, error
 	return chk, nil
 }
-// IterateAllChunks iterates on all the chunks in its byte slices in the order of the head chunk file sequence
-// and runs the provided function on each chunk. It returns on the first error encountered.
+// IterateAllChunks iterates all mmappedChunkFiles (in order of head chunk file name/number) and all the chunks within it
+// and runs the provided function with information about each chunk. It returns on the first error encountered.
 // NOTE: This method needs to be called at least once after creating ChunkDiskMapper
 // to set the maxt of all the file.
 func (cdm *ChunkDiskMapper) IterateAllChunks(f func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error) (err error) {
@@ -825,7 +827,7 @@ func closeAllFromMap(cs map[int]io.Closer) error {
 const inBufferShards = 128 // 128 is a randomly chosen number.
-// chunkBuffer is a thread safe buffer for chunks.
+// chunkBuffer is a thread safe lookup table for chunks by their ref.
 type chunkBuffer struct {
 	inBufferChunks [inBufferShards]map[ChunkDiskMapperRef]chunkenc.Chunk
 	inBufferChunksMtxs [inBufferShards]sync.RWMutex
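For orientation, below is a rough sketch of the calling pattern that the NOTE above requires: create the mapper, then call `IterateAllChunks` at least once before serving reads so the maxt of every head chunk file gets set. It is based only on the signatures visible in this diff; `countChunks` is an illustrative name, `chunks.DefaultWriteBufferSize` is assumed to exist, and since this is an internal package the details may change.

```go
package example

import (
	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/prometheus/prometheus/tsdb/chunks"
)

// countChunks opens a ChunkDiskMapper for dir and walks every mmapped chunk,
// which also satisfies the requirement that IterateAllChunks runs once.
func countChunks(dir string) (int, error) {
	cdm, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize)
	if err != nil {
		return 0, err
	}
	defer cdm.Close()

	n := 0
	err = cdm.IterateAllChunks(func(seriesRef chunks.HeadSeriesRef, chunkRef chunks.ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error {
		n++ // A real caller (the Head) indexes each chunk by seriesRef/chunkRef here.
		return nil
	})
	return n, err
}
```

In Prometheus itself, the Head is the caller of this API during startup/replay; the sketch only shows the shape of the contract.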


@@ -596,6 +596,9 @@ func validateOpts(opts *Options, rngs []int64) (*Options, []int64) {
 	return opts, rngs
 }
+// open returns a new DB in the given directory.
+// It initializes the lockfile, WAL, compactor, and Head (by replaying the WAL), and runs the database.
+// It is not safe to open more than one DB in the same directory.
 func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs []int64, stats *DBStats) (_ *DB, returnedErr error) {
 	if err := os.MkdirAll(dir, 0o777); err != nil {
 		return nil, err

tsdb/docs/usage.md (new file, 71 lines)

@@ -0,0 +1,71 @@
# Usage
TSDB can be - and is - used by other applications such as [Cortex](https://cortexmetrics.io/) and [Thanos](https://thanos.io/).
This directory contains documentation for any developers who wish to work on or with TSDB.
For a full example of instantiating a database, adding and querying data, see the [tsdb example in the docs](https://pkg.go.dev/github.com/prometheus/prometheus@v1.8.2-0.20211105201321-411021ada9ab/tsdb).
`tsdb/db_test.go` also demonstrates various specific usages of the TSDB library.
## Instantiating a database
Callers should use [`tsdb.Open`](https://pkg.go.dev/github.com/prometheus/prometheus@v1.8.2-0.20211105201321-411021ada9ab/tsdb#Open) to open a TSDB
(the directory may be new or pre-existing).
This returns a [`*tsdb.DB`](https://pkg.go.dev/github.com/prometheus/prometheus@v1.8.2-0.20211105201321-411021ada9ab/tsdb#DB) which is the actual database.
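For example, a minimal sketch mirroring the example test added in this PR (the directory name is illustrative):

```go
package main

import (
	"log"

	"github.com/prometheus/prometheus/tsdb"
)

func main() {
	// Open (or create) a TSDB in ./data with default options.
	// The nil arguments mean: no logger, no Prometheus registerer, no DBStats.
	db, err := tsdb.Open("data", nil, nil, tsdb.DefaultOptions(), nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
```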
A `DB` has the following main components:
* Compactor: a [leveled compactor](https://pkg.go.dev/github.com/prometheus/prometheus@v1.8.2-0.20211105201321-411021ada9ab/tsdb#LeveledCompactor). Note: it is currently the only compactor implementation. It runs automatically.
* [`Head`](https://pkg.go.dev/github.com/prometheus/prometheus@v1.8.2-0.20211105201321-411021ada9ab/tsdb#DB.Head)
* [Blocks (persistent blocks)](https://pkg.go.dev/github.com/prometheus/prometheus@v1.8.2-0.20211105201321-411021ada9ab/tsdb#DB.Blocks)
The `Head` is responsible for a lot. Here are its main components:
* [WAL](https://pkg.go.dev/github.com/prometheus/prometheus@v1.8.2-0.20211105201321-411021ada9ab/tsdb/wal#WAL) (Write Ahead Log).
* [`stripeSeries`](https://github.com/prometheus/prometheus/blob/411021ada9ab41095923b8d2df9365b632fd40c3/tsdb/head.go#L1292):
this holds all the active series by linking to [`memSeries`](https://github.com/prometheus/prometheus/blob/411021ada9ab41095923b8d2df9365b632fd40c3/tsdb/head.go#L1462)
by an ID (aka "ref") and by labels hash.
* Postings list (reverse index): For any label-value pair, holds all the corresponding series refs. Used for queries.
* Tombstones.
## Adding data
Use [`db.Appender()`](https://pkg.go.dev/github.com/prometheus/prometheus@v1.8.2-0.20211105201321-411021ada9ab/tsdb#DB.Appender) to obtain an "appender".
The [golang docs](https://pkg.go.dev/github.com/prometheus/prometheus@v1.8.2-0.20211105201321-411021ada9ab/storage#Appender) speak mostly for themselves.
Remember:
* Use `Commit()` to add the samples to the DB and update the WAL.
* Create a new appender each time you commit.
* Appenders are not concurrency safe, but scrapes run concurrently, and as such Prometheus leverages multiple appenders concurrently.
This reduces contention, although `Commit()` calls still contend on the same critical section (writing to the WAL is serialized), which may
inflate append tail latency if multiple appenders try to commit at the same time.
Append may reject data due to these conditions:
1) `timestamp < minValidTime`, where `minValidTime` is the highest of:
* the maxTime of the last block (i.e. the last truncation time of the Head) - updated via [`Head.Truncate()`](https://pkg.go.dev/github.com/prometheus/prometheus@v1.8.2-0.20211105201321-411021ada9ab/tsdb#Head.Truncate) and [`DB.compactHead()`](https://github.com/prometheus/prometheus/blob/411021ada9ab41095923b8d2df9365b632fd40c3/tsdb/db.go#L968)
* `tsdb.min-block-duration/2` older than the max time in the Head block. Note that while technically `storage.tsdb.min-block-duration` is configurable, it's a hidden option and changing it is discouraged, so we can assume this value to be 2h.
Breaching this condition results in "out of bounds" errors.
The first condition ensures the block that will be generated doesn't overlap with the previous one (which simplifies querying).
The second condition ensures the sample won't go into the so-called "compaction window", that is, the section of the data that might be in the process of being saved into a persistent block on disk (because that logic runs concurrently with ingestion, without a lock).
2) The labels are invalid, i.e. the label set is empty or contains duplicate label names.
3) The sample is out of order for its series (identified by the full label set), or has a different value than the sample already ingested for the last (highest) timestamp seen. (These result in `storage.ErrOutOfOrderSample` and `storage.ErrDuplicateSampleForTimestamp` respectively.)
`Commit()` may also refuse data that is out of order with respect to samples that were added via a different appender.
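The sketch below ties these points together. It is only an illustration: the function name, label values, timestamp and error handling strategy are made up; the complete, runnable version is the example linked at the bottom of this page.

```go
package example

import (
	"context"
	"time"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/tsdb"
)

func addSample(db *tsdb.DB) error {
	app := db.Appender(context.Background())

	lbls := labels.FromStrings("host", "server1")
	ts, v := time.Now().Unix(), 42.0

	// Ref 0 on the first append; reuse the returned ref for later appends to the same series.
	if _, err := app.Append(0, lbls, ts, v); err != nil {
		// E.g. an "out of bounds" error, storage.ErrOutOfOrderSample or
		// storage.ErrDuplicateSampleForTimestamp, as described above.
		_ = app.Rollback()
		return err
	}

	// Commit writes to the WAL and makes the data visible to future queriers.
	// After Commit, obtain a fresh appender for further writes.
	return app.Commit()
}
```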
## Querying data
Use [`db.Querier()`](https://pkg.go.dev/github.com/prometheus/prometheus@v1.8.2-0.20211105201321-411021ada9ab/tsdb#DB.Querier) to obtain a "querier".
The [golang docs](https://pkg.go.dev/github.com/prometheus/prometheus@v1.8.2-0.20211105201321-411021ada9ab/storage#Querier) speak mostly for themselves.
Remember:
* A querier can only see data that was committed when it was created. This limits the lifetime of a querier.
* A querier should be closed when you're done with it.
* Use mint/maxt to avoid loading unneeded data.
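Again as a sketch only (the matcher, label names and time range are illustrative; the runnable version is the example linked below):

```go
package example

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/tsdb"
)

func printLastHour(db *tsdb.DB) error {
	// Restrict mint/maxt to the last hour to avoid loading unneeded data.
	maxt := time.Now().Unix()
	mint := maxt - 3600

	q, err := db.Querier(context.Background(), mint, maxt)
	if err != nil {
		return err
	}
	// The querier only sees data committed before it was created; close it when done.
	defer q.Close()

	matcher := labels.MustNewMatcher(labels.MatchEqual, "host", "server1")
	ss := q.Select(false, nil, matcher)
	for ss.Next() {
		series := ss.At()
		fmt.Println("series:", series.Labels().String())

		it := series.Iterator()
		for it.Next() {
			t, v := it.At()
			fmt.Println(t, v)
		}
		if err := it.Err(); err != nil {
			return err
		}
	}
	return ss.Err()
}
```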
## Example code
Find the example code for ingesting samples and querying them in [`tsdb/example_test.go`](../example_test.go)

tsdb/example_test.go (new file, 112 lines)

@@ -0,0 +1,112 @@
// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
	"context"
	"fmt"
	"math"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"github.com/prometheus/prometheus/model/labels"
)

func TestExample(t *testing.T) {
	// Create a random dir to work in. Open() doesn't require a pre-existing dir, but
	// we want to make sure not to make a mess where we shouldn't.
	dir := t.TempDir()

	// Open a TSDB for reading and/or writing.
	db, err := Open(dir, nil, nil, DefaultOptions(), nil)
	require.NoError(t, err)

	// Open an appender for writing.
	app := db.Appender(context.Background())

	lbls := labels.FromStrings("foo", "bar")
	var appendedSamples []sample

	// Ref is 0 for the first append since we don't know the reference for the series.
	ts, v := time.Now().Unix(), 123.0
	ref, err := app.Append(0, lbls, ts, v)
	require.NoError(t, err)
	appendedSamples = append(appendedSamples, sample{ts, v})

	// Another append for a second later.
	// Re-using the ref from above since it's the same series, makes append faster.
	time.Sleep(time.Second)
	ts, v = time.Now().Unix(), 124
	_, err = app.Append(ref, lbls, ts, v)
	require.NoError(t, err)
	appendedSamples = append(appendedSamples, sample{ts, v})

	// Commit to storage.
	err = app.Commit()
	require.NoError(t, err)

	// In case you want to do more appends after app.Commit(),
	// you need a new appender.
	// app = db.Appender(context.Background())
	//
	// ... adding more samples.
	//
	// Commit to storage.
	// err = app.Commit()
	// require.NoError(t, err)

	// Open a querier for reading.
	querier, err := db.Querier(context.Background(), math.MinInt64, math.MaxInt64)
	require.NoError(t, err)

	ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
	var queriedSamples []sample
	for ss.Next() {
		series := ss.At()
		fmt.Println("series:", series.Labels().String())

		it := series.Iterator()
		for it.Next() {
			ts, v := it.At()
			fmt.Println("sample", ts, v)
			queriedSamples = append(queriedSamples, sample{ts, v})
		}
		require.NoError(t, it.Err())
		fmt.Println("it.Err():", it.Err())
	}
	require.NoError(t, ss.Err())
	fmt.Println("ss.Err():", ss.Err())

	ws := ss.Warnings()
	if len(ws) > 0 {
		fmt.Println("warnings:", ws)
	}

	err = querier.Close()
	require.NoError(t, err)

	// Clean up any last resources when done.
	err = db.Close()
	require.NoError(t, err)

	require.Equal(t, appendedSamples, queriedSamples)

	// Output:
	// series: {foo="bar"}
	// sample <ts1> 123
	// sample <ts2> 124
	// it.Err(): <nil>
	// ss.Err(): <nil>
}


@@ -992,12 +992,15 @@ func (h *Head) Stats(statsByLabelName string) *Stats {
 	}
 }
+// RangeHead allows querying Head via an IndexReader, ChunkReader and tombstones.Reader
+// but only within a restricted range. Used for queries and compactions.
 type RangeHead struct {
 	head *Head
 	mint, maxt int64
 }
 // NewRangeHead returns a *RangeHead.
+// There are no restrictions on mint/maxt.
 func NewRangeHead(head *Head, mint, maxt int64) *RangeHead {
 	return &RangeHead{
 		head: head,
@@ -1284,15 +1287,17 @@ const (
 	DefaultStripeSize = 1 << 14
 )
-// stripeSeries locks modulo ranges of IDs and hashes to reduce lock contention.
+// stripeSeries holds series by HeadSeriesRef ("ID") and also by hash of their labels.
+// ID-based lookups via (getByID()) are preferred over getByHash() for performance reasons.
+// It locks modulo ranges of IDs and hashes to reduce lock contention.
 // The locks are padded to not be on the same cache line. Filling the padded space
 // with the maps was profiled to be slower likely due to the additional pointer
 // dereferences.
 type stripeSeries struct {
 	size int
-	series []map[chunks.HeadSeriesRef]*memSeries
-	hashes []seriesHashmap
-	locks []stripeLock
+	series []map[chunks.HeadSeriesRef]*memSeries // Sharded by ref. A series ref is the value of `size` when the series was being newly added.
+	hashes []seriesHashmap // Sharded by label hash.
+	locks []stripeLock // Sharded by ref for series access, by label hash for hashes access.
 	seriesLifecycleCallback SeriesLifecycleCallback
 }
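The lock-striping idea in this comment can be shown in isolation. The sketch below is not the Prometheus implementation, just a minimal generic version of sharding a map and its locks by `id % stripes`; it also omits the cache-line padding that the real `stripeLock` uses to avoid false sharing.

```go
package example

import "sync"

const stripes = 1 << 4 // Prometheus uses DefaultStripeSize = 1 << 14.

// stripedMap shards both the data and its locks by id modulo the stripe
// count, so writers touching different stripes don't contend.
type stripedMap struct {
	locks [stripes]sync.RWMutex
	vals  [stripes]map[uint64]string
}

func newStripedMap() *stripedMap {
	s := &stripedMap{}
	for i := range s.vals {
		s.vals[i] = map[uint64]string{}
	}
	return s
}

func (s *stripedMap) set(id uint64, v string) {
	i := id & (stripes - 1) // Equivalent to id % stripes for powers of two.
	s.locks[i].Lock()
	s.vals[i][id] = v
	s.locks[i].Unlock()
}

func (s *stripedMap) get(id uint64) (string, bool) {
	i := id & (stripes - 1)
	s.locks[i].RLock()
	v, ok := s.vals[i][id]
	s.locks[i].RUnlock()
	return v, ok
}
```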
@@ -1466,17 +1471,24 @@ type memSeries struct {
 	ref chunks.HeadSeriesRef
 	lset labels.Labels
-	mmappedChunks []*mmappedChunk
+	mmappedChunks []*mmappedChunk // Immutable chunks on disk that have not yet gone into a block, in order of ascending time stamps.
 	mmMaxTime int64 // Max time of any mmapped chunk, only used during WAL replay.
-	headChunk *memChunk
+	headChunk *memChunk // Most recent chunk in memory that's still being built.
 	chunkRange int64
 	firstChunkID int
 	nextAt int64 // Timestamp at which to cut the next chunk.
-	sampleBuf [4]sample
+	// We keep the last 4 samples here (in addition to appending them to the chunk) so we don't need coordination between appender and querier.
+	// Even the most compact encoding of a sample takes 2 bits, so the last byte is not contended.
+	sampleBuf [4]sample
 	pendingCommit bool // Whether there are samples waiting to be committed to this series.
-	app chunkenc.Appender // Current appender for the chunk.
+	// Current appender for the head chunk. Set when a new head chunk is cut.
+	// It is nil only if headChunk is nil. E.g. if there was an appender that created a new series, but rolled back the commit
+	// (the first sample would create a headChunk, hence appender, but rollback skipped it while the Append() call would create a series).
+	app chunkenc.Appender
 	memChunkPool *sync.Pool


@@ -114,7 +114,7 @@ func (h *Head) Appender(_ context.Context) storage.Appender {
 }
 func (h *Head) appender() *headAppender {
-	appendID, cleanupAppendIDsBelow := h.iso.newAppendID()
+	appendID, cleanupAppendIDsBelow := h.iso.newAppendID() // Every appender gets an ID that is cleared upon commit/rollback.
 	// Allocate the exemplars buffer only if exemplars are enabled.
 	var exemplarsBuf []exemplarWithSeriesRef
@@ -224,10 +224,10 @@ type headAppender struct {
 	minValidTime int64 // No samples below this timestamp are allowed.
 	mint, maxt int64
-	series []record.RefSeries
-	samples []record.RefSample
-	exemplars []exemplarWithSeriesRef
-	sampleSeries []*memSeries
+	series []record.RefSeries // New series held by this appender.
+	samples []record.RefSample // New samples held by this appender.
+	exemplars []exemplarWithSeriesRef // New exemplars held by this appender.
+	sampleSeries []*memSeries // Series corresponding to the samples held by this appender (using corresponding slice indices - same series may appear more than once).
 	appendID, cleanupAppendIDsBelow uint64
 	closed bool
@@ -361,6 +361,7 @@ func (a *headAppender) GetRef(lset labels.Labels) (storage.SeriesRef, labels.Lab
 	return storage.SeriesRef(s.ref), s.lset
 }
+// log writes all headAppender's data to the WAL.
 func (a *headAppender) log() error {
 	if a.head.wal == nil {
 		return nil
@@ -412,6 +413,7 @@ func exemplarsForEncoding(es []exemplarWithSeriesRef) []record.RefExemplar {
 	return ret
 }
+// Commit writes to the WAL and adds the data to the Head.
 func (a *headAppender) Commit() (err error) {
 	if a.closed {
 		return ErrAppenderClosed
@@ -481,7 +483,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
 	if c == nil {
 		if len(s.mmappedChunks) > 0 && s.mmappedChunks[len(s.mmappedChunks)-1].maxTime >= t {
-			// Out of order sample. Sample timestamp is already in the mmaped chunks, so ignore it.
+			// Out of order sample. Sample timestamp is already in the mmapped chunks, so ignore it.
 			return false, false
 		}
 		// There is no chunk in this series yet, create the first chunk for the sample.
@@ -583,6 +585,7 @@ func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper
 	})
 }
+// Rollback removes the samples and exemplars from headAppender and writes any series to WAL.
 func (a *headAppender) Rollback() (err error) {
 	if a.closed {
 		return ErrAppenderClosed


@@ -175,6 +175,8 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, lbls *labels.Labels, chk
 	return nil
 }
+// chunkID returns the ID corresponding to .mmappedChunks[pos]
+// (head chunk if pos==len(mmappedChunks))
 func (s *memSeries) chunkID(pos int) int {
 	return pos + s.firstChunkID
 }
@@ -288,9 +290,9 @@ func (h *headChunkReader) Chunk(ref chunks.ChunkRef) (chunkenc.Chunk, error) {
 	}, nil
 }
-// chunk returns the chunk for the chunk id from memory or by m-mapping it from the disk.
+// chunk returns the chunk for the chunkID from memory or by m-mapping it from the disk.
 // If garbageCollect is true, it means that the returned *memChunk
-// (and not the chunkenc.Chunk inside it) can be garbage collected after it's usage.
+// (and not the chunkenc.Chunk inside it) can be garbage collected after its usage.
 func (s *memSeries) chunk(id int, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) {
 	// ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are
 	// incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index.
@@ -335,7 +337,7 @@ func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator {
 	return it
 }
-// iterator returns a chunk iterator.
+// iterator returns a chunk iterator for the requested chunkID.
 // It is unsafe to call this concurrently with s.append(...) without holding the series lock.
 func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator {
 	c, garbageCollect, err := s.chunk(id, chunkDiskMapper)
@@ -439,6 +441,8 @@ func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *
 	}
 }
+// memSafeIterator returns values from the wrapped stopIterator
+// except the last 4, which come from buf.
 type memSafeIterator struct {
 	stopIterator
@@ -482,6 +486,8 @@ func (it *memSafeIterator) At() (int64, float64) {
 	return s.t, s.v
 }
+// stopIterator wraps an Iterator, but only returns the first
+// stopAfter values, if initialized with i=-1.
 type stopIterator struct {
 	chunkenc.Iterator


@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
+// Package record contains the various record types used for encoding various Head block data in the WAL and in-memory snapshot.
 package record
 import (