This commit is contained in:
Fabian Reinartz 2016-12-09 10:00:14 +01:00
parent 2c34a15fe6
commit 8aa99a3ebd
6 changed files with 240 additions and 129 deletions

View file

@ -23,6 +23,10 @@ type block interface {
seriesData() seriesDataIterator
}
// persistedBlock currently declares no fields.
// NOTE(review): presumably a placeholder for a block that has been written
// to disk — confirm once it gains fields or methods.
type persistedBlock struct {
}
type seriesDataIterator interface {
next() bool
values() (skiplist, []chunks.Chunk)
@ -33,12 +37,10 @@ func compactBlocks(a, b block) error {
return nil
}
const maxMmapSize = 1 << 20
type persistedSeries struct {
size int
dataref []byte
data *[maxMmapSize]byte
data *[maxMapSize]byte
}
const (
@ -72,6 +74,32 @@ func (s *persistedSeries) stats() *seriesStats {
return (*seriesStats)(unsafe.Pointer(&s.data[seriesMetaSize]))
}
// seriesAt returns the series stored at offset as a skiplist and the chunks
// it points to as a byte slice.
//
// seriesMetaSize and seriesStatsSize are added to offset before s.data is
// read, so offset is relative to the end of those two sections. The byte
// found there is a flag; any value other than flagStd results in an error.
//
// Both returned values are views into s.data created through unsafe casts;
// no data is copied.
//
// NOTE(review): none of the lengths read here are validated against s.size —
// confirm that callers only pass offsets of well-formed entries.
func (s *persistedSeries) seriesAt(offset int) (skiplist, []byte, error) {
offset += seriesMetaSize
offset += seriesStatsSize
// The first byte of the entry is a format flag; flagStd is the only
// accepted value.
switch b := s.data[offset]; b {
case flagStd:
default:
return nil, nil, fmt.Errorf("invalid flag: %x", b)
}
offset++
var (
// Skiplist length, read as a uint16 directly from memory (native byte order).
slLen = *(*uint16)(unsafe.Pointer(&s.data[offset]))
// Number of skiplistPair entries; this treats slLen as a length in
// bytes — TODO confirm against the writer.
slSize = int(slLen) / int(unsafe.Sizeof(skiplistPair{}))
// The skiplist entries start right after the 2-byte length field.
sl = ((*[maxAllocSize]skiplistPair)(unsafe.Pointer(&s.data[offset+2])))[:slSize]
)
// NOTE(review): advancing by a fixed 3 bytes matches neither the 2-byte
// length field nor the slLen bytes of skiplist data that follow it; this
// looks like it should skip 2+slLen bytes — confirm against the writer.
offset += 3
chunksLen := *(*uint32)(unsafe.Pointer(&s.data[offset]))
// NOTE(review): the slice starts at the chunksLen field itself rather than
// after it — confirm whether the length prefix is meant to be included.
chunks := ((*[maxAllocSize]byte)(unsafe.Pointer(&s.data[offset])))[:chunksLen]
return simpleSkiplist(sl), chunks, nil
}
// A skiplist maps offsets to values. The values found in the data at an
// offset are strictly greater than the indexed value.
type skiplist interface {
@ -115,32 +143,6 @@ func (sl simpleSkiplist) WriteTo(w io.Writer) (n int64, err error) {
return n, err
}
// seriesAt returns the series stored at offset as a skiplist and the chunks
// it points to as a byte slice.
//
// seriesMetaSize and seriesStatsSize are added to offset before s.data is
// read, so offset is relative to the end of those two sections. The byte
// found there is a flag; any value other than flagStd results in an error.
//
// Both returned values are views into s.data created through unsafe casts;
// no data is copied.
//
// NOTE(review): none of the lengths read here are validated against s.size —
// confirm that callers only pass offsets of well-formed entries.
func (s *persistedSeries) seriesAt(offset int) (skiplist, []byte, error) {
offset += seriesMetaSize
offset += seriesStatsSize
// The first byte of the entry is a format flag; flagStd is the only
// accepted value.
switch b := s.data[offset]; b {
case flagStd:
default:
return nil, nil, fmt.Errorf("invalid flag: %x", b)
}
offset++
var (
// Skiplist length, read as a uint16 directly from memory (native byte order).
slLen = *(*uint16)(unsafe.Pointer(&s.data[offset]))
// Number of skiplistPair entries; this treats slLen as a length in
// bytes — TODO confirm against the writer.
slSize = int(slLen) / int(unsafe.Sizeof(skiplistPair{}))
// The skiplist entries start right after the 2-byte length field.
sl = ((*[maxAllocSize]skiplistPair)(unsafe.Pointer(&s.data[offset+2])))[:slSize]
)
// NOTE(review): advancing by a fixed 3 bytes matches neither the 2-byte
// length field nor the slLen bytes of skiplist data that follow it; this
// looks like it should skip 2+slLen bytes — confirm against the writer.
offset += 3
chunksLen := *(*uint32)(unsafe.Pointer(&s.data[offset]))
// NOTE(review): the slice starts at the chunksLen field itself rather than
// after it — confirm whether the length prefix is meant to be included.
chunks := ((*[maxAllocSize]byte)(unsafe.Pointer(&s.data[offset])))[:chunksLen]
return simpleSkiplist(sl), chunks, nil
}
// blockWriter wraps a block for serialization. DB.Close constructs one around
// a shard's head block and calls its writeSeries method to write the block's
// series to a file.
type blockWriter struct {
// block is the block whose contents are written out.
block block
}

View file

@ -3,14 +3,14 @@ build:
bench_default: build
@echo ">> running benchmark"
@./tsdb bench write --out=benchout/default --engine=default --metrics=$(NUM_METRICS) testdata.100k
@./tsdb bench write --out=benchout/default --engine=default --metrics=$(NUM_METRICS) testdata.1m
@go tool pprof -svg ./tsdb benchout/default/cpu.prof > benchout/default/cpuprof.svg
@go tool pprof -svg ./tsdb benchout/default/mem.prof > benchout/default/memprof.svg
@go tool pprof -svg ./tsdb benchout/default/block.prof > benchout/default/blockprof.svg
bench_tsdb: build
@echo ">> running benchmark"
@./tsdb bench write --out=benchout/tsdb --engine=tsdb --metrics=$(NUM_METRICS) testdata.100k
@./tsdb bench write --out=benchout/tsdb --engine=tsdb --metrics=$(NUM_METRICS) testdata.1m
@go tool pprof -svg ./tsdb benchout/tsdb/cpu.prof > benchout/tsdb/cpuprof.svg
@go tool pprof -svg ./tsdb benchout/tsdb/mem.prof > benchout/tsdb/memprof.svg
@go tool pprof -svg ./tsdb benchout/tsdb/block.prof > benchout/tsdb/blockprof.svg

View file

@ -4,7 +4,6 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"runtime"
@ -194,11 +193,11 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []model.Metric, scrapeCount
}
for i := 0; i < scrapeCount; i++ {
ts = ts + int64(i*10000)
ts += int64(10000)
sc.Reset()
for _, s := range scrape {
s.value += rand.Int63n(1000)
s.value += 1000
sc.Add(s.labels, float64(s.value))
}
if err := b.storage.ingestScrape(ts, &sc); err != nil {

155
db.go
View file

@ -14,14 +14,15 @@ import (
"github.com/prometheus/common/log"
)
// DefaultOptions used for the DB. They are sane for setups using
// millisecond precision timestamps.
var DefaultOptions = &Options{
StalenessDelta: 5 * time.Minute,
Retention: 15 * 24 * 3600 * 1000, // 15 days
}
// Options of the DB storage.
type Options struct {
// StalenessDelta is a duration; DefaultOptions sets it to five minutes.
// NOTE(review): how the DB applies it is not visible here — presumably the
// window after which a series without new samples is considered stale.
StalenessDelta time.Duration
// Retention is expressed in the same unit as sample timestamps;
// DefaultOptions uses 15 days worth of milliseconds.
// NOTE(review): presumably how long data is kept — confirm against the
// code that consumes it.
Retention int64
}
// DB is a time series storage.
@ -59,7 +60,8 @@ func Open(path string, l log.Logger, opts *Options) (*DB, error) {
// TODO(fabxc): validate shard number to be power of 2, which is required
// for the bitshift-modulo when finding the right shard.
for i := 0; i < numSeriesShards; i++ {
c.shards = append(c.shards, NewSeriesShard())
path := filepath.Join(path, fmt.Sprintf("%d", i))
c.shards = append(c.shards, NewSeriesShard(path))
}
// TODO(fabxc): run background compaction + GC.
@ -69,26 +71,40 @@ func Open(path string, l log.Logger, opts *Options) (*DB, error) {
// Close the database.
//
// The head block of every shard is serialized to a file named
// "shard-<i>-series" under db.path. Shards are written concurrently, one
// goroutine per shard, and Close blocks until all of them have finished.
// Each file is synced to stable storage before it is closed.
//
// If writing any shard fails, the remaining shards are still written and the
// first error observed is returned. Errors are reported to the caller rather
// than raised as panics inside the worker goroutines.
func (db *DB) Close() error {
	var (
		wg       sync.WaitGroup
		errMtx   sync.Mutex // guards firstErr
		firstErr error
	)
	start := time.Now()

	// persist writes a single shard's head block to its series file.
	persist := func(i int, shard *SeriesShard) (err error) {
		f, err := os.Create(filepath.Join(db.path, fmt.Sprintf("shard-%d-series", i)))
		if err != nil {
			return err
		}
		// Always release the file handle. The Close error is only surfaced
		// if nothing failed earlier, so the root cause is not masked.
		defer func() {
			if cerr := f.Close(); err == nil {
				err = cerr
			}
		}()

		bw := &blockWriter{block: shard.head}
		n, err := bw.writeSeries(f)
		if err != nil {
			return err
		}
		fmt.Println(" wrote bytes", n)

		return f.Sync()
	}

	for i, shard := range db.shards {
		fmt.Println("shard", i)
		fmt.Println(" num chunks", len(shard.head.forward))
		fmt.Println(" num samples", shard.head.samples)

		wg.Add(1)
		go func(i int, shard *SeriesShard) {
			// Deferred so the WaitGroup is released on every exit path.
			defer wg.Done()

			if err := persist(i, shard); err != nil {
				errMtx.Lock()
				if firstErr == nil {
					firstErr = err
				}
				errMtx.Unlock()
			}
		}(i, shard)
	}
	wg.Wait()

	fmt.Println("final serialization took", time.Since(start))

	return firstErr
}
@ -152,6 +168,62 @@ type SeriesIterator interface {
Err() error
}
const sep = '\xff'
// SeriesShard handles reads and writes of time series falling into
// a hashed shard of a series.
type SeriesShard struct {
// path is the location handed to NewSeriesShard; Open passes a
// per-shard sub-path of the DB path named after the shard index.
path string
// NOTE(review): presumably guards blocks and head — the locking sites are
// not visible here.
mtx sync.RWMutex
// blocks is left unset by NewSeriesShard (access to persisted blocks is
// still a TODO there).
blocks *Block
// head is the in-memory block created by NewSeriesShard; DB.Close
// serializes it.
head *HeadBlock
}
// NewSeriesShard returns a new SeriesShard for the given path. The shard
// starts out with an empty head block whose indexes are initialized and
// ready to accept series.
func NewSeriesShard(path string) *SeriesShard {
	// TODO(fabxc): restore from checkpoint.
	head := &HeadBlock{
		ivIndex: newMemIndex(),
		descs:   make(map[uint64][]*chunkDesc),
		values:  make(map[string]stringset),
		forward: make(map[uint32]*chunkDesc),
	}

	shard := &SeriesShard{
		path: path,
		head: head,
	}
	// TODO(fabxc): provide access to persisted blocks.
	return shard
}
// chunkDesc wraps a plain data chunk and provides cached meta data about it.
type chunkDesc struct {
// lset is the label set of the series the chunk belongs to; HeadBlock.get
// compares it to find the descriptor for a series.
lset Labels
// chunk is the wrapped data chunk that samples are appended to.
chunk chunks.Chunk
// Caching fields.
// lastTimestamp and lastValue hold the most recent sample that was
// appended successfully through append.
lastTimestamp int64
lastValue float64
// app is created lazily on the first call to append.
app chunks.Appender // Current appender for the chunks.
}
// append adds the sample (ts, v) to the wrapped chunk. The chunk's appender
// is created lazily on first use. The cached last timestamp and value are
// only updated after the sample has been appended successfully.
func (cd *chunkDesc) append(ts int64, v float64) (err error) {
	if cd.app == nil {
		if cd.app, err = cd.chunk.Appender(); err != nil {
			return err
		}
	}

	if err = cd.app.Append(ts, v); err != nil {
		return err
	}

	cd.lastTimestamp, cd.lastValue = ts, v
	return nil
}
// LabelRefs contains a reference to a label set that can be resolved
// against a Querier.
type LabelRefs struct {
@ -300,52 +372,3 @@ func (db *DB) AppendVector(ts int64, v *Vector) error {
return nil
}
const sep = '\xff'
// SeriesShard handles reads and writes of time series falling into
// a hashed shard of a series.
type SeriesShard struct {
// NOTE(review): presumably guards blocks and head — the locking sites are
// not visible here.
mtx sync.RWMutex
// blocks is left unset by NewSeriesShard (access to persisted blocks is
// still a TODO there).
blocks *Block
// head is the in-memory block created by NewSeriesShard.
head *HeadBlock
}
// NewSeriesShard returns a new SeriesShard. The shard starts out with an
// empty head block whose indexes are initialized and ready to accept series.
func NewSeriesShard() *SeriesShard {
	// TODO(fabxc): restore from checkpoint.
	head := &HeadBlock{
		index:   newMemIndex(),
		descs:   make(map[uint64][]*chunkDesc),
		values:  make(map[string][]string),
		forward: make(map[uint32]*chunkDesc),
	}

	shard := &SeriesShard{head: head}
	// TODO(fabxc): provide access to persisted blocks.
	return shard
}
// chunkDesc wraps a plain data chunk and provides cached meta data about it.
type chunkDesc struct {
// lset is the label set of the series the chunk belongs to.
lset Labels
// chunk is the wrapped data chunk that samples are appended to.
chunk chunks.Chunk
// Caching fields.
// lastTimestamp and lastValue hold the arguments of the most recent call
// to append.
lastTimestamp int64
lastValue float64
// app is created lazily on the first call to append.
app chunks.Appender // Current appender for the chunks.
}
// append adds the sample (ts, v) to the wrapped chunk. The chunk's appender
// is created lazily on first use.
//
// The cached last timestamp and value are only updated after the sample has
// been appended successfully, so a failed append never leaves the cache
// describing a sample that is not stored in the chunk.
func (cd *chunkDesc) append(ts int64, v float64) (err error) {
	if cd.app == nil {
		cd.app, err = cd.chunk.Appender()
		if err != nil {
			return err
		}
	}
	if err := cd.app.Append(ts, v); err != nil {
		return err
	}

	cd.lastTimestamp = ts
	cd.lastValue = v

	return nil
}

53
db_unix.go Normal file
View file

@ -0,0 +1,53 @@
// +build !windows,!plan9,!solaris
package tsdb
// import (
// "fmt"
// "syscall"
// "unsafe"
// )
// // mmap memory maps a DB's data file.
// func mmap(db *DB, sz int) error {
// // Map the data file to memory.
// b, err := syscall.Mmap(int(db.file.Fd()), 0, sz, syscall.PROT_READ, syscall.MAP_SHARED|db.MmapFlags)
// if err != nil {
// return err
// }
// // Advise the kernel that the mmap is accessed randomly.
// if err := madvise(b, syscall.MADV_RANDOM); err != nil {
// return fmt.Errorf("madvise: %s", err)
// }
// // Save the original byte slice and convert to a byte array pointer.
// db.dataref = b
// db.data = (*[maxMapSize]byte)(unsafe.Pointer(&b[0]))
// db.datasz = sz
// return nil
// }
// // munmap unmaps a DB's data file from memory.
// func munmap(db *DB) error {
// // Ignore the unmap if we have no mapped data.
// if db.dataref == nil {
// return nil
// }
// // Unmap using the original byte slice.
// err := syscall.Munmap(db.dataref)
// db.dataref = nil
// db.data = nil
// db.datasz = 0
// return err
// }
// // NOTE: This function is copied from stdlib because it is not available on darwin.
// func madvise(b []byte, advice int) (err error) {
// _, _, e1 := syscall.Syscall(syscall.SYS_MADVISE, uintptr(unsafe.Pointer(&b[0])), uintptr(len(b)), uintptr(advice))
// if e1 != 0 {
// err = e1
// }
// return
// }

92
head.go
View file

@ -2,6 +2,8 @@ package tsdb
import (
"math"
"sort"
"strings"
"sync"
"github.com/fabxc/tsdb/chunks"
@ -12,19 +14,19 @@ type HeadBlock struct {
mtx sync.RWMutex
descs map[uint64][]*chunkDesc // labels hash to possible chunks descs
forward map[uint32]*chunkDesc // chunk ID to chunk desc
values map[string][]string // label names to possible values
index *memIndex // inverted index for label pairs
values map[string]stringset // label names to possible values
ivIndex *memIndex // inverted index for label pairs
samples uint64
samples uint64 // total samples in the block.
}
// get retrieves the chunk with the hash and label set and creates
// a new one if it doesn't exist yet.
func (h *HeadBlock) get(hash uint64, lset Labels) (*chunkDesc, bool) {
func (h *HeadBlock) get(hash uint64, lset Labels) *chunkDesc {
cds := h.descs[hash]
for _, cd := range cds {
if cd.lset.Equals(lset) {
return cd, false
return cd
}
}
// None of the given chunks was for the series, create a new one.
@ -32,45 +34,53 @@ func (h *HeadBlock) get(hash uint64, lset Labels) (*chunkDesc, bool) {
lset: lset,
chunk: chunks.NewXORChunk(int(math.MaxInt64)),
}
h.index(cd)
h.descs[hash] = append(cds, cd)
return cd, true
return cd
}
// append adds the sample to the headblock. If the series is seen
// for the first time it creates a chunk and index entries for it.
//
// TODO(fabxc): switch to single writer and append queue with optimistic concurrency?
func (h *HeadBlock) append(hash uint64, lset Labels, ts int64, v float64) error {
chkd, created := h.get(hash, lset)
if created {
// Add each label pair as a term to the inverted index.
terms := make([]string, 0, len(lset))
b := make([]byte, 0, 64)
func (h *HeadBlock) index(chkd *chunkDesc) {
// Add each label pair as a term to the inverted index.
terms := make([]string, 0, len(chkd.lset))
b := make([]byte, 0, 64)
for _, l := range lset {
b = append(b, l.Name...)
b = append(b, sep)
b = append(b, l.Value...)
for _, l := range chkd.lset {
b = append(b, l.Name...)
b = append(b, sep)
b = append(b, l.Value...)
terms = append(terms, string(b))
b = b[:0]
terms = append(terms, string(b))
b = b[:0]
// Add to label name to values index.
valset, ok := h.values[l.Name]
if !ok {
valset = stringset{}
h.values[l.Name] = valset
}
id := h.index.add(terms...)
// Store forward index for the returned ID.
h.forward[id] = chkd
valset.set(l.Value)
}
if err := chkd.append(ts, v); err != nil {
id := h.ivIndex.add(terms...)
// Store forward index for the returned ID.
h.forward[id] = chkd
}
// append adds the sample to the headblock.
func (h *HeadBlock) append(hash uint64, lset Labels, ts int64, v float64) error {
if err := h.get(hash, lset).append(ts, v); err != nil {
return err
}
h.samples++
return nil
}
// stats returns statistics about the head block: the number of entries in
// the forward index as the series count, and the total number of samples
// appended so far.
func (h *HeadBlock) stats() *seriesStats {
	return &seriesStats{
		series:  uint32(len(h.forward)),
		samples: h.samples,
	}
}
func (h *HeadBlock) seriesData() seriesDataIterator {
@ -105,3 +115,27 @@ func (it *chunkDescsIterator) values() (skiplist, []chunks.Chunk) {
// err always returns nil for this iterator.
func (it *chunkDescsIterator) err() error {
return nil
}
// stringset is a set of strings backed by a map with empty-struct values.
type stringset map[string]struct{}

// set adds s to the set. Adding a member that is already present has no
// effect.
func (ss stringset) set(s string) {
	ss[s] = struct{}{}
}

// has reports whether s is a member of the set.
func (ss stringset) has(s string) bool {
	_, found := ss[s]
	return found
}

// String returns the members in ascending order, joined by commas.
func (ss stringset) String() string {
	members := ss.slice()
	return strings.Join(members, ",")
}

// slice returns the members as a newly allocated slice in ascending order.
func (ss stringset) slice() []string {
	members := make([]string, len(ss))
	i := 0
	for member := range ss {
		members[i] = member
		i++
	}
	sort.Strings(members)
	return members
}