// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package local

import (
	"bufio"
	"encoding/binary"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/log"
	"github.com/prometheus/common/model"

	"github.com/prometheus/prometheus/storage/local/codable"
	"github.com/prometheus/prometheus/storage/local/index"
	"github.com/prometheus/prometheus/util/flock"
)

const (
	// Version of the storage as it can be found in the version file.
	// Increment to protect against incompatible changes.
	Version         = 1
	versionFileName = "VERSION"

	seriesFileSuffix     = ".db"
	seriesTempFileSuffix = ".db.tmp"
	seriesDirNameLen     = 2 // How many bytes of the fingerprint in dir name.
	hintFileSuffix       = ".hint"

	mappingsFileName      = "mappings.db"
	mappingsTempFileName  = "mappings.db.tmp"
	mappingsFormatVersion = 1
	mappingsMagicString   = "PrometheusMappings"

	dirtyFileName = "DIRTY"

	fileBufSize = 1 << 16 // 64kiB.

	chunkHeaderLen             = 17
	chunkHeaderTypeOffset      = 0
	chunkHeaderFirstTimeOffset = 1
	chunkHeaderLastTimeOffset  = 9
	chunkLenWithHeader         = chunkLen + chunkHeaderLen
	chunkMaxBatchSize          = 62 // Max no. of chunks to load or write in
	// one batch.  Note that 62 is the largest number of chunks that fit
	// into 64kiB on disk because chunkHeaderLen is added to each 1k chunk.

	indexingMaxBatchSize  = 1024 * 1024
	indexingBatchTimeout  = 500 * time.Millisecond // Commit batch when idle for that long.
	indexingQueueCapacity = 1024 * 16
)

var fpLen = len(model.Fingerprint(0).String()) // Length of a fingerprint as string.

const (
	flagHeadChunkPersisted byte = 1 << iota
	// Add more flags here like:
	// flagFoo
	// flagBar
)

type indexingOpType byte

const (
	add indexingOpType = iota
	remove
)

type indexingOp struct {
	fingerprint model.Fingerprint
	metric      model.Metric
	opType      indexingOpType
}

// A Persistence is used by a Storage implementation to store samples
// persistently across restarts. The methods are only goroutine-safe if
// explicitly marked as such below. The chunk-related methods persistChunks,
// dropChunks, loadChunks, and loadChunkDescs can be called concurrently with
// each other if each call refers to a different fingerprint.
type persistence struct {
	basePath string

	archivedFingerprintToMetrics   *index.FingerprintMetricIndex
	archivedFingerprintToTimeRange *index.FingerprintTimeRangeIndex
	labelPairToFingerprints        *index.LabelPairFingerprintIndex
	labelNameToLabelValues         *index.LabelNameLabelValuesIndex

	indexingQueue   chan indexingOp
	indexingStopped chan struct{}
	indexingFlush   chan chan int

	indexingQueueLength   prometheus.Gauge
	indexingQueueCapacity prometheus.Metric
	indexingBatchSizes    prometheus.Summary
	indexingBatchDuration prometheus.Summary
	checkpointDuration    prometheus.Gauge
	dirtyCounter          prometheus.Counter
	startedDirty          prometheus.Gauge

	dirtyMtx       sync.Mutex     // Protects dirty and becameDirty.
	dirty          bool           // true if persistence was started in dirty state.
	becameDirty    bool           // true if an inconsistency came up during runtime.
	pedanticChecks bool           // true if crash recovery should check each series.
	dirtyFileName  string         // The file used for locking and to mark dirty state.
	fLock          flock.Releaser // The file lock to protect against concurrent usage.

	shouldSync syncStrategy

	minShrinkRatio float64 // How much a series file has to shrink to justify dropping chunks.

	bufPool sync.Pool
}

// newPersistence returns a newly allocated persistence backed by local disk storage, ready to use.
func newPersistence(
	basePath string,
	dirty, pedanticChecks bool,
	shouldSync syncStrategy,
	minShrinkRatio float64,
) (*persistence, error) {
	dirtyPath := filepath.Join(basePath, dirtyFileName)
	versionPath := filepath.Join(basePath, versionFileName)

	if versionData, err := ioutil.ReadFile(versionPath); err == nil {
		if persistedVersion, err := strconv.Atoi(strings.TrimSpace(string(versionData))); err != nil {
			return nil, fmt.Errorf("cannot parse content of %s: %s", versionPath, versionData)
		} else if persistedVersion != Version {
			return nil, fmt.Errorf("found storage version %d on disk, need version %d - please wipe storage or run a version of Prometheus compatible with storage version %d", persistedVersion, Version, persistedVersion)
		}
	} else if os.IsNotExist(err) {
		// No version file found. Let's create the directory (in case
		// it's not there yet) and then check if it is actually
		// empty. If not, we have found an old storage directory without
		// version file, so we have to bail out.
		if err := os.MkdirAll(basePath, 0700); err != nil {
			return nil, err
		}
		fis, err := ioutil.ReadDir(basePath)
		if err != nil {
			return nil, err
		}
		if len(fis) > 0 && !(len(fis) == 1 && fis[0].Name() == "lost+found" && fis[0].IsDir()) {
			return nil, fmt.Errorf("could not detect storage version on disk, assuming version 0, need version %d - please wipe storage or run a version of Prometheus compatible with storage version 0", Version)
		}
		// Finally we can write our own version into a new version file.
		file, err := os.Create(versionPath)
		if err != nil {
			return nil, err
		}
		defer file.Close()
		if _, err := fmt.Fprintf(file, "%d\n", Version); err != nil {
			return nil, err
		}
	} else {
		return nil, err
	}

	fLock, dirtyfileExisted, err := flock.New(dirtyPath)
	if err != nil {
		log.Errorf("Could not lock %s, Prometheus already running?", dirtyPath)
		return nil, err
	}
	if dirtyfileExisted {
		dirty = true
	}

	archivedFingerprintToMetrics, err := index.NewFingerprintMetricIndex(basePath)
	if err != nil {
		return nil, err
	}
	archivedFingerprintToTimeRange, err := index.NewFingerprintTimeRangeIndex(basePath)
	if err != nil {
		return nil, err
	}

	p := &persistence{
		basePath: basePath,

		archivedFingerprintToMetrics:   archivedFingerprintToMetrics,
		archivedFingerprintToTimeRange: archivedFingerprintToTimeRange,

		indexingQueue:   make(chan indexingOp, indexingQueueCapacity),
		indexingStopped: make(chan struct{}),
		indexingFlush:   make(chan chan int),

		indexingQueueLength: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "indexing_queue_length",
			Help:      "The number of metrics waiting to be indexed.",
		}),
		indexingQueueCapacity: prometheus.MustNewConstMetric(
			prometheus.NewDesc(
				prometheus.BuildFQName(namespace, subsystem, "indexing_queue_capacity"),
				"The capacity of the indexing queue.",
				nil, nil,
			),
			prometheus.GaugeValue,
			float64(indexingQueueCapacity),
		),
		indexingBatchSizes: prometheus.NewSummary(
			prometheus.SummaryOpts{
				Namespace: namespace,
				Subsystem: subsystem,
				Name:      "indexing_batch_sizes",
				Help:      "Quantiles for indexing batch sizes (number of metrics per batch).",
			},
		),
		indexingBatchDuration: prometheus.NewSummary(
			prometheus.SummaryOpts{
				Namespace: namespace,
				Subsystem: subsystem,
				Name:      "indexing_batch_duration_seconds",
				Help:      "Quantiles for batch indexing duration in seconds.",
			},
		),
		checkpointDuration: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "checkpoint_duration_seconds",
			Help:      "The duration in seconds it took to checkpoint in-memory metrics and head chunks.",
		}),
		dirtyCounter: prometheus.NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "inconsistencies_total",
			Help:      "A counter incremented each time an inconsistency in the local storage is detected. If this is greater zero, restart the server as soon as possible.",
		}),
		startedDirty: prometheus.NewGauge(prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "started_dirty",
			Help:      "Whether the local storage was found to be dirty (and crash recovery occurred) during Prometheus startup.",
		}),
		dirty:          dirty,
		pedanticChecks: pedanticChecks,
		dirtyFileName:  dirtyPath,
		fLock:          fLock,
		shouldSync:     shouldSync,
		// Create buffers of length 3*chunkLenWithHeader by default because that is still reasonably small
		// and at the same time enough for many uses. The contract is to never return buffer smaller than
		// that to the pool so that callers can rely on a minimum buffer size.
		bufPool: sync.Pool{New: func() interface{} { return make([]byte, 0, 3*chunkLenWithHeader) }},
	}

	if p.dirty {
		// Blow away the label indexes. We'll rebuild them later.
		if err := index.DeleteLabelPairFingerprintIndex(basePath); err != nil {
			return nil, err
		}
		if err := index.DeleteLabelNameLabelValuesIndex(basePath); err != nil {
			return nil, err
		}
	}
	labelPairToFingerprints, err := index.NewLabelPairFingerprintIndex(basePath)
	if err != nil {
		return nil, err
	}
	labelNameToLabelValues, err := index.NewLabelNameLabelValuesIndex(basePath)
	if err != nil {
		return nil, err
	}
	p.labelPairToFingerprints = labelPairToFingerprints
	p.labelNameToLabelValues = labelNameToLabelValues

	return p, nil
}

