This commit is contained in:
Fabian Reinartz 2016-12-09 16:54:38 +01:00
parent 74f8dfd95d
commit 62f9dc311c
3 changed files with 64 additions and 43 deletions

View file

@ -5,7 +5,6 @@ package tsdb
// "fmt"
// "time"
// "github.com/fabxc/tsdb/index"
// "github.com/prometheus/common/model"
// "github.com/prometheus/prometheus/storage/local"
// "github.com/prometheus/prometheus/storage/metric"
@ -13,33 +12,37 @@ package tsdb
// )
// type DefaultSeriesIterator struct {
// s Series
// it SeriesIterator
// }
// func (it *DefaultSeriesIterator) ValueAtOrBeforeTime(ts model.Time) model.SamplePair {
// sp, ok := it.it.Seek(ts)
// ok := it.it.Seek(int64(ts))
// if !ok {
// return model.SamplePair{Timestamp: model.Earliest}
// }
// return sp
// t, v := it.it.Values()
// return model.SamplePair{Timestamp: model.Time(t), Value: model.SampleValue(v)}
// }
// func (it *DefaultSeriesIterator) Metric() metric.Metric {
// m := it.it.Metric()
// met := make(model.Metric, len(m))
// for k, v := range m {
// met[model.LabelName(k)] = model.LabelValue(v)
// ls := it.s.Labels()
// met := make(model.Metric, len(ls))
// for _, l := range ls {
// met[model.LabelName(l.Name)] = model.LabelValue(l.Value)
// }
// return metric.Metric{Metric: met, Copied: true}
// }
// func (it *DefaultSeriesIterator) RangeValues(interval metric.Interval) []model.SamplePair {
// var res []model.SamplePair
// for sp, ok := it.it.Seek(interval.NewestInclusive); ok; sp, ok = it.it.Next() {
// if sp.Timestamp > interval.OldestInclusive {
// for ok := it.it.Seek(int64(interval.NewestInclusive)); ok; ok = it.it.Next() {
// t, v := it.it.Values()
// if model.Time(t) > interval.OldestInclusive {
// break
// }
// res = append(res, sp)
// res = append(res, model.SamplePair{Timestamp: model.Time(t), Value: model.SampleValue(v)})
// }
// return res
// }
@ -80,12 +83,15 @@ package tsdb
// // indexed. Indexing is needed for FingerprintsForLabelMatchers and
// // LabelValuesForLabelName and may lag behind.
// func (da *DefaultAdapter) WaitForIndexing() {
// da.db.indexer.wait()
// }
// func (da *DefaultAdapter) Append(s *model.Sample) error {
// labels := make([]Label, len(s.Metric))
// for k, v := range s.Metric {
// labels = append(labels, Label{Name: string(k), Value: string(v)})
// }
// // Ignore the Scrape batching for now.
// return da.db.memChunks.append(s.Metric, s.Timestamp, s.Value)
// return da.db.appendSingle(labels, int64(s.Timestamp), float64(s.Value))
// }
// func (da *DefaultAdapter) NeedsThrottling() bool {
@ -93,15 +99,16 @@ package tsdb
// }
// func (da *DefaultAdapter) Querier() (local.Querier, error) {
// q, err := da.db.Querier()
// if err != nil {
// return nil, err
// }
// // q, err := da.db.Querier()
// // if err != nil {
// // return nil, err
// // }
// return defaultQuerierAdapter{q: q}, nil
// }
// type defaultQuerierAdapter struct {
// q *Querier
// q Querier
// }
// func (da defaultQuerierAdapter) Close() error {

View file

