Aggregation Server

The Aggregation Server is a critical component in the Templar distributed training framework that collects gradients from miners, aggregates them, and makes them available to all network participants. This central coordination allows the system to maintain consistent model state across decentralized nodes, facilitating efficient convergence during distributed training.

For information about gradient processing by miners, see Gradient Processing. For details on how validators use aggregated gradients, see Weight Setting.

The Aggregation Server serves as a central aggregation point in Templar’s architecture, bridging the operations of miners and validators while ensuring model consistency.

graph TD
    subgraph "Templar Network Components"
        M["Miners"]
        V["Validators"]
        AS["AggregationServer"]
        BN["Bittensor Network"]
    end
    
    subgraph "Storage Infrastructure"
        R2["Cloudflare R2 Storage"]
        subgraph "R2 Buckets"
            AG["Aggregator Bucket"]
            GR["Gradients Bucket"]
            DB["Dataset Bucket"]
        end
    end
    
    M -- "Submit gradients" --> GR
    AS -- "Gather gradients" --> GR
    AS -- "Store aggregated state" --> AG
    V -- "Fetch aggregated state" --> AG
    M -- "Fetch aggregated state" --> AG
    
    BN -- "Sync metagraph data" --> AS
    BN -- "Window signals" --> AS
    BN -- "Block timing" --> AS

Sources: neurons/aggregator.py:40-155, README.md:52-56.

The Aggregation Server implementation consists of several key components that work together to process gradients in a synchronized manner.

classDiagram
    class AggregationServer {
        +model: LlamaForCausalLM
        +transformer: TransformDCT
        +compressor: CompressDCT
        +comms: Comms
        +current_block: int
        +current_window: int
        +metagraph: Metagraph
        +metrics_logger: MetricsLogger
        +get_current_window()
        +process_window()
        +run()
        +block_listener()
    }
    
    class Comms {
        +bucket: Bucket
        +gather()
        +put()
        +get_peer_list()
    }
    
    class TransformDCT {
        +encode()
        +decode()
    }
    
    class CompressDCT {
        +compress()
        +decompress()
        +batch_decompress()
    }
    
    AggregationServer --> TransformDCT: "uses for gradient transformation"
    AggregationServer --> CompressDCT: "uses for gradient compression"
    AggregationServer --> Comms: "uses for data exchange"

Sources: neurons/aggregator.py:40-155, src/tplr/compress.py:35-124.

The Aggregation Server initializes a lightweight model instance for gradient processing, sets up communication channels, and configures compression parameters.

The server uses a standard argument parser with additional Bittensor-specific arguments:

graph TD
    AC["agg_config()"] --> AP["ArgumentParser"]
    AP --> |"Add arguments"| NA["Network arguments<br>--netuid, --device, etc."]
    AP --> |"Add arguments"| BA["Bittensor arguments<br>subtensor, wallet, logging"]
    AP --> BC["bt.config(parser)"]
    BC --> CF["config object"]

Sources: neurons/aggregator.py:41-69.
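
The exact argument list is defined in neurons/aggregator.py; the sketch below only shows how such a helper can combine argparse with Bittensor's standard argument groups. Any arguments beyond --netuid and --device, and the default values shown, are assumptions rather than the server's real options.

```python
# Minimal config sketch; argument names beyond --netuid/--device and the
# default values are assumptions, not the server's exact options.
import argparse

import bittensor as bt


def agg_config():
    parser = argparse.ArgumentParser(description="Templar aggregation server")
    # Network arguments
    parser.add_argument("--netuid", type=int, default=3, help="Subnet UID (example default)")
    parser.add_argument("--device", type=str, default="cuda", help="Device for the model")
    # Standard Bittensor argument groups: subtensor, wallet, logging
    bt.subtensor.add_args(parser)
    bt.wallet.add_args(parser)
    bt.logging.add_args(parser)
    return bt.config(parser)
```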

During initialization, the Aggregation Server does the following (a condensed sketch follows the list):

  1. Creates a model instance for processing gradients (using LlamaForCausalLM)
  2. Initializes compression tools (TransformDCT and CompressDCT)
  3. Configures communication channels (Comms)
  4. Sets up telemetry (WandB and InfluxDB)
  5. Establishes blockchain connectivity

Sources: neurons/aggregator.py:72-155.
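
A condensed sketch of these steps is shown below. Only the Bittensor calls are concrete; the tplr-specific constructors (model, TransformDCT/CompressDCT, Comms, telemetry) appear as comments because their signatures are not documented here, and the 100-block window length is a placeholder.

```python
# Condensed initialization sketch; tplr-specific constructors are shown as
# comments because their exact signatures are assumptions.
import bittensor as bt


class AggregationServer:
    def __init__(self):
        self.config = agg_config()                       # CLI + Bittensor config (see sketch above)
        self.subtensor = bt.subtensor(config=self.config)
        self.metagraph = self.subtensor.metagraph(self.config.netuid)

        # 1. self.model = LlamaForCausalLM(...)          -- lightweight model for gradient shapes
        # 2. self.transformer / self.compressor          -- TransformDCT / CompressDCT
        # 3. self.comms = Comms(...)                     -- R2 gather()/put()
        # 4. WandB + InfluxDB metrics logger
        # 5. Blockchain state:
        self.current_block = self.subtensor.block
        self.current_window = self.current_block // 100  # blocks per window: placeholder value
```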

The server operates in synchronized windows aligned with Bittensor’s block timing system. The process follows these key steps:

sequenceDiagram
    participant AS as AggregationServer
    participant MN as Miners
    participant R2 as R2 Storage
    participant BT as Bittensor
    
    Note over AS: run() method starts
    
    loop For each window
        BT->>AS: Block signals via block_listener
        AS->>AS: Update current_window
        
        Note over AS: process_window() begins
        AS->>BT: Query timestamp for window
        BT->>AS: Return timestamp
        AS->>AS: Calculate time bounds for valid gradients
        
        AS->>AS: Update peers via update_peers()
        
        MN->>R2: Upload gradients during window
        
        AS->>R2: gather() gradients from selected peers
        Note over AS: Get gradients within time bounds
        
        opt For each valid gradient
            AS->>AS: batch_decompress() gradients
            AS->>AS: Transform via TransformDCT
            AS->>AS: Pack into binary tensor
        end
        
        AS->>R2: Store aggregated state in aggregator bucket
        AS->>AS: Log metrics to WandB/InfluxDB
        
        Note over AS: Wait for next window
    end

Sources: neurons/aggregator.py:162-424, src/tplr/neurons.py:127-197.
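
The loop itself can be summarised by the sketch below; the method names follow the diagram, while the polling interval and the argument passed to process_window() are simplifying assumptions.

```python
# Simplified sketch of the main loop in the sequence diagram; the polling
# interval and the process_window() argument are assumptions.
import asyncio


async def run(self):
    # block_listener() runs in the background and advances self.current_window
    # as new blocks arrive (see "Block Listening and Window Synchronization").
    last_processed = None
    while True:
        if self.current_window != last_processed:
            await self.process_window(self.current_window)  # gather -> aggregate -> store
            last_processed = self.current_window
        await asyncio.sleep(1)  # wait for the block listener to advance the window
```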

The server's gradient processing pipeline relies on the Discrete Cosine Transform (DCT) for efficient gradient compression and aggregation.

The process_window method gathers gradients from miners using time-bounded collection (see the sketch after this list):

  1. Determines the current window and time bounds for valid gradients
  2. Selects peers for gradient collection based on network parameters
  3. Uses the comms.gather() method to collect gradients from the selected peers

Sources: neurons/aggregator.py:209-291.
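
A sketch of this step is shown below; query_block_timestamp() is a hypothetical helper, window_seconds is an assumed attribute, and the keyword arguments of comms.gather() are illustrative rather than the exact Comms signature.

```python
# Time-bounded gather sketch; query_block_timestamp() is hypothetical and
# the gather() keyword arguments are illustrative, not the real signature.
from datetime import timedelta


async def gather_window_gradients(self, window: int):
    # Only gradients uploaded inside this window's time bounds are accepted.
    time_min = await self.query_block_timestamp(window)           # hypothetical helper
    time_max = time_min + timedelta(seconds=self.window_seconds)  # assumed window duration

    peers = self.comms.get_peer_list()                            # peer selection (may be async)
    return await self.comms.gather(
        uids=peers,
        window=window,
        time_min=time_min,
        time_max=time_max,
    )
```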

Once gradients are gathered, they are processed as follows:

graph TD
    G["gather() function<br>collects miner gradients"] --> P["process_start<br>Process gathered gradients"]
    P --> |"For each parameter"| I["Extract idxs/vals<br>from gather_result"]
    I --> D["batch_decompress()<br>Reconstruct gradients"]
    D --> T["transformer.decode()<br>Inverse DCT transform"]
    T --> S["sign() method<br>Convert to binary representation"]
    S --> B["pack_binary_tensor()<br>Pack as compact binary"]
    B --> ST["Store in processed_state_dict"]
    ST --> R2["Put in R2 aggregator bucket<br>with window metadata"]

Sources: neurons/aggregator.py:292-372, src/tplr/neurons.py:478-516.
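
The last two stages of the pipeline, taking the sign of each decoded gradient and packing it into a compact binary tensor, can be illustrated with a self-contained, simplified stand-in for pack_binary_tensor(); packing eight signs per byte is an assumption about the real helper.

```python
# Simplified stand-in for the sign + pack stages; the real pack_binary_tensor()
# may differ (eight signs per byte is an assumption).
import torch


def pack_signs(grad: torch.Tensor) -> torch.Tensor:
    """Pack sign(grad) as bits: positive -> 1, otherwise 0, eight values per byte."""
    bits = (torch.sign(grad.flatten()) > 0).to(torch.uint8)
    pad = (-bits.numel()) % 8                        # pad so the length is a multiple of 8
    if pad:
        bits = torch.cat([bits, bits.new_zeros(pad)])
    weights = torch.tensor([1, 2, 4, 8, 16, 32, 64, 128], dtype=torch.uint8)
    return (bits.view(-1, 8) * weights).sum(dim=1, dtype=torch.uint8)


grad = torch.randn(1000)
packed = pack_signs(grad)   # 125 bytes instead of 4000 bytes of float32
```

Packing only the signs reduces each float32 gradient to roughly 1/32 of its original size, which keeps the aggregated state stored in R2 compact.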

Miners and validators synchronize with the aggregation server through the catchup_with_aggregation_server() function in the tplr.neurons module. This ensures all nodes converge to a consistent model state.

graph TD
    CS["catchup_with_aggregation_server()"] --> CW["Calculate windows to catch up<br>checkpoint_window → target_window"]
    CW --> |"For each window"| LA["load_aggregation() from server"]
    LA --> PD["process_loaded_data()<br>Unpack compressed tensors"]
    PD --> |"For each parameter"| AT["Apply gradients to model parameters"]
    AT --> OS["optimizer.step()<br>scheduler.step()"]
    OS --> DD["Compare with debug_dict<br>to verify synchronization"]
    DD --> GS["Update global_step"]

Sources: src/tplr/neurons.py:199-368.
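
A sketch of the catch-up loop is shown below. The helper names match the diagram, but their exact signatures, where load_aggregation() lives, and how the debug_dict comparison is performed are assumptions.

```python
# Catch-up loop sketch; helper signatures and attribute locations are
# assumptions based on the diagram, not the exact code in tplr.neurons.
async def catchup_with_aggregation_server(instance, checkpoint_window: int, target_window: int):
    for window in range(checkpoint_window, target_window):
        agg_state = await instance.comms.load_aggregation(window)  # packed state for this window
        if agg_state is None:
            continue                                                # nothing stored for this window
        grads = process_loaded_data(instance.model, agg_state)      # unpack the binary tensors
        for name, param in instance.model.named_parameters():
            if name in grads:
                param.grad = grads[name].to(param.device)
        instance.optimizer.step()                                   # advance optimizer and scheduler
        instance.scheduler.step()
        # The real code also compares against a debug_dict to verify synchronization.
        instance.global_step += 1
```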

When nodes fetch data from the aggregation server, they do the following (the unpacking step is sketched after the list):

  1. Load the aggregated state for a specific window
  2. Process and unpack the binary tensor representation
  3. Apply the gradients to their local model parameters
  4. Verify synchronization through model comparison

Sources: src/tplr/neurons.py:371-477.
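
Unpacking is the mirror image of the packing sketch shown earlier: each byte is expanded back into eight ±1 values and reshaped to the parameter's shape. The version below is a simplified assumption about the per-tensor step inside process_loaded_data().

```python
# Counterpart to the earlier packing sketch; a simplified assumption about
# the per-tensor unpacking done by process_loaded_data().
import torch


def unpack_signs(packed: torch.Tensor, shape: torch.Size) -> torch.Tensor:
    weights = torch.tensor([1, 2, 4, 8, 16, 32, 64, 128], dtype=torch.uint8)
    bits = (packed.unsqueeze(1) & weights) > 0   # one row of eight bits per byte
    signs = bits.to(torch.float32) * 2 - 1       # bit 1 -> +1, bit 0 -> -1
    return signs.flatten()[: shape.numel()].view(shape)


restored = unpack_signs(packed, grad.shape)      # `packed` and `grad` from the packing sketch
```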

Block Listening and Window Synchronization

The Aggregation Server stays synchronized with the blockchain through a block listener thread that monitors new blocks and updates the current window.

graph TD
    BL["block_listener() thread"] --> SB["Subscribe to block headers"]
    SB --> HF["handler() function<br>processes block events"]
    HF --> UB["Update current_block"]
    UB --> CW["Calculate new_window from block"]
    CW --> |"If window changed"| UW["Update current_window"]
    UW --> NC["Notify comms system<br>comms.current_window = current_window"]

Sources: neurons/aggregator.py:489-527.
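
A sketch of the listener is shown below. The subscription call is substrate-interface's subscribe_block_headers (the header field layout may differ across versions), and blocks_per_window stands in for the real hyperparameter. In the actual server this function runs on a daemon thread so the blocking subscription does not stall the event loop.

```python
# Block listener sketch; the header field layout and blocks_per_window value
# are assumptions, and the real server runs this on a daemon thread.
def block_listener(self, blocks_per_window: int = 100):
    def handler(obj, update_nr, subscription_id):
        self.current_block = int(obj["header"]["number"])
        new_window = self.current_block // blocks_per_window
        if new_window != self.current_window:
            self.current_window = new_window
            self.comms.current_window = new_window   # keep the comms layer in sync

    # subscribe_block_headers blocks until the subscription ends
    self.subtensor.substrate.subscribe_block_headers(handler)
```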

The server implements several error handling mechanisms to ensure continuous operation (a backoff sketch follows the list):

  1. Retry logic for blockchain connections with exponential backoff
  2. Graceful handling of missing or invalid gradients
  3. Exception catching in the main processing loop
  4. Fallback time window calculation if blockchain timestamps are unavailable

Sources: neurons/aggregator.py:425-486.
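
The first of these mechanisms can be illustrated generically; the retry count, base delay, and jitter below are assumptions rather than the server's actual parameters.

```python
# Generic retry-with-exponential-backoff illustration; the retry count,
# base delay, and jitter are assumptions, not the server's actual values.
import asyncio
import random


async def with_backoff(call, retries: int = 5, base_delay: float = 1.0):
    """Await call() and retry failures with exponentially growing, jittered delays."""
    for attempt in range(retries):
        try:
            return await call()
        except Exception as exc:
            if attempt == retries - 1:
                raise                                  # out of retries, surface the error
            delay = base_delay * 2**attempt + random.random()
            print(f"attempt {attempt + 1} failed ({exc}); retrying in {delay:.1f}s")
            await asyncio.sleep(delay)
```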

The Aggregation Server is designed to run as a standalone service and can be started using:

```bash
python neurons/aggregator.py --netuid <netuid> --device <device>
```

The server uses uvloop for improved performance and runs an asynchronous event loop to process windows continuously.

Sources: neurons/aggregator.py:529-532.
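
A minimal sketch of such an entrypoint is shown below, assuming uvloop is installed as the asyncio event-loop policy before the loop starts; any additional shutdown or error handling in the real script is not shown.

```python
# Minimal entrypoint sketch: install uvloop, then run the async main loop.
import asyncio

import uvloop


def main():
    uvloop.install()                 # use uvloop as the asyncio event-loop policy
    server = AggregationServer()     # parses CLI arguments via its config helper
    asyncio.run(server.run())        # process windows until interrupted


if __name__ == "__main__":
    main()
```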

The Aggregation Server integrates closely with both miners and validators in the Templar system:

Miners:

  1. Submit their gradients to the gradients bucket
  2. Periodically synchronize with the Aggregation Server to get the latest model state
  3. Apply aggregated gradients to their local model during catchup periods

Sources: docs/miner.md:446-461.

Validators:

  1. Evaluate miner contributions based on the current model state
  2. Synchronize with the Aggregation Server to maintain a consistent reference model
  3. Use the aggregated state to ensure fair evaluation of miner gradients

Sources: docs/validator.md:387-398.

The Aggregation Server provides comprehensive monitoring and telemetry:

  1. Weights & Biases: Logs aggregation metrics, success rates, and timing information
  2. InfluxDB: Detailed performance metrics with tagging by window and iteration
  3. Loki Logging: Structured logging for operational events and error tracing

Key metrics tracked include (a logging example follows the list):

  • Aggregation success rate
  • Number of peers selected and successfully aggregated
  • Processing time for gathering, processing, and storing
  • Skipped UIDs and error counts

Sources: neurons/aggregator.py:374-414, telemetry/simulator/loki-test.py:18-45.
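
As an illustration only, logging the metrics above to Weights & Biases might look like the following; the metric names, values, and run configuration are placeholders, not the keys the server actually uses.

```python
# Illustrative W&B logging call; metric names, values, and run configuration
# are placeholders, not the server's actual keys.
import wandb

current_window = 1234                     # example window number used as the log step
run = wandb.init(project="templar-aggregator", name="aggregation-server")
run.log(
    {
        "aggregation/success_rate": 0.92,
        "aggregation/selected_peers": 25,
        "aggregation/aggregated_peers": 23,
        "aggregation/skipped_uids": 2,
        "timing/gather_seconds": 4.2,
        "timing/process_seconds": 1.1,
        "timing/put_seconds": 0.8,
    },
    step=current_window,
)
```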

The Aggregation Server enables efficient distributed training in Templar by providing a single, consistent aggregation point for gradients across the network. By centralizing aggregation while keeping training itself decentralized, it helps the model converge and, through its compression techniques, keeps communication overhead low.