func (p *persistence) run() {
	p.processIndexingQueue()
}

// Describe implements prometheus.Collector.
func (p *persistence) Describe(ch chan<- *prometheus.Desc) {
	ch <- p.indexingQueueLength.Desc()
	ch <- p.indexingQueueCapacity.Desc()
	p.indexingBatchSizes.Describe(ch)
	p.indexingBatchDuration.Describe(ch)
	ch <- p.checkpointDuration.Desc()
	ch <- p.dirtyCounter.Desc()
	ch <- p.startedDirty.Desc()
}

// Collect implements prometheus.Collector.
func (p *persistence) Collect(ch chan<- prometheus.Metric) {
	p.indexingQueueLength.Set(float64(len(p.indexingQueue)))

	ch <- p.indexingQueueLength
	ch <- p.indexingQueueCapacity
	p.indexingBatchSizes.Collect(ch)
	p.indexingBatchDuration.Collect(ch)
	ch <- p.checkpointDuration
	ch <- p.dirtyCounter
	ch <- p.startedDirty
}

// isDirty returns the dirty flag in a goroutine-safe way.
func (p *persistence) isDirty() bool {
	p.dirtyMtx.Lock()
	defer p.dirtyMtx.Unlock()
	return p.dirty
}

// setDirty flags the storage as dirty in a goroutine-safe way. The provided
// error will be logged as a reason the first time the storage is flagged as dirty.
func (p *persistence) setDirty(err error) {
	p.dirtyCounter.Inc()
	p.dirtyMtx.Lock()
	defer p.dirtyMtx.Unlock()
	if p.becameDirty {
		return
	}
	p.dirty = true
	p.becameDirty = true
	log.With("error", err).Error("The storage is now inconsistent. Restart Prometheus ASAP to initiate recovery.")
}

// fingerprintsForLabelPair returns the fingerprints for the given label
// pair. This method is goroutine-safe but take into account that metrics queued
// for indexing with IndexMetric might not have made it into the index
// yet. (Same applies correspondingly to UnindexMetric.)
func (p *persistence) fingerprintsForLabelPair(lp model.LabelPair) model.Fingerprints {
	fps, _, err := p.labelPairToFingerprints.Lookup(lp)
	if err != nil {
		p.setDirty(fmt.Errorf("error in method fingerprintsForLabelPair(%v): %s", lp, err))
		return nil
	}
	return fps
}

// labelValuesForLabelName returns the label values for the given label
// name. This method is goroutine-safe but take into account that metrics queued
// for indexing with IndexMetric might not have made it into the index
// yet. (Same applies correspondingly to UnindexMetric.)
func (p *persistence) labelValuesForLabelName(ln model.LabelName) (model.LabelValues, error) {
	lvs, _, err := p.labelNameToLabelValues.Lookup(ln)
	if err != nil {
		p.setDirty(fmt.Errorf("error in method labelValuesForLabelName(%v): %s", ln, err))
		return nil, err
	}
	return lvs, nil
}

// persistChunks persists a number of consecutive chunks of a series. It is the
// caller's responsibility to not modify the chunks concurrently and to not
// persist or drop anything for the same fingerprint concurrently. It returns
// the (zero-based) index of the first persisted chunk within the series
// file. In case of an error, the returned index is -1 (to avoid the
// misconception that the chunk was written at position 0).
//
// Returning an error signals problems with the series file. In this case, the
// caller should quarantine the series.
func (p *persistence) persistChunks(fp model.Fingerprint, chunks []chunk) (index int, err error) {
	f, err := p.openChunkFileForWriting(fp)
	if err != nil {
		return -1, err
	}
	defer p.closeChunkFile(f)

	if err := p.writeChunks(f, chunks); err != nil {
		return -1, err
	}

	// Determine index within the file.
	offset, err := f.Seek(0, os.SEEK_CUR)
	if err != nil {
		return -1, err
	}
	index, err = chunkIndexForOffset(offset)
	if err != nil {
		return -1, err
	}

	return index - len(chunks), err
}

