From b02d900e6117dddda68916a84671681f22365309 Mon Sep 17 00:00:00 2001 From: beorn7 Date: Mon, 13 Apr 2015 20:20:26 +0200 Subject: [PATCH] Improve chunk and chunkDesc loading. Also, clean up some things in the code (especially introduction of the chunkLenWithHeader constant to avoid the same expression all over the place). Benchmark results: BEFORE BenchmarkLoadChunksSequentially 5000 283580 ns/op 152143 B/op 312 allocs/op BenchmarkLoadChunksRandomly 20000 82936 ns/op 39310 B/op 99 allocs/op BenchmarkLoadChunkDescs 10000 110833 ns/op 15092 B/op 345 allocs/op AFTER BenchmarkLoadChunksSequentially 10000 146785 ns/op 152285 B/op 315 allocs/op BenchmarkLoadChunksRandomly 20000 67598 ns/op 39438 B/op 103 allocs/op BenchmarkLoadChunkDescs 20000 99631 ns/op 12636 B/op 192 allocs/op Note that everything is obviously loaded from the page cache (as the benchmark runs thousands of times with very small series files). In a real-world scenario, I expect a larger impact, as the disk operations will more often actually hit the disk. To load ~50 sequential chunks, this reduces the iops from 100 seeks and 100 reads to 1 seek and 1 read. --- storage/local/chunk.go | 1 + storage/local/crashrecovery.go | 4 +- storage/local/delta.go | 16 ++++---- storage/local/doubledelta.go | 16 ++++---- storage/local/persistence.go | 72 ++++++++++++++++++++-------------- 5 files changed, 64 insertions(+), 45 deletions(-) diff --git a/storage/local/chunk.go b/storage/local/chunk.go index 58110c350..e1bac7f8b 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -192,6 +192,7 @@ type chunk interface { newIterator() chunkIterator marshal(io.Writer) error unmarshal(io.Reader) error + unmarshalFromBuf([]byte) encoding() chunkEncoding // values returns a channel, from which all sample values in the chunk // can be received in order. The channel is closed after the last diff --git a/storage/local/crashrecovery.go b/storage/local/crashrecovery.go index 104766d16..16950ff30 100644 --- a/storage/local/crashrecovery.go +++ b/storage/local/crashrecovery.go @@ -191,8 +191,8 @@ func (p *persistence) sanitizeSeries( return fp, false } - bytesToTrim := fi.Size() % int64(chunkLen+chunkHeaderLen) - chunksInFile := int(fi.Size()) / (chunkLen + chunkHeaderLen) + bytesToTrim := fi.Size() % int64(chunkLenWithHeader) + chunksInFile := int(fi.Size()) / chunkLenWithHeader modTime := fi.ModTime() if bytesToTrim != 0 { glog.Warningf( diff --git a/storage/local/delta.go b/storage/local/delta.go index 0e4816935..1be169feb 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -223,18 +223,20 @@ func (c deltaEncodedChunk) marshal(w io.Writer) error { // unmarshal implements chunk. func (c *deltaEncodedChunk) unmarshal(r io.Reader) error { *c = (*c)[:cap(*c)] - readBytes := 0 - for readBytes < len(*c) { - n, err := r.Read((*c)[readBytes:]) - if err != nil { - return err - } - readBytes += n + if _, err := io.ReadFull(r, *c); err != nil { + return err } *c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])] return nil } +// unmarshalFromBuf implements chunk. +func (c *deltaEncodedChunk) unmarshalFromBuf(buf []byte) { + *c = (*c)[:cap(*c)] + copy(*c, buf) + *c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])] +} + // values implements chunk. func (c deltaEncodedChunk) values() <-chan *metric.SamplePair { n := c.len() diff --git a/storage/local/doubledelta.go b/storage/local/doubledelta.go index a1c9ad224..dcfd155ed 100644 --- a/storage/local/doubledelta.go +++ b/storage/local/doubledelta.go @@ -231,18 +231,20 @@ func (c doubleDeltaEncodedChunk) marshal(w io.Writer) error { // unmarshal implements chunk. func (c *doubleDeltaEncodedChunk) unmarshal(r io.Reader) error { *c = (*c)[:cap(*c)] - readBytes := 0 - for readBytes < len(*c) { - n, err := r.Read((*c)[readBytes:]) - if err != nil { - return err - } - readBytes += n + if _, err := io.ReadFull(r, *c); err != nil { + return err } *c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])] return nil } +// unmarshalFromBuf implements chunk. +func (c *doubleDeltaEncodedChunk) unmarshalFromBuf(buf []byte) { + *c = (*c)[:cap(*c)] + copy(*c, buf) + *c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])] +} + // values implements chunk. func (c doubleDeltaEncodedChunk) values() <-chan *metric.SamplePair { n := c.len() diff --git a/storage/local/persistence.go b/storage/local/persistence.go index e4b0e29c3..08a78e427 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -63,13 +63,19 @@ const ( chunkHeaderTypeOffset = 0 chunkHeaderFirstTimeOffset = 1 chunkHeaderLastTimeOffset = 9 + chunkLenWithHeader = chunkLen + chunkHeaderLen + chunkMaxBatchSize = 64 // How many chunks to load at most in one batch. indexingMaxBatchSize = 1024 * 1024 indexingBatchTimeout = 500 * time.Millisecond // Commit batch when idle for that long. indexingQueueCapacity = 1024 * 16 ) -var fpLen = len(clientmodel.Fingerprint(0).String()) // Length of a fingerprint as string. +var ( + fpLen = len(clientmodel.Fingerprint(0).String()) // Length of a fingerprint as string. + + byteBufPool = sync.Pool{New: func() interface{} { return make([]byte, 0, 3*chunkLenWithHeader) }} +) const ( flagHeadChunkPersisted byte = 1 << iota @@ -377,28 +383,37 @@ func (p *persistence) loadChunks(fp clientmodel.Fingerprint, indexes []int, inde defer f.Close() chunks := make([]chunk, 0, len(indexes)) - typeBuf := make([]byte, 1) - for _, idx := range indexes { - _, err := f.Seek(offsetForChunkIndex(idx+indexOffset), os.SEEK_SET) - if err != nil { + buf := byteBufPool.Get().([]byte) + defer func() { + byteBufPool.Put(buf) + }() + + for i := 0; i < len(indexes); i++ { + // This loads chunks in batches. A batch is a streak of + // consecutive chunks, read from disk in one go. + batchSize := 1 + if _, err := f.Seek(offsetForChunkIndex(indexes[i]+indexOffset), os.SEEK_SET); err != nil { return nil, err } - n, err := f.Read(typeBuf) - if err != nil { - return nil, err + for ; batchSize < chunkMaxBatchSize && + i+1 < len(indexes) && + indexes[i]+1 == indexes[i+1]; i, batchSize = i+1, batchSize+1 { } - if n != 1 { - panic("read returned != 1 bytes") + readSize := batchSize * chunkLenWithHeader + if cap(buf) < readSize { + buf = make([]byte, readSize) } + buf = buf[:readSize] - _, err = f.Seek(chunkHeaderLen-1, os.SEEK_CUR) - if err != nil { + if _, err := io.ReadFull(f, buf); err != nil { return nil, err } - chunk := newChunkForEncoding(chunkEncoding(typeBuf[0])) - chunk.unmarshal(f) - chunks = append(chunks, chunk) + for c := 0; c < batchSize; c++ { + chunk := newChunkForEncoding(chunkEncoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset])) + chunk.unmarshalFromBuf(buf[c*chunkLenWithHeader+chunkHeaderLen:]) + chunks = append(chunks, chunk) + } } chunkOps.WithLabelValues(load).Add(float64(len(chunks))) atomic.AddInt64(&numMemChunks, int64(len(chunks))) @@ -422,24 +437,23 @@ func (p *persistence) loadChunkDescs(fp clientmodel.Fingerprint, beforeTime clie if err != nil { return nil, err } - totalChunkLen := chunkHeaderLen + chunkLen - if fi.Size()%int64(totalChunkLen) != 0 { + if fi.Size()%int64(chunkLenWithHeader) != 0 { p.setDirty(true) return nil, fmt.Errorf( "size of series file for fingerprint %v is %d, which is not a multiple of the chunk length %d", - fp, fi.Size(), totalChunkLen, + fp, fi.Size(), chunkLenWithHeader, ) } - numChunks := int(fi.Size()) / totalChunkLen + numChunks := int(fi.Size()) / chunkLenWithHeader cds := make([]*chunkDesc, 0, numChunks) + chunkTimesBuf := make([]byte, 16) for i := 0; i < numChunks; i++ { _, err := f.Seek(offsetForChunkIndex(i)+chunkHeaderFirstTimeOffset, os.SEEK_SET) if err != nil { return nil, err } - chunkTimesBuf := make([]byte, 16) _, err = io.ReadAtLeast(f, chunkTimesBuf, 16) if err != nil { return nil, err @@ -799,7 +813,7 @@ func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist in } chunk := newChunkForEncoding(chunkEncoding(encoding)) if err := chunk.unmarshal(r); err != nil { - glog.Warning("Could not decode chunk type:", err) + glog.Warning("Could not decode chunk:", err) p.dirty = true return sm, chunksToPersist, nil } @@ -900,7 +914,7 @@ func (p *persistence) dropAndPersistChunks( return } headerBuf := make([]byte, chunkHeaderLen) - _, err = io.ReadAtLeast(f, headerBuf, chunkHeaderLen) + _, err = io.ReadFull(f, headerBuf) if err == io.EOF { // We ran into the end of the file without finding any chunks that should // be kept. Remove the whole file. @@ -960,7 +974,7 @@ func (p *persistence) dropAndPersistChunks( if err != nil { return } - offset = int(written / (chunkHeaderLen + chunkLen)) + offset = int(written / chunkLenWithHeader) if len(chunks) > 0 { if err = writeChunks(temp, chunks); err != nil { @@ -983,7 +997,7 @@ func (p *persistence) deleteSeriesFile(fp clientmodel.Fingerprint) (int, error) if err != nil { return -1, err } - numChunks := int(fi.Size() / (chunkHeaderLen + chunkLen)) + numChunks := int(fi.Size() / chunkLenWithHeader) if err := os.Remove(fname); err != nil { return -1, err } @@ -1366,17 +1380,17 @@ loop: } func offsetForChunkIndex(i int) int64 { - return int64(i * (chunkHeaderLen + chunkLen)) + return int64(i * chunkLenWithHeader) } func chunkIndexForOffset(offset int64) (int, error) { - if int(offset)%(chunkHeaderLen+chunkLen) != 0 { + if int(offset)%chunkLenWithHeader != 0 { return -1, fmt.Errorf( "offset %d is not a multiple of on-disk chunk length %d", - offset, chunkHeaderLen+chunkLen, + offset, chunkLenWithHeader, ) } - return int(offset) / (chunkHeaderLen + chunkLen), nil + return int(offset) / chunkLenWithHeader, nil } func writeChunkHeader(w io.Writer, c chunk) error { @@ -1389,7 +1403,7 @@ func writeChunkHeader(w io.Writer, c chunk) error { } func writeChunks(w io.Writer, chunks []chunk) error { - b := bufio.NewWriterSize(w, len(chunks)*(chunkHeaderLen+chunkLen)) + b := bufio.NewWriterSize(w, len(chunks)*chunkLenWithHeader) for _, chunk := range chunks { if err := writeChunkHeader(b, chunk); err != nil { return err