mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-24 21:24:05 -08:00
promql: make active query tracker context-aware (#6701)
* promql: make query logger context-aware * Remove gate Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>
This commit is contained in:
parent
ddd49b743c
commit
d992c36b3a
|
@ -33,7 +33,6 @@ import (
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/prometheus/common/model"
|
"github.com/prometheus/common/model"
|
||||||
|
|
||||||
"github.com/prometheus/prometheus/pkg/gate"
|
|
||||||
"github.com/prometheus/prometheus/pkg/labels"
|
"github.com/prometheus/prometheus/pkg/labels"
|
||||||
"github.com/prometheus/prometheus/pkg/timestamp"
|
"github.com/prometheus/prometheus/pkg/timestamp"
|
||||||
"github.com/prometheus/prometheus/pkg/value"
|
"github.com/prometheus/prometheus/pkg/value"
|
||||||
|
@ -189,12 +188,6 @@ func (q *query) Exec(ctx context.Context) *Result {
|
||||||
span.SetTag(queryTag, q.stmt.String())
|
span.SetTag(queryTag, q.stmt.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Log query in active log.
|
|
||||||
if q.ng.activeQueryTracker != nil {
|
|
||||||
queryIndex := q.ng.activeQueryTracker.Insert(q.q)
|
|
||||||
defer q.ng.activeQueryTracker.Delete(queryIndex)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Exec query.
|
// Exec query.
|
||||||
res, warnings, err := q.ng.exec(ctx, q)
|
res, warnings, err := q.ng.exec(ctx, q)
|
||||||
|
|
||||||
|
@ -236,7 +229,6 @@ type Engine struct {
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
metrics *engineMetrics
|
metrics *engineMetrics
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
gate *gate.Gate
|
|
||||||
maxSamplesPerQuery int
|
maxSamplesPerQuery int
|
||||||
activeQueryTracker *ActiveQueryTracker
|
activeQueryTracker *ActiveQueryTracker
|
||||||
queryLogger QueryLogger
|
queryLogger QueryLogger
|
||||||
|
@ -323,7 +315,6 @@ func NewEngine(opts EngineOpts) *Engine {
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Engine{
|
return &Engine{
|
||||||
gate: gate.New(opts.MaxConcurrent),
|
|
||||||
timeout: opts.Timeout,
|
timeout: opts.Timeout,
|
||||||
logger: opts.Logger,
|
logger: opts.Logger,
|
||||||
metrics: metrics,
|
metrics: metrics,
|
||||||
|
@ -466,12 +457,16 @@ func (ng *Engine) exec(ctx context.Context, q *query) (v Value, w storage.Warnin
|
||||||
defer execSpanTimer.Finish()
|
defer execSpanTimer.Finish()
|
||||||
|
|
||||||
queueSpanTimer, _ := q.stats.GetSpanTimer(ctx, stats.ExecQueueTime, ng.metrics.queryQueueTime)
|
queueSpanTimer, _ := q.stats.GetSpanTimer(ctx, stats.ExecQueueTime, ng.metrics.queryQueueTime)
|
||||||
|
// Log query in active log. The active log guarantees that we don't run over
|
||||||
if err := ng.gate.Start(ctx); err != nil {
|
// MaxConcurrent queries.
|
||||||
return nil, nil, contextErr(err, "query queue")
|
if ng.activeQueryTracker != nil {
|
||||||
|
queryIndex, err := ng.activeQueryTracker.Insert(ctx, q.q)
|
||||||
|
if err != nil {
|
||||||
|
queueSpanTimer.Finish()
|
||||||
|
return nil, nil, contextErr(err, "query queue")
|
||||||
|
}
|
||||||
|
defer ng.activeQueryTracker.Delete(queryIndex)
|
||||||
}
|
}
|
||||||
defer ng.gate.Done()
|
|
||||||
|
|
||||||
queueSpanTimer.Finish()
|
queueSpanTimer.Finish()
|
||||||
|
|
||||||
// Cancel when execution is done or an error was raised.
|
// Cancel when execution is done or an error was raised.
|
||||||
|
|
|
@ -16,6 +16,8 @@ package promql
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
@ -28,12 +30,20 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestQueryConcurrency(t *testing.T) {
|
func TestQueryConcurrency(t *testing.T) {
|
||||||
|
maxConcurrency := 10
|
||||||
|
|
||||||
|
dir, err := ioutil.TempDir("", "test_concurrency")
|
||||||
|
testutil.Ok(t, err)
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
queryTracker := NewActiveQueryTracker(dir, maxConcurrency, nil)
|
||||||
|
|
||||||
opts := EngineOpts{
|
opts := EngineOpts{
|
||||||
Logger: nil,
|
Logger: nil,
|
||||||
Reg: nil,
|
Reg: nil,
|
||||||
MaxConcurrent: 10,
|
MaxConcurrent: maxConcurrency,
|
||||||
MaxSamples: 10,
|
MaxSamples: 10,
|
||||||
Timeout: 100 * time.Second,
|
Timeout: 100 * time.Second,
|
||||||
|
ActiveQueryTracker: queryTracker,
|
||||||
}
|
}
|
||||||
|
|
||||||
engine := NewEngine(opts)
|
engine := NewEngine(opts)
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
package promql
|
package promql
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
@ -174,13 +175,17 @@ func (tracker ActiveQueryTracker) Delete(insertIndex int) {
|
||||||
tracker.getNextIndex <- insertIndex
|
tracker.getNextIndex <- insertIndex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tracker ActiveQueryTracker) Insert(query string) int {
|
func (tracker ActiveQueryTracker) Insert(ctx context.Context, query string) (int, error) {
|
||||||
i, fileBytes := <-tracker.getNextIndex, tracker.mmapedFile
|
select {
|
||||||
entry := newJSONEntry(query, tracker.logger)
|
case i := <-tracker.getNextIndex:
|
||||||
start, end := i, i+entrySize
|
fileBytes := tracker.mmapedFile
|
||||||
|
entry := newJSONEntry(query, tracker.logger)
|
||||||
|
start, end := i, i+entrySize
|
||||||
|
|
||||||
copy(fileBytes[start:], entry)
|
copy(fileBytes[start:], entry)
|
||||||
copy(fileBytes[end-1:], ",")
|
copy(fileBytes[end-1:], ",")
|
||||||
|
return i, nil
|
||||||
return i
|
case <-ctx.Done():
|
||||||
|
return 0, ctx.Err()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
package promql
|
package promql
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
@ -51,7 +52,7 @@ func TestQueryLogging(t *testing.T) {
|
||||||
start := 1 + i*entrySize
|
start := 1 + i*entrySize
|
||||||
end := start + entrySize
|
end := start + entrySize
|
||||||
|
|
||||||
queryLogger.Insert(queries[i])
|
queryLogger.Insert(context.Background(), queries[i])
|
||||||
|
|
||||||
have := string(fileAsBytes[start:end])
|
have := string(fileAsBytes[start:end])
|
||||||
if !regexp.MustCompile(want[i]).MatchString(have) {
|
if !regexp.MustCompile(want[i]).MatchString(have) {
|
||||||
|
@ -77,16 +78,16 @@ func TestIndexReuse(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
queryLogger.generateIndices(3)
|
queryLogger.generateIndices(3)
|
||||||
queryLogger.Insert("TestQuery1")
|
queryLogger.Insert(context.Background(), "TestQuery1")
|
||||||
queryLogger.Insert("TestQuery2")
|
queryLogger.Insert(context.Background(), "TestQuery2")
|
||||||
queryLogger.Insert("TestQuery3")
|
queryLogger.Insert(context.Background(), "TestQuery3")
|
||||||
|
|
||||||
queryLogger.Delete(1 + entrySize)
|
queryLogger.Delete(1 + entrySize)
|
||||||
queryLogger.Delete(1)
|
queryLogger.Delete(1)
|
||||||
newQuery2 := "ThisShouldBeInsertedAtIndex2"
|
newQuery2 := "ThisShouldBeInsertedAtIndex2"
|
||||||
newQuery1 := "ThisShouldBeInsertedAtIndex1"
|
newQuery1 := "ThisShouldBeInsertedAtIndex1"
|
||||||
queryLogger.Insert(newQuery2)
|
queryLogger.Insert(context.Background(), newQuery2)
|
||||||
queryLogger.Insert(newQuery1)
|
queryLogger.Insert(context.Background(), newQuery1)
|
||||||
|
|
||||||
want := []string{
|
want := []string{
|
||||||
`^{"query":"ThisShouldBeInsertedAtIndex1","timestamp_sec":\d+}\x00*,$`,
|
`^{"query":"ThisShouldBeInsertedAtIndex1","timestamp_sec":\d+}\x00*,$`,
|
||||||
|
|
Loading…
Reference in a new issue