// loadChunks loads a group of chunks of a timeseries by their index. The chunk
// with the earliest time will have index 0, the following ones will have
// incrementally larger indexes. The indexOffset denotes the offset to be added to
// each index in indexes. It is the caller's responsibility to not persist or
// drop anything for the same fingerprint concurrently.
func (p *persistence) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]chunk, error) {
	f, err := p.openChunkFileForReading(fp)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	chunks := make([]chunk, 0, len(indexes))
	buf := p.bufPool.Get().([]byte)
	defer func() {
		// buf may change below. An unwrapped 'defer p.bufPool.Put(buf)'
		// would only put back the original buf.
		p.bufPool.Put(buf)
	}()

	for i := 0; i < len(indexes); i++ {
		// This loads chunks in batches. A batch is a streak of
		// consecutive chunks, read from disk in one go.
		batchSize := 1
		if _, err := f.Seek(offsetForChunkIndex(indexes[i]+indexOffset), os.SEEK_SET); err != nil {
			return nil, err
		}

		for ; batchSize < chunkMaxBatchSize &&
			i+1 < len(indexes) &&
			indexes[i]+1 == indexes[i+1]; i, batchSize = i+1, batchSize+1 {
		}
		readSize := batchSize * chunkLenWithHeader
		if cap(buf) < readSize {
			buf = make([]byte, readSize)
		}
		buf = buf[:readSize]

		if _, err := io.ReadFull(f, buf); err != nil {
			return nil, err
		}
		for c := 0; c < batchSize; c++ {
			chunk, err := newChunkForEncoding(chunkEncoding(buf[c*chunkLenWithHeader+chunkHeaderTypeOffset]))
			if err != nil {
				return nil, err
			}
			if err := chunk.unmarshalFromBuf(buf[c*chunkLenWithHeader+chunkHeaderLen:]); err != nil {
				return nil, err
			}
			chunks = append(chunks, chunk)
		}
	}
	chunkOps.WithLabelValues(load).Add(float64(len(chunks)))
	atomic.AddInt64(&numMemChunks, int64(len(chunks)))
	return chunks, nil
}

// loadChunkDescs loads the chunkDescs for a series from disk. offsetFromEnd is
// the number of chunkDescs to skip from the end of the series file. It is the
// caller's responsibility to not persist or drop anything for the same
// fingerprint concurrently.
func (p *persistence) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*chunkDesc, error) {
	f, err := p.openChunkFileForReading(fp)
	if os.IsNotExist(err) {
		return nil, nil
	}
	if err != nil {
		return nil, err
	}
	defer f.Close()

	fi, err := f.Stat()
	if err != nil {
		return nil, err
	}
	if fi.Size()%int64(chunkLenWithHeader) != 0 {
		// The returned error will bubble up and lead to quarantining of the whole series.
		return nil, fmt.Errorf(
			"size of series file for fingerprint %v is %d, which is not a multiple of the chunk length %d",
			fp, fi.Size(), chunkLenWithHeader,
		)
	}

	numChunks := int(fi.Size())/chunkLenWithHeader - offsetFromEnd
	cds := make([]*chunkDesc, numChunks)
	chunkTimesBuf := make([]byte, 16)
	for i := 0; i < numChunks; i++ {
		_, err := f.Seek(offsetForChunkIndex(i)+chunkHeaderFirstTimeOffset, os.SEEK_SET)
		if err != nil {
			return nil, err
		}

		_, err = io.ReadAtLeast(f, chunkTimesBuf, 16)
		if err != nil {
			return nil, err
		}
		cds[i] = &chunkDesc{
			chunkFirstTime: model.Time(binary.LittleEndian.Uint64(chunkTimesBuf)),
			chunkLastTime:  model.Time(binary.LittleEndian.Uint64(chunkTimesBuf[8:])),
		}
	}
	chunkDescOps.WithLabelValues(load).Add(float64(len(cds)))
	numMemChunkDescs.Add(float64(len(cds)))
	return cds, nil
}

