Merge branch 'main' into charleskorn/convert-range-query-tests

This commit is contained in:
Björn Rabenstein 2024-06-18 17:11:57 +02:00 committed by GitHub
commit d968408f51
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 200 additions and 155 deletions

View file

@@ -1608,7 +1608,16 @@ and serves as an interface to plug in custom service discovery mechanisms.
It reads a set of files containing a list of zero or more
`<static_config>`s. Changes to all defined files are detected via disk watches
and applied immediately. Files may be provided in YAML or JSON format. Only
and applied immediately.
While those individual files are watched for changes,
the parent directory is also watched implicitly. This is to handle [atomic
renaming](https://github.com/fsnotify/fsnotify/blob/c1467c02fba575afdb5f4201072ab8403bbf00f4/README.md?plain=1#L128) efficiently and to detect new files that match the configured globs.
This may cause issues if the parent directory contains a large number of other files,
as each of these files will be watched too, even though the events
they generate are not relevant.
Files may be provided in YAML or JSON format. Only
changes resulting in well-formed target groups are applied.
Files must contain a list of static configs, using these formats:
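
To illustrate the directory-watching behaviour described above, here is a minimal sketch using the fsnotify library; the watched path and glob are hypothetical, not taken from this change:

package main

import (
	"log"
	"path/filepath"

	"github.com/fsnotify/fsnotify"
)

func main() {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		log.Fatal(err)
	}
	defer watcher.Close()

	// Watch the directory rather than the file: an atomic rename replaces
	// the file's inode, so a watch on the file itself would go stale.
	if err := watcher.Add("/etc/prometheus/file_sd"); err != nil {
		log.Fatal(err)
	}

	for ev := range watcher.Events {
		// Every file in the directory produces events, which is the caveat
		// noted above; filter to the globs we actually care about.
		matched, _ := filepath.Match("*.json", filepath.Base(ev.Name))
		if matched && ev.Op&(fsnotify.Create|fsnotify.Write) != 0 {
			log.Printf("reload targets from %s", ev.Name)
		}
	}
}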

View file

@@ -18,6 +18,7 @@ import (
"encoding/json"
"slices"
"strconv"
"unsafe"
"github.com/prometheus/common/model"
)
@@ -215,3 +216,7 @@ func contains(s []Label, n string) bool {
}
return false
}
func yoloString(b []byte) string {
return *((*string)(unsafe.Pointer(&b)))
}
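
yoloString, moved into this file by the commit, reinterprets a byte slice's memory as a string without copying; it is only safe while the bytes are never mutated. A standalone sketch of the semantics and the hazard (illustrative, not part of the diff):

package main

import (
	"fmt"
	"unsafe"
)

func yoloString(b []byte) string {
	return *((*string)(unsafe.Pointer(&b)))
}

func main() {
	b := []byte("hello")
	s := yoloString(b) // zero allocation: s aliases b's backing array
	fmt.Println(s)     // "hello"

	b[0] = 'H'     // mutating b is visible through s, violating string immutability
	fmt.Println(s) // "Hello" — hence "yolo": safe only if b is never written again
}

Since Go 1.20 the same conversion can also be written as unsafe.String(unsafe.SliceData(b), len(b)), which avoids casting through a string pointer.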

View file

@@ -20,7 +20,6 @@ import (
"slices"
"strings"
"sync"
"unsafe"
"github.com/cespare/xxhash/v2"
)
@@ -426,10 +425,6 @@ func EmptyLabels() Labels {
return Labels{}
}
func yoloString(b []byte) string {
return *((*string)(unsafe.Pointer(&b)))
}
// New returns a sorted Labels from the given labels.
// The caller has to guarantee that all label names are unique.
// Note this function is not efficient; should not be used in performance-critical places.

View file

@@ -299,11 +299,6 @@ func Equal(ls, o Labels) bool {
func EmptyLabels() Labels {
return Labels{}
}
func yoloString(b []byte) string {
return *((*string)(unsafe.Pointer(&b)))
}
func yoloBytes(s string) (b []byte) {
*(*string)(unsafe.Pointer(&b)) = s
(*reflect.SliceHeader)(unsafe.Pointer(&b)).Cap = len(s)

View file

@@ -798,39 +798,23 @@ func (m *equalMultiStringMapMatcher) Matches(s string) bool {
// toNormalisedLower normalises the input string using "Unicode Normalization Form KD" and then converts
// it to lower case.
func toNormalisedLower(s string) string {
// Check if the string is all ASCII chars and convert any upper case character to lower case character.
isASCII := true
var (
b strings.Builder
pos int
)
b.Grow(len(s))
var buf []byte
for i := 0; i < len(s); i++ {
c := s[i]
if isASCII && c >= utf8.RuneSelf {
isASCII = false
break
if c >= utf8.RuneSelf {
return strings.Map(unicode.ToLower, norm.NFKD.String(s))
}
if 'A' <= c && c <= 'Z' {
c += 'a' - 'A'
if pos < i {
b.WriteString(s[pos:i])
if buf == nil {
buf = []byte(s)
}
b.WriteByte(c)
pos = i + 1
buf[i] = c + 'a' - 'A'
}
}
if pos < len(s) {
b.WriteString(s[pos:])
if buf == nil {
return s
}
// Optimize for ASCII-only strings. In this case we don't have to do any normalization.
if isASCII {
return b.String()
}
// Normalise and convert to lower.
return strings.Map(unicode.ToLower, norm.NFKD.String(b.String()))
return yoloString(buf)
}
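
The rewritten toNormalisedLower lower-cases ASCII on a lazily allocated copy (no allocation at all if the input is already lower case) and falls back to full NFKD normalisation on the first non-ASCII byte. A self-contained sketch of the same pattern, using a plain copying return where the diff uses yoloString:

package main

import (
	"fmt"
	"strings"
	"unicode"
	"unicode/utf8"

	"golang.org/x/text/unicode/norm"
)

func asciiToLower(s string) string {
	var buf []byte
	for i := 0; i < len(s); i++ {
		c := s[i]
		if c >= utf8.RuneSelf {
			// Non-ASCII: normalise, then lower-case rune by rune.
			return strings.Map(unicode.ToLower, norm.NFKD.String(s))
		}
		if 'A' <= c && c <= 'Z' {
			if buf == nil {
				buf = []byte(s) // first upper-case byte: copy once
			}
			buf[i] = c + 'a' - 'A'
		}
	}
	if buf == nil {
		return s // already lower-case ASCII: zero allocations
	}
	return string(buf) // copying return; the diff avoids the copy via yoloString
}

func main() {
	fmt.Println(asciiToLower("FoO")) // "foo" (one allocation)
	fmt.Println(asciiToLower("foo")) // "foo" (no allocation)
	fmt.Println(asciiToLower("ſOO")) // "soo" (Unicode slow path)
}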
// anyStringWithoutNewlineMatcher is a stringMatcher which matches any string

View file

@@ -1209,6 +1209,10 @@ func visitStringMatcher(matcher StringMatcher, callback func(matcher StringMatch
func TestToNormalisedLower(t *testing.T) {
testCases := map[string]string{
"foo": "foo",
"FOO": "foo",
"Foo": "foo",
"foO": "foo",
"fOo": "foo",
"AAAAAAAAAAAAAAAAAAAAAAAA": "aaaaaaaaaaaaaaaaaaaaaaaa",
"cccccccccccccccccccccccC": "cccccccccccccccccccccccc",
"ſſſſſſſſſſſſſſſſſſſſſſſſS": "sssssssssssssssssssssssss",

View file

@@ -95,7 +95,7 @@ func EncodeReadResponse(resp *prompb.ReadResponse, w http.ResponseWriter) error
// ToQuery builds a Query proto.
func ToQuery(from, to int64, matchers []*labels.Matcher, hints *storage.SelectHints) (*prompb.Query, error) {
ms, err := toLabelMatchers(matchers)
ms, err := ToLabelMatchers(matchers)
if err != nil {
return nil, err
}
@@ -566,7 +566,8 @@ func validateLabelsAndMetricName(ls []prompb.Label) error {
return nil
}
func toLabelMatchers(matchers []*labels.Matcher) ([]*prompb.LabelMatcher, error) {
// ToLabelMatchers converts Prometheus label matchers to protobuf label matchers.
func ToLabelMatchers(matchers []*labels.Matcher) ([]*prompb.LabelMatcher, error) {
pbMatchers := make([]*prompb.LabelMatcher, 0, len(matchers))
for _, m := range matchers {
var mType prompb.LabelMatcher_Type
@@ -591,7 +592,7 @@ func toLabelMatchers(matchers []*labels.Matcher) ([]*prompb.LabelMatcher, error)
return pbMatchers, nil
}
// FromLabelMatchers parses protobuf label matchers to Prometheus label matchers.
// FromLabelMatchers converts protobuf label matchers to Prometheus label matchers.
func FromLabelMatchers(matchers []*prompb.LabelMatcher) ([]*labels.Matcher, error) {
result := make([]*labels.Matcher, 0, len(matchers))
for _, matcher := range matchers {
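
Exporting toLabelMatchers as ToLabelMatchers makes the conversion usable outside the remote package, mirroring the already-exported FromLabelMatchers. A hypothetical round-trip sketch (import paths assumed from the Prometheus module layout):

package main

import (
	"fmt"
	"log"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage/remote"
)

func main() {
	matchers := []*labels.Matcher{
		labels.MustNewMatcher(labels.MatchEqual, "job", "node"),
		labels.MustNewMatcher(labels.MatchRegexp, "instance", "demo.*"),
	}

	// Prometheus matchers -> protobuf matchers, as used when building a
	// remote-read Query.
	pb, err := remote.ToLabelMatchers(matchers)
	if err != nil {
		log.Fatal(err)
	}

	// ...and back again.
	back, err := remote.FromLabelMatchers(pb)
	if err != nil {
		log.Fatal(err)
	}
	for _, m := range back {
		fmt.Println(m.String()) // job="node", instance=~"demo.*"
	}
}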

View file

@@ -1552,7 +1552,7 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
// Drop old chunks and remember series IDs and hashes if they can be
// deleted entirely.
deleted, chunksRemoved, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef)
deleted, affected, chunksRemoved, actualInOrderMint, minOOOTime, minMmapFile := h.series.gc(mint, minOOOMmapRef)
seriesRemoved := len(deleted)
h.metrics.seriesRemoved.Add(float64(seriesRemoved))
@@ -1561,7 +1561,7 @@ func (h *Head) gc() (actualInOrderMint, minOOOTime int64, minMmapFile int) {
h.numSeries.Sub(uint64(seriesRemoved))
// Remove deleted series IDs from the postings lists.
h.postings.Delete(deleted)
h.postings.Delete(deleted, affected)
// Remove tombstones referring to the deleted series.
h.tombstones.DeleteTombstones(deleted)
@@ -1869,9 +1869,10 @@ func newStripeSeries(stripeSize int, seriesCallback SeriesLifecycleCallback) *st
// but the returned map goes into postings.Delete() which expects a map[storage.SeriesRef]struct{}
// and there's no easy way to cast maps.
// minMmapFile is the min mmap file number seen in the series (in-order and out-of-order) after gc'ing the series.
func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ int, _, _ int64, minMmapFile int) {
func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (_ map[storage.SeriesRef]struct{}, _ map[labels.Label]struct{}, _ int, _, _ int64, minMmapFile int) {
var (
deleted = map[storage.SeriesRef]struct{}{}
affected = map[labels.Label]struct{}{}
rmChunks = 0
actualMint int64 = math.MaxInt64
minOOOTime int64 = math.MaxInt64
@@ -1927,6 +1928,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
}
deleted[storage.SeriesRef(series.ref)] = struct{}{}
series.lset.Range(func(l labels.Label) { affected[l] = struct{}{} })
s.hashes[hashShard].del(hash, series.ref)
delete(s.series[refShard], series.ref)
deletedForCallback[series.ref] = series.lset
@@ -1938,7 +1940,7 @@ func (s *stripeSeries) gc(mint int64, minOOOMmapRef chunks.ChunkDiskMapperRef) (
actualMint = mint
}
return deleted, rmChunks, actualMint, minOOOTime, minMmapFile
return deleted, affected, rmChunks, actualMint, minOOOTime, minMmapFile
}
// The iterForDeletion function iterates through all series, invoking the checkDeletedFunc for each.

View file

@@ -814,6 +814,80 @@ func TestHead_UnknownWALRecord(t *testing.T) {
require.NoError(t, head.Close())
}
// BenchmarkHead_Truncate is quite heavy, so consider running it with
// -benchtime=10x or similar to get more stable and comparable results.
func BenchmarkHead_Truncate(b *testing.B) {
const total = 1e6
prepare := func(b *testing.B, churn int) *Head {
h, _ := newTestHead(b, 1000, wlog.CompressionNone, false)
b.Cleanup(func() {
require.NoError(b, h.Close())
})
h.initTime(0)
internedItoa := map[int]string{}
var mtx sync.RWMutex
itoa := func(i int) string {
mtx.RLock()
s, ok := internedItoa[i]
mtx.RUnlock()
if ok {
return s
}
mtx.Lock()
s = strconv.Itoa(i)
internedItoa[i] = s
mtx.Unlock()
return s
}
allSeries := [total]labels.Labels{}
nameValues := make([]string, 0, 100)
for i := 0; i < total; i++ {
nameValues = nameValues[:0]
// A thousand labels like lbl_x_of_1000, each with total/1000 values.
thousand := "lbl_" + itoa(i%1000) + "_of_1000"
nameValues = append(nameValues, thousand, itoa(i/1000))
// A hundred labels like lbl_x_of_100, each with total/100 values.
hundred := "lbl_" + itoa(i%100) + "_of_100"
nameValues = append(nameValues, hundred, itoa(i/100))
if i%13 == 0 {
ten := "lbl_" + itoa(i%10) + "_of_10"
nameValues = append(nameValues, ten, itoa(i%10))
}
allSeries[i] = labels.FromStrings(append(nameValues, "first", "a", "second", "a", "third", "a")...)
s, _, _ := h.getOrCreate(allSeries[i].Hash(), allSeries[i])
s.mmappedChunks = []*mmappedChunk{
{minTime: 1000 * int64(i/churn), maxTime: 999 + 1000*int64(i/churn)},
}
}
return h
}
for _, churn := range []int{10, 100, 1000} {
b.Run(fmt.Sprintf("churn=%d", churn), func(b *testing.B) {
if b.N > total/churn {
// Just to make sure that the benchmark still makes sense.
panic("benchmark not prepared")
}
h := prepare(b, churn)
b.ResetTimer()
for i := 0; i < b.N; i++ {
require.NoError(b, h.Truncate(1000*int64(i)))
// Make sure the benchmark is meaningful and it's actually truncating the expected number of series.
require.Equal(b, total-churn*i, int(h.NumSeries()))
}
})
}
}
func TestHead_Truncate(t *testing.T) {
h, _ := newTestHead(t, 1000, wlog.CompressionNone, false)
defer func() {

View file

@@ -1557,9 +1557,12 @@ func (r *Reader) LabelNamesFor(ctx context.Context, postings Postings) ([]string
i := 0
for postings.Next() {
id := postings.At()
i++
if i%checkContextEveryNIterations == 0 && ctx.Err() != nil {
return nil, ctx.Err()
if i%checkContextEveryNIterations == 0 {
if ctxErr := ctx.Err(); ctxErr != nil {
return nil, ctxErr
}
}
offset := id
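
The restructured loop keeps the established pattern of consulting ctx.Err() only every checkContextEveryNIterations iterations, amortising the check's cost over a hot loop. A generic, standalone sketch of the pattern (the value 128 is taken from the test comment below):

package main

import (
	"context"
	"fmt"
)

const checkContextEveryNIterations = 128

func sumUntilCancelled(ctx context.Context, xs []int64) (int64, error) {
	var total int64
	for i, x := range xs {
		// Check on iterations 128, 256, ... rather than on every one.
		if (i+1)%checkContextEveryNIterations == 0 {
			if err := ctx.Err(); err != nil {
				return 0, err
			}
		}
		total += x
	}
	return total, nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // already cancelled: the loop aborts at the first check
	_, err := sumUntilCancelled(ctx, make([]int64, 1000))
	fmt.Println(err) // context canceled
}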

View file

@@ -634,6 +634,31 @@ func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
require.Equal(t, failAfter, ctx.Count())
}
func TestReader_LabelNamesForHonorsContextCancel(t *testing.T) {
const seriesCount = 1000
var input indexWriterSeriesSlice
for i := 1; i <= seriesCount; i++ {
input = append(input, &indexWriterSeries{
labels: labels.FromStrings(labels.MetricName, fmt.Sprintf("%4d", i)),
chunks: []chunks.Meta{
{Ref: 1, MinTime: 0, MaxTime: 10},
},
})
}
ir, _, _ := createFileReader(context.Background(), t, input)
name, value := AllPostingsKey()
p, err := ir.Postings(context.Background(), name, value)
require.NoError(t, err)
// We check context cancellation every 128 iterations, so a context that
// fails after 3 checks will abort after iterating 3 * 128 series.
failAfter := uint64(3)
ctx := &testutil.MockContextErrAfter{FailAfter: failAfter}
_, err = ir.LabelNamesFor(ctx, p)
require.Error(t, err)
require.Equal(t, failAfter, ctx.Count())
}
// createFileReader creates a temporary index file. It writes the provided input to this file.
// It returns a Reader for this file, the file's name, and the symbol map.
func createFileReader(ctx context.Context, tb testing.TB, input indexWriterSeriesSlice) (*Reader, string, map[string]struct{}) {

View file

@@ -288,89 +288,34 @@ func (p *MemPostings) EnsureOrder(numberOfConcurrentProcesses int) {
}
// Delete removes all ids in the given map from the postings lists.
func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}) {
// We will take an optimistic read lock for the entire method,
// and only lock for writing when we actually find something to delete.
//
// Each SeriesRef can appear in several Postings.
// To change each one, we need to know the label name and value that it is indexed under.
// We iterate over all label names, then for each name all values,
// and look for individual series to be deleted.
p.mtx.RLock()
defer p.mtx.RUnlock()
// affected contains all the labels affected by the deletion; there's no need to check any other labels.
func (p *MemPostings) Delete(deleted map[storage.SeriesRef]struct{}, affected map[labels.Label]struct{}) {
p.mtx.Lock()
defer p.mtx.Unlock()
// Collect all keys relevant for deletion once. New keys added afterwards
// can by definition not be affected by any of the given deletes.
keys := make([]string, 0, len(p.m))
maxVals := 0
for n := range p.m {
keys = append(keys, n)
if len(p.m[n]) > maxVals {
maxVals = len(p.m[n])
process := func(l labels.Label) {
orig := p.m[l.Name][l.Value]
repl := make([]storage.SeriesRef, 0, len(orig))
for _, id := range orig {
if _, ok := deleted[id]; !ok {
repl = append(repl, id)
}
}
if len(repl) > 0 {
p.m[l.Name][l.Value] = repl
} else {
delete(p.m[l.Name], l.Value)
// Delete the key if we removed all values.
if len(p.m[l.Name]) == 0 {
delete(p.m, l.Name)
}
}
}
vals := make([]string, 0, maxVals)
for _, n := range keys {
// Copy the values and iterate the copy: if we unlock in the loop below,
// another goroutine might modify the map while we are part-way through it.
vals = vals[:0]
for v := range p.m[n] {
vals = append(vals, v)
}
// For each posting we first analyse whether the postings list is affected by the deletes.
// If no, we remove the label value from the vals list.
// This way we only need to Lock once later.
for i := 0; i < len(vals); {
found := false
refs := p.m[n][vals[i]]
for _, id := range refs {
if _, ok := deleted[id]; ok {
i++
found = true
break
}
}
if !found {
// Didn't match, bring the last value to this position, make the slice shorter and check again.
// The order of the slice doesn't matter as it comes from a map iteration.
vals[i], vals = vals[len(vals)-1], vals[:len(vals)-1]
}
}
// If no label values have deleted ids, just continue.
if len(vals) == 0 {
continue
}
// The only vals left here are the ones that contain deleted ids.
// Now we take the write lock and remove the ids.
p.mtx.RUnlock()
p.mtx.Lock()
for _, l := range vals {
repl := make([]storage.SeriesRef, 0, len(p.m[n][l]))
for _, id := range p.m[n][l] {
if _, ok := deleted[id]; !ok {
repl = append(repl, id)
}
}
if len(repl) > 0 {
p.m[n][l] = repl
} else {
delete(p.m[n], l)
}
}
// Delete the key if we removed all values.
if len(p.m[n]) == 0 {
delete(p.m, n)
}
p.mtx.Unlock()
p.mtx.RLock()
for l := range affected {
process(l)
}
process(allPostingsKey)
}
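
With the new signature, the caller supplies the labels carried by the deleted series, so Delete takes the write lock once and rewrites only the affected postings lists (plus the all-postings list) instead of scanning every label name and value. A hypothetical end-to-end sketch against the exported API:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/index"
)

func main() {
	p := index.NewMemPostings()
	lset1 := labels.FromStrings("job", "node", "instance", "a")
	p.Add(1, lset1)
	p.Add(2, labels.FromStrings("job", "node", "instance", "b"))

	// Delete series 1, passing both the refs and the labels those series
	// carried so that only the affected postings lists are rewritten.
	deleted := map[storage.SeriesRef]struct{}{1: {}}
	affected := map[labels.Label]struct{}{}
	lset1.Range(func(l labels.Label) { affected[l] = struct{}{} })
	p.Delete(deleted, affected)

	name, value := index.AllPostingsKey()
	it := p.Get(name, value)
	for it.Next() {
		fmt.Println(it.At()) // only series 2 remains
	}
}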
// Iter calls f for each postings list. It aborts if f returns an error and returns it.

View file

@@ -979,9 +979,13 @@ func TestMemPostings_Delete(t *testing.T) {
p.Add(3, labels.FromStrings("lbl2", "a"))
before := p.Get(allPostingsKey.Name, allPostingsKey.Value)
p.Delete(map[storage.SeriesRef]struct{}{
deletedRefs := map[storage.SeriesRef]struct{}{
2: {},
})
}
affectedLabels := map[labels.Label]struct{}{
{Name: "lbl1", Value: "b"}: {},
}
p.Delete(deletedRefs, affectedLabels)
after := p.Get(allPostingsKey.Name, allPostingsKey.Value)
// Make sure postings gotten before the delete have the old data when
@@ -1022,33 +1026,23 @@ func BenchmarkMemPostings_Delete(b *testing.B) {
}
const total = 1e6
prepare := func() *MemPostings {
var ref storage.SeriesRef
next := func() storage.SeriesRef {
ref++
return ref
allSeries := [total]labels.Labels{}
nameValues := make([]string, 0, 100)
for i := 0; i < total; i++ {
nameValues = nameValues[:0]
// A thousand labels like lbl_x_of_1000, each with total/1000 values.
thousand := "lbl_" + itoa(i%1000) + "_of_1000"
nameValues = append(nameValues, thousand, itoa(i/1000))
// A hundred labels like lbl_x_of_100, each with total/100 values.
hundred := "lbl_" + itoa(i%100) + "_of_100"
nameValues = append(nameValues, hundred, itoa(i/100))
if i < 100 {
ten := "lbl_" + itoa(i%10) + "_of_10"
nameValues = append(nameValues, ten, itoa(i%10))
}
p := NewMemPostings()
nameValues := make([]string, 0, 100)
for i := 0; i < total; i++ {
nameValues = nameValues[:0]
// A thousand labels like lbl_x_of_1000, each with total/1000 values
thousand := "lbl_" + itoa(i%1000) + "_of_1000"
nameValues = append(nameValues, thousand, itoa(i/1000))
// A hundred labels like lbl_x_of_100, each with total/100 values.
hundred := "lbl_" + itoa(i%100) + "_of_100"
nameValues = append(nameValues, hundred, itoa(i/100))
if i < 100 {
ten := "lbl_" + itoa(i%10) + "_of_10"
nameValues = append(nameValues, ten, itoa(i%10))
}
p.Add(next(), labels.FromStrings(append(nameValues, "first", "a", "second", "a", "third", "a")...))
}
return p
allSeries[i] = labels.FromStrings(append(nameValues, "first", "a", "second", "a", "third", "a")...)
}
for _, refs := range []int{1, 100, 10_000} {
@@ -1060,7 +1054,11 @@ func BenchmarkMemPostings_Delete(b *testing.B) {
panic("benchmark not prepared")
}
p := prepare()
p := NewMemPostings()
for i := range allSeries {
p.Add(storage.SeriesRef(i), allSeries[i])
}
stop := make(chan struct{})
wg := sync.WaitGroup{}
for i := 0; i < reads; i++ {
@@ -1086,11 +1084,16 @@ func BenchmarkMemPostings_Delete(b *testing.B) {
b.ResetTimer()
for n := 0; n < b.N; n++ {
deleted := map[storage.SeriesRef]struct{}{}
deleted := make(map[storage.SeriesRef]struct{}, refs)
affected := make(map[labels.Label]struct{}, refs)
for i := 0; i < refs; i++ {
deleted[storage.SeriesRef(n*refs+i)] = struct{}{}
ref := storage.SeriesRef(n*refs + i)
deleted[ref] = struct{}{}
allSeries[ref].Range(func(l labels.Label) {
affected[l] = struct{}{}
})
}
p.Delete(deleted)
p.Delete(deleted, affected)
}
})
}