Merge pull request #234 from prometheus/feature/storage/long-tail-deletion

Include deletion processor for long-tail values.
This commit is contained in:
juliusv 2013-05-10 03:26:13 -07:00
commit 76521c3ff0
13 changed files with 878 additions and 48 deletions

View file

@ -37,7 +37,9 @@ FULL_GOPATH_BASE := $(FIRST_GOPATH)/src/github.com/prometheus
export PREFIX=$(PWD)/build/root export PREFIX=$(PWD)/build/root
export PATH := $(PREFIX)/bin:$(GOPATH)/bin:$(PATH) export LOCAL_BINARIES=$(PREFIX)/bin
export PATH := $(LOCAL_BINARIES):$(GOPATH)/bin:$(PATH)
export LD_LIBRARY_PATH := $(PREFIX)/lib:$(LD_LIBRARY_PATH) export LD_LIBRARY_PATH := $(PREFIX)/lib:$(LD_LIBRARY_PATH)
export CFLAGS := $(CFLAGS) -I$(PREFIX)/include export CFLAGS := $(CFLAGS) -I$(PREFIX)/include
@ -68,3 +70,5 @@ BUILDFLAGS := -ldflags \
-X main.leveldbVersion $(LEVELDB_VERSION)\ -X main.leveldbVersion $(LEVELDB_VERSION)\
-X main.protobufVersion $(PROTOCOL_BUFFERS_VERSION)\ -X main.protobufVersion $(PROTOCOL_BUFFERS_VERSION)\
-X main.snappyVersion $(SNAPPY_VERSION)" -X main.snappyVersion $(SNAPPY_VERSION)"
PROTOC := $(LOCAL_BINARIES)/protoc

View file

@ -24,4 +24,4 @@ include ../Makefile.INCLUDE
generated/config.pb.go: config.proto generated/config.pb.go: config.proto
protoc --proto_path=$(PREFIX)/include:. --go_out=generated/ config.proto $(PROTOC) --proto_path=$(PREFIX)/include:. --go_out=generated/ config.proto

View file

@ -22,8 +22,8 @@ include ../Makefile.INCLUDE
# #
# make -C build goprotobuf-protoc-gen-go-stamp # make -C build goprotobuf-protoc-gen-go-stamp
generated/data.pb.go: data.proto generated/data.pb.go: data.proto
protoc --proto_path=$(PREFIX)/include:. --include_imports --go_out=generated/ --descriptor_set_out=generated/descriptor.blob data.proto $(PROTOC) --proto_path=$(PREFIX)/include:. --include_imports --go_out=generated/ --descriptor_set_out=generated/descriptor.blob data.proto
generated/descriptor.blob: data.proto generated/descriptor.blob: data.proto
protoc --proto_path=$(PREFIX)/include:. --include_imports --go_out=generated/ --descriptor_set_out=generated/descriptor.blob data.proto $(PROTOC) --proto_path=$(PREFIX)/include:. --include_imports --go_out=generated/ --descriptor_set_out=generated/descriptor.blob data.proto

View file

@ -122,3 +122,8 @@ message CurationValue {
// fingerprint. // fingerprint.
optional int64 last_completion_timestamp = 1; optional int64 last_completion_timestamp = 1;
} }
// DeletionProcessorDefinition models a curation process across the sample
// corpus that deletes old values.
//
// The message is intentionally field-less: the deletion processor has no
// tunable settings, so its serialized form serves only as a signature/type
// marker for curation bookkeeping.
message DeletionProcessorDefinition {
}

View file

@ -295,5 +295,13 @@ func (m *CurationValue) GetLastCompletionTimestamp() int64 {
return 0 return 0
} }
// DeletionProcessorDefinition is the generated protobuf message for the
// field-less deletion processor definition (see data.proto).
type DeletionProcessorDefinition struct {
	// XXX_unrecognized holds bytes for any unknown fields seen while
	// unmarshaling, per the generated-code convention.
	XXX_unrecognized []byte `json:"-"`
}

// Reset restores the message to its zero value.
func (m *DeletionProcessorDefinition) Reset() { *m = DeletionProcessorDefinition{} }

// String renders the message using the compact protobuf text format.
func (m *DeletionProcessorDefinition) String() string { return proto.CompactTextString(m) }

// ProtoMessage marks the type as a protobuf message.
func (*DeletionProcessorDefinition) ProtoMessage() {}
func init() { func init() {
} }

