Merge pull request #719 from prometheus/update-vendoring

Update all vendored dependency to latest versions.
This commit is contained in:
Fabian Reinartz 2015-05-22 14:10:24 +02:00
commit 25ce5f8667
64 changed files with 2014 additions and 812 deletions

34
Godeps/Godeps.json generated
View file

@ -9,8 +9,8 @@
},
{
"ImportPath": "github.com/Sirupsen/logrus",
"Comment": "v0.7.3-8-g52919f1",
"Rev": "52919f182f9c314f8a38c5afe96506f73d02b4b2"
"Comment": "v0.7.3-13-g81e2611",
"Rev": "81e2611f37acccd8cb5e4e1a5a4a5f6c9c7f4537"
},
{
"ImportPath": "github.com/beorn7/perks/quantile",
@ -18,11 +18,12 @@
},
{
"ImportPath": "github.com/golang/protobuf/proto",
"Rev": "655cdfa588ea190e901bc5590e65d5621688847c"
"Rev": "16256d3ce6929458613798ee44b7914a3f59f5c6"
},
{
"ImportPath": "github.com/hashicorp/consul/api",
"Rev": "9fb235a98d8e88f7857b21bb2dd3efc428c01427"
"Comment": "v0.5.2-9-g145b495",
"Rev": "145b495e22388832240ee78788524bd975e443ca"
},
{
"ImportPath": "github.com/matttproud/golang_protobuf_extensions/pbutil",
@ -30,27 +31,27 @@
},
{
"ImportPath": "github.com/miekg/dns",
"Rev": "e6898c8f30b5d002db962043a62db90552e90bf7"
"Rev": "bb1103f648f811d2018d4bedcb2d4b2bce34a0f1"
},
{
"ImportPath": "github.com/prometheus/client_golang/extraction",
"Comment": "0.5.0",
"Rev": "b0bd7e1be33327b85cb4853e7011156e3cedd657"
"Comment": "0.5.0-8-gfcd2986",
"Rev": "fcd2986466589bcf7a411ec3b52d85a8df9dcc8b"
},
{
"ImportPath": "github.com/prometheus/client_golang/model",
"Comment": "0.5.0",
"Rev": "b0bd7e1be33327b85cb4853e7011156e3cedd657"
"Comment": "0.5.0-8-gfcd2986",
"Rev": "fcd2986466589bcf7a411ec3b52d85a8df9dcc8b"
},
{
"ImportPath": "github.com/prometheus/client_golang/prometheus",
"Comment": "0.5.0",
"Rev": "b0bd7e1be33327b85cb4853e7011156e3cedd657"
"Comment": "0.5.0-8-gfcd2986",
"Rev": "fcd2986466589bcf7a411ec3b52d85a8df9dcc8b"
},
{
"ImportPath": "github.com/prometheus/client_golang/text",
"Comment": "0.5.0",
"Rev": "b0bd7e1be33327b85cb4853e7011156e3cedd657"
"Comment": "0.5.0-8-gfcd2986",
"Rev": "fcd2986466589bcf7a411ec3b52d85a8df9dcc8b"
},
{
"ImportPath": "github.com/prometheus/client_model/go",
@ -63,11 +64,11 @@
},
{
"ImportPath": "github.com/prometheus/procfs",
"Rev": "490cc6eb5fa45bf8a8b7b73c8bc82a8160e8531d"
"Rev": "ee2372b58cee877abe07cde670d04d3b3bac5ee6"
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb",
"Rev": "4875955338b0a434238a31165cb87255ab6e9e4a"
"Rev": "315fcfb05d4d46d4354b313d146ef688dda272a9"
},
{
"ImportPath": "github.com/syndtr/gosnappy/snappy",
@ -75,10 +76,11 @@
},
{
"ImportPath": "golang.org/x/net/context",
"Rev": "b6fdb7d8a4ccefede406f8fe0f017fb58265054c"
"Rev": "ad9eb3904af97b912b9a242efb203c5c6782e72a"
},
{
"ImportPath": "gopkg.in/fsnotify.v1",
"Comment": "v1.2.0",
"Rev": "96c060f6a6b7e0d6f75fddd10efeaca3e5d1bcb0"
},
{

View file

@ -211,6 +211,7 @@ func init() {
| [Slackrus](https://github.com/johntdyer/slackrus) | Hook for Slack chat. |
| [Journalhook](https://github.com/wercker/journalhook) | Hook for logging to `systemd-journald` |
| [Graylog](https://github.com/gemnasium/logrus-hooks/tree/master/graylog) | Hook for logging to [Graylog](http://graylog2.org/) |
| [Raygun](https://github.com/squirkle/logrus-raygun-hook) | Hook for logging to [Raygun.io](http://raygun.io/) |
#### Level logging

View file

@ -727,8 +727,14 @@ func (o *Buffer) dec_new_map(p *Properties, base structPointer) error {
return fmt.Errorf("proto: bad map data tag %d", raw[0])
}
}
keyelem, valelem := keyptr.Elem(), valptr.Elem()
if !keyelem.IsValid() || !valelem.IsValid() {
// We did not decode the key or the value in the map entry.
// Either way, it's an invalid map entry.
return fmt.Errorf("proto: bad map data: missing key/val")
}
v.SetMapIndex(keyptr.Elem(), valptr.Elem())
v.SetMapIndex(keyelem, valelem)
return nil
}

View file

@ -385,13 +385,13 @@ func UnmarshalJSONEnum(m map[string]int32, data []byte, enumName string) (int32,
// DebugPrint dumps the encoded data in b in a debugging format with a header
// including the string s. Used in testing but made available for general debugging.
func (o *Buffer) DebugPrint(s string, b []byte) {
func (p *Buffer) DebugPrint(s string, b []byte) {
var u uint64
obuf := o.buf
index := o.index
o.buf = b
o.index = 0
obuf := p.buf
index := p.index
p.buf = b
p.index = 0
depth := 0
fmt.Printf("\n--- %s ---\n", s)
@ -402,12 +402,12 @@ out:
fmt.Print(" ")
}
index := o.index
if index == len(o.buf) {
index := p.index
if index == len(p.buf) {
break
}
op, err := o.DecodeVarint()
op, err := p.DecodeVarint()
if err != nil {
fmt.Printf("%3d: fetching op err %v\n", index, err)
break out
@ -424,7 +424,7 @@ out:
case WireBytes:
var r []byte
r, err = o.DecodeRawBytes(false)
r, err = p.DecodeRawBytes(false)
if err != nil {
break out
}
@ -445,7 +445,7 @@ out:
fmt.Printf("\n")
case WireFixed32:
u, err = o.DecodeFixed32()
u, err = p.DecodeFixed32()
if err != nil {
fmt.Printf("%3d: t=%3d fix32 err %v\n", index, tag, err)
break out
@ -453,7 +453,7 @@ out:
fmt.Printf("%3d: t=%3d fix32 %d\n", index, tag, u)
case WireFixed64:
u, err = o.DecodeFixed64()
u, err = p.DecodeFixed64()
if err != nil {
fmt.Printf("%3d: t=%3d fix64 err %v\n", index, tag, err)
break out
@ -462,7 +462,7 @@ out:
break
case WireVarint:
u, err = o.DecodeVarint()
u, err = p.DecodeVarint()
if err != nil {
fmt.Printf("%3d: t=%3d varint err %v\n", index, tag, err)
break out
@ -488,12 +488,12 @@ out:
}
if depth != 0 {
fmt.Printf("%3d: start-end not balanced %d\n", o.index, depth)
fmt.Printf("%3d: start-end not balanced %d\n", p.index, depth)
}
fmt.Printf("\n")
o.buf = obuf
o.index = index
p.buf = obuf
p.index = index
}
// SetDefaults sets unset protocol buffer fields to their default values.

View file

@ -0,0 +1,122 @@
// Code generated by protoc-gen-go.
// source: proto3_proto/proto3.proto
// DO NOT EDIT!
/*
Package proto3_proto is a generated protocol buffer package.
It is generated from these files:
proto3_proto/proto3.proto
It has these top-level messages:
Message
Nested
MessageWithMap
*/
package proto3_proto
import proto "github.com/golang/protobuf/proto"
import testdata "github.com/golang/protobuf/proto/testdata"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
type Message_Humour int32
const (
Message_UNKNOWN Message_Humour = 0
Message_PUNS Message_Humour = 1
Message_SLAPSTICK Message_Humour = 2
Message_BILL_BAILEY Message_Humour = 3
)
var Message_Humour_name = map[int32]string{
0: "UNKNOWN",
1: "PUNS",
2: "SLAPSTICK",
3: "BILL_BAILEY",
}
var Message_Humour_value = map[string]int32{
"UNKNOWN": 0,
"PUNS": 1,
"SLAPSTICK": 2,
"BILL_BAILEY": 3,
}
func (x Message_Humour) String() string {
return proto.EnumName(Message_Humour_name, int32(x))
}
type Message struct {
Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
Hilarity Message_Humour `protobuf:"varint,2,opt,name=hilarity,enum=proto3_proto.Message_Humour" json:"hilarity,omitempty"`
HeightInCm uint32 `protobuf:"varint,3,opt,name=height_in_cm" json:"height_in_cm,omitempty"`
Data []byte `protobuf:"bytes,4,opt,name=data,proto3" json:"data,omitempty"`
ResultCount int64 `protobuf:"varint,7,opt,name=result_count" json:"result_count,omitempty"`
TrueScotsman bool `protobuf:"varint,8,opt,name=true_scotsman" json:"true_scotsman,omitempty"`
Score float32 `protobuf:"fixed32,9,opt,name=score" json:"score,omitempty"`
Key []uint64 `protobuf:"varint,5,rep,name=key" json:"key,omitempty"`
Nested *Nested `protobuf:"bytes,6,opt,name=nested" json:"nested,omitempty"`
Terrain map[string]*Nested `protobuf:"bytes,10,rep,name=terrain" json:"terrain,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
Proto2Field *testdata.SubDefaults `protobuf:"bytes,11,opt,name=proto2_field" json:"proto2_field,omitempty"`
Proto2Value map[string]*testdata.SubDefaults `protobuf:"bytes,13,rep,name=proto2_value" json:"proto2_value,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
}
func (m *Message) Reset() { *m = Message{} }
func (m *Message) String() string { return proto.CompactTextString(m) }
func (*Message) ProtoMessage() {}
func (m *Message) GetNested() *Nested {
if m != nil {
return m.Nested
}
return nil
}
func (m *Message) GetTerrain() map[string]*Nested {
if m != nil {
return m.Terrain
}
return nil
}
func (m *Message) GetProto2Field() *testdata.SubDefaults {
if m != nil {
return m.Proto2Field
}
return nil
}
func (m *Message) GetProto2Value() map[string]*testdata.SubDefaults {
if m != nil {
return m.Proto2Value
}
return nil
}
type Nested struct {
Bunny string `protobuf:"bytes,1,opt,name=bunny" json:"bunny,omitempty"`
}
func (m *Nested) Reset() { *m = Nested{} }
func (m *Nested) String() string { return proto.CompactTextString(m) }
func (*Nested) ProtoMessage() {}
type MessageWithMap struct {
ByteMapping map[bool][]byte `protobuf:"bytes,1,rep,name=byte_mapping" json:"byte_mapping,omitempty" protobuf_key:"varint,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value,proto3"`
}
func (m *MessageWithMap) Reset() { *m = MessageWithMap{} }
func (m *MessageWithMap) String() string { return proto.CompactTextString(m) }
func (*MessageWithMap) ProtoMessage() {}
func (m *MessageWithMap) GetByteMapping() map[bool][]byte {
if m != nil {
return m.ByteMapping
}
return nil
}
func init() {
proto.RegisterEnum("proto3_proto.Message_Humour", Message_Humour_name, Message_Humour_value)
}

View file

@ -567,6 +567,9 @@ func (p *textParser) readStruct(sv reflect.Value, terminator string) error {
if err := p.readAny(key, props.mkeyprop); err != nil {
return err
}
if err := p.consumeOptionalSeparator(); err != nil {
return err
}
if err := p.consumeToken("value"); err != nil {
return err
}
@ -576,6 +579,9 @@ func (p *textParser) readStruct(sv reflect.Value, terminator string) error {
if err := p.readAny(val, props.mvalprop); err != nil {
return err
}
if err := p.consumeOptionalSeparator(); err != nil {
return err
}
if err := p.consumeToken(terminator); err != nil {
return err
}
@ -605,14 +611,10 @@ func (p *textParser) readStruct(sv reflect.Value, terminator string) error {
}
}
// For backward compatibility, permit a semicolon or comma after a field.
tok = p.next()
if tok.err != nil {
return tok.err
}
if tok.value != ";" && tok.value != "," {
p.back()
if err := p.consumeOptionalSeparator(); err != nil {
return err
}
}
if reqCount > 0 {
@ -621,6 +623,19 @@ func (p *textParser) readStruct(sv reflect.Value, terminator string) error {
return reqFieldErr
}
// consumeOptionalSeparator consumes an optional semicolon or comma.
// It is used in readStruct to provide backward compatibility.
func (p *textParser) consumeOptionalSeparator() error {
tok := p.next()
if tok.err != nil {
return tok.err
}
if tok.value != ";" && tok.value != "," {
p.back()
}
return nil
}
func (p *textParser) readAny(v reflect.Value, props *Properties) error {
tok := p.next()
if tok.err != nil {

View file

@ -462,7 +462,7 @@ func TestProto3TextParsing(t *testing.T) {
func TestMapParsing(t *testing.T) {
m := new(MessageWithMap)
const in = `name_mapping:<key:1234 value:"Feist"> name_mapping:<key:1 value:"Beatles">` +
`msg_mapping:<key:-4 value:<f: 2.0>>` +
`msg_mapping:<key:-4, value:<f: 2.0>,>` + // separating commas are okay
`msg_mapping<key:-2 value<f: 4.0>>` + // no colon after "value"
`byte_mapping:<key:true value:"so be it">`
want := &MessageWithMap{

View file

@ -22,11 +22,6 @@ const (
// a Semaphore acquisition.
DefaultSemaphoreWaitTime = 15 * time.Second
// DefaultSemaphoreRetryTime is how long we wait after a failed lock acquisition
// before attempting to do the lock again. This is so that once a lock-delay
// is in affect, we do not hot loop retrying the acquisition.
DefaultSemaphoreRetryTime = 5 * time.Second
// DefaultSemaphoreKey is the key used within the prefix to
// use for coordination between all the contenders.
DefaultSemaphoreKey = ".lock"

View file

@ -36,6 +36,7 @@ A not-so-up-to-date-list-that-may-be-actually-current:
* https://github.com/duedil-ltd/discodns
* https://github.com/StalkR/dns-reverse-proxy
* https://github.com/tianon/rawdns
* https://mesosphere.github.io/mesos-dns/
Send pull request if you want to be listed here.

View file

@ -66,6 +66,9 @@ func Exchange(m *Msg, a string) (r *Msg, err error) {
return nil, err
}
r, err = co.ReadMsg()
if err == nil && r.Id != m.Id {
err = ErrId
}
return r, err
}
@ -86,6 +89,9 @@ func ExchangeConn(c net.Conn, m *Msg) (r *Msg, err error) {
return nil, err
}
r, err = co.ReadMsg()
if err == nil && r.Id != m.Id {
err = ErrId
}
return r, err
}
@ -122,31 +128,39 @@ func (c *Client) Exchange(m *Msg, a string) (r *Msg, rtt time.Duration, err erro
return r, rtt, nil
}
func (c *Client) exchange(m *Msg, a string) (r *Msg, rtt time.Duration, err error) {
timeout := dnsTimeout
var co *Conn
func (c *Client) dialTimeout() time.Duration {
if c.DialTimeout != 0 {
timeout = c.DialTimeout
return c.DialTimeout
}
return dnsTimeout
}
func (c *Client) readTimeout() time.Duration {
if c.ReadTimeout != 0 {
return c.ReadTimeout
}
return dnsTimeout
}
func (c *Client) writeTimeout() time.Duration {
if c.WriteTimeout != 0 {
return c.WriteTimeout
}
return dnsTimeout
}
func (c *Client) exchange(m *Msg, a string) (r *Msg, rtt time.Duration, err error) {
var co *Conn
if c.Net == "" {
co, err = DialTimeout("udp", a, timeout)
co, err = DialTimeout("udp", a, c.dialTimeout())
} else {
co, err = DialTimeout(c.Net, a, timeout)
co, err = DialTimeout(c.Net, a, c.dialTimeout())
}
if err != nil {
return nil, 0, err
}
timeout = dnsTimeout
if c.ReadTimeout != 0 {
timeout = c.ReadTimeout
}
co.SetReadDeadline(time.Now().Add(timeout))
timeout = dnsTimeout
if c.WriteTimeout != 0 {
timeout = c.WriteTimeout
}
co.SetWriteDeadline(time.Now().Add(timeout))
defer co.Close()
opt := m.IsEdns0()
// If EDNS0 is used use that for size.
if opt != nil && opt.UDPSize() >= MinMsgSize {
@ -156,11 +170,18 @@ func (c *Client) exchange(m *Msg, a string) (r *Msg, rtt time.Duration, err erro
if opt == nil && c.UDPSize >= MinMsgSize {
co.UDPSize = c.UDPSize
}
co.SetReadDeadline(time.Now().Add(c.readTimeout()))
co.SetWriteDeadline(time.Now().Add(c.writeTimeout()))
co.TsigSecret = c.TsigSecret
if err = co.WriteMsg(m); err != nil {
return nil, 0, err
}
r, err = co.ReadMsg()
if err == nil && r.Id != m.Id {
err = ErrId
}
return r, co.rtt, err
}

View file

@ -37,6 +37,29 @@ func TestClientSync(t *testing.T) {
}
}
func TestClientSyncBadId(t *testing.T) {
HandleFunc("miek.nl.", HelloServerBadId)
defer HandleRemove("miek.nl.")
s, addrstr, err := RunLocalUDPServer("127.0.0.1:0")
if err != nil {
t.Fatalf("Unable to run test server: %v", err)
}
defer s.Shutdown()
m := new(Msg)
m.SetQuestion("miek.nl.", TypeSOA)
c := new(Client)
if _, _, err := c.Exchange(m, addrstr); err != ErrId {
t.Errorf("did not find a bad Id")
}
// And now with plain Exchange().
if _, err := Exchange(m, addrstr); err != ErrId {
t.Errorf("did not find a bad Id")
}
}
func TestClientEDNS0(t *testing.T) {
HandleFunc("miek.nl.", HelloServer)
defer HandleRemove("miek.nl.")

View file

@ -189,6 +189,22 @@ func (k *DNSKEY) ToDS(h uint8) *DS {
return ds
}
// ToCDNSKEY converts a DNSKEY record to a CDNSKEY record.
func (k *DNSKEY) ToCDNSKEY() *CDNSKEY {
c := &CDNSKEY{DNSKEY: *k}
c.Hdr = *k.Hdr.copyHeader()
c.Hdr.Rrtype = TypeCDNSKEY
return c
}
// ToCDS converts a DS record to a CDS record.
func (d *DS) ToCDS() *CDS {
c := &CDS{DS: *d}
c.Hdr = *d.Hdr.copyHeader()
c.Hdr.Rrtype = TypeCDS
return c
}
// Sign signs an RRSet. The signature needs to be filled in with
// the values: Inception, Expiration, KeyTag, SignerName and Algorithm.
// The rest is copied from the RRset. Sign returns true when the signing went OK,

View file

@ -6,6 +6,7 @@ import (
"crypto/rsa"
"io"
"math/big"
"strconv"
"strings"
)
@ -34,8 +35,12 @@ func (k *DNSKEY) ReadPrivateKey(q io.Reader, file string) (PrivateKey, error) {
return nil, ErrPrivKey
}
// TODO(mg): check if the pubkey matches the private key
switch m["algorithm"] {
case "3 (DSA)":
algo, err := strconv.Atoi(strings.SplitN(m["algorithm"], " ", 2)[0])
if err != nil {
return nil, ErrPrivKey
}
switch uint8(algo) {
case DSA:
priv, e := readPrivateKeyDSA(m)
if e != nil {
return nil, e
@ -46,15 +51,15 @@ func (k *DNSKEY) ReadPrivateKey(q io.Reader, file string) (PrivateKey, error) {
}
priv.PublicKey = *pub
return (*DSAPrivateKey)(priv), e
case "1 (RSAMD5)":
case RSAMD5:
fallthrough
case "5 (RSASHA1)":
case RSASHA1:
fallthrough
case "7 (RSASHA1NSEC3SHA1)":
case RSASHA1NSEC3SHA1:
fallthrough
case "8 (RSASHA256)":
case RSASHA256:
fallthrough
case "10 (RSASHA512)":
case RSASHA512:
priv, e := readPrivateKeyRSA(m)
if e != nil {
return nil, e
@ -65,11 +70,11 @@ func (k *DNSKEY) ReadPrivateKey(q io.Reader, file string) (PrivateKey, error) {
}
priv.PublicKey = *pub
return (*RSAPrivateKey)(priv), e
case "12 (ECC-GOST)":
case ECCGOST:
return nil, ErrPrivKey
case "13 (ECDSAP256SHA256)":
case ECDSAP256SHA256:
fallthrough
case "14 (ECDSAP384SHA384)":
case ECDSAP384SHA384:
priv, e := readPrivateKeyECDSA(m)
if e != nil {
return nil, e

View file

@ -548,6 +548,9 @@ a.example.com. IN A 127.0.0.1
8db7._openpgpkey.example.com. IN OPENPGPKEY mQCNAzIG
$ORIGIN a.example.com.
test IN A 127.0.0.1
IN SSHFP 1 2 (
BC6533CDC95A79078A39A56EA7635984ED655318ADA9
B6159E30723665DA95BB )
$ORIGIN b.example.com.
test IN CNAME test.a.example.com.
`
@ -904,8 +907,9 @@ func TestILNP(t *testing.T) {
func TestNsapGposEidNimloc(t *testing.T) {
dt := map[string]string{
"foo.bar.com. IN NSAP 21 47000580ffff000000321099991111222233334444": "foo.bar.com.\t3600\tIN\tNSAP\t21 47000580ffff000000321099991111222233334444",
"host.school.de IN NSAP 17 39276f3100111100002222333344449876": "host.school.de.\t3600\tIN\tNSAP\t17 39276f3100111100002222333344449876",
"foo.bar.com. IN NSAP 21 47000580ffff000000321099991111222233334444": "foo.bar.com.\t3600\tIN\tNSAP\t0x47000580ffff000000321099991111222233334444",
"foo.bar.com. IN NSAP 0x47000580ffff000000321099991111222233334444": "foo.bar.com.\t3600\tIN\tNSAP\t0x47000580ffff000000321099991111222233334444",
"host.school.de IN NSAP 17 39276f3100111100002222333344449876": "host.school.de.\t3600\tIN\tNSAP\t0x39276f3100111100002222333344449876",
"444433332222111199990123000000ff. NSAP-PTR foo.bar.com.": "444433332222111199990123000000ff.\t3600\tIN\tNSAP-PTR\tfoo.bar.com.",
"lillee. IN GPOS -32.6882 116.8652 10.0": "lillee.\t3600\tIN\tGPOS\t-32.6882 116.8652 10.0",
"hinault. IN GPOS -22.6882 116.8652 250.0": "hinault.\t3600\tIN\tGPOS\t-22.6882 116.8652 250.0",
@ -1376,3 +1380,78 @@ func TestParseIPSECKEY(t *testing.T) {
i++
}
}
func TestParseTokenOverflow(t *testing.T) {
_, err := NewRR("_443._tcp.example.org. IN TLSA 0 0 0 308205e8308204d0a00302010202100411de8f53b462f6a5a861b712ec6b59300d06092a864886f70d01010b05003070310b300906035504061302555331153013060355040a130c446967694365727420496e6331193017060355040b13107777772e64696769636572742e636f6d312f302d06035504031326446967694365727420534841322048696768204173737572616e636520536572766572204341301e170d3134313130363030303030305a170d3135313131333132303030305a3081a5310b3009060355040613025553311330110603550408130a43616c69666f726e6961311430120603550407130b4c6f7320416e67656c6573313c303a060355040a1333496e7465726e657420436f72706f726174696f6e20666f722041737369676e6564204e616d657320616e64204e756d6265727331133011060355040b130a546563686e6f6c6f6779311830160603550403130f7777772e6578616d706c652e6f726730820122300d06092a864886f70d01010105000382010f003082010a02820101009e663f52a3d18cb67cdfed547408a4e47e4036538988da2798da3b6655f7240d693ed1cb3fe6d6ad3a9e657ff6efa86b83b0cad24e5d31ff2bf70ec3b78b213f1b4bf61bdc669cbbc07d67154128ca92a9b3cbb4213a836fb823ddd4d7cc04918314d25f06086fa9970ba17e357cca9b458c27eb71760ab95e3f9bc898ae89050ae4d09ba2f7e4259d9ff1e072a6971b18355a8b9e53670c3d5dbdbd283f93a764e71b3a4140ca0746090c08510e2e21078d7d07844bf9c03865b531a0bf2ee766bc401f6451c5a1e6f6fb5d5c1d6a97a0abe91ae8b02e89241e07353909ccd5b41c46de207c06801e08f20713603827f2ae3e68cf15ef881d7e0608f70742e30203010001a382024630820242301f0603551d230418301680145168ff90af0207753cccd9656462a212b859723b301d0603551d0e04160414b000a7f422e9b1ce216117c4c46e7164c8e60c553081810603551d11047a3078820f7777772e6578616d706c652e6f7267820b6578616d706c652e636f6d820b6578616d706c652e656475820b6578616d706c652e6e6574820b6578616d706c652e6f7267820f7777772e6578616d706c652e636f6d820f7777772e6578616d706c652e656475820f7777772e6578616d706c652e6e6574300e0603551d0f0101ff0404030205a0301d0603551d250416301406082b0601050507030106082b0601050507030230750603551d1f046e306c3034a032a030862e687474703a2f2f63726c332e64696769636572742e636f6d2f736861322d68612d7365727665722d67332e63726c3034a032a030862e687474703a2f2f63726c342e64696769636572742e636f6d2f736861322d68612d7365727665722d67332e63726c30420603551d20043b3039303706096086480186fd6c0101302a302806082b06010505070201161c68747470733a2f2f7777772e64696769636572742e636f6d2f43505330818306082b0601050507010104773075302406082b060105050730018618687474703a2f2f6f6373702e64696769636572742e636f6d304d06082b060105050730028641687474703a2f2f636163657274732e64696769636572742e636f6d2f446967694365727453484132486967684173737572616e636553657276657243412e637274300c0603551d130101ff04023000300d06092a864886f70d01010b050003820101005eac2124dedb3978a86ff3608406acb542d3cb54cb83facd63aec88144d6a1bf15dbf1f215c4a73e241e582365cba9ea50dd306541653b3513af1a0756c1b2720e8d112b34fb67181efad9c4609bdc670fb025fa6e6d42188161b026cf3089a08369c2f3609fc84bcc3479140c1922ede430ca8dbac2b2a3cdacb305ba15dc7361c4c3a5e6daa99cb446cb221b28078a7a944efba70d96f31ac143d959bccd2fd50e30c325ea2624fb6b6dbe9344dbcf133bfbd5b4e892d635dbf31596451672c6b65ba5ac9b3cddea92b35dab1065cae3c8cb6bb450a62ea2f72ea7c6bdc7b65fa09b012392543734083c7687d243f8d0375304d99ccd2e148966a8637a6797")
if err == nil {
t.Fatalf("token overflow should return an error")
}
t.Logf("err: %s\n", err)
}
func TestParseTLSA(t *testing.T) {
lt := []string{
"_443._tcp.example.org.\t3600\tIN\tTLSA\t1 1 1 c22be239f483c08957bc106219cc2d3ac1a308dfbbdd0a365f17b9351234cf00",
"_443._tcp.example.org.\t3600\tIN\tTLSA\t2 1 2 4e85f45179e9cd6e0e68e2eb5be2e85ec9b92d91c609caf3ef0315213e3f92ece92c38397a607214de95c7fadc0ad0f1c604a469a0387959745032c0d51492f3",
"_443._tcp.example.org.\t3600\tIN\tTLSA\t3 0 2 69ec8d2277360b215d0cd956b0e2747108dff34b27d461a41c800629e38ee6c2d1230cc9e8e36711330adc6766e6ff7c5fbb37f106f248337c1a20ad682888d2",
}
for _, o := range lt {
rr, err := NewRR(o)
if err != nil {
t.Error("failed to parse RR: ", err)
continue
}
if rr.String() != o {
t.Errorf("`%s' should be equal to\n`%s', but is `%s'", o, o, rr.String())
} else {
t.Logf("RR is OK: `%s'", rr.String())
}
}
}
func TestParseSSHFP(t *testing.T) {
lt := []string{
"test.example.org.\t300\tSSHFP\t1 2 (\n" +
"\t\t\t\t\tBC6533CDC95A79078A39A56EA7635984ED655318ADA9\n" +
"\t\t\t\t\tB6159E30723665DA95BB )",
"test.example.org.\t300\tSSHFP\t1 2 ( BC6533CDC 95A79078A39A56EA7635984ED655318AD A9B6159E3072366 5DA95BB )",
}
result := "test.example.org.\t300\tIN\tSSHFP\t1 2 BC6533CDC95A79078A39A56EA7635984ED655318ADA9B6159E30723665DA95BB"
for _, o := range lt {
rr, err := NewRR(o)
if err != nil {
t.Error("failed to parse RR: ", err)
continue
}
if rr.String() != result {
t.Errorf("`%s' should be equal to\n\n`%s', but is \n`%s'", o, result, rr.String())
} else {
t.Logf("RR is OK: `%s'", rr.String())
}
}
}
func TestParseHINFO(t *testing.T) {
dt := map[string]string{
"example.net. HINFO A B": "example.net. 3600 IN HINFO \"A\" \"B\"",
"example.net. HINFO \"A\" \"B\"": "example.net. 3600 IN HINFO \"A\" \"B\"",
"example.net. HINFO A B C D E F": "example.net. 3600 IN HINFO \"A\" \"B C D E F\"",
"example.net. HINFO AB": "example.net. 3600 IN HINFO \"AB\" \"\"",
// "example.net. HINFO PC-Intel-700mhz \"Redhat Linux 7.1\"": "example.net. 3600 IN HINFO \"PC-Intel-700mhz\" \"Redhat Linux 7.1\"",
// This one is recommended in Pro Bind book http://www.zytrax.com/books/dns/ch8/hinfo.html
// but effectively, even Bind would replace it to correctly formed text when you AXFR
// TODO: remove this set of comments or figure support for quoted/unquoted combinations in endingToTxtSlice function
}
for i, o := range dt {
rr, err := NewRR(i)
if err != nil {
t.Error("failed to parse RR: ", err)
continue
}
if rr.String() != o {
t.Errorf("`%s' should be equal to\n`%s', but is `%s'", i, o, rr.String())
} else {
t.Logf("RR is OK: `%s'", rr.String())
}
}
}

View file

@ -264,6 +264,7 @@ func (srv *Server) ListenAndServe() error {
if e != nil {
return e
}
srv.Listener = l
return srv.serveTCP(l)
case "udp", "udp4", "udp6":
a, e := net.ResolveUDPAddr(srv.Net, addr)
@ -277,6 +278,7 @@ func (srv *Server) ListenAndServe() error {
if e := setUDPSocketOptions(l); e != nil {
return e
}
srv.PacketConn = l
return srv.serveUDP(l)
}
return &Error{err: "bad network"}

View file

@ -17,6 +17,16 @@ func HelloServer(w ResponseWriter, req *Msg) {
w.WriteMsg(m)
}
func HelloServerBadId(w ResponseWriter, req *Msg) {
m := new(Msg)
m.SetReply(req)
m.Id += 1
m.Extra = make([]RR, 1)
m.Extra[0] = &TXT{Hdr: RR_Header{Name: m.Question[0].Name, Rrtype: TypeTXT, Class: ClassINET, Ttl: 0}, Txt: []string{"Hello world"}}
w.WriteMsg(m)
}
func AnotherHelloServer(w ResponseWriter, req *Msg) {
m := new(Msg)
m.SetReply(req)

View file

@ -255,7 +255,9 @@ type HINFO struct {
func (rr *HINFO) Header() *RR_Header { return &rr.Hdr }
func (rr *HINFO) copy() RR { return &HINFO{*rr.Hdr.copyHeader(), rr.Cpu, rr.Os} }
func (rr *HINFO) String() string { return rr.Hdr.String() + rr.Cpu + " " + rr.Os }
func (rr *HINFO) String() string {
return rr.Hdr.String() + sprintTxt([]string{rr.Cpu, rr.Os})
}
func (rr *HINFO) len() int { return rr.Hdr.len() + len(rr.Cpu) + len(rr.Os) }
type MB struct {
@ -1147,13 +1149,12 @@ func (rr *RKEY) String() string {
type NSAP struct {
Hdr RR_Header
Length uint8
Nsap string
}
func (rr *NSAP) Header() *RR_Header { return &rr.Hdr }
func (rr *NSAP) copy() RR { return &NSAP{*rr.Hdr.copyHeader(), rr.Length, rr.Nsap} }
func (rr *NSAP) String() string { return rr.Hdr.String() + strconv.Itoa(int(rr.Length)) + " " + rr.Nsap }
func (rr *NSAP) copy() RR { return &NSAP{*rr.Hdr.copyHeader(), rr.Nsap} }
func (rr *NSAP) String() string { return rr.Hdr.String() + "0x" + rr.Nsap }
func (rr *NSAP) len() int { return rr.Hdr.len() + 1 + len(rr.Nsap) + 1 }
type NSAPPTR struct {

View file

@ -500,14 +500,14 @@ func zlexer(s *scan, c chan lex) {
for err == nil {
l.column = s.position.Column
l.line = s.position.Line
if stri > maxTok {
if stri >= maxTok {
l.token = "token length insufficient for parsing"
l.err = true
debug.Printf("[%+v]", l.token)
c <- l
return
}
if comi > maxTok {
if comi >= maxTok {
l.token = "comment length insufficient for parsing"
l.err = true
debug.Printf("[%+v]", l.token)

View file

@ -49,6 +49,9 @@ func endingToString(c chan lex, errstr, f string) (string, *ParseError, string)
s := ""
l := <-c // zString
for l.value != zNewline && l.value != zEOF {
if l.err {
return s, &ParseError{f, errstr, l}, ""
}
switch l.value {
case zString:
s += l.token
@ -68,11 +71,17 @@ func endingToTxtSlice(c chan lex, errstr, f string) ([]string, *ParseError, stri
quote := false
l := <-c
var s []string
if l.err {
return s, &ParseError{f, errstr, l}, ""
}
switch l.value == zQuote {
case true: // A number of quoted string
s = make([]string, 0)
empty := true
for l.value != zNewline && l.value != zEOF {
if l.err {
return nil, &ParseError{f, errstr, l}, ""
}
switch l.value {
case zString:
empty = false
@ -91,7 +100,7 @@ func endingToTxtSlice(c chan lex, errstr, f string) ([]string, *ParseError, stri
p, i = p+255, i+255
}
s = append(s, sx...)
break;
break
}
s = append(s, l.token)
@ -117,6 +126,9 @@ func endingToTxtSlice(c chan lex, errstr, f string) ([]string, *ParseError, stri
case false: // Unquoted text record
s = make([]string, 1)
for l.value != zNewline && l.value != zEOF {
if l.err {
return s, &ParseError{f, errstr, l}, ""
}
s[0] += l.token
l = <-c
}
@ -333,11 +345,24 @@ func setHINFO(h RR_Header, c chan lex, o, f string) (RR, *ParseError, string) {
rr := new(HINFO)
rr.Hdr = h
l := <-c
rr.Cpu = l.token
<-c // zBlank
l = <-c // zString
rr.Os = l.token
chunks, e, c1 := endingToTxtSlice(c, "bad HINFO Fields", f)
if e != nil {
return nil, e, c1
}
if ln := len(chunks); ln == 0 {
return rr, nil, ""
} else if ln == 1 {
// Can we split it?
if out := strings.Fields(chunks[0]); len(out) > 1 {
chunks = out
} else {
chunks = append(chunks, "")
}
}
rr.Cpu = chunks[0]
rr.Os = strings.Join(chunks[1:], " ")
return rr, nil, ""
}
@ -1438,9 +1463,9 @@ func setWKS(h RR_Header, c chan lex, o, f string) (RR, *ParseError, string) {
case zString:
if k, err = net.LookupPort(proto, l.token); err != nil {
if i, e := strconv.Atoi(l.token); e != nil { // If a number use that
rr.BitMap = append(rr.BitMap, uint16(i))
} else {
return nil, &ParseError{f, "bad WKS BitMap", l}, ""
} else {
rr.BitMap = append(rr.BitMap, uint16(i))
}
}
rr.BitMap = append(rr.BitMap, uint16(k))
@ -1473,8 +1498,11 @@ func setSSHFP(h RR_Header, c chan lex, o, f string) (RR, *ParseError, string) {
}
rr.Type = uint8(i)
<-c // zBlank
l = <-c
rr.FingerPrint = l.token
s, e1, c1 := endingToString(c, "bad SSHFP Fingerprint", f)
if e1 != nil {
return nil, e1, c1
}
rr.FingerPrint = s
return rr, nil, ""
}
@ -1594,21 +1622,28 @@ func setNIMLOC(h RR_Header, c chan lex, o, f string) (RR, *ParseError, string) {
func setNSAP(h RR_Header, c chan lex, o, f string) (RR, *ParseError, string) {
rr := new(NSAP)
rr.Hdr = h
l := <-c
if l.length == 0 {
return rr, nil, l.comment
}
i, e := strconv.Atoi(l.token)
if e != nil {
return nil, &ParseError{f, "bad NSAP Length", l}, ""
}
rr.Length = uint8(i)
<-c // zBlank
s, e1, c1 := endingToString(c, "bad NSAP Nsap", f)
if e != nil {
chunks, e1, c1 := endingToTxtSlice(c, "bad NSAP Nsap", f)
if e1 != nil {
return nil, e1, c1
}
rr.Nsap = s
// data would come as one string or multiple... Just to ignore possible
// variety let's merge things back together and split to actual "words"
s := strings.Fields(strings.Join(chunks, " "))
if len(s) == 0 {
return rr, nil, c1
}
if len(s[0]) >= 2 && s[0][0:2] == "0x" || s[0][0:2] == "0X" {
// although RFC only suggests 0x there is no clarification that X is not allowed
rr.Nsap = strings.Join(s, "")[2:]
} else {
// since we do not know what to do with this data, and, we would not use original length
// in formatting, it's moot to check correctness of the length
_, err := strconv.Atoi(s[0])
if err != nil {
return nil, &ParseError{f, "bad NSAP Length", lex{token: s[0]}}, ""
}
rr.Nsap = strings.Join(s[1:], "")
}
return rr, nil, c1
}
@ -1693,7 +1728,7 @@ func setDLV(h RR_Header, c chan lex, o, f string) (RR, *ParseError, string) {
}
func setCDS(h RR_Header, c chan lex, o, f string) (RR, *ParseError, string) {
r, e, s := setDSs(h, c, o, f, "DLV")
r, e, s := setDSs(h, c, o, f, "CDS")
if r != nil {
return &CDS{*r.(*DS)}, e, s
}
@ -1764,9 +1799,10 @@ func setTLSA(h RR_Header, c chan lex, o, f string) (RR, *ParseError, string) {
return nil, &ParseError{f, "bad TLSA MatchingType", l}, ""
}
rr.MatchingType = uint8(i)
s, e, c1 := endingToString(c, "bad TLSA Certificate", f)
if e != nil {
return nil, e.(*ParseError), c1
// So this needs be e2 (i.e. different than e), because...??t
s, e2, c1 := endingToString(c, "bad TLSA Certificate", f)
if e2 != nil {
return nil, e2, c1
}
rr.Certificate = s
return rr, nil, c1
@ -2153,7 +2189,7 @@ var typeToparserFunc = map[uint16]parserFunc{
TypeEUI64: parserFunc{setEUI64, false},
TypeGID: parserFunc{setGID, false},
TypeGPOS: parserFunc{setGPOS, false},
TypeHINFO: parserFunc{setHINFO, false},
TypeHINFO: parserFunc{setHINFO, true},
TypeHIP: parserFunc{setHIP, true},
TypeIPSECKEY: parserFunc{setIPSECKEY, true},
TypeKX: parserFunc{setKX, false},
@ -2189,7 +2225,7 @@ var typeToparserFunc = map[uint16]parserFunc{
TypeSOA: parserFunc{setSOA, false},
TypeSPF: parserFunc{setSPF, true},
TypeSRV: parserFunc{setSRV, false},
TypeSSHFP: parserFunc{setSSHFP, false},
TypeSSHFP: parserFunc{setSSHFP, true},
TypeTALINK: parserFunc{setTALINK, false},
TypeTA: parserFunc{setTA, true},
TypeTLSA: parserFunc{setTLSA, true},

View file

@ -41,7 +41,8 @@ const (
ReservedLabelPrefix = "__"
// MetaLabelPrefix is a prefix for labels that provide meta information.
// Labels with the prefix will not be attached to time series.
// Labels with this prefix are used for intermediate label processing and
// will not be attached to time series.
MetaLabelPrefix = "__meta_"
// JobLabel is the label name indicating the job from which a timeseries
@ -66,7 +67,7 @@ var labelNameRE = regexp.MustCompile("^[a-zA-Z_][a-zA-Z0-9_]*$")
// therewith.
type LabelName string
// UnmarshalYAML implements the yaml.Unmarshaller interface.
// UnmarshalYAML implements the yaml.Unmarshaler interface.
func (ln *LabelName) UnmarshalYAML(unmarshal func(interface{}) error) error {
var s string
if err := unmarshal(&s); err != nil {

View file

@ -16,7 +16,7 @@ type goCollector struct {
func NewGoCollector() *goCollector {
return &goCollector{
goroutines: NewGauge(GaugeOpts{
Name: "process_goroutines",
Name: "go_goroutines",
Help: "Number of goroutines that currently exist.",
}),
gcDesc: NewDesc(

View file

@ -213,6 +213,13 @@ func newHistogram(desc *Desc, opts HistogramOpts, labelValues ...string) Histogr
}
type histogram struct {
// sumBits contains the bits of the float64 representing the sum of all
// observations. sumBits and count have to go first in the struct to
// guarantee alignment for atomic operations.
// http://golang.org/pkg/sync/atomic/#pkg-note-BUG
sumBits uint64
count uint64
SelfCollector
// Note that there is no mutex required.
@ -222,9 +229,6 @@ type histogram struct {
counts []uint64
labelPairs []*dto.LabelPair
sumBits uint64 // The bits of the float64 representing the sum of all observations.
count uint64
}
func (h *histogram) Desc() *Desc {

View file

@ -43,11 +43,15 @@ var errInconsistentCardinality = errors.New("inconsistent label cardinality")
// ValueType. This is a low-level building block used by the library to back the
// implementations of Counter, Gauge, and Untyped.
type value struct {
// valBits containst the bits of the represented float64 value. It has
// to go first in the struct to guarantee alignment for atomic
// operations. http://golang.org/pkg/sync/atomic/#pkg-note-BUG
valBits uint64
SelfCollector
desc *Desc
valType ValueType
valBits uint64 // These are the bits of the represented float64 value.
labelPairs []*dto.LabelPair
}

View file

@ -175,9 +175,9 @@ http_response_size_bytes_count{handler="prometheus"} 119
# HELP process_cpu_seconds_total Total user and system CPU time spent in seconds.
# TYPE process_cpu_seconds_total counter
process_cpu_seconds_total 0.55
# HELP process_goroutines Number of goroutines that currently exist.
# TYPE process_goroutines gauge
process_goroutines 70
# HELP go_goroutines Number of goroutines that currently exist.
# TYPE go_goroutines gauge
go_goroutines 70
# HELP process_max_fds Maximum number of open file descriptors.
# TYPE process_max_fds gauge
process_max_fds 8192

View file

@ -8,4 +8,5 @@ Maintainers of this repository:
The following individuals have contributed code to this repository
(listed in alphabetical order):
* Tobias Schmidt <ts@soundcloud.com>
* Ji-Hoon, Seol <jihoon.seol@gmail.com>
* Tobias Schmidt <tobidt@gmail.com>

View file

@ -3,5 +3,8 @@
This procfs package provides functions to retrieve system, kernel and process
metrics from the pseudo-filesystem proc.
*WARNING*: This package is a work in progress. Its API may still break in
backwards-incompatible ways without warnings. Use it at your own risk.
[![GoDoc](https://godoc.org/github.com/prometheus/procfs?status.png)](https://godoc.org/github.com/prometheus/procfs)
[![Build Status](https://travis-ci.org/prometheus/procfs.svg?branch=master)](https://travis-ci.org/prometheus/procfs)

View file

@ -0,0 +1 @@
/usr/bin/vim

View file

@ -0,0 +1,7 @@
rchar: 750339
wchar: 818609
syscr: 7405
syscw: 5245
read_bytes: 1024
write_bytes: 2048
cancelled_write_bytes: -1024

View file

@ -0,0 +1,17 @@
Limit Soft Limit Hard Limit Units
Max cpu time unlimited unlimited seconds
Max file size unlimited unlimited bytes
Max data size unlimited unlimited bytes
Max stack size 8388608 unlimited bytes
Max core file size 0 unlimited bytes
Max resident set unlimited unlimited bytes
Max processes 29436 29436 processes
Max open files 1024 4096 files
Max locked memory 65536 65536 bytes
Max address space unlimited unlimited bytes
Max file locks unlimited unlimited locks
Max pending signals 29436 29436 signals
Max msgqueue size 819200 819200 bytes
Max nice priority 0 0
Max realtime priority 0 0
Max realtime timeout unlimited unlimited us

View file

@ -0,0 +1 @@
33 (ata_sff) S 2 0 0 0 -1 69238880 0 0 0 0 0 0 0 0 0 -20 1 0 5 0 0 18446744073709551615 0 0 0 0 0 0 0 2147483647 0 18446744073709551615 0 0 17 1 0 0 0 0 0 0 0 0 0 0 0 0 0

View file

@ -0,0 +1,14 @@
IP Virtual Server version 1.2.1 (size=4096)
Prot LocalAddress:Port Scheduler Flags
-> RemoteAddress:Port Forward Weight ActiveConn InActConn
TCP C0A80016:0CEA wlc
-> C0A85216:0CEA Tunnel 100 248 2
-> C0A85318:0CEA Tunnel 100 248 2
-> C0A85315:0CEA Tunnel 100 248 1
TCP C0A80039:0CEA wlc
-> C0A85416:0CEA Tunnel 0 0 0
-> C0A85215:0CEA Tunnel 100 1499 0
-> C0A83215:0CEA Tunnel 100 1498 0
TCP C0A80037:0CEA wlc
-> C0A8321A:0CEA Tunnel 0 0 0
-> C0A83120:0CEA Tunnel 100 0 0

View file

@ -0,0 +1,6 @@
Total Incoming Outgoing Incoming Outgoing
Conns Packets Packets Bytes Bytes
16AA370 E33656E5 0 51D8C8883AB3 0
Conns/s Pkts/s Pkts/s Bytes/s Bytes/s
4 1FB3C 0 1282A8F 0

View file

@ -34,3 +34,7 @@ func (fs FS) stat(p string) (os.FileInfo, error) {
func (fs FS) open(p string) (*os.File, error) {
return os.Open(path.Join(string(fs), p))
}
func (fs FS) readlink(p string) (string, error) {
return os.Readlink(path.Join(string(fs), p))
}

View file

@ -0,0 +1,223 @@
package procfs
import (
"bufio"
"encoding/hex"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"strconv"
"strings"
)
// IPVSStats holds IPVS statistics, as exposed by the kernel in `/proc/net/ip_vs_stats`.
type IPVSStats struct {
// Total count of connections.
Connections uint64
// Total incoming packages processed.
IncomingPackets uint64
// Total outgoing packages processed.
OutgoingPackets uint64
// Total incoming traffic.
IncomingBytes uint64
// Total outgoing traffic.
OutgoingBytes uint64
}
// IPVSBackendStatus holds current metrics of one virtual / real address pair.
type IPVSBackendStatus struct {
// The local (virtual) IP address.
LocalAddress net.IP
// The local (virtual) port.
LocalPort uint16
// The transport protocol (TCP, UDP).
Proto string
// The remote (real) IP address.
RemoteAddress net.IP
// The remote (real) port.
RemotePort uint16
// The current number of active connections for this virtual/real address pair.
ActiveConn uint64
// The current number of inactive connections for this virtual/real address pair.
InactConn uint64
// The current weight of this virtual/real address pair.
Weight uint64
}
// NewIPVSStats reads the IPVS statistics.
func NewIPVSStats() (IPVSStats, error) {
fs, err := NewFS(DefaultMountPoint)
if err != nil {
return IPVSStats{}, err
}
return fs.NewIPVSStats()
}
// NewIPVSStats reads the IPVS statistics from the specified `proc` filesystem.
func (fs FS) NewIPVSStats() (IPVSStats, error) {
file, err := fs.open("net/ip_vs_stats")
if err != nil {
return IPVSStats{}, err
}
defer file.Close()
return parseIPVSStats(file)
}
// parseIPVSStats performs the actual parsing of `ip_vs_stats`.
func parseIPVSStats(file io.Reader) (IPVSStats, error) {
var (
statContent []byte
statLines []string
statFields []string
stats IPVSStats
)
statContent, err := ioutil.ReadAll(file)
if err != nil {
return IPVSStats{}, err
}
statLines = strings.SplitN(string(statContent), "\n", 4)
if len(statLines) != 4 {
return IPVSStats{}, errors.New("ip_vs_stats corrupt: too short")
}
statFields = strings.Fields(statLines[2])
if len(statFields) != 5 {
return IPVSStats{}, errors.New("ip_vs_stats corrupt: unexpected number of fields")
}
stats.Connections, err = strconv.ParseUint(statFields[0], 16, 64)
if err != nil {
return IPVSStats{}, err
}
stats.IncomingPackets, err = strconv.ParseUint(statFields[1], 16, 64)
if err != nil {
return IPVSStats{}, err
}
stats.OutgoingPackets, err = strconv.ParseUint(statFields[2], 16, 64)
if err != nil {
return IPVSStats{}, err
}
stats.IncomingBytes, err = strconv.ParseUint(statFields[3], 16, 64)
if err != nil {
return IPVSStats{}, err
}
stats.OutgoingBytes, err = strconv.ParseUint(statFields[4], 16, 64)
if err != nil {
return IPVSStats{}, err
}
return stats, nil
}
// NewIPVSBackendStatus reads and returns the status of all (virtual,real) server pairs.
func NewIPVSBackendStatus() ([]IPVSBackendStatus, error) {
fs, err := NewFS(DefaultMountPoint)
if err != nil {
return []IPVSBackendStatus{}, err
}
return fs.NewIPVSBackendStatus()
}
// NewIPVSBackendStatus reads and returns the status of all (virtual,real) server pairs from the specified `proc` filesystem.
func (fs FS) NewIPVSBackendStatus() ([]IPVSBackendStatus, error) {
file, err := fs.open("net/ip_vs")
if err != nil {
return nil, err
}
defer file.Close()
return parseIPVSBackendStatus(file)
}
func parseIPVSBackendStatus(file io.Reader) ([]IPVSBackendStatus, error) {
var (
status []IPVSBackendStatus
scanner = bufio.NewScanner(file)
proto string
localAddress net.IP
localPort uint16
err error
)
for scanner.Scan() {
fields := strings.Fields(string(scanner.Text()))
if len(fields) == 0 {
continue
}
switch {
case fields[0] == "IP" || fields[0] == "Prot" || fields[1] == "RemoteAddress:Port":
continue
case fields[0] == "TCP" || fields[0] == "UDP":
if len(fields) < 2 {
continue
}
proto = fields[0]
localAddress, localPort, err = parseIPPort(fields[1])
if err != nil {
return nil, err
}
case fields[0] == "->":
if len(fields) < 6 {
continue
}
remoteAddress, remotePort, err := parseIPPort(fields[1])
if err != nil {
return nil, err
}
weight, err := strconv.ParseUint(fields[3], 10, 64)
if err != nil {
return nil, err
}
activeConn, err := strconv.ParseUint(fields[4], 10, 64)
if err != nil {
return nil, err
}
inactConn, err := strconv.ParseUint(fields[5], 10, 64)
if err != nil {
return nil, err
}
status = append(status, IPVSBackendStatus{
LocalAddress: localAddress,
LocalPort: localPort,
RemoteAddress: remoteAddress,
RemotePort: remotePort,
Proto: proto,
Weight: weight,
ActiveConn: activeConn,
InactConn: inactConn,
})
}
}
return status, nil
}
func parseIPPort(s string) (net.IP, uint16, error) {
tmp := strings.SplitN(s, ":", 2)
if len(tmp) != 2 {
return nil, 0, fmt.Errorf("invalid IP:Port: %s", s)
}
if len(tmp[0]) != 8 && len(tmp[0]) != 32 {
return nil, 0, fmt.Errorf("invalid IP: %s", tmp[0])
}
ip, err := hex.DecodeString(tmp[0])
if err != nil {
return nil, 0, err
}
port, err := strconv.ParseUint(tmp[1], 16, 16)
if err != nil {
return nil, 0, err
}
return ip, uint16(port), nil
}

View file

@ -0,0 +1,196 @@
package procfs
import (
"net"
"testing"
)
var (
expectedIPVSStats = IPVSStats{
Connections: 23765872,
IncomingPackets: 3811989221,
OutgoingPackets: 0,
IncomingBytes: 89991519156915,
OutgoingBytes: 0,
}
expectedIPVSBackendStatuses = []IPVSBackendStatus{
IPVSBackendStatus{
LocalAddress: net.ParseIP("192.168.0.22"),
LocalPort: 3306,
RemoteAddress: net.ParseIP("192.168.82.22"),
RemotePort: 3306,
Proto: "TCP",
Weight: 100,
ActiveConn: 248,
InactConn: 2,
},
IPVSBackendStatus{
LocalAddress: net.ParseIP("192.168.0.22"),
LocalPort: 3306,
RemoteAddress: net.ParseIP("192.168.83.24"),
RemotePort: 3306,
Proto: "TCP",
Weight: 100,
ActiveConn: 248,
InactConn: 2,
},
IPVSBackendStatus{
LocalAddress: net.ParseIP("192.168.0.22"),
LocalPort: 3306,
RemoteAddress: net.ParseIP("192.168.83.21"),
RemotePort: 3306,
Proto: "TCP",
Weight: 100,
ActiveConn: 248,
InactConn: 1,
},
IPVSBackendStatus{
LocalAddress: net.ParseIP("192.168.0.57"),
LocalPort: 3306,
RemoteAddress: net.ParseIP("192.168.84.22"),
RemotePort: 3306,
Proto: "TCP",
Weight: 0,
ActiveConn: 0,
InactConn: 0,
},
IPVSBackendStatus{
LocalAddress: net.ParseIP("192.168.0.57"),
LocalPort: 3306,
RemoteAddress: net.ParseIP("192.168.82.21"),
RemotePort: 3306,
Proto: "TCP",
Weight: 100,
ActiveConn: 1499,
InactConn: 0,
},
IPVSBackendStatus{
LocalAddress: net.ParseIP("192.168.0.57"),
LocalPort: 3306,
RemoteAddress: net.ParseIP("192.168.50.21"),
RemotePort: 3306,
Proto: "TCP",
Weight: 100,
ActiveConn: 1498,
InactConn: 0,
},
IPVSBackendStatus{
LocalAddress: net.ParseIP("192.168.0.55"),
LocalPort: 3306,
RemoteAddress: net.ParseIP("192.168.50.26"),
RemotePort: 3306,
Proto: "TCP",
Weight: 0,
ActiveConn: 0,
InactConn: 0,
},
IPVSBackendStatus{
LocalAddress: net.ParseIP("192.168.0.55"),
LocalPort: 3306,
RemoteAddress: net.ParseIP("192.168.49.32"),
RemotePort: 3306,
Proto: "TCP",
Weight: 100,
ActiveConn: 0,
InactConn: 0,
},
}
)
func TestIPVSStats(t *testing.T) {
fs, err := NewFS("fixtures")
if err != nil {
t.Fatal(err)
}
stats, err := fs.NewIPVSStats()
if err != nil {
t.Fatal(err)
}
if stats != expectedIPVSStats {
t.Errorf("want %+v, got %+v", expectedIPVSStats, stats)
}
}
func TestParseIPPort(t *testing.T) {
ip := net.ParseIP("192.168.0.22")
port := uint16(3306)
gotIP, gotPort, err := parseIPPort("C0A80016:0CEA")
if err != nil {
t.Fatal(err)
}
if !(gotIP.Equal(ip) && port == gotPort) {
t.Errorf("want %s:%d, got %s:%d", ip, port, gotIP, gotPort)
}
}
func TestParseIPPortInvalid(t *testing.T) {
testcases := []string{
"",
"C0A80016",
"C0A800:1234",
"FOOBARBA:1234",
"C0A80016:0CEA:1234",
}
for _, s := range testcases {
ip, port, err := parseIPPort(s)
if ip != nil || port != uint16(0) || err == nil {
t.Errorf("Expected error for input %s, got ip = %s, port = %v, err = %v", s, ip, port, err)
}
}
}
func TestParseIPPortIPv6(t *testing.T) {
ip := net.ParseIP("dead:beef::1")
port := uint16(8080)
gotIP, gotPort, err := parseIPPort("DEADBEEF000000000000000000000001:1F90")
if err != nil {
t.Fatal(err)
}
if !(gotIP.Equal(ip) && port == gotPort) {
t.Errorf("want %s:%d, got %s:%d", ip, port, gotIP, gotPort)
}
}
func TestIPVSBackendStatus(t *testing.T) {
fs, err := NewFS("fixtures")
if err != nil {
t.Fatal(err)
}
backendStats, err := fs.NewIPVSBackendStatus()
if err != nil {
t.Fatal(err)
}
for idx, expect := range expectedIPVSBackendStatuses {
if !backendStats[idx].LocalAddress.Equal(expect.LocalAddress) {
t.Errorf("expected LocalAddress %s, got %s", expect.LocalAddress, backendStats[idx].LocalAddress)
}
if backendStats[idx].LocalPort != expect.LocalPort {
t.Errorf("expected LocalPort %d, got %d", expect.LocalPort, backendStats[idx].LocalPort)
}
if !backendStats[idx].RemoteAddress.Equal(expect.RemoteAddress) {
t.Errorf("expected RemoteAddress %s, got %s", expect.RemoteAddress, backendStats[idx].RemoteAddress)
}
if backendStats[idx].RemotePort != expect.RemotePort {
t.Errorf("expected RemotePort %d, got %d", expect.RemotePort, backendStats[idx].RemotePort)
}
if backendStats[idx].Proto != expect.Proto {
t.Errorf("expected Proto %s, got %s", expect.Proto, backendStats[idx].Proto)
}
if backendStats[idx].Weight != expect.Weight {
t.Errorf("expected Weight %d, got %d", expect.Weight, backendStats[idx].Weight)
}
if backendStats[idx].ActiveConn != expect.ActiveConn {
t.Errorf("expected ActiveConn %d, got %d", expect.ActiveConn, backendStats[idx].ActiveConn)
}
if backendStats[idx].InactConn != expect.InactConn {
t.Errorf("expected InactConn %d, got %d", expect.InactConn, backendStats[idx].InactConn)
}
}
}

View file

@ -96,9 +96,24 @@ func (p Proc) CmdLine() ([]string, error) {
return nil, err
}
if len(data) < 1 {
return []string{}, nil
}
return strings.Split(string(data[:len(data)-1]), string(byte(0))), nil
}
// Executable returns the absolute path of the executable command of a process.
func (p Proc) Executable() (string, error) {
exe, err := p.readlink("exe")
if os.IsNotExist(err) {
return "", nil
}
return exe, err
}
// FileDescriptors returns the currently open file descriptors of a process.
func (p Proc) FileDescriptors() ([]uintptr, error) {
names, err := p.fileDescriptors()
@ -147,3 +162,7 @@ func (p Proc) fileDescriptors() ([]string, error) {
func (p Proc) open(pa string) (*os.File, error) {
return p.fs.open(path.Join(strconv.Itoa(p.PID), pa))
}
func (p Proc) readlink(pa string) (string, error) {
return p.fs.readlink(path.Join(strconv.Itoa(p.PID), pa))
}

View file

@ -0,0 +1,54 @@
package procfs
import (
"fmt"
"io/ioutil"
)
// ProcIO models the content of /proc/<pid>/io.
type ProcIO struct {
// Chars read.
RChar uint64
// Chars written.
WChar uint64
// Read syscalls.
SyscR uint64
// Write syscalls.
SyscW uint64
// Bytes read.
ReadBytes uint64
// Bytes written.
WriteBytes uint64
// Bytes written, but taking into account truncation. See
// Documentation/filesystems/proc.txt in the kernel sources for
// detailed explanation.
CancelledWriteBytes int64
}
// NewIO creates a new ProcIO instance from a given Proc instance.
func (p Proc) NewIO() (ProcIO, error) {
pio := ProcIO{}
f, err := p.open("io")
if err != nil {
return pio, err
}
defer f.Close()
data, err := ioutil.ReadAll(f)
if err != nil {
return pio, err
}
ioFormat := "rchar: %d\nwchar: %d\nsyscr: %d\nsyscw: %d\n" +
"read_bytes: %d\nwrite_bytes: %d\n" +
"cancelled_write_bytes: %d\n"
_, err = fmt.Sscanf(string(data), ioFormat, &pio.RChar, &pio.WChar, &pio.SyscR,
&pio.SyscW, &pio.ReadBytes, &pio.WriteBytes, &pio.CancelledWriteBytes)
if err != nil {
return pio, err
}
return pio, nil
}

View file

@ -0,0 +1,49 @@
package procfs
import "testing"
func TestProcIO(t *testing.T) {
fs, err := NewFS("fixtures")
if err != nil {
t.Fatal(err)
}
p, err := fs.NewProc(26231)
if err != nil {
t.Fatal(err)
}
s, err := p.NewIO()
if err != nil {
t.Fatal(err)
}
for _, test := range []struct {
name string
want uint64
got uint64
}{
{name: "RChar", want: 750339, got: s.RChar},
{name: "WChar", want: 818609, got: s.WChar},
{name: "SyscR", want: 7405, got: s.SyscR},
{name: "SyscW", want: 5245, got: s.SyscW},
{name: "ReadBytes", want: 1024, got: s.ReadBytes},
{name: "WriteBytes", want: 2048, got: s.WriteBytes},
} {
if test.want != test.got {
t.Errorf("want %s %d, got %d", test.name, test.want, test.got)
}
}
for _, test := range []struct {
name string
want int64
got int64
}{
{name: "CancelledWriteBytes", want: -1024, got: s.CancelledWriteBytes},
} {
if test.want != test.got {
t.Errorf("want %s %d, got %d", test.name, test.want, test.got)
}
}
}

View file

@ -40,16 +40,46 @@ func TestAllProcs(t *testing.T) {
}
func TestCmdLine(t *testing.T) {
p1, err := testProcess(26231)
for _, tt := range []struct {
process int
want []string
}{
{process: 26231, want: []string{"vim", "test.go", "+10"}},
{process: 26232, want: []string{}},
} {
p1, err := testProcess(tt.process)
if err != nil {
t.Fatal(err)
}
c, err := p1.CmdLine()
c1, err := p1.CmdLine()
if err != nil {
t.Fatal(err)
}
if want := []string{"vim", "test.go", "+10"}; !reflect.DeepEqual(want, c) {
t.Errorf("want cmdline %v, got %v", want, c)
if !reflect.DeepEqual(tt.want, c1) {
t.Errorf("want cmdline %v, got %v", tt.want, c1)
}
}
}
func TestExecutable(t *testing.T) {
for _, tt := range []struct {
process int
want string
}{
{process: 26231, want: "/usr/bin/vim"},
{process: 26232, want: ""},
} {
p, err := testProcess(tt.process)
if err != nil {
t.Fatal(err)
}
exe, err := p.Executable()
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(tt.want, exe) {
t.Errorf("want absolute path to cmdline %v, got %v", tt.want, exe)
}
}
}

View file

@ -69,6 +69,7 @@ type DB struct {
compErrC chan error
compPerErrC chan error
compErrSetC chan error
compWriteLocking bool
compStats []cStats
// Close.
@ -108,6 +109,16 @@ func openDB(s *session) (*DB, error) {
closeC: make(chan struct{}),
}
// Read-only mode.
readOnly := s.o.GetReadOnly()
if readOnly {
// Recover journals (read-only mode).
if err := db.recoverJournalRO(); err != nil {
return nil, err
}
} else {
// Recover journals.
if err := db.recoverJournal(); err != nil {
return nil, err
}
@ -122,14 +133,20 @@ func openDB(s *session) (*DB, error) {
return nil, err
}
}
// Doesn't need to be included in the wait group.
go db.compactionError()
go db.mpoolDrain()
if readOnly {
db.SetReadOnly()
} else {
db.closeW.Add(3)
go db.tCompaction()
go db.mCompaction()
go db.jWriter()
}
s.logf("db@open done T·%v", time.Since(start))
@ -275,7 +292,7 @@ func recoverTable(s *session, o *opt.Options) error {
// We will drop corrupted table.
strict = o.GetStrict(opt.StrictRecovery)
rec = &sessionRecord{numLevel: o.GetNumLevel()}
rec = &sessionRecord{}
bpool = util.NewBufferPool(o.GetBlockSize() + 5)
)
buildTable := func(iter iterator.Iterator) (tmp storage.File, size int64, err error) {
@ -450,78 +467,92 @@ func recoverTable(s *session, o *opt.Options) error {
}
func (db *DB) recoverJournal() error {
// Get all tables and sort it by file number.
journalFiles_, err := db.s.getFiles(storage.TypeJournal)
// Get all journals and sort it by file number.
allJournalFiles, err := db.s.getFiles(storage.TypeJournal)
if err != nil {
return err
}
journalFiles := files(journalFiles_)
journalFiles.sort()
files(allJournalFiles).sort()
// Discard older journal.
prev := -1
for i, file := range journalFiles {
if file.Num() >= db.s.stJournalNum {
if prev >= 0 {
i--
journalFiles[i] = journalFiles[prev]
}
journalFiles = journalFiles[i:]
break
} else if file.Num() == db.s.stPrevJournalNum {
prev = i
// Journals that will be recovered.
var recJournalFiles []storage.File
for _, jf := range allJournalFiles {
if jf.Num() >= db.s.stJournalNum || jf.Num() == db.s.stPrevJournalNum {
recJournalFiles = append(recJournalFiles, jf)
}
}
var jr *journal.Reader
var of storage.File
var mem *memdb.DB
batch := new(Batch)
cm := newCMem(db.s)
buf := new(util.Buffer)
var (
of storage.File // Obsolete file.
rec = &sessionRecord{}
)
// Recover journals.
if len(recJournalFiles) > 0 {
db.logf("journal@recovery F·%d", len(recJournalFiles))
// Mark file number as used.
db.s.markFileNum(recJournalFiles[len(recJournalFiles)-1].Num())
var (
// Options.
strict := db.s.o.GetStrict(opt.StrictJournal)
checksum := db.s.o.GetStrict(opt.StrictJournalChecksum)
writeBuffer := db.s.o.GetWriteBuffer()
recoverJournal := func(file storage.File) error {
db.logf("journal@recovery recovering @%d", file.Num())
reader, err := file.Open()
strict = db.s.o.GetStrict(opt.StrictJournal)
checksum = db.s.o.GetStrict(opt.StrictJournalChecksum)
writeBuffer = db.s.o.GetWriteBuffer()
jr *journal.Reader
mdb = memdb.New(db.s.icmp, writeBuffer)
buf = &util.Buffer{}
batch = &Batch{}
)
for _, jf := range recJournalFiles {
db.logf("journal@recovery recovering @%d", jf.Num())
fr, err := jf.Open()
if err != nil {
return err
}
defer reader.Close()
// Create/reset journal reader instance.
// Create or reset journal reader instance.
if jr == nil {
jr = journal.NewReader(reader, dropper{db.s, file}, strict, checksum)
jr = journal.NewReader(fr, dropper{db.s, jf}, strict, checksum)
} else {
jr.Reset(reader, dropper{db.s, file}, strict, checksum)
jr.Reset(fr, dropper{db.s, jf}, strict, checksum)
}
// Flush memdb and remove obsolete journal file.
if of != nil {
if mem.Len() > 0 {
if err := cm.flush(mem, 0); err != nil {
if mdb.Len() > 0 {
if _, err := db.s.flushMemdb(rec, mdb, -1); err != nil {
fr.Close()
return err
}
}
if err := cm.commit(file.Num(), db.seq); err != nil {
rec.setJournalNum(jf.Num())
rec.setSeqNum(db.seq)
if err := db.s.commit(rec); err != nil {
fr.Close()
return err
}
cm.reset()
rec.resetAddedTables()
of.Remove()
of = nil
}
// Replay journal to memdb.
mem.Reset()
mdb.Reset()
for {
r, err := jr.Next()
if err != nil {
if err == io.EOF {
break
}
return errors.SetFile(err, file)
fr.Close()
return errors.SetFile(err, jf)
}
buf.Reset()
@ -529,53 +560,43 @@ func (db *DB) recoverJournal() error {
if err == io.ErrUnexpectedEOF {
// This is error returned due to corruption, with strict == false.
continue
} else {
return errors.SetFile(err, file)
}
fr.Close()
return errors.SetFile(err, jf)
}
if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mem); err != nil {
if strict || !errors.IsCorrupted(err) {
return errors.SetFile(err, file)
} else {
if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil {
if !strict && errors.IsCorrupted(err) {
db.s.logf("journal error: %v (skipped)", err)
// We won't apply sequence number as it might be corrupted.
continue
}
fr.Close()
return errors.SetFile(err, jf)
}
// Save sequence number.
db.seq = batch.seq + uint64(batch.Len())
// Flush it if large enough.
if mem.Size() >= writeBuffer {
if err := cm.flush(mem, 0); err != nil {
if mdb.Size() >= writeBuffer {
if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
fr.Close()
return err
}
mem.Reset()
mdb.Reset()
}
}
of = file
return nil
fr.Close()
of = jf
}
// Recover all journals.
if len(journalFiles) > 0 {
db.logf("journal@recovery F·%d", len(journalFiles))
// Mark file number as used.
db.s.markFileNum(journalFiles[len(journalFiles)-1].Num())
mem = memdb.New(db.s.icmp, writeBuffer)
for _, file := range journalFiles {
if err := recoverJournal(file); err != nil {
return err
}
}
// Flush the last journal.
if mem.Len() > 0 {
if err := cm.flush(mem, 0); err != nil {
// Flush the last memdb.
if mdb.Len() > 0 {
if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
return err
}
}
@ -587,8 +608,10 @@ func (db *DB) recoverJournal() error {
}
// Commit.
if err := cm.commit(db.journalFile.Num(), db.seq); err != nil {
// Close journal.
rec.setJournalNum(db.journalFile.Num())
rec.setSeqNum(db.seq)
if err := db.s.commit(rec); err != nil {
// Close journal on error.
if db.journal != nil {
db.journal.Close()
db.journalWriter.Close()
@ -604,6 +627,103 @@ func (db *DB) recoverJournal() error {
return nil
}
func (db *DB) recoverJournalRO() error {
// Get all journals and sort it by file number.
allJournalFiles, err := db.s.getFiles(storage.TypeJournal)
if err != nil {
return err
}
files(allJournalFiles).sort()
// Journals that will be recovered.
var recJournalFiles []storage.File
for _, jf := range allJournalFiles {
if jf.Num() >= db.s.stJournalNum || jf.Num() == db.s.stPrevJournalNum {
recJournalFiles = append(recJournalFiles, jf)
}
}
var (
// Options.
strict = db.s.o.GetStrict(opt.StrictJournal)
checksum = db.s.o.GetStrict(opt.StrictJournalChecksum)
writeBuffer = db.s.o.GetWriteBuffer()
mdb = memdb.New(db.s.icmp, writeBuffer)
)
// Recover journals.
if len(recJournalFiles) > 0 {
db.logf("journal@recovery RO·Mode F·%d", len(recJournalFiles))
var (
jr *journal.Reader
buf = &util.Buffer{}
batch = &Batch{}
)
for _, jf := range recJournalFiles {
db.logf("journal@recovery recovering @%d", jf.Num())
fr, err := jf.Open()
if err != nil {
return err
}
// Create or reset journal reader instance.
if jr == nil {
jr = journal.NewReader(fr, dropper{db.s, jf}, strict, checksum)
} else {
jr.Reset(fr, dropper{db.s, jf}, strict, checksum)
}
// Replay journal to memdb.
for {
r, err := jr.Next()
if err != nil {
if err == io.EOF {
break
}
fr.Close()
return errors.SetFile(err, jf)
}
buf.Reset()
if _, err := buf.ReadFrom(r); err != nil {
if err == io.ErrUnexpectedEOF {
// This is error returned due to corruption, with strict == false.
continue
}
fr.Close()
return errors.SetFile(err, jf)
}
if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil {
if !strict && errors.IsCorrupted(err) {
db.s.logf("journal error: %v (skipped)", err)
// We won't apply sequence number as it might be corrupted.
continue
}
fr.Close()
return errors.SetFile(err, jf)
}
// Save sequence number.
db.seq = batch.seq + uint64(batch.Len())
}
fr.Close()
}
}
// Set memDB.
db.mem = &memDB{db: db, DB: mdb, ref: 1}
return nil
}
func (db *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) {
ikey := newIkey(key, seq, ktSeek)
@ -614,7 +734,7 @@ func (db *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, er
}
defer m.decref()
mk, mv, me := m.mdb.Find(ikey)
mk, mv, me := m.Find(ikey)
if me == nil {
ukey, _, kt, kerr := parseIkey(mk)
if kerr != nil {
@ -652,7 +772,7 @@ func (db *DB) has(key []byte, seq uint64, ro *opt.ReadOptions) (ret bool, err er
}
defer m.decref()
mk, _, me := m.mdb.Find(ikey)
mk, _, me := m.Find(ikey)
if me == nil {
ukey, _, kt, kerr := parseIkey(mk)
if kerr != nil {
@ -784,7 +904,7 @@ func (db *DB) GetProperty(name string) (value string, err error) {
const prefix = "leveldb."
if !strings.HasPrefix(name, prefix) {
return "", errors.New("leveldb: GetProperty: unknown property: " + name)
return "", ErrNotFound
}
p := name[len(prefix):]
@ -798,7 +918,7 @@ func (db *DB) GetProperty(name string) (value string, err error) {
var rest string
n, _ := fmt.Sscanf(p[len(numFilesPrefix):], "%d%s", &level, &rest)
if n != 1 || int(level) >= db.s.o.GetNumLevel() {
err = errors.New("leveldb: GetProperty: invalid property: " + name)
err = ErrNotFound
} else {
value = fmt.Sprint(v.tLen(int(level)))
}
@ -837,7 +957,7 @@ func (db *DB) GetProperty(name string) (value string, err error) {
case p == "aliveiters":
value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveIters))
default:
err = errors.New("leveldb: GetProperty: unknown property: " + name)
err = ErrNotFound
}
return
@ -900,6 +1020,9 @@ func (db *DB) Close() error {
var err error
select {
case err = <-db.compErrC:
if err == ErrReadOnly {
err = nil
}
default:
}

View file

@ -11,7 +11,6 @@ import (
"time"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/memdb"
"github.com/syndtr/goleveldb/leveldb/opt"
)
@ -62,58 +61,8 @@ func (p *cStatsStaging) stopTimer() {
}
}
type cMem struct {
s *session
level int
rec *sessionRecord
}
func newCMem(s *session) *cMem {
return &cMem{s: s, rec: &sessionRecord{numLevel: s.o.GetNumLevel()}}
}
func (c *cMem) flush(mem *memdb.DB, level int) error {
s := c.s
// Write memdb to table.
iter := mem.NewIterator(nil)
defer iter.Release()
t, n, err := s.tops.createFrom(iter)
if err != nil {
return err
}
// Pick level.
if level < 0 {
v := s.version()
level = v.pickLevel(t.imin.ukey(), t.imax.ukey())
v.release()
}
c.rec.addTableFile(level, t)
s.logf("mem@flush created L%d@%d N·%d S·%s %q:%q", level, t.file.Num(), n, shortenb(int(t.size)), t.imin, t.imax)
c.level = level
return nil
}
func (c *cMem) reset() {
c.rec = &sessionRecord{numLevel: c.s.o.GetNumLevel()}
}
func (c *cMem) commit(journal, seq uint64) error {
c.rec.setJournalNum(journal)
c.rec.setSeqNum(seq)
// Commit changes.
return c.s.commit(c.rec)
}
func (db *DB) compactionError() {
var (
err error
wlocked bool
)
var err error
noerr:
// No error.
for {
@ -121,7 +70,7 @@ noerr:
case err = <-db.compErrSetC:
switch {
case err == nil:
case errors.IsCorrupted(err):
case err == ErrReadOnly, errors.IsCorrupted(err):
goto hasperr
default:
goto haserr
@ -139,7 +88,7 @@ haserr:
switch {
case err == nil:
goto noerr
case errors.IsCorrupted(err):
case err == ErrReadOnly, errors.IsCorrupted(err):
goto hasperr
default:
}
@ -155,9 +104,9 @@ hasperr:
case db.compPerErrC <- err:
case db.writeLockC <- struct{}{}:
// Hold write lock, so that write won't pass-through.
wlocked = true
db.compWriteLocking = true
case _, _ = <-db.closeC:
if wlocked {
if db.compWriteLocking {
// We should release the lock or Close will hang.
<-db.writeLockC
}
@ -287,21 +236,18 @@ func (db *DB) compactionExitTransact() {
}
func (db *DB) memCompaction() {
mem := db.getFrozenMem()
if mem == nil {
mdb := db.getFrozenMem()
if mdb == nil {
return
}
defer mem.decref()
defer mdb.decref()
c := newCMem(db.s)
stats := new(cStatsStaging)
db.logf("mem@flush N·%d S·%s", mem.mdb.Len(), shortenb(mem.mdb.Size()))
db.logf("memdb@flush N·%d S·%s", mdb.Len(), shortenb(mdb.Size()))
// Don't compact empty memdb.
if mem.mdb.Len() == 0 {
db.logf("mem@flush skipping")
// drop frozen mem
if mdb.Len() == 0 {
db.logf("memdb@flush skipping")
// drop frozen memdb
db.dropFrozenMem()
return
}
@ -317,13 +263,20 @@ func (db *DB) memCompaction() {
return
}
db.compactionTransactFunc("mem@flush", func(cnt *compactionTransactCounter) (err error) {
var (
rec = &sessionRecord{}
stats = &cStatsStaging{}
flushLevel int
)
db.compactionTransactFunc("memdb@flush", func(cnt *compactionTransactCounter) (err error) {
stats.startTimer()
defer stats.stopTimer()
return c.flush(mem.mdb, -1)
flushLevel, err = db.s.flushMemdb(rec, mdb.DB, -1)
stats.stopTimer()
return
}, func() error {
for _, r := range c.rec.addedTables {
db.logf("mem@flush revert @%d", r.num)
for _, r := range rec.addedTables {
db.logf("memdb@flush revert @%d", r.num)
f := db.s.getTableFile(r.num)
if err := f.Remove(); err != nil {
return err
@ -332,20 +285,23 @@ func (db *DB) memCompaction() {
return nil
})
db.compactionTransactFunc("mem@commit", func(cnt *compactionTransactCounter) (err error) {
db.compactionTransactFunc("memdb@commit", func(cnt *compactionTransactCounter) (err error) {
stats.startTimer()
defer stats.stopTimer()
return c.commit(db.journalFile.Num(), db.frozenSeq)
rec.setJournalNum(db.journalFile.Num())
rec.setSeqNum(db.frozenSeq)
err = db.s.commit(rec)
stats.stopTimer()
return
}, nil)
db.logf("mem@flush committed F·%d T·%v", len(c.rec.addedTables), stats.duration)
db.logf("memdb@flush committed F·%d T·%v", len(rec.addedTables), stats.duration)
for _, r := range c.rec.addedTables {
for _, r := range rec.addedTables {
stats.write += r.size
}
db.compStats[c.level].add(stats)
db.compStats[flushLevel].add(stats)
// Drop frozen mem.
// Drop frozen memdb.
db.dropFrozenMem()
// Resume table compaction.
@ -557,7 +513,7 @@ func (b *tableCompactionBuilder) revert() error {
func (db *DB) tableCompaction(c *compaction, noTrivial bool) {
defer c.release()
rec := &sessionRecord{numLevel: db.s.o.GetNumLevel()}
rec := &sessionRecord{}
rec.addCompPtr(c.level, c.imax)
if !noTrivial && c.trivial() {

View file

@ -40,11 +40,11 @@ func (db *DB) newRawIterator(slice *util.Range, ro *opt.ReadOptions) iterator.It
ti := v.getIterators(slice, ro)
n := len(ti) + 2
i := make([]iterator.Iterator, 0, n)
emi := em.mdb.NewIterator(slice)
emi := em.NewIterator(slice)
emi.SetReleaser(&memdbReleaser{m: em})
i = append(i, emi)
if fm != nil {
fmi := fm.mdb.NewIterator(slice)
fmi := fm.NewIterator(slice)
fmi.SetReleaser(&memdbReleaser{m: fm})
i = append(i, fmi)
}

View file

@ -16,7 +16,7 @@ import (
type memDB struct {
db *DB
mdb *memdb.DB
*memdb.DB
ref int32
}
@ -27,12 +27,12 @@ func (m *memDB) incref() {
func (m *memDB) decref() {
if ref := atomic.AddInt32(&m.ref, -1); ref == 0 {
// Only put back memdb with std capacity.
if m.mdb.Capacity() == m.db.s.o.GetWriteBuffer() {
m.mdb.Reset()
m.db.mpoolPut(m.mdb)
if m.Capacity() == m.db.s.o.GetWriteBuffer() {
m.Reset()
m.db.mpoolPut(m.DB)
}
m.db = nil
m.mdb = nil
m.DB = nil
} else if ref < 0 {
panic("negative memdb ref")
}
@ -126,7 +126,7 @@ func (db *DB) newMem(n int) (mem *memDB, err error) {
}
mem = &memDB{
db: db,
mdb: mdb,
DB: mdb,
ref: 2,
}
db.mem = mem

View file

@ -2445,7 +2445,7 @@ func TestDB_TableCompactionBuilder(t *testing.T) {
if err != nil {
t.Fatal(err)
}
rec := &sessionRecord{numLevel: s.o.GetNumLevel()}
rec := &sessionRecord{}
rec.addTableFile(i, tf)
if err := s.commit(rec); err != nil {
t.Fatal(err)
@ -2455,7 +2455,7 @@ func TestDB_TableCompactionBuilder(t *testing.T) {
// Build grandparent.
v := s.version()
c := newCompaction(s, v, 1, append(tFiles{}, v.tables[1]...))
rec := &sessionRecord{numLevel: s.o.GetNumLevel()}
rec := &sessionRecord{}
b := &tableCompactionBuilder{
s: s,
c: c,
@ -2479,7 +2479,7 @@ func TestDB_TableCompactionBuilder(t *testing.T) {
// Build level-1.
v = s.version()
c = newCompaction(s, v, 0, append(tFiles{}, v.tables[0]...))
rec = &sessionRecord{numLevel: s.o.GetNumLevel()}
rec = &sessionRecord{}
b = &tableCompactionBuilder{
s: s,
c: c,
@ -2523,7 +2523,7 @@ func TestDB_TableCompactionBuilder(t *testing.T) {
// Compaction with transient error.
v = s.version()
c = newCompaction(s, v, 1, append(tFiles{}, v.tables[1]...))
rec = &sessionRecord{numLevel: s.o.GetNumLevel()}
rec = &sessionRecord{}
b = &tableCompactionBuilder{
s: s,
c: c,
@ -2663,3 +2663,39 @@ func TestDB_IterTriggeredCompaction(t *testing.T) {
func TestDB_IterTriggeredCompactionHalf(t *testing.T) {
testDB_IterTriggeredCompaction(t, 2)
}
func TestDB_ReadOnly(t *testing.T) {
h := newDbHarness(t)
defer h.close()
h.put("foo", "v1")
h.put("bar", "v2")
h.compactMem()
h.put("xfoo", "v1")
h.put("xbar", "v2")
t.Log("Trigger read-only")
if err := h.db.SetReadOnly(); err != nil {
h.close()
t.Fatalf("SetReadOnly error: %v", err)
}
h.stor.SetEmuErr(storage.TypeAll, tsOpCreate, tsOpReplace, tsOpRemove, tsOpWrite, tsOpWrite, tsOpSync)
ro := func(key, value, wantValue string) {
if err := h.db.Put([]byte(key), []byte(value), h.wo); err != ErrReadOnly {
t.Fatalf("unexpected error: %v", err)
}
h.getVal(key, wantValue)
}
ro("foo", "vx", "v1")
h.o.ReadOnly = true
h.reopenDB()
ro("foo", "vx", "v1")
ro("bar", "vx", "v2")
h.assertNumKeys(4)
}

View file

@ -63,24 +63,24 @@ func (db *DB) rotateMem(n int) (mem *memDB, err error) {
return
}
func (db *DB) flush(n int) (mem *memDB, nn int, err error) {
func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
delayed := false
flush := func() (retry bool) {
v := db.s.version()
defer v.release()
mem = db.getEffectiveMem()
mdb = db.getEffectiveMem()
defer func() {
if retry {
mem.decref()
mem = nil
mdb.decref()
mdb = nil
}
}()
nn = mem.mdb.Free()
mdbFree = mdb.Free()
switch {
case v.tLen(0) >= db.s.o.GetWriteL0SlowdownTrigger() && !delayed:
delayed = true
time.Sleep(time.Millisecond)
case nn >= n:
case mdbFree >= n:
return false
case v.tLen(0) >= db.s.o.GetWriteL0PauseTrigger():
delayed = true
@ -90,15 +90,15 @@ func (db *DB) flush(n int) (mem *memDB, nn int, err error) {
}
default:
// Allow memdb to grow if it has no entry.
if mem.mdb.Len() == 0 {
nn = n
if mdb.Len() == 0 {
mdbFree = n
} else {
mem.decref()
mem, err = db.rotateMem(n)
mdb.decref()
mdb, err = db.rotateMem(n)
if err == nil {
nn = mem.mdb.Free()
mdbFree = mdb.Free()
} else {
nn = 0
mdbFree = 0
}
}
return false
@ -157,18 +157,18 @@ func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) {
}
}()
mem, memFree, err := db.flush(b.size())
mdb, mdbFree, err := db.flush(b.size())
if err != nil {
return
}
defer mem.decref()
defer mdb.decref()
// Calculate maximum size of the batch.
m := 1 << 20
if x := b.size(); x <= 128<<10 {
m = x + (128 << 10)
}
m = minInt(m, memFree)
m = minInt(m, mdbFree)
// Merge with other batch.
drain:
@ -197,7 +197,7 @@ drain:
select {
case db.journalC <- b:
// Write into memdb
if berr := b.memReplay(mem.mdb); berr != nil {
if berr := b.memReplay(mdb.DB); berr != nil {
panic(berr)
}
case err = <-db.compPerErrC:
@ -211,7 +211,7 @@ drain:
case err = <-db.journalAckC:
if err != nil {
// Revert memdb if error detected
if berr := b.revertMemReplay(mem.mdb); berr != nil {
if berr := b.revertMemReplay(mdb.DB); berr != nil {
panic(berr)
}
return
@ -225,7 +225,7 @@ drain:
if err != nil {
return
}
if berr := b.memReplay(mem.mdb); berr != nil {
if berr := b.memReplay(mdb.DB); berr != nil {
panic(berr)
}
}
@ -233,7 +233,7 @@ drain:
// Set last seq number.
db.addSeq(uint64(b.Len()))
if b.size() >= memFree {
if b.size() >= mdbFree {
db.rotateMem(0)
}
return
@ -290,9 +290,9 @@ func (db *DB) CompactRange(r util.Range) error {
}
// Check for overlaps in memdb.
mem := db.getEffectiveMem()
defer mem.decref()
if isMemOverlaps(db.s.icmp, mem.mdb, r.Start, r.Limit) {
mdb := db.getEffectiveMem()
defer mdb.decref()
if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) {
// Memdb compaction.
if _, err := db.rotateMem(0); err != nil {
<-db.writeLockC
@ -309,3 +309,31 @@ func (db *DB) CompactRange(r util.Range) error {
// Table compaction.
return db.compSendRange(db.tcompCmdC, -1, r.Start, r.Limit)
}
// SetReadOnly makes DB read-only. It will stay read-only until reopened.
func (db *DB) SetReadOnly() error {
if err := db.ok(); err != nil {
return err
}
// Lock writer.
select {
case db.writeLockC <- struct{}{}:
db.compWriteLocking = true
case err := <-db.compPerErrC:
return err
case _, _ = <-db.closeC:
return ErrClosed
}
// Set compaction read-only.
select {
case db.compErrSetC <- ErrReadOnly:
case perr := <-db.compPerErrC:
return perr
case _, _ = <-db.closeC:
return ErrClosed
}
return nil
}

View file

@ -12,6 +12,7 @@ import (
var (
ErrNotFound = errors.ErrNotFound
ErrReadOnly = errors.New("leveldb: read-only mode")
ErrSnapshotReleased = errors.New("leveldb: snapshot released")
ErrIterReleased = errors.New("leveldb: iterator released")
ErrClosed = errors.New("leveldb: closed")

View file

@ -206,6 +206,7 @@ func (p *DB) randHeight() (h int) {
return
}
// Must hold RW-lock if prev == true, as it use shared prevNode slice.
func (p *DB) findGE(key []byte, prev bool) (int, bool) {
node := 0
h := p.maxHeight - 1
@ -302,7 +303,7 @@ func (p *DB) Put(key []byte, value []byte) error {
node := len(p.nodeData)
p.nodeData = append(p.nodeData, kvOffset, len(key), len(value), h)
for i, n := range p.prevNode[:h] {
m := n + 4 + i
m := n + nNext + i
p.nodeData = append(p.nodeData, p.nodeData[m])
p.nodeData[m] = node
}
@ -434,20 +435,22 @@ func (p *DB) Len() int {
// Reset resets the DB to initial empty state. Allows reuse the buffer.
func (p *DB) Reset() {
p.mu.Lock()
p.rnd = rand.New(rand.NewSource(0xdeadbeef))
p.maxHeight = 1
p.n = 0
p.kvSize = 0
p.kvData = p.kvData[:0]
p.nodeData = p.nodeData[:4+tMaxHeight]
p.nodeData = p.nodeData[:nNext+tMaxHeight]
p.nodeData[nKV] = 0
p.nodeData[nKey] = 0
p.nodeData[nVal] = 0
p.nodeData[nHeight] = tMaxHeight
for n := 0; n < tMaxHeight; n++ {
p.nodeData[4+n] = 0
p.nodeData[nNext+n] = 0
p.prevNode[n] = 0
}
p.mu.Unlock()
}
// New creates a new initalized in-memory key/value DB. The capacity

View file

@ -250,6 +250,11 @@ type Options struct {
// The default value (DefaultCompression) uses snappy compression.
Compression Compression
// DisableBufferPool allows disable use of util.BufferPool functionality.
//
// The default value is false.
DisableBufferPool bool
// DisableBlockCache allows disable use of cache.Cache functionality on
// 'sorted table' block.
//
@ -321,6 +326,11 @@ type Options struct {
// The default value is 500.
OpenFilesCacheCapacity int
// If true then opens DB in read-only mode.
//
// The default value is false.
ReadOnly bool
// Strict defines the DB strict level.
Strict Strict
@ -472,6 +482,20 @@ func (o *Options) GetCompression() Compression {
return o.Compression
}
func (o *Options) GetDisableBufferPool() bool {
if o == nil {
return false
}
return o.DisableBufferPool
}
func (o *Options) GetDisableBlockCache() bool {
if o == nil {
return false
}
return o.DisableBlockCache
}
func (o *Options) GetDisableCompactionBackoff() bool {
if o == nil {
return false
@ -548,6 +572,13 @@ func (o *Options) GetOpenFilesCacheCapacity() int {
return o.OpenFilesCacheCapacity
}
func (o *Options) GetReadOnly() bool {
if o == nil {
return false
}
return o.ReadOnly
}
func (o *Options) GetStrict(strict Strict) bool {
if o == nil || o.Strict == 0 {
return DefaultStrict&strict != 0

View file

@ -11,10 +11,8 @@ import (
"io"
"os"
"sync"
"sync/atomic"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/journal"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/storage"
@ -127,11 +125,16 @@ func (s *session) recover() (err error) {
return
}
defer reader.Close()
strict := s.o.GetStrict(opt.StrictManifest)
jr := journal.NewReader(reader, dropper{s, m}, strict, true)
staging := s.stVersion.newStaging()
rec := &sessionRecord{numLevel: s.o.GetNumLevel()}
var (
// Options.
numLevel = s.o.GetNumLevel()
strict = s.o.GetStrict(opt.StrictManifest)
jr = journal.NewReader(reader, dropper{s, m}, strict, true)
rec = &sessionRecord{}
staging = s.stVersion.newStaging()
)
for {
var r io.Reader
r, err = jr.Next()
@ -143,7 +146,7 @@ func (s *session) recover() (err error) {
return errors.SetFile(err, m)
}
err = rec.decode(r)
err = rec.decode(r, numLevel)
if err == nil {
// save compact pointers
for _, r := range rec.compPtrs {
@ -206,250 +209,3 @@ func (s *session) commit(r *sessionRecord) (err error) {
return
}
// Pick a compaction based on current state; need external synchronization.
func (s *session) pickCompaction() *compaction {
v := s.version()
var level int
var t0 tFiles
if v.cScore >= 1 {
level = v.cLevel
cptr := s.stCompPtrs[level]
tables := v.tables[level]
for _, t := range tables {
if cptr == nil || s.icmp.Compare(t.imax, cptr) > 0 {
t0 = append(t0, t)
break
}
}
if len(t0) == 0 {
t0 = append(t0, tables[0])
}
} else {
if p := atomic.LoadPointer(&v.cSeek); p != nil {
ts := (*tSet)(p)
level = ts.level
t0 = append(t0, ts.table)
} else {
v.release()
return nil
}
}
return newCompaction(s, v, level, t0)
}
// Create compaction from given level and range; need external synchronization.
func (s *session) getCompactionRange(level int, umin, umax []byte) *compaction {
v := s.version()
t0 := v.tables[level].getOverlaps(nil, s.icmp, umin, umax, level == 0)
if len(t0) == 0 {
v.release()
return nil
}
// Avoid compacting too much in one shot in case the range is large.
// But we cannot do this for level-0 since level-0 files can overlap
// and we must not pick one file and drop another older file if the
// two files overlap.
if level > 0 {
limit := uint64(v.s.o.GetCompactionSourceLimit(level))
total := uint64(0)
for i, t := range t0 {
total += t.size
if total >= limit {
s.logf("table@compaction limiting F·%d -> F·%d", len(t0), i+1)
t0 = t0[:i+1]
break
}
}
}
return newCompaction(s, v, level, t0)
}
func newCompaction(s *session, v *version, level int, t0 tFiles) *compaction {
c := &compaction{
s: s,
v: v,
level: level,
tables: [2]tFiles{t0, nil},
maxGPOverlaps: uint64(s.o.GetCompactionGPOverlaps(level)),
tPtrs: make([]int, s.o.GetNumLevel()),
}
c.expand()
c.save()
return c
}
// compaction represent a compaction state.
type compaction struct {
s *session
v *version
level int
tables [2]tFiles
maxGPOverlaps uint64
gp tFiles
gpi int
seenKey bool
gpOverlappedBytes uint64
imin, imax iKey
tPtrs []int
released bool
snapGPI int
snapSeenKey bool
snapGPOverlappedBytes uint64
snapTPtrs []int
}
func (c *compaction) save() {
c.snapGPI = c.gpi
c.snapSeenKey = c.seenKey
c.snapGPOverlappedBytes = c.gpOverlappedBytes
c.snapTPtrs = append(c.snapTPtrs[:0], c.tPtrs...)
}
func (c *compaction) restore() {
c.gpi = c.snapGPI
c.seenKey = c.snapSeenKey
c.gpOverlappedBytes = c.snapGPOverlappedBytes
c.tPtrs = append(c.tPtrs[:0], c.snapTPtrs...)
}
func (c *compaction) release() {
if !c.released {
c.released = true
c.v.release()
}
}
// Expand compacted tables; need external synchronization.
func (c *compaction) expand() {
limit := uint64(c.s.o.GetCompactionExpandLimit(c.level))
vt0, vt1 := c.v.tables[c.level], c.v.tables[c.level+1]
t0, t1 := c.tables[0], c.tables[1]
imin, imax := t0.getRange(c.s.icmp)
// We expand t0 here just incase ukey hop across tables.
t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.level == 0)
if len(t0) != len(c.tables[0]) {
imin, imax = t0.getRange(c.s.icmp)
}
t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false)
// Get entire range covered by compaction.
amin, amax := append(t0, t1...).getRange(c.s.icmp)
// See if we can grow the number of inputs in "level" without
// changing the number of "level+1" files we pick up.
if len(t1) > 0 {
exp0 := vt0.getOverlaps(nil, c.s.icmp, amin.ukey(), amax.ukey(), c.level == 0)
if len(exp0) > len(t0) && t1.size()+exp0.size() < limit {
xmin, xmax := exp0.getRange(c.s.icmp)
exp1 := vt1.getOverlaps(nil, c.s.icmp, xmin.ukey(), xmax.ukey(), false)
if len(exp1) == len(t1) {
c.s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)",
c.level, c.level+1, len(t0), shortenb(int(t0.size())), len(t1), shortenb(int(t1.size())),
len(exp0), shortenb(int(exp0.size())), len(exp1), shortenb(int(exp1.size())))
imin, imax = xmin, xmax
t0, t1 = exp0, exp1
amin, amax = append(t0, t1...).getRange(c.s.icmp)
}
}
}
// Compute the set of grandparent files that overlap this compaction
// (parent == level+1; grandparent == level+2)
if c.level+2 < c.s.o.GetNumLevel() {
c.gp = c.v.tables[c.level+2].getOverlaps(c.gp, c.s.icmp, amin.ukey(), amax.ukey(), false)
}
c.tables[0], c.tables[1] = t0, t1
c.imin, c.imax = imin, imax
}
// Check whether compaction is trivial.
func (c *compaction) trivial() bool {
return len(c.tables[0]) == 1 && len(c.tables[1]) == 0 && c.gp.size() <= c.maxGPOverlaps
}
func (c *compaction) baseLevelForKey(ukey []byte) bool {
for level, tables := range c.v.tables[c.level+2:] {
for c.tPtrs[level] < len(tables) {
t := tables[c.tPtrs[level]]
if c.s.icmp.uCompare(ukey, t.imax.ukey()) <= 0 {
// We've advanced far enough.
if c.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 {
// Key falls in this file's range, so definitely not base level.
return false
}
break
}
c.tPtrs[level]++
}
}
return true
}
func (c *compaction) shouldStopBefore(ikey iKey) bool {
for ; c.gpi < len(c.gp); c.gpi++ {
gp := c.gp[c.gpi]
if c.s.icmp.Compare(ikey, gp.imax) <= 0 {
break
}
if c.seenKey {
c.gpOverlappedBytes += gp.size
}
}
c.seenKey = true
if c.gpOverlappedBytes > c.maxGPOverlaps {
// Too much overlap for current output; start new output.
c.gpOverlappedBytes = 0
return true
}
return false
}
// Creates an iterator.
func (c *compaction) newIterator() iterator.Iterator {
// Creates iterator slice.
icap := len(c.tables)
if c.level == 0 {
// Special case for level-0
icap = len(c.tables[0]) + 1
}
its := make([]iterator.Iterator, 0, icap)
// Options.
ro := &opt.ReadOptions{
DontFillCache: true,
Strict: opt.StrictOverride,
}
strict := c.s.o.GetStrict(opt.StrictCompaction)
if strict {
ro.Strict |= opt.StrictReader
}
for i, tables := range c.tables {
if len(tables) == 0 {
continue
}
// Level-0 is not sorted and may overlaps each other.
if c.level+i == 0 {
for _, t := range tables {
its = append(its, c.s.tops.newIterator(t, nil, ro))
}
} else {
it := iterator.NewIndexedIterator(tables.newIndexIterator(c.s.tops, c.s.icmp, nil, ro), strict)
its = append(its, it)
}
}
return iterator.NewMergedIterator(its, c.s.icmp, strict)
}

View file

@ -0,0 +1,287 @@
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package leveldb
import (
"sync/atomic"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/memdb"
"github.com/syndtr/goleveldb/leveldb/opt"
)
func (s *session) pickMemdbLevel(umin, umax []byte) int {
v := s.version()
defer v.release()
return v.pickMemdbLevel(umin, umax)
}
func (s *session) flushMemdb(rec *sessionRecord, mdb *memdb.DB, level int) (level_ int, err error) {
// Create sorted table.
iter := mdb.NewIterator(nil)
defer iter.Release()
t, n, err := s.tops.createFrom(iter)
if err != nil {
return level, err
}
// Pick level and add to record.
if level < 0 {
level = s.pickMemdbLevel(t.imin.ukey(), t.imax.ukey())
}
rec.addTableFile(level, t)
s.logf("memdb@flush created L%d@%d N·%d S·%s %q:%q", level, t.file.Num(), n, shortenb(int(t.size)), t.imin, t.imax)
return level, nil
}
// Pick a compaction based on current state; need external synchronization.
func (s *session) pickCompaction() *compaction {
v := s.version()
var level int
var t0 tFiles
if v.cScore >= 1 {
level = v.cLevel
cptr := s.stCompPtrs[level]
tables := v.tables[level]
for _, t := range tables {
if cptr == nil || s.icmp.Compare(t.imax, cptr) > 0 {
t0 = append(t0, t)
break
}
}
if len(t0) == 0 {
t0 = append(t0, tables[0])
}
} else {
if p := atomic.LoadPointer(&v.cSeek); p != nil {
ts := (*tSet)(p)
level = ts.level
t0 = append(t0, ts.table)
} else {
v.release()
return nil
}
}
return newCompaction(s, v, level, t0)
}
// Create compaction from given level and range; need external synchronization.
func (s *session) getCompactionRange(level int, umin, umax []byte) *compaction {
v := s.version()
t0 := v.tables[level].getOverlaps(nil, s.icmp, umin, umax, level == 0)
if len(t0) == 0 {
v.release()
return nil
}
// Avoid compacting too much in one shot in case the range is large.
// But we cannot do this for level-0 since level-0 files can overlap
// and we must not pick one file and drop another older file if the
// two files overlap.
if level > 0 {
limit := uint64(v.s.o.GetCompactionSourceLimit(level))
total := uint64(0)
for i, t := range t0 {
total += t.size
if total >= limit {
s.logf("table@compaction limiting F·%d -> F·%d", len(t0), i+1)
t0 = t0[:i+1]
break
}
}
}
return newCompaction(s, v, level, t0)
}
func newCompaction(s *session, v *version, level int, t0 tFiles) *compaction {
c := &compaction{
s: s,
v: v,
level: level,
tables: [2]tFiles{t0, nil},
maxGPOverlaps: uint64(s.o.GetCompactionGPOverlaps(level)),
tPtrs: make([]int, s.o.GetNumLevel()),
}
c.expand()
c.save()
return c
}
// compaction represent a compaction state.
type compaction struct {
s *session
v *version
level int
tables [2]tFiles
maxGPOverlaps uint64
gp tFiles
gpi int
seenKey bool
gpOverlappedBytes uint64
imin, imax iKey
tPtrs []int
released bool
snapGPI int
snapSeenKey bool
snapGPOverlappedBytes uint64
snapTPtrs []int
}
func (c *compaction) save() {
c.snapGPI = c.gpi
c.snapSeenKey = c.seenKey
c.snapGPOverlappedBytes = c.gpOverlappedBytes
c.snapTPtrs = append(c.snapTPtrs[:0], c.tPtrs...)
}
func (c *compaction) restore() {
c.gpi = c.snapGPI
c.seenKey = c.snapSeenKey
c.gpOverlappedBytes = c.snapGPOverlappedBytes
c.tPtrs = append(c.tPtrs[:0], c.snapTPtrs...)
}
func (c *compaction) release() {
if !c.released {
c.released = true
c.v.release()
}
}
// Expand compacted tables; need external synchronization.
func (c *compaction) expand() {
limit := uint64(c.s.o.GetCompactionExpandLimit(c.level))
vt0, vt1 := c.v.tables[c.level], c.v.tables[c.level+1]
t0, t1 := c.tables[0], c.tables[1]
imin, imax := t0.getRange(c.s.icmp)
// We expand t0 here just incase ukey hop across tables.
t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.level == 0)
if len(t0) != len(c.tables[0]) {
imin, imax = t0.getRange(c.s.icmp)
}
t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false)
// Get entire range covered by compaction.
amin, amax := append(t0, t1...).getRange(c.s.icmp)
// See if we can grow the number of inputs in "level" without
// changing the number of "level+1" files we pick up.
if len(t1) > 0 {
exp0 := vt0.getOverlaps(nil, c.s.icmp, amin.ukey(), amax.ukey(), c.level == 0)
if len(exp0) > len(t0) && t1.size()+exp0.size() < limit {
xmin, xmax := exp0.getRange(c.s.icmp)
exp1 := vt1.getOverlaps(nil, c.s.icmp, xmin.ukey(), xmax.ukey(), false)
if len(exp1) == len(t1) {
c.s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)",
c.level, c.level+1, len(t0), shortenb(int(t0.size())), len(t1), shortenb(int(t1.size())),
len(exp0), shortenb(int(exp0.size())), len(exp1), shortenb(int(exp1.size())))
imin, imax = xmin, xmax
t0, t1 = exp0, exp1
amin, amax = append(t0, t1...).getRange(c.s.icmp)
}
}
}
// Compute the set of grandparent files that overlap this compaction
// (parent == level+1; grandparent == level+2)
if c.level+2 < c.s.o.GetNumLevel() {
c.gp = c.v.tables[c.level+2].getOverlaps(c.gp, c.s.icmp, amin.ukey(), amax.ukey(), false)
}
c.tables[0], c.tables[1] = t0, t1
c.imin, c.imax = imin, imax
}
// Check whether compaction is trivial.
func (c *compaction) trivial() bool {
return len(c.tables[0]) == 1 && len(c.tables[1]) == 0 && c.gp.size() <= c.maxGPOverlaps
}
func (c *compaction) baseLevelForKey(ukey []byte) bool {
for level, tables := range c.v.tables[c.level+2:] {
for c.tPtrs[level] < len(tables) {
t := tables[c.tPtrs[level]]
if c.s.icmp.uCompare(ukey, t.imax.ukey()) <= 0 {
// We've advanced far enough.
if c.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 {
// Key falls in this file's range, so definitely not base level.
return false
}
break
}
c.tPtrs[level]++
}
}
return true
}
func (c *compaction) shouldStopBefore(ikey iKey) bool {
for ; c.gpi < len(c.gp); c.gpi++ {
gp := c.gp[c.gpi]
if c.s.icmp.Compare(ikey, gp.imax) <= 0 {
break
}
if c.seenKey {
c.gpOverlappedBytes += gp.size
}
}
c.seenKey = true
if c.gpOverlappedBytes > c.maxGPOverlaps {
// Too much overlap for current output; start new output.
c.gpOverlappedBytes = 0
return true
}
return false
}
// Creates an iterator.
func (c *compaction) newIterator() iterator.Iterator {
// Creates iterator slice.
icap := len(c.tables)
if c.level == 0 {
// Special case for level-0.
icap = len(c.tables[0]) + 1
}
its := make([]iterator.Iterator, 0, icap)
// Options.
ro := &opt.ReadOptions{
DontFillCache: true,
Strict: opt.StrictOverride,
}
strict := c.s.o.GetStrict(opt.StrictCompaction)
if strict {
ro.Strict |= opt.StrictReader
}
for i, tables := range c.tables {
if len(tables) == 0 {
continue
}
// Level-0 is not sorted and may overlaps each other.
if c.level+i == 0 {
for _, t := range tables {
its = append(its, c.s.tops.newIterator(t, nil, ro))
}
} else {
it := iterator.NewIndexedIterator(tables.newIndexIterator(c.s.tops, c.s.icmp, nil, ro), strict)
its = append(its, it)
}
}
return iterator.NewMergedIterator(its, c.s.icmp, strict)
}

View file

@ -52,8 +52,6 @@ type dtRecord struct {
}
type sessionRecord struct {
numLevel int
hasRec int
comparer string
journalNum uint64
@ -230,7 +228,7 @@ func (p *sessionRecord) readBytes(field string, r byteReader) []byte {
return x
}
func (p *sessionRecord) readLevel(field string, r io.ByteReader) int {
func (p *sessionRecord) readLevel(field string, r io.ByteReader, numLevel int) int {
if p.err != nil {
return 0
}
@ -238,14 +236,14 @@ func (p *sessionRecord) readLevel(field string, r io.ByteReader) int {
if p.err != nil {
return 0
}
if x >= uint64(p.numLevel) {
if x >= uint64(numLevel) {
p.err = errors.NewErrCorrupted(nil, &ErrManifestCorrupted{field, "invalid level number"})
return 0
}
return int(x)
}
func (p *sessionRecord) decode(r io.Reader) error {
func (p *sessionRecord) decode(r io.Reader, numLevel int) error {
br, ok := r.(byteReader)
if !ok {
br = bufio.NewReader(r)
@ -286,13 +284,13 @@ func (p *sessionRecord) decode(r io.Reader) error {
p.setSeqNum(x)
}
case recCompPtr:
level := p.readLevel("comp-ptr.level", br)
level := p.readLevel("comp-ptr.level", br, numLevel)
ikey := p.readBytes("comp-ptr.ikey", br)
if p.err == nil {
p.addCompPtr(level, iKey(ikey))
}
case recAddTable:
level := p.readLevel("add-table.level", br)
level := p.readLevel("add-table.level", br, numLevel)
num := p.readUvarint("add-table.num", br)
size := p.readUvarint("add-table.size", br)
imin := p.readBytes("add-table.imin", br)
@ -301,7 +299,7 @@ func (p *sessionRecord) decode(r io.Reader) error {
p.addTable(level, num, size, imin, imax)
}
case recDelTable:
level := p.readLevel("del-table.level", br)
level := p.readLevel("del-table.level", br, numLevel)
num := p.readUvarint("del-table.num", br)
if p.err == nil {
p.delTable(level, num)

View file

@ -19,8 +19,8 @@ func decodeEncode(v *sessionRecord) (res bool, err error) {
if err != nil {
return
}
v2 := &sessionRecord{numLevel: opt.DefaultNumLevel}
err = v.decode(b)
v2 := &sessionRecord{}
err = v.decode(b, opt.DefaultNumLevel)
if err != nil {
return
}
@ -34,7 +34,7 @@ func decodeEncode(v *sessionRecord) (res bool, err error) {
func TestSessionRecord_EncodeDecode(t *testing.T) {
big := uint64(1) << 50
v := &sessionRecord{numLevel: opt.DefaultNumLevel}
v := &sessionRecord{}
i := uint64(0)
test := func() {
res, err := decodeEncode(v)

View file

@ -182,7 +182,7 @@ func (s *session) newManifest(rec *sessionRecord, v *version) (err error) {
defer v.release()
}
if rec == nil {
rec = &sessionRecord{numLevel: s.o.GetNumLevel()}
rec = &sessionRecord{}
}
s.fillRecord(rec, true)
v.fillRecord(rec)

View file

@ -42,6 +42,8 @@ type tsOp uint
const (
tsOpOpen tsOp = iota
tsOpCreate
tsOpReplace
tsOpRemove
tsOpRead
tsOpReadAt
tsOpWrite
@ -241,6 +243,10 @@ func (tf tsFile) Replace(newfile storage.File) (err error) {
if err != nil {
return
}
if tf.shouldErr(tsOpReplace) {
err = errors.New("leveldb.testStorage: emulated create error")
return
}
err = tf.File.Replace(newfile.(tsFile).File)
if err != nil {
ts.t.Errorf("E: cannot replace file, num=%d type=%v: %v", tf.Num(), tf.Type(), err)
@ -258,6 +264,10 @@ func (tf tsFile) Remove() (err error) {
if err != nil {
return
}
if tf.shouldErr(tsOpRemove) {
err = errors.New("leveldb.testStorage: emulated create error")
return
}
err = tf.File.Remove()
if err != nil {
ts.t.Errorf("E: cannot remove file, num=%d type=%v: %v", tf.Num(), tf.Type(), err)

View file

@ -441,22 +441,26 @@ func newTableOps(s *session) *tOps {
var (
cacher cache.Cacher
bcache *cache.Cache
bpool *util.BufferPool
)
if s.o.GetOpenFilesCacheCapacity() > 0 {
cacher = cache.NewLRU(s.o.GetOpenFilesCacheCapacity())
}
if !s.o.DisableBlockCache {
if !s.o.GetDisableBlockCache() {
var bcacher cache.Cacher
if s.o.GetBlockCacheCapacity() > 0 {
bcacher = cache.NewLRU(s.o.GetBlockCacheCapacity())
}
bcache = cache.NewCache(bcacher)
}
if !s.o.GetDisableBufferPool() {
bpool = util.NewBufferPool(s.o.GetBlockSize() + 5)
}
return &tOps{
s: s,
cache: cache.NewCache(cacher),
bcache: bcache,
bpool: util.NewBufferPool(s.o.GetBlockSize() + 5),
bpool: bpool,
}
}

View file

@ -300,7 +300,7 @@ func (v *version) offsetOf(ikey iKey) (n uint64, err error) {
return
}
func (v *version) pickLevel(umin, umax []byte) (level int) {
func (v *version) pickMemdbLevel(umin, umax []byte) (level int) {
if !v.tables[0].overlaps(v.s.icmp, umin, umax, true) {
var overlaps tFiles
maxLevel := v.s.o.GetMaxMemCompationLevel()

View file