Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/batch chunks upload #200

Merged
merged 7 commits into from
Mar 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,7 @@ The user must be the owner of the allocation.You can request the file be encrypt
| remotepath | yes | remote path to upload file to, use to access file later | | string |
| thumbnailpath | no | local path of thumbnaSil | | file path |
| chunksize | no | chunk size | 65536 | int |
| chunknumber | no | how many chunks should be uploaded in a http request | 1 | int |


<details>
Expand Down Expand Up @@ -729,10 +730,11 @@ The user must be the owner of the allocation.You can request the file be encrypt
| allocation | yes | allocation id, sender must be allocation owner | | string |
| commit | no | save metadata to blockchain | false | boolean |
| encrypt | no | encrypt file before upload | false | boolean |
| localpath | yes | local path of segment files to download, generate and upload | | file path |
| localpath | yes | local path of segment files to download, generate and upload | | file path |
| remotepath | yes | remote path to upload file to, use to access file later | | string |
| thumbnailpath | no | local path of thumbnaSil | | file path |
| chunksize | no | chunk size | 65536 | int |
| chunknumber | no | how many chunks should be uploaded in a http request | 1 | int |
| delay | no | set segment duration to seconds. | 5 | int |

<details>
Expand All @@ -755,10 +757,11 @@ The user must be the owner of the allocation.You can request the file be encrypt
| allocation | yes | allocation id, sender must be allocation owner | | string |
| commit | no | save metadata to blockchain | false | boolean |
| encrypt | no | encrypt file before upload | false | boolean |
| localpath | yes | local path of segment files to download, generate and upload | | file path |
| localpath | yes | local path of segment files to download, generate and upload | | file path |
| remotepath | yes | remote path to upload file to, use to access file later | | string |
| thumbnailpath | no | local path of thumbnaSil | | file path |
| chunksize | no | chunk size | 65536 | int |
| chunknumber | no | how many chunks should be uploaded in a http request | 1 | int |
| delay | no | set segment duration to seconds. | 5 | int |
| feed | no | set remote live feed to url. | false | url |
| downloader-args | no | pass args to youtube-dl to download video. default is \"-q -f best\". | -q -f best | [youtube-dl](https://github.com/ytdl-org/youtube-dl/blob/master/README.md#options)|
Expand Down Expand Up @@ -875,6 +878,8 @@ can update a file. To add collaborators to an allocation, use
| thumbnailpath | no | local fumbnail file to upload | | file path |
| commit | no | save meta data to blockchain | false | boolean |
| chunksize | no | chunk size | 65536 | int |
| chunknumber | no | how many chunks should be uploaded in a http request | 1 | int |


<details>
<summary>update</summary>
Expand Down Expand Up @@ -1169,11 +1174,12 @@ Only the allocation's owner can successfully run `sync`.
| allocation | yes | allocation id | | string |
| commit | no | commet metadata to blockchain | false | boolean |
| encryptpath | no | local directory path to be uploaded as encrypted | false | boolean |
| excludepath | no | paths to exclude from sync | | string array |
| excludepath | no | paths to exclude from sync | | string array |
| localchache | no | local chache of remote snapshot. Used for comparsion with remote. After sync will be updated. | | string |
| localpath | yes | local directory to which to sync | | file path |
| uploadonly | no | only upload and update files | false | boolean |
| chunksize | no | chunk size | 65536 | int |
| chunknumber | no | how many chunks should be uploaded in a http request | 1 | int |

<details>
<summary>sync</summary>
Expand Down
57 changes: 57 additions & 0 deletions cmd/createdir.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package cmd

import (
"fmt"
"os"

"github.com/0chain/gosdk/zboxcore/sdk"
"github.com/spf13/cobra"
)

