prometheus/discovery/xds/xds.go

// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package xds

import (
	"context"
	"time"

	v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/config"
	"github.com/prometheus/common/model"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protoreflect"
	"google.golang.org/protobuf/reflect/protoregistry"
	"google.golang.org/protobuf/types/known/anypb"

	"github.com/prometheus/prometheus/discovery"
	"github.com/prometheus/prometheus/discovery/targetgroup"
)

const (
	// Constants for instrumentation.
	namespace = "prometheus"
)

// ProtocolVersion is the xDS protocol version.
type ProtocolVersion string

const (
	ProtocolV3 = ProtocolVersion("v3")
)

// HTTPConfig wraps the common HTTP client options used to connect to the xDS server.
type HTTPConfig struct {
	config.HTTPClientConfig `yaml:",inline"`
}

// SDConfig is a base config for xDS-based SD mechanisms.
type SDConfig struct {
	HTTPClientConfig config.HTTPClientConfig `yaml:",inline"`
	RefreshInterval  model.Duration          `yaml:"refresh_interval,omitempty"`
	FetchTimeout     model.Duration          `yaml:"fetch_timeout,omitempty"`
	Server           string                  `yaml:"server,omitempty"`
}
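
// For illustration only (not part of the upstream file): given the struct tags above, a
// concrete xDS mechanism such as kuma_sd would accept configuration of roughly this
// shape; the server URL and durations below are placeholder values.
//
//	server: "http://kuma-control-plane.kuma-system.svc:5676"
//	refresh_interval: 30s
//	fetch_timeout: 2m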

// mustRegisterMessage registers the provided message type in the typeRegistry, and panics
// if there is an error.
func mustRegisterMessage(typeRegistry *protoregistry.Types, mt protoreflect.MessageType) {
	if err := typeRegistry.RegisterMessage(mt); err != nil {
		panic(err)
	}
}

func init() {
	// Register top-level SD Configs.
	discovery.RegisterConfig(&KumaSDConfig{})

	// Register metrics.
	prometheus.MustRegister(kumaFetchDuration, kumaFetchSkipUpdateCount, kumaFetchFailuresCount)

	// Register protobuf types that need to be marshalled/unmarshalled.
	mustRegisterMessage(protoTypes, (&v3.DiscoveryRequest{}).ProtoReflect().Type())
	mustRegisterMessage(protoTypes, (&v3.DiscoveryResponse{}).ProtoReflect().Type())
	mustRegisterMessage(protoTypes, (&MonitoringAssignment{}).ProtoReflect().Type())
}

var (
	protoTypes = new(protoregistry.Types)

	protoUnmarshalOptions = proto.UnmarshalOptions{
		DiscardUnknown: true,       // Only want known fields.
		Merge:          true,       // Always using new messages.
		Resolver:       protoTypes, // Only want known types.
	}

	protoJSONUnmarshalOptions = protojson.UnmarshalOptions{
		DiscardUnknown: true,       // Only want known fields.
		Resolver:       protoTypes, // Only want known types.
	}

	protoJSONMarshalOptions = protojson.MarshalOptions{
		UseProtoNames: true,
		Resolver:      protoTypes, // Only want known types.
	}
)

// resourceParser is a function that takes raw discovered objects and translates them into
// targetgroup.Group Targets. On error, no updates are sent to the scrape manager and the failure count is incremented.
type resourceParser func(resources []*anypb.Any, typeUrl string) ([]model.LabelSet, error)
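
// The sketch below is illustrative only and not part of the upstream file: it shows the
// shape a resourceParser implementation takes, assuming every resource is a
// MonitoringAssignment. The function name and the emitted label are hypothetical; a real
// parser (e.g. the Kuma one in this package) builds label sets from the assignment itself.
func exampleResourceParser(resources []*anypb.Any, typeURL string) ([]model.LabelSet, error) {
	targets := make([]model.LabelSet, 0, len(resources))
	for _, res := range resources {
		assignment := &MonitoringAssignment{}
		// Decode the packed Any payload with the package-level options, so only
		// registered types and known fields are accepted.
		if err := protoUnmarshalOptions.Unmarshal(res.GetValue(), assignment); err != nil {
			return nil, err
		}
		targets = append(targets, model.LabelSet{
			"__meta_xds_type_url": model.LabelValue(typeURL), // hypothetical label name
		})
	}
	return targets, nil
}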

// fetchDiscovery implements long-polling via xDS Fetch REST-JSON.
type fetchDiscovery struct {
	client               ResourceClient
	source               string
	refreshInterval      time.Duration
	parseResources       resourceParser
	logger               log.Logger
	fetchDuration        prometheus.Observer
	fetchSkipUpdateCount prometheus.Counter
	fetchFailuresCount   prometheus.Counter
}

// Run polls the xDS server once per refresh interval and forwards the resulting target
// groups to ch until ctx is cancelled; the underlying client is closed when Run returns.
func (d *fetchDiscovery) Run(ctx context.Context, ch chan<- []*targetgroup.Group) {
	defer d.client.Close()

	ticker := time.NewTicker(d.refreshInterval)

	for {
		select {
		case <-ctx.Done():
			ticker.Stop()
			return
		default:
			d.poll(ctx, ch)
			<-ticker.C
		}
	}
}

func (d *fetchDiscovery) poll(ctx context.Context, ch chan<- []*targetgroup.Group) {
	t0 := time.Now()
	response, err := d.client.Fetch(ctx)
	elapsed := time.Since(t0)
	d.fetchDuration.Observe(elapsed.Seconds())

	// Check the context first, in order to exit early.
	select {
	case <-ctx.Done():
		return
	default:
	}

	if err != nil {
		level.Error(d.logger).Log("msg", "error fetching resources", "err", err)
		d.fetchFailuresCount.Inc()
		return
	}

	if response == nil {
		// No update needed.
		d.fetchSkipUpdateCount.Inc()
		return
	}

	parsedTargets, err := d.parseResources(response.Resources, response.TypeUrl)
	if err != nil {
		level.Error(d.logger).Log("msg", "error parsing resources", "err", err)
		d.fetchFailuresCount.Inc()
		return
	}

	level.Debug(d.logger).Log("msg", "Updated to version", "version", response.VersionInfo, "targets", len(parsedTargets))

	select {
	case <-ctx.Done():
		return
	case ch <- []*targetgroup.Group{{Source: d.source, Targets: parsedTargets}}:
	}
}
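
// Illustrative sketch only, not part of the upstream file: one way a concrete xDS
// mechanism in this package could assemble and drive a fetchDiscovery. The function name,
// source, interval, and metric names are placeholders, and exampleResourceParser is the
// hypothetical parser sketched above; building the ResourceClient is out of scope here.
func runExampleDiscovery(ctx context.Context, client ResourceClient, logger log.Logger, ch chan<- []*targetgroup.Group) {
	d := &fetchDiscovery{
		client:               client,
		source:               "xds-example",
		refreshInterval:      30 * time.Second,
		parseResources:       exampleResourceParser,
		logger:               logger,
		fetchDuration:        prometheus.NewSummary(prometheus.SummaryOpts{Name: "example_xds_fetch_duration_seconds"}),
		fetchSkipUpdateCount: prometheus.NewCounter(prometheus.CounterOpts{Name: "example_xds_fetch_skip_update_total"}),
		fetchFailuresCount:   prometheus.NewCounter(prometheus.CounterOpts{Name: "example_xds_fetch_failures_total"}),
	}
	// Run blocks, polling immediately and then on every tick, until ctx is cancelled.
	d.Run(ctx, ch)
}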