mirror of
https://github.com/prometheus/prometheus.git
synced 2025-03-05 20:59:13 -08:00
Convert persister into function
This commit is contained in:
parent
dbd2b21d2e
commit
a648ef5252
118
compact.go
118
compact.go
|
@ -44,21 +44,18 @@ func (c *compactor) run() {
|
||||||
if len(c.shard.persisted) < 2 {
|
if len(c.shard.persisted) < 2 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
dir := fmt.Sprintf("compacted-%d", timestamp.FromTime(time.Now()))
|
var (
|
||||||
|
dir = fmt.Sprintf("compacted-%d", timestamp.FromTime(time.Now()))
|
||||||
|
a = c.shard.persisted[0]
|
||||||
|
b = c.shard.persisted[1]
|
||||||
|
)
|
||||||
|
|
||||||
p, err := newPersister(dir)
|
if err := persist(dir, func(indexw IndexWriter, chunkw SeriesWriter) error {
|
||||||
if err != nil {
|
return c.compact(indexw, chunkw, a, b)
|
||||||
c.logger.Log("msg", "creating persister failed", "err", err)
|
}); err != nil {
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := c.compact(p, c.shard.persisted[0], c.shard.persisted[1]); err != nil {
|
|
||||||
c.logger.Log("msg", "compaction failed", "err", err)
|
c.logger.Log("msg", "compaction failed", "err", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err := p.Close(); err != nil {
|
|
||||||
c.logger.Log("msg", "compaction failed", "err", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
close(c.donec)
|
close(c.donec)
|
||||||
}
|
}
|
||||||
|
@ -69,7 +66,7 @@ func (c *compactor) Close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *compactor) compact(p *persister, a, b block) error {
|
func (c *compactor) compact(indexw IndexWriter, chunkw SeriesWriter, a, b block) error {
|
||||||
aall, err := a.index().Postings("", "")
|
aall, err := a.index().Postings("", "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -110,7 +107,7 @@ func (c *compactor) compact(p *persister, a, b block) error {
|
||||||
|
|
||||||
for set.Next() {
|
for set.Next() {
|
||||||
lset, chunks := set.At()
|
lset, chunks := set.At()
|
||||||
if err := p.chunkw.WriteSeries(i, lset, chunks); err != nil {
|
if err := chunkw.WriteSeries(i, lset, chunks); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -133,7 +130,7 @@ func (c *compactor) compact(p *persister, a, b block) error {
|
||||||
return set.Err()
|
return set.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := p.indexw.WriteStats(stats); err != nil {
|
if err := indexw.WriteStats(stats); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -144,13 +141,13 @@ func (c *compactor) compact(p *persister, a, b block) error {
|
||||||
for x := range v {
|
for x := range v {
|
||||||
s = append(s, x)
|
s = append(s, x)
|
||||||
}
|
}
|
||||||
if err := p.indexw.WriteLabelIndex([]string{n}, s); err != nil {
|
if err := indexw.WriteLabelIndex([]string{n}, s); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for t := range postings.m {
|
for t := range postings.m {
|
||||||
if err := p.indexw.WritePostings(t.name, t.value, postings.get(t)); err != nil {
|
if err := indexw.WritePostings(t.name, t.value, postings.get(t)); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -159,7 +156,7 @@ func (c *compactor) compact(p *persister, a, b block) error {
|
||||||
for i := range all {
|
for i := range all {
|
||||||
all[i] = uint32(i)
|
all[i] = uint32(i)
|
||||||
}
|
}
|
||||||
if err := p.indexw.WritePostings("", "", newListPostings(all)); err != nil {
|
if err := indexw.WritePostings("", "", newListPostings(all)); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -291,68 +288,55 @@ func (c *compactionMerger) At() (labels.Labels, []ChunkMeta) {
|
||||||
return c.l, c.c
|
return c.l, c.c
|
||||||
}
|
}
|
||||||
|
|
||||||
type persister struct {
|
func persist(dir string, write func(IndexWriter, SeriesWriter) error) error {
|
||||||
dir, tmpdir string
|
tmpdir := dir + ".tmp"
|
||||||
|
|
||||||
chunkf, indexf *fileutil.LockedFile
|
|
||||||
|
|
||||||
chunkw SeriesWriter
|
|
||||||
indexw IndexWriter
|
|
||||||
}
|
|
||||||
|
|
||||||
func newPersister(dir string) (*persister, error) {
|
|
||||||
p := &persister{
|
|
||||||
dir: dir,
|
|
||||||
tmpdir: dir + ".tmp",
|
|
||||||
}
|
|
||||||
var err error
|
|
||||||
|
|
||||||
// Write to temporary directory to make persistence appear atomic.
|
// Write to temporary directory to make persistence appear atomic.
|
||||||
if fileutil.Exist(p.tmpdir) {
|
if fileutil.Exist(tmpdir) {
|
||||||
if err := os.RemoveAll(p.tmpdir); err != nil {
|
if err := os.RemoveAll(tmpdir); err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := fileutil.CreateDirAll(p.tmpdir); err != nil {
|
if err := fileutil.CreateDirAll(tmpdir); err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
p.chunkf, err = fileutil.LockFile(chunksFileName(p.tmpdir), os.O_WRONLY|os.O_CREATE, 0666)
|
chunkf, err := fileutil.LockFile(chunksFileName(tmpdir), os.O_WRONLY|os.O_CREATE, 0666)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
p.indexf, err = fileutil.LockFile(indexFileName(p.tmpdir), os.O_WRONLY|os.O_CREATE, 0666)
|
indexf, err := fileutil.LockFile(indexFileName(tmpdir), os.O_WRONLY|os.O_CREATE, 0666)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
p.indexw = newIndexWriter(p.indexf)
|
|
||||||
p.chunkw = newSeriesWriter(p.chunkf, p.indexw)
|
|
||||||
|
|
||||||
return p, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *persister) Close() error {
|
|
||||||
if err := p.chunkw.Close(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := p.indexw.Close(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := fileutil.Fsync(p.chunkf.File); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := fileutil.Fsync(p.indexf.File); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := p.chunkf.Close(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := p.indexf.Close(); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return renameDir(p.tmpdir, p.dir)
|
indexw := newIndexWriter(indexf)
|
||||||
|
chunkw := newSeriesWriter(chunkf, indexw)
|
||||||
|
|
||||||
|
if err := write(indexw, chunkw); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := chunkw.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := indexw.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := fileutil.Fsync(chunkf.File); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := fileutil.Fsync(indexf.File); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := chunkf.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := indexf.Close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return renameDir(tmpdir, dir)
|
||||||
}
|
}
|
||||||
|
|
||||||
func renameDir(from, to string) error {
|
func renameDir(from, to string) error {
|
||||||
|
|
12
db.go
12
db.go
|
@ -385,18 +385,10 @@ func (s *Shard) persist() error {
|
||||||
// before actually persisting it.
|
// before actually persisting it.
|
||||||
dir := filepath.Join(s.path, fmt.Sprintf("%d", head.stats.MinTime))
|
dir := filepath.Join(s.path, fmt.Sprintf("%d", head.stats.MinTime))
|
||||||
|
|
||||||
p, err := newPersister(dir)
|
if err := persist(dir, head.persist); err != nil {
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := head.persist(p); err != nil {
|
s.logger.Log("samples", head.stats.SampleCount, "chunks", head.stats.ChunkCount, "msg", "persisted head")
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := p.Close(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
sz := fmt.Sprintf("%.2fMB", float64(p.chunkw.Size()+p.indexw.Size())/1e6)
|
|
||||||
s.logger.Log("size", sz, "samples", head.stats.SampleCount, "chunks", head.stats.ChunkCount, "msg", "persisted head")
|
|
||||||
|
|
||||||
// Reopen block as persisted block for querying.
|
// Reopen block as persisted block for querying.
|
||||||
pb, err := newPersistedBlock(dir)
|
pb, err := newPersistedBlock(dir)
|
||||||
|
|
12
head.go
12
head.go
|
@ -260,13 +260,13 @@ func (h *HeadBlock) appendBatch(samples []hashedSample) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *HeadBlock) persist(p *persister) error {
|
func (h *HeadBlock) persist(indexw IndexWriter, chunkw SeriesWriter) error {
|
||||||
if err := h.wal.Close(); err != nil {
|
if err := h.wal.Close(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for ref, cd := range h.descs {
|
for ref, cd := range h.descs {
|
||||||
if err := p.chunkw.WriteSeries(uint32(ref), cd.lset, []ChunkMeta{
|
if err := chunkw.WriteSeries(uint32(ref), cd.lset, []ChunkMeta{
|
||||||
{
|
{
|
||||||
MinTime: cd.firsTimestamp,
|
MinTime: cd.firsTimestamp,
|
||||||
MaxTime: cd.lastTimestamp,
|
MaxTime: cd.lastTimestamp,
|
||||||
|
@ -277,7 +277,7 @@ func (h *HeadBlock) persist(p *persister) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := p.indexw.WriteStats(h.stats); err != nil {
|
if err := indexw.WriteStats(h.stats); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for n, v := range h.values {
|
for n, v := range h.values {
|
||||||
|
@ -286,13 +286,13 @@ func (h *HeadBlock) persist(p *persister) error {
|
||||||
s = append(s, x)
|
s = append(s, x)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := p.indexw.WriteLabelIndex([]string{n}, s); err != nil {
|
if err := indexw.WriteLabelIndex([]string{n}, s); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for t := range h.postings.m {
|
for t := range h.postings.m {
|
||||||
if err := p.indexw.WritePostings(t.name, t.value, h.postings.get(t)); err != nil {
|
if err := indexw.WritePostings(t.name, t.value, h.postings.get(t)); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -301,7 +301,7 @@ func (h *HeadBlock) persist(p *persister) error {
|
||||||
for i := range all {
|
for i := range all {
|
||||||
all[i] = uint32(i)
|
all[i] = uint32(i)
|
||||||
}
|
}
|
||||||
if err := p.indexw.WritePostings("", "", newListPostings(all)); err != nil {
|
if err := indexw.WritePostings("", "", newListPostings(all)); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
|
Loading…
Reference in a new issue