mirror of
https://github.com/prometheus/prometheus.git
synced 2025-01-17 16:57:46 -08:00
1124 lines
32 KiB
Go
1124 lines
32 KiB
Go
/*
 *
 * Copyright 2014, Google Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 * copyright notice, this list of conditions and the following disclaimer
 * in the documentation and/or other materials provided with the
 * distribution.
 *     * Neither the name of Google Inc. nor the names of its
 * contributors may be used to endorse or promote products derived from
 * this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 */
|
|
|
|
package grpc
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"net/http"
|
|
"reflect"
|
|
"runtime"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"golang.org/x/net/context"
|
|
"golang.org/x/net/http2"
|
|
"golang.org/x/net/trace"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/credentials"
|
|
"google.golang.org/grpc/grpclog"
|
|
"google.golang.org/grpc/internal"
|
|
"google.golang.org/grpc/keepalive"
|
|
"google.golang.org/grpc/metadata"
|
|
"google.golang.org/grpc/stats"
|
|
"google.golang.org/grpc/status"
|
|
"google.golang.org/grpc/tap"
|
|
"google.golang.org/grpc/transport"
|
|
)
|
|
|
|
// methodHandler is the signature of a unary method handler. It is called with
// the registered service implementation, the RPC context, a function that
// decodes the request message into the value it is given, and the server's
// unary interceptor (which may be nil).
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
|
|
|
|
// MethodDesc represents an RPC service's method specification.
|
|
type MethodDesc struct {
|
|
MethodName string
|
|
Handler methodHandler
|
|
}
|
|
|
|
// ServiceDesc represents an RPC service's specification.
|
|
type ServiceDesc struct {
|
|
ServiceName string
|
|
// The pointer to the service interface. Used to check whether the user
|
|
// provided implementation satisfies the interface requirements.
|
|
HandlerType interface{}
|
|
Methods []MethodDesc
|
|
Streams []StreamDesc
|
|
Metadata interface{}
|
|
}
|
|
|
|
// service consists of the information of the server serving this service and
|
|
// the methods in this service.
|
|
type service struct {
|
|
server interface{} // the server for service methods
|
|
md map[string]*MethodDesc
|
|
sd map[string]*StreamDesc
|
|
mdata interface{}
|
|
}
|
|
|
|
// Server is a gRPC server to serve RPC requests.
|
|
type Server struct {
|
|
opts options
|
|
|
|
mu sync.Mutex // guards following
|
|
lis map[net.Listener]bool
|
|
conns map[io.Closer]bool
|
|
drain bool
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
// A CondVar to let GracefulStop() blocks until all the pending RPCs are finished
|
|
// and all the transport goes away.
|
|
cv *sync.Cond
|
|
m map[string]*service // service name -> service info
|
|
events trace.EventLog
|
|
}
|
|
|
|
type options struct {
|
|
creds credentials.TransportCredentials
|
|
codec Codec
|
|
cp Compressor
|
|
dc Decompressor
|
|
maxMsgSize int
|
|
unaryInt UnaryServerInterceptor
|
|
streamInt StreamServerInterceptor
|
|
inTapHandle tap.ServerInHandle
|
|
statsHandler stats.Handler
|
|
maxConcurrentStreams uint32
|
|
useHandlerImpl bool // use http.Handler-based server
|
|
unknownStreamDesc *StreamDesc
|
|
keepaliveParams keepalive.ServerParameters
|
|
keepalivePolicy keepalive.EnforcementPolicy
|
|
initialWindowSize int32
|
|
initialConnWindowSize int32
|
|
}
|
|
|
|
// defaultMaxMsgSize is the message size limit NewServer installs before
// applying any ServerOption.
var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit
|
|
|
|
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
|
|
type ServerOption func(*options)
|
|
|
|
// InitialWindowSize returns a ServerOption that sets window size for stream.
|
|
// The lower bound for window size is 64K and any value smaller than that will be ignored.
|
|
func InitialWindowSize(s int32) ServerOption {
|
|
return func(o *options) {
|
|
o.initialWindowSize = s
|
|
}
|
|
}
|
|
|
|
// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
|
|
// The lower bound for window size is 64K and any value smaller than that will be ignored.
|
|
func InitialConnWindowSize(s int32) ServerOption {
|
|
return func(o *options) {
|
|
o.initialConnWindowSize = s
|
|
}
|
|
}
|
|
|
|
// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
|
|
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
|
|
return func(o *options) {
|
|
o.keepaliveParams = kp
|
|
}
|
|
}
|
|
|
|
// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
|
|
func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
|
|
return func(o *options) {
|
|
o.keepalivePolicy = kep
|
|
}
|
|
}
|
|
|
|
// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
|
|
func CustomCodec(codec Codec) ServerOption {
|
|
return func(o *options) {
|
|
o.codec = codec
|
|
}
|
|
}
|
|
|
|
// RPCCompressor returns a ServerOption that sets a compressor for outbound messages.
|
|
func RPCCompressor(cp Compressor) ServerOption {
|
|
return func(o *options) {
|
|
o.cp = cp
|
|
}
|
|
}
|
|
|
|
// RPCDecompressor returns a ServerOption that sets a decompressor for inbound messages.
|
|
func RPCDecompressor(dc Decompressor) ServerOption {
|
|
return func(o *options) {
|
|
o.dc = dc
|
|
}
|
|
}
|
|
|
|
// MaxMsgSize returns a ServerOption to set the max message size in bytes for inbound mesages.
|
|
// If this is not set, gRPC uses the default 4MB.
|
|
func MaxMsgSize(m int) ServerOption {
|
|
return func(o *options) {
|
|
o.maxMsgSize = m
|
|
}
|
|
}
|
|
|
|
// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
|
|
// of concurrent streams to each ServerTransport.
|
|
func MaxConcurrentStreams(n uint32) ServerOption {
|
|
return func(o *options) {
|
|
o.maxConcurrentStreams = n
|
|
}
|
|
}
|
|
|
|
// Creds returns a ServerOption that sets credentials for server connections.
|
|
func Creds(c credentials.TransportCredentials) ServerOption {
|
|
return func(o *options) {
|
|
o.creds = c
|
|
}
|
|
}
|
|
|
|
// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
|
|
// server. Only one unary interceptor can be installed. The construction of multiple
|
|
// interceptors (e.g., chaining) can be implemented at the caller.
|
|
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
|
|
return func(o *options) {
|
|
if o.unaryInt != nil {
|
|
panic("The unary server interceptor was already set and may not be reset.")
|
|
}
|
|
o.unaryInt = i
|
|
}
|
|
}
|
|
|
|
// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
|
|
// server. Only one stream interceptor can be installed.
|
|
func StreamInterceptor(i StreamServerInterceptor) ServerOption {
|
|
return func(o *options) {
|
|
if o.streamInt != nil {
|
|
panic("The stream server interceptor was already set and may not be reset.")
|
|
}
|
|
o.streamInt = i
|
|
}
|
|
}
|
|
|
|
// InTapHandle returns a ServerOption that sets the tap handle for all the server
|
|
// transport to be created. Only one can be installed.
|
|
func InTapHandle(h tap.ServerInHandle) ServerOption {
|
|
return func(o *options) {
|
|
if o.inTapHandle != nil {
|
|
panic("The tap handle was already set and may not be reset.")
|
|
}
|
|
o.inTapHandle = h
|
|
}
|
|
}
|
|
|
|
// StatsHandler returns a ServerOption that sets the stats handler for the server.
|
|
func StatsHandler(h stats.Handler) ServerOption {
|
|
return func(o *options) {
|
|
o.statsHandler = h
|
|
}
|
|
}
|
|
|
|
// UnknownServiceHandler returns a ServerOption that allows for adding a custom
|
|
// unknown service handler. The provided method is a bidi-streaming RPC service
|
|
// handler that will be invoked instead of returning the "unimplemented" gRPC
|
|
// error whenever a request is received for an unregistered service or method.
|
|
// The handling function has full access to the Context of the request and the
|
|
// stream, and the invocation passes through interceptors.
|
|
func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
|
|
return func(o *options) {
|
|
o.unknownStreamDesc = &StreamDesc{
|
|
StreamName: "unknown_service_handler",
|
|
Handler: streamHandler,
|
|
// We need to assume that the users of the streamHandler will want to use both.
|
|
ClientStreams: true,
|
|
ServerStreams: true,
|
|
}
|
|
}
|
|
}
|
|
|
|
// NewServer creates a gRPC server which has no service registered and has not
|
|
// started to accept requests yet.
|
|
func NewServer(opt ...ServerOption) *Server {
|
|
var opts options
|
|
opts.maxMsgSize = defaultMaxMsgSize
|
|
for _, o := range opt {
|
|
o(&opts)
|
|
}
|
|
if opts.codec == nil {
|
|
// Set the default codec.
|
|
opts.codec = protoCodec{}
|
|
}
|
|
s := &Server{
|
|
lis: make(map[net.Listener]bool),
|
|
opts: opts,
|
|
conns: make(map[io.Closer]bool),
|
|
m: make(map[string]*service),
|
|
}
|
|
s.cv = sync.NewCond(&s.mu)
|
|
s.ctx, s.cancel = context.WithCancel(context.Background())
|
|
if EnableTracing {
|
|
_, file, line, _ := runtime.Caller(1)
|
|
s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
|
|
}
|
|
return s
|
|
}
|
|
|
|
// printf records an event in s's event log, unless s has been stopped.
|
|
// REQUIRES s.mu is held.
|
|
func (s *Server) printf(format string, a ...interface{}) {
|
|
if s.events != nil {
|
|
s.events.Printf(format, a...)
|
|
}
|
|
}
|
|
|
|
// errorf records an error in s's event log, unless s has been stopped.
|
|
// REQUIRES s.mu is held.
|
|
func (s *Server) errorf(format string, a ...interface{}) {
|
|
if s.events != nil {
|
|
s.events.Errorf(format, a...)
|
|
}
|
|
}
|
|
|
|
// RegisterService registers a service and its implementation to the gRPC
|
|
// server. It is called from the IDL generated code. This must be called before
|
|
// invoking Serve.
|
|
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
|
|
ht := reflect.TypeOf(sd.HandlerType).Elem()
|
|
st := reflect.TypeOf(ss)
|
|
if !st.Implements(ht) {
|
|
grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
|
|
}
|
|
s.register(sd, ss)
|
|
}
|
|
|
|
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.printf("RegisterService(%q)", sd.ServiceName)
|
|
if _, ok := s.m[sd.ServiceName]; ok {
|
|
grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
|
|
}
|
|
srv := &service{
|
|
server: ss,
|
|
md: make(map[string]*MethodDesc),
|
|
sd: make(map[string]*StreamDesc),
|
|
mdata: sd.Metadata,
|
|
}
|
|
for i := range sd.Methods {
|
|
d := &sd.Methods[i]
|
|
srv.md[d.MethodName] = d
|
|
}
|
|
for i := range sd.Streams {
|
|
d := &sd.Streams[i]
|
|
srv.sd[d.StreamName] = d
|
|
}
|
|
s.m[sd.ServiceName] = srv
|
|
}
|
|
|
|
// MethodInfo describes an RPC: its method name and its streaming type.
type MethodInfo struct {
	// Name is the bare method name, without the service or package name.
	Name string
	// IsClientStream reports whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream reports whether the RPC is a server streaming RPC.
	IsServerStream bool
}
|
|
|
|
// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
|
|
type ServiceInfo struct {
|
|
Methods []MethodInfo
|
|
// Metadata is the metadata specified in ServiceDesc when registering service.
|
|
Metadata interface{}
|
|
}
|
|
|
|
// GetServiceInfo returns a map from service names to ServiceInfo.
|
|
// Service names include the package names, in the form of <package>.<service>.
|
|
func (s *Server) GetServiceInfo() map[string]ServiceInfo {
|
|
ret := make(map[string]ServiceInfo)
|
|
for n, srv := range s.m {
|
|
methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
|
|
for m := range srv.md {
|
|
methods = append(methods, MethodInfo{
|
|
Name: m,
|
|
IsClientStream: false,
|
|
IsServerStream: false,
|
|
})
|
|
}
|
|
for m, d := range srv.sd {
|
|
methods = append(methods, MethodInfo{
|
|
Name: m,
|
|
IsClientStream: d.ClientStreams,
|
|
IsServerStream: d.ServerStreams,
|
|
})
|
|
}
|
|
|
|
ret[n] = ServiceInfo{
|
|
Methods: methods,
|
|
Metadata: srv.mdata,
|
|
}
|
|
}
|
|
return ret
|
|
}
|
|
|
|
// ErrServerStopped indicates that the operation is now illegal because the
// server has been stopped.
var ErrServerStopped = errors.New("grpc: the server has been stopped")
|
|
|
|
func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
|
|
if s.opts.creds == nil {
|
|
return rawConn, nil, nil
|
|
}
|
|
return s.opts.creds.ServerHandshake(rawConn)
|
|
}
|
|
|
|
// Serve accepts incoming connections on the listener lis, creating a new
|
|
// ServerTransport and service goroutine for each. The service goroutines
|
|
// read gRPC requests and then call the registered handlers to reply to them.
|
|
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
|
|
// this method returns.
|
|
// Serve always returns non-nil error.
|
|
func (s *Server) Serve(lis net.Listener) error {
|
|
s.mu.Lock()
|
|
s.printf("serving")
|
|
if s.lis == nil {
|
|
s.mu.Unlock()
|
|
lis.Close()
|
|
return ErrServerStopped
|
|
}
|
|
s.lis[lis] = true
|
|
s.mu.Unlock()
|
|
defer func() {
|
|
s.mu.Lock()
|
|
if s.lis != nil && s.lis[lis] {
|
|
lis.Close()
|
|
delete(s.lis, lis)
|
|
}
|
|
s.mu.Unlock()
|
|
}()
|
|
|
|
var tempDelay time.Duration // how long to sleep on accept failure
|
|
|
|
for {
|
|
rawConn, err := lis.Accept()
|
|
if err != nil {
|
|
if ne, ok := err.(interface {
|
|
Temporary() bool
|
|
}); ok && ne.Temporary() {
|
|
if tempDelay == 0 {
|
|
tempDelay = 5 * time.Millisecond
|
|
} else {
|
|
tempDelay *= 2
|
|
}
|
|
if max := 1 * time.Second; tempDelay > max {
|
|
tempDelay = max
|
|
}
|
|
s.mu.Lock()
|
|
s.printf("Accept error: %v; retrying in %v", err, tempDelay)
|
|
s.mu.Unlock()
|
|
select {
|
|
case <-time.After(tempDelay):
|
|
case <-s.ctx.Done():
|
|
}
|
|
continue
|
|
}
|
|
s.mu.Lock()
|
|
s.printf("done serving; Accept = %v", err)
|
|
s.mu.Unlock()
|
|
return err
|
|
}
|
|
tempDelay = 0
|
|
// Start a new goroutine to deal with rawConn
|
|
// so we don't stall this Accept loop goroutine.
|
|
go s.handleRawConn(rawConn)
|
|
}
|
|
}
|
|
|
|
// handleRawConn is run in its own goroutine and handles a just-accepted
|
|
// connection that has not had any I/O performed on it yet.
|
|
func (s *Server) handleRawConn(rawConn net.Conn) {
|
|
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
|
|
if err != nil {
|
|
s.mu.Lock()
|
|
s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
|
|
s.mu.Unlock()
|
|
grpclog.Printf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
|
|
// If serverHandShake returns ErrConnDispatched, keep rawConn open.
|
|
if err != credentials.ErrConnDispatched {
|
|
rawConn.Close()
|
|
}
|
|
return
|
|
}
|
|
|
|
s.mu.Lock()
|
|
if s.conns == nil {
|
|
s.mu.Unlock()
|
|
conn.Close()
|
|
return
|
|
}
|
|
s.mu.Unlock()
|
|
|
|
if s.opts.useHandlerImpl {
|
|
s.serveUsingHandler(conn)
|
|
} else {
|
|
s.serveHTTP2Transport(conn, authInfo)
|
|
}
|
|
}
|
|
|
|
// serveHTTP2Transport sets up a http/2 transport (using the
|
|
// gRPC http2 server transport in transport/http2_server.go) and
|
|
// serves streams on it.
|
|
// This is run in its own goroutine (it does network I/O in
|
|
// transport.NewServerTransport).
|
|
func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
|
|
config := &transport.ServerConfig{
|
|
MaxStreams: s.opts.maxConcurrentStreams,
|
|
AuthInfo: authInfo,
|
|
InTapHandle: s.opts.inTapHandle,
|
|
StatsHandler: s.opts.statsHandler,
|
|
KeepaliveParams: s.opts.keepaliveParams,
|
|
KeepalivePolicy: s.opts.keepalivePolicy,
|
|
InitialWindowSize: s.opts.initialWindowSize,
|
|
InitialConnWindowSize: s.opts.initialConnWindowSize,
|
|
}
|
|
st, err := transport.NewServerTransport("http2", c, config)
|
|
if err != nil {
|
|
s.mu.Lock()
|
|
s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
|
|
s.mu.Unlock()
|
|
c.Close()
|
|
grpclog.Println("grpc: Server.Serve failed to create ServerTransport: ", err)
|
|
return
|
|
}
|
|
if !s.addConn(st) {
|
|
st.Close()
|
|
return
|
|
}
|
|
s.serveStreams(st)
|
|
}
|
|
|
|
func (s *Server) serveStreams(st transport.ServerTransport) {
|
|
defer s.removeConn(st)
|
|
defer st.Close()
|
|
var wg sync.WaitGroup
|
|
st.HandleStreams(func(stream *transport.Stream) {
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
s.handleStream(st, stream, s.traceInfo(st, stream))
|
|
}()
|
|
}, func(ctx context.Context, method string) context.Context {
|
|
if !EnableTracing {
|
|
return ctx
|
|
}
|
|
tr := trace.New("grpc.Recv."+methodFamily(method), method)
|
|
return trace.NewContext(ctx, tr)
|
|
})
|
|
wg.Wait()
|
|
}
|
|
|
|
// Compile-time check that *Server implements http.Handler.
var _ http.Handler = (*Server)(nil)
|
|
|
|
// serveUsingHandler is called from handleRawConn when s is configured
|
|
// to handle requests via the http.Handler interface. It sets up a
|
|
// net/http.Server to handle the just-accepted conn. The http.Server
|
|
// is configured to route all incoming requests (all HTTP/2 streams)
|
|
// to ServeHTTP, which creates a new ServerTransport for each stream.
|
|
// serveUsingHandler blocks until conn closes.
|
|
//
|
|
// This codepath is only used when Server.TestingUseHandlerImpl has
|
|
// been configured. This lets the end2end tests exercise the ServeHTTP
|
|
// method as one of the environment types.
|
|
//
|
|
// conn is the *tls.Conn that's already been authenticated.
|
|
func (s *Server) serveUsingHandler(conn net.Conn) {
|
|
if !s.addConn(conn) {
|
|
conn.Close()
|
|
return
|
|
}
|
|
defer s.removeConn(conn)
|
|
h2s := &http2.Server{
|
|
MaxConcurrentStreams: s.opts.maxConcurrentStreams,
|
|
}
|
|
h2s.ServeConn(conn, &http2.ServeConnOpts{
|
|
Handler: s,
|
|
})
|
|
}
|
|
|
|
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
st, err := transport.NewServerHandlerTransport(w, r)
|
|
if err != nil {
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
if !s.addConn(st) {
|
|
st.Close()
|
|
return
|
|
}
|
|
defer s.removeConn(st)
|
|
s.serveStreams(st)
|
|
}
|
|
|
|
// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
|
|
// If tracing is not enabled, it returns nil.
|
|
func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
|
|
tr, ok := trace.FromContext(stream.Context())
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
trInfo = &traceInfo{
|
|
tr: tr,
|
|
}
|
|
trInfo.firstLine.client = false
|
|
trInfo.firstLine.remoteAddr = st.RemoteAddr()
|
|
|
|
if dl, ok := stream.Context().Deadline(); ok {
|
|
trInfo.firstLine.deadline = dl.Sub(time.Now())
|
|
}
|
|
return trInfo
|
|
}
|
|
|
|
func (s *Server) addConn(c io.Closer) bool {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.conns == nil || s.drain {
|
|
return false
|
|
}
|
|
s.conns[c] = true
|
|
return true
|
|
}
|
|
|
|
func (s *Server) removeConn(c io.Closer) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.conns != nil {
|
|
delete(s.conns, c)
|
|
s.cv.Broadcast()
|
|
}
|
|
}
|
|
|
|
func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error {
|
|
var (
|
|
cbuf *bytes.Buffer
|
|
outPayload *stats.OutPayload
|
|
)
|
|
if cp != nil {
|
|
cbuf = new(bytes.Buffer)
|
|
}
|
|
if s.opts.statsHandler != nil {
|
|
outPayload = &stats.OutPayload{}
|
|
}
|
|
p, err := encode(s.opts.codec, msg, cp, cbuf, outPayload)
|
|
if err != nil {
|
|
// This typically indicates a fatal issue (e.g., memory
|
|
// corruption or hardware faults) the application program
|
|
// cannot handle.
|
|
//
|
|
// TODO(zhaoq): There exist other options also such as only closing the
|
|
// faulty stream locally and remotely (Other streams can keep going). Find
|
|
// the optimal option.
|
|
grpclog.Fatalf("grpc: Server failed to encode response %v", err)
|
|
}
|
|
err = t.Write(stream, p, opts)
|
|
if err == nil && outPayload != nil {
|
|
outPayload.SentTime = time.Now()
|
|
s.opts.statsHandler.HandleRPC(stream.Context(), outPayload)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
|
|
sh := s.opts.statsHandler
|
|
if sh != nil {
|
|
begin := &stats.Begin{
|
|
BeginTime: time.Now(),
|
|
}
|
|
sh.HandleRPC(stream.Context(), begin)
|
|
defer func() {
|
|
end := &stats.End{
|
|
EndTime: time.Now(),
|
|
}
|
|
if err != nil && err != io.EOF {
|
|
end.Error = toRPCErr(err)
|
|
}
|
|
sh.HandleRPC(stream.Context(), end)
|
|
}()
|
|
}
|
|
if trInfo != nil {
|
|
defer trInfo.tr.Finish()
|
|
trInfo.firstLine.client = false
|
|
trInfo.tr.LazyLog(&trInfo.firstLine, false)
|
|
defer func() {
|
|
if err != nil && err != io.EOF {
|
|
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
|
trInfo.tr.SetError()
|
|
}
|
|
}()
|
|
}
|
|
if s.opts.cp != nil {
|
|
// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
|
|
stream.SetSendCompress(s.opts.cp.Type())
|
|
}
|
|
p := &parser{r: stream}
|
|
pf, req, err := p.recvMsg(s.opts.maxMsgSize)
|
|
if err == io.EOF {
|
|
// The entire stream is done (for unary RPC only).
|
|
return err
|
|
}
|
|
if err == io.ErrUnexpectedEOF {
|
|
err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
|
|
}
|
|
if err != nil {
|
|
if st, ok := status.FromError(err); ok {
|
|
if e := t.WriteStatus(stream, st); e != nil {
|
|
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
|
|
}
|
|
} else {
|
|
switch st := err.(type) {
|
|
case transport.ConnectionError:
|
|
// Nothing to do here.
|
|
case transport.StreamError:
|
|
if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
|
|
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
|
|
}
|
|
default:
|
|
panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st))
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
|
|
if st, ok := status.FromError(err); ok {
|
|
if e := t.WriteStatus(stream, st); e != nil {
|
|
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
|
|
}
|
|
return err
|
|
}
|
|
if e := t.WriteStatus(stream, status.New(codes.Internal, err.Error())); e != nil {
|
|
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
|
|
}
|
|
|
|
// TODO checkRecvPayload always return RPC error. Add a return here if necessary.
|
|
}
|
|
var inPayload *stats.InPayload
|
|
if sh != nil {
|
|
inPayload = &stats.InPayload{
|
|
RecvTime: time.Now(),
|
|
}
|
|
}
|
|
df := func(v interface{}) error {
|
|
if inPayload != nil {
|
|
inPayload.WireLength = len(req)
|
|
}
|
|
if pf == compressionMade {
|
|
var err error
|
|
req, err = s.opts.dc.Do(bytes.NewReader(req))
|
|
if err != nil {
|
|
return Errorf(codes.Internal, err.Error())
|
|
}
|
|
}
|
|
if len(req) > s.opts.maxMsgSize {
|
|
// TODO: Revisit the error code. Currently keep it consistent with
|
|
// java implementation.
|
|
return status.Errorf(codes.Internal, "grpc: server received a message of %d bytes exceeding %d limit", len(req), s.opts.maxMsgSize)
|
|
}
|
|
if err := s.opts.codec.Unmarshal(req, v); err != nil {
|
|
return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
|
|
}
|
|
if inPayload != nil {
|
|
inPayload.Payload = v
|
|
inPayload.Data = req
|
|
inPayload.Length = len(req)
|
|
sh.HandleRPC(stream.Context(), inPayload)
|
|
}
|
|
if trInfo != nil {
|
|
trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
|
|
}
|
|
return nil
|
|
}
|
|
reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt)
|
|
if appErr != nil {
|
|
appStatus, ok := status.FromError(appErr)
|
|
if !ok {
|
|
// Convert appErr if it is not a grpc status error.
|
|
appErr = status.Error(convertCode(appErr), appErr.Error())
|
|
appStatus, _ = status.FromError(appErr)
|
|
}
|
|
if trInfo != nil {
|
|
trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
|
|
trInfo.tr.SetError()
|
|
}
|
|
if e := t.WriteStatus(stream, appStatus); e != nil {
|
|
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", e)
|
|
}
|
|
return appErr
|
|
}
|
|
if trInfo != nil {
|
|
trInfo.tr.LazyLog(stringer("OK"), false)
|
|
}
|
|
opts := &transport.Options{
|
|
Last: true,
|
|
Delay: false,
|
|
}
|
|
if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
|
|
if err == io.EOF {
|
|
// The entire stream is done (for unary RPC only).
|
|
return err
|
|
}
|
|
if s, ok := status.FromError(err); ok {
|
|
if e := t.WriteStatus(stream, s); e != nil {
|
|
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", e)
|
|
}
|
|
} else {
|
|
switch st := err.(type) {
|
|
case transport.ConnectionError:
|
|
// Nothing to do here.
|
|
case transport.StreamError:
|
|
if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
|
|
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
|
|
}
|
|
default:
|
|
panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
if trInfo != nil {
|
|
trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
|
|
}
|
|
// TODO: Should we be logging if writing status failed here, like above?
|
|
// Should the logging be in WriteStatus? Should we ignore the WriteStatus
|
|
// error or allow the stats handler to see it?
|
|
return t.WriteStatus(stream, status.New(codes.OK, ""))
|
|
}
|
|
|
|
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
|
|
sh := s.opts.statsHandler
|
|
if sh != nil {
|
|
begin := &stats.Begin{
|
|
BeginTime: time.Now(),
|
|
}
|
|
sh.HandleRPC(stream.Context(), begin)
|
|
defer func() {
|
|
end := &stats.End{
|
|
EndTime: time.Now(),
|
|
}
|
|
if err != nil && err != io.EOF {
|
|
end.Error = toRPCErr(err)
|
|
}
|
|
sh.HandleRPC(stream.Context(), end)
|
|
}()
|
|
}
|
|
if s.opts.cp != nil {
|
|
stream.SetSendCompress(s.opts.cp.Type())
|
|
}
|
|
ss := &serverStream{
|
|
t: t,
|
|
s: stream,
|
|
p: &parser{r: stream},
|
|
codec: s.opts.codec,
|
|
cp: s.opts.cp,
|
|
dc: s.opts.dc,
|
|
maxMsgSize: s.opts.maxMsgSize,
|
|
trInfo: trInfo,
|
|
statsHandler: sh,
|
|
}
|
|
if ss.cp != nil {
|
|
ss.cbuf = new(bytes.Buffer)
|
|
}
|
|
if trInfo != nil {
|
|
trInfo.tr.LazyLog(&trInfo.firstLine, false)
|
|
defer func() {
|
|
ss.mu.Lock()
|
|
if err != nil && err != io.EOF {
|
|
ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
|
ss.trInfo.tr.SetError()
|
|
}
|
|
ss.trInfo.tr.Finish()
|
|
ss.trInfo.tr = nil
|
|
ss.mu.Unlock()
|
|
}()
|
|
}
|
|
var appErr error
|
|
var server interface{}
|
|
if srv != nil {
|
|
server = srv.server
|
|
}
|
|
if s.opts.streamInt == nil {
|
|
appErr = sd.Handler(server, ss)
|
|
} else {
|
|
info := &StreamServerInfo{
|
|
FullMethod: stream.Method(),
|
|
IsClientStream: sd.ClientStreams,
|
|
IsServerStream: sd.ServerStreams,
|
|
}
|
|
appErr = s.opts.streamInt(server, ss, info, sd.Handler)
|
|
}
|
|
if appErr != nil {
|
|
appStatus, ok := status.FromError(appErr)
|
|
if !ok {
|
|
switch err := appErr.(type) {
|
|
case transport.StreamError:
|
|
appStatus = status.New(err.Code, err.Desc)
|
|
default:
|
|
appStatus = status.New(convertCode(appErr), appErr.Error())
|
|
}
|
|
appErr = appStatus.Err()
|
|
}
|
|
if trInfo != nil {
|
|
ss.mu.Lock()
|
|
ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
|
|
ss.trInfo.tr.SetError()
|
|
ss.mu.Unlock()
|
|
}
|
|
t.WriteStatus(ss.s, appStatus)
|
|
// TODO: Should we log an error from WriteStatus here and below?
|
|
return appErr
|
|
}
|
|
if trInfo != nil {
|
|
ss.mu.Lock()
|
|
ss.trInfo.tr.LazyLog(stringer("OK"), false)
|
|
ss.mu.Unlock()
|
|
}
|
|
return t.WriteStatus(ss.s, status.New(codes.OK, ""))
|
|
|
|
}
|
|
|
|
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
|
|
sm := stream.Method()
|
|
if sm != "" && sm[0] == '/' {
|
|
sm = sm[1:]
|
|
}
|
|
pos := strings.LastIndex(sm, "/")
|
|
if pos == -1 {
|
|
if trInfo != nil {
|
|
trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
|
|
trInfo.tr.SetError()
|
|
}
|
|
errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
|
|
if err := t.WriteStatus(stream, status.New(codes.InvalidArgument, errDesc)); err != nil {
|
|
if trInfo != nil {
|
|
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
|
trInfo.tr.SetError()
|
|
}
|
|
grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
|
|
}
|
|
if trInfo != nil {
|
|
trInfo.tr.Finish()
|
|
}
|
|
return
|
|
}
|
|
service := sm[:pos]
|
|
method := sm[pos+1:]
|
|
srv, ok := s.m[service]
|
|
if !ok {
|
|
if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
|
|
s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
|
|
return
|
|
}
|
|
if trInfo != nil {
|
|
trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
|
|
trInfo.tr.SetError()
|
|
}
|
|
errDesc := fmt.Sprintf("unknown service %v", service)
|
|
if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
|
|
if trInfo != nil {
|
|
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
|
trInfo.tr.SetError()
|
|
}
|
|
grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
|
|
}
|
|
if trInfo != nil {
|
|
trInfo.tr.Finish()
|
|
}
|
|
return
|
|
}
|
|
// Unary RPC or Streaming RPC?
|
|
if md, ok := srv.md[method]; ok {
|
|
s.processUnaryRPC(t, stream, srv, md, trInfo)
|
|
return
|
|
}
|
|
if sd, ok := srv.sd[method]; ok {
|
|
s.processStreamingRPC(t, stream, srv, sd, trInfo)
|
|
return
|
|
}
|
|
if trInfo != nil {
|
|
trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
|
|
trInfo.tr.SetError()
|
|
}
|
|
if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
|
|
s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
|
|
return
|
|
}
|
|
errDesc := fmt.Sprintf("unknown method %v", method)
|
|
if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
|
|
if trInfo != nil {
|
|
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
|
trInfo.tr.SetError()
|
|
}
|
|
grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
|
|
}
|
|
if trInfo != nil {
|
|
trInfo.tr.Finish()
|
|
}
|
|
}
|
|
|
|
// Stop stops the gRPC server. It immediately closes all open
|
|
// connections and listeners.
|
|
// It cancels all active RPCs on the server side and the corresponding
|
|
// pending RPCs on the client side will get notified by connection
|
|
// errors.
|
|
func (s *Server) Stop() {
|
|
s.mu.Lock()
|
|
listeners := s.lis
|
|
s.lis = nil
|
|
st := s.conns
|
|
s.conns = nil
|
|
// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
|
|
s.cv.Broadcast()
|
|
s.mu.Unlock()
|
|
|
|
for lis := range listeners {
|
|
lis.Close()
|
|
}
|
|
for c := range st {
|
|
c.Close()
|
|
}
|
|
|
|
s.mu.Lock()
|
|
s.cancel()
|
|
if s.events != nil {
|
|
s.events.Finish()
|
|
s.events = nil
|
|
}
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// GracefulStop stops the gRPC server gracefully. It stops the server from
|
|
// accepting new connections and RPCs and blocks until all the pending RPCs are
|
|
// finished.
|
|
func (s *Server) GracefulStop() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.conns == nil {
|
|
return
|
|
}
|
|
for lis := range s.lis {
|
|
lis.Close()
|
|
}
|
|
s.lis = nil
|
|
s.cancel()
|
|
if !s.drain {
|
|
for c := range s.conns {
|
|
c.(transport.ServerTransport).Drain()
|
|
}
|
|
s.drain = true
|
|
}
|
|
for len(s.conns) != 0 {
|
|
s.cv.Wait()
|
|
}
|
|
s.conns = nil
|
|
if s.events != nil {
|
|
s.events.Finish()
|
|
s.events = nil
|
|
}
|
|
}
|
|
|
|
func init() {
|
|
internal.TestingCloseConns = func(arg interface{}) {
|
|
arg.(*Server).testingCloseConns()
|
|
}
|
|
internal.TestingUseHandlerImpl = func(arg interface{}) {
|
|
arg.(*Server).opts.useHandlerImpl = true
|
|
}
|
|
}
|
|
|
|
// testingCloseConns closes all existing transports but keeps s.lis
|
|
// accepting new connections.
|
|
func (s *Server) testingCloseConns() {
|
|
s.mu.Lock()
|
|
for c := range s.conns {
|
|
c.Close()
|
|
delete(s.conns, c)
|
|
}
|
|
s.mu.Unlock()
|
|
}
|
|
|
|
// SetHeader sets the header metadata.
|
|
// When called multiple times, all the provided metadata will be merged.
|
|
// All the metadata will be sent out when one of the following happens:
|
|
// - grpc.SendHeader() is called;
|
|
// - The first response is sent out;
|
|
// - An RPC status is sent out (error or success).
|
|
func SetHeader(ctx context.Context, md metadata.MD) error {
|
|
if md.Len() == 0 {
|
|
return nil
|
|
}
|
|
stream, ok := transport.StreamFromContext(ctx)
|
|
if !ok {
|
|
return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
|
|
}
|
|
return stream.SetHeader(md)
|
|
}
|
|
|
|
// SendHeader sends header metadata. It may be called at most once.
|
|
// The provided md and headers set by SetHeader() will be sent.
|
|
func SendHeader(ctx context.Context, md metadata.MD) error {
|
|
stream, ok := transport.StreamFromContext(ctx)
|
|
if !ok {
|
|
return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
|
|
}
|
|
t := stream.ServerTransport()
|
|
if t == nil {
|
|
grpclog.Fatalf("grpc: SendHeader: %v has no ServerTransport to send header metadata.", stream)
|
|
}
|
|
if err := t.WriteHeader(stream, md); err != nil {
|
|
return toRPCErr(err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
|
|
// When called more than once, all the provided metadata will be merged.
|
|
func SetTrailer(ctx context.Context, md metadata.MD) error {
|
|
if md.Len() == 0 {
|
|
return nil
|
|
}
|
|
stream, ok := transport.StreamFromContext(ctx)
|
|
if !ok {
|
|
return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
|
|
}
|
|
return stream.SetTrailer(md)
|
|
}
|