2017-04-10 11:59:45 -07:00
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2017-11-30 06:34:49 -08:00
package chunks
2017-03-07 03:47:49 -08:00
import (
"bufio"
2019-12-03 23:37:49 -08:00
"bytes"
2017-03-07 03:47:49 -08:00
"encoding/binary"
"fmt"
2017-04-25 07:45:44 -07:00
"hash"
2017-11-30 06:34:49 -08:00
"hash/crc32"
2017-03-07 03:47:49 -08:00
"io"
2017-11-30 06:34:49 -08:00
"io/ioutil"
2017-03-07 03:47:49 -08:00
"os"
2017-11-30 06:34:49 -08:00
"path/filepath"
"strconv"
2017-03-07 03:47:49 -08:00
"github.com/pkg/errors"
2019-08-13 01:34:14 -07:00
"github.com/prometheus/prometheus/tsdb/chunkenc"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"
2017-03-07 03:47:49 -08:00
)
2019-12-03 23:37:49 -08:00
// Layout of the header found at the start of every segment file.
const (
	// MagicChunks is the 4-byte magic number at the head of a series file.
	MagicChunks = 0x85BD40DD
	// MagicChunksSize is the size in bytes of MagicChunks.
	MagicChunksSize = 4

	// chunksFormatV1 identifies version 1 of the chunks format.
	chunksFormatV1 = 1
	// ChunksFormatVersionSize is the size in bytes of the format version field.
	ChunksFormatVersionSize = 1

	// segmentHeaderPaddingSize is the number of padding bytes that complete the header.
	segmentHeaderPaddingSize = 3

	// SegmentHeaderSize defines the total size of the header part.
	SegmentHeaderSize = MagicChunksSize + ChunksFormatVersionSize + segmentHeaderPaddingSize
)
2019-03-24 13:33:08 -07:00
2019-12-03 23:37:49 -08:00
// Sizes of the fixed parts of a single encoded chunk.
const (
	// MaxChunkLengthFieldSize defines the maximum size of the data length part.
	// The length is stored as a varint, so this is an upper bound.
	MaxChunkLengthFieldSize = binary.MaxVarintLen32

	// ChunkEncodingSize defines the size of the chunk encoding part.
	ChunkEncodingSize = 1
)
2017-11-30 06:34:49 -08:00
// Meta holds information about a chunk of data.
type Meta struct {
2017-03-07 03:47:49 -08:00
// Ref and Chunk hold either a reference that can be used to retrieve
// chunk data or the data itself.
2019-12-03 23:37:49 -08:00
// When it is a reference it is the segment offset at which the chunk bytes start.
2017-03-07 03:47:49 -08:00
// Generally, only one of them is set.
Ref uint64
2017-11-30 06:34:49 -08:00
Chunk chunkenc . Chunk
2017-03-07 03:47:49 -08:00
2019-07-03 03:47:31 -07:00
// Time range the data covers.
// When MaxTime == math.MaxInt64 the chunk is still open and being appended to.
MinTime , MaxTime int64
2017-05-14 02:06:26 -07:00
}
2020-03-24 13:15:47 -07:00
// Iterator iterates over the chunks of a time series.
type Iterator interface {
	// At returns the current meta.
	// Whether the chunk is populated depends on the implementation.
	At() Meta

	// Next advances the iterator by one.
	Next() bool

	// Err returns an optional error if Next is false.
	Err() error
}
2017-05-16 00:13:33 -07:00
// writeHash writes the chunk encoding and raw data into the provided hash.
2019-07-12 06:12:34 -07:00
func ( cm * Meta ) writeHash ( h hash . Hash , buf [ ] byte ) error {
buf = append ( buf [ : 0 ] , byte ( cm . Chunk . Encoding ( ) ) )
if _ , err := h . Write ( buf [ : 1 ] ) ; err != nil {
2017-05-16 00:13:33 -07:00
return err
}
if _ , err := h . Write ( cm . Chunk . Bytes ( ) ) ; err != nil {
return err
}
return nil
}
2019-03-25 01:17:28 -07:00
// OverlapsClosedInterval Returns true if the chunk overlaps [mint, maxt].
2018-07-02 01:23:36 -07:00
func ( cm * Meta ) OverlapsClosedInterval ( mint , maxt int64 ) bool {
// The chunk itself is a closed interval [cm.MinTime, cm.MaxTime].
return cm . MinTime <= maxt && mint <= cm . MaxTime
}
2017-11-30 06:34:49 -08:00
// errInvalidSize is the base error used when a segment is too small to be valid.
var errInvalidSize = fmt.Errorf("invalid size")
2017-05-14 02:06:26 -07:00
2017-11-30 06:34:49 -08:00
// castagnoliTable is the CRC32 table for the Castagnoli polynomial.
// It is initialised at its declaration rather than in an init function, so
// it is already set for any package-level initialiser that depends on it.
var castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
2017-11-30 06:34:49 -08:00
// newCRC32 initializes a CRC32 hash with a preconfigured polynomial, so the
// polynomial may be easily changed in one location at a later time, if necessary.
func newCRC32 ( ) hash . Hash32 {
return crc32 . New ( castagnoliTable )
2017-03-07 03:47:49 -08:00
}
2017-11-30 06:34:49 -08:00
// Writer implements the ChunkWriter interface for the standard
// serialization format.
type Writer struct {
	// dirFile is the open handle of the directory that holds the segments.
	dirFile *os.File
	// files lists every segment file cut so far; the last one is the tail.
	files []*os.File
	// wbuf buffers writes to the tail segment file.
	wbuf *bufio.Writer
	// n is the write position within the current segment, header included.
	// It is zero until the first segment has been cut.
	n int64
	// crc32 is the hash reused to checksum each written chunk.
	crc32 hash.Hash
	// buf is scratch space for the length varint, encoding byte and checksum.
	buf [binary.MaxVarintLen32]byte

	// segmentSize is the size limit used when batching chunks into segments
	// and the size each new segment file is preallocated to.
	segmentSize int64
}
// DefaultChunkSegmentSize is the default chunks segment size (512 MiB).
const DefaultChunkSegmentSize = 512 << 20
2019-12-03 23:37:49 -08:00
// NewWriterWithSegSize returns a new writer against the given directory
// and allows setting a custom size for the segments.
func NewWriterWithSegSize(dir string, segmentSize int64) (*Writer, error) {
	w, err := newWriter(dir, segmentSize)
	return w, err
}
// NewWriter returns a new writer against the given directory
// using the default segment size.
2017-11-30 06:34:49 -08:00
func NewWriter ( dir string ) ( * Writer , error ) {
2019-12-03 23:37:49 -08:00
return newWriter ( dir , DefaultChunkSegmentSize )
}
func newWriter ( dir string , segmentSize int64 ) ( * Writer , error ) {
if segmentSize <= 0 {
segmentSize = DefaultChunkSegmentSize
}
2017-03-07 03:47:49 -08:00
if err := os . MkdirAll ( dir , 0777 ) ; err != nil {
return nil , err
}
dirFile , err := fileutil . OpenDir ( dir )
if err != nil {
return nil , err
}
2019-12-03 23:37:49 -08:00
return & Writer {
2017-03-07 03:47:49 -08:00
dirFile : dirFile ,
n : 0 ,
2017-08-26 09:04:00 -07:00
crc32 : newCRC32 ( ) ,
2019-12-03 23:37:49 -08:00
segmentSize : segmentSize ,
} , nil
2017-03-07 03:47:49 -08:00
}
2017-11-30 06:34:49 -08:00
func ( w * Writer ) tail ( ) * os . File {
2017-03-07 03:47:49 -08:00
if len ( w . files ) == 0 {
return nil
}
return w . files [ len ( w . files ) - 1 ]
}
// finalizeTail writes all pending data to the current tail file,
// truncates its size, and closes it.
2017-11-30 06:34:49 -08:00
func ( w * Writer ) finalizeTail ( ) error {
2017-03-07 03:47:49 -08:00
tf := w . tail ( )
if tf == nil {
return nil
}
if err := w . wbuf . Flush ( ) ; err != nil {
return err
}
2019-04-03 01:16:54 -07:00
if err := tf . Sync ( ) ; err != nil {
2017-03-07 03:47:49 -08:00
return err
}
// As the file was pre-allocated, we truncate any superfluous zero bytes.
2018-03-21 13:39:43 -07:00
off , err := tf . Seek ( 0 , io . SeekCurrent )
2017-03-07 03:47:49 -08:00
if err != nil {
return err
}
if err := tf . Truncate ( off ) ; err != nil {
return err
}
2017-10-31 07:37:41 -07:00
2017-03-07 03:47:49 -08:00
return tf . Close ( )
}
2017-11-30 06:34:49 -08:00
func ( w * Writer ) cut ( ) error {
2017-03-07 03:47:49 -08:00
// Sync current tail to disk and close.
2017-04-28 06:59:23 -07:00
if err := w . finalizeTail ( ) ; err != nil {
return err
}
2017-03-07 03:47:49 -08:00
2020-03-19 09:33:44 -07:00
n , f , _ , err := cutSegmentFile ( w . dirFile , MagicChunks , chunksFormatV1 , w . segmentSize )
2017-03-07 03:47:49 -08:00
if err != nil {
return err
}
2020-02-05 05:39:40 -08:00
w . n = int64 ( n )
w . files = append ( w . files , f )
if w . wbuf != nil {
w . wbuf . Reset ( f )
} else {
w . wbuf = bufio . NewWriterSize ( f , 8 * 1024 * 1024 )
}
return nil
}
2020-07-14 22:48:48 -07:00
func cutSegmentFile ( dirFile * os . File , magicNumber uint32 , chunksFormat byte , allocSize int64 ) ( headerSize int , newFile * os . File , seq int , returnErr error ) {
2020-02-05 05:39:40 -08:00
p , seq , err := nextSequenceFile ( dirFile . Name ( ) )
if err != nil {
2020-07-14 02:29:28 -07:00
return 0 , nil , 0 , errors . Wrap ( err , "next sequence file" )
2020-02-05 05:39:40 -08:00
}
2020-07-14 02:29:28 -07:00
ptmp := p + ".tmp"
f , err := os . OpenFile ( ptmp , os . O_WRONLY | os . O_CREATE , 0666 )
2017-03-07 03:47:49 -08:00
if err != nil {
2020-07-14 02:29:28 -07:00
return 0 , nil , 0 , errors . Wrap ( err , "open temp file" )
2017-03-07 03:47:49 -08:00
}
2020-07-14 22:48:48 -07:00
defer func ( ) {
if returnErr != nil {
var merr tsdb_errors . MultiError
merr . Add ( returnErr )
if f != nil {
merr . Add ( f . Close ( ) )
}
// Calling RemoveAll on a non-existent file does not return error.
merr . Add ( os . RemoveAll ( ptmp ) )
returnErr = merr . Err ( )
}
} ( )
2020-03-19 09:33:44 -07:00
if allocSize > 0 {
if err = fileutil . Preallocate ( f , allocSize , true ) ; err != nil {
2020-07-14 02:29:28 -07:00
return 0 , nil , 0 , errors . Wrap ( err , "preallocate" )
2020-02-05 05:39:40 -08:00
}
2017-03-07 03:47:49 -08:00
}
2020-02-05 05:39:40 -08:00
if err = dirFile . Sync ( ) ; err != nil {
2020-07-14 02:29:28 -07:00
return 0 , nil , 0 , errors . Wrap ( err , "sync directory" )
2017-03-07 03:47:49 -08:00
}
// Write header metadata for new file.
2019-12-03 23:37:49 -08:00
metab := make ( [ ] byte , SegmentHeaderSize )
2020-03-19 09:33:44 -07:00
binary . BigEndian . PutUint32 ( metab [ : MagicChunksSize ] , magicNumber )
2020-02-05 05:39:40 -08:00
metab [ 4 ] = chunksFormat
2017-03-07 03:47:49 -08:00
2019-12-03 23:37:49 -08:00
n , err := f . Write ( metab )
if err != nil {
2020-07-14 02:29:28 -07:00
return 0 , nil , 0 , errors . Wrap ( err , "write header" )
}
if err := f . Close ( ) ; err != nil {
return 0 , nil , 0 , errors . Wrap ( err , "close temp file" )
}
2020-07-14 22:48:48 -07:00
f = nil
2020-07-14 02:29:28 -07:00
if err := fileutil . Rename ( ptmp , p ) ; err != nil {
return 0 , nil , 0 , errors . Wrap ( err , "replace file" )
}
f , err = os . OpenFile ( p , os . O_WRONLY , 0666 )
if err != nil {
return 0 , nil , 0 , errors . Wrap ( err , "open final file" )
}
2020-07-14 22:48:48 -07:00
// Skip header for further writes.
2020-07-14 02:29:28 -07:00
if _ , err := f . Seek ( int64 ( n ) , 0 ) ; err != nil {
return 0 , nil , 0 , errors . Wrap ( err , "seek in final file" )
2017-03-07 03:47:49 -08:00
}
2020-02-05 05:39:40 -08:00
return n , f , seq , nil
2017-03-07 03:47:49 -08:00
}
2017-11-30 06:34:49 -08:00
func ( w * Writer ) write ( b [ ] byte ) error {
2017-04-24 08:10:12 -07:00
n , err := w . wbuf . Write ( b )
2017-03-07 03:47:49 -08:00
w . n += int64 ( n )
return err
}
2019-02-14 05:29:41 -08:00
// MergeOverlappingChunks removes the samples whose timestamp is overlapping.
// The last appearing sample is retained in case there is overlapping.
// This assumes that `chks []Meta` is sorted w.r.t. MinTime.
func MergeOverlappingChunks ( chks [ ] Meta ) ( [ ] Meta , error ) {
if len ( chks ) < 2 {
return chks , nil
}
newChks := make ( [ ] Meta , 0 , len ( chks ) ) // Will contain the merged chunks.
newChks = append ( newChks , chks [ 0 ] )
last := 0
for _ , c := range chks [ 1 : ] {
// We need to check only the last chunk in newChks.
// Reason: (1) newChks[last-1].MaxTime < newChks[last].MinTime (non overlapping)
// (2) As chks are sorted w.r.t. MinTime, newChks[last].MinTime < c.MinTime.
// So never overlaps with newChks[last-1] or anything before that.
if c . MinTime > newChks [ last ] . MaxTime {
newChks = append ( newChks , c )
2019-07-03 03:47:31 -07:00
last ++
2019-02-14 05:29:41 -08:00
continue
}
nc := & newChks [ last ]
if c . MaxTime > nc . MaxTime {
nc . MaxTime = c . MaxTime
}
chk , err := MergeChunks ( nc . Chunk , c . Chunk )
if err != nil {
return nil , err
}
nc . Chunk = chk
}
return newChks , nil
}
// MergeChunks vertically merges a and b, i.e., if there is any sample
// with same timestamp in both a and b, the sample in a is discarded.
func MergeChunks ( a , b chunkenc . Chunk ) ( * chunkenc . XORChunk , error ) {
newChunk := chunkenc . NewXORChunk ( )
app , err := newChunk . Appender ( )
if err != nil {
return nil , err
}
2019-07-09 02:49:34 -07:00
ait := a . Iterator ( nil )
bit := b . Iterator ( nil )
2019-02-14 05:29:41 -08:00
aok , bok := ait . Next ( ) , bit . Next ( )
for aok && bok {
at , av := ait . At ( )
bt , bv := bit . At ( )
if at < bt {
app . Append ( at , av )
aok = ait . Next ( )
} else if bt < at {
app . Append ( bt , bv )
bok = bit . Next ( )
} else {
app . Append ( bt , bv )
aok = ait . Next ( )
bok = bit . Next ( )
}
}
for aok {
at , av := ait . At ( )
app . Append ( at , av )
aok = ait . Next ( )
}
for bok {
bt , bv := bit . At ( )
app . Append ( bt , bv )
bok = bit . Next ( )
}
if ait . Err ( ) != nil {
return nil , ait . Err ( )
}
if bit . Err ( ) != nil {
return nil , bit . Err ( )
}
return newChunk , nil
}
2019-12-03 23:37:49 -08:00
// WriteChunks writes as many chunks as possible to the current segment,
// cuts a new segment when the current segment is full and
// writes the rest of the chunks in the new segment.
2017-11-30 06:34:49 -08:00
func ( w * Writer ) WriteChunks ( chks ... Meta ) error {
2019-12-03 23:37:49 -08:00
var (
batchSize = int64 ( 0 )
batchStart = 0
batches = make ( [ ] [ ] Meta , 1 )
batchID = 0
firstBatch = true
)
for i , chk := range chks {
// Each chunk contains: data length + encoding + the data itself + crc32
chkSize := int64 ( MaxChunkLengthFieldSize ) // The data length is a variable length field so use the maximum possible value.
chkSize += ChunkEncodingSize // The chunk encoding.
chkSize += int64 ( len ( chk . Chunk . Bytes ( ) ) ) // The data itself.
chkSize += crc32 . Size // The 4 bytes of crc32.
batchSize += chkSize
// Cut a new batch when it is not the first chunk(to avoid empty segments) and
// the batch is too large to fit in the current segment.
cutNewBatch := ( i != 0 ) && ( batchSize + SegmentHeaderSize > w . segmentSize )
// When the segment already has some data than
// the first batch size calculation should account for that.
if firstBatch && w . n > SegmentHeaderSize {
cutNewBatch = batchSize + w . n > w . segmentSize
if cutNewBatch {
firstBatch = false
}
}
if cutNewBatch {
batchStart = i
batches = append ( batches , [ ] Meta { } )
batchID ++
batchSize = chkSize
}
batches [ batchID ] = chks [ batchStart : i + 1 ]
2017-03-07 03:47:49 -08:00
}
2019-12-03 23:37:49 -08:00
// Create a new segment when one doesn't already exist.
if w . n == 0 {
2017-03-07 03:47:49 -08:00
if err := w . cut ( ) ; err != nil {
return err
}
}
2019-12-03 23:37:49 -08:00
for i , chks := range batches {
if err := w . writeChunks ( chks ) ; err != nil {
return err
}
// Cut a new segment only when there are more chunks to write.
// Avoid creating a new empty segment at the end of the write.
if i < len ( batches ) - 1 {
if err := w . cut ( ) ; err != nil {
return err
}
}
}
return nil
}
// writeChunks writes the chunks into the current segment irrespective
// of the configured segment size limit. A segment should have been already
// started before calling this.
func ( w * Writer ) writeChunks ( chks [ ] Meta ) error {
if len ( chks ) == 0 {
return nil
}
2019-07-12 06:12:34 -07:00
var seq = uint64 ( w . seq ( ) ) << 32
2017-08-06 11:41:24 -07:00
for i := range chks {
chk := & chks [ i ]
2019-12-03 23:37:49 -08:00
// The reference is set to the segment index and the offset where
// the data starts for this chunk.
//
// The upper 4 bytes are for the segment index and
// the lower 4 bytes are for the segment offset where to start reading this chunk.
2017-03-07 03:47:49 -08:00
chk . Ref = seq | uint64 ( w . n )
2019-07-12 06:12:34 -07:00
n := binary . PutUvarint ( w . buf [ : ] , uint64 ( len ( chk . Chunk . Bytes ( ) ) ) )
2017-03-07 03:47:49 -08:00
2019-07-12 06:12:34 -07:00
if err := w . write ( w . buf [ : n ] ) ; err != nil {
2017-03-07 03:47:49 -08:00
return err
}
2019-07-12 06:12:34 -07:00
w . buf [ 0 ] = byte ( chk . Chunk . Encoding ( ) )
if err := w . write ( w . buf [ : 1 ] ) ; err != nil {
2017-04-25 07:45:44 -07:00
return err
}
2017-04-24 08:10:12 -07:00
if err := w . write ( chk . Chunk . Bytes ( ) ) ; err != nil {
2017-03-07 03:47:49 -08:00
return err
}
2017-04-28 05:17:53 -07:00
w . crc32 . Reset ( )
2019-07-12 06:12:34 -07:00
if err := chk . writeHash ( w . crc32 , w . buf [ : ] ) ; err != nil {
2017-05-02 03:55:40 -07:00
return err
}
2019-07-12 06:12:34 -07:00
if err := w . write ( w . crc32 . Sum ( w . buf [ : 0 ] ) ) ; err != nil {
2017-04-28 05:17:53 -07:00
return err
}
2017-03-07 03:47:49 -08:00
}
return nil
}
2017-11-30 06:34:49 -08:00
func ( w * Writer ) seq ( ) int {
2017-03-07 03:47:49 -08:00
return len ( w . files ) - 1
}
2017-11-30 06:34:49 -08:00
func ( w * Writer ) Close ( ) error {
2017-10-31 07:37:41 -07:00
if err := w . finalizeTail ( ) ; err != nil {
return err
}
// close dir file (if not windows platform will fail on rename)
return w . dirFile . Close ( )
2017-03-07 03:47:49 -08:00
}
2017-11-30 06:34:49 -08:00
// ByteSlice abstracts a byte slice.
type ByteSlice interface {
	// Len returns the number of bytes available.
	Len() int
	// Range returns the bytes between the offsets start and end.
	Range(start, end int) []byte
}
type realByteSlice [ ] byte
2017-03-07 03:47:49 -08:00
2017-11-30 06:34:49 -08:00
func ( b realByteSlice ) Len ( ) int {
return len ( b )
2017-03-07 03:47:49 -08:00
}
2017-11-30 06:34:49 -08:00
func ( b realByteSlice ) Range ( start , end int ) [ ] byte {
return b [ start : end ]
}
func ( b realByteSlice ) Sub ( start , end int ) ByteSlice {
return b [ start : end ]
}
2019-07-11 03:55:34 -07:00
// Reader implements a ChunkReader for a serialized byte stream
2017-03-07 03:47:49 -08:00
// of series data.
2017-11-30 06:34:49 -08:00
type Reader struct {
2019-12-03 23:37:49 -08:00
// The underlying bytes holding the encoded series data.
// Each slice holds the data for a different segment.
2019-12-24 13:55:22 -08:00
bs [ ] ByteSlice
cs [ ] io . Closer // Closers for resources behind the byte slices.
size int64 // The total size of bytes in the reader.
pool chunkenc . Pool
2017-03-07 03:47:49 -08:00
}
2017-11-30 06:34:49 -08:00
func newReader ( bs [ ] ByteSlice , cs [ ] io . Closer , pool chunkenc . Pool ) ( * Reader , error ) {
2019-12-24 13:55:22 -08:00
cr := Reader { pool : pool , bs : bs , cs : cs }
2017-11-10 02:38:22 -08:00
for i , b := range cr . bs {
2019-12-03 23:37:49 -08:00
if b . Len ( ) < SegmentHeaderSize {
return nil , errors . Wrapf ( errInvalidSize , "invalid segment header in segment %d" , i )
2017-11-10 02:38:22 -08:00
}
// Verify magic number.
2019-03-24 13:33:08 -07:00
if m := binary . BigEndian . Uint32 ( b . Range ( 0 , MagicChunksSize ) ) ; m != MagicChunks {
2018-06-08 01:25:12 -07:00
return nil , errors . Errorf ( "invalid magic number %x" , m )
2017-11-10 02:38:22 -08:00
}
2019-03-24 13:33:08 -07:00
// Verify chunk format version.
if v := int ( b . Range ( MagicChunksSize , MagicChunksSize + ChunksFormatVersionSize ) [ 0 ] ) ; v != chunksFormatV1 {
return nil , errors . Errorf ( "invalid chunk format version %d" , v )
}
2020-02-05 05:39:40 -08:00
cr . size += int64 ( b . Len ( ) )
2017-11-10 02:38:22 -08:00
}
return & cr , nil
}
2017-11-30 06:34:49 -08:00
// NewDirReader returns a new Reader against sequentially numbered files in the
2017-11-10 02:38:22 -08:00
// given directory.
2017-11-30 06:34:49 -08:00
func NewDirReader ( dir string , pool chunkenc . Pool ) ( * Reader , error ) {
2017-08-30 09:34:54 -07:00
files , err := sequenceFiles ( dir )
2017-03-07 03:47:49 -08:00
if err != nil {
return nil , err
}
2017-08-08 08:35:34 -07:00
if pool == nil {
2017-11-30 06:34:49 -08:00
pool = chunkenc . NewPool ( )
2017-08-08 08:35:34 -07:00
}
2017-11-10 02:38:22 -08:00
2019-01-16 02:03:52 -08:00
var (
2019-04-02 05:16:29 -07:00
bs [ ] ByteSlice
cs [ ] io . Closer
merr tsdb_errors . MultiError
2019-01-16 02:03:52 -08:00
)
2017-03-07 03:47:49 -08:00
for _ , fn := range files {
2017-11-30 06:34:49 -08:00
f , err := fileutil . OpenMmapFile ( fn )
2017-03-07 03:47:49 -08:00
if err != nil {
2019-04-02 05:16:29 -07:00
merr . Add ( errors . Wrap ( err , "mmap files" ) )
merr . Add ( closeAll ( cs ) )
return nil , merr
2017-03-07 03:47:49 -08:00
}
2017-11-10 02:38:22 -08:00
cs = append ( cs , f )
2017-11-30 06:34:49 -08:00
bs = append ( bs , realByteSlice ( f . Bytes ( ) ) )
2017-03-07 03:47:49 -08:00
}
2019-03-24 13:33:08 -07:00
reader , err := newReader ( bs , cs , pool )
if err != nil {
merr . Add ( err )
merr . Add ( closeAll ( cs ) )
return nil , merr
}
return reader , nil
2017-03-07 03:47:49 -08:00
}
2017-11-30 06:34:49 -08:00
func ( s * Reader ) Close ( ) error {
2019-02-11 01:57:46 -08:00
return closeAll ( s . cs )
2017-03-07 03:47:49 -08:00
}
2019-01-16 02:03:52 -08:00
// Size returns the size of the chunks, i.e. the total number of bytes in all
// segments of the reader.
func (s *Reader) Size() int64 {
	total := s.size
	return total
}
2019-01-29 09:46:12 -08:00
// Chunk returns a chunk from a given reference.
2017-11-30 06:34:49 -08:00
func ( s * Reader ) Chunk ( ref uint64 ) ( chunkenc . Chunk , error ) {
2017-03-07 03:47:49 -08:00
var (
2019-12-03 23:37:49 -08:00
// Get the upper 4 bytes.
// These contain the segment index.
sgmIndex = int ( ref >> 32 )
// Get the lower 4 bytes.
// These contain the segment offset where the data for this chunk starts.
chkStart = int ( ( ref << 32 ) >> 32 )
2019-12-24 13:55:22 -08:00
chkCRC32 = newCRC32 ( )
2017-03-07 03:47:49 -08:00
)
2019-12-03 23:37:49 -08:00
if sgmIndex >= len ( s . bs ) {
return nil , errors . Errorf ( "segment index %d out of range" , sgmIndex )
2017-03-07 03:47:49 -08:00
}
2019-12-03 23:37:49 -08:00
sgmBytes := s . bs [ sgmIndex ]
if chkStart + MaxChunkLengthFieldSize > sgmBytes . Len ( ) {
return nil , errors . Errorf ( "segment doesn't include enough bytes to read the chunk size data field - required:%v, available:%v" , chkStart + MaxChunkLengthFieldSize , sgmBytes . Len ( ) )
2017-03-07 03:47:49 -08:00
}
2017-11-10 02:38:22 -08:00
// With the minimum chunk length this should never cause us reading
// over the end of the slice.
2019-12-03 23:37:49 -08:00
c := sgmBytes . Range ( chkStart , chkStart + MaxChunkLengthFieldSize )
chkDataLen , n := binary . Uvarint ( c )
2018-06-08 01:25:12 -07:00
if n <= 0 {
return nil , errors . Errorf ( "reading chunk length failed with %d" , n )
2017-03-07 03:47:49 -08:00
}
2019-12-03 23:37:49 -08:00
chkEncStart := chkStart + n
chkEnd := chkEncStart + ChunkEncodingSize + int ( chkDataLen ) + crc32 . Size
chkDataStart := chkEncStart + ChunkEncodingSize
chkDataEnd := chkEnd - crc32 . Size
if chkEnd > sgmBytes . Len ( ) {
return nil , errors . Errorf ( "segment doesn't include enough bytes to read the chunk - required:%v, available:%v" , chkEnd , sgmBytes . Len ( ) )
}
2019-12-24 13:55:22 -08:00
sum := sgmBytes . Range ( chkDataEnd , chkEnd )
if _ , err := chkCRC32 . Write ( sgmBytes . Range ( chkEncStart , chkDataEnd ) ) ; err != nil {
2019-12-03 23:37:49 -08:00
return nil , err
}
2019-12-24 13:55:22 -08:00
if act := chkCRC32 . Sum ( nil ) ; ! bytes . Equal ( act , sum ) {
2019-12-03 23:37:49 -08:00
return nil , errors . Errorf ( "checksum mismatch expected:%x, actual:%x" , sum , act )
}
chkData := sgmBytes . Range ( chkDataStart , chkDataEnd )
chkEnc := sgmBytes . Range ( chkEncStart , chkEncStart + ChunkEncodingSize ) [ 0 ]
return s . pool . Get ( chunkenc . Encoding ( chkEnc ) , chkData )
2017-11-30 06:34:49 -08:00
}
func nextSequenceFile ( dir string ) ( string , int , error ) {
2020-04-06 06:34:20 -07:00
files , err := ioutil . ReadDir ( dir )
2017-11-30 06:34:49 -08:00
if err != nil {
return "" , 0 , err
}
i := uint64 ( 0 )
2020-04-06 06:34:20 -07:00
for _ , f := range files {
j , err := strconv . ParseUint ( f . Name ( ) , 10 , 64 )
2017-11-30 06:34:49 -08:00
if err != nil {
continue
}
2020-02-05 05:39:40 -08:00
// It is not necessary that we find the files in number order,
// for example with '1000000' and '200000', '1000000' would come first.
// Though this is a very very race case, we check anyway for the max id.
if j > i {
i = j
}
2017-11-30 06:34:49 -08:00
}
2020-02-05 05:39:40 -08:00
return segmentFile ( dir , int ( i + 1 ) ) , int ( i + 1 ) , nil
}
// segmentFile returns the path of the segment with the given index inside
// baseDir. The file name is the index zero-padded to at least six digits.
func segmentFile(baseDir string, index int) string {
	name := fmt.Sprintf("%0.6d", index)
	return filepath.Join(baseDir, name)
}
// sequenceFiles returns the paths of all entries in dir whose names parse as
// unsigned decimal numbers, in the order the directory listing yields them.
// All other entries are skipped.
func sequenceFiles(dir string) ([]string, error) {
	entries, err := ioutil.ReadDir(dir)
	if err != nil {
		return nil, err
	}

	var paths []string
	for _, entry := range entries {
		name := entry.Name()
		if _, err := strconv.ParseUint(name, 10, 64); err == nil {
			paths = append(paths, filepath.Join(dir, name))
		}
	}
	return paths, nil
}
2019-07-19 02:16:34 -07:00
func closeAll ( cs [ ] io . Closer ) error {
var merr tsdb_errors . MultiError
2017-11-30 06:34:49 -08:00
for _ , c := range cs {
2019-07-19 02:16:34 -07:00
merr . Add ( c . Close ( ) )
2017-11-30 06:34:49 -08:00
}
2019-07-19 02:16:34 -07:00
return merr . Err ( )
2017-03-07 03:47:49 -08:00
}