2017-05-10 02:44:13 -07:00
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
2018-05-29 01:51:29 -07:00
"context"
2017-05-10 02:44:13 -07:00
"math"
2018-09-07 14:26:04 -07:00
"strconv"
2017-05-10 02:44:13 -07:00
"sync"
2018-02-01 05:20:38 -08:00
"sync/atomic"
2017-05-10 02:44:13 -07:00
"time"
2017-08-11 11:45:52 -07:00
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
2018-09-07 14:26:04 -07:00
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
2020-06-01 08:21:13 -07:00
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
2018-09-07 14:26:04 -07:00
2017-05-10 02:44:13 -07:00
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/config"
2019-03-08 08:29:25 -08:00
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
2017-10-23 13:28:17 -07:00
"github.com/prometheus/prometheus/prompb"
2019-09-19 02:15:41 -07:00
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/wal"
2017-05-10 02:44:13 -07:00
)
const (
// We track samples in/out and how long pushes take using an Exponentially
// Weighted Moving Average.
ewmaWeight = 0.2
shardUpdateDuration = 10 * time . Second
// Allow 30% too many shards before scaling down.
shardToleranceFraction = 0.3
)
2020-02-03 13:47:03 -08:00
type queueManagerMetrics struct {
2020-04-24 20:39:46 -07:00
reg prometheus . Registerer
succeededSamplesTotal prometheus . Counter
failedSamplesTotal prometheus . Counter
retriedSamplesTotal prometheus . Counter
droppedSamplesTotal prometheus . Counter
enqueueRetriesTotal prometheus . Counter
sentBatchDuration prometheus . Histogram
highestSentTimestamp * maxGauge
pendingSamples prometheus . Gauge
shardCapacity prometheus . Gauge
numShards prometheus . Gauge
maxNumShards prometheus . Gauge
minNumShards prometheus . Gauge
desiredNumShards prometheus . Gauge
bytesSent prometheus . Counter
2020-02-03 13:47:03 -08:00
}
2020-04-24 20:39:46 -07:00
func newQueueManagerMetrics ( r prometheus . Registerer , rn , e string ) * queueManagerMetrics {
m := & queueManagerMetrics {
reg : r ,
}
constLabels := prometheus . Labels {
remoteName : rn ,
endpoint : e ,
}
m . succeededSamplesTotal = prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "succeeded_samples_total" ,
Help : "Total number of samples successfully sent to remote storage." ,
ConstLabels : constLabels ,
} )
m . failedSamplesTotal = prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "failed_samples_total" ,
Help : "Total number of samples which failed on send to remote storage, non-recoverable errors." ,
ConstLabels : constLabels ,
} )
m . retriedSamplesTotal = prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "retried_samples_total" ,
Help : "Total number of samples which failed on send to remote storage but were retried because the send error was recoverable." ,
ConstLabels : constLabels ,
} )
m . droppedSamplesTotal = prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "dropped_samples_total" ,
Help : "Total number of samples which were dropped after being read from the WAL before being sent via remote write." ,
ConstLabels : constLabels ,
} )
m . enqueueRetriesTotal = prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "enqueue_retries_total" ,
Help : "Total number of times enqueue has failed because a shards queue was full." ,
ConstLabels : constLabels ,
} )
m . sentBatchDuration = prometheus . NewHistogram ( prometheus . HistogramOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "sent_batch_duration_seconds" ,
Help : "Duration of sample batch send calls to the remote storage." ,
Buckets : prometheus . DefBuckets ,
ConstLabels : constLabels ,
} )
m . highestSentTimestamp = & maxGauge {
Gauge : prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "queue_highest_sent_timestamp_seconds" ,
Help : "Timestamp from a WAL sample, the highest timestamp successfully sent by this queue, in seconds since epoch." ,
ConstLabels : constLabels ,
} ) ,
}
m . pendingSamples = prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "pending_samples" ,
Help : "The number of samples pending in the queues shards to be sent to the remote storage." ,
ConstLabels : constLabels ,
} )
m . shardCapacity = prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "shard_capacity" ,
Help : "The capacity of each shard of the queue used for parallel sending to the remote storage." ,
ConstLabels : constLabels ,
} )
m . numShards = prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "shards" ,
Help : "The number of shards used for parallel sending to the remote storage." ,
ConstLabels : constLabels ,
} )
m . maxNumShards = prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "shards_max" ,
Help : "The maximum number of shards that the queue is allowed to run." ,
ConstLabels : constLabels ,
} )
m . minNumShards = prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "shards_min" ,
Help : "The minimum number of shards that the queue is allowed to run." ,
ConstLabels : constLabels ,
} )
m . desiredNumShards = prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "shards_desired" ,
Help : "The number of shards that the queues shard calculation wants to run based on the rate of samples in vs. samples out." ,
ConstLabels : constLabels ,
} )
m . bytesSent = prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "sent_bytes_total" ,
Help : "The total number of bytes sent by the queue." ,
ConstLabels : constLabels ,
} )
2020-02-03 13:47:03 -08:00
2020-06-25 23:33:52 -07:00
return m
}
func ( m * queueManagerMetrics ) register ( ) {
if m . reg != nil {
m . reg . MustRegister (
2020-02-03 13:47:03 -08:00
m . succeededSamplesTotal ,
m . failedSamplesTotal ,
m . retriedSamplesTotal ,
m . droppedSamplesTotal ,
m . enqueueRetriesTotal ,
m . sentBatchDuration ,
2020-04-24 20:39:46 -07:00
m . highestSentTimestamp ,
m . pendingSamples ,
2020-02-03 13:47:03 -08:00
m . shardCapacity ,
m . numShards ,
m . maxNumShards ,
m . minNumShards ,
m . desiredNumShards ,
m . bytesSent ,
)
}
}
2017-05-10 02:44:13 -07:00
2020-04-24 20:39:46 -07:00
func ( m * queueManagerMetrics ) unregister ( ) {
if m . reg != nil {
m . reg . Unregister ( m . succeededSamplesTotal )
m . reg . Unregister ( m . failedSamplesTotal )
m . reg . Unregister ( m . retriedSamplesTotal )
m . reg . Unregister ( m . droppedSamplesTotal )
m . reg . Unregister ( m . enqueueRetriesTotal )
m . reg . Unregister ( m . sentBatchDuration )
m . reg . Unregister ( m . highestSentTimestamp )
m . reg . Unregister ( m . pendingSamples )
m . reg . Unregister ( m . shardCapacity )
m . reg . Unregister ( m . numShards )
m . reg . Unregister ( m . maxNumShards )
m . reg . Unregister ( m . minNumShards )
m . reg . Unregister ( m . desiredNumShards )
m . reg . Unregister ( m . bytesSent )
}
}
2017-05-10 02:44:13 -07:00
// StorageClient defines an interface for sending a batch of samples to an
// external timeseries database.
type StorageClient interface {
// Store stores the given samples in the remote storage.
2018-09-07 14:26:04 -07:00
Store ( context . Context , [ ] byte ) error
2019-12-12 12:47:23 -08:00
// Name uniquely identifies the remote storage.
2017-05-10 02:44:13 -07:00
Name ( ) string
2019-12-12 12:47:23 -08:00
// Endpoint is the remote read or write endpoint for the storage client.
Endpoint ( ) string
2017-05-10 02:44:13 -07:00
}
// QueueManager manages a queue of samples to be sent to the Storage
2018-09-07 14:26:04 -07:00
// indicated by the provided StorageClient. Implements writeTo interface
// used by WAL Watcher.
2017-05-10 02:44:13 -07:00
type QueueManager struct {
2019-10-21 14:54:25 -07:00
// https://golang.org/pkg/sync/atomic/#pkg-note-BUG
lastSendTimestamp int64
2019-03-05 04:21:11 -08:00
logger log . Logger
flushDeadline time . Duration
cfg config . QueueConfig
2019-03-08 08:29:25 -08:00
externalLabels labels . Labels
relabelConfigs [ ] * relabel . Config
2019-09-19 02:15:41 -07:00
watcher * wal . Watcher
2018-09-07 14:26:04 -07:00
2020-03-30 20:39:29 -07:00
clientMtx sync . RWMutex
storeClient StorageClient
2019-09-13 10:23:58 -07:00
seriesMtx sync . Mutex
2019-08-07 12:39:07 -07:00
seriesLabels map [ uint64 ] labels . Labels
2018-09-07 14:26:04 -07:00
seriesSegmentIndexes map [ uint64 ] int
droppedSeries map [ uint64 ] struct { }
2017-05-10 02:44:13 -07:00
shards * shards
numShards int
reshardChan chan int
2019-01-18 04:48:16 -08:00
quit chan struct { }
wg sync . WaitGroup
2017-05-10 02:44:13 -07:00
2019-02-19 23:51:08 -08:00
samplesIn , samplesDropped , samplesOut , samplesOutDuration * ewmaRate
2019-03-05 04:21:11 -08:00
2020-04-24 20:39:46 -07:00
metrics * queueManagerMetrics
2017-05-10 02:44:13 -07:00
}
// NewQueueManager builds a new QueueManager.
2020-04-24 20:39:46 -07:00
func NewQueueManager (
metrics * queueManagerMetrics ,
watcherMetrics * wal . WatcherMetrics ,
readerMetrics * wal . LiveReaderMetrics ,
logger log . Logger ,
walDir string ,
samplesIn * ewmaRate ,
cfg config . QueueConfig ,
externalLabels labels . Labels ,
relabelConfigs [ ] * relabel . Config ,
client StorageClient ,
flushDeadline time . Duration ,
) * QueueManager {
2017-08-11 11:45:52 -07:00
if logger == nil {
logger = log . NewNopLogger ( )
}
2019-03-05 04:21:11 -08:00
2019-12-12 12:47:23 -08:00
logger = log . With ( logger , remoteName , client . Name ( ) , endpoint , client . Endpoint ( ) )
2017-05-10 02:44:13 -07:00
t := & QueueManager {
2019-03-05 04:21:11 -08:00
logger : logger ,
2018-05-23 07:03:54 -07:00
flushDeadline : flushDeadline ,
2017-05-10 02:44:13 -07:00
cfg : cfg ,
externalLabels : externalLabels ,
relabelConfigs : relabelConfigs ,
2020-03-30 20:39:29 -07:00
storeClient : client ,
2017-05-10 02:44:13 -07:00
2019-08-07 12:39:07 -07:00
seriesLabels : make ( map [ uint64 ] labels . Labels ) ,
2018-09-07 14:26:04 -07:00
seriesSegmentIndexes : make ( map [ uint64 ] int ) ,
droppedSeries : make ( map [ uint64 ] struct { } ) ,
2018-12-04 09:32:14 -08:00
numShards : cfg . MinShards ,
2017-05-10 02:44:13 -07:00
reshardChan : make ( chan int ) ,
quit : make ( chan struct { } ) ,
2018-09-07 14:26:04 -07:00
samplesIn : samplesIn ,
2019-02-19 23:51:08 -08:00
samplesDropped : newEWMARate ( ewmaWeight , shardUpdateDuration ) ,
2017-05-10 02:44:13 -07:00
samplesOut : newEWMARate ( ewmaWeight , shardUpdateDuration ) ,
samplesOutDuration : newEWMARate ( ewmaWeight , shardUpdateDuration ) ,
2020-02-03 13:47:03 -08:00
metrics : metrics ,
2019-03-01 11:04:26 -08:00
}
2018-09-07 14:26:04 -07:00
2020-03-20 09:34:15 -07:00
t . watcher = wal . NewWatcher ( watcherMetrics , readerMetrics , logger , client . Name ( ) , t , walDir )
2019-03-05 04:21:11 -08:00
t . shards = t . newShards ( )
2017-05-10 02:44:13 -07:00
return t
}
2018-09-07 14:26:04 -07:00
// Append queues a sample to be sent to the remote storage. Blocks until all samples are
// enqueued on their shards or a shutdown signal is received.
2019-09-19 02:15:41 -07:00
func ( t * QueueManager ) Append ( samples [ ] record . RefSample ) bool {
2019-06-27 11:48:21 -07:00
outer :
2019-08-12 09:22:02 -07:00
for _ , s := range samples {
2019-09-13 10:23:58 -07:00
t . seriesMtx . Lock ( )
2019-08-12 09:22:02 -07:00
lbls , ok := t . seriesLabels [ s . Ref ]
2019-06-27 11:48:21 -07:00
if ! ok {
2020-04-24 20:39:46 -07:00
t . metrics . droppedSamplesTotal . Inc ( )
2019-02-19 23:51:08 -08:00
t . samplesDropped . incr ( 1 )
2019-08-12 09:22:02 -07:00
if _ , ok := t . droppedSeries [ s . Ref ] ; ! ok {
2020-04-11 01:22:18 -07:00
level . Info ( t . logger ) . Log ( "msg" , "Dropped sample for series that was not explicitly dropped via relabelling" , "ref" , s . Ref )
2018-09-07 14:26:04 -07:00
}
2019-09-13 10:23:58 -07:00
t . seriesMtx . Unlock ( )
2018-09-07 14:26:04 -07:00
continue
}
2019-09-13 10:23:58 -07:00
t . seriesMtx . Unlock ( )
2019-01-18 04:48:16 -08:00
// This will only loop if the queues are being resharded.
backoff := t . cfg . MinBackoff
2018-09-07 14:26:04 -07:00
for {
select {
case <- t . quit :
return false
default :
}
2017-05-10 02:44:13 -07:00
2019-08-12 09:22:02 -07:00
if t . shards . enqueue ( s . Ref , sample {
labels : lbls ,
t : s . T ,
v : s . V ,
} ) {
2018-09-07 14:26:04 -07:00
continue outer
}
2019-01-18 04:48:16 -08:00
2020-04-24 20:39:46 -07:00
t . metrics . enqueueRetriesTotal . Inc ( )
2018-09-07 14:26:04 -07:00
time . Sleep ( time . Duration ( backoff ) )
backoff = backoff * 2
if backoff > t . cfg . MaxBackoff {
backoff = t . cfg . MaxBackoff
}
2017-05-10 02:44:13 -07:00
}
}
2018-09-07 14:26:04 -07:00
return true
2017-05-10 02:44:13 -07:00
}
// Start the queue manager sending samples to the remote storage.
// Does not block.
func ( t * QueueManager ) Start ( ) {
2020-06-25 23:33:52 -07:00
// Register and initialise some metrics.
t . metrics . register ( )
2020-04-24 20:39:46 -07:00
t . metrics . shardCapacity . Set ( float64 ( t . cfg . Capacity ) )
t . metrics . pendingSamples . Set ( 0 )
t . metrics . maxNumShards . Set ( float64 ( t . cfg . MaxShards ) )
t . metrics . minNumShards . Set ( float64 ( t . cfg . MinShards ) )
t . metrics . desiredNumShards . Set ( float64 ( t . cfg . MinShards ) )
2019-04-23 01:49:17 -07:00
2018-09-07 14:26:04 -07:00
t . shards . start ( t . numShards )
t . watcher . Start ( )
2017-05-10 02:44:13 -07:00
t . wg . Add ( 2 )
go t . updateShardsLoop ( )
go t . reshardLoop ( )
}
// Stop stops sending samples to the remote storage and waits for pending
// sends to complete.
func ( t * QueueManager ) Stop ( ) {
2017-08-11 11:45:52 -07:00
level . Info ( t . logger ) . Log ( "msg" , "Stopping remote storage..." )
2018-09-07 14:26:04 -07:00
defer level . Info ( t . logger ) . Log ( "msg" , "Remote storage stopped." )
2017-05-10 02:44:13 -07:00
close ( t . quit )
2019-04-16 03:25:19 -07:00
t . wg . Wait ( )
// Wait for all QueueManager routines to end before stopping shards and WAL watcher. This
// is to ensure we don't end up executing a reshard and shards.stop() at the same time, which
// causes a closed channel panic.
2018-09-07 14:26:04 -07:00
t . shards . stop ( )
t . watcher . Stop ( )
2019-03-13 03:02:36 -07:00
// On shutdown, release the strings in the labels from the intern pool.
2019-09-13 10:23:58 -07:00
t . seriesMtx . Lock ( )
2019-03-13 03:02:36 -07:00
for _ , labels := range t . seriesLabels {
2019-08-07 12:39:07 -07:00
releaseLabels ( labels )
2019-03-13 03:02:36 -07:00
}
2019-09-13 10:23:58 -07:00
t . seriesMtx . Unlock ( )
2020-04-24 20:39:46 -07:00
t . metrics . unregister ( )
2018-09-07 14:26:04 -07:00
}
// StoreSeries keeps track of which series we know about for lookups when sending samples to remote.
2019-09-19 02:15:41 -07:00
func ( t * QueueManager ) StoreSeries ( series [ ] record . RefSeries , index int ) {
2019-09-13 10:23:58 -07:00
t . seriesMtx . Lock ( )
defer t . seriesMtx . Unlock ( )
2018-09-07 14:26:04 -07:00
for _ , s := range series {
2019-03-08 08:29:25 -08:00
ls := processExternalLabels ( s . Labels , t . externalLabels )
2019-08-07 12:39:07 -07:00
lbls := relabel . Process ( ls , t . relabelConfigs ... )
if len ( lbls ) == 0 {
2018-09-07 14:26:04 -07:00
t . droppedSeries [ s . Ref ] = struct { } { }
continue
}
2019-06-27 11:48:21 -07:00
t . seriesSegmentIndexes [ s . Ref ] = index
2019-08-07 12:39:07 -07:00
internLabels ( lbls )
2019-03-11 16:44:23 -07:00
2019-03-13 03:02:36 -07:00
// We should not ever be replacing a series labels in the map, but just
// in case we do we need to ensure we do not leak the replaced interned
// strings.
2019-06-27 11:48:21 -07:00
if orig , ok := t . seriesLabels [ s . Ref ] ; ok {
2019-08-07 12:39:07 -07:00
releaseLabels ( orig )
2019-03-11 16:44:23 -07:00
}
2019-08-07 12:39:07 -07:00
t . seriesLabels [ s . Ref ] = lbls
2018-09-07 14:26:04 -07:00
}
}
2017-05-10 02:44:13 -07:00
2018-09-07 14:26:04 -07:00
// SeriesReset is used when reading a checkpoint. WAL Watcher should have
// stored series records with the checkpoints index number, so we can now
// delete any ref ID's lower than that # from the two maps.
func ( t * QueueManager ) SeriesReset ( index int ) {
2019-09-13 10:23:58 -07:00
t . seriesMtx . Lock ( )
defer t . seriesMtx . Unlock ( )
2018-09-07 14:26:04 -07:00
// Check for series that are in segments older than the checkpoint
// that were not also present in the checkpoint.
for k , v := range t . seriesSegmentIndexes {
if v < index {
delete ( t . seriesSegmentIndexes , k )
2019-08-07 12:39:07 -07:00
releaseLabels ( t . seriesLabels [ k ] )
2019-03-11 16:44:23 -07:00
delete ( t . seriesLabels , k )
2019-09-13 10:23:58 -07:00
delete ( t . droppedSeries , k )
2018-09-07 14:26:04 -07:00
}
}
}
2017-08-11 11:45:52 -07:00
2020-03-30 20:39:29 -07:00
// SetClient updates the client used by a queue. Used when only client specific
// fields are updated to avoid restarting the queue.
func ( t * QueueManager ) SetClient ( c StorageClient ) {
t . clientMtx . Lock ( )
t . storeClient = c
t . clientMtx . Unlock ( )
}
func ( t * QueueManager ) client ( ) StorageClient {
t . clientMtx . RLock ( )
defer t . clientMtx . RUnlock ( )
return t . storeClient
}
2019-08-07 12:39:07 -07:00
func internLabels ( lbls labels . Labels ) {
for i , l := range lbls {
lbls [ i ] . Name = interner . intern ( l . Name )
lbls [ i ] . Value = interner . intern ( l . Value )
}
}
func releaseLabels ( ls labels . Labels ) {
2019-03-11 16:44:23 -07:00
for _ , l := range ls {
interner . release ( l . Name )
interner . release ( l . Value )
}
}
2019-03-13 03:02:36 -07:00
// processExternalLabels merges externalLabels into ls. If ls contains
2019-03-08 08:29:25 -08:00
// a label in externalLabels, the value in ls wins.
2019-11-18 11:53:33 -08:00
func processExternalLabels ( ls labels . Labels , externalLabels labels . Labels ) labels . Labels {
2019-03-08 08:29:25 -08:00
i , j , result := 0 , 0 , make ( labels . Labels , 0 , len ( ls ) + len ( externalLabels ) )
for i < len ( ls ) && j < len ( externalLabels ) {
if ls [ i ] . Name < externalLabels [ j ] . Name {
result = append ( result , labels . Label {
Name : ls [ i ] . Name ,
Value : ls [ i ] . Value ,
} )
i ++
} else if ls [ i ] . Name > externalLabels [ j ] . Name {
result = append ( result , externalLabels [ j ] )
j ++
} else {
result = append ( result , labels . Label {
Name : ls [ i ] . Name ,
Value : ls [ i ] . Value ,
} )
i ++
j ++
2018-09-07 14:26:04 -07:00
}
}
2019-03-08 08:29:25 -08:00
for ; i < len ( ls ) ; i ++ {
result = append ( result , labels . Label {
Name : ls [ i ] . Name ,
Value : ls [ i ] . Value ,
} )
}
result = append ( result , externalLabels [ j : ] ... )
return result
2017-05-10 02:44:13 -07:00
}
func ( t * QueueManager ) updateShardsLoop ( ) {
defer t . wg . Done ( )
2017-10-09 09:36:20 -07:00
ticker := time . NewTicker ( shardUpdateDuration )
defer ticker . Stop ( )
2017-05-10 02:44:13 -07:00
for {
select {
2017-10-09 09:36:20 -07:00
case <- ticker . C :
2019-10-21 14:54:25 -07:00
desiredShards := t . calculateDesiredShards ( )
2020-04-20 15:20:39 -07:00
if ! t . shouldReshard ( desiredShards ) {
2019-10-21 14:54:25 -07:00
continue
}
// Resharding can take some time, and we want this loop
// to stay close to shardUpdateDuration.
select {
case t . reshardChan <- desiredShards :
2019-11-26 05:22:56 -08:00
level . Info ( t . logger ) . Log ( "msg" , "Remote storage resharding" , "from" , t . numShards , "to" , desiredShards )
2019-10-21 14:54:25 -07:00
t . numShards = desiredShards
default :
level . Info ( t . logger ) . Log ( "msg" , "Currently resharding, skipping." )
}
2017-05-10 02:44:13 -07:00
case <- t . quit :
return
}
}
}
2020-04-20 15:20:39 -07:00
// shouldReshard returns if resharding should occur
func ( t * QueueManager ) shouldReshard ( desiredShards int ) bool {
if desiredShards == t . numShards {
return false
}
// We shouldn't reshard if Prometheus hasn't been able to send to the
// remote endpoint successfully within some period of time.
minSendTimestamp := time . Now ( ) . Add ( - 2 * time . Duration ( t . cfg . BatchSendDeadline ) ) . Unix ( )
lsts := atomic . LoadInt64 ( & t . lastSendTimestamp )
if lsts < minSendTimestamp {
level . Warn ( t . logger ) . Log ( "msg" , "Skipping resharding, last successful send was beyond threshold" , "lastSendTimestamp" , lsts , "minSendTimestamp" , minSendTimestamp )
return false
}
return true
}
2019-10-21 14:54:25 -07:00
// calculateDesiredShards returns the number of desired shards, which will be
// the current QueueManager.numShards if resharding should not occur for reasons
// outlined in this functions implementation. It is up to the caller to reshard, or not,
// based on the return value.
func ( t * QueueManager ) calculateDesiredShards ( ) int {
2017-05-10 02:44:13 -07:00
t . samplesOut . tick ( )
2019-02-19 23:51:08 -08:00
t . samplesDropped . tick ( )
2017-05-10 02:44:13 -07:00
t . samplesOutDuration . tick ( )
// We use the number of incoming samples as a prediction of how much work we
// will need to do next iteration. We add to this any pending samples
// (received - send) so we can catch up with any backlog. We use the average
// outgoing batch latency to work out how many shards we need.
var (
2019-08-13 02:10:21 -07:00
samplesInRate = t . samplesIn . rate ( )
samplesOutRate = t . samplesOut . rate ( )
samplesKeptRatio = samplesOutRate / ( t . samplesDropped . rate ( ) + samplesOutRate )
samplesOutDuration = t . samplesOutDuration . rate ( ) / float64 ( time . Second )
samplesPendingRate = samplesInRate * samplesKeptRatio - samplesOutRate
2020-04-24 20:39:46 -07:00
highestSent = t . metrics . highestSentTimestamp . Get ( )
2019-03-01 11:04:26 -08:00
highestRecv = highestTimestamp . Get ( )
2020-01-02 09:30:56 -08:00
delay = highestRecv - highestSent
samplesPending = delay * samplesInRate * samplesKeptRatio
2017-05-10 02:44:13 -07:00
)
2019-08-13 02:10:21 -07:00
if samplesOutRate <= 0 {
2019-10-21 14:54:25 -07:00
return t . numShards
2017-05-10 02:44:13 -07:00
}
2019-12-23 10:03:54 -08:00
// When behind we will try to catch up on a proporation of samples per tick.
// This works similarly to an integral accumulator in that pending samples
// is the result of the error integral.
const integralGain = 0.1 / float64 ( shardUpdateDuration / time . Second )
2017-05-10 02:44:13 -07:00
var (
2019-08-13 02:10:21 -07:00
timePerSample = samplesOutDuration / samplesOutRate
2019-12-23 10:03:54 -08:00
desiredShards = timePerSample * ( samplesInRate * samplesKeptRatio + integralGain * samplesPending )
2017-05-10 02:44:13 -07:00
)
2020-04-24 20:39:46 -07:00
t . metrics . desiredNumShards . Set ( desiredShards )
2019-08-10 08:24:58 -07:00
level . Debug ( t . logger ) . Log ( "msg" , "QueueManager.calculateDesiredShards" ,
2019-08-13 02:10:21 -07:00
"samplesInRate" , samplesInRate ,
"samplesOutRate" , samplesOutRate ,
2019-03-01 11:04:26 -08:00
"samplesKeptRatio" , samplesKeptRatio ,
2019-08-13 02:10:21 -07:00
"samplesPendingRate" , samplesPendingRate ,
2019-03-01 11:04:26 -08:00
"samplesPending" , samplesPending ,
"samplesOutDuration" , samplesOutDuration ,
"timePerSample" , timePerSample ,
"desiredShards" , desiredShards ,
"highestSent" , highestSent ,
2019-08-13 02:10:21 -07:00
"highestRecv" , highestRecv ,
)
2017-05-10 02:44:13 -07:00
// Changes in the number of shards must be greater than shardToleranceFraction.
var (
lowerBound = float64 ( t . numShards ) * ( 1. - shardToleranceFraction )
upperBound = float64 ( t . numShards ) * ( 1. + shardToleranceFraction )
)
2017-08-11 11:45:52 -07:00
level . Debug ( t . logger ) . Log ( "msg" , "QueueManager.updateShardsLoop" ,
"lowerBound" , lowerBound , "desiredShards" , desiredShards , "upperBound" , upperBound )
2017-05-10 02:44:13 -07:00
if lowerBound <= desiredShards && desiredShards <= upperBound {
2019-10-21 14:54:25 -07:00
return t . numShards
2017-05-10 02:44:13 -07:00
}
numShards := int ( math . Ceil ( desiredShards ) )
2020-01-02 09:30:56 -08:00
// Do not downshard if we are more than ten seconds back.
if numShards < t . numShards && delay > 10.0 {
level . Debug ( t . logger ) . Log ( "msg" , "Not downsharding due to being too far behind" )
return t . numShards
}
2017-05-10 02:44:13 -07:00
if numShards > t . cfg . MaxShards {
numShards = t . cfg . MaxShards
2018-12-04 09:32:14 -08:00
} else if numShards < t . cfg . MinShards {
numShards = t . cfg . MinShards
2017-05-10 02:44:13 -07:00
}
2019-10-21 14:54:25 -07:00
return numShards
2017-05-10 02:44:13 -07:00
}
func ( t * QueueManager ) reshardLoop ( ) {
defer t . wg . Done ( )
for {
select {
case numShards := <- t . reshardChan :
2018-09-07 14:26:04 -07:00
// We start the newShards after we have stopped (the therefore completely
// flushed) the oldShards, to guarantee we only every deliver samples in
// order.
t . shards . stop ( )
t . shards . start ( numShards )
2017-05-10 02:44:13 -07:00
case <- t . quit :
return
}
}
}
2018-09-07 14:26:04 -07:00
func ( t * QueueManager ) newShards ( ) * shards {
s := & shards {
qm : t ,
done : make ( chan struct { } ) ,
}
return s
}
2017-05-10 02:44:13 -07:00
2019-08-12 09:22:02 -07:00
type sample struct {
labels labels . Labels
t int64
v float64
}
2017-05-10 02:44:13 -07:00
type shards struct {
2018-09-07 14:26:04 -07:00
mtx sync . RWMutex // With the WAL, this is never actually contended.
qm * QueueManager
2019-08-12 09:22:02 -07:00
queues [ ] chan sample
2018-09-07 14:26:04 -07:00
// Emulate a wait group with a channel and an atomic int, as you
// cannot select on a wait group.
2018-02-01 05:20:38 -08:00
done chan struct { }
running int32
2018-09-07 14:26:04 -07:00
// Soft shutdown context will prevent new enqueues and deadlocks.
softShutdown chan struct { }
// Hard shutdown context is used to terminate outgoing HTTP connections
// after giving them a chance to terminate.
hardShutdown context . CancelFunc
2017-05-10 02:44:13 -07:00
}
2018-09-07 14:26:04 -07:00
// start the shards; must be called before any call to enqueue.
func ( s * shards ) start ( n int ) {
s . mtx . Lock ( )
defer s . mtx . Unlock ( )
2019-08-12 09:22:02 -07:00
newQueues := make ( [ ] chan sample , n )
2018-09-07 14:26:04 -07:00
for i := 0 ; i < n ; i ++ {
2019-08-12 09:22:02 -07:00
newQueues [ i ] = make ( chan sample , s . qm . cfg . Capacity )
2017-05-10 02:44:13 -07:00
}
2018-09-07 14:26:04 -07:00
s . queues = newQueues
var hardShutdownCtx context . Context
hardShutdownCtx , s . hardShutdown = context . WithCancel ( context . Background ( ) )
s . softShutdown = make ( chan struct { } )
s . running = int32 ( n )
s . done = make ( chan struct { } )
for i := 0 ; i < n ; i ++ {
go s . runShard ( hardShutdownCtx , i , newQueues [ i ] )
2017-05-10 02:44:13 -07:00
}
2020-04-24 20:39:46 -07:00
s . qm . metrics . numShards . Set ( float64 ( n ) )
2017-05-10 02:44:13 -07:00
}
2018-09-07 14:26:04 -07:00
// stop the shards; subsequent call to enqueue will return false.
func ( s * shards ) stop ( ) {
// Attempt a clean shutdown, but only wait flushDeadline for all the shards
2019-03-03 03:35:29 -08:00
// to cleanly exit. As we're doing RPCs, enqueue can block indefinitely.
2018-09-07 14:26:04 -07:00
// We must be able so call stop concurrently, hence we can only take the
// RLock here.
s . mtx . RLock ( )
close ( s . softShutdown )
s . mtx . RUnlock ( )
// Enqueue should now be unblocked, so we can take the write lock. This
// also ensures we don't race with writes to the queues, and get a panic:
// send on closed channel.
s . mtx . Lock ( )
defer s . mtx . Unlock ( )
for _ , queue := range s . queues {
close ( queue )
2017-05-10 02:44:13 -07:00
}
2018-01-31 07:41:48 -08:00
select {
2018-02-01 05:20:38 -08:00
case <- s . done :
2018-05-29 01:51:29 -07:00
return
2018-09-07 14:26:04 -07:00
case <- time . After ( s . qm . flushDeadline ) :
2018-01-31 07:41:48 -08:00
level . Error ( s . qm . logger ) . Log ( "msg" , "Failed to flush all samples on shutdown" )
}
2018-05-29 01:51:29 -07:00
2018-05-29 03:35:43 -07:00
// Force an unclean shutdown.
2018-09-07 14:26:04 -07:00
s . hardShutdown ( )
2018-05-29 01:51:29 -07:00
<- s . done
2017-05-10 02:44:13 -07:00
}
2018-09-07 14:26:04 -07:00
// enqueue a sample. If we are currently in the process of shutting down or resharding,
// will return false; in this case, you should back off and retry.
2019-08-12 09:22:02 -07:00
func ( s * shards ) enqueue ( ref uint64 , sample sample ) bool {
2018-09-07 14:26:04 -07:00
s . mtx . RLock ( )
defer s . mtx . RUnlock ( )
2017-05-10 02:44:13 -07:00
2018-09-07 14:26:04 -07:00
select {
case <- s . softShutdown :
return false
default :
}
2017-05-10 02:44:13 -07:00
2018-09-07 14:26:04 -07:00
shard := uint64 ( ref ) % uint64 ( len ( s . queues ) )
2017-05-10 02:44:13 -07:00
select {
2018-09-07 14:26:04 -07:00
case <- s . softShutdown :
return false
2017-05-10 02:44:13 -07:00
case s . queues [ shard ] <- sample :
return true
}
}
2019-08-12 09:22:02 -07:00
func ( s * shards ) runShard ( ctx context . Context , shardID int , queue chan sample ) {
2018-02-01 05:20:38 -08:00
defer func ( ) {
if atomic . AddInt32 ( & s . running , - 1 ) == 0 {
close ( s . done )
}
} ( )
2019-08-12 09:22:02 -07:00
shardNum := strconv . Itoa ( shardID )
2017-05-10 02:44:13 -07:00
// Send batches of at most MaxSamplesPerSend samples to the remote storage.
// If we have fewer samples than that, flush them out after a deadline
// anyways.
2019-08-12 09:22:02 -07:00
var (
max = s . qm . cfg . MaxSamplesPerSend
nPending = 0
pendingSamples = allocateTimeSeries ( max )
buf [ ] byte
)
2019-06-27 11:48:21 -07:00
2018-08-24 07:55:21 -07:00
timer := time . NewTimer ( time . Duration ( s . qm . cfg . BatchSendDeadline ) )
2018-03-12 07:27:48 -07:00
stop := func ( ) {
2018-03-09 04:00:26 -08:00
if ! timer . Stop ( ) {
select {
case <- timer . C :
default :
}
}
2018-03-12 07:27:48 -07:00
}
defer stop ( )
2018-01-24 04:36:29 -08:00
2017-05-10 02:44:13 -07:00
for {
select {
2018-09-07 14:26:04 -07:00
case <- ctx . Done ( ) :
2018-05-29 01:51:29 -07:00
return
2017-05-10 02:44:13 -07:00
case sample , ok := <- queue :
if ! ok {
2019-08-12 09:22:02 -07:00
if nPending > 0 {
level . Debug ( s . qm . logger ) . Log ( "msg" , "Flushing samples to remote storage..." , "count" , nPending )
s . sendSamples ( ctx , pendingSamples [ : nPending ] , & buf )
2020-04-24 20:39:46 -07:00
s . qm . metrics . pendingSamples . Sub ( float64 ( nPending ) )
2017-08-11 11:45:52 -07:00
level . Debug ( s . qm . logger ) . Log ( "msg" , "Done flushing." )
2017-05-10 02:44:13 -07:00
}
return
}
2018-09-07 14:26:04 -07:00
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
2019-08-12 09:22:02 -07:00
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
// stop reading from the queue. This makes it safe to reference pendingSamples by index.
pendingSamples [ nPending ] . Labels = labelsToLabelsProto ( sample . labels , pendingSamples [ nPending ] . Labels )
pendingSamples [ nPending ] . Samples [ 0 ] . Timestamp = sample . t
pendingSamples [ nPending ] . Samples [ 0 ] . Value = sample . v
nPending ++
2020-04-24 20:39:46 -07:00
s . qm . metrics . pendingSamples . Inc ( )
2017-05-10 02:44:13 -07:00
2019-08-12 09:22:02 -07:00
if nPending >= max {
s . sendSamples ( ctx , pendingSamples , & buf )
nPending = 0
2020-04-24 20:39:46 -07:00
s . qm . metrics . pendingSamples . Sub ( float64 ( max ) )
2018-03-09 04:00:26 -08:00
2018-03-12 07:27:48 -07:00
stop ( )
2018-08-24 07:55:21 -07:00
timer . Reset ( time . Duration ( s . qm . cfg . BatchSendDeadline ) )
2017-05-10 02:44:13 -07:00
}
2018-03-09 04:00:26 -08:00
2018-01-24 04:36:29 -08:00
case <- timer . C :
2019-08-12 09:22:02 -07:00
if nPending > 0 {
level . Debug ( s . qm . logger ) . Log ( "msg" , "runShard timer ticked, sending samples" , "samples" , nPending , "shard" , shardNum )
s . sendSamples ( ctx , pendingSamples [ : nPending ] , & buf )
2020-04-24 20:39:46 -07:00
s . qm . metrics . pendingSamples . Sub ( float64 ( nPending ) )
2020-02-09 19:51:21 -08:00
nPending = 0
2017-05-10 02:44:13 -07:00
}
2018-08-24 07:55:21 -07:00
timer . Reset ( time . Duration ( s . qm . cfg . BatchSendDeadline ) )
2017-05-10 02:44:13 -07:00
}
}
}
2019-06-27 11:48:21 -07:00
func ( s * shards ) sendSamples ( ctx context . Context , samples [ ] prompb . TimeSeries , buf * [ ] byte ) {
2017-05-10 02:44:13 -07:00
begin := time . Now ( )
2019-06-27 11:48:21 -07:00
err := s . sendSamplesWithBackoff ( ctx , samples , buf )
2019-02-12 06:58:25 -08:00
if err != nil {
2018-09-07 14:26:04 -07:00
level . Error ( s . qm . logger ) . Log ( "msg" , "non-recoverable error" , "count" , len ( samples ) , "err" , err )
2020-04-24 20:39:46 -07:00
s . qm . metrics . failedSamplesTotal . Add ( float64 ( len ( samples ) ) )
2018-09-07 14:26:04 -07:00
}
2017-05-10 02:44:13 -07:00
2018-04-08 02:51:54 -07:00
// These counters are used to calculate the dynamic sharding, and as such
2017-05-10 02:44:13 -07:00
// should be maintained irrespective of success or failure.
s . qm . samplesOut . incr ( int64 ( len ( samples ) ) )
s . qm . samplesOutDuration . incr ( int64 ( time . Since ( begin ) ) )
2020-04-15 08:03:28 -07:00
atomic . StoreInt64 ( & s . qm . lastSendTimestamp , time . Now ( ) . Unix ( ) )
2017-05-10 02:44:13 -07:00
}
// sendSamples to the remote storage with backoff for recoverable errors.
2019-06-27 11:48:21 -07:00
func ( s * shards ) sendSamplesWithBackoff ( ctx context . Context , samples [ ] prompb . TimeSeries , buf * [ ] byte ) error {
req , highest , err := buildWriteRequest ( samples , * buf )
2018-09-07 14:26:04 -07:00
if err != nil {
2019-03-01 11:04:26 -08:00
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
2018-09-07 14:26:04 -07:00
return err
}
2019-03-01 11:04:26 -08:00
2020-06-01 08:21:13 -07:00
backoff := s . qm . cfg . MinBackoff
reqSize := len ( * buf )
sampleCount := len ( samples )
* buf = req
try := 0
// An anonymous function allows us to defer the completion of our per-try spans
// without causing a memory leak, and it has the nice effect of not propagating any
// parameters for sendSamplesWithBackoff/3.
attemptStore := func ( ) error {
span , ctx := opentracing . StartSpanFromContext ( ctx , "Remote Send Batch" )
defer span . Finish ( )
span . SetTag ( "samples" , sampleCount )
span . SetTag ( "request_size" , reqSize )
span . SetTag ( "try" , try )
span . SetTag ( "remote_name" , s . qm . storeClient . Name ( ) )
span . SetTag ( "remote_url" , s . qm . storeClient . Endpoint ( ) )
begin := time . Now ( )
err := s . qm . client ( ) . Store ( ctx , * buf )
s . qm . metrics . sentBatchDuration . Observe ( time . Since ( begin ) . Seconds ( ) )
if err != nil {
span . LogKV ( "error" , err )
ext . Error . Set ( span , true )
return err
}
return nil
}
2018-09-07 14:26:04 -07:00
for {
select {
case <- ctx . Done ( ) :
return ctx . Err ( )
default :
}
2017-05-10 02:44:13 -07:00
2020-06-01 08:21:13 -07:00
err = attemptStore ( )
2018-09-07 14:26:04 -07:00
2020-06-01 08:21:13 -07:00
if err != nil {
// If the error is unrecoverable, we should not retry.
if _ , ok := err . ( recoverableError ) ; ! ok {
return err
}
2017-05-10 02:44:13 -07:00
2020-06-01 08:21:13 -07:00
// If we make it this far, we've encountered a recoverable error and will retry.
s . qm . metrics . retriedSamplesTotal . Add ( float64 ( sampleCount ) )
level . Warn ( s . qm . logger ) . Log ( "msg" , "Failed to send batch, retrying" , "err" , err )
time . Sleep ( time . Duration ( backoff ) )
backoff = backoff * 2
if backoff > s . qm . cfg . MaxBackoff {
backoff = s . qm . cfg . MaxBackoff
}
2018-09-07 14:26:04 -07:00
2020-06-01 08:21:13 -07:00
try ++
continue
2017-05-10 02:44:13 -07:00
}
2020-06-01 08:21:13 -07:00
// Since we retry forever on recoverable errors, this needs to stay inside the loop.
s . qm . metrics . succeededSamplesTotal . Add ( float64 ( sampleCount ) )
s . qm . metrics . bytesSent . Add ( float64 ( reqSize ) )
s . qm . metrics . highestSentTimestamp . Set ( float64 ( highest / 1000 ) )
return nil
2017-05-10 02:44:13 -07:00
}
2018-09-07 14:26:04 -07:00
}
2019-06-27 11:48:21 -07:00
func buildWriteRequest ( samples [ ] prompb . TimeSeries , buf [ ] byte ) ( [ ] byte , int64 , error ) {
2018-09-07 14:26:04 -07:00
var highest int64
for _ , ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample in it.
if ts . Samples [ 0 ] . Timestamp > highest {
highest = ts . Samples [ 0 ] . Timestamp
}
}
req := & prompb . WriteRequest {
Timeseries : samples ,
}
data , err := proto . Marshal ( req )
if err != nil {
return nil , highest , err
}
2017-05-10 02:44:13 -07:00
2019-06-27 11:48:21 -07:00
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
if buf != nil {
buf = buf [ 0 : cap ( buf ) ]
}
compressed := snappy . Encode ( buf , data )
2018-09-07 14:26:04 -07:00
return compressed , highest , nil
2017-05-10 02:44:13 -07:00
}
2019-08-12 09:22:02 -07:00
func allocateTimeSeries ( capacity int ) [ ] prompb . TimeSeries {
timeseries := make ( [ ] prompb . TimeSeries , capacity )
// We only ever send one sample per timeseries, so preallocate with length one.
for i := range timeseries {
timeseries [ i ] . Samples = [ ] prompb . Sample { { } }
}
return timeseries
}