var createDirCmd = &cobra.Command{
Use: "createdir",
Short: "Create directory",
Long: `Create directory`,
Args: cobra.MinimumNArgs(0),
Run: func(cmd *cobra.Command, args []string) {
fflags := cmd.Flags() // fflags is a *flag.FlagSet
if !fflags.Changed("allocation") { // check if the flag "path" is set
PrintError("Error: allocation flag is missing") // If not, we'll let the user know
os.Exit(1) // and return
}
if !fflags.Changed("dirname") {
PrintError("Error: dirname flag is missing")
os.Exit(1)
}

allocationID := cmd.Flag("allocation").Value.String()
allocationObj, err := sdk.GetAllocation(allocationID)
if err != nil {
PrintError("Error fetching the allocation", err)
os.Exit(1)
}
dirname := cmd.Flag("dirname").Value.String()

if err != nil {
PrintError("CreateDir failed: ", err)
os.Exit(1)
}
err = allocationObj.CreateDir(dirname)

if err != nil {
PrintError("CreateDir failed: ", err)
os.Exit(1)
}

fmt.Println(dirname + " directory created")
},
}

func init() {

rootCmd.AddCommand(createDirCmd)
createDirCmd.PersistentFlags().String("allocation", "", "Allocation ID")
createDirCmd.PersistentFlags().String("dirname", "", "New directory name")
createDirCmd.MarkFlagRequired("allocation")
createDirCmd.MarkFlagRequired("dirname")
}
181 changes: 181 additions & 0 deletions cmd/feed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package cmd

import (
"context"
"log"
"os"
"path/filepath"
"strings"
"sync"

thrown "github.com/0chain/errors"
"github.com/0chain/gosdk/core/common"
"github.com/0chain/gosdk/zboxcore/fileref"
"github.com/0chain/gosdk/zboxcore/sdk"
"github.com/0chain/gosdk/zboxcore/zboxutil"
"github.com/0chain/zboxcli/util"
"github.com/spf13/cobra"
)

// feedCmd represents upload command with --sync flag
var feedCmd = &cobra.Command{
Use: "feed",
Short: "download segment files from remote live feed, and upload",
Long: "download segment files from remote live feed, and upload",
Args: cobra.MinimumNArgs(0),
Run: func(cmd *cobra.Command, args []string) {
fflags := cmd.Flags() // fflags is a *flag.FlagSet
if !fflags.Changed("allocation") { // check if the flag "path" is set
PrintError("Error: allocation flag is missing") // If not, we'll let the user know
os.Exit(1) // and return
}
if !fflags.Changed("remotepath") {
PrintError("Error: remotepath flag is missing")
os.Exit(1)
}

if !fflags.Changed("localpath") {
PrintError("Error: localpath flag is missing")
os.Exit(1)
}

allocationID := cmd.Flag("allocation").Value.String()
allocationObj, err := sdk.GetAllocation(allocationID)
if err != nil {
PrintError("Error fetching the allocation.", err)
os.Exit(1)
}
remotepath := cmd.Flag("remotepath").Value.String()
localpath := cmd.Flag("localpath").Value.String()
encrypt, _ := cmd.Flags().GetBool("encrypt")
commit, _ := cmd.Flags().GetBool("commit")

wg := &sync.WaitGroup{}
statusBar := &StatusBar{wg: wg}
wg.Add(1)
if strings.HasPrefix(remotepath, "/Encrypted") {
encrypt = true
}
var attrs fileref.Attributes
if fflags.Changed("attr-who-pays-for-reads") {
var (
wp common.WhoPays
wps string
)
if wps, err = fflags.GetString("attr-who-pays-for-reads"); err != nil {
log.Fatalf("getting 'attr-who-pays-for-reads' flag: %v", err)
}
if err = wp.Parse(wps); err != nil {
log.Fatal(err)
}
attrs.WhoPaysForReads = wp // set given value
}

chunkSize, _ := cmd.Flags().GetInt("chunksize")

// download video from remote live feed(eg youtube), and sync it to zcn
err = startFeedUpload(cmd, allocationObj, localpath, remotepath, encrypt, chunkSize, feedChunkNumber, attrs)

if err != nil {
PrintError("Upload failed.", err)
os.Exit(1)
}
wg.Wait()
if !statusBar.success {
os.Exit(1)
}

if commit {
remotepath = zboxutil.GetFullRemotePath(localpath, remotepath)
statusBar.wg.Add(1)
commitMetaTxn(remotepath, "Upload", "", "", allocationObj, nil, statusBar)
statusBar.wg.Wait()
}
},
}

