diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 7b68003da..464031dc2 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -353,12 +353,14 @@ func main() { scrapeManager = scrape.NewManager(log.With(logger, "component", "scrape manager"), fanoutStorage) opts = promql.EngineOpts{ - Logger: log.With(logger, "component", "query engine"), - Reg: prometheus.DefaultRegisterer, - MaxConcurrent: cfg.queryConcurrency, - MaxSamples: cfg.queryMaxSamples, - Timeout: time.Duration(cfg.queryTimeout), + Logger: log.With(logger, "component", "query engine"), + Reg: prometheus.DefaultRegisterer, + MaxConcurrent: cfg.queryConcurrency, + MaxSamples: cfg.queryMaxSamples, + Timeout: time.Duration(cfg.queryTimeout), + ActiveQueryTracker: promql.NewActiveQueryTracker(cfg.localStoragePath, cfg.queryConcurrency, log.With(logger, "component", "activeQueryTracker")), } + queryEngine = promql.NewEngine(opts) ruleManager = rules.NewManager(&rules.ManagerOptions{ @@ -406,7 +408,7 @@ func main() { cfg.web.Flags[f.Name] = f.Value.String() } - // Depends on cfg.web.ScrapeManager so needs to be after cfg.web.ScrapeManager = scrapeManager + // Depends on cfg.web.ScrapeManager so needs to be after cfg.web.ScrapeManager = scrapeManager. webHandler := web.New(log.With(logger, "component", "web"), &cfg.web) // Monitor outgoing connections on default transport with conntrack. diff --git a/go.mod b/go.mod index f5531fe1c..5cfe6ea3e 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/aws/aws-sdk-go v1.15.24 github.com/cespare/xxhash v1.1.0 github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect + github.com/edsrzf/mmap-go v1.0.0 github.com/evanphx/json-patch v4.1.0+incompatible // indirect github.com/go-kit/kit v0.8.0 github.com/go-logfmt/logfmt v0.4.0 diff --git a/go.sum b/go.sum index c4e9bad0e..da3b8e32a 100644 --- a/go.sum +++ b/go.sum @@ -54,6 +54,8 @@ github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZ github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/edsrzf/mmap-go v1.0.0 h1:CEBF7HpRnUCSJgGUb5h1Gm7e3VkmVDrR8lvWVLtrOFw= +github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/evanphx/json-patch v0.0.0-20190203023257-5858425f7550/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch v4.1.0+incompatible h1:K1MDoo4AZ4wU0GIU/fPmtZg7VpzLjCxu+UwBD1FvwOc= diff --git a/promql/engine.go b/promql/engine.go index ebba3eaf9..b37206fca 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -176,7 +176,19 @@ func (q *query) Exec(ctx context.Context) *Result { span.SetTag(queryTag, q.stmt.String()) } + // Log query in active log. + var queryIndex int + if q.ng.activeQueryTracker != nil { + queryIndex = q.ng.activeQueryTracker.Insert(q.q) + } + + // Exec query. res, warnings, err := q.ng.exec(ctx, q) + + // Delete query from active log. + if q.ng.activeQueryTracker != nil { + q.ng.activeQueryTracker.Delete(queryIndex) + } return &Result{Err: err, Value: res, Warnings: warnings} } @@ -201,11 +213,12 @@ func contextErr(err error, env string) error { // EngineOpts contains configuration options used when creating a new Engine. type EngineOpts struct { - Logger log.Logger - Reg prometheus.Registerer - MaxConcurrent int - MaxSamples int - Timeout time.Duration + Logger log.Logger + Reg prometheus.Registerer + MaxConcurrent int + MaxSamples int + Timeout time.Duration + ActiveQueryTracker *ActiveQueryTracker } // Engine handles the lifetime of queries from beginning to end. @@ -216,6 +229,7 @@ type Engine struct { timeout time.Duration gate *gate.Gate maxSamplesPerQuery int + activeQueryTracker *ActiveQueryTracker } // NewEngine returns a new engine. @@ -282,12 +296,14 @@ func NewEngine(opts EngineOpts) *Engine { metrics.queryResultSort, ) } + return &Engine{ gate: gate.New(opts.MaxConcurrent), timeout: opts.Timeout, logger: opts.Logger, metrics: metrics, maxSamplesPerQuery: opts.MaxSamples, + activeQueryTracker: opts.ActiveQueryTracker, } } diff --git a/promql/query_logger.go b/promql/query_logger.go new file mode 100644 index 000000000..81a2277cf --- /dev/null +++ b/promql/query_logger.go @@ -0,0 +1,183 @@ +// Copyright 2019 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package promql + +import ( + "encoding/json" + "github.com/edsrzf/mmap-go" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "os" + "path/filepath" + "strings" + "time" + "unicode/utf8" +) + +type ActiveQueryTracker struct { + mmapedFile []byte + getNextIndex chan int + logger log.Logger +} + +type Entry struct { + Query string `json:"query"` + Timestamp int64 `json:"timestamp_sec"` +} + +const ( + entrySize int = 1000 +) + +func parseBrokenJson(brokenJson []byte, logger log.Logger) (bool, string) { + queries := strings.ReplaceAll(string(brokenJson), "\x00", "") + queries = queries[:len(queries)-1] + "]" + + // Conditional because of implementation detail: len() = 1 implies file consisted of a single char: '['. + if len(queries) == 1 { + return false, "[]" + } + + return true, queries +} + +func logUnfinishedQueries(filename string, filesize int, logger log.Logger) { + if _, err := os.Stat(filename); err == nil { + fd, err := os.Open(filename) + if err != nil { + level.Error(logger).Log("msg", "Failed to open query log file", "err", err) + return + } + + brokenJson := make([]byte, filesize) + _, err = fd.Read(brokenJson) + if err != nil { + level.Error(logger).Log("msg", "Failed to read query log file", "err", err) + return + } + + queriesExist, queries := parseBrokenJson(brokenJson, logger) + if !queriesExist { + return + } + level.Info(logger).Log("msg", "These queries didn't finish in prometheus' last run:", "queries", queries) + } +} + +func getMMapedFile(filename string, filesize int, logger log.Logger) (error, []byte) { + + file, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644) + if err != nil { + level.Error(logger).Log("msg", "Error opening query log file", "file", filename, "err", err) + return err, []byte{} + } + + err = file.Truncate(int64(filesize)) + if err != nil { + level.Error(logger).Log("msg", "Error setting filesize.", "filesize", filesize, "err", err) + return err, []byte{} + } + + fileAsBytes, err := mmap.Map(file, mmap.RDWR, 0) + if err != nil { + level.Error(logger).Log("msg", "Failed to mmap", "file", filename, "Attemped size", filesize, "err", err) + return err, []byte{} + } + + return err, fileAsBytes +} + +func NewActiveQueryTracker(localStoragePath string, maxQueries int, logger log.Logger) *ActiveQueryTracker { + err := os.MkdirAll(localStoragePath, 0777) + if err != nil { + level.Error(logger).Log("msg", "Failed to create directory for logging active queries") + } + + filename, filesize := filepath.Join(localStoragePath, "queries.active"), 1+maxQueries*entrySize + logUnfinishedQueries(filename, filesize, logger) + + err, fileAsBytes := getMMapedFile(filename, filesize, logger) + if err != nil { + panic("Unable to create mmap-ed active query log") + } + + copy(fileAsBytes, "[") + activeQueryTracker := ActiveQueryTracker{ + mmapedFile: fileAsBytes, + getNextIndex: make(chan int, maxQueries), + logger: logger, + } + + activeQueryTracker.generateIndices(maxQueries) + + return &activeQueryTracker +} + +func trimStringByBytes(str string, size int) string { + bytesStr := []byte(str) + + trimIndex := len(bytesStr) + if size < len(bytesStr) { + for !utf8.RuneStart(bytesStr[size]) { + size -= 1 + } + trimIndex = size + } + + return string(bytesStr[:trimIndex]) +} + +func _newJsonEntry(query string, timestamp int64, logger log.Logger) []byte { + entry := Entry{query, timestamp} + jsonEntry, err := json.Marshal(entry) + + if err != nil { + level.Error(logger).Log("msg", "Cannot create json of query", "query", query) + return []byte{} + } + + return jsonEntry +} + +func newJsonEntry(query string, logger log.Logger) []byte { + timestamp := time.Now().Unix() + minEntryJson := _newJsonEntry("", timestamp, logger) + + query = trimStringByBytes(query, entrySize-(len(minEntryJson)+1)) + jsonEntry := _newJsonEntry(query, timestamp, logger) + + return jsonEntry +} + +func (tracker ActiveQueryTracker) generateIndices(maxQueries int) { + for i := 0; i < maxQueries; i++ { + tracker.getNextIndex <- 1 + (i * entrySize) + } +} + +func (tracker ActiveQueryTracker) Delete(insertIndex int) { + copy(tracker.mmapedFile[insertIndex:], strings.Repeat("\x00", entrySize)) + tracker.getNextIndex <- insertIndex +} + +func (tracker ActiveQueryTracker) Insert(query string) int { + i, fileBytes := <-tracker.getNextIndex, tracker.mmapedFile + entry := newJsonEntry(query, tracker.logger) + start, end := i, i+entrySize + + copy(fileBytes[start:], entry) + copy(fileBytes[end-1:], ",") + + return i +} diff --git a/promql/query_logger_test.go b/promql/query_logger_test.go new file mode 100644 index 000000000..cc3743244 --- /dev/null +++ b/promql/query_logger_test.go @@ -0,0 +1,138 @@ +// Copyright 2019 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package promql + +import ( + "io/ioutil" + "os" + "regexp" + "testing" +) + +func TestQueryLogging(t *testing.T) { + fileAsBytes := make([]byte, 4096) + queryLogger := ActiveQueryTracker{ + mmapedFile: fileAsBytes, + logger: nil, + getNextIndex: make(chan int, 4), + } + + queryLogger.generateIndices(4) + veryLongString := "MassiveQueryThatNeverEndsAndExceedsTwoHundredBytesWhichIsTheSizeOfEntrySizeAndShouldThusBeTruncatedAndIamJustGoingToRepeatTheSameCharactersAgainProbablyBecauseWeAreStillOnlyHalfWayDoneOrMaybeNotOrMaybeMassiveQueryThatNeverEndsAndExceedsTwoHundredBytesWhichIsTheSizeOfEntrySizeAndShouldThusBeTruncatedAndIamJustGoingToRepeatTheSameCharactersAgainProbablyBecauseWeAreStillOnlyHalfWayDoneOrMaybeNotOrMaybeMassiveQueryThatNeverEndsAndExceedsTwoHundredBytesWhichIsTheSizeOfEntrySizeAndShouldThusBeTruncatedAndIamJustGoingToRepeatTheSameCharactersAgainProbablyBecauseWeAreStillOnlyHalfWayDoneOrMaybeNotOrMaybeMassiveQueryThatNeverEndsAndExceedsTwoHundredBytesWhichIsTheSizeOfEntrySizeAndShouldThusBeTruncatedAndIamJustGoingToRepeatTheSameCharactersAgainProbablyBecauseWeAreStillOnlyHalfWayDoneOrMaybeNotOrMaybeMassiveQueryThatNeverEndsAndExceedsTwoHundredBytesWhichIsTheSizeOfEntrySizeAndShouldThusBeTruncatedAndIamJustGoingToRepeatTheSameCharactersAgainProbablyBecauseWeAreStillOnlyHalfWayDoneOrMaybeNotOrMaybe" + queries := []string{ + "TestQuery", + veryLongString, + "", + "SpecialCharQuery{host=\"2132132\", id=123123}", + } + + want := []string{ + `^{"query":"TestQuery","timestamp_sec":\d+}\x00*,$`, + `^{"query":"` + trimStringByBytes(veryLongString, entrySize-40) + `","timestamp_sec":\d+}\x00*,$`, + `^{"query":"","timestamp_sec":\d+}\x00*,$`, + `^{"query":"SpecialCharQuery{host=\\"2132132\\", id=123123}","timestamp_sec":\d+}\x00*,$`, + } + + // Check for inserts of queries. + for i := 0; i < 4; i++ { + start := 1 + i*entrySize + end := start + entrySize + + queryLogger.Insert(queries[i]) + + have := string(fileAsBytes[start:end]) + if !regexp.MustCompile(want[i]).MatchString(have) { + t.Fatalf("Query not written correctly: %s.\nHave %s\nWant %s", queries[i], have, want[i]) + } + } + + // Check if all queries have been deleted. + for i := 0; i < 4; i++ { + queryLogger.Delete(1 + i*entrySize) + } + if !regexp.MustCompile(`^\x00+$`).Match(fileAsBytes[1 : 1+entrySize*4]) { + t.Fatalf("All queries not deleted properly. Have %s\nWant only null bytes \\x00", string(fileAsBytes[1:1+entrySize*4])) + } +} + +func TestIndexReuse(t *testing.T) { + queryBytes := make([]byte, 1+3*entrySize) + queryLogger := ActiveQueryTracker{ + mmapedFile: queryBytes, + logger: nil, + getNextIndex: make(chan int, 3), + } + + queryLogger.generateIndices(3) + queryLogger.Insert("TestQuery1") + queryLogger.Insert("TestQuery2") + queryLogger.Insert("TestQuery3") + + queryLogger.Delete(1 + entrySize) + queryLogger.Delete(1) + newQuery2 := "ThisShouldBeInsertedAtIndex2" + newQuery1 := "ThisShouldBeInsertedAtIndex1" + queryLogger.Insert(newQuery2) + queryLogger.Insert(newQuery1) + + want := []string{ + `^{"query":"ThisShouldBeInsertedAtIndex1","timestamp_sec":\d+}\x00*,$`, + `^{"query":"ThisShouldBeInsertedAtIndex2","timestamp_sec":\d+}\x00*,$`, + `^{"query":"TestQuery3","timestamp_sec":\d+}\x00*,$`, + } + + // Check all bytes and verify new query was inserted at index 2 + for i := 0; i < 3; i++ { + start := 1 + i*entrySize + end := start + entrySize + + have := queryBytes[start:end] + if !regexp.MustCompile(want[i]).Match(have) { + t.Fatalf("Index not reused properly:\nHave %s\nWant %s", string(queryBytes[start:end]), want[i]) + } + } +} + +func TestMMapFile(t *testing.T) { + file, err := ioutil.TempFile("", "mmapedFile") + if err != nil { + t.Fatalf("Couldn't create temp test file. %s", err) + } + + filename := file.Name() + defer os.Remove(filename) + + err, fileAsBytes := getMMapedFile(filename, 2, nil) + + if err != nil { + t.Fatalf("Couldn't create test mmaped file") + } + copy(fileAsBytes, "ab") + + f, err := os.Open(filename) + if err != nil { + t.Fatalf("Couldn't open test mmaped file") + } + + bytes := make([]byte, 4) + n, err := f.Read(bytes) + + if n != 2 || err != nil { + t.Fatalf("Error reading file") + } + + if string(bytes[:2]) != string(fileAsBytes) { + t.Fatalf("Mmap failed") + } +} diff --git a/promql/test.go b/promql/test.go index 5fa750425..3632de4a0 100644 --- a/promql/test.go +++ b/promql/test.go @@ -438,7 +438,7 @@ func (t *Test) exec(tc testCommand) error { } case *evalCmd: - q, err := t.queryEngine.NewInstantQuery(t.storage, cmd.expr, cmd.start) + q, err := t.QueryEngine().NewInstantQuery(t.storage, cmd.expr, cmd.start) if err != nil { return err } diff --git a/vendor/github.com/edsrzf/mmap-go/.gitignore b/vendor/github.com/edsrzf/mmap-go/.gitignore new file mode 100644 index 000000000..9aa02c1ed --- /dev/null +++ b/vendor/github.com/edsrzf/mmap-go/.gitignore @@ -0,0 +1,8 @@ +*.out +*.5 +*.6 +*.8 +*.swp +_obj +_test +testdata diff --git a/vendor/github.com/edsrzf/mmap-go/LICENSE b/vendor/github.com/edsrzf/mmap-go/LICENSE new file mode 100644 index 000000000..8f05f338a --- /dev/null +++ b/vendor/github.com/edsrzf/mmap-go/LICENSE @@ -0,0 +1,25 @@ +Copyright (c) 2011, Evan Shaw +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + * Neither the name of the copyright holder nor the + names of its contributors may be used to endorse or promote products + derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL BE LIABLE FOR ANY +DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + diff --git a/vendor/github.com/edsrzf/mmap-go/README.md b/vendor/github.com/edsrzf/mmap-go/README.md new file mode 100644 index 000000000..4cc2bfe1c --- /dev/null +++ b/vendor/github.com/edsrzf/mmap-go/README.md @@ -0,0 +1,12 @@ +mmap-go +======= + +mmap-go is a portable mmap package for the [Go programming language](http://golang.org). +It has been tested on Linux (386, amd64), OS X, and Windows (386). It should also +work on other Unix-like platforms, but hasn't been tested with them. I'm interested +to hear about the results. + +I haven't been able to add more features without adding significant complexity, +so mmap-go doesn't support mprotect, mincore, and maybe a few other things. +If you're running on a Unix-like platform and need some of these features, +I suggest Gustavo Niemeyer's [gommap](http://labix.org/gommap). diff --git a/vendor/github.com/edsrzf/mmap-go/mmap.go b/vendor/github.com/edsrzf/mmap-go/mmap.go new file mode 100644 index 000000000..29655bd22 --- /dev/null +++ b/vendor/github.com/edsrzf/mmap-go/mmap.go @@ -0,0 +1,117 @@ +// Copyright 2011 Evan Shaw. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// This file defines the common package interface and contains a little bit of +// factored out logic. + +// Package mmap allows mapping files into memory. It tries to provide a simple, reasonably portable interface, +// but doesn't go out of its way to abstract away every little platform detail. +// This specifically means: +// * forked processes may or may not inherit mappings +// * a file's timestamp may or may not be updated by writes through mappings +// * specifying a size larger than the file's actual size can increase the file's size +// * If the mapped file is being modified by another process while your program's running, don't expect consistent results between platforms +package mmap + +import ( + "errors" + "os" + "reflect" + "unsafe" +) + +const ( + // RDONLY maps the memory read-only. + // Attempts to write to the MMap object will result in undefined behavior. + RDONLY = 0 + // RDWR maps the memory as read-write. Writes to the MMap object will update the + // underlying file. + RDWR = 1 << iota + // COPY maps the memory as copy-on-write. Writes to the MMap object will affect + // memory, but the underlying file will remain unchanged. + COPY + // If EXEC is set, the mapped memory is marked as executable. + EXEC +) + +const ( + // If the ANON flag is set, the mapped memory will not be backed by a file. + ANON = 1 << iota +) + +// MMap represents a file mapped into memory. +type MMap []byte + +// Map maps an entire file into memory. +// If ANON is set in flags, f is ignored. +func Map(f *os.File, prot, flags int) (MMap, error) { + return MapRegion(f, -1, prot, flags, 0) +} + +// MapRegion maps part of a file into memory. +// The offset parameter must be a multiple of the system's page size. +// If length < 0, the entire file will be mapped. +// If ANON is set in flags, f is ignored. +func MapRegion(f *os.File, length int, prot, flags int, offset int64) (MMap, error) { + if offset%int64(os.Getpagesize()) != 0 { + return nil, errors.New("offset parameter must be a multiple of the system's page size") + } + + var fd uintptr + if flags&ANON == 0 { + fd = uintptr(f.Fd()) + if length < 0 { + fi, err := f.Stat() + if err != nil { + return nil, err + } + length = int(fi.Size()) + } + } else { + if length <= 0 { + return nil, errors.New("anonymous mapping requires non-zero length") + } + fd = ^uintptr(0) + } + return mmap(length, uintptr(prot), uintptr(flags), fd, offset) +} + +func (m *MMap) header() *reflect.SliceHeader { + return (*reflect.SliceHeader)(unsafe.Pointer(m)) +} + +func (m *MMap) addrLen() (uintptr, uintptr) { + header := m.header() + return header.Data, uintptr(header.Len) +} + +// Lock keeps the mapped region in physical memory, ensuring that it will not be +// swapped out. +func (m MMap) Lock() error { + return m.lock() +} + +// Unlock reverses the effect of Lock, allowing the mapped region to potentially +// be swapped out. +// If m is already unlocked, aan error will result. +func (m MMap) Unlock() error { + return m.unlock() +} + +// Flush synchronizes the mapping's contents to the file's contents on disk. +func (m MMap) Flush() error { + return m.flush() +} + +// Unmap deletes the memory mapped region, flushes any remaining changes, and sets +// m to nil. +// Trying to read or write any remaining references to m after Unmap is called will +// result in undefined behavior. +// Unmap should only be called on the slice value that was originally returned from +// a call to Map. Calling Unmap on a derived slice may cause errors. +func (m *MMap) Unmap() error { + err := m.unmap() + *m = nil + return err +} diff --git a/vendor/github.com/edsrzf/mmap-go/mmap_unix.go b/vendor/github.com/edsrzf/mmap-go/mmap_unix.go new file mode 100644 index 000000000..25b13e51f --- /dev/null +++ b/vendor/github.com/edsrzf/mmap-go/mmap_unix.go @@ -0,0 +1,51 @@ +// Copyright 2011 Evan Shaw. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build darwin dragonfly freebsd linux openbsd solaris netbsd + +package mmap + +import ( + "golang.org/x/sys/unix" +) + +func mmap(len int, inprot, inflags, fd uintptr, off int64) ([]byte, error) { + flags := unix.MAP_SHARED + prot := unix.PROT_READ + switch { + case inprot© != 0: + prot |= unix.PROT_WRITE + flags = unix.MAP_PRIVATE + case inprot&RDWR != 0: + prot |= unix.PROT_WRITE + } + if inprot&EXEC != 0 { + prot |= unix.PROT_EXEC + } + if inflags&ANON != 0 { + flags |= unix.MAP_ANON + } + + b, err := unix.Mmap(int(fd), off, len, prot, flags) + if err != nil { + return nil, err + } + return b, nil +} + +func (m MMap) flush() error { + return unix.Msync([]byte(m), unix.MS_SYNC) +} + +func (m MMap) lock() error { + return unix.Mlock([]byte(m)) +} + +func (m MMap) unlock() error { + return unix.Munlock([]byte(m)) +} + +func (m MMap) unmap() error { + return unix.Munmap([]byte(m)) +} diff --git a/vendor/github.com/edsrzf/mmap-go/mmap_windows.go b/vendor/github.com/edsrzf/mmap-go/mmap_windows.go new file mode 100644 index 000000000..7910da257 --- /dev/null +++ b/vendor/github.com/edsrzf/mmap-go/mmap_windows.go @@ -0,0 +1,143 @@ +// Copyright 2011 Evan Shaw. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package mmap + +import ( + "errors" + "os" + "sync" + + "golang.org/x/sys/windows" +) + +// mmap on Windows is a two-step process. +// First, we call CreateFileMapping to get a handle. +// Then, we call MapviewToFile to get an actual pointer into memory. +// Because we want to emulate a POSIX-style mmap, we don't want to expose +// the handle -- only the pointer. We also want to return only a byte slice, +// not a struct, so it's convenient to manipulate. + +// We keep this map so that we can get back the original handle from the memory address. + +type addrinfo struct { + file windows.Handle + mapview windows.Handle +} + +var handleLock sync.Mutex +var handleMap = map[uintptr]*addrinfo{} + +func mmap(len int, prot, flags, hfile uintptr, off int64) ([]byte, error) { + flProtect := uint32(windows.PAGE_READONLY) + dwDesiredAccess := uint32(windows.FILE_MAP_READ) + switch { + case prot© != 0: + flProtect = windows.PAGE_WRITECOPY + dwDesiredAccess = windows.FILE_MAP_COPY + case prot&RDWR != 0: + flProtect = windows.PAGE_READWRITE + dwDesiredAccess = windows.FILE_MAP_WRITE + } + if prot&EXEC != 0 { + flProtect <<= 4 + dwDesiredAccess |= windows.FILE_MAP_EXECUTE + } + + // The maximum size is the area of the file, starting from 0, + // that we wish to allow to be mappable. It is the sum of + // the length the user requested, plus the offset where that length + // is starting from. This does not map the data into memory. + maxSizeHigh := uint32((off + int64(len)) >> 32) + maxSizeLow := uint32((off + int64(len)) & 0xFFFFFFFF) + // TODO: Do we need to set some security attributes? It might help portability. + h, errno := windows.CreateFileMapping(windows.Handle(hfile), nil, flProtect, maxSizeHigh, maxSizeLow, nil) + if h == 0 { + return nil, os.NewSyscallError("CreateFileMapping", errno) + } + + // Actually map a view of the data into memory. The view's size + // is the length the user requested. + fileOffsetHigh := uint32(off >> 32) + fileOffsetLow := uint32(off & 0xFFFFFFFF) + addr, errno := windows.MapViewOfFile(h, dwDesiredAccess, fileOffsetHigh, fileOffsetLow, uintptr(len)) + if addr == 0 { + return nil, os.NewSyscallError("MapViewOfFile", errno) + } + handleLock.Lock() + handleMap[addr] = &addrinfo{ + file: windows.Handle(hfile), + mapview: h, + } + handleLock.Unlock() + + m := MMap{} + dh := m.header() + dh.Data = addr + dh.Len = len + dh.Cap = dh.Len + + return m, nil +} + +func (m MMap) flush() error { + addr, len := m.addrLen() + errno := windows.FlushViewOfFile(addr, len) + if errno != nil { + return os.NewSyscallError("FlushViewOfFile", errno) + } + + handleLock.Lock() + defer handleLock.Unlock() + handle, ok := handleMap[addr] + if !ok { + // should be impossible; we would've errored above + return errors.New("unknown base address") + } + + errno = windows.FlushFileBuffers(handle.file) + return os.NewSyscallError("FlushFileBuffers", errno) +} + +func (m MMap) lock() error { + addr, len := m.addrLen() + errno := windows.VirtualLock(addr, len) + return os.NewSyscallError("VirtualLock", errno) +} + +func (m MMap) unlock() error { + addr, len := m.addrLen() + errno := windows.VirtualUnlock(addr, len) + return os.NewSyscallError("VirtualUnlock", errno) +} + +func (m MMap) unmap() error { + err := m.flush() + if err != nil { + return err + } + + addr := m.header().Data + // Lock the UnmapViewOfFile along with the handleMap deletion. + // As soon as we unmap the view, the OS is free to give the + // same addr to another new map. We don't want another goroutine + // to insert and remove the same addr into handleMap while + // we're trying to remove our old addr/handle pair. + handleLock.Lock() + defer handleLock.Unlock() + err = windows.UnmapViewOfFile(addr) + if err != nil { + return err + } + + handle, ok := handleMap[addr] + if !ok { + // should be impossible; we would've errored above + return errors.New("unknown base address") + } + delete(handleMap, addr) + + e := windows.CloseHandle(windows.Handle(handle.mapview)) + return os.NewSyscallError("CloseHandle", e) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index e990bb82f..b22854595 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -73,6 +73,8 @@ github.com/cespare/xxhash github.com/davecgh/go-spew/spew # github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/dgrijalva/jwt-go +# github.com/edsrzf/mmap-go v1.0.0 +github.com/edsrzf/mmap-go # github.com/evanphx/json-patch v4.1.0+incompatible github.com/evanphx/json-patch # github.com/ghodss/yaml v1.0.0