From dae4a801cb9d22f0b99bc365d3895acf1d8e64e2 Mon Sep 17 00:00:00 2001 From: Fabian Reinartz Date: Tue, 9 May 2017 12:53:33 +0200 Subject: [PATCH] vendor: update tsdb --- vendor/github.com/prometheus/tsdb/LICENSE | 201 +++++ vendor/github.com/prometheus/tsdb/README.md | 5 + vendor/github.com/prometheus/tsdb/block.go | 49 +- vendor/github.com/prometheus/tsdb/chunks.go | 66 +- .../prometheus/tsdb/chunks/bstream.go | 43 +- .../prometheus/tsdb/chunks/chunk.go | 15 +- .../github.com/prometheus/tsdb/chunks/xor.go | 43 ++ vendor/github.com/prometheus/tsdb/compact.go | 13 + vendor/github.com/prometheus/tsdb/db.go | 54 +- vendor/github.com/prometheus/tsdb/db_unix.go | 13 + .../github.com/prometheus/tsdb/db_windows.go | 13 + .../prometheus/tsdb/encoding_helpers.go | 157 ++++ vendor/github.com/prometheus/tsdb/head.go | 39 +- vendor/github.com/prometheus/tsdb/index.go | 711 ++++++++++-------- .../prometheus/tsdb/labels/labels.go | 15 +- .../prometheus/tsdb/labels/selector.go | 13 + vendor/github.com/prometheus/tsdb/postings.go | 30 +- vendor/github.com/prometheus/tsdb/querier.go | 90 ++- vendor/github.com/prometheus/tsdb/wal.go | 24 +- vendor/vendor.json | 18 +- 20 files changed, 1171 insertions(+), 441 deletions(-) create mode 100644 vendor/github.com/prometheus/tsdb/LICENSE create mode 100644 vendor/github.com/prometheus/tsdb/README.md create mode 100644 vendor/github.com/prometheus/tsdb/encoding_helpers.go diff --git a/vendor/github.com/prometheus/tsdb/LICENSE b/vendor/github.com/prometheus/tsdb/LICENSE new file mode 100644 index 000000000..261eeb9e9 --- /dev/null +++ b/vendor/github.com/prometheus/tsdb/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/prometheus/tsdb/README.md b/vendor/github.com/prometheus/tsdb/README.md new file mode 100644 index 000000000..51e6e68b6 --- /dev/null +++ b/vendor/github.com/prometheus/tsdb/README.md @@ -0,0 +1,5 @@ +# TSDB + +This repository contains the new Prometheus storage layer that will be used in its 2.0 release. + +A writeup of its design can be found [here](https://fabxc.org/blog/2017-04-10-writing-a-tsdb/). diff --git a/vendor/github.com/prometheus/tsdb/block.go b/vendor/github.com/prometheus/tsdb/block.go index 4939ee5d3..e1ccfd40d 100644 --- a/vendor/github.com/prometheus/tsdb/block.go +++ b/vendor/github.com/prometheus/tsdb/block.go @@ -1,3 +1,16 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package tsdb import ( @@ -6,7 +19,6 @@ import ( "io/ioutil" "os" "path/filepath" - "sort" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -23,7 +35,7 @@ type DiskBlock interface { // Index returns an IndexReader over the block's data. Index() IndexReader - // Series returns a SeriesReader over the block's data. + // Chunks returns a ChunkReader over the block's data. Chunks() ChunkReader // Close releases all underlying resources of the block. @@ -125,8 +137,10 @@ func writeMetaFile(dir string, meta *BlockMeta) error { enc := json.NewEncoder(f) enc.SetIndent("", "\t") - if err := enc.Encode(&blockMeta{Version: 1, BlockMeta: meta}); err != nil { - return err + var merr MultiError + if merr.Add(enc.Encode(&blockMeta{Version: 1, BlockMeta: meta})); merr.Err() != nil { + merr.Add(f.Close()) + return merr } if err := f.Close(); err != nil { return err @@ -228,30 +242,3 @@ func (f *mmapFile) Close() error { } return err1 } - -// A skiplist maps offsets to values. The values found in the data at an -// offset are strictly greater than the indexed value. -type skiplist interface { - // offset returns the offset to data containing values of x and lower. - offset(x int64) (uint32, bool) -} - -// simpleSkiplist is a slice of plain value/offset pairs. -type simpleSkiplist []skiplistPair - -type skiplistPair struct { - value int64 - offset uint32 -} - -func (sl simpleSkiplist) offset(x int64) (uint32, bool) { - // Search for the first offset that contains data greater than x. - i := sort.Search(len(sl), func(i int) bool { return sl[i].value >= x }) - - // If no element was found return false. If the first element is found, - // there's no previous offset actually containing values that are x or lower. - if i == len(sl) || i == 0 { - return 0, false - } - return sl[i-1].offset, true -} diff --git a/vendor/github.com/prometheus/tsdb/chunks.go b/vendor/github.com/prometheus/tsdb/chunks.go index 4bdc3a9a2..77663359c 100644 --- a/vendor/github.com/prometheus/tsdb/chunks.go +++ b/vendor/github.com/prometheus/tsdb/chunks.go @@ -1,3 +1,16 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package tsdb import ( @@ -15,7 +28,7 @@ import ( ) const ( - // MagicChunks is 4 bytes at the head of series file. + // MagicChunks is 4 bytes at the head of a series file. MagicChunks = 0x85BD40DD ) @@ -30,12 +43,23 @@ type ChunkMeta struct { MinTime, MaxTime int64 // time range the data covers } +// writeHash writes the chunk encoding and raw data into the provided hash. +func (cm *ChunkMeta) writeHash(h hash.Hash) error { + if _, err := h.Write([]byte{byte(cm.Chunk.Encoding())}); err != nil { + return err + } + if _, err := h.Write(cm.Chunk.Bytes()); err != nil { + return err + } + return nil +} + // ChunkWriter serializes a time block of chunked series data. type ChunkWriter interface { - // WriteChunks writes several chunks. The data field of the ChunkMetas + // WriteChunks writes several chunks. The Chunk field of the ChunkMetas // must be populated. // After returning successfully, the Ref fields in the ChunkMetas - // is set and can be used to retrieve the chunks from the written data. + // are set and can be used to retrieve the chunks from the written data. WriteChunks(chunks ...*ChunkMeta) error // Close writes any required finalization and closes the resources @@ -112,7 +136,9 @@ func (w *chunkWriter) finalizeTail() error { func (w *chunkWriter) cut() error { // Sync current tail to disk and close. - w.finalizeTail() + if err := w.finalizeTail(); err != nil { + return err + } p, _, err := nextSequenceFile(w.dirFile.Name(), "") if err != nil { @@ -150,8 +176,8 @@ func (w *chunkWriter) cut() error { return nil } -func (w *chunkWriter) write(wr io.Writer, b []byte) error { - n, err := wr.Write(b) +func (w *chunkWriter) write(b []byte) error { + n, err := w.wbuf.Write(b) w.n += int64(n) return err } @@ -159,9 +185,9 @@ func (w *chunkWriter) write(wr io.Writer, b []byte) error { func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error { // Calculate maximum space we need and cut a new segment in case // we don't fit into the current one. - maxLen := int64(binary.MaxVarintLen32) + maxLen := int64(binary.MaxVarintLen32) // The number of chunks. for _, c := range chks { - maxLen += binary.MaxVarintLen32 + 1 + maxLen += binary.MaxVarintLen32 + 1 // The number of bytes in the chunk and its encoding. maxLen += int64(len(c.Chunk.Bytes())) } newsz := w.n + maxLen @@ -172,14 +198,10 @@ func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error { } } - // Write chunks sequentially and set the reference field in the ChunkMeta. - w.crc32.Reset() - wr := io.MultiWriter(w.crc32, w.wbuf) - b := make([]byte, binary.MaxVarintLen32) n := binary.PutUvarint(b, uint64(len(chks))) - if err := w.write(wr, b[:n]); err != nil { + if err := w.write(b[:n]); err != nil { return err } seq := uint64(w.seq()) << 32 @@ -189,21 +211,25 @@ func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error { n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes()))) - if err := w.write(wr, b[:n]); err != nil { + if err := w.write(b[:n]); err != nil { return err } - if err := w.write(wr, []byte{byte(chk.Chunk.Encoding())}); err != nil { + if err := w.write([]byte{byte(chk.Chunk.Encoding())}); err != nil { return err } - if err := w.write(wr, chk.Chunk.Bytes()); err != nil { + if err := w.write(chk.Chunk.Bytes()); err != nil { + return err + } + + w.crc32.Reset() + if err := chk.writeHash(w.crc32); err != nil { + return err + } + if err := w.write(w.crc32.Sum(nil)); err != nil { return err } - chk.Chunk = nil } - if err := w.write(w.wbuf, w.crc32.Sum(nil)); err != nil { - return err - } return nil } diff --git a/vendor/github.com/prometheus/tsdb/chunks/bstream.go b/vendor/github.com/prometheus/tsdb/chunks/bstream.go index 25fadb26d..ed651db23 100644 --- a/vendor/github.com/prometheus/tsdb/chunks/bstream.go +++ b/vendor/github.com/prometheus/tsdb/chunks/bstream.go @@ -1,8 +1,49 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The code in this file was largely written by Damian Gryski as part of +// https://github.com/dgryski/go-tsz and published under the license below. +// It received minor modifications to suit Prometheus's needs. + +// Copyright (c) 2015,2016 Damian Gryski +// All rights reserved. + +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: + +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + package chunks import "io" -// bstream is a stream of bits +// bstream is a stream of bits. type bstream struct { stream []byte // the data stream count uint8 // how many bits are valid in current byte diff --git a/vendor/github.com/prometheus/tsdb/chunks/chunk.go b/vendor/github.com/prometheus/tsdb/chunks/chunk.go index 6bff82735..86f456be8 100644 --- a/vendor/github.com/prometheus/tsdb/chunks/chunk.go +++ b/vendor/github.com/prometheus/tsdb/chunks/chunk.go @@ -1,3 +1,16 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package chunks import ( @@ -5,7 +18,7 @@ import ( "fmt" ) -// Encoding is the identifier for a chunk encoding +// Encoding is the identifier for a chunk encoding. type Encoding uint8 func (e Encoding) String() string { diff --git a/vendor/github.com/prometheus/tsdb/chunks/xor.go b/vendor/github.com/prometheus/tsdb/chunks/xor.go index 2316c1d4f..a72e9ef0c 100644 --- a/vendor/github.com/prometheus/tsdb/chunks/xor.go +++ b/vendor/github.com/prometheus/tsdb/chunks/xor.go @@ -1,3 +1,46 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The code in this file was largely written by Damian Gryski as part of +// https://github.com/dgryski/go-tsz and published under the license below. +// It was modified to accomodate reading from byte slices without modifying +// the underlying bytes, which would panic when reading from mmaped +// read-only byte slices. + +// Copyright (c) 2015,2016 Damian Gryski +// All rights reserved. + +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: + +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + package chunks import ( diff --git a/vendor/github.com/prometheus/tsdb/compact.go b/vendor/github.com/prometheus/tsdb/compact.go index 2b6a8a31c..938697419 100644 --- a/vendor/github.com/prometheus/tsdb/compact.go +++ b/vendor/github.com/prometheus/tsdb/compact.go @@ -1,3 +1,16 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package tsdb import ( diff --git a/vendor/github.com/prometheus/tsdb/db.go b/vendor/github.com/prometheus/tsdb/db.go index 5a3e19766..864b9cfbd 100644 --- a/vendor/github.com/prometheus/tsdb/db.go +++ b/vendor/github.com/prometheus/tsdb/db.go @@ -1,3 +1,16 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + // Package tsdb implements a time series storage for float64 sample data. package tsdb @@ -33,6 +46,7 @@ var DefaultOptions = &Options{ MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds MaxBlockDuration: 24 * 60 * 60 * 1000, // 1 days in milliseconds AppendableBlocks: 2, + NoLockfile: false, } // Options of the DB storage. @@ -56,10 +70,15 @@ type Options struct { // After a new block is started for timestamp t0 or higher, appends with // timestamps as early as t0 - (n-1) * MinBlockDuration are valid. AppendableBlocks int + + // NoLockfile disables creation and consideration of a lock file. + NoLockfile bool } // Appender allows appending a batch of data. It must be completed with a // call to Commit or Rollback and must not be reused afterwards. +// +// Operations on the Appender interface are not goroutine-safe. type Appender interface { // Add adds a sample pair for the given series. A reference number is // returned which can be used to add further samples in the same or later @@ -80,13 +99,11 @@ type Appender interface { Rollback() error } -const sep = '\xff' - // DB handles reads and writes of time series falling into // a hashed partition of a seriedb. type DB struct { dir string - lockf lockfile.Lockfile + lockf *lockfile.Lockfile logger log.Logger metrics *dbMetrics @@ -146,13 +163,6 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db if err != nil { return nil, err } - lockf, err := lockfile.New(filepath.Join(absdir, "lock")) - if err != nil { - return nil, err - } - if err := lockf.TryLock(); err != nil { - return nil, errors.Wrapf(err, "open DB in %s", dir) - } if l == nil { l = log.NewLogfmtLogger(os.Stdout) @@ -168,7 +178,6 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db db = &DB{ dir: dir, - lockf: lockf, logger: l, metrics: newDBMetrics(r), opts: opts, @@ -176,6 +185,17 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db donec: make(chan struct{}), stopc: make(chan struct{}), } + if !opts.NoLockfile { + lockf, err := lockfile.New(filepath.Join(absdir, "lock")) + if err != nil { + return nil, err + } + if err := lockf.TryLock(); err != nil { + return nil, errors.Wrapf(err, "open DB in %s", dir) + } + db.lockf = &lockf + } + db.compactor = newCompactor(r, l, &compactorOptions{ maxBlockRange: opts.MaxBlockDuration, }) @@ -452,7 +472,9 @@ func (db *DB) Close() error { var merr MultiError merr.Add(g.Wait()) - merr.Add(db.lockf.Unlock()) + if db.lockf != nil { + merr.Add(db.lockf.Unlock()) + } return merr.Err() } @@ -505,8 +527,8 @@ func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) return 0, err } a.samples++ - // Store last byte of sequence number in 3rd byte of refernece. - return ref | (uint64(h.meta.Sequence^0xff) << 40), nil + // Store last byte of sequence number in 3rd byte of reference. + return ref | (uint64(h.meta.Sequence&0xff) << 40), nil } func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error { @@ -519,7 +541,7 @@ func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error { return err } // If the last byte of the sequence does not add up, the reference is not valid. - if uint64(h.meta.Sequence^0xff) != gen { + if uint64(h.meta.Sequence&0xff) != gen { return ErrNotFound } if err := h.app.AddFast(ref, t, v); err != nil { @@ -641,7 +663,7 @@ func (a *dbAppender) Rollback() error { var g errgroup.Group for _, h := range a.heads { - g.Go(h.app.Commit) + g.Go(h.app.Rollback) } return g.Wait() diff --git a/vendor/github.com/prometheus/tsdb/db_unix.go b/vendor/github.com/prometheus/tsdb/db_unix.go index 814bee851..09bb74f3c 100644 --- a/vendor/github.com/prometheus/tsdb/db_unix.go +++ b/vendor/github.com/prometheus/tsdb/db_unix.go @@ -1,3 +1,16 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + // +build !windows,!plan9,!solaris package tsdb diff --git a/vendor/github.com/prometheus/tsdb/db_windows.go b/vendor/github.com/prometheus/tsdb/db_windows.go index ebc1d08ed..700518e7a 100644 --- a/vendor/github.com/prometheus/tsdb/db_windows.go +++ b/vendor/github.com/prometheus/tsdb/db_windows.go @@ -1,3 +1,16 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package tsdb import ( diff --git a/vendor/github.com/prometheus/tsdb/encoding_helpers.go b/vendor/github.com/prometheus/tsdb/encoding_helpers.go new file mode 100644 index 000000000..91f73a54c --- /dev/null +++ b/vendor/github.com/prometheus/tsdb/encoding_helpers.go @@ -0,0 +1,157 @@ +package tsdb + +import ( + "encoding/binary" + "hash" + "unsafe" +) + +// enbuf is a helper type to populate a byte slice with various types. +type encbuf struct { + b []byte + c [binary.MaxVarintLen64]byte +} + +func (e *encbuf) reset() { e.b = e.b[:0] } +func (e *encbuf) get() []byte { return e.b } +func (e *encbuf) len() int { return len(e.b) } + +func (e *encbuf) putString(s string) { e.b = append(e.b, s...) } +func (e *encbuf) putBytes(b []byte) { e.b = append(e.b, b...) } +func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) } + +func (e *encbuf) putBE32int(x int) { e.putBE32(uint32(x)) } +func (e *encbuf) putBE64int(x int) { e.putBE64(uint64(x)) } +func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) } +func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) } + +func (e *encbuf) putBE32(x uint32) { + binary.BigEndian.PutUint32(e.c[:], x) + e.b = append(e.b, e.c[:4]...) +} + +func (e *encbuf) putBE64(x uint64) { + binary.BigEndian.PutUint64(e.c[:], x) + e.b = append(e.b, e.c[:8]...) +} + +func (e *encbuf) putUvarint64(x uint64) { + n := binary.PutUvarint(e.c[:], x) + e.b = append(e.b, e.c[:n]...) +} + +func (e *encbuf) putVarint64(x int64) { + n := binary.PutVarint(e.c[:], x) + e.b = append(e.b, e.c[:n]...) +} + +// putVarintStr writes a string to the buffer prefixed by its varint length (in bytes!). +func (e *encbuf) putUvarintStr(s string) { + b := *(*[]byte)(unsafe.Pointer(&s)) + e.putUvarint(len(b)) + e.putString(s) +} + +// putHash appends a hash over the buffers current contents to the buffer. +func (e *encbuf) putHash(h hash.Hash) { + h.Reset() + _, err := h.Write(e.b) + if err != nil { + panic(err) // The CRC32 implementation does not error + } + e.b = h.Sum(e.b) +} + +// decbuf provides safe methods to extract data from a byte slice. It does all +// necessary bounds checking and advancing of the byte slice. +// Several datums can be extracted without checking for errors. However, before using +// any datum, the err() method must be checked. +type decbuf struct { + b []byte + e error +} + +func (d *decbuf) uvarint() int { return int(d.uvarint64()) } +func (d *decbuf) be32int() int { return int(d.be32()) } + +func (d *decbuf) uvarintStr() string { + l := d.uvarint64() + if d.e != nil { + return "" + } + if len(d.b) < int(l) { + d.e = errInvalidSize + return "" + } + s := yoloString(d.b[:l]) + d.b = d.b[l:] + return s +} + +func (d *decbuf) varint64() int64 { + if d.e != nil { + return 0 + } + x, n := binary.Varint(d.b) + if n < 1 { + d.e = errInvalidSize + return 0 + } + d.b = d.b[n:] + return x +} + +func (d *decbuf) uvarint64() uint64 { + if d.e != nil { + return 0 + } + x, n := binary.Uvarint(d.b) + if n < 1 { + d.e = errInvalidSize + return 0 + } + d.b = d.b[n:] + return x +} + +func (d *decbuf) be64() uint64 { + if d.e != nil { + return 0 + } + if len(d.b) < 4 { + d.e = errInvalidSize + return 0 + } + x := binary.BigEndian.Uint64(d.b) + d.b = d.b[8:] + return x +} + +func (d *decbuf) be32() uint32 { + if d.e != nil { + return 0 + } + if len(d.b) < 4 { + d.e = errInvalidSize + return 0 + } + x := binary.BigEndian.Uint32(d.b) + d.b = d.b[4:] + return x +} + +func (d *decbuf) decbuf(l int) decbuf { + if d.e != nil { + return decbuf{e: d.e} + } + if l > len(d.b) { + return decbuf{e: errInvalidSize} + } + r := decbuf{b: d.b[:l]} + d.b = d.b[l:] + return r +} + +func (d *decbuf) err() error { return d.e } +func (d *decbuf) len() int { return len(d.b) } +func (d *decbuf) get() []byte { return d.b } diff --git a/vendor/github.com/prometheus/tsdb/head.go b/vendor/github.com/prometheus/tsdb/head.go index 9f2533a0c..678654b3e 100644 --- a/vendor/github.com/prometheus/tsdb/head.go +++ b/vendor/github.com/prometheus/tsdb/head.go @@ -1,3 +1,16 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package tsdb import ( @@ -283,6 +296,10 @@ type refdSample struct { } func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { + if !a.inBounds(t) { + return 0, ErrOutOfBounds + } + hash := lset.Hash() if ms := a.get(hash, lset); ms != nil { @@ -344,7 +361,10 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { if t < c.maxTime { return ErrOutOfOrderSample } - if c.maxTime == t && ms.lastValue != v { + + // We are allowing exact duplicates as we can encounter them in valid cases + // like federation and erroring out at that time would be extremely noisy. + if c.maxTime == t && math.Float64bits(ms.lastValue) != math.Float64bits(v) { return ErrAmendSample } } @@ -410,23 +430,12 @@ func (a *headAppender) Commit() error { return err } - var ( - total = uint64(len(a.samples)) - mint = int64(math.MaxInt64) - maxt = int64(math.MinInt64) - ) + total := uint64(len(a.samples)) for _, s := range a.samples { if !a.series[s.ref].append(s.t, s.v) { total-- } - - if s.t < mint { - mint = s.t - } - if s.t > maxt { - maxt = s.t - } } a.mtx.RUnlock() @@ -632,8 +641,8 @@ func (s *memSeries) append(t int64, v float64) bool { c.minTime = t } else { c = s.head() - // Skip duplicate samples. - if c.maxTime == t && s.lastValue != v { + // Skip duplicate and out of order samples. + if c.maxTime >= t { return false } } diff --git a/vendor/github.com/prometheus/tsdb/index.go b/vendor/github.com/prometheus/tsdb/index.go index 169d7bf3a..c0e96381f 100644 --- a/vendor/github.com/prometheus/tsdb/index.go +++ b/vendor/github.com/prometheus/tsdb/index.go @@ -1,3 +1,16 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package tsdb import ( @@ -12,6 +25,8 @@ import ( "sort" "strings" + "math" + "github.com/coreos/etcd/pkg/fileutil" "github.com/pkg/errors" "github.com/prometheus/tsdb/labels" @@ -26,10 +41,48 @@ const ( const compactionPageBytes = minSectorSize * 64 -// IndexWriter serialized the index for a block of series data. -// The methods must generally be called in order they are specified. +type indexWriterSeries struct { + labels labels.Labels + chunks []*ChunkMeta // series file offset of chunks + offset uint32 // index file offset of series reference +} + +type indexWriterSeriesSlice []*indexWriterSeries + +func (s indexWriterSeriesSlice) Len() int { return len(s) } +func (s indexWriterSeriesSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +func (s indexWriterSeriesSlice) Less(i, j int) bool { + return labels.Compare(s[i].labels, s[j].labels) < 0 +} + +type indexWriterStage uint8 + +const ( + idxStagePopulate indexWriterStage = iota + idxStageLabelIndex + idxStagePostings + idxStageDone +) + +func (s indexWriterStage) String() string { + switch s { + case idxStagePopulate: + return "populate" + case idxStageLabelIndex: + return "label index" + case idxStagePostings: + return "postings" + case idxStageDone: + return "done" + } + return "" +} + +// IndexWriter serializes the index for a block of series data. +// The methods must generally be called in the order they are specified in. type IndexWriter interface { - // AddSeries populates the index writer witha series and its offsets + // AddSeries populates the index writer with a series and its offsets // of chunks that the index can reference. // The reference number is used to resolve a series against the postings // list iterator. It only has to be available during the write processing. @@ -48,22 +101,19 @@ type IndexWriter interface { Close() error } -type indexWriterSeries struct { - labels labels.Labels - chunks []*ChunkMeta // series file offset of chunks - offset uint32 // index file offset of series reference -} - // indexWriter implements the IndexWriter interface for the standard // serialization format. type indexWriter struct { - f *os.File - bufw *bufio.Writer - n int64 - started bool + f *os.File + fbuf *bufio.Writer + pos uint64 + + toc indexTOC + stage indexWriterStage // Reusable memory. - b []byte + buf1 encbuf + buf2 encbuf uint32s []uint32 series map[uint32]*indexWriterSeries @@ -74,6 +124,15 @@ type indexWriter struct { crc32 hash.Hash } +type indexTOC struct { + symbols uint64 + series uint64 + labelIndices uint64 + labelIndicesTable uint64 + postings uint64 + postingsTable uint64 +} + func newIndexWriter(dir string) (*indexWriter, error) { df, err := fileutil.OpenDir(dir) if err != nil { @@ -88,12 +147,14 @@ func newIndexWriter(dir string) (*indexWriter, error) { } iw := &indexWriter{ - f: f, - bufw: bufio.NewWriterSize(f, 1<<22), - n: 0, + f: f, + fbuf: bufio.NewWriterSize(f, 1<<22), + pos: 0, + stage: idxStagePopulate, // Reusable memory. - b: make([]byte, 0, 1<<23), + buf1: encbuf{b: make([]byte, 0, 1<<22)}, + buf2: encbuf{b: make([]byte, 0, 1<<22)}, uint32s: make([]uint32, 0, 1<<15), // Caches. @@ -107,40 +168,87 @@ func newIndexWriter(dir string) (*indexWriter, error) { return iw, nil } -func (w *indexWriter) write(wr io.Writer, b []byte) error { - n, err := wr.Write(b) - w.n += int64(n) - return err -} - -// section writes a CRC32 checksummed section of length l and guarded by flag. -func (w *indexWriter) section(l int, flag byte, f func(w io.Writer) error) error { - w.crc32.Reset() - wr := io.MultiWriter(w.crc32, w.bufw) - - b := [5]byte{flag, 0, 0, 0, 0} - binary.BigEndian.PutUint32(b[1:], uint32(l)) - - if err := w.write(wr, b[:]); err != nil { - return errors.Wrap(err, "writing header") - } - - if err := f(wr); err != nil { - return errors.Wrap(err, "write contents") - } - if err := w.write(w.bufw, w.crc32.Sum(nil)); err != nil { - return errors.Wrap(err, "writing checksum") +func (w *indexWriter) write(bufs ...[]byte) error { + for _, b := range bufs { + n, err := w.fbuf.Write(b) + w.pos += uint64(n) + if err != nil { + return err + } + // For now the index file must not grow beyond 4GiB. Some of the fixed-sized + // offset references in v1 are only 4 bytes large. + // Once we move to compressed/varint representations in those areas, this limitation + // can be lifted. + if w.pos > math.MaxUint32 { + return errors.Errorf("exceeding max size of 4GiB") + } } return nil } +// addPadding adds zero byte padding until the file size is a multiple of n. +func (w *indexWriter) addPadding(n int) error { + p := n - (int(w.pos) % n) + if p == 0 { + return nil + } + return errors.Wrap(w.write(make([]byte, p)), "add padding") +} + +// ensureStage handles transitions between write stages and ensures that IndexWriter +// methods are called in an order valid for the implementation. +func (w *indexWriter) ensureStage(s indexWriterStage) error { + if w.stage == s { + return nil + } + if w.stage > s { + return errors.Errorf("invalid stage %q, currently at %q", s, w.stage) + } + + // Complete population stage by writing symbols and series. + if w.stage == idxStagePopulate { + w.toc.symbols = w.pos + if err := w.writeSymbols(); err != nil { + return err + } + w.toc.series = w.pos + if err := w.writeSeries(); err != nil { + return err + } + } + + // Mark start of sections in table of contents. + switch s { + case idxStageLabelIndex: + w.toc.labelIndices = w.pos + + case idxStagePostings: + w.toc.labelIndicesTable = w.pos + if err := w.writeOffsetTable(w.labelIndexes); err != nil { + return err + } + w.toc.postings = w.pos + + case idxStageDone: + w.toc.postingsTable = w.pos + if err := w.writeOffsetTable(w.postings); err != nil { + return err + } + if err := w.writeTOC(); err != nil { + return err + } + } + + w.stage = s + return nil +} + func (w *indexWriter) writeMeta() error { - b := [8]byte{} + w.buf1.reset() + w.buf1.putBE32(MagicIndex) + w.buf1.putByte(indexFormatV1) - binary.BigEndian.PutUint32(b[:4], MagicIndex) - b[4] = flagStd - - return w.write(w.bufw, b[:]) + return w.write(w.buf1.get()) } func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...*ChunkMeta) error { @@ -168,33 +276,27 @@ func (w *indexWriter) writeSymbols() error { } sort.Strings(symbols) - // The start of the section plus a 5 byte section header are our base. - // TODO(fabxc): switch to relative offsets and hold sections in a TOC. - base := uint32(w.n) + 5 + const headerSize = 4 - buf := [binary.MaxVarintLen32]byte{} - w.b = append(w.b[:0], flagStd) + w.buf1.reset() + w.buf2.reset() + + w.buf2.putBE32int(len(symbols)) for _, s := range symbols { - w.symbols[s] = base + uint32(len(w.b)) + w.symbols[s] = uint32(w.pos) + headerSize + uint32(w.buf2.len()) - n := binary.PutUvarint(buf[:], uint64(len(s))) - w.b = append(w.b, buf[:n]...) - w.b = append(w.b, s...) + // NOTE: len(s) gives the number of runes, not the number of bytes. + // Therefore the read-back length for strings with unicode characters will + // be off when not using putCstr. + w.buf2.putUvarintStr(s) } - return w.section(len(w.b), flagStd, func(wr io.Writer) error { - return w.write(wr, w.b) - }) -} + w.buf1.putBE32int(w.buf2.len()) + w.buf2.putHash(w.crc32) -type indexWriterSeriesSlice []*indexWriterSeries - -func (s indexWriterSeriesSlice) Len() int { return len(s) } -func (s indexWriterSeriesSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } - -func (s indexWriterSeriesSlice) Less(i, j int) bool { - return labels.Compare(s[i].labels, s[j].labels) < 0 + err := w.write(w.buf1.get(), w.buf2.get()) + return errors.Wrap(err, "write symbols") } func (w *indexWriter) writeSeries() error { @@ -206,64 +308,52 @@ func (w *indexWriter) writeSeries() error { } sort.Sort(series) - // Current end of file plus 5 bytes for section header. - // TODO(fabxc): switch to relative offsets. - base := uint32(w.n) + 5 + // Header holds number of series. + w.buf1.reset() + w.buf1.putBE32int(len(series)) - w.b = w.b[:0] - buf := make([]byte, binary.MaxVarintLen64) + if err := w.write(w.buf1.get()); err != nil { + return errors.Wrap(err, "write series count") + } for _, s := range series { - // Write label set symbol references. - s.offset = base + uint32(len(w.b)) + s.offset = uint32(w.pos) - n := binary.PutUvarint(buf, uint64(len(s.labels))) - w.b = append(w.b, buf[:n]...) + w.buf2.reset() + w.buf2.putUvarint(len(s.labels)) for _, l := range s.labels { - n = binary.PutUvarint(buf, uint64(w.symbols[l.Name])) - w.b = append(w.b, buf[:n]...) - n = binary.PutUvarint(buf, uint64(w.symbols[l.Value])) - w.b = append(w.b, buf[:n]...) + w.buf2.putUvarint32(w.symbols[l.Name]) + w.buf2.putUvarint32(w.symbols[l.Value]) } - // Write chunks meta data including reference into chunk file. - n = binary.PutUvarint(buf, uint64(len(s.chunks))) - w.b = append(w.b, buf[:n]...) + w.buf2.putUvarint(len(s.chunks)) for _, c := range s.chunks { - n = binary.PutVarint(buf, c.MinTime) - w.b = append(w.b, buf[:n]...) - n = binary.PutVarint(buf, c.MaxTime) - w.b = append(w.b, buf[:n]...) + w.buf2.putVarint64(c.MinTime) + w.buf2.putVarint64(c.MaxTime) + w.buf2.putUvarint64(c.Ref) + } - n = binary.PutUvarint(buf, uint64(c.Ref)) - w.b = append(w.b, buf[:n]...) + w.buf1.reset() + w.buf1.putUvarint(w.buf2.len()) + + w.buf2.putHash(w.crc32) + + if err := w.write(w.buf1.get(), w.buf2.get()); err != nil { + return errors.Wrap(err, "write series data") } } - return w.section(len(w.b), flagStd, func(wr io.Writer) error { - return w.write(wr, w.b) - }) -} - -func (w *indexWriter) init() error { - if err := w.writeSymbols(); err != nil { - return err - } - if err := w.writeSeries(); err != nil { - return err - } - w.started = true - return nil } func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { - if !w.started { - if err := w.init(); err != nil { - return err - } + if len(values)%len(names) != 0 { + return errors.Errorf("invalid value list length %d for %d names", len(values), len(names)) + } + if err := w.ensureStage(idxStageLabelIndex); err != nil { + return errors.Wrap(err, "ensure stage") } valt, err := newStringTuples(values, len(names)) @@ -272,45 +362,84 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { } sort.Sort(valt) + // Align beginning to 4 bytes for more efficient index list scans. + if err := w.addPadding(4); err != nil { + return err + } + w.labelIndexes = append(w.labelIndexes, hashEntry{ - name: strings.Join(names, string(sep)), - offset: uint32(w.n), + keys: names, + offset: w.pos, }) - buf := make([]byte, binary.MaxVarintLen32) - n := binary.PutUvarint(buf, uint64(len(names))) + w.buf2.reset() + w.buf2.putBE32int(len(names)) + w.buf2.putBE32int(valt.Len()) - l := n + len(values)*4 + for _, v := range valt.s { + w.buf2.putBE32(w.symbols[v]) + } - return w.section(l, flagStd, func(wr io.Writer) error { - // First byte indicates tuple size for index. - if err := w.write(wr, buf[:n]); err != nil { - return err + w.buf1.reset() + w.buf1.putBE32int(w.buf2.len()) + + w.buf2.putHash(w.crc32) + + err = w.write(w.buf1.get(), w.buf2.get()) + return errors.Wrap(err, "write label index") +} + +// writeOffsetTable writes a sequence of readable hash entries. +func (w *indexWriter) writeOffsetTable(entries []hashEntry) error { + w.buf1.reset() + w.buf1.putBE32int(len(entries)) + + w.buf2.reset() + + for _, e := range entries { + w.buf2.putUvarint(len(e.keys)) + for _, k := range e.keys { + w.buf2.putUvarintStr(k) } + w.buf2.putUvarint64(e.offset) + } - for _, v := range valt.s { - binary.BigEndian.PutUint32(buf, w.symbols[v]) + w.buf1.putBE32int(w.buf2.len()) + w.buf2.putHash(w.crc32) - if err := w.write(wr, buf[:4]); err != nil { - return err - } - } - return nil - }) + return w.write(w.buf1.get(), w.buf2.get()) +} + +const indexTOCLen = 6*8 + 4 + +func (w *indexWriter) writeTOC() error { + w.buf1.reset() + + w.buf1.putBE64(w.toc.symbols) + w.buf1.putBE64(w.toc.series) + w.buf1.putBE64(w.toc.labelIndices) + w.buf1.putBE64(w.toc.labelIndicesTable) + w.buf1.putBE64(w.toc.postings) + w.buf1.putBE64(w.toc.postingsTable) + + w.buf1.putHash(w.crc32) + + return w.write(w.buf1.get()) } func (w *indexWriter) WritePostings(name, value string, it Postings) error { - if !w.started { - if err := w.init(); err != nil { - return err - } + if err := w.ensureStage(idxStagePostings); err != nil { + return errors.Wrap(err, "ensure stage") } - key := name + string(sep) + value + // Align beginning to 4 bytes for more efficient postings list scans. + if err := w.addPadding(4); err != nil { + return err + } w.postings = append(w.postings, hashEntry{ - name: key, - offset: uint32(w.n), + keys: []string{name, value}, + offset: w.pos, }) // Order of the references in the postings list does not imply order @@ -328,22 +457,22 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error { if err := it.Err(); err != nil { return err } - sort.Sort(uint32slice(refs)) - w.b = w.b[:0] - buf := make([]byte, 4) + w.buf2.reset() + w.buf2.putBE32int(len(refs)) for _, r := range refs { - binary.BigEndian.PutUint32(buf, r) - w.b = append(w.b, buf...) + w.buf2.putBE32(r) } - w.uint32s = refs[:0] + w.buf1.reset() + w.buf1.putBE32int(w.buf2.len()) - return w.section(len(w.b), flagStd, func(wr io.Writer) error { - return w.write(wr, w.b) - }) + w.buf2.putHash(w.crc32) + + err := w.write(w.buf1.get(), w.buf2.get()) + return errors.Wrap(err, "write postings") } type uint32slice []uint32 @@ -353,56 +482,15 @@ func (s uint32slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (s uint32slice) Less(i, j int) bool { return s[i] < s[j] } type hashEntry struct { - name string - offset uint32 -} - -func (w *indexWriter) writeHashmap(h []hashEntry) error { - w.b = w.b[:0] - buf := [binary.MaxVarintLen32]byte{} - - for _, e := range h { - n := binary.PutUvarint(buf[:], uint64(len(e.name))) - w.b = append(w.b, buf[:n]...) - w.b = append(w.b, e.name...) - - n = binary.PutUvarint(buf[:], uint64(e.offset)) - w.b = append(w.b, buf[:n]...) - } - - return w.section(len(w.b), flagStd, func(wr io.Writer) error { - return w.write(wr, w.b) - }) -} - -func (w *indexWriter) finalize() error { - // Write out hash maps to jump to correct label index and postings sections. - lo := uint32(w.n) - if err := w.writeHashmap(w.labelIndexes); err != nil { - return err - } - - po := uint32(w.n) - if err := w.writeHashmap(w.postings); err != nil { - return err - } - - // Terminate index file with offsets to hashmaps. This is the entry Pointer - // for any index query. - // TODO(fabxc): also store offset to series section to allow plain - // iteration over all existing series? - b := [8]byte{} - binary.BigEndian.PutUint32(b[:4], lo) - binary.BigEndian.PutUint32(b[4:], po) - - return w.write(w.bufw, b[:]) + keys []string + offset uint64 } func (w *indexWriter) Close() error { - if err := w.finalize(); err != nil { + if err := w.ensureStage(idxStageDone); err != nil { return err } - if err := w.bufw.Flush(); err != nil { + if err := w.fbuf.Flush(); err != nil { return err } if err := fileutil.Fsync(w.f); err != nil { @@ -440,7 +528,8 @@ type StringTuples interface { type indexReader struct { // The underlying byte slice holding the encoded series data. - b []byte + b []byte + toc indexTOC // Close that releases the underlying resources of the byte slice. c io.Closer @@ -471,57 +560,77 @@ func newIndexReader(dir string) (*indexReader, error) { return nil, errors.Errorf("invalid magic number %x", m) } - // The last two 4 bytes hold the pointers to the hashmaps. - loff := binary.BigEndian.Uint32(r.b[len(r.b)-8 : len(r.b)-4]) - poff := binary.BigEndian.Uint32(r.b[len(r.b)-4:]) + if err := r.readTOC(); err != nil { + return nil, errors.Wrap(err, "read TOC") + } - flag, b, err := r.section(loff) + r.labels, err = r.readOffsetTable(r.toc.labelIndicesTable) if err != nil { - return nil, errors.Wrapf(err, "label index hashmap section at %d", loff) + return nil, errors.Wrap(err, "read label index table") } - if r.labels, err = readHashmap(flag, b); err != nil { - return nil, errors.Wrap(err, "read label index hashmap") - } - flag, b, err = r.section(poff) + r.postings, err = r.readOffsetTable(r.toc.postingsTable) if err != nil { - return nil, errors.Wrapf(err, "postings hashmap section at %d", loff) - } - if r.postings, err = readHashmap(flag, b); err != nil { - return nil, errors.Wrap(err, "read postings hashmap") + return nil, errors.Wrap(err, "read postings table") } return r, nil } -func readHashmap(flag byte, b []byte) (map[string]uint32, error) { - if flag != flagStd { - return nil, errInvalidFlag +func (r *indexReader) readTOC() error { + d := r.decbufAt(len(r.b) - indexTOCLen) + + r.toc.symbols = d.be64() + r.toc.series = d.be64() + r.toc.labelIndices = d.be64() + r.toc.labelIndicesTable = d.be64() + r.toc.postings = d.be64() + r.toc.postingsTable = d.be64() + + // TODO(fabxc): validate checksum. + + return nil +} + +func (r *indexReader) decbufAt(off int) decbuf { + if len(r.b) < off { + return decbuf{e: errInvalidSize} } - h := make(map[string]uint32, 512) + return decbuf{b: r.b[off:]} +} - for len(b) > 0 { - l, n := binary.Uvarint(b) - if n < 1 { - return nil, errors.Wrap(errInvalidSize, "read key length") - } - b = b[n:] - - if len(b) < int(l) { - return nil, errors.Wrap(errInvalidSize, "read key") - } - s := string(b[:l]) - b = b[l:] - - o, n := binary.Uvarint(b) - if n < 1 { - return nil, errors.Wrap(errInvalidSize, "read offset value") - } - b = b[n:] - - h[s] = uint32(o) +// readOffsetTable reads an offset table at the given position and returns a map +// with the key strings concatenated by the 0xff unicode non-character. +func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) { + // A table might not have been written at all, in which case the position + // is zeroed out. + if off == 0 { + return nil, nil } - return h, nil + const sep = "\xff" + + var ( + d1 = r.decbufAt(int(off)) + cnt = d1.be32() + d2 = d1.decbuf(d1.be32int()) + ) + + res := make(map[string]uint32, 512) + + for d2.err() == nil && d2.len() > 0 && cnt > 0 { + keyCount := int(d2.uvarint()) + keys := make([]string, 0, keyCount) + + for i := 0; i < keyCount; i++ { + keys = append(keys, d2.uvarintStr()) + } + res[strings.Join(keys, sep)] = uint32(d2.uvarint()) + + cnt-- + } + + // TODO(fabxc): verify checksum from remainer of d1. + return res, d2.err() } func (r *indexReader) Close() error { @@ -548,25 +657,19 @@ func (r *indexReader) section(o uint32) (byte, []byte, error) { } func (r *indexReader) lookupSymbol(o uint32) (string, error) { - if int(o) > len(r.b) { - return "", errors.Errorf("invalid symbol offset %d", o) - } - l, n := binary.Uvarint(r.b[o:]) - if n < 0 { - return "", errors.New("reading symbol length failed") - } + d := r.decbufAt(int(o)) - end := int(o) + n + int(l) - if end > len(r.b) { - return "", errors.New("invalid length") + s := d.uvarintStr() + if d.err() != nil { + return "", errors.Wrapf(d.err(), "read symbol at %d", o) } - b := r.b[int(o)+n : end] - - return yoloString(b), nil + return s, nil } func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { - key := strings.Join(names, string(sep)) + const sep = "\xff" + + key := strings.Join(names, sep) off, ok := r.labels[key] if !ok { // XXX(fabxc): hot fix. Should return a partial data error and handle cases @@ -575,21 +678,21 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { //return nil, fmt.Errorf("label index doesn't exist") } - flag, b, err := r.section(off) - if err != nil { - return nil, errors.Wrapf(err, "section at %d", off) - } - if flag != flagStd { - return nil, errInvalidFlag - } - l, n := binary.Uvarint(b) - if n < 1 { - return nil, errors.Wrap(errInvalidSize, "read label index size") + d1 := r.decbufAt(int(off)) + d2 := d1.decbuf(d1.be32int()) + + nc := d2.be32int() + d2.be32() // consume unused value entry count. + + if d2.err() != nil { + return nil, errors.Wrap(d2.err(), "read label value index") } + // TODO(fabxc): verify checksum in 4 remaining bytes of d1. + st := &serializedStringTuples{ - l: int(l), - b: b[n:], + l: nc, + b: d2.get(), lookup: r.lookupSymbol, } return st, nil @@ -601,110 +704,89 @@ func (emptyStringTuples) At(i int) ([]string, error) { return nil, nil } func (emptyStringTuples) Len() int { return 0 } func (r *indexReader) LabelIndices() ([][]string, error) { + const sep = "\xff" + res := [][]string{} for s := range r.labels { - res = append(res, strings.Split(s, string(sep))) + res = append(res, strings.Split(s, sep)) } return res, nil } func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) { - k, n := binary.Uvarint(r.b[ref:]) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "number of labels") - } + d1 := r.decbufAt(int(ref)) + d2 := d1.decbuf(int(d1.uvarint())) - b := r.b[int(ref)+n:] + k := int(d2.uvarint()) lbls := make(labels.Labels, 0, k) - for i := 0; i < 2*int(k); i += 2 { - o, m := binary.Uvarint(b) - if m < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "symbol offset") - } - n, err := r.lookupSymbol(uint32(o)) - if err != nil { - return nil, nil, errors.Wrap(err, "symbol lookup") - } - b = b[m:] + for i := 0; i < k; i++ { + lno := uint32(d2.uvarint()) + lvo := uint32(d2.uvarint()) - o, m = binary.Uvarint(b) - if m < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "symbol offset") + if d2.err() != nil { + return nil, nil, errors.Wrap(d2.err(), "read series label offsets") } - v, err := r.lookupSymbol(uint32(o)) - if err != nil { - return nil, nil, errors.Wrap(err, "symbol lookup") - } - b = b[m:] - lbls = append(lbls, labels.Label{ - Name: n, - Value: v, - }) + ln, err := r.lookupSymbol(lno) + if err != nil { + return nil, nil, errors.Wrap(err, "lookup label name") + } + lv, err := r.lookupSymbol(lvo) + if err != nil { + return nil, nil, errors.Wrap(err, "lookup label value") + } + + lbls = append(lbls, labels.Label{Name: ln, Value: lv}) } // Read the chunks meta data. - k, n = binary.Uvarint(b) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "number of chunks") - } - - b = b[n:] + k = int(d2.uvarint()) chunks := make([]*ChunkMeta, 0, k) - for i := 0; i < int(k); i++ { - firstTime, n := binary.Varint(b) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "first time") - } - b = b[n:] + for i := 0; i < k; i++ { + mint := d2.varint64() + maxt := d2.varint64() + off := d2.uvarint64() - lastTime, n := binary.Varint(b) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "last time") + if d2.err() != nil { + return nil, nil, errors.Wrapf(d2.err(), "read meta for chunk %d", i) } - b = b[n:] - - o, n := binary.Uvarint(b) - if n < 1 { - return nil, nil, errors.Wrap(errInvalidSize, "chunk offset") - } - b = b[n:] chunks = append(chunks, &ChunkMeta{ - Ref: o, - MinTime: firstTime, - MaxTime: lastTime, + Ref: off, + MinTime: mint, + MaxTime: maxt, }) } + // TODO(fabxc): verify CRC32. + return lbls, chunks, nil } func (r *indexReader) Postings(name, value string) (Postings, error) { - key := name + string(sep) + value + const sep = "\xff" + key := strings.Join([]string{name, value}, sep) off, ok := r.postings[key] if !ok { return emptyPostings, nil } - flag, b, err := r.section(off) - if err != nil { - return nil, errors.Wrapf(err, "section at %d", off) + d1 := r.decbufAt(int(off)) + d2 := d1.decbuf(d1.be32int()) + + d2.be32() // consume unused postings list length. + + if d2.err() != nil { + return nil, errors.Wrap(d2.err(), "get postings bytes") } - if flag != flagStd { - return nil, errors.Wrapf(errInvalidFlag, "section at %d", off) - } + // TODO(fabxc): read checksum from 4 remainer bytes of d1 and verify. - // Add iterator over the bytes. - if len(b)%4 != 0 { - return nil, errors.Wrap(errInvalidSize, "plain postings entry") - } - return newBigEndianPostings(b), nil + return newBigEndianPostings(d2.get()), nil } type stringTuples struct { @@ -753,7 +835,6 @@ type serializedStringTuples struct { } func (t *serializedStringTuples) Len() int { - // TODO(fabxc): Cache this? return len(t.b) / (4 * t.l) } diff --git a/vendor/github.com/prometheus/tsdb/labels/labels.go b/vendor/github.com/prometheus/tsdb/labels/labels.go index 901cae784..2ee42360e 100644 --- a/vendor/github.com/prometheus/tsdb/labels/labels.go +++ b/vendor/github.com/prometheus/tsdb/labels/labels.go @@ -1,3 +1,16 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package labels import ( @@ -71,7 +84,7 @@ func (ls Labels) Equals(o Labels) bool { return false } for i, l := range ls { - if l.Name != o[i].Name || l.Value != o[i].Value { + if o[i] != l { return false } } diff --git a/vendor/github.com/prometheus/tsdb/labels/selector.go b/vendor/github.com/prometheus/tsdb/labels/selector.go index 224c2c6e0..a8a7eeeaa 100644 --- a/vendor/github.com/prometheus/tsdb/labels/selector.go +++ b/vendor/github.com/prometheus/tsdb/labels/selector.go @@ -1,3 +1,16 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package labels import "regexp" diff --git a/vendor/github.com/prometheus/tsdb/postings.go b/vendor/github.com/prometheus/tsdb/postings.go index 2c6f89022..180ac099d 100644 --- a/vendor/github.com/prometheus/tsdb/postings.go +++ b/vendor/github.com/prometheus/tsdb/postings.go @@ -1,3 +1,16 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package tsdb import ( @@ -139,30 +152,30 @@ func Merge(its ...Postings) Postings { a := its[0] for _, b := range its[1:] { - a = newMergePostings(a, b) + a = newMergedPostings(a, b) } return a } -type mergePostings struct { +type mergedPostings struct { a, b Postings aok, bok bool cur uint32 } -func newMergePostings(a, b Postings) *mergePostings { - it := &mergePostings{a: a, b: b} +func newMergedPostings(a, b Postings) *mergedPostings { + it := &mergedPostings{a: a, b: b} it.aok = it.a.Next() it.bok = it.b.Next() return it } -func (it *mergePostings) At() uint32 { +func (it *mergedPostings) At() uint32 { return it.cur } -func (it *mergePostings) Next() bool { +func (it *mergedPostings) Next() bool { if !it.aok && !it.bok { return false } @@ -197,13 +210,14 @@ func (it *mergePostings) Next() bool { return true } -func (it *mergePostings) Seek(id uint32) bool { +func (it *mergedPostings) Seek(id uint32) bool { it.aok = it.a.Seek(id) it.bok = it.b.Seek(id) + return it.Next() } -func (it *mergePostings) Err() error { +func (it *mergedPostings) Err() error { if it.a.Err() != nil { return it.a.Err() } diff --git a/vendor/github.com/prometheus/tsdb/querier.go b/vendor/github.com/prometheus/tsdb/querier.go index bb282d4c1..97970e830 100644 --- a/vendor/github.com/prometheus/tsdb/querier.go +++ b/vendor/github.com/prometheus/tsdb/querier.go @@ -1,3 +1,16 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package tsdb import ( @@ -153,6 +166,9 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet { mint: q.mint, maxt: q.maxt, }, + + mint: q.mint, + maxt: q.maxt, } } @@ -397,11 +413,15 @@ func (s *populatedChunkSeries) Next() bool { for s.set.Next() { lset, chks := s.set.At() - for i, c := range chks { - if c.MaxTime < s.mint { - chks = chks[1:] - continue + for len(chks) > 0 { + if chks[0].MaxTime >= s.mint { + break } + chks = chks[1:] + } + + // Break out at the first chunk that has no overlap with mint, maxt. + for i, c := range chks { if c.MinTime > s.maxt { chks = chks[:i] break @@ -411,6 +431,7 @@ func (s *populatedChunkSeries) Next() bool { return false } } + if len(chks) == 0 { continue } @@ -431,12 +452,14 @@ type blockSeriesSet struct { set chunkSeriesSet err error cur Series + + mint, maxt int64 } func (s *blockSeriesSet) Next() bool { for s.set.Next() { lset, chunks := s.set.At() - s.cur = &chunkSeries{labels: lset, chunks: chunks} + s.cur = &chunkSeries{labels: lset, chunks: chunks, mint: s.mint, maxt: s.maxt} return true } if s.set.Err() != nil { @@ -453,6 +476,8 @@ func (s *blockSeriesSet) Err() error { return s.err } type chunkSeries struct { labels labels.Labels chunks []*ChunkMeta // in-order chunk refs + + mint, maxt int64 } func (s *chunkSeries) Labels() labels.Labels { @@ -460,14 +485,14 @@ func (s *chunkSeries) Labels() labels.Labels { } func (s *chunkSeries) Iterator() SeriesIterator { - return newChunkSeriesIterator(s.chunks) + return newChunkSeriesIterator(s.chunks, s.mint, s.maxt) } // SeriesIterator iterates over the data of a time series. type SeriesIterator interface { // Seek advances the iterator forward to the given timestamp. - // If there's no value exactly at ts, it advances to the last value - // before tt. + // If there's no value exactly at t, it advances to the first value + // after t. Seek(t int64) bool // At returns the current timestamp/value pair. At() (t int64, v float64) @@ -488,7 +513,7 @@ func (s *chainedSeries) Labels() labels.Labels { } func (s *chainedSeries) Iterator() SeriesIterator { - return &chainedSeriesIterator{series: s.series} + return newChainedSeriesIterator(s.series...) } // chainedSeriesIterator implements a series iterater over a list @@ -500,6 +525,14 @@ type chainedSeriesIterator struct { cur SeriesIterator } +func newChainedSeriesIterator(s ...Series) *chainedSeriesIterator { + return &chainedSeriesIterator{ + series: s, + i: 0, + cur: s[0].Iterator(), + } +} + func (it *chainedSeriesIterator) Seek(t int64) bool { // We just scan the chained series sequentially as they are already // pre-selected by relevant time and should be accessed sequentially anyway. @@ -516,9 +549,6 @@ func (it *chainedSeriesIterator) Seek(t int64) bool { } func (it *chainedSeriesIterator) Next() bool { - if it.cur == nil { - it.cur = it.series[it.i].Iterator() - } if it.cur.Next() { return true } @@ -550,17 +580,35 @@ type chunkSeriesIterator struct { i int cur chunks.Iterator + + maxt, mint int64 } -func newChunkSeriesIterator(cs []*ChunkMeta) *chunkSeriesIterator { +func newChunkSeriesIterator(cs []*ChunkMeta, mint, maxt int64) *chunkSeriesIterator { return &chunkSeriesIterator{ chunks: cs, i: 0, cur: cs[0].Chunk.Iterator(), + + mint: mint, + maxt: maxt, } } +func (it *chunkSeriesIterator) inBounds(t int64) bool { + return t >= it.mint && t <= it.maxt +} + func (it *chunkSeriesIterator) Seek(t int64) (ok bool) { + if t > it.maxt { + return false + } + + // Seek to the first valid value after t. + if t < it.mint { + t = it.mint + } + // Only do binary search forward to stay in line with other iterators // that can only move forward. x := sort.Search(len(it.chunks[it.i:]), func(i int) bool { return it.chunks[i].MinTime >= t }) @@ -569,10 +617,10 @@ func (it *chunkSeriesIterator) Seek(t int64) (ok bool) { // If the timestamp was not found, it might be in the last chunk. if x == len(it.chunks) { x-- - } - // Go to previous chunk if the chunk doesn't exactly start with t. - // If we are already at the first chunk, we use it as it's the best we have. - if x > 0 && it.chunks[x].MinTime > t { + + // Go to previous chunk if the chunk doesn't exactly start with t. + // If we are already at the first chunk, we use it as it's the best we have. + } else if x > 0 && it.chunks[x].MinTime > t { x-- } @@ -593,9 +641,13 @@ func (it *chunkSeriesIterator) At() (t int64, v float64) { } func (it *chunkSeriesIterator) Next() bool { - if it.cur.Next() { - return true + for it.cur.Next() { + t, _ := it.cur.At() + if it.inBounds(t) { + return true + } } + if err := it.cur.Err(); err != nil { return false } diff --git a/vendor/github.com/prometheus/tsdb/wal.go b/vendor/github.com/prometheus/tsdb/wal.go index d5fdd9c55..853065f69 100644 --- a/vendor/github.com/prometheus/tsdb/wal.go +++ b/vendor/github.com/prometheus/tsdb/wal.go @@ -1,3 +1,16 @@ +// Copyright 2017 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package tsdb import ( @@ -134,7 +147,7 @@ func (w *WAL) initSegments() error { if len(fns) == 0 { return nil } - // We must open all file in read mode as we may have to truncate along + // We must open all files in read/write mode as we may have to truncate along // the way and any file may become the tail. for _, fn := range fns { f, err := os.OpenFile(fn, os.O_RDWR, 0666) @@ -165,10 +178,10 @@ func (w *WAL) initSegments() error { return nil } -// cut finishes the currently active segments and open the next one. +// cut finishes the currently active segments and opens the next one. // The encoder is reset to point to the new segment. func (w *WAL) cut() error { - // Sync current tail to disc and close. + // Sync current tail to disk and close. if tf := w.tail(); tf != nil { if err := w.sync(); err != nil { return err @@ -263,7 +276,7 @@ func (w *WAL) run(interval time.Duration) { } } -// Close sync all data and closes the underlying resources. +// Close syncs all data and closes the underlying resources. func (w *WAL) Close() error { close(w.stopc) <-w.donec @@ -296,9 +309,10 @@ func (w *WAL) entry(et WALEntryType, flag byte, buf []byte) error { w.mtx.Lock() defer w.mtx.Unlock() - // Cut to the next segment if exceeds the file size unless it would also + // Cut to the next segment if the entry exceeds the file size unless it would also // exceed the size of a new segment. var ( + // 6-byte header + 4-byte CRC32 + buf. sz = int64(6 + 4 + len(buf)) newsz = w.curN + sz ) diff --git a/vendor/vendor.json b/vendor/vendor.json index 4ef91e4a5..e537b4f82 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -645,22 +645,22 @@ "revisionTime": "2016-04-11T19:08:41Z" }, { - "checksumSHA1": "R7aNHvNnDiTb6t7z8FehPAo3PdM=", + "checksumSHA1": "0wu/AzUWMurN/T5VBKCrvhf7c7E=", "path": "github.com/prometheus/tsdb", - "revision": "721df536eb1a6e6f91785f138ca24917222f8461", - "revisionTime": "2017-04-09T08:18:19Z" + "revision": "44769c1654f699931b2d3a2928326ac2d02d9149", + "revisionTime": "2017-05-09T10:52:47Z" }, { - "checksumSHA1": "Qwlzvcx5Lbo9Nzb75AGgiiGszZI=", + "checksumSHA1": "9EH3v+JdbikCUJAgD4VEOPIaWfs=", "path": "github.com/prometheus/tsdb/chunks", - "revision": "10c7c9acbe0175a411bce90cd7a0d9d7a13d6a83", - "revisionTime": "2017-04-04T09:27:26Z" + "revision": "44769c1654f699931b2d3a2928326ac2d02d9149", + "revisionTime": "2017-05-09T10:52:47Z" }, { - "checksumSHA1": "ed5dnejBTbr0FKdzKRAC91bHRgE=", + "checksumSHA1": "3RHZcB/ZvIae9K0tJxNlajJg0jA=", "path": "github.com/prometheus/tsdb/labels", - "revision": "770d00800212502dfef71a6a7df23e3ced4459d9", - "revisionTime": "2017-04-05T12:14:30Z" + "revision": "44769c1654f699931b2d3a2928326ac2d02d9149", + "revisionTime": "2017-05-09T10:52:47Z" }, { "checksumSHA1": "+49Vr4Me28p3cR+gxX5SUQHbbas=",