Add WaitForIndexing.

Change-Id: I5a5c975c4246632f937413322c855bbe63d00802
This commit is contained in:
Bjoern Rabenstein 2014-09-24 14:41:38 +02:00
parent c7aad110fb
commit 0031a448e2
3 changed files with 34 additions and 9 deletions

View file

@ -104,6 +104,11 @@ type Persistence interface {
// DropArchivedFingerprint.) If the queue is full, this method blocks // DropArchivedFingerprint.) If the queue is full, this method blocks
// until the metric can be queued. This method is goroutine-safe. // until the metric can be queued. This method is goroutine-safe.
UnindexMetric(clientmodel.Metric, clientmodel.Fingerprint) UnindexMetric(clientmodel.Metric, clientmodel.Fingerprint)
// WaitForIndexing waits until all items in the indexing queue are
// processed. If queue processing is currently on hold (to gather more
// ops for batching), this method will trigger an immediate start of
// processing.
WaitForIndexing()
// ArchiveMetric persists the mapping of the given fingerprint to the // ArchiveMetric persists the mapping of the given fingerprint to the
// given metric, together with the first and last timestamp of the // given metric, together with the first and last timestamp of the

View file

@ -81,6 +81,7 @@ type diskPersistence struct {
indexingQueue chan indexingOp indexingQueue chan indexingOp
indexingStopped chan struct{} indexingStopped chan struct{}
indexingFlush chan chan int
} }
// NewDiskPersistence returns a newly allocated Persistence backed by local disk storage, ready to use. // NewDiskPersistence returns a newly allocated Persistence backed by local disk storage, ready to use.
@ -117,6 +118,7 @@ func NewDiskPersistence(basePath string, chunkLen int) (Persistence, error) {
indexingQueue: make(chan indexingOp, indexingQueueCapacity), indexingQueue: make(chan indexingOp, indexingQueueCapacity),
indexingStopped: make(chan struct{}), indexingStopped: make(chan struct{}),
indexingFlush: make(chan chan int),
} }
go p.processIndexingQueue() go p.processIndexingQueue()
return p, nil return p, nil
@ -511,6 +513,17 @@ func (p *diskPersistence) ArchiveMetric(
return nil return nil
} }
// WaitForIndexing implements persistence.
func (p *diskPersistence) WaitForIndexing() {
wait := make(chan int)
for {
p.indexingFlush <- wait
if <-wait == 0 {
break
}
}
}
func (p *diskPersistence) HasArchivedMetric(fp clientmodel.Fingerprint) ( func (p *diskPersistence) HasArchivedMetric(fp clientmodel.Fingerprint) (
hasMetric bool, firstTime, lastTime clientmodel.Timestamp, err error, hasMetric bool, firstTime, lastTime clientmodel.Timestamp, err error,
) { ) {
@ -636,19 +649,31 @@ func (p *diskPersistence) processIndexingQueue() {
batchSize = 0 batchSize = 0
nameToValues = index.LabelNameLabelValuesMapping{} nameToValues = index.LabelNameLabelValuesMapping{}
pairToFPs = index.LabelPairFingerprintsMapping{} pairToFPs = index.LabelPairFingerprintsMapping{}
batchTimeout.Reset(indexingBatchTimeout)
} }
var flush chan chan int
loop: loop:
for { for {
// Only process flush requests if the queue is currently empty.
if len(p.indexingQueue) == 0 {
flush = p.indexingFlush
} else {
flush = nil
}
select { select {
case <-batchTimeout.C: case <-batchTimeout.C:
if batchSize > 0 { if batchSize > 0 {
commitBatch() commitBatch()
} } else {
batchTimeout.Reset(indexingBatchTimeout) batchTimeout.Reset(indexingBatchTimeout)
}
case r := <-flush:
if batchSize > 0 {
commitBatch()
}
r <- len(p.indexingQueue)
case op, ok := <-p.indexingQueue: case op, ok := <-p.indexingQueue:
batchTimeout.Stop()
if !ok { if !ok {
if batchSize > 0 { if batchSize > 0 {
commitBatch() commitBatch()
@ -696,7 +721,6 @@ loop:
if batchSize >= indexingMaxBatchSize { if batchSize >= indexingMaxBatchSize {
commitBatch() commitBatch()
} }
batchTimeout.Reset(indexingBatchTimeout)
} }
} }
close(p.indexingStopped) close(p.indexingStopped)

View file

@ -16,7 +16,6 @@ package local
import ( import (
"reflect" "reflect"
"testing" "testing"
"time"
clientmodel "github.com/prometheus/client_golang/model" clientmodel "github.com/prometheus/client_golang/model"
@ -261,15 +260,11 @@ func TestIndexing(t *testing.T) {
} }
indexedFpsToMetrics[fp] = m indexedFpsToMetrics[fp] = m
} }
// TODO: Find a better solution than sleeping.
time.Sleep(2 * indexingBatchTimeout)
verifyIndexedState(i, t, b, indexedFpsToMetrics, p.(*diskPersistence)) verifyIndexedState(i, t, b, indexedFpsToMetrics, p.(*diskPersistence))
} }
for i := len(batches) - 1; i >= 0; i-- { for i := len(batches) - 1; i >= 0; i-- {
b := batches[i] b := batches[i]
// TODO: Find a better solution than sleeping.
time.Sleep(2 * indexingBatchTimeout)
verifyIndexedState(i, t, batches[i], indexedFpsToMetrics, p.(*diskPersistence)) verifyIndexedState(i, t, batches[i], indexedFpsToMetrics, p.(*diskPersistence))
for fp, m := range b.fpToMetric { for fp, m := range b.fpToMetric {
p.UnindexMetric(m, fp) p.UnindexMetric(m, fp)
@ -286,6 +281,7 @@ func TestIndexing(t *testing.T) {
} }
func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMetrics index.FingerprintMetricMapping, p *diskPersistence) { func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMetrics index.FingerprintMetricMapping, p *diskPersistence) {
p.WaitForIndexing()
for fp, m := range indexedFpsToMetrics { for fp, m := range indexedFpsToMetrics {
// Compare archived metrics with input metrics. // Compare archived metrics with input metrics.
mOut, err := p.GetArchivedMetric(fp) mOut, err := p.GetArchivedMetric(fp)