2017-05-10 02:44:13 -07:00
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
2018-05-29 01:51:29 -07:00
"context"
2017-05-10 02:44:13 -07:00
"math"
2018-09-07 14:26:04 -07:00
"strconv"
2017-05-10 02:44:13 -07:00
"sync"
2018-02-01 05:20:38 -08:00
"sync/atomic"
2017-05-10 02:44:13 -07:00
"time"
2017-08-11 11:45:52 -07:00
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
2018-09-07 14:26:04 -07:00
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
2017-05-10 02:44:13 -07:00
"github.com/prometheus/client_golang/prometheus"
2019-03-01 11:04:26 -08:00
"github.com/prometheus/client_golang/prometheus/promauto"
2017-05-10 02:44:13 -07:00
"github.com/prometheus/prometheus/config"
2019-03-08 08:29:25 -08:00
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
2017-10-23 13:28:17 -07:00
"github.com/prometheus/prometheus/prompb"
2019-09-19 02:15:41 -07:00
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/wal"
2017-05-10 02:44:13 -07:00
)
// String constants for instrumentation: the metric-name prefix parts and the
// name of the label that identifies a queue.
const (
	namespace = "prometheus"
	subsystem = "remote_storage"
	queue     = "queue"
)

// Tuning parameters of the shard calculation.
const (
	// We track samples in/out and how long pushes take using an Exponentially
	// Weighted Moving Average.
	ewmaWeight = 0.2

	// shardUpdateDuration is the interval handed to the EWMA rates and the
	// period of the ticker that drives updateShardsLoop.
	shardUpdateDuration = 10 * time.Second

	// Allow 30% too many shards before scaling down.
	shardToleranceFraction = 0.3
)
var (
2019-03-01 11:04:26 -08:00
succeededSamplesTotal = promauto . NewCounterVec (
2017-05-10 02:44:13 -07:00
prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "succeeded_samples_total" ,
Help : "Total number of samples successfully sent to remote storage." ,
} ,
[ ] string { queue } ,
)
2019-03-01 11:04:26 -08:00
failedSamplesTotal = promauto . NewCounterVec (
2017-05-10 02:44:13 -07:00
prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "failed_samples_total" ,
2018-09-07 14:26:04 -07:00
Help : "Total number of samples which failed on send to remote storage, non-recoverable errors." ,
} ,
[ ] string { queue } ,
)
2019-03-01 11:04:26 -08:00
retriedSamplesTotal = promauto . NewCounterVec (
2018-09-07 14:26:04 -07:00
prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "retried_samples_total" ,
Help : "Total number of samples which failed on send to remote storage but were retried because the send error was recoverable." ,
2017-05-10 02:44:13 -07:00
} ,
[ ] string { queue } ,
)
2019-03-01 11:04:26 -08:00
droppedSamplesTotal = promauto . NewCounterVec (
2017-05-10 02:44:13 -07:00
prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "dropped_samples_total" ,
2018-09-07 14:26:04 -07:00
Help : "Total number of samples which were dropped after being read from the WAL before being sent via remote write." ,
} ,
[ ] string { queue } ,
)
2019-03-01 11:04:26 -08:00
enqueueRetriesTotal = promauto . NewCounterVec (
2018-09-07 14:26:04 -07:00
prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "enqueue_retries_total" ,
Help : "Total number of times enqueue has failed because a shards queue was full." ,
2017-05-10 02:44:13 -07:00
} ,
[ ] string { queue } ,
)
2019-03-01 11:04:26 -08:00
sentBatchDuration = promauto . NewHistogramVec (
2017-05-10 02:44:13 -07:00
prometheus . HistogramOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "sent_batch_duration_seconds" ,
Help : "Duration of sample batch send calls to the remote storage." ,
Buckets : prometheus . DefBuckets ,
} ,
[ ] string { queue } ,
)
2019-03-01 11:04:26 -08:00
queueHighestSentTimestamp = promauto . NewGaugeVec (
2018-09-07 14:26:04 -07:00
prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
2019-02-19 23:51:08 -08:00
Name : "queue_highest_sent_timestamp_seconds" ,
2019-02-12 06:11:43 -08:00
Help : "Timestamp from a WAL sample, the highest timestamp successfully sent by this queue, in seconds since epoch." ,
2018-09-07 14:26:04 -07:00
} ,
[ ] string { queue } ,
)
2019-03-01 11:04:26 -08:00
queuePendingSamples = promauto . NewGaugeVec (
2017-05-10 02:44:13 -07:00
prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
2018-09-07 14:26:04 -07:00
Name : "pending_samples" ,
Help : "The number of samples pending in the queues shards to be sent to the remote storage." ,
2017-05-10 02:44:13 -07:00
} ,
[ ] string { queue } ,
)
2019-03-01 11:04:26 -08:00
shardCapacity = promauto . NewGaugeVec (
2017-05-10 02:44:13 -07:00
prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
2018-07-17 21:15:16 -07:00
Name : "shard_capacity" ,
Help : "The capacity of each shard of the queue used for parallel sending to the remote storage." ,
2017-05-10 02:44:13 -07:00
} ,
[ ] string { queue } ,
)
2019-03-01 11:04:26 -08:00
numShards = promauto . NewGaugeVec (
2017-05-10 02:44:13 -07:00
prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "shards" ,
Help : "The number of shards used for parallel sending to the remote storage." ,
} ,
[ ] string { queue } ,
)
2019-07-19 14:53:26 -07:00
maxNumShards = promauto . NewGaugeVec (
prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "shards_max" ,
Help : "The maximum number of shards that the queue is allowed to run." ,
} ,
[ ] string { queue } ,
)
minNumShards = promauto . NewGaugeVec (
prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "shards_min" ,
Help : "The minimum number of shards that the queue is allowed to run." ,
} ,
[ ] string { queue } ,
)
desiredNumShards = promauto . NewGaugeVec (
prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "shards_desired" ,
Help : "The number of shards that the queues shard calculation wants to run based on the rate of samples in vs. samples out." ,
} ,
[ ] string { queue } ,
)
2017-05-10 02:44:13 -07:00
)
// StorageClient defines an interface for sending a batch of samples to an
// external timeseries database.
type StorageClient interface {
	// Store stores the given samples in the remote storage. The samples are
	// passed as an already-encoded request body in req; a non-nil error
	// reports that the batch was not stored.
	Store(ctx context.Context, req []byte) error
	// Name identifies the remote storage implementation. The queue manager
	// uses it as the value of the queue label on its logs and metrics.
	Name() string
}
// QueueManager manages a queue of samples to be sent to the Storage
2018-09-07 14:26:04 -07:00
// indicated by the provided StorageClient. Implements writeTo interface
// used by WAL Watcher.
2017-05-10 02:44:13 -07:00
type QueueManager struct {
2019-10-21 14:54:25 -07:00
// https://golang.org/pkg/sync/atomic/#pkg-note-BUG
lastSendTimestamp int64
2019-03-05 04:21:11 -08:00
logger log . Logger
flushDeadline time . Duration
cfg config . QueueConfig
2019-03-08 08:29:25 -08:00
externalLabels labels . Labels
relabelConfigs [ ] * relabel . Config
2019-03-05 04:21:11 -08:00
client StorageClient
2019-09-19 02:15:41 -07:00
watcher * wal . Watcher
2018-09-07 14:26:04 -07:00
2019-09-13 10:23:58 -07:00
seriesMtx sync . Mutex
2019-08-07 12:39:07 -07:00
seriesLabels map [ uint64 ] labels . Labels
2018-09-07 14:26:04 -07:00
seriesSegmentIndexes map [ uint64 ] int
droppedSeries map [ uint64 ] struct { }
2017-05-10 02:44:13 -07:00
shards * shards
numShards int
reshardChan chan int
2019-01-18 04:48:16 -08:00
quit chan struct { }
wg sync . WaitGroup
2017-05-10 02:44:13 -07:00
2019-02-19 23:51:08 -08:00
samplesIn , samplesDropped , samplesOut , samplesOutDuration * ewmaRate
integralAccumulator float64
2019-08-13 02:10:21 -07:00
startedAt time . Time
2019-03-05 04:21:11 -08:00
highestSentTimestampMetric * maxGauge
pendingSamplesMetric prometheus . Gauge
enqueueRetriesMetric prometheus . Counter
droppedSamplesTotal prometheus . Counter
numShardsMetric prometheus . Gauge
failedSamplesTotal prometheus . Counter
sentBatchDuration prometheus . Observer
succeededSamplesTotal prometheus . Counter
retriedSamplesTotal prometheus . Counter
2019-04-23 01:49:17 -07:00
shardCapacity prometheus . Gauge
2019-07-19 14:53:26 -07:00
maxNumShards prometheus . Gauge
minNumShards prometheus . Gauge
desiredNumShards prometheus . Gauge
2017-05-10 02:44:13 -07:00
}
// NewQueueManager builds a new QueueManager.
2019-09-19 02:15:41 -07:00
func NewQueueManager ( reg prometheus . Registerer , logger log . Logger , walDir string , samplesIn * ewmaRate , cfg config . QueueConfig , externalLabels labels . Labels , relabelConfigs [ ] * relabel . Config , client StorageClient , flushDeadline time . Duration ) * QueueManager {
2017-08-11 11:45:52 -07:00
if logger == nil {
logger = log . NewNopLogger ( )
}
2019-03-05 04:21:11 -08:00
name := client . Name ( )
logger = log . With ( logger , "queue" , name )
2017-05-10 02:44:13 -07:00
t := & QueueManager {
2019-03-05 04:21:11 -08:00
logger : logger ,
2018-05-23 07:03:54 -07:00
flushDeadline : flushDeadline ,
2017-05-10 02:44:13 -07:00
cfg : cfg ,
externalLabels : externalLabels ,
relabelConfigs : relabelConfigs ,
client : client ,
2019-08-07 12:39:07 -07:00
seriesLabels : make ( map [ uint64 ] labels . Labels ) ,
2018-09-07 14:26:04 -07:00
seriesSegmentIndexes : make ( map [ uint64 ] int ) ,
droppedSeries : make ( map [ uint64 ] struct { } ) ,
2018-12-04 09:32:14 -08:00
numShards : cfg . MinShards ,
2017-05-10 02:44:13 -07:00
reshardChan : make ( chan int ) ,
quit : make ( chan struct { } ) ,
2018-09-07 14:26:04 -07:00
samplesIn : samplesIn ,
2019-02-19 23:51:08 -08:00
samplesDropped : newEWMARate ( ewmaWeight , shardUpdateDuration ) ,
2017-05-10 02:44:13 -07:00
samplesOut : newEWMARate ( ewmaWeight , shardUpdateDuration ) ,
samplesOutDuration : newEWMARate ( ewmaWeight , shardUpdateDuration ) ,
2019-03-01 11:04:26 -08:00
}
2018-09-07 14:26:04 -07:00
2019-09-19 02:15:41 -07:00
t . watcher = wal . NewWatcher ( reg , wal . NewWatcherMetrics ( reg ) , logger , name , t , walDir )
2019-03-05 04:21:11 -08:00
t . shards = t . newShards ( )
2017-05-10 02:44:13 -07:00
return t
}
2018-09-07 14:26:04 -07:00
// Append queues a sample to be sent to the remote storage. Blocks until all samples are
// enqueued on their shards or a shutdown signal is received.
2019-09-19 02:15:41 -07:00
func ( t * QueueManager ) Append ( samples [ ] record . RefSample ) bool {
2019-06-27 11:48:21 -07:00
outer :
2019-08-12 09:22:02 -07:00
for _ , s := range samples {
2019-09-13 10:23:58 -07:00
t . seriesMtx . Lock ( )
2019-08-12 09:22:02 -07:00
lbls , ok := t . seriesLabels [ s . Ref ]
2019-06-27 11:48:21 -07:00
if ! ok {
2019-03-05 04:21:11 -08:00
t . droppedSamplesTotal . Inc ( )
2019-02-19 23:51:08 -08:00
t . samplesDropped . incr ( 1 )
2019-08-12 09:22:02 -07:00
if _ , ok := t . droppedSeries [ s . Ref ] ; ! ok {
level . Info ( t . logger ) . Log ( "msg" , "dropped sample for series that was not explicitly dropped via relabelling" , "ref" , s . Ref )
2018-09-07 14:26:04 -07:00
}
2019-09-13 10:23:58 -07:00
t . seriesMtx . Unlock ( )
2018-09-07 14:26:04 -07:00
continue
}
2019-09-13 10:23:58 -07:00
t . seriesMtx . Unlock ( )
2019-01-18 04:48:16 -08:00
// This will only loop if the queues are being resharded.
backoff := t . cfg . MinBackoff
2018-09-07 14:26:04 -07:00
for {
select {
case <- t . quit :
return false
default :
}
2017-05-10 02:44:13 -07:00
2019-08-12 09:22:02 -07:00
if t . shards . enqueue ( s . Ref , sample {
labels : lbls ,
t : s . T ,
v : s . V ,
} ) {
2018-09-07 14:26:04 -07:00
continue outer
}
2019-01-18 04:48:16 -08:00
2018-09-07 14:26:04 -07:00
t . enqueueRetriesMetric . Inc ( )
time . Sleep ( time . Duration ( backoff ) )
backoff = backoff * 2
if backoff > t . cfg . MaxBackoff {
backoff = t . cfg . MaxBackoff
}
2017-05-10 02:44:13 -07:00
}
}
2018-09-07 14:26:04 -07:00
return true
2017-05-10 02:44:13 -07:00
}
// Start the queue manager sending samples to the remote storage.
// Does not block.
func ( t * QueueManager ) Start ( ) {
2019-08-13 02:10:21 -07:00
t . startedAt = time . Now ( )
2019-04-23 01:49:17 -07:00
// Setup the QueueManagers metrics. We do this here rather than in the
// constructor because of the ordering of creating Queue Managers's, stopping them,
// and then starting new ones in storage/remote/storage.go ApplyConfig.
name := t . client . Name ( )
t . highestSentTimestampMetric = & maxGauge {
Gauge : queueHighestSentTimestamp . WithLabelValues ( name ) ,
}
t . pendingSamplesMetric = queuePendingSamples . WithLabelValues ( name )
t . enqueueRetriesMetric = enqueueRetriesTotal . WithLabelValues ( name )
t . droppedSamplesTotal = droppedSamplesTotal . WithLabelValues ( name )
t . numShardsMetric = numShards . WithLabelValues ( name )
t . failedSamplesTotal = failedSamplesTotal . WithLabelValues ( name )
t . sentBatchDuration = sentBatchDuration . WithLabelValues ( name )
t . succeededSamplesTotal = succeededSamplesTotal . WithLabelValues ( name )
t . retriedSamplesTotal = retriedSamplesTotal . WithLabelValues ( name )
t . shardCapacity = shardCapacity . WithLabelValues ( name )
2019-07-19 14:53:26 -07:00
t . maxNumShards = maxNumShards . WithLabelValues ( name )
t . minNumShards = minNumShards . WithLabelValues ( name )
t . desiredNumShards = desiredNumShards . WithLabelValues ( name )
2019-04-23 01:49:17 -07:00
// Initialise some metrics.
t . shardCapacity . Set ( float64 ( t . cfg . Capacity ) )
t . pendingSamplesMetric . Set ( 0 )
2019-07-19 14:53:26 -07:00
t . maxNumShards . Set ( float64 ( t . cfg . MaxShards ) )
t . minNumShards . Set ( float64 ( t . cfg . MinShards ) )
t . desiredNumShards . Set ( float64 ( t . cfg . MinShards ) )
2019-04-23 01:49:17 -07:00
2018-09-07 14:26:04 -07:00
t . shards . start ( t . numShards )
t . watcher . Start ( )
2017-05-10 02:44:13 -07:00
t . wg . Add ( 2 )
go t . updateShardsLoop ( )
go t . reshardLoop ( )
}
// Stop stops sending samples to the remote storage and waits for pending
// sends to complete.
func ( t * QueueManager ) Stop ( ) {
2017-08-11 11:45:52 -07:00
level . Info ( t . logger ) . Log ( "msg" , "Stopping remote storage..." )
2018-09-07 14:26:04 -07:00
defer level . Info ( t . logger ) . Log ( "msg" , "Remote storage stopped." )
2017-05-10 02:44:13 -07:00
close ( t . quit )
2019-04-16 03:25:19 -07:00
t . wg . Wait ( )
// Wait for all QueueManager routines to end before stopping shards and WAL watcher. This
// is to ensure we don't end up executing a reshard and shards.stop() at the same time, which
// causes a closed channel panic.
2018-09-07 14:26:04 -07:00
t . shards . stop ( )
t . watcher . Stop ( )
2019-03-13 03:02:36 -07:00
// On shutdown, release the strings in the labels from the intern pool.
2019-09-13 10:23:58 -07:00
t . seriesMtx . Lock ( )
2019-03-13 03:02:36 -07:00
for _ , labels := range t . seriesLabels {
2019-08-07 12:39:07 -07:00
releaseLabels ( labels )
2019-03-13 03:02:36 -07:00
}
2019-09-13 10:23:58 -07:00
t . seriesMtx . Unlock ( )
2019-04-23 01:49:17 -07:00
// Delete metrics so we don't have alerts for queues that are gone.
name := t . client . Name ( )
queueHighestSentTimestamp . DeleteLabelValues ( name )
queuePendingSamples . DeleteLabelValues ( name )
enqueueRetriesTotal . DeleteLabelValues ( name )
droppedSamplesTotal . DeleteLabelValues ( name )
numShards . DeleteLabelValues ( name )
failedSamplesTotal . DeleteLabelValues ( name )
sentBatchDuration . DeleteLabelValues ( name )
succeededSamplesTotal . DeleteLabelValues ( name )
retriedSamplesTotal . DeleteLabelValues ( name )
shardCapacity . DeleteLabelValues ( name )
2019-07-19 14:53:26 -07:00
maxNumShards . DeleteLabelValues ( name )
minNumShards . DeleteLabelValues ( name )
desiredNumShards . DeleteLabelValues ( name )
2018-09-07 14:26:04 -07:00
}
// StoreSeries keeps track of which series we know about for lookups when sending samples to remote.
2019-09-19 02:15:41 -07:00
func ( t * QueueManager ) StoreSeries ( series [ ] record . RefSeries , index int ) {
2019-09-13 10:23:58 -07:00
t . seriesMtx . Lock ( )
defer t . seriesMtx . Unlock ( )
2018-09-07 14:26:04 -07:00
for _ , s := range series {
2019-03-08 08:29:25 -08:00
ls := processExternalLabels ( s . Labels , t . externalLabels )
2019-08-07 12:39:07 -07:00
lbls := relabel . Process ( ls , t . relabelConfigs ... )
if len ( lbls ) == 0 {
2018-09-07 14:26:04 -07:00
t . droppedSeries [ s . Ref ] = struct { } { }
continue
}
2019-06-27 11:48:21 -07:00
t . seriesSegmentIndexes [ s . Ref ] = index
2019-08-07 12:39:07 -07:00
internLabels ( lbls )
2019-03-11 16:44:23 -07:00
2019-03-13 03:02:36 -07:00
// We should not ever be replacing a series labels in the map, but just
// in case we do we need to ensure we do not leak the replaced interned
// strings.
2019-06-27 11:48:21 -07:00
if orig , ok := t . seriesLabels [ s . Ref ] ; ok {
2019-08-07 12:39:07 -07:00
releaseLabels ( orig )
2019-03-11 16:44:23 -07:00
}
2019-08-07 12:39:07 -07:00
t . seriesLabels [ s . Ref ] = lbls
2018-09-07 14:26:04 -07:00
}
}
2017-05-10 02:44:13 -07:00
2018-09-07 14:26:04 -07:00
// SeriesReset is used when reading a checkpoint. WAL Watcher should have
// stored series records with the checkpoints index number, so we can now
// delete any ref ID's lower than that # from the two maps.
func ( t * QueueManager ) SeriesReset ( index int ) {
2019-09-13 10:23:58 -07:00
t . seriesMtx . Lock ( )
defer t . seriesMtx . Unlock ( )
2018-09-07 14:26:04 -07:00
// Check for series that are in segments older than the checkpoint
// that were not also present in the checkpoint.
for k , v := range t . seriesSegmentIndexes {
if v < index {
delete ( t . seriesSegmentIndexes , k )
2019-08-07 12:39:07 -07:00
releaseLabels ( t . seriesLabels [ k ] )
2019-03-11 16:44:23 -07:00
delete ( t . seriesLabels , k )
2019-09-13 10:23:58 -07:00
delete ( t . droppedSeries , k )
2018-09-07 14:26:04 -07:00
}
}
}
2017-08-11 11:45:52 -07:00
2019-08-07 12:39:07 -07:00
// internLabels replaces, in place, every label name and value in lbls with
// the string returned by the interner for it.
func internLabels(lbls labels.Labels) {
	for i := range lbls {
		l := &lbls[i]
		l.Name = interner.intern(l.Name)
		l.Value = interner.intern(l.Value)
	}
}
func releaseLabels ( ls labels . Labels ) {
2019-03-11 16:44:23 -07:00
for _ , l := range ls {
interner . release ( l . Name )
interner . release ( l . Value )
}
}
2019-03-13 03:02:36 -07:00
// processExternalLabels merges externalLabels into ls. If ls contains
2019-03-08 08:29:25 -08:00
// a label in externalLabels, the value in ls wins.
2019-11-18 11:53:33 -08:00
func processExternalLabels ( ls labels . Labels , externalLabels labels . Labels ) labels . Labels {
2019-03-08 08:29:25 -08:00
i , j , result := 0 , 0 , make ( labels . Labels , 0 , len ( ls ) + len ( externalLabels ) )
for i < len ( ls ) && j < len ( externalLabels ) {
if ls [ i ] . Name < externalLabels [ j ] . Name {
result = append ( result , labels . Label {
Name : ls [ i ] . Name ,
Value : ls [ i ] . Value ,
} )
i ++
} else if ls [ i ] . Name > externalLabels [ j ] . Name {
result = append ( result , externalLabels [ j ] )
j ++
} else {
result = append ( result , labels . Label {
Name : ls [ i ] . Name ,
Value : ls [ i ] . Value ,
} )
i ++
j ++
2018-09-07 14:26:04 -07:00
}
}
2019-03-08 08:29:25 -08:00
for ; i < len ( ls ) ; i ++ {
result = append ( result , labels . Label {
Name : ls [ i ] . Name ,
Value : ls [ i ] . Value ,
} )
}
result = append ( result , externalLabels [ j : ] ... )
return result
2017-05-10 02:44:13 -07:00
}
func ( t * QueueManager ) updateShardsLoop ( ) {
defer t . wg . Done ( )
2017-10-09 09:36:20 -07:00
ticker := time . NewTicker ( shardUpdateDuration )
defer ticker . Stop ( )
2017-05-10 02:44:13 -07:00
for {
select {
2017-10-09 09:36:20 -07:00
case <- ticker . C :
2019-10-21 14:54:25 -07:00
desiredShards := t . calculateDesiredShards ( )
if desiredShards == t . numShards {
continue
}
// Resharding can take some time, and we want this loop
// to stay close to shardUpdateDuration.
select {
case t . reshardChan <- desiredShards :
level . Info ( t . logger ) . Log ( "msg" , "Remote storage resharding" , "from" , t . numShards , "to" , numShards )
t . numShards = desiredShards
default :
level . Info ( t . logger ) . Log ( "msg" , "Currently resharding, skipping." )
}
2017-05-10 02:44:13 -07:00
case <- t . quit :
return
}
}
}
2019-10-21 14:54:25 -07:00
// calculateDesiredShards returns the number of desired shards, which will be
// the current QueueManager.numShards if resharding should not occur for reasons
// outlined in this functions implementation. It is up to the caller to reshard, or not,
// based on the return value.
func ( t * QueueManager ) calculateDesiredShards ( ) int {
2017-05-10 02:44:13 -07:00
t . samplesOut . tick ( )
2019-02-19 23:51:08 -08:00
t . samplesDropped . tick ( )
2017-05-10 02:44:13 -07:00
t . samplesOutDuration . tick ( )
// We use the number of incoming samples as a prediction of how much work we
// will need to do next iteration. We add to this any pending samples
// (received - send) so we can catch up with any backlog. We use the average
// outgoing batch latency to work out how many shards we need.
var (
2019-08-13 02:10:21 -07:00
samplesInRate = t . samplesIn . rate ( )
samplesOutRate = t . samplesOut . rate ( )
samplesKeptRatio = samplesOutRate / ( t . samplesDropped . rate ( ) + samplesOutRate )
samplesOutDuration = t . samplesOutDuration . rate ( ) / float64 ( time . Second )
samplesPendingRate = samplesInRate * samplesKeptRatio - samplesOutRate
2019-03-01 11:04:26 -08:00
highestSent = t . highestSentTimestampMetric . Get ( )
highestRecv = highestTimestamp . Get ( )
2019-08-13 02:10:21 -07:00
samplesPending = ( highestRecv - highestSent ) * samplesInRate * samplesKeptRatio
2017-05-10 02:44:13 -07:00
)
2019-08-13 02:10:21 -07:00
if samplesOutRate <= 0 {
2019-10-21 14:54:25 -07:00
return t . numShards
2017-05-10 02:44:13 -07:00
}
2019-08-13 02:10:21 -07:00
// We use an integral accumulator, like in a PID, to help dampen
// oscillation. The accumulator will correct for any errors not accounted
// for in the desired shard calculation by adjusting for pending samples.
const integralGain = 0.2
// Initialise the integral accumulator as the average rate of samples
// pending. This accounts for pending samples that were created while the
// WALWatcher starts up.
if t . integralAccumulator == 0 {
elapsed := time . Since ( t . startedAt ) / time . Second
t . integralAccumulator = integralGain * samplesPending / float64 ( elapsed )
}
t . integralAccumulator += samplesPendingRate * integralGain
2019-10-21 14:54:25 -07:00
// We shouldn't reshard if Prometheus hasn't been able to send to the
// remote endpoint successfully within some period of time.
minSendTimestamp := time . Now ( ) . Add ( - 2 * time . Duration ( t . cfg . BatchSendDeadline ) ) . Unix ( )
lsts := atomic . LoadInt64 ( & t . lastSendTimestamp )
if lsts < minSendTimestamp {
level . Warn ( t . logger ) . Log ( "msg" , "Skipping resharding, last successful send was beyond threshold" , "lastSendTimestamp" , lsts , "minSendTimestamp" , minSendTimestamp )
return t . numShards
}
2017-05-10 02:44:13 -07:00
var (
2019-08-13 02:10:21 -07:00
timePerSample = samplesOutDuration / samplesOutRate
desiredShards = timePerSample * ( samplesInRate + t . integralAccumulator )
2017-05-10 02:44:13 -07:00
)
2019-08-10 08:24:58 -07:00
level . Debug ( t . logger ) . Log ( "msg" , "QueueManager.calculateDesiredShards" ,
2019-08-13 02:10:21 -07:00
"samplesInRate" , samplesInRate ,
"samplesOutRate" , samplesOutRate ,
2019-03-01 11:04:26 -08:00
"samplesKeptRatio" , samplesKeptRatio ,
2019-08-13 02:10:21 -07:00
"samplesPendingRate" , samplesPendingRate ,
2019-03-01 11:04:26 -08:00
"samplesPending" , samplesPending ,
"samplesOutDuration" , samplesOutDuration ,
"timePerSample" , timePerSample ,
"desiredShards" , desiredShards ,
"highestSent" , highestSent ,
2019-08-13 02:10:21 -07:00
"highestRecv" , highestRecv ,
"integralAccumulator" , t . integralAccumulator ,
)
2017-05-10 02:44:13 -07:00
// Changes in the number of shards must be greater than shardToleranceFraction.
var (
lowerBound = float64 ( t . numShards ) * ( 1. - shardToleranceFraction )
upperBound = float64 ( t . numShards ) * ( 1. + shardToleranceFraction )
)
2017-08-11 11:45:52 -07:00
level . Debug ( t . logger ) . Log ( "msg" , "QueueManager.updateShardsLoop" ,
"lowerBound" , lowerBound , "desiredShards" , desiredShards , "upperBound" , upperBound )
2017-05-10 02:44:13 -07:00
if lowerBound <= desiredShards && desiredShards <= upperBound {
2019-10-21 14:54:25 -07:00
return t . numShards
2017-05-10 02:44:13 -07:00
}
numShards := int ( math . Ceil ( desiredShards ) )
2019-07-19 14:53:26 -07:00
t . desiredNumShards . Set ( float64 ( numShards ) )
2017-05-10 02:44:13 -07:00
if numShards > t . cfg . MaxShards {
numShards = t . cfg . MaxShards
2018-12-04 09:32:14 -08:00
} else if numShards < t . cfg . MinShards {
numShards = t . cfg . MinShards
2017-05-10 02:44:13 -07:00
}
2019-10-21 14:54:25 -07:00
return numShards
2017-05-10 02:44:13 -07:00
}
func ( t * QueueManager ) reshardLoop ( ) {
defer t . wg . Done ( )
for {
select {
case numShards := <- t . reshardChan :
2018-09-07 14:26:04 -07:00
// We start the newShards after we have stopped (the therefore completely
// flushed) the oldShards, to guarantee we only every deliver samples in
// order.
t . shards . stop ( )
t . shards . start ( numShards )
2017-05-10 02:44:13 -07:00
case <- t . quit :
return
}
}
}
2018-09-07 14:26:04 -07:00
// newShards returns a shards value bound to this QueueManager. Only the done
// channel is created here; the queues are allocated by shards.start.
func (t *QueueManager) newShards() *shards {
	return &shards{
		qm:   t,
		done: make(chan struct{}),
	}
}
2017-05-10 02:44:13 -07:00
2019-08-12 09:22:02 -07:00
// sample is the unit that travels through a shard queue: the labels of a
// series together with one timestamp/value pair.
type sample struct {
	// labels are the series labels looked up by Append.
	labels labels.Labels
	// t is the sample timestamp, copied from RefSample.T.
	t int64
	// v is the sample value, copied from RefSample.V.
	v float64
}
2017-05-10 02:44:13 -07:00
type shards struct {
2018-09-07 14:26:04 -07:00
mtx sync . RWMutex // With the WAL, this is never actually contended.
qm * QueueManager
2019-08-12 09:22:02 -07:00
queues [ ] chan sample
2018-09-07 14:26:04 -07:00
// Emulate a wait group with a channel and an atomic int, as you
// cannot select on a wait group.
2018-02-01 05:20:38 -08:00
done chan struct { }
running int32
2018-09-07 14:26:04 -07:00
// Soft shutdown context will prevent new enqueues and deadlocks.
softShutdown chan struct { }
// Hard shutdown context is used to terminate outgoing HTTP connections
// after giving them a chance to terminate.
hardShutdown context . CancelFunc
2017-05-10 02:44:13 -07:00
}
2018-09-07 14:26:04 -07:00
// start the shards; must be called before any call to enqueue.
func ( s * shards ) start ( n int ) {
s . mtx . Lock ( )
defer s . mtx . Unlock ( )
2019-08-12 09:22:02 -07:00
newQueues := make ( [ ] chan sample , n )
2018-09-07 14:26:04 -07:00
for i := 0 ; i < n ; i ++ {
2019-08-12 09:22:02 -07:00
newQueues [ i ] = make ( chan sample , s . qm . cfg . Capacity )
2017-05-10 02:44:13 -07:00
}
2018-09-07 14:26:04 -07:00
s . queues = newQueues
var hardShutdownCtx context . Context
hardShutdownCtx , s . hardShutdown = context . WithCancel ( context . Background ( ) )
s . softShutdown = make ( chan struct { } )
s . running = int32 ( n )
s . done = make ( chan struct { } )
for i := 0 ; i < n ; i ++ {
go s . runShard ( hardShutdownCtx , i , newQueues [ i ] )
2017-05-10 02:44:13 -07:00
}
2019-03-05 04:21:11 -08:00
s . qm . numShardsMetric . Set ( float64 ( n ) )
2017-05-10 02:44:13 -07:00
}
2018-09-07 14:26:04 -07:00
// stop the shards; subsequent call to enqueue will return false.
func ( s * shards ) stop ( ) {
// Attempt a clean shutdown, but only wait flushDeadline for all the shards
2019-03-03 03:35:29 -08:00
// to cleanly exit. As we're doing RPCs, enqueue can block indefinitely.
2018-09-07 14:26:04 -07:00
// We must be able so call stop concurrently, hence we can only take the
// RLock here.
s . mtx . RLock ( )
close ( s . softShutdown )
s . mtx . RUnlock ( )
// Enqueue should now be unblocked, so we can take the write lock. This
// also ensures we don't race with writes to the queues, and get a panic:
// send on closed channel.
s . mtx . Lock ( )
defer s . mtx . Unlock ( )
for _ , queue := range s . queues {
close ( queue )
2017-05-10 02:44:13 -07:00
}
2018-01-31 07:41:48 -08:00
select {
2018-02-01 05:20:38 -08:00
case <- s . done :
2018-05-29 01:51:29 -07:00
return
2018-09-07 14:26:04 -07:00
case <- time . After ( s . qm . flushDeadline ) :
2018-01-31 07:41:48 -08:00
level . Error ( s . qm . logger ) . Log ( "msg" , "Failed to flush all samples on shutdown" )
}
2018-05-29 01:51:29 -07:00
2018-05-29 03:35:43 -07:00
// Force an unclean shutdown.
2018-09-07 14:26:04 -07:00
s . hardShutdown ( )
2018-05-29 01:51:29 -07:00
<- s . done
2017-05-10 02:44:13 -07:00
}
2018-09-07 14:26:04 -07:00
// enqueue a sample. If we are currently in the process of shutting down or resharding,
// will return false; in this case, you should back off and retry.
2019-08-12 09:22:02 -07:00
func ( s * shards ) enqueue ( ref uint64 , sample sample ) bool {
2018-09-07 14:26:04 -07:00
s . mtx . RLock ( )
defer s . mtx . RUnlock ( )
2017-05-10 02:44:13 -07:00
2018-09-07 14:26:04 -07:00
select {
case <- s . softShutdown :
return false
default :
}
2017-05-10 02:44:13 -07:00
2018-09-07 14:26:04 -07:00
shard := uint64 ( ref ) % uint64 ( len ( s . queues ) )
2017-05-10 02:44:13 -07:00
select {
2018-09-07 14:26:04 -07:00
case <- s . softShutdown :
return false
2017-05-10 02:44:13 -07:00
case s . queues [ shard ] <- sample :
return true
}
}
2019-08-12 09:22:02 -07:00
func ( s * shards ) runShard ( ctx context . Context , shardID int , queue chan sample ) {
2018-02-01 05:20:38 -08:00
defer func ( ) {
if atomic . AddInt32 ( & s . running , - 1 ) == 0 {
close ( s . done )
}
} ( )
2019-08-12 09:22:02 -07:00
shardNum := strconv . Itoa ( shardID )
2017-05-10 02:44:13 -07:00
// Send batches of at most MaxSamplesPerSend samples to the remote storage.
// If we have fewer samples than that, flush them out after a deadline
// anyways.
2019-08-12 09:22:02 -07:00
var (
max = s . qm . cfg . MaxSamplesPerSend
nPending = 0
pendingSamples = allocateTimeSeries ( max )
buf [ ] byte
)
2019-06-27 11:48:21 -07:00
2018-08-24 07:55:21 -07:00
timer := time . NewTimer ( time . Duration ( s . qm . cfg . BatchSendDeadline ) )
2018-03-12 07:27:48 -07:00
stop := func ( ) {
2018-03-09 04:00:26 -08:00
if ! timer . Stop ( ) {
select {
case <- timer . C :
default :
}
}
2018-03-12 07:27:48 -07:00
}
defer stop ( )
2018-01-24 04:36:29 -08:00
2017-05-10 02:44:13 -07:00
for {
select {
2018-09-07 14:26:04 -07:00
case <- ctx . Done ( ) :
2018-05-29 01:51:29 -07:00
return
2017-05-10 02:44:13 -07:00
case sample , ok := <- queue :
if ! ok {
2019-08-12 09:22:02 -07:00
if nPending > 0 {
level . Debug ( s . qm . logger ) . Log ( "msg" , "Flushing samples to remote storage..." , "count" , nPending )
s . sendSamples ( ctx , pendingSamples [ : nPending ] , & buf )
s . qm . pendingSamplesMetric . Sub ( float64 ( nPending ) )
2017-08-11 11:45:52 -07:00
level . Debug ( s . qm . logger ) . Log ( "msg" , "Done flushing." )
2017-05-10 02:44:13 -07:00
}
return
}
2018-09-07 14:26:04 -07:00
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
2019-08-12 09:22:02 -07:00
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
// stop reading from the queue. This makes it safe to reference pendingSamples by index.
pendingSamples [ nPending ] . Labels = labelsToLabelsProto ( sample . labels , pendingSamples [ nPending ] . Labels )
pendingSamples [ nPending ] . Samples [ 0 ] . Timestamp = sample . t
pendingSamples [ nPending ] . Samples [ 0 ] . Value = sample . v
nPending ++
2018-09-07 14:26:04 -07:00
s . qm . pendingSamplesMetric . Inc ( )
2017-05-10 02:44:13 -07:00
2019-08-12 09:22:02 -07:00
if nPending >= max {
s . sendSamples ( ctx , pendingSamples , & buf )
nPending = 0
2018-09-07 14:26:04 -07:00
s . qm . pendingSamplesMetric . Sub ( float64 ( max ) )
2018-03-09 04:00:26 -08:00
2018-03-12 07:27:48 -07:00
stop ( )
2018-08-24 07:55:21 -07:00
timer . Reset ( time . Duration ( s . qm . cfg . BatchSendDeadline ) )
2017-05-10 02:44:13 -07:00
}
2018-03-09 04:00:26 -08:00
2018-01-24 04:36:29 -08:00
case <- timer . C :
2019-08-12 09:22:02 -07:00
if nPending > 0 {
level . Debug ( s . qm . logger ) . Log ( "msg" , "runShard timer ticked, sending samples" , "samples" , nPending , "shard" , shardNum )
s . sendSamples ( ctx , pendingSamples [ : nPending ] , & buf )
nPending = 0
s . qm . pendingSamplesMetric . Sub ( float64 ( nPending ) )
2017-05-10 02:44:13 -07:00
}
2018-08-24 07:55:21 -07:00
timer . Reset ( time . Duration ( s . qm . cfg . BatchSendDeadline ) )
2017-05-10 02:44:13 -07:00
}
}
}
2019-06-27 11:48:21 -07:00
func ( s * shards ) sendSamples ( ctx context . Context , samples [ ] prompb . TimeSeries , buf * [ ] byte ) {
2017-05-10 02:44:13 -07:00
begin := time . Now ( )
2019-06-27 11:48:21 -07:00
err := s . sendSamplesWithBackoff ( ctx , samples , buf )
2019-02-12 06:58:25 -08:00
if err != nil {
2018-09-07 14:26:04 -07:00
level . Error ( s . qm . logger ) . Log ( "msg" , "non-recoverable error" , "count" , len ( samples ) , "err" , err )
2019-03-05 04:21:11 -08:00
s . qm . failedSamplesTotal . Add ( float64 ( len ( samples ) ) )
2018-09-07 14:26:04 -07:00
}
2017-05-10 02:44:13 -07:00
2018-04-08 02:51:54 -07:00
// These counters are used to calculate the dynamic sharding, and as such
2017-05-10 02:44:13 -07:00
// should be maintained irrespective of success or failure.
s . qm . samplesOut . incr ( int64 ( len ( samples ) ) )
s . qm . samplesOutDuration . incr ( int64 ( time . Since ( begin ) ) )
}
// sendSamples to the remote storage with backoff for recoverable errors.
2019-06-27 11:48:21 -07:00
func ( s * shards ) sendSamplesWithBackoff ( ctx context . Context , samples [ ] prompb . TimeSeries , buf * [ ] byte ) error {
2017-05-10 02:44:13 -07:00
backoff := s . qm . cfg . MinBackoff
2019-06-27 11:48:21 -07:00
req , highest , err := buildWriteRequest ( samples , * buf )
* buf = req
2018-09-07 14:26:04 -07:00
if err != nil {
2019-03-01 11:04:26 -08:00
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
2018-09-07 14:26:04 -07:00
return err
}
2019-03-01 11:04:26 -08:00
2018-09-07 14:26:04 -07:00
for {
select {
case <- ctx . Done ( ) :
return ctx . Err ( )
default :
}
2017-05-10 02:44:13 -07:00
begin := time . Now ( )
2018-09-07 14:26:04 -07:00
err := s . qm . client . Store ( ctx , req )
2017-05-10 02:44:13 -07:00
2019-03-05 04:21:11 -08:00
s . qm . sentBatchDuration . Observe ( time . Since ( begin ) . Seconds ( ) )
2018-09-07 14:26:04 -07:00
2017-05-10 02:44:13 -07:00
if err == nil {
2019-03-05 04:21:11 -08:00
s . qm . succeededSamplesTotal . Add ( float64 ( len ( samples ) ) )
2019-03-01 11:04:26 -08:00
s . qm . highestSentTimestampMetric . Set ( float64 ( highest / 1000 ) )
2019-10-21 14:54:25 -07:00
atomic . StoreInt64 ( & s . qm . lastSendTimestamp , time . Now ( ) . Unix ( ) )
2018-09-07 14:26:04 -07:00
return nil
2017-05-10 02:44:13 -07:00
}
if _ , ok := err . ( recoverableError ) ; ! ok {
2018-09-07 14:26:04 -07:00
return err
2017-05-10 02:44:13 -07:00
}
2019-03-05 04:21:11 -08:00
s . qm . retriedSamplesTotal . Add ( float64 ( len ( samples ) ) )
2019-03-01 11:04:26 -08:00
level . Debug ( s . qm . logger ) . Log ( "msg" , "failed to send batch, retrying" , "err" , err )
2018-09-07 14:26:04 -07:00
2018-08-24 07:55:21 -07:00
time . Sleep ( time . Duration ( backoff ) )
2017-05-10 02:44:13 -07:00
backoff = backoff * 2
if backoff > s . qm . cfg . MaxBackoff {
backoff = s . qm . cfg . MaxBackoff
}
}
2018-09-07 14:26:04 -07:00
}
2019-06-27 11:48:21 -07:00
func buildWriteRequest ( samples [ ] prompb . TimeSeries , buf [ ] byte ) ( [ ] byte , int64 , error ) {
2018-09-07 14:26:04 -07:00
var highest int64
for _ , ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample in it.
if ts . Samples [ 0 ] . Timestamp > highest {
highest = ts . Samples [ 0 ] . Timestamp
}
}
req := & prompb . WriteRequest {
Timeseries : samples ,
}
data , err := proto . Marshal ( req )
if err != nil {
return nil , highest , err
}
2017-05-10 02:44:13 -07:00
2019-06-27 11:48:21 -07:00
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
if buf != nil {
buf = buf [ 0 : cap ( buf ) ]
}
compressed := snappy . Encode ( buf , data )
2018-09-07 14:26:04 -07:00
return compressed , highest , nil
2017-05-10 02:44:13 -07:00
}
2019-08-12 09:22:02 -07:00
// allocateTimeSeries returns a slice of capacity time series, each of which
// already holds exactly one zero-valued sample so that callers can fill in
// Samples[0] without allocating.
func allocateTimeSeries(capacity int) []prompb.TimeSeries {
	series := make([]prompb.TimeSeries, capacity)
	// We only ever send one sample per timeseries, so preallocate with length one.
	for i := 0; i < capacity; i++ {
		series[i].Samples = make([]prompb.Sample, 1)
	}
	return series
}