Skip to content
This repository has been archived by the owner on Sep 2, 2022. It is now read-only.

Native per job #237

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
578 changes: 517 additions & 61 deletions README.md

Large diffs are not rendered by default.

81 changes: 81 additions & 0 deletions api/v1beta1/flinkcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,79 @@ type JobSpec struct {
CancelRequested *bool `json:"cancelRequested,omitempty"`
}

// NativeSessionClusterJobSpec defines properties of a Native Flink session cluster.
// The properties in NativeSessionClusterJobSpec come from
// https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html#kubernetes
// Each field comment starts with the Flink configuration key it corresponds to.
type NativeSessionClusterJobSpec struct {
// kubernetes.cluster-id. The cluster id used for identifying the unique flink cluster.
// The value used is flinkCluster.ObjectMeta.Name.
FlinkClusterID string `json:"flinkClusterID,omitempty"`

// kubernetes.config.file. The kubernetes config file that will be used to create the client.
// The default is located at ~/.kube/config. The service account in the pod also works.
KubeConfig *string `json:"kubeConfig,omitempty"`

// kubernetes.container-start-command-template. Template for the kubernetes jobmanager
// and taskmanager container start invocation.
// Default: "%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%"
ContainerStartCommandTemplate *string `json:"containerStartCommandTemplate,omitempty"`

// kubernetes.entry.path. The entrypoint script of kubernetes in the image.
// It will be used as the command for the jobmanager and taskmanager containers.
// Default: "/opt/flink/bin/kubernetes-entry.sh"
EntryPath *string `json:"entryPath,omitempty"`

// kubernetes.flink.conf.dir. The flink conf directory that will be mounted in the pod.
// The flink-conf.yaml, log4j.properties, logback.xml in this path will be overwritten from the config map.
// Default: "/opt/flink/conf"
// NOTE(review): the field name and JSON tag are misspelled ("Congfig" instead of
// "Config"); renaming them would change the serialized API, so they are left as is.
CongfigDir *string `json:"congfigDir,omitempty"`

// kubernetes.flink.log.dir. The directory in the pod where the logs of the jobmanager
// and taskmanager are saved.
// Default: "/opt/flink/log".
LogDir *string `json:"logDir,omitempty"`

// kubernetes.jobmanager.cpu. The number of cpu used by job manager.
// Default: 1.0
// NOTE(review): the field is an int32, so fractional CPU values such as the
// documented default format (1.0) cannot be expressed exactly; the JSON tag is
// also capitalized unlike the other tags — confirm both are intended.
CPUJobManager *int32 `json:"CPUJobManager,omitempty"`

// kubernetes.jobmanager.service-account. Service account that is used by jobmanager within kubernetes cluster.
// The job manager uses this service account when requesting taskmanager pods from the API server.
// Default: "default"
FlinkClusterSA *string `json:"flinkClusterSA,omitempty"`

// kubernetes.rest-service.exposed.type. It could be ClusterIP/NodePort/LoadBalancer(default).
// When set to ClusterIP, the rest service will not be created.
// Default: "LoadBalancer"
FlinkRestServiceType *string `json:"flinkRestServiceType,omitempty"`

// kubernetes.service.create-timeout. Timeout used for creating the service.
// The timeout value requires a time-unit specifier (ms/s/min/h/d).
// Default: "1 min"
FlinkServiceCreateTimeout *string `json:"flinkServiceCreateTimeout,omitempty"`

// kubernetes.taskmanager.cpu. The number of cpu used by task manager.
// By default, the cpu is set to the number of slots per TaskManager.
// Default: -1.0
// NOTE(review): int32 here as well, so fractional CPU values cannot be expressed.
TaskManagerCPU *int32 `json:"taskManagerCPU,omitempty"`
}

