Parameterize the buffer for marshal/unmarshal.

We are not reusing buffers yet.  Buffer reuse could introduce problems,
so that behavior is disabled for now.

Cursory benchmark data:
- Marshal for 10,000 samples: -30% overhead.
- Unmarshal for 10,000 samples: -15% overhead.

Change-Id: Ib006bdc656af45dca2b92de08a8f905d8d728cac
This commit is contained in:
Matt T. Proud 2014-04-15 20:49:09 +02:00
parent 2064f32662
commit 3e969a8ca2
9 changed files with 104 additions and 44 deletions

View file

@@ -51,6 +51,9 @@ tag:
$(BUILD_PATH)/cache/$(GOPKG): $(BUILD_PATH)/cache/$(GOPKG):
curl -o $@ $(GOURL)/$(GOPKG) curl -o $@ $(GOURL)/$(GOPKG)
benchmark: test
$(GO) test $(GO_TEST_FLAGS) -test.bench='Benchmark' ./...
clean: clean:
$(MAKE) -C $(BUILD_PATH) clean $(MAKE) -C $(BUILD_PATH) clean
$(MAKE) -C tools clean $(MAKE) -C tools clean

View file

@@ -78,7 +78,7 @@ export PKG_CONFIG_PATH := $(PREFIX)/lib/pkgconfig:$(PKG_CONFIG_PATH)
export CGO_CFLAGS = $(CFLAGS) export CGO_CFLAGS = $(CFLAGS)
export CGO_LDFLAGS = $(LDFLAGS) export CGO_LDFLAGS = $(LDFLAGS)
export GO_TEST_FLAGS := "-v" export GO_TEST_FLAGS ?= "-v"
GO_GET := $(GO) get -u -v -x GO_GET := $(GO) get -u -v -x
APT_GET_INSTALL := sudo apt-get install -y APT_GET_INSTALL := sudo apt-get install -y

View file

@@ -374,7 +374,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples clientmodel.Samples) (e
Value: sample.Value, Value: sample.Value,
}) })
} }
val := values.marshal() val := values.marshal(nil)
samplesBatch.PutRaw(keyDto, val) samplesBatch.PutRaw(keyDto, val)
} }
} }
@@ -669,7 +669,7 @@ func (d *MetricSamplesDecoder) DecodeKey(in interface{}) (interface{}, error) {
// DecodeValue implements storage.RecordDecoder. It requires 'in' to be a // DecodeValue implements storage.RecordDecoder. It requires 'in' to be a
// SampleValueSeries protobuf. 'out' is of type metric.Values. // SampleValueSeries protobuf. 'out' is of type metric.Values.
func (d *MetricSamplesDecoder) DecodeValue(in interface{}) (interface{}, error) { func (d *MetricSamplesDecoder) DecodeValue(in interface{}) (interface{}, error) {
return unmarshalValues(in.([]byte)), nil return unmarshalValues(in.([]byte), nil), nil
} }
// AcceptAllFilter implements storage.RecordFilter and accepts all records. // AcceptAllFilter implements storage.RecordFilter and accepts all records.

View file

@@ -118,7 +118,7 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
sampleKey.Load(sampleKeyDto) sampleKey.Load(sampleKeyDto)
unactedSamples = unmarshalValues(sampleIterator.RawValue()) unactedSamples = unmarshalValues(sampleIterator.RawValue(), nil)
for lastCurated.Before(stopAt) && lastTouchedTime.Before(stopAt) && sampleKey.Fingerprint.Equal(fingerprint) { for lastCurated.Before(stopAt) && lastTouchedTime.Before(stopAt) && sampleKey.Fingerprint.Equal(fingerprint) {
switch { switch {
@@ -144,7 +144,7 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
break break
} }
unactedSamples = unmarshalValues(sampleIterator.RawValue()) unactedSamples = unmarshalValues(sampleIterator.RawValue(), nil)
// If the number of pending mutations exceeds the allowed batch amount, // If the number of pending mutations exceeds the allowed batch amount,
// commit to disk and delete the batch. A new one will be recreated if // commit to disk and delete the batch. A new one will be recreated if
@@ -182,7 +182,7 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
k := &dto.SampleKey{} k := &dto.SampleKey{}
newSampleKey := pendingSamples.ToSampleKey(fingerprint) newSampleKey := pendingSamples.ToSampleKey(fingerprint)
newSampleKey.Dump(k) newSampleKey.Dump(k)
b := pendingSamples.marshal() b := pendingSamples.marshal(nil)
pendingBatch.PutRaw(k, b) pendingBatch.PutRaw(k, b)
pendingMutations++ pendingMutations++
@@ -231,7 +231,7 @@ func (p *CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPers
k := &dto.SampleKey{} k := &dto.SampleKey{}
newSampleKey := pendingSamples.ToSampleKey(fingerprint) newSampleKey := pendingSamples.ToSampleKey(fingerprint)
newSampleKey.Dump(k) newSampleKey.Dump(k)
b := pendingSamples.marshal() b := pendingSamples.marshal(nil)
pendingBatch.PutRaw(k, b) pendingBatch.PutRaw(k, b)
pendingSamples = Values{} pendingSamples = Values{}
pendingMutations++ pendingMutations++
@@ -339,7 +339,7 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis
} }
sampleKey.Load(sampleKeyDto) sampleKey.Load(sampleKeyDto)
sampleValues := unmarshalValues(sampleIterator.RawValue()) sampleValues := unmarshalValues(sampleIterator.RawValue(), nil)
pendingMutations := 0 pendingMutations := 0
@@ -363,7 +363,7 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis
} }
sampleKey.Load(sampleKeyDto) sampleKey.Load(sampleKeyDto)
sampleValues = unmarshalValues(sampleIterator.RawValue()) sampleValues = unmarshalValues(sampleIterator.RawValue(), nil)
// If the number of pending mutations exceeds the allowed batch // If the number of pending mutations exceeds the allowed batch
// amount, commit to disk and delete the batch. A new one will // amount, commit to disk and delete the batch. A new one will
@@ -399,7 +399,7 @@ func (p *DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersis
sampleKey = sampleValues.ToSampleKey(fingerprint) sampleKey = sampleValues.ToSampleKey(fingerprint)
sampleKey.Dump(k) sampleKey.Dump(k)
lastCurated = sampleKey.FirstTimestamp lastCurated = sampleKey.FirstTimestamp
v := sampleValues.marshal() v := sampleValues.marshal(nil)
pendingBatch.PutRaw(k, v) pendingBatch.PutRaw(k, v)
pendingMutations++ pendingMutations++
} else { } else {

View file

@@ -106,7 +106,7 @@ func (s sampleGroup) Get() (key proto.Message, value interface{}) {
k := &dto.SampleKey{} k := &dto.SampleKey{}
keyRaw.Dump(k) keyRaw.Dump(k)
return k, s.values.marshal() return k, s.values.marshal(nil)
} }
type noopUpdater struct{} type noopUpdater struct{}
@@ -960,7 +960,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("%d.%d. error %s", i, j, err) t.Fatalf("%d.%d. error %s", i, j, err)
} }
sampleValues := unmarshalValues(iterator.RawValue()) sampleValues := unmarshalValues(iterator.RawValue(), nil)
expectedFingerprint := &clientmodel.Fingerprint{} expectedFingerprint := &clientmodel.Fingerprint{}
expectedFingerprint.LoadFromString(expected.fingerprint) expectedFingerprint.LoadFromString(expected.fingerprint)
@@ -1487,7 +1487,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("%d.%d. error %s", i, j, err) t.Fatalf("%d.%d. error %s", i, j, err)
} }
sampleValues := unmarshalValues(iterator.RawValue()) sampleValues := unmarshalValues(iterator.RawValue(), nil)
expectedFingerprint := &clientmodel.Fingerprint{} expectedFingerprint := &clientmodel.Fingerprint{}
expectedFingerprint.LoadFromString(expected.fingerprint) expectedFingerprint.LoadFromString(expected.fingerprint)