Binary file not shown.

View file

@ -186,24 +186,15 @@ func (v Values) InsideInterval(t time.Time) (s bool) {
// TruncateBefore returns a subslice of the original such that extraneous // TruncateBefore returns a subslice of the original such that extraneous
// samples in the collection that occur before the provided time are // samples in the collection that occur before the provided time are
// dropped. The original slice is not mutated. // dropped. The original slice is not mutated
func (v Values) TruncateBefore(t time.Time) (values Values) { func (v Values) TruncateBefore(t time.Time) Values {
index := sort.Search(len(v), func(i int) bool { index := sort.Search(len(v), func(i int) bool {
timestamp := v[i].Timestamp timestamp := v[i].Timestamp
return !timestamp.Before(t) return !timestamp.Before(t)
}) })
switch index { return v[index:]
case 0:
values = v
case len(v):
values = v[len(v)-1:]
default:
values = v[index-1:]
}
return
} }
func (v Values) ToDTO() (out *dto.SampleValueSeries) { func (v Values) ToDTO() (out *dto.SampleValueSeries) {

View file

@ -79,7 +79,7 @@ func BenchmarkMetric(b *testing.B) {
} }
} }
func testValues(t test.Tester) { func testTruncateBefore(t test.Tester) {
type in struct { type in struct {
values Values values Values
time time.Time time time.Time
@ -165,10 +165,6 @@ func testValues(t test.Tester) {
}, },
}, },
out: Values{ out: Values{
{
Value: 1,
Timestamp: instant.Add(time.Second),
},
{ {
Value: 2, Value: 2,
Timestamp: instant.Add(2 * time.Second), Timestamp: instant.Add(2 * time.Second),
@ -209,13 +205,7 @@ func testValues(t test.Tester) {
}, },
}, },
}, },
out: Values{ out: Values{},
// Preserve the last value in case it needs to be used for the next set.
{
Value: 4,
Timestamp: instant.Add(4 * time.Second),
},
},
}, },
} }
@ -234,6 +224,6 @@ func testValues(t test.Tester) {
} }
} }
func TestValues(t *testing.T) { func TestTruncateBefore(t *testing.T) {
testValues(t) testTruncateBefore(t)
} }

View file

