diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 045ec53777..79675ec831 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -307,7 +307,7 @@ func main() { var ( localStorage = &tsdb.ReadyStorage{} - remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), localStorage.StartTime, time.Duration(cfg.RemoteFlushDeadline)) + remoteStorage = remote.NewStorage(log.With(logger, "component", "remote"), prometheus.DefaultRegisterer, localStorage.StartTime, cfg.localStoragePath, time.Duration(cfg.RemoteFlushDeadline)) fanoutStorage = storage.NewFanout(logger, localStorage, remoteStorage) ) diff --git a/config/config.go b/config/config.go index d12c8d651d..004d58a855 100644 --- a/config/config.go +++ b/config/config.go @@ -107,9 +107,10 @@ var ( MinShards: 1, MaxSamplesPerSend: 100, - // By default, buffer 100 batches, which at 100ms per batch is 10s. At - // 1000 shards, this will buffer 10M samples total. - Capacity: 100 * 100, + // Each shard will have a max of 10 samples pending in it's channel, plus the pending + // samples that have been enqueued. Theoretically we should only ever have about 110 samples + // per shard pending. At 1000 shards that's 110k. + Capacity: 10, BatchSendDeadline: model.Duration(5 * time.Second), // Max number of times to retry a batch on recoverable errors. diff --git a/storage/remote/client.go b/storage/remote/client.go index 2679f42b6b..b182cda614 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -70,15 +70,10 @@ type recoverableError struct { error } -// Store sends a batch of samples to the HTTP endpoint. -func (c *Client) Store(ctx context.Context, req *prompb.WriteRequest) error { - data, err := proto.Marshal(req) - if err != nil { - return err - } - - compressed := snappy.Encode(nil, data) - httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(compressed)) +// Store sends a batch of samples to the HTTP endpoint, the request is the proto marshalled +// and encoded bytes from codec.go. +func (c *Client) Store(ctx context.Context, req []byte) error { + httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(req)) if err != nil { // Errors from NewRequest are from unparseable URLs, so are not // recoverable. diff --git a/storage/remote/client_test.go b/storage/remote/client_test.go index 73ec875a5d..9ef3c9bd26 100644 --- a/storage/remote/client_test.go +++ b/storage/remote/client_test.go @@ -26,7 +26,6 @@ import ( config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/prompb" ) var longErrMessage = strings.Repeat("error message", maxErrMsgLen) @@ -74,7 +73,7 @@ func TestStoreHTTPErrorHandling(t *testing.T) { t.Fatal(err) } - err = c.Store(context.Background(), &prompb.WriteRequest{}) + err = c.Store(context.Background(), []byte{}) if !reflect.DeepEqual(err, test.err) { t.Errorf("%d. Unexpected error; want %v, got %v", i, test.err, err) } diff --git a/storage/remote/codec.go b/storage/remote/codec.go index 6f49bfbbaa..82dd4126e4 100644 --- a/storage/remote/codec.go +++ b/storage/remote/codec.go @@ -80,28 +80,6 @@ func EncodeReadResponse(resp *prompb.ReadResponse, w http.ResponseWriter) error return err } -// ToWriteRequest converts an array of samples into a WriteRequest proto. 
-func ToWriteRequest(samples []*model.Sample) *prompb.WriteRequest { - req := &prompb.WriteRequest{ - Timeseries: make([]prompb.TimeSeries, 0, len(samples)), - } - - for _, s := range samples { - ts := prompb.TimeSeries{ - Labels: MetricToLabelProtos(s.Metric), - Samples: []prompb.Sample{ - { - Value: float64(s.Value), - Timestamp: int64(s.Timestamp), - }, - }, - } - req.Timeseries = append(req.Timeseries, ts) - } - - return req -} - // ToQuery builds a Query proto. func ToQuery(from, to int64, matchers []*labels.Matcher, p *storage.SelectParams) (*prompb.Query, error) { ms, err := toLabelMatchers(matchers) @@ -364,21 +342,6 @@ func fromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, erro return result, nil } -// MetricToLabelProtos builds a []*prompb.Label from a model.Metric -func MetricToLabelProtos(metric model.Metric) []prompb.Label { - labels := make([]prompb.Label, 0, len(metric)) - for k, v := range metric { - labels = append(labels, prompb.Label{ - Name: string(k), - Value: string(v), - }) - } - sort.Slice(labels, func(i int, j int) bool { - return labels[i].Name < labels[j].Name - }) - return labels -} - // LabelProtosToMetric unpack a []*prompb.Label to a model.Metric func LabelProtosToMetric(labelPairs []*prompb.Label) model.Metric { metric := make(model.Metric, len(labelPairs)) @@ -400,6 +363,26 @@ func labelProtosToLabels(labelPairs []prompb.Label) labels.Labels { return result } +func labelsetToLabelsProto(ls model.LabelSet) []prompb.Label { + result := make([]prompb.Label, 0, len(ls)) + keys := make([]string, 0, len(ls)) + + for k := range ls { + keys = append(keys, string(k)) + } + sort.Strings(keys) + + for _, k := range keys { + ln := model.LabelName(k) + result = append(result, prompb.Label{ + Name: k, + Value: string(ls[ln]), + }) + } + + return result +} + func labelsToLabelsProto(labels labels.Labels) []prompb.Label { result := make([]prompb.Label, 0, len(labels)) for _, l := range labels { @@ -410,11 +393,3 @@ func labelsToLabelsProto(labels labels.Labels) []prompb.Label { } return result } - -func labelsToMetric(ls labels.Labels) model.Metric { - metric := make(model.Metric, len(ls)) - for _, l := range ls { - metric[model.LabelName(l.Name)] = model.LabelValue(l.Value) - } - return metric -} diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index e76912e1b9..1d27009069 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -16,6 +16,7 @@ package remote import ( "context" "math" + "strconv" "sync" "sync/atomic" "time" @@ -24,12 +25,16 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" pkgrelabel "github.com/prometheus/prometheus/pkg/relabel" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/relabel" + "github.com/prometheus/tsdb" ) // String constants for instrumentation. 
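The Client.Store change above moves proto marshalling and snappy compression out of the client and onto the caller (see buildWriteRequest later in queue_manager.go). A minimal caller-side sketch, assuming the gogo/protobuf and snappy packages already used in this diff; the helper name is illustrative only, not part of the change:

```go
package remote

import (
	"context"

	"github.com/gogo/protobuf/proto"
	"github.com/golang/snappy"

	"github.com/prometheus/prometheus/prompb"
)

// storeSamples is a hypothetical helper showing what callers of the new
// Client.Store([]byte) signature are expected to do: marshal the WriteRequest,
// snappy-compress it, and hand the raw bytes to the client.
func storeSamples(ctx context.Context, c *Client, samples []prompb.TimeSeries) error {
	req := &prompb.WriteRequest{Timeseries: samples}
	data, err := proto.Marshal(req)
	if err != nil {
		// Marshalling failures are non-recoverable, so there is no point retrying.
		return err
	}
	compressed := snappy.Encode(nil, data)
	return c.Store(ctx, compressed)
}
```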
@@ -66,7 +71,16 @@ var ( Namespace: namespace, Subsystem: subsystem, Name: "failed_samples_total", - Help: "Total number of samples which failed on send to remote storage.", + Help: "Total number of samples which failed on send to remote storage, non-recoverable errors.", + }, + []string{queue}, + ) + retriedSamplesTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "retried_samples_total", + Help: "Total number of samples which failed on send to remote storage but were retried because the send error was recoverable.", }, []string{queue}, ) @@ -75,7 +89,16 @@ var ( Namespace: namespace, Subsystem: subsystem, Name: "dropped_samples_total", - Help: "Total number of samples which were dropped due to the queue being full.", + Help: "Total number of samples which were dropped after being read from the WAL before being sent via remote write.", + }, + []string{queue}, + ) + enqueueRetriesTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "enqueue_retries_total", + Help: "Total number of times enqueue has failed because a shards queue was full.", }, []string{queue}, ) @@ -89,12 +112,30 @@ var ( }, []string{queue}, ) - queueLength = prometheus.NewGaugeVec( + queueLastSendTimestamp = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: namespace, Subsystem: subsystem, - Name: "queue_length", - Help: "The number of processed samples queued to be sent to the remote storage.", + Name: "queue_last_send_timestamp", + Help: "Timestamp of the last successful send by this queue.", + }, + []string{queue}, + ) + queueHighestSentTimestamp = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "queue_highest_sent_timestamp", + Help: "Timestamp from a WAL sample, the highest timestamp successfully sent by this queue.", + }, + []string{queue}, + ) + queuePendingSamples = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "pending_samples", + Help: "The number of samples pending in the queues shards to be sent to the remote storage.", }, []string{queue}, ) @@ -121,9 +162,13 @@ var ( func init() { prometheus.MustRegister(succeededSamplesTotal) prometheus.MustRegister(failedSamplesTotal) + prometheus.MustRegister(retriedSamplesTotal) prometheus.MustRegister(droppedSamplesTotal) + prometheus.MustRegister(enqueueRetriesTotal) prometheus.MustRegister(sentBatchDuration) - prometheus.MustRegister(queueLength) + prometheus.MustRegister(queueLastSendTimestamp) + prometheus.MustRegister(queueHighestSentTimestamp) + prometheus.MustRegister(queuePendingSamples) prometheus.MustRegister(shardCapacity) prometheus.MustRegister(numShards) } @@ -132,37 +177,54 @@ func init() { // external timeseries database. type StorageClient interface { // Store stores the given samples in the remote storage. - Store(context.Context, *prompb.WriteRequest) error + Store(context.Context, []byte) error // Name identifies the remote storage implementation. Name() string } // QueueManager manages a queue of samples to be sent to the Storage -// indicated by the provided StorageClient. +// indicated by the provided StorageClient. Implements writeTo interface +// used by WAL Watcher. 
type QueueManager struct { logger log.Logger - flushDeadline time.Duration - cfg config.QueueConfig - externalLabels model.LabelSet - relabelConfigs []*pkgrelabel.Config - client StorageClient - queueName string - logLimiter *rate.Limiter + flushDeadline time.Duration + cfg config.QueueConfig + externalLabels model.LabelSet + relabelConfigs []*pkgrelabel.Config + client StorageClient + queueName string + logLimiter *rate.Limiter + watcher *WALWatcher + lastSendTimestampMetric prometheus.Gauge + highestSentTimestampMetric prometheus.Gauge + pendingSamplesMetric prometheus.Gauge + enqueueRetriesMetric prometheus.Counter + + lastSendTimestamp int64 + highestSentTimestamp int64 + timestampLock sync.Mutex + + highestTimestampIn *int64 // highest timestamp of any sample ingested by remote storage via scrape (Appender) + + seriesMtx sync.Mutex + seriesLabels map[uint64][]prompb.Label + seriesSegmentIndexes map[uint64]int + droppedSeries map[uint64]struct{} - shardsMtx sync.RWMutex shards *shards numShards int reshardChan chan int - quit chan struct{} - wg sync.WaitGroup + + quit chan struct{} + wg sync.WaitGroup samplesIn, samplesOut, samplesOutDuration *ewmaRate integralAccumulator float64 } // NewQueueManager builds a new QueueManager. -func NewQueueManager(logger log.Logger, cfg config.QueueConfig, externalLabels model.LabelSet, relabelConfigs []*pkgrelabel.Config, client StorageClient, flushDeadline time.Duration) *QueueManager { +func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, highestTimestampIn *int64, cfg config.QueueConfig, externalLabels model.LabelSet, relabelConfigs []*pkgrelabel.Config, client StorageClient, flushDeadline time.Duration, startTime int64) *QueueManager { if logger == nil { logger = log.NewNopLogger() } else { @@ -177,16 +239,29 @@ func NewQueueManager(logger log.Logger, cfg config.QueueConfig, externalLabels m client: client, queueName: client.Name(), + highestTimestampIn: highestTimestampIn, + + seriesLabels: make(map[uint64][]prompb.Label), + seriesSegmentIndexes: make(map[uint64]int), + droppedSeries: make(map[uint64]struct{}), + logLimiter: rate.NewLimiter(logRateLimit, logBurst), numShards: cfg.MinShards, reshardChan: make(chan int), quit: make(chan struct{}), - samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration), + samplesIn: samplesIn, samplesOut: newEWMARate(ewmaWeight, shardUpdateDuration), samplesOutDuration: newEWMARate(ewmaWeight, shardUpdateDuration), } - t.shards = t.newShards(t.numShards) + + t.lastSendTimestampMetric = queueLastSendTimestamp.WithLabelValues(t.queueName) + t.highestSentTimestampMetric = queueHighestSentTimestamp.WithLabelValues(t.queueName) + t.pendingSamplesMetric = queuePendingSamples.WithLabelValues(t.queueName) + t.enqueueRetriesMetric = enqueueRetriesTotal.WithLabelValues(t.queueName) + t.watcher = NewWALWatcher(logger, t, walDir, startTime) + t.shards = t.newShards() + numShards.WithLabelValues(t.queueName).Set(float64(t.numShards)) shardCapacity.WithLabelValues(t.queueName).Set(float64(t.cfg.Capacity)) @@ -194,77 +269,158 @@ func NewQueueManager(logger log.Logger, cfg config.QueueConfig, externalLabels m sentBatchDuration.WithLabelValues(t.queueName) succeededSamplesTotal.WithLabelValues(t.queueName) failedSamplesTotal.WithLabelValues(t.queueName) - droppedSamplesTotal.WithLabelValues(t.queueName) + retriedSamplesTotal.WithLabelValues(t.queueName) + // Reset pending samples metric to 0. + t.pendingSamplesMetric.Set(0) return t } -// Append queues a sample to be sent to the remote storage. 
It drops the -// sample on the floor if the queue is full. -// Always returns nil. -func (t *QueueManager) Append(s *model.Sample) error { - snew := *s - snew.Metric = s.Metric.Clone() +// Append queues a sample to be sent to the remote storage. Blocks until all samples are +// enqueued on their shards or a shutdown signal is received. +func (t *QueueManager) Append(s []tsdb.RefSample) bool { + type enqueuable struct { + ts prompb.TimeSeries + ref uint64 + } - for ln, lv := range t.externalLabels { - if _, ok := s.Metric[ln]; !ok { - snew.Metric[ln] = lv + tempSamples := make([]enqueuable, 0, len(s)) + t.seriesMtx.Lock() + for _, sample := range s { + // If we have no labels for the series, due to relabelling or otherwise, don't send the sample. + if _, ok := t.seriesLabels[sample.Ref]; !ok { + droppedSamplesTotal.WithLabelValues(t.queueName).Inc() + if _, ok := t.droppedSeries[sample.Ref]; !ok && t.logLimiter.Allow() { + level.Info(t.logger).Log("msg", "dropped sample for series that was not explicitly dropped via relabelling", "ref", sample.Ref) + } + continue + } + tempSamples = append(tempSamples, enqueuable{ + ts: prompb.TimeSeries{ + Labels: t.seriesLabels[sample.Ref], + Samples: []prompb.Sample{ + prompb.Sample{ + Value: float64(sample.V), + Timestamp: sample.T, + }, + }, + }, + ref: sample.Ref, + }) + } + t.seriesMtx.Unlock() + + backoff := t.cfg.MinBackoff +outer: + for _, sample := range tempSamples { + // This will result in spin/busy waiting if the queues are being resharded + // or shutting down. TODO backoff. + for { + select { + case <-t.quit: + return false + default: + } + + if t.shards.enqueue(sample.ref, sample.ts) { + continue outer + } + t.enqueueRetriesMetric.Inc() + time.Sleep(time.Duration(backoff)) + backoff = backoff * 2 + if backoff > t.cfg.MaxBackoff { + backoff = t.cfg.MaxBackoff + } } } - - snew.Metric = model.Metric( - relabel.Process(model.LabelSet(snew.Metric), t.relabelConfigs...)) - - if snew.Metric == nil { - return nil - } - - t.shardsMtx.RLock() - enqueued := t.shards.enqueue(&snew) - t.shardsMtx.RUnlock() - - if enqueued { - queueLength.WithLabelValues(t.queueName).Inc() - } else { - droppedSamplesTotal.WithLabelValues(t.queueName).Inc() - if t.logLimiter.Allow() { - level.Warn(t.logger).Log("msg", "Remote storage queue full, discarding sample. Multiple subsequent messages of this kind may be suppressed.") - } - } - return nil -} - -// NeedsThrottling implements storage.SampleAppender. It will always return -// false as a remote storage drops samples on the floor if backlogging instead -// of asking for throttling. -func (*QueueManager) NeedsThrottling() bool { - return false + return true } // Start the queue manager sending samples to the remote storage. // Does not block. func (t *QueueManager) Start() { + t.shards.start(t.numShards) + t.watcher.Start() + t.wg.Add(2) go t.updateShardsLoop() go t.reshardLoop() - - t.shardsMtx.Lock() - defer t.shardsMtx.Unlock() - t.shards.start() } // Stop stops sending samples to the remote storage and waits for pending // sends to complete. 
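The reworked Append consumes tsdb.RefSample values and resolves their labels through the seriesLabels map, so StoreSeries (defined further down in this file) must have seen the corresponding series record first; otherwise the sample is counted as dropped. A small sketch of that call ordering; the helper and its literal values are hypothetical, not part of the diff:

```go
package remote

import (
	"github.com/prometheus/tsdb"
	tsdbLabels "github.com/prometheus/tsdb/labels"
)

// feedOneSample illustrates the order of calls the WAL watcher makes into a
// QueueManager: StoreSeries must run before Append for a given series ref,
// or the sample is counted as dropped.
func feedOneSample(qm *QueueManager) bool {
	qm.StoreSeries([]tsdb.RefSeries{{
		Ref:    1,
		Labels: tsdbLabels.Labels{{Name: "__name__", Value: "test_metric"}},
	}}, 0) // 0 is the WAL segment index the series record was read from.

	// Append blocks until the sample is enqueued on its shard and returns
	// false only when the queue manager is shutting down.
	return qm.Append([]tsdb.RefSample{{Ref: 1, T: 1000, V: 42}})
}
```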
func (t *QueueManager) Stop() { level.Info(t.logger).Log("msg", "Stopping remote storage...") + defer level.Info(t.logger).Log("msg", "Remote storage stopped.") + close(t.quit) + t.shards.stop() + t.watcher.Stop() t.wg.Wait() +} - t.shardsMtx.Lock() - defer t.shardsMtx.Unlock() - t.shards.stop(t.flushDeadline) +func (t *QueueManager) Name() string { + return t.queueName +} - level.Info(t.logger).Log("msg", "Remote storage stopped.") +// Find out which series are dropped after relabelling and make sure we have a metric label for them. +func (t *QueueManager) diffKeys(ref uint64, original, relabelled model.LabelSet) { + numDropped := len(original) - len(relabelled) + if numDropped == 0 { + return + } +} + +// StoreSeries keeps track of which series we know about for lookups when sending samples to remote. +func (t *QueueManager) StoreSeries(series []tsdb.RefSeries, index int) { + temp := make(map[uint64][]prompb.Label, len(series)) + for _, s := range series { + ls := make(model.LabelSet, len(s.Labels)) + for _, label := range s.Labels { + ls[model.LabelName(label.Name)] = model.LabelValue(label.Value) + } + t.processExternalLabels(ls) + rl := relabel.Process(ls, t.relabelConfigs...) + + t.diffKeys(s.Ref, ls, rl) + if len(rl) == 0 { + t.droppedSeries[s.Ref] = struct{}{} + continue + } + temp[s.Ref] = labelsetToLabelsProto(rl) + } + + t.seriesMtx.Lock() + defer t.seriesMtx.Unlock() + for ref, labels := range temp { + t.seriesLabels[ref] = labels + t.seriesSegmentIndexes[ref] = index + } +} + +// SeriesReset is used when reading a checkpoint. WAL Watcher should have +// stored series records with the checkpoints index number, so we can now +// delete any ref ID's lower than that # from the two maps. +func (t *QueueManager) SeriesReset(index int) { + t.seriesMtx.Lock() + defer t.seriesMtx.Unlock() + + // Check for series that are in segments older than the checkpoint + // that were not also present in the checkpoint. + for k, v := range t.seriesSegmentIndexes { + if v < index { + delete(t.seriesLabels, k) + delete(t.seriesSegmentIndexes, k) + } + } +} + +func (t *QueueManager) processExternalLabels(ls model.LabelSet) { + for ln, lv := range t.externalLabels { + if _, ok := ls[ln]; !ok { + ls[ln] = lv + } + } } func (t *QueueManager) updateShardsLoop() { @@ -275,6 +431,12 @@ func (t *QueueManager) updateShardsLoop() { for { select { case <-ticker.C: + now := time.Now().Unix() + threshold := int64(time.Duration(2 * t.cfg.BatchSendDeadline).Seconds()) + if now-t.lastSendTimestamp > threshold { + level.Debug(t.logger).Log("msg", "Skipping resharding, last successful send was beyond threshold") + continue + } t.calculateDesiredShards() case <-t.quit: return @@ -351,107 +513,150 @@ func (t *QueueManager) reshardLoop() { for { select { case numShards := <-t.reshardChan: - t.reshard(numShards) + // We start the newShards after we have stopped (the therefore completely + // flushed) the oldShards, to guarantee we only every deliver samples in + // order. + t.shards.stop() + t.shards.start(numShards) case <-t.quit: return } } } -func (t *QueueManager) reshard(n int) { - numShards.WithLabelValues(t.queueName).Set(float64(n)) - - t.shardsMtx.Lock() - newShards := t.newShards(n) - oldShards := t.shards - t.shards = newShards - t.shardsMtx.Unlock() - - oldShards.stop(t.flushDeadline) - - // We start the newShards after we have stopped (the therefore completely - // flushed) the oldShards, to guarantee we only every deliver samples in - // order. 
- newShards.start() -} - -type shards struct { - qm *QueueManager - queues []chan *model.Sample - done chan struct{} - running int32 - ctx context.Context - cancel context.CancelFunc -} - -func (t *QueueManager) newShards(numShards int) *shards { - queues := make([]chan *model.Sample, numShards) - for i := 0; i < numShards; i++ { - queues[i] = make(chan *model.Sample, t.cfg.Capacity) - } - ctx, cancel := context.WithCancel(context.Background()) +func (t *QueueManager) newShards() *shards { s := &shards{ - qm: t, - queues: queues, - done: make(chan struct{}), - running: int32(numShards), - ctx: ctx, - cancel: cancel, + qm: t, + done: make(chan struct{}), } return s } -func (s *shards) start() { - for i := 0; i < len(s.queues); i++ { - go s.runShard(i) +// Check and set highestSentTimestamp +func (t *QueueManager) setHighestSentTimestamp(highest int64) { + t.timestampLock.Lock() + defer t.timestampLock.Unlock() + if highest > t.highestSentTimestamp { + t.highestSentTimestamp = highest + t.highestSentTimestampMetric.Set(float64(t.highestSentTimestamp)) } } -func (s *shards) stop(deadline time.Duration) { - // Attempt a clean shutdown. - for _, shard := range s.queues { - close(shard) +func (t *QueueManager) setLastSendTimestamp(now time.Time) { + t.timestampLock.Lock() + defer t.timestampLock.Unlock() + t.lastSendTimestampMetric.Set(float64(now.UnixNano()) / 1e9) + t.lastSendTimestamp = now.Unix() +} + +type shards struct { + mtx sync.RWMutex // With the WAL, this is never actually contended. + + qm *QueueManager + queues []chan prompb.TimeSeries + + // Emulate a wait group with a channel and an atomic int, as you + // cannot select on a wait group. + done chan struct{} + running int32 + + // Soft shutdown context will prevent new enqueues and deadlocks. + softShutdown chan struct{} + + // Hard shutdown context is used to terminate outgoing HTTP connections + // after giving them a chance to terminate. + hardShutdown context.CancelFunc +} + +// start the shards; must be called before any call to enqueue. +func (s *shards) start(n int) { + s.mtx.Lock() + defer s.mtx.Unlock() + + newQueues := make([]chan prompb.TimeSeries, n) + for i := 0; i < n; i++ { + newQueues[i] = make(chan prompb.TimeSeries, s.qm.cfg.Capacity) + } + + s.queues = newQueues + + var hardShutdownCtx context.Context + hardShutdownCtx, s.hardShutdown = context.WithCancel(context.Background()) + s.softShutdown = make(chan struct{}) + s.running = int32(n) + s.done = make(chan struct{}) + for i := 0; i < n; i++ { + go s.runShard(hardShutdownCtx, i, newQueues[i]) + } + numShards.WithLabelValues(s.qm.queueName).Set(float64(n)) +} + +// stop the shards; subsequent call to enqueue will return false. +func (s *shards) stop() { + // Attempt a clean shutdown, but only wait flushDeadline for all the shards + // to cleanly exit. As we're doing RPCs, enqueue can block indefinately. + // We must be able so call stop concurrently, hence we can only take the + // RLock here. + s.mtx.RLock() + close(s.softShutdown) + s.mtx.RUnlock() + + // Enqueue should now be unblocked, so we can take the write lock. This + // also ensures we don't race with writes to the queues, and get a panic: + // send on closed channel. + s.mtx.Lock() + defer s.mtx.Unlock() + for _, queue := range s.queues { + close(queue) } select { case <-s.done: return - case <-time.After(deadline): + case <-time.After(s.qm.flushDeadline): level.Error(s.qm.logger).Log("msg", "Failed to flush all samples on shutdown") } // Force an unclean shutdown. 
- s.cancel() + s.hardShutdown() <-s.done } -func (s *shards) enqueue(sample *model.Sample) bool { - s.qm.samplesIn.incr(1) - - fp := sample.Metric.FastFingerprint() - shard := uint64(fp) % uint64(len(s.queues)) +// enqueue a sample. If we are currently in the process of shutting down or resharding, +// will return false; in this case, you should back off and retry. +func (s *shards) enqueue(ref uint64, sample prompb.TimeSeries) bool { + s.mtx.RLock() + defer s.mtx.RUnlock() select { + case <-s.softShutdown: + return false + default: + } + + shard := uint64(ref) % uint64(len(s.queues)) + select { + case <-s.softShutdown: + return false case s.queues[shard] <- sample: return true - default: - return false } } -func (s *shards) runShard(i int) { +func (s *shards) runShard(ctx context.Context, i int, queue chan prompb.TimeSeries) { defer func() { if atomic.AddInt32(&s.running, -1) == 0 { close(s.done) } }() - queue := s.queues[i] + shardNum := strconv.Itoa(i) // Send batches of at most MaxSamplesPerSend samples to the remote storage. // If we have fewer samples than that, flush them out after a deadline // anyways. - pendingSamples := model.Samples{} + pendingSamples := []prompb.TimeSeries{} + max := s.qm.cfg.MaxSamplesPerSend timer := time.NewTimer(time.Duration(s.qm.cfg.BatchSendDeadline)) stop := func() { if !timer.Stop() { @@ -465,25 +670,29 @@ func (s *shards) runShard(i int) { for { select { - case <-s.ctx.Done(): + case <-ctx.Done(): return case sample, ok := <-queue: if !ok { if len(pendingSamples) > 0 { level.Debug(s.qm.logger).Log("msg", "Flushing samples to remote storage...", "count", len(pendingSamples)) - s.sendSamples(pendingSamples) + s.sendSamples(ctx, pendingSamples) level.Debug(s.qm.logger).Log("msg", "Done flushing.") } return } - queueLength.WithLabelValues(s.qm.queueName).Dec() + // Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff) + // retries endlessly, so once we reach > 100 samples, if we can never send to the endpoint we'll + // stop reading from the queue (which has a size of 10). 
pendingSamples = append(pendingSamples, sample) + s.qm.pendingSamplesMetric.Inc() - if len(pendingSamples) >= s.qm.cfg.MaxSamplesPerSend { - s.sendSamples(pendingSamples[:s.qm.cfg.MaxSamplesPerSend]) - pendingSamples = pendingSamples[s.qm.cfg.MaxSamplesPerSend:] + if len(pendingSamples) >= max { + s.sendSamples(ctx, pendingSamples[:max]) + pendingSamples = pendingSamples[max:] + s.qm.pendingSamplesMetric.Sub(float64(max)) stop() timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) @@ -491,17 +700,24 @@ func (s *shards) runShard(i int) { case <-timer.C: if len(pendingSamples) > 0 { - s.sendSamples(pendingSamples) + level.Debug(s.qm.logger).Log("msg", "runShard timer ticked, sending samples", "samples", len(pendingSamples), "shard", shardNum) + n := len(pendingSamples) + s.sendSamples(ctx, pendingSamples) pendingSamples = pendingSamples[:0] + s.qm.pendingSamplesMetric.Sub(float64(n)) } timer.Reset(time.Duration(s.qm.cfg.BatchSendDeadline)) } } } -func (s *shards) sendSamples(samples model.Samples) { +func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries) { begin := time.Now() - s.sendSamplesWithBackoff(samples) + err := s.sendSamplesWithBackoff(ctx, samples) + if err != nil && s.qm.logLimiter.Allow() { + level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", len(samples), "err", err) + failedSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples))) + } // These counters are used to calculate the dynamic sharding, and as such // should be maintained irrespective of success or failure. @@ -510,30 +726,67 @@ func (s *shards) sendSamples(samples model.Samples) { } // sendSamples to the remote storage with backoff for recoverable errors. -func (s *shards) sendSamplesWithBackoff(samples model.Samples) { +func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.TimeSeries) error { backoff := s.qm.cfg.MinBackoff - req := ToWriteRequest(samples) - - for retries := s.qm.cfg.MaxRetries; retries > 0; retries-- { + req, highest, err := buildWriteRequest(samples) + // Failing to build the write request is non-recoverable, since it will + // only error if marshaling the proto to bytes fails. + if err != nil { + return err + } + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } begin := time.Now() - err := s.qm.client.Store(s.ctx, req) + err := s.qm.client.Store(ctx, req) sentBatchDuration.WithLabelValues(s.qm.queueName).Observe(time.Since(begin).Seconds()) + if err == nil { succeededSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples))) - return + now := time.Now() + s.qm.setLastSendTimestamp(now) + s.qm.setHighestSentTimestamp(highest) + return nil } - level.Warn(s.qm.logger).Log("msg", "Error sending samples to remote storage", "count", len(samples), "err", err) if _, ok := err.(recoverableError); !ok { - break + return err } + retriedSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples))) + + if s.qm.logLimiter.Allow() { + level.Error(s.qm.logger).Log("err", err) + } + time.Sleep(time.Duration(backoff)) backoff = backoff * 2 if backoff > s.qm.cfg.MaxBackoff { backoff = s.qm.cfg.MaxBackoff } } - - failedSamplesTotal.WithLabelValues(s.qm.queueName).Add(float64(len(samples))) +} + +func buildWriteRequest(samples []prompb.TimeSeries) ([]byte, int64, error) { + var highest int64 + for _, ts := range samples { + // At the moment we only ever append a TimeSeries with a single sample in it. 
+ if ts.Samples[0].Timestamp > highest { + highest = ts.Samples[0].Timestamp + } + } + req := &prompb.WriteRequest{ + Timeseries: samples, + } + + data, err := proto.Marshal(req) + if err != nil { + return nil, highest, err + } + + compressed := snappy.Encode(nil, data) + return compressed, highest, nil } diff --git a/storage/remote/queue_manager_test.go b/storage/remote/queue_manager_test.go index d70c7789aa..789875d6d8 100644 --- a/storage/remote/queue_manager_test.go +++ b/storage/remote/queue_manager_test.go @@ -22,13 +22,229 @@ import ( "testing" "time" + "github.com/gogo/protobuf/proto" + "github.com/golang/snappy" + "github.com/stretchr/testify/require" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/util/testutil" + "github.com/prometheus/tsdb" + "github.com/prometheus/tsdb/labels" ) const defaultFlushDeadline = 1 * time.Minute +func TestSampleDelivery(t *testing.T) { + // Let's create an even number of send batches so we don't run into the + // batch timeout case. + n := config.DefaultQueueConfig.Capacity * 2 + samples, series := createTimeseries(n) + + c := NewTestStorageClient() + c.expectSamples(samples[:len(samples)/2], series) + + cfg := config.DefaultQueueConfig + cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond) + cfg.MaxShards = 1 + var temp int64 + m := NewQueueManager(nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), &temp, cfg, nil, nil, c, defaultFlushDeadline, 0) + m.seriesLabels = refSeriesToLabelsProto(series) + + // These should be received by the client. + m.Start() + m.Append(samples[:len(samples)/2]) + defer m.Stop() + + c.waitForExpectedSamples(t) + m.Append(samples[len(samples)/2:]) + c.expectSamples(samples[len(samples)/2:], series) + c.waitForExpectedSamples(t) +} + +func TestSampleDeliveryTimeout(t *testing.T) { + // Let's send one less sample than batch size, and wait the timeout duration + n := 9 + samples, series := createTimeseries(n) + c := NewTestStorageClient() + + cfg := config.DefaultQueueConfig + cfg.MaxShards = 1 + cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond) + var temp int64 + m := NewQueueManager(nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), &temp, cfg, nil, nil, c, defaultFlushDeadline, 0) + m.seriesLabels = refSeriesToLabelsProto(series) + m.Start() + defer m.Stop() + + // Send the samples twice, waiting for the samples in the meantime. + c.expectSamples(samples, series) + m.Append(samples) + c.waitForExpectedSamples(t) + + c.expectSamples(samples, series) + m.Append(samples) + c.waitForExpectedSamples(t) +} + +func TestSampleDeliveryOrder(t *testing.T) { + ts := 10 + n := config.DefaultQueueConfig.MaxSamplesPerSend * ts + samples := make([]tsdb.RefSample, 0, n) + series := make([]tsdb.RefSeries, 0, n) + for i := 0; i < n; i++ { + name := fmt.Sprintf("test_metric_%d", i%ts) + samples = append(samples, tsdb.RefSample{ + Ref: uint64(i), + T: int64(i), + V: float64(i), + }) + series = append(series, tsdb.RefSeries{ + Ref: uint64(i), + Labels: labels.Labels{labels.Label{Name: "__name__", Value: name}}, + }) + } + + c := NewTestStorageClient() + c.expectSamples(samples, series) + var temp int64 + m := NewQueueManager(nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), &temp, config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline, 0) + m.seriesLabels = refSeriesToLabelsProto(series) + + m.Start() + defer m.Stop() + // These should be received by the client. 
+ m.Append(samples) + c.waitForExpectedSamples(t) +} + +func TestShutdown(t *testing.T) { + deadline := 5 * time.Second + c := NewTestBlockedStorageClient() + + var temp int64 + m := NewQueueManager(nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), &temp, config.DefaultQueueConfig, nil, nil, c, deadline, 0) + samples, series := createTimeseries(2 * config.DefaultQueueConfig.MaxSamplesPerSend) + m.seriesLabels = refSeriesToLabelsProto(series) + m.Start() + + // Append blocks to guarantee delivery, so we do it in the background. + go func() { + m.Append(samples) + }() + time.Sleep(1 * time.Second) + + // Test to ensure that Stop doesn't block. + start := time.Now() + m.Stop() + // The samples will never be delivered, so duration should + // be at least equal to deadline, otherwise the flush deadline + // was not respected. + duration := time.Since(start) + if duration > time.Duration(deadline+(deadline/10)) { + t.Errorf("Took too long to shutdown: %s > %s", duration, deadline) + } + if duration < time.Duration(deadline) { + t.Errorf("Shutdown occurred before flush deadline: %s < %s", duration, deadline) + } +} + +func TestSeriesReset(t *testing.T) { + c := NewTestBlockedStorageClient() + deadline := 5 * time.Second + var temp int64 + numSegments := 4 + numSeries := 25 + + m := NewQueueManager(nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), &temp, config.DefaultQueueConfig, nil, nil, c, deadline, 0) + for i := 0; i < numSegments; i++ { + series := []tsdb.RefSeries{} + for j := 0; j < numSeries; j++ { + series = append(series, tsdb.RefSeries{Ref: uint64((i * 100) + j), Labels: labels.Labels{labels.Label{Name: "a", Value: "a"}}}) + } + m.StoreSeries(series, i) + } + testutil.Equals(t, numSegments*numSeries, len(m.seriesLabels)) + m.SeriesReset(2) + testutil.Equals(t, numSegments*numSeries/2, len(m.seriesLabels)) +} + +func TestReshard(t *testing.T) { + size := 10 // Make bigger to find more races. 
+ n := config.DefaultQueueConfig.Capacity * size + samples, series := createTimeseries(n) + + c := NewTestStorageClient() + c.expectSamples(samples, series) + + cfg := config.DefaultQueueConfig + cfg.MaxShards = 1 + + var temp int64 + m := NewQueueManager(nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), &temp, cfg, nil, nil, c, defaultFlushDeadline, 0) + m.seriesLabels = refSeriesToLabelsProto(series) + + m.Start() + defer m.Stop() + + go func() { + for i := 0; i < len(samples); i += config.DefaultQueueConfig.Capacity { + sent := m.Append(samples[i : i+config.DefaultQueueConfig.Capacity]) + require.True(t, sent) + time.Sleep(100 * time.Millisecond) + } + }() + + for i := 1; i < len(samples)/config.DefaultQueueConfig.Capacity; i++ { + m.shards.stop() + m.shards.start(i) + time.Sleep(100 * time.Millisecond) + } + + c.waitForExpectedSamples(t) +} + +func createTimeseries(n int) ([]tsdb.RefSample, []tsdb.RefSeries) { + samples := make([]tsdb.RefSample, 0, n) + series := make([]tsdb.RefSeries, 0, n) + for i := 0; i < n; i++ { + name := fmt.Sprintf("test_metric_%d", i) + samples = append(samples, tsdb.RefSample{ + Ref: uint64(i), + T: int64(i), + V: float64(i), + }) + series = append(series, tsdb.RefSeries{ + Ref: uint64(i), + Labels: labels.Labels{labels.Label{Name: "__name__", Value: name}}, + }) + } + return samples, series +} + +func getSeriesNameFromRef(r tsdb.RefSeries) string { + for _, l := range r.Labels { + if l.Name == "__name__" { + return l.Value + } + } + return "" +} + +func refSeriesToLabelsProto(series []tsdb.RefSeries) map[uint64][]prompb.Label { + result := make(map[uint64][]prompb.Label) + for _, s := range series { + for _, l := range s.Labels { + result[s.Ref] = append(result[s.Ref], prompb.Label{ + Name: l.Name, + Value: l.Value, + }) + } + } + return result +} + type TestStorageClient struct { receivedSamples map[string][]prompb.Sample expectedSamples map[string][]prompb.Sample @@ -43,7 +259,7 @@ func NewTestStorageClient() *TestStorageClient { } } -func (c *TestStorageClient) expectSamples(ss model.Samples) { +func (c *TestStorageClient) expectSamples(ss []tsdb.RefSample, series []tsdb.RefSeries) { c.mtx.Lock() defer c.mtx.Unlock() @@ -51,10 +267,10 @@ func (c *TestStorageClient) expectSamples(ss model.Samples) { c.receivedSamples = map[string][]prompb.Sample{} for _, s := range ss { - ts := labelProtosToLabels(MetricToLabelProtos(s.Metric)).String() - c.expectedSamples[ts] = append(c.expectedSamples[ts], prompb.Sample{ - Timestamp: int64(s.Timestamp), - Value: float64(s.Value), + seriesName := getSeriesNameFromRef(series[s.Ref]) + c.expectedSamples[seriesName] = append(c.expectedSamples[seriesName], prompb.Sample{ + Timestamp: s.T, + Value: s.V, }) } c.wg.Add(len(ss)) @@ -62,7 +278,6 @@ func (c *TestStorageClient) expectSamples(ss model.Samples) { func (c *TestStorageClient) waitForExpectedSamples(t *testing.T) { c.wg.Wait() - c.mtx.Lock() defer c.mtx.Unlock() for ts, expectedSamples := range c.expectedSamples { @@ -72,15 +287,31 @@ func (c *TestStorageClient) waitForExpectedSamples(t *testing.T) { } } -func (c *TestStorageClient) Store(_ context.Context, req *prompb.WriteRequest) error { +func (c *TestStorageClient) Store(_ context.Context, req []byte) error { c.mtx.Lock() defer c.mtx.Unlock() + reqBuf, err := snappy.Decode(nil, req) + if err != nil { + return err + } + + var reqProto prompb.WriteRequest + if err := proto.Unmarshal(reqBuf, &reqProto); err != nil { + return err + } + count := 0 - for _, ts := range req.Timeseries { - labels := 
labelProtosToLabels(ts.Labels).String() + for _, ts := range reqProto.Timeseries { + var seriesName string + labels := labelProtosToLabels(ts.Labels) + for _, label := range labels { + if label.Name == "__name__" { + seriesName = label.Value + } + } for _, sample := range ts.Samples { count++ - c.receivedSamples[labels] = append(c.receivedSamples[labels], sample) + c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], sample) } } c.wg.Add(-count) @@ -91,133 +322,21 @@ func (c *TestStorageClient) Name() string { return "teststorageclient" } -func TestSampleDelivery(t *testing.T) { - // Let's create an even number of send batches so we don't run into the - // batch timeout case. - n := config.DefaultQueueConfig.Capacity * 2 - - samples := make(model.Samples, 0, n) - for i := 0; i < n; i++ { - name := model.LabelValue(fmt.Sprintf("test_metric_%d", i)) - samples = append(samples, &model.Sample{ - Metric: model.Metric{ - model.MetricNameLabel: name, - }, - Value: model.SampleValue(i), - }) - } - - c := NewTestStorageClient() - c.expectSamples(samples[:len(samples)/2]) - - cfg := config.DefaultQueueConfig - cfg.MaxShards = 1 - m := NewQueueManager(nil, cfg, nil, nil, c, defaultFlushDeadline) - - // These should be received by the client. - for _, s := range samples[:len(samples)/2] { - m.Append(s) - } - // These will be dropped because the queue is full. - for _, s := range samples[len(samples)/2:] { - m.Append(s) - } - m.Start() - defer m.Stop() - - c.waitForExpectedSamples(t) -} - -func TestSampleDeliveryTimeout(t *testing.T) { - // Let's send one less sample than batch size, and wait the timeout duration - n := config.DefaultQueueConfig.Capacity - 1 - - samples := make(model.Samples, 0, n) - for i := 0; i < n; i++ { - name := model.LabelValue(fmt.Sprintf("test_metric_%d", i)) - samples = append(samples, &model.Sample{ - Metric: model.Metric{ - model.MetricNameLabel: name, - }, - Value: model.SampleValue(i), - }) - } - - c := NewTestStorageClient() - - cfg := config.DefaultQueueConfig - cfg.MaxShards = 1 - cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond) - m := NewQueueManager(nil, cfg, nil, nil, c, defaultFlushDeadline) - m.Start() - defer m.Stop() - - // Send the samples twice, waiting for the samples in the meantime. - c.expectSamples(samples) - for _, s := range samples { - m.Append(s) - } - c.waitForExpectedSamples(t) - - c.expectSamples(samples) - for _, s := range samples { - m.Append(s) - } - c.waitForExpectedSamples(t) -} - -func TestSampleDeliveryOrder(t *testing.T) { - ts := 10 - n := config.DefaultQueueConfig.MaxSamplesPerSend * ts - - samples := make(model.Samples, 0, n) - for i := 0; i < n; i++ { - name := model.LabelValue(fmt.Sprintf("test_metric_%d", i%ts)) - samples = append(samples, &model.Sample{ - Metric: model.Metric{ - model.MetricNameLabel: name, - }, - Value: model.SampleValue(i), - Timestamp: model.Time(i), - }) - } - - c := NewTestStorageClient() - c.expectSamples(samples) - m := NewQueueManager(nil, config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) - - // These should be received by the client. - for _, s := range samples { - m.Append(s) - } - m.Start() - defer m.Stop() - - c.waitForExpectedSamples(t) -} - // TestBlockingStorageClient is a queue_manager StorageClient which will block -// on any calls to Store(), until the `block` channel is closed, at which point -// the `numCalls` property will contain a count of how many times Store() was -// called. 
+// on any calls to Store(), until the request's Context is cancelled, at which +// point the `numCalls` property will contain a count of how many times Store() +// was called. type TestBlockingStorageClient struct { numCalls uint64 - block chan bool } func NewTestBlockedStorageClient() *TestBlockingStorageClient { - return &TestBlockingStorageClient{ - block: make(chan bool), - numCalls: 0, - } + return &TestBlockingStorageClient{} } -func (c *TestBlockingStorageClient) Store(ctx context.Context, _ *prompb.WriteRequest) error { +func (c *TestBlockingStorageClient) Store(ctx context.Context, _ []byte) error { atomic.AddUint64(&c.numCalls, 1) - select { - case <-c.block: - case <-ctx.Done(): - } + <-ctx.Done() return nil } @@ -225,106 +344,6 @@ func (c *TestBlockingStorageClient) NumCalls() uint64 { return atomic.LoadUint64(&c.numCalls) } -func (c *TestBlockingStorageClient) unlock() { - close(c.block) -} - func (c *TestBlockingStorageClient) Name() string { return "testblockingstorageclient" } - -func (t *QueueManager) queueLen() int { - t.shardsMtx.Lock() - defer t.shardsMtx.Unlock() - queueLength := 0 - for _, shard := range t.shards.queues { - queueLength += len(shard) - } - return queueLength -} - -func TestSpawnNotMoreThanMaxConcurrentSendsGoroutines(t *testing.T) { - // Our goal is to fully empty the queue: - // `MaxSamplesPerSend*Shards` samples should be consumed by the - // per-shard goroutines, and then another `MaxSamplesPerSend` - // should be left on the queue. - n := config.DefaultQueueConfig.MaxSamplesPerSend * 2 - - samples := make(model.Samples, 0, n) - for i := 0; i < n; i++ { - name := model.LabelValue(fmt.Sprintf("test_metric_%d", i)) - samples = append(samples, &model.Sample{ - Metric: model.Metric{ - model.MetricNameLabel: name, - }, - Value: model.SampleValue(i), - }) - } - - c := NewTestBlockedStorageClient() - cfg := config.DefaultQueueConfig - cfg.MaxShards = 1 - cfg.Capacity = n - m := NewQueueManager(nil, cfg, nil, nil, c, defaultFlushDeadline) - - m.Start() - - defer func() { - c.unlock() - m.Stop() - }() - - for _, s := range samples { - m.Append(s) - } - - // Wait until the runShard() loops drain the queue. If things went right, it - // should then immediately block in sendSamples(), but, in case of error, - // it would spawn too many goroutines, and thus we'd see more calls to - // client.Store() - // - // The timed wait is maybe non-ideal, but, in order to verify that we're - // not spawning too many concurrent goroutines, we have to wait on the - // Run() loop to consume a specific number of elements from the - // queue... and it doesn't signal that in any obvious way, except by - // draining the queue. We cap the waiting at 1 second -- that should give - // plenty of time, and keeps the failure fairly quick if we're not draining - // the queue properly. 
- for i := 0; i < 100 && m.queueLen() > 0; i++ { - time.Sleep(10 * time.Millisecond) - } - - if m.queueLen() != config.DefaultQueueConfig.MaxSamplesPerSend { - t.Fatalf("Failed to drain QueueManager queue, %d elements left", - m.queueLen(), - ) - } - - numCalls := c.NumCalls() - if numCalls != uint64(1) { - t.Errorf("Saw %d concurrent sends, expected 1", numCalls) - } -} - -func TestShutdown(t *testing.T) { - deadline := 10 * time.Second - c := NewTestBlockedStorageClient() - m := NewQueueManager(nil, config.DefaultQueueConfig, nil, nil, c, deadline) - for i := 0; i < config.DefaultQueueConfig.MaxSamplesPerSend; i++ { - m.Append(&model.Sample{ - Metric: model.Metric{ - model.MetricNameLabel: model.LabelValue(fmt.Sprintf("test_metric_%d", i)), - }, - Value: model.SampleValue(i), - Timestamp: model.Time(i), - }) - } - m.Start() - - start := time.Now() - m.Stop() - duration := time.Since(start) - if duration > deadline+(deadline/10) { - t.Errorf("Took too long to shutdown: %s > %s", duration, deadline) - } -} diff --git a/storage/remote/storage.go b/storage/remote/storage.go index 776edc07f0..e672cf2861 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -19,9 +19,12 @@ import ( "time" "github.com/go-kit/kit/log" + + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/timestamp" "github.com/prometheus/prometheus/storage" ) @@ -35,7 +38,12 @@ type Storage struct { mtx sync.RWMutex // For writes - queues []*QueueManager + walDir string + queues []*QueueManager + samplesIn *ewmaRate + samplesInMetric prometheus.Counter + highestTimestamp int64 + highestTimestampMetric prometheus.Gauge // For reads queryables []storage.Queryable @@ -44,15 +52,30 @@ type Storage struct { } // NewStorage returns a remote.Storage. -func NewStorage(l log.Logger, stCallback startTimeCallback, flushDeadline time.Duration) *Storage { +func NewStorage(l log.Logger, reg prometheus.Registerer, stCallback startTimeCallback, walDir string, flushDeadline time.Duration) *Storage { if l == nil { l = log.NewNopLogger() } - return &Storage{ + shardUpdateDuration := 10 * time.Second + s := &Storage{ logger: l, localStartTimeCallback: stCallback, flushDeadline: flushDeadline, + walDir: walDir, + // queues: make(map[*QueueManager]struct{}), + samplesIn: newEWMARate(ewmaWeight, shardUpdateDuration), + samplesInMetric: prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_remote_storage_samples_in_total", + Help: "Samples in to remote storage, compare to samples out for queue managers.", + }), + highestTimestampMetric: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_remote_storage_highest_timestamp_in", + Help: "Highest timestamp that has come into the remote storage via the Appender interface.", + }), } + reg.MustRegister(s.samplesInMetric) + reg.MustRegister(s.highestTimestampMetric) + return s } // ApplyConfig updates the state as the new config requires. @@ -61,7 +84,6 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { defer s.mtx.Unlock() // Update write queues - newQueues := []*QueueManager{} // TODO: we should only stop & recreate queues which have changes, // as this can be quite disruptive. 
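NewStorage now takes a prometheus.Registerer instead of registering its per-instance metrics globally, which lets embedders and tests hand it an isolated registry and avoid duplicate-registration panics. A hedged wiring sketch: the helper name is hypothetical, and the startTimeCallback signature func() (int64, error) is assumed since its definition is outside this excerpt (cmd/prometheus/main.go passes prometheus.DefaultRegisterer, as shown in the first hunk):

```go
package main

import (
	"time"

	"github.com/go-kit/kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/prometheus/prometheus/storage/remote"
)

// newRemoteStorage shows the new constructor wiring: a Registerer for the
// storage-level metrics and the local storage (WAL) directory to watch.
func newRemoteStorage(logger log.Logger, startTime func() (int64, error), walDir string) *remote.Storage {
	reg := prometheus.NewRegistry() // isolated registry; main.go uses prometheus.DefaultRegisterer
	return remote.NewStorage(logger, reg, startTime, walDir, 1*time.Minute)
}
```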
@@ -74,13 +96,20 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { if err != nil { return err } + // Convert to int64 for comparison with timestamps from samples + // we will eventually read from the WAL on startup. + startTime := timestamp.FromTime(time.Now()) newQueues = append(newQueues, NewQueueManager( s.logger, + s.walDir, + s.samplesIn, + &s.highestTimestamp, rwConf.QueueConfig, conf.GlobalConfig.ExternalLabels, rwConf.WriteRelabelConfigs, c, s.flushDeadline, + startTime, )) } diff --git a/storage/remote/wal_watcher.go b/storage/remote/wal_watcher.go new file mode 100644 index 0000000000..3ce700ba5c --- /dev/null +++ b/storage/remote/wal_watcher.go @@ -0,0 +1,548 @@ +// Copyright 2018 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "context" + "fmt" + "io" + "os" + "path" + "strconv" + "strings" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/tsdb" + "github.com/prometheus/tsdb/fileutil" + "github.com/prometheus/tsdb/wal" +) + +var ( + watcherSamplesRecordsRead = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "prometheus", + Subsystem: "wal_watcher", + Name: "samples_records_read_total", + Help: "Number of samples records read by the WAL watcher from the WAL.", + }, + []string{queue}, + ) + watcherSeriesRecordsRead = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "prometheus", + Subsystem: "wal_watcher", + Name: "series_records_read_total", + Help: "Number of series records read by the WAL watcher from the WAL.", + }, + []string{queue}, + ) + watcherTombstoneRecordsRead = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "prometheus", + Subsystem: "wal_watcher", + Name: "tombstone_records_read_total", + Help: "Number of tombstone records read by the WAL watcher from the WAL.", + }, + []string{queue}, + ) + watcherInvalidRecordsRead = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "prometheus", + Subsystem: "wal_watcher", + Name: "invalid_records_read_total", + Help: "Number of invalid records read by the WAL watcher from the WAL.", + }, + []string{queue}, + ) + watcherUnknownTypeRecordsRead = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "prometheus", + Subsystem: "wal_watcher", + Name: "unknown_records_read_total", + Help: "Number of records read by the WAL watcher from the WAL of an unknown record type.", + }, + []string{queue}, + ) + watcherRecordDecodeFails = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "prometheus", + Subsystem: "wal_watcher", + Name: "record_decode_failures_total", + Help: "Number of records read by the WAL watcher that resulted in an error when decoding.", + }, + []string{queue}, + ) + watcherSamplesSentPreTailing = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "prometheus", + Subsystem: "wal_watcher", + Name: 
"samples_sent_pre_tailing_total", + Help: "Number of sample records read by the WAL watcher and sent to remote write during replay of existing WAL.", + }, + []string{queue}, + ) + watcherCurrentSegment = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "prometheus", + Subsystem: "wal_watcher", + Name: "current_segment", + Help: "Current segment the WAL watcher is reading records from.", + }, + []string{queue}, + ) +) + +func init() { + prometheus.MustRegister(watcherSamplesRecordsRead) + prometheus.MustRegister(watcherSeriesRecordsRead) + prometheus.MustRegister(watcherTombstoneRecordsRead) + prometheus.MustRegister(watcherInvalidRecordsRead) + prometheus.MustRegister(watcherUnknownTypeRecordsRead) + prometheus.MustRegister(watcherRecordDecodeFails) + prometheus.MustRegister(watcherSamplesSentPreTailing) + prometheus.MustRegister(watcherCurrentSegment) +} + +type writeTo interface { + Append([]tsdb.RefSample) bool + StoreSeries([]tsdb.RefSeries, int) + SeriesReset(int) + Name() string +} + +// WALWatcher watches the TSDB WAL for a given WriteTo. +type WALWatcher struct { + writer writeTo + logger log.Logger + walDir string + currentSegment int + lastCheckpoint string + startTime int64 + + samplesReadMetric prometheus.Counter + seriesReadMetric prometheus.Counter + tombstonesReadMetric prometheus.Counter + invalidReadMetric prometheus.Counter + unknownReadMetric prometheus.Counter + recordDecodeFailsMetric prometheus.Counter + samplesSentPreTailing prometheus.Counter + currentSegmentMetric prometheus.Gauge + + ctx context.Context + cancel context.CancelFunc + quit chan struct{} +} + +// NewWALWatcher creates a new WAL watcher for a given WriteTo. +func NewWALWatcher(logger log.Logger, writer writeTo, walDir string, startTime int64) *WALWatcher { + if logger == nil { + logger = log.NewNopLogger() + } + ctx, cancel := context.WithCancel(context.Background()) + w := &WALWatcher{ + logger: logger, + writer: writer, + walDir: path.Join(walDir, "wal"), + startTime: startTime, + ctx: ctx, + cancel: cancel, + quit: make(chan struct{}), + } + + w.samplesReadMetric = watcherSamplesRecordsRead.WithLabelValues(w.writer.Name()) + w.seriesReadMetric = watcherSeriesRecordsRead.WithLabelValues(w.writer.Name()) + w.tombstonesReadMetric = watcherTombstoneRecordsRead.WithLabelValues(w.writer.Name()) + w.unknownReadMetric = watcherUnknownTypeRecordsRead.WithLabelValues(w.writer.Name()) + w.invalidReadMetric = watcherInvalidRecordsRead.WithLabelValues(w.writer.Name()) + w.recordDecodeFailsMetric = watcherRecordDecodeFails.WithLabelValues(w.writer.Name()) + w.samplesSentPreTailing = watcherSamplesSentPreTailing.WithLabelValues(w.writer.Name()) + w.currentSegmentMetric = watcherCurrentSegment.WithLabelValues(w.writer.Name()) + + return w +} + +func (w *WALWatcher) Start() { + level.Info(w.logger).Log("msg", "starting WAL watcher", "queue", w.writer.Name()) + go w.runWatcher() +} + +func (w *WALWatcher) Stop() { + level.Info(w.logger).Log("msg", "stopping WAL watcher", "queue", w.writer.Name()) + close(w.quit) +} + +// TODO: fix the exit logic for this function +// The stop param is used to stop at the end of the existing WAL on startup, +// since scraped samples may be written to the latest segment before we finish reading it. 
+func (w *WALWatcher) readSeriesRecords(r *wal.LiveReader, index int, stop int64) { + var ( + dec tsdb.RecordDecoder + series []tsdb.RefSeries + samples []tsdb.RefSample + ret bool + ) + for !isClosed(w.quit) { + for r.Next() { + series = series[:0] + rec := r.Record() + // If the timestamp is > start then we should Append this sample and exit readSeriesRecords, + // because this is the first sample written to the WAL after the WAL watcher was started. + typ := dec.Type(rec) + if typ == tsdb.RecordSamples { + samples, err := dec.Samples(rec, samples[:0]) + if err != nil { + continue + } + for _, s := range samples { + if s.T > w.startTime { + w.writer.Append(samples) + ret = true + w.samplesSentPreTailing.Inc() + } + } + if ret { + level.Info(w.logger).Log("msg", "found a sample with a timestamp after the WAL watcher start") + level.Info(w.logger).Log("msg", "read all series records in segment/checkpoint", "index", index) + return + } + } + if typ != tsdb.RecordSeries { + continue + } + + series, err := dec.Series(rec, nil) + if err != nil { + level.Error(log.With(w.logger)).Log("err", err) + break + } + + w.writer.StoreSeries(series, index) + } + // Since we only call readSeriesRecords on fully written WAL segments or checkpoints, + // Error() will only return an error if something actually went wrong when reading + // a record, either it was invalid or it was only partially written to the WAL. + if err := r.Err(); err != nil { + level.Error(w.logger).Log("err", err) + return + } + // Ensure we read all of the bytes in the segment or checkpoint. + if r.TotalRead() >= stop { + level.Info(w.logger).Log("msg", "read all series records in segment/checkpoint", "index", index) + return + } + } +} + +// Read all the series records from a Checkpoint directory. +func (w *WALWatcher) readCheckpoint(checkpointDir string) error { + sr, err := wal.NewSegmentsReader(checkpointDir) + if err != nil { + return errors.Wrap(err, "open checkpoint") + } + defer sr.Close() + + split := strings.Split(checkpointDir, ".") + if len(split) != 2 { + return errors.Errorf("checkpoint dir name is not in the right format: %s", checkpointDir) + } + + i, err := strconv.Atoi(split[1]) + if err != nil { + i = w.currentSegment - 1 + } + + size, err := getCheckpointSize(checkpointDir) + if err != nil { + level.Error(w.logger).Log("msg", "error getting checkpoint size", "checkpoint", checkpointDir) + } + + w.readSeriesRecords(wal.NewLiveReader(sr), i, size) + level.Debug(w.logger).Log("msg", "read series references from checkpoint", "checkpoint", checkpointDir) + w.writer.SeriesReset(i) + return nil +} + +// When starting the WAL watcher, there is potentially an existing WAL. In that case, we +// should read to the end of the newest existing segment before reading new records that +// are written to it, storing data from series records along the way. +// Unfortunately this function is duplicates some of TSDB Head.Init(). +func (w *WALWatcher) readToEnd(walDir string, firstSegment, lastSegment int) (*wal.Segment, *wal.LiveReader, error) { + // Backfill from the checkpoint first if it exists. 
+ defer level.Debug(w.logger).Log("msg", "done reading existing WAL") + dir, startFrom, err := tsdb.LastCheckpoint(walDir) + if err != nil && err != tsdb.ErrNotFound { + return nil, nil, errors.Wrap(err, "find last checkpoint") + } + level.Debug(w.logger).Log("msg", "reading checkpoint", "dir", dir) + if err == nil { + w.lastCheckpoint = dir + err = w.readCheckpoint(dir) + if err != nil { + return nil, nil, err + } + startFrom++ + } + + // Backfill segments from the last checkpoint onwards if at least 2 segments exist. + if lastSegment > 0 { + for i := firstSegment; i < lastSegment; i++ { + seg, err := wal.OpenReadSegment(wal.SegmentName(walDir, i)) + if err != nil { + return nil, nil, err + } + sz, _ := getSegmentSize(walDir, i) + w.readSeriesRecords(wal.NewLiveReader(seg), i, sz) + } + } + + // We want to start the WAL Watcher from the end of the last segment on start, + // so we make sure to return the wal.Segment pointer + segment, err := wal.OpenReadSegment(wal.SegmentName(w.walDir, lastSegment)) + if err != nil { + return nil, nil, err + } + + r := wal.NewLiveReader(segment) + sz, _ := getSegmentSize(walDir, lastSegment) + w.readSeriesRecords(r, lastSegment, sz) + return segment, r, nil +} + +func (w *WALWatcher) decodeRecord(rec []byte) error { + var ( + dec tsdb.RecordDecoder + series []tsdb.RefSeries + samples []tsdb.RefSample + ) + switch dec.Type(rec) { + case tsdb.RecordSeries: + series, err := dec.Series(rec, series[:0]) + if err != nil { + w.recordDecodeFailsMetric.Inc() + level.Error(log.With(w.logger)).Log("err", err) + break + } + w.seriesReadMetric.Add(float64(len(series))) + w.writer.StoreSeries(series, w.currentSegment) + case tsdb.RecordSamples: + samples, err := dec.Samples(rec, samples[:0]) + if err != nil { + w.recordDecodeFailsMetric.Inc() + level.Error(log.With(w.logger)).Log("err", err) + break + } + w.samplesReadMetric.Add(float64(len(samples))) + // Blocks until the sample is sent to all remote write endpoints or closed (because enqueue blocks). 
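// Note that the boolean returned by Append is not checked here; once the samples have been
// handed to the writer, the record is considered handled.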
+ w.writer.Append(samples) + case tsdb.RecordTombstones: + w.tombstonesReadMetric.Add(float64(len(samples))) + case tsdb.RecordInvalid: + w.invalidReadMetric.Add(float64(len(samples))) + return errors.New("invalid record") + default: + level.Info(w.logger).Log("msg", "unknown TSDB record type in decodeSegment") + return errors.New("unknown TSDB record type") + } + return nil +} + +func (w *WALWatcher) readSegment(r *wal.LiveReader) { + for r.Next() && !isClosed(w.quit) { + err := w.decodeRecord(r.Record()) + if err != nil { + level.Error(w.logger).Log("err", err) + } + } + if err := r.Err(); err != nil && err != io.EOF { + level.Error(w.logger).Log("err", err) + } +} + +func (w *WALWatcher) watch(wl *wal.WAL, reader *wal.LiveReader) error { + readTimeout := 10 * time.Millisecond + readTicker := time.NewTicker(readTimeout) + defer readTicker.Stop() + checkpointTicker := time.NewTicker(5 * time.Second) + defer checkpointTicker.Stop() + segmentTicker := time.NewTicker(100 * time.Millisecond) + defer segmentTicker.Stop() + + currentSegmentName := fmt.Sprintf("%08d", w.currentSegment) + w.currentSegmentMetric.Set(float64(w.currentSegment)) + + for { + select { + case <-w.quit: + level.Info(w.logger).Log("msg", "quitting WAL watcher watch loop") + return errors.New("quit channel") + case <-checkpointTicker.C: + // check if there is a new checkpoint + dir, _, err := tsdb.LastCheckpoint(w.walDir) + if err != nil && err != tsdb.ErrNotFound { + continue + } + cn, err := checkpointNum(dir) + if err != nil { + continue + } + // TODO: callum, simplify the nesting here + if err == nil && dir != w.lastCheckpoint { + level.Info(w.logger).Log("msg", "new checkpoint detected", "last", w.lastCheckpoint, "new", dir) + d, err := strconv.Atoi(cn) + if err != nil { + level.Error(w.logger).Log("err", err) + } else if d < w.currentSegment { + w.lastCheckpoint = dir + // This potentially takes a long time, should we run it in another go routine? + err = w.readCheckpoint(w.lastCheckpoint) + if err != nil { + level.Error(w.logger).Log("err", err) + } + } else { + level.Info(w.logger).Log("msg", "current segment is behind the checkpoint, skipping reading of checkpoint for now", "current", currentSegmentName, "checkpoint", dir) + } + } + case <-segmentTicker.C: + // check if new segments exist + _, last, err := wl.Segments() + if err != nil { + level.Error(w.logger).Log("err", err) + continue + } + if last > w.currentSegment { + w.readSegment(reader) + level.Info(w.logger).Log("msg", "a new segment exists, we should start reading it", "current", currentSegmentName, "new", fmt.Sprintf("%08d", last)) + return nil + } + case <-readTicker.C: + w.readSegment(reader) + } + } +} + +func (w *WALWatcher) runWatcher() { + // The WAL dir may not exist when Prometheus first starts up. + for { + if _, err := os.Stat(w.walDir); os.IsNotExist(err) { + time.Sleep(time.Second) + } else { + break + } + } + + nw, err := wal.New(nil, nil, w.walDir) + if err != nil { + level.Error(w.logger).Log("err", err) + return + } + + first, last, err := nw.Segments() + if err != nil { + level.Error(w.logger).Log("err", err) + return + } + + if last == -1 { + level.Error(w.logger).Log("err", err) + return + } + + // Read series records in the current WAL and latest checkpoint, get the segment pointer back. + // TODO: callum, handle maintaining the WAL pointer somehow across apply configs? 
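// readToEnd stores series records from the latest checkpoint and all completed segments,
// then returns the last (still live) segment together with a LiveReader for it, so the
// watch loop below can continue tailing from where replay finished.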
+ segment, reader, err := w.readToEnd(w.walDir, first, last) + if err != nil { + level.Error(w.logger).Log("err", err) + return + } + + w.currentSegment = last + + if err != nil { + level.Error(w.logger).Log("err", err) + return + } + + for { + level.Info(w.logger).Log("msg", "watching segment", "segment", w.currentSegment) + // On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment. + // On subsequent calls to this function, currentSegment will have been incremented and we should open that segment. + err := w.watch(nw, reader) + segment.Close() + if err != nil { + level.Error(w.logger).Log("msg", "runWatcher is ending", "err", err) + return + } + w.currentSegment++ + segment, err = wal.OpenReadSegment(wal.SegmentName(w.walDir, w.currentSegment)) + reader = wal.NewLiveReader(segment) + // TODO: callum, is this error really fatal? + if err != nil { + level.Error(w.logger).Log("err", err) + return + } + } +} + +func checkpointNum(dir string) (string, error) { + // Checkpoint dir names are in the format checkpoint.000001 + chunks := strings.Split(dir, ".") + if len(chunks) != 2 { + return "", errors.Errorf("invalid checkpoint dir string: %s", dir) + } + return chunks[1], nil +} + +func getCheckpointSize(dir string) (int64, error) { + i := int64(0) + segs, err := fileutil.ReadDir(dir) + if err != nil { + return 0, err + } + for _, fn := range segs { + num, err := strconv.Atoi(fn) + if err != nil { + return i, err + } + sz, err := getSegmentSize(dir, num) + if err != nil { + return i, err + } + i += sz + } + return i, nil +} + +// Get size of segment. +func getSegmentSize(dir string, index int) (int64, error) { + i := int64(-1) + fi, err := os.Stat(wal.SegmentName(dir, index)) + if err == nil { + i = fi.Size() + } + return i, err +} + +func isClosed(c chan struct{}) bool { + select { + case <-c: + return true + default: + return false + } +} diff --git a/storage/remote/wal_watcher_test.go b/storage/remote/wal_watcher_test.go new file mode 100644 index 0000000000..3a0b8f6da9 --- /dev/null +++ b/storage/remote/wal_watcher_test.go @@ -0,0 +1,371 @@ +// Copyright 2018 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+package remote + +import ( + "fmt" + "io/ioutil" + "math/rand" + "os" + "path" + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/timestamp" + "github.com/prometheus/prometheus/prompb" + "github.com/prometheus/prometheus/util/testutil" + "github.com/prometheus/tsdb" + "github.com/prometheus/tsdb/labels" + "github.com/prometheus/tsdb/wal" +) + +type writeToMock struct { + samplesAppended int + seriesLabels map[uint64][]prompb.Label + seriesSegmentIndexes map[uint64]int +} + +func (wtm *writeToMock) Append(s []tsdb.RefSample) bool { + wtm.samplesAppended += len(s) + return true +} + +func (wtm *writeToMock) StoreSeries(series []tsdb.RefSeries, index int) { + temp := make(map[uint64][]prompb.Label, len(series)) + for _, s := range series { + ls := make(model.LabelSet, len(s.Labels)) + for _, label := range s.Labels { + ls[model.LabelName(label.Name)] = model.LabelValue(label.Value) + } + + temp[s.Ref] = labelsetToLabelsProto(ls) + } + // wtm.seriesMtx.Lock() + // defer t.seriesMtx.Unlock() + for ref, labels := range temp { + wtm.seriesLabels[ref] = labels + wtm.seriesSegmentIndexes[ref] = index + } +} + +func (wtm *writeToMock) SeriesReset(index int) { + // Check for series that are in segments older than the checkpoint + // that were not also present in the checkpoint. + for k, v := range wtm.seriesSegmentIndexes { + if v < index { + delete(wtm.seriesLabels, k) + delete(wtm.seriesSegmentIndexes, k) + } + } +} + +func (wtm *writeToMock) Name() string { + return "" +} + +func newWriteToMock() *writeToMock { + return &writeToMock{ + seriesLabels: make(map[uint64][]prompb.Label), + seriesSegmentIndexes: make(map[uint64]int), + } +} + +// we need a way to check the value of the wal watcher records read metrics, the samples and series records +// with these we could write some example segments and checkpoints and then write tests for readSegment/watch +// to see if we get back the write number of series records/samples records/etc., and that we read a whole checkpoint +// on startup and when a new one is created +// +// we could do the same thing for readToEnd, readCheckpoint, readSeriesRecords, etc. +func Test_readToEnd_noCheckpoint(t *testing.T) { + pageSize := 32 * 1024 + const seriesCount = 10 + const samplesCount = 250 + + dir, err := ioutil.TempDir("", "readToEnd_noCheckpoint") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + wdir := path.Join(dir, "wal") + err = os.Mkdir(wdir, 0777) + testutil.Ok(t, err) + + w, err := wal.NewSize(nil, nil, wdir, 128*pageSize) + testutil.Ok(t, err) + + // var input [][]byte + var recs [][]byte + + enc := tsdb.RecordEncoder{} + + for i := 0; i < seriesCount; i++ { + series := enc.Series([]tsdb.RefSeries{ + tsdb.RefSeries{ + Ref: uint64(i), + Labels: labels.Labels{labels.Label{"__name__", fmt.Sprintf("metric_%d", i)}}, + }, + }, nil) + recs = append(recs, series) + for j := 0; j < samplesCount; j++ { + sample := enc.Samples([]tsdb.RefSample{ + tsdb.RefSample{ + Ref: uint64(j), + T: int64(i), + V: float64(i), + }, + }, nil) + + recs = append(recs, sample) + + // Randomly batch up records. 
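// Roughly three out of four iterations flush the accumulated records with a single w.Log
// call, so the segments end up containing batches of varying size.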
+ if rand.Intn(4) < 3 { + testutil.Ok(t, w.Log(recs...)) + recs = recs[:0] + } + } + } + testutil.Ok(t, w.Log(recs...)) + + first, last, err := w.Segments() + testutil.Ok(t, err) + + wt := newWriteToMock() + st := timestamp.FromTime(time.Now()) + watcher := NewWALWatcher(nil, wt, dir, st) + _, _, err = watcher.readToEnd(wdir, first, last) + testutil.Ok(t, err) + testutil.Equals(t, seriesCount, len(wt.seriesLabels)) +} + +func Test_readToEnd_withCheckpoint(t *testing.T) { + pageSize := 32 * 1024 + const seriesCount = 10 + const samplesCount = 250 + + dir, err := ioutil.TempDir("", "readToEnd_withCheckpoint") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + wdir := path.Join(dir, "wal") + err = os.Mkdir(wdir, 0777) + testutil.Ok(t, err) + + os.Create(wal.SegmentName(wdir, 30)) + + enc := tsdb.RecordEncoder{} + w, err := wal.NewSize(nil, nil, wdir, 128*pageSize) + testutil.Ok(t, err) + + // write to the initial segment then checkpoint + for i := 0; i < seriesCount*10; i++ { + ref := i + 100 + series := enc.Series([]tsdb.RefSeries{ + tsdb.RefSeries{ + Ref: uint64(ref), + Labels: labels.Labels{labels.Label{"__name__", fmt.Sprintf("metric_%d", i)}}, + }, + }, nil) + testutil.Ok(t, w.Log(series)) + + for j := 0; j < samplesCount*10; j++ { + inner := rand.Intn(ref + 1) + sample := enc.Samples([]tsdb.RefSample{ + tsdb.RefSample{ + Ref: uint64(inner), + T: int64(i), + V: float64(i), + }, + }, nil) + testutil.Ok(t, w.Log(sample)) + } + } + tsdb.Checkpoint(w, 30, 31, func(x uint64) bool { return true }, 0) + w.Truncate(32) + + // write more records after checkpointing + for i := 0; i < seriesCount*10; i++ { + series := enc.Series([]tsdb.RefSeries{ + tsdb.RefSeries{ + Ref: uint64(i), + Labels: labels.Labels{labels.Label{"__name__", fmt.Sprintf("metric_%d", i)}}, + }, + }, nil) + testutil.Ok(t, w.Log(series)) + + for j := 0; j < samplesCount*10; j++ { + sample := enc.Samples([]tsdb.RefSample{ + tsdb.RefSample{ + Ref: uint64(j), + T: int64(i), + V: float64(i), + }, + }, nil) + testutil.Ok(t, w.Log(sample)) + } + } + + first, last, err := w.Segments() + testutil.Ok(t, err) + + wt := newWriteToMock() + st := timestamp.FromTime(time.Now()) + watcher := NewWALWatcher(nil, wt, dir, st) + _, _, err = watcher.readToEnd(wdir, first, last) + testutil.Ok(t, err) + testutil.Equals(t, seriesCount*10*2, len(wt.seriesLabels)) +} + +func Test_readCheckpoint(t *testing.T) { + pageSize := 32 * 1024 + const seriesCount = 10 + const samplesCount = 250 + + dir, err := ioutil.TempDir("", "readCheckpoint") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + wdir := path.Join(dir, "wal") + err = os.Mkdir(wdir, 0777) + testutil.Ok(t, err) + + os.Create(wal.SegmentName(wdir, 30)) + + enc := tsdb.RecordEncoder{} + w, err := wal.NewSize(nil, nil, wdir, 128*pageSize) + testutil.Ok(t, err) + + // write to the initial segment then checkpoint + for i := 0; i < seriesCount*10; i++ { + ref := i + 100 + series := enc.Series([]tsdb.RefSeries{ + tsdb.RefSeries{ + Ref: uint64(ref), + Labels: labels.Labels{labels.Label{"__name__", fmt.Sprintf("metric_%d", i)}}, + }, + }, nil) + testutil.Ok(t, w.Log(series)) + + for j := 0; j < samplesCount*10; j++ { + inner := rand.Intn(ref + 1) + sample := enc.Samples([]tsdb.RefSample{ + tsdb.RefSample{ + Ref: uint64(inner), + T: int64(i), + V: float64(i), + }, + }, nil) + testutil.Ok(t, w.Log(sample)) + } + } + tsdb.Checkpoint(w, 30, 31, func(x uint64) bool { return true }, 0) + w.Truncate(32) + + first, last, err := w.Segments() + testutil.Ok(t, err) + + wt := newWriteToMock() + st := 
timestamp.FromTime(time.Now()) + watcher := NewWALWatcher(nil, wt, dir, st) + _, _, err = watcher.readToEnd(wdir, first, last) + testutil.Ok(t, err) + testutil.Equals(t, seriesCount*10, len(wt.seriesLabels)) +} + +func Test_checkpoint_seriesReset(t *testing.T) { + pageSize := 32 * 1024 + const seriesCount = 10 + const samplesCount = 250 + + dir, err := ioutil.TempDir("", "seriesReset") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + wdir := path.Join(dir, "wal") + err = os.Mkdir(wdir, 0777) + testutil.Ok(t, err) + + enc := tsdb.RecordEncoder{} + w, err := wal.NewSize(nil, nil, wdir, pageSize) + // w. + testutil.Ok(t, err) + + // write to the initial segment then checkpoint + for i := 0; i < seriesCount*10; i++ { + ref := i + 100 + series := enc.Series([]tsdb.RefSeries{ + tsdb.RefSeries{ + Ref: uint64(ref), + Labels: labels.Labels{labels.Label{"__name__", fmt.Sprintf("metric_%d", i)}}, + }, + }, nil) + testutil.Ok(t, w.Log(series)) + + for j := 0; j < samplesCount*10; j++ { + inner := rand.Intn(ref + 1) + sample := enc.Samples([]tsdb.RefSample{ + tsdb.RefSample{ + Ref: uint64(inner), + T: int64(i), + V: float64(i), + }, + }, nil) + testutil.Ok(t, w.Log(sample)) + } + } + + first, last, err := w.Segments() + testutil.Ok(t, err) + + wt := newWriteToMock() + st := timestamp.FromTime(time.Now()) + watcher := NewWALWatcher(nil, wt, dir, st) + _, _, err = watcher.readToEnd(wdir, first, last) + testutil.Ok(t, err) + testutil.Equals(t, seriesCount*10, len(wt.seriesLabels)) + + // If you modify the checkpoint and truncate segment #'s run the test to see how + // many series records you end up with and change the last Equals check accordingly + // or modify the Equals to Assert(len(wt.seriesLabels) < seriesCount*10) + _, err = tsdb.Checkpoint(w, 50, 200, func(x uint64) bool { return true }, 0) + testutil.Ok(t, err) + w.Truncate(200) + + cp, _, err := tsdb.LastCheckpoint(path.Join(dir, "wal")) + testutil.Ok(t, err) + err = watcher.readCheckpoint(cp) + testutil.Ok(t, err) +} + +func Test_decodeRecord(t *testing.T) { + dir, err := ioutil.TempDir("", "decodeRecord") + testutil.Ok(t, err) + defer os.RemoveAll(dir) + + wt := newWriteToMock() + st := timestamp.FromTime(time.Now()) + watcher := NewWALWatcher(nil, wt, dir, st) + + // decode a series record + enc := tsdb.RecordEncoder{} + buf := enc.Series([]tsdb.RefSeries{tsdb.RefSeries{Ref: 1234, Labels: labels.Labels{}}}, nil) + watcher.decodeRecord(buf) + testutil.Ok(t, err) + + testutil.Equals(t, 1, len(wt.seriesLabels)) + + // decode a samples record + buf = enc.Samples([]tsdb.RefSample{tsdb.RefSample{Ref: 100, T: 1, V: 1.0}, tsdb.RefSample{Ref: 100, T: 2, V: 2.0}}, nil) + watcher.decodeRecord(buf) + testutil.Ok(t, err) + + testutil.Equals(t, 2, wt.samplesAppended) +} diff --git a/storage/remote/write.go b/storage/remote/write.go index 93beca2f5b..c718f88103 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -14,7 +14,6 @@ package remote import ( - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" ) @@ -26,16 +25,10 @@ func (s *Storage) Appender() (storage.Appender, error) { // Add implements storage.Appender. 
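// Samples are not handed to the remote-write queues here: each QueueManager reads them from
// the WAL via its watcher. Add only counts incoming samples and tracks the highest appended
// timestamp (exported as a metric from Commit below); the incoming-sample rate is presumably
// what the queue managers use when deciding how many shards to run, though that logic is not
// part of this hunk.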
func (s *Storage) Add(l labels.Labels, t int64, v float64) (uint64, error) { - s.mtx.RLock() - defer s.mtx.RUnlock() - for _, q := range s.queues { - if err := q.Append(&model.Sample{ - Metric: labelsToMetric(l), - Timestamp: model.Time(t), - Value: model.SampleValue(v), - }); err != nil { - panic(err) // QueueManager.Append() should always return nil as per doc string. - } + s.samplesIn.incr(1) + s.samplesInMetric.Inc() + if t > s.highestTimestamp { + s.highestTimestamp = t } return 0, nil } @@ -47,7 +40,8 @@ func (s *Storage) AddFast(l labels.Labels, _ uint64, t int64, v float64) error { } // Commit implements storage.Appender. -func (*Storage) Commit() error { +func (s *Storage) Commit() error { + s.highestTimestampMetric.Set(float64(s.highestTimestamp)) return nil } diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go index 0efdd32d34..05043138e6 100644 --- a/web/api/v1/api_test.go +++ b/web/api/v1/api_test.go @@ -33,6 +33,7 @@ import ( "github.com/go-kit/kit/log" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" + "github.com/prometheus/client_golang/prometheus" config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" "github.com/prometheus/common/promlog" @@ -279,9 +280,14 @@ func TestEndpoints(t *testing.T) { Format: &af, } - remote := remote.NewStorage(promlog.New(&promlogConfig), func() (int64, error) { + dbDir, err := ioutil.TempDir("", "tsdb-api-ready") + testutil.Ok(t, err) + defer os.RemoveAll(dbDir) + + testutil.Ok(t, err) + remote := remote.NewStorage(promlog.New(&promlogConfig), prometheus.DefaultRegisterer, func() (int64, error) { return 0, nil - }, 1*time.Second) + }, dbDir, 1*time.Second) err = remote.ApplyConfig(&config.Config{ RemoteReadConfigs: []*config.RemoteReadConfig{