Skip to content

Commit

Permalink
Added client interface
Browse files Browse the repository at this point in the history
Signed-off-by: Patryk Strusiewicz-Surmacki <[email protected]>
  • Loading branch information
p-strusiewiczsurmacki-mobica committed May 31, 2024
1 parent 69de278 commit 2516887
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 48 deletions.
6 changes: 3 additions & 3 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"os"

"github.com/go-logr/zapr"
netnsadapter "github.com/telekom/das-schiff-network-operator/pkg/adapters/netns"
vrfigbpadapter "github.com/telekom/das-schiff-network-operator/pkg/adapters/vrf_igbp"
"github.com/telekom/das-schiff-network-operator/pkg/agent"
agentpb "github.com/telekom/das-schiff-network-operator/pkg/agent/pb"
Expand All @@ -25,7 +24,7 @@ func main() {
"The controller will load its initial configuration from this file. "+
"Omit this flag to use the default configuration values. "+
"Command-line flags override configuration from this file.")
flag.StringVar(&agentType, "agent", "legacy", "Use selected agent type (default: legacy).")
flag.StringVar(&agentType, "agent", "vrf-igbp", "Use selected agent type (default: vrf-igbp).")
flag.IntVar(&port, "port", agent.DefaultPort, fmt.Sprintf("gRPC listening port. (default: %d)", agent.DefaultPort))

