mirror of
https://github.com/prometheus/prometheus.git
synced 2024-12-25 21:54:10 -08:00
* Added query logging for prometheus. Options added: 1) active.queries.filepath: Filename where queries will be recorded 2) active.queries.filesize: Size of the file where queries will be recorded. Functionality added: All active queries are now logged in a file. If prometheus crashes unexpectedly, these queries are also printed out on stdout in the rerun. Queries are written concurrently to an mmaped file, and removed once they are done. Their positions in the file are reused. They are written in json format. However, due to dynamic nature of application, the json has an extra comma after the last query, and is missing an ending ']'. There may also null bytes in the tail of file. Signed-off-by: Advait Bhatwadekar <advait123@ymail.com>
This commit is contained in:
parent
0f00737308
commit
5d401f1e1b
|
@ -353,12 +353,14 @@ func main() {
|
||||||
scrapeManager = scrape.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage)
|
scrapeManager = scrape.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage)
|
||||||
|
|
||||||
opts = promql.EngineOpts{
|
opts = promql.EngineOpts{
|
||||||
Logger: log.With(logger, "component", "query engine"),
|
Logger: log.With(logger, "component", "query engine"),
|
||||||
Reg: prometheus.DefaultRegisterer,
|
Reg: prometheus.DefaultRegisterer,
|
||||||
MaxConcurrent: cfg.queryConcurrency,
|
MaxConcurrent: cfg.queryConcurrency,
|
||||||
MaxSamples: cfg.queryMaxSamples,
|
MaxSamples: cfg.queryMaxSamples,
|
||||||
Timeout: time.Duration(cfg.queryTimeout),
|
Timeout: time.Duration(cfg.queryTimeout),
|
||||||
|
ActiveQueryTracker: promql.NewActiveQueryTracker(cfg.localStoragePath, cfg.queryConcurrency, log.With(logger, "component", "activeQueryTracker")),
|
||||||
}
|
}
|
||||||
|
|
||||||
queryEngine = promql.NewEngine(opts)
|
queryEngine = promql.NewEngine(opts)
|
||||||
|
|
||||||
ruleManager = rules.NewManager(&rules.ManagerOptions{
|
ruleManager = rules.NewManager(&rules.ManagerOptions{
|
||||||
|
@ -406,7 +408,7 @@ func main() {
|
||||||
cfg.web.Flags[f.Name] = f.Value.String()
|
cfg.web.Flags[f.Name] = f.Value.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Depends on cfg.web.ScrapeManager so needs to be after cfg.web.ScrapeManager = scrapeManager
|
// Depends on cfg.web.ScrapeManager so needs to be after cfg.web.ScrapeManager = scrapeManager.
|
||||||
webHandler := web.New(log.With(logger, "component", "web"), &cfg.web)
|
webHandler := web.New(log.With(logger, "component", "web"), &cfg.web)
|
||||||
|
|
||||||
// Monitor outgoing connections on default transport with conntrack.
|
// Monitor outgoing connections on default transport with conntrack.
|
||||||
|
|
1
go.mod
1
go.mod
|
@ -9,6 +9,7 @@ require (
|
||||||
github.com/aws/aws-sdk-go v1.15.24
|
github.com/aws/aws-sdk-go v1.15.24
|
||||||
github.com/cespare/xxhash v1.1.0
|
github.com/cespare/xxhash v1.1.0
|
||||||
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
|
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
|
||||||
|
github.com/edsrzf/mmap-go v1.0.0
|
||||||
github.com/evanphx/json-patch v4.1.0+incompatible // indirect
|
github.com/evanphx/json-patch v4.1.0+incompatible // indirect
|
||||||
github.com/go-kit/kit v0.8.0
|
github.com/go-kit/kit v0.8.0
|
||||||
github.com/go-logfmt/logfmt v0.4.0
|
github.com/go-logfmt/logfmt v0.4.0
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -54,6 +54,8 @@ github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZ
|
||||||
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
|
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
|
||||||
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
|
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
|
||||||
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
|
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
|
||||||
|
github.com/edsrzf/mmap-go v1.0.0 h1:CEBF7HpRnUCSJgGUb5h1Gm7e3VkmVDrR8lvWVLtrOFw=
|
||||||
|
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
|
||||||
github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
|
github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
|
||||||
github.com/evanphx/json-patch v0.0.0-20190203023257-5858425f7550/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
|
github.com/evanphx/json-patch v0.0.0-20190203023257-5858425f7550/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
|
||||||
github.com/evanphx/json-patch v4.1.0+incompatible h1:K1MDoo4AZ4wU0GIU/fPmtZg7VpzLjCxu+UwBD1FvwOc=
|
github.com/evanphx/json-patch v4.1.0+incompatible h1:K1MDoo4AZ4wU0GIU/fPmtZg7VpzLjCxu+UwBD1FvwOc=
|
||||||
|
|
|
@ -176,7 +176,19 @@ func (q *query) Exec(ctx context.Context) *Result {
|
||||||
span.SetTag(queryTag, q.stmt.String())
|
span.SetTag(queryTag, q.stmt.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Log query in active log.
|
||||||
|
var queryIndex int
|
||||||
|
if q.ng.activeQueryTracker != nil {
|
||||||
|
queryIndex = q.ng.activeQueryTracker.Insert(q.q)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Exec query.
|
||||||
res, warnings, err := q.ng.exec(ctx, q)
|
res, warnings, err := q.ng.exec(ctx, q)
|
||||||
|
|
||||||
|
// Delete query from active log.
|
||||||
|
if q.ng.activeQueryTracker != nil {
|
||||||
|
q.ng.activeQueryTracker.Delete(queryIndex)
|
||||||
|
}
|
||||||
return &Result{Err: err, Value: res, Warnings: warnings}
|
return &Result{Err: err, Value: res, Warnings: warnings}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -201,11 +213,12 @@ func contextErr(err error, env string) error {
|
||||||
|
|
||||||
// EngineOpts contains configuration options used when creating a new Engine.
|
// EngineOpts contains configuration options used when creating a new Engine.
|
||||||
type EngineOpts struct {
|
type EngineOpts struct {
|
||||||
Logger log.Logger
|
Logger log.Logger
|
||||||
Reg prometheus.Registerer
|
Reg prometheus.Registerer
|
||||||
MaxConcurrent int
|
MaxConcurrent int
|
||||||
MaxSamples int
|
MaxSamples int
|
||||||
Timeout time.Duration
|
Timeout time.Duration
|
||||||
|
ActiveQueryTracker *ActiveQueryTracker
|
||||||
}
|
}
|
||||||
|
|
||||||
// Engine handles the lifetime of queries from beginning to end.
|
// Engine handles the lifetime of queries from beginning to end.
|
||||||
|
@ -216,6 +229,7 @@ type Engine struct {
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
gate *gate.Gate
|
gate *gate.Gate
|
||||||
maxSamplesPerQuery int
|
maxSamplesPerQuery int
|
||||||
|
activeQueryTracker *ActiveQueryTracker
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewEngine returns a new engine.
|
// NewEngine returns a new engine.
|
||||||
|
@ -282,12 +296,14 @@ func NewEngine(opts EngineOpts) *Engine {
|
||||||
metrics.queryResultSort,
|
metrics.queryResultSort,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Engine{
|
return &Engine{
|
||||||
gate: gate.New(opts.MaxConcurrent),
|
gate: gate.New(opts.MaxConcurrent),
|
||||||
timeout: opts.Timeout,
|
timeout: opts.Timeout,
|
||||||
logger: opts.Logger,
|
logger: opts.Logger,
|
||||||
metrics: metrics,
|
metrics: metrics,
|
||||||
maxSamplesPerQuery: opts.MaxSamples,
|
maxSamplesPerQuery: opts.MaxSamples,
|
||||||
|
activeQueryTracker: opts.ActiveQueryTracker,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
183
promql/query_logger.go
Normal file
183
promql/query_logger.go
Normal file
|
@ -0,0 +1,183 @@
|
||||||
|
// Copyright 2019 The Prometheus Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package promql
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"github.com/edsrzf/mmap-go"
|
||||||
|
"github.com/go-kit/kit/log"
|
||||||
|
"github.com/go-kit/kit/log/level"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
"unicode/utf8"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ActiveQueryTracker struct {
|
||||||
|
mmapedFile []byte
|
||||||
|
getNextIndex chan int
|
||||||
|
logger log.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
type Entry struct {
|
||||||
|
Query string `json:"query"`
|
||||||
|
Timestamp int64 `json:"timestamp_sec"`
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
entrySize int = 1000
|
||||||
|
)
|
||||||
|
|
||||||
|
func parseBrokenJson(brokenJson []byte, logger log.Logger) (bool, string) {
|
||||||
|
queries := strings.ReplaceAll(string(brokenJson), "\x00", "")
|
||||||
|
queries = queries[:len(queries)-1] + "]"
|
||||||
|
|
||||||
|
// Conditional because of implementation detail: len() = 1 implies file consisted of a single char: '['.
|
||||||
|
if len(queries) == 1 {
|
||||||
|
return false, "[]"
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, queries
|
||||||
|
}
|
||||||
|
|
||||||
|
func logUnfinishedQueries(filename string, filesize int, logger log.Logger) {
|
||||||
|
if _, err := os.Stat(filename); err == nil {
|
||||||
|
fd, err := os.Open(filename)
|
||||||
|
if err != nil {
|
||||||
|
level.Error(logger).Log("msg", "Failed to open query log file", "err", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
brokenJson := make([]byte, filesize)
|
||||||
|
_, err = fd.Read(brokenJson)
|
||||||
|
if err != nil {
|
||||||
|
level.Error(logger).Log("msg", "Failed to read query log file", "err", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
queriesExist, queries := parseBrokenJson(brokenJson, logger)
|
||||||
|
if !queriesExist {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
level.Info(logger).Log("msg", "These queries didn't finish in prometheus' last run:", "queries", queries)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getMMapedFile(filename string, filesize int, logger log.Logger) (error, []byte) {
|
||||||
|
|
||||||
|
file, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
|
||||||
|
if err != nil {
|
||||||
|
level.Error(logger).Log("msg", "Error opening query log file", "file", filename, "err", err)
|
||||||
|
return err, []byte{}
|
||||||
|
}
|
||||||
|
|
||||||
|
err = file.Truncate(int64(filesize))
|
||||||
|
if err != nil {
|
||||||
|
level.Error(logger).Log("msg", "Error setting filesize.", "filesize", filesize, "err", err)
|
||||||
|
return err, []byte{}
|
||||||
|
}
|
||||||
|
|
||||||
|
fileAsBytes, err := mmap.Map(file, mmap.RDWR, 0)
|
||||||
|
if err != nil {
|
||||||
|
level.Error(logger).Log("msg", "Failed to mmap", "file", filename, "Attemped size", filesize, "err", err)
|
||||||
|
return err, []byte{}
|
||||||
|
}
|
||||||
|
|
||||||
|
return err, fileAsBytes
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewActiveQueryTracker(localStoragePath string, maxQueries int, logger log.Logger) *ActiveQueryTracker {
|
||||||
|
err := os.MkdirAll(localStoragePath, 0777)
|
||||||
|
if err != nil {
|
||||||
|
level.Error(logger).Log("msg", "Failed to create directory for logging active queries")
|
||||||
|
}
|
||||||
|
|
||||||
|
filename, filesize := filepath.Join(localStoragePath, "queries.active"), 1+maxQueries*entrySize
|
||||||
|
logUnfinishedQueries(filename, filesize, logger)
|
||||||
|
|
||||||
|
err, fileAsBytes := getMMapedFile(filename, filesize, logger)
|
||||||
|
if err != nil {
|
||||||
|
panic("Unable to create mmap-ed active query log")
|
||||||
|
}
|
||||||
|
|
||||||
|
copy(fileAsBytes, "[")
|
||||||
|
activeQueryTracker := ActiveQueryTracker{
|
||||||
|
mmapedFile: fileAsBytes,
|
||||||
|
getNextIndex: make(chan int, maxQueries),
|
||||||
|
logger: logger,
|
||||||
|
}
|
||||||
|
|
||||||
|
activeQueryTracker.generateIndices(maxQueries)
|
||||||
|
|
||||||
|
return &activeQueryTracker
|
||||||
|
}
|
||||||
|
|
||||||
|
func trimStringByBytes(str string, size int) string {
|
||||||
|
bytesStr := []byte(str)
|
||||||
|
|
||||||
|
trimIndex := len(bytesStr)
|
||||||
|
if size < len(bytesStr) {
|
||||||
|
for !utf8.RuneStart(bytesStr[size]) {
|
||||||
|
size -= 1
|
||||||
|
}
|
||||||
|
trimIndex = size
|
||||||
|
}
|
||||||
|
|
||||||
|
return string(bytesStr[:trimIndex])
|
||||||
|
}
|
||||||
|
|
||||||
|
func _newJsonEntry(query string, timestamp int64, logger log.Logger) []byte {
|
||||||
|
entry := Entry{query, timestamp}
|
||||||
|
jsonEntry, err := json.Marshal(entry)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
level.Error(logger).Log("msg", "Cannot create json of query", "query", query)
|
||||||
|
return []byte{}
|
||||||
|
}
|
||||||
|
|
||||||
|
return jsonEntry
|
||||||
|
}
|
||||||
|
|
||||||
|
func newJsonEntry(query string, logger log.Logger) []byte {
|
||||||
|
timestamp := time.Now().Unix()
|
||||||
|
minEntryJson := _newJsonEntry("", timestamp, logger)
|
||||||
|
|
||||||
|
query = trimStringByBytes(query, entrySize-(len(minEntryJson)+1))
|
||||||
|
jsonEntry := _newJsonEntry(query, timestamp, logger)
|
||||||
|
|
||||||
|
return jsonEntry
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tracker ActiveQueryTracker) generateIndices(maxQueries int) {
|
||||||
|
for i := 0; i < maxQueries; i++ {
|
||||||
|
tracker.getNextIndex <- 1 + (i * entrySize)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tracker ActiveQueryTracker) Delete(insertIndex int) {
|
||||||
|
copy(tracker.mmapedFile[insertIndex:], strings.Repeat("\x00", entrySize))
|
||||||
|
tracker.getNextIndex <- insertIndex
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tracker ActiveQueryTracker) Insert(query string) int {
|
||||||
|
i, fileBytes := <-tracker.getNextIndex, tracker.mmapedFile
|
||||||
|
entry := newJsonEntry(query, tracker.logger)
|
||||||
|
start, end := i, i+entrySize
|
||||||
|
|
||||||
|
copy(fileBytes[start:], entry)
|
||||||
|
copy(fileBytes[end-1:], ",")
|
||||||
|
|
||||||
|
return i
|
||||||
|
}
|
138
promql/query_logger_test.go
Normal file
138
promql/query_logger_test.go
Normal file
|
@ -0,0 +1,138 @@
|
||||||
|
// Copyright 2019 The Prometheus Authors
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package promql
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"regexp"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestQueryLogging(t *testing.T) {
|
||||||
|
fileAsBytes := make([]byte, 4096)
|
||||||
|
queryLogger := ActiveQueryTracker{
|
||||||
|
mmapedFile: fileAsBytes,
|
||||||
|
logger: nil,
|
||||||
|
getNextIndex: make(chan int, 4),
|
||||||
|
}
|
||||||
|
|
||||||
|
queryLogger.generateIndices(4)
|
||||||
|
veryLongString := "MassiveQueryThatNeverEndsAndExceedsTwoHundredBytesWhichIsTheSizeOfEntrySizeAndShouldThusBeTruncatedAndIamJustGoingToRepeatTheSameCharactersAgainProbablyBecauseWeAreStillOnlyHalfWayDoneOrMaybeNotOrMaybeMassiveQueryThatNeverEndsAndExceedsTwoHundredBytesWhichIsTheSizeOfEntrySizeAndShouldThusBeTruncatedAndIamJustGoingToRepeatTheSameCharactersAgainProbablyBecauseWeAreStillOnlyHalfWayDoneOrMaybeNotOrMaybeMassiveQueryThatNeverEndsAndExceedsTwoHundredBytesWhichIsTheSizeOfEntrySizeAndShouldThusBeTruncatedAndIamJustGoingToRepeatTheSameCharactersAgainProbablyBecauseWeAreStillOnlyHalfWayDoneOrMaybeNotOrMaybeMassiveQueryThatNeverEndsAndExceedsTwoHundredBytesWhichIsTheSizeOfEntrySizeAndShouldThusBeTruncatedAndIamJustGoingToRepeatTheSameCharactersAgainProbablyBecauseWeAreStillOnlyHalfWayDoneOrMaybeNotOrMaybeMassiveQueryThatNeverEndsAndExceedsTwoHundredBytesWhichIsTheSizeOfEntrySizeAndShouldThusBeTruncatedAndIamJustGoingToRepeatTheSameCharactersAgainProbablyBecauseWeAreStillOnlyHalfWayDoneOrMaybeNotOrMaybe"
|
||||||
|
queries := []string{
|
||||||
|
"TestQuery",
|
||||||
|
veryLongString,
|
||||||
|
"",
|
||||||
|
"SpecialCharQuery{host=\"2132132\", id=123123}",
|
||||||
|
}
|
||||||
|
|
||||||
|
want := []string{
|
||||||
|
`^{"query":"TestQuery","timestamp_sec":\d+}\x00*,$`,
|
||||||
|
`^{"query":"` + trimStringByBytes(veryLongString, entrySize-40) + `","timestamp_sec":\d+}\x00*,$`,
|
||||||
|
`^{"query":"","timestamp_sec":\d+}\x00*,$`,
|
||||||
|
`^{"query":"SpecialCharQuery{host=\\"2132132\\", id=123123}","timestamp_sec":\d+}\x00*,$`,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for inserts of queries.
|
||||||
|
for i := 0; i < 4; i++ {
|
||||||
|
start := 1 + i*entrySize
|
||||||
|
end := start + entrySize
|
||||||
|
|
||||||
|
queryLogger.Insert(queries[i])
|
||||||
|
|
||||||
|
have := string(fileAsBytes[start:end])
|
||||||
|
if !regexp.MustCompile(want[i]).MatchString(have) {
|
||||||
|
t.Fatalf("Query not written correctly: %s.\nHave %s\nWant %s", queries[i], have, want[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if all queries have been deleted.
|
||||||
|
for i := 0; i < 4; i++ {
|
||||||
|
queryLogger.Delete(1 + i*entrySize)
|
||||||
|
}
|
||||||
|
if !regexp.MustCompile(`^\x00+$`).Match(fileAsBytes[1 : 1+entrySize*4]) {
|
||||||
|
t.Fatalf("All queries not deleted properly. Have %s\nWant only null bytes \\x00", string(fileAsBytes[1:1+entrySize*4]))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestIndexReuse(t *testing.T) {
|
||||||
|
queryBytes := make([]byte, 1+3*entrySize)
|
||||||
|
queryLogger := ActiveQueryTracker{
|
||||||
|
mmapedFile: queryBytes,
|
||||||
|
logger: nil,
|
||||||
|
getNextIndex: make(chan int, 3),
|
||||||
|
}
|
||||||
|
|
||||||
|
queryLogger.generateIndices(3)
|
||||||
|
queryLogger.Insert("TestQuery1")
|
||||||
|
queryLogger.Insert("TestQuery2")
|
||||||
|
queryLogger.Insert("TestQuery3")
|
||||||
|
|
||||||
|
queryLogger.Delete(1 + entrySize)
|
||||||
|
queryLogger.Delete(1)
|
||||||
|
newQuery2 := "ThisShouldBeInsertedAtIndex2"
|
||||||
|
newQuery1 := "ThisShouldBeInsertedAtIndex1"
|
||||||
|
queryLogger.Insert(newQuery2)
|
||||||
|
queryLogger.Insert(newQuery1)
|
||||||
|
|
||||||
|
want := []string{
|
||||||
|
`^{"query":"ThisShouldBeInsertedAtIndex1","timestamp_sec":\d+}\x00*,$`,
|
||||||
|
`^{"query":"ThisShouldBeInsertedAtIndex2","timestamp_sec":\d+}\x00*,$`,
|
||||||
|
`^{"query":"TestQuery3","timestamp_sec":\d+}\x00*,$`,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check all bytes and verify new query was inserted at index 2
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
start := 1 + i*entrySize
|
||||||
|
end := start + entrySize
|
||||||
|
|
||||||
|
have := queryBytes[start:end]
|
||||||
|
if !regexp.MustCompile(want[i]).Match(have) {
|
||||||
|
t.Fatalf("Index not reused properly:\nHave %s\nWant %s", string(queryBytes[start:end]), want[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMMapFile(t *testing.T) {
|
||||||
|
file, err := ioutil.TempFile("", "mmapedFile")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create temp test file. %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
filename := file.Name()
|
||||||
|
defer os.Remove(filename)
|
||||||
|
|
||||||
|
err, fileAsBytes := getMMapedFile(filename, 2, nil)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't create test mmaped file")
|
||||||
|
}
|
||||||
|
copy(fileAsBytes, "ab")
|
||||||
|
|
||||||
|
f, err := os.Open(filename)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Couldn't open test mmaped file")
|
||||||
|
}
|
||||||
|
|
||||||
|
bytes := make([]byte, 4)
|
||||||
|
n, err := f.Read(bytes)
|
||||||
|
|
||||||
|
if n != 2 || err != nil {
|
||||||
|
t.Fatalf("Error reading file")
|
||||||
|
}
|
||||||
|
|
||||||
|
if string(bytes[:2]) != string(fileAsBytes) {
|
||||||
|
t.Fatalf("Mmap failed")
|
||||||
|
}
|
||||||
|
}
|
|
@ -438,7 +438,7 @@ func (t *Test) exec(tc testCommand) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
case *evalCmd:
|
case *evalCmd:
|
||||||
q, err := t.queryEngine.NewInstantQuery(t.storage, cmd.expr, cmd.start)
|
q, err := t.QueryEngine().NewInstantQuery(t.storage, cmd.expr, cmd.start)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
8
vendor/github.com/edsrzf/mmap-go/.gitignore
generated
vendored
Normal file
8
vendor/github.com/edsrzf/mmap-go/.gitignore
generated
vendored
Normal file
|
@ -0,0 +1,8 @@
|
||||||
|
*.out
|
||||||
|
*.5
|
||||||
|
*.6
|
||||||
|
*.8
|
||||||
|
*.swp
|
||||||
|
_obj
|
||||||
|
_test
|
||||||
|
testdata
|
25
vendor/github.com/edsrzf/mmap-go/LICENSE
generated
vendored
Normal file
25
vendor/github.com/edsrzf/mmap-go/LICENSE
generated
vendored
Normal file
|
@ -0,0 +1,25 @@
|
||||||
|
Copyright (c) 2011, Evan Shaw <edsrzf@gmail.com>
|
||||||
|
All rights reserved.
|
||||||
|
|
||||||
|
Redistribution and use in source and binary forms, with or without
|
||||||
|
modification, are permitted provided that the following conditions are met:
|
||||||
|
* Redistributions of source code must retain the above copyright
|
||||||
|
notice, this list of conditions and the following disclaimer.
|
||||||
|
* Redistributions in binary form must reproduce the above copyright
|
||||||
|
notice, this list of conditions and the following disclaimer in the
|
||||||
|
documentation and/or other materials provided with the distribution.
|
||||||
|
* Neither the name of the copyright holder nor the
|
||||||
|
names of its contributors may be used to endorse or promote products
|
||||||
|
derived from this software without specific prior written permission.
|
||||||
|
|
||||||
|
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
|
||||||
|
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
||||||
|
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||||
|
DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
|
||||||
|
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
|
||||||
|
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
|
||||||
|
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
|
||||||
|
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||||
|
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
|
||||||
|
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||||
|
|
12
vendor/github.com/edsrzf/mmap-go/README.md
generated
vendored
Normal file
12
vendor/github.com/edsrzf/mmap-go/README.md
generated
vendored
Normal file
|
@ -0,0 +1,12 @@
|
||||||
|
mmap-go
|
||||||
|
=======
|
||||||
|
|
||||||
|
mmap-go is a portable mmap package for the [Go programming language](http://golang.org).
|
||||||
|
It has been tested on Linux (386, amd64), OS X, and Windows (386). It should also
|
||||||
|
work on other Unix-like platforms, but hasn't been tested with them. I'm interested
|
||||||
|
to hear about the results.
|
||||||
|
|
||||||
|
I haven't been able to add more features without adding significant complexity,
|
||||||
|
so mmap-go doesn't support mprotect, mincore, and maybe a few other things.
|
||||||
|
If you're running on a Unix-like platform and need some of these features,
|
||||||
|
I suggest Gustavo Niemeyer's [gommap](http://labix.org/gommap).
|
117
vendor/github.com/edsrzf/mmap-go/mmap.go
generated
vendored
Normal file
117
vendor/github.com/edsrzf/mmap-go/mmap.go
generated
vendored
Normal file
|
@ -0,0 +1,117 @@
|
||||||
|
// Copyright 2011 Evan Shaw. All rights reserved.
|
||||||
|
// Use of this source code is governed by a BSD-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
// This file defines the common package interface and contains a little bit of
|
||||||
|
// factored out logic.
|
||||||
|
|
||||||
|
// Package mmap allows mapping files into memory. It tries to provide a simple, reasonably portable interface,
|
||||||
|
// but doesn't go out of its way to abstract away every little platform detail.
|
||||||
|
// This specifically means:
|
||||||
|
// * forked processes may or may not inherit mappings
|
||||||
|
// * a file's timestamp may or may not be updated by writes through mappings
|
||||||
|
// * specifying a size larger than the file's actual size can increase the file's size
|
||||||
|
// * If the mapped file is being modified by another process while your program's running, don't expect consistent results between platforms
|
||||||
|
package mmap
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"os"
|
||||||
|
"reflect"
|
||||||
|
"unsafe"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// RDONLY maps the memory read-only.
|
||||||
|
// Attempts to write to the MMap object will result in undefined behavior.
|
||||||
|
RDONLY = 0
|
||||||
|
// RDWR maps the memory as read-write. Writes to the MMap object will update the
|
||||||
|
// underlying file.
|
||||||
|
RDWR = 1 << iota
|
||||||
|
// COPY maps the memory as copy-on-write. Writes to the MMap object will affect
|
||||||
|
// memory, but the underlying file will remain unchanged.
|
||||||
|
COPY
|
||||||
|
// If EXEC is set, the mapped memory is marked as executable.
|
||||||
|
EXEC
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// If the ANON flag is set, the mapped memory will not be backed by a file.
|
||||||
|
ANON = 1 << iota
|
||||||
|
)
|
||||||
|
|
||||||
|
// MMap represents a file mapped into memory.
|
||||||
|
type MMap []byte
|
||||||
|
|
||||||
|
// Map maps an entire file into memory.
|
||||||
|
// If ANON is set in flags, f is ignored.
|
||||||
|
func Map(f *os.File, prot, flags int) (MMap, error) {
|
||||||
|
return MapRegion(f, -1, prot, flags, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// MapRegion maps part of a file into memory.
|
||||||
|
// The offset parameter must be a multiple of the system's page size.
|
||||||
|
// If length < 0, the entire file will be mapped.
|
||||||
|
// If ANON is set in flags, f is ignored.
|
||||||
|
func MapRegion(f *os.File, length int, prot, flags int, offset int64) (MMap, error) {
|
||||||
|
if offset%int64(os.Getpagesize()) != 0 {
|
||||||
|
return nil, errors.New("offset parameter must be a multiple of the system's page size")
|
||||||
|
}
|
||||||
|
|
||||||
|
var fd uintptr
|
||||||
|
if flags&ANON == 0 {
|
||||||
|
fd = uintptr(f.Fd())
|
||||||
|
if length < 0 {
|
||||||
|
fi, err := f.Stat()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
length = int(fi.Size())
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if length <= 0 {
|
||||||
|
return nil, errors.New("anonymous mapping requires non-zero length")
|
||||||
|
}
|
||||||
|
fd = ^uintptr(0)
|
||||||
|
}
|
||||||
|
return mmap(length, uintptr(prot), uintptr(flags), fd, offset)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MMap) header() *reflect.SliceHeader {
|
||||||
|
return (*reflect.SliceHeader)(unsafe.Pointer(m))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MMap) addrLen() (uintptr, uintptr) {
|
||||||
|
header := m.header()
|
||||||
|
return header.Data, uintptr(header.Len)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lock keeps the mapped region in physical memory, ensuring that it will not be
|
||||||
|
// swapped out.
|
||||||
|
func (m MMap) Lock() error {
|
||||||
|
return m.lock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unlock reverses the effect of Lock, allowing the mapped region to potentially
|
||||||
|
// be swapped out.
|
||||||
|
// If m is already unlocked, aan error will result.
|
||||||
|
func (m MMap) Unlock() error {
|
||||||
|
return m.unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Flush synchronizes the mapping's contents to the file's contents on disk.
|
||||||
|
func (m MMap) Flush() error {
|
||||||
|
return m.flush()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unmap deletes the memory mapped region, flushes any remaining changes, and sets
|
||||||
|
// m to nil.
|
||||||
|
// Trying to read or write any remaining references to m after Unmap is called will
|
||||||
|
// result in undefined behavior.
|
||||||
|
// Unmap should only be called on the slice value that was originally returned from
|
||||||
|
// a call to Map. Calling Unmap on a derived slice may cause errors.
|
||||||
|
func (m *MMap) Unmap() error {
|
||||||
|
err := m.unmap()
|
||||||
|
*m = nil
|
||||||
|
return err
|
||||||
|
}
|
51
vendor/github.com/edsrzf/mmap-go/mmap_unix.go
generated
vendored
Normal file
51
vendor/github.com/edsrzf/mmap-go/mmap_unix.go
generated
vendored
Normal file
|
@ -0,0 +1,51 @@
|
||||||
|
// Copyright 2011 Evan Shaw. All rights reserved.
|
||||||
|
// Use of this source code is governed by a BSD-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
// +build darwin dragonfly freebsd linux openbsd solaris netbsd
|
||||||
|
|
||||||
|
package mmap
|
||||||
|
|
||||||
|
import (
|
||||||
|
"golang.org/x/sys/unix"
|
||||||
|
)
|
||||||
|
|
||||||
|
func mmap(len int, inprot, inflags, fd uintptr, off int64) ([]byte, error) {
|
||||||
|
flags := unix.MAP_SHARED
|
||||||
|
prot := unix.PROT_READ
|
||||||
|
switch {
|
||||||
|
case inprot© != 0:
|
||||||
|
prot |= unix.PROT_WRITE
|
||||||
|
flags = unix.MAP_PRIVATE
|
||||||
|
case inprot&RDWR != 0:
|
||||||
|
prot |= unix.PROT_WRITE
|
||||||
|
}
|
||||||
|
if inprot&EXEC != 0 {
|
||||||
|
prot |= unix.PROT_EXEC
|
||||||
|
}
|
||||||
|
if inflags&ANON != 0 {
|
||||||
|
flags |= unix.MAP_ANON
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err := unix.Mmap(int(fd), off, len, prot, flags)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return b, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MMap) flush() error {
|
||||||
|
return unix.Msync([]byte(m), unix.MS_SYNC)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MMap) lock() error {
|
||||||
|
return unix.Mlock([]byte(m))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MMap) unlock() error {
|
||||||
|
return unix.Munlock([]byte(m))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MMap) unmap() error {
|
||||||
|
return unix.Munmap([]byte(m))
|
||||||
|
}
|
143
vendor/github.com/edsrzf/mmap-go/mmap_windows.go
generated
vendored
Normal file
143
vendor/github.com/edsrzf/mmap-go/mmap_windows.go
generated
vendored
Normal file
|
@ -0,0 +1,143 @@
|
||||||
|
// Copyright 2011 Evan Shaw. All rights reserved.
|
||||||
|
// Use of this source code is governed by a BSD-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package mmap
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"os"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"golang.org/x/sys/windows"
|
||||||
|
)
|
||||||
|
|
||||||
|
// mmap on Windows is a two-step process.
|
||||||
|
// First, we call CreateFileMapping to get a handle.
|
||||||
|
// Then, we call MapviewToFile to get an actual pointer into memory.
|
||||||
|
// Because we want to emulate a POSIX-style mmap, we don't want to expose
|
||||||
|
// the handle -- only the pointer. We also want to return only a byte slice,
|
||||||
|
// not a struct, so it's convenient to manipulate.
|
||||||
|
|
||||||
|
// We keep this map so that we can get back the original handle from the memory address.
|
||||||
|
|
||||||
|
type addrinfo struct {
|
||||||
|
file windows.Handle
|
||||||
|
mapview windows.Handle
|
||||||
|
}
|
||||||
|
|
||||||
|
var handleLock sync.Mutex
|
||||||
|
var handleMap = map[uintptr]*addrinfo{}
|
||||||
|
|
||||||
|
func mmap(len int, prot, flags, hfile uintptr, off int64) ([]byte, error) {
|
||||||
|
flProtect := uint32(windows.PAGE_READONLY)
|
||||||
|
dwDesiredAccess := uint32(windows.FILE_MAP_READ)
|
||||||
|
switch {
|
||||||
|
case prot© != 0:
|
||||||
|
flProtect = windows.PAGE_WRITECOPY
|
||||||
|
dwDesiredAccess = windows.FILE_MAP_COPY
|
||||||
|
case prot&RDWR != 0:
|
||||||
|
flProtect = windows.PAGE_READWRITE
|
||||||
|
dwDesiredAccess = windows.FILE_MAP_WRITE
|
||||||
|
}
|
||||||
|
if prot&EXEC != 0 {
|
||||||
|
flProtect <<= 4
|
||||||
|
dwDesiredAccess |= windows.FILE_MAP_EXECUTE
|
||||||
|
}
|
||||||
|
|
||||||
|
// The maximum size is the area of the file, starting from 0,
|
||||||
|
// that we wish to allow to be mappable. It is the sum of
|
||||||
|
// the length the user requested, plus the offset where that length
|
||||||
|
// is starting from. This does not map the data into memory.
|
||||||
|
maxSizeHigh := uint32((off + int64(len)) >> 32)
|
||||||
|
maxSizeLow := uint32((off + int64(len)) & 0xFFFFFFFF)
|
||||||
|
// TODO: Do we need to set some security attributes? It might help portability.
|
||||||
|
h, errno := windows.CreateFileMapping(windows.Handle(hfile), nil, flProtect, maxSizeHigh, maxSizeLow, nil)
|
||||||
|
if h == 0 {
|
||||||
|
return nil, os.NewSyscallError("CreateFileMapping", errno)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Actually map a view of the data into memory. The view's size
|
||||||
|
// is the length the user requested.
|
||||||
|
fileOffsetHigh := uint32(off >> 32)
|
||||||
|
fileOffsetLow := uint32(off & 0xFFFFFFFF)
|
||||||
|
addr, errno := windows.MapViewOfFile(h, dwDesiredAccess, fileOffsetHigh, fileOffsetLow, uintptr(len))
|
||||||
|
if addr == 0 {
|
||||||
|
return nil, os.NewSyscallError("MapViewOfFile", errno)
|
||||||
|
}
|
||||||
|
handleLock.Lock()
|
||||||
|
handleMap[addr] = &addrinfo{
|
||||||
|
file: windows.Handle(hfile),
|
||||||
|
mapview: h,
|
||||||
|
}
|
||||||
|
handleLock.Unlock()
|
||||||
|
|
||||||
|
m := MMap{}
|
||||||
|
dh := m.header()
|
||||||
|
dh.Data = addr
|
||||||
|
dh.Len = len
|
||||||
|
dh.Cap = dh.Len
|
||||||
|
|
||||||
|
return m, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MMap) flush() error {
|
||||||
|
addr, len := m.addrLen()
|
||||||
|
errno := windows.FlushViewOfFile(addr, len)
|
||||||
|
if errno != nil {
|
||||||
|
return os.NewSyscallError("FlushViewOfFile", errno)
|
||||||
|
}
|
||||||
|
|
||||||
|
handleLock.Lock()
|
||||||
|
defer handleLock.Unlock()
|
||||||
|
handle, ok := handleMap[addr]
|
||||||
|
if !ok {
|
||||||
|
// should be impossible; we would've errored above
|
||||||
|
return errors.New("unknown base address")
|
||||||
|
}
|
||||||
|
|
||||||
|
errno = windows.FlushFileBuffers(handle.file)
|
||||||
|
return os.NewSyscallError("FlushFileBuffers", errno)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MMap) lock() error {
|
||||||
|
addr, len := m.addrLen()
|
||||||
|
errno := windows.VirtualLock(addr, len)
|
||||||
|
return os.NewSyscallError("VirtualLock", errno)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MMap) unlock() error {
|
||||||
|
addr, len := m.addrLen()
|
||||||
|
errno := windows.VirtualUnlock(addr, len)
|
||||||
|
return os.NewSyscallError("VirtualUnlock", errno)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m MMap) unmap() error {
|
||||||
|
err := m.flush()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
addr := m.header().Data
|
||||||
|
// Lock the UnmapViewOfFile along with the handleMap deletion.
|
||||||
|
// As soon as we unmap the view, the OS is free to give the
|
||||||
|
// same addr to another new map. We don't want another goroutine
|
||||||
|
// to insert and remove the same addr into handleMap while
|
||||||
|
// we're trying to remove our old addr/handle pair.
|
||||||
|
handleLock.Lock()
|
||||||
|
defer handleLock.Unlock()
|
||||||
|
err = windows.UnmapViewOfFile(addr)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
handle, ok := handleMap[addr]
|
||||||
|
if !ok {
|
||||||
|
// should be impossible; we would've errored above
|
||||||
|
return errors.New("unknown base address")
|
||||||
|
}
|
||||||
|
delete(handleMap, addr)
|
||||||
|
|
||||||
|
e := windows.CloseHandle(windows.Handle(handle.mapview))
|
||||||
|
return os.NewSyscallError("CloseHandle", e)
|
||||||
|
}
|
2
vendor/modules.txt
vendored
2
vendor/modules.txt
vendored
|
@ -73,6 +73,8 @@ github.com/cespare/xxhash
|
||||||
github.com/davecgh/go-spew/spew
|
github.com/davecgh/go-spew/spew
|
||||||
# github.com/dgrijalva/jwt-go v3.2.0+incompatible
|
# github.com/dgrijalva/jwt-go v3.2.0+incompatible
|
||||||
github.com/dgrijalva/jwt-go
|
github.com/dgrijalva/jwt-go
|
||||||
|
# github.com/edsrzf/mmap-go v1.0.0
|
||||||
|
github.com/edsrzf/mmap-go
|
||||||
# github.com/evanphx/json-patch v4.1.0+incompatible
|
# github.com/evanphx/json-patch v4.1.0+incompatible
|
||||||
github.com/evanphx/json-patch
|
github.com/evanphx/json-patch
|
||||||
# github.com/ghodss/yaml v1.0.0
|
# github.com/ghodss/yaml v1.0.0
|
||||||
|
|
Loading…
Reference in a new issue