@@ -325,6 +325,11 @@ func migrateBoostDeals(ctx context.Context, activeSectors bitfield.BitField, mad
325325 return fmt .Errorf ("deal: %s: failed to get raw size from SQL or LID" , deal .DealUuid .String ())
326326 }
327327
328+ stx , err := mdb .BeginTx (ctx , & sql.TxOptions {Isolation : sql .LevelSerializable })
329+ if err != nil {
330+ return fmt .Errorf ("failed to begin transaction: %w" , err )
331+ }
332+
328333 _ , err = hdb .BeginTransaction (ctx , func (tx * harmonydb.Tx ) (bool , error ) {
329334 // Add deal to HarmonyDB
330335 if ! a {
@@ -343,7 +348,7 @@ func migrateBoostDeals(ctx context.Context, activeSectors bitfield.BitField, mad
343348 }
344349
345350 // Mark deal added to harmonyDB
346- _ , err = mdb .Exec (`INSERT INTO Deals (ID, DB, LID) VALUES (?, TRUE, FALSE) ON CONFLICT(ID) DO NOTHING` , deal .DealUuid .String ())
351+ _ , err = stx .Exec (`INSERT INTO Deals (ID, DB, LID) VALUES (?, TRUE, FALSE) ON CONFLICT(ID) DO NOTHING` , deal .DealUuid .String ())
347352 if err != nil {
348353 return false , fmt .Errorf ("deal: %s: failed to mark deal migrated: %w" , deal .DealUuid .String (), err )
349354 }
@@ -359,7 +364,7 @@ func migrateBoostDeals(ctx context.Context, activeSectors bitfield.BitField, mad
359364 }
360365
361366 // Mark deal added to pieceDeal in HarmonyDB
362- _ , err = mdb .Exec (`UPDATE Deals SET LID = TRUE WHERE ID = ?` , deal .DealUuid .String ())
367+ _ , err = stx .Exec (`UPDATE Deals SET LID = TRUE WHERE ID = ?` , deal .DealUuid .String ())
363368 if err != nil {
364369 return false , fmt .Errorf ("deal: %s: failed to mark deal LID migrated: %w" , deal .DealUuid .String (), err )
365370 }
@@ -395,13 +400,26 @@ func migrateBoostDeals(ctx context.Context, activeSectors bitfield.BitField, mad
395400 } else {
396401 llog .Infof ("Skipping indexing as sector %d is not unsealed" , deal .SectorID )
397402 }
403+
404+ // Mark deal added to pipeline in HarmonyDB
405+ _ , err = stx .Exec (`UPDATE Deals SET Pipeline = TRUE WHERE ID = ?` , deal .DealUuid .String ())
406+ if err != nil {
407+ return false , fmt .Errorf ("deal: %s: failed to mark deal Pipeline migrated: %w" , deal .DealUuid .String (), err )
408+ }
398409 }
399410 return true , nil
400411 }, harmonydb .OptionRetry ())
401412 if err != nil {
402- return err
413+ serr := stx .Rollback ()
414+ if serr != nil {
415+ return fmt .Errorf ("failed to commit Haromy changes: %w and failed to rollback transaction: %w" , err , serr )
416+ }
417+ return fmt .Errorf ("failed to commit Haromy changes: %w" , err )
418+ }
419+ err = stx .Commit ()
420+ if err != nil {
421+ return fmt .Errorf ("failed to commit Haromy changes: %w" , err )
403422 }
404-
405423 }
406424
407425 return nil
@@ -592,6 +610,11 @@ func migrateDDODeals(ctx context.Context, full v1api.FullNode, activeSectors bit
592610 return fmt .Errorf ("deal: %s: inbound file size is 0" , deal .ID .String ())
593611 }
594612
613+ stx , err := mdb .BeginTx (ctx , & sql.TxOptions {Isolation : sql .LevelSerializable })
614+ if err != nil {
615+ return fmt .Errorf ("failed to begin transaction: %w" , err )
616+ }
617+
595618 _ , err = hdb .BeginTransaction (ctx , func (tx * harmonydb.Tx ) (bool , error ) {
596619 if ! a {
597620 // Add DDO deal to harmonyDB
@@ -607,7 +630,7 @@ func migrateDDODeals(ctx context.Context, full v1api.FullNode, activeSectors bit
607630 }
608631
609632 // Mark deal added to harmonyDB
610- _ , err = mdb .Exec (`INSERT INTO Deals (ID, DB, LID) VALUES (?, TRUE, FALSE) ON CONFLICT(ID) DO NOTHING` , deal .ID .String ())
633+ _ , err = stx .Exec (`INSERT INTO Deals (ID, DB, LID) VALUES (?, TRUE, FALSE) ON CONFLICT(ID) DO NOTHING` , deal .ID .String ())
611634 if err != nil {
612635 return false , fmt .Errorf ("deal: %s: failed to mark DDO deal migrated: %w" , deal .ID .String (), err )
613636 }
@@ -622,13 +645,12 @@ func migrateDDODeals(ctx context.Context, full v1api.FullNode, activeSectors bit
622645 }
623646
624647 // Mark deal added to pieceDeal in HarmonyDB
625- _ , err = mdb .Exec (`UPDATE Deals SET LID = TRUE WHERE ID = ?` , deal .ID .String ())
648+ _ , err = stx .Exec (`UPDATE Deals SET LID = TRUE WHERE ID = ?` , deal .ID .String ())
626649 if err != nil {
627650 return false , fmt .Errorf ("deal: %s: failed to mark deal LID migrated: %w" , deal .ID .String (), err )
628651 }
629652 }
630653
631- // TODO: Confirm if using the mk12 pipeline will have any impact for DDO deals
632654 if ! c {
633655 // Check if we can index and announce i.e. we have unsealed copy
634656 var exists bool
@@ -658,11 +680,25 @@ func migrateDDODeals(ctx context.Context, full v1api.FullNode, activeSectors bit
658680 } else {
659681 llog .Infof ("Skipping indexing as sector %d is not unsealed" , deal .SectorID )
660682 }
683+
684+ // Mark deal added to pipeline in HarmonyDB
685+ _ , err = stx .Exec (`UPDATE Deals SET Pipeline = TRUE WHERE ID = ?` , deal .ID .String ())
686+ if err != nil {
687+ return false , fmt .Errorf ("deal: %s: failed to mark deal Pipeline migrated: %w" , deal .ID .String (), err )
688+ }
661689 }
662690 return true , nil
663691 }, harmonydb .OptionRetry ())
664692 if err != nil {
665- return err
693+ serr := stx .Rollback ()
694+ if serr != nil {
695+ return fmt .Errorf ("failed to commit Haromy changes: %w and failed to rollback transaction: %w" , err , serr )
696+ }
697+ return fmt .Errorf ("failed to commit Haromy changes: %w" , err )
698+ }
699+ err = stx .Commit ()
700+ if err != nil {
701+ return fmt .Errorf ("failed to commit Haromy changes: %w" , err )
666702 }
667703 }
668704
0 commit comments