Bold changes to concurrency.

(WIP. Probably doesn't work yet.)

Change-Id: Id1537dfcca53831a1d428078a5863ece7bdf4875
Bjoern Rabenstein 2014-10-07 19:11:24 +02:00
parent fcdf5a8ee7
commit 8fba3302bc
8 changed files with 406 additions and 408 deletions

main.go

@@ -137,16 +137,10 @@ func main() {
 		glog.Fatalf("Error loading configuration from %s: %v", *configFile, err)
 	}

-	persistence, err := local.NewDiskPersistence(*metricsStoragePath, 1024)
-	if err != nil {
-		glog.Fatal("Error opening disk persistence: ", err)
-	}
-	registry.MustRegister(persistence)
-
 	o := &local.MemorySeriesStorageOptions{
-		Persistence:                persistence,
 		MemoryEvictionInterval:     *memoryEvictionInterval,
 		MemoryRetentionPeriod:      *memoryRetentionPeriod,
+		PersistenceStoragePath:     *metricsStoragePath,
 		PersistencePurgeInterval:   *storagePurgeInterval,
 		PersistenceRetentionPeriod: *storageRetentionPeriod,
 	}
@@ -155,7 +149,7 @@ func main() {
 		glog.Fatal("Error opening memory series storage: ", err)
 	}
 	defer memStorage.Close()
-	//registry.MustRegister(memStorage)
+	registry.MustRegister(memStorage)

 	var remoteTSDBQueue *remote.TSDBQueueManager
 	if *remoteTSDBUrl == "" {

storage/local/interface.go

@@ -15,15 +15,15 @@ package local

 import (
 	clientmodel "github.com/prometheus/client_golang/model"
+	"github.com/prometheus/client_golang/prometheus"

 	"github.com/prometheus/prometheus/storage/metric"
 )

-// SeriesMap maps fingerprints to memory series.
-type SeriesMap map[clientmodel.Fingerprint]*memorySeries
-
 // Storage ingests and manages samples, along with various indexes. All methods
 // are goroutine-safe.
 type Storage interface {
+	prometheus.Collector
+
 	// AppendSamples stores a group of new samples. Multiple samples for the same
 	// fingerprint need to be submitted in chronological order, from oldest to
 	// newest (both in the same call to AppendSamples and across multiple calls).
@@ -65,108 +65,6 @@ type SeriesIterator interface {
 	GetRangeValues(metric.Interval) metric.Values
 }

-// A Persistence is used by a Storage implementation to store samples
-// persistently across restarts. The methods are generally not goroutine-safe
-// unless marked otherwise. The chunk-related methods PersistChunk, DropChunks,
-// LoadChunks, and LoadChunkDescs can be called concurrently with each other if
-// each call refers to a different fingerprint.
-//
-// TODO: As a Persistence is really only used within this package, consider not
-// exporting it.
-type Persistence interface {
-	// PersistChunk persists a single chunk of a series. It is the caller's
-	// responsibility to not modify chunk concurrently.
-	PersistChunk(clientmodel.Fingerprint, chunk) error
-	// DropChunks deletes all chunks from a series whose last sample time is
-	// before beforeTime. It returns true if all chunks of the series have
-	// been deleted.
-	DropChunks(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (allDropped bool, err error)
-	// LoadChunks loads a group of chunks of a timeseries by their index. The
-	// chunk with the earliest time will have index 0, the following ones will
-	// have incrementally larger indexes.
-	LoadChunks(fp clientmodel.Fingerprint, indexes []int) (chunks, error)
-	// LoadChunkDescs loads chunkDescs for a series up until a given time.
-	LoadChunkDescs(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (chunkDescs, error)
-
-	// PersistSeriesMapAndHeads persists the fingerprint to memory-series
-	// mapping and all open (non-full) head chunks. It is the caller's
-	// responsibility to not modify SeriesMap concurrently. Do not call
-	// concurrently with LoadSeriesMapAndHeads.
-	PersistSeriesMapAndHeads(SeriesMap) error
-	// LoadSeriesMapAndHeads loads the fingerprint to memory-series mapping
-	// and all open (non-full) head chunks. Do not call
-	// concurrently with PersistSeriesMapAndHeads.
-	LoadSeriesMapAndHeads() (SeriesMap, error)
-
-	// GetFingerprintsForLabelPair returns the fingerprints for the given
-	// label pair. This method is goroutine-safe but take into account that
-	// metrics queued for indexing with IndexMetric might not yet made it
-	// into the index. (Same applies correspondingly to UnindexMetric.)
-	GetFingerprintsForLabelPair(metric.LabelPair) (clientmodel.Fingerprints, error)
-	// GetLabelValuesForLabelName returns the label values for the given
-	// label name. This method is goroutine-safe but take into account that
-	// metrics queued for indexing with IndexMetric might not yet made it
-	// into the index. (Same applies correspondingly to UnindexMetric.)
-	GetLabelValuesForLabelName(clientmodel.LabelName) (clientmodel.LabelValues, error)
-
-	// IndexMetric queues the given metric for addition to the indexes
-	// needed by GetFingerprintsForLabelPair and GetLabelValuesForLabelName.
-	// If the queue is full, this method blocks until the metric can be queued.
-	// This method is goroutine-safe.
-	IndexMetric(clientmodel.Metric, clientmodel.Fingerprint)
-	// UnindexMetric queues references to the given metric for removal from
-	// the indexes used for GetFingerprintsForLabelPair and
-	// GetLabelValuesForLabelName. The index of fingerprints to archived
-	// metrics is not affected by this removal. (In fact, never call this
-	// method for an archived metric. To drop an archived metric, call
-	// DropArchivedFingerprint.) If the queue is full, this method blocks
-	// until the metric can be queued. This method is goroutine-safe.
-	UnindexMetric(clientmodel.Metric, clientmodel.Fingerprint)
-	// WaitForIndexing waits until all items in the indexing queue are
-	// processed. If queue processing is currently on hold (to gather more
-	// ops for batching), this method will trigger an immediate start of
-	// processing. This method is goroutine-safe.
-	WaitForIndexing()
-
-	// ArchiveMetric persists the mapping of the given fingerprint to the
-	// given metric, together with the first and last timestamp of the
-	// series belonging to the metric. Do not call concurrently with
-	// UnarchiveMetric or DropArchivedMetric.
-	ArchiveMetric(
-		fingerprint clientmodel.Fingerprint, metric clientmodel.Metric,
-		firstTime, lastTime clientmodel.Timestamp,
-	) error
-	// HasArchivedMetric returns whether the archived metric for the given
-	// fingerprint exists and if yes, what the first and last timestamp in
-	// the corresponding series is. This method is goroutine-safe.
-	HasArchivedMetric(clientmodel.Fingerprint) (
-		hasMetric bool, firstTime, lastTime clientmodel.Timestamp, err error,
-	)
-	// GetFingerprintsModifiedBefore returns the fingerprints of archived
-	// timeseries that have live samples before the provided timestamp. This
-	// method is goroutine-safe (but behavior during concurrent modification
-	// via ArchiveMetric, UnarchiveMetric, or DropArchivedMetric is
-	// undefined).
-	GetFingerprintsModifiedBefore(clientmodel.Timestamp) ([]clientmodel.Fingerprint, error)
-	// GetArchivedMetric retrieves the archived metric with the given
-	// fingerprint. This method is goroutine-safe.
-	GetArchivedMetric(clientmodel.Fingerprint) (clientmodel.Metric, error)
-	// DropArchivedMetric deletes an archived fingerprint and its
-	// corresponding metric entirely. It also queues the metric for
-	// un-indexing (no need to call UnindexMetric for the deleted metric.)
-	// Do not call concurrently with UnarchiveMetric or ArchiveMetric.
-	DropArchivedMetric(clientmodel.Fingerprint) error
-	// UnarchiveMetric deletes an archived fingerprint and its metric, but
-	// (in contrast to DropArchivedMetric) does not un-index the metric.
-	// The method returns true if a metric was actually deleted. Do not call
-	// concurrently with DropArchivedMetric or ArchiveMetric.
-	UnarchiveMetric(clientmodel.Fingerprint) (bool, error)
-
-	// Close flushes the indexing queue and other buffered data and releases
-	// any held resources.
-	Close() error
-}
-
 // A Preloader preloads series data necessary for a query into memory and pins
 // them until released via Close(). Its methods are generally not
 // goroutine-safe.
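The newly embedded prometheus.Collector is what makes main.go's now-uncommented registry.MustRegister(memStorage) line compile: every Storage is automatically a Collector. A self-contained sketch of the embedding mechanism with stand-in types (not the real client_golang API):

package main

import "fmt"

// Collector stands in for prometheus.Collector.
type Collector interface{ Collect() }

// Storage mirrors the interface above, which now embeds Collector.
type Storage interface {
	Collector // embedded: every Storage is also a Collector
	AppendSamples()
}

type memStorage struct{}

func (memStorage) Collect()       { fmt.Println("collecting self-metrics") }
func (memStorage) AppendSamples() {}

// mustRegister stands in for registry.MustRegister, which accepts any Collector.
func mustRegister(c Collector) { c.Collect() }

func main() {
	var s Storage = memStorage{}
	mustRegister(s) // compiles precisely because Storage embeds Collector
}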

storage/local/persistence.go

@@ -20,6 +20,7 @@ import (
 	"io"
 	"os"
 	"path"
+	"sync"
 	"time"

 	"github.com/golang/glog"
@@ -75,10 +76,20 @@ type indexingOp struct {
 	opType indexingOpType
 }

-type diskPersistence struct {
+// A persistence is used by a Storage implementation to store samples
+// persistently across restarts. The methods are only goroutine-safe if
+// explicitly marked as such below. The chunk-related methods persistChunk,
+// dropChunks, loadChunks, and loadChunkDescs can be called concurrently with
+// each other if each call refers to a different fingerprint.
+type persistence struct {
 	basePath string
 	chunkLen int

+	// archiveMtx protects the archiving-related methods archiveMetric,
+	// unarchiveMetric, dropArchivedMetric, and getFingerprintsModifiedBefore
+	// from concurrent calls.
+	archiveMtx sync.Mutex
+
 	archivedFingerprintToMetrics   *index.FingerprintMetricIndex
 	archivedFingerprintToTimeRange *index.FingerprintTimeRangeIndex
 	labelPairToFingerprints        *index.LabelPairFingerprintIndex
@@ -94,8 +105,8 @@ type diskPersistence struct {
 	indexingBatchLatency prometheus.Summary
 }

-// NewDiskPersistence returns a newly allocated Persistence backed by local disk storage, ready to use.
-func NewDiskPersistence(basePath string, chunkLen int) (*diskPersistence, error) {
+// newPersistence returns a newly allocated persistence backed by local disk storage, ready to use.
+func newPersistence(basePath string, chunkLen int) (*persistence, error) {
 	if err := os.MkdirAll(basePath, 0700); err != nil {
 		return nil, err
 	}
@@ -117,7 +128,7 @@ func NewDiskPersistence(basePath string, chunkLen int) (*diskPersistence, error)
 		return nil, err
 	}

-	p := &diskPersistence{
+	p := &persistence{
 		basePath: basePath,
 		chunkLen: chunkLen,
@@ -167,7 +178,7 @@ func NewDiskPersistence(basePath string, chunkLen int) (*diskPersistence, error)
 }

 // Describe implements prometheus.Collector.
-func (p *diskPersistence) Describe(ch chan<- *prometheus.Desc) {
+func (p *persistence) Describe(ch chan<- *prometheus.Desc) {
 	ch <- p.indexingQueueLength.Desc()
 	ch <- p.indexingQueueCapacity.Desc()
 	p.indexingBatchSizes.Describe(ch)
@@ -175,7 +186,7 @@ func (p *diskPersistence) Describe(ch chan<- *prometheus.Desc) {
 }

 // Collect implements prometheus.Collector.
-func (p *diskPersistence) Collect(ch chan<- prometheus.Metric) {
+func (p *persistence) Collect(ch chan<- prometheus.Metric) {
 	p.indexingQueueLength.Set(float64(len(p.indexingQueue)))

 	ch <- p.indexingQueueLength
@@ -184,8 +195,11 @@ func (p *diskPersistence) Collect(ch chan<- prometheus.Metric) {
 	p.indexingBatchLatency.Collect(ch)
 }

-// GetFingerprintsForLabelPair implements persistence.
-func (p *diskPersistence) GetFingerprintsForLabelPair(lp metric.LabelPair) (clientmodel.Fingerprints, error) {
+// getFingerprintsForLabelPair returns the fingerprints for the given label
+// pair. This method is goroutine-safe, but note that metrics queued for
+// indexing with indexMetric might not yet have made it into the index. (The
+// same applies correspondingly to unindexMetric.)
+func (p *persistence) getFingerprintsForLabelPair(lp metric.LabelPair) (clientmodel.Fingerprints, error) {
 	fps, _, err := p.labelPairToFingerprints.Lookup(lp)
 	if err != nil {
 		return nil, err
@@ -193,8 +207,11 @@ func (p *diskPersistence) GetFingerprintsForLabelPair(lp metric.LabelPair) (clie
 	return fps, nil
 }

-// GetLabelValuesForLabelName implements persistence.
-func (p *diskPersistence) GetLabelValuesForLabelName(ln clientmodel.LabelName) (clientmodel.LabelValues, error) {
+// getLabelValuesForLabelName returns the label values for the given label
+// name. This method is goroutine-safe, but note that metrics queued for
+// indexing with indexMetric might not yet have made it into the index. (The
+// same applies correspondingly to unindexMetric.)
+func (p *persistence) getLabelValuesForLabelName(ln clientmodel.LabelName) (clientmodel.LabelValues, error) {
 	lvs, _, err := p.labelNameToLabelValues.Lookup(ln)
 	if err != nil {
 		return nil, err
@@ -202,8 +219,10 @@ func (p *diskPersistence) GetLabelValuesForLabelName(ln clientmodel.LabelName) (
 	return lvs, nil
 }

-// PersistChunk implements Persistence.
-func (p *diskPersistence) PersistChunk(fp clientmodel.Fingerprint, c chunk) error {
+// persistChunk persists a single chunk of a series. It is the caller's
+// responsibility not to modify the chunk concurrently and not to persist
+// or drop anything for the same fingerprint concurrently.
+func (p *persistence) persistChunk(fp clientmodel.Fingerprint, c chunk) error {
 	// 1. Open chunk file.
 	f, err := p.openChunkFileForWriting(fp)
 	if err != nil {
@@ -224,8 +243,11 @@ func (p *diskPersistence) PersistChunk(fp clientmodel.Fingerprint, c chunk) erro
 	return c.marshal(b)
 }

-// LoadChunks implements Persistence.
-func (p *diskPersistence) LoadChunks(fp clientmodel.Fingerprint, indexes []int) (chunks, error) {
+// loadChunks loads a group of chunks of a timeseries by their index. The chunk
+// with the earliest time will have index 0, the following ones will have
+// incrementally larger indexes. It is the caller's responsibility not to
+// persist or drop anything for the same fingerprint concurrently.
+func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int) (chunks, error) {
 	// TODO: we need to verify at some point that file length is a multiple of
 	// the chunk size. When is the best time to do this, and where to remember
 	// it? Right now, we only do it when loading chunkDescs.
@@ -270,7 +292,10 @@ func (p *diskPersistence) LoadChunks(fp clientmodel.Fingerprint, indexes []int)
 	return chunks, nil
 }

-func (p *diskPersistence) LoadChunkDescs(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (chunkDescs, error) {
+// loadChunkDescs loads chunkDescs for a series up until a given time. It is
+// the caller's responsibility not to persist or drop anything for the same
+// fingerprint concurrently.
+func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (chunkDescs, error) {
 	f, err := p.openChunkFileForReading(fp)
 	if os.IsNotExist(err) {
 		return nil, nil
@@ -323,8 +348,17 @@ func (p *diskPersistence) LoadChunkDescs(fp clientmodel.Fingerprint, beforeTime
 	return cds, nil
 }

-// PersistSeriesMapAndHeads implements Persistence.
-func (p *diskPersistence) PersistSeriesMapAndHeads(fingerprintToSeries SeriesMap) error {
+// persistSeriesMapAndHeads persists the fingerprint to memory-series mapping
+// and all open (non-full) head chunks. Do not call concurrently with
+// loadSeriesMapAndHeads.
+//
+// TODO: Currently, this method assumes it is called while nothing else is
+// going on in the storage concurrently. To make this method callable during
+// normal operations, certain things have to be done:
+// - Make sure the length of the seriesMap doesn't change while this runs.
+// - Lock the fingerprints while persisting unpersisted head chunks.
+// - Write to a temporary file and only rename after successfully finishing.
+func (p *persistence) persistSeriesMapAndHeads(fingerprintToSeries seriesMap) error {
 	f, err := os.OpenFile(p.headsPath(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
 	if err != nil {
 		return err
@@ -338,34 +372,41 @@ func (p *diskPersistence) PersistSeriesMapAndHeads(fingerprintToSeries SeriesMap
 	if err := codable.EncodeVarint(w, headsFormatVersion); err != nil {
 		return err
 	}
-	if err := codable.EncodeVarint(w, int64(len(fingerprintToSeries))); err != nil {
+	if err := codable.EncodeVarint(w, int64(fingerprintToSeries.length())); err != nil {
 		return err
 	}

-	for fp, series := range fingerprintToSeries {
+	iter := fingerprintToSeries.iter()
+	defer func() {
+		// Consume the iterator in any case to not leak goroutines.
+		for _ = range iter {
+		}
+	}()
+
+	for m := range iter {
 		var seriesFlags byte
-		if series.chunkDescsLoaded {
+		if m.series.chunkDescsLoaded {
 			seriesFlags |= flagChunkDescsLoaded
 		}
-		if series.headChunkPersisted {
+		if m.series.headChunkPersisted {
 			seriesFlags |= flagHeadChunkPersisted
 		}
 		if err := w.WriteByte(seriesFlags); err != nil {
 			return err
 		}
-		if err := codable.EncodeUint64(w, uint64(fp)); err != nil {
+		if err := codable.EncodeUint64(w, uint64(m.fp)); err != nil {
 			return err
 		}
-		buf, err := codable.Metric(series.metric).MarshalBinary()
+		buf, err := codable.Metric(m.series.metric).MarshalBinary()
 		if err != nil {
 			return err
 		}
 		w.Write(buf)
-		if err := codable.EncodeVarint(w, int64(len(series.chunkDescs))); err != nil {
+		if err := codable.EncodeVarint(w, int64(len(m.series.chunkDescs))); err != nil {
 			return err
 		}
-		for i, chunkDesc := range series.chunkDescs {
-			if series.headChunkPersisted || i < len(series.chunkDescs)-1 {
+		for i, chunkDesc := range m.series.chunkDescs {
+			if m.series.headChunkPersisted || i < len(m.series.chunkDescs)-1 {
 				if err := codable.EncodeVarint(w, int64(chunkDesc.firstTime())); err != nil {
 					return err
 				}
@@ -386,55 +427,58 @@ func (p *diskPersistence) PersistSeriesMapAndHeads(fingerprintToSeries SeriesMap
 	return w.Flush()
 }

-// LoadSeriesMapAndHeads implements Persistence.
-func (p *diskPersistence) LoadSeriesMapAndHeads() (SeriesMap, error) {
+// loadSeriesMapAndHeads loads the fingerprint to memory-series mapping and all
+// open (non-full) head chunks. Only call this method during start-up while
+// nothing else is running in storage land. This method is utterly
+// goroutine-unsafe.
+func (p *persistence) loadSeriesMapAndHeads() (seriesMap, error) {
 	f, err := os.Open(p.headsPath())
 	if os.IsNotExist(err) {
-		return SeriesMap{}, nil
+		return newSeriesMap(), nil
 	}
 	if err != nil {
-		return nil, err
+		return seriesMap{}, err
 	}
 	defer f.Close()
 	r := bufio.NewReaderSize(f, fileBufSize)

 	buf := make([]byte, len(headsMagicString))
 	if _, err := io.ReadFull(r, buf); err != nil {
-		return nil, err
+		return seriesMap{}, err
 	}
 	magic := string(buf)
 	if magic != headsMagicString {
-		return nil, fmt.Errorf(
+		return seriesMap{}, fmt.Errorf(
 			"unexpected magic string, want %q, got %q",
 			headsMagicString, magic,
 		)
 	}
 	if version, err := binary.ReadVarint(r); version != headsFormatVersion || err != nil {
-		return nil, fmt.Errorf("unknown heads format version, want %d", headsFormatVersion)
+		return seriesMap{}, fmt.Errorf("unknown heads format version, want %d", headsFormatVersion)
 	}
 	numSeries, err := binary.ReadVarint(r)
 	if err != nil {
-		return nil, err
+		return seriesMap{}, err
 	}
-	fingerprintToSeries := make(SeriesMap, numSeries)
+	fingerprintToSeries := make(map[clientmodel.Fingerprint]*memorySeries, numSeries)

 	for ; numSeries > 0; numSeries-- {
 		seriesFlags, err := r.ReadByte()
 		if err != nil {
-			return nil, err
+			return seriesMap{}, err
 		}
 		headChunkPersisted := seriesFlags&flagHeadChunkPersisted != 0
 		fp, err := codable.DecodeUint64(r)
 		if err != nil {
-			return nil, err
+			return seriesMap{}, err
 		}
 		var metric codable.Metric
 		if err := metric.UnmarshalFromReader(r); err != nil {
-			return nil, err
+			return seriesMap{}, err
 		}
 		numChunkDescs, err := binary.ReadVarint(r)
 		if err != nil {
-			return nil, err
+			return seriesMap{}, err
 		}
 		chunkDescs := make(chunkDescs, numChunkDescs)
@@ -442,11 +486,11 @@ func (p *diskPersistence) LoadSeriesMapAndHeads() (SeriesMap, error) {
 			if headChunkPersisted || i < numChunkDescs-1 {
 				firstTime, err := binary.ReadVarint(r)
 				if err != nil {
-					return nil, err
+					return seriesMap{}, err
 				}
 				lastTime, err := binary.ReadVarint(r)
 				if err != nil {
-					return nil, err
+					return seriesMap{}, err
 				}
 				chunkDescs[i] = &chunkDesc{
 					firstTimeField: clientmodel.Timestamp(firstTime),
@@ -456,11 +500,11 @@ func (p *diskPersistence) LoadSeriesMapAndHeads() (SeriesMap, error) {
 				// Non-persisted head chunk.
 				chunkType, err := r.ReadByte()
 				if err != nil {
-					return nil, err
+					return seriesMap{}, err
 				}
 				chunk := chunkForType(chunkType)
 				if err := chunk.unmarshal(r); err != nil {
-					return nil, err
+					return seriesMap{}, err
 				}
 				chunkDescs[i] = &chunkDesc{
 					chunk: chunk,
@@ -476,11 +520,14 @@ func (p *diskPersistence) LoadSeriesMapAndHeads() (SeriesMap, error) {
 			headChunkPersisted: headChunkPersisted,
 		}
 	}

-	return fingerprintToSeries, nil
+	return seriesMap{m: fingerprintToSeries}, nil
 }

-// DropChunks implements persistence.
-func (p *diskPersistence) DropChunks(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (bool, error) {
+// dropChunks deletes all chunks from a series whose last sample time is before
+// beforeTime. It returns true if all chunks of the series have been deleted.
+// It is the caller's responsibility to make sure nothing is persisted or loaded
+// for the same fingerprint concurrently.
+func (p *persistence) dropChunks(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) (bool, error) {
 	f, err := p.openChunkFileForReading(fp)
 	if os.IsNotExist(err) {
 		return true, nil
@@ -538,18 +585,30 @@ func (p *diskPersistence) DropChunks(fp clientmodel.Fingerprint, beforeTime clie
 	return false, nil
 }

-// IndexMetric implements Persistence.
-func (p *diskPersistence) IndexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) {
+// indexMetric queues the given metric for addition to the indexes needed by
+// getFingerprintsForLabelPair, getLabelValuesForLabelName, and
+// getFingerprintsModifiedBefore. If the queue is full, this method blocks
+// until the metric can be queued. This method is goroutine-safe.
+func (p *persistence) indexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) {
 	p.indexingQueue <- indexingOp{fp, m, add}
 }

-// UnindexMetric implements Persistence.
-func (p *diskPersistence) UnindexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) {
+// unindexMetric queues references to the given metric for removal from the
+// indexes used for getFingerprintsForLabelPair, getLabelValuesForLabelName, and
+// getFingerprintsModifiedBefore. The index of fingerprints to archived metrics
+// is not affected by this removal. (In fact, never call this method for an
+// archived metric. To drop an archived metric, call dropArchivedMetric.)
+// If the queue is full, this method blocks until the metric can be queued. This
+// method is goroutine-safe.
+func (p *persistence) unindexMetric(m clientmodel.Metric, fp clientmodel.Fingerprint) {
 	p.indexingQueue <- indexingOp{fp, m, remove}
 }

-// WaitForIndexing implements Persistence.
-func (p *diskPersistence) WaitForIndexing() {
+// waitForIndexing waits until all items in the indexing queue are processed. If
+// queue processing is currently on hold (to gather more ops for batching), this
+// method will trigger an immediate start of processing. This method is
+// goroutine-safe.
+func (p *persistence) waitForIndexing() {
 	wait := make(chan int)
 	for {
 		p.indexingFlush <- wait
@@ -559,10 +618,15 @@ func (p *diskPersistence) WaitForIndexing() {
 	}
 }

-// ArchiveMetric implements Persistence.
-func (p *diskPersistence) ArchiveMetric(
+// archiveMetric persists the mapping of the given fingerprint to the given
+// metric, together with the first and last timestamp of the series belonging to
+// the metric. This method is goroutine-safe.
+func (p *persistence) archiveMetric(
 	fp clientmodel.Fingerprint, m clientmodel.Metric, first, last clientmodel.Timestamp,
 ) error {
+	p.archiveMtx.Lock()
+	defer p.archiveMtx.Unlock()
+
 	if err := p.archivedFingerprintToMetrics.Put(codable.Fingerprint(fp), codable.Metric(m)); err != nil {
 		return err
 	}
@@ -572,16 +636,26 @@ func (p *diskPersistence) ArchiveMetric(
 	return nil
 }

-// HasArchivedMetric implements Persistence.
-func (p *diskPersistence) HasArchivedMetric(fp clientmodel.Fingerprint) (
+// hasArchivedMetric returns whether the archived metric for the given
+// fingerprint exists and if yes, what the first and last timestamp in the
+// corresponding series is. This method is goroutine-safe.
+func (p *persistence) hasArchivedMetric(fp clientmodel.Fingerprint) (
 	hasMetric bool, firstTime, lastTime clientmodel.Timestamp, err error,
 ) {
 	firstTime, lastTime, hasMetric, err = p.archivedFingerprintToTimeRange.Lookup(fp)
 	return
 }

-// GetFingerprintsModifiedBefore implements Persistence.
-func (p *diskPersistence) GetFingerprintsModifiedBefore(beforeTime clientmodel.Timestamp) ([]clientmodel.Fingerprint, error) {
+// getFingerprintsModifiedBefore returns the fingerprints of archived timeseries
+// that have live samples before the provided timestamp. This method is
+// goroutine-safe.
+func (p *persistence) getFingerprintsModifiedBefore(beforeTime clientmodel.Timestamp) ([]clientmodel.Fingerprint, error) {
+	// The locking makes sure archivedFingerprintToTimeRange won't be
+	// mutated while being iterated over (which will probably not result in
+	// races, but might still yield weird results).
+	p.archiveMtx.Lock()
+	defer p.archiveMtx.Unlock()
+
 	var fp codable.Fingerprint
 	var tr codable.TimeRange
 	fps := []clientmodel.Fingerprint{}
@@ -600,15 +674,21 @@ func (p *diskPersistence) GetFingerprintsModifiedBefore(beforeTime clientmodel.T
 	return fps, nil
 }

-// GetArchivedMetric implements Persistence.
-func (p *diskPersistence) GetArchivedMetric(fp clientmodel.Fingerprint) (clientmodel.Metric, error) {
+// getArchivedMetric retrieves the archived metric with the given
+// fingerprint. This method is goroutine-safe.
+func (p *persistence) getArchivedMetric(fp clientmodel.Fingerprint) (clientmodel.Metric, error) {
 	metric, _, err := p.archivedFingerprintToMetrics.Lookup(fp)
 	return metric, err
 }

-// DropArchivedMetric implements Persistence.
-func (p *diskPersistence) DropArchivedMetric(fp clientmodel.Fingerprint) error {
-	metric, err := p.GetArchivedMetric(fp)
+// dropArchivedMetric deletes an archived fingerprint and its corresponding
+// metric entirely. It also queues the metric for un-indexing (no need to call
+// unindexMetric for the deleted metric.) This method is goroutine-safe.
+func (p *persistence) dropArchivedMetric(fp clientmodel.Fingerprint) error {
+	p.archiveMtx.Lock()
+	defer p.archiveMtx.Unlock()
+
+	metric, err := p.getArchivedMetric(fp)
 	if err != nil || metric == nil {
 		return err
 	}
@@ -618,12 +698,17 @@ func (p *diskPersistence) DropArchivedMetric(fp clientmodel.Fingerprint) error {
 	if err := p.archivedFingerprintToTimeRange.Delete(codable.Fingerprint(fp)); err != nil {
 		return err
 	}
-	p.UnindexMetric(metric, fp)
+	p.unindexMetric(metric, fp)
 	return nil
 }

-// UnarchiveMetric implements Persistence.
-func (p *diskPersistence) UnarchiveMetric(fp clientmodel.Fingerprint) (bool, error) {
+// unarchiveMetric deletes an archived fingerprint and its metric, but (in
+// contrast to dropArchivedMetric) does not un-index the metric. The method
+// returns true if a metric was actually deleted. This method is goroutine-safe.
+func (p *persistence) unarchiveMetric(fp clientmodel.Fingerprint) (bool, error) {
+	p.archiveMtx.Lock()
+	defer p.archiveMtx.Unlock()
+
 	has, err := p.archivedFingerprintToTimeRange.Has(fp)
 	if err != nil || !has {
 		return false, err
@@ -637,8 +722,9 @@ func (p *diskPersistence) UnarchiveMetric(fp clientmodel.Fingerprint) (bool, err
 	return true, nil
 }

-// Close implements Persistence.
-func (p *diskPersistence) Close() error {
+// close flushes the indexing queue and other buffered data and releases any
+// held resources.
+func (p *persistence) close() error {
 	close(p.indexingQueue)
 	<-p.indexingStopped
@@ -662,12 +748,12 @@ func (p *diskPersistence) Close() error {
 	return lastError
 }

-func (p *diskPersistence) dirForFingerprint(fp clientmodel.Fingerprint) string {
+func (p *persistence) dirForFingerprint(fp clientmodel.Fingerprint) string {
 	fpStr := fp.String()
 	return fmt.Sprintf("%s/%c%c/%s", p.basePath, fpStr[0], fpStr[1], fpStr[2:])
 }

-func (p *diskPersistence) openChunkFileForWriting(fp clientmodel.Fingerprint) (*os.File, error) {
+func (p *persistence) openChunkFileForWriting(fp clientmodel.Fingerprint) (*os.File, error) {
 	dirname := p.dirForFingerprint(fp)
 	ex, err := exists(dirname)
 	if err != nil {
@@ -681,7 +767,7 @@ func (p *diskPersistence) openChunkFileForWriting(fp clientmodel.Fingerprint) (*
 	return os.OpenFile(path.Join(dirname, seriesFileName), os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640)
 }

-func (p *diskPersistence) openChunkFileForReading(fp clientmodel.Fingerprint) (*os.File, error) {
+func (p *persistence) openChunkFileForReading(fp clientmodel.Fingerprint) (*os.File, error) {
 	dirname := p.dirForFingerprint(fp)
 	return os.Open(path.Join(dirname, seriesFileName))
 }
@@ -695,15 +781,15 @@ func writeChunkHeader(w io.Writer, c chunk) error {
 	return err
 }

-func (p *diskPersistence) offsetForChunkIndex(i int) int64 {
+func (p *persistence) offsetForChunkIndex(i int) int64 {
 	return int64(i * (chunkHeaderLen + p.chunkLen))
 }

-func (p *diskPersistence) headsPath() string {
+func (p *persistence) headsPath() string {
 	return path.Join(p.basePath, headsFileName)
 }

-func (p *diskPersistence) processIndexingQueue() {
+func (p *persistence) processIndexingQueue() {
 	batchSize := 0
 	nameToValues := index.LabelNameLabelValuesMapping{}
 	pairToFPs := index.LabelPairFingerprintsMapping{}
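The doc comments above repeatedly state the same contract: chunk operations may overlap only when they touch different fingerprints. One way a caller can uphold that contract is to shard work onto a fixed pool of workers keyed by fingerprint, sketched below with stand-in types (the actual commit instead routes writes through a persist queue plus per-fingerprint locks):

package main

import "sync"

type fingerprint uint64

func main() {
	const numWorkers = 4
	queues := make([]chan fingerprint, numWorkers)
	var wg sync.WaitGroup
	for i := range queues {
		queues[i] = make(chan fingerprint, 16)
		wg.Add(1)
		go func(ch chan fingerprint) {
			defer wg.Done()
			for fp := range ch {
				// A persistChunk/dropChunks call for fp would be safe here:
				// every operation on this fp runs on this one worker, so two
				// calls for the same fingerprint are never concurrent.
				_ = fp
			}
		}(queues[i])
	}
	for fp := fingerprint(0); fp < 100; fp++ {
		queues[int(fp)%numWorkers] <- fp // same fp always lands on the same worker
	}
	for _, ch := range queues {
		close(ch)
	}
	wg.Wait()
}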

storage/local/persistence_test.go

@@ -25,15 +25,15 @@ import (
 	"github.com/prometheus/prometheus/utility/test"
 )

-func newTestPersistence(t *testing.T) (Persistence, test.Closer) {
+func newTestPersistence(t *testing.T) (*persistence, test.Closer) {
 	dir := test.NewTemporaryDirectory("test_persistence", t)
-	p, err := NewDiskPersistence(dir.Path(), 1024)
+	p, err := newPersistence(dir.Path(), 1024)
 	if err != nil {
 		dir.Close()
 		t.Fatal(err)
 	}
 	return p, test.NewCallbackCloser(func() {
-		p.Close()
+		p.close()
 		dir.Close()
 	})
 }
@@ -83,7 +83,7 @@ func TestPersistChunk(t *testing.T) {
 	for fp, chunks := range fpToChunks {
 		for _, c := range chunks {
-			if err := p.PersistChunk(fp, c); err != nil {
+			if err := p.persistChunk(fp, c); err != nil {
 				t.Fatal(err)
 			}
 		}
@@ -94,7 +94,7 @@ func TestPersistChunk(t *testing.T) {
 		for i := range expectedChunks {
 			indexes = append(indexes, i)
 		}
-		actualChunks, err := p.LoadChunks(fp, indexes)
+		actualChunks, err := p.loadChunks(fp, indexes)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -254,21 +254,21 @@ func TestIndexing(t *testing.T) {
 	indexedFpsToMetrics := index.FingerprintMetricMapping{}
 	for i, b := range batches {
 		for fp, m := range b.fpToMetric {
-			p.IndexMetric(m, fp)
-			if err := p.ArchiveMetric(fp, m, 1, 2); err != nil {
+			p.indexMetric(m, fp)
+			if err := p.archiveMetric(fp, m, 1, 2); err != nil {
 				t.Fatal(err)
 			}
 			indexedFpsToMetrics[fp] = m
 		}
-		verifyIndexedState(i, t, b, indexedFpsToMetrics, p.(*diskPersistence))
+		verifyIndexedState(i, t, b, indexedFpsToMetrics, p)
 	}

 	for i := len(batches) - 1; i >= 0; i-- {
 		b := batches[i]
-		verifyIndexedState(i, t, batches[i], indexedFpsToMetrics, p.(*diskPersistence))
+		verifyIndexedState(i, t, batches[i], indexedFpsToMetrics, p)
 		for fp, m := range b.fpToMetric {
-			p.UnindexMetric(m, fp)
-			unarchived, err := p.UnarchiveMetric(fp)
+			p.unindexMetric(m, fp)
+			unarchived, err := p.unarchiveMetric(fp)
 			if err != nil {
 				t.Fatal(err)
 			}
@@ -280,11 +280,11 @@ func TestIndexing(t *testing.T) {
 	}
 }

-func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMetrics index.FingerprintMetricMapping, p *diskPersistence) {
-	p.WaitForIndexing()
+func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMetrics index.FingerprintMetricMapping, p *persistence) {
+	p.waitForIndexing()
 	for fp, m := range indexedFpsToMetrics {
 		// Compare archived metrics with input metrics.
-		mOut, err := p.GetArchivedMetric(fp)
+		mOut, err := p.getArchivedMetric(fp)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -293,7 +293,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet
 		}

 		// Check that archived metrics are in membership index.
-		has, first, last, err := p.HasArchivedMetric(fp)
+		has, first, last, err := p.hasArchivedMetric(fp)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -310,7 +310,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet
 	// Compare label name -> label values mappings.
 	for ln, lvs := range b.expectedLnToLvs {
-		outLvs, err := p.GetLabelValuesForLabelName(ln)
+		outLvs, err := p.getLabelValuesForLabelName(ln)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -327,7 +327,7 @@ func verifyIndexedState(i int, t *testing.T, b incrementalBatch, indexedFpsToMet
 	// Compare label pair -> fingerprints mappings.
 	for lp, fps := range b.expectedLpToFps {
-		outFps, err := p.GetFingerprintsForLabelPair(lp)
+		outFps, err := p.getFingerprintsForLabelPair(lp)
 		if err != nil {
 			t.Fatal(err)
 		}

storage/local/series.go

@@ -22,6 +22,98 @@ import (
 	"github.com/prometheus/prometheus/storage/metric"
 )

+// fingerprintSeriesPair pairs a fingerprint with a memorySeries pointer.
+type fingerprintSeriesPair struct {
+	fp     clientmodel.Fingerprint
+	series *memorySeries
+}
+
+// seriesMap maps fingerprints to memory series. All its methods are
+// goroutine-safe. A seriesMap is effectively a goroutine-safe version of
+// map[clientmodel.Fingerprint]*memorySeries.
+type seriesMap struct {
+	mtx sync.RWMutex
+	m   map[clientmodel.Fingerprint]*memorySeries
+}
+
+// newSeriesMap returns a newly allocated empty seriesMap. To create a seriesMap
+// based on a prefilled map, use an explicit initializer.
+func newSeriesMap() seriesMap {
+	return seriesMap{m: make(map[clientmodel.Fingerprint]*memorySeries)}
+}
+
+// length returns the number of mappings in the seriesMap.
+func (sm seriesMap) length() int {
+	sm.mtx.RLock()
+	defer sm.mtx.RUnlock()
+	return len(sm.m)
+}
+
+// get returns a memorySeries for a fingerprint. Return values have the same
+// semantics as the native Go map.
+func (sm seriesMap) get(fp clientmodel.Fingerprint) (s *memorySeries, ok bool) {
+	sm.mtx.RLock()
+	defer sm.mtx.RUnlock()
+	s, ok = sm.m[fp]
+	return
+}
+
+// put adds a mapping to the seriesMap.
+func (sm seriesMap) put(fp clientmodel.Fingerprint, s *memorySeries) {
+	sm.mtx.Lock()
+	defer sm.mtx.Unlock()
+	sm.m[fp] = s
+}
+
+// del removes a mapping from the seriesMap.
+func (sm seriesMap) del(fp clientmodel.Fingerprint) {
+	sm.mtx.Lock()
+	defer sm.mtx.Unlock()
+	delete(sm.m, fp)
+}
+
+// iter returns a channel that produces all mappings in the seriesMap. The
+// channel will be closed once all mappings have been received. Not
+// consuming all mappings from the channel will leak a goroutine. The
+// semantics of concurrent modification of seriesMap is the same as for
+// iterating over a map with a 'range' clause.
+func (sm seriesMap) iter() <-chan fingerprintSeriesPair {
+	ch := make(chan fingerprintSeriesPair)
+	go func() {
+		sm.mtx.RLock()
+		for fp, s := range sm.m {
+			sm.mtx.RUnlock()
+			ch <- fingerprintSeriesPair{fp, s}
+			sm.mtx.RLock()
+		}
+		sm.mtx.RUnlock()
+		close(ch)
+	}()
+	return ch
+}
+
+// fpIter returns a channel that produces all fingerprints in the seriesMap. The
+// channel will be closed once all fingerprints have been received. Not
+// consuming all fingerprints from the channel will leak a goroutine. The
+// semantics of concurrent modification of seriesMap is the same as for
+// iterating over a map with a 'range' clause.
+func (sm seriesMap) fpIter() <-chan clientmodel.Fingerprint {
+	ch := make(chan clientmodel.Fingerprint)
+	go func() {
+		sm.mtx.RLock()
+		for fp := range sm.m {
+			sm.mtx.RUnlock()
+			ch <- fp
+			sm.mtx.RLock()
+		}
+		sm.mtx.RUnlock()
+		close(ch)
+	}()
+	return ch
+}
+
 type chunkDescs []*chunkDesc

 type chunkDesc struct {
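The goroutine-leak caveat on iter and fpIter deserves a concrete illustration: the producer goroutine blocks on the unbuffered channel until someone receives, so a caller that stops consuming early must still drain the channel, exactly as persistSeriesMapAndHeads does with its deferred range loop. A self-contained sketch with simplified types:

package main

import (
	"fmt"
	"sync"
)

type seriesMap struct {
	mtx sync.RWMutex
	m   map[uint64]string
}

func (sm *seriesMap) iter() <-chan uint64 {
	ch := make(chan uint64)
	go func() {
		sm.mtx.RLock()
		for fp := range sm.m {
			sm.mtx.RUnlock() // drop the lock while blocked on the send
			ch <- fp
			sm.mtx.RLock()
		}
		sm.mtx.RUnlock()
		close(ch)
	}()
	return ch
}

func main() {
	sm := &seriesMap{m: map[uint64]string{1: "a", 2: "b", 3: "c"}}
	it := sm.iter()
	defer func() {
		for range it { // drain, so the producer goroutine can finish and exit
		}
	}()
	for fp := range it {
		fmt.Println(fp)
		break // stopping early is only safe because of the deferred drain
	}
}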
@@ -110,8 +202,6 @@ func (cd *chunkDesc) evictNow() {
 }

 type memorySeries struct {
-	mtx sync.Mutex
-
 	metric clientmodel.Metric
 	// Sorted by start time, overlapping chunk ranges are forbidden.
 	chunkDescs chunkDescs
@@ -125,17 +215,21 @@ type memorySeries struct {
 	headChunkPersisted bool
 }

-func newMemorySeries(m clientmodel.Metric) *memorySeries {
+// newMemorySeries returns a pointer to a newly allocated memorySeries for the
+// given metric. reallyNew defines if the memorySeries is a genuinely new series
+// or (if false) a series for a metric being unarchived, i.e. a series that
+// existed before but has been evicted from memory.
+func newMemorySeries(m clientmodel.Metric, reallyNew bool) *memorySeries {
 	return &memorySeries{
 		metric:             m,
-		chunkDescsLoaded:   true,
+		chunkDescsLoaded:   reallyNew,
+		headChunkPersisted: !reallyNew,
 	}
 }

+// add adds a sample pair to the series.
+// The caller must have locked the fingerprint of the series.
 func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair, persistQueue chan *persistRequest) {
-	s.mtx.Lock()
-	defer s.mtx.Unlock()
-
 	if len(s.chunkDescs) == 0 || s.headChunkPersisted {
 		newHead := &chunkDesc{
 			chunk: newDeltaEncodedChunk(d1, d0, true),
@@ -172,9 +266,9 @@ func (s *memorySeries) add(fp clientmodel.Fingerprint, v *metric.SamplePair, per
 	}
 }

+// persistHeadChunk queues the head chunk for persisting if not already done.
+// The caller must have locked the fingerprint of the series.
 func (s *memorySeries) persistHeadChunk(fp clientmodel.Fingerprint, persistQueue chan *persistRequest) {
-	s.mtx.Lock()
-	defer s.mtx.Unlock()
-
 	if s.headChunkPersisted {
 		return
 	}
@@ -185,10 +279,9 @@ func (s *memorySeries) persistHeadChunk(fp clientmodel.Fingerprint, persistQueue
 	}
 }

+// evictOlderThan evicts chunks whose latest sample is older than the given timestamp.
+// The caller must have locked the fingerprint of the series.
 func (s *memorySeries) evictOlderThan(t clientmodel.Timestamp) (allEvicted bool) {
-	s.mtx.Lock()
-	defer s.mtx.Unlock()
-
 	// For now, always drop the entire range from oldest to t.
 	for _, cd := range s.chunkDescs {
 		if !cd.lastTime().Before(t) {
@@ -203,10 +296,8 @@ func (s *memorySeries) evictOlderThan(t clientmodel.Timestamp) (allEvicted bool)
 }

 // purgeOlderThan returns true if all chunks have been purged.
+// The caller must have locked the fingerprint of the series.
 func (s *memorySeries) purgeOlderThan(t clientmodel.Timestamp) bool {
-	s.mtx.Lock()
-	defer s.mtx.Unlock()
-
 	keepIdx := len(s.chunkDescs)
 	for i, cd := range s.chunkDescs {
 		if !cd.lastTime().Before(t) {
@@ -224,10 +315,11 @@ func (s *memorySeries) purgeOlderThan(t clientmodel.Timestamp) bool {
 	return len(s.chunkDescs) == 0
 }

+// preloadChunks is an internal helper method.
 // TODO: in this method (and other places), we just fudge around with chunkDesc
 // internals without grabbing the chunkDesc lock. Study how this needs to be
-// protected against other accesses that don't hold the series lock.
-func (s *memorySeries) preloadChunks(indexes []int, p Persistence) (chunkDescs, error) {
+// protected against other accesses that don't hold the fp lock.
+func (s *memorySeries) preloadChunks(indexes []int, p *persistence) (chunkDescs, error) {
 	loadIndexes := []int{}
 	pinnedChunkDescs := make(chunkDescs, 0, len(indexes))
 	for _, idx := range indexes {
@@ -241,7 +333,7 @@ func (s *memorySeries) preloadChunks(indexes []int, p Persistence) (chunkDescs,
 	if len(loadIndexes) > 0 {
 		fp := s.metric.Fingerprint()
-		chunks, err := p.LoadChunks(fp, loadIndexes)
+		chunks, err := p.loadChunks(fp, loadIndexes)
 		if err != nil {
 			// Unpin any pinned chunks that were already loaded.
 			for _, cd := range pinnedChunkDescs {
@@ -261,7 +353,7 @@ func (s *memorySeries) preloadChunks(indexes []int, p Persistence) (chunkDescs,
 }

 /*
-func (s *memorySeries) preloadChunksAtTime(t clientmodel.Timestamp, p Persistence) (chunkDescs, error) {
+func (s *memorySeries) preloadChunksAtTime(t clientmodel.Timestamp, p *persistence) (chunkDescs, error) {
 	s.mtx.Lock()
 	defer s.mtx.Unlock()
@@ -291,8 +383,9 @@ func (s *memorySeries) preloadChunksAtTime(t clientmodel.Timestamp, p Persistenc
 }
 */

-func (s *memorySeries) loadChunkDescs(p Persistence) error {
-	cds, err := p.LoadChunkDescs(s.metric.Fingerprint(), s.chunkDescs[0].firstTime())
+// loadChunkDescs is an internal helper method.
+func (s *memorySeries) loadChunkDescs(p *persistence) error {
+	cds, err := p.loadChunkDescs(s.metric.Fingerprint(), s.chunkDescs[0].firstTime())
 	if err != nil {
 		return err
 	}
@@ -301,10 +394,9 @@ func (s *memorySeries) loadChunkDescs(p Persistence) error {
 	return nil
 }

-func (s *memorySeries) preloadChunksForRange(from clientmodel.Timestamp, through clientmodel.Timestamp, p Persistence) (chunkDescs, error) {
-	s.mtx.Lock()
-	defer s.mtx.Unlock()
-
+// preloadChunksForRange loads chunks for the given range from the persistence.
+// The caller must have locked the fingerprint of the series.
+func (s *memorySeries) preloadChunksForRange(from clientmodel.Timestamp, through clientmodel.Timestamp, p *persistence) (chunkDescs, error) {
 	if !s.chunkDescsLoaded && from.Before(s.chunkDescs[0].firstTime()) {
 		if err := s.loadChunkDescs(p); err != nil {
 			return nil, err
@@ -339,15 +431,12 @@ func (s *memorySeries) preloadChunksForRange(from clientmodel.Timestamp, through

 // memorySeriesIterator implements SeriesIterator.
 type memorySeriesIterator struct {
-	mtx     *sync.Mutex
+	lock, unlock func()
 	chunkIt chunkIterator
 	chunks  chunks
 }

-func (s *memorySeries) newIterator() SeriesIterator {
-	// TODO: Possible concurrency issue if series is modified while this is
-	// running. Only caller at the moment is in NewIterator() in storage.go,
-	// where there is no locking.
+func (s *memorySeries) newIterator(lockFunc, unlockFunc func()) SeriesIterator {
 	chunks := make(chunks, 0, len(s.chunkDescs))
 	for i, cd := range s.chunkDescs {
 		if cd.chunk != nil {
@@ -360,7 +449,8 @@ func (s *memorySeries) newIterator() SeriesIterator {
 	}

 	return &memorySeriesIterator{
-		mtx:    &s.mtx,
+		lock:   lockFunc,
+		unlock: unlockFunc,
 		chunks: chunks,
 	}
 }
@@ -389,8 +479,8 @@ func (s *memorySeries) lastTime() clientmodel.Timestamp {

 // GetValueAtTime implements SeriesIterator.
 func (it *memorySeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.Values {
-	it.mtx.Lock()
-	defer it.mtx.Unlock()
+	it.lock()
+	defer it.unlock()

 	// The most common case. We are iterating through a chunk.
 	if it.chunkIt != nil && it.chunkIt.contains(t) {
@@ -437,8 +527,8 @@ func (it *memorySeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.V

 // GetBoundaryValues implements SeriesIterator.
 func (it *memorySeriesIterator) GetBoundaryValues(in metric.Interval) metric.Values {
-	it.mtx.Lock()
-	defer it.mtx.Unlock()
+	it.lock()
+	defer it.unlock()

 	// Find the first relevant chunk.
 	i := sort.Search(len(it.chunks), func(i int) bool {
@@ -487,8 +577,8 @@ func (it *memorySeriesIterator) GetBoundaryValues(in metric.Interval) metric.Val

 // GetRangeValues implements SeriesIterator.
 func (it *memorySeriesIterator) GetRangeValues(in metric.Interval) metric.Values {
-	it.mtx.Lock()
-	defer it.mtx.Unlock()
+	it.lock()
+	defer it.unlock()

 	// Find the first relevant chunk.
 	i := sort.Search(len(it.chunks), func(i int) bool {
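The switch from mtx *sync.Mutex to a pair of lock/unlock closures decouples the iterator from any particular lock implementation: the storage can hand it closures over the fingerprint locker. A hypothetical self-contained sketch of the pattern (the actual wiring happens in storage.go's NewIterator, cut off in the excerpt below):

package main

import (
	"fmt"
	"sync"
)

// iterator mirrors the new memorySeriesIterator shape: it carries lock/unlock
// closures instead of a pointer to a per-series mutex.
type iterator struct {
	lock, unlock func()
	values       []float64
}

func (it *iterator) valueAt(i int) float64 {
	it.lock() // serializes with other holders of the same fingerprint lock
	defer it.unlock()
	return it.values[i]
}

func main() {
	var mtx sync.Mutex // stands in for the fingerprint's slot in the locker
	it := &iterator{
		lock:   mtx.Lock,
		unlock: mtx.Unlock,
		values: []float64{1, 2, 3},
	}
	fmt.Println(it.valueAt(1))
}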

storage/local/storage.go

@ -16,12 +16,12 @@ package local
import ( import (
"fmt" "fmt"
"sync"
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
clientmodel "github.com/prometheus/client_golang/model" clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/storage/metric" "github.com/prometheus/prometheus/storage/metric"
) )
@ -36,13 +36,12 @@ const (
) )
type memorySeriesStorage struct { type memorySeriesStorage struct {
mtx sync.RWMutex fpLocker *fingerprintLocker
state storageState
persistDone chan bool persistDone chan bool
stopServing chan chan<- bool stopServing chan chan<- bool
fingerprintToSeries SeriesMap fingerprintToSeries seriesMap
memoryEvictionInterval time.Duration memoryEvictionInterval time.Duration
memoryRetentionPeriod time.Duration memoryRetentionPeriod time.Duration
@ -51,16 +50,16 @@ type memorySeriesStorage struct {
persistenceRetentionPeriod time.Duration persistenceRetentionPeriod time.Duration
persistQueue chan *persistRequest persistQueue chan *persistRequest
persistence Persistence persistence *persistence
} }
// MemorySeriesStorageOptions contains options needed by // MemorySeriesStorageOptions contains options needed by
// NewMemorySeriesStorage. It is not safe to leave any of those at their zero // NewMemorySeriesStorage. It is not safe to leave any of those at their zero
// values. // values.
type MemorySeriesStorageOptions struct { type MemorySeriesStorageOptions struct {
Persistence Persistence // Used to persist storage content across restarts.
MemoryEvictionInterval time.Duration // How often to check for memory eviction. MemoryEvictionInterval time.Duration // How often to check for memory eviction.
MemoryRetentionPeriod time.Duration // Chunks at least that old are evicted from memory. MemoryRetentionPeriod time.Duration // Chunks at least that old are evicted from memory.
PersistenceStoragePath string // Location of persistence files.
PersistencePurgeInterval time.Duration // How often to check for purging. PersistencePurgeInterval time.Duration // How often to check for purging.
PersistenceRetentionPeriod time.Duration // Chunks at least that old are purged. PersistenceRetentionPeriod time.Duration // Chunks at least that old are purged.
} }
@ -68,14 +67,20 @@ type MemorySeriesStorageOptions struct {
// NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still // NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still
// has to be called to start the storage. // has to be called to start the storage.
func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) { func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
glog.Info("Loading series map and head chunks...") p, err := newPersistence(o.PersistenceStoragePath, 1024)
fingerprintToSeries, err := o.Persistence.LoadSeriesMapAndHeads()
if err != nil { if err != nil {
return nil, err return nil, err
} }
numSeries.Set(float64(len(fingerprintToSeries))) glog.Info("Loading series map and head chunks...")
fingerprintToSeries, err := p.loadSeriesMapAndHeads()
if err != nil {
return nil, err
}
numSeries.Set(float64(fingerprintToSeries.length()))
return &memorySeriesStorage{ return &memorySeriesStorage{
fpLocker: newFingerprintLocker(100), // TODO: Tweak value.
fingerprintToSeries: fingerprintToSeries, fingerprintToSeries: fingerprintToSeries,
persistDone: make(chan bool), persistDone: make(chan bool),
stopServing: make(chan chan<- bool), stopServing: make(chan chan<- bool),
@ -87,7 +92,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
persistenceRetentionPeriod: o.PersistenceRetentionPeriod, persistenceRetentionPeriod: o.PersistenceRetentionPeriod,
persistQueue: make(chan *persistRequest, persistQueueCap), persistQueue: make(chan *persistRequest, persistQueueCap),
persistence: o.Persistence, persistence: p,
}, nil }, nil
} }
@ -106,14 +111,10 @@ func (s *memorySeriesStorage) AppendSamples(samples clientmodel.Samples) {
} }
func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) { func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) {
s.mtx.Lock()
defer s.mtx.Unlock()
if s.state != storageServing {
panic("storage is not serving")
}
fp := sample.Metric.Fingerprint() fp := sample.Metric.Fingerprint()
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
series := s.getOrCreateSeries(fp, sample.Metric) series := s.getOrCreateSeries(fp, sample.Metric)
series.add(fp, &metric.SamplePair{ series.add(fp, &metric.SamplePair{
Value: sample.Value, Value: sample.Value,
@@ -122,37 +123,27 @@ func (s *memorySeriesStorage) appendSample(sample *clientmodel.Sample) {
 }
 
 func (s *memorySeriesStorage) getOrCreateSeries(fp clientmodel.Fingerprint, m clientmodel.Metric) *memorySeries {
-	series, ok := s.fingerprintToSeries[fp]
+	series, ok := s.fingerprintToSeries.get(fp)
 	if !ok {
-		series = newMemorySeries(m)
-		s.fingerprintToSeries[fp] = series
-		numSeries.Set(float64(len(s.fingerprintToSeries)))
-
-		unarchived, err := s.persistence.UnarchiveMetric(fp)
+		unarchived, err := s.persistence.unarchiveMetric(fp)
 		if err != nil {
 			glog.Errorf("Error unarchiving fingerprint %v: %v", fp, err)
 		}
-		if unarchived {
-			// The series existed before, had been archived at some
-			// point, and has now been unarchived, i.e. it has
-			// chunks on disk. Set chunkDescsLoaded accordingly so
-			// that they will be looked at later. Also, an
-			// unarchived series comes with a persisted head chunk.
-			series.chunkDescsLoaded = false
-			series.headChunkPersisted = true
-		} else {
+		if !unarchived {
 			// This was a genuinely new series, so index the metric.
-			s.persistence.IndexMetric(m, fp)
+			s.persistence.indexMetric(m, fp)
 		}
+		series = newMemorySeries(m, !unarchived)
+		s.fingerprintToSeries.put(fp, series)
+		numSeries.Set(float64(s.fingerprintToSeries.length()))
 	}
 	return series
 }
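
The map accessors above (get, put, length, plus the del and iter/fpIter used further down) belong to the new goroutine-safe series map that replaces the plain Go map. Its implementation is not part of this hunk; a minimal sketch of what such a type might look like, assuming an RWMutex-guarded map and a channel-based iterator:

// Hypothetical sketch of a goroutine-safe series map; the real type in this
// change may differ in detail.
type seriesMapEntry struct {
	fp     clientmodel.Fingerprint
	series *memorySeries
}

type seriesMap struct {
	mtx sync.RWMutex
	m   map[clientmodel.Fingerprint]*memorySeries
}

func (sm *seriesMap) get(fp clientmodel.Fingerprint) (*memorySeries, bool) {
	sm.mtx.RLock()
	defer sm.mtx.RUnlock()
	series, ok := sm.m[fp]
	return series, ok
}

func (sm *seriesMap) put(fp clientmodel.Fingerprint, s *memorySeries) {
	sm.mtx.Lock()
	defer sm.mtx.Unlock()
	sm.m[fp] = s
}

func (sm *seriesMap) del(fp clientmodel.Fingerprint) {
	sm.mtx.Lock()
	defer sm.mtx.Unlock()
	delete(sm.m, fp)
}

func (sm *seriesMap) length() int {
	sm.mtx.RLock()
	defer sm.mtx.RUnlock()
	return len(sm.m)
}

// iter copies the entries under the read lock and then streams them, so
// callers can range over series without holding the map lock. fpIter would be
// the analogous method yielding only fingerprints.
func (sm *seriesMap) iter() <-chan seriesMapEntry {
	ch := make(chan seriesMapEntry)
	go func() {
		sm.mtx.RLock()
		entries := make([]seriesMapEntry, 0, len(sm.m))
		for fp, s := range sm.m {
			entries = append(entries, seriesMapEntry{fp, s})
		}
		sm.mtx.RUnlock()
		for _, e := range entries {
			ch <- e
		}
		close(ch)
	}()
	return ch
}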
 /*
 func (s *memorySeriesStorage) preloadChunksAtTime(fp clientmodel.Fingerprint, ts clientmodel.Timestamp) (chunkDescs, error) {
-	series, ok := s.fingerprintToSeries[fp]
+	series, ok := s.fingerprintToSeries.get(fp)
 	if !ok {
 		panic("requested preload for non-existent series")
 	}
@@ -162,13 +153,12 @@ func (s *memorySeriesStorage) preloadChunksAtTime(fp clientmodel.Fingerprint, ts
 func (s *memorySeriesStorage) preloadChunksForRange(fp clientmodel.Fingerprint, from clientmodel.Timestamp, through clientmodel.Timestamp) (chunkDescs, error) {
 	stalenessDelta := 300 * time.Second // TODO: Turn into parameter.
 
-	s.mtx.RLock()
-	series, ok := s.fingerprintToSeries[fp]
-	s.mtx.RUnlock()
+	s.fpLocker.Lock(fp)
+	defer s.fpLocker.Unlock(fp)
+
+	series, ok := s.fingerprintToSeries.get(fp)
 	if !ok {
-		has, first, last, err := s.persistence.HasArchivedMetric(fp)
+		has, first, last, err := s.persistence.hasArchivedMetric(fp)
 		if err != nil {
 			return nil, err
 		}
@@ -176,13 +166,11 @@ func (s *memorySeriesStorage) preloadChunksForRange(fp clientmodel.Fingerprint,
 			return nil, fmt.Errorf("requested preload for non-existent series %v", fp)
 		}
 		if from.Add(-stalenessDelta).Before(last) && through.Add(stalenessDelta).After(first) {
-			metric, err := s.persistence.GetArchivedMetric(fp)
+			metric, err := s.persistence.getArchivedMetric(fp)
 			if err != nil {
 				return nil, err
 			}
-			s.mtx.Lock()
 			series = s.getOrCreateSeries(fp, metric)
-			defer s.mtx.Unlock() // Ugh.
 		}
 	}
 	return series.preloadChunksForRange(from, through, s.persistence)
@@ -190,58 +178,37 @@ func (s *memorySeriesStorage) preloadChunksForRange(fp clientmodel.Fingerprint,
 // NewIterator implements storage.
 func (s *memorySeriesStorage) NewIterator(fp clientmodel.Fingerprint) SeriesIterator {
-	s.mtx.RLock()
-	series, ok := s.fingerprintToSeries[fp]
-	s.mtx.RUnlock()
+	s.fpLocker.Lock(fp)
+	defer s.fpLocker.Unlock(fp)
+
+	series, ok := s.fingerprintToSeries.get(fp)
 	if !ok {
-		// TODO: Could this legitimately happen? Series just got purged?
 		panic("requested iterator for non-existent series")
 	}
-	return series.newIterator()
+	return series.newIterator(
+		func() { s.fpLocker.Lock(fp) },
+		func() { s.fpLocker.Unlock(fp) },
+	)
 }
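
The two closures handed to newIterator suggest that every read through the returned iterator takes and releases the fingerprint lock. One way to picture this — a sketch under that assumption, not the actual memorySeries code — is a locking decorator around an unsynchronized SeriesIterator:

// lockedIterator wraps a SeriesIterator and brackets every accessor with the
// injected lock/unlock closures (hypothetical decorator, for illustration).
type lockedIterator struct {
	it     SeriesIterator
	lock   func()
	unlock func()
}

func (l *lockedIterator) GetValueAtTime(t clientmodel.Timestamp) metric.Values {
	l.lock()
	defer l.unlock()
	return l.it.GetValueAtTime(t)
}

func (l *lockedIterator) GetBoundaryValues(in metric.Interval) metric.Values {
	l.lock()
	defer l.unlock()
	return l.it.GetBoundaryValues(in)
}

func (l *lockedIterator) GetRangeValues(in metric.Interval) metric.Values {
	l.lock()
	defer l.unlock()
	return l.it.GetRangeValues(in)
}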
 
 func (s *memorySeriesStorage) evictMemoryChunks(ttl time.Duration) {
-	fpsToArchive := []clientmodel.Fingerprint{}
 	defer func(begin time.Time) {
 		evictionDuration.Set(float64(time.Since(begin) / time.Millisecond))
 	}(time.Now())
 
-	s.mtx.RLock()
-	for fp, series := range s.fingerprintToSeries {
-		if series.evictOlderThan(clientmodel.TimestampFromTime(time.Now()).Add(-1 * ttl)) {
-			fpsToArchive = append(fpsToArchive, fp)
-			series.persistHeadChunk(fp, s.persistQueue)
+	for m := range s.fingerprintToSeries.iter() {
+		s.fpLocker.Lock(m.fp)
+		if m.series.evictOlderThan(clientmodel.TimestampFromTime(time.Now()).Add(-1 * ttl)) {
+			m.series.persistHeadChunk(m.fp, s.persistQueue)
+			s.fingerprintToSeries.del(m.fp)
+			if err := s.persistence.archiveMetric(
+				m.fp, m.series.metric, m.series.firstTime(), m.series.lastTime(),
+			); err != nil {
+				glog.Errorf("Error archiving metric %v: %v", m.series.metric, err)
+			}
 		}
+		s.fpLocker.Unlock(m.fp)
 	}
-	s.mtx.RUnlock()
-	if len(fpsToArchive) == 0 {
-		return
-	}
-	// If we are here, we have metrics to archive. For that, we need the write lock.
-	s.mtx.Lock()
-	defer s.mtx.Unlock()
-	for _, fp := range fpsToArchive {
-		series, ok := s.fingerprintToSeries[fp]
-		if !ok {
-			// Oops, perhaps another evict run happening in parallel?
-			continue
-		}
-		// TODO: Need series lock (or later FP lock)?
-		if !series.headChunkPersisted {
-			// Oops. The series has received new samples all of a
-			// sudden, giving it a new head chunk. Leave it alone.
-			continue
-		}
-		if err := s.persistence.ArchiveMetric(
-			fp, series.metric, series.firstTime(), series.lastTime(),
-		); err != nil {
-			glog.Errorf("Error archiving metric %v: %v", series.metric, err)
-		}
-		delete(s.fingerprintToSeries, fp)
-	}
 }
@@ -255,12 +222,11 @@ func recordPersist(start time.Time, err error) {
 func (s *memorySeriesStorage) handlePersistQueue() {
 	for req := range s.persistQueue {
-		// TODO: Make this thread-safe?
 		persistQueueLength.Set(float64(len(s.persistQueue)))
 
 		//glog.Info("Persist request: ", *req.fingerprint)
 		start := time.Now()
-		err := s.persistence.PersistChunk(req.fingerprint, req.chunkDesc.chunk)
+		err := s.persistence.persistChunk(req.fingerprint, req.chunkDesc.chunk)
 		recordPersist(start, err)
 		if err != nil {
 			glog.Error("Error persisting chunk, requeuing: ", err)
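
The requeue branch itself is truncated in this hunk. A hedged sketch of the shapes involved (the persistRequest fields are inferred from their use above; the goroutine hand-off is one plausible way to requeue without blocking the persist loop on its own full queue):

// Inferred from usage above: a persist request pairs a fingerprint with the
// chunk descriptor to write.
type persistRequest struct {
	fingerprint clientmodel.Fingerprint
	chunkDesc   *chunkDesc
}

// Hypothetical requeue helper: retry from a fresh goroutine so the consumer
// of persistQueue never deadlocks trying to send to persistQueue itself.
func (s *memorySeriesStorage) requeuePersistRequest(req *persistRequest) {
	go func() {
		s.persistQueue <- req
	}()
}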
@@ -275,16 +241,9 @@ func (s *memorySeriesStorage) handlePersistQueue() {
 // Close stops serving, flushes all pending operations, and frees all
 // resources. It implements Storage.
 func (s *memorySeriesStorage) Close() error {
-	s.mtx.Lock()
-	defer s.mtx.Unlock()
-
-	if s.state == storageStopping {
-		panic("Illegal State: Attempted to restop memorySeriesStorage.")
-	}
-
 	stopped := make(chan bool)
 	glog.Info("Waiting for storage to stop serving...")
-	s.stopServing <- (stopped)
+	s.stopServing <- stopped
 	glog.Info("Serving stopped.")
 	<-stopped
@@ -294,17 +253,14 @@ func (s *memorySeriesStorage) Close() error {
 	glog.Info("Persist loop stopped.")
 
 	glog.Info("Persisting head chunks...")
-	if err := s.persistence.PersistSeriesMapAndHeads(s.fingerprintToSeries); err != nil {
+	if err := s.persistence.persistSeriesMapAndHeads(s.fingerprintToSeries); err != nil {
 		return err
 	}
 	glog.Info("Done persisting head chunks.")
 
-	s.fingerprintToSeries = nil
-	if err := s.persistence.Close(); err != nil {
+	if err := s.persistence.close(); err != nil {
 		return err
 	}
-	s.state = storageStopping
 	return nil
 }
@@ -318,29 +274,25 @@ func (s *memorySeriesStorage) purgePeriodically(stop <-chan bool) {
 			return
 		case <-purgeTicker.C:
 			glog.Info("Purging old series data...")
-			s.mtx.RLock()
+			ts := clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.persistenceRetentionPeriod)
 			begin := time.Now()
-			fps := make([]clientmodel.Fingerprint, 0, len(s.fingerprintToSeries))
-			for fp := range s.fingerprintToSeries {
-				fps = append(fps, fp)
+			for fp := range s.fingerprintToSeries.fpIter() {
+				select {
+				case <-stop:
+					glog.Info("Interrupted running series purge.")
+					return
+				default:
+					s.purgeSeries(fp, ts)
+				}
 			}
-			s.mtx.RUnlock()
 
-			ts := clientmodel.TimestampFromTime(time.Now()).Add(-1 * s.persistenceRetentionPeriod)
-			// TODO: If we decide not to remove entries from the timerange disk index
-			// upon unarchival, we could remove the memory copy above and only use
-			// the fingerprints from the disk index.
-			persistedFPs, err := s.persistence.GetFingerprintsModifiedBefore(ts)
+			persistedFPs, err := s.persistence.getFingerprintsModifiedBefore(ts)
 			if err != nil {
 				glog.Error("Failed to lookup persisted fingerprint ranges: ", err)
 				break
 			}
-			fps = append(fps, persistedFPs...)
-			for _, fp := range fps {
+			for _, fp := range persistedFPs {
 				select {
 				case <-stop:
 					glog.Info("Interrupted running series purge.")
@@ -362,33 +314,20 @@ func (s *memorySeriesStorage) purgePeriodically(stop <-chan bool) {
 // series. If the series contains no chunks after the purge, it is dropped
 // entirely.
 func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime clientmodel.Timestamp) {
-	s.mtx.Lock()
-	// TODO: This is a lock FAR to coarse! However, we cannot lock using the
-	// memorySeries since we might have none (for series that are on disk
-	// only). And we really don't want to un-archive a series from disk
-	// while we are at the same time purging it. A locking per fingerprint
-	// would be nice. Or something... Have to think about it... Careful,
-	// more race conditions lurk below. Also unsolved: If there are chunks
-	// in the persist queue. persistence.DropChunks and
-	// persistence.PersistChunck needs to be locked on fp level, or
-	// something. And even then, what happens if everything is dropped, but
-	// there are still chunks hung in the persist queue? They would later
-	// re-create a file for a series that doesn't exist anymore... But
-	// there is the ref count, which is one higher if you have not yet
-	// persisted the chunk.
-	defer s.mtx.Unlock()
+	s.fpLocker.Lock(fp)
+	defer s.fpLocker.Unlock(fp)
 
 	// First purge persisted chunks. We need to do that anyway.
-	allDropped, err := s.persistence.DropChunks(fp, beforeTime)
+	allDropped, err := s.persistence.dropChunks(fp, beforeTime)
 	if err != nil {
 		glog.Error("Error purging persisted chunks: ", err)
 	}
 
 	// Purge chunks from memory accordingly.
-	if series, ok := s.fingerprintToSeries[fp]; ok {
+	if series, ok := s.fingerprintToSeries.get(fp); ok {
 		if series.purgeOlderThan(beforeTime) && allDropped {
-			delete(s.fingerprintToSeries, fp)
-			s.persistence.UnindexMetric(series.metric, fp)
+			s.fingerprintToSeries.del(fp)
+			s.persistence.unindexMetric(series.metric, fp)
 		}
 		return
 	}
@@ -399,20 +338,13 @@ func (s *memorySeriesStorage) purgeSeries(fp clientmodel.Fingerprint, beforeTime
 	if !allDropped {
 		return
 	}
-	if err := s.persistence.DropArchivedMetric(fp); err != nil {
+	if err := s.persistence.dropArchivedMetric(fp); err != nil {
 		glog.Errorf("Error dropping archived metric for fingerprint %v: %v", fp, err)
 	}
 }
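
The long TODO deleted above described exactly the race this rewrite targets: purging a series while it is concurrently unarchived. Both paths now take the same per-fingerprint lock — purgeSeries directly, and ingestion via appendSample before it reaches getOrCreateSeries — so the two can no longer interleave. A toy demonstration using the locker sketch from earlier (hypothetical helper, for illustration only):

// demoSerializedOps shows that a purge and an unarchive of the same series
// serialize on the shared fingerprint lock.
func demoSerializedOps(l *fingerprintLocker, fp clientmodel.Fingerprint) {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // purge path
		defer wg.Done()
		l.Lock(fp)
		defer l.Unlock(fp)
		// ...dropChunks, delete from the series map, unindexMetric...
	}()
	go func() { // ingestion path
		defer wg.Done()
		l.Lock(fp)
		defer l.Unlock(fp)
		// ...unarchiveMetric, put into the series map...
	}()
	wg.Wait()
}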
 // Serve implements Storage.
 func (s *memorySeriesStorage) Serve(started chan<- bool) {
-	s.mtx.Lock()
-	if s.state != storageStarting {
-		panic("Illegal State: Attempted to restart memorySeriesStorage.")
-	}
-	s.state = storageServing
-	s.mtx.Unlock()
-
 	evictMemoryTicker := time.NewTicker(s.memoryEvictionInterval)
 	defer evictMemoryTicker.Stop()
@@ -443,16 +375,12 @@ func (s *memorySeriesStorage) NewPreloader() Preloader {
 // GetFingerprintsForLabelMatchers implements Storage.
 func (s *memorySeriesStorage) GetFingerprintsForLabelMatchers(labelMatchers metric.LabelMatchers) clientmodel.Fingerprints {
-	// TODO: Is this lock needed?
-	s.mtx.RLock()
-	defer s.mtx.RUnlock()
-
 	var result map[clientmodel.Fingerprint]struct{}
 	for _, matcher := range labelMatchers {
 		intersection := map[clientmodel.Fingerprint]struct{}{}
 		switch matcher.Type {
 		case metric.Equal:
-			fps, err := s.persistence.GetFingerprintsForLabelPair(
+			fps, err := s.persistence.getFingerprintsForLabelPair(
 				metric.LabelPair{
 					Name:  matcher.Name,
 					Value: matcher.Value,
@@ -470,7 +398,7 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelMatchers(labelMatchers metr
 			}
 		}
 		default:
-			values, err := s.persistence.GetLabelValuesForLabelName(matcher.Name)
+			values, err := s.persistence.getLabelValuesForLabelName(matcher.Name)
 			if err != nil {
 				glog.Errorf("Error getting label values for label name %q: %v", matcher.Name, err)
 			}
@@ -479,7 +407,7 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelMatchers(labelMatchers metr
 				return nil
 			}
 			for _, v := range matches {
-				fps, err := s.persistence.GetFingerprintsForLabelPair(
+				fps, err := s.persistence.getFingerprintsForLabelPair(
 					metric.LabelPair{
 						Name:  matcher.Name,
 						Value: v,
@@ -510,11 +438,7 @@ func (s *memorySeriesStorage) GetFingerprintsForLabelMatchers(labelMatchers metr
 // GetLabelValuesForLabelName implements Storage.
 func (s *memorySeriesStorage) GetLabelValuesForLabelName(labelName clientmodel.LabelName) clientmodel.LabelValues {
-	// TODO: Is this lock needed?
-	s.mtx.RLock()
-	defer s.mtx.RUnlock()
-
-	lvs, err := s.persistence.GetLabelValuesForLabelName(labelName)
+	lvs, err := s.persistence.getLabelValuesForLabelName(labelName)
 	if err != nil {
 		glog.Errorf("Error getting label values for label name %q: %v", labelName, err)
 	}
@@ -523,10 +447,10 @@ func (s *memorySeriesStorage) GetLabelValuesForLabelName(labelName clientmodel.L
 // GetMetricForFingerprint implements Storage.
 func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint) clientmodel.Metric {
-	s.mtx.RLock()
-	defer s.mtx.RUnlock()
+	s.fpLocker.Lock(fp)
+	defer s.fpLocker.Unlock(fp)
 
-	series, ok := s.fingerprintToSeries[fp]
+	series, ok := s.fingerprintToSeries.get(fp)
 	if ok {
 		// Copy required here because caller might mutate the returned
 		// metric.
@@ -536,9 +460,19 @@ func (s *memorySeriesStorage) GetMetricForFingerprint(fp clientmodel.Fingerprint
 		}
 		return m
 	}
-	metric, err := s.persistence.GetArchivedMetric(fp)
+	metric, err := s.persistence.getArchivedMetric(fp)
 	if err != nil {
 		glog.Errorf("Error retrieving archived metric for fingerprint %v: %v", fp, err)
 	}
 	return metric
 }
+
+// Describe implements prometheus.Collector.
+func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
+	s.persistence.Describe(ch)
+}
+
+// Collect implements prometheus.Collector.
+func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {
+	s.persistence.Collect(ch)
+}
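
Delegating Describe and Collect to the persistence layer makes the whole storage a single prometheus.Collector, so one registration call exposes its metrics. Usage sketch (assuming the standard client_golang registration API, with o built by the caller as above):

storage, err := NewMemorySeriesStorage(o)
if err != nil {
	glog.Fatal("Error opening memory series storage: ", err)
}
prometheus.MustRegister(storage) // storage now satisfies prometheus.Collector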


@@ -37,8 +37,8 @@ func TestChunk(t *testing.T) {
 	s.AppendSamples(samples)
 
-	for _, s := range s.(*memorySeriesStorage).fingerprintToSeries {
-		for i, v := range s.values() {
+	for m := range s.(*memorySeriesStorage).fingerprintToSeries.iter() {
+		for i, v := range m.series.values() {
 			if samples[i].Timestamp != v.Timestamp {
 				t.Fatalf("%d. Got %v; want %v", i, v.Timestamp, samples[i].Timestamp)
 			}


@@ -35,16 +35,12 @@ func (t *testStorageCloser) Close() {
 // returned test.Closer, the temporary directory is cleaned up.
 func NewTestStorage(t testing.TB) (Storage, test.Closer) {
 	directory := test.NewTemporaryDirectory("test_storage", t)
-	persistence, err := NewDiskPersistence(directory.Path(), 1024)
-	if err != nil {
-		t.Fatal("Error opening disk persistence: ", err)
-	}
 	o := &MemorySeriesStorageOptions{
-		Persistence:                persistence,
 		MemoryEvictionInterval:     time.Minute,
 		MemoryRetentionPeriod:      time.Hour,
 		PersistencePurgeInterval:   time.Hour,
 		PersistenceRetentionPeriod: 24 * 7 * time.Hour,
+		PersistenceStoragePath:     directory.Path(),
 	}
 	storage, err := NewMemorySeriesStorage(o)
 	if err != nil {