diff --git a/storage/local/interface.go b/storage/local/interface.go index b4e216b1ce..79dc47a5d4 100644 --- a/storage/local/interface.go +++ b/storage/local/interface.go @@ -104,6 +104,11 @@ type Persistence interface { // DropArchivedFingerprint.) If the queue is full, this method blocks // until the metric can be queued. This method is goroutine-safe. UnindexMetric(clientmodel.Metric, clientmodel.Fingerprint) + // WaitForIndexing waits until all items in the indexing queue are + // processed. If queue processing is currently on hold (to gather more + // ops for batching), this method will trigger an immediate start of + // processing. + WaitForIndexing() // ArchiveMetric persists the mapping of the given fingerprint to the // given metric, together with the first and last timestamp of the diff --git a/storage/local/persistence.go b/storage/local/persistence.go index d105cf081b..37ff9b781a 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -81,6 +81,7 @@ type diskPersistence struct { indexingQueue chan indexingOp indexingStopped chan struct{} + indexingFlush chan chan int } // NewDiskPersistence returns a newly allocated Persistence backed by local disk storage, ready to use. @@ -117,6 +118,7 @@ func NewDiskPersistence(basePath string, chunkLen int) (Persistence, error) { indexingQueue: make(chan indexingOp, indexingQueueCapacity), indexingStopped: make(chan struct{}), + indexingFlush: make(chan chan int), } go p.processIndexingQueue() return p, nil @@ -511,6 +513,17 @@ func (p *diskPersistence) ArchiveMetric( return nil } +// WaitForIndexing implements persistence. +func (p *diskPersistence) WaitForIndexing() { + wait := make(chan int) + for { + p.indexingFlush <- wait + if <-wait == 0 { + break + } + } +} + func (p *diskPersistence) HasArchivedMetric(fp clientmodel.Fingerprint) ( hasMetric bool, firstTime, lastTime clientmodel.Timestamp, err error, ) { @@ -636,19 +649,31 @@ func (p *diskPersistence) processIndexingQueue() { batchSize = 0 nameToValues = index.LabelNameLabelValuesMapping{} pairToFPs = index.LabelPairFingerprintsMapping{} + batchTimeout.Reset(indexingBatchTimeout) } + var flush chan chan int loop: for { + // Only process flush requests if the queue is currently empty. + if len(p.indexingQueue) == 0 { + flush = p.indexingFlush + } else { + flush = nil + } select { case <-batchTimeout.C: if batchSize > 0 { commitBatch() + } else { + batchTimeout.Reset(indexingBatchTimeout) } - batchTimeout.Reset(indexingBatchTimeout) + case r := <-flush: + if batchSize > 0 { + commitBatch() + } + r <- len(p.indexingQueue) case op, ok := <-p.indexingQueue: - batchTimeout.Stop() - if !ok { if batchSize > 0 { commitBatch() @@ -696,7 +721,6 @@ loop: if batchSize >= indexingMaxBatchSize { commitBatch() } - batchTimeout.Reset(indexingBatchTimeout) } } close(p.indexingStopped) diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 525c74bf38..32da344fa4 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -16,7 +16,6 @@ package local import ( "reflect" "testing" - "time" clientmodel "github.com/prometheus/client_golang/model" @@ -261,15 +260,11 @@ func TestIndexing(t *testing.T) { } indexedFpsToMetrics[fp] = m } - // TODO: Find a better solution than sleeping. - time.Sleep(2 * indexingBatchTimeout) verifyIndexedState(i, t, b, indexedFpsToMetrics, p.(*diskPersistence)) } for i := len(batches) - 1; i >= 0; i-- { b := batches[i] - // TODO: Find a better solution than sleeping. - time.Sleep(2 * indexingBatchTimeout) verifyIndexedState(i, t, batches[i], indexedFpsToMetrics, p.(*diskPersistence)) for fp, m := range b.fpToMetric { p.UnindexMetric(m, fp) @@ -286,6 +281,7 @@ func TestIndexing(t *testing.T) { } func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMetrics index.FingerprintMetricMapping, p *diskPersistence) { + p.WaitForIndexing() for fp, m := range indexedFpsToMetrics { // Compare archived metrics with input metrics. mOut, err := p.GetArchivedMetric(fp)