View file

@@ -157,35 +157,46 @@ func (v Values) String() string {
return buffer.String() return buffer.String()
} }
// marshal marshals a group of samples for being written to disk. // marshal marshals a group of samples for being written to disk into dest or a
func (v Values) marshal() []byte { // new slice if dest is insufficiently small.
buf := make([]byte, formatVersionSize+len(v)*sampleSize) func (v Values) marshal(dest []byte) []byte {
buf[0] = formatVersion sz := formatVersionSize + len(v)*sampleSize
if cap(dest) < sz {
dest = make([]byte, sz)
} else {
dest = dest[0:sz]
}
dest[0] = formatVersion
for i, val := range v { for i, val := range v {
offset := formatVersionSize + i*sampleSize offset := formatVersionSize + i*sampleSize
binary.LittleEndian.PutUint64(buf[offset:], uint64(val.Timestamp.Unix())) binary.LittleEndian.PutUint64(dest[offset:], uint64(val.Timestamp.Unix()))
binary.LittleEndian.PutUint64(buf[offset+8:], math.Float64bits(float64(val.Value))) binary.LittleEndian.PutUint64(dest[offset+8:], math.Float64bits(float64(val.Value)))
} }
return buf return dest
} }
// unmarshalValues decodes marshalled samples and returns them as Values. // unmarshalValues decodes marshalled samples into dest and returns either dest
func unmarshalValues(buf []byte) Values { // or a new slice containing those values if dest is insufficiently small.
n := (len(buf) - formatVersionSize) / sampleSize func unmarshalValues(buf []byte, dest Values) Values {
// Setting the value of a given slice index is around 15% faster than doing
// an append, even if the slice already has the required capacity. For this
// reason, we already set the full target length here.
v := make(Values, n)
if buf[0] != formatVersion { if buf[0] != formatVersion {
panic("unsupported format version") panic("unsupported format version")
} }
n := (len(buf) - formatVersionSize) / sampleSize
if cap(dest) < n {
dest = make(Values, n)
} else {
dest = dest[0:n]
}
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
offset := formatVersionSize + i*sampleSize offset := formatVersionSize + i*sampleSize
v[i].Timestamp = clientmodel.TimestampFromUnix(int64(binary.LittleEndian.Uint64(buf[offset:]))) dest[i].Timestamp = clientmodel.TimestampFromUnix(int64(binary.LittleEndian.Uint64(buf[offset:])))
v[i].Value = clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(buf[offset+8:]))) dest[i].Value = clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(buf[offset+8:])))
} }
return v return dest
} }
// SampleSet is Values with a Metric attached. // SampleSet is Values with a Metric attached.

