Skip to content

Commit

Permalink
feat(worker): add with temp url for worker cache and fix issues (#3147)
Browse files Browse the repository at this point in the history
  • Loading branch information
bnjjj authored and fsamin committed Aug 7, 2018
1 parent 7aa9b83 commit b9b7cf1
Show file tree
Hide file tree
Showing 8 changed files with 284 additions and 153 deletions.
1 change: 1 addition & 0 deletions engine/api/api_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ func (api *API) InitRouter() {

// Cache
r.Handle("/project/{permProjectKey}/cache/{tag}", r.POSTEXECUTE(api.postPushCacheHandler, NeedWorker()), r.GET(api.getPullCacheHandler, NeedWorker()))
r.Handle("/project/{permProjectKey}/cache/{tag}/url", r.POSTEXECUTE(api.postPushCacheWithTempURLHandler, NeedWorker()), r.GET(api.getPullCacheWithTempURLHandler, NeedWorker()))

// Hooks
r.Handle("/project/{key}/application/{permApplicationName}/hook", r.GET(api.getApplicationHooksHandler))
Expand Down
68 changes: 68 additions & 0 deletions engine/api/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,71 @@ func (api *API) getPullCacheHandler() Handler {
return nil
}
}

func (api *API) postPushCacheWithTempURLHandler() Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
vars := mux.Vars(r)
projectKey := vars["permProjectKey"]
tag := vars["tag"]

// check tag name pattern
regexp := sdk.NamePatternRegex
if !regexp.MatchString(tag) {
return sdk.ErrInvalidName
}

store, ok := objectstore.Storage().(objectstore.DriverWithRedirect)
if !ok {
return sdk.WrapError(sdk.ErrNotImplemented, "postPushCacheWithTempURLHandler> cast error")
}

cacheObject := sdk.Cache{
Name: "cache.tar",
Project: projectKey,
Tag: tag,
}

url, key, errO := store.StoreURL(&cacheObject)
if errO != nil {
return sdk.WrapError(errO, "postPushCacheWithTempURLHandler>Cannot store cache")
}
cacheObject.TmpURL = url
cacheObject.SecretKey = key

return WriteJSON(w, cacheObject, http.StatusOK)
}
}

func (api *API) getPullCacheWithTempURLHandler() Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
vars := mux.Vars(r)
projectKey := vars["permProjectKey"]
tag := vars["tag"]

// check tag name pattern
regexp := sdk.NamePatternRegex
if !regexp.MatchString(tag) {
return sdk.ErrInvalidName
}

store, ok := objectstore.Storage().(objectstore.DriverWithRedirect)
if !ok {
return sdk.WrapError(sdk.ErrNotImplemented, "getPullCacheWithTempURLHandler> cast error")
}

cacheObject := sdk.Cache{
Name: "cache.tar",
Project: projectKey,
Tag: tag,
}

url, key, errF := store.FetchURL(&cacheObject)
if errF != nil {
return sdk.WrapError(errF, "getPullCacheWithTempURLHandler> Cannot get tmp URL")
}
cacheObject.TmpURL = url
cacheObject.SecretKey = key

