Sync sparsehistogram branch with main (#9189)

* Fix `kuma_sd` targetgroup reporting (#9157)

* Bundle all xDS targets into a single group

Signed-off-by: austin ce <austin.cawley@gmail.com>

* Snapshot in-memory chunks on shutdown for faster restarts (#7229)

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Rename links

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Remove Individual Data Type Caps in Per-shard Buffering for Remote Write (#8921)

* Moved everything to nPending buffer

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Simplify exemplar capacity addition

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Added pre-allocation

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Don't allocate if not sending exemplars

Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Avoid deadlock when processing duplicate series record (#9170)

* Avoid deadlock when processing duplicate series record

`processWALSamples()` needs to be able to send on its output channel
before it can read the input channel, so the output channel is read
first to allow this in case it is full.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* processWALSamples: update comment

Previous text seems to relate to an earlier implementation.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* Optimise WAL loading by removing extra map and caching min-time (#9160)

* BenchmarkLoadWAL: close WAL after use

So that goroutines are stopped and resources released

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* BenchmarkLoadWAL: make series IDs co-prime with #workers

Series are distributed across workers by taking the modulus of the
ID with the number of workers, so multiples of 100 are a poor choice.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* BenchmarkLoadWAL: simulate mmapped chunks

Real Prometheus cuts chunks every 120 samples, then skips those samples
when re-reading the WAL. Simulate this by creating a single mapped chunk
for each series, since the max time is all the reader looks at.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* Fix comment

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* Remove series map from processWALSamples()

The locks that the map was commented as reducing contention on are now
sharded 32,000 ways, so they won't be contended. Removing the map saves
memory and goes just as fast.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* loadWAL: Cache the last mmapped chunk time

So we can skip calling append() for samples it will reject.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* Improvements from code review

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* Full stops and capitals on comments

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* Cache max time in both places mmappedChunks is updated

Including refactor to extract function `setMMappedChunks`, to reduce
code duplication.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* Update head min/max time when mmapped chunks added

This ensures we have the correct values if no WAL samples are added for
that series.

Note that `mSeries.maxTime()` was always `math.MinInt64` before, since
that function doesn't consider mmapped chunks.

Signed-off-by: Bryan Boreham <bjboreham@gmail.com>

* Split Go and React Tests (#8897)

* Added go-ci and react-ci

Co-authored-by: Julien Pivotto <roidelapluie@inuits.eu>
Signed-off-by: Levi Harrison <git@leviharrison.dev>

* Remove search keymap from new expression editor (#9184)

Signed-off-by: Julius Volz <julius.volz@gmail.com>

Co-authored-by: Austin Cawley-Edwards <austin.cawley@gmail.com>
Co-authored-by: Levi Harrison <git@leviharrison.dev>
Co-authored-by: Julien Pivotto <roidelapluie@inuits.eu>
Co-authored-by: Bryan Boreham <bjboreham@gmail.com>
Co-authored-by: Julius Volz <julius.volz@gmail.com>
This commit is contained in:
Ganesh Vernekar 2021-08-11 15:43:17 +05:30 committed by GitHub
parent 19e98e5469
commit 095f572d4a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 1165 additions and 264 deletions

View file

@ -17,21 +17,17 @@ executors:
- image: circleci/golang:1.15-node
jobs:
test:
test_go:
executor: golang
steps:
- prometheus/setup_environment
- go/load-cache:
key: v1
- restore_cache:
keys:
- v3-npm-deps-{{ checksum "web/ui/react-app/yarn.lock" }}
- v3-npm-deps-
- run:
command: sudo apt-get install -y yamllint
- run:
command: make
command: make GO_ONLY=1
environment:
# Run garbage collection more aggressively to avoid getting OOMed during the lint phase.
GOGC: "20"
@ -50,12 +46,24 @@ jobs:
file: promtool
- go/save-cache:
key: v1
- store_test_results:
path: test-results
test_react:
executor: golang
steps:
- checkout
- restore_cache:
keys:
- v3-npm-deps-{{ checksum "web/ui/react-app/yarn.lock" }}
- v3-npm-deps-
- run:
command: make react-app-test
- save_cache:
key: v3-npm-deps-{{ checksum "web/ui/react-app/yarn.lock" }}
paths:
- /home/circleci/.cache/yarn
- store_test_results:
path: test-results
test_windows:
executor:
@ -121,7 +129,11 @@ workflows:
version: 2
prometheus:
jobs:
- test:
- test_go:
filters:
tags:
only: /.*/
- test_react:
filters:
tags:
only: /.*/
@ -146,7 +158,8 @@ workflows:
- prometheus/publish_main:
context: org-context
requires:
- test
- test_go
- test_react
- build
filters:
branches:
@ -155,7 +168,8 @@ workflows:
- prometheus/publish_release:
context: org-context
requires:
- test
- test_go
- test_react
- build
filters:
tags:

View file

@ -64,7 +64,14 @@ react-app-test: | $(REACT_APP_NODE_MODULES_PATH) react-app-lint
cd $(REACT_APP_PATH) && yarn test --no-watch --coverage
.PHONY: test
# If we only want to test Go code we have to change the test target,
# which is called by all.
ifeq ($(GO_ONLY),1)
test: common-test
else
test: common-test react-app-test
endif
.PHONY: npm_licenses
npm_licenses: $(REACT_APP_NODE_MODULES_PATH)

View file

@ -150,6 +150,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
case "exemplar-storage":
c.tsdb.EnableExemplarStorage = true
level.Info(logger).Log("msg", "Experimental in-memory exemplar storage enabled")
case "memory-snapshot-on-shutdown":
c.tsdb.EnableMemorySnapshotOnShutdown = true
level.Info(logger).Log("msg", "Experimental memory snapshot on shutdown enabled")
case "":
continue
default:
@ -310,7 +313,7 @@ func main() {
a.Flag("query.max-samples", "Maximum number of samples a single query can load into memory. Note that queries will fail if they try to load more samples than this into memory, so this also limits the number of samples a query can return.").
Default("50000000").IntVar(&cfg.queryMaxSamples)
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: promql-at-modifier, promql-negative-offset, remote-write-receiver, exemplar-storage, expand-external-labels. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
a.Flag("enable-feature", "Comma separated feature names to enable. Valid options: exemplar-storage, expand-external-labels, memory-snapshot-on-shutdown, promql-at-modifier, promql-negative-offset, remote-write-receiver. See https://prometheus.io/docs/prometheus/latest/feature_flags/ for more details.").
Default("").StringsVar(&cfg.featureList)
promlogflag.AddFlags(a, &cfg.promlogConfig)
@ -1268,34 +1271,36 @@ func (rm *readyScrapeManager) Get() (*scrape.Manager, error) {
// tsdbOptions is tsdb.Option version with defined units.
// This is required as tsdb.Option fields are unit agnostic (time).
type tsdbOptions struct {
WALSegmentSize units.Base2Bytes
MaxBlockChunkSegmentSize units.Base2Bytes
RetentionDuration model.Duration
MaxBytes units.Base2Bytes
NoLockfile bool
AllowOverlappingBlocks bool
WALCompression bool
StripeSize int
MinBlockDuration model.Duration
MaxBlockDuration model.Duration
EnableExemplarStorage bool
MaxExemplars int64
WALSegmentSize units.Base2Bytes
MaxBlockChunkSegmentSize units.Base2Bytes
RetentionDuration model.Duration
MaxBytes units.Base2Bytes
NoLockfile bool
AllowOverlappingBlocks bool
WALCompression bool
StripeSize int
MinBlockDuration model.Duration
MaxBlockDuration model.Duration
EnableExemplarStorage bool
MaxExemplars int64
EnableMemorySnapshotOnShutdown bool
}
func (opts tsdbOptions) ToTSDBOptions() tsdb.Options {
return tsdb.Options{
WALSegmentSize: int(opts.WALSegmentSize),
MaxBlockChunkSegmentSize: int64(opts.MaxBlockChunkSegmentSize),
RetentionDuration: int64(time.Duration(opts.RetentionDuration) / time.Millisecond),
MaxBytes: int64(opts.MaxBytes),
NoLockfile: opts.NoLockfile,
AllowOverlappingBlocks: opts.AllowOverlappingBlocks,
WALCompression: opts.WALCompression,
StripeSize: opts.StripeSize,
MinBlockDuration: int64(time.Duration(opts.MinBlockDuration) / time.Millisecond),
MaxBlockDuration: int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond),
EnableExemplarStorage: opts.EnableExemplarStorage,
MaxExemplars: opts.MaxExemplars,
WALSegmentSize: int(opts.WALSegmentSize),
MaxBlockChunkSegmentSize: int64(opts.MaxBlockChunkSegmentSize),
RetentionDuration: int64(time.Duration(opts.RetentionDuration) / time.Millisecond),
MaxBytes: int64(opts.MaxBytes),
NoLockfile: opts.NoLockfile,
AllowOverlappingBlocks: opts.AllowOverlappingBlocks,
WALCompression: opts.WALCompression,
StripeSize: opts.StripeSize,
MinBlockDuration: int64(time.Duration(opts.MinBlockDuration) / time.Millisecond),
MaxBlockDuration: int64(time.Duration(opts.MaxBlockDuration) / time.Millisecond),
EnableExemplarStorage: opts.EnableExemplarStorage,
MaxExemplars: opts.MaxExemplars,
EnableMemorySnapshotOnShutdown: opts.EnableMemorySnapshotOnShutdown,
}
}

View file

@ -27,7 +27,6 @@ import (
"google.golang.org/protobuf/types/known/anypb"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/util/osutil"
"github.com/prometheus/prometheus/util/strutil"
)
@ -129,30 +128,27 @@ func (c *KumaSDConfig) NewDiscoverer(opts discovery.DiscovererOptions) (discover
return NewKumaHTTPDiscovery(c, logger)
}
func convertKumaV1MonitoringAssignment(assignment *MonitoringAssignment) *targetgroup.Group {
func convertKumaV1MonitoringAssignment(assignment *MonitoringAssignment) []model.LabelSet {
commonLabels := convertKumaUserLabels(assignment.Labels)
commonLabels[kumaMeshLabel] = model.LabelValue(assignment.Mesh)
commonLabels[kumaServiceLabel] = model.LabelValue(assignment.Service)
var targetLabelSets []model.LabelSet
var targets []model.LabelSet
for _, target := range assignment.Targets {
targetLabels := convertKumaUserLabels(target.Labels)
for _, madsTarget := range assignment.Targets {
targetLabels := convertKumaUserLabels(madsTarget.Labels).Merge(commonLabels)
targetLabels[kumaDataplaneLabel] = model.LabelValue(target.Name)
targetLabels[model.InstanceLabel] = model.LabelValue(target.Name)
targetLabels[model.AddressLabel] = model.LabelValue(target.Address)
targetLabels[model.SchemeLabel] = model.LabelValue(target.Scheme)
targetLabels[model.MetricsPathLabel] = model.LabelValue(target.MetricsPath)
targetLabels[kumaDataplaneLabel] = model.LabelValue(madsTarget.Name)
targetLabels[model.AddressLabel] = model.LabelValue(madsTarget.Address)
targetLabels[model.InstanceLabel] = model.LabelValue(madsTarget.Name)
targetLabels[model.SchemeLabel] = model.LabelValue(madsTarget.Scheme)
targetLabels[model.MetricsPathLabel] = model.LabelValue(madsTarget.MetricsPath)
targetLabelSets = append(targetLabelSets, targetLabels)
targets = append(targets, targetLabels)
}
return &targetgroup.Group{
Labels: commonLabels,
Targets: targetLabelSets,
}
return targets
}
func convertKumaUserLabels(labels map[string]string) model.LabelSet {
@ -165,12 +161,12 @@ func convertKumaUserLabels(labels map[string]string) model.LabelSet {
}
// kumaMadsV1ResourceParser is an xds.resourceParser.
func kumaMadsV1ResourceParser(resources []*anypb.Any, typeURL string) ([]*targetgroup.Group, error) {
func kumaMadsV1ResourceParser(resources []*anypb.Any, typeURL string) ([]model.LabelSet, error) {
if typeURL != KumaMadsV1ResourceTypeURL {
return nil, errors.Errorf("recieved invalid typeURL for Kuma MADS v1 Resource: %s", typeURL)
}
var groups []*targetgroup.Group
var targets []model.LabelSet
for _, resource := range resources {
assignment := &MonitoringAssignment{}
@ -179,10 +175,10 @@ func kumaMadsV1ResourceParser(resources []*anypb.Any, typeURL string) ([]*target
return nil, err
}
groups = append(groups, convertKumaV1MonitoringAssignment(assignment))
targets = append(targets, convertKumaV1MonitoringAssignment(assignment)...)
}
return groups, nil
return targets, nil
}
func NewKumaHTTPDiscovery(conf *KumaSDConfig, logger log.Logger) (discovery.Discoverer, error) {

View file

@ -138,65 +138,47 @@ func TestKumaMadsV1ResourceParserValidResources(t *testing.T) {
res, err := getKumaMadsV1DiscoveryResponse(testKumaMadsV1Resources...)
require.NoError(t, err)
groups, err := kumaMadsV1ResourceParser(res.Resources, KumaMadsV1ResourceTypeURL)
targets, err := kumaMadsV1ResourceParser(res.Resources, KumaMadsV1ResourceTypeURL)
require.NoError(t, err)
require.Len(t, groups, 3)
require.Len(t, targets, 3)
expectedGroup1 := &targetgroup.Group{
Targets: []model.LabelSet{
{
"__address__": "10.1.4.32:9090",
"__meta_kuma_label_commit_hash": "620506a88",
"__meta_kuma_dataplane": "prometheus-01",
"__metrics_path__": "/custom-metrics",
"__scheme__": "http",
"instance": "prometheus-01",
},
{
"__address__": "10.1.4.33:9090",
"__meta_kuma_label_commit_hash": "3513bba00",
"__meta_kuma_dataplane": "prometheus-02",
"__metrics_path__": "",
"__scheme__": "http",
"instance": "prometheus-02",
},
},
Labels: model.LabelSet{
expectedTargets := []model.LabelSet{
{
"__address__": "10.1.4.32:9090",
"__metrics_path__": "/custom-metrics",
"__scheme__": "http",
"instance": "prometheus-01",
"__meta_kuma_mesh": "metrics",
"__meta_kuma_service": "prometheus",
"__meta_kuma_label_team": "infra",
"__meta_kuma_label_kuma_io_zone": "us-east-1",
"__meta_kuma_label_commit_hash": "620506a88",
"__meta_kuma_dataplane": "prometheus-01",
},
}
require.Equal(t, expectedGroup1, groups[0])
expectedGroup2 := &targetgroup.Group{
Labels: model.LabelSet{
{
"__address__": "10.1.4.33:9090",
"__metrics_path__": "",
"__scheme__": "http",
"instance": "prometheus-02",
"__meta_kuma_mesh": "metrics",
"__meta_kuma_service": "grafana",
"__meta_kuma_service": "prometheus",
"__meta_kuma_label_team": "infra",
"__meta_kuma_label_kuma_io_zone": "us-east-1",
"__meta_kuma_label_commit_hash": "3513bba00",
"__meta_kuma_dataplane": "prometheus-02",
},
{
"__address__": "10.1.1.1",
"__metrics_path__": "",
"__scheme__": "http",
"instance": "elasticsearch-01",
"__meta_kuma_mesh": "data",
"__meta_kuma_service": "elasticsearch",
"__meta_kuma_label_role": "ml",
"__meta_kuma_dataplane": "elasticsearch-01",
},
}
require.Equal(t, expectedGroup2, groups[1])
expectedGroup3 := &targetgroup.Group{
Targets: []model.LabelSet{
{
"__address__": "10.1.1.1",
"__meta_kuma_label_role": "ml",
"__meta_kuma_dataplane": "elasticsearch-01",
"__metrics_path__": "",
"__scheme__": "http",
"instance": "elasticsearch-01",
},
},
Labels: model.LabelSet{
"__meta_kuma_mesh": "data",
"__meta_kuma_service": "elasticsearch",
},
}
require.Equal(t, expectedGroup3, groups[2])
require.Equal(t, expectedTargets, targets)
}
func TestKumaMadsV1ResourceParserInvalidResources(t *testing.T) {
@ -262,66 +244,48 @@ tls_config:
kd.poll(context.Background(), ch)
groups := <-ch
require.Len(t, groups, 3)
require.Len(t, groups, 1)
expectedGroup1 := &targetgroup.Group{
Source: "kuma",
Targets: []model.LabelSet{
{
"__address__": "10.1.4.32:9090",
"__meta_kuma_label_commit_hash": "620506a88",
"__meta_kuma_dataplane": "prometheus-01",
"__metrics_path__": "/custom-metrics",
"__scheme__": "http",
"instance": "prometheus-01",
},
{
"__address__": "10.1.4.33:9090",
"__meta_kuma_label_commit_hash": "3513bba00",
"__meta_kuma_dataplane": "prometheus-02",
"__metrics_path__": "",
"__scheme__": "http",
"instance": "prometheus-02",
},
},
Labels: model.LabelSet{
targets := groups[0].Targets
require.Len(t, targets, 3)
expectedTargets := []model.LabelSet{
{
"__address__": "10.1.4.32:9090",
"__metrics_path__": "/custom-metrics",
"__scheme__": "http",
"instance": "prometheus-01",
"__meta_kuma_mesh": "metrics",
"__meta_kuma_service": "prometheus",
"__meta_kuma_label_team": "infra",
"__meta_kuma_label_kuma_io_zone": "us-east-1",
"__meta_kuma_label_commit_hash": "620506a88",
"__meta_kuma_dataplane": "prometheus-01",
},
}
require.Equal(t, expectedGroup1, groups[0])
expectedGroup2 := &targetgroup.Group{
Source: "kuma",
Labels: model.LabelSet{
{
"__address__": "10.1.4.33:9090",
"__metrics_path__": "",
"__scheme__": "http",
"instance": "prometheus-02",
"__meta_kuma_mesh": "metrics",
"__meta_kuma_service": "grafana",
"__meta_kuma_service": "prometheus",
"__meta_kuma_label_team": "infra",
"__meta_kuma_label_kuma_io_zone": "us-east-1",
"__meta_kuma_label_commit_hash": "3513bba00",
"__meta_kuma_dataplane": "prometheus-02",
},
{
"__address__": "10.1.1.1",
"__metrics_path__": "",
"__scheme__": "http",
"instance": "elasticsearch-01",
"__meta_kuma_mesh": "data",
"__meta_kuma_service": "elasticsearch",
"__meta_kuma_label_role": "ml",
"__meta_kuma_dataplane": "elasticsearch-01",
},
}
require.Equal(t, expectedGroup2, groups[1])
expectedGroup3 := &targetgroup.Group{
Source: "kuma",
Targets: []model.LabelSet{
{
"__address__": "10.1.1.1",
"__meta_kuma_label_role": "ml",
"__meta_kuma_dataplane": "elasticsearch-01",
"__metrics_path__": "",
"__scheme__": "http",
"instance": "elasticsearch-01",
},
},
Labels: model.LabelSet{
"__meta_kuma_mesh": "data",
"__meta_kuma_service": "elasticsearch",
},
}
require.Equal(t, expectedGroup3, groups[2])
require.Equal(t, expectedTargets, targets)
// Should skip the next update.
ctx, cancel := context.WithCancel(context.Background())

View file

@ -15,7 +15,6 @@ package xds
import (
"context"
"github.com/prometheus/common/model"
"time"
v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
@ -23,6 +22,7 @@ import (
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
@ -95,7 +95,9 @@ var (
}
)
type resourceParser func(resources []*anypb.Any, typeUrl string) ([]*targetgroup.Group, error)
// resourceParser is a function that takes raw discovered objects and translates them into
// targetgroup.Group Targets. On error, no updates are sent to the scrape manager and the failure count is incremented.
type resourceParser func(resources []*anypb.Any, typeUrl string) ([]model.LabelSet, error)
// fetchDiscovery implements long-polling via xDS Fetch REST-JSON.
type fetchDiscovery struct {
@ -154,23 +156,18 @@ func (d *fetchDiscovery) poll(ctx context.Context, ch chan<- []*targetgroup.Grou
return
}
parsedGroups, err := d.parseResources(response.Resources, response.TypeUrl)
parsedTargets, err := d.parseResources(response.Resources, response.TypeUrl)
if err != nil {
level.Error(d.logger).Log("msg", "error parsing resources", "err", err)
d.fetchFailuresCount.Inc()
return
}
for _, group := range parsedGroups {
group.Source = d.source
}
level.Debug(d.logger).Log("msg", "Updated to version", "version", response.VersionInfo, "targets", len(parsedTargets))
level.Debug(d.logger).Log("msg", "updated to version", "version", response.VersionInfo, "groups", len(parsedGroups))
// Check the context before sending an update on the channel.
select {
case <-ctx.Done():
return
case ch <- parsedGroups:
case ch <- []*targetgroup.Group{{Source: d.source, Targets: parsedTargets}}:
}
}

View file

@ -93,9 +93,9 @@ func createTestHTTPServer(t *testing.T, responder discoveryResponder) *httptest.
}))
}
func constantResourceParser(groups []*targetgroup.Group, err error) resourceParser {
return func(resources []*anypb.Any, typeUrl string) ([]*targetgroup.Group, error) {
return groups, err
func constantResourceParser(targets []model.LabelSet, err error) resourceParser {
return func(resources []*anypb.Any, typeUrl string) ([]model.LabelSet, error) {
return targets, err
}
}
@ -174,13 +174,16 @@ func TestPollingRefreshAttachesGroupMetadata(t *testing.T) {
fetchDuration: testFetchDuration,
fetchFailuresCount: testFetchFailuresCount,
fetchSkipUpdateCount: testFetchSkipUpdateCount,
parseResources: constantResourceParser([]*targetgroup.Group{
{},
parseResources: constantResourceParser([]model.LabelSet{
{
Source: "a-custom-source",
Labels: model.LabelSet{
"__meta_custom_xds_label": "a-value",
},
"__meta_custom_xds_label": "a-value",
"__address__": "10.1.4.32:9090",
"instance": "prometheus-01",
},
{
"__meta_custom_xds_label": "a-value",
"__address__": "10.1.5.32:9090",
"instance": "prometheus-02",
},
}, nil),
}
@ -189,13 +192,83 @@ func TestPollingRefreshAttachesGroupMetadata(t *testing.T) {
groups := <-ch
require.NotNil(t, groups)
require.Len(t, groups, 2)
require.Len(t, groups, 1)
for _, group := range groups {
require.Equal(t, source, group.Source)
group := groups[0]
require.Equal(t, source, group.Source)
require.Len(t, group.Targets, 2)
target2 := group.Targets[1]
require.Contains(t, target2, model.LabelName("__meta_custom_xds_label"))
require.Equal(t, model.LabelValue("a-value"), target2["__meta_custom_xds_label"])
}
func TestPollingDisappearingTargets(t *testing.T) {
server := "http://198.161.2.0"
source := "test"
rc := &testResourceClient{
server: server,
protocolVersion: ProtocolV3,
fetch: func(ctx context.Context) (*v3.DiscoveryResponse, error) {
return &v3.DiscoveryResponse{}, nil
},
}
group2 := groups[1]
require.Contains(t, group2.Labels, model.LabelName("__meta_custom_xds_label"))
require.Equal(t, model.LabelValue("a-value"), group2.Labels["__meta_custom_xds_label"])
// On the first poll, send back two targets. On the next, send just one.
counter := 0
parser := func(resources []*anypb.Any, typeUrl string) ([]model.LabelSet, error) {
counter++
if counter == 1 {
return []model.LabelSet{
{
"__meta_custom_xds_label": "a-value",
"__address__": "10.1.4.32:9090",
"instance": "prometheus-01",
},
{
"__meta_custom_xds_label": "a-value",
"__address__": "10.1.5.32:9090",
"instance": "prometheus-02",
},
}, nil
}
return []model.LabelSet{
{
"__meta_custom_xds_label": "a-value",
"__address__": "10.1.4.32:9090",
"instance": "prometheus-01",
},
}, nil
}
pd := &fetchDiscovery{
source: source,
client: rc,
logger: nopLogger,
fetchDuration: testFetchDuration,
fetchFailuresCount: testFetchFailuresCount,
fetchSkipUpdateCount: testFetchSkipUpdateCount,
parseResources: parser,
}
ch := make(chan []*targetgroup.Group, 1)
pd.poll(context.Background(), ch)
groups := <-ch
require.NotNil(t, groups)
require.Len(t, groups, 1)
require.Equal(t, source, groups[0].Source)
require.Len(t, groups[0].Targets, 2)
pd.poll(context.Background(), ch)
groups = <-ch
require.NotNil(t, groups)
require.Len(t, groups, 1)
require.Equal(t, source, groups[0].Source)
require.Len(t, groups[0].Targets, 1)
}

View file

@ -40,7 +40,7 @@ with more recent data.
More details can be found [here](querying/basics.md#offset-modifier).
## Remote Write Receiver
## Remote Write Receiver
`--enable-feature=remote-write-receiver`
@ -53,3 +53,11 @@ The remote write receiver allows Prometheus to accept remote write requests from
[OpenMetrics](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars) introduces the ability for scrape targets to add exemplars to certain metrics. Exemplars are references to data outside of the MetricSet. A common use case are IDs of program traces.
Exemplar storage is implemented as a fixed size circular buffer that stores exemplars in memory for all series. Enabling this feature will enable the storage of exemplars scraped by Prometheus. The flag `storage.exemplars.exemplars-limit` can be used to control the size of circular buffer by # of exemplars. An exemplar with just a `traceID=<jaeger-trace-id>` uses roughly 100 bytes of memory via the in-memory exemplar storage. If the exemplar storage is enabled, we will also append the exemplars to WAL for local persistence (for WAL duration).
## Memory Snapshot on Shutdown
`--enable-feature=memory-snapshot-on-shutdown`
This takes a snapshot of the chunks that are in memory along with the series information when shutting down and stores
it on disk. This will reduce the startup time since the memory state can be restored from this snapshot and the m-mapped
chunks, without the need for WAL replay.

View file

@ -210,7 +210,7 @@ can be specified:
rate(http_requests_total[5m] offset -1w)
This feature is enabled by setting `--enable-feature=promql-negative-offset`
flag. See [disabled features](../disabled_features.md) for more details about
flag. See [feature flags](../feature_flags.md) for more details about
this flag.
### @ modifier
@ -251,7 +251,7 @@ These 2 queries will produce the same result.
This modifier is disabled by default since it breaks the invariant that PromQL
does not look ahead of the evaluation time for samples. It can be enabled by setting
`--enable-feature=promql-at-modifier` flag. See [disabled features](../disabled_features.md) for more details about this flag.
`--enable-feature=promql-at-modifier` flag. See [feature flags](../feature_flags.md) for more details about this flag.
Additionally, `start()` and `end()` can also be used as values for the `@` modifier as special values.

View file

@ -1037,24 +1037,22 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface
// Send batches of at most MaxSamplesPerSend samples to the remote storage.
// If we have fewer samples than that, flush them out after a deadline anyways.
var (
max = s.qm.cfg.MaxSamplesPerSend
// Rough estimate, 1% of active series will contain an exemplar on each scrape.
// TODO(cstyan): Casting this many times smells, also we could get index out of bounds issues here.
maxExemplars = int(math.Max(1, float64(max/10)))
max = s.qm.cfg.MaxSamplesPerSend
nPending, nPendingSamples, nPendingExemplars = 0, 0, 0
sampleBuffer = allocateSampleBuffer(max)
buf []byte
pendingData []prompb.TimeSeries
exemplarBuffer [][]prompb.Exemplar
buf []byte
)
totalPending := max
if s.qm.sendExemplars {
exemplarBuffer = allocateExemplarBuffer(maxExemplars)
totalPending += maxExemplars
max += int(float64(max) * 0.1)
}
pendingData = make([]prompb.TimeSeries, totalPending)
var pendingData = make([]prompb.TimeSeries, max)
for i := range pendingData {
pendingData[i].Samples = []prompb.Sample{{}}
if s.qm.sendExemplars {
pendingData[i].Exemplars = []prompb.Exemplar{{}}
}
}
timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline))
stop := func() {
@ -1094,28 +1092,28 @@ func (s *shards) runShard(ctx context.Context, shardID int, queue chan interface
return
}
pendingData[nPending].Samples = pendingData[nPending].Samples[:0]
if s.qm.sendExemplars {
pendingData[nPending].Exemplars = pendingData[nPending].Exemplars[:0]
}
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
// stop reading from the queue. This makes it safe to reference pendingSamples by index.
switch d := sample.(type) {
case writeSample:
sampleBuffer[nPendingSamples][0] = d.sample
pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
pendingData[nPending].Samples = sampleBuffer[nPendingSamples]
pendingData[nPending].Exemplars = nil
pendingData[nPending].Samples = append(pendingData[nPending].Samples, d.sample)
nPendingSamples++
nPending++
case writeExemplar:
exemplarBuffer[nPendingExemplars][0] = d.exemplar
pendingData[nPending].Labels = labelsToLabelsProto(d.seriesLabels, pendingData[nPending].Labels)
pendingData[nPending].Samples = nil
pendingData[nPending].Exemplars = exemplarBuffer[nPendingExemplars]
pendingData[nPending].Exemplars = append(pendingData[nPending].Exemplars, d.exemplar)
nPendingExemplars++
nPending++
}
if nPendingSamples >= max || nPendingExemplars >= maxExemplars {
if nPending >= max {
s.sendSamples(ctx, pendingData[:nPending], nPendingSamples, nPendingExemplars, &buf)
s.qm.metrics.pendingSamples.Sub(float64(nPendingSamples))
s.qm.metrics.pendingExemplars.Sub(float64(nPendingExemplars))
@ -1298,19 +1296,3 @@ func buildWriteRequest(samples []prompb.TimeSeries, metadata []prompb.MetricMeta
compressed := snappy.Encode(buf, data)
return compressed, highest, nil
}
func allocateSampleBuffer(capacity int) [][]prompb.Sample {
buf := make([][]prompb.Sample, capacity)
for i := range buf {
buf[i] = []prompb.Sample{{}}
}
return buf
}
func allocateExemplarBuffer(capacity int) [][]prompb.Exemplar {
buf := make([][]prompb.Exemplar, capacity)
for i := range buf {
buf[i] = []prompb.Exemplar{{}}
}
return buf
}

View file

@ -151,6 +151,9 @@ type Options struct {
// Enables the in-memory exemplar storage.
EnableExemplarStorage bool
// Enables the snapshot of in-memory chunks on shutdown. This makes restarts faster.
EnableMemorySnapshotOnShutdown bool
// MaxExemplars sets the size, in # of exemplars stored, of the single circular buffer used to store exemplars in memory.
// See tsdb/exemplar.go, specifically the CircularExemplarStorage struct and its constructor NewCircularExemplarStorage.
MaxExemplars int64
@ -722,6 +725,7 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
headOpts.SeriesCallback = opts.SeriesLifecycleCallback
headOpts.EnableExemplarStorage = opts.EnableExemplarStorage
headOpts.MaxExemplars.Store(opts.MaxExemplars)
headOpts.EnableMemorySnapshotOnShutdown = opts.EnableMemorySnapshotOnShutdown
db.head, err = NewHead(r, l, wlog, headOpts, stats.Head)
if err != nil {
return nil, err

View file

@ -5,3 +5,4 @@
* [Head Chunks](head_chunks.md)
* [Tombstones](tombstones.md)
* [Wal](wal.md)
* [Memory Snapshot](memory_snapshot.md)

View file

@ -0,0 +1,62 @@
# Memory Snapshot Format
Memory snapshot uses the WAL package and writes each series as a WAL record.
Below are the formats of the individual records.
### Series records
This record is a snapshot of a single series. Only one series exists per record.
It includes the metadata of the series and the in-memory chunk data if it exists.
The sampleBuf is the last 4 samples in the in-memory chunk.
```
┌──────────────────────────┬────────────────────────────┐
│ Record Type <byte> │ Series Ref <uint64>
├──────────────────────────┴────────────────────────────┤
│ Number of Labels <uvarint>
├──────────────────────────────┬────────────────────────┤
│ len(name_1) <uvarint> │ name_1 <bytes>
├──────────────────────────────┼────────────────────────┤
│ len(val_1) <uvarint> │ val_1 <bytes>
├──────────────────────────────┴────────────────────────┤
│ . . . │
├──────────────────────────────┬────────────────────────┤
│ len(name_N) <uvarint> │ name_N <bytes>
├──────────────────────────────┼────────────────────────┤
│ len(val_N) <uvarint> │ val_N <bytes>
├──────────────────────────────┴────────────────────────┤
│ Chunk Range <int64>
├───────────────────────────────────────────────────────┤
│ Chunk Exists <uvarint>
│ # 1 if head chunk exists, 0 otherwise to detect a nil |
| # chunk. Below fields exists only when it's 1 here. |
├───────────────────────────┬───────────────────────────┤
│ Chunk Mint <int64> │ Chunk Maxt <int64>
├───────────────────────────┴───────────────────────────┤
│ Chunk Encoding <byte>
├──────────────────────────────┬────────────────────────┤
│ len(Chunk) <uvarint> │ Chunk <bytes>
├──────────────────────────┬───┴────────────────────────┤
| sampleBuf[0].t <int64> | sampleBuf[0].v <float64> |
├──────────────────────────┼────────────────────────────┤
| sampleBuf[1].t <int64> | sampleBuf[1].v <float64> |
├──────────────────────────┼────────────────────────────┤
| sampleBuf[2].t <int64> | sampleBuf[2].v <float64> |
├──────────────────────────┼────────────────────────────┤
| sampleBuf[3].t <int64> | sampleBuf[3].v <float64> |
└──────────────────────────┴────────────────────────────┘
```
### Tombstone record
This includes all the tombstones in the Head block. A single record is written into
the snapshot for all the tombstones. The encoded tombstones use the same encoding
as the tombstone file in blocks.
```
┌─────────────────────────────────────────────────────────────────┐
│ Record Type <byte>
├───────────────────────────────────┬─────────────────────────────┤
│ len(Encoded Tombstones) <uvarint> │ Encoded Tombstones <bytes>
└───────────────────────────────────┴─────────────────────────────┘
```

View file

@ -17,6 +17,7 @@ import (
"encoding/binary"
"hash"
"hash/crc32"
"math"
"unsafe"
"github.com/dennwc/varint"
@ -40,6 +41,7 @@ func (e *Encbuf) Len() int { return len(e.B) }
func (e *Encbuf) PutString(s string) { e.B = append(e.B, s...) }
func (e *Encbuf) PutByte(c byte) { e.B = append(e.B, c) }
func (e *Encbuf) PutBytes(b []byte) { e.B = append(e.B, b...) }
func (e *Encbuf) PutBE32int(x int) { e.PutBE32(uint32(x)) }
func (e *Encbuf) PutUvarint32(x uint32) { e.PutUvarint64(uint64(x)) }
@ -56,6 +58,10 @@ func (e *Encbuf) PutBE64(x uint64) {
e.B = append(e.B, e.C[:8]...)
}
// PutBEFloat64 appends the big-endian IEEE 754 bit pattern of x to the buffer.
func (e *Encbuf) PutBEFloat64(x float64) {
	bits := math.Float64bits(x)
	e.PutBE64(bits)
}
func (e *Encbuf) PutUvarint64(x uint64) {
n := binary.PutUvarint(e.C[:], x)
e.B = append(e.B, e.C[:n]...)
@ -73,6 +79,12 @@ func (e *Encbuf) PutUvarintStr(s string) {
e.PutString(s)
}
// PutUvarintBytes writes a variable-length byte buffer: the length of b as a
// uvarint, followed by the raw bytes of b.
func (e *Encbuf) PutUvarintBytes(b []byte) {
	e.PutUvarint(len(b))
	e.PutBytes(b)
}
// PutHash appends a hash over the buffers current contents to the buffer.
func (e *Encbuf) PutHash(h hash.Hash) {
h.Reset()
@ -249,6 +261,10 @@ func (d *Decbuf) Be64() uint64 {
return x
}
// Be64Float64 reads a big-endian uint64 and reinterprets its bits as a float64.
func (d *Decbuf) Be64Float64() float64 {
	bits := d.Be64()
	return math.Float64frombits(bits)
}
func (d *Decbuf) Be32() uint32 {
if d.E != nil {
return 0

View file

@ -101,6 +101,8 @@ type Head struct {
// chunkDiskMapper is used to write and read Head chunks to/from disk.
chunkDiskMapper *chunks.ChunkDiskMapper
chunkSnapshotMtx sync.Mutex
closedMtx sync.Mutex
closed bool
@ -126,9 +128,10 @@ type HeadOptions struct {
// StripeSize sets the number of entries in the hash map, it must be a power of 2.
// A larger StripeSize will allocate more memory up-front, but will increase performance when handling a large number of series.
// A smaller StripeSize reduces the memory allocated, but can decrease performance with large number of series.
StripeSize int
SeriesCallback SeriesLifecycleCallback
EnableExemplarStorage bool
StripeSize int
SeriesCallback SeriesLifecycleCallback
EnableExemplarStorage bool
EnableMemorySnapshotOnShutdown bool
// Runtime reloadable options.
MaxExemplars atomic.Int64
@ -443,11 +446,25 @@ func (h *Head) Init(minValidTime int64) error {
h.minValidTime.Store(minValidTime)
defer h.postings.EnsureOrder()
defer h.gc() // After loading the wal remove the obsolete data from the head.
defer func() {
// Loading of m-mapped chunks and snapshot can make the mint of the Head
// to go below minValidTime.
if h.MinTime() < h.minValidTime.Load() {
h.minTime.Store(h.minValidTime.Load())
}
}()
level.Info(h.logger).Log("msg", "Replaying on-disk memory mappable chunks if any")
start := time.Now()
mmappedChunks, err := h.loadMmappedChunks()
snapIdx, snapOffset, refSeries, err := h.loadChunkSnapshot()
if err != nil {
return err
}
level.Info(h.logger).Log("msg", "Chunk snapshot loading time", "duration", time.Since(start).String())
mmapChunkReplayStart := time.Now()
mmappedChunks, err := h.loadMmappedChunks(refSeries)
if err != nil {
level.Error(h.logger).Log("msg", "Loading on-disk chunks failed", "err", err)
if _, ok := errors.Cause(err).(*chunks.CorruptionErr); ok {
@ -455,10 +472,10 @@ func (h *Head) Init(minValidTime int64) error {
}
// If this fails, data will be recovered from WAL.
// Hence we wont lose any data (given WAL is not corrupt).
mmappedChunks = h.removeCorruptedMmappedChunks(err)
mmappedChunks = h.removeCorruptedMmappedChunks(err, refSeries)
}
level.Info(h.logger).Log("msg", "On-disk memory mappable chunks replay completed", "duration", time.Since(start).String())
level.Info(h.logger).Log("msg", "On-disk memory mappable chunks replay completed", "duration", time.Since(mmapChunkReplayStart).String())
if h.wal == nil {
level.Info(h.logger).Log("msg", "WAL not found")
return nil
@ -506,6 +523,9 @@ func (h *Head) Init(minValidTime int64) error {
walReplayStart := time.Now()
if snapIdx > startFrom {
startFrom = snapIdx
}
// Backfill segments from the most recent checkpoint onwards.
for i := startFrom; i <= endAt; i++ {
s, err := wal.OpenReadSegment(wal.SegmentName(h.wal.Dir(), i))
@ -513,7 +533,14 @@ func (h *Head) Init(minValidTime int64) error {
return errors.Wrap(err, fmt.Sprintf("open WAL segment: %d", i))
}
sr := wal.NewSegmentBufReader(s)
offset := 0
if i == snapIdx {
offset = snapOffset
}
sr, err := wal.NewSegmentBufReaderWithOffset(offset, s)
if err != nil {
return errors.Wrapf(err, "segment reader (offset=%d)", offset)
}
err = h.loadWAL(wal.NewReader(sr), multiRef, mmappedChunks)
if err := sr.Close(); err != nil {
level.Warn(h.logger).Log("msg", "Error while closing the wal segments reader", "err", err)
@ -537,29 +564,49 @@ func (h *Head) Init(minValidTime int64) error {
return nil
}
func (h *Head) loadMmappedChunks() (map[uint64][]*mmappedChunk, error) {
func (h *Head) loadMmappedChunks(refSeries map[uint64]*memSeries) (map[uint64][]*mmappedChunk, error) {
mmappedChunks := map[uint64][]*mmappedChunk{}
if err := h.chunkDiskMapper.IterateAllChunks(func(seriesRef, chunkRef uint64, mint, maxt int64, numSamples uint16) error {
if maxt < h.minValidTime.Load() {
return nil
}
slice := mmappedChunks[seriesRef]
if len(slice) > 0 {
if slice[len(slice)-1].maxTime >= mint {
return &chunks.CorruptionErr{
Err: errors.Errorf("out of sequence m-mapped chunk for series ref %d", seriesRef),
}
ms, ok := refSeries[seriesRef]
if !ok {
slice := mmappedChunks[seriesRef]
if len(slice) > 0 && slice[len(slice)-1].maxTime >= mint {
return errors.Errorf("out of sequence m-mapped chunk for series ref %d", seriesRef)
}
slice = append(slice, &mmappedChunk{
ref: chunkRef,
minTime: mint,
maxTime: maxt,
numSamples: numSamples,
})
mmappedChunks[seriesRef] = slice
return nil
}
slice = append(slice, &mmappedChunk{
if len(ms.mmappedChunks) > 0 && ms.mmappedChunks[len(ms.mmappedChunks)-1].maxTime >= mint {
return errors.Errorf("out of sequence m-mapped chunk for series ref %d", seriesRef)
}
h.metrics.chunks.Inc()
h.metrics.chunksCreated.Inc()
ms.mmappedChunks = append(ms.mmappedChunks, &mmappedChunk{
ref: chunkRef,
minTime: mint,
maxTime: maxt,
numSamples: numSamples,
})
mmappedChunks[seriesRef] = slice
h.updateMinMaxTime(mint, maxt)
if ms.headChunk != nil && maxt >= ms.headChunk.minTime {
// The head chunk was completed and was m-mapped after taking the snapshot.
// Hence remove this chunk.
ms.nextAt = 0
ms.headChunk = nil
ms.app = nil
}
return nil
}); err != nil {
return nil, errors.Wrap(err, "iterate on on-disk chunks")
@ -569,7 +616,7 @@ func (h *Head) loadMmappedChunks() (map[uint64][]*mmappedChunk, error) {
// removeCorruptedMmappedChunks attempts to delete the corrupted mmapped chunks and if it fails, it clears all the previously
// loaded mmapped chunks.
func (h *Head) removeCorruptedMmappedChunks(err error) map[uint64][]*mmappedChunk {
func (h *Head) removeCorruptedMmappedChunks(err error, refSeries map[uint64]*memSeries) map[uint64][]*mmappedChunk {
level.Info(h.logger).Log("msg", "Deleting mmapped chunk files")
if err := h.chunkDiskMapper.DeleteCorrupted(err); err != nil {
@ -578,7 +625,7 @@ func (h *Head) removeCorruptedMmappedChunks(err error) map[uint64][]*mmappedChun
}
level.Info(h.logger).Log("msg", "Deletion of mmap chunk files successful, reattempting m-mapping the on-disk chunks")
mmappedChunks, err := h.loadMmappedChunks()
mmappedChunks, err := h.loadMmappedChunks(refSeries)
if err != nil {
level.Error(h.logger).Log("msg", "Loading on-disk chunks failed, discarding chunk files completely", "err", err)
mmappedChunks = map[uint64][]*mmappedChunk{}
@ -665,6 +712,9 @@ func (h *Head) Truncate(mint int64) (err error) {
// truncateMemory removes old data before mint from the head.
func (h *Head) truncateMemory(mint int64) (err error) {
h.chunkSnapshotMtx.Lock()
defer h.chunkSnapshotMtx.Unlock()
defer func() {
if err != nil {
h.metrics.headTruncateFail.Inc()
@ -800,6 +850,9 @@ func (h *Head) IsQuerierCollidingWithTruncation(querierMint, querierMaxt int64)
// truncateWAL removes old data before mint from the WAL.
func (h *Head) truncateWAL(mint int64) error {
h.chunkSnapshotMtx.Lock()
defer h.chunkSnapshotMtx.Unlock()
if h.wal == nil || mint <= h.lastWALTruncationTime.Load() {
return nil
}
@ -1099,6 +1152,7 @@ func (h *Head) compactable() bool {
}
// Close flushes the WAL and closes the head.
// It also takes a snapshot of in-memory chunks if enabled.
func (h *Head) Close() error {
h.closedMtx.Lock()
defer h.closedMtx.Unlock()
@ -1115,10 +1169,14 @@ func (h *Head) Close() error {
}
errs := tsdb_errors.NewMulti(h.chunkDiskMapper.Close())
if errs.Err() == nil && h.opts.EnableMemorySnapshotOnShutdown {
errs.Add(h.performChunkSnapshot())
}
if h.wal != nil {
errs.Add(h.wal.Close())
}
return errs.Err()
}
// String returns an human readable representation of the TSDB head. It's important to
@ -1399,6 +1457,7 @@ type memSeries struct {
ref uint64
lset labels.Labels
mmappedChunks []*mmappedChunk
mmMaxTime int64 // Max time of any mmapped chunk, only used during WAL replay.
headChunk *memChunk
chunkRange int64
firstChunkID int

View file

@ -654,14 +654,23 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, chunkDiskMa
c = s.cutNewHeadChunk(t, e, chunkDiskMapper)
chunkCreated = true
}
numSamples := c.chunk.NumSamples()
// Out of order sample.
if c.maxTime >= t {
return c, false, chunkCreated
}
// If we reach 25% of a chunk's desired sample count, set a definitive time
// at which to start the next chunk.
numSamples := c.chunk.NumSamples()
if numSamples == 0 {
// It could be the new chunk created after reading the chunk snapshot,
// hence we fix the minTime of the chunk here.
c.minTime = t
s.nextAt = rangeForTimestamp(c.minTime, s.chunkRange)
}
// If we reach 25% of a chunk's desired sample count, predict an end time
// for this chunk that will try to make samples equally distributed within
// the remaining chunks in the current chunk range.
// At latest it must happen at the timestamp set when the chunk was cut.
if numSamples == samplesPerChunk/4 {
s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt)

View file

@ -138,6 +138,7 @@ func BenchmarkLoadWAL(b *testing.B) {
batches int
seriesPerBatch int
samplesPerSeries int
mmappedChunkT int64
}{
{ // Less series and more samples. 2 hour WAL with 1 second scrape interval.
batches: 10,
@ -154,6 +155,12 @@ func BenchmarkLoadWAL(b *testing.B) {
seriesPerBatch: 1000,
samplesPerSeries: 480,
},
{ // 2 hour WAL with 15 second scrape interval, and mmapped chunks up to last 100 samples.
batches: 100,
seriesPerBatch: 1000,
samplesPerSeries: 480,
mmappedChunkT: 3800,
},
}
labelsPerSeries := 5
@ -170,7 +177,7 @@ func BenchmarkLoadWAL(b *testing.B) {
}
lastExemplarsPerSeries = exemplarsPerSeries
// fmt.Println("exemplars per series: ", exemplarsPerSeries)
b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d,exemplarsPerSeries=%d", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries),
b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d,exemplarsPerSeries=%d,mmappedChunkT=%d", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries, c.mmappedChunkT),
func(b *testing.B) {
dir, err := ioutil.TempDir("", "test_load_wal")
require.NoError(b, err)
@ -191,7 +198,7 @@ func BenchmarkLoadWAL(b *testing.B) {
for j := 1; len(lbls) < labelsPerSeries; j++ {
lbls[defaultLabelName+strconv.Itoa(j)] = defaultLabelValue + strconv.Itoa(j)
}
refSeries = append(refSeries, record.RefSeries{Ref: uint64(i) * 100, Labels: labels.FromMap(lbls)})
refSeries = append(refSeries, record.RefSeries{Ref: uint64(i) * 101, Labels: labels.FromMap(lbls)})
}
populateTestWAL(b, w, []interface{}{refSeries})
}
@ -203,7 +210,7 @@ func BenchmarkLoadWAL(b *testing.B) {
refSamples = refSamples[:0]
for k := j * c.seriesPerBatch; k < (j+1)*c.seriesPerBatch; k++ {
refSamples = append(refSamples, record.RefSample{
Ref: uint64(k) * 100,
Ref: uint64(k) * 101,
T: int64(i) * 10,
V: float64(i) * 100,
})
@ -212,14 +219,27 @@ func BenchmarkLoadWAL(b *testing.B) {
}
}
// Write samples.
// Write mmapped chunks.
if c.mmappedChunkT != 0 {
chunkDiskMapper, err := chunks.NewChunkDiskMapper(mmappedChunksDir(dir), chunkenc.NewPool(), chunks.DefaultWriteBufferSize)
require.NoError(b, err)
for k := 0; k < c.batches*c.seriesPerBatch; k++ {
// Create one mmapped chunk per series, with one sample at the given time.
s := newMemSeries(labels.Labels{}, uint64(k)*101, c.mmappedChunkT, nil)
s.append(c.mmappedChunkT, 42, 0, chunkDiskMapper)
s.mmapCurrentHeadChunk(chunkDiskMapper)
}
require.NoError(b, chunkDiskMapper.Close())
}
// Write exemplars.
refExemplars := make([]record.RefExemplar, 0, c.seriesPerBatch)
for i := 0; i < exemplarsPerSeries; i++ {
for j := 0; j < c.batches; j++ {
refExemplars = refExemplars[:0]
for k := j * c.seriesPerBatch; k < (j+1)*c.seriesPerBatch; k++ {
refExemplars = append(refExemplars, record.RefExemplar{
Ref: uint64(k) * 100,
Ref: uint64(k) * 101,
T: int64(i) * 10,
V: float64(i) * 100,
Labels: labels.FromStrings("traceID", fmt.Sprintf("trace-%d", i)),
@ -240,6 +260,8 @@ func BenchmarkLoadWAL(b *testing.B) {
require.NoError(b, err)
h.Init(0)
}
b.StopTimer()
w.Close()
})
}
}
@ -2533,3 +2555,162 @@ func generateHistograms(n int) (r []histogram.SparseHistogram) {
return r
}
// TestChunkSnapshot tests that a chunk snapshot is created on Head.Close when
// enabled, that it can be replayed (alone and together with m-mapped chunks
// and WAL), and that Close does not snapshot when the feature is disabled.
func TestChunkSnapshot(t *testing.T) {
	head, _ := newTestHead(t, 120*4, false)
	defer func() {
		head.opts.EnableMemorySnapshotOnShutdown = false
		require.NoError(t, head.Close())
	}()

	numSeries := 10
	expSeries := make(map[string][]tsdbutil.Sample)
	expTombstones := make(map[uint64]tombstones.Intervals)

	{ // Initial data that goes into snapshot.
		// Add some initial samples with >=1 m-map chunk.
		app := head.Appender(context.Background())
		for i := 1; i <= numSeries; i++ {
			lbls := labels.Labels{labels.Label{Name: "foo", Value: fmt.Sprintf("bar%d", i)}}
			lblStr := lbls.String()
			// 240 samples should m-map at least 1 chunk.
			for ts := int64(1); ts <= 240; ts++ {
				val := rand.Float64()
				expSeries[lblStr] = append(expSeries[lblStr], sample{ts, val})
				_, err := app.Append(0, lbls, ts, val)
				require.NoError(t, err)
			}
		}
		require.NoError(t, app.Commit())

		// Add some tombstones.
		var enc record.Encoder
		for i := 1; i <= numSeries; i++ {
			ref := uint64(i)
			itvs := tombstones.Intervals{
				{Mint: 1234, Maxt: 2345},
				{Mint: 3456, Maxt: 4567},
			}
			for _, itv := range itvs {
				// Intervals.Add has a value receiver and returns a new slice,
				// so the result must be assigned back. Discarding it left
				// expTombstones empty and made the checks below vacuous.
				expTombstones[ref] = expTombstones[ref].Add(itv)
			}
			head.tombstones.AddInterval(ref, itvs...)
			err := head.wal.Log(enc.Tombstones([]tombstones.Stone{
				{Ref: ref, Intervals: itvs},
			}, nil))
			require.NoError(t, err)
		}
	}

	// These references should be the ones used for the snapshot.
	wlast, woffset, err := head.wal.LastSegmentAndOffset()
	require.NoError(t, err)

	{ // Creating snapshot and verifying it.
		head.opts.EnableMemorySnapshotOnShutdown = true
		require.NoError(t, head.Close()) // This will create a snapshot.

		_, sidx, soffset, err := LastChunkSnapshot(head.opts.ChunkDirRoot)
		require.NoError(t, err)
		require.Equal(t, wlast, sidx)
		require.Equal(t, woffset, soffset)
	}

	{ // Test the replay of snapshot.
		// Create new Head which should replay this snapshot.
		w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false)
		require.NoError(t, err)
		head, err = NewHead(nil, nil, w, head.opts, nil)
		require.NoError(t, err)
		require.NoError(t, head.Init(math.MinInt64))

		// Test query for snapshot replay.
		q, err := NewBlockQuerier(head, math.MinInt64, math.MaxInt64)
		require.NoError(t, err)
		series := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"))
		require.Equal(t, expSeries, series)

		// Check the tombstones.
		tr, err := head.Tombstones()
		require.NoError(t, err)
		actTombstones := make(map[uint64]tombstones.Intervals)
		require.NoError(t, tr.Iter(func(ref uint64, itvs tombstones.Intervals) error {
			for _, itv := range itvs {
				// Assign the result back (value receiver, see above).
				actTombstones[ref] = actTombstones[ref].Add(itv)
			}
			return nil
		}))
		require.Equal(t, expTombstones, actTombstones)
	}

	{ // Additional data to only include in WAL and m-mapped chunks and not snapshot. This mimics having an old snapshot on disk.
		// Add more samples.
		app := head.Appender(context.Background())
		for i := 1; i <= numSeries; i++ {
			lbls := labels.Labels{labels.Label{Name: "foo", Value: fmt.Sprintf("bar%d", i)}}
			lblStr := lbls.String()
			// 240 samples should m-map at least 1 chunk.
			for ts := int64(241); ts <= 480; ts++ {
				val := rand.Float64()
				expSeries[lblStr] = append(expSeries[lblStr], sample{ts, val})
				_, err := app.Append(0, lbls, ts, val)
				require.NoError(t, err)
			}
		}
		require.NoError(t, app.Commit())

		// Add more tombstones.
		var enc record.Encoder
		for i := 1; i <= numSeries; i++ {
			ref := uint64(i)
			itvs := tombstones.Intervals{
				{Mint: 12345, Maxt: 23456},
				{Mint: 34567, Maxt: 45678},
			}
			for _, itv := range itvs {
				// Assign the result back (value receiver, see above).
				expTombstones[ref] = expTombstones[ref].Add(itv)
			}
			head.tombstones.AddInterval(ref, itvs...)
			err := head.wal.Log(enc.Tombstones([]tombstones.Stone{
				{Ref: ref, Intervals: itvs},
			}, nil))
			require.NoError(t, err)
		}
	}

	{ // Close Head and verify that new snapshot was not created.
		head.opts.EnableMemorySnapshotOnShutdown = false
		require.NoError(t, head.Close()) // This should not create a snapshot.

		_, sidx, soffset, err := LastChunkSnapshot(head.opts.ChunkDirRoot)
		require.NoError(t, err)
		require.Equal(t, wlast, sidx)
		require.Equal(t, woffset, soffset)
	}

	{ // Test the replay of snapshot, m-map chunks, and WAL.
		// Create new Head to replay snapshot, m-map chunks, and WAL.
		w, err := wal.NewSize(nil, nil, head.wal.Dir(), 32768, false)
		require.NoError(t, err)
		head, err = NewHead(nil, nil, w, head.opts, nil)
		require.NoError(t, err)
		require.NoError(t, head.Init(math.MinInt64))

		// Test query when data is replayed from snapshot, m-map chunks, and WAL.
		q, err := NewBlockQuerier(head, math.MinInt64, math.MaxInt64)
		require.NoError(t, err)
		series := query(t, q, labels.MustNewMatcher(labels.MatchRegexp, "foo", ".*"))
		require.Equal(t, expSeries, series)

		// Check the tombstones.
		tr, err := head.Tombstones()
		require.NoError(t, err)
		actTombstones := make(map[uint64]tombstones.Intervals)
		require.NoError(t, tr.Iter(func(ref uint64, itvs tombstones.Intervals) error {
			for _, itv := range itvs {
				// Assign the result back (value receiver, see above).
				actTombstones[ref] = actTombstones[ref].Add(itv)
			}
			return nil
		}))
		require.Equal(t, expTombstones, actTombstones)
	}
}

View file

@ -15,8 +15,18 @@ package tsdb
import (
"fmt"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/encoding"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"
"io/ioutil"
"math"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"time"
@ -202,9 +212,7 @@ Outer:
if created {
// This is the first WAL series record for this series.
h.metrics.chunksCreated.Add(float64(len(mmc)))
h.metrics.chunks.Add(float64(len(mmc)))
mSeries.mmappedChunks = mmc
h.setMMappedChunks(mSeries, mmc)
continue
}
@ -218,9 +226,17 @@ Outer:
// It is possible that some old sample is being processed in processWALSamples that
// could cause race below. So we wait for the goroutine to empty input the buffer and finish
// processing all old samples after emptying the buffer.
select {
case <-outputs[idx]: // allow output side to drain to avoid deadlock
default:
}
inputs[idx] <- []record.RefSample{}
for len(inputs[idx]) != 0 {
time.Sleep(1 * time.Millisecond)
select {
case <-outputs[idx]: // allow output side to drain to avoid deadlock
default:
}
}
// Checking if the new m-mapped chunks overlap with the already existing ones.
@ -240,16 +256,12 @@ Outer:
}
// Replacing m-mapped chunks with the new ones (could be empty).
h.metrics.chunksCreated.Add(float64(len(mmc)))
h.metrics.chunksRemoved.Add(float64(len(mSeries.mmappedChunks)))
h.metrics.chunks.Add(float64(len(mmc) - len(mSeries.mmappedChunks)))
mSeries.mmappedChunks = mmc
h.setMMappedChunks(mSeries, mmc)
// Any samples replayed till now would already be compacted. Resetting the head chunk.
mSeries.nextAt = 0
mSeries.headChunk = nil
mSeries.app = nil
h.updateMinMaxTime(mSeries.minTime(), mSeries.maxTime())
}
//nolint:staticcheck // Ignore SA6002 relax staticcheck verification.
seriesPool.Put(v)
@ -341,18 +353,29 @@ Outer:
return nil
}
// processWALSamples adds a partition of samples it receives to the head and passes
// them on to other workers.
// Samples before the mint timestamp are discarded.
// setMMappedChunks replaces the m-mapped chunks of mSeries with mmc, keeps the
// chunk metrics in sync, and caches the max time of the last m-mapped chunk so
// WAL replay can cheaply skip samples that append() would reject anyway.
func (h *Head) setMMappedChunks(mSeries *memSeries, mmc []*mmappedChunk) {
	newCount, oldCount := len(mmc), len(mSeries.mmappedChunks)
	// Account for the metric delta before the old slice is overwritten.
	h.metrics.chunksCreated.Add(float64(newCount))
	h.metrics.chunksRemoved.Add(float64(oldCount))
	h.metrics.chunks.Add(float64(newCount - oldCount))
	mSeries.mmappedChunks = mmc
	// Cache the last mmapped chunk time, so we can skip calling append() for samples it will reject.
	if newCount > 0 {
		mSeries.mmMaxTime = mmc[newCount-1].maxTime
		h.updateMinMaxTime(mmc[0].minTime, mSeries.mmMaxTime)
	} else {
		mSeries.mmMaxTime = math.MinInt64
	}
}
// processWALSamples adds the samples it receives to the head and passes
// the buffer received to an output channel for reuse.
// Samples before the minValidTime timestamp are discarded.
func (h *Head) processWALSamples(
minValidTime int64,
input <-chan []record.RefSample, output chan<- []record.RefSample,
) (unknownRefs uint64) {
defer close(output)
// Mitigate lock contention in getByID.
refSeries := map[uint64]*memSeries{}
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
for samples := range input {
@ -360,14 +383,13 @@ func (h *Head) processWALSamples(
if s.T < minValidTime {
continue
}
ms := refSeries[s.Ref]
ms := h.series.getByID(s.Ref)
if ms == nil {
ms = h.series.getByID(s.Ref)
if ms == nil {
unknownRefs++
continue
}
refSeries[s.Ref] = ms
unknownRefs++
continue
}
if s.T <= ms.mmMaxTime {
continue
}
if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper); chunkCreated {
h.metrics.chunksCreated.Inc()
@ -386,3 +408,474 @@ func (h *Head) processWALSamples(
return unknownRefs
}
const (
	// chunkSnapshotRecordTypeSeries marks a snapshot record holding one series.
	chunkSnapshotRecordTypeSeries uint8 = 1
	// chunkSnapshotRecordTypeTombstones marks a snapshot record holding all tombstones.
	chunkSnapshotRecordTypeTombstones uint8 = 2
)
// chunkSnapshotRecord is the decoded form of a single series entry of a chunk
// snapshot: the series identity plus its in-memory head chunk state.
type chunkSnapshotRecord struct {
	ref        uint64        // Series reference of the snapshotted series.
	lset       labels.Labels // Label set identifying the series.
	chunkRange int64         // Chunk range of the series at snapshot time.
	mc         *memChunk     // Head chunk at snapshot time; nil if the series had none.
	sampleBuf  [4]sample     // Last appended samples; copied back into the series on replay.
}
// encodeToSnapshotRecord appends a chunk-snapshot series record for s to b and
// returns the extended buffer. The layout matches the documented series record
// format: record type, series ref, label set, chunk range, and — only when a
// head chunk exists — the head chunk's min/max time, encoding, data, and the
// sample buffer.
func (s *memSeries) encodeToSnapshotRecord(b []byte) []byte {
	buf := encoding.Encbuf{B: b}

	buf.PutByte(chunkSnapshotRecordTypeSeries)
	buf.PutBE64(s.ref)
	buf.PutUvarint(len(s.lset))
	for _, l := range s.lset {
		buf.PutUvarintStr(l.Name)
		buf.PutUvarintStr(l.Value)
	}
	buf.PutBE64int64(s.chunkRange)

	// The head chunk state is read under the series lock.
	s.Lock()
	if s.headChunk == nil {
		// Flag 0: no head chunk fields follow.
		buf.PutUvarint(0)
	} else {
		// Flag 1: head chunk fields follow.
		buf.PutUvarint(1)
		buf.PutBE64int64(s.headChunk.minTime)
		buf.PutBE64int64(s.headChunk.maxTime)
		buf.PutByte(byte(s.headChunk.chunk.Encoding()))
		buf.PutUvarintBytes(s.headChunk.chunk.Bytes())
		// Put the sample buf.
		for _, smpl := range s.sampleBuf {
			buf.PutBE64int64(smpl.t)
			buf.PutBEFloat64(smpl.v)
		}
	}
	s.Unlock()

	return buf.Get()
}
// decodeSeriesFromChunkSnapshot decodes a series record written by
// (*memSeries).encodeToSnapshotRecord. The returned chunkSnapshotRecord has
// mc == nil when the snapshotted series had no head chunk.
func decodeSeriesFromChunkSnapshot(b []byte) (csr chunkSnapshotRecord, err error) {
	dec := encoding.Decbuf{B: b}

	if flag := dec.Byte(); flag != chunkSnapshotRecordTypeSeries {
		return csr, errors.Errorf("invalid record type %x", flag)
	}

	csr.ref = dec.Be64()

	// The label set written to the disk is already sorted.
	csr.lset = make(labels.Labels, dec.Uvarint())
	for i := range csr.lset {
		csr.lset[i].Name = dec.UvarintStr()
		csr.lset[i].Value = dec.UvarintStr()
	}

	csr.chunkRange = dec.Be64int64()
	if dec.Uvarint() == 0 {
		// No head chunk was snapshotted for this series.
		return
	}

	csr.mc = &memChunk{}
	csr.mc.minTime = dec.Be64int64()
	csr.mc.maxTime = dec.Be64int64()
	enc := chunkenc.Encoding(dec.Byte())

	// The underlying bytes gets re-used later, so make a copy.
	chunkBytes := dec.UvarintBytes()
	chunkBytesCopy := make([]byte, len(chunkBytes))
	copy(chunkBytesCopy, chunkBytes)

	chk, err := chunkenc.FromData(enc, chunkBytesCopy)
	if err != nil {
		return csr, errors.Wrap(err, "chunk from data")
	}
	csr.mc.chunk = chk

	for i := range csr.sampleBuf {
		csr.sampleBuf[i].t = dec.Be64int64()
		csr.sampleBuf[i].v = dec.Be64Float64()
	}

	// Report any decode error first, then flag unexpected trailing bytes,
	// matching the other record decoders. The previous condition
	// `err != nil && len(dec.B) > 0` masked the real decode error with the
	// "bytes left" message and never reported leftover bytes on success.
	if err = dec.Err(); err != nil {
		return csr, err
	}
	if len(dec.B) > 0 {
		return csr, errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
	}
	return
}
// encodeTombstonesToSnapshotRecord encodes all tombstones of tr into a single
// snapshot record: the record type byte followed by the uvarint-length-prefixed
// tombstone encoding.
func encodeTombstonesToSnapshotRecord(tr tombstones.Reader) ([]byte, error) {
	encoded, err := tombstones.Encode(tr)
	if err != nil {
		return nil, errors.Wrap(err, "encode tombstones")
	}
	buf := encoding.Encbuf{}
	buf.PutByte(chunkSnapshotRecordTypeTombstones)
	buf.PutUvarintBytes(encoded)
	return buf.Get(), nil
}
// decodeTombstonesSnapshotRecord decodes a tombstones snapshot record produced
// by encodeTombstonesToSnapshotRecord.
func decodeTombstonesSnapshotRecord(b []byte) (tombstones.Reader, error) {
	dec := encoding.Decbuf{B: b}
	if recType := dec.Byte(); recType != chunkSnapshotRecordTypeTombstones {
		return nil, errors.Errorf("invalid record type %x", recType)
	}
	tr, err := tombstones.Decode(dec.UvarintBytes())
	return tr, errors.Wrap(err, "decode tombstones")
}
// chunkSnapshotPrefix is the directory-name prefix of chunk snapshots; the
// full name is chunk_snapshot.<segment index>.<offset>.
const chunkSnapshotPrefix = "chunk_snapshot."
// ChunkSnapshot creates a snapshot of all the series and tombstones in the head.
// It deletes the old chunk snapshots if the chunk snapshot creation is successful.
//
// The chunk snapshot is stored in a directory named chunk_snapshot.N.M and is written
// using the WAL package. N is the last WAL segment present during snapshotting and
// M is the offset in segment N up to which data was written.
func (h *Head) ChunkSnapshot() (*ChunkSnapshotStats, error) {
	if h.wal == nil {
		// If we are not storing any WAL, does not make sense to take a snapshot too.
		level.Warn(h.logger).Log("msg", "skipping chunk snapshotting as WAL is disabled")
		return &ChunkSnapshotStats{}, nil
	}
	// Serialize against truncation and other snapshot attempts.
	h.chunkSnapshotMtx.Lock()
	defer h.chunkSnapshotMtx.Unlock()

	stats := &ChunkSnapshotStats{}

	wlast, woffset, err := h.wal.LastSegmentAndOffset()
	if err != nil && err != record.ErrNotFound {
		return stats, errors.Wrap(err, "get last wal segment and offset")
	}

	_, cslast, csoffset, err := LastChunkSnapshot(h.opts.ChunkDirRoot)
	if err != nil && err != record.ErrNotFound {
		return stats, errors.Wrap(err, "find last chunk snapshot")
	}

	if wlast == cslast && woffset == csoffset {
		// Nothing has been written to the WAL/Head since the last snapshot.
		return stats, nil
	}

	snapshotName := fmt.Sprintf(chunkSnapshotPrefix+"%06d.%010d", wlast, woffset)

	cpdir := filepath.Join(h.opts.ChunkDirRoot, snapshotName)
	cpdirtmp := cpdir + ".tmp"
	stats.Dir = cpdir

	if err := os.MkdirAll(cpdirtmp, 0777); err != nil {
		return stats, errors.Wrap(err, "create chunk snapshot dir")
	}
	cp, err := wal.New(nil, nil, cpdirtmp, h.wal.CompressionEnabled())
	if err != nil {
		return stats, errors.Wrap(err, "open chunk snapshot")
	}

	// Ensures that an early return caused by an error doesn't leave any tmp files.
	defer func() {
		cp.Close()
		os.RemoveAll(cpdirtmp)
	}()

	var (
		buf  []byte // Shared scratch buffer all records are encoded into.
		recs [][]byte
	)
	// Encode one record per series, stripe by stripe, holding only the
	// stripe's read lock while its series are encoded.
	stripeSize := h.series.size
	for i := 0; i < stripeSize; i++ {
		h.series.locks[i].RLock()

		for _, s := range h.series.series[i] {
			start := len(buf)
			buf = s.encodeToSnapshotRecord(buf)
			if len(buf[start:]) == 0 {
				continue // All contents discarded.
			}
			recs = append(recs, buf[start:])
			// Flush records in 10 MB increments.
			if len(buf) > 10*1024*1024 {
				if err := cp.Log(recs...); err != nil {
					h.series.locks[i].RUnlock()
					return stats, errors.Wrap(err, "flush records")
				}
				buf, recs = buf[:0], recs[:0]
			}
		}
		stats.TotalSeries += len(h.series.series[i])

		h.series.locks[i].RUnlock()
	}

	// Add tombstones to the snapshot.
	tombstonesReader, err := h.Tombstones()
	if err != nil {
		return stats, errors.Wrap(err, "get tombstones")
	}
	rec, err := encodeTombstonesToSnapshotRecord(tombstonesReader)
	if err != nil {
		return stats, errors.Wrap(err, "encode tombstones")
	}
	recs = append(recs, rec)
	// Flush remaining records.
	if err := cp.Log(recs...); err != nil {
		return stats, errors.Wrap(err, "flush records")
	}

	if err := cp.Close(); err != nil {
		return stats, errors.Wrap(err, "close chunk snapshot")
	}
	// Atomically move the finished tmp dir into place.
	if err := fileutil.Replace(cpdirtmp, cpdir); err != nil {
		return stats, errors.Wrap(err, "rename chunk snapshot directory")
	}

	if err := DeleteChunkSnapshots(h.opts.ChunkDirRoot, cslast, csoffset); err != nil {
		// Leftover old chunk snapshots do not cause problems down the line beyond
		// occupying disk space.
		// They will just be ignored since a higher chunk snapshot exists.
		level.Error(h.logger).Log("msg", "delete old chunk snapshots", "err", err)
	}
	return stats, nil
}
// performChunkSnapshot takes a chunk snapshot, logging its timing and stats on
// success, and wraps any error from the snapshot.
func (h *Head) performChunkSnapshot() error {
	level.Info(h.logger).Log("msg", "creating chunk snapshot")
	start := time.Now()
	stats, err := h.ChunkSnapshot()
	took := time.Since(start)
	if err == nil {
		level.Info(h.logger).Log("msg", "chunk snapshot complete", "duration", took.String(), "num_series", stats.TotalSeries, "dir", stats.Dir)
	}
	return errors.Wrap(err, "chunk snapshot")
}
// ChunkSnapshotStats holds stats about a created chunk snapshot.
type ChunkSnapshotStats struct {
	TotalSeries int    // Number of series encoded into the snapshot.
	Dir         string // Directory the snapshot was written to.
}
// LastChunkSnapshot returns the directory name and index of the most recent chunk snapshot.
// If dir does not contain any chunk snapshots, ErrNotFound is returned.
func LastChunkSnapshot(dir string) (string, int, int, error) {
	files, err := ioutil.ReadDir(dir)
	if err != nil {
		return "", 0, 0, err
	}
	// ReadDir returns entries sorted by name and snapshot names are
	// zero-padded, so walking backwards hits the newest snapshot first.
	for i := len(files) - 1; i >= 0; i-- {
		fi := files[i]
		name := fi.Name()

		if !strings.HasPrefix(name, chunkSnapshotPrefix) {
			continue
		}
		if !fi.IsDir() {
			return "", 0, 0, errors.Errorf("chunk snapshot %s is not a directory", name)
		}

		parts := strings.Split(name[len(chunkSnapshotPrefix):], ".")
		if len(parts) != 2 {
			return "", 0, 0, errors.Errorf("chunk snapshot %s is not in the right format", name)
		}
		idx, err := strconv.Atoi(parts[0])
		if err != nil {
			continue
		}
		offset, err := strconv.Atoi(parts[1])
		if err != nil {
			continue
		}

		return filepath.Join(dir, name), idx, offset, nil
	}
	return "", 0, 0, record.ErrNotFound
}
// DeleteChunkSnapshots deletes all chunk snapshots in dir that are strictly
// older than the snapshot identified by (maxIndex, maxOffset), i.e. those
// whose (index, offset) pair orders before it. Malformed directory names are
// skipped; deletion errors are collected and returned together.
func DeleteChunkSnapshots(dir string, maxIndex, maxOffset int) error {
	files, err := ioutil.ReadDir(dir)
	if err != nil {
		return err
	}
	errs := tsdb_errors.NewMulti()
	for _, fi := range files {
		if !strings.HasPrefix(fi.Name(), chunkSnapshotPrefix) {
			continue
		}
		splits := strings.Split(fi.Name()[len(chunkSnapshotPrefix):], ".")
		if len(splits) != 2 {
			continue
		}
		idx, err := strconv.Atoi(splits[0])
		if err != nil {
			continue
		}
		offset, err := strconv.Atoi(splits[1])
		if err != nil {
			continue
		}
		// Compare (idx, offset) tuples lexicographically. The previous
		// condition `idx <= maxIndex && offset < maxOffset` failed to delete
		// snapshots from an earlier segment whose offset happened to be
		// >= maxOffset, leaving stale snapshots behind forever.
		if idx < maxIndex || (idx == maxIndex && offset < maxOffset) {
			if err := os.RemoveAll(filepath.Join(dir, fi.Name())); err != nil {
				errs.Add(err)
			}
		}
	}
	return errs.Err()
}
// loadChunkSnapshot replays the most recent chunk snapshot on disk (if any):
// it recreates each snapshotted series in the Head, restores its head chunk
// and sample buffer, and re-adds the snapshotted tombstones. It returns the
// WAL segment index and offset the snapshot covers — WAL replay should resume
// from there — plus a map from series ref to the created memSeries. When no
// snapshot exists it returns a nil map and no error.
func (h *Head) loadChunkSnapshot() (int, int, map[uint64]*memSeries, error) {
	dir, snapIdx, snapOffset, err := LastChunkSnapshot(h.opts.ChunkDirRoot)
	if err != nil {
		if err == record.ErrNotFound {
			return snapIdx, snapOffset, nil, nil
		}
		return snapIdx, snapOffset, nil, errors.Wrap(err, "find last chunk snapshot")
	}

	start := time.Now()
	sr, err := wal.NewSegmentsReader(dir)
	if err != nil {
		return snapIdx, snapOffset, nil, errors.Wrap(err, "open chunk snapshot")
	}
	defer func() {
		if err := sr.Close(); err != nil {
			level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err)
		}
	}()

	var (
		numSeries = 0
		// NOTE(review): unknownRefs is declared and logged below but never
		// incremented anywhere in this function — confirm whether counting
		// was intended or this is leftover from an earlier design.
		unknownRefs = int64(0)
		n           = runtime.GOMAXPROCS(0)
		wg          sync.WaitGroup
		recordChan  = make(chan chunkSnapshotRecord, 5*n)
		// One result map per worker so workers never need a shared lock.
		shardedRefSeries = make([]map[uint64]*memSeries, n)
		errChan          = make(chan error, n)
	)

	wg.Add(n)
	for i := 0; i < n; i++ {
		// Worker: creates a memSeries for each decoded record and restores
		// its head chunk state.
		go func(idx int, rc <-chan chunkSnapshotRecord) {
			defer wg.Done()
			defer func() {
				// If there was an error, drain the channel
				// to unblock the main thread.
				for range rc {
				}
			}()

			shardedRefSeries[idx] = make(map[uint64]*memSeries)
			localRefSeries := shardedRefSeries[idx]

			for csr := range rc {
				series, _, err := h.getOrCreateWithID(csr.ref, csr.lset.Hash(), csr.lset)
				if err != nil {
					errChan <- err
					return
				}
				localRefSeries[csr.ref] = series
				if h.lastSeriesID.Load() < series.ref {
					h.lastSeriesID.Store(series.ref)
				}

				series.chunkRange = csr.chunkRange
				if csr.mc == nil {
					continue
				}
				series.nextAt = csr.mc.maxTime // This will create a new chunk on append.
				series.headChunk = csr.mc
				for i := range series.sampleBuf {
					series.sampleBuf[i].t = csr.sampleBuf[i].t
					series.sampleBuf[i].v = csr.sampleBuf[i].v
				}

				// Recreate the appender on top of the restored head chunk.
				app, err := series.headChunk.chunk.Appender()
				if err != nil {
					errChan <- err
					return
				}
				series.app = app

				h.updateMinMaxTime(csr.mc.minTime, csr.mc.maxTime)
			}
		}(i, recordChan)
	}

	r := wal.NewReader(sr)
	var loopErr error
Outer:
	for r.Next() {
		select {
		case err := <-errChan:
			// Put the error back so it is collected after the workers stop.
			errChan <- err
			break Outer
		default:
		}

		rec := r.Record()
		switch rec[0] {
		case chunkSnapshotRecordTypeSeries:
			numSeries++
			csr, err := decodeSeriesFromChunkSnapshot(rec)
			if err != nil {
				loopErr = errors.Wrap(err, "decode series record")
				break Outer
			}
			recordChan <- csr

		case chunkSnapshotRecordTypeTombstones:
			tr, err := decodeTombstonesSnapshotRecord(rec)
			if err != nil {
				loopErr = errors.Wrap(err, "decode tombstones")
				break Outer
			}

			if err = tr.Iter(func(ref uint64, ivs tombstones.Intervals) error {
				h.tombstones.AddInterval(ref, ivs...)
				return nil
			}); err != nil {
				loopErr = errors.Wrap(err, "iterate tombstones")
				break Outer
			}
		}
	}
	close(recordChan)
	wg.Wait()

	// Collect any worker errors together with the decode-loop error.
	close(errChan)
	merr := tsdb_errors.NewMulti(errors.Wrap(loopErr, "decode loop"))
	for err := range errChan {
		merr.Add(errors.Wrap(err, "record processing"))
	}
	if err := merr.Err(); err != nil {
		return -1, -1, nil, err
	}

	// Merge the per-worker maps into a single ref -> series map.
	refSeries := make(map[uint64]*memSeries, numSeries)
	for _, shard := range shardedRefSeries {
		for k, v := range shard {
			refSeries[k] = v
		}
	}

	elapsed := time.Since(start)
	level.Info(h.logger).Log("msg", "chunk snapshot loaded", "dir", dir, "num_series", numSeries, "duration", elapsed.String())
	if unknownRefs > 0 {
		level.Warn(h.logger).Log("msg", "unknown series references during chunk snapshot replay", "count", unknownRefs)
	}

	return snapIdx, snapOffset, refSeries, nil
}

View file

@ -699,6 +699,22 @@ func (w *WAL) log(rec []byte, final bool) error {
return nil
}
// LastSegmentAndOffset returns the number of the last WAL segment and the
// offset up to which that segment has been filled, holding the WAL lock so
// the two values are consistent with each other.
func (w *WAL) LastSegmentAndOffset() (seg, offset int, err error) {
	w.mtx.Lock()
	defer w.mtx.Unlock()

	if _, seg, err = Segments(w.Dir()); err != nil {
		return seg, 0, err
	}
	// Completed pages plus the bytes allocated in the current in-flight page.
	offset = w.donePages*pageSize + w.page.alloc
	return seg, offset, nil
}
// Truncate drops all segments before i.
func (w *WAL) Truncate(i int) (err error) {
w.metrics.truncateTotal.Inc()
@ -867,6 +883,21 @@ func NewSegmentBufReader(segs ...*Segment) *segmentBufReader {
}
}
// NewSegmentBufReaderWithOffset is like NewSegmentBufReader but positions the
// reader offset bytes into the first segment before any reads happen.
// nolint:golint
func NewSegmentBufReaderWithOffset(offset int, segs ...*Segment) (sbr *segmentBufReader, err error) {
	if offset == 0 {
		// Nothing to skip; the plain constructor suffices.
		return NewSegmentBufReader(segs...), nil
	}

	reader := &segmentBufReader{
		segs: segs,
		buf:  bufio.NewReaderSize(segs[0], 16*pageSize),
	}
	// Skip the already-consumed prefix of the first segment. A negative
	// offset is silently ignored (no discard performed, no error returned).
	if offset > 0 {
		if _, discardErr := reader.buf.Discard(offset); discardErr != nil {
			return reader, discardErr
		}
	}
	return reader, nil
}
func (r *segmentBufReader) Close() (err error) {
for _, s := range r.segs {
if e := s.Close(); e != nil {

View file

@ -8,7 +8,7 @@ import { history, historyKeymap } from '@codemirror/history';
import { defaultKeymap, insertNewlineAndIndent } from '@codemirror/commands';
import { bracketMatching } from '@codemirror/matchbrackets';
import { closeBrackets, closeBracketsKeymap } from '@codemirror/closebrackets';
import { searchKeymap, highlightSelectionMatches } from '@codemirror/search';
import { highlightSelectionMatches } from '@codemirror/search';
import { commentKeymap } from '@codemirror/comment';
import { lintKeymap } from '@codemirror/lint';
import { PromQLExtension, CompleteStrategy } from 'codemirror-promql';
@ -139,7 +139,6 @@ const CMExpressionInput: FC<CMExpressionInputProps> = ({
keymap.of([
...closeBracketsKeymap,
...defaultKeymap,
...searchKeymap,
...historyKeymap,
...commentKeymap,
...completionKeymap,