@ -143,7 +143,7 @@ var testMatrix = ast.Matrix{
{ {
Metric: model.Metric{ Metric: model.Metric{
model.MetricNameLabel: "x", model.MetricNameLabel: "x",
"y": "testvalue", "y": "testvalue",
}, },
Values: getTestValueStream(0, 100, 10), Values: getTestValueStream(0, 100, 10),
}, },

View file

@ -41,7 +41,7 @@ type Processor interface {
// //
// Upon completion or error, the last time at which the processor finished // Upon completion or error, the last time at which the processor finished
// shall be emitted in addition to any errors. // shall be emitted in addition to any errors.
Apply(sampleIterator leveldb.Iterator, samples raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error)
} }
// CompactionProcessor combines sparse values in the database together such // CompactionProcessor combines sparse values in the database together such
@ -80,10 +80,10 @@ func (p *CompactionProcessor) Signature() (out []byte, err error) {
} }
func (p CompactionProcessor) String() string { func (p CompactionProcessor) String() string {
return fmt.Sprintf("compactionProcess for minimum group size %d", p.MinimumGroupSize) return fmt.Sprintf("compactionProcessor for minimum group size %d", p.MinimumGroupSize)
} }
func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samples raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error) { func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error) {
var pendingBatch raw.Batch = nil var pendingBatch raw.Batch = nil
defer func() { defer func() {
@ -137,7 +137,7 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samples raw.
// commit to disk and delete the batch. A new one will be recreated if // commit to disk and delete the batch. A new one will be recreated if
// necessary. // necessary.
case pendingMutations >= p.MaximumMutationPoolBatch: case pendingMutations >= p.MaximumMutationPoolBatch:
err = samples.Commit(pendingBatch) err = samplesPersistence.Commit(pendingBatch)
if err != nil { if err != nil {
return return
} }
@ -223,7 +223,136 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samples raw.
// This is not deferred due to the off-chance that a pre-existing commit // This is not deferred due to the off-chance that a pre-existing commit
// failed. // failed.
if pendingBatch != nil && pendingMutations > 0 { if pendingBatch != nil && pendingMutations > 0 {
err = samples.Commit(pendingBatch) err = samplesPersistence.Commit(pendingBatch)
if err != nil {
return
}
}
return
}
// DeletionProcessor deletes sample blocks older than a defined value.
type DeletionProcessor struct {
	// MaximumMutationPoolBatch represents approximately the largest pending
	// batch of mutation operations for the database before pausing to
	// commit before resumption.
	MaximumMutationPoolBatch int

	// signature is the byte representation of the DeletionProcessor's settings,
	// used for purely memoization purposes across an instance. It is populated
	// lazily by Signature().
	signature []byte
}
// Name returns the fully-qualified protobuf type name of this processor's
// settings message, used to identify the processor in curation state keys.
func (p DeletionProcessor) Name() string {
	const definitionTypeName = "io.prometheus.DeletionProcessorDefinition"

	return definitionTypeName
}
// Signature returns the byte representation of the DeletionProcessor's
// settings — the marshaled (empty) DeletionProcessorDefinition message —
// memoizing the result on the receiver after the first call.
//
// NOTE(review): this is the only DeletionProcessor method with a pointer
// receiver; when the processor is used by value the memoization will not
// stick across calls — confirm intended.
func (p *DeletionProcessor) Signature() (out []byte, err error) {
	// Marshal only once; subsequent calls serve the cached bytes.
	if len(p.signature) == 0 {
		out, err = proto.Marshal(&dto.DeletionProcessorDefinition{})

		p.signature = out
	}

	out = p.signature
	return
}
// String provides a human-readable description of the processor for
// logging and debugging output.
func (p DeletionProcessor) String() string {
	description := "deletionProcessor"

	return description
}
func (p DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error) {
var pendingBatch raw.Batch = nil
defer func() {
if pendingBatch != nil {
pendingBatch.Close()
}
}()
sampleKey, err := extractSampleKey(sampleIterator)
if err != nil {
return
}
sampleValues, err := extractSampleValues(sampleIterator)
if err != nil {
return
}
pendingMutations := 0
for lastCurated.Before(stopAt) {
switch {
// Furnish a new pending batch operation if none is available.
case pendingBatch == nil:
pendingBatch = leveldb.NewBatch()
// If there are no sample values to extract from the datastore, let's
// continue extracting more values to use. We know that the time.Before()
// block would prevent us from going into unsafe territory.
case len(sampleValues) == 0:
if !sampleIterator.Next() {
return lastCurated, fmt.Errorf("Illegal Condition: Invalid Iterator on Continuation")
}
sampleKey, err = extractSampleKey(sampleIterator)
if err != nil {
return
}
sampleValues, err = extractSampleValues(sampleIterator)
if err != nil {
return
}
// If the number of pending mutations exceeds the allowed batch amount,
// commit to disk and delete the batch. A new one will be recreated if
// necessary.
case pendingMutations >= p.MaximumMutationPoolBatch:
err = samplesPersistence.Commit(pendingBatch)
if err != nil {
return
}
pendingMutations = 0
pendingBatch.Close()
pendingBatch = nil
case !sampleKey.MayContain(stopAt):
key := coding.NewProtocolBuffer(sampleKey.ToDTO())
pendingBatch.Drop(key)
lastCurated = sampleKey.LastTimestamp
sampleValues = model.Values{}
pendingMutations++
case sampleKey.MayContain(stopAt):
key := coding.NewProtocolBuffer(sampleKey.ToDTO())
pendingBatch.Drop(key)
pendingMutations++
sampleValues = sampleValues.TruncateBefore(stopAt)
if len(sampleValues) > 0 {
sampleKey = sampleValues.ToSampleKey(fingerprint)
lastCurated = sampleKey.FirstTimestamp
newKey := coding.NewProtocolBuffer(sampleKey.ToDTO())
newValue := coding.NewProtocolBuffer(sampleValues.ToDTO())
pendingBatch.Put(newKey, newValue)
pendingMutations++
} else {
lastCurated = sampleKey.LastTimestamp
}
default:
err = fmt.Errorf("Unhandled processing case.")
}
}
// This is not deferred due to the off-chance that a pre-existing commit
// failed.
if pendingBatch != nil && pendingMutations > 0 {
err = samplesPersistence.Commit(pendingBatch)
if err != nil { if err != nil {
return return
} }

View file

@ -880,11 +880,529 @@ func TestCuratorCompactionProcessor(t *testing.T) {
err = proto.Unmarshal(iterator.Key(), curationKeyDto) err = proto.Unmarshal(iterator.Key(), curationKeyDto)
if err != nil { if err != nil {
t.Fatalf("%d.%d. could not unmarshal: %s\n", i, j, err) t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err)
} }
err = proto.Unmarshal(iterator.Value(), curationValueDto) err = proto.Unmarshal(iterator.Value(), curationValueDto)
if err != nil { if err != nil {
t.Fatalf("%d.%d. could not unmarshal: %s\n", i, j, err) t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err)
}
curationKey := model.NewCurationKeyFromDTO(curationKeyDto)
actualCurationRemark := model.NewCurationRemarkFromDTO(curationValueDto)
signature, err := expected.processor.Signature()
if err != nil {
t.Fatal(err)
}
actualKey := curationKey
expectedKey := model.CurationKey{
Fingerprint: model.NewFingerprintFromRowKey(expected.fingerprint),
IgnoreYoungerThan: expected.ignoreYoungerThan,
ProcessorMessageRaw: signature,
ProcessorMessageTypeName: expected.processor.Name(),
}
if !actualKey.Equal(expectedKey) {
t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedKey, actualKey)
}
expectedCurationRemark := model.CurationRemark{
LastCompletionTimestamp: expected.lastCurated,
}
if !actualCurationRemark.Equal(expectedCurationRemark) {
t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedCurationRemark, actualCurationRemark)
}
}
iterator = samples.NewIterator(true)
defer iterator.Close()
for j, expected := range scenario.out.sampleGroups {
switch j {
case 0:
if !iterator.SeekToFirst() {
t.Fatalf("%d.%d. could not seek to beginning.", i, j)
}
default:
if !iterator.Next() {
t.Fatalf("%d.%d. could not seek to next, expected %s", i, j, expected)
}
}
sampleKey, err := extractSampleKey(iterator)
if err != nil {
t.Fatalf("%d.%d. error %s", i, j, err)
}
sampleValues, err := extractSampleValues(iterator)
if err != nil {
t.Fatalf("%d.%d. error %s", i, j, err)
}
if !model.NewFingerprintFromRowKey(expected.fingerprint).Equal(sampleKey.Fingerprint) {
t.Fatalf("%d.%d. expected fingerprint %s, got %s", i, j, expected.fingerprint, sampleKey.Fingerprint)
}
if int(sampleKey.SampleCount) != len(expected.values) {
t.Fatalf("%d.%d. expected %d values, got %d", i, j, len(expected.values), sampleKey.SampleCount)
}
if len(sampleValues) != len(expected.values) {
t.Fatalf("%d.%d. expected %d values, got %d", i, j, len(expected.values), len(sampleValues))
}
if !sampleKey.FirstTimestamp.Equal(expected.values[0].Timestamp) {
t.Fatalf("%d.%d. expected %s, got %s", i, j, expected.values[0].Timestamp, sampleKey.FirstTimestamp)
}
for k, actualValue := range sampleValues {
if expected.values[k].Value != actualValue.Value {
t.Fatalf("%d.%d.%d. expected %d, got %d", i, j, k, expected.values[k].Value, actualValue.Value)
}
if !expected.values[k].Timestamp.Equal(actualValue.Timestamp) {
t.Fatalf("%d.%d.%d. expected %s, got %s", i, j, k, expected.values[k].Timestamp, actualValue.Timestamp)
}
}
if !sampleKey.LastTimestamp.Equal(expected.values[len(expected.values)-1].Timestamp) {
fmt.Println("last", sampleValues[len(expected.values)-1].Value, expected.values[len(expected.values)-1].Value)
t.Errorf("%d.%d. expected %s, got %s", i, j, expected.values[len(expected.values)-1].Timestamp, sampleKey.LastTimestamp)
}
}
}
}
func TestCuratorDeletionProcessor(t *testing.T) {
scenarios := []struct {
in in
out out
}{
{
in: in{
processor: &DeletionProcessor{
MaximumMutationPoolBatch: 15,
},
ignoreYoungerThan: 1 * time.Hour,
groupSize: 5,
curationStates: fixture.Pairs{
curationState{
fingerprint: "0001-A-1-Z",
ignoreYoungerThan: 1 * time.Hour,
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
processor: &DeletionProcessor{
MaximumMutationPoolBatch: 15,
},
},
curationState{
fingerprint: "0002-A-2-Z",
ignoreYoungerThan: 1 * time.Hour,
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
processor: &DeletionProcessor{
MaximumMutationPoolBatch: 15,
},
},
},
watermarkStates: fixture.Pairs{
watermarkState{
fingerprint: "0001-A-1-Z",
lastAppended: testInstant.Add(-1 * 15 * time.Minute),
},
watermarkState{
fingerprint: "0002-A-2-Z",
lastAppended: testInstant.Add(-1 * 15 * time.Minute),
},
},
sampleGroups: fixture.Pairs{
sampleGroup{
fingerprint: "0001-A-1-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 90 * time.Minute),
Value: 90,
},
{
Timestamp: testInstant.Add(-1 * 30 * time.Minute),
Value: 30,
},
},
},
sampleGroup{
fingerprint: "0001-A-1-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 15 * time.Minute),
Value: 15,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 90 * time.Minute),
Value: 0,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 89 * time.Minute),
Value: 1,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 88 * time.Minute),
Value: 2,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 87 * time.Minute),
Value: 3,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 86 * time.Minute),
Value: 4,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 85 * time.Minute),
Value: 5,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 84 * time.Minute),
Value: 6,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 83 * time.Minute),
Value: 7,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 82 * time.Minute),
Value: 8,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 81 * time.Minute),
Value: 9,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 80 * time.Minute),
Value: 10,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 79 * time.Minute),
Value: 11,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 78 * time.Minute),
Value: 12,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 77 * time.Minute),
Value: 13,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 76 * time.Minute),
Value: 14,
},
{
Timestamp: testInstant.Add(-1 * 75 * time.Minute),
Value: 15,
},
{
Timestamp: testInstant.Add(-1 * 74 * time.Minute),
Value: 16,
},
{
Timestamp: testInstant.Add(-1 * 73 * time.Minute),
Value: 17,
},
{
Timestamp: testInstant.Add(-1 * 72 * time.Minute),
Value: 18,
},
{
Timestamp: testInstant.Add(-1 * 71 * time.Minute),
Value: 19,
},
{
Timestamp: testInstant.Add(-1 * 70 * time.Minute),
Value: 20,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 69 * time.Minute),
Value: 21,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 68 * time.Minute),
Value: 22,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 67 * time.Minute),
Value: 23,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 66 * time.Minute),
Value: 24,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 65 * time.Minute),
Value: 25,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 64 * time.Minute),
Value: 26,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 63 * time.Minute),
Value: 27,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 62 * time.Minute),
Value: 28,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 61 * time.Minute),
Value: 29,
},
},
},
sampleGroup{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 60 * time.Minute),
Value: 30,
},
},
},
},
},
out: out{
curationStates: []curationState{
{
fingerprint: "0001-A-1-Z",
ignoreYoungerThan: 1 * time.Hour,
lastCurated: testInstant.Add(-1 * 30 * time.Minute),
processor: &DeletionProcessor{
MaximumMutationPoolBatch: 15,
},
},
{
fingerprint: "0002-A-2-Z",
ignoreYoungerThan: 1 * time.Hour,
lastCurated: testInstant.Add(-1 * 60 * time.Minute),
processor: &DeletionProcessor{
MaximumMutationPoolBatch: 15,
},
},
},
sampleGroups: []sampleGroup{
{
fingerprint: "0001-A-1-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 30 * time.Minute),
Value: 30,
},
},
},
{
fingerprint: "0001-A-1-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 15 * time.Minute),
Value: 15,
},
},
},
{
fingerprint: "0002-A-2-Z",
values: model.Values{
{
Timestamp: testInstant.Add(-1 * 60 * time.Minute),
Value: 30,
},
},
},
},
},
},
}
for i, scenario := range scenarios {
curatorDirectory := fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.in.curationStates))
defer curatorDirectory.Close()
watermarkDirectory := fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.in.watermarkStates))
defer watermarkDirectory.Close()
sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups))
defer sampleDirectory.Close()
curatorStates, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0)
if err != nil {
t.Fatal(err)
}
defer curatorStates.Close()
watermarkStates, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0)
if err != nil {
t.Fatal(err)
}
defer watermarkStates.Close()
samples, err := leveldb.NewLevelDBPersistence(sampleDirectory.Path(), 0, 0)
if err != nil {
t.Fatal(err)
}
defer samples.Close()
updates := make(chan CurationState, 100)
defer close(updates)
stop := make(chan bool)
defer close(stop)
c := Curator{
Stop: stop,
}
err = c.Run(scenario.in.ignoreYoungerThan, testInstant, scenario.in.processor, curatorStates, samples, watermarkStates, updates)
if err != nil {
t.Fatal(err)
}
iterator := curatorStates.NewIterator(true)
defer iterator.Close()
for j, expected := range scenario.out.curationStates {
switch j {
case 0:
if !iterator.SeekToFirst() {
t.Fatalf("%d.%d. could not seek to beginning.", i, j)
}
default:
if !iterator.Next() {
t.Fatalf("%d.%d. could not seek to next.", i, j)
}
}
curationKeyDto := &dto.CurationKey{}
curationValueDto := &dto.CurationValue{}
err = proto.Unmarshal(iterator.Key(), curationKeyDto)
if err != nil {
t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err)
}
err = proto.Unmarshal(iterator.Value(), curationValueDto)
if err != nil {
t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err)
} }
curationKey := model.NewCurationKeyFromDTO(curationKeyDto) curationKey := model.NewCurationKeyFromDTO(curationKeyDto)

