From aa3f2b7216886045e66ac20ead54a2ef6e3dbe36 Mon Sep 17 00:00:00 2001 From: Julius Volz Date: Mon, 29 Aug 2016 18:48:20 +0200 Subject: [PATCH] Generic write cleanups and changes. - fold metric name into labels - return initialization errors back to main - add snappy compression - better context handling - pre-allocation of labels - remove generic naming - other cleanups --- cmd/prometheus/config.go | 4 +- cmd/prometheus/main.go | 8 +- .../examples/remote_storage/README.md | 17 ++ .../server.go | 33 ++- .../examples/remote_storage_generic/README.md | 17 -- storage/remote/client.go | 94 ++++++++ storage/remote/generic/generic.go | 77 ------- storage/remote/generic/generic.pb.go | 211 ------------------ storage/remote/remote.go | 18 +- storage/remote/remote.pb.go | 209 +++++++++++++++++ .../{generic/generic.proto => remote.proto} | 18 +- storage/remote/snappy.go | 34 +++ 12 files changed, 406 insertions(+), 334 deletions(-) create mode 100644 documentation/examples/remote_storage/README.md rename documentation/examples/{remote_storage_generic => remote_storage}/server.go (56%) delete mode 100644 documentation/examples/remote_storage_generic/README.md create mode 100644 storage/remote/client.go delete mode 100644 storage/remote/generic/generic.go delete mode 100644 storage/remote/generic/generic.pb.go create mode 100644 storage/remote/remote.pb.go rename storage/remote/{generic/generic.proto => remote.proto} (75%) create mode 100644 storage/remote/snappy.go diff --git a/cmd/prometheus/config.go b/cmd/prometheus/config.go index 19fa01a29..2ab3a1709 100644 --- a/cmd/prometheus/config.go +++ b/cmd/prometheus/config.go @@ -207,8 +207,8 @@ func init() { "The name of the database to use for storing samples in InfluxDB.", ) cfg.fs.StringVar( - &cfg.remote.GenericAddress, "storage.remote.generic-address", "", - "The address of the generic remote server to send samples to via gRPC. None, if empty.", + &cfg.remote.Address, "storage.remote.address", "", + "The address of the remote server to send samples to. None, if empty.", ) cfg.fs.DurationVar( diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 656ed5fc5..8f7b65bdc 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -77,9 +77,15 @@ func Main() int { var ( memStorage = local.NewMemorySeriesStorage(&cfg.storage) - remoteStorage = remote.New(&cfg.remote) sampleAppender = storage.Fanout{memStorage} ) + + remoteStorage, err := remote.New(&cfg.remote) + if err != nil { + log.Errorf("Error initializing remote storage: %s", err) + return 1 + } + if remoteStorage != nil { sampleAppender = append(sampleAppender, remoteStorage) reloadables = append(reloadables, remoteStorage) diff --git a/documentation/examples/remote_storage/README.md b/documentation/examples/remote_storage/README.md new file mode 100644 index 000000000..91dd09632 --- /dev/null +++ b/documentation/examples/remote_storage/README.md @@ -0,0 +1,17 @@ +## Generic Remote Storage Example + +This is a simple example of how to write a server to +receive samples from the remote storage output. + +To use it: + +``` +go build +remote_storage +``` + +...and then run Prometheus as: + +``` +./prometheus -storage.remote.address=localhost:1234 +``` diff --git a/documentation/examples/remote_storage_generic/server.go b/documentation/examples/remote_storage/server.go similarity index 56% rename from documentation/examples/remote_storage_generic/server.go rename to documentation/examples/remote_storage/server.go index 7ae2f31fe..2f5bda9ec 100644 --- a/documentation/examples/remote_storage_generic/server.go +++ b/documentation/examples/remote_storage/server.go @@ -15,39 +15,54 @@ package main import ( "fmt" + "io" + "io/ioutil" "log" "net" "golang.org/x/net/context" "google.golang.org/grpc" - "github.com/prometheus/prometheus/storage/remote/generic" + "github.com/golang/snappy" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage/remote" ) type server struct{} -func (server *server) Write(ctx context.Context, req *generic.GenericWriteRequest) (*generic.GenericWriteResponse, error) { +func (server *server) Write(ctx context.Context, req *remote.WriteRequest) (*remote.WriteResponse, error) { for _, ts := range req.Timeseries { - fmt.Printf("%s", ts.Name) + m := make(model.Metric, len(ts.Labels)) for _, l := range ts.Labels { - fmt.Printf(" %s=%s", l.Name, l.Value) + m[model.LabelName(l.Name)] = model.LabelValue(l.Value) } - fmt.Printf("\n") + fmt.Println(m) for _, s := range ts.Samples { fmt.Printf(" %f %d\n", s.Value, s.TimestampMs) } } - return &generic.GenericWriteResponse{}, nil + return &remote.WriteResponse{}, nil +} + +type snappyDecompressor struct{} + +func (d *snappyDecompressor) Do(r io.Reader) ([]byte, error) { + sr := snappy.NewReader(r) + return ioutil.ReadAll(sr) +} + +func (d *snappyDecompressor) Type() string { + return "snappy" } func main() { lis, err := net.Listen("tcp", ":1234") if err != nil { - log.Fatalf("failed to listen: %v", err) + log.Fatalf("Failed to listen: %v", err) } - s := grpc.NewServer() - generic.RegisterGenericWriteServer(s, &server{}) + s := grpc.NewServer(grpc.RPCDecompressor(&snappyDecompressor{})) + remote.RegisterWriteServer(s, &server{}) s.Serve(lis) } diff --git a/documentation/examples/remote_storage_generic/README.md b/documentation/examples/remote_storage_generic/README.md deleted file mode 100644 index 6c97de380..000000000 --- a/documentation/examples/remote_storage_generic/README.md +++ /dev/null @@ -1,17 +0,0 @@ -## Generic Remote Storage Example - -This is a simple example of how to write a server to -recieve samples from the generic remote storage output. - -To use it: - -``` -go build -remote_storage_generic -``` - -and then run Prometheus as: - -``` -./prometheus -storage.remote.generic-url http://localhost:1234/remote -``` diff --git a/storage/remote/client.go b/storage/remote/client.go new file mode 100644 index 000000000..de7a3100f --- /dev/null +++ b/storage/remote/client.go @@ -0,0 +1,94 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "time" + + "golang.org/x/net/context" + "google.golang.org/grpc" + + "github.com/prometheus/common/model" +) + +// Client allows sending batches of Prometheus samples to an HTTP endpoint. +type Client struct { + client WriteClient + timeout time.Duration +} + +// NewClient creates a new Client. +func NewClient(address string, timeout time.Duration) (*Client, error) { + conn, err := grpc.Dial( + address, + grpc.WithInsecure(), + grpc.WithTimeout(timeout), + grpc.WithCompressor(&snappyCompressor{}), + ) + if err != nil { + // grpc.Dial() returns immediately and doesn't error when the server is + // unreachable when not passing in the WithBlock() option. The client then + // will continuously try to (re)establish the connection in the background. + // So this will only return here if some other uncommon error occured. + return nil, err + } + return &Client{ + client: NewWriteClient(conn), + timeout: timeout, + }, nil +} + +// Store sends a batch of samples to the HTTP endpoint. +func (c *Client) Store(samples model.Samples) error { + req := &WriteRequest{ + Timeseries: make([]*TimeSeries, 0, len(samples)), + } + for _, s := range samples { + ts := &TimeSeries{ + Labels: make([]*LabelPair, 0, len(s.Metric)), + } + for k, v := range s.Metric { + ts.Labels = append(ts.Labels, + &LabelPair{ + Name: string(k), + Value: string(v), + }) + } + ts.Samples = []*Sample{ + &Sample{ + Value: float64(s.Value), + TimestampMs: int64(s.Timestamp), + }, + } + req.Timeseries = append(req.Timeseries, ts) + } + + ctxt, cancel := context.WithTimeout(context.TODO(), c.timeout) + defer cancel() + + _, err := c.client.Write(ctxt, req) + if err != nil { + return err + } + return nil +} + +// Name identifies the client as a generic client. +// +// TODO: This client is going to be the only one soon - then this method +// will simply be removed in the restructuring and the whole "generic" naming +// will be gone for good. +func (c Client) Name() string { + return "generic" +} diff --git a/storage/remote/generic/generic.go b/storage/remote/generic/generic.go deleted file mode 100644 index d9458a3f3..000000000 --- a/storage/remote/generic/generic.go +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright 2016 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package generic - -import ( - "time" - - "golang.org/x/net/context" - "google.golang.org/grpc" - - "github.com/prometheus/common/model" -) - -// Client allows sending batches of Prometheus samples to a http endpoint. -type Client struct { - conn *grpc.ClientConn - timeout time.Duration -} - -// NewClient creates a new Client. -func NewClient(address string, timeout time.Duration) *Client { - // TODO: Do something with this error. - conn, _ := grpc.Dial(address, grpc.WithInsecure()) - return &Client{ - conn: conn, - timeout: timeout, - } -} - -// Store sends a batch of samples to the http endpoint. -func (c *Client) Store(samples model.Samples) error { - req := &GenericWriteRequest{} - for _, s := range samples { - ts := &TimeSeries{ - Name: string(s.Metric[model.MetricNameLabel]), - } - for k, v := range s.Metric { - if k != model.MetricNameLabel { - ts.Labels = append(ts.Labels, - &LabelPair{ - Name: string(k), - Value: string(v), - }) - } - } - ts.Samples = []*Sample{ - &Sample{ - Value: float64(s.Value), - TimestampMs: int64(s.Timestamp), - }, - } - req.Timeseries = append(req.Timeseries, ts) - } - client := NewGenericWriteClient(c.conn) - ctxt, _ := context.WithTimeout(context.Background(), c.timeout) - _, err := client.Write(ctxt, req) - if err != nil { - return err - } - return nil -} - -// Name identifies the client as a genric client. -func (c Client) Name() string { - return "generic" -} diff --git a/storage/remote/generic/generic.pb.go b/storage/remote/generic/generic.pb.go deleted file mode 100644 index 628fb52e7..000000000 --- a/storage/remote/generic/generic.pb.go +++ /dev/null @@ -1,211 +0,0 @@ -// Code generated by protoc-gen-go. -// source: generic.proto -// DO NOT EDIT! - -/* -Package generic is a generated protocol buffer package. - -It is generated from these files: - generic.proto - -It has these top-level messages: - Sample - LabelPair - TimeSeries - GenericWriteRequest - GenericWriteResponse -*/ -package generic - -import proto "github.com/golang/protobuf/proto" -import fmt "fmt" -import math "math" - -import ( - context "golang.org/x/net/context" - grpc "google.golang.org/grpc" -) - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package - -type Sample struct { - Value float64 `protobuf:"fixed64,1,opt,name=value" json:"value,omitempty"` - TimestampMs int64 `protobuf:"varint,2,opt,name=timestamp_ms,json=timestampMs" json:"timestamp_ms,omitempty"` -} - -func (m *Sample) Reset() { *m = Sample{} } -func (m *Sample) String() string { return proto.CompactTextString(m) } -func (*Sample) ProtoMessage() {} -func (*Sample) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } - -type LabelPair struct { - Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"` - Value string `protobuf:"bytes,2,opt,name=value" json:"value,omitempty"` -} - -func (m *LabelPair) Reset() { *m = LabelPair{} } -func (m *LabelPair) String() string { return proto.CompactTextString(m) } -func (*LabelPair) ProtoMessage() {} -func (*LabelPair) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } - -type TimeSeries struct { - Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"` - Labels []*LabelPair `protobuf:"bytes,2,rep,name=labels" json:"labels,omitempty"` - // Sorted by time, oldest sample first. - Samples []*Sample `protobuf:"bytes,3,rep,name=samples" json:"samples,omitempty"` -} - -func (m *TimeSeries) Reset() { *m = TimeSeries{} } -func (m *TimeSeries) String() string { return proto.CompactTextString(m) } -func (*TimeSeries) ProtoMessage() {} -func (*TimeSeries) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } - -func (m *TimeSeries) GetLabels() []*LabelPair { - if m != nil { - return m.Labels - } - return nil -} - -func (m *TimeSeries) GetSamples() []*Sample { - if m != nil { - return m.Samples - } - return nil -} - -type GenericWriteRequest struct { - Timeseries []*TimeSeries `protobuf:"bytes,1,rep,name=timeseries" json:"timeseries,omitempty"` -} - -func (m *GenericWriteRequest) Reset() { *m = GenericWriteRequest{} } -func (m *GenericWriteRequest) String() string { return proto.CompactTextString(m) } -func (*GenericWriteRequest) ProtoMessage() {} -func (*GenericWriteRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } - -func (m *GenericWriteRequest) GetTimeseries() []*TimeSeries { - if m != nil { - return m.Timeseries - } - return nil -} - -type GenericWriteResponse struct { -} - -func (m *GenericWriteResponse) Reset() { *m = GenericWriteResponse{} } -func (m *GenericWriteResponse) String() string { return proto.CompactTextString(m) } -func (*GenericWriteResponse) ProtoMessage() {} -func (*GenericWriteResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } - -func init() { - proto.RegisterType((*Sample)(nil), "generic.Sample") - proto.RegisterType((*LabelPair)(nil), "generic.LabelPair") - proto.RegisterType((*TimeSeries)(nil), "generic.TimeSeries") - proto.RegisterType((*GenericWriteRequest)(nil), "generic.GenericWriteRequest") - proto.RegisterType((*GenericWriteResponse)(nil), "generic.GenericWriteResponse") -} - -// Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc.ClientConn - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -const _ = grpc.SupportPackageIsVersion3 - -// Client API for GenericWrite service - -type GenericWriteClient interface { - Write(ctx context.Context, in *GenericWriteRequest, opts ...grpc.CallOption) (*GenericWriteResponse, error) -} - -type genericWriteClient struct { - cc *grpc.ClientConn -} - -func NewGenericWriteClient(cc *grpc.ClientConn) GenericWriteClient { - return &genericWriteClient{cc} -} - -func (c *genericWriteClient) Write(ctx context.Context, in *GenericWriteRequest, opts ...grpc.CallOption) (*GenericWriteResponse, error) { - out := new(GenericWriteResponse) - err := grpc.Invoke(ctx, "/generic.GenericWrite/Write", in, out, c.cc, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -// Server API for GenericWrite service - -type GenericWriteServer interface { - Write(context.Context, *GenericWriteRequest) (*GenericWriteResponse, error) -} - -func RegisterGenericWriteServer(s *grpc.Server, srv GenericWriteServer) { - s.RegisterService(&_GenericWrite_serviceDesc, srv) -} - -func _GenericWrite_Write_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(GenericWriteRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(GenericWriteServer).Write(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/generic.GenericWrite/Write", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(GenericWriteServer).Write(ctx, req.(*GenericWriteRequest)) - } - return interceptor(ctx, in, info, handler) -} - -var _GenericWrite_serviceDesc = grpc.ServiceDesc{ - ServiceName: "generic.GenericWrite", - HandlerType: (*GenericWriteServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "Write", - Handler: _GenericWrite_Write_Handler, - }, - }, - Streams: []grpc.StreamDesc{}, - Metadata: fileDescriptor0, -} - -func init() { proto.RegisterFile("generic.proto", fileDescriptor0) } - -var fileDescriptor0 = []byte{ - // 264 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x74, 0x91, 0x4f, 0x4b, 0xc3, 0x40, - 0x10, 0xc5, 0x4d, 0x63, 0x53, 0xfa, 0x5a, 0x11, 0xa6, 0x45, 0x82, 0x28, 0xe8, 0x9e, 0xaa, 0x87, - 0x1e, 0x5a, 0xfc, 0x00, 0x5e, 0x14, 0x44, 0x41, 0xb6, 0xa2, 0x47, 0xd9, 0xca, 0x20, 0x81, 0xfc, - 0x33, 0xbb, 0xd5, 0xaf, 0xef, 0x66, 0xb7, 0xdd, 0x56, 0xa8, 0xb7, 0x9d, 0x79, 0x2f, 0xbf, 0x79, - 0x8f, 0xe0, 0xe8, 0x93, 0x4b, 0x6e, 0xb2, 0x8f, 0x69, 0xdd, 0x54, 0xa6, 0xa2, 0xde, 0x7a, 0x14, - 0xb7, 0x48, 0x16, 0xaa, 0xa8, 0x73, 0xa6, 0x31, 0xba, 0xdf, 0x2a, 0x5f, 0x71, 0x1a, 0x5d, 0x44, - 0x93, 0x48, 0xfa, 0x81, 0x2e, 0x31, 0x34, 0x59, 0xc1, 0xda, 0x58, 0xd3, 0x7b, 0xa1, 0xd3, 0x8e, - 0x15, 0x63, 0x39, 0x08, 0xbb, 0x27, 0x2d, 0x6e, 0xd0, 0x7f, 0x54, 0x4b, 0xce, 0x9f, 0x55, 0xd6, - 0x10, 0xe1, 0xb0, 0x54, 0x85, 0x87, 0xf4, 0xa5, 0x7b, 0x6f, 0xc9, 0x1d, 0xb7, 0xf4, 0x83, 0xf8, - 0x01, 0x5e, 0x2c, 0x65, 0x61, 0x63, 0xb0, 0xde, 0xfb, 0xdd, 0x35, 0x92, 0xbc, 0x05, 0xb7, 0x57, - 0xe3, 0xc9, 0x60, 0x46, 0xd3, 0x4d, 0x89, 0x70, 0x4f, 0xae, 0x1d, 0x74, 0x85, 0x9e, 0x76, 0x3d, - 0x74, 0x1a, 0x3b, 0xf3, 0x71, 0x30, 0xfb, 0x7e, 0x72, 0xa3, 0x8b, 0x07, 0x8c, 0xee, 0xbd, 0xf4, - 0xd6, 0x64, 0x86, 0x25, 0x7f, 0xad, 0x6c, 0x17, 0x9a, 0x03, 0xae, 0x95, 0xcb, 0x63, 0x73, 0xb4, - 0x90, 0x51, 0x80, 0x6c, 0xa3, 0xca, 0x1d, 0x9b, 0x38, 0xc1, 0xf8, 0x2f, 0x4b, 0xd7, 0x55, 0xa9, - 0x79, 0xf6, 0x8a, 0xe1, 0xee, 0x9e, 0xee, 0xd0, 0xf5, 0x8f, 0xb3, 0x40, 0xdc, 0x93, 0xe1, 0xf4, - 0xfc, 0x1f, 0xd5, 0x53, 0xc5, 0xc1, 0x32, 0x71, 0xbf, 0x6f, 0xfe, 0x1b, 0x00, 0x00, 0xff, 0xff, - 0x40, 0x1e, 0x0f, 0xf8, 0xcf, 0x01, 0x00, 0x00, -} diff --git a/storage/remote/remote.go b/storage/remote/remote.go index 5a28e7843..2c1923b79 100644 --- a/storage/remote/remote.go +++ b/storage/remote/remote.go @@ -24,7 +24,6 @@ import ( influx "github.com/influxdb/influxdb/client" "github.com/prometheus/prometheus/config" - "github.com/prometheus/prometheus/storage/remote/generic" "github.com/prometheus/prometheus/storage/remote/graphite" "github.com/prometheus/prometheus/storage/remote/influxdb" "github.com/prometheus/prometheus/storage/remote/opentsdb" @@ -47,7 +46,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { } // New returns a new remote Storage. -func New(o *Options) *Storage { +func New(o *Options) (*Storage, error) { s := &Storage{} if o.GraphiteAddress != "" { c := graphite.NewClient( @@ -70,14 +69,17 @@ func New(o *Options) *Storage { prometheus.MustRegister(c) s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024)) } - if o.GenericAddress != "" { - c := generic.NewClient(o.GenericAddress, o.StorageTimeout) + if o.Address != "" { + c, err := NewClient(o.Address, o.StorageTimeout) + if err != nil { + return nil, err + } s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024)) } if len(s.queues) == 0 { - return nil + return nil, nil } - return s + return s, nil } // Options contains configuration parameters for a remote storage. @@ -92,7 +94,9 @@ type Options struct { GraphiteAddress string GraphiteTransport string GraphitePrefix string - GenericAddress string + // TODO: This just being called "Address" will make more sense once the + // other remote storage mechanisms are removed. + Address string } // Run starts the background processing of the storage queues. diff --git a/storage/remote/remote.pb.go b/storage/remote/remote.pb.go new file mode 100644 index 000000000..2d67efabc --- /dev/null +++ b/storage/remote/remote.pb.go @@ -0,0 +1,209 @@ +// Code generated by protoc-gen-go. +// source: remote.proto +// DO NOT EDIT! + +/* +Package remote is a generated protocol buffer package. + +It is generated from these files: + remote.proto + +It has these top-level messages: + Sample + LabelPair + TimeSeries + WriteRequest + WriteResponse +*/ +package remote + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type Sample struct { + Value float64 `protobuf:"fixed64,1,opt,name=value" json:"value,omitempty"` + TimestampMs int64 `protobuf:"varint,2,opt,name=timestamp_ms,json=timestampMs" json:"timestamp_ms,omitempty"` +} + +func (m *Sample) Reset() { *m = Sample{} } +func (m *Sample) String() string { return proto.CompactTextString(m) } +func (*Sample) ProtoMessage() {} +func (*Sample) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +type LabelPair struct { + Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"` + Value string `protobuf:"bytes,2,opt,name=value" json:"value,omitempty"` +} + +func (m *LabelPair) Reset() { *m = LabelPair{} } +func (m *LabelPair) String() string { return proto.CompactTextString(m) } +func (*LabelPair) ProtoMessage() {} +func (*LabelPair) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +type TimeSeries struct { + Labels []*LabelPair `protobuf:"bytes,1,rep,name=labels" json:"labels,omitempty"` + // Sorted by time, oldest sample first. + Samples []*Sample `protobuf:"bytes,2,rep,name=samples" json:"samples,omitempty"` +} + +func (m *TimeSeries) Reset() { *m = TimeSeries{} } +func (m *TimeSeries) String() string { return proto.CompactTextString(m) } +func (*TimeSeries) ProtoMessage() {} +func (*TimeSeries) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +func (m *TimeSeries) GetLabels() []*LabelPair { + if m != nil { + return m.Labels + } + return nil +} + +func (m *TimeSeries) GetSamples() []*Sample { + if m != nil { + return m.Samples + } + return nil +} + +type WriteRequest struct { + Timeseries []*TimeSeries `protobuf:"bytes,1,rep,name=timeseries" json:"timeseries,omitempty"` +} + +func (m *WriteRequest) Reset() { *m = WriteRequest{} } +func (m *WriteRequest) String() string { return proto.CompactTextString(m) } +func (*WriteRequest) ProtoMessage() {} +func (*WriteRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } + +func (m *WriteRequest) GetTimeseries() []*TimeSeries { + if m != nil { + return m.Timeseries + } + return nil +} + +type WriteResponse struct { +} + +func (m *WriteResponse) Reset() { *m = WriteResponse{} } +func (m *WriteResponse) String() string { return proto.CompactTextString(m) } +func (*WriteResponse) ProtoMessage() {} +func (*WriteResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } + +func init() { + proto.RegisterType((*Sample)(nil), "remote.Sample") + proto.RegisterType((*LabelPair)(nil), "remote.LabelPair") + proto.RegisterType((*TimeSeries)(nil), "remote.TimeSeries") + proto.RegisterType((*WriteRequest)(nil), "remote.WriteRequest") + proto.RegisterType((*WriteResponse)(nil), "remote.WriteResponse") +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion3 + +// Client API for Write service + +type WriteClient interface { + Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error) +} + +type writeClient struct { + cc *grpc.ClientConn +} + +func NewWriteClient(cc *grpc.ClientConn) WriteClient { + return &writeClient{cc} +} + +func (c *writeClient) Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error) { + out := new(WriteResponse) + err := grpc.Invoke(ctx, "/remote.Write/Write", in, out, c.cc, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// Server API for Write service + +type WriteServer interface { + Write(context.Context, *WriteRequest) (*WriteResponse, error) +} + +func RegisterWriteServer(s *grpc.Server, srv WriteServer) { + s.RegisterService(&_Write_serviceDesc, srv) +} + +func _Write_Write_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(WriteRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(WriteServer).Write(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/remote.Write/Write", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(WriteServer).Write(ctx, req.(*WriteRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _Write_serviceDesc = grpc.ServiceDesc{ + ServiceName: "remote.Write", + HandlerType: (*WriteServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Write", + Handler: _Write_Write_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: fileDescriptor0, +} + +func init() { proto.RegisterFile("remote.proto", fileDescriptor0) } + +var fileDescriptor0 = []byte{ + // 250 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x5c, 0x91, 0x4f, 0x4b, 0x03, 0x31, + 0x10, 0xc5, 0xdd, 0xd6, 0xae, 0x74, 0xba, 0x2a, 0x0e, 0x15, 0x8a, 0x27, 0xcd, 0x69, 0xbd, 0xf4, + 0xb0, 0xa2, 0x57, 0xd1, 0xb3, 0x82, 0xa4, 0x82, 0x47, 0x49, 0x61, 0x0e, 0x0b, 0x9b, 0x66, 0xcd, + 0xa4, 0x7e, 0x7e, 0xb3, 0xf9, 0xd3, 0x2e, 0xde, 0x66, 0xde, 0xbc, 0xf9, 0xcd, 0x0b, 0x81, 0xca, + 0x92, 0x36, 0x8e, 0xd6, 0xbd, 0x35, 0xce, 0x60, 0x19, 0x3b, 0xf1, 0x02, 0xe5, 0x46, 0xe9, 0xbe, + 0x23, 0x5c, 0xc2, 0xec, 0x57, 0x75, 0x7b, 0x5a, 0x15, 0xb7, 0x45, 0x5d, 0xc8, 0xd8, 0xe0, 0x1d, + 0x54, 0xae, 0xd5, 0xc4, 0xce, 0x9b, 0xbe, 0x35, 0xaf, 0x26, 0x7e, 0x38, 0x95, 0x8b, 0x83, 0xf6, + 0xce, 0xe2, 0x11, 0xe6, 0x6f, 0x6a, 0x4b, 0xdd, 0x87, 0x6a, 0x2d, 0x22, 0x9c, 0xee, 0x94, 0x8e, + 0x90, 0xb9, 0x0c, 0xf5, 0x91, 0x3c, 0x09, 0x62, 0x6c, 0x84, 0x02, 0xf8, 0xf4, 0x94, 0x0d, 0xd9, + 0x96, 0x18, 0xef, 0xa1, 0xec, 0x06, 0x08, 0xfb, 0xcd, 0x69, 0xbd, 0x68, 0xae, 0xd6, 0x29, 0xee, + 0x01, 0x2d, 0x93, 0x01, 0x6b, 0x38, 0xe3, 0x10, 0x79, 0x48, 0x33, 0x78, 0x2f, 0xb2, 0x37, 0xbe, + 0x44, 0xe6, 0xb1, 0x78, 0x85, 0xea, 0xcb, 0xb6, 0x8e, 0x24, 0xfd, 0xec, 0x7d, 0x5c, 0x6c, 0x00, + 0x42, 0xf0, 0x70, 0x32, 0x1d, 0xc2, 0xbc, 0x7c, 0x0c, 0x23, 0x47, 0x2e, 0x71, 0x09, 0xe7, 0x89, + 0xc1, 0xbd, 0xd9, 0x31, 0x35, 0xcf, 0x30, 0x0b, 0x02, 0x3e, 0xe5, 0x62, 0x99, 0x11, 0xe3, 0x63, + 0x37, 0xd7, 0xff, 0xd4, 0xb8, 0x2e, 0x4e, 0xb6, 0x65, 0xf8, 0x81, 0x87, 0xbf, 0x00, 0x00, 0x00, + 0xff, 0xff, 0xed, 0x75, 0xbc, 0xc4, 0x91, 0x01, 0x00, 0x00, +} diff --git a/storage/remote/generic/generic.proto b/storage/remote/remote.proto similarity index 75% rename from storage/remote/generic/generic.proto rename to storage/remote/remote.proto index 9c92d55dc..0c66d9aaa 100644 --- a/storage/remote/generic/generic.proto +++ b/storage/remote/remote.proto @@ -13,11 +13,10 @@ syntax = "proto3"; -package generic; - +package remote; message Sample { - double value = 1; + double value = 1; int64 timestamp_ms = 2; } @@ -27,19 +26,18 @@ message LabelPair { } message TimeSeries { - string name = 1; - repeated LabelPair labels = 2; + repeated LabelPair labels = 1; // Sorted by time, oldest sample first. - repeated Sample samples = 3; + repeated Sample samples = 2; } -message GenericWriteRequest { +message WriteRequest { repeated TimeSeries timeseries = 1; } -message GenericWriteResponse { +message WriteResponse { } -service GenericWrite { - rpc Write(GenericWriteRequest) returns (GenericWriteResponse) {} +service Write { + rpc Write(WriteRequest) returns (WriteResponse) {} } diff --git a/storage/remote/snappy.go b/storage/remote/snappy.go new file mode 100644 index 000000000..5823a1a52 --- /dev/null +++ b/storage/remote/snappy.go @@ -0,0 +1,34 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package remote + +import ( + "io" + + "github.com/golang/snappy" +) + +type snappyCompressor struct{} + +func (c *snappyCompressor) Do(w io.Writer, p []byte) error { + sw := snappy.NewWriter(w) + if _, err := sw.Write(p); err != nil { + return err + } + return sw.Close() +} + +func (c *snappyCompressor) Type() string { + return "snappy" +}