Skip to content

Commit

Permalink
[PROTO-1665] DDEX: Add failure states, fix write scope, and clean up (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
theoilie authored Feb 16, 2024
1 parent dde01a7 commit dc3aa8d
Show file tree
Hide file tree
Showing 12 changed files with 166 additions and 87 deletions.
2 changes: 1 addition & 1 deletion packages/ddex/ingester/common/sdk_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ type CollectionMetadata struct {
IsPrivate bool `bson:"is_private"`
Tags NullableString `bson:"tags,omitempty"`
Genre Genre `bson:"genre"`
Mood Mood `bson:"mood"`
Mood Mood `bson:"mood,omitempty"`
ReleaseDate time.Time `bson:"release_date"`

// TODO: Handle these fields
Expand Down
17 changes: 10 additions & 7 deletions packages/ddex/ingester/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ type Delivery struct {
}

type PendingRelease struct {
ID primitive.ObjectID `bson:"_id"`
UploadETag string `bson:"upload_etag"`
DeliveryID primitive.ObjectID `bson:"delivery_id"`
PublishDate time.Time `bson:"publish_date"`
Track CreateTrackRelease `bson:"create_track_release"`
Album CreateAlbumRelease `bson:"create_album_release"`
CreatedAt time.Time `bson:"created_at"`
ID primitive.ObjectID `bson:"_id"`
UploadETag string `bson:"upload_etag"`
DeliveryID primitive.ObjectID `bson:"delivery_id"`
PublishDate time.Time `bson:"publish_date"`
Track CreateTrackRelease `bson:"create_track_release"`
Album CreateAlbumRelease `bson:"create_album_release"`
CreatedAt time.Time `bson:"created_at"`
Errors []string `bson:"errors"`
FailureCount int `bson:"failure_count"`
FailedAfterUpload bool `bson:"failed_after_upload"`
}

type PublishedRelease struct {
Expand Down
2 changes: 1 addition & 1 deletion packages/ddex/ingester/parser/sony_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func processReleaseNode(rNode *xmlquery.Node, soundRecordings *[]SoundRecording,
continue
}
if coverArtURL != "" {
fmt.Printf("Skipping duplicate audio file for Image %s\n", ci.Reference)
fmt.Printf("Skipping duplicate cover art file for Image %s\n", ci.Reference)
}
coverArtURL = fmt.Sprintf("s3://%s/%s/%s%s", indexedBucket, deliveryIDHex, d.FileDetails.FilePath, d.FileDetails.FileName)
coverArtURLHash = d.FileDetails.HashSum
Expand Down
60 changes: 34 additions & 26 deletions packages/ddex/publisher/src/models/pendingReleases.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,48 +81,50 @@ const moods = [

const artistSchema = new mongoose.Schema({
name: { type: String, required: true },
roles: [{ type: String }],
roles: [String],
})

const trackMetadataSchema = new mongoose.Schema({
title: { type: String, required: true },
release_date: { type: String, required: true }, // Assuming ISO date format as string
release_date: { type: Date, required: true },
genre: { type: String, enum: genres, required: true },
duration: { type: Number, required: true },
preview_start_seconds: { type: Number },
isrc: { type: String },
license: { type: String },
description: { type: String },
preview_start_seconds: Number,
isrc: String,
license: String,
description: String,
mood: { type: String, enum: moods },
tags: { type: String },
tags: String,
artists: [artistSchema],
artist_name: { type: String, required: true },
copyright: { type: String, required: true },
preview_audio_file_url: { type: String, required: true },
preview_audio_file_url_hash: { type: String, required: true },
preview_audio_file_url_hash_algo: { type: String, required: true },
copyright: String,
preview_audio_file_url: String,
preview_audio_file_url_hash: String,
preview_audio_file_url_hash_algo: String,
audio_file_url: { type: String, required: true },
audio_file_url_hash: { type: String, required: true },
audio_file_url_hash_algo: { type: String, required: true },
cover_art_url: { type: String, required: true },
cover_art_url_hash: { type: String, required: true },
cover_art_url_hash_algo: { type: String, required: true },

// Required if it's a standalone track. Uses playlist_owner_id and playlist's cover_art_url if it's part of an album
artist_name: String,
cover_art_url: String,
cover_art_url_hash: String,
cover_art_url_hash_algo: String,
})

export type TrackMetadata = mongoose.InferSchemaType<typeof trackMetadataSchema>

const collectionMetadataSchema = new mongoose.Schema({
playlist_name: { type: String, required: true },
playlist_owner_id: { type: String, required: true },
description: { type: String },
is_album: { type: Boolean, required: true },
is_private: { type: Boolean, required: true },
tags: { type: String },
genre: { type: String, enum: genres, required: true },
mood: { type: String, enum: moods, required: true },
release_date: { type: String, required: true }, // Assuming ISO date format as string
license: { type: String },
upc: { type: String },
release_date: { type: Date, required: true },
description: String,
is_album: Boolean,
is_private: Boolean,
tags: String,
mood: { type: String, enum: moods },
license: String,
upc: String,
cover_art_url: { type: String, required: true },
cover_art_url_hash: { type: String, required: true },
cover_art_url_hash_algo: { type: String, required: true },
Expand Down Expand Up @@ -152,13 +154,15 @@ export type CreateAlbumRelease = mongoose.InferSchemaType<
>

export const pendingReleasesSchema = new mongoose.Schema({
_id: { type: mongoose.Schema.Types.ObjectId, required: true },
upload_etag: { type: String, required: true },
delivery_id: { type: mongoose.Schema.Types.ObjectId, required: true },
publish_date: { type: Date, required: true },
created_at: { type: Date, required: true },
create_track_release: { type: createTrackReleaseSchema, required: true },
create_album_release: { type: createAlbumReleaseSchema, required: true },
create_track_release: createTrackReleaseSchema,
create_album_release: createAlbumReleaseSchema,
upload_errors: [String],
failure_count: Number,
failed_after_upload: Boolean,
})

// Releases parsed from indexed DDEX deliveries that are awaiting publishing
Expand All @@ -168,4 +172,8 @@ const PendingReleases = mongoose.model(
'pending_releases'
)

export type PendingRelease = mongoose.HydratedDocument<
mongoose.InferSchemaType<typeof pendingReleasesSchema>
>

export default PendingReleases
5 changes: 4 additions & 1 deletion packages/ddex/publisher/src/models/publishedReleases.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import {

// DDEX releases that have been published
const publishedReleasesSchema = new mongoose.Schema({
_id: mongoose.Schema.Types.ObjectId,
upload_etag: String,
delivery_id: mongoose.Schema.Types.ObjectId,
publish_date: Date,
Expand All @@ -24,4 +23,8 @@ const PublishedReleases = mongoose.model(
'published_releases'
)

export type PublishedRelease = mongoose.InferSchemaType<
typeof publishedReleasesSchema
>

export default PublishedReleases
134 changes: 100 additions & 34 deletions packages/ddex/publisher/src/services/publisherService.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
import mongoose from 'mongoose'
import Deliveries from '../models/deliveries'
import PendingReleases from '../models/pendingReleases'
import PublishedReleases from '../models/publishedReleases'
import PublishedReleases, {
PublishedRelease,
} from '../models/publishedReleases'
import type {
TrackMetadata,
CollectionMetadata,
CreateTrackRelease,
CreateAlbumRelease,
PendingRelease,
} from '../models/pendingReleases'
import type {
AudiusSdk as AudiusSdkType,
UploadTrackRequest,
UploadAlbumRequest,
Genre,
Mood,
} from '@audius/sdk/dist/sdk/index.d.ts'
} from '@audius/sdk'
import createS3 from './s3Service'

// eslint-disable-next-line @typescript-eslint/no-unused-vars
Expand All @@ -39,7 +42,7 @@ const formatTrackMetadata = (
title: metadata.title,
description: metadata.description || '',
genre: metadata.genre as Genre,
mood: (metadata.mood || 'Other') as Mood, // TODO: SDK requires mood, but XML doesn't provide one
...(metadata.mood && { mood: metadata.mood as Mood }),
tags: metadata.tags || '',
isrc: metadata.isrc,
license: metadata.license,
Expand Down Expand Up @@ -78,11 +81,16 @@ const uploadTrack = async (
pendingTrack: CreateTrackRelease,
s3Service: ReturnType<typeof createS3>
) => {
if (!pendingTrack.metadata.artist_name) {
throw new Error('Missing artist_name in track metadata')
}

const userId = await getUserId(audiusSdk, pendingTrack.metadata.artist_name)
const metadata = formatTrackMetadata(pendingTrack.metadata)

pendingTrack.metadata.cover_art_url =
's3://ddex-dev-audius-indexed/65cc6ff94bc8f81560c8749e/resources/A10301A0005108088N_T-1027024165547_Image.jpg' // TODO: Remove after ensuring tracks always have cover art
if (!pendingTrack.metadata.cover_art_url) {
throw new Error('Missing cover_art_url in track metadata')
}

const coverArtDownload = await s3Service.downloadFromS3Indexed(
pendingTrack.metadata.cover_art_url
Expand Down Expand Up @@ -169,36 +177,78 @@ const uploadAlbum = async (
return result
}

async function recordPendingReleaseErr(
doc: PendingRelease,
error: any,
failedAfterUpload = false
) {
let errorMsg = ''

if (error instanceof Error) {
errorMsg = error.message
} else {
errorMsg = 'An unknown error occurred'
}

console.error(errorMsg)
try {
await PendingReleases.updateOne(
{ _id: doc._id },
{
$push: { upload_errors: errorMsg },
$inc: { failure_count: 1 },
$set: { failed_after_upload: failedAfterUpload },
}
)
} catch (updateError) {
console.error(
'Failed to update pending_releases doc with error:',
updateError
)
}
}

export const publishReleases = async (
audiusSdk: AudiusSdkType,
s3: ReturnType<typeof createS3>
) => {
// eslint-disable-next-line no-constant-condition
while (true) {
const currentDate = new Date()

const session = await mongoose.startSession()
session.startTransaction()

let documents
try {
const documents = await PendingReleases.find({
const currentDate = new Date()
documents = await PendingReleases.find({
publish_date: { $lte: currentDate },
}).session(session)
})
} catch (error) {
console.error('Failed to fetch pending releases:', error)
await new Promise((resolve) => setTimeout(resolve, 10_000))
continue
}

for (const doc of documents) {
let publishedData
for (const doc of documents) {
if (doc.failed_after_upload) {
console.error(
`pending_releases doc with delivery_id ${doc.delivery_id} requires manual intervention because it's already uploaded to Audius but failed to move to published_releases.`
)
continue
}

const deliveryId = doc.delivery_id
let publishedData: PublishedRelease

try {
if (doc.create_track_release) {
const uploadResult = await uploadTrack(
audiusSdk,
doc.create_track_release,
s3
)
publishedData = {
...doc.toObject(),
track: doc.create_track_release,
entity_id: uploadResult.trackId,
blockhash: uploadResult.blockHash,
blocknumber: uploadResult.blockNumber,
created_at: new Date(),
}
} else if (doc.create_album_release) {
const uploadResult = await uploadAlbum(
Expand All @@ -207,45 +257,61 @@ export const publishReleases = async (
s3
)
publishedData = {
...doc.toObject(),
album: doc.create_album_release,
entity_id: uploadResult.albumId,
blockhash: uploadResult.blockHash,
blocknumber: uploadResult.blockNumber,
created_at: new Date(),
}
} else {
throw new Error('Missing track or album in pending release')
recordPendingReleaseErr(
doc,
'Missing track or album in pending release'
)
continue
}
} catch (error) {
recordPendingReleaseErr(doc, error)
continue
}

publishedData = {
...publishedData,
publish_date: doc.publish_date,
upload_etag: doc.upload_etag,
delivery_id: deliveryId,
created_at: new Date(),
}
console.log('Published release: ', JSON.stringify(publishedData))

// Move document to 'published_releases' collection
// Mark release as published in Mongo
const session = await mongoose.startSession()
try {
session.startTransaction()
const publishedRelease = new PublishedReleases(publishedData)
await publishedRelease.save({ session })
await PendingReleases.deleteOne({ _id: doc._id }).session(session)

// Update delivery_status to 'published' once all releases in the delivery are published
const remainingCount = await PendingReleases.countDocuments({
delivery_id: doc.delivery_id,
_id: { $ne: doc._id },
delivery_id: deliveryId,
}).session(session)
if (remainingCount === 0) {
// Update delivery_status in deliveries collection
await Deliveries.updateOne(
{ _id: doc.delivery_id },
{ _id: deliveryId },
{ $set: { delivery_status: 'published' } },
{ session }
)
}
console.log('Published release: ', publishedData)
await session.commitTransaction()
} catch (error) {
await session.abortTransaction()
recordPendingReleaseErr(doc, error, true)
} finally {
session.endSession()
}

await session.commitTransaction()
} catch (error) {
console.error('Error publishing release, rolling back.', error)
await session.abortTransaction()
} finally {
session.endSession()
}

// 10 seconds
await new Promise((resolve) => setTimeout(resolve, 10000))
// Wait 10 seconds before checking for new releases
await new Promise((resolve) => setTimeout(resolve, 10_000))
}
}
5 changes: 1 addition & 4 deletions packages/ddex/publisher/src/services/sdkService.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
import type {
AudiusSdk as AudiusSdkType,
ServicesConfig,
} from '@audius/sdk/dist/sdk/index.d.ts'
import type { AudiusSdk as AudiusSdkType, ServicesConfig } from '@audius/sdk'
import {
AppAuth,
DiscoveryNodeSelector,
Expand Down
Loading

0 comments on commit dc3aa8d

Please sign in to comment.