@ -48,11 +48,11 @@ const (
)
const (
seriesMetaSize = int(unsafe.Sizeof(seriesMeta{}))
metaSize = int(unsafe.Sizeof(meta{}))
seriesStatsSize = int(unsafe.Sizeof(blockStats{}))
)
type seriesMeta struct {
type meta struct {
magic uint32
flag byte
_ [7]byte // padding/reserved
@ -64,19 +64,19 @@ type blockStats struct {
_ [4]byte // padding/reserved
}
func (s *persistedSeries) meta() *seriesMeta {
return (*seriesMeta)(unsafe.Pointer(&s.data[0]))
func (s *persistedSeries) meta() *meta {
return (*meta)(unsafe.Pointer(&s.data[0]))
}
func (s *persistedSeries) stats() *blockStats {
// The stats start right behind the block meta data.
return (*blockStats)(unsafe.Pointer(&s.data[seriesMetaSize]))
return (*blockStats)(unsafe.Pointer(&s.data[metaSize]))
}
// seriesAt returns the series stored at offset as a skiplist and the chunks
// it points to as a byte slice.
func (s *persistedSeries) seriesAt(offset int) (skiplist, []byte, error) {
offset += seriesMetaSize
offset += metaSize
offset += seriesStatsSize
switch b := s.data[offset]; b {
@ -157,8 +157,8 @@ func (bw *blockWriter) writeSeries(ow io.Writer) (n int64, err error) {
// However, we'll have to pick correct endianness for the unsafe casts to work
// when reading again. That and the added slowness due to reflection seem to make
// it somewhat pointless.
meta := &seriesMeta{magic: magicSeries, flag: flagStd}
metab := ((*[seriesMetaSize]byte)(unsafe.Pointer(meta)))[:]
meta := &meta{magic: magicSeries, flag: flagStd}
metab := ((*[metaSize]byte)(unsafe.Pointer(meta)))[:]
m, err := w.Write(metab)
if err != nil {
@ -205,3 +205,22 @@ func (bw *blockWriter) writeSeries(ow io.Writer) (n int64, err error) {
m, err = ow.Write(h.Sum(nil))
return n + int64(m), err
}
func (bw *blockWriter) writeIndex(ow io.Writer) (n int64, err error) {
// Duplicate all writes through a CRC64 hash writer.
h := crc64.New(crc64.MakeTable(crc64.ECMA))
w := io.MultiWriter(h, ow)
meta := &meta{magic: magicSeries, flag: flagStd}
metab := ((*[metaSize]byte)(unsafe.Pointer(meta)))[:]
m, err := w.Write(metab)
if err != nil {
return n + int64(m), err
}
n += int64(m)
// Write checksum to the original writer.
m, err = ow.Write(h.Sum(nil))
return n + int64(m), err
}

31
db.go
View file

@ -108,6 +108,17 @@ func (db *DB) AppendVector(ts int64, v *Vector) error {
return nil
}
func (db *DB) appendSingle(lset Labels, ts int64, v float64) error {
h := lset.Hash()
s := uint16(h >> (64 - seriesShardShift))
return db.shards[s].appendBatch(ts, Sample{
Hash: h,
Labels: lset,
Value: v,
})
}
// Matcher matches a string.
type Matcher interface {
// Match returns true if the matcher applies to the string value.
@ -142,8 +153,7 @@ type Querier interface {
// Series represents a single time series.
type Series interface {
// LabelsRef returns the label set reference
LabelRefs() LabelRefs
Labels() Labels
// Iterator returns a new iterator of the data of the series.
Iterator() SeriesIterator
}
@ -192,24 +202,9 @@ func NewSeriesShard(path string, logger log.Logger) *SeriesShard {
// Use actual time for now.
s.head = NewHeadBlock(time.Now().UnixNano() / int64(time.Millisecond))
go s.run()
return s
}
func (s *SeriesShard) run() {
// for {
// select {
// case <-s.done:
// return
// case <-s.persistCh:
// if err := s.persist(); err != nil {
// s.logger.With("err", err).Error("persistence failed")
// }
// }
// }
}
// Close the series shard.
func (s *SeriesShard) Close() error {
close(s.done)
@ -241,9 +236,9 @@ func (s *SeriesShard) appendBatch(ts int64, samples []Sample) error {
// TODO(fabxc): randomize over time
if s.head.stats().samples/uint64(s.head.stats().chunks) > 400 {
s.persist()
select {
case s.persistCh <- struct{}{}:
s.logger.Debug("trigger persistence")
go s.persist()
default:
}