diff --git a/.husky/pre-commit b/.husky/pre-commit index 6835915..cb2c84d 100755 --- a/.husky/pre-commit +++ b/.husky/pre-commit @@ -1,2 +1 @@ -make lint -make format +pnpm lint-staged diff --git a/api/pkg/handlers/account.go b/api/pkg/handlers/account.go index d2d53c2..046b7d0 100644 --- a/api/pkg/handlers/account.go +++ b/api/pkg/handlers/account.go @@ -103,7 +103,7 @@ func (h *AccountHandler) getAccounts(w http.ResponseWriter, r *http.Request) { storageQuota, err := provider.GetStorageQuota( r.Context(), userID, - &account.ID, + account.ID, account.AccessToken, account.RefreshToken, ) diff --git a/api/pkg/handlers/files.go b/api/pkg/handlers/files.go index 7be51ee..e1a8545 100644 --- a/api/pkg/handlers/files.go +++ b/api/pkg/handlers/files.go @@ -109,7 +109,7 @@ func (h *FilesHandler) RegisterRoutes() *chi.Mux { r.Post("/create-folder", h.createFolder) - r.Put("/move-to-trash", h.moveFilesToTrash) + r.Patch("/move-to-trash", h.moveFilesToTrash) r.Delete("/permanent-delete-files", h.permanentlyDelete) @@ -356,6 +356,7 @@ func (h *FilesHandler) uploadFilesToProvider(w http.ResponseWriter, r *http.Requ return } + defer conn.Release() queries := repository.New(conn) @@ -390,7 +391,7 @@ func (h *FilesHandler) uploadFilesToProvider(w http.ResponseWriter, r *http.Requ return } - err = provider.UploadFiles(r.Context(), accountID, conn, queries, authTokens, uploadedFiles) + err = provider.UploadFiles(r.Context(), *accountID, conn, queries, authTokens, uploadedFiles) if err != nil { config.LOGGER.Error( "failed to upload files", @@ -407,7 +408,11 @@ func (h *FilesHandler) uploadFilesToProvider(w http.ResponseWriter, r *http.Requ return } - utils.SendAPIResponse(w, http.StatusOK, "files uploaded successfully") + utils.SendAPIResponse( + w, + http.StatusOK, + map[string]string{"message": "Successfully uploaded your file(s)!"}, + ) } func (h *FilesHandler) createFolder(w http.ResponseWriter, r *http.Request) { @@ -523,7 +528,7 @@ func (h *FilesHandler) createFolder(w 
http.ResponseWriter, r *http.Request) { provider := providers.OAuthProviders[string(account.Provider)] - err = provider.CreateFolder(r.Context(), payload.Name, parentFolder, account, conn, *queries) + err = provider.CreateFolder(r.Context(), payload.Name, parentFolder, account, conn, queries) if err != nil { config.LOGGER.Error("failed to create new folder", zap.Error(err)) utils.SendAPIErrorResponse( @@ -538,7 +543,7 @@ func (h *FilesHandler) createFolder(w http.ResponseWriter, r *http.Request) { utils.SendAPIResponse( w, http.StatusOK, - map[string]string{"message": "Your folder was successfully created"}, + map[string]string{"message": "Your folder was successfully created!"}, ) } @@ -590,6 +595,7 @@ func (h *FilesHandler) moveFilesToTrash(w http.ResponseWriter, r *http.Request) return } + defer conn.Release() queries := repository.New(conn) @@ -631,7 +637,7 @@ func (h *FilesHandler) moveFilesToTrash(w http.ResponseWriter, r *http.Request) provider := providers.OAuthProviders[string(providerName)] - err = provider.MoveToTrash(r.Context(), &accountID, conn, queries, authTokens, items) + err = provider.MoveToTrash(r.Context(), accountID, conn, queries, authTokens, items) if err != nil { config.LOGGER.Error( "failed to move file ids for move to trash action", @@ -649,7 +655,7 @@ func (h *FilesHandler) moveFilesToTrash(w http.ResponseWriter, r *http.Request) } utils.SendAPIResponse(w, http.StatusOK, map[string]any{ - "message": "Files successfully moved to trash", + "message": "Files successfully moved to trash.", }) } @@ -701,6 +707,7 @@ func (h *FilesHandler) permanentlyDelete(w http.ResponseWriter, r *http.Request) return } + defer conn.Release() queries := repository.New(conn) @@ -744,7 +751,7 @@ func (h *FilesHandler) permanentlyDelete(w http.ResponseWriter, r *http.Request) err = provider.PermanentlyDeleteFiles( r.Context(), - &accountID, + accountID, conn, queries, authTokens, @@ -767,7 +774,7 @@ func (h *FilesHandler) permanentlyDelete(w http.ResponseWriter, 
r *http.Request) } utils.SendAPIResponse(w, http.StatusOK, map[string]any{ - "message": "Files successfully deleted", + "message": "File(s) successfully deleted.", }) } diff --git a/api/pkg/handlers/link.go b/api/pkg/handlers/link.go index dc18b23..ea747ab 100644 --- a/api/pkg/handlers/link.go +++ b/api/pkg/handlers/link.go @@ -185,7 +185,6 @@ func (h *LinkHandler) linkAccountCallback(w http.ResponseWriter, r *http.Request token, userId, accountInfo, err := provider.GetToken(w, r, store) if err != nil { h.logAndRedirectError(w, r, "GetToken failed", providerName, err) - return } @@ -197,7 +196,6 @@ func (h *LinkHandler) linkAccountCallback(w http.ResponseWriter, r *http.Request conn, err := h.connPool.Acquire(r.Context()) if err != nil { h.logAndRedirectError(w, r, "failed to acquire connection", providerName, err) - return } defer conn.Release() @@ -210,13 +208,11 @@ func (h *LinkHandler) linkAccountCallback(w http.ResponseWriter, r *http.Request ) if err != nil { h.logAndRedirectError(w, r, "account upsert failed", providerName, err) - return } - if err = h.scheduleBackgroundJobs(r.Context(), userId, providerName, accountID, token, queries); err != nil { - h.logAndRedirectError(w, r, "scheduling jobs failed", providerName, err) - + if err = h.scheduleBackgroundTasks(r.Context(), userId, providerName, accountID, token, queries); err != nil { + h.logAndRedirectError(w, r, "scheduling tasks failed", providerName, err) return } @@ -243,7 +239,7 @@ func (h *LinkHandler) logAndRedirectError( h.errorRedirect(w, r) } -func (h *LinkHandler) scheduleBackgroundJobs( +func (h *LinkHandler) scheduleBackgroundTasks( ctx context.Context, userID, providerName, accountID string, token *oauth2.Token, @@ -289,6 +285,10 @@ func (h *LinkHandler) upsertAccount( successQuery := "newAccount" + tokenExpiryDuration := time.Duration(token.ExpiresIn) * time.Second + + tokenExpiry := time.Now().Add(tokenExpiryDuration) + addCountParams := repository.AddAccountDetailsParams{ UserID: userID, 
Provider: repository.ProviderEnum(providerName), @@ -296,7 +296,7 @@ func (h *LinkHandler) upsertAccount( AccessToken: encAccessToken, RefreshToken: encRefreshToken, TokenType: db.PGTextField(token.TokenType), - Expiry: db.PGTimestamptzField(token.Expiry), + Expiry: db.PGTimestamptzField(tokenExpiry), Email: accountInfo.Email, Name: accountInfo.Name, AvatarUrl: db.PGTextField(accountInfo.AvatarURL), @@ -449,17 +449,17 @@ func (h *LinkHandler) enqueueFileSyncTaskAndLog( return nil } - err = queries.AddNewJobLog(ctx, repository.AddNewJobLogParams{ - JobID: info.ID, + err = queries.AddNewTaskLog(ctx, repository.AddNewTaskLogParams{ + TaskID: info.ID, AccountID: *accountUUID, Type: info.Type, - Status: repository.JobStatusEnumQueued, + Status: repository.TaskStatusEnumQueued, Queue: info.Queue, Params: params, }) if err != nil { config.LOGGER.Error( - "failed to insert job log", + "failed to insert task log", zap.String("provider", providerName), zap.String("task_type", tasks.TypeFileSync), zap.String("task_id", info.ID), @@ -535,17 +535,17 @@ func (h *LinkHandler) enqueueAuthTokenRenewalTaskAndLog( return nil } - err = queries.AddNewJobLog(ctx, repository.AddNewJobLogParams{ - JobID: info.ID, + err = queries.AddNewTaskLog(ctx, repository.AddNewTaskLogParams{ + TaskID: info.ID, AccountID: *accountUUID, Type: info.Type, - Status: repository.JobStatusEnumQueued, + Status: repository.TaskStatusEnumQueued, Queue: info.Queue, Params: params, }) if err != nil { config.LOGGER.Error( - "failed to insert job log", + "failed to insert task log", zap.String("provider", providerName), zap.String("task_type", tasks.TypeAuthTokenRenewal), zap.String("task_id", info.ID), diff --git a/api/pkg/providers/dropbox.go b/api/pkg/providers/dropbox.go index 3b3d454..8b4ecf5 100644 --- a/api/pkg/providers/dropbox.go +++ b/api/pkg/providers/dropbox.go @@ -298,6 +298,13 @@ func (p *DropboxProvider) SyncFiles( return err } + _, pool := db.GetPGClient() + + accessToken, err = 
EnsureValidAccesstoken(ctx, pool, accountID, accessToken, refreshToken, p) + if err != nil { + config.LOGGER.Error("failed to validate access token", zap.Error(err)) + } + for { dropboxResponse, err := p.getDropboxFolderList( ctx, @@ -328,7 +335,7 @@ func (p *DropboxProvider) SyncFiles( insertedRows, err = p.bulkInsertSyncedItems( ctx, conn, - *queries, + queries, providerFileIDs, accountID, files, @@ -563,7 +570,7 @@ func (p *DropboxProvider) RenewOAuthTokens( func (p *DropboxProvider) GetStorageQuota( ctx context.Context, userID string, - accountID *pgtype.UUID, + accountID pgtype.UUID, encryptedAccessToken, encryptedRefreshToken string, ) (*StorageQuota, error) { storageQuotaKey := fmt.Sprintf("storage:dropbox:%s:%s", userID, accountID.String()) @@ -604,6 +611,24 @@ func (p *DropboxProvider) GetStorageQuota( return nil, err } + refreshToken, err := utils.Decrypt(encryptedRefreshToken) + if err != nil { + config.LOGGER.Error( + "failed to decrypt refresh token", + zap.String("provider", DROPBOX_PROVIDER_NAME), + zap.Error(err), + ) + + return nil, err + } + + _, pool := db.GetPGClient() + + accessToken, err = EnsureValidAccesstoken(ctx, pool, accountID, accessToken, refreshToken, p) + if err != nil { + config.LOGGER.Error("failed to validate access token", zap.Error(err)) + } + url := DROPBOX_API_BASE_URL + "/2/users/get_space_usage" req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, nil) @@ -684,7 +709,7 @@ func (p *DropboxProvider) GetStorageQuota( func (p *DropboxProvider) UploadFiles( ctx context.Context, - accountID *pgtype.UUID, + accountID pgtype.UUID, conn *pgxpool.Conn, queries *repository.Queries, authTokens repository.GetAuthTokensRow, @@ -701,6 +726,24 @@ func (p *DropboxProvider) UploadFiles( return err } + refreshToken, err := utils.Decrypt(authTokens.RefreshToken) + if err != nil { + config.LOGGER.Error( + "failed to decrypt refresh token", + zap.String("provider", DROPBOX_PROVIDER_NAME), + zap.Error(err), + ) + + return err + } + + 
_, pool := db.GetPGClient() + + accessToken, err = EnsureValidAccesstoken(ctx, pool, accountID, accessToken, refreshToken, p) + if err != nil { + config.LOGGER.Error("failed to validate access token", zap.Error(err)) + } + var ( mu sync.Mutex results []DropboxListFolderEntries @@ -735,9 +778,9 @@ func (p *DropboxProvider) UploadFiles( return err } - files, _ := p.convertToSyncedItemSlice(results, *accountID, false) + files, _ := p.convertToSyncedItemSlice(results, accountID, false) - _, err = p.bulkInsertSyncedItems(ctx, conn, *queries, []string{}, *accountID, files, "") + _, err = p.bulkInsertSyncedItems(ctx, conn, queries, []string{}, accountID, files, "") if err != nil { config.LOGGER.Error( "failed to insert newly uploaded files", @@ -854,7 +897,7 @@ func (p *DropboxProvider) uploadToDropbox( func (p *DropboxProvider) MoveToTrash( ctx context.Context, - accountID *pgtype.UUID, + accountID pgtype.UUID, conn *pgxpool.Conn, queries *repository.Queries, authTokens repository.GetAuthTokensRow, @@ -871,6 +914,24 @@ func (p *DropboxProvider) MoveToTrash( return err } + refreshToken, err := utils.Decrypt(authTokens.RefreshToken) + if err != nil { + config.LOGGER.Error( + "failed to decrypt refresh token", + zap.String("provider", DROPBOX_PROVIDER_NAME), + zap.Error(err), + ) + + return err + } + + _, pool := db.GetPGClient() + + accessToken, err = EnsureValidAccesstoken(ctx, pool, accountID, accessToken, refreshToken, p) + if err != nil { + config.LOGGER.Error("failed to validate access token", zap.Error(err)) + } + var ( fileIDs []pgtype.UUID mu sync.Mutex @@ -909,7 +970,7 @@ func (p *DropboxProvider) MoveToTrash( return qx.SetFileTrashed(ctx, repository.SetFileTrashedParams{ FileIds: fileIDs, - AccountID: *accountID, + AccountID: accountID, }) }) if err != nil { @@ -923,7 +984,7 @@ func (p *DropboxProvider) MoveToTrash( func (p *DropboxProvider) PermanentlyDeleteFiles( ctx context.Context, - accountID *pgtype.UUID, + accountID pgtype.UUID, conn *pgxpool.Conn, queries 
*repository.Queries, authTokens repository.GetAuthTokensRow, @@ -940,6 +1001,24 @@ func (p *DropboxProvider) PermanentlyDeleteFiles( return err } + refreshToken, err := utils.Decrypt(authTokens.RefreshToken) + if err != nil { + config.LOGGER.Error( + "failed to decrypt refresh token", + zap.String("provider", DROPBOX_PROVIDER_NAME), + zap.Error(err), + ) + + return err + } + + _, pool := db.GetPGClient() + + accessToken, err = EnsureValidAccesstoken(ctx, pool, accountID, accessToken, refreshToken, p) + if err != nil { + config.LOGGER.Error("failed to validate access token", zap.Error(err)) + } + var ( fileIDs []pgtype.UUID mu sync.Mutex @@ -978,7 +1057,7 @@ func (p *DropboxProvider) PermanentlyDeleteFiles( return qx.DeleteSyncedItems(ctx, repository.DeleteSyncedItemsParams{ FileIds: fileIDs, - AccountID: *accountID, + AccountID: accountID, }) }) if err != nil { @@ -1031,7 +1110,7 @@ func (p *DropboxProvider) SearchByContent( refreshToken, err := utils.Decrypt(account.RefreshToken) if err != nil { config.LOGGER.Error( - "failed to decrypt access token", + "failed to decrypt refresh token", zap.String("provider", DROPBOX_PROVIDER_NAME), zap.Error(err), ) @@ -1039,6 +1118,13 @@ func (p *DropboxProvider) SearchByContent( return nil, err } + _, pool := db.GetPGClient() + + accessToken, err = EnsureValidAccesstoken(ctx, pool, account.ID, accessToken, refreshToken, p) + if err != nil { + config.LOGGER.Error("failed to validate access token", zap.Error(err)) + } + providerFileIDs := []string{} cursor := "" @@ -1345,7 +1431,7 @@ func (p *DropboxProvider) permanentlyDeleteFile( func (p *DropboxProvider) bulkInsertSyncedItems( ctx context.Context, conn *pgxpool.Conn, - queries repository.Queries, + queries *repository.Queries, providerFileIDs []string, accountID pgtype.UUID, files []repository.AddSyncedItemsParams, @@ -1452,7 +1538,7 @@ func (p *DropboxProvider) CreateFolder( parentFolder ParentFolder, account repository.GetLinkedAccountRow, conn *pgxpool.Conn, - queries 
repository.Queries, + queries *repository.Queries, ) error { logFields := []zap.Field{ zap.String("provider", DROPBOX_PROVIDER_NAME), @@ -1466,6 +1552,21 @@ func (p *DropboxProvider) CreateFolder( return err } + refreshToken, err := utils.Decrypt(account.RefreshToken) + if err != nil { + logFields = append(logFields, zap.Error(err)) + config.LOGGER.Error("failed to decrypt refresh token", logFields...) + + return err + } + + _, pool := db.GetPGClient() + + accessToken, err = EnsureValidAccesstoken(ctx, pool, account.ID, accessToken, refreshToken, p) + if err != nil { + config.LOGGER.Error("failed to validate access token", zap.Error(err)) + } + parentPath := parentFolder.Path reqBody := fmt.Appendf( diff --git a/api/pkg/providers/google.go b/api/pkg/providers/google.go index 7d6811a..1e2f459 100644 --- a/api/pkg/providers/google.go +++ b/api/pkg/providers/google.go @@ -167,6 +167,13 @@ func (p *GoogleProvider) SyncFiles( return err } + _, pool := db.GetPGClient() + + accessToken, err = EnsureValidAccesstoken(ctx, pool, accountID, accessToken, refreshToken, p) + if err != nil { + config.LOGGER.Error("failed to validate access token", zap.Error(err)) + } + httpClient := p.GetHTTPClient(ctx, accessToken, refreshToken) driveService, err := drive.NewService(ctx, option.WithHTTPClient(httpClient)) @@ -260,7 +267,7 @@ func (p *GoogleProvider) SyncFiles( insertedRows, err = p.bulkInsertSyncedItems( ctx, conn, - *queries, + queries, providerFileIDs, accountID, files, @@ -415,7 +422,7 @@ func (p *GoogleProvider) GetHTTPClient( func (p *GoogleProvider) GetStorageQuota( ctx context.Context, userID string, - accountID *pgtype.UUID, + accountID pgtype.UUID, encryptedAccessToken, encryptedRefreshToken string, ) (*StorageQuota, error) { storageQuotaKey := fmt.Sprintf("storage:google:%s:%s", userID, accountID.String()) @@ -467,6 +474,13 @@ func (p *GoogleProvider) GetStorageQuota( return nil, err } + _, pool := db.GetPGClient() + + accessToken, err = EnsureValidAccesstoken(ctx, 
pool, accountID, accessToken, refreshToken, p) + if err != nil { + config.LOGGER.Error("failed to validate access token", zap.Error(err)) + } + httpClient := p.GetHTTPClient(ctx, accessToken, refreshToken) driveService, err := drive.NewService(ctx, option.WithHTTPClient(httpClient)) @@ -525,7 +539,7 @@ func (p *GoogleProvider) GetStorageQuota( func (p *GoogleProvider) UploadFiles( ctx context.Context, - accountID *pgtype.UUID, + accountID pgtype.UUID, conn *pgxpool.Conn, queries *repository.Queries, authTokens repository.GetAuthTokensRow, @@ -553,6 +567,13 @@ func (p *GoogleProvider) UploadFiles( return err } + _, pool := db.GetPGClient() + + accessToken, err = EnsureValidAccesstoken(ctx, pool, accountID, accessToken, refreshToken, p) + if err != nil { + config.LOGGER.Error("failed to validate access token", zap.Error(err)) + } + httpClient := p.GetHTTPClient(ctx, accessToken, refreshToken) driveService, err := drive.NewService(ctx, option.WithHTTPClient(httpClient)) @@ -600,9 +621,9 @@ func (p *GoogleProvider) UploadFiles( return err } - files, _ := p.convertToSyncedItemSlice(results, *accountID, false) + files, _ := p.convertToSyncedItemSlice(results, accountID, false) - _, err = p.bulkInsertSyncedItems(ctx, conn, *queries, []string{}, *accountID, files) + _, err = p.bulkInsertSyncedItems(ctx, conn, queries, []string{}, accountID, files) if err != nil { config.LOGGER.Error( "failed to insert newly uploaded files", @@ -627,7 +648,7 @@ func (p *GoogleProvider) UploadFiles( func (p *GoogleProvider) MoveToTrash( ctx context.Context, - accountID *pgtype.UUID, + accountID pgtype.UUID, conn *pgxpool.Conn, queries *repository.Queries, authTokens repository.GetAuthTokensRow, @@ -655,6 +676,13 @@ func (p *GoogleProvider) MoveToTrash( return err } + _, pool := db.GetPGClient() + + accessToken, err = EnsureValidAccesstoken(ctx, pool, accountID, accessToken, refreshToken, p) + if err != nil { + config.LOGGER.Error("failed to validate access token", zap.Error(err)) + } + 
httpClient := p.GetHTTPClient(ctx, accessToken, refreshToken) driveService, err := drive.NewService(ctx, option.WithHTTPClient(httpClient)) @@ -706,7 +734,7 @@ func (p *GoogleProvider) MoveToTrash( return qx.SetFileTrashed(ctx, repository.SetFileTrashedParams{ FileIds: fileIDs, - AccountID: *accountID, + AccountID: accountID, }) }) if err != nil { @@ -720,7 +748,7 @@ func (p *GoogleProvider) MoveToTrash( func (p *GoogleProvider) PermanentlyDeleteFiles( ctx context.Context, - accountID *pgtype.UUID, + accountID pgtype.UUID, conn *pgxpool.Conn, queries *repository.Queries, authTokens repository.GetAuthTokensRow, @@ -748,6 +776,13 @@ func (p *GoogleProvider) PermanentlyDeleteFiles( return err } + _, pool := db.GetPGClient() + + accessToken, err = EnsureValidAccesstoken(ctx, pool, accountID, accessToken, refreshToken, p) + if err != nil { + config.LOGGER.Error("failed to validate access token", zap.Error(err)) + } + httpClient := p.GetHTTPClient(ctx, accessToken, refreshToken) driveService, err := drive.NewService(ctx, option.WithHTTPClient(httpClient)) @@ -799,7 +834,7 @@ func (p *GoogleProvider) PermanentlyDeleteFiles( return qx.DeleteSyncedItems(ctx, repository.DeleteSyncedItemsParams{ FileIds: fileIDs, - AccountID: *accountID, + AccountID: accountID, }) }) if err != nil { @@ -860,6 +895,13 @@ func (p *GoogleProvider) SearchByContent( return nil, err } + _, pool := db.GetPGClient() + + accessToken, err = EnsureValidAccesstoken(ctx, pool, account.ID, accessToken, refreshToken, p) + if err != nil { + config.LOGGER.Error("failed to validate access token", zap.Error(err)) + } + httpClient := p.GetHTTPClient(ctx, accessToken, refreshToken) driveService, err := drive.NewService(ctx, option.WithHTTPClient(httpClient)) @@ -1059,7 +1101,7 @@ func (p *GoogleProvider) convertToSyncedItemSlice( func (p *GoogleProvider) bulkInsertSyncedItems( ctx context.Context, conn *pgxpool.Conn, - queries repository.Queries, + queries *repository.Queries, providerFileIDs []string, accountID 
pgtype.UUID, files []repository.AddSyncedItemsParams, @@ -1117,7 +1159,7 @@ func (p *GoogleProvider) CreateFolder( parentFolder ParentFolder, account repository.GetLinkedAccountRow, conn *pgxpool.Conn, - queries repository.Queries, + queries *repository.Queries, ) error { logFields := []zap.Field{ zap.String("provider", GOOGLE_PROVIDER_NAME), @@ -1137,6 +1179,13 @@ func (p *GoogleProvider) CreateFolder( return err } + _, pool := db.GetPGClient() + + accessToken, err = EnsureValidAccesstoken(ctx, pool, account.ID, accessToken, refreshToken, p) + if err != nil { + config.LOGGER.Error("failed to validate access token", zap.Error(err)) + } + httpClient := p.GetHTTPClient(ctx, accessToken, refreshToken) driveService, err := drive.NewService(ctx, option.WithHTTPClient(httpClient)) diff --git a/api/pkg/providers/init.go b/api/pkg/providers/init.go index 3c270a9..ca81d59 100644 --- a/api/pkg/providers/init.go +++ b/api/pkg/providers/init.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "net/http" + "time" "github.com/blackmamoth/cloudmesh/pkg/config" "github.com/blackmamoth/cloudmesh/pkg/middlewares" @@ -69,7 +70,7 @@ type Provider interface { ) (string, int64, error) UploadFiles( ctx context.Context, - accountID *pgtype.UUID, + accountID pgtype.UUID, conn *pgxpool.Conn, queries *repository.Queries, authTokens repository.GetAuthTokensRow, @@ -77,7 +78,7 @@ type Provider interface { ) error MoveToTrash( ctx context.Context, - accountID *pgtype.UUID, + accountID pgtype.UUID, conn *pgxpool.Conn, queries *repository.Queries, authTokens repository.GetAuthTokensRow, @@ -85,7 +86,7 @@ type Provider interface { ) error PermanentlyDeleteFiles( ctx context.Context, - accountID *pgtype.UUID, + accountID pgtype.UUID, conn *pgxpool.Conn, queries *repository.Queries, authTokens repository.GetAuthTokensRow, @@ -101,7 +102,7 @@ type Provider interface { GetStorageQuota( ctx context.Context, userID string, - accountID *pgtype.UUID, + accountID pgtype.UUID, encryptedAccessToken, 
encryptedRefreshToken string, ) (*StorageQuota, error) CreateFolder( @@ -110,7 +111,7 @@ type Provider interface { parentFolder ParentFolder, account repository.GetLinkedAccountRow, conn *pgxpool.Conn, - queries repository.Queries, + queries *repository.Queries, ) error } @@ -301,3 +302,62 @@ func exchangeToken( return tok, receivedOauthState.UserID, accountInfo, nil } + +func EnsureValidAccesstoken( + ctx context.Context, + pool *pgxpool.Pool, + accountID pgtype.UUID, + accessToken, refreshToken string, + p Provider, +) (string, error) { + conn, err := pool.Acquire(ctx) + if err != nil { + return accessToken, nil + } + defer conn.Release() + + return ensureValidAccessToken(ctx, accountID, conn, accessToken, refreshToken, p) +} + +func ensureValidAccessToken(ctx context.Context, accountID pgtype.UUID, + conn *pgxpool.Conn, + accessToken, refreshToken string, + p Provider, +) (string, error) { + queries := repository.New(conn) + + tokenExpiry, err := queries.GetAuthTokenExpiry(ctx, accountID) + if err != nil { + config.LOGGER.Error( + "failed to fetch auth token expiry for an account", + zap.String("account_id", accountID.String()), + zap.Error(err), + ) + return "", err + } + + if !tokenExpiry.Valid { + return accessToken, nil + } + if time.Now().Add(5 * time.Minute).After(tokenExpiry.Time) { + config.LOGGER.Info( + "access token expires soon, refreshing", + zap.String("provider", GOOGLE_PROVIDER_NAME), + zap.String("account_id", accountID.String()), + zap.Time("expiry", tokenExpiry.Time), + ) + + newAccessToken, _, err := p.RenewOAuthTokens(ctx, conn, accountID, refreshToken) + if err != nil { + config.LOGGER.Error( + "failed to renew auth tokens", + zap.String("provider", GOOGLE_PROVIDER_NAME), + zap.String("account_id", accountID.String()), + zap.Error(err), + ) + return accessToken, err + } + return newAccessToken, nil + } + return accessToken, nil +} diff --git a/api/pkg/providers/microsoft.go b/api/pkg/providers/microsoft.go index 324659f..1e3b26c 100644 --- 
a/api/pkg/providers/microsoft.go +++ b/api/pkg/providers/microsoft.go @@ -367,6 +367,13 @@ func (p *MicrosoftProvider) SyncFiles( return err } + _, pool := db.GetPGClient() + + accessToken, err = EnsureValidAccesstoken(ctx, pool, accountID, accessToken, refreshToken, p) + if err != nil { + config.LOGGER.Error("failed to validate access token", zap.Error(err)) + } + for { oneDriveResponse, err := p.getOneDriveDeltaFiles( ctx, @@ -407,7 +414,7 @@ func (p *MicrosoftProvider) SyncFiles( insertedRows, err = p.bulkInsertSyncedItems( ctx, conn, - *queries, + queries, providerFileIDs, accountID, files, @@ -573,7 +580,7 @@ func (p *MicrosoftProvider) convertToSyncedItemSlice( func (p *MicrosoftProvider) bulkInsertSyncedItems( ctx context.Context, conn *pgxpool.Conn, - queries repository.Queries, + queries *repository.Queries, providerFileIDs []string, accountID pgtype.UUID, files []repository.AddSyncedItemsParams, @@ -745,7 +752,7 @@ func (p *MicrosoftProvider) RenewOAuthTokens( func (p *MicrosoftProvider) UploadFiles( ctx context.Context, - accountID *pgtype.UUID, + accountID pgtype.UUID, conn *pgxpool.Conn, queries *repository.Queries, authTokens repository.GetAuthTokensRow, @@ -764,6 +771,21 @@ func (p *MicrosoftProvider) UploadFiles( return err } + refreshToken, err := utils.Decrypt(authTokens.RefreshToken) + if err != nil { + logFields = append(logFields, zap.Error(err)) + config.LOGGER.Error("failed to decrypt refresh token", logFields...) 
+ + return err + } + + _, pool := db.GetPGClient() + + accessToken, err = EnsureValidAccesstoken(ctx, pool, accountID, accessToken, refreshToken, p) + if err != nil { + config.LOGGER.Error("failed to validate access token", zap.Error(err)) + } + var ( mu sync.Mutex results []OneDriveItem @@ -798,14 +820,14 @@ func (p *MicrosoftProvider) UploadFiles( return err } - files, _ := p.convertToSyncedItemSlice(results, *accountID, false) + files, _ := p.convertToSyncedItemSlice(results, accountID, false) insertRowCount, err := p.bulkInsertSyncedItems( ctx, conn, - *queries, + queries, []string{}, - *accountID, + accountID, files, "", ) @@ -910,7 +932,7 @@ func (p *MicrosoftProvider) uploadFiles( func (p *MicrosoftProvider) MoveToTrash( ctx context.Context, - accountID *pgtype.UUID, + accountID pgtype.UUID, conn *pgxpool.Conn, queries *repository.Queries, authTokens repository.GetAuthTokensRow, @@ -929,6 +951,21 @@ func (p *MicrosoftProvider) MoveToTrash( return err } + refreshToken, err := utils.Decrypt(authTokens.RefreshToken) + if err != nil { + logFields = append(logFields, zap.Error(err)) + config.LOGGER.Error("failed to decrypt refresh token", logFields...) 
+ + return err + } + + _, pool := db.GetPGClient() + + accessToken, err = EnsureValidAccesstoken(ctx, pool, accountID, accessToken, refreshToken, p) + if err != nil { + config.LOGGER.Error("failed to validate access token", zap.Error(err)) + } + var ( fileIDs []pgtype.UUID mu sync.Mutex @@ -968,7 +1005,7 @@ func (p *MicrosoftProvider) MoveToTrash( return qx.SetFileTrashed(ctx, repository.SetFileTrashedParams{ FileIds: fileIDs, - AccountID: *accountID, + AccountID: accountID, }) }) if err != nil { @@ -1039,7 +1076,7 @@ func (p *MicrosoftProvider) moveToTrash(ctx context.Context, accessToken, fileID func (p *MicrosoftProvider) PermanentlyDeleteFiles( ctx context.Context, - accountID *pgtype.UUID, + accountID pgtype.UUID, conn *pgxpool.Conn, queries *repository.Queries, authTokens repository.GetAuthTokensRow, @@ -1079,6 +1116,21 @@ func (p *MicrosoftProvider) SearchByContent( return nil, err } + refreshToken, err := utils.Decrypt(account.RefreshToken) + if err != nil { + logFields = append(logFields, zap.Error(err)) + config.LOGGER.Error("failed to decrypt refresh token", logFields...) + + return nil, err + } + + _, pool := db.GetPGClient() + + accessToken, err = EnsureValidAccesstoken(ctx, pool, account.ID, accessToken, refreshToken, p) + if err != nil { + config.LOGGER.Error("failed to validate access token", zap.Error(err)) + } + providerFileIDs := []string{} nextLink := "" @@ -1188,7 +1240,7 @@ func (p *MicrosoftProvider) searchByContent( func (p *MicrosoftProvider) GetStorageQuota( ctx context.Context, userID string, - accountID *pgtype.UUID, + accountID pgtype.UUID, encryptedAccessToken, encryptedRefreshToken string, ) (*StorageQuota, error) { logFields := []zap.Field{ @@ -1229,6 +1281,21 @@ func (p *MicrosoftProvider) GetStorageQuota( return nil, err } + refreshToken, err := utils.Decrypt(encryptedRefreshToken) + if err != nil { + logFields = append(logFields, zap.Error(err)) + config.LOGGER.Error("failed to decrypt refresh token", logFields...) 
+ + return nil, err + } + + _, pool := db.GetPGClient() + + accessToken, err = EnsureValidAccesstoken(ctx, pool, accountID, accessToken, refreshToken, p) + if err != nil { + config.LOGGER.Error("failed to validate access token", zap.Error(err)) + } + url := MICROSOFT_GRAPH_API_BASE_URL + "/me/drive" req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) @@ -1308,7 +1375,7 @@ func (p *MicrosoftProvider) CreateFolder( parentFolder ParentFolder, account repository.GetLinkedAccountRow, conn *pgxpool.Conn, - queries repository.Queries, + queries *repository.Queries, ) error { logFields := []zap.Field{ zap.String("provider", MICROSOFT_PROVIDER_NAME), @@ -1321,6 +1388,20 @@ func (p *MicrosoftProvider) CreateFolder( return err } + refreshToken, err := utils.Decrypt(account.RefreshToken) + if err != nil { + config.LOGGER.Error("failed to decrypt refresh token", logFields...) + + return err + } + + _, pool := db.GetPGClient() + + accessToken, err = EnsureValidAccesstoken(ctx, pool, account.ID, accessToken, refreshToken, p) + if err != nil { + config.LOGGER.Error("failed to validate access token", zap.Error(err)) + } + url := MICROSOFT_GRAPH_API_BASE_URL + "/me/drive/root/children" if parentFolder.ID != "" { diff --git a/api/pkg/tasks/auth.go b/api/pkg/tasks/auth.go index e52d542..d8de36d 100644 --- a/api/pkg/tasks/auth.go +++ b/api/pkg/tasks/auth.go @@ -53,30 +53,30 @@ func HandleAuthTokenRenewalTask(ctx context.Context, t *asynq.Task) error { queries := repository.New(conn) - jobID := t.ResultWriter().TaskID() + taskID := t.ResultWriter().TaskID() retryCount, ok := asynq.GetRetryCount(ctx) if !ok || retryCount == 0 { - err = queries.UpdateJobLogStart(ctx, repository.UpdateJobLogStartParams{ + err = queries.UpdateTaskLogStart(ctx, repository.UpdateTaskLogStartParams{ StartedAt: db.PGTimestamptzField(time.Now()), - JobID: jobID, + TaskID: taskID, }) if err != nil { config.LOGGER.Error( - "failed to insert start log for job", - zap.String("job_id", jobID), + 
"failed to insert start log for task", + zap.String("task_id", taskID), zap.Error(err), ) } } else { - err = queries.UpdateJobLogRetryCount(ctx, repository.UpdateJobLogRetryCountParams{ + err = queries.UpdateTaskLogRetryCount(ctx, repository.UpdateTaskLogRetryCountParams{ // #nosec G115 -- retryCount is bounded and will never exceed int32 Retries: db.PGInt4Field(int32(retryCount)), - JobID: jobID, + TaskID: taskID, }) if err != nil { - config.LOGGER.Error("failed to insert retry count log for job", zap.String("job_id", jobID), zap.Int("retry_count", retryCount), zap.Error(err)) + config.LOGGER.Error("failed to insert retry count log for task", zap.String("task_id", taskID), zap.Int("retry_count", retryCount), zap.Error(err)) } } @@ -121,14 +121,14 @@ func HandleAuthTokenRenewalTask(ctx context.Context, t *asynq.Task) error { _, expiresIn, err := provider.RenewOAuthTokens(ctx, conn, *accountID, refreshToken) if err != nil { - dbErr := queries.UpdateJobLogFailed(ctx, repository.UpdateJobLogFailedParams{ - Error: db.PGTextField(err.Error()), - JobID: jobID, + dbErr := queries.UpdateTaskLogFailed(ctx, repository.UpdateTaskLogFailedParams{ + Error: db.PGTextField(err.Error()), + TaskID: taskID, }) if dbErr != nil { config.LOGGER.Error( - "failed to insert failed log for job", - zap.String("job_id", jobID), + "failed to insert failed log for task", + zap.String("task_id", taskID), zap.Error(err), ) } @@ -136,14 +136,14 @@ func HandleAuthTokenRenewalTask(ctx context.Context, t *asynq.Task) error { return err } - err = queries.UpdateJobLogFinish(ctx, repository.UpdateJobLogFinishParams{ + err = queries.UpdateTaskLogFinish(ctx, repository.UpdateTaskLogFinishParams{ FinishedAt: db.PGTimestamptzField(time.Now()), - JobID: jobID, + TaskID: taskID, }) if err != nil { config.LOGGER.Error( - "failed to insert finish log for job", - zap.String("job_id", jobID), + "failed to insert finish log for task", + zap.String("task_id", taskID), zap.Error(err), ) } diff --git 
a/api/pkg/tasks/file.go b/api/pkg/tasks/file.go index e22fa4b..8ad3f9e 100644 --- a/api/pkg/tasks/file.go +++ b/api/pkg/tasks/file.go @@ -52,30 +52,30 @@ func HandleFileSyncTask(ctx context.Context, t *asynq.Task) error { queries := repository.New(conn) - jobID := t.ResultWriter().TaskID() + taskID := t.ResultWriter().TaskID() retryCount, ok := asynq.GetRetryCount(ctx) if !ok || retryCount == 0 { - err = queries.UpdateJobLogStart(ctx, repository.UpdateJobLogStartParams{ + err = queries.UpdateTaskLogStart(ctx, repository.UpdateTaskLogStartParams{ StartedAt: db.PGTimestamptzField(time.Now()), - JobID: jobID, + TaskID: taskID, }) if err != nil { config.LOGGER.Error( - "failed to insert start log for job", - zap.String("job_id", jobID), + "failed to insert start log for task", + zap.String("task_id", taskID), zap.Error(err), ) } } else { - err = queries.UpdateJobLogRetryCount(ctx, repository.UpdateJobLogRetryCountParams{ + err = queries.UpdateTaskLogRetryCount(ctx, repository.UpdateTaskLogRetryCountParams{ // #nosec G115 -- retryCount is bounded and will never exceed int32 Retries: db.PGInt4Field(int32(retryCount)), - JobID: jobID, + TaskID: taskID, }) if err != nil { - config.LOGGER.Error("failed to insert retry count log for job", zap.String("job_id", jobID), zap.Int("retry_count", retryCount), zap.Error(err)) + config.LOGGER.Error("failed to insert retry count log for task", zap.String("task_id", taskID), zap.Int("retry_count", retryCount), zap.Error(err)) } } @@ -109,14 +109,14 @@ func HandleFileSyncTask(ctx context.Context, t *asynq.Task) error { err = provider.SyncFiles(ctx, conn, *accountID, authToken) if err != nil { - dbErr := queries.UpdateJobLogFailed(ctx, repository.UpdateJobLogFailedParams{ - Error: db.PGTextField(err.Error()), - JobID: jobID, + dbErr := queries.UpdateTaskLogFailed(ctx, repository.UpdateTaskLogFailedParams{ + Error: db.PGTextField(err.Error()), + TaskID: taskID, }) if dbErr != nil { config.LOGGER.Error( - "failed to insert failed log for 
job", - zap.String("job_id", jobID), + "failed to insert failed log for task", + zap.String("task_id", taskID), zap.Error(err), ) } @@ -124,14 +124,14 @@ func HandleFileSyncTask(ctx context.Context, t *asynq.Task) error { return err } - err = queries.UpdateJobLogFinish(ctx, repository.UpdateJobLogFinishParams{ + err = queries.UpdateTaskLogFinish(ctx, repository.UpdateTaskLogFinishParams{ FinishedAt: db.PGTimestamptzField(time.Now()), - JobID: jobID, + TaskID: taskID, }) if err != nil { config.LOGGER.Error( - "failed to insert finish log for job", - zap.String("job_id", jobID), + "failed to insert finish log for task", + zap.String("task_id", taskID), zap.Error(err), ) } diff --git a/api/repository/job_logs.sql.go b/api/repository/job_logs.sql.go deleted file mode 100644 index 0e51b59..0000000 --- a/api/repository/job_logs.sql.go +++ /dev/null @@ -1,105 +0,0 @@ -// Code generated by sqlc. DO NOT EDIT. -// versions: -// sqlc v1.29.0 -// source: job_logs.sql - -package repository - -import ( - "context" - - "github.com/jackc/pgx/v5/pgtype" -) - -const addNewJobLog = `-- name: AddNewJobLog :exec -INSERT INTO job_logs ( - job_id, account_id, type, status, queue, params -) VALUES ( - $1, $2, $3, $4, $5, $6 -) -` - -type AddNewJobLogParams struct { - JobID string `json:"job_id"` - AccountID pgtype.UUID `json:"account_id"` - Type string `json:"type"` - Status JobStatusEnum `json:"status"` - Queue string `json:"queue"` - Params []byte `json:"params"` -} - -func (q *Queries) AddNewJobLog(ctx context.Context, arg AddNewJobLogParams) error { - _, err := q.db.Exec(ctx, addNewJobLog, - arg.JobID, - arg.AccountID, - arg.Type, - arg.Status, - arg.Queue, - arg.Params, - ) - return err -} - -const updateJobLogFailed = `-- name: UpdateJobLogFailed :exec -UPDATE job_logs SET -status = 'failed', error = $1 -WHERE job_id = $2 -` - -type UpdateJobLogFailedParams struct { - Error pgtype.Text `json:"error"` - JobID string `json:"job_id"` -} - -func (q *Queries) UpdateJobLogFailed(ctx 
context.Context, arg UpdateJobLogFailedParams) error { - _, err := q.db.Exec(ctx, updateJobLogFailed, arg.Error, arg.JobID) - return err -} - -const updateJobLogFinish = `-- name: UpdateJobLogFinish :exec -UPDATE job_logs SET -status = 'succeeded', finished_at = $1 -WHERE job_id = $2 -` - -type UpdateJobLogFinishParams struct { - FinishedAt pgtype.Timestamptz `json:"finished_at"` - JobID string `json:"job_id"` -} - -func (q *Queries) UpdateJobLogFinish(ctx context.Context, arg UpdateJobLogFinishParams) error { - _, err := q.db.Exec(ctx, updateJobLogFinish, arg.FinishedAt, arg.JobID) - return err -} - -const updateJobLogRetryCount = `-- name: UpdateJobLogRetryCount :exec -UPDATE job_logs SET -status = 'retrying', retries = $1 -WHERE job_id = $2 -` - -type UpdateJobLogRetryCountParams struct { - Retries pgtype.Int4 `json:"retries"` - JobID string `json:"job_id"` -} - -func (q *Queries) UpdateJobLogRetryCount(ctx context.Context, arg UpdateJobLogRetryCountParams) error { - _, err := q.db.Exec(ctx, updateJobLogRetryCount, arg.Retries, arg.JobID) - return err -} - -const updateJobLogStart = `-- name: UpdateJobLogStart :exec -UPDATE job_logs SET -status = 'processing', started_at = $1 -WHERE job_id = $2 -` - -type UpdateJobLogStartParams struct { - StartedAt pgtype.Timestamptz `json:"started_at"` - JobID string `json:"job_id"` -} - -func (q *Queries) UpdateJobLogStart(ctx context.Context, arg UpdateJobLogStartParams) error { - _, err := q.db.Exec(ctx, updateJobLogStart, arg.StartedAt, arg.JobID) - return err -} diff --git a/api/repository/linked_account.sql.go b/api/repository/linked_account.sql.go index f7cf18c..08e9602 100644 --- a/api/repository/linked_account.sql.go +++ b/api/repository/linked_account.sql.go @@ -67,6 +67,17 @@ func (q *Queries) GetAccountByProviderID(ctx context.Context, arg GetAccountByPr return id, err } +const getAuthTokenExpiry = `-- name: GetAuthTokenExpiry :one +SELECT expiry FROM linked_account WHERE id = $1 +` + +func (q *Queries) 
GetAuthTokenExpiry(ctx context.Context, accountID pgtype.UUID) (pgtype.Timestamptz, error) { + row := q.db.QueryRow(ctx, getAuthTokenExpiry, accountID) + var expiry pgtype.Timestamptz + err := row.Scan(&expiry) + return expiry, err +} + const getAuthTokens = `-- name: GetAuthTokens :one SELECT provider, access_token, refresh_token FROM linked_account WHERE user_id = $1 AND id = $2 diff --git a/api/repository/models.go b/api/repository/models.go index 02bdb3c..4d3ef5f 100644 --- a/api/repository/models.go +++ b/api/repository/models.go @@ -11,92 +11,92 @@ import ( "github.com/jackc/pgx/v5/pgtype" ) -type JobStatusEnum string +type ProviderEnum string const ( - JobStatusEnumQueued JobStatusEnum = "queued" - JobStatusEnumProcessing JobStatusEnum = "processing" - JobStatusEnumSucceeded JobStatusEnum = "succeeded" - JobStatusEnumFailed JobStatusEnum = "failed" - JobStatusEnumRetrying JobStatusEnum = "retrying" + ProviderEnumGoogle ProviderEnum = "google" + ProviderEnumDropbox ProviderEnum = "dropbox" + ProviderEnumMicrosoft ProviderEnum = "microsoft" ) -func (e *JobStatusEnum) Scan(src interface{}) error { +func (e *ProviderEnum) Scan(src interface{}) error { switch s := src.(type) { case []byte: - *e = JobStatusEnum(s) + *e = ProviderEnum(s) case string: - *e = JobStatusEnum(s) + *e = ProviderEnum(s) default: - return fmt.Errorf("unsupported scan type for JobStatusEnum: %T", src) + return fmt.Errorf("unsupported scan type for ProviderEnum: %T", src) } return nil } -type NullJobStatusEnum struct { - JobStatusEnum JobStatusEnum `json:"job_status_enum"` - Valid bool `json:"valid"` // Valid is true if JobStatusEnum is not NULL +type NullProviderEnum struct { + ProviderEnum ProviderEnum `json:"provider_enum"` + Valid bool `json:"valid"` // Valid is true if ProviderEnum is not NULL } // Scan implements the Scanner interface. 
-func (ns *NullJobStatusEnum) Scan(value interface{}) error { +func (ns *NullProviderEnum) Scan(value interface{}) error { if value == nil { - ns.JobStatusEnum, ns.Valid = "", false + ns.ProviderEnum, ns.Valid = "", false return nil } ns.Valid = true - return ns.JobStatusEnum.Scan(value) + return ns.ProviderEnum.Scan(value) } // Value implements the driver Valuer interface. -func (ns NullJobStatusEnum) Value() (driver.Value, error) { +func (ns NullProviderEnum) Value() (driver.Value, error) { if !ns.Valid { return nil, nil } - return string(ns.JobStatusEnum), nil + return string(ns.ProviderEnum), nil } -type ProviderEnum string +type TaskStatusEnum string const ( - ProviderEnumGoogle ProviderEnum = "google" - ProviderEnumDropbox ProviderEnum = "dropbox" - ProviderEnumMicrosoft ProviderEnum = "microsoft" + TaskStatusEnumQueued TaskStatusEnum = "queued" + TaskStatusEnumProcessing TaskStatusEnum = "processing" + TaskStatusEnumSucceeded TaskStatusEnum = "succeeded" + TaskStatusEnumFailed TaskStatusEnum = "failed" + TaskStatusEnumRetrying TaskStatusEnum = "retrying" ) -func (e *ProviderEnum) Scan(src interface{}) error { +func (e *TaskStatusEnum) Scan(src interface{}) error { switch s := src.(type) { case []byte: - *e = ProviderEnum(s) + *e = TaskStatusEnum(s) case string: - *e = ProviderEnum(s) + *e = TaskStatusEnum(s) default: - return fmt.Errorf("unsupported scan type for ProviderEnum: %T", src) + return fmt.Errorf("unsupported scan type for TaskStatusEnum: %T", src) } return nil } -type NullProviderEnum struct { - ProviderEnum ProviderEnum `json:"provider_enum"` - Valid bool `json:"valid"` // Valid is true if ProviderEnum is not NULL +type NullTaskStatusEnum struct { + TaskStatusEnum TaskStatusEnum `json:"task_status_enum"` + Valid bool `json:"valid"` // Valid is true if TaskStatusEnum is not NULL } // Scan implements the Scanner interface. 
-func (ns *NullProviderEnum) Scan(value interface{}) error { +func (ns *NullTaskStatusEnum) Scan(value interface{}) error { if value == nil { - ns.ProviderEnum, ns.Valid = "", false + ns.TaskStatusEnum, ns.Valid = "", false return nil } ns.Valid = true - return ns.ProviderEnum.Scan(value) + return ns.TaskStatusEnum.Scan(value) } // Value implements the driver Valuer interface. -func (ns NullProviderEnum) Value() (driver.Value, error) { +func (ns NullTaskStatusEnum) Value() (driver.Value, error) { if !ns.Valid { return nil, nil } - return string(ns.ProviderEnum), nil + return string(ns.TaskStatusEnum), nil } type Account struct { @@ -121,22 +121,6 @@ type DrizzleDrizzleMigration struct { CreatedAt pgtype.Int8 `json:"created_at"` } -type JobLog struct { - ID pgtype.UUID `json:"id"` - JobID string `json:"job_id"` - AccountID pgtype.UUID `json:"account_id"` - Type string `json:"type"` - Status JobStatusEnum `json:"status"` - Queue string `json:"queue"` - Params []byte `json:"params"` - Error pgtype.Text `json:"error"` - Retries pgtype.Int4 `json:"retries"` - StartedAt pgtype.Timestamptz `json:"started_at"` - FinishedAt pgtype.Timestamptz `json:"finished_at"` - CreatedAt pgtype.Timestamptz `json:"created_at"` - UpdatedAt pgtype.Timestamptz `json:"updated_at"` -} - type Jwk struct { ID string `json:"id"` PublicKey string `json:"public_key"` @@ -197,6 +181,22 @@ type SyncedItem struct { UpdatedAt pgtype.Timestamptz `json:"updated_at"` } +type TaskLog struct { + ID pgtype.UUID `json:"id"` + TaskID string `json:"task_id"` + AccountID pgtype.UUID `json:"account_id"` + Type string `json:"type"` + Status TaskStatusEnum `json:"status"` + Queue string `json:"queue"` + Params []byte `json:"params"` + Error pgtype.Text `json:"error"` + Retries pgtype.Int4 `json:"retries"` + StartedAt pgtype.Timestamptz `json:"started_at"` + FinishedAt pgtype.Timestamptz `json:"finished_at"` + CreatedAt pgtype.Timestamptz `json:"created_at"` + UpdatedAt pgtype.Timestamptz `json:"updated_at"` +} + 
type User struct { ID string `json:"id"` Name string `json:"name"` diff --git a/api/repository/task_logs.sql.go b/api/repository/task_logs.sql.go new file mode 100644 index 0000000..b80f29e --- /dev/null +++ b/api/repository/task_logs.sql.go @@ -0,0 +1,105 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.29.0 +// source: task_logs.sql + +package repository + +import ( + "context" + + "github.com/jackc/pgx/v5/pgtype" +) + +const addNewTaskLog = `-- name: AddNewTaskLog :exec +INSERT INTO task_logs ( + task_id, account_id, type, status, queue, params +) VALUES ( + $1, $2, $3, $4, $5, $6 +) +` + +type AddNewTaskLogParams struct { + TaskID string `json:"task_id"` + AccountID pgtype.UUID `json:"account_id"` + Type string `json:"type"` + Status TaskStatusEnum `json:"status"` + Queue string `json:"queue"` + Params []byte `json:"params"` +} + +func (q *Queries) AddNewTaskLog(ctx context.Context, arg AddNewTaskLogParams) error { + _, err := q.db.Exec(ctx, addNewTaskLog, + arg.TaskID, + arg.AccountID, + arg.Type, + arg.Status, + arg.Queue, + arg.Params, + ) + return err +} + +const updateTaskLogFailed = `-- name: UpdateTaskLogFailed :exec +UPDATE task_logs SET +status = 'failed', error = $1 +WHERE task_id = $2 +` + +type UpdateTaskLogFailedParams struct { + Error pgtype.Text `json:"error"` + TaskID string `json:"task_id"` +} + +func (q *Queries) UpdateTaskLogFailed(ctx context.Context, arg UpdateTaskLogFailedParams) error { + _, err := q.db.Exec(ctx, updateTaskLogFailed, arg.Error, arg.TaskID) + return err +} + +const updateTaskLogFinish = `-- name: UpdateTaskLogFinish :exec +UPDATE task_logs SET +status = 'succeeded', finished_at = $1 +WHERE task_id = $2 +` + +type UpdateTaskLogFinishParams struct { + FinishedAt pgtype.Timestamptz `json:"finished_at"` + TaskID string `json:"task_id"` +} + +func (q *Queries) UpdateTaskLogFinish(ctx context.Context, arg UpdateTaskLogFinishParams) error { + _, err := q.db.Exec(ctx, updateTaskLogFinish, arg.FinishedAt, 
arg.TaskID) + return err +} + +const updateTaskLogRetryCount = `-- name: UpdateTaskLogRetryCount :exec +UPDATE task_logs SET +status = 'retrying', retries = $1 +WHERE task_id = $2 +` + +type UpdateTaskLogRetryCountParams struct { + Retries pgtype.Int4 `json:"retries"` + TaskID string `json:"task_id"` +} + +func (q *Queries) UpdateTaskLogRetryCount(ctx context.Context, arg UpdateTaskLogRetryCountParams) error { + _, err := q.db.Exec(ctx, updateTaskLogRetryCount, arg.Retries, arg.TaskID) + return err +} + +const updateTaskLogStart = `-- name: UpdateTaskLogStart :exec +UPDATE task_logs SET +status = 'processing', started_at = $1 +WHERE task_id = $2 +` + +type UpdateTaskLogStartParams struct { + StartedAt pgtype.Timestamptz `json:"started_at"` + TaskID string `json:"task_id"` +} + +func (q *Queries) UpdateTaskLogStart(ctx context.Context, arg UpdateTaskLogStartParams) error { + _, err := q.db.Exec(ctx, updateTaskLogStart, arg.StartedAt, arg.TaskID) + return err +} diff --git a/api/sqlc/migrations/20250714151705_add_job_logs.sql b/api/sqlc/migrations/20250714151705_add_task_logs.sql similarity index 69% rename from api/sqlc/migrations/20250714151705_add_job_logs.sql rename to api/sqlc/migrations/20250714151705_add_task_logs.sql index bb019c2..3d274f5 100644 --- a/api/sqlc/migrations/20250714151705_add_job_logs.sql +++ b/api/sqlc/migrations/20250714151705_add_task_logs.sql @@ -1,14 +1,14 @@ -- +goose Up -- +goose StatementBegin -CREATE TYPE job_status_enum AS ENUM ('queued', 'processing', 'succeeded', 'failed', 'retrying'); +CREATE TYPE task_status_enum AS ENUM ('queued', 'processing', 'succeeded', 'failed', 'retrying'); -CREATE TABLE IF NOT EXISTS job_logs ( +CREATE TABLE IF NOT EXISTS task_logs ( id UUID NOT NULL DEFAULT gen_random_uuid(), - job_id TEXT NOT NULL, + task_id TEXT NOT NULL, account_id UUID NOT NULL, type TEXT NOT NULL, - status job_status_enum NOT NULL, + status task_status_enum NOT NULL, queue TEXT NOT NULL, params JSONB DEFAULT NULL, @@ -27,7 +27,7 @@ 
CREATE TABLE IF NOT EXISTS job_logs ( -- +goose Down -- +goose StatementBegin -DROP TABLE IF EXISTS job_logs; +DROP TABLE IF EXISTS task_logs; -DROP TYPE IF EXISTS job_status_enum; +DROP TYPE IF EXISTS task_status_enum; -- +goose StatementEnd diff --git a/api/sqlc/queries/job_logs.sql b/api/sqlc/queries/job_logs.sql deleted file mode 100644 index 066ab5b..0000000 --- a/api/sqlc/queries/job_logs.sql +++ /dev/null @@ -1,26 +0,0 @@ --- name: AddNewJobLog :exec -INSERT INTO job_logs ( - job_id, account_id, type, status, queue, params -) VALUES ( - @job_id, @account_id, @type, @status, @queue, @params -); - --- name: UpdateJobLogStart :exec -UPDATE job_logs SET -status = 'processing', started_at = @started_at -WHERE job_id = @job_id; - --- name: UpdateJobLogRetryCount :exec -UPDATE job_logs SET -status = 'retrying', retries = @retries -WHERE job_id = @job_id; - --- name: UpdateJobLogFinish :exec -UPDATE job_logs SET -status = 'succeeded', finished_at = @finished_at -WHERE job_id = @job_id; - --- name: UpdateJobLogFailed :exec -UPDATE job_logs SET -status = 'failed', error = @error -WHERE job_id = @job_id; \ No newline at end of file diff --git a/api/sqlc/queries/linked_account.sql b/api/sqlc/queries/linked_account.sql index 4d06da1..3034881 100644 --- a/api/sqlc/queries/linked_account.sql +++ b/api/sqlc/queries/linked_account.sql @@ -24,6 +24,9 @@ SELECT id, access_token, refresh_token, provider FROM linked_account WHERE user_ SELECT provider, access_token, refresh_token FROM linked_account WHERE user_id = @user_id AND id = @account_id; +-- name: GetAuthTokenExpiry :one +SELECT expiry FROM linked_account WHERE id = @account_id; + -- name: UpdateLastSyncedTimestamp :exec UPDATE linked_account SET last_synced_at = NOW(), sync_page_token = @sync_page_token WHERE id = @account_id; diff --git a/api/sqlc/queries/task_logs.sql b/api/sqlc/queries/task_logs.sql new file mode 100644 index 0000000..d79140d --- /dev/null +++ b/api/sqlc/queries/task_logs.sql @@ -0,0 +1,26 @@ +-- 
name: AddNewTaskLog :exec +INSERT INTO task_logs ( + task_id, account_id, type, status, queue, params +) VALUES ( + @task_id, @account_id, @type, @status, @queue, @params +); + +-- name: UpdateTaskLogStart :exec +UPDATE task_logs SET +status = 'processing', started_at = @started_at +WHERE task_id = @task_id; + +-- name: UpdateTaskLogRetryCount :exec +UPDATE task_logs SET +status = 'retrying', retries = @retries +WHERE task_id = @task_id; + +-- name: UpdateTaskLogFinish :exec +UPDATE task_logs SET +status = 'succeeded', finished_at = @finished_at +WHERE task_id = @task_id; + +-- name: UpdateTaskLogFailed :exec +UPDATE task_logs SET +status = 'failed', error = @error +WHERE task_id = @task_id; diff --git a/package.json b/package.json index 8bdb5cc..f9ddf98 100644 --- a/package.json +++ b/package.json @@ -5,5 +5,12 @@ }, "scripts": { "prepare": "husky" + }, + "lint-staged": { + "*": [ + "make format", + "make lint", + "git add ." + ] } }