diff --git a/tsdb/head_test.go b/tsdb/head_test.go index f882516c36..3ef0e09e94 100644 --- a/tsdb/head_test.go +++ b/tsdb/head_test.go @@ -1309,6 +1309,60 @@ func TestMemSeries_append(t *testing.T) { } } +func TestMemSeries_appendHistogram(t *testing.T) { + dir := t.TempDir() + // This is usually taken from the Head, but passing manually here. + chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize) + require.NoError(t, err) + defer func() { + require.NoError(t, chunkDiskMapper.Close()) + }() + + s := newMemSeries(labels.Labels{}, 1, 500, nil, defaultIsolationDisabled) + + histograms := GenerateTestHistograms(4) + histogramWithOneMoreBucket := histograms[3].Copy() + histogramWithOneMoreBucket.Count++ + histogramWithOneMoreBucket.Sum += 1.23 + histogramWithOneMoreBucket.PositiveSpans[1].Length = 3 + histogramWithOneMoreBucket.PositiveBuckets = append(histogramWithOneMoreBucket.PositiveBuckets, 1) + + // Add first two samples at the very end of a chunk range and the next two + // on and after it. + // New chunk must correctly be cut at 1000. + ok, chunkCreated := s.appendHistogram(998, histograms[0], 0, chunkDiskMapper) + require.True(t, ok, "append failed") + require.True(t, chunkCreated, "first sample created chunk") + + ok, chunkCreated = s.appendHistogram(999, histograms[1], 0, chunkDiskMapper) + require.True(t, ok, "append failed") + require.False(t, chunkCreated, "second sample should use same chunk") + + ok, chunkCreated = s.appendHistogram(1000, histograms[2], 0, chunkDiskMapper) + require.True(t, ok, "append failed") + require.True(t, chunkCreated, "expected new chunk on boundary") + + ok, chunkCreated = s.appendHistogram(1001, histograms[3], 0, chunkDiskMapper) + require.True(t, ok, "append failed") + require.False(t, chunkCreated, "second sample should use same chunk") + + require.Equal(t, 1, len(s.mmappedChunks), "there should be only 1 mmapped chunk") + require.Equal(t, int64(998), s.mmappedChunks[0].minTime, "wrong chunk range") + require.Equal(t, int64(999), s.mmappedChunks[0].maxTime, "wrong chunk range") + require.Equal(t, int64(1000), s.headChunk.minTime, "wrong chunk range") + require.Equal(t, int64(1001), s.headChunk.maxTime, "wrong chunk range") + + ok, chunkCreated = s.appendHistogram(1002, histogramWithOneMoreBucket, 0, chunkDiskMapper) + require.True(t, ok, "append failed") + require.False(t, chunkCreated, "third sample should trigger a re-encoded chunk") + + require.Equal(t, 1, len(s.mmappedChunks), "there should be only 1 mmapped chunk") + require.Equal(t, int64(998), s.mmappedChunks[0].minTime, "wrong chunk range") + require.Equal(t, int64(999), s.mmappedChunks[0].maxTime, "wrong chunk range") + require.Equal(t, int64(1000), s.headChunk.minTime, "wrong chunk range") + require.Equal(t, int64(1002), s.headChunk.maxTime, "wrong chunk range") +} + func TestMemSeries_append_atVariableRate(t *testing.T) { const samplesPerChunk = 120 dir := t.TempDir() diff --git a/tsdb/querier.go b/tsdb/querier.go index 5609157951..d279e5080f 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -735,12 +735,33 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool { var h *histogram.Histogram t, h = p.currDelIter.AtHistogram() p.curr.MinTime = t + app.AppendHistogram(t, h) for vt := p.currDelIter.Next(); vt != chunkenc.ValNone; vt = p.currDelIter.Next() { if vt != chunkenc.ValHistogram { - panic(fmt.Errorf("found value type %v in histogram chunk", vt)) + err = fmt.Errorf("found value type %v in histogram chunk", vt) + break } t, h = p.currDelIter.AtHistogram() + + // Defend against corrupted chunks. + pI, nI, okToAppend, counterReset := app.(*chunkenc.HistogramAppender).Appendable(h) + if len(pI)+len(nI) > 0 { + err = fmt.Errorf( + "bucket layout has changed unexpectedly: %d positive and %d negative bucket interjections required", + len(pI), len(nI), + ) + break + } + if counterReset { + err = errors.New("detected unexpected counter reset in histogram") + break + } + if !okToAppend { + err = errors.New("unable to append histogram due to unexpected schema change") + break + } + app.AppendHistogram(t, h) } case chunkenc.ValFloat: @@ -754,7 +775,8 @@ func (p *populateWithDelChunkSeriesIterator) Next() bool { app.Append(t, v) for vt := p.currDelIter.Next(); vt != chunkenc.ValNone; vt = p.currDelIter.Next() { if vt != chunkenc.ValFloat { - panic(fmt.Errorf("found value type %v in float chunk", vt)) + err = fmt.Errorf("found value type %v in float chunk", vt) + break } t, v = p.currDelIter.At() app.Append(t, v)