zc := zap.NewProductionConfig()
Expand All @@ -44,7 +43,8 @@ func main() {
var adapter agent.Adapter
switch agentType {
case "netconf":
adapter, err = netnsadapter.New()
log.Error(fmt.Errorf("agent is currently not supported"), "type", agentType)
os.Exit(1)
default:
adapter, err = vrfigbpadapter.New(anycastTracker, log)
}
Expand Down
41 changes: 19 additions & 22 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,15 @@ import (
"sort"
"strconv"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"

"github.com/telekom/das-schiff-network-operator/api/v1alpha1"
"github.com/telekom/das-schiff-network-operator/controllers"
vrfigbpadapter "github.com/telekom/das-schiff-network-operator/pkg/adapters/vrf_igbp"
"github.com/telekom/das-schiff-network-operator/pkg/agent"
agentpb "github.com/telekom/das-schiff-network-operator/pkg/agent/pb"
"github.com/telekom/das-schiff-network-operator/pkg/anycast"
"github.com/telekom/das-schiff-network-operator/pkg/bpf"
"github.com/telekom/das-schiff-network-operator/pkg/config"
Expand Down Expand Up @@ -98,7 +96,7 @@ func main() {
var configFile string
var interfacePrefix string
var nodeConfigPath string
var useNetconf bool
var agentType string
var agentPort int
flag.StringVar(&configFile, "config", "",
"The controller will load its initial configuration from this file. "+
Expand All @@ -110,8 +108,7 @@ func main() {
"Interface prefix for bridge devices for MACVlan sync")
flag.StringVar(&nodeConfigPath, "nodeconfig-path", reconciler.DefaultNodeConfigPath,
"Path to store working node configuration.")
flag.BoolVar(&useNetconf, "use-netconf", false,
"Use NETCONF interface to configure hosts instead of Netlink and FRR.")
flag.StringVar(&agentType, "agent", "vrf-igbp", "Use selected agent type (default: vrf-igbp).")
flag.IntVar(&agentPort, "agentPort", agent.DefaultPort,
"gRPC agent port. (default: "+strconv.Itoa(agent.DefaultPort)+")")
opts := zap.Options{
Expand Down Expand Up @@ -140,13 +137,25 @@ func main() {
}
}

if err := start(&options, agentPort, onlyBPFMode, nodeConfigPath, interfacePrefix); err != nil {
var agentClient agent.Client
switch agentType {
case "netconf":
setupLog.Error(fmt.Errorf("netconf agent is currently not supported"), "unsupported error")
os.Exit(1)
default:
agentClient, err = vrfigbpadapter.NewClient(fmt.Sprintf(":%d", agentPort))
if err != nil {
setupLog.Error(err, "error creating agent's client")
}
}

if err := start(&options, onlyBPFMode, nodeConfigPath, interfacePrefix, agentClient); err != nil {
setupLog.Error(err, "error running manager")
os.Exit(1)
}
}

func start(options *manager.Options, agentPort int, onlyBPFMode bool, nodeConfigPath, interfacePrefix string) error {
func start(options *manager.Options, onlyBPFMode bool, nodeConfigPath, interfacePrefix string, agentClient agent.Client) error {
clientConfig := ctrl.GetConfigOrDie()
mgr, err := ctrl.NewManager(clientConfig, *options)
if err != nil {
Expand All @@ -164,18 +173,6 @@ func start(options *manager.Options, agentPort int, onlyBPFMode bool, nodeConfig
return fmt.Errorf("unable to create webhook VRFRouteConfiguration: %w", err)
}

var grpcOpts []grpc.DialOption
grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.NewClient(fmt.Sprintf(":%d", agentPort), grpcOpts...)
if err != nil {
return fmt.Errorf("unable to create gRPC connection: %w", err)
}
defer conn.Close()

agentClient := agentpb.NewAgentClient(conn)

setupLog.Info("configured gRPC client")

if err := initComponents(mgr, anycastTracker, cfg, clientConfig, onlyBPFMode, nodeConfigPath, agentClient); err != nil {
return fmt.Errorf("unable to initialize components: %w", err)
}
Expand All @@ -193,7 +190,7 @@ func start(options *manager.Options, agentPort int, onlyBPFMode bool, nodeConfig
return nil
}

func initComponents(mgr manager.Manager, anycastTracker *anycast.Tracker, cfg *config.Config, clientConfig *rest.Config, onlyBPFMode bool, nodeConfigPath string, agentClient agentpb.AgentClient) error {
func initComponents(mgr manager.Manager, anycastTracker *anycast.Tracker, cfg *config.Config, clientConfig *rest.Config, onlyBPFMode bool, nodeConfigPath string, agentClient agent.Client) error {
// Start VRFRouteConfigurationReconciler when we are not running in only BPF mode.
if !onlyBPFMode {
if err := setupReconcilers(mgr, nodeConfigPath, agentClient); err != nil {
Expand Down Expand Up @@ -271,7 +268,7 @@ func setupBPF(cfg *config.Config) error {
return nil
}

func setupReconcilers(mgr manager.Manager, nodeConfigPath string, agentClient agentpb.AgentClient) error {
func setupReconcilers(mgr manager.Manager, nodeConfigPath string, agentClient agent.Client) error {
r, err := reconciler.NewReconciler(mgr.GetClient(), mgr.GetLogger(), nodeConfigPath, agentClient)
if err != nil {
return fmt.Errorf("unable to create debounced reconciler: %w", err)
Expand Down
23 changes: 17 additions & 6 deletions pkg/adapters/netns/netns.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,42 @@
package adapters

import (
"context"
"errors"

"github.com/telekom/das-schiff-network-operator/api/v1alpha1"
"github.com/telekom/das-schiff-network-operator/pkg/agent"
"github.com/telekom/das-schiff-network-operator/pkg/config"
)

type NetNS struct{}
type netNS struct{}

func New() (agent.Adapter, error) {
return &NetNS{}, nil
return &netNS{}, nil
}

func (*NetNS) CheckHealth() error {
func (*netNS) CheckHealth() error {
return errors.ErrUnsupported
}

func (*NetNS) GetConfig() *config.Config {
func (*netNS) GetConfig() *config.Config {
return nil
}

func (*NetNS) ReconcileLayer3([]v1alpha1.VRFRouteConfigurationSpec, []v1alpha1.RoutingTableSpec) error {
func (*netNS) ReconcileLayer3([]v1alpha1.VRFRouteConfigurationSpec, []v1alpha1.RoutingTableSpec) error {
return errors.ErrUnsupported
}

func (*NetNS) ReconcileLayer2([]v1alpha1.Layer2NetworkConfigurationSpec) error {
func (*netNS) ReconcileLayer2([]v1alpha1.Layer2NetworkConfigurationSpec) error {
return errors.ErrUnsupported
}

type netNSClient struct{}

func NewClient() agent.Client {
return &netNSClient{}
}

func (*netNSClient) SendConfig(context.Context, *v1alpha1.NodeConfig) error {
return errors.ErrUnsupported
}
51 changes: 51 additions & 0 deletions pkg/adapters/vrf_igbp/vrfigbp.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
package adapters

import (
"context"
"encoding/json"
"fmt"
"os"
"time"

"github.com/go-logr/logr"
"github.com/telekom/das-schiff-network-operator/api/v1alpha1"
"github.com/telekom/das-schiff-network-operator/pkg/agent"
agentpb "github.com/telekom/das-schiff-network-operator/pkg/agent/pb"
"github.com/telekom/das-schiff-network-operator/pkg/anycast"
"github.com/telekom/das-schiff-network-operator/pkg/config"
"github.com/telekom/das-schiff-network-operator/pkg/frr"
"github.com/telekom/das-schiff-network-operator/pkg/healthcheck"
"github.com/telekom/das-schiff-network-operator/pkg/nl"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const defaultTimeout = 30 * time.Second

type VrfIgbp struct {
netlinkManager *nl.Manager
config *config.Config
Expand Down Expand Up @@ -72,3 +81,45 @@ func (r *VrfIgbp) CheckHealth() error {
func (r *VrfIgbp) GetConfig() *config.Config {
return r.config
}

type vrfIgbpClient struct {
grpcClient agentpb.AgentClient
}

func NewClient(address string) (agent.Client, error) {
var grpcOpts []grpc.DialOption
grpcOpts = append(grpcOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.NewClient(address, grpcOpts...)
if err != nil {
return nil, fmt.Errorf("unable to create gRPC connection: %w", err)
}

client := agentpb.NewAgentClient(conn)

vrfigbpClient := vrfIgbpClient{
grpcClient: client,
}

return &vrfigbpClient, nil
}

func (c *vrfIgbpClient) SendConfig(ctx context.Context, nodeConfig *v1alpha1.NodeConfig) error {
timeoutCtx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()

nc := agentpb.NetworkConfiguration{
Data: []byte{},
}
data, err := json.Marshal(*nodeConfig)
if err != nil {
return fmt.Errorf("error marshaling NodeConfig: %w", err)
}

nc.Data = data

if _, err = c.grpcClient.SetConfiguration(timeoutCtx, &nc); err != nil {
return fmt.Errorf("error setting configuration: %w", err)
}

return nil
}
10 changes: 7 additions & 3 deletions pkg/agent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

"github.com/go-logr/logr"
"github.com/telekom/das-schiff-network-operator/api/v1alpha1"
pb "github.com/telekom/das-schiff-network-operator/pkg/agent/pb"
agentpb "github.com/telekom/das-schiff-network-operator/pkg/agent/pb"
"github.com/telekom/das-schiff-network-operator/pkg/config"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
Expand All @@ -20,7 +20,7 @@ const (

type Server struct {
adapter Adapter
pb.UnimplementedAgentServer
agentpb.UnimplementedAgentServer
logger *logr.Logger
}

Expand All @@ -31,6 +31,10 @@ type Adapter interface {
GetConfig() *config.Config
}

type Client interface {
SendConfig(context.Context, *v1alpha1.NodeConfig) error
}

func NewServer(adapter Adapter, logger *logr.Logger) *Server {
sLog := logger.WithName("agent-server")
return &Server{
Expand All @@ -40,7 +44,7 @@ func NewServer(adapter Adapter, logger *logr.Logger) *Server {
}

// nolint: wrapcheck
func (s Server) SetConfiguration(_ context.Context, nc *pb.NetworkConfiguration) (*emptypb.Empty, error) {
func (s Server) SetConfiguration(_ context.Context, nc *agentpb.NetworkConfiguration) (*emptypb.Empty, error) {
s.logger.Info("new request")
if nc == nil {
s.logger.Info("nil request")
Expand Down
18 changes: 4 additions & 14 deletions pkg/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

"github.com/go-logr/logr"
"github.com/telekom/das-schiff-network-operator/api/v1alpha1"
agentpb "github.com/telekom/das-schiff-network-operator/pkg/agent/pb"
"github.com/telekom/das-schiff-network-operator/pkg/agent"
"github.com/telekom/das-schiff-network-operator/pkg/debounce"
"github.com/telekom/das-schiff-network-operator/pkg/healthcheck"
"github.com/telekom/das-schiff-network-operator/pkg/nodeconfig"
Expand All @@ -34,7 +34,7 @@ type Reconciler struct {
healthChecker *healthcheck.HealthChecker
nodeConfig *v1alpha1.NodeConfig
nodeConfigPath string
agentClient agentpb.AgentClient
agentClient agent.Client

debouncer *debounce.Debouncer
}
Expand All @@ -44,7 +44,7 @@ type reconcile struct {
logr.Logger
}

func NewReconciler(clusterClient client.Client, logger logr.Logger, nodeConfigPath string, agentClient agentpb.AgentClient) (*Reconciler, error) {
func NewReconciler(clusterClient client.Client, logger logr.Logger, nodeConfigPath string, agentClient agent.Client) (*Reconciler, error) {
reconciler := &Reconciler{
client: clusterClient,
logger: logger,
Expand Down Expand Up @@ -162,17 +162,7 @@ func (r *reconcile) sendConfig(ctx context.Context, nodeCfg *v1alpha1.NodeConfig
timeoutCtx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()

nc := agentpb.NetworkConfiguration{
Data: []byte{},
}
data, err := json.Marshal(*nodeCfg)
if err != nil {
return fmt.Errorf("error marshaling NodeConfig: %w", err)
}

nc.Data = data

if _, err = r.agentClient.SetConfiguration(timeoutCtx, &nc); err != nil {
if err := r.agentClient.SendConfig(timeoutCtx, nodeCfg); err != nil {
return fmt.Errorf("error setting configuration: %w", err)
}

Expand Down

0 comments on commit 2516887

Please sign in to comment.