Generic write cleanups and changes.

- fold metric name into labels
- return initialization errors back to main
- add snappy compression
- better context handling
- pre-allocation of labels
- remove generic naming
- other cleanups
This commit is contained in:
Julius Volz 2016-08-29 18:48:20 +02:00
parent d8ce6e5849
commit aa3f2b7216
12 changed files with 406 additions and 334 deletions

View file

@ -207,8 +207,8 @@ func init() {
"The name of the database to use for storing samples in InfluxDB.",
)
cfg.fs.StringVar(
&cfg.remote.GenericAddress, "storage.remote.generic-address", "",
"The address of the generic remote server to send samples to via gRPC. None, if empty.",
&cfg.remote.Address, "storage.remote.address", "",
"The address of the remote server to send samples to. None, if empty.",
)
cfg.fs.DurationVar(

View file

@ -77,9 +77,15 @@ func Main() int {
var (
memStorage = local.NewMemorySeriesStorage(&cfg.storage)
remoteStorage = remote.New(&cfg.remote)
sampleAppender = storage.Fanout{memStorage}
)
remoteStorage, err := remote.New(&cfg.remote)
if err != nil {
log.Errorf("Error initializing remote storage: %s", err)
return 1
}
if remoteStorage != nil {
sampleAppender = append(sampleAppender, remoteStorage)
reloadables = append(reloadables, remoteStorage)

View file

@ -0,0 +1,17 @@
## Generic Remote Storage Example
This is a simple example of how to write a server to
receive samples from the remote storage output.
To use it:
```
go build
remote_storage
```
...and then run Prometheus as:
```
./prometheus -storage.remote.address=localhost:1234
```

View file

@ -15,39 +15,54 @@ package main
import (
"fmt"
"io"
"io/ioutil"
"log"
"net"
"golang.org/x/net/context"
"google.golang.org/grpc"
"github.com/prometheus/prometheus/storage/remote/generic"
"github.com/golang/snappy"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/remote"
)
type server struct{}
func (server *server) Write(ctx context.Context, req *generic.GenericWriteRequest) (*generic.GenericWriteResponse, error) {
func (server *server) Write(ctx context.Context, req *remote.WriteRequest) (*remote.WriteResponse, error) {
for _, ts := range req.Timeseries {
fmt.Printf("%s", ts.Name)
m := make(model.Metric, len(ts.Labels))
for _, l := range ts.Labels {
fmt.Printf(" %s=%s", l.Name, l.Value)
m[model.LabelName(l.Name)] = model.LabelValue(l.Value)
}
fmt.Printf("\n")
fmt.Println(m)
for _, s := range ts.Samples {
fmt.Printf(" %f %d\n", s.Value, s.TimestampMs)
}
}
return &generic.GenericWriteResponse{}, nil
return &remote.WriteResponse{}, nil
}
type snappyDecompressor struct{}
func (d *snappyDecompressor) Do(r io.Reader) ([]byte, error) {
sr := snappy.NewReader(r)
return ioutil.ReadAll(sr)
}
func (d *snappyDecompressor) Type() string {
return "snappy"
}
func main() {
lis, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatalf("failed to listen: %v", err)
log.Fatalf("Failed to listen: %v", err)
}
s := grpc.NewServer()
generic.RegisterGenericWriteServer(s, &server{})
s := grpc.NewServer(grpc.RPCDecompressor(&snappyDecompressor{}))
remote.RegisterWriteServer(s, &server{})
s.Serve(lis)
}

View file

@ -1,17 +0,0 @@
## Generic Remote Storage Example
This is a simple example of how to write a server to
recieve samples from the generic remote storage output.
To use it:
```
go build
remote_storage_generic
```
and then run Prometheus as:
```
./prometheus -storage.remote.generic-url http://localhost:1234/remote
```

94
storage/remote/client.go Normal file
View file

@ -0,0 +1,94 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
"github.com/prometheus/common/model"
)
// Client allows sending batches of Prometheus samples to an HTTP endpoint.
type Client struct {
client WriteClient
timeout time.Duration
}
// NewClient creates a new Client.
func NewClient(address string, timeout time.Duration) (*Client, error) {
conn, err := grpc.Dial(
address,
grpc.WithInsecure(),
grpc.WithTimeout(timeout),
grpc.WithCompressor(&snappyCompressor{}),
)
if err != nil {
// grpc.Dial() returns immediately and doesn't error when the server is
// unreachable when not passing in the WithBlock() option. The client then
// will continuously try to (re)establish the connection in the background.
// So this will only return here if some other uncommon error occured.
return nil, err
}
return &Client{
client: NewWriteClient(conn),
timeout: timeout,
}, nil
}
// Store sends a batch of samples to the HTTP endpoint.
func (c *Client) Store(samples model.Samples) error {
req := &WriteRequest{
Timeseries: make([]*TimeSeries, 0, len(samples)),
}
for _, s := range samples {
ts := &TimeSeries{
Labels: make([]*LabelPair, 0, len(s.Metric)),
}
for k, v := range s.Metric {
ts.Labels = append(ts.Labels,
&LabelPair{
Name: string(k),
Value: string(v),
})
}
ts.Samples = []*Sample{
&Sample{
Value: float64(s.Value),
TimestampMs: int64(s.Timestamp),
},
}
req.Timeseries = append(req.Timeseries, ts)
}
ctxt, cancel := context.WithTimeout(context.TODO(), c.timeout)
defer cancel()
_, err := c.client.Write(ctxt, req)
if err != nil {
return err
}
return nil
}
// Name identifies the client as a generic client.
//
// TODO: This client is going to be the only one soon - then this method
// will simply be removed in the restructuring and the whole "generic" naming
// will be gone for good.
func (c Client) Name() string {
return "generic"
}

View file

@ -1,77 +0,0 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package generic
import (
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
"github.com/prometheus/common/model"
)
// Client allows sending batches of Prometheus samples to a http endpoint.
type Client struct {
conn *grpc.ClientConn
timeout time.Duration
}
// NewClient creates a new Client.
func NewClient(address string, timeout time.Duration) *Client {
// TODO: Do something with this error.
conn, _ := grpc.Dial(address, grpc.WithInsecure())
return &Client{
conn: conn,
timeout: timeout,
}
}
// Store sends a batch of samples to the http endpoint.
func (c *Client) Store(samples model.Samples) error {
req := &GenericWriteRequest{}
for _, s := range samples {
ts := &TimeSeries{
Name: string(s.Metric[model.MetricNameLabel]),
}
for k, v := range s.Metric {
if k != model.MetricNameLabel {
ts.Labels = append(ts.Labels,
&LabelPair{
Name: string(k),
Value: string(v),
})
}
}
ts.Samples = []*Sample{
&Sample{
Value: float64(s.Value),
TimestampMs: int64(s.Timestamp),
},
}
req.Timeseries = append(req.Timeseries, ts)
}
client := NewGenericWriteClient(c.conn)
ctxt, _ := context.WithTimeout(context.Background(), c.timeout)
_, err := client.Write(ctxt, req)
if err != nil {
return err
}
return nil
}
// Name identifies the client as a genric client.
func (c Client) Name() string {
return "generic"
}

View file

@ -1,211 +0,0 @@
// Code generated by protoc-gen-go.
// source: generic.proto
// DO NOT EDIT!
/*
Package generic is a generated protocol buffer package.
It is generated from these files:
generic.proto
It has these top-level messages:
Sample
LabelPair
TimeSeries
GenericWriteRequest
GenericWriteResponse
*/
package generic
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Sample struct {
Value float64 `protobuf:"fixed64,1,opt,name=value" json:"value,omitempty"`
TimestampMs int64 `protobuf:"varint,2,opt,name=timestamp_ms,json=timestampMs" json:"timestamp_ms,omitempty"`
}
func (m *Sample) Reset() { *m = Sample{} }
func (m *Sample) String() string { return proto.CompactTextString(m) }
func (*Sample) ProtoMessage() {}
func (*Sample) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
type LabelPair struct {
Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
Value string `protobuf:"bytes,2,opt,name=value" json:"value,omitempty"`
}
func (m *LabelPair) Reset() { *m = LabelPair{} }
func (m *LabelPair) String() string { return proto.CompactTextString(m) }
func (*LabelPair) ProtoMessage() {}
func (*LabelPair) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
type TimeSeries struct {
Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
Labels []*LabelPair `protobuf:"bytes,2,rep,name=labels" json:"labels,omitempty"`
// Sorted by time, oldest sample first.
Samples []*Sample `protobuf:"bytes,3,rep,name=samples" json:"samples,omitempty"`
}
func (m *TimeSeries) Reset() { *m = TimeSeries{} }
func (m *TimeSeries) String() string { return proto.CompactTextString(m) }
func (*TimeSeries) ProtoMessage() {}
func (*TimeSeries) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
func (m *TimeSeries) GetLabels() []*LabelPair {
if m != nil {
return m.Labels
}
return nil
}
func (m *TimeSeries) GetSamples() []*Sample {
if m != nil {
return m.Samples
}
return nil
}
type GenericWriteRequest struct {
Timeseries []*TimeSeries `protobuf:"bytes,1,rep,name=timeseries" json:"timeseries,omitempty"`
}
func (m *GenericWriteRequest) Reset() { *m = GenericWriteRequest{} }
func (m *GenericWriteRequest) String() string { return proto.CompactTextString(m) }
func (*GenericWriteRequest) ProtoMessage() {}
func (*GenericWriteRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
func (m *GenericWriteRequest) GetTimeseries() []*TimeSeries {
if m != nil {
return m.Timeseries
}
return nil
}
type GenericWriteResponse struct {
}
func (m *GenericWriteResponse) Reset() { *m = GenericWriteResponse{} }
func (m *GenericWriteResponse) String() string { return proto.CompactTextString(m) }
func (*GenericWriteResponse) ProtoMessage() {}
func (*GenericWriteResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }
func init() {
proto.RegisterType((*Sample)(nil), "generic.Sample")
proto.RegisterType((*LabelPair)(nil), "generic.LabelPair")
proto.RegisterType((*TimeSeries)(nil), "generic.TimeSeries")
proto.RegisterType((*GenericWriteRequest)(nil), "generic.GenericWriteRequest")
proto.RegisterType((*GenericWriteResponse)(nil), "generic.GenericWriteResponse")
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion3
// Client API for GenericWrite service
type GenericWriteClient interface {
Write(ctx context.Context, in *GenericWriteRequest, opts ...grpc.CallOption) (*GenericWriteResponse, error)
}
type genericWriteClient struct {
cc *grpc.ClientConn
}
func NewGenericWriteClient(cc *grpc.ClientConn) GenericWriteClient {
return &genericWriteClient{cc}
}
func (c *genericWriteClient) Write(ctx context.Context, in *GenericWriteRequest, opts ...grpc.CallOption) (*GenericWriteResponse, error) {
out := new(GenericWriteResponse)
err := grpc.Invoke(ctx, "/generic.GenericWrite/Write", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for GenericWrite service
type GenericWriteServer interface {
Write(context.Context, *GenericWriteRequest) (*GenericWriteResponse, error)
}
func RegisterGenericWriteServer(s *grpc.Server, srv GenericWriteServer) {
s.RegisterService(&_GenericWrite_serviceDesc, srv)
}
func _GenericWrite_Write_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GenericWriteRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GenericWriteServer).Write(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/generic.GenericWrite/Write",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GenericWriteServer).Write(ctx, req.(*GenericWriteRequest))
}
return interceptor(ctx, in, info, handler)
}
var _GenericWrite_serviceDesc = grpc.ServiceDesc{
ServiceName: "generic.GenericWrite",
HandlerType: (*GenericWriteServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Write",
Handler: _GenericWrite_Write_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: fileDescriptor0,
}
func init() { proto.RegisterFile("generic.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 264 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x74, 0x91, 0x4f, 0x4b, 0xc3, 0x40,
0x10, 0xc5, 0x4d, 0x63, 0x53, 0xfa, 0x5a, 0x11, 0xa6, 0x45, 0x82, 0x28, 0xe8, 0x9e, 0xaa, 0x87,
0x1e, 0x5a, 0xfc, 0x00, 0x5e, 0x14, 0x44, 0x41, 0xb6, 0xa2, 0x47, 0xd9, 0xca, 0x20, 0x81, 0xfc,
0x33, 0xbb, 0xd5, 0xaf, 0xef, 0x66, 0xb7, 0xdd, 0x56, 0xa8, 0xb7, 0x9d, 0x79, 0x2f, 0xbf, 0x79,
0x8f, 0xe0, 0xe8, 0x93, 0x4b, 0x6e, 0xb2, 0x8f, 0x69, 0xdd, 0x54, 0xa6, 0xa2, 0xde, 0x7a, 0x14,
0xb7, 0x48, 0x16, 0xaa, 0xa8, 0x73, 0xa6, 0x31, 0xba, 0xdf, 0x2a, 0x5f, 0x71, 0x1a, 0x5d, 0x44,
0x93, 0x48, 0xfa, 0x81, 0x2e, 0x31, 0x34, 0x59, 0xc1, 0xda, 0x58, 0xd3, 0x7b, 0xa1, 0xd3, 0x8e,
0x15, 0x63, 0x39, 0x08, 0xbb, 0x27, 0x2d, 0x6e, 0xd0, 0x7f, 0x54, 0x4b, 0xce, 0x9f, 0x55, 0xd6,
0x10, 0xe1, 0xb0, 0x54, 0x85, 0x87, 0xf4, 0xa5, 0x7b, 0x6f, 0xc9, 0x1d, 0xb7, 0xf4, 0x83, 0xf8,
0x01, 0x5e, 0x2c, 0x65, 0x61, 0x63, 0xb0, 0xde, 0xfb, 0xdd, 0x35, 0x92, 0xbc, 0x05, 0xb7, 0x57,
0xe3, 0xc9, 0x60, 0x46, 0xd3, 0x4d, 0x89, 0x70, 0x4f, 0xae, 0x1d, 0x74, 0x85, 0x9e, 0x76, 0x3d,
0x74, 0x1a, 0x3b, 0xf3, 0x71, 0x30, 0xfb, 0x7e, 0x72, 0xa3, 0x8b, 0x07, 0x8c, 0xee, 0xbd, 0xf4,
0xd6, 0x64, 0x86, 0x25, 0x7f, 0xad, 0x6c, 0x17, 0x9a, 0x03, 0xae, 0x95, 0xcb, 0x63, 0x73, 0xb4,
0x90, 0x51, 0x80, 0x6c, 0xa3, 0xca, 0x1d, 0x9b, 0x38, 0xc1, 0xf8, 0x2f, 0x4b, 0xd7, 0x55, 0xa9,
0x79, 0xf6, 0x8a, 0xe1, 0xee, 0x9e, 0xee, 0xd0, 0xf5, 0x8f, 0xb3, 0x40, 0xdc, 0x93, 0xe1, 0xf4,
0xfc, 0x1f, 0xd5, 0x53, 0xc5, 0xc1, 0x32, 0x71, 0xbf, 0x6f, 0xfe, 0x1b, 0x00, 0x00, 0xff, 0xff,
0x40, 0x1e, 0x0f, 0xf8, 0xcf, 0x01, 0x00, 0x00,
}

View file

@ -24,7 +24,6 @@ import (
influx "github.com/influxdb/influxdb/client"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/storage/remote/generic"
"github.com/prometheus/prometheus/storage/remote/graphite"
"github.com/prometheus/prometheus/storage/remote/influxdb"
"github.com/prometheus/prometheus/storage/remote/opentsdb"
@ -47,7 +46,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
}
// New returns a new remote Storage.
func New(o *Options) *Storage {
func New(o *Options) (*Storage, error) {
s := &Storage{}
if o.GraphiteAddress != "" {
c := graphite.NewClient(
@ -70,14 +69,17 @@ func New(o *Options) *Storage {
prometheus.MustRegister(c)
s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024))
}
if o.GenericAddress != "" {
c := generic.NewClient(o.GenericAddress, o.StorageTimeout)
if o.Address != "" {
c, err := NewClient(o.Address, o.StorageTimeout)
if err != nil {
return nil, err
}
s.queues = append(s.queues, NewStorageQueueManager(c, 100*1024))
}
if len(s.queues) == 0 {
return nil
return nil, nil
}
return s
return s, nil
}
// Options contains configuration parameters for a remote storage.
@ -92,7 +94,9 @@ type Options struct {
GraphiteAddress string
GraphiteTransport string
GraphitePrefix string
GenericAddress string
// TODO: This just being called "Address" will make more sense once the
// other remote storage mechanisms are removed.
Address string
}
// Run starts the background processing of the storage queues.

209
storage/remote/remote.pb.go Normal file
View file

@ -0,0 +1,209 @@
// Code generated by protoc-gen-go.
// source: remote.proto
// DO NOT EDIT!
/*
Package remote is a generated protocol buffer package.
It is generated from these files:
remote.proto
It has these top-level messages:
Sample
LabelPair
TimeSeries
WriteRequest
WriteResponse
*/
package remote
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Sample struct {
Value float64 `protobuf:"fixed64,1,opt,name=value" json:"value,omitempty"`
TimestampMs int64 `protobuf:"varint,2,opt,name=timestamp_ms,json=timestampMs" json:"timestamp_ms,omitempty"`
}
func (m *Sample) Reset() { *m = Sample{} }
func (m *Sample) String() string { return proto.CompactTextString(m) }
func (*Sample) ProtoMessage() {}
func (*Sample) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
type LabelPair struct {
Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
Value string `protobuf:"bytes,2,opt,name=value" json:"value,omitempty"`
}
func (m *LabelPair) Reset() { *m = LabelPair{} }
func (m *LabelPair) String() string { return proto.CompactTextString(m) }
func (*LabelPair) ProtoMessage() {}
func (*LabelPair) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
type TimeSeries struct {
Labels []*LabelPair `protobuf:"bytes,1,rep,name=labels" json:"labels,omitempty"`
// Sorted by time, oldest sample first.
Samples []*Sample `protobuf:"bytes,2,rep,name=samples" json:"samples,omitempty"`
}
func (m *TimeSeries) Reset() { *m = TimeSeries{} }
func (m *TimeSeries) String() string { return proto.CompactTextString(m) }
func (*TimeSeries) ProtoMessage() {}
func (*TimeSeries) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
func (m *TimeSeries) GetLabels() []*LabelPair {
if m != nil {
return m.Labels
}
return nil
}
func (m *TimeSeries) GetSamples() []*Sample {
if m != nil {
return m.Samples
}
return nil
}
type WriteRequest struct {
Timeseries []*TimeSeries `protobuf:"bytes,1,rep,name=timeseries" json:"timeseries,omitempty"`
}
func (m *WriteRequest) Reset() { *m = WriteRequest{} }
func (m *WriteRequest) String() string { return proto.CompactTextString(m) }
func (*WriteRequest) ProtoMessage() {}
func (*WriteRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
func (m *WriteRequest) GetTimeseries() []*TimeSeries {
if m != nil {
return m.Timeseries
}
return nil
}
type WriteResponse struct {
}
func (m *WriteResponse) Reset() { *m = WriteResponse{} }
func (m *WriteResponse) String() string { return proto.CompactTextString(m) }
func (*WriteResponse) ProtoMessage() {}
func (*WriteResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} }
func init() {
proto.RegisterType((*Sample)(nil), "remote.Sample")
proto.RegisterType((*LabelPair)(nil), "remote.LabelPair")
proto.RegisterType((*TimeSeries)(nil), "remote.TimeSeries")
proto.RegisterType((*WriteRequest)(nil), "remote.WriteRequest")
proto.RegisterType((*WriteResponse)(nil), "remote.WriteResponse")
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion3
// Client API for Write service
type WriteClient interface {
Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error)
}
type writeClient struct {
cc *grpc.ClientConn
}
func NewWriteClient(cc *grpc.ClientConn) WriteClient {
return &writeClient{cc}
}
func (c *writeClient) Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error) {
out := new(WriteResponse)
err := grpc.Invoke(ctx, "/remote.Write/Write", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for Write service
type WriteServer interface {
Write(context.Context, *WriteRequest) (*WriteResponse, error)
}
func RegisterWriteServer(s *grpc.Server, srv WriteServer) {
s.RegisterService(&_Write_serviceDesc, srv)
}
func _Write_Write_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(WriteRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(WriteServer).Write(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/remote.Write/Write",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(WriteServer).Write(ctx, req.(*WriteRequest))
}
return interceptor(ctx, in, info, handler)
}
var _Write_serviceDesc = grpc.ServiceDesc{
ServiceName: "remote.Write",
HandlerType: (*WriteServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Write",
Handler: _Write_Write_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: fileDescriptor0,
}
func init() { proto.RegisterFile("remote.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 250 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x5c, 0x91, 0x4f, 0x4b, 0x03, 0x31,
0x10, 0xc5, 0xdd, 0xd6, 0xae, 0x74, 0xba, 0x2a, 0x0e, 0x15, 0x8a, 0x27, 0xcd, 0x69, 0xbd, 0xf4,
0xb0, 0xa2, 0x57, 0xd1, 0xb3, 0x82, 0xa4, 0x82, 0x47, 0x49, 0x61, 0x0e, 0x0b, 0x9b, 0x66, 0xcd,
0xa4, 0x7e, 0x7e, 0xb3, 0xf9, 0xd3, 0x2e, 0xde, 0x66, 0xde, 0xbc, 0xf9, 0xcd, 0x0b, 0x81, 0xca,
0x92, 0x36, 0x8e, 0xd6, 0xbd, 0x35, 0xce, 0x60, 0x19, 0x3b, 0xf1, 0x02, 0xe5, 0x46, 0xe9, 0xbe,
0x23, 0x5c, 0xc2, 0xec, 0x57, 0x75, 0x7b, 0x5a, 0x15, 0xb7, 0x45, 0x5d, 0xc8, 0xd8, 0xe0, 0x1d,
0x54, 0xae, 0xd5, 0xc4, 0xce, 0x9b, 0xbe, 0x35, 0xaf, 0x26, 0x7e, 0x38, 0x95, 0x8b, 0x83, 0xf6,
0xce, 0xe2, 0x11, 0xe6, 0x6f, 0x6a, 0x4b, 0xdd, 0x87, 0x6a, 0x2d, 0x22, 0x9c, 0xee, 0x94, 0x8e,
0x90, 0xb9, 0x0c, 0xf5, 0x91, 0x3c, 0x09, 0x62, 0x6c, 0x84, 0x02, 0xf8, 0xf4, 0x94, 0x0d, 0xd9,
0x96, 0x18, 0xef, 0xa1, 0xec, 0x06, 0x08, 0xfb, 0xcd, 0x69, 0xbd, 0x68, 0xae, 0xd6, 0x29, 0xee,
0x01, 0x2d, 0x93, 0x01, 0x6b, 0x38, 0xe3, 0x10, 0x79, 0x48, 0x33, 0x78, 0x2f, 0xb2, 0x37, 0xbe,
0x44, 0xe6, 0xb1, 0x78, 0x85, 0xea, 0xcb, 0xb6, 0x8e, 0x24, 0xfd, 0xec, 0x7d, 0x5c, 0x6c, 0x00,
0x42, 0xf0, 0x70, 0x32, 0x1d, 0xc2, 0xbc, 0x7c, 0x0c, 0x23, 0x47, 0x2e, 0x71, 0x09, 0xe7, 0x89,
0xc1, 0xbd, 0xd9, 0x31, 0x35, 0xcf, 0x30, 0x0b, 0x02, 0x3e, 0xe5, 0x62, 0x99, 0x11, 0xe3, 0x63,
0x37, 0xd7, 0xff, 0xd4, 0xb8, 0x2e, 0x4e, 0xb6, 0x65, 0xf8, 0x81, 0x87, 0xbf, 0x00, 0x00, 0x00,
0xff, 0xff, 0xed, 0x75, 0xbc, 0xc4, 0x91, 0x01, 0x00, 0x00,
}

View file

@ -13,11 +13,10 @@
syntax = "proto3";
package generic;
package remote;
message Sample {
double value = 1;
double value = 1;
int64 timestamp_ms = 2;
}
@ -27,19 +26,18 @@ message LabelPair {
}
message TimeSeries {
string name = 1;
repeated LabelPair labels = 2;
repeated LabelPair labels = 1;
// Sorted by time, oldest sample first.
repeated Sample samples = 3;
repeated Sample samples = 2;
}
message GenericWriteRequest {
message WriteRequest {
repeated TimeSeries timeseries = 1;
}
message GenericWriteResponse {
message WriteResponse {
}
service GenericWrite {
rpc Write(GenericWriteRequest) returns (GenericWriteResponse) {}
service Write {
rpc Write(WriteRequest) returns (WriteResponse) {}
}

34
storage/remote/snappy.go Normal file
View file

@ -0,0 +1,34 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
"io"
"github.com/golang/snappy"
)
type snappyCompressor struct{}
func (c *snappyCompressor) Do(w io.Writer, p []byte) error {
sw := snappy.NewWriter(w)
if _, err := sw.Write(p); err != nil {
return err
}
return sw.Close()
}
func (c *snappyCompressor) Type() string {
return "snappy"
}