From 2e839b0b72b3ae4254db8e0e520cba9a5fb9cbeb Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Mon, 8 Jun 2026 18:28:22 -0700 Subject: [PATCH 1/2] =?UTF-8?q?feat(api):=20collaborative=20tracks=20?= =?UTF-8?q?=E2=80=94=20notifications,=20track=20embed,=20profile=20+=20das?= =?UTF-8?q?hboard=20merge?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Consumes the ETL TrackCollaborator entity (go-openaudio #345/#346) to surface collaborative tracks. Ships inert until clients write the on-chain invites and accepts. - Migration 0220 mirrors the ETL track_collaborators table (IF NOT EXISTS) so it exists before the functions/ pass; handle_track_collaborator.sql adds a notification trigger (invite -> collaborator, accept -> owner) modeled on the manager-request trigger. - Track responses embed a `collaborators` array (accepted only), bulk-resolved in one query alongside the existing owner/user fetch. - Profile track list (GET /users/:id/tracks) UNIONs owned + accepted-collab tracks, fixing the artist-pick pin to reference the profile user (so a collaborator's track is never spuriously pinned by the owner's pick). - Dashboard monthly listens widen to co-owned tracks. Sales/revenue stays scoped to the seller (no revenue split, per the feature's scope). - New GET /users/:id/collaboration_invites lists a user's invites/credits, optionally filtered by status. - track_collaborator_invite / track_collaborator_accept added to the notifications type whitelist. Bumps the ETL module to 3904b9d. sqlc regenerated. Co-Authored-By: Claude Opus 4.8 --- api/dbv1/collaborator_invites.go | 54 +++++++++ api/dbv1/models.go | 24 +++- api/dbv1/queries/track_collaborators.sql | 20 ++++ api/dbv1/track_collaborators.sql.go | 102 ++++++++++++++++ api/dbv1/tracks.go | 42 +++++-- api/server.go | 1 + api/v1_notifications.go | 2 +- api/v1_track_collaborators_test.go | 123 ++++++++++++++++++++ api/v1_users_collaboration_invites.go | 38 ++++++ api/v1_users_listen_counts_monthly.go | 5 +- api/v1_users_tracks.go | 10 +- database/seed.go | 10 ++ ddl/functions/handle_track_collaborator.sql | 73 ++++++++++++ ddl/migrations/0220_track_collaborators.sql | 29 +++++ go.mod | 2 +- go.sum | 2 + sql/01_schema.sql | 89 ++++++++++++++ 17 files changed, 609 insertions(+), 17 deletions(-) create mode 100644 api/dbv1/collaborator_invites.go create mode 100644 api/dbv1/queries/track_collaborators.sql create mode 100644 api/dbv1/track_collaborators.sql.go create mode 100644 api/v1_track_collaborators_test.go create mode 100644 api/v1_users_collaboration_invites.go create mode 100644 ddl/functions/handle_track_collaborator.sql create mode 100644 ddl/migrations/0220_track_collaborators.sql diff --git a/api/dbv1/collaborator_invites.go b/api/dbv1/collaborator_invites.go new file mode 100644 index 00000000..2c7f640c --- /dev/null +++ b/api/dbv1/collaborator_invites.go @@ -0,0 +1,54 @@ +package dbv1 + +import ( + "context" + "time" + + "api.audius.co/trashid" +) + +// FullTrackCollaboratorInvite is a collaborator credit from the invited user's +// perspective: which track, who invited them, and the current status. +type FullTrackCollaboratorInvite struct { + TrackID string `json:"track_id"` + Status string `json:"status"` + InvitedBy User `json:"invited_by"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +// FullTrackCollaboratorInvites resolves a user's collaborator invites, embedding +// the inviter as a full user object (mirrors FullManagers). +func (q *Queries) FullTrackCollaboratorInvites(ctx context.Context, params GetTrackCollaboratorInvitesForUserParams) ([]FullTrackCollaboratorInvite, error) { + rows, err := q.GetTrackCollaboratorInvitesForUser(ctx, params) + if err != nil { + return nil, err + } + + inviterIds := make([]int32, len(rows)) + for i, row := range rows { + inviterIds[i] = row.InvitedBy + } + + users, err := q.UsersKeyed(ctx, GetUsersParams{ + Ids: inviterIds, + MyID: params.UserID, + }) + if err != nil { + return nil, err + } + + invites := make([]FullTrackCollaboratorInvite, len(rows)) + for i, row := range rows { + trackID, _ := trashid.EncodeHashId(int(row.TrackID)) + invites[i] = FullTrackCollaboratorInvite{ + TrackID: trackID, + Status: row.Status, + InvitedBy: users[row.InvitedBy], + CreatedAt: row.CreatedAt, + UpdatedAt: row.UpdatedAt, + } + } + + return invites, nil +} diff --git a/api/dbv1/models.go b/api/dbv1/models.go index c3c637ec..acceb4ec 100644 --- a/api/dbv1/models.go +++ b/api/dbv1/models.go @@ -1005,7 +1005,6 @@ type AudiusDataTx struct { type Block struct { Blockhash string `json:"blockhash"` Parenthash pgtype.Text `json:"parenthash"` - IsCurrent pgtype.Bool `json:"is_current"` Number pgtype.Int4 `json:"number"` } @@ -1305,6 +1304,14 @@ type EthIndexerCheckpoint struct { UpdatedAt time.Time `json:"updated_at"` } +// Per-user AUDIO ERC-20 balance (wei), summed across users.wallet + chain=eth associated_wallets. Pre-aggregated mirror of eth_wallet_balances, maintained by triggers (handle_eth_wallet_balance_change / handle_associated_wallets) and recomputed by update_eth_user_balance(user_id). ETH-side analog of sol_user_balances. +type EthUserBalance struct { + UserID int32 `json:"user_id"` + Balance pgtype.Numeric `json:"balance"` + UpdatedAt time.Time `json:"updated_at"` + CreatedAt time.Time `json:"created_at"` +} + // AUDIO ERC-20 balances (in wei) for tracked Ethereum wallets — primary users.wallet and chain=eth associated_wallets. Maintained event-driven by the eth-indexer (WebSocket subscription to the AUDIO Transfer topic, targeted balanceOf reads). type EthWalletBalance struct { Wallet string `json:"wallet"` @@ -2287,6 +2294,17 @@ type TagTrackUser struct { OwnerID int32 `json:"owner_id"` } +type TrackCollaborator struct { + TrackID int32 `json:"track_id"` + CollaboratorUserID int32 `json:"collaborator_user_id"` + InvitedBy int32 `json:"invited_by"` + Status string `json:"status"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + Txhash string `json:"txhash"` + Blocknumber pgtype.Int4 `json:"blocknumber"` +} + type TrackDelistStatus struct { CreatedAt pgtype.Timestamptz `json:"created_at"` TrackID int32 `json:"track_id"` @@ -2664,7 +2682,7 @@ type UserTip struct { UpdatedAt time.Time `json:"updated_at"` } -// Compatibility view that exposes sol_reward_disbursements in the column shape the API routes used to read from challenge_disbursements. Resolves user_id via the indexer-populated recipient_eth_address (see migration 0172). +// Compatibility view that exposes sol_reward_disbursements in the column shape the API routes used to read from challenge_disbursements. Resolves user_id via the indexer-populated recipient_eth_address (see migration 0172). Join uses LOWER(users.wallet) because the Go indexer stores recipient_eth_address as lowercase. type VChallengeDisbursement struct { ChallengeID string `json:"challenge_id"` Specifier string `json:"specifier"` @@ -2710,7 +2728,7 @@ type VUsdcPurchase struct { Splits interface{} `json:"splits"` } -// Per-user AUDIO/wAUDIO balance totals. One row per current user with eth_balance (wei) and sol_balance (wAUDIO base units, 8 decimals — multiply by 10^10 to compare to wei). eth_balance sums eth_wallet_balances across users.wallet + chain=eth associated_wallets (current, not deleted). sol_balance is sol_user_balances for the wAUDIO mint, already pre-aggregated across user_bank PDAs + linked Solana wallets by handle_sol_claimable_accounts / update_sol_user_balance triggers. +// Per-user AUDIO/wAUDIO balance totals. One row per current user with eth_balance (wei) and sol_balance (wAUDIO base units, 8 decimals — multiply by 10^10 to compare to wei). eth_balance is eth_user_balances (pre-aggregated across users.wallet + chain=eth associated_wallets, maintained by handle_eth_wallet_balance_change / handle_associated_wallets). sol_balance is sol_user_balances for the wAUDIO mint, pre-aggregated across user_bank PDAs + linked Solana wallets by handle_sol_claimable_accounts / update_sol_user_balance triggers. type VUserBalance struct { UserID int32 `json:"user_id"` EthBalance string `json:"eth_balance"` diff --git a/api/dbv1/queries/track_collaborators.sql b/api/dbv1/queries/track_collaborators.sql new file mode 100644 index 00000000..045c5894 --- /dev/null +++ b/api/dbv1/queries/track_collaborators.sql @@ -0,0 +1,20 @@ +-- Accepted collaborators for a set of tracks, used to embed a `collaborators` +-- array on track responses. Returns one row per (track, collaborator); the Go +-- layer bulk-resolves the user objects. Backed by the track_collaborators +-- primary key (track_id leads), so the ANY(...) lookup is index-served. +-- name: GetTrackCollaborators :many +SELECT track_id, collaborator_user_id +FROM track_collaborators +WHERE track_id = ANY(@track_ids::int[]) + AND status = 'accepted' +ORDER BY track_id, created_at; + +-- A user's collaborator invites/credits, optionally filtered by status +-- (pending/accepted/rejected). Backed by the covering +-- (collaborator_user_id, status, track_id) index. +-- name: GetTrackCollaboratorInvitesForUser :many +SELECT track_id, collaborator_user_id, invited_by, status, created_at, updated_at +FROM track_collaborators +WHERE collaborator_user_id = @user_id::int + AND (sqlc.narg('status')::text IS NULL OR status = sqlc.narg('status')) +ORDER BY created_at DESC; diff --git a/api/dbv1/track_collaborators.sql.go b/api/dbv1/track_collaborators.sql.go new file mode 100644 index 00000000..c7ef1b84 --- /dev/null +++ b/api/dbv1/track_collaborators.sql.go @@ -0,0 +1,102 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.29.0 +// source: track_collaborators.sql + +package dbv1 + +import ( + "context" + "time" + + "github.com/jackc/pgx/v5/pgtype" +) + +const getTrackCollaboratorInvitesForUser = `-- name: GetTrackCollaboratorInvitesForUser :many +SELECT track_id, collaborator_user_id, invited_by, status, created_at, updated_at +FROM track_collaborators +WHERE collaborator_user_id = $1::int + AND ($2::text IS NULL OR status = $2) +ORDER BY created_at DESC +` + +type GetTrackCollaboratorInvitesForUserParams struct { + UserID int32 `json:"user_id"` + Status pgtype.Text `json:"status"` +} + +type GetTrackCollaboratorInvitesForUserRow struct { + TrackID int32 `json:"track_id"` + CollaboratorUserID int32 `json:"collaborator_user_id"` + InvitedBy int32 `json:"invited_by"` + Status string `json:"status"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +// A user's collaborator invites/credits, optionally filtered by status +// (pending/accepted/rejected). Backed by the covering +// (collaborator_user_id, status, track_id) index. +func (q *Queries) GetTrackCollaboratorInvitesForUser(ctx context.Context, arg GetTrackCollaboratorInvitesForUserParams) ([]GetTrackCollaboratorInvitesForUserRow, error) { + rows, err := q.db.Query(ctx, getTrackCollaboratorInvitesForUser, arg.UserID, arg.Status) + if err != nil { + return nil, err + } + defer rows.Close() + var items []GetTrackCollaboratorInvitesForUserRow + for rows.Next() { + var i GetTrackCollaboratorInvitesForUserRow + if err := rows.Scan( + &i.TrackID, + &i.CollaboratorUserID, + &i.InvitedBy, + &i.Status, + &i.CreatedAt, + &i.UpdatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const getTrackCollaborators = `-- name: GetTrackCollaborators :many +SELECT track_id, collaborator_user_id +FROM track_collaborators +WHERE track_id = ANY($1::int[]) + AND status = 'accepted' +ORDER BY track_id, created_at +` + +type GetTrackCollaboratorsRow struct { + TrackID int32 `json:"track_id"` + CollaboratorUserID int32 `json:"collaborator_user_id"` +} + +// Accepted collaborators for a set of tracks, used to embed a `collaborators` +// array on track responses. Returns one row per (track, collaborator); the Go +// layer bulk-resolves the user objects. Backed by the track_collaborators +// primary key (track_id leads), so the ANY(...) lookup is index-served. +func (q *Queries) GetTrackCollaborators(ctx context.Context, trackIds []int32) ([]GetTrackCollaboratorsRow, error) { + rows, err := q.db.Query(ctx, getTrackCollaborators, trackIds) + if err != nil { + return nil, err + } + defer rows.Close() + var items []GetTrackCollaboratorsRow + for rows.Next() { + var i GetTrackCollaboratorsRow + if err := rows.Scan(&i.TrackID, &i.CollaboratorUserID); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} diff --git a/api/dbv1/tracks.go b/api/dbv1/tracks.go index 94847b18..434d01d7 100644 --- a/api/dbv1/tracks.go +++ b/api/dbv1/tracks.go @@ -21,15 +21,16 @@ const IncludeID3TagsCtxKey = "includeID3Tags" type Track struct { GetTracksRow - Permalink string `json:"permalink"` - IsStreamable bool `json:"is_streamable"` - Artwork *SquareImage `json:"artwork"` - Stream *MediaLink `json:"stream"` - Download *MediaLink `json:"download"` - Preview *MediaLink `json:"preview"` - UserID trashid.HashId `json:"user_id"` - User User `json:"user"` - Access Access `json:"access"` + Permalink string `json:"permalink"` + IsStreamable bool `json:"is_streamable"` + Artwork *SquareImage `json:"artwork"` + Stream *MediaLink `json:"stream"` + Download *MediaLink `json:"download"` + Preview *MediaLink `json:"preview"` + UserID trashid.HashId `json:"user_id"` + User User `json:"user"` + Collaborators []User `json:"collaborators"` + Access Access `json:"access"` FolloweeReposts []*FolloweeRepost `json:"followee_reposts"` FolloweeFavorites []*FolloweeFavorite `json:"followee_favorites"` @@ -45,6 +46,7 @@ func (q *Queries) TracksKeyed(ctx context.Context, arg TracksParams) (map[int32] } userIds := []int32{} + trackIds := make([]int32, 0, len(rawTracks)) collectSplitUserIds := func(usage *AccessGate) { if usage == nil || usage.UsdcPurchase == nil { return @@ -56,6 +58,7 @@ func (q *Queries) TracksKeyed(ctx context.Context, arg TracksParams) (map[int32] for _, rawTrack := range rawTracks { userIds = append(userIds, rawTrack.UserID) + trackIds = append(trackIds, rawTrack.TrackID) var remixOf RemixOf json.Unmarshal(rawTrack.RemixOf, &remixOf) @@ -67,6 +70,18 @@ func (q *Queries) TracksKeyed(ctx context.Context, arg TracksParams) (map[int32] collectSplitUserIds(rawTrack.DownloadConditions) } + // Fetch accepted collaborators for these tracks in one query, and fold + // their user IDs into the bulk user fetch below so each is fully resolved. + collaboratorRows, err := q.GetTrackCollaborators(ctx, trackIds) + if err != nil { + return nil, err + } + collaboratorsByTrack := map[int32][]int32{} + for _, cr := range collaboratorRows { + collaboratorsByTrack[cr.TrackID] = append(collaboratorsByTrack[cr.TrackID], cr.CollaboratorUserID) + userIds = append(userIds, cr.CollaboratorUserID) + } + userMap, err := q.UsersKeyed(ctx, GetUsersParams{ MyID: arg.MyID.(int32), Ids: userIds, @@ -130,6 +145,14 @@ func (q *Queries) TracksKeyed(ctx context.Context, arg TracksParams) (map[int32] } } + // Resolve accepted collaborators (order preserved from the query). + collaborators := []User{} + for _, cid := range collaboratorsByTrack[rawTrack.TrackID] { + if cu, ok := userMap[cid]; ok { + collaborators = append(collaborators, cu) + } + } + // Get access from the bulk access map access := accessMap[rawTrack.TrackID] @@ -179,6 +202,7 @@ func (q *Queries) TracksKeyed(ctx context.Context, arg TracksParams) (map[int32] Preview: preview, User: user, UserID: user.ID, + Collaborators: collaborators, FolloweeFavorites: fullFolloweeFavorites(rawTrack.FolloweeFavorites), FolloweeReposts: fullFolloweeReposts(rawTrack.FolloweeReposts), RemixOf: fullRemixOf, diff --git a/api/server.go b/api/server.go index 03f794e3..1c1197de 100644 --- a/api/server.go +++ b/api/server.go @@ -432,6 +432,7 @@ func NewApiServer(config config.Config) *ApiServer { g.Get("/users/:userId/balance/history", app.v1UsersBalanceHistory) g.Get("/users/:userId/managers", app.v1UsersManagers) g.Get("/users/:userId/managed_users", app.v1UsersManagedUsers) + g.Get("/users/:userId/collaboration_invites", app.v1UsersCollaborationInvites) g.Get("/grantees/:address/users", app.v1GranteeUsers) g.Post("/users/:userId/grants", app.requireAuthMiddleware, app.requireWriteScope, app.postV1UsersGrant) g.Delete("/users/:userId/grants/:address", app.requireAuthMiddleware, app.requireWriteScope, app.deleteV1UsersGrant) diff --git a/api/v1_notifications.go b/api/v1_notifications.go index f5be832a..d886f54d 100644 --- a/api/v1_notifications.go +++ b/api/v1_notifications.go @@ -24,7 +24,7 @@ const notificationRelatedActorsPerGroup = 1 type GetNotificationsQueryParams struct { // Note that when limit is 0, we return 20 items to calculate unread count Limit int `query:"limit" default:"20" validate:"min=0,max=100"` - Types []string `query:"types" validate:"dive,oneof=announcement follow repost save remix cosign create tip_receive tip_send challenge_reward repost_of_repost save_of_repost tastemaker reaction supporter_dethroned supporter_rank_up supporting_rank_up milestone track_added_to_playlist tier_change trending trending_playlist trending_underground usdc_purchase_buyer usdc_purchase_seller track_added_to_purchased_album request_manager approve_manager_request claimable_reward comment comment_thread comment_mention comment_reaction listen_streak_reminder fan_remix_contest_started fan_remix_contest_ended fan_remix_contest_ending_soon fan_remix_contest_winners_selected fan_remix_contest_submission artist_remix_contest_ended artist_remix_contest_ending_soon artist_remix_contest_submissions fan_club_text_post remix_contest_update"` + Types []string `query:"types" validate:"dive,oneof=announcement follow repost save remix cosign create tip_receive tip_send challenge_reward repost_of_repost save_of_repost tastemaker reaction supporter_dethroned supporter_rank_up supporting_rank_up milestone track_added_to_playlist tier_change trending trending_playlist trending_underground usdc_purchase_buyer usdc_purchase_seller track_added_to_purchased_album request_manager approve_manager_request track_collaborator_invite track_collaborator_accept claimable_reward comment comment_thread comment_mention comment_reaction listen_streak_reminder fan_remix_contest_started fan_remix_contest_ended fan_remix_contest_ending_soon fan_remix_contest_winners_selected fan_remix_contest_submission artist_remix_contest_ended artist_remix_contest_ending_soon artist_remix_contest_submissions fan_club_text_post remix_contest_update"` GroupID string `query:"group_id" validate:"omitempty"` Timestamp float64 `query:"timestamp" validate:"omitempty,min=0"` } diff --git a/api/v1_track_collaborators_test.go b/api/v1_track_collaborators_test.go new file mode 100644 index 00000000..f3ea4833 --- /dev/null +++ b/api/v1_track_collaborators_test.go @@ -0,0 +1,123 @@ +package api + +import ( + "context" + "fmt" + "testing" + + "api.audius.co/api/dbv1" + "api.audius.co/database" + "api.audius.co/trashid" + "github.com/stretchr/testify/assert" +) + +// seedCollaborators adds: user 1 accepted on track 700 (owned by user 500) and +// user 1 pending on track 701 (owned by user 500). +func seedCollaborators(t *testing.T, app *ApiServer) { + database.SeedTable(app.pool.Replicas[0], "track_collaborators", []map[string]any{ + {"track_id": 700, "collaborator_user_id": 1, "invited_by": 500, "status": "accepted"}, + {"track_id": 701, "collaborator_user_id": 1, "invited_by": 500, "status": "pending"}, + }) +} + +// Accepted collaborators are embedded on the track response. +func TestTrackCollaboratorsEmbeddedOnTrack(t *testing.T) { + app := testAppWithFixtures(t) + seedCollaborators(t, app) + + var resp struct { + Data []dbv1.Track + } + // User 500's tracks default-sort to 701, 703, 702, 700 — so 700 is index 3. + status, body := testGet(t, app, fmt.Sprintf("/v1/full/users/%s/tracks", trashid.MustEncodeHashID(500)), &resp) + assert.Equal(t, 200, status) + jsonAssert(t, body, map[string]any{ + "data.3.id": trashid.MustEncodeHashID(700), + "data.3.collaborators.0.handle": "rayjacobson", + }) + // Non-collaborated tracks carry an empty array, not null. + assert.Contains(t, string(body), `"collaborators":[]`) +} + +// An accepted collaboration surfaces the track on the collaborator's profile. +func TestAcceptedCollaborationAppearsOnProfile(t *testing.T) { + app := testAppWithFixtures(t) + seedCollaborators(t, app) + + var resp struct { + Data []dbv1.Track + } + status, _ := testGet(t, app, fmt.Sprintf("/v1/full/users/%s/tracks", trashid.MustEncodeHashID(1)), &resp) + assert.Equal(t, 200, status) + + found := false + for _, track := range resp.Data { + if track.ID == trashid.MustEncodeHashID(700) { + found = true + } + } + assert.True(t, found, "accepted collaboration (track 700) should appear on user 1's profile") +} + +// A pending invite is excluded from the profile (only accepted credits surface). +func TestPendingCollaborationHiddenFromProfile(t *testing.T) { + app := testAppWithFixtures(t) + seedCollaborators(t, app) + + var resp struct { + Data []dbv1.Track + } + status, _ := testGet(t, app, fmt.Sprintf("/v1/full/users/%s/tracks", trashid.MustEncodeHashID(1)), &resp) + assert.Equal(t, 200, status) + + for _, track := range resp.Data { + assert.NotEqual(t, trashid.MustEncodeHashID(701), track.ID, "pending invite (track 701) must not appear on profile") + } +} + +func TestCollaborationInvitesEndpoint(t *testing.T) { + app := testAppWithFixtures(t) + seedCollaborators(t, app) + + var resp struct { + Data []dbv1.FullTrackCollaboratorInvite + } + + // Pending only. + status, body := testGet(t, app, fmt.Sprintf("/v1/users/%s/collaboration_invites?status=pending", trashid.MustEncodeHashID(1)), &resp) + assert.Equal(t, 200, status) + assert.Equal(t, 1, len(resp.Data)) + jsonAssert(t, body, map[string]any{ + "data.0.track_id": trashid.MustEncodeHashID(701), + "data.0.status": "pending", + "data.0.invited_by.handle": "UserTracksTester", + }) + + // No filter returns both the pending and accepted credits. + status, _ = testGet(t, app, fmt.Sprintf("/v1/users/%s/collaboration_invites", trashid.MustEncodeHashID(1)), &resp) + assert.Equal(t, 200, status) + assert.Equal(t, 2, len(resp.Data)) + + // Invalid status is rejected. + status, _ = testGet(t, app, fmt.Sprintf("/v1/users/%s/collaboration_invites?status=bogus", trashid.MustEncodeHashID(1)), &resp) + assert.Equal(t, 400, status) +} + +// Seeding rows fires the notification trigger: an invite notifies the +// collaborator, an accept notifies the inviter. +func TestTrackCollaboratorNotificationsGenerated(t *testing.T) { + app := testAppWithFixtures(t) + seedCollaborators(t, app) + + var inviteCount int + err := app.pool.Replicas[0].QueryRow(context.Background(), + "SELECT count(*) FROM notification WHERE type = 'track_collaborator_invite' AND 1 = ANY(user_ids)").Scan(&inviteCount) + assert.NoError(t, err) + assert.Equal(t, 1, inviteCount, "pending invite should notify the collaborator (user 1)") + + var acceptCount int + err = app.pool.Replicas[0].QueryRow(context.Background(), + "SELECT count(*) FROM notification WHERE type = 'track_collaborator_accept' AND 500 = ANY(user_ids)").Scan(&acceptCount) + assert.NoError(t, err) + assert.Equal(t, 1, acceptCount, "accepted credit should notify the inviter (user 500)") +} diff --git a/api/v1_users_collaboration_invites.go b/api/v1_users_collaboration_invites.go new file mode 100644 index 00000000..6e4dcbc1 --- /dev/null +++ b/api/v1_users_collaboration_invites.go @@ -0,0 +1,38 @@ +package api + +import ( + "api.audius.co/api/dbv1" + "github.com/gofiber/fiber/v2" + "github.com/jackc/pgx/v5/pgtype" +) + +type GetUsersCollaborationInvitesParams struct { + Status string `query:"status" default:"" validate:"omitempty,oneof=pending accepted rejected"` +} + +// v1UsersCollaborationInvites lists the collaborator credits a user has been +// offered. Defaults to all; pass ?status=pending to fetch just the actionable +// invites the user can accept/decline. +func (app *ApiServer) v1UsersCollaborationInvites(c *fiber.Ctx) error { + params := GetUsersCollaborationInvitesParams{} + if err := app.ParseAndValidateQueryParams(c, ¶ms); err != nil { + return err + } + + status := pgtype.Text{} + if params.Status != "" { + status = pgtype.Text{String: params.Status, Valid: true} + } + + invites, err := app.queries.FullTrackCollaboratorInvites(c.Context(), dbv1.GetTrackCollaboratorInvitesForUserParams{ + UserID: app.getUserId(c), + Status: status, + }) + if err != nil { + return err + } + + return c.JSON(fiber.Map{ + "data": invites, + }) +} diff --git a/api/v1_users_listen_counts_monthly.go b/api/v1_users_listen_counts_monthly.go index 077aa01e..e566fcb7 100644 --- a/api/v1_users_listen_counts_monthly.go +++ b/api/v1_users_listen_counts_monthly.go @@ -25,7 +25,10 @@ func (app *ApiServer) v1UsersListenCountsMonthly(c *fiber.Ctx) error { SUM(count) AS count FROM aggregate_monthly_plays WHERE play_item_id IN ( - SELECT track_id FROM tracks WHERE owner_id = @userId AND stem_of IS NULL + SELECT track_id FROM tracks WHERE stem_of IS NULL + AND (owner_id = @userId + OR track_id IN (SELECT track_id FROM track_collaborators + WHERE collaborator_user_id = @userId AND status = 'accepted')) AND (access_authorities IS NULL OR (COALESCE(@authed_wallet, '') <> '' AND EXISTS (SELECT 1 FROM unnest(access_authorities) aa WHERE lower(aa) = lower(@authed_wallet)))) diff --git a/api/v1_users_tracks.go b/api/v1_users_tracks.go index 26710beb..908b66ef 100644 --- a/api/v1_users_tracks.go +++ b/api/v1_users_tracks.go @@ -64,13 +64,19 @@ func (app *ApiServer) v1UserTracks(c *fiber.Ctx) error { gateConditions := queryMulti(c, "gate_condition") gateFilter := buildGateConditionFilter(gateConditions) + // The profile lists a user's own tracks plus tracks they've accepted a + // collaborator credit on. `u` is the track's owner (the deactivation check + // stays on the owner); the artist-pick pin references the profile user, so a + // collaborator's track is never spuriously pinned by the owner's pick. sql := ` SELECT track_id FROM tracks t JOIN users u ON owner_id = u.user_id LEFT JOIN aggregate_plays ON track_id = play_item_id LEFT JOIN aggregate_track USING (track_id) - WHERE t.owner_id = @user_id + WHERE (t.owner_id = @user_id + OR t.track_id IN (SELECT track_id FROM track_collaborators + WHERE collaborator_user_id = @user_id AND status = 'accepted')) AND u.is_deactivated = false AND t.is_delete = false AND t.is_available = true @@ -79,7 +85,7 @@ func (app *ApiServer) v1UserTracks(c *fiber.Ctx) error { AND (t.access_authorities IS NULL OR (COALESCE(@authed_wallet, '') <> '' AND EXISTS (SELECT 1 FROM unnest(t.access_authorities) aa WHERE lower(aa) = lower(@authed_wallet))))` + gateFilter + ` - ORDER BY (CASE WHEN t.track_id = u.artist_pick_track_id THEN 0 ELSE 1 END), ` + orderClause + ` + ORDER BY (CASE WHEN t.track_id = (SELECT artist_pick_track_id FROM users WHERE user_id = @user_id) THEN 0 ELSE 1 END), ` + orderClause + ` LIMIT @limit OFFSET @offset ` diff --git a/database/seed.go b/database/seed.go index 8563580d..2854bab2 100644 --- a/database/seed.go +++ b/database/seed.go @@ -226,6 +226,16 @@ var ( "blocknumber": 101, "txhash": "tx123", }, + "track_collaborators": { + "track_id": nil, + "collaborator_user_id": nil, + "invited_by": nil, + "status": "pending", + "created_at": time.Now(), + "updated_at": time.Now(), + "txhash": "tx123", + "blocknumber": 101, + }, "playlist_routes": { "slug": nil, "title_slug": nil, diff --git a/ddl/functions/handle_track_collaborator.sql b/ddl/functions/handle_track_collaborator.sql new file mode 100644 index 00000000..119ef4d5 --- /dev/null +++ b/ddl/functions/handle_track_collaborator.sql @@ -0,0 +1,73 @@ +-- Notifications for the collaborative-tracks handshake, mirroring +-- handle_manager_request.sql (grants). track_collaborators rows are written by +-- the ETL indexer: a 'pending' row is the owner's invite; a transition to +-- 'accepted' is the collaborator accepting. +-- +-- * new pending invite -> notify the collaborator ('track_collaborator_invite') +-- * invite becomes accepted -> notify the inviter/owner ('track_collaborator_accept') +create or replace function process_track_collaborator_change() returns trigger as $$ +begin + -- A newly created pending invite (created_at = updated_at distinguishes a + -- fresh insert from a reconciled re-write), or a row resurrected back to + -- pending: notify the invited collaborator. + if (TG_OP = 'INSERT' and NEW.status = 'pending' and NEW.created_at = NEW.updated_at) or + (TG_OP = 'UPDATE' and NEW.status = 'pending' and OLD.status is distinct from 'pending') + then + insert into notification + (blocknumber, user_ids, timestamp, type, specifier, group_id, data) + values + ( + new.blocknumber, + array [new.collaborator_user_id], + new.updated_at, + 'track_collaborator_invite', + new.invited_by, + 'track_collaborator_invite:' || 'track_id:' || new.track_id || + ':collaborator_user_id:' || new.collaborator_user_id || + ':inviter_user_id:' || new.invited_by, + json_build_object( + 'track_id', new.track_id, + 'collaborator_user_id', new.collaborator_user_id, + 'inviter_user_id', new.invited_by + ) + ) + on conflict do nothing; + -- Invite accepted: notify the inviter (track owner). + elsif (TG_OP = 'UPDATE' and NEW.status = 'accepted' and OLD.status is distinct from 'accepted') or + (TG_OP = 'INSERT' and NEW.status = 'accepted') + then + insert into notification + (blocknumber, user_ids, timestamp, type, specifier, group_id, data) + values + ( + new.blocknumber, + array [new.invited_by], + new.updated_at, + 'track_collaborator_accept', + new.collaborator_user_id, + 'track_collaborator_accept:' || 'track_id:' || new.track_id || + ':collaborator_user_id:' || new.collaborator_user_id || + ':inviter_user_id:' || new.invited_by, + json_build_object( + 'track_id', new.track_id, + 'collaborator_user_id', new.collaborator_user_id, + 'inviter_user_id', new.invited_by + ) + ) + on conflict do nothing; + end if; + return null; +exception + when others then + raise warning 'An error occurred in %: %', tg_name, sqlerrm; + return null; +end; +$$ language plpgsql; + +do $$ begin + create trigger trigger_track_collaborator_change + after insert or update on track_collaborators + for each row execute procedure process_track_collaborator_change(); +exception + when others then null; +end $$; diff --git a/ddl/migrations/0220_track_collaborators.sql b/ddl/migrations/0220_track_collaborators.sql new file mode 100644 index 00000000..8f811639 --- /dev/null +++ b/ddl/migrations/0220_track_collaborators.sql @@ -0,0 +1,29 @@ +-- track_collaborators is created natively by the OpenAudio ETL (go-openaudio +-- migration 0033). We mirror it here with IF NOT EXISTS so that: +-- 1. fresh / test databases (which load sql/01_schema.sql, not the ETL +-- migrations) have the table, and +-- 2. it exists before the functions/ pass runs, so the notification trigger +-- in handle_track_collaborator.sql always has its table. +-- The ETL migration and this one are byte-compatible CREATE TABLE IF NOT EXISTS +-- statements, so whichever runs first wins and the other is a no-op. +begin; + +CREATE TABLE IF NOT EXISTS track_collaborators ( + track_id integer NOT NULL, + collaborator_user_id integer NOT NULL, + invited_by integer NOT NULL, + status text NOT NULL DEFAULT 'pending', + created_at timestamp without time zone NOT NULL, + updated_at timestamp without time zone NOT NULL, + txhash character varying NOT NULL, + blocknumber integer, + CONSTRAINT track_collaborators_pkey PRIMARY KEY (track_id, collaborator_user_id), + CONSTRAINT track_collaborators_status_check CHECK (status IN ('pending', 'accepted', 'rejected')) +); + +CREATE INDEX IF NOT EXISTS idx_track_collaborators_collaborator + ON track_collaborators (collaborator_user_id, status, track_id); + +COMMENT ON TABLE track_collaborators IS 'Collaborator credits on a track. Owner invites via track metadata (status=pending); the collaborator accepts/declines on-chain (accepted/rejected). Indexed by ETL (go-openaudio).'; + +COMMIT; diff --git a/go.mod b/go.mod index 09f5cf88..878ee224 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/AlecAivazis/survey/v2 v2.3.7 github.com/Doist/unfurlist v0.0.0-20250409100812-515f2735f8e5 github.com/OpenAudio/go-openaudio v1.3.1-0.20260609000702-c5fcacffbb79 - github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260609000702-c5fcacffbb79 + github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260609005900-3904b9d7046c github.com/aquasecurity/esquery v0.2.0 github.com/axiomhq/axiom-go v0.23.0 github.com/axiomhq/hyperloglog v0.2.5 diff --git a/go.sum b/go.sum index 01c603b2..0819de61 100644 --- a/go.sum +++ b/go.sum @@ -24,6 +24,8 @@ github.com/OpenAudio/go-openaudio v1.3.1-0.20260609000702-c5fcacffbb79 h1:eqTu4/ github.com/OpenAudio/go-openaudio v1.3.1-0.20260609000702-c5fcacffbb79/go.mod h1:wiFXmVbIUkN2D5lRshknaARCKhzbHtCBKRCZe6UOnVs= github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260609000702-c5fcacffbb79 h1:0JRNoJtyEx0ngJr24LI4CPW4eTQm3lbfU6OmVGVDuFs= github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260609000702-c5fcacffbb79/go.mod h1:LZKiU9vBYzlZzn6oPRHHLPXteBtMKQPegNH9bX9JuH8= +github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260609005900-3904b9d7046c h1:aiXkL+/2d0SXEmDi+yFHd/yE0wZUx8Zw0nOoVFy7VwA= +github.com/OpenAudio/go-openaudio/pkg/etl v1.3.1-0.20260609005900-3904b9d7046c/go.mod h1:LZKiU9vBYzlZzn6oPRHHLPXteBtMKQPegNH9bX9JuH8= github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA= github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8= github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI= diff --git a/sql/01_schema.sql b/sql/01_schema.sql index ba9c761b..a2a70d56 100644 --- a/sql/01_schema.sql +++ b/sql/01_schema.sql @@ -8460,6 +8460,95 @@ CREATE TABLE public.grants ( ); +-- +-- Name: track_collaborators; Type: TABLE; Schema: public; Owner: - +-- + +CREATE TABLE public.track_collaborators ( + track_id integer NOT NULL, + collaborator_user_id integer NOT NULL, + invited_by integer NOT NULL, + status text DEFAULT 'pending'::text NOT NULL, + created_at timestamp without time zone NOT NULL, + updated_at timestamp without time zone NOT NULL, + txhash character varying NOT NULL, + blocknumber integer, + CONSTRAINT track_collaborators_pkey PRIMARY KEY (track_id, collaborator_user_id), + CONSTRAINT track_collaborators_status_check CHECK ((status = ANY (ARRAY['pending'::text, 'accepted'::text, 'rejected'::text]))) +); + + +CREATE INDEX IF NOT EXISTS idx_track_collaborators_collaborator ON public.track_collaborators USING btree (collaborator_user_id, status, track_id); + + +-- Notification trigger for collaborative tracks (see +-- ddl/functions/handle_track_collaborator.sql). Defined inline here so fresh / +-- test databases loaded from this schema have it; `make test-schema` will +-- canonicalize the placement. +create or replace function public.process_track_collaborator_change() returns trigger as $$ +begin + if (TG_OP = 'INSERT' and NEW.status = 'pending' and NEW.created_at = NEW.updated_at) or + (TG_OP = 'UPDATE' and NEW.status = 'pending' and OLD.status is distinct from 'pending') + then + insert into notification + (blocknumber, user_ids, timestamp, type, specifier, group_id, data) + values + ( + new.blocknumber, + array [new.collaborator_user_id], + new.updated_at, + 'track_collaborator_invite', + new.invited_by, + 'track_collaborator_invite:' || 'track_id:' || new.track_id || + ':collaborator_user_id:' || new.collaborator_user_id || + ':inviter_user_id:' || new.invited_by, + json_build_object( + 'track_id', new.track_id, + 'collaborator_user_id', new.collaborator_user_id, + 'inviter_user_id', new.invited_by + ) + ) + on conflict do nothing; + elsif (TG_OP = 'UPDATE' and NEW.status = 'accepted' and OLD.status is distinct from 'accepted') or + (TG_OP = 'INSERT' and NEW.status = 'accepted') + then + insert into notification + (blocknumber, user_ids, timestamp, type, specifier, group_id, data) + values + ( + new.blocknumber, + array [new.invited_by], + new.updated_at, + 'track_collaborator_accept', + new.collaborator_user_id, + 'track_collaborator_accept:' || 'track_id:' || new.track_id || + ':collaborator_user_id:' || new.collaborator_user_id || + ':inviter_user_id:' || new.invited_by, + json_build_object( + 'track_id', new.track_id, + 'collaborator_user_id', new.collaborator_user_id, + 'inviter_user_id', new.invited_by + ) + ) + on conflict do nothing; + end if; + return null; +exception + when others then + raise warning 'An error occurred in %: %', tg_name, sqlerrm; + return null; +end; +$$ language plpgsql; + +do $$ begin + create trigger trigger_track_collaborator_change + after insert or update on public.track_collaborators + for each row execute procedure public.process_track_collaborator_change(); +exception + when others then null; +end $$; + + -- -- Name: hourly_play_counts; Type: TABLE; Schema: public; Owner: - -- From f22594a4fa2b283a3438be491fd0653d813b61d8 Mon Sep 17 00:00:00 2001 From: Raymond Jacobson Date: Mon, 8 Jun 2026 19:39:57 -0700 Subject: [PATCH 2/2] feat(api): prefetch inviter/collaborator users for collab notifications Collect data.inviter_user_id and data.collaborator_user_id when resolving notification-related users, so the track_collaborator_invite / _accept notifications hydrate their user entities (mirrors grantee_user_id for managers). Notification id hashing is already generic (HashifyJson). --- api/v1_notifications.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/api/v1_notifications.go b/api/v1_notifications.go index d886f54d..2c8f8743 100644 --- a/api/v1_notifications.go +++ b/api/v1_notifications.go @@ -378,6 +378,8 @@ func collectNotificationRelatedIds(action json.RawMessage, userIds, trackIds, pl "data.receiver_user_id", "data.dethroned_user_id", "data.grantee_user_id", + "data.inviter_user_id", + "data.collaborator_user_id", "data.tastemaker_user_id", "data.tastemaker_item_owner_id", "data.track_owner_id",