Merge pull request #2276 from prometheus/beorn7/storage

storage: Catch data corruption that leads to division by zero
Björn Rabenstein 2016-12-13 23:13:39 +01:00 committed by GitHub
commit 5f0c0e43cf
6 changed files with 75 additions and 39 deletions
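
Background on the failure mode this commit guards against: the delta and doubledelta chunk encodings derive their sample count by dividing the payload length by a per-sample size that is read from header bytes (the widths of each timestamp and each value). The sketch below only illustrates that arithmetic under the assumed shape of the length computation; it is not the verbatim Prometheus code, and sampleSize and numSamples are made-up names.

package main

import "fmt"

// sampleSize mirrors how a per-sample size is derived from the header's
// time-byte and value-byte widths. Corruption can drive both widths to zero.
func sampleSize(timeBytes, valueBytes int) int {
    return timeBytes + valueBytes
}

// numSamples mirrors the kind of length computation that divides by the
// per-sample size. With a corrupted header reporting zero-width timestamps
// and values, sampleSize is 0 and this panics with a divide-by-zero runtime
// error, which is exactly what the new setLen() sanity checks reject up front.
func numSamples(payloadBytes, timeBytes, valueBytes int) int {
    return payloadBytes / sampleSize(timeBytes, valueBytes)
}

func main() {
    fmt.Println(numSamples(1000, 1, 4)) // intact header: 200 samples
    fmt.Println(numSamples(1000, 0, 0)) // corrupted header: runtime panic
}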

View file

@@ -39,7 +39,10 @@ func TestLen(t *testing.T) {
             t.Errorf("chunk type %s should have %d samples, had %d", c.Encoding(), i, c.Len())
         }
-        cs, _ := c.Add(model.SamplePair{model.Time(i), model.SampleValue(i)})
+        cs, _ := c.Add(model.SamplePair{
+            Timestamp: model.Time(i),
+            Value: model.SampleValue(i),
+        })
         c = cs[0]
     }
 }

View file

@@ -238,27 +238,36 @@ func (c *deltaEncodedChunk) Unmarshal(r io.Reader) error {
     if _, err := io.ReadFull(r, *c); err != nil {
         return err
     }
-    l := binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])
-    if int(l) > cap(*c) {
-        return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l)
-    }
-    if int(l) < deltaHeaderBytes {
-        return fmt.Errorf("chunk length less than header size: %d < %d", l, deltaHeaderBytes)
-    }
-    *c = (*c)[:l]
-    return nil
+    return c.setLen()
 }
 
 // UnmarshalFromBuf implements chunk.
 func (c *deltaEncodedChunk) UnmarshalFromBuf(buf []byte) error {
     *c = (*c)[:cap(*c)]
     copy(*c, buf)
+    return c.setLen()
+}
+
+// setLen sets the length of the underlying slice and performs some sanity checks.
+func (c *deltaEncodedChunk) setLen() error {
     l := binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])
     if int(l) > cap(*c) {
-        return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l)
+        return fmt.Errorf("delta chunk length exceeded during unmarshaling: %d", l)
     }
     if int(l) < deltaHeaderBytes {
-        return fmt.Errorf("chunk length less than header size: %d < %d", l, deltaHeaderBytes)
+        return fmt.Errorf("delta chunk length less than header size: %d < %d", l, deltaHeaderBytes)
+    }
+    switch c.timeBytes() {
+    case d1, d2, d4, d8:
+        // Pass.
+    default:
+        return fmt.Errorf("invalid number of time bytes in delta chunk: %d", c.timeBytes())
+    }
+    switch c.valueBytes() {
+    case d0, d1, d2, d4, d8:
+        // Pass.
+    default:
+        return fmt.Errorf("invalid number of value bytes in delta chunk: %d", c.valueBytes())
     }
     *c = (*c)[:l]
     return nil

View file

@@ -32,21 +32,20 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) {
         err error,
         chunkTypeName string,
         unmarshalMethod string,
-        badLen int) {
+        expectedStr string,
+    ) {
         if err == nil {
-            t.Errorf("Failed to obtain an error when unmarshalling %s (from %s) with corrupt length of %d", chunkTypeName, unmarshalMethod, badLen)
+            t.Errorf("Failed to obtain an error when unmarshalling corrupt %s (from %s)", chunkTypeName, unmarshalMethod)
             return
         }
-        expectedStr := "header size"
         if !strings.Contains(err.Error(), expectedStr) {
             t.Errorf(
-                "'%s' not present in error when unmarshalling %s (from %s) with corrupt length %d: '%s'",
+                "'%s' not present in error when unmarshalling corrupt %s (from %s): '%s'",
                 expectedStr,
                 chunkTypeName,
                 unmarshalMethod,
-                badLen,
                 err.Error())
         }
     }
@@ -56,6 +55,7 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) {
         chunkConstructor func(deltaBytes, deltaBytes, bool, int) Chunk
         minHeaderLen int
         chunkLenPos int
+        timeBytesPos int
     }{
         {
             chunkTypeName: "deltaEncodedChunk",
@@ -64,6 +64,7 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) {
             },
             minHeaderLen: deltaHeaderBytes,
             chunkLenPos: deltaHeaderBufLenOffset,
+            timeBytesPos: deltaHeaderTimeBytesOffset,
         },
         {
             chunkTypeName: "doubleDeltaEncodedChunk",
@@ -72,6 +73,7 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) {
             },
             minHeaderLen: doubleDeltaHeaderMinBytes,
             chunkLenPos: doubleDeltaHeaderBufLenOffset,
+            timeBytesPos: doubleDeltaHeaderTimeBytesOffset,
         },
     }
     for _, c := range cases {
@@ -89,15 +91,26 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) {
         cs[0].MarshalToBuf(buf)
 
+        // Corrupt time byte to 0, which is illegal.
+        buf[c.timeBytesPos] = 0
+
+        err = cs[0].UnmarshalFromBuf(buf)
+        verifyUnmarshallingError(err, c.chunkTypeName, "buf", "invalid number of time bytes")
+
+        err = cs[0].Unmarshal(bytes.NewBuffer(buf))
+        verifyUnmarshallingError(err, c.chunkTypeName, "Reader", "invalid number of time bytes")
+
+        // Fix the corruption to go on.
+        buf[c.timeBytesPos] = byte(d1)
+
         // Corrupt the length to be every possible too-small value
         for i := 0; i < c.minHeaderLen; i++ {
             binary.LittleEndian.PutUint16(buf[c.chunkLenPos:], uint16(i))
 
             err = cs[0].UnmarshalFromBuf(buf)
-            verifyUnmarshallingError(err, c.chunkTypeName, "buf", i)
+            verifyUnmarshallingError(err, c.chunkTypeName, "buf", "header size")
 
             err = cs[0].Unmarshal(bytes.NewBuffer(buf))
-            verifyUnmarshallingError(err, c.chunkTypeName, "Reader", i)
+            verifyUnmarshallingError(err, c.chunkTypeName, "Reader", "header size")
         }
     }
 }

View file

@@ -247,28 +247,36 @@ func (c *doubleDeltaEncodedChunk) Unmarshal(r io.Reader) error {
     if _, err := io.ReadFull(r, *c); err != nil {
         return err
     }
-    l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])
-    if int(l) > cap(*c) {
-        return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l)
-    }
-    if int(l) < doubleDeltaHeaderMinBytes {
-        return fmt.Errorf("chunk length less than header size: %d < %d", l, doubleDeltaHeaderMinBytes)
-    }
-    *c = (*c)[:l]
-    return nil
+    return c.setLen()
 }
 
 // UnmarshalFromBuf implements chunk.
 func (c *doubleDeltaEncodedChunk) UnmarshalFromBuf(buf []byte) error {
     *c = (*c)[:cap(*c)]
     copy(*c, buf)
+    return c.setLen()
+}
+
+// setLen sets the length of the underlying slice and performs some sanity checks.
+func (c *doubleDeltaEncodedChunk) setLen() error {
     l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])
     if int(l) > cap(*c) {
-        return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l)
+        return fmt.Errorf("doubledelta chunk length exceeded during unmarshaling: %d", l)
     }
     if int(l) < doubleDeltaHeaderMinBytes {
-        return fmt.Errorf("chunk length less than header size: %d < %d", l, doubleDeltaHeaderMinBytes)
+        return fmt.Errorf("doubledelta chunk length less than header size: %d < %d", l, doubleDeltaHeaderMinBytes)
+    }
+    switch c.timeBytes() {
+    case d1, d2, d4, d8:
+        // Pass.
+    default:
+        return fmt.Errorf("invalid number of time bytes in doubledelta chunk: %d", c.timeBytes())
+    }
+    switch c.valueBytes() {
+    case d0, d1, d2, d4, d8:
+        // Pass.
+    default:
+        return fmt.Errorf("invalid number of value bytes in doubledelta chunk: %d", c.valueBytes())
     }
     *c = (*c)[:l]
     return nil

View file

@@ -45,6 +45,9 @@ func (s *NoopStorage) Querier() (Querier, error) {
     return &NoopQuerier{}, nil
 }
 
+// NoopQuerier is a dummy Querier for use when Prometheus's local storage is
+// disabled. It is returned by the NoopStorage Querier method and always returns
+// empty results.
 type NoopQuerier struct{}
 
 // Close implements Querier.

View file

@@ -319,31 +319,31 @@ func TestFingerprintsForLabels(t *testing.T) {
         expected model.Fingerprints
     }{
         {
-            pairs: []model.LabelPair{{"label1", "x"}},
+            pairs: []model.LabelPair{{Name: "label1", Value: "x"}},
             expected: fingerprints[:0],
         },
         {
-            pairs: []model.LabelPair{{"label1", "test_0"}},
+            pairs: []model.LabelPair{{Name: "label1", Value: "test_0"}},
             expected: fingerprints[:10],
         },
         {
             pairs: []model.LabelPair{
-                {"label1", "test_0"},
-                {"label1", "test_1"},
+                {Name: "label1", Value: "test_0"},
+                {Name: "label1", Value: "test_1"},
             },
             expected: fingerprints[:0],
         },
         {
             pairs: []model.LabelPair{
-                {"label1", "test_0"},
-                {"label2", "test_1"},
+                {Name: "label1", Value: "test_0"},
+                {Name: "label2", Value: "test_1"},
             },
             expected: fingerprints[5:10],
         },
         {
             pairs: []model.LabelPair{
-                {"label1", "test_1"},
-                {"label2", "test_2"},
+                {Name: "label1", Value: "test_1"},
+                {Name: "label2", Value: "test_2"},
             },
             expected: fingerprints[15:20],
         },