From 161c8fbf9bd491cc321ad51f9d0670165c72464a Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Wed, 8 May 2013 20:39:59 +0200 Subject: [PATCH] Include deletion processor for long-tail values. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit extracts the model.Values truncation behavior into the actual tiered storage, which uses it and behaves in a peculiar way—notably the retention of previous elements if the chunk were to ever go empty. This is done to enable interpolation between sparse sample values in the evaluation cycle. Nothing necessarily new here—just an extraction. Now, the model.Values TruncateBefore functionality would do what a user would expect without any surprises, which is required for the DeletionProcessor, which may decide to split a large chunk in two if it determines that the chunk contains the cut-off time. --- Makefile.INCLUDE | 6 +- config/Makefile | 2 +- model/Makefile | 6 +- model/data.proto | 5 + model/generated/data.pb.go | 8 + model/generated/descriptor.blob | Bin 5071 -> 5102 bytes model/metric.go | 15 +- model/metric_test.go | 18 +- rules/testdata.go | 2 +- storage/metric/processor.go | 139 +++++++- storage/metric/processor_test.go | 522 ++++++++++++++++++++++++++++++- storage/metric/tiered.go | 44 ++- storage/metric/tiered_test.go | 159 ++++++++++ 13 files changed, 878 insertions(+), 48 deletions(-) diff --git a/Makefile.INCLUDE b/Makefile.INCLUDE index ecb419549..cb83b97a0 100644 --- a/Makefile.INCLUDE +++ b/Makefile.INCLUDE @@ -37,7 +37,9 @@ FULL_GOPATH_BASE := $(FIRST_GOPATH)/src/github.com/prometheus export PREFIX=$(PWD)/build/root -export PATH := $(PREFIX)/bin:$(GOPATH)/bin:$(PATH) +export LOCAL_BINARIES=$(PREFIX)/bin + +export PATH := $(LOCAL_BINARIES):$(GOPATH)/bin:$(PATH) export LD_LIBRARY_PATH := $(PREFIX)/lib:$(LD_LIBRARY_PATH) export CFLAGS := $(CFLAGS) -I$(PREFIX)/include @@ -68,3 +70,5 @@ BUILDFLAGS := -ldflags \ -X main.leveldbVersion $(LEVELDB_VERSION)\ -X main.protobufVersion $(PROTOCOL_BUFFERS_VERSION)\ -X main.snappyVersion $(SNAPPY_VERSION)" + +PROTOC := $(LOCAL_BINARIES)/protoc diff --git a/config/Makefile b/config/Makefile index 741844423..7eca91f58 100644 --- a/config/Makefile +++ b/config/Makefile @@ -24,4 +24,4 @@ include ../Makefile.INCLUDE generated/config.pb.go: config.proto - protoc --proto_path=$(PREFIX)/include:. --go_out=generated/ config.proto + $(PROTOC) --proto_path=$(PREFIX)/include:. --go_out=generated/ config.proto diff --git a/model/Makefile b/model/Makefile index 0c1f25fd1..c0a8c4426 100644 --- a/model/Makefile +++ b/model/Makefile @@ -22,8 +22,8 @@ include ../Makefile.INCLUDE # # make -C build goprotobuf-protoc-gen-go-stamp -generated/data.pb.go: data.proto - protoc --proto_path=$(PREFIX)/include:. --include_imports --go_out=generated/ --descriptor_set_out=generated/descriptor.blob data.proto +generated/data.pb.go: data.proto + $(PROTOC) --proto_path=$(PREFIX)/include:. --include_imports --go_out=generated/ --descriptor_set_out=generated/descriptor.blob data.proto generated/descriptor.blob: data.proto - protoc --proto_path=$(PREFIX)/include:. --include_imports --go_out=generated/ --descriptor_set_out=generated/descriptor.blob data.proto + $(PROTOC) --proto_path=$(PREFIX)/include:. --include_imports --go_out=generated/ --descriptor_set_out=generated/descriptor.blob data.proto diff --git a/model/data.proto b/model/data.proto index e92fdec3b..6be584836 100644 --- a/model/data.proto +++ b/model/data.proto @@ -122,3 +122,8 @@ message CurationValue { // fingerprint. optional int64 last_completion_timestamp = 1; } + +// DeletionProcessorDefinition models a curation process across the sample +// corpus that deletes old values. +message DeletionProcessorDefinition { +} diff --git a/model/generated/data.pb.go b/model/generated/data.pb.go index d1ca868f5..a2ae871bd 100644 --- a/model/generated/data.pb.go +++ b/model/generated/data.pb.go @@ -295,5 +295,13 @@ func (m *CurationValue) GetLastCompletionTimestamp() int64 { return 0 } +type DeletionProcessorDefinition struct { + XXX_unrecognized []byte `json:"-"` +} + +func (m *DeletionProcessorDefinition) Reset() { *m = DeletionProcessorDefinition{} } +func (m *DeletionProcessorDefinition) String() string { return proto.CompactTextString(m) } +func (*DeletionProcessorDefinition) ProtoMessage() {} + func init() { } diff --git a/model/generated/descriptor.blob b/model/generated/descriptor.blob index 4c98e380253d379560660d9f17043233a055e3e3..044e1c72b48e8db0c1050376a19d84ac3dd99d9b 100644 GIT binary patch delta 46 zcmX@F{!V>^wgBV7%{l^$ndOyaxujiEb5ct(^Ya3V@{?1Gi}Q= p.MaximumMutationPoolBatch: - err = samples.Commit(pendingBatch) + err = samplesPersistence.Commit(pendingBatch) if err != nil { return } @@ -223,7 +223,136 @@ func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samples raw. // This is not deferred due to the off-chance that a pre-existing commit // failed. if pendingBatch != nil && pendingMutations > 0 { - err = samples.Commit(pendingBatch) + err = samplesPersistence.Commit(pendingBatch) + if err != nil { + return + } + } + + return +} + +// DeletionProcessor deletes sample blocks older than a defined value. +type DeletionProcessor struct { + // MaximumMutationPoolBatch represents approximately the largest pending + // batch of mutation operations for the database before pausing to + // commit before resumption. + MaximumMutationPoolBatch int + // signature is the byte representation of the DeletionProcessor's settings, + // used for purely memoization purposes across an instance. + signature []byte +} + +func (p DeletionProcessor) Name() string { + return "io.prometheus.DeletionProcessorDefinition" +} + +func (p *DeletionProcessor) Signature() (out []byte, err error) { + if len(p.signature) == 0 { + out, err = proto.Marshal(&dto.DeletionProcessorDefinition{}) + + p.signature = out + } + + out = p.signature + + return +} + +func (p DeletionProcessor) String() string { + return "deletionProcessor" +} + +func (p DeletionProcessor) Apply(sampleIterator leveldb.Iterator, samplesPersistence raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error) { + var pendingBatch raw.Batch = nil + + defer func() { + if pendingBatch != nil { + pendingBatch.Close() + } + }() + + sampleKey, err := extractSampleKey(sampleIterator) + if err != nil { + return + } + sampleValues, err := extractSampleValues(sampleIterator) + if err != nil { + return + } + + pendingMutations := 0 + + for lastCurated.Before(stopAt) { + switch { + // Furnish a new pending batch operation if none is available. + case pendingBatch == nil: + pendingBatch = leveldb.NewBatch() + + // If there are no sample values to extract from the datastore, let's + // continue extracting more values to use. We know that the time.Before() + // block would prevent us from going into unsafe territory. + case len(sampleValues) == 0: + if !sampleIterator.Next() { + return lastCurated, fmt.Errorf("Illegal Condition: Invalid Iterator on Continuation") + } + + sampleKey, err = extractSampleKey(sampleIterator) + if err != nil { + return + } + sampleValues, err = extractSampleValues(sampleIterator) + if err != nil { + return + } + + // If the number of pending mutations exceeds the allowed batch amount, + // commit to disk and delete the batch. A new one will be recreated if + // necessary. + case pendingMutations >= p.MaximumMutationPoolBatch: + err = samplesPersistence.Commit(pendingBatch) + if err != nil { + return + } + + pendingMutations = 0 + + pendingBatch.Close() + pendingBatch = nil + + case !sampleKey.MayContain(stopAt): + key := coding.NewProtocolBuffer(sampleKey.ToDTO()) + pendingBatch.Drop(key) + lastCurated = sampleKey.LastTimestamp + sampleValues = model.Values{} + pendingMutations++ + + case sampleKey.MayContain(stopAt): + key := coding.NewProtocolBuffer(sampleKey.ToDTO()) + pendingBatch.Drop(key) + pendingMutations++ + + sampleValues = sampleValues.TruncateBefore(stopAt) + if len(sampleValues) > 0 { + sampleKey = sampleValues.ToSampleKey(fingerprint) + lastCurated = sampleKey.FirstTimestamp + newKey := coding.NewProtocolBuffer(sampleKey.ToDTO()) + newValue := coding.NewProtocolBuffer(sampleValues.ToDTO()) + pendingBatch.Put(newKey, newValue) + pendingMutations++ + } else { + lastCurated = sampleKey.LastTimestamp + } + + default: + err = fmt.Errorf("Unhandled processing case.") + } + } + + // This is not deferred due to the off-chance that a pre-existing commit + // failed. + if pendingBatch != nil && pendingMutations > 0 { + err = samplesPersistence.Commit(pendingBatch) if err != nil { return } diff --git a/storage/metric/processor_test.go b/storage/metric/processor_test.go index f4e33ae4c..e534f0ee3 100644 --- a/storage/metric/processor_test.go +++ b/storage/metric/processor_test.go @@ -880,11 +880,529 @@ func TestCuratorCompactionProcessor(t *testing.T) { err = proto.Unmarshal(iterator.Key(), curationKeyDto) if err != nil { - t.Fatalf("%d.%d. could not unmarshal: %s\n", i, j, err) + t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err) } err = proto.Unmarshal(iterator.Value(), curationValueDto) if err != nil { - t.Fatalf("%d.%d. could not unmarshal: %s\n", i, j, err) + t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err) + } + + curationKey := model.NewCurationKeyFromDTO(curationKeyDto) + actualCurationRemark := model.NewCurationRemarkFromDTO(curationValueDto) + signature, err := expected.processor.Signature() + if err != nil { + t.Fatal(err) + } + + actualKey := curationKey + expectedKey := model.CurationKey{ + Fingerprint: model.NewFingerprintFromRowKey(expected.fingerprint), + IgnoreYoungerThan: expected.ignoreYoungerThan, + ProcessorMessageRaw: signature, + ProcessorMessageTypeName: expected.processor.Name(), + } + if !actualKey.Equal(expectedKey) { + t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedKey, actualKey) + } + expectedCurationRemark := model.CurationRemark{ + LastCompletionTimestamp: expected.lastCurated, + } + if !actualCurationRemark.Equal(expectedCurationRemark) { + t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedCurationRemark, actualCurationRemark) + } + } + + iterator = samples.NewIterator(true) + defer iterator.Close() + + for j, expected := range scenario.out.sampleGroups { + switch j { + case 0: + if !iterator.SeekToFirst() { + t.Fatalf("%d.%d. could not seek to beginning.", i, j) + } + default: + if !iterator.Next() { + t.Fatalf("%d.%d. could not seek to next, expected %s", i, j, expected) + } + } + + sampleKey, err := extractSampleKey(iterator) + if err != nil { + t.Fatalf("%d.%d. error %s", i, j, err) + } + sampleValues, err := extractSampleValues(iterator) + if err != nil { + t.Fatalf("%d.%d. error %s", i, j, err) + } + + if !model.NewFingerprintFromRowKey(expected.fingerprint).Equal(sampleKey.Fingerprint) { + t.Fatalf("%d.%d. expected fingerprint %s, got %s", i, j, expected.fingerprint, sampleKey.Fingerprint) + } + + if int(sampleKey.SampleCount) != len(expected.values) { + t.Fatalf("%d.%d. expected %d values, got %d", i, j, len(expected.values), sampleKey.SampleCount) + } + + if len(sampleValues) != len(expected.values) { + t.Fatalf("%d.%d. expected %d values, got %d", i, j, len(expected.values), len(sampleValues)) + } + + if !sampleKey.FirstTimestamp.Equal(expected.values[0].Timestamp) { + t.Fatalf("%d.%d. expected %s, got %s", i, j, expected.values[0].Timestamp, sampleKey.FirstTimestamp) + } + + for k, actualValue := range sampleValues { + if expected.values[k].Value != actualValue.Value { + t.Fatalf("%d.%d.%d. expected %d, got %d", i, j, k, expected.values[k].Value, actualValue.Value) + } + if !expected.values[k].Timestamp.Equal(actualValue.Timestamp) { + t.Fatalf("%d.%d.%d. expected %s, got %s", i, j, k, expected.values[k].Timestamp, actualValue.Timestamp) + } + } + + if !sampleKey.LastTimestamp.Equal(expected.values[len(expected.values)-1].Timestamp) { + fmt.Println("last", sampleValues[len(expected.values)-1].Value, expected.values[len(expected.values)-1].Value) + t.Errorf("%d.%d. expected %s, got %s", i, j, expected.values[len(expected.values)-1].Timestamp, sampleKey.LastTimestamp) + } + } + } +} + +func TestCuratorDeletionProcessor(t *testing.T) { + scenarios := []struct { + in in + out out + }{ + { + in: in{ + processor: &DeletionProcessor{ + MaximumMutationPoolBatch: 15, + }, + ignoreYoungerThan: 1 * time.Hour, + groupSize: 5, + curationStates: fixture.Pairs{ + curationState{ + fingerprint: "0001-A-1-Z", + ignoreYoungerThan: 1 * time.Hour, + lastCurated: testInstant.Add(-1 * 90 * time.Minute), + processor: &DeletionProcessor{ + MaximumMutationPoolBatch: 15, + }, + }, + curationState{ + fingerprint: "0002-A-2-Z", + ignoreYoungerThan: 1 * time.Hour, + lastCurated: testInstant.Add(-1 * 90 * time.Minute), + processor: &DeletionProcessor{ + MaximumMutationPoolBatch: 15, + }, + }, + }, + watermarkStates: fixture.Pairs{ + watermarkState{ + fingerprint: "0001-A-1-Z", + lastAppended: testInstant.Add(-1 * 15 * time.Minute), + }, + watermarkState{ + fingerprint: "0002-A-2-Z", + lastAppended: testInstant.Add(-1 * 15 * time.Minute), + }, + }, + sampleGroups: fixture.Pairs{ + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 90 * time.Minute), + Value: 90, + }, + { + Timestamp: testInstant.Add(-1 * 30 * time.Minute), + Value: 30, + }, + }, + }, + sampleGroup{ + fingerprint: "0001-A-1-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 15 * time.Minute), + Value: 15, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 90 * time.Minute), + Value: 0, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 89 * time.Minute), + Value: 1, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 88 * time.Minute), + Value: 2, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 87 * time.Minute), + Value: 3, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 86 * time.Minute), + Value: 4, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 85 * time.Minute), + Value: 5, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 84 * time.Minute), + Value: 6, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 83 * time.Minute), + Value: 7, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 82 * time.Minute), + Value: 8, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 81 * time.Minute), + Value: 9, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 80 * time.Minute), + Value: 10, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 79 * time.Minute), + Value: 11, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 78 * time.Minute), + Value: 12, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 77 * time.Minute), + Value: 13, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 76 * time.Minute), + Value: 14, + }, + { + Timestamp: testInstant.Add(-1 * 75 * time.Minute), + Value: 15, + }, + { + Timestamp: testInstant.Add(-1 * 74 * time.Minute), + Value: 16, + }, + { + Timestamp: testInstant.Add(-1 * 73 * time.Minute), + Value: 17, + }, + { + Timestamp: testInstant.Add(-1 * 72 * time.Minute), + Value: 18, + }, + { + Timestamp: testInstant.Add(-1 * 71 * time.Minute), + Value: 19, + }, + { + Timestamp: testInstant.Add(-1 * 70 * time.Minute), + Value: 20, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 69 * time.Minute), + Value: 21, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 68 * time.Minute), + Value: 22, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 67 * time.Minute), + Value: 23, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 66 * time.Minute), + Value: 24, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 65 * time.Minute), + Value: 25, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 64 * time.Minute), + Value: 26, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 63 * time.Minute), + Value: 27, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 62 * time.Minute), + Value: 28, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 61 * time.Minute), + Value: 29, + }, + }, + }, + sampleGroup{ + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 60 * time.Minute), + Value: 30, + }, + }, + }, + }, + }, + out: out{ + curationStates: []curationState{ + { + fingerprint: "0001-A-1-Z", + ignoreYoungerThan: 1 * time.Hour, + lastCurated: testInstant.Add(-1 * 30 * time.Minute), + processor: &DeletionProcessor{ + MaximumMutationPoolBatch: 15, + }, + }, + { + fingerprint: "0002-A-2-Z", + ignoreYoungerThan: 1 * time.Hour, + lastCurated: testInstant.Add(-1 * 60 * time.Minute), + processor: &DeletionProcessor{ + MaximumMutationPoolBatch: 15, + }, + }, + }, + sampleGroups: []sampleGroup{ + { + fingerprint: "0001-A-1-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 30 * time.Minute), + Value: 30, + }, + }, + }, + { + fingerprint: "0001-A-1-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 15 * time.Minute), + Value: 15, + }, + }, + }, + { + fingerprint: "0002-A-2-Z", + values: model.Values{ + { + Timestamp: testInstant.Add(-1 * 60 * time.Minute), + Value: 30, + }, + }, + }, + }, + }, + }, + } + + for i, scenario := range scenarios { + curatorDirectory := fixture.NewPreparer(t).Prepare("curator", fixture.NewCassetteFactory(scenario.in.curationStates)) + defer curatorDirectory.Close() + + watermarkDirectory := fixture.NewPreparer(t).Prepare("watermark", fixture.NewCassetteFactory(scenario.in.watermarkStates)) + defer watermarkDirectory.Close() + + sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups)) + defer sampleDirectory.Close() + + curatorStates, err := leveldb.NewLevelDBPersistence(curatorDirectory.Path(), 0, 0) + if err != nil { + t.Fatal(err) + } + defer curatorStates.Close() + + watermarkStates, err := leveldb.NewLevelDBPersistence(watermarkDirectory.Path(), 0, 0) + if err != nil { + t.Fatal(err) + } + defer watermarkStates.Close() + + samples, err := leveldb.NewLevelDBPersistence(sampleDirectory.Path(), 0, 0) + if err != nil { + t.Fatal(err) + } + defer samples.Close() + + updates := make(chan CurationState, 100) + defer close(updates) + + stop := make(chan bool) + defer close(stop) + + c := Curator{ + Stop: stop, + } + + err = c.Run(scenario.in.ignoreYoungerThan, testInstant, scenario.in.processor, curatorStates, samples, watermarkStates, updates) + if err != nil { + t.Fatal(err) + } + + iterator := curatorStates.NewIterator(true) + defer iterator.Close() + + for j, expected := range scenario.out.curationStates { + switch j { + case 0: + if !iterator.SeekToFirst() { + t.Fatalf("%d.%d. could not seek to beginning.", i, j) + } + default: + if !iterator.Next() { + t.Fatalf("%d.%d. could not seek to next.", i, j) + } + } + + curationKeyDto := &dto.CurationKey{} + curationValueDto := &dto.CurationValue{} + + err = proto.Unmarshal(iterator.Key(), curationKeyDto) + if err != nil { + t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err) + } + err = proto.Unmarshal(iterator.Value(), curationValueDto) + if err != nil { + t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err) } curationKey := model.NewCurationKeyFromDTO(curationKeyDto) diff --git a/storage/metric/tiered.go b/storage/metric/tiered.go index 909a3d6ce..5e848e9b7 100644 --- a/storage/metric/tiered.go +++ b/storage/metric/tiered.go @@ -27,6 +27,32 @@ import ( "time" ) +type chunk model.Values + +// TruncateBefore returns a subslice of the original such that extraneous +// samples in the collection that occur before the provided time are +// dropped. The original slice is not mutated. It works with the assumption +// that consumers of these values could want preceding values if none would +// exist prior to the defined time. +func (c chunk) TruncateBefore(t time.Time) chunk { + index := sort.Search(len(c), func(i int) bool { + timestamp := c[i].Timestamp + + return !timestamp.Before(t) + }) + + switch index { + case 0: + return c + case len(c): + return c[len(c)-1:] + default: + return c[index-1:] + } + + panic("unreachable") +} + // TieredStorage both persists samples and generates materialized views for // queries. type TieredStorage struct { @@ -383,7 +409,7 @@ func (t *TieredStorage) renderView(viewJob viewJob) { // Load data value chunk(s) around the first standing op's current time. targetTime := *standingOps[0].CurrentTime() - chunk := model.Values{} + currentChunk := chunk{} memValues := t.memoryArena.GetValueAtTime(scanJob.fingerprint, targetTime) // If we aimed before the oldest value in memory, load more data from disk. if (len(memValues) == 0 || memValues.FirstTimeAfter(targetTime)) && seriesFrontier != nil { @@ -397,22 +423,22 @@ func (t *TieredStorage) renderView(viewJob viewJob) { // If we aimed past the newest value on disk, combine it with the next value from memory. if len(memValues) > 0 && diskValues.LastTimeBefore(targetTime) { latestDiskValue := diskValues[len(diskValues)-1:] - chunk = append(latestDiskValue, memValues...) + currentChunk = append(chunk(latestDiskValue), chunk(memValues)...) } else { - chunk = diskValues + currentChunk = chunk(diskValues) } } else { - chunk = memValues + currentChunk = chunk(memValues) } // There's no data at all for this fingerprint, so stop processing ops for it. - if len(chunk) == 0 { + if len(currentChunk) == 0 { break } - chunk = chunk.TruncateBefore(targetTime) + currentChunk = currentChunk.TruncateBefore(targetTime) - lastChunkTime := chunk[len(chunk)-1].Timestamp + lastChunkTime := currentChunk[len(currentChunk)-1].Timestamp if lastChunkTime.After(targetTime) { targetTime = lastChunkTime } @@ -424,10 +450,10 @@ func (t *TieredStorage) renderView(viewJob viewJob) { break } - chunk = chunk.TruncateBefore(*(op.CurrentTime())) + currentChunk = currentChunk.TruncateBefore(*(op.CurrentTime())) for op.CurrentTime() != nil && !op.CurrentTime().After(targetTime) { - out = op.ExtractSamples(chunk) + out = op.ExtractSamples(model.Values(currentChunk)) } } diff --git a/storage/metric/tiered_test.go b/storage/metric/tiered_test.go index 721b9400a..b8cf2ee2b 100644 --- a/storage/metric/tiered_test.go +++ b/storage/metric/tiered_test.go @@ -568,3 +568,162 @@ func TestGetFingerprintsForLabelSet(t *testing.T) { } } } + +func testTruncateBefore(t test.Tester) { + type in struct { + values model.Values + time time.Time + } + instant := time.Now() + var scenarios = []struct { + in in + out model.Values + }{ + { + in: in{ + time: instant, + values: model.Values{ + { + Value: 0, + Timestamp: instant, + }, + { + Value: 1, + Timestamp: instant.Add(time.Second), + }, + { + Value: 2, + Timestamp: instant.Add(2 * time.Second), + }, + { + Value: 3, + Timestamp: instant.Add(3 * time.Second), + }, + { + Value: 4, + Timestamp: instant.Add(4 * time.Second), + }, + }, + }, + out: model.Values{ + { + Value: 0, + Timestamp: instant, + }, + { + Value: 1, + Timestamp: instant.Add(time.Second), + }, + { + Value: 2, + Timestamp: instant.Add(2 * time.Second), + }, + { + Value: 3, + Timestamp: instant.Add(3 * time.Second), + }, + { + Value: 4, + Timestamp: instant.Add(4 * time.Second), + }, + }, + }, + { + in: in{ + time: instant.Add(2 * time.Second), + values: model.Values{ + { + Value: 0, + Timestamp: instant, + }, + { + Value: 1, + Timestamp: instant.Add(time.Second), + }, + { + Value: 2, + Timestamp: instant.Add(2 * time.Second), + }, + { + Value: 3, + Timestamp: instant.Add(3 * time.Second), + }, + { + Value: 4, + Timestamp: instant.Add(4 * time.Second), + }, + }, + }, + out: model.Values{ + { + Value: 1, + Timestamp: instant.Add(time.Second), + }, + { + Value: 2, + Timestamp: instant.Add(2 * time.Second), + }, + { + Value: 3, + Timestamp: instant.Add(3 * time.Second), + }, + { + Value: 4, + Timestamp: instant.Add(4 * time.Second), + }, + }, + }, + { + in: in{ + time: instant.Add(5 * time.Second), + values: model.Values{ + { + Value: 0, + Timestamp: instant, + }, + { + Value: 1, + Timestamp: instant.Add(time.Second), + }, + { + Value: 2, + Timestamp: instant.Add(2 * time.Second), + }, + { + Value: 3, + Timestamp: instant.Add(3 * time.Second), + }, + { + Value: 4, + Timestamp: instant.Add(4 * time.Second), + }, + }, + }, + out: model.Values{ + // Preserve the last value in case it needs to be used for the next set. + { + Value: 4, + Timestamp: instant.Add(4 * time.Second), + }, + }, + }, + } + + for i, scenario := range scenarios { + actual := chunk(scenario.in.values).TruncateBefore(scenario.in.time) + + if len(actual) != len(scenario.out) { + t.Fatalf("%d. expected length of %d, got %d", i, len(scenario.out), len(actual)) + } + + for j, actualValue := range actual { + if !actualValue.Equal(scenario.out[j]) { + t.Fatalf("%d.%d. expected %s, got %s", i, j, scenario.out[j], actualValue) + } + } + } +} + +func TestTruncateBefore(t *testing.T) { + testTruncateBefore(t) +}