Skip to content

Commit

Permalink
feat: add osm collector
Browse files Browse the repository at this point in the history
Signed-off-by: Sanya Kochhar <[email protected]>
Co-authored-by: Johnson Shi <[email protected]>
Co-authored-by: Shalier Xia <[email protected]>
  • Loading branch information
3 people committed Jun 14, 2021
1 parent 147b24b commit 68730dc
Show file tree
Hide file tree
Showing 5 changed files with 342 additions and 25 deletions.
32 changes: 21 additions & 11 deletions cmd/aks-periscope/aks-periscope.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func main() {
log.Fatalf("Failed to create CRD: %v", err)
}

clusterType := os.Getenv("CLUSTER_TYPE")
collectorList := strings.Fields(os.Getenv("COLLECTOR_LIST"))

// Copies self-signed cert information to container if application is running on Azure Stack Cloud.
// We need the cert in order to communicate with the storage account.
Expand All @@ -43,26 +43,27 @@ func main() {
kubeletCmdCollector := collector.NewKubeletCmdCollector(exporter)
systemPerfCollector := collector.NewSystemPerfCollector(exporter)
helmCollector := collector.NewHelmCollector(exporter)
osmCollector := collector.NewOsmCollector(exporter)

if strings.EqualFold(clusterType, "connectedCluster") {
collectors = append(collectors, containerLogsCollector)
collectors = append(collectors, dnsCollector)
collectors = append(collectors, helmCollector)
collectors = append(collectors, kubeObjectsCollector)
collectors = append(collectors, networkOutboundCollector)
collectors = append(collectors, containerLogsCollector)
collectors = append(collectors, dnsCollector)
collectors = append(collectors, kubeObjectsCollector)
collectors = append(collectors, networkOutboundCollector)

if contains(collectorList, "connectedCluster") {
collectors = append(collectors, helmCollector)
} else {
collectors = append(collectors, containerLogsCollector)
collectors = append(collectors, dnsCollector)
collectors = append(collectors, kubeObjectsCollector)
collectors = append(collectors, networkOutboundCollector)
collectors = append(collectors, systemLogsCollector)
collectors = append(collectors, ipTablesCollector)
collectors = append(collectors, nodeLogsCollector)
collectors = append(collectors, kubeletCmdCollector)
collectors = append(collectors, systemPerfCollector)
}

if contains(collectorList, "OSM") {
collectors = append(collectors, osmCollector)
}

collectorGrp := new(sync.WaitGroup)

for _, c := range collectors {
Expand Down Expand Up @@ -158,3 +159,12 @@ func zipAndExport(exporter interfaces.Exporter) error {
//nolint:govet
return nil
}

func contains(flagsList []string, flag string) bool {
for _, f := range flagsList {
if strings.EqualFold(f, flag) {
return true
}
}
return false
}
20 changes: 13 additions & 7 deletions deployment/aks-periscope.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@ metadata:
name: aks-periscope-role
rules:
- apiGroups: ["","metrics.k8s.io"]
resources: ["pods", "nodes", "secrets"]
verbs: ["get", "watch", "list"]
resources: ["pods", "pods/portforward", "nodes", "secrets"]
verbs: ["get", "watch", "list", "create"]
- apiGroups: ["aks-periscope.azure.github.com"]
resources: ["diagnostics"]
verbs: ["get", "watch", "list", "create", "patch"]
- apiGroups: ["admissionregistration.k8s.io"]
resources: ["mutatingwebhookconfigurations", "validatingwebhookconfigurations"]
verbs: ["get", "list", "watch"]
- apiGroups: ["apiextensions.k8s.io"]
resources: ["customresourcedefinitions"]
verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
Expand Down Expand Up @@ -83,7 +89,7 @@ spec:
- configMapRef:
name: nodelogs-config
- configMapRef:
name: clustertype-config
name: collectors-config
- secretRef:
name: azureblob-secret
volumeMounts:
Expand Down Expand Up @@ -136,13 +142,13 @@ metadata:
data:
DIAGNOSTIC_NODELOGS_LIST: /var/log/azure/cluster-provision.log /var/log/cloud-init.log
---
apiVersion: v1
kind: ConfigMap
apiVersion: v1
kind: ConfigMap
metadata:
name: clustertype-config
name: collectors-config
namespace: aks-periscope
data:
CLUSTER_TYPE: "managedCluster" # <custom flag>
COLLECTOR_LIST: managedCluster # <custom flag> connectedCluster OSM
---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
Expand Down
36 changes: 34 additions & 2 deletions pkg/collector/collector.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package collector

import "github.com/Azure/aks-periscope/pkg/interfaces"
import (
"path/filepath"

"github.com/Azure/aks-periscope/pkg/interfaces"
"github.com/Azure/aks-periscope/pkg/utils"
)

// Type defines Collector Type
type Type int
Expand All @@ -22,6 +27,8 @@ const (
NetworkOutbound
// NodeLogs defines NodeLogs Collector Type
NodeLogs
// Osm defines Open Service Mesh Collector Type
Osm
// SystemLogs defines SystemLogs Collector Type
SystemLogs
// SystemPerf defines SystemPerf Collector Type
Expand All @@ -30,7 +37,7 @@ const (

// Name returns type name
func (t Type) name() string {
return [...]string{"dns", "containerlogs", "helm", "iptables", "kubeletcmd", "kubeobjects", "networkoutbound", "nodelogs", "systemlogs", "systemperf"}[t]
return [...]string{"dns", "containerlogs", "helm", "iptables", "kubeletcmd", "kubeobjects", "networkoutbound", "nodelogs", "osm", "systemlogs", "systemperf"}[t]
}

// BaseCollector defines Base Collector
Expand Down Expand Up @@ -69,3 +76,28 @@ func (b *BaseCollector) Export() error {

return nil
}

// CollectKubectlOutputToCollectorFiles collects output of a given kubectl command to a file.
// Returns kubectl's stderr output if stdout output is empty.
func (b *BaseCollector) CollectKubectlOutputToCollectorFiles(rootPath string, fileName string, kubeCmds []string) error {
outputStreams, err := utils.RunCommandOnContainerWithOutputStreams("kubectl", kubeCmds...)
if err != nil {
return err
}

// If kubectl stdout output is empty, i.e., there is no resource of this type within the cluster
// the absence of this resource is logged in the file with the relevant message from stderr (Ex: "No resource found...").
output := outputStreams.Stdout
if len(output) == 0 {
output = outputStreams.Stderr
}

resourceFile := filepath.Join(rootPath, fileName)
if err = utils.WriteToFile(resourceFile, output); err != nil {
return err
}

b.AddToCollectorFiles(resourceFile)

return nil
}
189 changes: 189 additions & 0 deletions pkg/collector/osm_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package collector

import (
"log"
"path/filepath"
"regexp"

"github.com/Azure/aks-periscope/pkg/interfaces"
"github.com/Azure/aks-periscope/pkg/utils"
)

// OsmCollector defines an Osm Collector struct
type OsmCollector struct {
BaseCollector
}

var _ interfaces.Collector = &OsmCollector{}

// NewOsmCollector is a constructor
func NewOsmCollector(exporter interfaces.Exporter) *OsmCollector {
return &OsmCollector{
BaseCollector: BaseCollector{
collectorType: Osm,
exporter: exporter,
},
}
}

// Collect implements the interface method
func (collector *OsmCollector) Collect() error {
// Get all OSM deployments in order to collect information for various resources across all meshes in the cluster
meshList, err := utils.GetResourceList([]string{"get", "deployments", "--all-namespaces", "-l", "app=osm-controller", "-o", "jsonpath={..meshName}"}, " ")
if err != nil {
return err
}

// Directory where OSM logs will be written to
rootPath, err := utils.CreateCollectorDir(collector.GetName())
if err != nil {
return err
}

for _, meshName := range meshList {
meshRootPath := filepath.Join(rootPath, "mesh_"+meshName)

monitoredNamespaces, err := utils.GetResourceList([]string{"get", "namespaces", "--all-namespaces", "-l", "openservicemesh.io/monitored-by=" + meshName, "-o", "jsonpath={..name}"}, " ")
if err != nil {
log.Printf("Failed to find any namespaces monitored by OSM named '%s': %+v\n", meshName, err)
}
controllerNamespaces, err := utils.GetResourceList([]string{"get", "deployments", "--all-namespaces", "-l", "app=osm-controller,meshName=" + meshName, "-o", "jsonpath={..metadata.namespace}"}, " ")
if err != nil {
log.Printf("Failed to find controller namespace(s) for OSM named '%s': %+v\n", meshName, err)
}
callNamespaceCollectors(collector, monitoredNamespaces, controllerNamespaces, meshRootPath, meshName)
collectGroundTruth(collector, meshRootPath, meshName)
}
return nil
}

// callNamespaceCollectors calls functions to collect data for osm-controller namespace and namespaces monitored by a given mesh
func callNamespaceCollectors(collector *OsmCollector, monitoredNamespaces []string, controllerNamespaces []string, rootPath string, meshName string) {
for _, namespace := range monitoredNamespaces {
namespaceRootPath := filepath.Join(rootPath, "namespace_"+namespace)
if err := collectDataFromEnvoys(collector, namespaceRootPath, namespace); err != nil {
log.Printf("Failed to collect Envoy configs in OSM monitored namespace %s: %+v", namespace, err)
}
collectNamespaceResources(collector, namespaceRootPath, namespace)
}
for _, namespace := range controllerNamespaces {
namespaceRootPath := filepath.Join(rootPath, "controller_namespace_"+namespace)
if err := collectPodLogs(collector, namespaceRootPath, namespace); err != nil {
log.Printf("Failed to collect pod logs for controller namespace %s: %+v", namespace, err)
}
collectNamespaceResources(collector, namespaceRootPath, namespace)
}
}

// collectNamespaceResources collects information about general resources in a given namespace
func collectNamespaceResources(collector *OsmCollector, rootPath string, namespace string) {
if err := collectPodConfigs(collector, rootPath, namespace); err != nil {
log.Printf("Failed to collect pod configs for ns %s: %+v", namespace, err)
}

var namespaceResourcesMap = map[string][]string{
"metadata.json": {"get", "namespaces", namespace, "-o", "jsonpath={..metadata}", "-o", "json"},
"services_list.tsv": {"get", "services", "-n", namespace, "-o", "wide"},
"services.json": {"get", "services", "-n", namespace, "-o", "json"},
"endpoints_list.tsv": {"get", "endpoints", "-n", namespace, "-o", "wide"},
"endpoints.json": {"get", "endpoints", "-n", namespace, "-o", "json"},
"configmaps_list.tsv": {"get", "configmaps", "-n", namespace, "-o", "wide"},
"configmaps.json": {"get", "configmaps", "-n", namespace, "-o", "json"},
"ingresses_list.tsv": {"get", "ingresses", "-n", namespace, "-o", "wide"},
"ingresses.json": {"get", "ingresses", "-n", namespace, "-o", "json"},
"service_accounts_list.tsv": {"get", "serviceaccounts", "-n", namespace, "-o", "wide"},
"service_accounts.json": {"get", "serviceaccounts", "-n", namespace, "-o", "json"},
"pods_list.tsv": {"get", "pods", "-n", namespace, "-o", "wide"},
}
for fileName, kubeCmds := range namespaceResourcesMap {
if err := collector.CollectKubectlOutputToCollectorFiles(rootPath, fileName, kubeCmds); err != nil {
log.Printf("Failed to collect %s in OSM monitored namespace %s: %+v", fileName, namespace, err)
}
}
}

// collectPodConfigs collects configs for pods in given namespace
func collectPodConfigs(collector *OsmCollector, rootPath string, namespace string) error {
rootPath = filepath.Join(rootPath, "pod_configs")
pods, err := utils.GetResourceList([]string{"get", "pods", "-n", namespace, "-o", "jsonpath={..metadata.name}"}, " ")
if err != nil {
return err
}
for _, podName := range pods {
kubeCmds := []string{"get", "pods", "-n", namespace, podName, "-o", "json"}
if err := collector.CollectKubectlOutputToCollectorFiles(rootPath, podName+".json", kubeCmds); err != nil {
log.Printf("Failed to collect config for pod %s in OSM monitored namespace %s: %+v", podName, namespace, err)
}
}
return nil
}

// collectDataFromEnvoys collects Envoy proxy config for pods in monitored namespace: port-forward and curl config dump
func collectDataFromEnvoys(collector *OsmCollector, rootPath string, namespace string) error {
pods, err := utils.GetResourceList([]string{"get", "pods", "-n", namespace, "-o", "jsonpath={..metadata.name}"}, " ")
if err != nil {
return err
}
for _, podName := range pods {
pid, err := utils.RunBackgroundCommand("kubectl", "port-forward", podName, "-n", namespace, "15000:15000")
if err != nil {
log.Printf("Failed to collect Envoy config for pod %s in OSM monitored namespace %s: %+v", podName, namespace, err)
continue
}

envoyQueries := [5]string{"config_dump", "clusters", "listeners", "ready", "stats"}
for _, query := range envoyQueries {
responseBody, err := utils.GetUrlWithRetries("http://localhost:15000/"+query, 5)
if err != nil {
log.Printf("Failed to collect Envoy %s for pod %s in OSM monitored namespace %s: %+v", query, podName, namespace, err)
continue
}
// Remove certificate secrets from Envoy config i.e., "inline_bytes" field from response
re := regexp.MustCompile("(?m)[\r\n]+^.*inline_bytes.*$")
secretRemovedResponse := re.ReplaceAllString(string(responseBody), "---redacted---")

fileName := query + "_" + podName + ".txt"
resourceFile := filepath.Join(rootPath, "envoy_data", fileName)
if err = utils.WriteToFile(resourceFile, secretRemovedResponse); err != nil {
log.Printf("Failed to write to file: %+v", err)
continue
}
collector.AddToCollectorFiles(resourceFile)
}
if err = utils.KillProcess(pid); err != nil {
log.Printf("Failed to kill process: %+v", err)
continue
}
}
return nil
}

// collectPodLogs collects logs of every pod in a given namespace
func collectPodLogs(collector *OsmCollector, rootPath string, namespace string) error {
rootPath = filepath.Join(rootPath, "pod_logs")
pods, err := utils.GetResourceList([]string{"get", "pods", "-n", namespace, "-o", "jsonpath={..metadata.name}"}, " ")
if err != nil {
return err
}
for _, podName := range pods {
if err := collector.CollectKubectlOutputToCollectorFiles(rootPath, podName+".log", []string{"logs", "-n", namespace, podName}); err != nil {
log.Printf("Failed to collect logs for pod %s: %+v", podName, err)
}
}
return nil
}

// collectGroundTruth collects ground truth on resources in given mesh
func collectGroundTruth(collector *OsmCollector, rootPath string, meshName string) {
var groundTruthMap = map[string][]string{
"all_resources_list.tsv": {"get", "all", "--all-namespaces", "-l", "app.kubernetes.io/instance=" + meshName, "-o", "wide"},
"all_resources_configs.json": {"get", "all", "--all-namespaces", "-l", "app.kubernetes.io/instance=" + meshName, "-o", "json"},
"mutating_webhook_configurations.json": {"get", "MutatingWebhookConfiguration", "--all-namespaces", "-l", "app.kubernetes.io/instance=" + meshName, "-o", "json"},
"validating_webhook_configurations.json": {"get", "ValidatingWebhookConfiguration", "--all-namespaces", "-l", "app.kubernetes.io/instance=" + meshName, "-o", "json"},
}
for fileName, kubeCmds := range groundTruthMap {
if err := collector.CollectKubectlOutputToCollectorFiles(rootPath, fileName, kubeCmds); err != nil {
log.Printf("Failed to collect %s for OSM: %+v", fileName, err)
}
}
}
Loading

0 comments on commit 68730dc

Please sign in to comment.