View file

@ -27,6 +27,32 @@ import (
"time" "time"
) )
// chunk is a view over a run of sample values used while rendering views.
type chunk model.Values

// TruncateBefore returns a subslice of the original such that extraneous
// samples in the collection that occur before the provided time are
// dropped. The original slice is not mutated. It works with the assumption
// that consumers of these values could want preceding values if none would
// exist prior to the defined time.
func (c chunk) TruncateBefore(t time.Time) chunk {
	// Index of the first sample whose timestamp is not before t; all
	// earlier elements are strictly older than t.
	index := sort.Search(len(c), func(i int) bool {
		timestamp := c[i].Timestamp

		return !timestamp.Before(t)
	})

	switch index {
	case 0:
		// No samples precede t: keep the chunk whole.
		return c
	case len(c):
		// All samples precede t: preserve only the last value in case it
		// needs to be used for the next set.
		return c[len(c)-1:]
	default:
		// Keep exactly one sample preceding t so consumers still have a
		// value from before the cut-off.
		return c[index-1:]
	}
	// The original ended with panic("unreachable") here; the switch above
	// is exhaustive and terminating, so that line was dead code (flagged
	// by `go vet`) and has been removed.
}
// TieredStorage both persists samples and generates materialized views for // TieredStorage both persists samples and generates materialized views for
// queries. // queries.
type TieredStorage struct { type TieredStorage struct {
@ -383,7 +409,7 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
// Load data value chunk(s) around the first standing op's current time. // Load data value chunk(s) around the first standing op's current time.
targetTime := *standingOps[0].CurrentTime() targetTime := *standingOps[0].CurrentTime()
chunk := model.Values{} currentChunk := chunk{}
memValues := t.memoryArena.GetValueAtTime(scanJob.fingerprint, targetTime) memValues := t.memoryArena.GetValueAtTime(scanJob.fingerprint, targetTime)
// If we aimed before the oldest value in memory, load more data from disk. // If we aimed before the oldest value in memory, load more data from disk.
if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && seriesFrontier != nil { if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && seriesFrontier != nil {
@ -397,22 +423,22 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
// If we aimed past the newest value on disk, combine it with the next value from memory. // If we aimed past the newest value on disk, combine it with the next value from memory.
if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) { if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) {
latestDiskValue := diskValues[len(diskValues)-1:] latestDiskValue := diskValues[len(diskValues)-1:]
chunk = append(latestDiskValue, memValues...) currentChunk = append(chunk(latestDiskValue), chunk(memValues)...)
} else { } else {
chunk = diskValues currentChunk = chunk(diskValues)
} }
} else { } else {
chunk = memValues currentChunk = chunk(memValues)
} }
// There's no data at all for this fingerprint, so stop processing ops for it. // There's no data at all for this fingerprint, so stop processing ops for it.
if len(chunk) == 0 { if len(currentChunk) == 0 {
break break
} }
chunk = chunk.TruncateBefore(targetTime) currentChunk = currentChunk.TruncateBefore(targetTime)
lastChunkTime := chunk[len(chunk)-1].Timestamp lastChunkTime := currentChunk[len(currentChunk)-1].Timestamp
if lastChunkTime.After(targetTime) { if lastChunkTime.After(targetTime) {
targetTime = lastChunkTime targetTime = lastChunkTime
} }
@ -424,10 +450,10 @@ func (t *TieredStorage) renderView(viewJob viewJob) {
break break
} }
chunk = chunk.TruncateBefore(*(op.CurrentTime())) currentChunk = currentChunk.TruncateBefore(*(op.CurrentTime()))
for op.CurrentTime() != nil && !op.CurrentTime().After(targetTime) { for op.CurrentTime() != nil && !op.CurrentTime().After(targetTime) {
out = op.ExtractSamples(chunk) out = op.ExtractSamples(model.Values(currentChunk))
} }
} }