View file

@@ -12,8 +12,8 @@ const numTestValues = 5000
func TestValuesMarshalAndUnmarshal(t *testing.T) { func TestValuesMarshalAndUnmarshal(t *testing.T) {
values := randomValues(numTestValues) values := randomValues(numTestValues)
marshalled := values.marshal() marshalled := values.marshal(nil)
unmarshalled := unmarshalValues(marshalled) unmarshalled := unmarshalValues(marshalled, nil)
for i, expected := range values { for i, expected := range values {
actual := unmarshalled[i] actual := unmarshalled[i]
@@ -35,19 +35,65 @@ func randomValues(numSamples int) Values {
return v return v
} }
func BenchmarkMarshal(b *testing.B) { func benchmarkMarshal(b *testing.B, n int) {
v := randomValues(numTestValues) v := randomValues(n)
b.ResetTimer() b.ResetTimer()
// TODO: Reuse buffer to compare performance.
// - Delta is -30 percent time overhead.
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
v.marshal() v.marshal(nil)
} }
} }
func BenchmarkUnmarshal(b *testing.B) { func BenchmarkMarshal1(b *testing.B) {
benchmarkMarshal(b, 1)
}
func BenchmarkMarshal10(b *testing.B) {
benchmarkMarshal(b, 10)
}
func BenchmarkMarshal100(b *testing.B) {
benchmarkMarshal(b, 100)
}
func BenchmarkMarshal1000(b *testing.B) {
benchmarkMarshal(b, 1000)
}
func BenchmarkMarshal10000(b *testing.B) {
benchmarkMarshal(b, 10000)
}
func benchmarkUnmarshal(b *testing.B, n int) {
v := randomValues(numTestValues) v := randomValues(numTestValues)
marshalled := v.marshal() marshalled := v.marshal(nil)
b.ResetTimer() b.ResetTimer()
// TODO: Reuse buffer to compare performance.
// - Delta is -15 percent time overhead.
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
unmarshalValues(marshalled) unmarshalValues(marshalled, nil)
} }
} }
func BenchmarkUnmarshal1(b *testing.B) {
benchmarkUnmarshal(b, 1)
}
func BenchmarkUnmarshal10(b *testing.B) {
benchmarkUnmarshal(b, 10)
}
func BenchmarkUnmarshal100(b *testing.B) {
benchmarkUnmarshal(b, 100)
}
func BenchmarkUnmarshal1000(b *testing.B) {
benchmarkUnmarshal(b, 1000)
}
func BenchmarkUnmarshal10000(b *testing.B) {
benchmarkUnmarshal(b, 10000)
}

View file

@@ -215,7 +215,7 @@ func levelDBGetRangeValues(l *LevelDBMetricPersistence, fp *clientmodel.Fingerpr
break break
} }
retrievedValues := unmarshalValues(iterator.RawValue()) retrievedValues := unmarshalValues(iterator.RawValue(), nil)
samples = append(samples, retrievedValues...) samples = append(samples, retrievedValues...)
} }

View file

@@ -594,7 +594,7 @@ func (t *TieredStorage) loadChunkAroundTime(
// //
// Only do the rewind if there is another chunk before this one. // Only do the rewind if there is another chunk before this one.
if !seekingKey.MayContain(ts) { if !seekingKey.MayContain(ts) {
postValues := unmarshalValues(iterator.RawValue()) postValues := unmarshalValues(iterator.RawValue(), nil)
if !seekingKey.Equal(firstBlock) { if !seekingKey.Equal(firstBlock) {
if !iterator.Previous() { if !iterator.Previous() {
panic("This should never return false.") panic("This should never return false.")
@@ -609,13 +609,13 @@ func (t *TieredStorage) loadChunkAroundTime(
return postValues, false return postValues, false
} }
foundValues = unmarshalValues(iterator.RawValue()) foundValues = unmarshalValues(iterator.RawValue(), nil)
foundValues = append(foundValues, postValues...) foundValues = append(foundValues, postValues...)
return foundValues, false return foundValues, false
} }
} }
foundValues = unmarshalValues(iterator.RawValue()) foundValues = unmarshalValues(iterator.RawValue(), nil)
return foundValues, false return foundValues, false
} }
@@ -634,7 +634,7 @@ func (t *TieredStorage) loadChunkAroundTime(
return nil, false return nil, false
} }
foundValues = unmarshalValues(iterator.RawValue()) foundValues = unmarshalValues(iterator.RawValue(), nil)
return foundValues, false return foundValues, false
} }
} }