// checkpointSeriesMapAndHeads persists the fingerprint to memory-series mapping
// and all non persisted chunks. Do not call concurrently with
// loadSeriesMapAndHeads. This method will only write heads format v2, but
// loadSeriesMapAndHeads can also understand v1.
//
// Description of the file format (for both, v1 and v2):
//
// (1) Magic string (const headsMagicString).
//
// (2) Varint-encoded format version (const headsFormatVersion).
//
// (3) Number of series in checkpoint as big-endian uint64.
//
// (4) Repeated once per series:
//
// (4.1) A flag byte, see flag constants above. (Present but unused in v2.)
//
// (4.2) The fingerprint as big-endian uint64.
//
// (4.3) The metric as defined by codable.Metric.
//
// (4.4) The varint-encoded persistWatermark. (Missing in v1.)
//
// (4.5) The modification time of the series file as nanoseconds elapsed since
// January 1, 1970 UTC. -1 if the modification time is unknown or no series file
// exists yet. (Missing in v1.)
//
// (4.6) The varint-encoded chunkDescsOffset.
//
// (4.6) The varint-encoded savedFirstTime.
//
// (4.7) The varint-encoded number of chunk descriptors.
//
// (4.8) Repeated once per chunk descriptor, oldest to most recent, either
// variant 4.8.1 (if index < persistWatermark) or variant 4.8.2 (if index >=
// persistWatermark). In v1, everything is variant 4.8.1 except for a
// non-persisted head-chunk (determined by the flags).
//
// (4.8.1.1) The varint-encoded first time.
// (4.8.1.2) The varint-encoded last time.
//
// (4.8.2.1) A byte defining the chunk type.
// (4.8.2.2) The chunk itself, marshaled with the marshal() method.
//
func (p *persistence) checkpointSeriesMapAndHeads(fingerprintToSeries *seriesMap, fpLocker *fingerprintLocker) (err error) {
	log.Info("Checkpointing in-memory metrics and chunks...")
	begin := time.Now()
	f, err := os.OpenFile(p.headsTempFileName(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
	if err != nil {
		return err
	}

	defer func() {
		syncErr := f.Sync()
		closeErr := f.Close()
		if err != nil {
			return
		}
		err = syncErr
		if err != nil {
			return
		}
		err = closeErr
		if err != nil {
			return
		}
		err = os.Rename(p.headsTempFileName(), p.headsFileName())
		duration := time.Since(begin)
		p.checkpointDuration.Set(duration.Seconds())
		log.Infof("Done checkpointing in-memory metrics and chunks in %v.", duration)
	}()

	w := bufio.NewWriterSize(f, fileBufSize)

	if _, err = w.WriteString(headsMagicString); err != nil {
		return err
	}
	var numberOfSeriesOffset int
	if numberOfSeriesOffset, err = codable.EncodeVarint(w, headsFormatVersion); err != nil {
		return err
	}
	numberOfSeriesOffset += len(headsMagicString)
	numberOfSeriesInHeader := uint64(fingerprintToSeries.length())
	// We have to write the number of series as uint64 because we might need
	// to overwrite it later, and a varint might change byte width then.
	if err = codable.EncodeUint64(w, numberOfSeriesInHeader); err != nil {
		return err
	}

	iter := fingerprintToSeries.iter()
	defer func() {
		// Consume the iterator in any case to not leak goroutines.
		for range iter {
		}
	}()

	var realNumberOfSeries uint64
	for m := range iter {
		func() { // Wrapped in function to use defer for unlocking the fp.
			fpLocker.Lock(m.fp)
			defer fpLocker.Unlock(m.fp)

			if len(m.series.chunkDescs) == 0 {
				// This series was completely purged or archived in the meantime. Ignore.
				return
			}
			realNumberOfSeries++
			// seriesFlags left empty in v2.
			if err = w.WriteByte(0); err != nil {
				return
			}
			if err = codable.EncodeUint64(w, uint64(m.fp)); err != nil {
				return
			}
			var buf []byte
			buf, err = codable.Metric(m.series.metric).MarshalBinary()
			if err != nil {
				return
			}
			if _, err = w.Write(buf); err != nil {
				return
			}
			if _, err = codable.EncodeVarint(w, int64(m.series.persistWatermark)); err != nil {
				return
			}
			if m.series.modTime.IsZero() {
				if _, err = codable.EncodeVarint(w, -1); err != nil {
					return
				}
			} else {
				if _, err = codable.EncodeVarint(w, m.series.modTime.UnixNano()); err != nil {
					return
				}
			}
			if _, err = codable.EncodeVarint(w, int64(m.series.chunkDescsOffset)); err != nil {
				return
			}
			if _, err = codable.EncodeVarint(w, int64(m.series.savedFirstTime)); err != nil {
				return
			}
			if _, err = codable.EncodeVarint(w, int64(len(m.series.chunkDescs))); err != nil {
				return
			}
			for i, chunkDesc := range m.series.chunkDescs {
				if i < m.series.persistWatermark {
					if _, err = codable.EncodeVarint(w, int64(chunkDesc.firstTime())); err != nil {
						return
					}
					lt, err := chunkDesc.lastTime()
					if err != nil {
						return
					}
					if _, err = codable.EncodeVarint(w, int64(lt)); err != nil {
						return
					}
				} else {
					// This is a non-persisted chunk. Fully marshal it.
					if err = w.WriteByte(byte(chunkDesc.c.encoding())); err != nil {
						return
					}
					if err = chunkDesc.c.marshal(w); err != nil {
						return
					}
				}
			}
			// Series is checkpointed now, so declare it clean. In case the entire
			// checkpoint fails later on, this is fine, as the storage's series
			// maintenance will mark these series newly dirty again, continuously
			// increasing the total number of dirty series as seen by the storage.
			// This has the effect of triggering a new checkpoint attempt even
			// earlier than if we hadn't incorrectly set "dirty" to "false" here
			// already.
			m.series.dirty = false
		}()
		if err != nil {
			return err
		}
	}
	if err = w.Flush(); err != nil {
		return err
	}
	if realNumberOfSeries != numberOfSeriesInHeader {
		// The number of series has changed in the meantime.
		// Rewrite it in the header.
		if _, err = f.Seek(int64(numberOfSeriesOffset), os.SEEK_SET); err != nil {
			return err
		}
		if err = codable.EncodeUint64(f, realNumberOfSeries); err != nil {
			return err
		}
	}
	return err
}

// loadSeriesMapAndHeads loads the fingerprint to memory-series mapping and all
// the chunks contained in the checkpoint (and thus not yet persisted to series
// files). The method is capable of loading the checkpoint format v1 and v2. If
// recoverable corruption is detected, or if the dirty flag was set from the
// beginning, crash recovery is run, which might take a while. If an
// unrecoverable error is encountered, it is returned. Call this method during
// start-up while nothing else is running in storage land. This method is
// utterly goroutine-unsafe.
func (p *persistence) loadSeriesMapAndHeads() (sm *seriesMap, chunksToPersist int64, err error) {
	fingerprintToSeries := make(map[model.Fingerprint]*memorySeries)
	sm = &seriesMap{m: fingerprintToSeries}

	defer func() {
		if p.dirty {
			log.Warn("Persistence layer appears dirty.")
			p.startedDirty.Set(1)
			err = p.recoverFromCrash(fingerprintToSeries)
			if err != nil {
				sm = nil
			}
		} else {
			p.startedDirty.Set(0)
		}
	}()

	hs := newHeadsScanner(p.headsFileName())
	defer hs.close()
	for hs.scan() {
		fingerprintToSeries[hs.fp] = hs.series
	}
	if os.IsNotExist(hs.err) {
		return sm, 0, nil
	}
	if hs.err != nil {
		p.dirty = true
		log.
			With("file", p.headsFileName()).
			With("error", hs.err).
			Error("Error reading heads file.")
		return sm, 0, hs.err
	}
	return sm, hs.chunksToPersistTotal, nil
}

// dropAndPersistChunks deletes all chunks from a series file whose last sample
// time is before beforeTime, and then appends the provided chunks, leaving out
// those whose last sample time is before beforeTime. It returns the timestamp
// of the first sample in the oldest chunk _not_ dropped, the offset within the
// series file of the first chunk persisted (out of the provided chunks), the
// number of deleted chunks, and true if all chunks of the series have been
// deleted (in which case the returned timestamp will be 0 and must be ignored).
// It is the caller's responsibility to make sure nothing is persisted or loaded
// for the same fingerprint concurrently.
//
// Returning an error signals problems with the series file. In this case, the
// caller should quarantine the series.
func (p *persistence) dropAndPersistChunks(
	fp model.Fingerprint, beforeTime model.Time, chunks []chunk,
) (
	firstTimeNotDropped model.Time,
	offset int,
	numDropped int,
	allDropped bool,
	err error,
) {
	// Style note: With the many return values, it was decided to use naked
	// returns in this method. They make the method more readable, but
	// please handle with care!
	if len(chunks) > 0 {
		// We have chunks to persist. First check if those are already
		// too old. If that's the case, the chunks in the series file
		// are all too old, too.
		i := 0
		for ; i < len(chunks); i++ {
			var lt model.Time
			lt, err = chunks[i].newIterator().lastTimestamp()
			if err != nil {
				return
			}
			if !lt.Before(beforeTime) {
				break
			}
		}
		if i < len(chunks) {
			firstTimeNotDropped = chunks[i].firstTime()
		}
		if i > 0 || firstTimeNotDropped.Before(beforeTime) {
			// Series file has to go.
			if numDropped, err = p.deleteSeriesFile(fp); err != nil {
				return
			}
			numDropped += i
			if i == len(chunks) {
				allDropped = true
				return
			}
			// Now simply persist what has to be persisted to a new file.
			_, err = p.persistChunks(fp, chunks[i:])
			return
		}
	}

	// If we are here, we have to check the series file itself.
	f, err := p.openChunkFileForReading(fp)
	if os.IsNotExist(err) {
		// No series file. Only need to create new file with chunks to
		// persist, if there are any.
		if len(chunks) == 0 {
			allDropped = true
			err = nil // Do not report not-exist err.
			return
		}
		offset, err = p.persistChunks(fp, chunks)
		return
	}
	if err != nil {
		return
	}
	defer f.Close()

	headerBuf := make([]byte, chunkHeaderLen)
	var firstTimeInFile model.Time
	// Find the first chunk in the file that should be kept.
	for ; ; numDropped++ {
		_, err = f.Seek(offsetForChunkIndex(numDropped), os.SEEK_SET)
		if err != nil {
			return
		}
		_, err = io.ReadFull(f, headerBuf)
		if err == io.EOF {
			// Close the file before trying to delete it. This is necessary on Windows
			// (this will cause the defer f.Close to fail, but the error is silently ignored)
			f.Close()
			// We ran into the end of the file without finding any chunks that should
			// be kept. Remove the whole file.
			if numDropped, err = p.deleteSeriesFile(fp); err != nil {
				return
			}
			if len(chunks) == 0 {
				allDropped = true
				return
			}
			offset, err = p.persistChunks(fp, chunks)
			return
		}
		if err != nil {
			return
		}
		if numDropped == 0 {
			firstTimeInFile = model.Time(
				binary.LittleEndian.Uint64(headerBuf[chunkHeaderFirstTimeOffset:]),
			)
		}
		lastTime := model.Time(
			binary.LittleEndian.Uint64(headerBuf[chunkHeaderLastTimeOffset:]),
		)
		if !lastTime.Before(beforeTime) {
			break
		}
	}

	// We've found the first chunk that should be kept.
	// First check if the shrink ratio is good enough to perform the the
	// actual drop or leave it for next time if it is not worth the effort.
	fi, err := f.Stat()
	if err != nil {
		return
	}
	totalChunks := int(fi.Size())/chunkLenWithHeader + len(chunks)
	if numDropped == 0 || float64(numDropped)/float64(totalChunks) < p.minShrinkRatio {
		// Nothing to drop. Just adjust the return values and append the chunks (if any).
		numDropped = 0
		firstTimeNotDropped = firstTimeInFile
		if len(chunks) > 0 {
			offset, err = p.persistChunks(fp, chunks)
		}
		return
	}
	// If we are here, we have to drop some chunks for real. So we need to
	// record firstTimeNotDropped from the last read header, seek backwards
	// to the beginning of its header, and start copying everything from
	// there into a new file. Then append the chunks to the new file.
	firstTimeNotDropped = model.Time(
		binary.LittleEndian.Uint64(headerBuf[chunkHeaderFirstTimeOffset:]),
	)
	chunkOps.WithLabelValues(drop).Add(float64(numDropped))
	_, err = f.Seek(-chunkHeaderLen, os.SEEK_CUR)
	if err != nil {
		return
	}

	temp, err := os.OpenFile(p.tempFileNameForFingerprint(fp), os.O_WRONLY|os.O_CREATE, 0640)
	if err != nil {
		return
	}
	defer func() {
		// Close the file before trying to rename to it. This is necessary on Windows
		// (this will cause the defer f.Close to fail, but the error is silently ignored)
		f.Close()
		p.closeChunkFile(temp)
		if err == nil {
			err = os.Rename(p.tempFileNameForFingerprint(fp), p.fileNameForFingerprint(fp))
		}
	}()

	written, err := io.Copy(temp, f)
	if err != nil {
		return
	}
	offset = int(written / chunkLenWithHeader)

	if len(chunks) > 0 {
		if err = p.writeChunks(temp, chunks); err != nil {
			return
		}
	}
	return
}

// deleteSeriesFile deletes a series file belonging to the provided
// fingerprint. It returns the number of chunks that were contained in the
// deleted file.
func (p *persistence) deleteSeriesFile(fp model.Fingerprint) (int, error) {
	fname := p.fileNameForFingerprint(fp)
	fi, err := os.Stat(fname)
	if os.IsNotExist(err) {
		// Great. The file is already gone.
		return 0, nil
	}
	if err != nil {
		return -1, err
	}
	numChunks := int(fi.Size() / chunkLenWithHeader)
	if err := os.Remove(fname); err != nil {
		return -1, err
	}
	chunkOps.WithLabelValues(drop).Add(float64(numChunks))
	return numChunks, nil
}

// quarantineSeriesFile moves a series file to the orphaned directory. It also
// writes a hint file with the provided quarantine reason and, if series is
// non-nil, the string representation of the metric.
func (p *persistence) quarantineSeriesFile(fp model.Fingerprint, quarantineReason error, metric model.Metric) error {
	var (
		oldName     = p.fileNameForFingerprint(fp)
		orphanedDir = filepath.Join(p.basePath, "orphaned", filepath.Base(filepath.Dir(oldName)))
		newName     = filepath.Join(orphanedDir, filepath.Base(oldName))
		hintName    = newName[:len(newName)-len(seriesFileSuffix)] + hintFileSuffix
	)

	renameErr := os.MkdirAll(orphanedDir, 0700)
	if renameErr != nil {
		return renameErr
	}
	renameErr = os.Rename(oldName, newName)
	if os.IsNotExist(renameErr) {
		// Source file dosn't exist. That's normal.
		renameErr = nil
	}
	// Write hint file even if the rename ended in an error. At least try...
	// And ignore errors writing the hint file. It's best effort.
	if f, err := os.Create(hintName); err == nil {
		if metric != nil {
			f.WriteString(metric.String() + "\n")
		} else {
			f.WriteString("[UNKNOWN METRIC]\n")
		}
		if quarantineReason != nil {
			f.WriteString(quarantineReason.Error() + "\n")
		} else {
			f.WriteString("[UNKNOWN REASON]\n")
		}
		f.Close()
	}
	return renameErr
}

// seriesFileModTime returns the modification time of the series file belonging
// to the provided fingerprint. In case of an error, the zero value of time.Time
// is returned.
func (p *persistence) seriesFileModTime(fp model.Fingerprint) time.Time {
	var modTime time.Time
	if fi, err := os.Stat(p.fileNameForFingerprint(fp)); err == nil {
		return fi.ModTime()
	}
	return modTime
}

// indexMetric queues the given metric for addition to the indexes needed by
// fingerprintsForLabelPair, labelValuesForLabelName, and
// fingerprintsModifiedBefore.  If the queue is full, this method blocks until
// the metric can be queued.  This method is goroutine-safe.
func (p *persistence) indexMetric(fp model.Fingerprint, m model.Metric) {
	p.indexingQueue <- indexingOp{fp, m, add}
}

// unindexMetric queues references to the given metric for removal from the
// indexes used for fingerprintsForLabelPair, labelValuesForLabelName, and
// fingerprintsModifiedBefore. The index of fingerprints to archived metrics is
// not affected by this removal. (In fact, never call this method for an
// archived metric. To purge an archived metric, call purgeArchivedMetric.)
// If the queue is full, this method blocks until the metric can be queued. This
// method is goroutine-safe.
func (p *persistence) unindexMetric(fp model.Fingerprint, m model.Metric) {
	p.indexingQueue <- indexingOp{fp, m, remove}
}

// waitForIndexing waits until all items in the indexing queue are processed. If
// queue processing is currently on hold (to gather more ops for batching), this
// method will trigger an immediate start of processing. This method is
// goroutine-safe.
func (p *persistence) waitForIndexing() {
	wait := make(chan int)
	for {
		p.indexingFlush <- wait
		if <-wait == 0 {
			break
		}
	}
}

// archiveMetric persists the mapping of the given fingerprint to the given
// metric, together with the first and last timestamp of the series belonging to
// the metric. The caller must have locked the fingerprint.
func (p *persistence) archiveMetric(
	fp model.Fingerprint, m model.Metric, first, last model.Time,
) {
	if err := p.archivedFingerprintToMetrics.Put(codable.Fingerprint(fp), codable.Metric(m)); err != nil {
		p.setDirty(fmt.Errorf("error in method archiveMetric inserting fingerprint %v into FingerprintToMetrics: %s", fp, err))
		return
	}
	if err := p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last}); err != nil {
		p.setDirty(fmt.Errorf("error in method archiveMetric inserting fingerprint %v into FingerprintToTimeRange: %s", fp, err))
	}
}