// NativeJobClusterJobSpec defines properties of a Native Flink job cluster.
type NativeJobClusterJobSpec struct {
// The cluster id of the Flink cluster. The value used is flinkCluster.ObjectMeta.Name.
FlinkClusterID string `json:"flinkClusterID,omitempty"`
// JAR file of the job. This is the only field without omitempty.
JarFile string `json:"jarFile"`
// Heap size.
// NOTE(review): the unit (presumably MB) is not stated anywhere in this type — confirm.
HeapSize *int32 `json:"heapSize,omitempty"`
// Memory process size.
// NOTE(review): the unit is not stated here either — confirm.
MemoryProcessSize *int32 `json:"memoryProcessSize,omitempty"`
// Number of task slots (presumably per TaskManager — confirm against the consumer of this field).
NumberOfTaskSlots *int32 `json:"numberOfTaskSlots,omitempty"`
// JobManager service account
JobManagerServiceAccount *string `json:"jobManagerServiceAccount,omitempty"`
// Rest service annotations.
// NOTE(review): carried as a single string rather than a map; the expected
// format is not visible here — confirm.
RestServiceAnnotations *string `json:"restServiceAnnotations,omitempty"`
}

// FlinkClusterSpec defines the desired state of FlinkCluster
type FlinkClusterSpec struct {
// Flink image spec for the cluster's components.
Expand All @@ -333,6 +406,14 @@ type FlinkClusterSpec struct {
// otherwise, it is a long-running Session Cluster.
Job *JobSpec `json:"job,omitempty"`

// (Optional) Native Flink session spec. If specified,
// this cluster is a Native Flink session cluster (only the jobmanager is created in advance).
NativeSessionClusterJob *NativeSessionClusterJobSpec `json:"nativeSessionClusterJob,omitempty"`

// (Optional) Native Flink job cluster spec. If specified,
// this cluster is a Native Flink job cluster
NativeJobClusterJob *NativeJobClusterJobSpec `json:"nativeJobClusterJob,omitempty"`

// Environment variables shared by all JobManager, TaskManager and job
// containers.
EnvVars []corev1.EnvVar `json:"envVars,omitempty"`
Expand Down
18 changes: 18 additions & 0 deletions api/v1beta1/flinkcluster_validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ func (v *Validator) ValidateCreate(cluster *FlinkCluster) error {
if err != nil {
return err
}
if cluster.Spec.NativeSessionClusterJob != nil {
// It's a native session cluster, so the jobManager, taskManager, etc. are not validated.
return v.validateNativeSessionClusterJob()
}
if cluster.Spec.NativeJobClusterJob != nil {
// It's a native job cluster, so the jobManager, taskManager, etc. are not validated.
return v.validateNativeJobClusterJob()
}
err = v.validateJobManager(&cluster.Spec.JobManager)
if err != nil {
return err
Expand Down Expand Up @@ -235,6 +243,16 @@ func (v *Validator) validateImage(imageSpec *ImageSpec) error {
return nil
}

// validateNativeSessionClusterJob is a placeholder validator for native
// session clusters. It takes no spec and currently always returns nil, so
// every native session cluster is accepted.
func (v *Validator) validateNativeSessionClusterJob() error {
// TODO: Decide whether the native session cluster properties need validation.
return nil
}

// validateNativeJobClusterJob is a placeholder validator for native job
// clusters. It takes no spec and currently always returns nil, so every
// native job cluster is accepted.
func (v *Validator) validateNativeJobClusterJob() error {
// TODO: Decide whether the native job cluster properties need validation.
return nil
}

func (v *Validator) validateJobManager(jmSpec *JobManagerSpec) error {
var err error

Expand Down
6 changes: 5 additions & 1 deletion api/v1beta1/flinkcluster_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ var _ webhook.Defaulter = &FlinkCluster{}
// type.
func (cluster *FlinkCluster) Default() {
	// Log the object before and after defaulting so the effect of the
	// webhook can be compared in the operator logs.
	log.Info("default", "name", cluster.Name, "original", *cluster)

	// A cluster carrying either native spec is passed through unchanged;
	// _SetDefault is only applied when neither native spec is present.
	isNative := cluster.Spec.NativeSessionClusterJob != nil ||
		cluster.Spec.NativeJobClusterJob != nil
	if isNative {
		log.Info("It's a Native cluster, will not set defaults.")
	} else {
		_SetDefault(cluster)
	}

	log.Info("default", "name", cluster.Name, "augmented", *cluster)
}

Expand Down
115 changes: 115 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,16 @@ func (in *FlinkClusterSpec) DeepCopyInto(out *FlinkClusterSpec) {
*out = new(JobSpec)
(*in).DeepCopyInto(*out)
}
if in.NativeSessionClusterJob != nil {
in, out := &in.NativeSessionClusterJob, &out.NativeSessionClusterJob
*out = new(NativeSessionClusterJobSpec)
(*in).DeepCopyInto(*out)
}
if in.NativeJobClusterJob != nil {
in, out := &in.NativeJobClusterJob, &out.NativeJobClusterJob
*out = new(NativeJobClusterJobSpec)
(*in).DeepCopyInto(*out)
}
if in.EnvVars != nil {
in, out := &in.EnvVars, &out.EnvVars
*out = make([]v1.EnvVar, len(*in))
Expand Down Expand Up @@ -577,6 +587,111 @@ func (in *JobStatus) DeepCopy() *JobStatus {
return out
}

// DeepCopyInto deep-copies the receiver into out. The struct is first copied
// by value, then every non-nil pointer field is given its own freshly
// allocated target so that out shares no memory with in. in must be non-nil.
func (in *NativeJobClusterJobSpec) DeepCopyInto(out *NativeJobClusterJobSpec) {
	*out = *in

	// 32-bit integer settings.
	if in.HeapSize != nil {
		out.HeapSize = new(int32)
		*out.HeapSize = *in.HeapSize
	}
	if in.MemoryProcessSize != nil {
		out.MemoryProcessSize = new(int32)
		*out.MemoryProcessSize = *in.MemoryProcessSize
	}
	if in.NumberOfTaskSlots != nil {
		out.NumberOfTaskSlots = new(int32)
		*out.NumberOfTaskSlots = *in.NumberOfTaskSlots
	}

	// String settings.
	if in.JobManagerServiceAccount != nil {
		out.JobManagerServiceAccount = new(string)
		*out.JobManagerServiceAccount = *in.JobManagerServiceAccount
	}
	if in.RestServiceAnnotations != nil {
		out.RestServiceAnnotations = new(string)
		*out.RestServiceAnnotations = *in.RestServiceAnnotations
	}
}

// DeepCopy returns a newly allocated NativeJobClusterJobSpec that is a deep
// copy of the receiver, or nil when the receiver is nil.
func (in *NativeJobClusterJobSpec) DeepCopy() *NativeJobClusterJobSpec {
	if in != nil {
		clone := &NativeJobClusterJobSpec{}
		in.DeepCopyInto(clone)
		return clone
	}
	return nil
}

// DeepCopyInto deep-copies the receiver into out. The struct is first copied
// by value, then every non-nil pointer field is given its own freshly
// allocated target so that out shares no memory with in. in must be non-nil.
func (in *NativeSessionClusterJobSpec) DeepCopyInto(out *NativeSessionClusterJobSpec) {
	*out = *in

	if in.KubeConfig != nil {
		out.KubeConfig = new(string)
		*out.KubeConfig = *in.KubeConfig
	}
	if in.ContainerStartCommandTemplate != nil {
		out.ContainerStartCommandTemplate = new(string)
		*out.ContainerStartCommandTemplate = *in.ContainerStartCommandTemplate
	}
	if in.EntryPath != nil {
		out.EntryPath = new(string)
		*out.EntryPath = *in.EntryPath
	}
	if in.CongfigDir != nil {
		out.CongfigDir = new(string)
		*out.CongfigDir = *in.CongfigDir
	}
	if in.LogDir != nil {
		out.LogDir = new(string)
		*out.LogDir = *in.LogDir
	}
	if in.CPUJobManager != nil {
		out.CPUJobManager = new(int32)
		*out.CPUJobManager = *in.CPUJobManager
	}
	if in.FlinkClusterSA != nil {
		out.FlinkClusterSA = new(string)
		*out.FlinkClusterSA = *in.FlinkClusterSA
	}
	if in.FlinkRestServiceType != nil {
		out.FlinkRestServiceType = new(string)
		*out.FlinkRestServiceType = *in.FlinkRestServiceType
	}
	if in.FlinkServiceCreateTimeout != nil {
		out.FlinkServiceCreateTimeout = new(string)
		*out.FlinkServiceCreateTimeout = *in.FlinkServiceCreateTimeout
	}
	if in.TaskManagerCPU != nil {
		out.TaskManagerCPU = new(int32)
		*out.TaskManagerCPU = *in.TaskManagerCPU
	}
}

// DeepCopy returns a newly allocated NativeSessionClusterJobSpec that is a
// deep copy of the receiver, or nil when the receiver is nil.
func (in *NativeSessionClusterJobSpec) DeepCopy() *NativeSessionClusterJobSpec {
	if in != nil {
		clone := &NativeSessionClusterJobSpec{}
		in.DeepCopyInto(clone)
		return clone
	}
	return nil
}

// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *TaskManagerPorts) DeepCopyInto(out *TaskManagerPorts) {
*out = *in
Expand Down
28 changes: 28 additions & 0 deletions config/samples/flinkoperator_v1beta1_flinknativejobcluster.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Sample FlinkCluster that runs the WordCount example as a native Flink job cluster.
apiVersion: flinkoperator.k8s.io/v1beta1
kind: FlinkCluster
metadata:
  name: native-flinkjobcluster-sample
spec:
  image:
    name: ccr.ccs.tencentyun.com/kinderyj/flink-test:nativeperjob1.10
    pullPolicy: IfNotPresent
  nativeJobClusterJob:
    # The cluster ID follows metadata.name, so this sample does not reuse the
    # ID of the native session cluster sample.
    flinkClusterID: native-flinkjobcluster-sample
    jarFile: /opt/flink/examples/streaming/WordCount.jar
  hadoopConfig:
    configMapName: hadoop
    mountPath: /etc/hadoop
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Sample FlinkCluster that declares a native Flink session cluster.
apiVersion: flinkoperator.k8s.io/v1beta1
kind: FlinkCluster
metadata:
name: native-flinksessioncluster-sample
spec:
image:
name: ccr.ccs.tencentyun.com/kinderyj/flink-test:1.10
pullPolicy: Always
# Setting nativeSessionClusterJob marks this cluster as a native session cluster.
nativeSessionClusterJob:
# The cluster ID is the same as metadata.name.
flinkClusterID: native-flinksessioncluster-sample
35 changes: 22 additions & 13 deletions controllers/flinkcluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ import (

// FlinkClusterReconciler reconciles a FlinkCluster object
type FlinkClusterReconciler struct {
Client client.Client
Log logr.Logger
Mgr ctrl.Manager
Client client.Client
Log logr.Logger
Mgr ctrl.Manager
}

// +kubebuilder:rbac:groups=flinkoperator.k8s.io,resources=flinkclusters,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -60,7 +60,7 @@ func (reconciler *FlinkClusterReconciler) Reconcile(
var log = reconciler.Log.WithValues(
"cluster", request.NamespacedName)
var handler = FlinkClusterHandler{
k8sClient: reconciler.Client,
k8sClient: reconciler.Client,
flinkClient: flinkclient.FlinkClient{
Log: log,
HTTPClient: flinkclient.HTTPClient{Log: log},
Expand Down Expand Up @@ -90,14 +90,14 @@ func (reconciler *FlinkClusterReconciler) SetupWithManager(
// FlinkClusterHandler holds the context and state for a
// reconcile request.
type FlinkClusterHandler struct {
k8sClient client.Client
flinkClient flinkclient.FlinkClient
request ctrl.Request
context context.Context
log logr.Logger
recorder record.EventRecorder
observed ObservedClusterState
desired DesiredClusterState
k8sClient client.Client
flinkClient flinkclient.FlinkClient
request ctrl.Request
context context.Context
log logr.Logger
recorder record.EventRecorder
observed ObservedClusterState
desired DesiredClusterState
}

func (handler *FlinkClusterHandler) reconcile(
Expand Down Expand Up @@ -184,7 +184,16 @@ func (handler *FlinkClusterHandler) reconcile(
} else {
log.Info("Desired state", "Job", "nil")
}

if desired.NativeClusterSessionJob != nil {
log.Info("Desired state", "NativeClusterSessionJob", *desired.NativeClusterSessionJob)
} else {
log.Info("Desired state", "NativeClusterSessionJob", "nil")
}
if desired.NativeJobClusterJob != nil {
log.Info("Desired state", "NativeJobClusterJob", *desired.NativeJobClusterJob)
} else {
log.Info("Desired state", "NativeJobClusterJob", "nil")
}
log.Info("---------- 4. Take actions ----------")

var reconciler = ClusterReconciler{
Expand Down
Loading