Fix fallthrough compaction value ordering.

We discovered a regression whereby data chunks could be appended out
of order if the fallthrough case was hit.
Matt T. Proud 2013-06-07 14:41:00 +02:00
parent 819045541e
commit 86f63b078b
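
For context, a minimal, self-contained sketch of the ordering problem. This is not the storage code itself: samplePair and timestamps below are hypothetical stand-ins for entries of model.Values, and the two slices merely mimic pendingSamples (values accumulated earlier) and unactedSamples (the not-yet-processed, chronologically later tail). The only behavioral change in the diff below, beyond the sampleValues → unactedSamples rename, is the final append: prepending the tail in front of the pending values produced a chunk whose samples were out of time order, while appending it after them keeps the order intact.

package main

import (
	"fmt"
	"time"
)

// samplePair is a simplified, hypothetical stand-in for one entry of the
// storage layer's model.Values; only the timestamp matters for ordering.
type samplePair struct {
	Timestamp time.Time
	Value     float64
}

// timestamps reduces a slice of samples to their offsets from the epoch,
// which makes the ordering easy to eyeball.
func timestamps(s []samplePair) (out []time.Duration) {
	epoch := time.Unix(0, 0).UTC()
	for _, p := range s {
		out = append(out, p.Timestamp.Sub(epoch))
	}
	return out
}

func main() {
	t0 := time.Unix(0, 0).UTC()

	// pendingSamples were gathered first, so they are chronologically earlier.
	pendingSamples := []samplePair{{t0, 1}, {t0.Add(time.Minute), 2}}
	// unactedSamples is the not-yet-processed tail; its values come later.
	unactedSamples := []samplePair{{t0.Add(2 * time.Minute), 3}, {t0.Add(3 * time.Minute), 4}}

	// Old flush ordering: the later tail lands in front of the earlier
	// pending values, so the resulting chunk is out of time order.
	old := append(append([]samplePair{}, unactedSamples...), pendingSamples...)

	// Ordering after this commit: earlier pending values first, tail after.
	fixed := append(append([]samplePair{}, pendingSamples...), unactedSamples...)

	fmt.Println("old:", timestamps(old))   // old: [2m0s 3m0s 0s 1m0s]
	fmt.Println("new:", timestamps(fixed)) // new: [0s 1m0s 2m0s 3m0s]
}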


@@ -95,7 +95,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi
 	var pendingMutations = 0
 	var pendingSamples model.Values
 	var sampleKey model.SampleKey
-	var sampleValues model.Values
+	var unactedSamples model.Values
 	var lastTouchedTime time.Time
 	var keyDropped bool
@@ -103,7 +103,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi
 		if err != nil {
 			return
 		}
-		sampleValues, err = extractSampleValues(sampleIterator)
+		unactedSamples, err = extractSampleValues(sampleIterator)
 		if err != nil {
 			return
 		}
@@ -117,7 +117,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi
 		// If there are no sample values to extract from the datastore, let's
 		// continue extracting more values to use. We know that the time.Before()
 		// block would prevent us from going into unsafe territory.
-		case len(sampleValues) == 0:
+		case len(unactedSamples) == 0:
 			if !sampleIterator.Next() {
 				return lastCurated, fmt.Errorf("Illegal Condition: Invalid Iterator on Continuation")
 			}
@@ -128,7 +128,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi
 			if err != nil {
 				return
 			}
-			sampleValues, err = extractSampleValues(sampleIterator)
+			unactedSamples, err = extractSampleValues(sampleIterator)
 			if err != nil {
 				return
 			}
@@ -147,19 +147,19 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi
 			pendingBatch.Close()
 			pendingBatch = nil
-		case len(pendingSamples) == 0 && len(sampleValues) >= p.MinimumGroupSize:
-			lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp
-			sampleValues = model.Values{}
+		case len(pendingSamples) == 0 && len(unactedSamples) >= p.MinimumGroupSize:
+			lastTouchedTime = unactedSamples[len(unactedSamples)-1].Timestamp
+			unactedSamples = model.Values{}
-		case len(pendingSamples)+len(sampleValues) < p.MinimumGroupSize:
+		case len(pendingSamples)+len(unactedSamples) < p.MinimumGroupSize:
 			if !keyDropped {
 				key := coding.NewPBEncoder(sampleKey.ToDTO())
 				pendingBatch.Drop(key)
 				keyDropped = true
 			}
-			pendingSamples = append(pendingSamples, sampleValues...)
-			lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp
-			sampleValues = model.Values{}
+			pendingSamples = append(pendingSamples, unactedSamples...)
+			lastTouchedTime = unactedSamples[len(unactedSamples)-1].Timestamp
+			unactedSamples = model.Values{}
 			pendingMutations++
 		// If the number of pending writes equals the target group size
@@ -170,37 +170,37 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi
 			pendingBatch.Put(key, value)
 			pendingMutations++
 			lastCurated = newSampleKey.FirstTimestamp.In(time.UTC)
-			if len(sampleValues) > 0 {
+			if len(unactedSamples) > 0 {
 				if !keyDropped {
 					key := coding.NewPBEncoder(sampleKey.ToDTO())
 					pendingBatch.Drop(key)
 					keyDropped = true
 				}
-				if len(sampleValues) > p.MinimumGroupSize {
-					pendingSamples = sampleValues[:p.MinimumGroupSize]
-					sampleValues = sampleValues[p.MinimumGroupSize:]
-					lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp
+				if len(unactedSamples) > p.MinimumGroupSize {
+					pendingSamples = unactedSamples[:p.MinimumGroupSize]
+					unactedSamples = unactedSamples[p.MinimumGroupSize:]
+					lastTouchedTime = unactedSamples[len(unactedSamples)-1].Timestamp
 				} else {
-					pendingSamples = sampleValues
+					pendingSamples = unactedSamples
 					lastTouchedTime = pendingSamples[len(pendingSamples)-1].Timestamp
-					sampleValues = model.Values{}
+					unactedSamples = model.Values{}
 				}
 			}
-		case len(pendingSamples)+len(sampleValues) >= p.MinimumGroupSize:
+		case len(pendingSamples)+len(unactedSamples) >= p.MinimumGroupSize:
 			if !keyDropped {
 				key := coding.NewPBEncoder(sampleKey.ToDTO())
 				pendingBatch.Drop(key)
 				keyDropped = true
 			}
 			remainder := p.MinimumGroupSize - len(pendingSamples)
-			pendingSamples = append(pendingSamples, sampleValues[:remainder]...)
-			sampleValues = sampleValues[remainder:]
-			if len(sampleValues) == 0 {
+			pendingSamples = append(pendingSamples, unactedSamples[:remainder]...)
+			unactedSamples = unactedSamples[remainder:]
+			if len(unactedSamples) == 0 {
 				lastTouchedTime = pendingSamples[len(pendingSamples)-1].Timestamp
 			} else {
-				lastTouchedTime = sampleValues[len(sampleValues)-1].Timestamp
+				lastTouchedTime = unactedSamples[len(unactedSamples)-1].Timestamp
 			}
 			pendingMutations++
 		default:
@@ -208,8 +208,8 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersi
 		}
 	}
-	if len(sampleValues) > 0 || len(pendingSamples) > 0 {
-		pendingSamples = append(sampleValues, pendingSamples...)
+	if len(unactedSamples) > 0 || len(pendingSamples) > 0 {
+		pendingSamples = append(pendingSamples, unactedSamples...)
 		newSampleKey := pendingSamples.ToSampleKey(fingerprint)
 		key := coding.NewPBEncoder(newSampleKey.ToDTO())
 		value := coding.NewPBEncoder(pendingSamples.ToDTO())