mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
storage: remove global flags
This commit is contained in:
parent
fe301d7946
commit
b105e26f4d
|
@ -15,7 +15,6 @@ package local
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"container/list"
|
"container/list"
|
||||||
"flag"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -26,12 +25,28 @@ import (
|
||||||
"github.com/prometheus/prometheus/storage/metric"
|
"github.com/prometheus/prometheus/storage/metric"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var DefaultChunkEncoding = doubleDelta
|
||||||
defaultChunkEncoding = flag.Int("storage.local.chunk-encoding-version", 1, "Which chunk encoding version to use for newly created chunks. Currently supported is 0 (delta encoding) and 1 (double-delta encoding).")
|
|
||||||
)
|
|
||||||
|
|
||||||
type chunkEncoding byte
|
type chunkEncoding byte
|
||||||
|
|
||||||
|
// String implements flag.Value.
|
||||||
|
func (ce chunkEncoding) String() string {
|
||||||
|
return fmt.Sprintf("%d", ce)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set implements flag.Value.
|
||||||
|
func (ce *chunkEncoding) Set(s string) error {
|
||||||
|
switch s {
|
||||||
|
case "0":
|
||||||
|
*ce = delta
|
||||||
|
case "1":
|
||||||
|
*ce = doubleDelta
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("invalid chunk encoding: %s", s)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
delta chunkEncoding = iota
|
delta chunkEncoding = iota
|
||||||
doubleDelta
|
doubleDelta
|
||||||
|
@ -244,7 +259,7 @@ func transcodeAndAdd(dst chunk, src chunk, s *metric.SamplePair) []chunk {
|
||||||
// newChunk creates a new chunk according to the encoding set by the
|
// newChunk creates a new chunk according to the encoding set by the
|
||||||
// defaultChunkEncoding flag.
|
// defaultChunkEncoding flag.
|
||||||
func newChunk() chunk {
|
func newChunk() chunk {
|
||||||
return newChunkForEncoding(chunkEncoding(*defaultChunkEncoding))
|
return newChunkForEncoding(DefaultChunkEncoding)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newChunkForEncoding(encoding chunkEncoding) chunk {
|
func newChunkForEncoding(encoding chunkEncoding) chunk {
|
||||||
|
|
|
@ -17,7 +17,6 @@
|
||||||
package index
|
package index
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"flag"
|
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
|
|
||||||
|
@ -35,10 +34,10 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
fingerprintToMetricCacheSize = flag.Int("storage.local.index-cache-size.fingerprint-to-metric", 10*1024*1024, "The size in bytes for the fingerprint to metric index cache.")
|
FingerprintMetricCacheSize = 10 * 1024 * 1024
|
||||||
fingerprintTimeRangeCacheSize = flag.Int("storage.local.index-cache-size.fingerprint-to-timerange", 5*1024*1024, "The size in bytes for the metric time range index cache.")
|
FingerprintTimeRangeCacheSize = 5 * 1024 * 1024
|
||||||
labelNameToLabelValuesCacheSize = flag.Int("storage.local.index-cache-size.label-name-to-label-values", 10*1024*1024, "The size in bytes for the label name to label values index cache.")
|
LabelNameLabelValuesCacheSize = 10 * 1024 * 1024
|
||||||
labelPairToFingerprintsCacheSize = flag.Int("storage.local.index-cache-size.label-pair-to-fingerprints", 20*1024*1024, "The size in bytes for the label pair to fingerprints index cache.")
|
LabelPairFingerprintsCacheSize = 20 * 1024 * 1024
|
||||||
)
|
)
|
||||||
|
|
||||||
// FingerprintMetricMapping is an in-memory map of fingerprints to metrics.
|
// FingerprintMetricMapping is an in-memory map of fingerprints to metrics.
|
||||||
|
@ -93,7 +92,7 @@ func (i *FingerprintMetricIndex) Lookup(fp clientmodel.Fingerprint) (metric clie
|
||||||
func NewFingerprintMetricIndex(basePath string) (*FingerprintMetricIndex, error) {
|
func NewFingerprintMetricIndex(basePath string) (*FingerprintMetricIndex, error) {
|
||||||
fingerprintToMetricDB, err := NewLevelDB(LevelDBOptions{
|
fingerprintToMetricDB, err := NewLevelDB(LevelDBOptions{
|
||||||
Path: path.Join(basePath, fingerprintToMetricDir),
|
Path: path.Join(basePath, fingerprintToMetricDir),
|
||||||
CacheSizeBytes: *fingerprintToMetricCacheSize,
|
CacheSizeBytes: FingerprintMetricCacheSize,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -165,7 +164,7 @@ func (i *LabelNameLabelValuesIndex) LookupSet(l clientmodel.LabelName) (values m
|
||||||
func NewLabelNameLabelValuesIndex(basePath string) (*LabelNameLabelValuesIndex, error) {
|
func NewLabelNameLabelValuesIndex(basePath string) (*LabelNameLabelValuesIndex, error) {
|
||||||
labelNameToLabelValuesDB, err := NewLevelDB(LevelDBOptions{
|
labelNameToLabelValuesDB, err := NewLevelDB(LevelDBOptions{
|
||||||
Path: path.Join(basePath, labelNameToLabelValuesDir),
|
Path: path.Join(basePath, labelNameToLabelValuesDir),
|
||||||
CacheSizeBytes: *labelNameToLabelValuesCacheSize,
|
CacheSizeBytes: LabelNameLabelValuesCacheSize,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -239,7 +238,7 @@ func (i *LabelPairFingerprintIndex) LookupSet(p metric.LabelPair) (fps map[clien
|
||||||
func NewLabelPairFingerprintIndex(basePath string) (*LabelPairFingerprintIndex, error) {
|
func NewLabelPairFingerprintIndex(basePath string) (*LabelPairFingerprintIndex, error) {
|
||||||
labelPairToFingerprintsDB, err := NewLevelDB(LevelDBOptions{
|
labelPairToFingerprintsDB, err := NewLevelDB(LevelDBOptions{
|
||||||
Path: path.Join(basePath, labelPairToFingerprintsDir),
|
Path: path.Join(basePath, labelPairToFingerprintsDir),
|
||||||
CacheSizeBytes: *labelPairToFingerprintsCacheSize,
|
CacheSizeBytes: LabelPairFingerprintsCacheSize,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -277,7 +276,7 @@ func (i *FingerprintTimeRangeIndex) Lookup(fp clientmodel.Fingerprint) (firstTim
|
||||||
func NewFingerprintTimeRangeIndex(basePath string) (*FingerprintTimeRangeIndex, error) {
|
func NewFingerprintTimeRangeIndex(basePath string) (*FingerprintTimeRangeIndex, error) {
|
||||||
fingerprintTimeRangeDB, err := NewLevelDB(LevelDBOptions{
|
fingerprintTimeRangeDB, err := NewLevelDB(LevelDBOptions{
|
||||||
Path: path.Join(basePath, fingerprintTimeRangeDir),
|
Path: path.Join(basePath, fingerprintTimeRangeDir),
|
||||||
CacheSizeBytes: *fingerprintTimeRangeCacheSize,
|
CacheSizeBytes: FingerprintTimeRangeCacheSize,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -35,7 +35,7 @@ var (
|
||||||
)
|
)
|
||||||
|
|
||||||
func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, testutil.Closer) {
|
func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, testutil.Closer) {
|
||||||
*defaultChunkEncoding = int(encoding)
|
DefaultChunkEncoding = encoding
|
||||||
dir := testutil.NewTemporaryDirectory("test_persistence", t)
|
dir := testutil.NewTemporaryDirectory("test_persistence", t)
|
||||||
p, err := newPersistence(dir.Path(), false, false, func() bool { return false })
|
p, err := newPersistence(dir.Path(), false, false, func() bool { return false })
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -16,6 +16,7 @@ package local
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"container/list"
|
"container/list"
|
||||||
|
"fmt"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -66,6 +67,34 @@ type evictRequest struct {
|
||||||
// SyncStrategy is an enum to select a sync strategy for series files.
|
// SyncStrategy is an enum to select a sync strategy for series files.
|
||||||
type SyncStrategy int
|
type SyncStrategy int
|
||||||
|
|
||||||
|
// String implements flag.Value.
|
||||||
|
func (ss SyncStrategy) String() string {
|
||||||
|
switch ss {
|
||||||
|
case Adaptive:
|
||||||
|
return "adaptive"
|
||||||
|
case Always:
|
||||||
|
return "always"
|
||||||
|
case Never:
|
||||||
|
return "never"
|
||||||
|
}
|
||||||
|
return "<unknown>"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set implements flag.Value.
|
||||||
|
func (ss *SyncStrategy) Set(s string) error {
|
||||||
|
switch s {
|
||||||
|
case "adaptive":
|
||||||
|
*ss = Adaptive
|
||||||
|
case "always":
|
||||||
|
*ss = Always
|
||||||
|
case "never":
|
||||||
|
*ss = Never
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("invalid sync strategy: %s", s)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Possible values for SyncStrategy.
|
// Possible values for SyncStrategy.
|
||||||
const (
|
const (
|
||||||
_ SyncStrategy = iota
|
_ SyncStrategy = iota
|
||||||
|
|
|
@ -977,7 +977,7 @@ func TestFuzzChunkType1(t *testing.T) {
|
||||||
//
|
//
|
||||||
// go test -race -cpu 8 -short -bench BenchmarkFuzzChunkType
|
// go test -race -cpu 8 -short -bench BenchmarkFuzzChunkType
|
||||||
func benchmarkFuzz(b *testing.B, encoding chunkEncoding) {
|
func benchmarkFuzz(b *testing.B, encoding chunkEncoding) {
|
||||||
*defaultChunkEncoding = int(encoding)
|
DefaultChunkEncoding = encoding
|
||||||
const samplesPerRun = 100000
|
const samplesPerRun = 100000
|
||||||
rand.Seed(42)
|
rand.Seed(42)
|
||||||
directory := testutil.NewTemporaryDirectory("test_storage", b)
|
directory := testutil.NewTemporaryDirectory("test_storage", b)
|
||||||
|
|
|
@ -38,7 +38,7 @@ func (t *testStorageCloser) Close() {
|
||||||
// directory. The returned storage is already in serving state. Upon closing the
|
// directory. The returned storage is already in serving state. Upon closing the
|
||||||
// returned test.Closer, the temporary directory is cleaned up.
|
// returned test.Closer, the temporary directory is cleaned up.
|
||||||
func NewTestStorage(t testutil.T, encoding chunkEncoding) (*memorySeriesStorage, testutil.Closer) {
|
func NewTestStorage(t testutil.T, encoding chunkEncoding) (*memorySeriesStorage, testutil.Closer) {
|
||||||
*defaultChunkEncoding = int(encoding)
|
DefaultChunkEncoding = encoding
|
||||||
directory := testutil.NewTemporaryDirectory("test_storage", t)
|
directory := testutil.NewTemporaryDirectory("test_storage", t)
|
||||||
o := &MemorySeriesStorageOptions{
|
o := &MemorySeriesStorageOptions{
|
||||||
MemoryChunks: 1000000,
|
MemoryChunks: 1000000,
|
||||||
|
|
|
@ -16,7 +16,6 @@ package influxdb
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"flag"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math"
|
"math"
|
||||||
|
@ -36,22 +35,21 @@ const (
|
||||||
contentTypeJSON = "application/json"
|
contentTypeJSON = "application/json"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
retentionPolicy = flag.String("storage.remote.influxdb.retention-policy", "default", "The InfluxDB retention policy to use.")
|
|
||||||
database = flag.String("storage.remote.influxdb.database", "prometheus", "The name of the database to use for storing samples in InfluxDB.")
|
|
||||||
)
|
|
||||||
|
|
||||||
// Client allows sending batches of Prometheus samples to InfluxDB.
|
// Client allows sending batches of Prometheus samples to InfluxDB.
|
||||||
type Client struct {
|
type Client struct {
|
||||||
url string
|
url string
|
||||||
httpClient *http.Client
|
httpClient *http.Client
|
||||||
|
retentionPolicy string
|
||||||
|
database string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewClient creates a new Client.
|
// NewClient creates a new Client.
|
||||||
func NewClient(url string, timeout time.Duration) *Client {
|
func NewClient(url string, timeout time.Duration, database, retentionPolicy string) *Client {
|
||||||
return &Client{
|
return &Client{
|
||||||
url: url,
|
url: url,
|
||||||
httpClient: httputil.NewDeadlineClient(timeout),
|
httpClient: httputil.NewDeadlineClient(timeout),
|
||||||
|
retentionPolicy: retentionPolicy,
|
||||||
|
database: database,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -120,8 +118,8 @@ func (c *Client) Store(samples clientmodel.Samples) error {
|
||||||
u.Path = writeEndpoint
|
u.Path = writeEndpoint
|
||||||
|
|
||||||
req := StoreSamplesRequest{
|
req := StoreSamplesRequest{
|
||||||
Database: *database,
|
Database: c.database,
|
||||||
RetentionPolicy: *retentionPolicy,
|
RetentionPolicy: c.retentionPolicy,
|
||||||
Points: points,
|
Points: points,
|
||||||
}
|
}
|
||||||
buf, err := json.Marshal(req)
|
buf, err := json.Marshal(req)
|
||||||
|
|
|
@ -80,7 +80,7 @@ func TestClient(t *testing.T) {
|
||||||
))
|
))
|
||||||
defer server.Close()
|
defer server.Close()
|
||||||
|
|
||||||
c := NewClient(server.URL, time.Minute)
|
c := NewClient(server.URL, time.Minute, "prometheus", "default")
|
||||||
|
|
||||||
if err := c.Store(samples); err != nil {
|
if err := c.Store(samples); err != nil {
|
||||||
t.Fatalf("Error sending samples: %s", err)
|
t.Fatalf("Error sending samples: %s", err)
|
||||||
|
|
Loading…
Reference in a new issue