diff --git a/storage/local/chunk.go b/storage/local/chunk.go index e911e4260..52e13eece 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -15,7 +15,6 @@ package local import ( "container/list" - "flag" "fmt" "io" "sync" @@ -26,12 +25,28 @@ import ( "github.com/prometheus/prometheus/storage/metric" ) -var ( - defaultChunkEncoding = flag.Int("storage.local.chunk-encoding-version", 1, "Which chunk encoding version to use for newly created chunks. Currently supported is 0 (delta encoding) and 1 (double-delta encoding).") -) +var DefaultChunkEncoding = doubleDelta type chunkEncoding byte +// String implements flag.Value. +func (ce chunkEncoding) String() string { + return fmt.Sprintf("%d", ce) +} + +// Set implements flag.Value. +func (ce *chunkEncoding) Set(s string) error { + switch s { + case "0": + *ce = delta + case "1": + *ce = doubleDelta + default: + return fmt.Errorf("invalid chunk encoding: %s", s) + } + return nil +} + const ( delta chunkEncoding = iota doubleDelta @@ -244,7 +259,7 @@ func transcodeAndAdd(dst chunk, src chunk, s *metric.SamplePair) []chunk { // newChunk creates a new chunk according to the encoding set by the // defaultChunkEncoding flag. func newChunk() chunk { - return newChunkForEncoding(chunkEncoding(*defaultChunkEncoding)) + return newChunkForEncoding(DefaultChunkEncoding) } func newChunkForEncoding(encoding chunkEncoding) chunk { diff --git a/storage/local/index/index.go b/storage/local/index/index.go index 82945c767..dd31211fc 100644 --- a/storage/local/index/index.go +++ b/storage/local/index/index.go @@ -17,7 +17,6 @@ package index import ( - "flag" "os" "path" @@ -35,10 +34,10 @@ const ( ) var ( - fingerprintToMetricCacheSize = flag.Int("storage.local.index-cache-size.fingerprint-to-metric", 10*1024*1024, "The size in bytes for the fingerprint to metric index cache.") - fingerprintTimeRangeCacheSize = flag.Int("storage.local.index-cache-size.fingerprint-to-timerange", 5*1024*1024, "The size in bytes for the metric time range index cache.") - labelNameToLabelValuesCacheSize = flag.Int("storage.local.index-cache-size.label-name-to-label-values", 10*1024*1024, "The size in bytes for the label name to label values index cache.") - labelPairToFingerprintsCacheSize = flag.Int("storage.local.index-cache-size.label-pair-to-fingerprints", 20*1024*1024, "The size in bytes for the label pair to fingerprints index cache.") + FingerprintMetricCacheSize = 10 * 1024 * 1024 + FingerprintTimeRangeCacheSize = 5 * 1024 * 1024 + LabelNameLabelValuesCacheSize = 10 * 1024 * 1024 + LabelPairFingerprintsCacheSize = 20 * 1024 * 1024 ) // FingerprintMetricMapping is an in-memory map of fingerprints to metrics. @@ -93,7 +92,7 @@ func (i *FingerprintMetricIndex) Lookup(fp clientmodel.Fingerprint) (metric clie func NewFingerprintMetricIndex(basePath string) (*FingerprintMetricIndex, error) { fingerprintToMetricDB, err := NewLevelDB(LevelDBOptions{ Path: path.Join(basePath, fingerprintToMetricDir), - CacheSizeBytes: *fingerprintToMetricCacheSize, + CacheSizeBytes: FingerprintMetricCacheSize, }) if err != nil { return nil, err @@ -165,7 +164,7 @@ func (i *LabelNameLabelValuesIndex) LookupSet(l clientmodel.LabelName) (values m func NewLabelNameLabelValuesIndex(basePath string) (*LabelNameLabelValuesIndex, error) { labelNameToLabelValuesDB, err := NewLevelDB(LevelDBOptions{ Path: path.Join(basePath, labelNameToLabelValuesDir), - CacheSizeBytes: *labelNameToLabelValuesCacheSize, + CacheSizeBytes: LabelNameLabelValuesCacheSize, }) if err != nil { return nil, err @@ -239,7 +238,7 @@ func (i *LabelPairFingerprintIndex) LookupSet(p metric.LabelPair) (fps map[clien func NewLabelPairFingerprintIndex(basePath string) (*LabelPairFingerprintIndex, error) { labelPairToFingerprintsDB, err := NewLevelDB(LevelDBOptions{ Path: path.Join(basePath, labelPairToFingerprintsDir), - CacheSizeBytes: *labelPairToFingerprintsCacheSize, + CacheSizeBytes: LabelPairFingerprintsCacheSize, }) if err != nil { return nil, err @@ -277,7 +276,7 @@ func (i *FingerprintTimeRangeIndex) Lookup(fp clientmodel.Fingerprint) (firstTim func NewFingerprintTimeRangeIndex(basePath string) (*FingerprintTimeRangeIndex, error) { fingerprintTimeRangeDB, err := NewLevelDB(LevelDBOptions{ Path: path.Join(basePath, fingerprintTimeRangeDir), - CacheSizeBytes: *fingerprintTimeRangeCacheSize, + CacheSizeBytes: FingerprintTimeRangeCacheSize, }) if err != nil { return nil, err diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 4327a177b..1954823a0 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -35,7 +35,7 @@ var ( ) func newTestPersistence(t *testing.T, encoding chunkEncoding) (*persistence, testutil.Closer) { - *defaultChunkEncoding = int(encoding) + DefaultChunkEncoding = encoding dir := testutil.NewTemporaryDirectory("test_persistence", t) p, err := newPersistence(dir.Path(), false, false, func() bool { return false }) if err != nil { diff --git a/storage/local/storage.go b/storage/local/storage.go index 549cb6b39..d874603ec 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -16,6 +16,7 @@ package local import ( "container/list" + "fmt" "sync/atomic" "time" @@ -66,6 +67,34 @@ type evictRequest struct { // SyncStrategy is an enum to select a sync strategy for series files. type SyncStrategy int +// String implements flag.Value. +func (ss SyncStrategy) String() string { + switch ss { + case Adaptive: + return "adaptive" + case Always: + return "always" + case Never: + return "never" + } + return "" +} + +// Set implements flag.Value. +func (ss *SyncStrategy) Set(s string) error { + switch s { + case "adaptive": + *ss = Adaptive + case "always": + *ss = Always + case "never": + *ss = Never + default: + return fmt.Errorf("invalid sync strategy: %s", s) + } + return nil +} + // Possible values for SyncStrategy. const ( _ SyncStrategy = iota diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 959d25711..f0e171e9c 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -977,7 +977,7 @@ func TestFuzzChunkType1(t *testing.T) { // // go test -race -cpu 8 -short -bench BenchmarkFuzzChunkType func benchmarkFuzz(b *testing.B, encoding chunkEncoding) { - *defaultChunkEncoding = int(encoding) + DefaultChunkEncoding = encoding const samplesPerRun = 100000 rand.Seed(42) directory := testutil.NewTemporaryDirectory("test_storage", b) diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go index beec6648a..33accf571 100644 --- a/storage/local/test_helpers.go +++ b/storage/local/test_helpers.go @@ -38,7 +38,7 @@ func (t *testStorageCloser) Close() { // directory. The returned storage is already in serving state. Upon closing the // returned test.Closer, the temporary directory is cleaned up. func NewTestStorage(t testutil.T, encoding chunkEncoding) (*memorySeriesStorage, testutil.Closer) { - *defaultChunkEncoding = int(encoding) + DefaultChunkEncoding = encoding directory := testutil.NewTemporaryDirectory("test_storage", t) o := &MemorySeriesStorageOptions{ MemoryChunks: 1000000, diff --git a/storage/remote/influxdb/client.go b/storage/remote/influxdb/client.go index c9dd6c5e3..570a32546 100644 --- a/storage/remote/influxdb/client.go +++ b/storage/remote/influxdb/client.go @@ -16,7 +16,6 @@ package influxdb import ( "bytes" "encoding/json" - "flag" "fmt" "io/ioutil" "math" @@ -36,22 +35,21 @@ const ( contentTypeJSON = "application/json" ) -var ( - retentionPolicy = flag.String("storage.remote.influxdb.retention-policy", "default", "The InfluxDB retention policy to use.") - database = flag.String("storage.remote.influxdb.database", "prometheus", "The name of the database to use for storing samples in InfluxDB.") -) - // Client allows sending batches of Prometheus samples to InfluxDB. type Client struct { - url string - httpClient *http.Client + url string + httpClient *http.Client + retentionPolicy string + database string } // NewClient creates a new Client. -func NewClient(url string, timeout time.Duration) *Client { +func NewClient(url string, timeout time.Duration, database, retentionPolicy string) *Client { return &Client{ - url: url, - httpClient: httputil.NewDeadlineClient(timeout), + url: url, + httpClient: httputil.NewDeadlineClient(timeout), + retentionPolicy: retentionPolicy, + database: database, } } @@ -120,8 +118,8 @@ func (c *Client) Store(samples clientmodel.Samples) error { u.Path = writeEndpoint req := StoreSamplesRequest{ - Database: *database, - RetentionPolicy: *retentionPolicy, + Database: c.database, + RetentionPolicy: c.retentionPolicy, Points: points, } buf, err := json.Marshal(req) diff --git a/storage/remote/influxdb/client_test.go b/storage/remote/influxdb/client_test.go index ae8329826..d2f9b9ce3 100644 --- a/storage/remote/influxdb/client_test.go +++ b/storage/remote/influxdb/client_test.go @@ -80,7 +80,7 @@ func TestClient(t *testing.T) { )) defer server.Close() - c := NewClient(server.URL, time.Minute) + c := NewClient(server.URL, time.Minute, "prometheus", "default") if err := c.Store(samples); err != nil { t.Fatalf("Error sending samples: %s", err)