@@ -2,6 +2,7 @@ package node
22
33import (
44 "context"
5+ "github.com/unpackdev/fdb/pkg/types"
56 "sync"
67 "time"
78
@@ -25,24 +26,6 @@ const (
2526// P2PDistributorStateType is the type identifier for P2PDistributor state
2627const P2PDistributorStateType state.StateType = "p2p_distributor"
2728
28- // Priority represents the importance level of a record batch
29- type Priority uint8
30-
31- const (
32- PriorityHigh Priority = iota
33- PriorityNormal
34- PriorityLow
35- )
36-
37- // Target specifies the distribution target type
38- type Target uint8
39-
40- const (
41- TargetAll Target = iota
42- TargetValidators
43- TargetDirectPeer
44- )
45-
4629// DistributionStats tracks performance metrics
4730type DistributionStats struct {
4831 RecordsDistributed int64
@@ -56,8 +39,8 @@ type DistributionStats struct {
5639// RecordBatch represents a batch of records to be distributed
5740type RecordBatch struct {
5841 Records []db.WriteRequest
59- Priority Priority
60- Target Target
42+ Priority types. Priority
43+ Target types. Target
6144 TargetPeer * peer.ID // Only set if Target is TargetDirectPeer
6245}
6346
@@ -439,13 +422,13 @@ func (d *P2PDistributor) distributeBatch(batch *RecordBatch) {
439422 zap .Uint8 ("priority" , uint8 (batch .Priority )))
440423
441424 switch batch .Target {
442- case TargetAll :
425+ case types . TargetAll :
443426 d .logger .Debug ("Distributing to all peers" , zap .Int ("record_count" , recordCount ))
444427 d .distributeToAllPeers (batch )
445- case TargetValidators :
428+ case types . TargetValidators :
446429 d .logger .Debug ("Distributing to validators" , zap .Int ("record_count" , recordCount ))
447430 d .distributeToValidators (batch )
448- case TargetDirectPeer :
431+ case types . TargetDirectPeer :
449432 if batch .TargetPeer != nil {
450433 d .logger .Debug ("Distributing to direct peer" ,
451434 zap .Int ("record_count" , recordCount ),
@@ -458,7 +441,7 @@ func (d *P2PDistributor) distributeBatch(batch *RecordBatch) {
458441}
459442
460443// DistributeRecord adds a record to the appropriate distribution queue
461- func (d * P2PDistributor ) DistributeRecord (key [32 ]byte , value []byte , priority Priority , target Target ) error {
444+ func (d * P2PDistributor ) DistributeRecord (key [32 ]byte , value []byte , priority types. Priority , target types. Target ) error {
462445 // Create a deep copy of the value when a record first enters the system
463446 valueCopy := make ([]byte , len (value ))
464447 copy (valueCopy , value )
@@ -477,11 +460,11 @@ func (d *P2PDistributor) DistributeRecord(key [32]byte, value []byte, priority P
477460 // Queue based on priority
478461 var queue chan * RecordBatch
479462 switch priority {
480- case PriorityHigh :
463+ case types . PriorityHigh :
481464 queue = d .highPriorityQueue
482- case PriorityNormal :
465+ case types . PriorityNormal :
483466 queue = d .normalQueue
484- case PriorityLow :
467+ case types . PriorityLow :
485468 queue = d .lowPriorityQueue
486469 }
487470
@@ -508,7 +491,7 @@ func (d *P2PDistributor) DistributeRecord(key [32]byte, value []byte, priority P
508491}
509492
510493// DistributeRecordToPeer sends a record directly to a specific peer
511- func (d * P2PDistributor ) DistributeRecordToPeer (key [32 ]byte , value []byte , peerID peer.ID , priority Priority ) error {
494+ func (d * P2PDistributor ) DistributeRecordToPeer (key [32 ]byte , value []byte , peerID peer.ID , priority types. Priority ) error {
512495 // Create a deep copy of the value when a record first enters the system
513496 valueCopy := make ([]byte , len (value ))
514497 copy (valueCopy , value )
@@ -521,18 +504,18 @@ func (d *P2PDistributor) DistributeRecordToPeer(key [32]byte, value []byte, peer
521504 batch := & RecordBatch {
522505 Records : []db.WriteRequest {record },
523506 Priority : priority ,
524- Target : TargetDirectPeer ,
507+ Target : types . TargetDirectPeer ,
525508 TargetPeer : & peerID ,
526509 }
527510
528511 // Queue based on priority
529512 var queue chan * RecordBatch
530513 switch priority {
531- case PriorityHigh :
514+ case types . PriorityHigh :
532515 queue = d .highPriorityQueue
533- case PriorityNormal :
516+ case types . PriorityNormal :
534517 queue = d .normalQueue
535- case PriorityLow :
518+ case types . PriorityLow :
536519 queue = d .lowPriorityQueue
537520 }
538521
0 commit comments