func startFeedUpload(cmd *cobra.Command, allocationObj *sdk.Allocation, localPath, remotePath string, encrypt bool, chunkSize, chunkNumber int, attrs fileref.Attributes) error {

downloadArgs, _ := cmd.Flags().GetString("downloader-args")
ffmpegArgs, _ := cmd.Flags().GetString("ffmpeg-args")
delay, _ := cmd.Flags().GetInt("delay")
feed, _ := cmd.Flags().GetString("feed")

if len(feed) == 0 {
return thrown.New("invalid_path", "feed should be valid")
}

reader, err := sdk.CreateYoutubeDL(sdk.NewSignalContext(context.TODO()), localPath, feed, util.SplitArgs(downloadArgs), util.SplitArgs(ffmpegArgs), delay)
if err != nil {
return err
}

defer reader.Close()

mimeType, err := reader.GetFileContentType()
if err != nil {
return err
}

remotePath = zboxutil.RemoteClean(remotePath)
isabs := zboxutil.IsRemoteAbs(remotePath)
if !isabs {
err = thrown.New("invalid_path", "Path should be valid and absolute")
return err
}
remotePath = zboxutil.GetFullRemotePath(localPath, remotePath)

_, fileName := filepath.Split(remotePath)

liveMeta := sdk.LiveMeta{
MimeType: mimeType,
RemoteName: fileName,
RemotePath: remotePath,
Attributes: attrs,
}

syncUpload := sdk.CreateLiveUpload(util.GetHomeDir(), allocationObj, liveMeta, reader,
sdk.WithLiveChunkSize(chunkSize),
sdk.WithLiveChunkNumber(chunkNumber),
sdk.WithLiveEncrypt(encrypt),
sdk.WithLiveStatusCallback(func() sdk.StatusCallback {
wg := &sync.WaitGroup{}
statusBar := &StatusBar{wg: wg}
wg.Add(1)

return statusBar
}),
sdk.WithLiveDelay(delay))

return syncUpload.Start()
}

var feedChunkNumber int

func init() {

// feed command
rootCmd.AddCommand(feedCmd)
feedCmd.PersistentFlags().String("allocation", "", "Allocation ID")
feedCmd.PersistentFlags().String("remotepath", "", "Remote path to upload")
feedCmd.PersistentFlags().String("localpath", "", "Local path of file to upload")
feedCmd.PersistentFlags().String("thumbnailpath", "", "Local thumbnail path of file to upload")
feedCmd.PersistentFlags().String("attr-who-pays-for-reads", "owner", "Who pays for reads: owner or 3rd_party")
feedCmd.Flags().Bool("encrypt", false, "pass this option to encrypt and upload the file")
feedCmd.Flags().Bool("commit", false, "pass this option to commit the metadata transaction")

feedCmd.Flags().Int("chunksize", sdk.CHUNK_SIZE, "chunk size")
feedCmd.Flags().IntVarP(&feedChunkNumber, "chunknumber", "", 1, "how many chunks should be uploaded in a http request")

feedCmd.Flags().Int("delay", 5, "set segment duration to seconds.")

// SyncUpload
feedCmd.Flags().String("feed", "", "set remote live feed to url.")
feedCmd.Flags().String("downloader-args", "-q -f best", "pass args to youtube-dl to download video. default is \"-q\".")
feedCmd.Flags().String("ffmpeg-args", "-loglevel warning", "pass args to ffmpeg to build segments. default is \"-loglevel warning\".")

feedCmd.MarkFlagRequired("allocation")
feedCmd.MarkFlagRequired("remotepath")
feedCmd.MarkFlagRequired("localpath")

}
Loading