-
Notifications
You must be signed in to change notification settings - Fork 0
/
grpc.go
114 lines (89 loc) · 3.37 KB
/
grpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package potens
import (
"net"
"os"
"strconv"
"strings"
"time"
"github.com/kubex/potens-go/auth"
"go.uber.org/zap"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
// GrpcBackgroundContext context to use when communicating with other services
func (app *Application) GrpcBackgroundContext() context.Context {
return app.GrpcContext(context.Background())
}
// GrpcTimeoutContext context to use when communicating with other services
func (app *Application) GrpcTimeoutContext(timeout time.Duration) (context.Context, context.CancelFunc) {
return context.WithTimeout(app.GrpcContext(context.Background()), timeout)
}
// GrpcContext context to use when communicating with other services
func (app *Application) GrpcContext(parent context.Context) context.Context {
md := metadata.Pairs(
auth.GetAppIDKey(), app.Definition().AppID,
auth.GetAppVendorKey(), app.Definition().VendorID,
)
if parentMd, hasParentMd := metadata.FromIncomingContext(parent); hasParentMd {
return metadata.NewOutgoingContext(parent, metadata.Join(md, parentMd))
}
return metadata.NewOutgoingContext(parent, md)
}
// CreateServer creates a gRPC server with your tls certificates
func (app *Application) CreateServer() error {
app.server = grpc.NewServer()
return nil
}
//GetGrpcServer returns the grpc server
func (app *Application) GetGrpcServer() *grpc.Server {
return app.server
}
func (app *Application) Serve() error {
lis, err := net.Listen("tcp", ":"+app.PortString())
if err != nil {
return err
}
app.Log().Debug("Serving", zap.Int("port", app.Port))
return app.server.Serve(lis)
}
func (app *Application) grpcServiceDialer(service string, timeout time.Duration) (net.Conn, error) {
location := os.Getenv(strings.ToUpper(service) + EnvServiceLocationSuffix)
kubexServiceDomain := os.Getenv(EnvKubexServiceDomain)
if kubexServiceDomain == "" {
kubexServiceDomain = KubexProductionServicesDomain
}
location = app.GetServiceEnvLocation(service)
if location == "" {
location = strings.ToLower(service) + "." + kubexServiceDomain
location += ":" + strconv.FormatInt(int64(KubexDefaultGRPCPort), 10)
}
app.Log().Debug("Dialing GRPC", zap.String("service", service), zap.String("location", location))
return net.DialTimeout("tcp", location, timeout)
}
func (app *Application) GetServiceConnection(service string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
opts = append(opts, grpc.WithInsecure())
opts = append(opts, grpc.WithDialer(app.grpcServiceDialer))
return grpc.Dial(service, opts...)
}
func (app *Application) getEnvLocation(prefix string, service string) string {
service = strings.Replace(service, "-", "_", -1)
serviceHost := os.Getenv(strings.ToUpper(prefix+service) + EnvServiceHostSuffix)
servicePort := os.Getenv(strings.ToUpper(prefix+service) + EnvServicePortSuffix + "_GRPC")
if servicePort == "" {
servicePort = os.Getenv(strings.ToUpper(prefix+service) + EnvServicePortSuffix + "_DEFAULTPORT")
}
if servicePort == "" {
servicePort = os.Getenv(strings.ToUpper(prefix+service) + EnvServicePortSuffix)
}
if serviceHost+servicePort != "" {
return serviceHost + ":" + servicePort
}
return ""
}
func (app *Application) GetServiceEnvLocation(service string) string {
return app.getEnvLocation("", service)
}
func (app *Application) GetAppEnvLocation(service string) string {
return app.getEnvLocation("app_", service)
}