// hasArchivedMetric returns whether the archived metric for the given
// fingerprint exists and if yes, what the first and last timestamp in the
// corresponding series is. This method is goroutine-safe.
func (p *persistence) hasArchivedMetric(fp model.Fingerprint) (
	hasMetric bool, firstTime, lastTime model.Time,
) {
	firstTime, lastTime, hasMetric, err := p.archivedFingerprintToTimeRange.Lookup(fp)
	if err != nil {
		p.setDirty(fmt.Errorf("error in method hasArchivedMetric(%v): %s", fp, err))
		hasMetric = false
	}
	return hasMetric, firstTime, lastTime
}

// updateArchivedTimeRange updates an archived time range. The caller must make
// sure that the fingerprint is currently archived (the time range will
// otherwise be added without the corresponding metric in the archive).
func (p *persistence) updateArchivedTimeRange(
	fp model.Fingerprint, first, last model.Time,
) error {
	return p.archivedFingerprintToTimeRange.Put(codable.Fingerprint(fp), codable.TimeRange{First: first, Last: last})
}

// fingerprintsModifiedBefore returns the fingerprints of archived timeseries
// that have live samples before the provided timestamp. This method is
// goroutine-safe.
func (p *persistence) fingerprintsModifiedBefore(beforeTime model.Time) ([]model.Fingerprint, error) {
	var fp codable.Fingerprint
	var tr codable.TimeRange
	fps := []model.Fingerprint{}
	err := p.archivedFingerprintToTimeRange.ForEach(func(kv index.KeyValueAccessor) error {
		if err := kv.Value(&tr); err != nil {
			return err
		}
		if tr.First.Before(beforeTime) {
			if err := kv.Key(&fp); err != nil {
				return err
			}
			fps = append(fps, model.Fingerprint(fp))
		}
		return nil
	})
	return fps, err
}

