Fix append queue telemetry and parameterize sizes.

The original append queue telemetry never worked, because it was
updated only upon the exit of the select statement, which would
usually liberate the queues of contents.  This has been fixed to
be reported arbitrarily.

The queue sizes are now parameterizable via flags.
This commit is contained in:
Matt T. Proud 2013-04-16 17:13:29 +02:00
parent 888b64c781
commit d468271e2f
2 changed files with 13 additions and 3 deletions

View file

@ -39,6 +39,8 @@ var (
scrapeResultsQueueCapacity = flag.Int("scrapeResultsQueueCapacity", 4096, "The size of the scrape results queue.")
ruleResultsQueueCapacity = flag.Int("ruleResultsQueueCapacity", 4096, "The size of the rule results queue.")
concurrentRetrievalAllowance = flag.Int("concurrentRetrievalAllowance", 15, "The number of concurrent metrics retrieval requests allowed.")
diskAppendQueueCapacity = flag.Int("queue.diskAppendCapacity", 1000000, "The size of the queue for items that are pending writing to disk.")
memoryAppendQueueCapacity = flag.Int("queue.memoryAppendCapacity", 1000000, "The size of the queue for items that are pending writing to memory.")
)
func main() {
@ -48,7 +50,7 @@ func main() {
log.Fatalf("Error loading configuration from %s: %v", *configFile, err)
}
ts, err := metric.NewTieredStorage(5000, 5000, 100, time.Second*30, time.Second*1, time.Second*20, *metricsStoragePath)
ts, err := metric.NewTieredStorage(uint(*memoryAppendQueueCapacity), uint(*diskAppendQueueCapacity), 100, time.Second*30, time.Second*1, time.Second*20, *metricsStoragePath)
if err != nil {
log.Fatalf("Error opening storage: %s", err)
}

View file

@ -166,9 +166,17 @@ func (t *tieredStorage) Serve() {
writeMemoryTicker = time.Tick(t.writeMemoryInterval)
)
for {
t.reportQueues()
go func() {
reportTicker := time.Tick(time.Second)
for {
<-reportTicker
t.reportQueues()
}
}()
for {
select {
case <-writeMemoryTicker:
t.writeMemory()