storage: Check for negative values from varint decoding

Sadly, we have a number of places where we use varint encoding for
numbers that cannot be negative. We could have saved a bit by using
uvarint encoding. On the bright side, we now have a 50% chance to
detect data corruption. :-/

Fixes #1800 and #2492.
This commit is contained in:
beorn7 2017-04-04 19:14:52 +02:00
parent 5f3327f620
commit ae286385fd
3 changed files with 31 additions and 0 deletions

View file

@ -123,6 +123,8 @@ func DecodeUint64(r io.Reader) (uint64, error) {
// encodeString writes the varint encoded length followed by the bytes of s to
// b.
func encodeString(b *bytes.Buffer, s string) error {
// Note that this should have used EncodeUvarint but a glitch happened
// while designing the checkpoint format.
if _, err := EncodeVarint(b, int64(len(s))); err != nil {
return err
}
@ -135,6 +137,9 @@ func encodeString(b *bytes.Buffer, s string) error {
// decodeString decodes a string encoded by encodeString.
func decodeString(b byteReader) (string, error) {
length, err := binary.ReadVarint(b)
if length < 0 {
err = fmt.Errorf("found negative string length during decoding: %d", length)
}
if err != nil {
return "", err
}
@ -155,6 +160,8 @@ type Metric model.Metric
// MarshalBinary implements encoding.BinaryMarshaler.
func (m Metric) MarshalBinary() ([]byte, error) {
buf := &bytes.Buffer{}
// Note that this should have used EncodeUvarint but a glitch happened
// while designing the checkpoint format.
if _, err := EncodeVarint(buf, int64(len(m))); err != nil {
return nil, err
}
@ -180,6 +187,9 @@ func (m *Metric) UnmarshalBinary(buf []byte) error {
// Metric.
func (m *Metric) UnmarshalFromReader(r byteReader) error {
numLabelPairs, err := binary.ReadVarint(r)
if numLabelPairs < 0 {
err = fmt.Errorf("found negative numLabelPairs during unmarshaling: %d", numLabelPairs)
}
if err != nil {
return err
}
@ -344,6 +354,8 @@ type LabelValueSet map[model.LabelValue]struct{}
// MarshalBinary implements encoding.BinaryMarshaler.
func (vs LabelValueSet) MarshalBinary() ([]byte, error) {
buf := &bytes.Buffer{}
// Note that this should have used EncodeUvarint but a glitch happened
// while designing the checkpoint format.
if _, err := EncodeVarint(buf, int64(len(vs))); err != nil {
return nil, err
}
@ -359,6 +371,9 @@ func (vs LabelValueSet) MarshalBinary() ([]byte, error) {
func (vs *LabelValueSet) UnmarshalBinary(buf []byte) error {
r := bytes.NewReader(buf)
numValues, err := binary.ReadVarint(r)
if numValues < 0 {
err = fmt.Errorf("found negative number of values during unmarshaling: %d", numValues)
}
if err != nil {
return err
}
@ -382,6 +397,8 @@ type LabelValues model.LabelValues
// MarshalBinary implements encoding.BinaryMarshaler.
func (vs LabelValues) MarshalBinary() ([]byte, error) {
buf := &bytes.Buffer{}
// Note that this should have used EncodeUvarint but a glitch happened
// while designing the checkpoint format.
if _, err := EncodeVarint(buf, int64(len(vs))); err != nil {
return nil, err
}
@ -397,6 +414,9 @@ func (vs LabelValues) MarshalBinary() ([]byte, error) {
func (vs *LabelValues) UnmarshalBinary(buf []byte) error {
r := bytes.NewReader(buf)
numValues, err := binary.ReadVarint(r)
if numValues < 0 {
err = fmt.Errorf("found negative number of values during unmarshaling: %d", numValues)
}
if err != nil {
return err
}

View file

@ -126,6 +126,9 @@ func (hs *headsScanner) scan() bool {
if hs.version != headsFormatLegacyVersion {
// persistWatermark only present in v2.
persistWatermark, hs.err = binary.ReadVarint(hs.r)
if persistWatermark < 0 {
hs.err = fmt.Errorf("found negative persist watermark in checkpoint: %d", persistWatermark)
}
if hs.err != nil {
return false
}
@ -147,6 +150,11 @@ func (hs *headsScanner) scan() bool {
if numChunkDescs, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
return false
}
if numChunkDescs < 0 {
hs.err = fmt.Errorf("found negative number of chunk descriptors in checkpoint: %d", numChunkDescs)
return false
}
chunkDescs := make([]*chunk.Desc, numChunkDescs)
if hs.version == headsFormatLegacyVersion {
if headChunkPersisted {

View file

@ -609,6 +609,9 @@ func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([
// (4.8.2.1) A byte defining the chunk type.
// (4.8.2.2) The chunk itself, marshaled with the Marshal() method.
//
// NOTE: Above, varint encoding is used consistently although uvarint would have
// made more sense in many cases. This was simply a glitch while designing the
// format.
func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker) (err error) {
log.Info("Checkpointing in-memory metrics and chunks...")
p.checkpointing.Set(1)