-
Notifications
You must be signed in to change notification settings - Fork 749
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
re-architecting around gateways and sensors #92
Conversation
…ing, making sensor repeatable
Couple things:
Overall, this looks really cool, I'm excited to try this out. |
Makefile
Outdated
DOCKER_PUSH=false | ||
IMAGE_NAMESPACE=argoproj | ||
DOCKER_PUSH=true | ||
IMAGE_NAMESPACE=metalgearsolid |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why this name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
its my registry name :) . The changes such as registry name, adding namespaces to manifests etc were all part of local testing and I had planned to remove them before resolving the WIP status.
common/common.go
Outdated
GatewayTransformerConfigMapEnvVar = "GATEWAY_TRANSFORMER_CONFIG_MAP" | ||
|
||
// GatewayEventTransformerImage is image for gateway event transformer | ||
GatewayEventTransformerImage = "metalgearsolid/gateway-transformer" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see above comment for namespace
common/util.go
Outdated
|
||
// SendSuccessResponse sends http success response | ||
func SendSuccessResponse(writer *http.ResponseWriter) { | ||
(*writer).WriteHeader(http.StatusOK) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do you need to dereference the writer
here?
kind: ConfigMap | ||
metadata: | ||
name: gateway-controller-configmap | ||
namespace: argo-events |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's remove the namespace references from manifests..
hack/update-codegen.sh
Outdated
bash -x ${CODEGEN_PKG}/generate-groups.sh "deepcopy,client,informer,lister" \ | ||
github.com/argoproj/argo-events/pkg/client github.com/argoproj/argo-events/pkg/apis \ | ||
github.com/argoproj/argo-events/pkg/sensor-client github.com/argoproj/argo-events/pkg/apis \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should be using github.com/argoproj/argo-events/pkg/client
for both
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed.
pkg/apis/gateway/v1alpha1/types.go
Outdated
type Gateway struct { | ||
metav1.TypeMeta `json:",inline"` | ||
metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"` | ||
Status NodePhase `json:"type" protobuf:"bytes,2,opt,name=type"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should this be of type GatewayStatus
instead? make sure to edit the annotations to match the name as well!
pkg/apis/gateway/v1alpha1/types.go
Outdated
Sensors []string `json:"sensors" protobuf:"bytes,8,opt,name=sensors"` | ||
|
||
// ServiceAccountName is name of service account to run the gateway | ||
ServiceAccountName string `json:"serviceAccountName" protobuf:"bytes,9,opt,name=service_account_name"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the protobuf naming scheme should be camelCase: serviceAccountName
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup.
@@ -25,18 +25,6 @@ import ( | |||
"k8s.io/apimachinery/pkg/apis/meta/v1" | |||
) | |||
|
|||
// SignalType is the type of the signal | |||
type SignalType string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
great to see this go!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
package structure much better!
only major concern is the introduction of the registered*
map in the configuration of each gateway. Should these store state about the Gateways? What happens when a Gateway goes down?
controllers/gateway/operator.go
Outdated
case v1alpha1.NodePhaseNew: | ||
|
||
goc.markGatewayPhase(v1alpha1.NodePhaseNew, "new") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this seems redundant as the phase is already new
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oops. got carried away there marking gateway phases :)
controllers/gateway/operator.go
Outdated
// Update node phase to running | ||
goc.gw.Status = v1alpha1.NodePhaseRunning | ||
goc.gw.Status.Phase = v1alpha1.NodePhaseRunning |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
un-needed, you update the phase at the end of operation and this update happens in the markGatewayPhase
function anyway..
controllers/gateway/operator.go
Outdated
return err | ||
} | ||
} | ||
goc.markGatewayPhase(v1alpha1.NodePhaseError, "failed to create transformer gateway configuration") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's useful to include the err
in the message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
controllers/gateway/operator.go
Outdated
@@ -159,22 +168,13 @@ func (goc *gwOperationCtx) operate() error { | |||
_, err = goc.controller.kubeClientset.AppsV1().Deployments(goc.gw.Namespace).Create(gatewayDeployment) | |||
if err != nil { | |||
goc.log.Error().Err(err).Msg("failed gateway deployment") | |||
goc.gw.Status = v1alpha1.NodePhaseError | |||
goc.markGatewayPhase(v1alpha1.NodePhaseError, "failed gateway deployment") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same deal with including err
in the message
controllers/gateway/operator.go
Outdated
} else { | ||
goc.gw.Status = v1alpha1.NodePhaseRunning | ||
// expose gateway if service is configured | ||
if goc.gw.Spec.Service.Port != 0 { | ||
goc.createGatewayService() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we fail silently here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As the service is not the absolutely critical part of the gateway. An user can create the service manually in the event of service creation failure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we have a status phase to represent this possible condition state? if not, we should so when the user investigates why stuff is not working, he knows how to resolve.
gateways/core/artifact/s3.go
Outdated
registeredArtifacts []S3Artifact | ||
gatewayConfig *gateways.GatewayConfig | ||
// registeredArtifacts contains map of registered s3 artifacts | ||
registeredArtifacts map[uint64]*S3Artifact |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the purpose of this map? it doesn't look like we're saving anything in this? also the gateway doesn't want to be storing state correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It won't be storing the state of the gateway but rather the configurations. A gateway can be configured with multiple configurations. Each configuration is run in different go routine and produces the events. The idea is to use a single gateway among multiple use cases that differ only in configurations.
We are watching gateway-configmap to dynamically add new configuration and the purpose of this map is to keep track of existing configurations of gateway. Whenever an user updates the configmap with new configuration, this map is used to first check whether the configuration already exists or is it a completely new configuration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so what happens when a gateway goes down and loses this information? does it pick up the existing configurations from the configMap and restart the necessary go routines?
It makes sense to add number of completions count in sensor #6 |
I was looking for a fast JSON output logger and came across zerolog. From the benchmarking posted on their github, it performs better than uber/zap. |
controllers/gateway/validate.go
Outdated
if len(goc.gw.Spec.Sensors) <= 0 { | ||
return fmt.Errorf("no associated sensor with gateway") | ||
} | ||
if goc.gw.Spec.ServiceAccountName == "" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use default service account instead
controllers/gateway/operator.go
Outdated
} | ||
} | ||
|
||
// persist the updates to the Sensor resource |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update comment, remove sensor resource
Added K8 events for gateway. |
K8 events currently work as mechanism for gateway to update it's state and avoid synchronization within multiple go routines altogether and remove need to have multiple gateway resource watchers. Event watcher currently sit within gateway processor. I am thinking of moving it to Gateway controller. Event watcher in gateway processor listens to k8 events that are specific to that gateway, but Gateway operator will listen to k8 events of type I am also thinking of watching k8 events to maintain Sensor state at Sensor controller. This will remove necessity to watch sensor resource in sensor pod. Same trade-off applies here as well. |
…ding support for NATS and Kafka dispatch mechanism
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
really cool stuff. I put some questions / comments here, but looking good! thanks for the documentation and diagrams!
common/util.go
Outdated
@@ -90,3 +92,21 @@ func SendErrorResponse(writer http.ResponseWriter) { | |||
writer.WriteHeader(http.StatusBadRequest) | |||
writer.Write([]byte(ErrorResponse)) | |||
} | |||
|
|||
// RandomStringGenerator generates a random string | |||
func RandomStringGenerator() string { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there not a library out there to do this?
return ce, nil | ||
} | ||
|
||
// dispatches the event to configured sensor | ||
// getWatcherIP returns IP of service which backs given component. | ||
func (toc *tOperationCtx) getWatcherIP(name string) (string, string, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this method seems a bit odd to me.. shouldn't we be using DNS with service names instead of inspected for IP addresses?
|
||
For detailed implementation, check out [Calendar HTTP gateway](https://github.com/argoproj/argo-events/tree/eventing/gateways/rest/calendar) | ||
|
||
###### Advantage of writing a gRPC or http gateway is that you can use other languages for implementation of gateway processor server. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
isn't this an advantage for all types of gateways? as long as the gateway processor server has a library to interface with the gateway client.. be that NATS, Kafka, HTTP. you just need a supported protocol + infra (if needed in case of streams)
…e whenever configuration is removed from gateway config map. remaned workflows in example sensors.
common/k8-events.go
Outdated
}, | ||
ObjectMeta: metav1.ObjectMeta{ | ||
Namespace: event.Namespace, | ||
Name: event.Name + "-" + RandomStringGenerator(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you not just use GenerateName
here instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup, had it changed in my local. I'll push some new changes with test cases
controllers/sensor/signal-filter.go
Outdated
|
||
if timeFilter.Start != "" && timeFilter.Stop != "" { | ||
se.log.Info().Str("start time format", currentTStr + " " + timeFilter.Start).Msg("start time format") | ||
startTime, err := time.Parse("2006-01-02 15:04:05", currentTStr + " " + timeFilter.Start) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's make the format string 2006-01-02 15:04:05
a constant since we use it more than once and seeing this may be confusing for some.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done 👍
removing WIP status |
ready to merge |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes look good! I'm going to create a follow-up ticket to specifically address writing better unit tests and integration tests.
Good work on this @VaibhavPage ! Excited to get this out.
@@ -100,12 +100,12 @@ func (toc *tOperationCtx) transform(r *http.Request) (*sv1alpha.Event, error) { | |||
Context: sv1alpha.EventContext{ | |||
CloudEventsVersion: common.CloudEventsVersion, | |||
EventID: fmt.Sprintf("%x", eventId), | |||
ContentType: r.Header.Get(common.HeaderContentType), | |||
EventTime: metav1.Time{Time: time.Now().UTC()}, | |||
ContentType: "application/json", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how do you know this to always be true?
@@ -88,7 +90,7 @@ func (c *SensorController) processNextItem() bool { | |||
|
|||
obj, exists, err := c.informer.GetIndexer().GetByKey(key.(string)) | |||
if err != nil { | |||
log.Warnf("failed to get sensor '%s' from informer index: %+v", key, err) | |||
fmt.Errorf("failed to get sensor '%s' from informer index: %+v", key, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we be using a logger instead of just printing?
@@ -10,6 +10,6 @@ func TestGwOperationCtx_validate(t *testing.T) { | |||
gateway, err := getGateway() | |||
assert.Nil(t, err) | |||
goc := newGatewayOperationCtx(gateway, fakeController) | |||
err = goc.validate() | |||
err = Validate(goc.gw) | |||
assert.Nil(t, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should probably beef up this unit tests at some point later.. we can make a note of it and address in a future ticket.
* added gateway for processing outside events, making gateway long running, making sensor repeatable * refactoring k8s manifests * adding s3 gateway * making s3 and webhook gateway mult configurable * refactor of calendar, s3 and webhook gateway done. started with streams * added nats gateway * renamed signals to gateways. moved out-of-box gateways to core. * refactor gateways * removing signals folder * added kafka gateway * gateway and sensor dirs under a common controllers and clients directory * adding resource, amqp and mqtt gateways. refactored sensor watcher * adding gateway examples, readme etc * adding gateway docs. refactor gateway and sensor examples * added watcher for sensor. Completion count for sensor. Updated gateway doc * sensor's signal are gateway-name/configuration-name * making gateway processor as gRPC server * giving user options to implement gateway either by mimicing core gateways, a grpc server or completely custom implementation * gateway spec changed. divided extending gateways into 3 parts. started updating gateway examples * updated gateway examples with deploySpec. started writing custom gateway doc. * adding wiki for writing custom gateways * updating wiki. grpc gateway updated DialOptions * added REST gateways. wiki to follow * refactor rest gateways * added k8 events for gateway. Gateway configuration state is now maintained using k8 events * minor refactor in k8 events * added gateways as watchers for gateway along with sensors. started adding support for NATS and Kafka dispatch mechanism * added k8 event for escalation. nodes are deleted from gateway resource whenever configuration is removed from gateway config map. remaned workflows in example sensors. * added unit tests. refactored sensor notification handler. added k8 events for sensor * adding test cases for sensor controller, operator and notification handler * updating webhook gateway * fixed issue with syncing gateway nodes and gateway configmap * test: cleaning up test cases * update docker registry to argoproj * exposing gateway and sensor validate method * update dependencies
No description provided.