mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-12 06:17:27 -08:00
cda025b5b5
* TSDB: demistify seriesRefs and ChunkRefs The TSDB package contains many types of series and chunk references, all shrouded in uint types. Often the same uint value may actually mean one of different types, in non-obvious ways. This PR aims to clarify the code and help navigating to relevant docs, usage, etc much quicker. Concretely: * Use appropriately named types and document their semantics and relations. * Make multiplexing and demuxing of types explicit (on the boundaries between concrete implementations and generic interfaces). * Casting between different types should be free. None of the changes should have any impact on how the code runs. TODO: Implement BlockSeriesRef where appropriate (for a future PR) Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * feedback Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * agent: demistify seriesRefs and ChunkRefs Signed-off-by: Dieter Plaetinck <dieter@grafana.com>
450 lines
14 KiB
Go
450 lines
14 KiB
Go
// Copyright 2020 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package chunks
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"errors"
|
|
"io/ioutil"
|
|
"math/rand"
|
|
"os"
|
|
"strconv"
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/prometheus/prometheus/tsdb/chunkenc"
|
|
)
|
|
|
|
func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
|
|
hrw := testChunkDiskMapper(t)
|
|
defer func() {
|
|
require.NoError(t, hrw.Close())
|
|
}()
|
|
|
|
expectedBytes := []byte{}
|
|
nextChunkOffset := uint64(HeadChunkFileHeaderSize)
|
|
chkCRC32 := newCRC32()
|
|
|
|
type expectedDataType struct {
|
|
seriesRef HeadSeriesRef
|
|
chunkRef ChunkDiskMapperRef
|
|
mint, maxt int64
|
|
numSamples uint16
|
|
chunk chunkenc.Chunk
|
|
}
|
|
expectedData := []expectedDataType{}
|
|
|
|
var buf [MaxHeadChunkMetaSize]byte
|
|
totalChunks := 0
|
|
var firstFileName string
|
|
for hrw.curFileSequence < 3 || hrw.chkWriter.Buffered() == 0 {
|
|
addChunks := func(numChunks int) {
|
|
for i := 0; i < numChunks; i++ {
|
|
seriesRef, chkRef, mint, maxt, chunk := createChunk(t, totalChunks, hrw)
|
|
totalChunks++
|
|
expectedData = append(expectedData, expectedDataType{
|
|
seriesRef: seriesRef,
|
|
mint: mint,
|
|
maxt: maxt,
|
|
chunkRef: chkRef,
|
|
chunk: chunk,
|
|
numSamples: uint16(chunk.NumSamples()),
|
|
})
|
|
|
|
if hrw.curFileSequence != 1 {
|
|
// We are checking for bytes written only for the first file.
|
|
continue
|
|
}
|
|
|
|
// Calculating expected bytes written on disk for first file.
|
|
firstFileName = hrw.curFile.Name()
|
|
require.Equal(t, newChunkDiskMapperRef(1, nextChunkOffset), chkRef)
|
|
|
|
bytesWritten := 0
|
|
chkCRC32.Reset()
|
|
|
|
binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(seriesRef))
|
|
bytesWritten += SeriesRefSize
|
|
binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(mint))
|
|
bytesWritten += MintMaxtSize
|
|
binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(maxt))
|
|
bytesWritten += MintMaxtSize
|
|
buf[bytesWritten] = byte(chunk.Encoding())
|
|
bytesWritten += ChunkEncodingSize
|
|
n := binary.PutUvarint(buf[bytesWritten:], uint64(len(chunk.Bytes())))
|
|
bytesWritten += n
|
|
|
|
expectedBytes = append(expectedBytes, buf[:bytesWritten]...)
|
|
_, err := chkCRC32.Write(buf[:bytesWritten])
|
|
require.NoError(t, err)
|
|
expectedBytes = append(expectedBytes, chunk.Bytes()...)
|
|
_, err = chkCRC32.Write(chunk.Bytes())
|
|
require.NoError(t, err)
|
|
|
|
expectedBytes = append(expectedBytes, chkCRC32.Sum(nil)...)
|
|
|
|
// += seriesRef, mint, maxt, encoding, chunk data len, chunk data, CRC.
|
|
nextChunkOffset += SeriesRefSize + 2*MintMaxtSize + ChunkEncodingSize + uint64(n) + uint64(len(chunk.Bytes())) + CRCSize
|
|
}
|
|
}
|
|
addChunks(100)
|
|
hrw.CutNewFile()
|
|
addChunks(10) // For chunks in in-memory buffer.
|
|
}
|
|
|
|
// Checking on-disk bytes for the first file.
|
|
require.Equal(t, 3, len(hrw.mmappedChunkFiles), "expected 3 mmapped files, got %d", len(hrw.mmappedChunkFiles))
|
|
require.Equal(t, len(hrw.mmappedChunkFiles), len(hrw.closers))
|
|
|
|
actualBytes, err := ioutil.ReadFile(firstFileName)
|
|
require.NoError(t, err)
|
|
|
|
// Check header of the segment file.
|
|
require.Equal(t, MagicHeadChunks, int(binary.BigEndian.Uint32(actualBytes[0:MagicChunksSize])))
|
|
require.Equal(t, chunksFormatV1, int(actualBytes[MagicChunksSize]))
|
|
|
|
// Remaining chunk data.
|
|
fileEnd := HeadChunkFileHeaderSize + len(expectedBytes)
|
|
require.Equal(t, expectedBytes, actualBytes[HeadChunkFileHeaderSize:fileEnd])
|
|
|
|
// Testing reading of chunks.
|
|
for _, exp := range expectedData {
|
|
actChunk, err := hrw.Chunk(exp.chunkRef)
|
|
require.NoError(t, err)
|
|
require.Equal(t, exp.chunk.Bytes(), actChunk.Bytes())
|
|
}
|
|
|
|
// Testing IterateAllChunks method.
|
|
dir := hrw.dir.Name()
|
|
require.NoError(t, hrw.Close())
|
|
hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
|
|
require.NoError(t, err)
|
|
|
|
idx := 0
|
|
require.NoError(t, hrw.IterateAllChunks(func(seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error {
|
|
t.Helper()
|
|
|
|
expData := expectedData[idx]
|
|
require.Equal(t, expData.seriesRef, seriesRef)
|
|
require.Equal(t, expData.chunkRef, chunkRef)
|
|
require.Equal(t, expData.maxt, maxt)
|
|
require.Equal(t, expData.maxt, maxt)
|
|
require.Equal(t, expData.numSamples, numSamples)
|
|
|
|
actChunk, err := hrw.Chunk(expData.chunkRef)
|
|
require.NoError(t, err)
|
|
require.Equal(t, expData.chunk.Bytes(), actChunk.Bytes())
|
|
|
|
idx++
|
|
return nil
|
|
}))
|
|
require.Equal(t, len(expectedData), idx)
|
|
}
|
|
|
|
// TestChunkDiskMapper_Truncate tests
|
|
// * If truncation is happening properly based on the time passed.
|
|
// * The active file is not deleted even if the passed time makes it eligible to be deleted.
|
|
// * Empty current file does not lead to creation of another file after truncation.
|
|
// * Non-empty current file leads to creation of another file after truncation.
|
|
func TestChunkDiskMapper_Truncate(t *testing.T) {
|
|
hrw := testChunkDiskMapper(t)
|
|
defer func() {
|
|
require.NoError(t, hrw.Close())
|
|
}()
|
|
|
|
timeRange := 0
|
|
fileTimeStep := 100
|
|
var thirdFileMinT, sixthFileMinT int64
|
|
|
|
addChunk := func() int {
|
|
mint := timeRange + 1 // Just after the new file cut.
|
|
maxt := timeRange + fileTimeStep - 1 // Just before the next file.
|
|
|
|
// Write a chunks to set maxt for the segment.
|
|
_, err := hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t))
|
|
require.NoError(t, err)
|
|
|
|
timeRange += fileTimeStep
|
|
|
|
return mint
|
|
}
|
|
|
|
verifyFiles := func(remainingFiles []int) {
|
|
t.Helper()
|
|
|
|
files, err := ioutil.ReadDir(hrw.dir.Name())
|
|
require.NoError(t, err)
|
|
require.Equal(t, len(remainingFiles), len(files), "files on disk")
|
|
require.Equal(t, len(remainingFiles), len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles")
|
|
require.Equal(t, len(remainingFiles), len(hrw.closers), "closers")
|
|
|
|
for _, i := range remainingFiles {
|
|
_, ok := hrw.mmappedChunkFiles[i]
|
|
require.Equal(t, true, ok)
|
|
}
|
|
}
|
|
|
|
// Create segments 1 to 7.
|
|
for i := 1; i <= 7; i++ {
|
|
require.NoError(t, hrw.CutNewFile())
|
|
mint := int64(addChunk())
|
|
if i == 3 {
|
|
thirdFileMinT = mint
|
|
} else if i == 6 {
|
|
sixthFileMinT = mint
|
|
}
|
|
}
|
|
verifyFiles([]int{1, 2, 3, 4, 5, 6, 7})
|
|
|
|
// Truncating files.
|
|
require.NoError(t, hrw.Truncate(thirdFileMinT))
|
|
verifyFiles([]int{3, 4, 5, 6, 7, 8})
|
|
|
|
dir := hrw.dir.Name()
|
|
require.NoError(t, hrw.Close())
|
|
|
|
// Restarted.
|
|
var err error
|
|
hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
|
|
require.NoError(t, err)
|
|
|
|
require.False(t, hrw.fileMaxtSet)
|
|
require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil }))
|
|
require.True(t, hrw.fileMaxtSet)
|
|
|
|
verifyFiles([]int{3, 4, 5, 6, 7, 8})
|
|
// New file is created after restart even if last file was empty.
|
|
addChunk()
|
|
verifyFiles([]int{3, 4, 5, 6, 7, 8, 9})
|
|
|
|
// Truncating files after restart.
|
|
require.NoError(t, hrw.Truncate(sixthFileMinT))
|
|
verifyFiles([]int{6, 7, 8, 9, 10})
|
|
|
|
// As the last file was empty, this creates no new files.
|
|
require.NoError(t, hrw.Truncate(sixthFileMinT+1))
|
|
verifyFiles([]int{6, 7, 8, 9, 10})
|
|
addChunk()
|
|
|
|
// Truncating till current time should not delete the current active file.
|
|
require.NoError(t, hrw.Truncate(int64(timeRange+(2*fileTimeStep))))
|
|
verifyFiles([]int{10, 11}) // One file is the previously active file and one currently created.
|
|
}
|
|
|
|
// TestChunkDiskMapper_Truncate_PreservesFileSequence tests that truncation doesn't poke
|
|
// holes into the file sequence, even if there are empty files in between non-empty files.
|
|
// This test exposes https://github.com/prometheus/prometheus/issues/7412 where the truncation
|
|
// simply deleted all empty files instead of stopping once it encountered a non-empty file.
|
|
func TestChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) {
|
|
hrw := testChunkDiskMapper(t)
|
|
defer func() {
|
|
require.NoError(t, hrw.Close())
|
|
}()
|
|
|
|
timeRange := 0
|
|
addChunk := func() {
|
|
step := 100
|
|
mint, maxt := timeRange+1, timeRange+step-1
|
|
_, err := hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t))
|
|
require.NoError(t, err)
|
|
timeRange += step
|
|
}
|
|
emptyFile := func() {
|
|
require.NoError(t, hrw.CutNewFile())
|
|
}
|
|
nonEmptyFile := func() {
|
|
emptyFile()
|
|
addChunk()
|
|
}
|
|
|
|
addChunk() // 1. Created with the first chunk.
|
|
nonEmptyFile() // 2.
|
|
nonEmptyFile() // 3.
|
|
emptyFile() // 4.
|
|
nonEmptyFile() // 5.
|
|
emptyFile() // 6.
|
|
|
|
verifyFiles := func(remainingFiles []int) {
|
|
t.Helper()
|
|
|
|
files, err := ioutil.ReadDir(hrw.dir.Name())
|
|
require.NoError(t, err)
|
|
require.Equal(t, len(remainingFiles), len(files), "files on disk")
|
|
require.Equal(t, len(remainingFiles), len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles")
|
|
require.Equal(t, len(remainingFiles), len(hrw.closers), "closers")
|
|
|
|
for _, i := range remainingFiles {
|
|
_, ok := hrw.mmappedChunkFiles[i]
|
|
require.True(t, ok, "remaining file %d not in hrw.mmappedChunkFiles", i)
|
|
}
|
|
}
|
|
|
|
verifyFiles([]int{1, 2, 3, 4, 5, 6})
|
|
|
|
// Truncating files till 2. It should not delete anything after 3 (inclusive)
|
|
// though files 4 and 6 are empty.
|
|
file2Maxt := hrw.mmappedChunkFiles[2].maxt
|
|
require.NoError(t, hrw.Truncate(file2Maxt+1))
|
|
// As 6 was empty, it should not create another file.
|
|
verifyFiles([]int{3, 4, 5, 6})
|
|
|
|
addChunk()
|
|
// Truncate creates another file as 6 is not empty now.
|
|
require.NoError(t, hrw.Truncate(file2Maxt+1))
|
|
verifyFiles([]int{3, 4, 5, 6, 7})
|
|
|
|
dir := hrw.dir.Name()
|
|
require.NoError(t, hrw.Close())
|
|
|
|
// Restarting checks for unsequential files.
|
|
var err error
|
|
hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
|
|
require.NoError(t, err)
|
|
verifyFiles([]int{3, 4, 5, 6, 7})
|
|
}
|
|
|
|
// TestHeadReadWriter_TruncateAfterIterateChunksError tests for
|
|
// https://github.com/prometheus/prometheus/issues/7753
|
|
func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) {
|
|
hrw := testChunkDiskMapper(t)
|
|
defer func() {
|
|
require.NoError(t, hrw.Close())
|
|
}()
|
|
|
|
// Write a chunks to iterate on it later.
|
|
_, err := hrw.WriteChunk(1, 0, 1000, randomChunk(t))
|
|
require.NoError(t, err)
|
|
|
|
dir := hrw.dir.Name()
|
|
require.NoError(t, hrw.Close())
|
|
|
|
// Restarting to recreate https://github.com/prometheus/prometheus/issues/7753.
|
|
hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
|
|
require.NoError(t, err)
|
|
|
|
// Forcefully failing IterateAllChunks.
|
|
require.Error(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error {
|
|
return errors.New("random error")
|
|
}))
|
|
|
|
// Truncation call should not return error after IterateAllChunks fails.
|
|
require.NoError(t, hrw.Truncate(2000))
|
|
}
|
|
|
|
func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) {
|
|
hrw := testChunkDiskMapper(t)
|
|
defer func() {
|
|
require.NoError(t, hrw.Close())
|
|
}()
|
|
|
|
timeRange := 0
|
|
addChunk := func() {
|
|
step := 100
|
|
mint, maxt := timeRange+1, timeRange+step-1
|
|
_, err := hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t))
|
|
require.NoError(t, err)
|
|
timeRange += step
|
|
}
|
|
nonEmptyFile := func() {
|
|
require.NoError(t, hrw.CutNewFile())
|
|
addChunk()
|
|
}
|
|
|
|
addChunk() // 1. Created with the first chunk.
|
|
nonEmptyFile() // 2.
|
|
nonEmptyFile() // 3.
|
|
|
|
require.Equal(t, 3, len(hrw.mmappedChunkFiles))
|
|
lastFile := 0
|
|
for idx := range hrw.mmappedChunkFiles {
|
|
if idx > lastFile {
|
|
lastFile = idx
|
|
}
|
|
}
|
|
require.Equal(t, 3, lastFile)
|
|
dir := hrw.dir.Name()
|
|
require.NoError(t, hrw.Close())
|
|
|
|
// Write an empty last file mimicking an abrupt shutdown on file creation.
|
|
emptyFileName := segmentFile(dir, lastFile+1)
|
|
f, err := os.OpenFile(emptyFileName, os.O_WRONLY|os.O_CREATE, 0o666)
|
|
require.NoError(t, err)
|
|
require.NoError(t, f.Sync())
|
|
stat, err := f.Stat()
|
|
require.NoError(t, err)
|
|
require.Equal(t, int64(0), stat.Size())
|
|
require.NoError(t, f.Close())
|
|
|
|
// Open chunk disk mapper again, corrupt file should be removed.
|
|
hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
|
|
require.NoError(t, err)
|
|
require.False(t, hrw.fileMaxtSet)
|
|
require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil }))
|
|
require.True(t, hrw.fileMaxtSet)
|
|
|
|
// Removed from memory.
|
|
require.Equal(t, 3, len(hrw.mmappedChunkFiles))
|
|
for idx := range hrw.mmappedChunkFiles {
|
|
require.LessOrEqual(t, idx, lastFile, "file index is bigger than previous last file")
|
|
}
|
|
|
|
// Removed even from disk.
|
|
files, err := ioutil.ReadDir(dir)
|
|
require.NoError(t, err)
|
|
require.Equal(t, 3, len(files))
|
|
for _, fi := range files {
|
|
seq, err := strconv.ParseUint(fi.Name(), 10, 64)
|
|
require.NoError(t, err)
|
|
require.LessOrEqual(t, seq, uint64(lastFile), "file index on disk is bigger than previous last file")
|
|
}
|
|
}
|
|
|
|
func testChunkDiskMapper(t *testing.T) *ChunkDiskMapper {
|
|
tmpdir, err := ioutil.TempDir("", "data")
|
|
require.NoError(t, err)
|
|
t.Cleanup(func() {
|
|
require.NoError(t, os.RemoveAll(tmpdir))
|
|
})
|
|
|
|
hrw, err := NewChunkDiskMapper(tmpdir, chunkenc.NewPool(), DefaultWriteBufferSize)
|
|
require.NoError(t, err)
|
|
require.False(t, hrw.fileMaxtSet)
|
|
require.NoError(t, hrw.IterateAllChunks(func(_ HeadSeriesRef, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil }))
|
|
require.True(t, hrw.fileMaxtSet)
|
|
return hrw
|
|
}
|
|
|
|
func randomChunk(t *testing.T) chunkenc.Chunk {
|
|
chunk := chunkenc.NewXORChunk()
|
|
len := rand.Int() % 120
|
|
app, err := chunk.Appender()
|
|
require.NoError(t, err)
|
|
for i := 0; i < len; i++ {
|
|
app.Append(rand.Int63(), rand.Float64())
|
|
}
|
|
return chunk
|
|
}
|
|
|
|
func createChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef HeadSeriesRef, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk) {
|
|
var err error
|
|
seriesRef = HeadSeriesRef(rand.Int63())
|
|
mint = int64((idx)*1000 + 1)
|
|
maxt = int64((idx + 1) * 1000)
|
|
chunk = randomChunk(t)
|
|
chunkRef, err = hrw.WriteChunk(seriesRef, mint, maxt, chunk)
|
|
require.NoError(t, err)
|
|
return
|
|
}
|