return WriteJSON(w, cacheObject, http.StatusOK)
}
}
3 changes: 1 addition & 2 deletions engine/hatchery/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (h *HatcheryKubernetes) ApplyConfiguration(cfg interface{}) error {

var errCl error
var clientset *kubernetes.Clientset
k8sTimeout := time.Second * 5
k8sTimeout := time.Second * 10
if h.Config.KubernetesConfigFile != "" {
cfg, err := clientcmd.BuildConfigFromFlags(h.Config.KubernetesMasterURL, h.Config.KubernetesConfigFile)
if err != nil {
Expand All @@ -82,7 +82,6 @@ func (h *HatcheryKubernetes) ApplyConfiguration(cfg interface{}) error {
if err != nil {
return sdk.WrapError(err, "Cannot build config from config getter")
}
configK8s.Host = h.Config.KubernetesMasterURL
configK8s.Timeout = k8sTimeout

if h.Config.KubernetesCertAuthData != "" {
Expand Down
84 changes: 52 additions & 32 deletions engine/worker/cmd_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func cachePullCmd(w *currentWorker) func(cmd *cobra.Command, args []string) {
sdk.Exit("cache pull HTTP error %v\n", err)
}
cdsError := sdk.DecodeError(body)
sdk.Exit("Error: %v\n", cdsError)
sdk.Exit("Error: %v -> %s\n", cdsError, string(body))
}

fmt.Printf("Worker cache pull with success (tag: %s)\n", args[0])
Expand Down Expand Up @@ -331,55 +331,75 @@ func (wk *currentWorker) cachePullHandler(w http.ResponseWriter, r *http.Request

tr := tar.NewReader(bts)
for {
hdr, err := tr.Next()
if err == io.EOF {
header, errH := tr.Next()
if errH == io.EOF {
break
}
if err != nil {
err = sdk.Error{
Message: "worker cache pull > Unable to read tar file : " + err.Error(),

if errH != nil {
errH = sdk.Error{
Message: "worker cache pull > Unable to read tar file : " + errH.Error(),
Status: http.StatusBadRequest,
}
writeError(w, r, err)
writeJSON(w, errH, http.StatusBadRequest)
return
}

if hdr == nil {
if header == nil {
continue
}

target := filepath.Join(path, hdr.Name)
if _, errS := os.Stat(filepath.Dir(target)); errS != nil {
if errM := os.MkdirAll(filepath.Dir(target), 0755); errM != nil {
errM = sdk.Error{
Message: "worker cache pull > Cannot create directory : " + errM.Error(),
// the target location where the dir/file should be created
target := filepath.Join(path, header.Name)

// check the file type
switch header.Typeflag {
// if its a dir and it doesn't exist create it
case tar.TypeDir:
if _, err := os.Stat(target); err != nil {
if err := os.MkdirAll(target, 0755); err != nil {
err = sdk.Error{
Message: "worker cache pull > Unable to mkdir all files : " + err.Error(),
Status: http.StatusInternalServerError,
}
writeJSON(w, err, http.StatusInternalServerError)
return
}
}
case tar.TypeSymlink:
if err := os.Symlink(header.Linkname, target); err != nil {
err = sdk.Error{
Message: "worker cache pull > Unable to create symlink : " + err.Error(),
Status: http.StatusInternalServerError,
}
writeError(w, r, errM)
writeJSON(w, err, http.StatusInternalServerError)
return
}
}

f, err := os.OpenFile(target, os.O_CREATE|os.O_RDWR, os.FileMode(hdr.Mode))
if err != nil {
err = sdk.Error{
Message: "worker cache pull > Cannot create file: " + err.Error(),
Status: http.StatusInternalServerError,
// if it's a file create it
case tar.TypeReg, tar.TypeLink:
f, err := os.OpenFile(target, os.O_CREATE|os.O_RDWR, os.FileMode(header.Mode))
if err != nil {
err = sdk.Error{
Message: "worker cache pull > Unable to open file : " + err.Error(),
Status: http.StatusInternalServerError,
}
writeJSON(w, err, http.StatusInternalServerError)
return
}
writeError(w, r, err)
return
}

// copy over contents
if _, err := io.Copy(f, tr); err != nil {
f.Close()
err = sdk.Error{
Message: "worker cache pull > Cannot copy content file : " + err.Error(),
Status: http.StatusInternalServerError,
// copy over contents
if _, err := io.Copy(f, tr); err != nil {
_ = f.Close()
err = sdk.Error{
Message: "worker cache pull > Cannot copy content file : " + err.Error(),
Status: http.StatusInternalServerError,
}
writeJSON(w, err, http.StatusInternalServerError)
return
}
writeError(w, r, err)
return

_ = f.Close()
}
f.Close()
}
}
Loading

0 comments on commit b9b7cf1

Please sign in to comment.