Skip to content

Commit

Permalink
[PROTO-1664] Replicate transcode result in parallel (#7593)
Browse files Browse the repository at this point in the history
  • Loading branch information
stereosteve authored Feb 15, 2024
1 parent 23edd81 commit 0cd31cf
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 8 deletions.
2 changes: 2 additions & 0 deletions mediorum/Dockerfile.unittests
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
FROM golang:alpine

RUN apk add ffmpeg

WORKDIR /app

COPY . .
Expand Down
43 changes: 43 additions & 0 deletions mediorum/server/replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"mime/multipart"
"net/http"
"net/url"
"os"
"strings"
"sync"
"time"

"github.com/AudiusProject/audius-protocol/mediorum/server/signature"
Expand All @@ -18,6 +20,47 @@ import (
"gocloud.dev/blob"
)

func (ss *MediorumServer) replicateFileParallel(cid string, filePath string) ([]string, error) {
preferred, _ := ss.rendezvousHealthyHosts(cid)
queue := make(chan string, len(preferred))
for _, p := range preferred {
queue <- p
}

mu := sync.Mutex{}
results := []string{}

wg := sync.WaitGroup{}
wg.Add(ss.Config.ReplicationFactor)

for i := 0; i < ss.Config.ReplicationFactor; i++ {
go func() {
defer wg.Done()

file, err := os.Open(filePath)
if err != nil {
ss.logger.Error("failed to open file", "filePath", filePath, "err", err)
return
}
defer file.Close()
for peer := range queue {
file.Seek(0, 0)
err := ss.replicateFileToHost(peer, cid, file)
if err == nil {
mu.Lock()
results = append(results, peer)
mu.Unlock()
break
}
}

}()
}

wg.Wait()
return results, nil
}

func (ss *MediorumServer) replicateFile(fileName string, file io.ReadSeeker) ([]string, error) {
logger := ss.logger.With("task", "replicate", "cid", fileName)

Expand Down
6 changes: 6 additions & 0 deletions mediorum/server/serve_crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"context"
"fmt"
"net/http"
"os"
"strconv"
"time"

"github.com/AudiusProject/audius-protocol/mediorum/crudr"
Expand Down Expand Up @@ -59,6 +61,10 @@ func (ss *MediorumServer) serveCrudPush(c echo.Context) error {
return c.String(http.StatusBadRequest, err.Error())
}

if v, _ := strconv.ParseBool(os.Getenv("LOG_CRUD_PUSH")); v {
ss.logger.Info("CRUD_PUSH", "op", op)
}

known := ss.crud.KnownType(op)
if !known {
return c.String(406, "unknown crudr type")
Expand Down
68 changes: 68 additions & 0 deletions mediorum/server/serve_upload_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package server

import (
"bytes"
"encoding/json"
"io"
"mime/multipart"
"net/http"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestUploadFile(t *testing.T) {
ss := testNetwork[0]
s2 := testNetwork[1]

var b bytes.Buffer
w := multipart.NewWriter(&b)

{
fw, err := w.CreateFormField("template")
assert.NoError(t, err)
fw.Write([]byte("audio"))
}

fw, err := w.CreateFormFile(filesFormFieldName, "beep.wav")
assert.NoError(t, err)

hit, err := os.Open("testdata/beep.wav")
assert.NoError(t, err)

io.Copy(fw, hit)
hit.Close()
w.Close()

resp, err := http.Post(ss.Config.Self.Host+"/uploads", w.FormDataContentType(), &b)
assert.NoError(t, err)
assert.Equal(t, resp.StatusCode, 200)

dec := json.NewDecoder(resp.Body)
var uploads []Upload
err = dec.Decode(&uploads)
assert.NoError(t, err)
uploadId := uploads[0].ID

// force sweep (since blob changes SkipBroadcast)
for _, s := range testNetwork {
s.crud.ForceSweep()
}

// poll for complete
var u2 *Upload
for i := 0; i < 3; i++ {
resp, err := s2.reqClient.R().SetSuccessResult(&u2).Get(s2.Config.Self.Host + "/uploads/" + uploadId)
assert.NoError(t, err)
assert.Equal(t, resp.StatusCode, 200)
if u2.Status == JobStatusDone {
break
}
time.Sleep(time.Second)
}

assert.Equal(t, u2.TranscodeProgress, 1.0)
assert.Len(t, u2.TranscodedMirrors, ss.Config.ReplicationFactor)
}
2 changes: 1 addition & 1 deletion mediorum/server/test_main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func setupTestNetwork(replicationFactor, serverCount int) []*MediorumServer {
}

func TestMain(m *testing.M) {
testNetwork = setupTestNetwork(5, 5)
testNetwork = setupTestNetwork(5, 9)

exitVal := m.Run()
// todo: tear down testNetwork
Expand Down
Binary file added mediorum/server/testdata/beep.wav
Binary file not shown.
14 changes: 8 additions & 6 deletions mediorum/server/transcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,10 +318,12 @@ func (ss *MediorumServer) transcodeAudio(upload *Upload, destPath string, cmd *e
fmt.Sscanf(line, "out_time_us=%f", &u)
if u > 0 && durationUs > 0 {
percent := u / durationUs
// logger.Debug("transcode", "file", fileHash, "progress", percent)
upload.TranscodeProgress = percent
upload.TranscodedAt = time.Now().UTC()
ss.crud.Patch(upload)

if percent-upload.TranscodeProgress > 0.1 {
upload.TranscodeProgress = percent
upload.TranscodedAt = time.Now().UTC()
ss.crud.Patch(upload)
}
}
}
}
Expand Down Expand Up @@ -378,7 +380,7 @@ func (ss *MediorumServer) transcodeFullAudio(upload *Upload, temp *os.File, logg
return onError(err, upload.Status, "computeFileCID")
}
resultKey := resultHash
upload.TranscodedMirrors, err = ss.replicateFile(resultHash, dest)
upload.TranscodedMirrors, err = ss.replicateFileParallel(resultHash, destPath)
if err != nil {
return onError(err, upload.Status, "replicateFile")
}
Expand Down Expand Up @@ -446,7 +448,7 @@ func (ss *MediorumServer) transcodeAudioPreview(upload *Upload, temp *os.File, l
return onError(err, upload.Status, "computeFileCID")
}
resultKey := resultHash
mirrors, err := ss.replicateFile(resultHash, dest)
mirrors, err := ss.replicateFileParallel(resultHash, destPath)
if err != nil {
return onError(err, upload.Status, "replicating file")
}
Expand Down
2 changes: 1 addition & 1 deletion mediorum/server/upload_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (ss *MediorumServer) startUploadScroller() {
uploadCursor.After = upload.CreatedAt
}

if len(uploads) == 0 {
if len(overwrites) == 0 {
continue
}

Expand Down

0 comments on commit 0cd31cf

Please sign in to comment.