// archivedMetric retrieves the archived metric with the given fingerprint. This
// method is goroutine-safe.
func (p *persistence) archivedMetric(fp model.Fingerprint) (model.Metric, error) {
	metric, _, err := p.archivedFingerprintToMetrics.Lookup(fp)
	if err != nil {
		p.setDirty(fmt.Errorf("error in method archivedMetric(%v): %s", fp, err))
		return nil, err
	}
	return metric, nil
}

// purgeArchivedMetric deletes an archived fingerprint and its corresponding
// metric entirely. It also queues the metric for un-indexing (no need to call
// unindexMetric for the deleted metric.) It does not touch the series file,
// though. The caller must have locked the fingerprint.
func (p *persistence) purgeArchivedMetric(fp model.Fingerprint) (err error) {
	defer func() {
		if err != nil {
			p.setDirty(fmt.Errorf("error in method purgeArchivedMetric(%v): %s", fp, err))
		}
	}()

	metric, err := p.archivedMetric(fp)
	if err != nil || metric == nil {
		return err
	}
	deleted, err := p.archivedFingerprintToMetrics.Delete(codable.Fingerprint(fp))
	if err != nil {
		return err
	}
	if !deleted {
		log.Errorf("Tried to delete non-archived fingerprint %s from archivedFingerprintToMetrics index. This should never happen.", fp)
	}
	deleted, err = p.archivedFingerprintToTimeRange.Delete(codable.Fingerprint(fp))
	if err != nil {
		return err
	}
	if !deleted {
		log.Errorf("Tried to delete non-archived fingerprint %s from archivedFingerprintToTimeRange index. This should never happen.", fp)
	}
	p.unindexMetric(fp, metric)
	return nil
}

// unarchiveMetric deletes an archived fingerprint and its metric, but (in
// contrast to purgeArchivedMetric) does not un-index the metric.  If a metric
// was actually deleted, the method returns true and the first time and last
// time of the deleted metric. The caller must have locked the fingerprint.
func (p *persistence) unarchiveMetric(fp model.Fingerprint) (deletedAnything bool, err error) {
	// An error returned here will bubble up and lead to quarantining of the
	// series, so no setDirty required.
	deleted, err := p.archivedFingerprintToMetrics.Delete(codable.Fingerprint(fp))
	if err != nil || !deleted {
		return false, err
	}
	deleted, err = p.archivedFingerprintToTimeRange.Delete(codable.Fingerprint(fp))
	if err != nil {
		return false, err
	}
	if !deleted {
		log.Errorf("Tried to delete non-archived fingerprint %s from archivedFingerprintToTimeRange index. This should never happen.", fp)
	}
	return true, nil
}