View file

@ -568,3 +568,162 @@ func TestGetFingerprintsForLabelSet(t *testing.T) {
} }
} }
} }
// testTruncateBefore exercises chunk.TruncateBefore for the three
// interesting positions of the cut-off time relative to the data:
// at the first sample, mid-series, and past the last sample.
func testTruncateBefore(t test.Tester) {
	type in struct {
		values model.Values
		time   time.Time
	}

	instant := time.Now()
	var scenarios = []struct {
		in  in
		out model.Values
	}{
		// Cut-off at the very first sample: nothing is dropped.
		{
			in: in{
				time: instant,
				values: model.Values{
					{
						Value:     0,
						Timestamp: instant,
					},
					{
						Value:     1,
						Timestamp: instant.Add(time.Second),
					},
					{
						Value:     2,
						Timestamp: instant.Add(2 * time.Second),
					},
					{
						Value:     3,
						Timestamp: instant.Add(3 * time.Second),
					},
					{
						Value:     4,
						Timestamp: instant.Add(4 * time.Second),
					},
				},
			},
			out: model.Values{
				{
					Value:     0,
					Timestamp: instant,
				},
				{
					Value:     1,
					Timestamp: instant.Add(time.Second),
				},
				{
					Value:     2,
					Timestamp: instant.Add(2 * time.Second),
				},
				{
					Value:     3,
					Timestamp: instant.Add(3 * time.Second),
				},
				{
					Value:     4,
					Timestamp: instant.Add(4 * time.Second),
				},
			},
		},
		// Cut-off mid-series: one sample preceding the cut-off is retained.
		{
			in: in{
				time: instant.Add(2 * time.Second),
				values: model.Values{
					{
						Value:     0,
						Timestamp: instant,
					},
					{
						Value:     1,
						Timestamp: instant.Add(time.Second),
					},
					{
						Value:     2,
						Timestamp: instant.Add(2 * time.Second),
					},
					{
						Value:     3,
						Timestamp: instant.Add(3 * time.Second),
					},
					{
						Value:     4,
						Timestamp: instant.Add(4 * time.Second),
					},
				},
			},
			out: model.Values{
				{
					Value:     1,
					Timestamp: instant.Add(time.Second),
				},
				{
					Value:     2,
					Timestamp: instant.Add(2 * time.Second),
				},
				{
					Value:     3,
					Timestamp: instant.Add(3 * time.Second),
				},
				{
					Value:     4,
					Timestamp: instant.Add(4 * time.Second),
				},
			},
		},
		// Cut-off past the last sample: only the final value survives.
		{
			in: in{
				time: instant.Add(5 * time.Second),
				values: model.Values{
					{
						Value:     0,
						Timestamp: instant,
					},
					{
						Value:     1,
						Timestamp: instant.Add(time.Second),
					},
					{
						Value:     2,
						Timestamp: instant.Add(2 * time.Second),
					},
					{
						Value:     3,
						Timestamp: instant.Add(3 * time.Second),
					},
					{
						Value:     4,
						Timestamp: instant.Add(4 * time.Second),
					},
				},
			},
			out: model.Values{
				// Preserve the last value in case it needs to be used for the next set.
				{
					Value:     4,
					Timestamp: instant.Add(4 * time.Second),
				},
			},
		},
	}

	for i, scenario := range scenarios {
		actual := chunk(scenario.in.values).TruncateBefore(scenario.in.time)

		if len(actual) != len(scenario.out) {
			t.Fatalf("%d. expected length of %d, got %d", i, len(scenario.out), len(actual))
		}

		for j, actualValue := range actual {
			if !actualValue.Equal(scenario.out[j]) {
				t.Fatalf("%d.%d. expected %s, got %s", i, j, scenario.out[j], actualValue)
			}
		}
	}
}
// TestTruncateBefore is the standard-library testing entry point wrapping
// testTruncateBefore.
func TestTruncateBefore(t *testing.T) {
	testTruncateBefore(t)
}