prometheus/tsdb/chunks/head_chunks_test.go
Dieter Plaetinck d5afe0a577
TSDB: Use a dedicated head chunk reference type (#9501)
* Use dedicated Ref type

Throughout the code base, there are reference types masked as
regular integers.  Let's use dedicated types.  They are
equivalent, but clearer semantically.
This also makes it trivial to find where they are used,
and from uses, find the centralized docs.

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

* postpone some work until after possible return

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

* clarify

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

* rename feedback

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>

* skip header is up to caller

Signed-off-by: Dieter Plaetinck <dieter@grafana.com>
2021-10-13 17:44:32 +05:30

451 lines
14 KiB
Go

// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chunks
import (
"encoding/binary"
"errors"
"io/ioutil"
"math/rand"
"os"
"strconv"
"testing"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/tsdb/chunkenc"
)
// TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks writes chunks across
// several head chunk files and then verifies
// * the exact bytes that ended up on disk for the first file,
// * that every written chunk can be read back through Chunk(),
// * that IterateAllChunks, after the mapper is reopened, reports the same
//   series ref, chunk ref, mint, maxt and sample count for every chunk.
func TestChunkDiskMapper_WriteChunk_Chunk_IterateChunks(t *testing.T) {
	hrw := testChunkDiskMapper(t)
	// hrw is reassigned further down; the closure closes whichever mapper is current.
	defer func() {
		require.NoError(t, hrw.Close())
	}()

	expectedBytes := []byte{}
	nextChunkOffset := uint64(HeadChunkFileHeaderSize)
	chkCRC32 := newCRC32()

	type expectedDataType struct {
		seriesRef  uint64
		chunkRef   ChunkDiskMapperRef
		mint, maxt int64
		numSamples uint16
		chunk      chunkenc.Chunk
	}
	expectedData := []expectedDataType{}

	var buf [MaxHeadChunkMetaSize]byte
	totalChunks := 0
	var firstFileName string

	// addChunks writes numChunks random chunks, records what was written, and,
	// while the first file is the current one, mirrors the expected on-disk
	// encoding of each chunk into expectedBytes.
	addChunks := func(numChunks int) {
		for i := 0; i < numChunks; i++ {
			seriesRef, chkRef, mint, maxt, chunk := createChunk(t, totalChunks, hrw)
			totalChunks++
			expectedData = append(expectedData, expectedDataType{
				seriesRef:  seriesRef,
				mint:       mint,
				maxt:       maxt,
				chunkRef:   chkRef,
				chunk:      chunk,
				numSamples: uint16(chunk.NumSamples()),
			})

			if hrw.curFileSequence != 1 {
				// We are checking for bytes written only for the first file.
				continue
			}

			// Calculating expected bytes written on disk for first file.
			firstFileName = hrw.curFile.Name()
			require.Equal(t, newChunkDiskMapperRef(1, nextChunkOffset), chkRef)

			bytesWritten := 0
			chkCRC32.Reset()

			binary.BigEndian.PutUint64(buf[bytesWritten:], seriesRef)
			bytesWritten += SeriesRefSize
			binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(mint))
			bytesWritten += MintMaxtSize
			binary.BigEndian.PutUint64(buf[bytesWritten:], uint64(maxt))
			bytesWritten += MintMaxtSize
			buf[bytesWritten] = byte(chunk.Encoding())
			bytesWritten += ChunkEncodingSize
			n := binary.PutUvarint(buf[bytesWritten:], uint64(len(chunk.Bytes())))
			bytesWritten += n

			// The CRC covers the metadata header followed by the chunk data.
			expectedBytes = append(expectedBytes, buf[:bytesWritten]...)
			_, err := chkCRC32.Write(buf[:bytesWritten])
			require.NoError(t, err)
			expectedBytes = append(expectedBytes, chunk.Bytes()...)
			_, err = chkCRC32.Write(chunk.Bytes())
			require.NoError(t, err)

			expectedBytes = append(expectedBytes, chkCRC32.Sum(nil)...)

			// += seriesRef, mint, maxt, encoding, chunk data len, chunk data, CRC.
			nextChunkOffset += SeriesRefSize + 2*MintMaxtSize + ChunkEncodingSize + uint64(n) + uint64(len(chunk.Bytes())) + CRCSize
		}
	}

	// Keep going until the current file sequence is at least 3 and the chunk
	// writer reports buffered bytes.
	for hrw.curFileSequence < 3 || hrw.chkWriter.Buffered() == 0 {
		addChunks(100)
		// A failed cut would invalidate every assumption below, so surface it.
		require.NoError(t, hrw.CutNewFile())
		addChunks(10) // For chunks in in-memory buffer.
	}

	// Checking on-disk bytes for the first file.
	require.Equal(t, 3, len(hrw.mmappedChunkFiles), "expected 3 mmapped files, got %d", len(hrw.mmappedChunkFiles))
	require.Equal(t, len(hrw.mmappedChunkFiles), len(hrw.closers))

	actualBytes, err := ioutil.ReadFile(firstFileName)
	require.NoError(t, err)

	// Check header of the segment file.
	require.Equal(t, MagicHeadChunks, int(binary.BigEndian.Uint32(actualBytes[0:MagicChunksSize])))
	require.Equal(t, chunksFormatV1, int(actualBytes[MagicChunksSize]))

	// Remaining chunk data.
	fileEnd := HeadChunkFileHeaderSize + len(expectedBytes)
	require.Equal(t, expectedBytes, actualBytes[HeadChunkFileHeaderSize:fileEnd])

	// Testing reading of chunks.
	for _, exp := range expectedData {
		actChunk, err := hrw.Chunk(exp.chunkRef)
		require.NoError(t, err)
		require.Equal(t, exp.chunk.Bytes(), actChunk.Bytes())
	}

	// Testing IterateAllChunks method.
	dir := hrw.dir.Name()
	require.NoError(t, hrw.Close())
	hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
	require.NoError(t, err)

	idx := 0
	require.NoError(t, hrw.IterateAllChunks(func(seriesRef uint64, chunkRef ChunkDiskMapperRef, mint, maxt int64, numSamples uint16) error {
		t.Helper()

		expData := expectedData[idx]
		require.Equal(t, expData.seriesRef, seriesRef)
		require.Equal(t, expData.chunkRef, chunkRef)
		// Both time bounds must round-trip; mint was previously never checked.
		require.Equal(t, expData.mint, mint)
		require.Equal(t, expData.maxt, maxt)
		require.Equal(t, expData.numSamples, numSamples)

		actChunk, err := hrw.Chunk(expData.chunkRef)
		require.NoError(t, err)
		require.Equal(t, expData.chunk.Bytes(), actChunk.Bytes())

		idx++
		return nil
	}))
	// Every written chunk must have been visited exactly once.
	require.Equal(t, len(expectedData), idx)
}
// TestChunkDiskMapper_Truncate tests
// * If truncation is happening properly based on the time passed.
// * The active file is not deleted even if the passed time makes it eligible to be deleted.
// * Empty current file does not lead to creation of another file after truncation.
// * Non-empty current file leads to creation of another file after truncation.
func TestChunkDiskMapper_Truncate(t *testing.T) {
	hrw := testChunkDiskMapper(t)
	// hrw is reassigned after the simulated restart; the closure closes the current one.
	defer func() {
		require.NoError(t, hrw.Close())
	}()

	timeRange := 0
	fileTimeStep := 100
	var thirdFileMinT, sixthFileMinT int64

	// addChunk writes one chunk covering the current time window, advances the
	// window by fileTimeStep, and returns the mint that was used.
	addChunk := func() int {
		mint := timeRange + 1                // Just after the new file cut.
		maxt := timeRange + fileTimeStep - 1 // Just before the next file.

		// Write a chunk to set maxt for the segment.
		_, err := hrw.WriteChunk(1, int64(mint), int64(maxt), randomChunk(t))
		require.NoError(t, err)

		timeRange += fileTimeStep

		return mint
	}

	// verifyFiles asserts that exactly the given file sequence numbers remain,
	// both on disk and in the mapper's in-memory bookkeeping.
	verifyFiles := func(remainingFiles []int) {
		t.Helper()

		files, err := ioutil.ReadDir(hrw.dir.Name())
		require.NoError(t, err)
		require.Equal(t, len(remainingFiles), len(files), "files on disk")
		require.Equal(t, len(remainingFiles), len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles")
		require.Equal(t, len(remainingFiles), len(hrw.closers), "closers")

		for _, i := range remainingFiles {
			_, ok := hrw.mmappedChunkFiles[i]
			// Name the missing file so a failure is actionable.
			require.True(t, ok, "remaining file %d not in hrw.mmappedChunkFiles", i)
		}
	}

	// Create segments 1 to 7.
	for i := 1; i <= 7; i++ {
		require.NoError(t, hrw.CutNewFile())
		mint := int64(addChunk())
		if i == 3 {
			thirdFileMinT = mint
		} else if i == 6 {
			sixthFileMinT = mint
		}
	}
	verifyFiles([]int{1, 2, 3, 4, 5, 6, 7})

	// Truncating files.
	require.NoError(t, hrw.Truncate(thirdFileMinT))
	verifyFiles([]int{3, 4, 5, 6, 7, 8})

	dir := hrw.dir.Name()
	require.NoError(t, hrw.Close())

	// Restarted.
	var err error
	hrw, err = NewChunkDiskMapper(dir, chunkenc.NewPool(), DefaultWriteBufferSize)
	require.NoError(t, err)

	// fileMaxtSet flips to true only once IterateAllChunks has run.
	require.False(t, hrw.fileMaxtSet)
	require.NoError(t, hrw.IterateAllChunks(func(_ uint64, _ ChunkDiskMapperRef, _, _ int64, _ uint16) error { return nil }))
	require.True(t, hrw.fileMaxtSet)

	verifyFiles([]int{3, 4, 5, 6, 7, 8})
	// New file is created after restart even if last file was empty.
	addChunk()
	verifyFiles([]int{3, 4, 5, 6, 7, 8, 9})

	// Truncating files after restart.
	require.NoError(t, hrw.Truncate(sixthFileMinT))
	verifyFiles([]int{6, 7, 8, 9, 10})

	// As the last file was empty, this creates no new files.
	require.NoError(t, hrw.Truncate(sixthFileMinT+1))
	verifyFiles([]int{6, 7, 8, 9, 10})
	addChunk()

	// Truncating till current time should not delete the current active file.
	require.NoError(t, hrw.Truncate(int64(timeRange+(2*fileTimeStep))))
	verifyFiles([]int{10, 11}) // One file is the previously active file and one currently created.
}
// TestChunkDiskMapper_Truncate_PreservesFileSequence verifies that truncation
// leaves the file sequence contiguous when empty files sit between non-empty
// ones. It is the regression test for
// https://github.com/prometheus/prometheus/issues/7412, where truncation
// deleted every empty file instead of stopping at the first non-empty one.
func TestChunkDiskMapper_Truncate_PreservesFileSequence(t *testing.T) {
	hrw := testChunkDiskMapper(t)
	defer func() {
		require.NoError(t, hrw.Close())
	}()

	// assertRemaining checks that exactly the wanted file sequence numbers are
	// present on disk and tracked by the mapper.
	assertRemaining := func(want []int) {
		t.Helper()

		entries, err := ioutil.ReadDir(hrw.dir.Name())
		require.NoError(t, err)

		n := len(want)
		require.Equal(t, n, len(entries), "files on disk")
		require.Equal(t, n, len(hrw.mmappedChunkFiles), "hrw.mmappedChunkFiles")
		require.Equal(t, n, len(hrw.closers), "closers")

		for _, seq := range want {
			_, found := hrw.mmappedChunkFiles[seq]
			require.True(t, found, "remaining file %d not in hrw.mmappedChunkFiles", seq)
		}
	}

	const step = 100
	elapsed := 0
	// writeChunk stores one chunk spanning (elapsed, elapsed+step) and moves the window on.
	writeChunk := func() {
		lo, hi := elapsed+1, elapsed+step-1
		_, err := hrw.WriteChunk(1, int64(lo), int64(hi), randomChunk(t))
		require.NoError(t, err)
		elapsed += step
	}

	// File 1 is created by the first chunk write.
	writeChunk()
	// Files 2 through 6: true marks a file that is cut but left empty.
	for _, leaveEmpty := range []bool{false, false, true, false, true} {
		require.NoError(t, hrw.CutNewFile())
		if !leaveEmpty {
			writeChunk()
		}
	}

	assertRemaining([]int{1, 2, 3, 4, 5, 6})

	// Truncate up to and including file 2. Files from 3 onwards must survive,
	// even though 4 and 6 are empty.
	cutoff := hrw.mmappedChunkFiles[2].maxt + 1
	require.NoError(t, hrw.Truncate(cutoff))
	// File 6 was empty, so no additional file is expected.
	assertRemaining([]int{3, 4, 5, 6})

	writeChunk()
	// File 6 now holds data, so truncating again cuts a new file.
	require.NoError(t, hrw.Truncate(cutoff))
	assertRemaining([]int{3, 4, 5, 6, 7})

	dataDir := hrw.dir.Name()
	require.NoError(t, hrw.Close())

	// Reopening validates that the files on disk are sequential.
	var err error
	hrw, err = NewChunkDiskMapper(dataDir, chunkenc.NewPool(), DefaultWriteBufferSize)
	require.NoError(t, err)
	assertRemaining([]int{3, 4, 5, 6, 7})
}
// TestHeadReadWriter_TruncateAfterFailedIterateChunks is the regression test for
// https://github.com/prometheus/prometheus/issues/7753: Truncate must succeed
// on a reopened mapper even when IterateAllChunks returned an error.
func TestHeadReadWriter_TruncateAfterFailedIterateChunks(t *testing.T) {
	hrw := testChunkDiskMapper(t)
	defer func() {
		require.NoError(t, hrw.Close())
	}()

	// One chunk is written so that the iteration below has something to visit.
	_, err := hrw.WriteChunk(1, 0, 1000, randomChunk(t))
	require.NoError(t, err)

	dataDir := hrw.dir.Name()
	require.NoError(t, hrw.Close())

	// Reopen the mapper to reproduce the conditions of the linked issue.
	hrw, err = NewChunkDiskMapper(dataDir, chunkenc.NewPool(), DefaultWriteBufferSize)
	require.NoError(t, err)

	// The callback always fails, which forces IterateAllChunks to return an error.
	alwaysFail := func(uint64, ChunkDiskMapperRef, int64, int64, uint16) error {
		return errors.New("random error")
	}
	iterErr := hrw.IterateAllChunks(alwaysFail)
	require.Error(t, iterErr)

	// Despite the failed iteration, truncation has to work.
	require.NoError(t, hrw.Truncate(2000))
}
// TestHeadReadWriter_ReadRepairOnEmptyLastFile creates three non-empty chunk
// files, adds a zero-byte file with the next sequence number to mimic an abrupt
// shutdown during file creation, and checks that reopening the mapper removes
// that file both from mmappedChunkFiles and from disk.
func TestHeadReadWriter_ReadRepairOnEmptyLastFile(t *testing.T) {
	hrw := testChunkDiskMapper(t)
	defer func() {
		require.NoError(t, hrw.Close())
	}()

	const step = 100
	elapsed := 0
	// writeChunk stores one chunk spanning (elapsed, elapsed+step) and moves the window on.
	writeChunk := func() {
		lo, hi := elapsed+1, elapsed+step-1
		_, err := hrw.WriteChunk(1, int64(lo), int64(hi), randomChunk(t))
		require.NoError(t, err)
		elapsed += step
	}

	// File 1 is created by the first chunk write; files 2 and 3 are cut explicitly.
	writeChunk()
	for i := 0; i < 2; i++ {
		require.NoError(t, hrw.CutNewFile())
		writeChunk()
	}

	require.Equal(t, 3, len(hrw.mmappedChunkFiles))
	// Find the highest file sequence currently tracked.
	highest := 0
	for seq := range hrw.mmappedChunkFiles {
		if seq <= highest {
			continue
		}
		highest = seq
	}
	require.Equal(t, 3, highest)

	dataDir := hrw.dir.Name()
	require.NoError(t, hrw.Close())

	// Write an empty last file mimicking an abrupt shutdown on file creation.
	strayPath := segmentFile(dataDir, highest+1)
	stray, err := os.OpenFile(strayPath, os.O_WRONLY|os.O_CREATE, 0666)
	require.NoError(t, err)
	require.NoError(t, stray.Sync())
	info, err := stray.Stat()
	require.NoError(t, err)
	require.Equal(t, int64(0), info.Size())
	require.NoError(t, stray.Close())

	// Open chunk disk mapper again, corrupt file should be removed.
	hrw, err = NewChunkDiskMapper(dataDir, chunkenc.NewPool(), DefaultWriteBufferSize)
	require.NoError(t, err)
	require.False(t, hrw.fileMaxtSet)
	visitNothing := func(uint64, ChunkDiskMapperRef, int64, int64, uint16) error { return nil }
	require.NoError(t, hrw.IterateAllChunks(visitNothing))
	require.True(t, hrw.fileMaxtSet)

	// Removed from memory.
	require.Equal(t, 3, len(hrw.mmappedChunkFiles))
	for seq := range hrw.mmappedChunkFiles {
		require.LessOrEqual(t, seq, highest, "file index is bigger than previous last file")
	}

	// Removed even from disk.
	entries, err := ioutil.ReadDir(dataDir)
	require.NoError(t, err)
	require.Equal(t, 3, len(entries))
	for _, entry := range entries {
		// File names are parsed as decimal sequence numbers.
		onDisk, err := strconv.ParseUint(entry.Name(), 10, 64)
		require.NoError(t, err)
		require.LessOrEqual(t, onDisk, uint64(highest), "file index on disk is bigger than previous last file")
	}
}
// testChunkDiskMapper returns a ChunkDiskMapper backed by a fresh temporary
// directory, which is removed again when the test finishes. IterateAllChunks
// has already been run on the returned mapper, so its fileMaxtSet flag is true.
func testChunkDiskMapper(t *testing.T) *ChunkDiskMapper {
	dataDir, err := ioutil.TempDir("", "data")
	require.NoError(t, err)
	t.Cleanup(func() {
		require.NoError(t, os.RemoveAll(dataDir))
	})

	mapper, err := NewChunkDiskMapper(dataDir, chunkenc.NewPool(), DefaultWriteBufferSize)
	require.NoError(t, err)

	// The flag starts out false and is expected to flip after one full iteration.
	require.False(t, mapper.fileMaxtSet)
	visitNothing := func(uint64, ChunkDiskMapperRef, int64, int64, uint16) error { return nil }
	require.NoError(t, mapper.IterateAllChunks(visitNothing))
	require.True(t, mapper.fileMaxtSet)

	return mapper
}
// randomChunk returns a new XOR chunk holding between 0 and 119 samples with
// random timestamps and values. The test fails immediately if no appender can
// be obtained for the chunk.
func randomChunk(t *testing.T) chunkenc.Chunk {
	chunk := chunkenc.NewXORChunk()
	// Named numSamples rather than len so the builtin len is not shadowed.
	numSamples := rand.Intn(120)
	app, err := chunk.Appender()
	require.NoError(t, err)
	for i := 0; i < numSamples; i++ {
		app.Append(rand.Int63(), rand.Float64())
	}
	return chunk
}
// createChunk writes one random chunk to hrw and returns everything needed to
// verify it later: the random series reference, the chunk reference returned by
// WriteChunk, the time range and the chunk itself. The time range is derived
// from idx as [idx*1000+1, (idx+1)*1000].
func createChunk(t *testing.T, idx int, hrw *ChunkDiskMapper) (seriesRef uint64, chunkRef ChunkDiskMapperRef, mint, maxt int64, chunk chunkenc.Chunk) {
	seriesRef = uint64(rand.Int63())
	mint, maxt = int64(idx*1000+1), int64((idx+1)*1000)
	chunk = randomChunk(t)

	ref, err := hrw.WriteChunk(seriesRef, mint, maxt, chunk)
	require.NoError(t, err)

	return seriesRef, ref, mint, maxt, chunk
}