vendor: update tsdb

This commit is contained in:
Fabian Reinartz 2017-05-09 12:53:33 +02:00
parent 8c483e27d3
commit dae4a801cb
20 changed files with 1171 additions and 441 deletions

201
vendor/github.com/prometheus/tsdb/LICENSE generated vendored Normal file
View file

@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

5
vendor/github.com/prometheus/tsdb/README.md generated vendored Normal file
View file

@ -0,0 +1,5 @@
# TSDB
This repository contains the new Prometheus storage layer that will be used in its 2.0 release.
A writeup of its design can be found [here](https://fabxc.org/blog/2017-04-10-writing-a-tsdb/).

View file

@ -1,3 +1,16 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb package tsdb
import ( import (
@ -6,7 +19,6 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"sort"
"github.com/oklog/ulid" "github.com/oklog/ulid"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -23,7 +35,7 @@ type DiskBlock interface {
// Index returns an IndexReader over the block's data. // Index returns an IndexReader over the block's data.
Index() IndexReader Index() IndexReader
// Series returns a SeriesReader over the block's data. // Chunks returns a ChunkReader over the block's data.
Chunks() ChunkReader Chunks() ChunkReader
// Close releases all underlying resources of the block. // Close releases all underlying resources of the block.
@ -125,8 +137,10 @@ func writeMetaFile(dir string, meta *BlockMeta) error {
enc := json.NewEncoder(f) enc := json.NewEncoder(f)
enc.SetIndent("", "\t") enc.SetIndent("", "\t")
if err := enc.Encode(&blockMeta{Version: 1, BlockMeta: meta}); err != nil { var merr MultiError
return err if merr.Add(enc.Encode(&blockMeta{Version: 1, BlockMeta: meta})); merr.Err() != nil {
merr.Add(f.Close())
return merr
} }
if err := f.Close(); err != nil { if err := f.Close(); err != nil {
return err return err
@ -228,30 +242,3 @@ func (f *mmapFile) Close() error {
} }
return err1 return err1
} }
// A skiplist maps offsets to values. The values found in the data at an
// offset are strictly greater than the indexed value.
type skiplist interface {
// offset returns the offset to data containing values of x and lower.
offset(x int64) (uint32, bool)
}
// simpleSkiplist is a slice of plain value/offset pairs.
type simpleSkiplist []skiplistPair
type skiplistPair struct {
value int64
offset uint32
}
func (sl simpleSkiplist) offset(x int64) (uint32, bool) {
// Search for the first offset that contains data greater than x.
i := sort.Search(len(sl), func(i int) bool { return sl[i].value >= x })
// If no element was found return false. If the first element is found,
// there's no previous offset actually containing values that are x or lower.
if i == len(sl) || i == 0 {
return 0, false
}
return sl[i-1].offset, true
}

View file

@ -1,3 +1,16 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb package tsdb
import ( import (
@ -15,7 +28,7 @@ import (
) )
const ( const (
// MagicChunks is 4 bytes at the head of series file. // MagicChunks is 4 bytes at the head of a series file.
MagicChunks = 0x85BD40DD MagicChunks = 0x85BD40DD
) )
@ -30,12 +43,23 @@ type ChunkMeta struct {
MinTime, MaxTime int64 // time range the data covers MinTime, MaxTime int64 // time range the data covers
} }
// writeHash writes the chunk encoding and raw data into the provided hash.
func (cm *ChunkMeta) writeHash(h hash.Hash) error {
if _, err := h.Write([]byte{byte(cm.Chunk.Encoding())}); err != nil {
return err
}
if _, err := h.Write(cm.Chunk.Bytes()); err != nil {
return err
}
return nil
}
// ChunkWriter serializes a time block of chunked series data. // ChunkWriter serializes a time block of chunked series data.
type ChunkWriter interface { type ChunkWriter interface {
// WriteChunks writes several chunks. The data field of the ChunkMetas // WriteChunks writes several chunks. The Chunk field of the ChunkMetas
// must be populated. // must be populated.
// After returning successfully, the Ref fields in the ChunkMetas // After returning successfully, the Ref fields in the ChunkMetas
// is set and can be used to retrieve the chunks from the written data. // are set and can be used to retrieve the chunks from the written data.
WriteChunks(chunks ...*ChunkMeta) error WriteChunks(chunks ...*ChunkMeta) error
// Close writes any required finalization and closes the resources // Close writes any required finalization and closes the resources
@ -112,7 +136,9 @@ func (w *chunkWriter) finalizeTail() error {
func (w *chunkWriter) cut() error { func (w *chunkWriter) cut() error {
// Sync current tail to disk and close. // Sync current tail to disk and close.
w.finalizeTail() if err := w.finalizeTail(); err != nil {
return err
}
p, _, err := nextSequenceFile(w.dirFile.Name(), "") p, _, err := nextSequenceFile(w.dirFile.Name(), "")
if err != nil { if err != nil {
@ -150,8 +176,8 @@ func (w *chunkWriter) cut() error {
return nil return nil
} }
func (w *chunkWriter) write(wr io.Writer, b []byte) error { func (w *chunkWriter) write(b []byte) error {
n, err := wr.Write(b) n, err := w.wbuf.Write(b)
w.n += int64(n) w.n += int64(n)
return err return err
} }
@ -159,9 +185,9 @@ func (w *chunkWriter) write(wr io.Writer, b []byte) error {
func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error { func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error {
// Calculate maximum space we need and cut a new segment in case // Calculate maximum space we need and cut a new segment in case
// we don't fit into the current one. // we don't fit into the current one.
maxLen := int64(binary.MaxVarintLen32) maxLen := int64(binary.MaxVarintLen32) // The number of chunks.
for _, c := range chks { for _, c := range chks {
maxLen += binary.MaxVarintLen32 + 1 maxLen += binary.MaxVarintLen32 + 1 // The number of bytes in the chunk and its encoding.
maxLen += int64(len(c.Chunk.Bytes())) maxLen += int64(len(c.Chunk.Bytes()))
} }
newsz := w.n + maxLen newsz := w.n + maxLen
@ -172,14 +198,10 @@ func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error {
} }
} }
// Write chunks sequentially and set the reference field in the ChunkMeta.
w.crc32.Reset()
wr := io.MultiWriter(w.crc32, w.wbuf)
b := make([]byte, binary.MaxVarintLen32) b := make([]byte, binary.MaxVarintLen32)
n := binary.PutUvarint(b, uint64(len(chks))) n := binary.PutUvarint(b, uint64(len(chks)))
if err := w.write(wr, b[:n]); err != nil { if err := w.write(b[:n]); err != nil {
return err return err
} }
seq := uint64(w.seq()) << 32 seq := uint64(w.seq()) << 32
@ -189,21 +211,25 @@ func (w *chunkWriter) WriteChunks(chks ...*ChunkMeta) error {
n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes()))) n = binary.PutUvarint(b, uint64(len(chk.Chunk.Bytes())))
if err := w.write(wr, b[:n]); err != nil { if err := w.write(b[:n]); err != nil {
return err return err
} }
if err := w.write(wr, []byte{byte(chk.Chunk.Encoding())}); err != nil { if err := w.write([]byte{byte(chk.Chunk.Encoding())}); err != nil {
return err return err
} }
if err := w.write(wr, chk.Chunk.Bytes()); err != nil { if err := w.write(chk.Chunk.Bytes()); err != nil {
return err
}
w.crc32.Reset()
if err := chk.writeHash(w.crc32); err != nil {
return err
}
if err := w.write(w.crc32.Sum(nil)); err != nil {
return err return err
} }
chk.Chunk = nil
} }
if err := w.write(w.wbuf, w.crc32.Sum(nil)); err != nil {
return err
}
return nil return nil
} }

View file

@ -1,8 +1,49 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// The code in this file was largely written by Damian Gryski as part of
// https://github.com/dgryski/go-tsz and published under the license below.
// It received minor modifications to suit Prometheus's needs.
// Copyright (c) 2015,2016 Damian Gryski <damian@gryski.com>
// All rights reserved.
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package chunks package chunks
import "io" import "io"
// bstream is a stream of bits // bstream is a stream of bits.
type bstream struct { type bstream struct {
stream []byte // the data stream stream []byte // the data stream
count uint8 // how many bits are valid in current byte count uint8 // how many bits are valid in current byte

View file

@ -1,3 +1,16 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chunks package chunks
import ( import (
@ -5,7 +18,7 @@ import (
"fmt" "fmt"
) )
// Encoding is the identifier for a chunk encoding // Encoding is the identifier for a chunk encoding.
type Encoding uint8 type Encoding uint8
func (e Encoding) String() string { func (e Encoding) String() string {

View file

@ -1,3 +1,46 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// The code in this file was largely written by Damian Gryski as part of
// https://github.com/dgryski/go-tsz and published under the license below.
// It was modified to accomodate reading from byte slices without modifying
// the underlying bytes, which would panic when reading from mmaped
// read-only byte slices.
// Copyright (c) 2015,2016 Damian Gryski <damian@gryski.com>
// All rights reserved.
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package chunks package chunks
import ( import (

View file

@ -1,3 +1,16 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb package tsdb
import ( import (

View file

@ -1,3 +1,16 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package tsdb implements a time series storage for float64 sample data. // Package tsdb implements a time series storage for float64 sample data.
package tsdb package tsdb
@ -33,6 +46,7 @@ var DefaultOptions = &Options{
MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds MinBlockDuration: 3 * 60 * 60 * 1000, // 2 hours in milliseconds
MaxBlockDuration: 24 * 60 * 60 * 1000, // 1 days in milliseconds MaxBlockDuration: 24 * 60 * 60 * 1000, // 1 days in milliseconds
AppendableBlocks: 2, AppendableBlocks: 2,
NoLockfile: false,
} }
// Options of the DB storage. // Options of the DB storage.
@ -56,10 +70,15 @@ type Options struct {
// After a new block is started for timestamp t0 or higher, appends with // After a new block is started for timestamp t0 or higher, appends with
// timestamps as early as t0 - (n-1) * MinBlockDuration are valid. // timestamps as early as t0 - (n-1) * MinBlockDuration are valid.
AppendableBlocks int AppendableBlocks int
// NoLockfile disables creation and consideration of a lock file.
NoLockfile bool
} }
// Appender allows appending a batch of data. It must be completed with a // Appender allows appending a batch of data. It must be completed with a
// call to Commit or Rollback and must not be reused afterwards. // call to Commit or Rollback and must not be reused afterwards.
//
// Operations on the Appender interface are not goroutine-safe.
type Appender interface { type Appender interface {
// Add adds a sample pair for the given series. A reference number is // Add adds a sample pair for the given series. A reference number is
// returned which can be used to add further samples in the same or later // returned which can be used to add further samples in the same or later
@ -80,13 +99,11 @@ type Appender interface {
Rollback() error Rollback() error
} }
const sep = '\xff'
// DB handles reads and writes of time series falling into // DB handles reads and writes of time series falling into
// a hashed partition of a seriedb. // a hashed partition of a seriedb.
type DB struct { type DB struct {
dir string dir string
lockf lockfile.Lockfile lockf *lockfile.Lockfile
logger log.Logger logger log.Logger
metrics *dbMetrics metrics *dbMetrics
@ -146,13 +163,6 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
if err != nil { if err != nil {
return nil, err return nil, err
} }
lockf, err := lockfile.New(filepath.Join(absdir, "lock"))
if err != nil {
return nil, err
}
if err := lockf.TryLock(); err != nil {
return nil, errors.Wrapf(err, "open DB in %s", dir)
}
if l == nil { if l == nil {
l = log.NewLogfmtLogger(os.Stdout) l = log.NewLogfmtLogger(os.Stdout)
@ -168,7 +178,6 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
db = &DB{ db = &DB{
dir: dir, dir: dir,
lockf: lockf,
logger: l, logger: l,
metrics: newDBMetrics(r), metrics: newDBMetrics(r),
opts: opts, opts: opts,
@ -176,6 +185,17 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
donec: make(chan struct{}), donec: make(chan struct{}),
stopc: make(chan struct{}), stopc: make(chan struct{}),
} }
if !opts.NoLockfile {
lockf, err := lockfile.New(filepath.Join(absdir, "lock"))
if err != nil {
return nil, err
}
if err := lockf.TryLock(); err != nil {
return nil, errors.Wrapf(err, "open DB in %s", dir)
}
db.lockf = &lockf
}
db.compactor = newCompactor(r, l, &compactorOptions{ db.compactor = newCompactor(r, l, &compactorOptions{
maxBlockRange: opts.MaxBlockDuration, maxBlockRange: opts.MaxBlockDuration,
}) })
@ -452,7 +472,9 @@ func (db *DB) Close() error {
var merr MultiError var merr MultiError
merr.Add(g.Wait()) merr.Add(g.Wait())
merr.Add(db.lockf.Unlock()) if db.lockf != nil {
merr.Add(db.lockf.Unlock())
}
return merr.Err() return merr.Err()
} }
@ -505,8 +527,8 @@ func (a *dbAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error)
return 0, err return 0, err
} }
a.samples++ a.samples++
// Store last byte of sequence number in 3rd byte of refernece. // Store last byte of sequence number in 3rd byte of reference.
return ref | (uint64(h.meta.Sequence^0xff) << 40), nil return ref | (uint64(h.meta.Sequence&0xff) << 40), nil
} }
func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error { func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error {
@ -519,7 +541,7 @@ func (a *dbAppender) AddFast(ref uint64, t int64, v float64) error {
return err return err
} }
// If the last byte of the sequence does not add up, the reference is not valid. // If the last byte of the sequence does not add up, the reference is not valid.
if uint64(h.meta.Sequence^0xff) != gen { if uint64(h.meta.Sequence&0xff) != gen {
return ErrNotFound return ErrNotFound
} }
if err := h.app.AddFast(ref, t, v); err != nil { if err := h.app.AddFast(ref, t, v); err != nil {
@ -641,7 +663,7 @@ func (a *dbAppender) Rollback() error {
var g errgroup.Group var g errgroup.Group
for _, h := range a.heads { for _, h := range a.heads {
g.Go(h.app.Commit) g.Go(h.app.Rollback)
} }
return g.Wait() return g.Wait()

View file

@ -1,3 +1,16 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !windows,!plan9,!solaris // +build !windows,!plan9,!solaris
package tsdb package tsdb

View file

@ -1,3 +1,16 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb package tsdb
import ( import (

157
vendor/github.com/prometheus/tsdb/encoding_helpers.go generated vendored Normal file
View file

@ -0,0 +1,157 @@
package tsdb
import (
"encoding/binary"
"hash"
"unsafe"
)
// enbuf is a helper type to populate a byte slice with various types.
type encbuf struct {
b []byte
c [binary.MaxVarintLen64]byte
}
func (e *encbuf) reset() { e.b = e.b[:0] }
func (e *encbuf) get() []byte { return e.b }
func (e *encbuf) len() int { return len(e.b) }
func (e *encbuf) putString(s string) { e.b = append(e.b, s...) }
func (e *encbuf) putBytes(b []byte) { e.b = append(e.b, b...) }
func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) }
func (e *encbuf) putBE32int(x int) { e.putBE32(uint32(x)) }
func (e *encbuf) putBE64int(x int) { e.putBE64(uint64(x)) }
func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) }
func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) }
func (e *encbuf) putBE32(x uint32) {
binary.BigEndian.PutUint32(e.c[:], x)
e.b = append(e.b, e.c[:4]...)
}
func (e *encbuf) putBE64(x uint64) {
binary.BigEndian.PutUint64(e.c[:], x)
e.b = append(e.b, e.c[:8]...)
}
func (e *encbuf) putUvarint64(x uint64) {
n := binary.PutUvarint(e.c[:], x)
e.b = append(e.b, e.c[:n]...)
}
func (e *encbuf) putVarint64(x int64) {
n := binary.PutVarint(e.c[:], x)
e.b = append(e.b, e.c[:n]...)
}
// putVarintStr writes a string to the buffer prefixed by its varint length (in bytes!).
func (e *encbuf) putUvarintStr(s string) {
b := *(*[]byte)(unsafe.Pointer(&s))
e.putUvarint(len(b))
e.putString(s)
}
// putHash appends a hash over the buffers current contents to the buffer.
func (e *encbuf) putHash(h hash.Hash) {
h.Reset()
_, err := h.Write(e.b)
if err != nil {
panic(err) // The CRC32 implementation does not error
}
e.b = h.Sum(e.b)
}
// decbuf provides safe methods to extract data from a byte slice. It does all
// necessary bounds checking and advancing of the byte slice.
// Several datums can be extracted without checking for errors. However, before using
// any datum, the err() method must be checked.
type decbuf struct {
b []byte
e error
}
func (d *decbuf) uvarint() int { return int(d.uvarint64()) }
func (d *decbuf) be32int() int { return int(d.be32()) }
func (d *decbuf) uvarintStr() string {
l := d.uvarint64()
if d.e != nil {
return ""
}
if len(d.b) < int(l) {
d.e = errInvalidSize
return ""
}
s := yoloString(d.b[:l])
d.b = d.b[l:]
return s
}
func (d *decbuf) varint64() int64 {
if d.e != nil {
return 0
}
x, n := binary.Varint(d.b)
if n < 1 {
d.e = errInvalidSize
return 0
}
d.b = d.b[n:]
return x
}
func (d *decbuf) uvarint64() uint64 {
if d.e != nil {
return 0
}
x, n := binary.Uvarint(d.b)
if n < 1 {
d.e = errInvalidSize
return 0
}
d.b = d.b[n:]
return x
}
func (d *decbuf) be64() uint64 {
if d.e != nil {
return 0
}
if len(d.b) < 4 {
d.e = errInvalidSize
return 0
}
x := binary.BigEndian.Uint64(d.b)
d.b = d.b[8:]
return x
}
func (d *decbuf) be32() uint32 {
if d.e != nil {
return 0
}
if len(d.b) < 4 {
d.e = errInvalidSize
return 0
}
x := binary.BigEndian.Uint32(d.b)
d.b = d.b[4:]
return x
}
func (d *decbuf) decbuf(l int) decbuf {
if d.e != nil {
return decbuf{e: d.e}
}
if l > len(d.b) {
return decbuf{e: errInvalidSize}
}
r := decbuf{b: d.b[:l]}
d.b = d.b[l:]
return r
}
func (d *decbuf) err() error { return d.e }
func (d *decbuf) len() int { return len(d.b) }
func (d *decbuf) get() []byte { return d.b }

View file

@ -1,3 +1,16 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb package tsdb
import ( import (
@ -283,6 +296,10 @@ type refdSample struct {
} }
func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) {
if !a.inBounds(t) {
return 0, ErrOutOfBounds
}
hash := lset.Hash() hash := lset.Hash()
if ms := a.get(hash, lset); ms != nil { if ms := a.get(hash, lset); ms != nil {
@ -344,7 +361,10 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error {
if t < c.maxTime { if t < c.maxTime {
return ErrOutOfOrderSample return ErrOutOfOrderSample
} }
if c.maxTime == t && ms.lastValue != v {
// We are allowing exact duplicates as we can encounter them in valid cases
// like federation and erroring out at that time would be extremely noisy.
if c.maxTime == t && math.Float64bits(ms.lastValue) != math.Float64bits(v) {
return ErrAmendSample return ErrAmendSample
} }
} }
@ -410,23 +430,12 @@ func (a *headAppender) Commit() error {
return err return err
} }
var ( total := uint64(len(a.samples))
total = uint64(len(a.samples))
mint = int64(math.MaxInt64)
maxt = int64(math.MinInt64)
)
for _, s := range a.samples { for _, s := range a.samples {
if !a.series[s.ref].append(s.t, s.v) { if !a.series[s.ref].append(s.t, s.v) {
total-- total--
} }
if s.t < mint {
mint = s.t
}
if s.t > maxt {
maxt = s.t
}
} }
a.mtx.RUnlock() a.mtx.RUnlock()
@ -632,8 +641,8 @@ func (s *memSeries) append(t int64, v float64) bool {
c.minTime = t c.minTime = t
} else { } else {
c = s.head() c = s.head()
// Skip duplicate samples. // Skip duplicate and out of order samples.
if c.maxTime == t && s.lastValue != v { if c.maxTime >= t {
return false return false
} }
} }

View file

@ -1,3 +1,16 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb package tsdb
import ( import (
@ -12,6 +25,8 @@ import (
"sort" "sort"
"strings" "strings"
"math"
"github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/fileutil"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/tsdb/labels" "github.com/prometheus/tsdb/labels"
@ -26,10 +41,48 @@ const (
const compactionPageBytes = minSectorSize * 64 const compactionPageBytes = minSectorSize * 64
// IndexWriter serialized the index for a block of series data. type indexWriterSeries struct {
// The methods must generally be called in order they are specified. labels labels.Labels
chunks []*ChunkMeta // series file offset of chunks
offset uint32 // index file offset of series reference
}
type indexWriterSeriesSlice []*indexWriterSeries
func (s indexWriterSeriesSlice) Len() int { return len(s) }
func (s indexWriterSeriesSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s indexWriterSeriesSlice) Less(i, j int) bool {
return labels.Compare(s[i].labels, s[j].labels) < 0
}
type indexWriterStage uint8
const (
idxStagePopulate indexWriterStage = iota
idxStageLabelIndex
idxStagePostings
idxStageDone
)
func (s indexWriterStage) String() string {
switch s {
case idxStagePopulate:
return "populate"
case idxStageLabelIndex:
return "label index"
case idxStagePostings:
return "postings"
case idxStageDone:
return "done"
}
return "<unknown>"
}
// IndexWriter serializes the index for a block of series data.
// The methods must generally be called in the order they are specified in.
type IndexWriter interface { type IndexWriter interface {
// AddSeries populates the index writer witha series and its offsets // AddSeries populates the index writer with a series and its offsets
// of chunks that the index can reference. // of chunks that the index can reference.
// The reference number is used to resolve a series against the postings // The reference number is used to resolve a series against the postings
// list iterator. It only has to be available during the write processing. // list iterator. It only has to be available during the write processing.
@ -48,22 +101,19 @@ type IndexWriter interface {
Close() error Close() error
} }
type indexWriterSeries struct {
labels labels.Labels
chunks []*ChunkMeta // series file offset of chunks
offset uint32 // index file offset of series reference
}
// indexWriter implements the IndexWriter interface for the standard // indexWriter implements the IndexWriter interface for the standard
// serialization format. // serialization format.
type indexWriter struct { type indexWriter struct {
f *os.File f *os.File
bufw *bufio.Writer fbuf *bufio.Writer
n int64 pos uint64
started bool
toc indexTOC
stage indexWriterStage
// Reusable memory. // Reusable memory.
b []byte buf1 encbuf
buf2 encbuf
uint32s []uint32 uint32s []uint32
series map[uint32]*indexWriterSeries series map[uint32]*indexWriterSeries
@ -74,6 +124,15 @@ type indexWriter struct {
crc32 hash.Hash crc32 hash.Hash
} }
type indexTOC struct {
symbols uint64
series uint64
labelIndices uint64
labelIndicesTable uint64
postings uint64
postingsTable uint64
}
func newIndexWriter(dir string) (*indexWriter, error) { func newIndexWriter(dir string) (*indexWriter, error) {
df, err := fileutil.OpenDir(dir) df, err := fileutil.OpenDir(dir)
if err != nil { if err != nil {
@ -88,12 +147,14 @@ func newIndexWriter(dir string) (*indexWriter, error) {
} }
iw := &indexWriter{ iw := &indexWriter{
f: f, f: f,
bufw: bufio.NewWriterSize(f, 1<<22), fbuf: bufio.NewWriterSize(f, 1<<22),
n: 0, pos: 0,
stage: idxStagePopulate,
// Reusable memory. // Reusable memory.
b: make([]byte, 0, 1<<23), buf1: encbuf{b: make([]byte, 0, 1<<22)},
buf2: encbuf{b: make([]byte, 0, 1<<22)},
uint32s: make([]uint32, 0, 1<<15), uint32s: make([]uint32, 0, 1<<15),
// Caches. // Caches.
@ -107,40 +168,87 @@ func newIndexWriter(dir string) (*indexWriter, error) {
return iw, nil return iw, nil
} }
func (w *indexWriter) write(wr io.Writer, b []byte) error { func (w *indexWriter) write(bufs ...[]byte) error {
n, err := wr.Write(b) for _, b := range bufs {
w.n += int64(n) n, err := w.fbuf.Write(b)
return err w.pos += uint64(n)
} if err != nil {
return err
// section writes a CRC32 checksummed section of length l and guarded by flag. }
func (w *indexWriter) section(l int, flag byte, f func(w io.Writer) error) error { // For now the index file must not grow beyond 4GiB. Some of the fixed-sized
w.crc32.Reset() // offset references in v1 are only 4 bytes large.
wr := io.MultiWriter(w.crc32, w.bufw) // Once we move to compressed/varint representations in those areas, this limitation
// can be lifted.
b := [5]byte{flag, 0, 0, 0, 0} if w.pos > math.MaxUint32 {
binary.BigEndian.PutUint32(b[1:], uint32(l)) return errors.Errorf("exceeding max size of 4GiB")
}
if err := w.write(wr, b[:]); err != nil {
return errors.Wrap(err, "writing header")
}
if err := f(wr); err != nil {
return errors.Wrap(err, "write contents")
}
if err := w.write(w.bufw, w.crc32.Sum(nil)); err != nil {
return errors.Wrap(err, "writing checksum")
} }
return nil return nil
} }
// addPadding adds zero byte padding until the file size is a multiple of n.
func (w *indexWriter) addPadding(n int) error {
p := n - (int(w.pos) % n)
if p == 0 {
return nil
}
return errors.Wrap(w.write(make([]byte, p)), "add padding")
}
// ensureStage handles transitions between write stages and ensures that IndexWriter
// methods are called in an order valid for the implementation.
func (w *indexWriter) ensureStage(s indexWriterStage) error {
if w.stage == s {
return nil
}
if w.stage > s {
return errors.Errorf("invalid stage %q, currently at %q", s, w.stage)
}
// Complete population stage by writing symbols and series.
if w.stage == idxStagePopulate {
w.toc.symbols = w.pos
if err := w.writeSymbols(); err != nil {
return err
}
w.toc.series = w.pos
if err := w.writeSeries(); err != nil {
return err
}
}
// Mark start of sections in table of contents.
switch s {
case idxStageLabelIndex:
w.toc.labelIndices = w.pos
case idxStagePostings:
w.toc.labelIndicesTable = w.pos
if err := w.writeOffsetTable(w.labelIndexes); err != nil {
return err
}
w.toc.postings = w.pos
case idxStageDone:
w.toc.postingsTable = w.pos
if err := w.writeOffsetTable(w.postings); err != nil {
return err
}
if err := w.writeTOC(); err != nil {
return err
}
}
w.stage = s
return nil
}
func (w *indexWriter) writeMeta() error { func (w *indexWriter) writeMeta() error {
b := [8]byte{} w.buf1.reset()
w.buf1.putBE32(MagicIndex)
w.buf1.putByte(indexFormatV1)
binary.BigEndian.PutUint32(b[:4], MagicIndex) return w.write(w.buf1.get())
b[4] = flagStd
return w.write(w.bufw, b[:])
} }
func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...*ChunkMeta) error { func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...*ChunkMeta) error {
@ -168,33 +276,27 @@ func (w *indexWriter) writeSymbols() error {
} }
sort.Strings(symbols) sort.Strings(symbols)
// The start of the section plus a 5 byte section header are our base. const headerSize = 4
// TODO(fabxc): switch to relative offsets and hold sections in a TOC.
base := uint32(w.n) + 5
buf := [binary.MaxVarintLen32]byte{} w.buf1.reset()
w.b = append(w.b[:0], flagStd) w.buf2.reset()
w.buf2.putBE32int(len(symbols))
for _, s := range symbols { for _, s := range symbols {
w.symbols[s] = base + uint32(len(w.b)) w.symbols[s] = uint32(w.pos) + headerSize + uint32(w.buf2.len())
n := binary.PutUvarint(buf[:], uint64(len(s))) // NOTE: len(s) gives the number of runes, not the number of bytes.
w.b = append(w.b, buf[:n]...) // Therefore the read-back length for strings with unicode characters will
w.b = append(w.b, s...) // be off when not using putCstr.
w.buf2.putUvarintStr(s)
} }
return w.section(len(w.b), flagStd, func(wr io.Writer) error { w.buf1.putBE32int(w.buf2.len())
return w.write(wr, w.b) w.buf2.putHash(w.crc32)
})
}
type indexWriterSeriesSlice []*indexWriterSeries err := w.write(w.buf1.get(), w.buf2.get())
return errors.Wrap(err, "write symbols")
func (s indexWriterSeriesSlice) Len() int { return len(s) }
func (s indexWriterSeriesSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s indexWriterSeriesSlice) Less(i, j int) bool {
return labels.Compare(s[i].labels, s[j].labels) < 0
} }
func (w *indexWriter) writeSeries() error { func (w *indexWriter) writeSeries() error {
@ -206,64 +308,52 @@ func (w *indexWriter) writeSeries() error {
} }
sort.Sort(series) sort.Sort(series)
// Current end of file plus 5 bytes for section header. // Header holds number of series.
// TODO(fabxc): switch to relative offsets. w.buf1.reset()
base := uint32(w.n) + 5 w.buf1.putBE32int(len(series))
w.b = w.b[:0] if err := w.write(w.buf1.get()); err != nil {
buf := make([]byte, binary.MaxVarintLen64) return errors.Wrap(err, "write series count")
}
for _, s := range series { for _, s := range series {
// Write label set symbol references. s.offset = uint32(w.pos)
s.offset = base + uint32(len(w.b))
n := binary.PutUvarint(buf, uint64(len(s.labels))) w.buf2.reset()
w.b = append(w.b, buf[:n]...) w.buf2.putUvarint(len(s.labels))
for _, l := range s.labels { for _, l := range s.labels {
n = binary.PutUvarint(buf, uint64(w.symbols[l.Name])) w.buf2.putUvarint32(w.symbols[l.Name])
w.b = append(w.b, buf[:n]...) w.buf2.putUvarint32(w.symbols[l.Value])
n = binary.PutUvarint(buf, uint64(w.symbols[l.Value]))
w.b = append(w.b, buf[:n]...)
} }
// Write chunks meta data including reference into chunk file. w.buf2.putUvarint(len(s.chunks))
n = binary.PutUvarint(buf, uint64(len(s.chunks)))
w.b = append(w.b, buf[:n]...)
for _, c := range s.chunks { for _, c := range s.chunks {
n = binary.PutVarint(buf, c.MinTime) w.buf2.putVarint64(c.MinTime)
w.b = append(w.b, buf[:n]...) w.buf2.putVarint64(c.MaxTime)
n = binary.PutVarint(buf, c.MaxTime) w.buf2.putUvarint64(c.Ref)
w.b = append(w.b, buf[:n]...) }
n = binary.PutUvarint(buf, uint64(c.Ref)) w.buf1.reset()
w.b = append(w.b, buf[:n]...) w.buf1.putUvarint(w.buf2.len())
w.buf2.putHash(w.crc32)
if err := w.write(w.buf1.get(), w.buf2.get()); err != nil {
return errors.Wrap(err, "write series data")
} }
} }
return w.section(len(w.b), flagStd, func(wr io.Writer) error {
return w.write(wr, w.b)
})
}
func (w *indexWriter) init() error {
if err := w.writeSymbols(); err != nil {
return err
}
if err := w.writeSeries(); err != nil {
return err
}
w.started = true
return nil return nil
} }
func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
if !w.started { if len(values)%len(names) != 0 {
if err := w.init(); err != nil { return errors.Errorf("invalid value list length %d for %d names", len(values), len(names))
return err }
} if err := w.ensureStage(idxStageLabelIndex); err != nil {
return errors.Wrap(err, "ensure stage")
} }
valt, err := newStringTuples(values, len(names)) valt, err := newStringTuples(values, len(names))
@ -272,45 +362,84 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
} }
sort.Sort(valt) sort.Sort(valt)
// Align beginning to 4 bytes for more efficient index list scans.
if err := w.addPadding(4); err != nil {
return err
}
w.labelIndexes = append(w.labelIndexes, hashEntry{ w.labelIndexes = append(w.labelIndexes, hashEntry{
name: strings.Join(names, string(sep)), keys: names,
offset: uint32(w.n), offset: w.pos,
}) })
buf := make([]byte, binary.MaxVarintLen32) w.buf2.reset()
n := binary.PutUvarint(buf, uint64(len(names))) w.buf2.putBE32int(len(names))
w.buf2.putBE32int(valt.Len())
l := n + len(values)*4 for _, v := range valt.s {
w.buf2.putBE32(w.symbols[v])
}
return w.section(l, flagStd, func(wr io.Writer) error { w.buf1.reset()
// First byte indicates tuple size for index. w.buf1.putBE32int(w.buf2.len())
if err := w.write(wr, buf[:n]); err != nil {
return err w.buf2.putHash(w.crc32)
err = w.write(w.buf1.get(), w.buf2.get())
return errors.Wrap(err, "write label index")
}
// writeOffsetTable writes a sequence of readable hash entries.
func (w *indexWriter) writeOffsetTable(entries []hashEntry) error {
w.buf1.reset()
w.buf1.putBE32int(len(entries))
w.buf2.reset()
for _, e := range entries {
w.buf2.putUvarint(len(e.keys))
for _, k := range e.keys {
w.buf2.putUvarintStr(k)
} }
w.buf2.putUvarint64(e.offset)
}
for _, v := range valt.s { w.buf1.putBE32int(w.buf2.len())
binary.BigEndian.PutUint32(buf, w.symbols[v]) w.buf2.putHash(w.crc32)
if err := w.write(wr, buf[:4]); err != nil { return w.write(w.buf1.get(), w.buf2.get())
return err }
}
} const indexTOCLen = 6*8 + 4
return nil
}) func (w *indexWriter) writeTOC() error {
w.buf1.reset()
w.buf1.putBE64(w.toc.symbols)
w.buf1.putBE64(w.toc.series)
w.buf1.putBE64(w.toc.labelIndices)
w.buf1.putBE64(w.toc.labelIndicesTable)
w.buf1.putBE64(w.toc.postings)
w.buf1.putBE64(w.toc.postingsTable)
w.buf1.putHash(w.crc32)
return w.write(w.buf1.get())
} }
func (w *indexWriter) WritePostings(name, value string, it Postings) error { func (w *indexWriter) WritePostings(name, value string, it Postings) error {
if !w.started { if err := w.ensureStage(idxStagePostings); err != nil {
if err := w.init(); err != nil { return errors.Wrap(err, "ensure stage")
return err
}
} }
key := name + string(sep) + value // Align beginning to 4 bytes for more efficient postings list scans.
if err := w.addPadding(4); err != nil {
return err
}
w.postings = append(w.postings, hashEntry{ w.postings = append(w.postings, hashEntry{
name: key, keys: []string{name, value},
offset: uint32(w.n), offset: w.pos,
}) })
// Order of the references in the postings list does not imply order // Order of the references in the postings list does not imply order
@ -328,22 +457,22 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error {
if err := it.Err(); err != nil { if err := it.Err(); err != nil {
return err return err
} }
sort.Sort(uint32slice(refs)) sort.Sort(uint32slice(refs))
w.b = w.b[:0] w.buf2.reset()
buf := make([]byte, 4) w.buf2.putBE32int(len(refs))
for _, r := range refs { for _, r := range refs {
binary.BigEndian.PutUint32(buf, r) w.buf2.putBE32(r)
w.b = append(w.b, buf...)
} }
w.uint32s = refs[:0] w.buf1.reset()
w.buf1.putBE32int(w.buf2.len())
return w.section(len(w.b), flagStd, func(wr io.Writer) error { w.buf2.putHash(w.crc32)
return w.write(wr, w.b)
}) err := w.write(w.buf1.get(), w.buf2.get())
return errors.Wrap(err, "write postings")
} }
type uint32slice []uint32 type uint32slice []uint32
@ -353,56 +482,15 @@ func (s uint32slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s uint32slice) Less(i, j int) bool { return s[i] < s[j] } func (s uint32slice) Less(i, j int) bool { return s[i] < s[j] }
type hashEntry struct { type hashEntry struct {
name string keys []string
offset uint32 offset uint64
}
func (w *indexWriter) writeHashmap(h []hashEntry) error {
w.b = w.b[:0]
buf := [binary.MaxVarintLen32]byte{}
for _, e := range h {
n := binary.PutUvarint(buf[:], uint64(len(e.name)))
w.b = append(w.b, buf[:n]...)
w.b = append(w.b, e.name...)
n = binary.PutUvarint(buf[:], uint64(e.offset))
w.b = append(w.b, buf[:n]...)
}
return w.section(len(w.b), flagStd, func(wr io.Writer) error {
return w.write(wr, w.b)
})
}
func (w *indexWriter) finalize() error {
// Write out hash maps to jump to correct label index and postings sections.
lo := uint32(w.n)
if err := w.writeHashmap(w.labelIndexes); err != nil {
return err
}
po := uint32(w.n)
if err := w.writeHashmap(w.postings); err != nil {
return err
}
// Terminate index file with offsets to hashmaps. This is the entry Pointer
// for any index query.
// TODO(fabxc): also store offset to series section to allow plain
// iteration over all existing series?
b := [8]byte{}
binary.BigEndian.PutUint32(b[:4], lo)
binary.BigEndian.PutUint32(b[4:], po)
return w.write(w.bufw, b[:])
} }
func (w *indexWriter) Close() error { func (w *indexWriter) Close() error {
if err := w.finalize(); err != nil { if err := w.ensureStage(idxStageDone); err != nil {
return err return err
} }
if err := w.bufw.Flush(); err != nil { if err := w.fbuf.Flush(); err != nil {
return err return err
} }
if err := fileutil.Fsync(w.f); err != nil { if err := fileutil.Fsync(w.f); err != nil {
@ -440,7 +528,8 @@ type StringTuples interface {
type indexReader struct { type indexReader struct {
// The underlying byte slice holding the encoded series data. // The underlying byte slice holding the encoded series data.
b []byte b []byte
toc indexTOC
// Close that releases the underlying resources of the byte slice. // Close that releases the underlying resources of the byte slice.
c io.Closer c io.Closer
@ -471,57 +560,77 @@ func newIndexReader(dir string) (*indexReader, error) {
return nil, errors.Errorf("invalid magic number %x", m) return nil, errors.Errorf("invalid magic number %x", m)
} }
// The last two 4 bytes hold the pointers to the hashmaps. if err := r.readTOC(); err != nil {
loff := binary.BigEndian.Uint32(r.b[len(r.b)-8 : len(r.b)-4]) return nil, errors.Wrap(err, "read TOC")
poff := binary.BigEndian.Uint32(r.b[len(r.b)-4:]) }
flag, b, err := r.section(loff) r.labels, err = r.readOffsetTable(r.toc.labelIndicesTable)
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "label index hashmap section at %d", loff) return nil, errors.Wrap(err, "read label index table")
} }
if r.labels, err = readHashmap(flag, b); err != nil { r.postings, err = r.readOffsetTable(r.toc.postingsTable)
return nil, errors.Wrap(err, "read label index hashmap")
}
flag, b, err = r.section(poff)
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "postings hashmap section at %d", loff) return nil, errors.Wrap(err, "read postings table")
}
if r.postings, err = readHashmap(flag, b); err != nil {
return nil, errors.Wrap(err, "read postings hashmap")
} }
return r, nil return r, nil
} }
func readHashmap(flag byte, b []byte) (map[string]uint32, error) { func (r *indexReader) readTOC() error {
if flag != flagStd { d := r.decbufAt(len(r.b) - indexTOCLen)
return nil, errInvalidFlag
r.toc.symbols = d.be64()
r.toc.series = d.be64()
r.toc.labelIndices = d.be64()
r.toc.labelIndicesTable = d.be64()
r.toc.postings = d.be64()
r.toc.postingsTable = d.be64()
// TODO(fabxc): validate checksum.
return nil
}
func (r *indexReader) decbufAt(off int) decbuf {
if len(r.b) < off {
return decbuf{e: errInvalidSize}
} }
h := make(map[string]uint32, 512) return decbuf{b: r.b[off:]}
}
for len(b) > 0 { // readOffsetTable reads an offset table at the given position and returns a map
l, n := binary.Uvarint(b) // with the key strings concatenated by the 0xff unicode non-character.
if n < 1 { func (r *indexReader) readOffsetTable(off uint64) (map[string]uint32, error) {
return nil, errors.Wrap(errInvalidSize, "read key length") // A table might not have been written at all, in which case the position
} // is zeroed out.
b = b[n:] if off == 0 {
return nil, nil
if len(b) < int(l) {
return nil, errors.Wrap(errInvalidSize, "read key")
}
s := string(b[:l])
b = b[l:]
o, n := binary.Uvarint(b)
if n < 1 {
return nil, errors.Wrap(errInvalidSize, "read offset value")
}
b = b[n:]
h[s] = uint32(o)
} }
return h, nil const sep = "\xff"
var (
d1 = r.decbufAt(int(off))
cnt = d1.be32()
d2 = d1.decbuf(d1.be32int())
)
res := make(map[string]uint32, 512)
for d2.err() == nil && d2.len() > 0 && cnt > 0 {
keyCount := int(d2.uvarint())
keys := make([]string, 0, keyCount)
for i := 0; i < keyCount; i++ {
keys = append(keys, d2.uvarintStr())
}
res[strings.Join(keys, sep)] = uint32(d2.uvarint())
cnt--
}
// TODO(fabxc): verify checksum from remainer of d1.
return res, d2.err()
} }
func (r *indexReader) Close() error { func (r *indexReader) Close() error {
@ -548,25 +657,19 @@ func (r *indexReader) section(o uint32) (byte, []byte, error) {
} }
func (r *indexReader) lookupSymbol(o uint32) (string, error) { func (r *indexReader) lookupSymbol(o uint32) (string, error) {
if int(o) > len(r.b) { d := r.decbufAt(int(o))
return "", errors.Errorf("invalid symbol offset %d", o)
}
l, n := binary.Uvarint(r.b[o:])
if n < 0 {
return "", errors.New("reading symbol length failed")
}
end := int(o) + n + int(l) s := d.uvarintStr()
if end > len(r.b) { if d.err() != nil {
return "", errors.New("invalid length") return "", errors.Wrapf(d.err(), "read symbol at %d", o)
} }
b := r.b[int(o)+n : end] return s, nil
return yoloString(b), nil
} }
func (r *indexReader) LabelValues(names ...string) (StringTuples, error) { func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
key := strings.Join(names, string(sep)) const sep = "\xff"
key := strings.Join(names, sep)
off, ok := r.labels[key] off, ok := r.labels[key]
if !ok { if !ok {
// XXX(fabxc): hot fix. Should return a partial data error and handle cases // XXX(fabxc): hot fix. Should return a partial data error and handle cases
@ -575,21 +678,21 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
//return nil, fmt.Errorf("label index doesn't exist") //return nil, fmt.Errorf("label index doesn't exist")
} }
flag, b, err := r.section(off) d1 := r.decbufAt(int(off))
if err != nil { d2 := d1.decbuf(d1.be32int())
return nil, errors.Wrapf(err, "section at %d", off)
} nc := d2.be32int()
if flag != flagStd { d2.be32() // consume unused value entry count.
return nil, errInvalidFlag
} if d2.err() != nil {
l, n := binary.Uvarint(b) return nil, errors.Wrap(d2.err(), "read label value index")
if n < 1 {
return nil, errors.Wrap(errInvalidSize, "read label index size")
} }
// TODO(fabxc): verify checksum in 4 remaining bytes of d1.
st := &serializedStringTuples{ st := &serializedStringTuples{
l: int(l), l: nc,
b: b[n:], b: d2.get(),
lookup: r.lookupSymbol, lookup: r.lookupSymbol,
} }
return st, nil return st, nil
@ -601,110 +704,89 @@ func (emptyStringTuples) At(i int) ([]string, error) { return nil, nil }
func (emptyStringTuples) Len() int { return 0 } func (emptyStringTuples) Len() int { return 0 }
func (r *indexReader) LabelIndices() ([][]string, error) { func (r *indexReader) LabelIndices() ([][]string, error) {
const sep = "\xff"
res := [][]string{} res := [][]string{}
for s := range r.labels { for s := range r.labels {
res = append(res, strings.Split(s, string(sep))) res = append(res, strings.Split(s, sep))
} }
return res, nil return res, nil
} }
func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) { func (r *indexReader) Series(ref uint32) (labels.Labels, []*ChunkMeta, error) {
k, n := binary.Uvarint(r.b[ref:]) d1 := r.decbufAt(int(ref))
if n < 1 { d2 := d1.decbuf(int(d1.uvarint()))
return nil, nil, errors.Wrap(errInvalidSize, "number of labels")
}
b := r.b[int(ref)+n:] k := int(d2.uvarint())
lbls := make(labels.Labels, 0, k) lbls := make(labels.Labels, 0, k)
for i := 0; i < 2*int(k); i += 2 { for i := 0; i < k; i++ {
o, m := binary.Uvarint(b) lno := uint32(d2.uvarint())
if m < 1 { lvo := uint32(d2.uvarint())
return nil, nil, errors.Wrap(errInvalidSize, "symbol offset")
}
n, err := r.lookupSymbol(uint32(o))
if err != nil {
return nil, nil, errors.Wrap(err, "symbol lookup")
}
b = b[m:]
o, m = binary.Uvarint(b) if d2.err() != nil {
if m < 1 { return nil, nil, errors.Wrap(d2.err(), "read series label offsets")
return nil, nil, errors.Wrap(errInvalidSize, "symbol offset")
} }
v, err := r.lookupSymbol(uint32(o))
if err != nil {
return nil, nil, errors.Wrap(err, "symbol lookup")
}
b = b[m:]
lbls = append(lbls, labels.Label{ ln, err := r.lookupSymbol(lno)
Name: n, if err != nil {
Value: v, return nil, nil, errors.Wrap(err, "lookup label name")
}) }
lv, err := r.lookupSymbol(lvo)
if err != nil {
return nil, nil, errors.Wrap(err, "lookup label value")
}
lbls = append(lbls, labels.Label{Name: ln, Value: lv})
} }
// Read the chunks meta data. // Read the chunks meta data.
k, n = binary.Uvarint(b) k = int(d2.uvarint())
if n < 1 {
return nil, nil, errors.Wrap(errInvalidSize, "number of chunks")
}
b = b[n:]
chunks := make([]*ChunkMeta, 0, k) chunks := make([]*ChunkMeta, 0, k)
for i := 0; i < int(k); i++ { for i := 0; i < k; i++ {
firstTime, n := binary.Varint(b) mint := d2.varint64()
if n < 1 { maxt := d2.varint64()
return nil, nil, errors.Wrap(errInvalidSize, "first time") off := d2.uvarint64()
}
b = b[n:]
lastTime, n := binary.Varint(b) if d2.err() != nil {
if n < 1 { return nil, nil, errors.Wrapf(d2.err(), "read meta for chunk %d", i)
return nil, nil, errors.Wrap(errInvalidSize, "last time")
} }
b = b[n:]
o, n := binary.Uvarint(b)
if n < 1 {
return nil, nil, errors.Wrap(errInvalidSize, "chunk offset")
}
b = b[n:]
chunks = append(chunks, &ChunkMeta{ chunks = append(chunks, &ChunkMeta{
Ref: o, Ref: off,
MinTime: firstTime, MinTime: mint,
MaxTime: lastTime, MaxTime: maxt,
}) })
} }
// TODO(fabxc): verify CRC32.
return lbls, chunks, nil return lbls, chunks, nil
} }
func (r *indexReader) Postings(name, value string) (Postings, error) { func (r *indexReader) Postings(name, value string) (Postings, error) {
key := name + string(sep) + value const sep = "\xff"
key := strings.Join([]string{name, value}, sep)
off, ok := r.postings[key] off, ok := r.postings[key]
if !ok { if !ok {
return emptyPostings, nil return emptyPostings, nil
} }
flag, b, err := r.section(off) d1 := r.decbufAt(int(off))
if err != nil { d2 := d1.decbuf(d1.be32int())
return nil, errors.Wrapf(err, "section at %d", off)
d2.be32() // consume unused postings list length.
if d2.err() != nil {
return nil, errors.Wrap(d2.err(), "get postings bytes")
} }
if flag != flagStd { // TODO(fabxc): read checksum from 4 remainer bytes of d1 and verify.
return nil, errors.Wrapf(errInvalidFlag, "section at %d", off)
}
// Add iterator over the bytes. return newBigEndianPostings(d2.get()), nil
if len(b)%4 != 0 {
return nil, errors.Wrap(errInvalidSize, "plain postings entry")
}
return newBigEndianPostings(b), nil
} }
type stringTuples struct { type stringTuples struct {
@ -753,7 +835,6 @@ type serializedStringTuples struct {
} }
func (t *serializedStringTuples) Len() int { func (t *serializedStringTuples) Len() int {
// TODO(fabxc): Cache this?
return len(t.b) / (4 * t.l) return len(t.b) / (4 * t.l)
} }

View file

@ -1,3 +1,16 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package labels package labels
import ( import (
@ -71,7 +84,7 @@ func (ls Labels) Equals(o Labels) bool {
return false return false
} }
for i, l := range ls { for i, l := range ls {
if l.Name != o[i].Name || l.Value != o[i].Value { if o[i] != l {
return false return false
} }
} }

View file

@ -1,3 +1,16 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package labels package labels
import "regexp" import "regexp"

View file

@ -1,3 +1,16 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb package tsdb
import ( import (
@ -139,30 +152,30 @@ func Merge(its ...Postings) Postings {
a := its[0] a := its[0]
for _, b := range its[1:] { for _, b := range its[1:] {
a = newMergePostings(a, b) a = newMergedPostings(a, b)
} }
return a return a
} }
type mergePostings struct { type mergedPostings struct {
a, b Postings a, b Postings
aok, bok bool aok, bok bool
cur uint32 cur uint32
} }
func newMergePostings(a, b Postings) *mergePostings { func newMergedPostings(a, b Postings) *mergedPostings {
it := &mergePostings{a: a, b: b} it := &mergedPostings{a: a, b: b}
it.aok = it.a.Next() it.aok = it.a.Next()
it.bok = it.b.Next() it.bok = it.b.Next()
return it return it
} }
func (it *mergePostings) At() uint32 { func (it *mergedPostings) At() uint32 {
return it.cur return it.cur
} }
func (it *mergePostings) Next() bool { func (it *mergedPostings) Next() bool {
if !it.aok && !it.bok { if !it.aok && !it.bok {
return false return false
} }
@ -197,13 +210,14 @@ func (it *mergePostings) Next() bool {
return true return true
} }
func (it *mergePostings) Seek(id uint32) bool { func (it *mergedPostings) Seek(id uint32) bool {
it.aok = it.a.Seek(id) it.aok = it.a.Seek(id)
it.bok = it.b.Seek(id) it.bok = it.b.Seek(id)
return it.Next() return it.Next()
} }
func (it *mergePostings) Err() error { func (it *mergedPostings) Err() error {
if it.a.Err() != nil { if it.a.Err() != nil {
return it.a.Err() return it.a.Err()
} }

View file

@ -1,3 +1,16 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb package tsdb
import ( import (
@ -153,6 +166,9 @@ func (q *blockQuerier) Select(ms ...labels.Matcher) SeriesSet {
mint: q.mint, mint: q.mint,
maxt: q.maxt, maxt: q.maxt,
}, },
mint: q.mint,
maxt: q.maxt,
} }
} }
@ -397,11 +413,15 @@ func (s *populatedChunkSeries) Next() bool {
for s.set.Next() { for s.set.Next() {
lset, chks := s.set.At() lset, chks := s.set.At()
for i, c := range chks { for len(chks) > 0 {
if c.MaxTime < s.mint { if chks[0].MaxTime >= s.mint {
chks = chks[1:] break
continue
} }
chks = chks[1:]
}
// Break out at the first chunk that has no overlap with mint, maxt.
for i, c := range chks {
if c.MinTime > s.maxt { if c.MinTime > s.maxt {
chks = chks[:i] chks = chks[:i]
break break
@ -411,6 +431,7 @@ func (s *populatedChunkSeries) Next() bool {
return false return false
} }
} }
if len(chks) == 0 { if len(chks) == 0 {
continue continue
} }
@ -431,12 +452,14 @@ type blockSeriesSet struct {
set chunkSeriesSet set chunkSeriesSet
err error err error
cur Series cur Series
mint, maxt int64
} }
func (s *blockSeriesSet) Next() bool { func (s *blockSeriesSet) Next() bool {
for s.set.Next() { for s.set.Next() {
lset, chunks := s.set.At() lset, chunks := s.set.At()
s.cur = &chunkSeries{labels: lset, chunks: chunks} s.cur = &chunkSeries{labels: lset, chunks: chunks, mint: s.mint, maxt: s.maxt}
return true return true
} }
if s.set.Err() != nil { if s.set.Err() != nil {
@ -453,6 +476,8 @@ func (s *blockSeriesSet) Err() error { return s.err }
type chunkSeries struct { type chunkSeries struct {
labels labels.Labels labels labels.Labels
chunks []*ChunkMeta // in-order chunk refs chunks []*ChunkMeta // in-order chunk refs
mint, maxt int64
} }
func (s *chunkSeries) Labels() labels.Labels { func (s *chunkSeries) Labels() labels.Labels {
@ -460,14 +485,14 @@ func (s *chunkSeries) Labels() labels.Labels {
} }
func (s *chunkSeries) Iterator() SeriesIterator { func (s *chunkSeries) Iterator() SeriesIterator {
return newChunkSeriesIterator(s.chunks) return newChunkSeriesIterator(s.chunks, s.mint, s.maxt)
} }
// SeriesIterator iterates over the data of a time series. // SeriesIterator iterates over the data of a time series.
type SeriesIterator interface { type SeriesIterator interface {
// Seek advances the iterator forward to the given timestamp. // Seek advances the iterator forward to the given timestamp.
// If there's no value exactly at ts, it advances to the last value // If there's no value exactly at t, it advances to the first value
// before tt. // after t.
Seek(t int64) bool Seek(t int64) bool
// At returns the current timestamp/value pair. // At returns the current timestamp/value pair.
At() (t int64, v float64) At() (t int64, v float64)
@ -488,7 +513,7 @@ func (s *chainedSeries) Labels() labels.Labels {
} }
func (s *chainedSeries) Iterator() SeriesIterator { func (s *chainedSeries) Iterator() SeriesIterator {
return &chainedSeriesIterator{series: s.series} return newChainedSeriesIterator(s.series...)
} }
// chainedSeriesIterator implements a series iterater over a list // chainedSeriesIterator implements a series iterater over a list
@ -500,6 +525,14 @@ type chainedSeriesIterator struct {
cur SeriesIterator cur SeriesIterator
} }
func newChainedSeriesIterator(s ...Series) *chainedSeriesIterator {
return &chainedSeriesIterator{
series: s,
i: 0,
cur: s[0].Iterator(),
}
}
func (it *chainedSeriesIterator) Seek(t int64) bool { func (it *chainedSeriesIterator) Seek(t int64) bool {
// We just scan the chained series sequentially as they are already // We just scan the chained series sequentially as they are already
// pre-selected by relevant time and should be accessed sequentially anyway. // pre-selected by relevant time and should be accessed sequentially anyway.
@ -516,9 +549,6 @@ func (it *chainedSeriesIterator) Seek(t int64) bool {
} }
func (it *chainedSeriesIterator) Next() bool { func (it *chainedSeriesIterator) Next() bool {
if it.cur == nil {
it.cur = it.series[it.i].Iterator()
}
if it.cur.Next() { if it.cur.Next() {
return true return true
} }
@ -550,17 +580,35 @@ type chunkSeriesIterator struct {
i int i int
cur chunks.Iterator cur chunks.Iterator
maxt, mint int64
} }
func newChunkSeriesIterator(cs []*ChunkMeta) *chunkSeriesIterator { func newChunkSeriesIterator(cs []*ChunkMeta, mint, maxt int64) *chunkSeriesIterator {
return &chunkSeriesIterator{ return &chunkSeriesIterator{
chunks: cs, chunks: cs,
i: 0, i: 0,
cur: cs[0].Chunk.Iterator(), cur: cs[0].Chunk.Iterator(),
mint: mint,
maxt: maxt,
} }
} }
func (it *chunkSeriesIterator) inBounds(t int64) bool {
return t >= it.mint && t <= it.maxt
}
func (it *chunkSeriesIterator) Seek(t int64) (ok bool) { func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
if t > it.maxt {
return false
}
// Seek to the first valid value after t.
if t < it.mint {
t = it.mint
}
// Only do binary search forward to stay in line with other iterators // Only do binary search forward to stay in line with other iterators
// that can only move forward. // that can only move forward.
x := sort.Search(len(it.chunks[it.i:]), func(i int) bool { return it.chunks[i].MinTime >= t }) x := sort.Search(len(it.chunks[it.i:]), func(i int) bool { return it.chunks[i].MinTime >= t })
@ -569,10 +617,10 @@ func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
// If the timestamp was not found, it might be in the last chunk. // If the timestamp was not found, it might be in the last chunk.
if x == len(it.chunks) { if x == len(it.chunks) {
x-- x--
}
// Go to previous chunk if the chunk doesn't exactly start with t. // Go to previous chunk if the chunk doesn't exactly start with t.
// If we are already at the first chunk, we use it as it's the best we have. // If we are already at the first chunk, we use it as it's the best we have.
if x > 0 && it.chunks[x].MinTime > t { } else if x > 0 && it.chunks[x].MinTime > t {
x-- x--
} }
@ -593,9 +641,13 @@ func (it *chunkSeriesIterator) At() (t int64, v float64) {
} }
func (it *chunkSeriesIterator) Next() bool { func (it *chunkSeriesIterator) Next() bool {
if it.cur.Next() { for it.cur.Next() {
return true t, _ := it.cur.At()
if it.inBounds(t) {
return true
}
} }
if err := it.cur.Err(); err != nil { if err := it.cur.Err(); err != nil {
return false return false
} }

View file

@ -1,3 +1,16 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb package tsdb
import ( import (
@ -134,7 +147,7 @@ func (w *WAL) initSegments() error {
if len(fns) == 0 { if len(fns) == 0 {
return nil return nil
} }
// We must open all file in read mode as we may have to truncate along // We must open all files in read/write mode as we may have to truncate along
// the way and any file may become the tail. // the way and any file may become the tail.
for _, fn := range fns { for _, fn := range fns {
f, err := os.OpenFile(fn, os.O_RDWR, 0666) f, err := os.OpenFile(fn, os.O_RDWR, 0666)
@ -165,10 +178,10 @@ func (w *WAL) initSegments() error {
return nil return nil
} }
// cut finishes the currently active segments and open the next one. // cut finishes the currently active segments and opens the next one.
// The encoder is reset to point to the new segment. // The encoder is reset to point to the new segment.
func (w *WAL) cut() error { func (w *WAL) cut() error {
// Sync current tail to disc and close. // Sync current tail to disk and close.
if tf := w.tail(); tf != nil { if tf := w.tail(); tf != nil {
if err := w.sync(); err != nil { if err := w.sync(); err != nil {
return err return err
@ -263,7 +276,7 @@ func (w *WAL) run(interval time.Duration) {
} }
} }
// Close sync all data and closes the underlying resources. // Close syncs all data and closes the underlying resources.
func (w *WAL) Close() error { func (w *WAL) Close() error {
close(w.stopc) close(w.stopc)
<-w.donec <-w.donec
@ -296,9 +309,10 @@ func (w *WAL) entry(et WALEntryType, flag byte, buf []byte) error {
w.mtx.Lock() w.mtx.Lock()
defer w.mtx.Unlock() defer w.mtx.Unlock()
// Cut to the next segment if exceeds the file size unless it would also // Cut to the next segment if the entry exceeds the file size unless it would also
// exceed the size of a new segment. // exceed the size of a new segment.
var ( var (
// 6-byte header + 4-byte CRC32 + buf.
sz = int64(6 + 4 + len(buf)) sz = int64(6 + 4 + len(buf))
newsz = w.curN + sz newsz = w.curN + sz
) )

18
vendor/vendor.json vendored
View file

@ -645,22 +645,22 @@
"revisionTime": "2016-04-11T19:08:41Z" "revisionTime": "2016-04-11T19:08:41Z"
}, },
{ {
"checksumSHA1": "R7aNHvNnDiTb6t7z8FehPAo3PdM=", "checksumSHA1": "0wu/AzUWMurN/T5VBKCrvhf7c7E=",
"path": "github.com/prometheus/tsdb", "path": "github.com/prometheus/tsdb",
"revision": "721df536eb1a6e6f91785f138ca24917222f8461", "revision": "44769c1654f699931b2d3a2928326ac2d02d9149",
"revisionTime": "2017-04-09T08:18:19Z" "revisionTime": "2017-05-09T10:52:47Z"
}, },
{ {
"checksumSHA1": "Qwlzvcx5Lbo9Nzb75AGgiiGszZI=", "checksumSHA1": "9EH3v+JdbikCUJAgD4VEOPIaWfs=",
"path": "github.com/prometheus/tsdb/chunks", "path": "github.com/prometheus/tsdb/chunks",
"revision": "10c7c9acbe0175a411bce90cd7a0d9d7a13d6a83", "revision": "44769c1654f699931b2d3a2928326ac2d02d9149",
"revisionTime": "2017-04-04T09:27:26Z" "revisionTime": "2017-05-09T10:52:47Z"
}, },
{ {
"checksumSHA1": "ed5dnejBTbr0FKdzKRAC91bHRgE=", "checksumSHA1": "3RHZcB/ZvIae9K0tJxNlajJg0jA=",
"path": "github.com/prometheus/tsdb/labels", "path": "github.com/prometheus/tsdb/labels",
"revision": "770d00800212502dfef71a6a7df23e3ced4459d9", "revision": "44769c1654f699931b2d3a2928326ac2d02d9149",
"revisionTime": "2017-04-05T12:14:30Z" "revisionTime": "2017-05-09T10:52:47Z"
}, },
{ {
"checksumSHA1": "+49Vr4Me28p3cR+gxX5SUQHbbas=", "checksumSHA1": "+49Vr4Me28p3cR+gxX5SUQHbbas=",