mirror of
https://github.com/prometheus/prometheus.git
synced 2024-11-09 23:24:05 -08:00
Merge encoding_helpers.go to tsdbutil (#526)
remove duplicate encoding helper funcs and move to own package so they can be reused. Signed-off-by: naivewong <867245430@qq.com>
This commit is contained in:
parent
77d5a7d47a
commit
e7436e13f0
244
encoding/encoding.go
Normal file
244
encoding/encoding.go
Normal file
|
@ -0,0 +1,244 @@
|
|||
// Copyright 2018 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package encoding
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"hash"
|
||||
"hash/crc32"
|
||||
"unsafe"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrInvalidSize = errors.New("invalid size")
|
||||
ErrInvalidChecksum = errors.New("invalid checksum")
|
||||
)
|
||||
|
||||
// enbuf is a helper type to populate a byte slice with various types.
|
||||
type Encbuf struct {
|
||||
B []byte
|
||||
C [binary.MaxVarintLen64]byte
|
||||
}
|
||||
|
||||
func (e *Encbuf) Reset() { e.B = e.B[:0] }
|
||||
func (e *Encbuf) Get() []byte { return e.B }
|
||||
func (e *Encbuf) Len() int { return len(e.B) }
|
||||
|
||||
func (e *Encbuf) PutString(s string) { e.B = append(e.B, s...) }
|
||||
func (e *Encbuf) PutByte(c byte) { e.B = append(e.B, c) }
|
||||
|
||||
func (e *Encbuf) PutBE32int(x int) { e.PutBE32(uint32(x)) }
|
||||
func (e *Encbuf) PutUvarint32(x uint32) { e.PutUvarint64(uint64(x)) }
|
||||
func (e *Encbuf) PutBE64int64(x int64) { e.PutBE64(uint64(x)) }
|
||||
func (e *Encbuf) PutUvarint(x int) { e.PutUvarint64(uint64(x)) }
|
||||
|
||||
func (e *Encbuf) PutBE32(x uint32) {
|
||||
binary.BigEndian.PutUint32(e.C[:], x)
|
||||
e.B = append(e.B, e.C[:4]...)
|
||||
}
|
||||
|
||||
func (e *Encbuf) PutBE64(x uint64) {
|
||||
binary.BigEndian.PutUint64(e.C[:], x)
|
||||
e.B = append(e.B, e.C[:8]...)
|
||||
}
|
||||
|
||||
func (e *Encbuf) PutUvarint64(x uint64) {
|
||||
n := binary.PutUvarint(e.C[:], x)
|
||||
e.B = append(e.B, e.C[:n]...)
|
||||
}
|
||||
|
||||
func (e *Encbuf) PutVarint64(x int64) {
|
||||
n := binary.PutVarint(e.C[:], x)
|
||||
e.B = append(e.B, e.C[:n]...)
|
||||
}
|
||||
|
||||
// putVarintStr writes a string to the buffer prefixed by its varint length (in bytes!).
|
||||
func (e *Encbuf) PutUvarintStr(s string) {
|
||||
b := *(*[]byte)(unsafe.Pointer(&s))
|
||||
e.PutUvarint(len(b))
|
||||
e.PutString(s)
|
||||
}
|
||||
|
||||
// putHash appends a hash over the buffers current contents to the buffer.
|
||||
func (e *Encbuf) PutHash(h hash.Hash) {
|
||||
h.Reset()
|
||||
_, err := h.Write(e.B)
|
||||
if err != nil {
|
||||
panic(err) // The CRC32 implementation does not error
|
||||
}
|
||||
e.B = h.Sum(e.B)
|
||||
}
|
||||
|
||||
// decbuf provides safe methods to extract data from a byte slice. It does all
|
||||
// necessary bounds checking and advancing of the byte slice.
|
||||
// Several datums can be extracted without checking for errors. However, before using
|
||||
// any datum, the err() method must be checked.
|
||||
type Decbuf struct {
|
||||
B []byte
|
||||
E error
|
||||
}
|
||||
|
||||
// NewDecbufAt returns a new decoding buffer. It expects the first 4 bytes
|
||||
// after offset to hold the big endian encoded content length, followed by the contents and the expected
|
||||
// checksum.
|
||||
func NewDecbufAt(bs ByteSlice, off int, castagnoliTable *crc32.Table) Decbuf {
|
||||
if bs.Len() < off+4 {
|
||||
return Decbuf{E: ErrInvalidSize}
|
||||
}
|
||||
b := bs.Range(off, off+4)
|
||||
l := int(binary.BigEndian.Uint32(b))
|
||||
|
||||
if bs.Len() < off+4+l+4 {
|
||||
return Decbuf{E: ErrInvalidSize}
|
||||
}
|
||||
|
||||
// Load bytes holding the contents plus a CRC32 checksum.
|
||||
b = bs.Range(off+4, off+4+l+4)
|
||||
dec := Decbuf{B: b[:len(b)-4]}
|
||||
|
||||
if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.Crc32(castagnoliTable) != exp {
|
||||
return Decbuf{E: ErrInvalidChecksum}
|
||||
}
|
||||
return dec
|
||||
}
|
||||
|
||||
// NewDecbufUvarintAt returns a new decoding buffer. It expects the first bytes
|
||||
// after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected
|
||||
// checksum.
|
||||
func NewDecbufUvarintAt(bs ByteSlice, off int, castagnoliTable *crc32.Table) Decbuf {
|
||||
// We never have to access this method at the far end of the byte slice. Thus just checking
|
||||
// against the MaxVarintLen32 is sufficient.
|
||||
if bs.Len() < off+binary.MaxVarintLen32 {
|
||||
return Decbuf{E: ErrInvalidSize}
|
||||
}
|
||||
b := bs.Range(off, off+binary.MaxVarintLen32)
|
||||
|
||||
l, n := binary.Uvarint(b)
|
||||
if n <= 0 || n > binary.MaxVarintLen32 {
|
||||
return Decbuf{E: errors.Errorf("invalid uvarint %d", n)}
|
||||
}
|
||||
|
||||
if bs.Len() < off+n+int(l)+4 {
|
||||
return Decbuf{E: ErrInvalidSize}
|
||||
}
|
||||
|
||||
// Load bytes holding the contents plus a CRC32 checksum.
|
||||
b = bs.Range(off+n, off+n+int(l)+4)
|
||||
dec := Decbuf{B: b[:len(b)-4]}
|
||||
|
||||
if dec.Crc32(castagnoliTable) != binary.BigEndian.Uint32(b[len(b)-4:]) {
|
||||
return Decbuf{E: ErrInvalidChecksum}
|
||||
}
|
||||
return dec
|
||||
}
|
||||
|
||||
func (d *Decbuf) Uvarint() int { return int(d.Uvarint64()) }
|
||||
func (d *Decbuf) Be32int() int { return int(d.Be32()) }
|
||||
func (d *Decbuf) Be64int64() int64 { return int64(d.Be64()) }
|
||||
|
||||
// Crc32 returns a CRC32 checksum over the remaining bytes.
|
||||
func (d *Decbuf) Crc32(castagnoliTable *crc32.Table) uint32 {
|
||||
return crc32.Checksum(d.B, castagnoliTable)
|
||||
}
|
||||
|
||||
func (d *Decbuf) UvarintStr() string {
|
||||
l := d.Uvarint64()
|
||||
if d.E != nil {
|
||||
return ""
|
||||
}
|
||||
if len(d.B) < int(l) {
|
||||
d.E = ErrInvalidSize
|
||||
return ""
|
||||
}
|
||||
s := string(d.B[:l])
|
||||
d.B = d.B[l:]
|
||||
return s
|
||||
}
|
||||
|
||||
func (d *Decbuf) Varint64() int64 {
|
||||
if d.E != nil {
|
||||
return 0
|
||||
}
|
||||
x, n := binary.Varint(d.B)
|
||||
if n < 1 {
|
||||
d.E = ErrInvalidSize
|
||||
return 0
|
||||
}
|
||||
d.B = d.B[n:]
|
||||
return x
|
||||
}
|
||||
|
||||
func (d *Decbuf) Uvarint64() uint64 {
|
||||
if d.E != nil {
|
||||
return 0
|
||||
}
|
||||
x, n := binary.Uvarint(d.B)
|
||||
if n < 1 {
|
||||
d.E = ErrInvalidSize
|
||||
return 0
|
||||
}
|
||||
d.B = d.B[n:]
|
||||
return x
|
||||
}
|
||||
|
||||
func (d *Decbuf) Be64() uint64 {
|
||||
if d.E != nil {
|
||||
return 0
|
||||
}
|
||||
if len(d.B) < 8 {
|
||||
d.E = ErrInvalidSize
|
||||
return 0
|
||||
}
|
||||
x := binary.BigEndian.Uint64(d.B)
|
||||
d.B = d.B[8:]
|
||||
return x
|
||||
}
|
||||
|
||||
func (d *Decbuf) Be32() uint32 {
|
||||
if d.E != nil {
|
||||
return 0
|
||||
}
|
||||
if len(d.B) < 4 {
|
||||
d.E = ErrInvalidSize
|
||||
return 0
|
||||
}
|
||||
x := binary.BigEndian.Uint32(d.B)
|
||||
d.B = d.B[4:]
|
||||
return x
|
||||
}
|
||||
|
||||
func (d *Decbuf) Byte() byte {
|
||||
if d.E != nil {
|
||||
return 0
|
||||
}
|
||||
if len(d.B) < 1 {
|
||||
d.E = ErrInvalidSize
|
||||
return 0
|
||||
}
|
||||
x := d.B[0]
|
||||
d.B = d.B[1:]
|
||||
return x
|
||||
}
|
||||
|
||||
func (d *Decbuf) Err() error { return d.E }
|
||||
func (d *Decbuf) Len() int { return len(d.B) }
|
||||
func (d *Decbuf) Get() []byte { return d.B }
|
||||
|
||||
// ByteSlice abstracts a byte slice.
|
||||
type ByteSlice interface {
|
||||
Len() int
|
||||
Range(start, end int) []byte
|
||||
}
|
|
@ -1,160 +0,0 @@
|
|||
// Copyright 2018 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tsdb
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"unsafe"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
var errInvalidSize = errors.New("invalid size")
|
||||
|
||||
// encbuf is a helper type to populate a byte slice with various types.
|
||||
type encbuf struct {
|
||||
b []byte
|
||||
c [binary.MaxVarintLen64]byte
|
||||
}
|
||||
|
||||
func (e *encbuf) reset() { e.b = e.b[:0] }
|
||||
func (e *encbuf) get() []byte { return e.b }
|
||||
|
||||
func (e *encbuf) putString(s string) { e.b = append(e.b, s...) }
|
||||
func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) }
|
||||
|
||||
func (e *encbuf) putBE64int64(x int64) { e.putBE64(uint64(x)) }
|
||||
func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) }
|
||||
|
||||
func (e *encbuf) putBE32(x uint32) {
|
||||
binary.BigEndian.PutUint32(e.c[:], x)
|
||||
e.b = append(e.b, e.c[:4]...)
|
||||
}
|
||||
|
||||
func (e *encbuf) putBE64(x uint64) {
|
||||
binary.BigEndian.PutUint64(e.c[:], x)
|
||||
e.b = append(e.b, e.c[:8]...)
|
||||
}
|
||||
|
||||
func (e *encbuf) putUvarint64(x uint64) {
|
||||
n := binary.PutUvarint(e.c[:], x)
|
||||
e.b = append(e.b, e.c[:n]...)
|
||||
}
|
||||
|
||||
func (e *encbuf) putVarint64(x int64) {
|
||||
n := binary.PutVarint(e.c[:], x)
|
||||
e.b = append(e.b, e.c[:n]...)
|
||||
}
|
||||
|
||||
// putVarintStr writes a string to the buffer prefixed by its varint length (in bytes!).
|
||||
func (e *encbuf) putUvarintStr(s string) {
|
||||
b := *(*[]byte)(unsafe.Pointer(&s))
|
||||
e.putUvarint(len(b))
|
||||
e.putString(s)
|
||||
}
|
||||
|
||||
// decbuf provides safe methods to extract data from a byte slice. It does all
|
||||
// necessary bounds checking and advancing of the byte slice.
|
||||
// Several datums can be extracted without checking for errors. However, before using
|
||||
// any datum, the err() method must be checked.
|
||||
type decbuf struct {
|
||||
b []byte
|
||||
e error
|
||||
}
|
||||
|
||||
func (d *decbuf) uvarint() int { return int(d.uvarint64()) }
|
||||
func (d *decbuf) be64int64() int64 { return int64(d.be64()) }
|
||||
|
||||
func (d *decbuf) uvarintStr() string {
|
||||
l := d.uvarint64()
|
||||
if d.e != nil {
|
||||
return ""
|
||||
}
|
||||
if len(d.b) < int(l) {
|
||||
d.e = errInvalidSize
|
||||
return ""
|
||||
}
|
||||
s := string(d.b[:l])
|
||||
d.b = d.b[l:]
|
||||
return s
|
||||
}
|
||||
|
||||
func (d *decbuf) varint64() int64 {
|
||||
if d.e != nil {
|
||||
return 0
|
||||
}
|
||||
x, n := binary.Varint(d.b)
|
||||
if n < 1 {
|
||||
d.e = errInvalidSize
|
||||
return 0
|
||||
}
|
||||
d.b = d.b[n:]
|
||||
return x
|
||||
}
|
||||
|
||||
func (d *decbuf) uvarint64() uint64 {
|
||||
if d.e != nil {
|
||||
return 0
|
||||
}
|
||||
x, n := binary.Uvarint(d.b)
|
||||
if n < 1 {
|
||||
d.e = errInvalidSize
|
||||
return 0
|
||||
}
|
||||
d.b = d.b[n:]
|
||||
return x
|
||||
}
|
||||
|
||||
func (d *decbuf) be64() uint64 {
|
||||
if d.e != nil {
|
||||
return 0
|
||||
}
|
||||
if len(d.b) < 4 {
|
||||
d.e = errInvalidSize
|
||||
return 0
|
||||
}
|
||||
x := binary.BigEndian.Uint64(d.b)
|
||||
d.b = d.b[8:]
|
||||
return x
|
||||
}
|
||||
|
||||
func (d *decbuf) be32() uint32 {
|
||||
if d.e != nil {
|
||||
return 0
|
||||
}
|
||||
if len(d.b) < 4 {
|
||||
d.e = errInvalidSize
|
||||
return 0
|
||||
}
|
||||
x := binary.BigEndian.Uint32(d.b)
|
||||
d.b = d.b[4:]
|
||||
return x
|
||||
}
|
||||
|
||||
func (d *decbuf) byte() byte {
|
||||
if d.e != nil {
|
||||
return 0
|
||||
}
|
||||
if len(d.b) < 1 {
|
||||
d.e = errInvalidSize
|
||||
return 0
|
||||
}
|
||||
x := d.b[0]
|
||||
d.b = d.b[1:]
|
||||
return x
|
||||
}
|
||||
|
||||
func (d *decbuf) err() error { return d.e }
|
||||
func (d *decbuf) len() int { return len(d.b) }
|
||||
func (d *decbuf) get() []byte { return d.b }
|
3
head.go
3
head.go
|
@ -28,6 +28,7 @@ import (
|
|||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/tsdb/chunkenc"
|
||||
"github.com/prometheus/tsdb/chunks"
|
||||
"github.com/prometheus/tsdb/encoding"
|
||||
"github.com/prometheus/tsdb/index"
|
||||
"github.com/prometheus/tsdb/labels"
|
||||
"github.com/prometheus/tsdb/wal"
|
||||
|
@ -1117,7 +1118,7 @@ func (h *headIndexReader) Symbols() (map[string]struct{}, error) {
|
|||
// LabelValues returns the possible label values
|
||||
func (h *headIndexReader) LabelValues(names ...string) (index.StringTuples, error) {
|
||||
if len(names) != 1 {
|
||||
return nil, errInvalidSize
|
||||
return nil, encoding.ErrInvalidSize
|
||||
}
|
||||
|
||||
h.head.symMtx.RLock()
|
||||
|
|
|
@ -1,218 +0,0 @@
|
|||
// Copyright 2018 The Prometheus Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package index
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"hash"
|
||||
"hash/crc32"
|
||||
"unsafe"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// enbuf is a helper type to populate a byte slice with various types.
|
||||
type encbuf struct {
|
||||
b []byte
|
||||
c [binary.MaxVarintLen64]byte
|
||||
}
|
||||
|
||||
func (e *encbuf) reset() { e.b = e.b[:0] }
|
||||
func (e *encbuf) get() []byte { return e.b }
|
||||
func (e *encbuf) len() int { return len(e.b) }
|
||||
|
||||
func (e *encbuf) putString(s string) { e.b = append(e.b, s...) }
|
||||
func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) }
|
||||
|
||||
func (e *encbuf) putBE32int(x int) { e.putBE32(uint32(x)) }
|
||||
func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) }
|
||||
func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) }
|
||||
|
||||
func (e *encbuf) putBE32(x uint32) {
|
||||
binary.BigEndian.PutUint32(e.c[:], x)
|
||||
e.b = append(e.b, e.c[:4]...)
|
||||
}
|
||||
|
||||
func (e *encbuf) putBE64(x uint64) {
|
||||
binary.BigEndian.PutUint64(e.c[:], x)
|
||||
e.b = append(e.b, e.c[:8]...)
|
||||
}
|
||||
|
||||
func (e *encbuf) putUvarint64(x uint64) {
|
||||
n := binary.PutUvarint(e.c[:], x)
|
||||
e.b = append(e.b, e.c[:n]...)
|
||||
}
|
||||
|
||||
func (e *encbuf) putVarint64(x int64) {
|
||||
n := binary.PutVarint(e.c[:], x)
|
||||
e.b = append(e.b, e.c[:n]...)
|
||||
}
|
||||
|
||||
// putVarintStr writes a string to the buffer prefixed by its varint length (in bytes!).
|
||||
func (e *encbuf) putUvarintStr(s string) {
|
||||
b := *(*[]byte)(unsafe.Pointer(&s))
|
||||
e.putUvarint(len(b))
|
||||
e.putString(s)
|
||||
}
|
||||
|
||||
// putHash appends a hash over the buffers current contents to the buffer.
|
||||
func (e *encbuf) putHash(h hash.Hash) {
|
||||
h.Reset()
|
||||
_, err := h.Write(e.b)
|
||||
if err != nil {
|
||||
panic(err) // The CRC32 implementation does not error
|
||||
}
|
||||
e.b = h.Sum(e.b)
|
||||
}
|
||||
|
||||
// decbuf provides safe methods to extract data from a byte slice. It does all
|
||||
// necessary bounds checking and advancing of the byte slice.
|
||||
// Several datums can be extracted without checking for errors. However, before using
|
||||
// any datum, the err() method must be checked.
|
||||
type decbuf struct {
|
||||
b []byte
|
||||
e error
|
||||
}
|
||||
|
||||
// newDecbufAt returns a new decoding buffer. It expects the first 4 bytes
|
||||
// after offset to hold the big endian encoded content length, followed by the contents and the expected
|
||||
// checksum.
|
||||
func newDecbufAt(bs ByteSlice, off int) decbuf {
|
||||
if bs.Len() < off+4 {
|
||||
return decbuf{e: errInvalidSize}
|
||||
}
|
||||
b := bs.Range(off, off+4)
|
||||
l := int(binary.BigEndian.Uint32(b))
|
||||
|
||||
if bs.Len() < off+4+l+4 {
|
||||
return decbuf{e: errInvalidSize}
|
||||
}
|
||||
|
||||
// Load bytes holding the contents plus a CRC32 checksum.
|
||||
b = bs.Range(off+4, off+4+l+4)
|
||||
dec := decbuf{b: b[:len(b)-4]}
|
||||
|
||||
if exp := binary.BigEndian.Uint32(b[len(b)-4:]); dec.crc32() != exp {
|
||||
return decbuf{e: errInvalidChecksum}
|
||||
}
|
||||
return dec
|
||||
}
|
||||
|
||||
// decbufUvarintAt returns a new decoding buffer. It expects the first bytes
|
||||
// after offset to hold the uvarint-encoded buffers length, followed by the contents and the expected
|
||||
// checksum.
|
||||
func newDecbufUvarintAt(bs ByteSlice, off int) decbuf {
|
||||
// We never have to access this method at the far end of the byte slice. Thus just checking
|
||||
// against the MaxVarintLen32 is sufficient.
|
||||
if bs.Len() < off+binary.MaxVarintLen32 {
|
||||
return decbuf{e: errInvalidSize}
|
||||
}
|
||||
b := bs.Range(off, off+binary.MaxVarintLen32)
|
||||
|
||||
l, n := binary.Uvarint(b)
|
||||
if n <= 0 || n > binary.MaxVarintLen32 {
|
||||
return decbuf{e: errors.Errorf("invalid uvarint %d", n)}
|
||||
}
|
||||
|
||||
if bs.Len() < off+n+int(l)+4 {
|
||||
return decbuf{e: errInvalidSize}
|
||||
}
|
||||
|
||||
// Load bytes holding the contents plus a CRC32 checksum.
|
||||
b = bs.Range(off+n, off+n+int(l)+4)
|
||||
dec := decbuf{b: b[:len(b)-4]}
|
||||
|
||||
if dec.crc32() != binary.BigEndian.Uint32(b[len(b)-4:]) {
|
||||
return decbuf{e: errInvalidChecksum}
|
||||
}
|
||||
return dec
|
||||
}
|
||||
|
||||
func (d *decbuf) uvarint() int { return int(d.uvarint64()) }
|
||||
func (d *decbuf) be32int() int { return int(d.be32()) }
|
||||
|
||||
// crc32 returns a CRC32 checksum over the remaining bytes.
|
||||
func (d *decbuf) crc32() uint32 {
|
||||
return crc32.Checksum(d.b, castagnoliTable)
|
||||
}
|
||||
|
||||
func (d *decbuf) uvarintStr() string {
|
||||
l := d.uvarint64()
|
||||
if d.e != nil {
|
||||
return ""
|
||||
}
|
||||
if len(d.b) < int(l) {
|
||||
d.e = errInvalidSize
|
||||
return ""
|
||||
}
|
||||
s := string(d.b[:l])
|
||||
d.b = d.b[l:]
|
||||
return s
|
||||
}
|
||||
|
||||
func (d *decbuf) varint64() int64 {
|
||||
if d.e != nil {
|
||||
return 0
|
||||
}
|
||||
x, n := binary.Varint(d.b)
|
||||
if n < 1 {
|
||||
d.e = errInvalidSize
|
||||
return 0
|
||||
}
|
||||
d.b = d.b[n:]
|
||||
return x
|
||||
}
|
||||
|
||||
func (d *decbuf) uvarint64() uint64 {
|
||||
if d.e != nil {
|
||||
return 0
|
||||
}
|
||||
x, n := binary.Uvarint(d.b)
|
||||
if n < 1 {
|
||||
d.e = errInvalidSize
|
||||
return 0
|
||||
}
|
||||
d.b = d.b[n:]
|
||||
return x
|
||||
}
|
||||
|
||||
func (d *decbuf) be64() uint64 {
|
||||
if d.e != nil {
|
||||
return 0
|
||||
}
|
||||
if len(d.b) < 8 {
|
||||
d.e = errInvalidSize
|
||||
return 0
|
||||
}
|
||||
x := binary.BigEndian.Uint64(d.b)
|
||||
d.b = d.b[8:]
|
||||
return x
|
||||
}
|
||||
|
||||
func (d *decbuf) be32() uint32 {
|
||||
if d.e != nil {
|
||||
return 0
|
||||
}
|
||||
if len(d.b) < 4 {
|
||||
d.e = errInvalidSize
|
||||
return 0
|
||||
}
|
||||
x := binary.BigEndian.Uint32(d.b)
|
||||
d.b = d.b[4:]
|
||||
return x
|
||||
}
|
||||
|
||||
func (d *decbuf) err() error { return d.e }
|
||||
func (d *decbuf) len() int { return len(d.b) }
|
||||
func (d *decbuf) get() []byte { return d.b }
|
269
index/index.go
269
index/index.go
|
@ -16,7 +16,6 @@ package index
|
|||
import (
|
||||
"bufio"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"hash"
|
||||
"hash/crc32"
|
||||
"io"
|
||||
|
@ -29,6 +28,7 @@ import (
|
|||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/tsdb/chunks"
|
||||
"github.com/prometheus/tsdb/encoding"
|
||||
"github.com/prometheus/tsdb/fileutil"
|
||||
"github.com/prometheus/tsdb/labels"
|
||||
)
|
||||
|
@ -119,8 +119,8 @@ type Writer struct {
|
|||
stage indexWriterStage
|
||||
|
||||
// Reusable memory.
|
||||
buf1 encbuf
|
||||
buf2 encbuf
|
||||
buf1 encoding.Encbuf
|
||||
buf2 encoding.Encbuf
|
||||
uint32s []uint32
|
||||
|
||||
symbols map[string]uint32 // symbol offsets
|
||||
|
@ -149,28 +149,28 @@ type TOC struct {
|
|||
// NewTOCFromByteSlice return parsed TOC from given index byte slice.
|
||||
func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
|
||||
if bs.Len() < indexTOCLen {
|
||||
return nil, errInvalidSize
|
||||
return nil, encoding.ErrInvalidSize
|
||||
}
|
||||
b := bs.Range(bs.Len()-indexTOCLen, bs.Len())
|
||||
|
||||
expCRC := binary.BigEndian.Uint32(b[len(b)-4:])
|
||||
d := decbuf{b: b[:len(b)-4]}
|
||||
d := encoding.Decbuf{B: b[:len(b)-4]}
|
||||
|
||||
if d.crc32() != expCRC {
|
||||
return nil, errors.Wrap(errInvalidChecksum, "read TOC")
|
||||
if d.Crc32(castagnoliTable) != expCRC {
|
||||
return nil, errors.Wrap(encoding.ErrInvalidChecksum, "read TOC")
|
||||
}
|
||||
|
||||
if err := d.err(); err != nil {
|
||||
if err := d.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &TOC{
|
||||
Symbols: d.be64(),
|
||||
Series: d.be64(),
|
||||
LabelIndices: d.be64(),
|
||||
LabelIndicesTable: d.be64(),
|
||||
Postings: d.be64(),
|
||||
PostingsTable: d.be64(),
|
||||
Symbols: d.Be64(),
|
||||
Series: d.Be64(),
|
||||
LabelIndices: d.Be64(),
|
||||
LabelIndicesTable: d.Be64(),
|
||||
Postings: d.Be64(),
|
||||
PostingsTable: d.Be64(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -203,8 +203,8 @@ func NewWriter(fn string) (*Writer, error) {
|
|||
stage: idxStageNone,
|
||||
|
||||
// Reusable memory.
|
||||
buf1: encbuf{b: make([]byte, 0, 1<<22)},
|
||||
buf2: encbuf{b: make([]byte, 0, 1<<22)},
|
||||
buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
|
||||
buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
|
||||
uint32s: make([]uint32, 0, 1<<15),
|
||||
|
||||
// Caches.
|
||||
|
@ -288,11 +288,11 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
|
|||
}
|
||||
|
||||
func (w *Writer) writeMeta() error {
|
||||
w.buf1.reset()
|
||||
w.buf1.putBE32(MagicIndex)
|
||||
w.buf1.putByte(FormatV2)
|
||||
w.buf1.Reset()
|
||||
w.buf1.PutBE32(MagicIndex)
|
||||
w.buf1.PutByte(FormatV2)
|
||||
|
||||
return w.write(w.buf1.get())
|
||||
return w.write(w.buf1.Get())
|
||||
}
|
||||
|
||||
// AddSeries adds the series one at a time along with its chunks.
|
||||
|
@ -318,8 +318,8 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta
|
|||
}
|
||||
w.seriesOffsets[ref] = w.pos / 16
|
||||
|
||||
w.buf2.reset()
|
||||
w.buf2.putUvarint(len(lset))
|
||||
w.buf2.Reset()
|
||||
w.buf2.PutUvarint(len(lset))
|
||||
|
||||
for _, l := range lset {
|
||||
// here we have an index for the symbol file if v2, otherwise it's an offset
|
||||
|
@ -327,41 +327,41 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta
|
|||
if !ok {
|
||||
return errors.Errorf("symbol entry for %q does not exist", l.Name)
|
||||
}
|
||||
w.buf2.putUvarint32(index)
|
||||
w.buf2.PutUvarint32(index)
|
||||
|
||||
index, ok = w.symbols[l.Value]
|
||||
if !ok {
|
||||
return errors.Errorf("symbol entry for %q does not exist", l.Value)
|
||||
}
|
||||
w.buf2.putUvarint32(index)
|
||||
w.buf2.PutUvarint32(index)
|
||||
}
|
||||
|
||||
w.buf2.putUvarint(len(chunks))
|
||||
w.buf2.PutUvarint(len(chunks))
|
||||
|
||||
if len(chunks) > 0 {
|
||||
c := chunks[0]
|
||||
w.buf2.putVarint64(c.MinTime)
|
||||
w.buf2.putUvarint64(uint64(c.MaxTime - c.MinTime))
|
||||
w.buf2.putUvarint64(c.Ref)
|
||||
w.buf2.PutVarint64(c.MinTime)
|
||||
w.buf2.PutUvarint64(uint64(c.MaxTime - c.MinTime))
|
||||
w.buf2.PutUvarint64(c.Ref)
|
||||
t0 := c.MaxTime
|
||||
ref0 := int64(c.Ref)
|
||||
|
||||
for _, c := range chunks[1:] {
|
||||
w.buf2.putUvarint64(uint64(c.MinTime - t0))
|
||||
w.buf2.putUvarint64(uint64(c.MaxTime - c.MinTime))
|
||||
w.buf2.PutUvarint64(uint64(c.MinTime - t0))
|
||||
w.buf2.PutUvarint64(uint64(c.MaxTime - c.MinTime))
|
||||
t0 = c.MaxTime
|
||||
|
||||
w.buf2.putVarint64(int64(c.Ref) - ref0)
|
||||
w.buf2.PutVarint64(int64(c.Ref) - ref0)
|
||||
ref0 = int64(c.Ref)
|
||||
}
|
||||
}
|
||||
|
||||
w.buf1.reset()
|
||||
w.buf1.putUvarint(w.buf2.len())
|
||||
w.buf1.Reset()
|
||||
w.buf1.PutUvarint(w.buf2.Len())
|
||||
|
||||
w.buf2.putHash(w.crc32)
|
||||
w.buf2.PutHash(w.crc32)
|
||||
|
||||
if err := w.write(w.buf1.get(), w.buf2.get()); err != nil {
|
||||
if err := w.write(w.buf1.Get(), w.buf2.Get()); err != nil {
|
||||
return errors.Wrap(err, "write series data")
|
||||
}
|
||||
|
||||
|
@ -382,22 +382,22 @@ func (w *Writer) AddSymbols(sym map[string]struct{}) error {
|
|||
}
|
||||
sort.Strings(symbols)
|
||||
|
||||
w.buf1.reset()
|
||||
w.buf2.reset()
|
||||
w.buf1.Reset()
|
||||
w.buf2.Reset()
|
||||
|
||||
w.buf2.putBE32int(len(symbols))
|
||||
w.buf2.PutBE32int(len(symbols))
|
||||
|
||||
w.symbols = make(map[string]uint32, len(symbols))
|
||||
|
||||
for index, s := range symbols {
|
||||
w.symbols[s] = uint32(index)
|
||||
w.buf2.putUvarintStr(s)
|
||||
w.buf2.PutUvarintStr(s)
|
||||
}
|
||||
|
||||
w.buf1.putBE32int(w.buf2.len())
|
||||
w.buf2.putHash(w.crc32)
|
||||
w.buf1.PutBE32int(w.buf2.Len())
|
||||
w.buf2.PutHash(w.crc32)
|
||||
|
||||
err := w.write(w.buf1.get(), w.buf2.get())
|
||||
err := w.write(w.buf1.Get(), w.buf2.Get())
|
||||
return errors.Wrap(err, "write symbols")
|
||||
}
|
||||
|
||||
|
@ -425,9 +425,9 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error {
|
|||
offset: w.pos,
|
||||
})
|
||||
|
||||
w.buf2.reset()
|
||||
w.buf2.putBE32int(len(names))
|
||||
w.buf2.putBE32int(valt.Len())
|
||||
w.buf2.Reset()
|
||||
w.buf2.PutBE32int(len(names))
|
||||
w.buf2.PutBE32int(valt.Len())
|
||||
|
||||
// here we have an index for the symbol file if v2, otherwise it's an offset
|
||||
for _, v := range valt.entries {
|
||||
|
@ -435,53 +435,53 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error {
|
|||
if !ok {
|
||||
return errors.Errorf("symbol entry for %q does not exist", v)
|
||||
}
|
||||
w.buf2.putBE32(index)
|
||||
w.buf2.PutBE32(index)
|
||||
}
|
||||
|
||||
w.buf1.reset()
|
||||
w.buf1.putBE32int(w.buf2.len())
|
||||
w.buf1.Reset()
|
||||
w.buf1.PutBE32int(w.buf2.Len())
|
||||
|
||||
w.buf2.putHash(w.crc32)
|
||||
w.buf2.PutHash(w.crc32)
|
||||
|
||||
err = w.write(w.buf1.get(), w.buf2.get())
|
||||
err = w.write(w.buf1.Get(), w.buf2.Get())
|
||||
return errors.Wrap(err, "write label index")
|
||||
}
|
||||
|
||||
// writeOffsetTable writes a sequence of readable hash entries.
|
||||
func (w *Writer) writeOffsetTable(entries []hashEntry) error {
|
||||
w.buf2.reset()
|
||||
w.buf2.putBE32int(len(entries))
|
||||
w.buf2.Reset()
|
||||
w.buf2.PutBE32int(len(entries))
|
||||
|
||||
for _, e := range entries {
|
||||
w.buf2.putUvarint(len(e.keys))
|
||||
w.buf2.PutUvarint(len(e.keys))
|
||||
for _, k := range e.keys {
|
||||
w.buf2.putUvarintStr(k)
|
||||
w.buf2.PutUvarintStr(k)
|
||||
}
|
||||
w.buf2.putUvarint64(e.offset)
|
||||
w.buf2.PutUvarint64(e.offset)
|
||||
}
|
||||
|
||||
w.buf1.reset()
|
||||
w.buf1.putBE32int(w.buf2.len())
|
||||
w.buf2.putHash(w.crc32)
|
||||
w.buf1.Reset()
|
||||
w.buf1.PutBE32int(w.buf2.Len())
|
||||
w.buf2.PutHash(w.crc32)
|
||||
|
||||
return w.write(w.buf1.get(), w.buf2.get())
|
||||
return w.write(w.buf1.Get(), w.buf2.Get())
|
||||
}
|
||||
|
||||
const indexTOCLen = 6*8 + 4
|
||||
|
||||
func (w *Writer) writeTOC() error {
|
||||
w.buf1.reset()
|
||||
w.buf1.Reset()
|
||||
|
||||
w.buf1.putBE64(w.toc.Symbols)
|
||||
w.buf1.putBE64(w.toc.Series)
|
||||
w.buf1.putBE64(w.toc.LabelIndices)
|
||||
w.buf1.putBE64(w.toc.LabelIndicesTable)
|
||||
w.buf1.putBE64(w.toc.Postings)
|
||||
w.buf1.putBE64(w.toc.PostingsTable)
|
||||
w.buf1.PutBE64(w.toc.Symbols)
|
||||
w.buf1.PutBE64(w.toc.Series)
|
||||
w.buf1.PutBE64(w.toc.LabelIndices)
|
||||
w.buf1.PutBE64(w.toc.LabelIndicesTable)
|
||||
w.buf1.PutBE64(w.toc.Postings)
|
||||
w.buf1.PutBE64(w.toc.PostingsTable)
|
||||
|
||||
w.buf1.putHash(w.crc32)
|
||||
w.buf1.PutHash(w.crc32)
|
||||
|
||||
return w.write(w.buf1.get())
|
||||
return w.write(w.buf1.Get())
|
||||
}
|
||||
|
||||
func (w *Writer) WritePostings(name, value string, it Postings) error {
|
||||
|
@ -519,20 +519,20 @@ func (w *Writer) WritePostings(name, value string, it Postings) error {
|
|||
}
|
||||
sort.Sort(uint32slice(refs))
|
||||
|
||||
w.buf2.reset()
|
||||
w.buf2.putBE32int(len(refs))
|
||||
w.buf2.Reset()
|
||||
w.buf2.PutBE32int(len(refs))
|
||||
|
||||
for _, r := range refs {
|
||||
w.buf2.putBE32(r)
|
||||
w.buf2.PutBE32(r)
|
||||
}
|
||||
w.uint32s = refs
|
||||
|
||||
w.buf1.reset()
|
||||
w.buf1.putBE32int(w.buf2.len())
|
||||
w.buf1.Reset()
|
||||
w.buf1.PutBE32int(w.buf2.Len())
|
||||
|
||||
w.buf2.putHash(w.crc32)
|
||||
w.buf2.PutHash(w.crc32)
|
||||
|
||||
err := w.write(w.buf1.get(), w.buf2.get())
|
||||
err := w.write(w.buf1.Get(), w.buf2.Get())
|
||||
return errors.Wrap(err, "write postings")
|
||||
}
|
||||
|
||||
|
@ -593,11 +593,6 @@ type Reader struct {
|
|||
version int
|
||||
}
|
||||
|
||||
var (
|
||||
errInvalidSize = fmt.Errorf("invalid size")
|
||||
errInvalidChecksum = fmt.Errorf("invalid checksum")
|
||||
)
|
||||
|
||||
// ByteSlice abstracts a byte slice.
|
||||
type ByteSlice interface {
|
||||
Len() int
|
||||
|
@ -643,7 +638,7 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
|
|||
|
||||
// Verify header.
|
||||
if r.b.Len() < HeaderLen {
|
||||
return nil, errors.Wrap(errInvalidSize, "index header")
|
||||
return nil, errors.Wrap(encoding.ErrInvalidSize, "index header")
|
||||
}
|
||||
if m := binary.BigEndian.Uint32(r.b.Range(0, 4)); m != MagicIndex {
|
||||
return nil, errors.Errorf("invalid magic number %x", m)
|
||||
|
@ -724,13 +719,13 @@ func (r *Reader) PostingsRanges() (map[labels.Label]Range, error) {
|
|||
|
||||
for k, e := range r.postings {
|
||||
for v, start := range e {
|
||||
d := newDecbufAt(r.b, int(start))
|
||||
if d.err() != nil {
|
||||
return nil, d.err()
|
||||
d := encoding.NewDecbufAt(r.b, int(start), castagnoliTable)
|
||||
if d.Err() != nil {
|
||||
return nil, d.Err()
|
||||
}
|
||||
m[labels.Label{Name: k, Value: v}] = Range{
|
||||
Start: int64(start) + 4,
|
||||
End: int64(start) + 4 + int64(d.len()),
|
||||
End: int64(start) + 4 + int64(d.Len()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -744,13 +739,13 @@ func ReadSymbols(bs ByteSlice, version int, off int) ([]string, map[uint32]strin
|
|||
if off == 0 {
|
||||
return nil, nil, nil
|
||||
}
|
||||
d := newDecbufAt(bs, off)
|
||||
d := encoding.NewDecbufAt(bs, off, castagnoliTable)
|
||||
|
||||
var (
|
||||
origLen = d.len()
|
||||
cnt = d.be32int()
|
||||
origLen = d.Len()
|
||||
cnt = d.Be32int()
|
||||
basePos = uint32(off) + 4
|
||||
nextPos = basePos + uint32(origLen-d.len())
|
||||
nextPos = basePos + uint32(origLen-d.Len())
|
||||
symbolSlice []string
|
||||
symbols = map[uint32]string{}
|
||||
)
|
||||
|
@ -758,35 +753,35 @@ func ReadSymbols(bs ByteSlice, version int, off int) ([]string, map[uint32]strin
|
|||
symbolSlice = make([]string, 0, cnt)
|
||||
}
|
||||
|
||||
for d.err() == nil && d.len() > 0 && cnt > 0 {
|
||||
s := d.uvarintStr()
|
||||
for d.Err() == nil && d.Len() > 0 && cnt > 0 {
|
||||
s := d.UvarintStr()
|
||||
|
||||
if version == FormatV2 {
|
||||
symbolSlice = append(symbolSlice, s)
|
||||
} else {
|
||||
symbols[nextPos] = s
|
||||
nextPos = basePos + uint32(origLen-d.len())
|
||||
nextPos = basePos + uint32(origLen-d.Len())
|
||||
}
|
||||
cnt--
|
||||
}
|
||||
return symbolSlice, symbols, errors.Wrap(d.err(), "read symbols")
|
||||
return symbolSlice, symbols, errors.Wrap(d.Err(), "read symbols")
|
||||
}
|
||||
|
||||
// ReadOffsetTable reads an offset table and at the given position calls f for each
|
||||
// found entry. If f returns an error it stops decoding and returns the received error.
|
||||
func ReadOffsetTable(bs ByteSlice, off uint64, f func([]string, uint64) error) error {
|
||||
d := newDecbufAt(bs, int(off))
|
||||
cnt := d.be32()
|
||||
d := encoding.NewDecbufAt(bs, int(off), castagnoliTable)
|
||||
cnt := d.Be32()
|
||||
|
||||
for d.err() == nil && d.len() > 0 && cnt > 0 {
|
||||
keyCount := d.uvarint()
|
||||
for d.Err() == nil && d.Len() > 0 && cnt > 0 {
|
||||
keyCount := d.Uvarint()
|
||||
keys := make([]string, 0, keyCount)
|
||||
|
||||
for i := 0; i < keyCount; i++ {
|
||||
keys = append(keys, d.uvarintStr())
|
||||
keys = append(keys, d.UvarintStr())
|
||||
}
|
||||
o := d.uvarint64()
|
||||
if d.err() != nil {
|
||||
o := d.Uvarint64()
|
||||
if d.Err() != nil {
|
||||
break
|
||||
}
|
||||
if err := f(keys, o); err != nil {
|
||||
|
@ -794,7 +789,7 @@ func ReadOffsetTable(bs ByteSlice, off uint64, f func([]string, uint64) error) e
|
|||
}
|
||||
cnt--
|
||||
}
|
||||
return d.err()
|
||||
return d.Err()
|
||||
}
|
||||
|
||||
// Close the reader and its underlying resources.
|
||||
|
@ -843,17 +838,17 @@ func (r *Reader) LabelValues(names ...string) (StringTuples, error) {
|
|||
//return nil, fmt.Errorf("label index doesn't exist")
|
||||
}
|
||||
|
||||
d := newDecbufAt(r.b, int(off))
|
||||
d := encoding.NewDecbufAt(r.b, int(off), castagnoliTable)
|
||||
|
||||
nc := d.be32int()
|
||||
d.be32() // consume unused value entry count.
|
||||
nc := d.Be32int()
|
||||
d.Be32() // consume unused value entry count.
|
||||
|
||||
if d.err() != nil {
|
||||
return nil, errors.Wrap(d.err(), "read label value index")
|
||||
if d.Err() != nil {
|
||||
return nil, errors.Wrap(d.Err(), "read label value index")
|
||||
}
|
||||
st := &serializedStringTuples{
|
||||
idsCount: nc,
|
||||
idsBytes: d.get(),
|
||||
idsBytes: d.Get(),
|
||||
lookup: r.lookupSymbol,
|
||||
}
|
||||
return st, nil
|
||||
|
@ -882,11 +877,11 @@ func (r *Reader) Series(id uint64, lbls *labels.Labels, chks *[]chunks.Meta) err
|
|||
if r.version == FormatV2 {
|
||||
offset = id * 16
|
||||
}
|
||||
d := newDecbufUvarintAt(r.b, int(offset))
|
||||
if d.err() != nil {
|
||||
return d.err()
|
||||
d := encoding.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable)
|
||||
if d.Err() != nil {
|
||||
return d.Err()
|
||||
}
|
||||
return errors.Wrap(r.dec.Series(d.get(), lbls, chks), "read series")
|
||||
return errors.Wrap(r.dec.Series(d.Get(), lbls, chks), "read series")
|
||||
}
|
||||
|
||||
// Postings returns a postings list for the given label pair.
|
||||
|
@ -899,11 +894,11 @@ func (r *Reader) Postings(name, value string) (Postings, error) {
|
|||
if !ok {
|
||||
return EmptyPostings(), nil
|
||||
}
|
||||
d := newDecbufAt(r.b, int(off))
|
||||
if d.err() != nil {
|
||||
return nil, errors.Wrap(d.err(), "get postings entry")
|
||||
d := encoding.NewDecbufAt(r.b, int(off), castagnoliTable)
|
||||
if d.Err() != nil {
|
||||
return nil, errors.Wrap(d.Err(), "get postings entry")
|
||||
}
|
||||
_, p, err := r.dec.Postings(d.get())
|
||||
_, p, err := r.dec.Postings(d.Get())
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "decode postings")
|
||||
}
|
||||
|
@ -952,7 +947,7 @@ type stringTuples struct {
|
|||
|
||||
func NewStringTuples(entries []string, length int) (*stringTuples, error) {
|
||||
if len(entries)%length != 0 {
|
||||
return nil, errors.Wrap(errInvalidSize, "string tuple list")
|
||||
return nil, errors.Wrap(encoding.ErrInvalidSize, "string tuple list")
|
||||
}
|
||||
return &stringTuples{entries: entries, length: length}, nil
|
||||
}
|
||||
|
@ -996,7 +991,7 @@ func (t *serializedStringTuples) Len() int {
|
|||
|
||||
func (t *serializedStringTuples) At(i int) ([]string, error) {
|
||||
if len(t.idsBytes) < (i+t.idsCount)*4 {
|
||||
return nil, errInvalidSize
|
||||
return nil, encoding.ErrInvalidSize
|
||||
}
|
||||
res := make([]string, 0, t.idsCount)
|
||||
|
||||
|
@ -1023,10 +1018,10 @@ type Decoder struct {
|
|||
|
||||
// Postings returns a postings list for b and its number of elements.
|
||||
func (dec *Decoder) Postings(b []byte) (int, Postings, error) {
|
||||
d := decbuf{b: b}
|
||||
n := d.be32int()
|
||||
l := d.get()
|
||||
return n, newBigEndianPostings(l), d.err()
|
||||
d := encoding.Decbuf{B: b}
|
||||
n := d.Be32int()
|
||||
l := d.Get()
|
||||
return n, newBigEndianPostings(l), d.Err()
|
||||
}
|
||||
|
||||
// Series decodes a series entry from the given byte slice into lset and chks.
|
||||
|
@ -1034,16 +1029,16 @@ func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) e
|
|||
*lbls = (*lbls)[:0]
|
||||
*chks = (*chks)[:0]
|
||||
|
||||
d := decbuf{b: b}
|
||||
d := encoding.Decbuf{B: b}
|
||||
|
||||
k := d.uvarint()
|
||||
k := d.Uvarint()
|
||||
|
||||
for i := 0; i < k; i++ {
|
||||
lno := uint32(d.uvarint())
|
||||
lvo := uint32(d.uvarint())
|
||||
lno := uint32(d.Uvarint())
|
||||
lvo := uint32(d.Uvarint())
|
||||
|
||||
if d.err() != nil {
|
||||
return errors.Wrap(d.err(), "read series label offsets")
|
||||
if d.Err() != nil {
|
||||
return errors.Wrap(d.Err(), "read series label offsets")
|
||||
}
|
||||
|
||||
ln, err := dec.LookupSymbol(lno)
|
||||
|
@ -1059,15 +1054,15 @@ func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) e
|
|||
}
|
||||
|
||||
// Read the chunks meta data.
|
||||
k = d.uvarint()
|
||||
k = d.Uvarint()
|
||||
|
||||
if k == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
t0 := d.varint64()
|
||||
maxt := int64(d.uvarint64()) + t0
|
||||
ref0 := int64(d.uvarint64())
|
||||
t0 := d.Varint64()
|
||||
maxt := int64(d.Uvarint64()) + t0
|
||||
ref0 := int64(d.Uvarint64())
|
||||
|
||||
*chks = append(*chks, chunks.Meta{
|
||||
Ref: uint64(ref0),
|
||||
|
@ -1077,14 +1072,14 @@ func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) e
|
|||
t0 = maxt
|
||||
|
||||
for i := 1; i < k; i++ {
|
||||
mint := int64(d.uvarint64()) + t0
|
||||
maxt := int64(d.uvarint64()) + mint
|
||||
mint := int64(d.Uvarint64()) + t0
|
||||
maxt := int64(d.Uvarint64()) + mint
|
||||
|
||||
ref0 += d.varint64()
|
||||
ref0 += d.Varint64()
|
||||
t0 = maxt
|
||||
|
||||
if d.err() != nil {
|
||||
return errors.Wrapf(d.err(), "read meta for chunk %d", i)
|
||||
if d.Err() != nil {
|
||||
return errors.Wrapf(d.Err(), "read meta for chunk %d", i)
|
||||
}
|
||||
|
||||
*chks = append(*chks, chunks.Meta{
|
||||
|
@ -1093,5 +1088,5 @@ func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]chunks.Meta) e
|
|||
MaxTime: maxt,
|
||||
})
|
||||
}
|
||||
return d.err()
|
||||
return d.Err()
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/tsdb/chunkenc"
|
||||
"github.com/prometheus/tsdb/chunks"
|
||||
"github.com/prometheus/tsdb/encoding"
|
||||
"github.com/prometheus/tsdb/labels"
|
||||
"github.com/prometheus/tsdb/testutil"
|
||||
)
|
||||
|
@ -395,8 +396,8 @@ func TestPersistence_index_e2e(t *testing.T) {
|
|||
func TestDecbufUvariantWithInvalidBuffer(t *testing.T) {
|
||||
b := realByteSlice([]byte{0x81, 0x81, 0x81, 0x81, 0x81, 0x81})
|
||||
|
||||
db := newDecbufUvarintAt(b, 0)
|
||||
testutil.NotOk(t, db.err())
|
||||
db := encoding.NewDecbufUvarintAt(b, 0, castagnoliTable)
|
||||
testutil.NotOk(t, db.Err())
|
||||
}
|
||||
|
||||
func TestReaderWithInvalidBuffer(t *testing.T) {
|
||||
|
|
111
record.go
111
record.go
|
@ -19,6 +19,7 @@ import (
|
|||
"sort"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/tsdb/encoding"
|
||||
"github.com/prometheus/tsdb/labels"
|
||||
)
|
||||
|
||||
|
@ -56,19 +57,19 @@ func (d *RecordDecoder) Type(rec []byte) RecordType {
|
|||
|
||||
// Series appends series in rec to the given slice.
|
||||
func (d *RecordDecoder) Series(rec []byte, series []RefSeries) ([]RefSeries, error) {
|
||||
dec := decbuf{b: rec}
|
||||
dec := encoding.Decbuf{B: rec}
|
||||
|
||||
if RecordType(dec.byte()) != RecordSeries {
|
||||
if RecordType(dec.Byte()) != RecordSeries {
|
||||
return nil, errors.New("invalid record type")
|
||||
}
|
||||
for len(dec.b) > 0 && dec.err() == nil {
|
||||
ref := dec.be64()
|
||||
for len(dec.B) > 0 && dec.Err() == nil {
|
||||
ref := dec.Be64()
|
||||
|
||||
lset := make(labels.Labels, dec.uvarint())
|
||||
lset := make(labels.Labels, dec.Uvarint())
|
||||
|
||||
for i := range lset {
|
||||
lset[i].Name = dec.uvarintStr()
|
||||
lset[i].Value = dec.uvarintStr()
|
||||
lset[i].Name = dec.UvarintStr()
|
||||
lset[i].Value = dec.UvarintStr()
|
||||
}
|
||||
sort.Sort(lset)
|
||||
|
||||
|
@ -77,33 +78,33 @@ func (d *RecordDecoder) Series(rec []byte, series []RefSeries) ([]RefSeries, err
|
|||
Labels: lset,
|
||||
})
|
||||
}
|
||||
if dec.err() != nil {
|
||||
return nil, dec.err()
|
||||
if dec.Err() != nil {
|
||||
return nil, dec.Err()
|
||||
}
|
||||
if len(dec.b) > 0 {
|
||||
return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
|
||||
if len(dec.B) > 0 {
|
||||
return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
|
||||
}
|
||||
return series, nil
|
||||
}
|
||||
|
||||
// Samples appends samples in rec to the given slice.
|
||||
func (d *RecordDecoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error) {
|
||||
dec := decbuf{b: rec}
|
||||
dec := encoding.Decbuf{B: rec}
|
||||
|
||||
if RecordType(dec.byte()) != RecordSamples {
|
||||
if RecordType(dec.Byte()) != RecordSamples {
|
||||
return nil, errors.New("invalid record type")
|
||||
}
|
||||
if dec.len() == 0 {
|
||||
if dec.Len() == 0 {
|
||||
return samples, nil
|
||||
}
|
||||
var (
|
||||
baseRef = dec.be64()
|
||||
baseTime = dec.be64int64()
|
||||
baseRef = dec.Be64()
|
||||
baseTime = dec.Be64int64()
|
||||
)
|
||||
for len(dec.b) > 0 && dec.err() == nil {
|
||||
dref := dec.varint64()
|
||||
dtime := dec.varint64()
|
||||
val := dec.be64()
|
||||
for len(dec.B) > 0 && dec.Err() == nil {
|
||||
dref := dec.Varint64()
|
||||
dtime := dec.Varint64()
|
||||
val := dec.Be64()
|
||||
|
||||
samples = append(samples, RefSample{
|
||||
Ref: uint64(int64(baseRef) + dref),
|
||||
|
@ -112,35 +113,35 @@ func (d *RecordDecoder) Samples(rec []byte, samples []RefSample) ([]RefSample, e
|
|||
})
|
||||
}
|
||||
|
||||
if dec.err() != nil {
|
||||
return nil, errors.Wrapf(dec.err(), "decode error after %d samples", len(samples))
|
||||
if dec.Err() != nil {
|
||||
return nil, errors.Wrapf(dec.Err(), "decode error after %d samples", len(samples))
|
||||
}
|
||||
if len(dec.b) > 0 {
|
||||
return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
|
||||
if len(dec.B) > 0 {
|
||||
return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
|
||||
}
|
||||
return samples, nil
|
||||
}
|
||||
|
||||
// Tombstones appends tombstones in rec to the given slice.
|
||||
func (d *RecordDecoder) Tombstones(rec []byte, tstones []Stone) ([]Stone, error) {
|
||||
dec := decbuf{b: rec}
|
||||
dec := encoding.Decbuf{B: rec}
|
||||
|
||||
if RecordType(dec.byte()) != RecordTombstones {
|
||||
if RecordType(dec.Byte()) != RecordTombstones {
|
||||
return nil, errors.New("invalid record type")
|
||||
}
|
||||
for dec.len() > 0 && dec.err() == nil {
|
||||
for dec.Len() > 0 && dec.Err() == nil {
|
||||
tstones = append(tstones, Stone{
|
||||
ref: dec.be64(),
|
||||
ref: dec.Be64(),
|
||||
intervals: Intervals{
|
||||
{Mint: dec.varint64(), Maxt: dec.varint64()},
|
||||
{Mint: dec.Varint64(), Maxt: dec.Varint64()},
|
||||
},
|
||||
})
|
||||
}
|
||||
if dec.err() != nil {
|
||||
return nil, dec.err()
|
||||
if dec.Err() != nil {
|
||||
return nil, dec.Err()
|
||||
}
|
||||
if len(dec.b) > 0 {
|
||||
return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
|
||||
if len(dec.B) > 0 {
|
||||
return nil, errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
|
||||
}
|
||||
return tstones, nil
|
||||
}
|
||||
|
@ -152,56 +153,56 @@ type RecordEncoder struct {
|
|||
|
||||
// Series appends the encoded series to b and returns the resulting slice.
|
||||
func (e *RecordEncoder) Series(series []RefSeries, b []byte) []byte {
|
||||
buf := encbuf{b: b}
|
||||
buf.putByte(byte(RecordSeries))
|
||||
buf := encoding.Encbuf{B: b}
|
||||
buf.PutByte(byte(RecordSeries))
|
||||
|
||||
for _, s := range series {
|
||||
buf.putBE64(s.Ref)
|
||||
buf.putUvarint(len(s.Labels))
|
||||
buf.PutBE64(s.Ref)
|
||||
buf.PutUvarint(len(s.Labels))
|
||||
|
||||
for _, l := range s.Labels {
|
||||
buf.putUvarintStr(l.Name)
|
||||
buf.putUvarintStr(l.Value)
|
||||
buf.PutUvarintStr(l.Name)
|
||||
buf.PutUvarintStr(l.Value)
|
||||
}
|
||||
}
|
||||
return buf.get()
|
||||
return buf.Get()
|
||||
}
|
||||
|
||||
// Samples appends the encoded samples to b and returns the resulting slice.
|
||||
func (e *RecordEncoder) Samples(samples []RefSample, b []byte) []byte {
|
||||
buf := encbuf{b: b}
|
||||
buf.putByte(byte(RecordSamples))
|
||||
buf := encoding.Encbuf{B: b}
|
||||
buf.PutByte(byte(RecordSamples))
|
||||
|
||||
if len(samples) == 0 {
|
||||
return buf.get()
|
||||
return buf.Get()
|
||||
}
|
||||
|
||||
// Store base timestamp and base reference number of first sample.
|
||||
// All samples encode their timestamp and ref as delta to those.
|
||||
first := samples[0]
|
||||
|
||||
buf.putBE64(first.Ref)
|
||||
buf.putBE64int64(first.T)
|
||||
buf.PutBE64(first.Ref)
|
||||
buf.PutBE64int64(first.T)
|
||||
|
||||
for _, s := range samples {
|
||||
buf.putVarint64(int64(s.Ref) - int64(first.Ref))
|
||||
buf.putVarint64(s.T - first.T)
|
||||
buf.putBE64(math.Float64bits(s.V))
|
||||
buf.PutVarint64(int64(s.Ref) - int64(first.Ref))
|
||||
buf.PutVarint64(s.T - first.T)
|
||||
buf.PutBE64(math.Float64bits(s.V))
|
||||
}
|
||||
return buf.get()
|
||||
return buf.Get()
|
||||
}
|
||||
|
||||
// Tombstones appends the encoded tombstones to b and returns the resulting slice.
|
||||
func (e *RecordEncoder) Tombstones(tstones []Stone, b []byte) []byte {
|
||||
buf := encbuf{b: b}
|
||||
buf.putByte(byte(RecordTombstones))
|
||||
buf := encoding.Encbuf{B: b}
|
||||
buf.PutByte(byte(RecordTombstones))
|
||||
|
||||
for _, s := range tstones {
|
||||
for _, iv := range s.intervals {
|
||||
buf.putBE64(s.ref)
|
||||
buf.putVarint64(iv.Mint)
|
||||
buf.putVarint64(iv.Maxt)
|
||||
buf.PutBE64(s.ref)
|
||||
buf.PutVarint64(iv.Mint)
|
||||
buf.PutVarint64(iv.Maxt)
|
||||
}
|
||||
}
|
||||
return buf.get()
|
||||
return buf.Get()
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"sync"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/tsdb/encoding"
|
||||
)
|
||||
|
||||
const tombstoneFilename = "tombstones"
|
||||
|
@ -64,12 +65,12 @@ func writeTombstoneFile(dir string, tr TombstoneReader) error {
|
|||
}
|
||||
}()
|
||||
|
||||
buf := encbuf{b: make([]byte, 3*binary.MaxVarintLen64)}
|
||||
buf.reset()
|
||||
buf := encoding.Encbuf{B: make([]byte, 3*binary.MaxVarintLen64)}
|
||||
buf.Reset()
|
||||
// Write the meta.
|
||||
buf.putBE32(MagicTombstone)
|
||||
buf.putByte(tombstoneFormatV1)
|
||||
_, err = f.Write(buf.get())
|
||||
buf.PutBE32(MagicTombstone)
|
||||
buf.PutByte(tombstoneFormatV1)
|
||||
_, err = f.Write(buf.Get())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -78,13 +79,13 @@ func writeTombstoneFile(dir string, tr TombstoneReader) error {
|
|||
|
||||
if err := tr.Iter(func(ref uint64, ivs Intervals) error {
|
||||
for _, iv := range ivs {
|
||||
buf.reset()
|
||||
buf.Reset()
|
||||
|
||||
buf.putUvarint64(ref)
|
||||
buf.putVarint64(iv.Mint)
|
||||
buf.putVarint64(iv.Maxt)
|
||||
buf.PutUvarint64(ref)
|
||||
buf.PutVarint64(iv.Mint)
|
||||
buf.PutVarint64(iv.Maxt)
|
||||
|
||||
_, err = mw.Write(buf.get())
|
||||
_, err = mw.Write(buf.Get())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -126,24 +127,24 @@ func readTombstones(dir string) (TombstoneReader, SizeReader, error) {
|
|||
}
|
||||
|
||||
if len(b) < 5 {
|
||||
return nil, sr, errors.Wrap(errInvalidSize, "tombstones header")
|
||||
return nil, sr, errors.Wrap(encoding.ErrInvalidSize, "tombstones header")
|
||||
}
|
||||
|
||||
d := &decbuf{b: b[:len(b)-4]} // 4 for the checksum.
|
||||
if mg := d.be32(); mg != MagicTombstone {
|
||||
d := &encoding.Decbuf{B: b[:len(b)-4]} // 4 for the checksum.
|
||||
if mg := d.Be32(); mg != MagicTombstone {
|
||||
return nil, sr, fmt.Errorf("invalid magic number %x", mg)
|
||||
}
|
||||
if flag := d.byte(); flag != tombstoneFormatV1 {
|
||||
if flag := d.Byte(); flag != tombstoneFormatV1 {
|
||||
return nil, sr, fmt.Errorf("invalid tombstone format %x", flag)
|
||||
}
|
||||
|
||||
if d.err() != nil {
|
||||
return nil, sr, d.err()
|
||||
if d.Err() != nil {
|
||||
return nil, sr, d.Err()
|
||||
}
|
||||
|
||||
// Verify checksum.
|
||||
hash := newCRC32()
|
||||
if _, err := hash.Write(d.get()); err != nil {
|
||||
if _, err := hash.Write(d.Get()); err != nil {
|
||||
return nil, sr, errors.Wrap(err, "write to hash")
|
||||
}
|
||||
if binary.BigEndian.Uint32(b[len(b)-4:]) != hash.Sum32() {
|
||||
|
@ -152,12 +153,12 @@ func readTombstones(dir string) (TombstoneReader, SizeReader, error) {
|
|||
|
||||
stonesMap := newMemTombstones()
|
||||
|
||||
for d.len() > 0 {
|
||||
k := d.uvarint64()
|
||||
mint := d.varint64()
|
||||
maxt := d.varint64()
|
||||
if d.err() != nil {
|
||||
return nil, sr, d.err()
|
||||
for d.Len() > 0 {
|
||||
k := d.Uvarint64()
|
||||
mint := d.Varint64()
|
||||
maxt := d.Varint64()
|
||||
if d.Err() != nil {
|
||||
return nil, sr, d.Err()
|
||||
}
|
||||
|
||||
stonesMap.addInterval(k, Interval{mint, maxt})
|
||||
|
|
107
wal.go
107
wal.go
|
@ -31,6 +31,7 @@ import (
|
|||
"github.com/go-kit/kit/log/level"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/tsdb/encoding"
|
||||
"github.com/prometheus/tsdb/fileutil"
|
||||
"github.com/prometheus/tsdb/labels"
|
||||
"github.com/prometheus/tsdb/wal"
|
||||
|
@ -287,16 +288,16 @@ func (w *SegmentWAL) Reader() WALReader {
|
|||
}
|
||||
}
|
||||
|
||||
func (w *SegmentWAL) getBuffer() *encbuf {
|
||||
func (w *SegmentWAL) getBuffer() *encoding.Encbuf {
|
||||
b := w.buffers.Get()
|
||||
if b == nil {
|
||||
return &encbuf{b: make([]byte, 0, 64*1024)}
|
||||
return &encoding.Encbuf{B: make([]byte, 0, 64*1024)}
|
||||
}
|
||||
return b.(*encbuf)
|
||||
return b.(*encoding.Encbuf)
|
||||
}
|
||||
|
||||
func (w *SegmentWAL) putBuffer(b *encbuf) {
|
||||
b.reset()
|
||||
func (w *SegmentWAL) putBuffer(b *encoding.Encbuf) {
|
||||
b.Reset()
|
||||
w.buffers.Put(b)
|
||||
}
|
||||
|
||||
|
@ -366,7 +367,7 @@ func (w *SegmentWAL) Truncate(mint int64, keep func(uint64) bool) error {
|
|||
buf := w.getBuffer()
|
||||
flag = w.encodeSeries(buf, activeSeries)
|
||||
|
||||
_, err = w.writeTo(csf, crc32, WALEntrySeries, flag, buf.get())
|
||||
_, err = w.writeTo(csf, crc32, WALEntrySeries, flag, buf.Get())
|
||||
w.putBuffer(buf)
|
||||
|
||||
if err != nil {
|
||||
|
@ -427,7 +428,7 @@ func (w *SegmentWAL) LogSeries(series []RefSeries) error {
|
|||
w.mtx.Lock()
|
||||
defer w.mtx.Unlock()
|
||||
|
||||
err := w.write(WALEntrySeries, flag, buf.get())
|
||||
err := w.write(WALEntrySeries, flag, buf.Get())
|
||||
|
||||
w.putBuffer(buf)
|
||||
|
||||
|
@ -454,7 +455,7 @@ func (w *SegmentWAL) LogSamples(samples []RefSample) error {
|
|||
w.mtx.Lock()
|
||||
defer w.mtx.Unlock()
|
||||
|
||||
err := w.write(WALEntrySamples, flag, buf.get())
|
||||
err := w.write(WALEntrySamples, flag, buf.Get())
|
||||
|
||||
w.putBuffer(buf)
|
||||
|
||||
|
@ -480,7 +481,7 @@ func (w *SegmentWAL) LogDeletes(stones []Stone) error {
|
|||
w.mtx.Lock()
|
||||
defer w.mtx.Unlock()
|
||||
|
||||
err := w.write(WALEntryDeletes, flag, buf.get())
|
||||
err := w.write(WALEntryDeletes, flag, buf.Get())
|
||||
|
||||
w.putBuffer(buf)
|
||||
|
||||
|
@ -783,20 +784,20 @@ const (
|
|||
walDeletesSimple = 1
|
||||
)
|
||||
|
||||
func (w *SegmentWAL) encodeSeries(buf *encbuf, series []RefSeries) uint8 {
|
||||
func (w *SegmentWAL) encodeSeries(buf *encoding.Encbuf, series []RefSeries) uint8 {
|
||||
for _, s := range series {
|
||||
buf.putBE64(s.Ref)
|
||||
buf.putUvarint(len(s.Labels))
|
||||
buf.PutBE64(s.Ref)
|
||||
buf.PutUvarint(len(s.Labels))
|
||||
|
||||
for _, l := range s.Labels {
|
||||
buf.putUvarintStr(l.Name)
|
||||
buf.putUvarintStr(l.Value)
|
||||
buf.PutUvarintStr(l.Name)
|
||||
buf.PutUvarintStr(l.Value)
|
||||
}
|
||||
}
|
||||
return walSeriesSimple
|
||||
}
|
||||
|
||||
func (w *SegmentWAL) encodeSamples(buf *encbuf, samples []RefSample) uint8 {
|
||||
func (w *SegmentWAL) encodeSamples(buf *encoding.Encbuf, samples []RefSample) uint8 {
|
||||
if len(samples) == 0 {
|
||||
return walSamplesSimple
|
||||
}
|
||||
|
@ -806,23 +807,23 @@ func (w *SegmentWAL) encodeSamples(buf *encbuf, samples []RefSample) uint8 {
|
|||
// TODO(fabxc): optimize for all samples having the same timestamp.
|
||||
first := samples[0]
|
||||
|
||||
buf.putBE64(first.Ref)
|
||||
buf.putBE64int64(first.T)
|
||||
buf.PutBE64(first.Ref)
|
||||
buf.PutBE64int64(first.T)
|
||||
|
||||
for _, s := range samples {
|
||||
buf.putVarint64(int64(s.Ref) - int64(first.Ref))
|
||||
buf.putVarint64(s.T - first.T)
|
||||
buf.putBE64(math.Float64bits(s.V))
|
||||
buf.PutVarint64(int64(s.Ref) - int64(first.Ref))
|
||||
buf.PutVarint64(s.T - first.T)
|
||||
buf.PutBE64(math.Float64bits(s.V))
|
||||
}
|
||||
return walSamplesSimple
|
||||
}
|
||||
|
||||
func (w *SegmentWAL) encodeDeletes(buf *encbuf, stones []Stone) uint8 {
|
||||
func (w *SegmentWAL) encodeDeletes(buf *encoding.Encbuf, stones []Stone) uint8 {
|
||||
for _, s := range stones {
|
||||
for _, iv := range s.intervals {
|
||||
buf.putBE64(s.ref)
|
||||
buf.putVarint64(iv.Mint)
|
||||
buf.putVarint64(iv.Maxt)
|
||||
buf.PutBE64(s.ref)
|
||||
buf.PutVarint64(iv.Mint)
|
||||
buf.PutVarint64(iv.Maxt)
|
||||
}
|
||||
}
|
||||
return walDeletesSimple
|
||||
|
@ -1115,16 +1116,16 @@ func (r *walReader) entry(cr io.Reader) (WALEntryType, byte, []byte, error) {
|
|||
}
|
||||
|
||||
func (r *walReader) decodeSeries(flag byte, b []byte, res *[]RefSeries) error {
|
||||
dec := decbuf{b: b}
|
||||
dec := encoding.Decbuf{B: b}
|
||||
|
||||
for len(dec.b) > 0 && dec.err() == nil {
|
||||
ref := dec.be64()
|
||||
for len(dec.B) > 0 && dec.Err() == nil {
|
||||
ref := dec.Be64()
|
||||
|
||||
lset := make(labels.Labels, dec.uvarint())
|
||||
lset := make(labels.Labels, dec.Uvarint())
|
||||
|
||||
for i := range lset {
|
||||
lset[i].Name = dec.uvarintStr()
|
||||
lset[i].Value = dec.uvarintStr()
|
||||
lset[i].Name = dec.UvarintStr()
|
||||
lset[i].Value = dec.UvarintStr()
|
||||
}
|
||||
sort.Sort(lset)
|
||||
|
||||
|
@ -1133,11 +1134,11 @@ func (r *walReader) decodeSeries(flag byte, b []byte, res *[]RefSeries) error {
|
|||
Labels: lset,
|
||||
})
|
||||
}
|
||||
if dec.err() != nil {
|
||||
return dec.err()
|
||||
if dec.Err() != nil {
|
||||
return dec.Err()
|
||||
}
|
||||
if len(dec.b) > 0 {
|
||||
return errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
|
||||
if len(dec.B) > 0 {
|
||||
return errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -1146,17 +1147,17 @@ func (r *walReader) decodeSamples(flag byte, b []byte, res *[]RefSample) error {
|
|||
if len(b) == 0 {
|
||||
return nil
|
||||
}
|
||||
dec := decbuf{b: b}
|
||||
dec := encoding.Decbuf{B: b}
|
||||
|
||||
var (
|
||||
baseRef = dec.be64()
|
||||
baseTime = dec.be64int64()
|
||||
baseRef = dec.Be64()
|
||||
baseTime = dec.Be64int64()
|
||||
)
|
||||
|
||||
for len(dec.b) > 0 && dec.err() == nil {
|
||||
dref := dec.varint64()
|
||||
dtime := dec.varint64()
|
||||
val := dec.be64()
|
||||
for len(dec.B) > 0 && dec.Err() == nil {
|
||||
dref := dec.Varint64()
|
||||
dtime := dec.Varint64()
|
||||
val := dec.Be64()
|
||||
|
||||
*res = append(*res, RefSample{
|
||||
Ref: uint64(int64(baseRef) + dref),
|
||||
|
@ -1165,31 +1166,31 @@ func (r *walReader) decodeSamples(flag byte, b []byte, res *[]RefSample) error {
|
|||
})
|
||||
}
|
||||
|
||||
if dec.err() != nil {
|
||||
return errors.Wrapf(dec.err(), "decode error after %d samples", len(*res))
|
||||
if dec.Err() != nil {
|
||||
return errors.Wrapf(dec.Err(), "decode error after %d samples", len(*res))
|
||||
}
|
||||
if len(dec.b) > 0 {
|
||||
return errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
|
||||
if len(dec.B) > 0 {
|
||||
return errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *walReader) decodeDeletes(flag byte, b []byte, res *[]Stone) error {
|
||||
dec := &decbuf{b: b}
|
||||
dec := &encoding.Decbuf{B: b}
|
||||
|
||||
for dec.len() > 0 && dec.err() == nil {
|
||||
for dec.Len() > 0 && dec.Err() == nil {
|
||||
*res = append(*res, Stone{
|
||||
ref: dec.be64(),
|
||||
ref: dec.Be64(),
|
||||
intervals: Intervals{
|
||||
{Mint: dec.varint64(), Maxt: dec.varint64()},
|
||||
{Mint: dec.Varint64(), Maxt: dec.Varint64()},
|
||||
},
|
||||
})
|
||||
}
|
||||
if dec.err() != nil {
|
||||
return dec.err()
|
||||
if dec.Err() != nil {
|
||||
return dec.Err()
|
||||
}
|
||||
if len(dec.b) > 0 {
|
||||
return errors.Errorf("unexpected %d bytes left in entry", len(dec.b))
|
||||
if len(dec.B) > 0 {
|
||||
return errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue