From 86f63b078bd3ff21c0cecf9d0efac206f513f1a1 Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Fri, 7 Jun 2013 14:41:00 +0200 Subject: [PATCH] Fix fallthrough compaction value ordering. We discovered a regression whereby data chunks could be appended out of order if the fallthrough case was hit. --- storage/metric/processor.go | 50 ++++++++++++++++++------------------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/storage/metric/processor.go b/storage/metric/processor.go index 88e7403d62..d913c69b17 100644 --- a/storage/metric/processor.go +++ b/storage/metric/processor.go @@ -95,7 +95,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi var pendingMutations = 0 var pendingSamples model.Values var sampleKey model.SampleKey - var sampleValues model.Values + var unactedSamples model.Values var lastTouchedTime time.Time var keyDropped bool @@ -103,7 +103,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi if err != nil { return } - sampleValues, err = extractSampleValues(sampleIterator) + unactedSamples, err = extractSampleValues(sampleIterator) if err != nil { return } @@ -117,7 +117,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi // If there are no sample values to extract from the datastore, let's // continue extracting more values to use. We know that the time.Before() // block would prevent us from going into unsafe territory. - case len(sampleValues) == 0: + case len(unactedSamples) == 0: if !sampleIterator.Next() { return lastCurated, fmt.Errorf("Illegal Condition: Invalid Iterator on Continuation") } @@ -128,7 +128,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi if err != nil { return } - sampleValues, err = extractSampleValues(sampleIterator) + unactedSamples, err = extractSampleValues(sampleIterator) if err != nil { return } @@ -147,19 +147,19 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi pendingBatch.Close() pendingBatch = nil - case len(pendingSamples) == 0 && len(sampleValues) >= p.MinimumGroupSize: - lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp - sampleValues = model.Values{} + case len(pendingSamples) == 0 && len(unactedSamples) >= p.MinimumGroupSize: + lastTouchedTime = unactedSamples[len(unactedSamples)-1].Timestamp + unactedSamples = model.Values{} - case len(pendingSamples)+len(sampleValues) < p.MinimumGroupSize: + case len(pendingSamples)+len(unactedSamples) < p.MinimumGroupSize: if !keyDropped { key := coding.NewPBEncoder(sampleKey.ToDTO()) pendingBatch.Drop(key) keyDropped = true } - pendingSamples = append(pendingSamples, sampleValues...) - lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp - sampleValues = model.Values{} + pendingSamples = append(pendingSamples, unactedSamples...) + lastTouchedTime = unactedSamples[len(unactedSamples)-1].Timestamp + unactedSamples = model.Values{} pendingMutations++ // If the number of pending writes equals the target group size @@ -170,37 +170,37 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi pendingBatch.Put(key, value) pendingMutations++ lastCurated = newSampleKey.FirstTimestamp.In(time.UTC) - if len(sampleValues) > 0 { + if len(unactedSamples) > 0 { if !keyDropped { key := coding.NewPBEncoder(sampleKey.ToDTO()) pendingBatch.Drop(key) keyDropped = true } - if len(sampleValues) > p.MinimumGroupSize { - pendingSamples = sampleValues[:p.MinimumGroupSize] - sampleValues = sampleValues[p.MinimumGroupSize:] - lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp + if len(unactedSamples) > p.MinimumGroupSize { + pendingSamples = unactedSamples[:p.MinimumGroupSize] + unactedSamples = unactedSamples[p.MinimumGroupSize:] + lastTouchedTime = unactedSamples[len(unactedSamples)-1].Timestamp } else { - pendingSamples = sampleValues + pendingSamples = unactedSamples lastTouchedTime = pendingSamples[len(pendingSamples)-1].Timestamp - sampleValues = model.Values{} + unactedSamples = model.Values{} } } - case len(pendingSamples)+len(sampleValues) >= p.MinimumGroupSize: + case len(pendingSamples)+len(unactedSamples) >= p.MinimumGroupSize: if !keyDropped { key := coding.NewPBEncoder(sampleKey.ToDTO()) pendingBatch.Drop(key) keyDropped = true } remainder := p.MinimumGroupSize - len(pendingSamples) - pendingSamples = append(pendingSamples, sampleValues[:remainder]...) - sampleValues = sampleValues[remainder:] - if len(sampleValues) == 0 { + pendingSamples = append(pendingSamples, unactedSamples[:remainder]...) + unactedSamples = unactedSamples[remainder:] + if len(unactedSamples) == 0 { lastTouchedTime = pendingSamples[len(pendingSamples)-1].Timestamp } else { - lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp + lastTouchedTime = unactedSamples[len(unactedSamples)-1].Timestamp } pendingMutations++ default: @@ -208,8 +208,8 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi } } - if len(sampleValues) > 0 || len(pendingSamples) > 0 { - pendingSamples = append(sampleValues, pendingSamples...) + if len(unactedSamples) > 0 || len(pendingSamples) > 0 { + pendingSamples = append(pendingSamples, unactedSamples...) newSampleKey := pendingSamples.ToSampleKey(fingerprint) key := coding.NewPBEncoder(newSampleKey.ToDTO()) value := coding.NewPBEncoder(pendingSamples.ToDTO())