// close flushes the indexing queue and other buffered data and releases any
// held resources. It also removes the dirty marker file if successful and if
// the persistence is currently not marked as dirty.
func (p *persistence) close() error {
	close(p.indexingQueue)
	<-p.indexingStopped

	var lastError, dirtyFileRemoveError error
	if err := p.archivedFingerprintToMetrics.Close(); err != nil {
		lastError = err
		log.Error("Error closing archivedFingerprintToMetric index DB: ", err)
	}
	if err := p.archivedFingerprintToTimeRange.Close(); err != nil {
		lastError = err
		log.Error("Error closing archivedFingerprintToTimeRange index DB: ", err)
	}
	if err := p.labelPairToFingerprints.Close(); err != nil {
		lastError = err
		log.Error("Error closing labelPairToFingerprints index DB: ", err)
	}
	if err := p.labelNameToLabelValues.Close(); err != nil {
		lastError = err
		log.Error("Error closing labelNameToLabelValues index DB: ", err)
	}
	if lastError == nil && !p.isDirty() {
		dirtyFileRemoveError = os.Remove(p.dirtyFileName)
	}
	if err := p.fLock.Release(); err != nil {
		lastError = err
		log.Error("Error releasing file lock: ", err)
	}
	if dirtyFileRemoveError != nil {
		// On Windows, removing the dirty file before unlocking is not
		// possible.  So remove it here if it failed above.
		lastError = os.Remove(p.dirtyFileName)
	}
	return lastError
}

func (p *persistence) dirNameForFingerprint(fp model.Fingerprint) string {
	fpStr := fp.String()
	return filepath.Join(p.basePath, fpStr[0:seriesDirNameLen])
}

func (p *persistence) fileNameForFingerprint(fp model.Fingerprint) string {
	fpStr := fp.String()
	return filepath.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesFileSuffix)
}

func (p *persistence) tempFileNameForFingerprint(fp model.Fingerprint) string {
	fpStr := fp.String()
	return filepath.Join(p.basePath, fpStr[0:seriesDirNameLen], fpStr[seriesDirNameLen:]+seriesTempFileSuffix)
}

func (p *persistence) openChunkFileForWriting(fp model.Fingerprint) (*os.File, error) {
	if err := os.MkdirAll(p.dirNameForFingerprint(fp), 0700); err != nil {
		return nil, err
	}
	return os.OpenFile(p.fileNameForFingerprint(fp), os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0640)
	// NOTE: Although the file was opened for append,
	//     f.Seek(0, os.SEEK_CUR)
	// would now return '0, nil', so we cannot check for a consistent file length right now.
	// However, the chunkIndexForOffset function is doing that check, so a wrong file length
	// would still be detected.
}

// closeChunkFile first syncs the provided file if mandated so by the sync
// strategy. Then it closes the file. Errors are logged.
func (p *persistence) closeChunkFile(f *os.File) {
	if p.shouldSync() {
		if err := f.Sync(); err != nil {
			log.Error("Error syncing file:", err)
		}
	}
	if err := f.Close(); err != nil {
		log.Error("Error closing chunk file:", err)
	}
}

func (p *persistence) openChunkFileForReading(fp model.Fingerprint) (*os.File, error) {
	return os.Open(p.fileNameForFingerprint(fp))
}

func (p *persistence) headsFileName() string {
	return filepath.Join(p.basePath, headsFileName)
}

func (p *persistence) headsTempFileName() string {
	return filepath.Join(p.basePath, headsTempFileName)
}

func (p *persistence) mappingsFileName() string {
	return filepath.Join(p.basePath, mappingsFileName)
}

func (p *persistence) mappingsTempFileName() string {
	return filepath.Join(p.basePath, mappingsTempFileName)
}

func (p *persistence) processIndexingQueue() {
	batchSize := 0
	nameToValues := index.LabelNameLabelValuesMapping{}
	pairToFPs := index.LabelPairFingerprintsMapping{}
	batchTimeout := time.NewTimer(indexingBatchTimeout)
	defer batchTimeout.Stop()

	commitBatch := func() {
		p.indexingBatchSizes.Observe(float64(batchSize))
		defer func(begin time.Time) {
			p.indexingBatchDuration.Observe(time.Since(begin).Seconds())
		}(time.Now())

		if err := p.labelPairToFingerprints.IndexBatch(pairToFPs); err != nil {
			log.Error("Error indexing label pair to fingerprints batch: ", err)
		}
		if err := p.labelNameToLabelValues.IndexBatch(nameToValues); err != nil {
			log.Error("Error indexing label name to label values batch: ", err)
		}
		batchSize = 0
		nameToValues = index.LabelNameLabelValuesMapping{}
		pairToFPs = index.LabelPairFingerprintsMapping{}
		batchTimeout.Reset(indexingBatchTimeout)
	}

	var flush chan chan int
loop:
	for {
		// Only process flush requests if the queue is currently empty.
		if len(p.indexingQueue) == 0 {
			flush = p.indexingFlush
		} else {
			flush = nil
		}
		select {
		case <-batchTimeout.C:
			// Only commit if we have something to commit _and_
			// nothing is waiting in the queue to be picked up. That
			// prevents a death spiral if the LookupSet calls below
			// are slow for some reason.
			if batchSize > 0 && len(p.indexingQueue) == 0 {
				commitBatch()
			} else {
				batchTimeout.Reset(indexingBatchTimeout)
			}
		case r := <-flush:
			if batchSize > 0 {
				commitBatch()
			}
			r <- len(p.indexingQueue)
		case op, ok := <-p.indexingQueue:
			if !ok {
				if batchSize > 0 {
					commitBatch()
				}
				break loop
			}

			batchSize++
			for ln, lv := range op.metric {
				lp := model.LabelPair{Name: ln, Value: lv}
				baseFPs, ok := pairToFPs[lp]
				if !ok {
					var err error
					baseFPs, _, err = p.labelPairToFingerprints.LookupSet(lp)
					if err != nil {
						log.Errorf("Error looking up label pair %v: %s", lp, err)
						continue
					}
					pairToFPs[lp] = baseFPs
				}
				baseValues, ok := nameToValues[ln]
				if !ok {
					var err error
					baseValues, _, err = p.labelNameToLabelValues.LookupSet(ln)
					if err != nil {
						log.Errorf("Error looking up label name %v: %s", ln, err)
						continue
					}
					nameToValues[ln] = baseValues
				}
				switch op.opType {
				case add:
					baseFPs[op.fingerprint] = struct{}{}
					baseValues[lv] = struct{}{}
				case remove:
					delete(baseFPs, op.fingerprint)
					if len(baseFPs) == 0 {
						delete(baseValues, lv)
					}
				default:
					panic("unknown op type")
				}
			}

			if batchSize >= indexingMaxBatchSize {
				commitBatch()
			}
		}
	}
	close(p.indexingStopped)
}

