// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// The code in this file was largely written by Damian Gryski as part of
// https://github.com/dgryski/go-tsz and published under the license below.
// It was modified to accommodate reading from byte slices without modifying
// the underlying bytes, which would panic when reading from mmapped
// read-only byte slices.
// Copyright (c) 2015,2016 Damian Gryski
// All rights reserved.
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
//
// * Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED.
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE // FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL // DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR // SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER // CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, // OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. package chunkenc import ( "encoding/binary" "math" "math/bits" "github.com/prometheus/prometheus/model/histogram" ) const ( chunkCompactCapacityThreshold = 32 ) // XORChunk holds XOR encoded sample data. type XORChunk struct { b bstream } // NewXORChunk returns a new chunk with XOR encoding. func NewXORChunk() *XORChunk { b := make([]byte, 2, 128) return &XORChunk{b: bstream{stream: b, count: 0}} } func (c *XORChunk) Reset(stream []byte) { c.b.Reset(stream) } // Encoding returns the encoding type. func (c *XORChunk) Encoding() Encoding { return EncXOR } // Bytes returns the underlying byte slice of the chunk. func (c *XORChunk) Bytes() []byte { return c.b.bytes() } // NumSamples returns the number of samples in the chunk. func (c *XORChunk) NumSamples() int { return int(binary.BigEndian.Uint16(c.Bytes())) } // Compact implements the Chunk interface. func (c *XORChunk) Compact() { if l := len(c.b.stream); cap(c.b.stream) > l+chunkCompactCapacityThreshold { buf := make([]byte, l) copy(buf, c.b.stream) c.b.stream = buf } } // Appender implements the Chunk interface. // It is not valid to call Appender() multiple times concurrently or to use multiple // Appenders on the same chunk. func (c *XORChunk) Appender() (Appender, error) { it := c.iterator(nil) // To get an appender we must know the state it would have if we had // appended all existing data from scratch. // We iterate through the end and populate via the iterator's state. 
for it.Next() != ValNone { } if err := it.Err(); err != nil { return nil, err } a := &xorAppender{ b: &c.b, t: it.t, v: it.val, tDelta: it.tDelta, leading: it.leading, trailing: it.trailing, } if it.numTotal == 0 { a.leading = 0xff } return a, nil } func (c *XORChunk) iterator(it Iterator) *xorIterator { if xorIter, ok := it.(*xorIterator); ok { xorIter.Reset(c.b.bytes()) return xorIter } return &xorIterator{ // The first 2 bytes contain chunk headers. // We skip that for actual samples. br: newBReader(c.b.bytes()[2:]), numTotal: binary.BigEndian.Uint16(c.b.bytes()), t: math.MinInt64, } } // Iterator implements the Chunk interface. // Iterator() must not be called concurrently with any modifications to the chunk, // but after it returns you can use an Iterator concurrently with an Appender or // other Iterators. func (c *XORChunk) Iterator(it Iterator) Iterator { return c.iterator(it) } type xorAppender struct { b *bstream t int64 v float64 tDelta uint64 leading uint8 trailing uint8 } func (a *xorAppender) Append(t int64, v float64) { var tDelta uint64 num := binary.BigEndian.Uint16(a.b.bytes()) switch num { case 0: buf := make([]byte, binary.MaxVarintLen64) for _, b := range buf[:binary.PutVarint(buf, t)] { a.b.writeByte(b) } a.b.writeBits(math.Float64bits(v), 64) case 1: tDelta = uint64(t - a.t) buf := make([]byte, binary.MaxVarintLen64) for _, b := range buf[:binary.PutUvarint(buf, tDelta)] { a.b.writeByte(b) } a.writeVDelta(v) default: tDelta = uint64(t - a.t) dod := int64(tDelta - a.tDelta) // Gorilla has a max resolution of seconds, Prometheus milliseconds. // Thus we use higher value range steps with larger bit size. // // TODO(beorn7): This seems to needlessly jump to large bit // sizes even for very small deviations from zero. Timestamp // compression can probably benefit from some smaller bit // buckets. See also what was done for histogram encoding in // varbit.go. 
switch { case dod == 0: a.b.writeBit(zero) case bitRange(dod, 14): a.b.writeBits(0b10, 2) a.b.writeBits(uint64(dod), 14) case bitRange(dod, 17): a.b.writeBits(0b110, 3) a.b.writeBits(uint64(dod), 17) case bitRange(dod, 20): a.b.writeBits(0b1110, 4) a.b.writeBits(uint64(dod), 20) default: a.b.writeBits(0b1111, 4) a.b.writeBits(uint64(dod), 64) } a.writeVDelta(v) } a.t = t a.v = v binary.BigEndian.PutUint16(a.b.bytes(), num+1) a.tDelta = tDelta } // bitRange returns whether the given integer can be represented by nbits. // See docs/bstream.md. func bitRange(x int64, nbits uint8) bool { return -((1<<(nbits-1))-1) <= x && x <= 1<<(nbits-1) } func (a *xorAppender) writeVDelta(v float64) { xorWrite(a.b, v, a.v, &a.leading, &a.trailing) } func (a *xorAppender) AppendHistogram(*HistogramAppender, int64, *histogram.Histogram, bool) (Chunk, bool, Appender, error) { panic("appended a histogram sample to a float chunk") } func (a *xorAppender) AppendFloatHistogram(*FloatHistogramAppender, int64, *histogram.FloatHistogram, bool) (Chunk, bool, Appender, error) { panic("appended a float histogram sample to a float chunk") } type xorIterator struct { br bstreamReader numTotal uint16 numRead uint16 t int64 val float64 leading uint8 trailing uint8 tDelta uint64 err error } func (it *xorIterator) Seek(t int64) ValueType { if it.err != nil { return ValNone } for t > it.t || it.numRead == 0 { if it.Next() == ValNone { return ValNone } } return ValFloat } func (it *xorIterator) At() (int64, float64) { return it.t, it.val } func (it *xorIterator) AtHistogram(*histogram.Histogram) (int64, *histogram.Histogram) { panic("cannot call xorIterator.AtHistogram") } func (it *xorIterator) AtFloatHistogram(*histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { panic("cannot call xorIterator.AtFloatHistogram") } func (it *xorIterator) AtT() int64 { return it.t } func (it *xorIterator) Err() error { return it.err } func (it *xorIterator) Reset(b []byte) { // The first 2 bytes contain chunk 
headers. // We skip that for actual samples. it.br = newBReader(b[2:]) it.numTotal = binary.BigEndian.Uint16(b) it.numRead = 0 it.t = 0 it.val = 0 it.leading = 0 it.trailing = 0 it.tDelta = 0 it.err = nil } func (it *xorIterator) Next() ValueType { if it.err != nil || it.numRead == it.numTotal { return ValNone } if it.numRead == 0 { t, err := binary.ReadVarint(&it.br) if err != nil { it.err = err return ValNone } v, err := it.br.readBits(64) if err != nil { it.err = err return ValNone } it.t = t it.val = math.Float64frombits(v) it.numRead++ return ValFloat } if it.numRead == 1 { tDelta, err := binary.ReadUvarint(&it.br) if err != nil { it.err = err return ValNone } it.tDelta = tDelta it.t += int64(it.tDelta) return it.readValue() } var d byte // read delta-of-delta for i := 0; i < 4; i++ { d <<= 1 bit, err := it.br.readBitFast() if err != nil { bit, err = it.br.readBit() } if err != nil { it.err = err return ValNone } if bit == zero { break } d |= 1 } var sz uint8 var dod int64 switch d { case 0b0: // dod == 0 case 0b10: sz = 14 case 0b110: sz = 17 case 0b1110: sz = 20 case 0b1111: // Do not use fast because it's very unlikely it will succeed. bits, err := it.br.readBits(64) if err != nil { it.err = err return ValNone } dod = int64(bits) } if sz != 0 { bits, err := it.br.readBitsFast(sz) if err != nil { bits, err = it.br.readBits(sz) } if err != nil { it.err = err return ValNone } // Account for negative numbers, which come back as high unsigned numbers. // See docs/bstream.md. 
if bits > (1 << (sz - 1)) { bits -= 1 << sz } dod = int64(bits) } it.tDelta = uint64(int64(it.tDelta) + dod) it.t += int64(it.tDelta) return it.readValue() } func (it *xorIterator) readValue() ValueType { err := xorRead(&it.br, &it.val, &it.leading, &it.trailing) if err != nil { it.err = err return ValNone } it.numRead++ return ValFloat } func xorWrite(b *bstream, newValue, currentValue float64, leading, trailing *uint8) { delta := math.Float64bits(newValue) ^ math.Float64bits(currentValue) if delta == 0 { b.writeBit(zero) return } b.writeBit(one) newLeading := uint8(bits.LeadingZeros64(delta)) newTrailing := uint8(bits.TrailingZeros64(delta)) // Clamp number of leading zeros to avoid overflow when encoding. if newLeading >= 32 { newLeading = 31 } if *leading != 0xff && newLeading >= *leading && newTrailing >= *trailing { // In this case, we stick with the current leading/trailing. b.writeBit(zero) b.writeBits(delta>>*trailing, 64-int(*leading)-int(*trailing)) return } // Update leading/trailing for the caller. *leading, *trailing = newLeading, newTrailing b.writeBit(one) b.writeBits(uint64(newLeading), 5) // Note that if newLeading == newTrailing == 0, then sigbits == 64. But // that value doesn't actually fit into the 6 bits we have. Luckily, we // never need to encode 0 significant bits, since that would put us in // the other case (vdelta == 0). So instead we write out a 0 and adjust // it back to 64 on unpacking. sigbits := 64 - newLeading - newTrailing b.writeBits(uint64(sigbits), 6) b.writeBits(delta>>newTrailing, int(sigbits)) } func xorRead(br *bstreamReader, value *float64, leading, trailing *uint8) error { bit, err := br.readBitFast() if err != nil { bit, err = br.readBit() } if err != nil { return err } if bit == zero { return nil } bit, err = br.readBitFast() if err != nil { bit, err = br.readBit() } if err != nil { return err } var ( bits uint64 newLeading, newTrailing, mbits uint8 ) if bit == zero { // Reuse leading/trailing zero bits. 
newLeading, newTrailing = *leading, *trailing mbits = 64 - newLeading - newTrailing } else { bits, err = br.readBitsFast(5) if err != nil { bits, err = br.readBits(5) } if err != nil { return err } newLeading = uint8(bits) bits, err = br.readBitsFast(6) if err != nil { bits, err = br.readBits(6) } if err != nil { return err } mbits = uint8(bits) // 0 significant bits here means we overflowed and we actually // need 64; see comment in xrWrite. if mbits == 0 { mbits = 64 } newTrailing = 64 - newLeading - mbits // Update leading/trailing zero bits for the caller. *leading, *trailing = newLeading, newTrailing } bits, err = br.readBitsFast(mbits) if err != nil { bits, err = br.readBits(mbits) } if err != nil { return err } vbits := math.Float64bits(*value) vbits ^= bits << newTrailing *value = math.Float64frombits(vbits) return nil }