Mirror of https://github.com/prometheus/prometheus.git, synced 2025-03-05 20:59:13 -08:00

Commit 85df1d4013: Merge remote-tracking branch 'prometheus/main' into chore/sync-prometheus

Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
@@ -218,6 +218,7 @@ func main() {
 	analyzeBlockID := tsdbAnalyzeCmd.Arg("block id", "Block to analyze (default is the last block).").String()
 	analyzeLimit := tsdbAnalyzeCmd.Flag("limit", "How many items to show in each list.").Default("20").Int()
 	analyzeRunExtended := tsdbAnalyzeCmd.Flag("extended", "Run extended analysis.").Bool()
+	analyzeMatchers := tsdbAnalyzeCmd.Flag("match", "Series selector to analyze. Only 1 set of matchers is supported now.").String()

 	tsdbListCmd := tsdbCmd.Command("list", "List tsdb blocks.")
 	listHumanReadable := tsdbListCmd.Flag("human-readable", "Print human readable values.").Short('r').Bool()

@@ -372,7 +373,7 @@ func main() {
 		os.Exit(checkErr(benchmarkWrite(*benchWriteOutPath, *benchSamplesFile, *benchWriteNumMetrics, *benchWriteNumScrapes)))

 	case tsdbAnalyzeCmd.FullCommand():
-		os.Exit(checkErr(analyzeBlock(ctx, *analyzePath, *analyzeBlockID, *analyzeLimit, *analyzeRunExtended)))
+		os.Exit(checkErr(analyzeBlock(ctx, *analyzePath, *analyzeBlockID, *analyzeLimit, *analyzeRunExtended, *analyzeMatchers)))

 	case tsdbListCmd.FullCommand():
 		os.Exit(checkErr(listBlocks(*listPath, *listHumanReadable)))
@@ -116,7 +116,7 @@ func parseAndPushMetrics(client *remote.Client, data []byte, labels map[string]s

 	// Encode the request body into snappy encoding.
 	compressed := snappy.Encode(nil, raw)
-	err = client.Store(context.Background(), compressed)
+	err = client.Store(context.Background(), compressed, 0)
 	if err != nil {
 		fmt.Fprintln(os.Stderr, "  FAILED:", err)
 		return false
@@ -413,7 +413,17 @@ func openBlock(path, blockID string) (*tsdb.DBReadOnly, tsdb.BlockReader, error)
 	return db, b, nil
 }

-func analyzeBlock(ctx context.Context, path, blockID string, limit int, runExtended bool) error {
+func analyzeBlock(ctx context.Context, path, blockID string, limit int, runExtended bool, matchers string) error {
+	var (
+		selectors []*labels.Matcher
+		err       error
+	)
+	if len(matchers) > 0 {
+		selectors, err = parser.ParseMetricSelector(matchers)
+		if err != nil {
+			return err
+		}
+	}
 	db, block, err := openBlock(path, blockID)
 	if err != nil {
 		return err
@@ -426,14 +436,17 @@ func analyzeBlock(ctx context.Context, path, blockID string, limit int, runExten
 	fmt.Printf("Block ID: %s\n", meta.ULID)
 	// Presume 1ms resolution that Prometheus uses.
 	fmt.Printf("Duration: %s\n", (time.Duration(meta.MaxTime-meta.MinTime) * 1e6).String())
-	fmt.Printf("Series: %d\n", meta.Stats.NumSeries)
+	fmt.Printf("Total Series: %d\n", meta.Stats.NumSeries)
+	if len(matchers) > 0 {
+		fmt.Printf("Matcher: %s\n", matchers)
+	}
 	ir, err := block.Index()
 	if err != nil {
 		return err
 	}
 	defer ir.Close()

-	allLabelNames, err := ir.LabelNames(ctx)
+	allLabelNames, err := ir.LabelNames(ctx, selectors...)
 	if err != nil {
 		return err
 	}
@@ -460,10 +473,30 @@ func analyzeBlock(ctx context.Context, path, blockID string, limit int, runExten
 	labelpairsUncovered := map[string]uint64{}
 	labelpairsCount := map[string]uint64{}
 	entries := 0
-	p, err := ir.Postings(ctx, "", "") // The special all key.
-	if err != nil {
-		return err
+	var (
+		p    index.Postings
+		refs []storage.SeriesRef
+	)
+	if len(matchers) > 0 {
+		p, err = tsdb.PostingsForMatchers(ctx, ir, selectors...)
+		if err != nil {
+			return err
+		}
+		// Expand refs first and cache in memory.
+		// So later we don't have to expand again.
+		refs, err = index.ExpandPostings(p)
+		if err != nil {
+			return err
+		}
+		fmt.Printf("Matched series: %d\n", len(refs))
+		p = index.NewListPostings(refs)
+	} else {
+		p, err = ir.Postings(ctx, "", "") // The special all key.
+		if err != nil {
+			return err
+		}
 	}

 	chks := []chunks.Meta{}
 	builder := labels.ScratchBuilder{}
 	for p.Next() {
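The matched branch above drains the postings iterator once and rebuilds it from the cached refs, because an `index.Postings` iterator is single-use. A minimal, self-contained sketch of that expand-and-relist pattern (the toy refs are invented; the API calls are the real `tsdb/index` ones used in the hunk):

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/index"
)

func main() {
	// An index.Postings iterator can be consumed only once. Expanding it
	// into a slice caches the refs, and NewListPostings rebuilds a fresh,
	// cheap iterator from that slice as often as needed.
	p := index.NewListPostings([]storage.SeriesRef{1, 2, 3})
	refs, err := index.ExpandPostings(p) // drains p
	if err != nil {
		panic(err)
	}
	fmt.Println("matched series:", len(refs))

	again := index.NewListPostings(refs) // iterate again without re-querying
	for again.Next() {
		fmt.Println("ref:", again.At())
	}
}
```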
@@ -512,7 +545,7 @@ func analyzeBlock(ctx context.Context, path, blockID string, limit int, runExten

 	postingInfos = postingInfos[:0]
 	for _, n := range allLabelNames {
-		values, err := ir.SortedLabelValues(ctx, n)
+		values, err := ir.SortedLabelValues(ctx, n, selectors...)
 		if err != nil {
 			return err
 		}

@@ -528,7 +561,7 @@ func analyzeBlock(ctx context.Context, path, blockID string, limit int, runExten

 	postingInfos = postingInfos[:0]
 	for _, n := range allLabelNames {
-		lv, err := ir.SortedLabelValues(ctx, n)
+		lv, err := ir.SortedLabelValues(ctx, n, selectors...)
 		if err != nil {
 			return err
 		}

@@ -538,7 +571,7 @@ func analyzeBlock(ctx context.Context, path, blockID string, limit int, runExten
 	printInfo(postingInfos)

 	postingInfos = postingInfos[:0]
-	lv, err := ir.SortedLabelValues(ctx, "__name__")
+	lv, err := ir.SortedLabelValues(ctx, "__name__", selectors...)
 	if err != nil {
 		return err
 	}

@@ -547,6 +580,7 @@ func analyzeBlock(ctx context.Context, path, blockID string, limit int, runExten
 	if err != nil {
 		return err
 	}
+	postings = index.Intersect(postings, index.NewListPostings(refs))
 	count := 0
 	for postings.Next() {
 		count++

@@ -560,18 +594,24 @@ func analyzeBlock(ctx context.Context, path, blockID string, limit int, runExten
 	printInfo(postingInfos)

 	if runExtended {
-		return analyzeCompaction(ctx, block, ir)
+		return analyzeCompaction(ctx, block, ir, selectors)
 	}

 	return nil
 }

-func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb.IndexReader) (err error) {
-	n, v := index.AllPostingsKey()
-	postingsr, err := indexr.Postings(ctx, n, v)
+func analyzeCompaction(ctx context.Context, block tsdb.BlockReader, indexr tsdb.IndexReader, matchers []*labels.Matcher) (err error) {
+	var postingsr index.Postings
+	if len(matchers) > 0 {
+		postingsr, err = tsdb.PostingsForMatchers(ctx, indexr, matchers...)
+	} else {
+		n, v := index.AllPostingsKey()
+		postingsr, err = indexr.Postings(ctx, n, v)
+	}
 	if err != nil {
 		return err
 	}

 	chunkr, err := block.Chunks()
 	if err != nil {
 		return err
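The selectors threaded through these hunks come from `parser.ParseMetricSelector` in the `analyzeBlock` change above. A small standalone example of what it returns (the selector string is illustrative):

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/promql/parser"
)

func main() {
	// ParseMetricSelector turns one PromQL selector into label matchers,
	// which is why --match supports a single matcher set.
	matchers, err := parser.ParseMetricSelector(`{__name__=~"http_.*", job="api"}`)
	if err != nil {
		panic(err)
	}
	for _, m := range matchers {
		fmt.Println(m.String())
	}
}
```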
@@ -488,6 +488,7 @@ Analyze churn, label pair cardinality and compaction efficiency.
 | --- | --- | --- |
 | <code class="text-nowrap">--limit</code> | How many items to show in each list. | `20` |
 | <code class="text-nowrap">--extended</code> | Run extended analysis. | |
+| <code class="text-nowrap">--match</code> | Series selector to analyze. Only 1 set of matchers is supported now. | |

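With the docs entry above, the new flag can be exercised as below; the data directory, block ID, and selector are illustrative, not taken from the diff:

```console
$ promtool tsdb analyze --limit=10 --extended \
    --match='{__name__=~"http_.*"}' /var/lib/prometheus 01BKGV7JBM69T2G1BGBGM6KB12
```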
@@ -1190,41 +1190,24 @@ func (ev *evaluator) rangeEval(prepSeries func(labels.Labels, *EvalSeriesHelper)
 			}

 			for si, series := range matrixes[i] {
-				for _, point := range series.Floats {
-					if point.T == ts {
-						if ev.currentSamples < ev.maxSamples {
-							vectors[i] = append(vectors[i], Sample{Metric: series.Metric, F: point.F, T: ts})
-							if prepSeries != nil {
-								bufHelpers[i] = append(bufHelpers[i], seriesHelpers[i][si])
-							}
-
-							// Move input vectors forward so we don't have to re-scan the same
-							// past points at the next step.
-							matrixes[i][si].Floats = series.Floats[1:]
-							ev.currentSamples++
-						} else {
-							ev.error(ErrTooManySamples(env))
-						}
-					}
-					break
-				}
-				for _, point := range series.Histograms {
-					if point.T == ts {
-						if ev.currentSamples < ev.maxSamples {
-							vectors[i] = append(vectors[i], Sample{Metric: series.Metric, H: point.H, T: ts})
-							if prepSeries != nil {
-								bufHelpers[i] = append(bufHelpers[i], seriesHelpers[i][si])
-							}
-							// Move input vectors forward so we don't have to re-scan the same
-							// past points at the next step.
-							matrixes[i][si].Histograms = series.Histograms[1:]
-							ev.currentSamples++
-						} else {
-							ev.error(ErrTooManySamples(env))
-						}
-					}
-					break
-				}
+				switch {
+				case len(series.Floats) > 0 && series.Floats[0].T == ts:
+					vectors[i] = append(vectors[i], Sample{Metric: series.Metric, F: series.Floats[0].F, T: ts})
+					// Move input vectors forward so we don't have to re-scan the same
+					// past points at the next step.
+					matrixes[i][si].Floats = series.Floats[1:]
+				case len(series.Histograms) > 0 && series.Histograms[0].T == ts:
+					vectors[i] = append(vectors[i], Sample{Metric: series.Metric, H: series.Histograms[0].H, T: ts})
+					matrixes[i][si].Histograms = series.Histograms[1:]
+				default:
+					continue
+				}
+				if prepSeries != nil {
+					bufHelpers[i] = append(bufHelpers[i], seriesHelpers[i][si])
+				}
+				ev.currentSamples++
+				if ev.currentSamples > ev.maxSamples {
+					ev.error(ErrTooManySamples(env))
+				}
 			}
 			args[i] = vectors[i]
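The restructured loop relies on each series' Floats and Histograms being sorted by timestamp: at every evaluation step only the head sample can match `ts`, and a match advances the slice, so the whole range evaluation makes one linear pass per series. A toy, self-contained illustration of the stepping idea (type and values invented, not the engine's):

```go
package main

import "fmt"

type sample struct {
	T int64
	F float64
}

func main() {
	// Samples are sorted by timestamp, so at each step only the head of the
	// slice can match; on a match we slice it off, avoiding a re-scan of
	// already-consumed points at the next step.
	series := []sample{{10, 1}, {20, 2}, {40, 4}}
	for ts := int64(10); ts <= 40; ts += 10 {
		if len(series) > 0 && series[0].T == ts {
			fmt.Printf("step %d: emit %g\n", ts, series[0].F)
			series = series[1:]
		} else {
			fmt.Printf("step %d: no sample\n", ts)
		}
	}
}
```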
@@ -195,7 +195,7 @@ type RecoverableError struct {

 // Store sends a batch of samples to the HTTP endpoint, the request is the proto marshalled
 // and encoded bytes from codec.go.
-func (c *Client) Store(ctx context.Context, req []byte) error {
+func (c *Client) Store(ctx context.Context, req []byte, attempt int) error {
 	httpReq, err := http.NewRequest("POST", c.urlString, bytes.NewReader(req))
 	if err != nil {
 		// Errors from NewRequest are from unparsable URLs, so are not

@@ -207,6 +207,10 @@ func (c *Client) Store(ctx context.Context, req []byte) error {
 	httpReq.Header.Set("Content-Type", "application/x-protobuf")
 	httpReq.Header.Set("User-Agent", UserAgent)
 	httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
+	if attempt > 0 {
+		httpReq.Header.Set("Retry-Attempt", strconv.Itoa(attempt))
+	}

 	ctx, cancel := context.WithTimeout(ctx, c.timeout)
 	defer cancel()

@@ -232,10 +236,8 @@ func (c *Client) Store(ctx context.Context, req []byte) error {
 		}
 		err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, line)
 	}
-	if httpResp.StatusCode/100 == 5 {
-		return RecoverableError{err, defaultBackoff}
-	}
-	if c.retryOnRateLimit && httpResp.StatusCode == http.StatusTooManyRequests {
+	if httpResp.StatusCode/100 == 5 ||
+		(c.retryOnRateLimit && httpResp.StatusCode == http.StatusTooManyRequests) {
 		return RecoverableError{err, retryAfterDuration(httpResp.Header.Get("Retry-After"))}
 	}
 	return err
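Only Store's widened signature and `RecoverableError` come from the diff; the backoff loop below is a hypothetical caller-side sketch of how the attempt counter pairs with the server's Retry-After hint. It assumes code living in the same package as `Client` (the `retryAfter` field is unexported) and stdlib imports `context`, `errors`, and `time`:

```go
// Hypothetical helper, not part of the change: drives Store with an
// increasing attempt number so attempts > 0 carry the Retry-Attempt header.
func storeWithRetries(ctx context.Context, c *Client, req []byte, maxRetries int) error {
	var err error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		err = c.Store(ctx, req, attempt)
		var recoverable RecoverableError
		if !errors.As(err, &recoverable) {
			return err // nil on success, or a permanent (non-recoverable) failure
		}
		select {
		case <-time.After(time.Duration(recoverable.retryAfter)): // honor the server hint
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return err
}
```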
@@ -73,7 +73,7 @@ func TestStoreHTTPErrorHandling(t *testing.T) {
 			c, err := NewWriteClient(hash, conf)
 			require.NoError(t, err)

-			err = c.Store(context.Background(), []byte{})
+			err = c.Store(context.Background(), []byte{}, 0)
 			if test.err != nil {
 				require.EqualError(t, err, test.err.Error())
 			} else {

@@ -85,12 +85,22 @@ func TestStoreHTTPErrorHandling(t *testing.T) {
 	}
 }

 func TestClientRetryAfter(t *testing.T) {
-	server := httptest.NewServer(
-		http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-			http.Error(w, longErrMessage, http.StatusTooManyRequests)
-		}),
-	)
-	defer server.Close()
+	setupServer := func(statusCode int) *httptest.Server {
+		return httptest.NewServer(
+			http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				w.Header().Set("Retry-After", "5")
+				http.Error(w, longErrMessage, statusCode)
+			}),
+		)
+	}
+
+	getClientConfig := func(serverURL *url.URL, retryOnRateLimit bool) *ClientConfig {
+		return &ClientConfig{
+			URL:              &config_util.URL{URL: serverURL},
+			Timeout:          model.Duration(time.Second),
+			RetryOnRateLimit: retryOnRateLimit,
+		}
+	}

 	getClient := func(conf *ClientConfig) WriteClient {
 		hash, err := toHash(conf)

@@ -100,30 +110,36 @@ func TestClientRetryAfter(t *testing.T) {
 		return c
 	}

-	serverURL, err := url.Parse(server.URL)
-	require.NoError(t, err)
-
-	conf := &ClientConfig{
-		URL:              &config_util.URL{URL: serverURL},
-		Timeout:          model.Duration(time.Second),
-		RetryOnRateLimit: false,
-	}
-
-	var recErr RecoverableError
-
-	c := getClient(conf)
-	err = c.Store(context.Background(), []byte{})
-	require.False(t, errors.As(err, &recErr), "Recoverable error not expected.")
-
-	conf = &ClientConfig{
-		URL:              &config_util.URL{URL: serverURL},
-		Timeout:          model.Duration(time.Second),
-		RetryOnRateLimit: true,
-	}
-
-	c = getClient(conf)
-	err = c.Store(context.Background(), []byte{})
-	require.True(t, errors.As(err, &recErr), "Recoverable error was expected.")
+	testCases := []struct {
+		name                string
+		statusCode          int
+		retryOnRateLimit    bool
+		expectedRecoverable bool
+		expectedRetryAfter  model.Duration
+	}{
+		{"TooManyRequests - No Retry", http.StatusTooManyRequests, false, false, 0},
+		{"TooManyRequests - With Retry", http.StatusTooManyRequests, true, true, 5 * model.Duration(time.Second)},
+		{"InternalServerError", http.StatusInternalServerError, false, true, 5 * model.Duration(time.Second)}, // HTTP 5xx errors do not depend on retryOnRateLimit.
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			server := setupServer(tc.statusCode)
+			defer server.Close()

+			serverURL, err := url.Parse(server.URL)
+			require.NoError(t, err)
+
+			c := getClient(getClientConfig(serverURL, tc.retryOnRateLimit))
+
+			var recErr RecoverableError
+			err = c.Store(context.Background(), []byte{}, 0)
+			require.Equal(t, tc.expectedRecoverable, errors.As(err, &recErr), "Mismatch in expected recoverable error status.")
+			if tc.expectedRecoverable {
+				require.Equal(t, tc.expectedRetryAfter, err.(RecoverableError).retryAfter)
+			}
+		})
+	}
 }

 func TestRetryAfterDuration(t *testing.T) {
@@ -380,7 +380,7 @@ func (m *queueManagerMetrics) unregister() {
 // external timeseries database.
 type WriteClient interface {
 	// Store stores the given samples in the remote storage.
-	Store(context.Context, []byte) error
+	Store(context.Context, []byte, int) error
 	// Name uniquely identifies the remote storage.
 	Name() string
 	// Endpoint is the remote read or write endpoint for the storage client.

@@ -552,7 +552,7 @@ func (t *QueueManager) sendMetadataWithBackoff(ctx context.Context, metadata []p
 	}

 	begin := time.Now()
-	err := t.storeClient.Store(ctx, req)
+	err := t.storeClient.Store(ctx, req, try)
 	t.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())

 	if err != nil {

@@ -1526,7 +1526,7 @@ func (s *shards) sendSamplesWithBackoff(ctx context.Context, samples []prompb.Ti
 		s.qm.metrics.samplesTotal.Add(float64(sampleCount))
 		s.qm.metrics.exemplarsTotal.Add(float64(exemplarCount))
 		s.qm.metrics.histogramsTotal.Add(float64(histogramCount))
-		err := s.qm.client().Store(ctx, *buf)
+		err := s.qm.client().Store(ctx, *buf, try)
 		s.qm.metrics.sentBatchDuration.Observe(time.Since(begin).Seconds())

 		if err != nil {
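Widening Store ripples into every WriteClient implementation, including the test doubles in the next file. An illustrative implementation under the new interface (the type, counter, and log line are invented; it assumes stdlib `context`, `log`, and `sync/atomic` imports):

```go
// Illustrative only: a WriteClient satisfying the widened interface.
type countingWriteClient struct{ calls atomic.Int64 }

func (c *countingWriteClient) Store(_ context.Context, req []byte, attempt int) error {
	c.calls.Add(1)
	log.Printf("store: %d bytes, attempt %d", len(req), attempt)
	return nil
}

func (c *countingWriteClient) Name() string     { return "countingwriteclient" }
func (c *countingWriteClient) Endpoint() string { return "http://example.invalid/write" }
```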
@@ -769,7 +769,7 @@ func (c *TestWriteClient) waitForExpectedData(tb testing.TB) {
 	}
 }

-func (c *TestWriteClient) Store(_ context.Context, req []byte) error {
+func (c *TestWriteClient) Store(_ context.Context, req []byte, _ int) error {
 	c.mtx.Lock()
 	defer c.mtx.Unlock()
 	// nil buffers are ok for snappy, ignore cast error.

@@ -843,7 +843,7 @@ func NewTestBlockedWriteClient() *TestBlockingWriteClient {
 	return &TestBlockingWriteClient{}
 }

-func (c *TestBlockingWriteClient) Store(ctx context.Context, _ []byte) error {
+func (c *TestBlockingWriteClient) Store(ctx context.Context, _ []byte, _ int) error {
 	c.numCalls.Inc()
 	<-ctx.Done()
 	return nil

@@ -864,10 +864,10 @@ func (c *TestBlockingWriteClient) Endpoint() string {
 // For benchmarking the send and not the receive side.
 type NopWriteClient struct{}

 func NewNopWriteClient() *NopWriteClient { return &NopWriteClient{} }
-func (c *NopWriteClient) Store(context.Context, []byte) error { return nil }
+func (c *NopWriteClient) Store(context.Context, []byte, int) error { return nil }
 func (c *NopWriteClient) Name() string { return "nopwriteclient" }
 func (c *NopWriteClient) Endpoint() string { return "http://test-remote.com/1234" }

 func BenchmarkSampleSend(b *testing.B) {
 	// Send one sample per series, which is the typical remote_write case
@@ -1289,9 +1289,6 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts
 		// There is no head chunk in this series yet, create the first chunk for the sample.
 		c = s.cutNewHeadChunk(t, e, o.chunkRange)
 		chunkCreated = true
-	} else if len(c.chunk.Bytes()) > maxBytesPerXORChunk {
-		c = s.cutNewHeadChunk(t, e, o.chunkRange)
-		chunkCreated = true
 	}

 	// Out of order sample.

@@ -1299,6 +1296,12 @@ func (s *memSeries) appendPreprocessor(t int64, e chunkenc.Encoding, o chunkOpts
 		return c, false, chunkCreated
 	}

+	// Check the chunk size, unless we just created it and if the chunk is too large, cut a new one.
+	if !chunkCreated && len(c.chunk.Bytes()) > maxBytesPerXORChunk {
+		c = s.cutNewHeadChunk(t, e, o.chunkRange)
+		chunkCreated = true
+	}
+
 	if c.chunk.Encoding() != e {
 		// The chunk encoding expected by this append is different than the head chunk's
 		// encoding. So we cut a new chunk with the expected encoding.
@@ -5568,3 +5568,49 @@ func TestCuttingNewHeadChunks(t *testing.T) {
 		})
 	}
 }
+
+// TestHeadDetectsDuplicateSampleAtSizeLimit tests a regression where a duplicate sample
+// is appended to the head, right when the head chunk is at the size limit.
+// The test adds all samples as duplicate, thus expecting that the result has
+// exactly half of the samples.
+func TestHeadDetectsDuplicateSampleAtSizeLimit(t *testing.T) {
+	numSamples := 1000
+	baseTS := int64(1695209650)
+
+	h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
+	defer func() {
+		require.NoError(t, h.Close())
+	}()
+
+	a := h.Appender(context.Background())
+	var err error
+	vals := []float64{math.MaxFloat64, 0x00} // Use the worst case scenario for the XOR encoding. Otherwise we hit the sample limit before the size limit.
+	for i := 0; i < numSamples; i++ {
+		ts := baseTS + int64(i/2)*10000
+		a.Append(0, labels.FromStrings("foo", "bar"), ts, vals[(i/2)%len(vals)])
+		err = a.Commit()
+		require.NoError(t, err)
+		a = h.Appender(context.Background())
+	}
+
+	indexReader, err := h.Index()
+	require.NoError(t, err)
+
+	var (
+		chunks  []chunks.Meta
+		builder labels.ScratchBuilder
+	)
+	require.NoError(t, indexReader.Series(1, &builder, &chunks))
+
+	chunkReader, err := h.Chunks()
+	require.NoError(t, err)
+
+	storedSampleCount := 0
+	for _, chunkMeta := range chunks {
+		chunk, err := chunkReader.Chunk(chunkMeta)
+		require.NoError(t, err)
+		storedSampleCount += chunk.NumSamples()
+	}
+
+	require.Equal(t, numSamples/2, storedSampleCount)
+}
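A quick aside on why the test alternates `math.MaxFloat64` and `0`: XOR chunk encoding stores only the bits in which consecutive values differ, and these two values disagree in almost every bit, so each sample costs close to a full 8 bytes and the chunk hits the byte limit before the per-chunk sample limit. A standalone check of that bit distance:

```go
package main

import (
	"fmt"
	"math"
	"math/bits"
)

func main() {
	// XOR-of-doubles compression stores the differing bits between samples.
	diff := math.Float64bits(math.MaxFloat64) ^ math.Float64bits(0)
	fmt.Println(bits.OnesCount64(diff)) // 62: nearly every bit flips between samples
}
```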
@@ -11,7 +11,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-// nolint:revive // Many unsued function arguments in this file by design.
+// nolint:revive // Many unused function arguments in this file by design.
 package tsdb

 import (
@@ -61,18 +61,18 @@ type PostingsForMatchersCache struct {
 	// timeNow is the time.Now that can be replaced for testing purposes
 	timeNow func() time.Time
 	// postingsForMatchers can be replaced for testing purposes
-	postingsForMatchers func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error)
+	postingsForMatchers func(ctx context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error)
 }

 func (c *PostingsForMatchersCache) PostingsForMatchers(ctx context.Context, ix IndexPostingsReader, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) {
 	if !concurrent && !c.force {
-		return c.postingsForMatchers(ix, ms...)
+		return c.postingsForMatchers(ctx, ix, ms...)
 	}
 	c.expire()
 	return c.postingsForMatchersPromise(ctx, ix, ms)()
 }

-func (c *PostingsForMatchersCache) postingsForMatchersPromise(_ context.Context, ix IndexPostingsReader, ms []*labels.Matcher) func() (index.Postings, error) {
+func (c *PostingsForMatchersCache) postingsForMatchersPromise(ctx context.Context, ix IndexPostingsReader, ms []*labels.Matcher) func() (index.Postings, error) {
 	var (
 		wg     sync.WaitGroup
 		cloner *index.PostingsCloner

@@ -95,7 +95,7 @@ func (c *PostingsForMatchersCache) postingsForMatchersPromise(_ context.Context,
 		}
 		defer wg.Done()

-		if postings, err := c.postingsForMatchers(ix, ms...); err != nil {
+		if postings, err := c.postingsForMatchers(ctx, ix, ms...); err != nil {
 			outerErr = err
 		} else {
 			cloner = index.NewPostingsCloner(postings)
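The cache keeps the real postings lookup as a swappable function field so tests can stub it out; adding `ctx` to that field's type is what forces every stub in the test file below to change signature too. A minimal, generic sketch of the same testing pattern (all names invented):

```go
package main

import (
	"context"
	"fmt"
)

// cache mirrors the shape of PostingsForMatchersCache: the lookup function
// is a field, replaceable for testing purposes.
type cache struct {
	lookup func(ctx context.Context, key string) (string, error)
}

func main() {
	c := cache{lookup: func(_ context.Context, key string) (string, error) {
		return "stubbed:" + key, nil // test double; never touches a real index
	}}
	v, _ := c.lookup(context.Background(), "foo")
	fmt.Println(v) // stubbed:foo
}
```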
@@ -17,7 +17,7 @@ import (
 func TestPostingsForMatchersCache(t *testing.T) {
 	const testCacheSize = 5
 	// newPostingsForMatchersCache tests the NewPostingsForMatcherCache constructor, but overrides the postingsForMatchers func
-	newPostingsForMatchersCache := func(ttl time.Duration, pfm func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error), timeMock *timeNowMock, force bool) *PostingsForMatchersCache {
+	newPostingsForMatchersCache := func(ttl time.Duration, pfm func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error), timeMock *timeNowMock, force bool) *PostingsForMatchersCache {
 		c := NewPostingsForMatchersCache(ttl, testCacheSize, force)
 		if c.postingsForMatchers == nil {
 			t.Fatalf("NewPostingsForMatchersCache() didn't assign postingsForMatchers func")

@@ -35,7 +35,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
 		expectedMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}
 		expectedPostingsErr := fmt.Errorf("failed successfully")

-		c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
+		c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
 			require.IsType(t, indexForPostingsMock{}, ix, "Incorrect IndexPostingsReader was provided to PostingsForMatchers, expected the mock, was given %v (%T)", ix, ix)
 			require.Equal(t, expectedMatchers, ms, "Wrong label matchers provided, expected %v, got %v", expectedMatchers, ms)
 			return index.ErrPostings(expectedPostingsErr), nil

@@ -53,7 +53,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
 		expectedMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}
 		expectedErr := fmt.Errorf("failed successfully")

-		c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
+		c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
 			return nil, expectedErr
 		}, &timeNowMock{}, false)

@@ -101,7 +101,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
 			if cacheEnabled {
 				ttl = defaultPostingsForMatchersCacheTTL
 			}
-			c := newPostingsForMatchersCache(ttl, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
+			c := newPostingsForMatchersCache(ttl, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
 				select {
 				case called <- struct{}{}:
 				default:

@@ -148,7 +148,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
 		expectedMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}

 		var call int
-		c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
+		c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
 			call++
 			return index.ErrPostings(fmt.Errorf("result from call %d", call)), nil
 		}, &timeNowMock{}, false)

@@ -168,7 +168,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
 		expectedMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}

 		var call int
-		c := newPostingsForMatchersCache(0, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
+		c := newPostingsForMatchersCache(0, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
 			call++
 			return index.ErrPostings(fmt.Errorf("result from call %d", call)), nil
 		}, &timeNowMock{}, false)

@@ -191,7 +191,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
 		}

 		var call int
-		c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
+		c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
 			call++
 			return index.ErrPostings(fmt.Errorf("result from call %d", call)), nil
 		}, timeNow, false)

@@ -224,7 +224,7 @@ func TestPostingsForMatchersCache(t *testing.T) {
 		}

 		callsPerMatchers := map[string]int{}
-		c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
+		c := newPostingsForMatchersCache(defaultPostingsForMatchersCacheTTL, func(_ context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
 			k := matchersKey(ms)
 			callsPerMatchers[k]++
 			return index.ErrPostings(fmt.Errorf("result from call %d", callsPerMatchers[k])), nil
@@ -179,7 +179,7 @@ func (q *blockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *

 // PostingsForMatchers assembles a single postings iterator against the index reader
 // based on the given matchers. The resulting postings are not ordered by series.
-func PostingsForMatchers(ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
+func PostingsForMatchers(ctx context.Context, ix IndexPostingsReader, ms ...*labels.Matcher) (index.Postings, error) {
 	var its, notIts []index.Postings
 	// See which label must be non-empty.
 	// Optimization for case like {l=~".", l!="1"}.

@@ -209,7 +209,7 @@ func PostingsForMatchers(ix IndexPostingsReader, ms ...*labels.Matcher) (index.P
 		// We prefer to get AllPostings so that the base of subtraction (i.e. allPostings)
 		// doesn't include series that may be added to the index reader during this function call.
 		k, v := index.AllPostingsKey()
-		allPostings, err := ix.Postings(context.TODO(), k, v)
+		allPostings, err := ix.Postings(ctx, k, v)
 		if err != nil {
 			return nil, err
 		}

@@ -226,10 +226,13 @@ func PostingsForMatchers(ix IndexPostingsReader, ms ...*labels.Matcher) (index.P
 	})

 	for _, m := range ms {
+		if ctx.Err() != nil {
+			return nil, ctx.Err()
+		}
 		switch {
 		case m.Name == "" && m.Value == "": // Special-case for AllPostings, used in tests at least.
 			k, v := index.AllPostingsKey()
-			allPostings, err := ix.Postings(context.TODO(), k, v)
+			allPostings, err := ix.Postings(ctx, k, v)
 			if err != nil {
 				return nil, err
 			}

@@ -247,7 +250,7 @@ func PostingsForMatchers(ix IndexPostingsReader, ms ...*labels.Matcher) (index.P
 				return nil, err
 			}

-			it, err := postingsForMatcher(ix, inverse)
+			it, err := postingsForMatcher(ctx, ix, inverse)
 			if err != nil {
 				return nil, err
 			}

@@ -260,7 +263,7 @@ func PostingsForMatchers(ix IndexPostingsReader, ms ...*labels.Matcher) (index.P
 				return nil, err
 			}

-			it, err := inversePostingsForMatcher(ix, inverse)
+			it, err := inversePostingsForMatcher(ctx, ix, inverse)
 			if err != nil {
 				return nil, err
 			}

@@ -270,7 +273,7 @@ func PostingsForMatchers(ix IndexPostingsReader, ms ...*labels.Matcher) (index.P
 			its = append(its, it)
 		default: // l="a"
 			// Non-Not matcher, use normal postingsForMatcher.
-			it, err := postingsForMatcher(ix, m)
+			it, err := postingsForMatcher(ctx, ix, m)
 			if err != nil {
 				return nil, err
 			}

@@ -284,7 +287,7 @@ func PostingsForMatchers(ix IndexPostingsReader, ms ...*labels.Matcher) (index.P
 			// the series which don't have the label name set too. See:
 			// https://github.com/prometheus/prometheus/issues/3575 and
 			// https://github.com/prometheus/prometheus/pull/3578#issuecomment-351653555
-			it, err := inversePostingsForMatcher(ix, m)
+			it, err := inversePostingsForMatcher(ctx, ix, m)
 			if err != nil {
 				return nil, err
 			}

@@ -301,23 +304,23 @@ func PostingsForMatchers(ix IndexPostingsReader, ms ...*labels.Matcher) (index.P
 	return it, nil
 }

-func postingsForMatcher(ix IndexPostingsReader, m *labels.Matcher) (index.Postings, error) {
+func postingsForMatcher(ctx context.Context, ix IndexPostingsReader, m *labels.Matcher) (index.Postings, error) {
 	// This method will not return postings for missing labels.

 	// Fast-path for equal matching.
 	if m.Type == labels.MatchEqual {
-		return ix.Postings(context.TODO(), m.Name, m.Value)
+		return ix.Postings(ctx, m.Name, m.Value)
 	}

 	// Fast-path for set matching.
 	if m.Type == labels.MatchRegexp {
 		setMatches := m.SetMatches()
 		if len(setMatches) > 0 {
-			return ix.Postings(context.TODO(), m.Name, setMatches...)
+			return ix.Postings(ctx, m.Name, setMatches...)
 		}
 	}

-	vals, err := ix.LabelValues(context.TODO(), m.Name)
+	vals, err := ix.LabelValues(ctx, m.Name)
 	if err != nil {
 		return nil, err
 	}

@@ -333,28 +336,28 @@ func postingsForMatcher(ix IndexPostingsReader, m *labels.Matcher) (index.Postin
 		return index.EmptyPostings(), nil
 	}

-	return ix.Postings(context.TODO(), m.Name, res...)
+	return ix.Postings(ctx, m.Name, res...)
 }

 // inversePostingsForMatcher returns the postings for the series with the label name set but not matching the matcher.
-func inversePostingsForMatcher(ix IndexPostingsReader, m *labels.Matcher) (index.Postings, error) {
+func inversePostingsForMatcher(ctx context.Context, ix IndexPostingsReader, m *labels.Matcher) (index.Postings, error) {
 	// Fast-path for MatchNotRegexp matching.
 	// Inverse of a MatchNotRegexp is MatchRegexp (double negation).
 	// Fast-path for set matching.
 	if m.Type == labels.MatchNotRegexp {
 		setMatches := m.SetMatches()
 		if len(setMatches) > 0 {
-			return ix.Postings(context.TODO(), m.Name, setMatches...)
+			return ix.Postings(ctx, m.Name, setMatches...)
 		}
 	}

 	// Fast-path for MatchNotEqual matching.
 	// Inverse of a MatchNotEqual is MatchEqual (double negation).
 	if m.Type == labels.MatchNotEqual {
-		return ix.Postings(context.TODO(), m.Name, m.Value)
+		return ix.Postings(ctx, m.Name, m.Value)
 	}

-	vals, err := ix.LabelValues(context.TODO(), m.Name)
+	vals, err := ix.LabelValues(ctx, m.Name)
 	if err != nil {
 		return nil, err
 	}

@@ -371,13 +374,13 @@ func inversePostingsForMatcher(ix IndexPostingsReader, m *labels.Matcher) (index
 		}
 	}

-	return ix.Postings(context.TODO(), m.Name, res...)
+	return ix.Postings(ctx, m.Name, res...)
 }

 const maxExpandedPostingsFactor = 100 // Division factor for maximum number of matched series.

 func labelValuesWithMatchers(ctx context.Context, r IndexReader, name string, matchers ...*labels.Matcher) ([]string, error) {
-	p, err := PostingsForMatchers(r, matchers...)
+	p, err := PostingsForMatchers(ctx, r, matchers...)
 	if err != nil {
 		return nil, errors.Wrap(err, "fetching postings for matchers")
 	}
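The pattern these hunks apply throughout querier.go is the same: replace `context.TODO()` with a caller-supplied `ctx` and check it inside the matcher loop, so a cancelled or timed-out query stops postings resolution early instead of finishing all the index work. A generic, self-contained illustration (function and item names invented):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// resolveAll stands in for PostingsForMatchers' matcher loop: checking
// ctx.Err() between items lets the caller's cancellation take effect mid-loop.
func resolveAll(ctx context.Context, items []string) error {
	for range items {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		time.Sleep(10 * time.Millisecond) // stand-in for per-matcher index work
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Millisecond)
	defer cancel()
	fmt.Println(resolveAll(ctx, []string{"a", "b", "c", "d"})) // context deadline exceeded
}
```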
@@ -93,6 +93,8 @@ func BenchmarkQuerier(b *testing.B) {
 }

 func benchmarkPostingsForMatchers(b *testing.B, ir IndexReader) {
+	ctx := context.Background()
+
 	n1 := labels.MustNewMatcher(labels.MatchEqual, "n", "1"+postingsBenchSuffix)
 	nX := labels.MustNewMatcher(labels.MatchEqual, "n", "X"+postingsBenchSuffix)

@@ -168,7 +170,7 @@ func benchmarkPostingsForMatchers(b *testing.B, ir IndexReader) {
 			b.ReportAllocs()
 			b.ResetTimer()
 			for i := 0; i < b.N; i++ {
				_, err := PostingsForMatchers(ctx, ir, c.matchers...)
 				require.NoError(b, err)
 			}
 		})

@@ -1917,6 +1917,8 @@ func BenchmarkSetMatcher(b *testing.B) {
 }

 func TestPostingsForMatchers(t *testing.T) {
+	ctx := context.Background()
+
 	chunkDir := t.TempDir()
 	opts := DefaultHeadOptions()
 	opts.ChunkRange = 1000

@@ -2195,7 +2197,7 @@ func TestPostingsForMatchers(t *testing.T) {
 		for _, l := range c.exp {
 			exp[l.String()] = struct{}{}
 		}
-		p, err := PostingsForMatchers(ir, c.matchers...)
+		p, err := PostingsForMatchers(ctx, ir, c.matchers...)
 		require.NoError(t, err)

 		var builder labels.ScratchBuilder
@@ -2269,7 +2271,7 @@ func TestQuerierIndexQueriesRace(t *testing.T) {
 func appendSeries(t *testing.T, ctx context.Context, wg *sync.WaitGroup, h *Head) {
 	defer wg.Done()

-	for i := 0; ctx.Err() != nil; i++ {
+	for i := 0; ctx.Err() == nil; i++ {
 		app := h.Appender(context.Background())
 		_, err := app.Append(0, labels.FromStrings(labels.MetricName, "metric", "seq", strconv.Itoa(i), "always_0", "0"), 0, 0)
 		require.NoError(t, err)
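Note this hunk is a genuine bug fix, not just ctx plumbing: with `ctx.Err() != nil` as the loop condition the body never ran (a live context has a nil error), so the race test's writer goroutine appended nothing. A quick self-contained check of the old condition:

```go
package main

import (
	"context"
	"fmt"
)

func main() {
	ctx := context.Background() // fresh context: ctx.Err() is nil
	n := 0
	for i := 0; ctx.Err() != nil; i++ { // old condition: false on entry
		n++
	}
	fmt.Println(n) // 0, so the loop body never executed
}
```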
@@ -2498,6 +2500,8 @@ func (m mockMatcherIndex) LabelNames(context.Context, ...*labels.Matcher) ([]str
 }

 func TestPostingsForMatcher(t *testing.T) {
+	ctx := context.Background()
+
 	cases := []struct {
 		matcher  *labels.Matcher
 		hasError bool

@@ -2525,7 +2529,7 @@ func TestPostingsForMatcher(t *testing.T) {

 	for _, tc := range cases {
 		ir := &mockMatcherIndex{}
-		_, err := postingsForMatcher(ir, tc.matcher)
+		_, err := postingsForMatcher(ctx, ir, tc.matcher)
 		if tc.hasError {
 			require.Error(t, err)
 		} else {