// checkpointFPMappings persists the fingerprint mappings. The caller has to
// ensure that the provided mappings are not changed concurrently. This method
// is only called upon shutdown or during crash recovery, when no samples are
// ingested.
//
// Description of the file format, v1:
//
// (1) Magic string (const mappingsMagicString).
//
// (2) Uvarint-encoded format version (const mappingsFormatVersion).
//
// (3) Uvarint-encoded number of mappings in fpMappings.
//
// (4) Repeated once per mapping:
//
// (4.1) The raw fingerprint as big-endian uint64.
//
// (4.2) The uvarint-encoded number of sub-mappings for the raw fingerprint.
//
// (4.3) Repeated once per sub-mapping:
//
// (4.3.1) The uvarint-encoded length of the unique metric string.
// (4.3.2) The unique metric string.
// (4.3.3) The mapped fingerprint as big-endian uint64.
func (p *persistence) checkpointFPMappings(fpm fpMappings) (err error) {
	log.Info("Checkpointing fingerprint mappings...")
	begin := time.Now()
	f, err := os.OpenFile(p.mappingsTempFileName(), os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0640)
	if err != nil {
		return
	}

	defer func() {
		syncErr := f.Sync()
		closeErr := f.Close()
		if err != nil {
			return
		}
		err = syncErr
		if err != nil {
			return
		}
		err = closeErr
		if err != nil {
			return
		}
		err = os.Rename(p.mappingsTempFileName(), p.mappingsFileName())
		duration := time.Since(begin)
		log.Infof("Done checkpointing fingerprint mappings in %v.", duration)
	}()

	w := bufio.NewWriterSize(f, fileBufSize)

	if _, err = w.WriteString(mappingsMagicString); err != nil {
		return
	}
	if _, err = codable.EncodeUvarint(w, mappingsFormatVersion); err != nil {
		return
	}
	if _, err = codable.EncodeUvarint(w, uint64(len(fpm))); err != nil {
		return
	}

	for fp, mappings := range fpm {
		if err = codable.EncodeUint64(w, uint64(fp)); err != nil {
			return
		}
		if _, err = codable.EncodeUvarint(w, uint64(len(mappings))); err != nil {
			return
		}
		for ms, mappedFP := range mappings {
			if _, err = codable.EncodeUvarint(w, uint64(len(ms))); err != nil {
				return
			}
			if _, err = w.WriteString(ms); err != nil {
				return
			}
			if err = codable.EncodeUint64(w, uint64(mappedFP)); err != nil {
				return
			}
		}
	}
	err = w.Flush()
	return
}

// loadFPMappings loads the fingerprint mappings. It also returns the highest
// mapped fingerprint and any error encountered. If p.mappingsFileName is not
// found, the method returns (fpMappings{}, 0, nil). Do not call concurrently
// with checkpointFPMappings.
func (p *persistence) loadFPMappings() (fpMappings, model.Fingerprint, error) {
	fpm := fpMappings{}
	var highestMappedFP model.Fingerprint

	f, err := os.Open(p.mappingsFileName())
	if os.IsNotExist(err) {
		return fpm, 0, nil
	}
	if err != nil {
		return nil, 0, err
	}
	defer f.Close()
	r := bufio.NewReaderSize(f, fileBufSize)

	buf := make([]byte, len(mappingsMagicString))
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, 0, err
	}
	magic := string(buf)
	if magic != mappingsMagicString {
		return nil, 0, fmt.Errorf(
			"unexpected magic string, want %q, got %q",
			mappingsMagicString, magic,
		)
	}
	version, err := binary.ReadUvarint(r)
	if version != mappingsFormatVersion || err != nil {
		return nil, 0, fmt.Errorf("unknown fingerprint mappings format version, want %d", mappingsFormatVersion)
	}
	numRawFPs, err := binary.ReadUvarint(r)
	if err != nil {
		return nil, 0, err
	}
	for ; numRawFPs > 0; numRawFPs-- {
		rawFP, err := codable.DecodeUint64(r)
		if err != nil {
			return nil, 0, err
		}
		numMappings, err := binary.ReadUvarint(r)
		if err != nil {
			return nil, 0, err
		}
		mappings := make(map[string]model.Fingerprint, numMappings)
		for ; numMappings > 0; numMappings-- {
			lenMS, err := binary.ReadUvarint(r)
			if err != nil {
				return nil, 0, err
			}
			buf := make([]byte, lenMS)
			if _, err := io.ReadFull(r, buf); err != nil {
				return nil, 0, err
			}
			fp, err := codable.DecodeUint64(r)
			if err != nil {
				return nil, 0, err
			}
			mappedFP := model.Fingerprint(fp)
			if mappedFP > highestMappedFP {
				highestMappedFP = mappedFP
			}
			mappings[string(buf)] = mappedFP
		}
		fpm[model.Fingerprint(rawFP)] = mappings
	}
	return fpm, highestMappedFP, nil
}

func (p *persistence) writeChunks(w io.Writer, chunks []chunk) error {
	b := p.bufPool.Get().([]byte)
	defer func() {
		// buf may change below. An unwrapped 'defer p.bufPool.Put(buf)'
		// would only put back the original buf.
		p.bufPool.Put(b)
	}()

	for batchSize := chunkMaxBatchSize; len(chunks) > 0; chunks = chunks[batchSize:] {
		if batchSize > len(chunks) {
			batchSize = len(chunks)
		}
		writeSize := batchSize * chunkLenWithHeader
		if cap(b) < writeSize {
			b = make([]byte, writeSize)
		}
		b = b[:writeSize]

		for i, chunk := range chunks[:batchSize] {
			if err := writeChunkHeader(b[i*chunkLenWithHeader:], chunk); err != nil {
				return err
			}
			if err := chunk.marshalToBuf(b[i*chunkLenWithHeader+chunkHeaderLen:]); err != nil {
				return err
			}
		}
		if _, err := w.Write(b); err != nil {
			return err
		}
	}
	return nil
}

func offsetForChunkIndex(i int) int64 {
	return int64(i * chunkLenWithHeader)
}

func chunkIndexForOffset(offset int64) (int, error) {
	if int(offset)%chunkLenWithHeader != 0 {
		return -1, fmt.Errorf(
			"offset %d is not a multiple of on-disk chunk length %d",
			offset, chunkLenWithHeader,
		)
	}
	return int(offset) / chunkLenWithHeader, nil
}

func writeChunkHeader(header []byte, c chunk) error {
	header[chunkHeaderTypeOffset] = byte(c.encoding())
	binary.LittleEndian.PutUint64(
		header[chunkHeaderFirstTimeOffset:],
		uint64(c.firstTime()),
	)
	lt, err := c.newIterator().lastTimestamp()
	if err != nil {
		return err
	}
	binary.LittleEndian.PutUint64(
		header[chunkHeaderLastTimeOffset:],
		uint64(lt),
	)
	return nil
}