
Communication System


The Communication System in Templar provides a robust data exchange mechanism for distributed training across the network. It manages gradient sharing, checkpoint synchronization, peer coordination, and storage integration to enable effective communication between miners and validators. This page covers the core communication infrastructure, while related topics such as checkpoint specifics and blockchain integration are covered in Checkpoint Management and Chain Integration.

The Communication System is built around the Comms class, which serves as a centralized interface for all network communication operations. It inherits from ChainManager to integrate with the Bittensor blockchain and leverages Cloudflare R2 storage for data persistence.

graph TD
    subgraph "Communication System Components"
        COMMS["Comms Class"]
        CHAIN["ChainManager"]
        R2["R2 Storage Integration"]
        PEER["Peer Management"]
        GRAD["Gradient Exchange"]
        CKPT["Checkpoint Handling"]
    end

    COMMS -->|"inherits from"| CHAIN
    COMMS -->|"manages"| R2
    COMMS -->|"provides"| PEER
    COMMS -->|"enables"| GRAD
    COMMS -->|"handles"| CKPT

    subgraph "Integration Points"
        MINERS["Miners"]
        VALIDATORS["Validators"]
        BLOCKCHAIN["Bittensor Blockchain"]
        STORAGE["Cloudflare R2 Storage"]
    end

    MINERS -->|"uses"| COMMS
    VALIDATORS -->|"uses"| COMMS
    COMMS -->|"interacts with"| BLOCKCHAIN
    COMMS -->|"stores/retrieves data"| STORAGE

Sources: src/tplr/comms.py:64-121 . neurons/miner.py:174-188 . neurons/validator.py:208-220 .

The Comms class is initialized with a wallet, configuration, and references to the network context. It sets up connections to R2 storage buckets and prepares for blockchain interactions.

flowchart LR
    subgraph "Comms Initialization"
        INIT["__init__()"]
        WALLET["Wallet Authentication"]
        CONFIG["Configuration"]
        BUCKETS["Storage Buckets"]
        SESSIONS["S3 Sessions"]
    end

    INIT -->|"requires"| WALLET
    INIT -->|"processes"| CONFIG
    INIT -->|"configures"| BUCKETS
    INIT -->|"establishes"| SESSIONS

    subgraph "Bucket Types"
        GRAD["Gradients Bucket"]
        DATA["Dataset Bucket"]
        AGG["Aggregator Bucket"]
    end

    BUCKETS -->|"creates"| GRAD
    BUCKETS -->|"creates"| DATA
    BUCKETS -->|"creates"| AGG

Sources: src/tplr/comms.py:64-121 . src/tplr/comms.py:174-220 .

The Communication System uses Cloudflare R2 for persistent storage of gradients, model checkpoints, and peer information. It manages connections to different buckets and handles data serialization/deserialization.

| Bucket Type | Purpose | Access Control |
| --- | --- | --- |
| Gradients | Stores gradient updates from miners | Read/Write separated |
| Dataset | Contains training datasets | Read-only for most users |
| Aggregator | Stores aggregated model states | Managed access |

The system provides efficient methods for large file handling through multipart uploads and downloads.

Sources: src/tplr/comms.py:122-169 . src/tplr/comms.py:322-389 . src/tplr/comms.py:476-683 .
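The chunking behind a multipart transfer can be illustrated with a minimal sketch. The helper name `plan_parts` and the 5 MiB part size are assumptions for illustration; they are not taken from `comms.py`, which may size and schedule its parts differently.

```python
from typing import List, Tuple

MIB = 1024 * 1024


def plan_parts(file_size: int, part_size: int = 5 * MIB) -> List[Tuple[int, int, int]]:
    """Split a file of `file_size` bytes into (part_number, start, end) ranges.

    `end` is exclusive. Part numbers start at 1, as in S3-style multipart APIs.
    Hypothetical helper: the part size here is an illustrative default.
    """
    if file_size < 0:
        raise ValueError("file_size must be non-negative")
    if part_size <= 0:
        raise ValueError("part_size must be positive")

    parts: List[Tuple[int, int, int]] = []
    start = 0
    part_number = 1
    while start < file_size:
        # The final part may be shorter than part_size.
        end = min(start + part_size, file_size)
        parts.append((part_number, start, end))
        start = end
        part_number += 1
    return parts
```

Each range can then be uploaded or downloaded independently, which is what makes per-part retries and parallel transfers possible.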

sequenceDiagram
    participant Neuron as "Miner/Validator"
    participant Comms as "Comms"
    participant S3 as "R2 Storage"
    participant Chain as "Bittensor Chain"
    
    Neuron->>Comms: put(state_dict, uid, window, key)
    Comms->>Comms: Prepare data
    
    alt local storage
        Comms->>Comms: Store locally
    else remote storage
        Comms->>S3: s3_put_object(key, file_path)
        alt large file
            S3->>S3: upload_large_file()
        end
    end
    
    Comms->>Chain: try_commit(wallet, bucket)
    Comms->>Neuron: Return completion timestamp

Sources: src/tplr/comms.py:322-371 . src/tplr/comms.py:476-573 . neurons/miner.py:417-435 .

sequenceDiagram
    participant Neuron as "Miner/Validator"
    participant Comms as "Comms"
    participant S3 as "R2 Storage"
    
    Neuron->>Comms: get(uid, window, key)
    
    alt local retrieval
        Comms->>Comms: Load from local storage
    else remote retrieval
        Comms->>S3: s3_get_object(key, bucket)
        alt large file
            S3->>S3: download_large_file()
        end
        S3->>Comms: Return data
    end
    
    Comms->>Comms: Process data (deserialize)
    Comms->>Neuron: Return state_dict, global_step

Sources: src/tplr/comms.py:372-474 . src/tplr/comms.py:574-683 .
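The local-storage branch of the two flows above can be sketched as a simple round trip. Everything here is hypothetical: the class name `LocalStoreSketch`, the directory layout, and the use of `pickle` are stand-ins chosen so the example runs without extra dependencies, and do not reflect how `Comms` names or serializes its files.

```python
import pickle
import time
from pathlib import Path
from typing import Any, Dict, Optional, Tuple


class LocalStoreSketch:
    """Illustrative local put/get pair keyed by (uid, window, key)."""

    def __init__(self, root: Path) -> None:
        self.root = root

    def _path(self, uid: int, window: int, key: str) -> Path:
        # Hypothetical layout: <root>/<uid>/<window>/<key>.pkl
        return self.root / str(uid) / str(window) / f"{key}.pkl"

    def put(self, state_dict: Dict[str, Any], uid: int, window: int,
            key: str, global_step: int = 0) -> float:
        """Serialize the payload and return a completion timestamp."""
        path = self._path(uid, window, key)
        path.parent.mkdir(parents=True, exist_ok=True)
        payload = {"state_dict": state_dict, "global_step": global_step}
        with open(path, "wb") as f:
            pickle.dump(payload, f)
        return time.time()

    def get(self, uid: int, window: int,
            key: str) -> Optional[Tuple[Dict[str, Any], int]]:
        """Return (state_dict, global_step), or None if nothing was stored."""
        path = self._path(uid, window, key)
        if not path.exists():
            return None
        with open(path, "rb") as f:
            payload = pickle.load(f)
        return payload["state_dict"], payload["global_step"]
```

Returning `None` for a missing entry lets a caller treat an absent upload as a skipped peer rather than an error.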

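The gradient gathering process is central to Templar’s distributed training approach. Validators collect gradient updates from multiple miners, normalize them, and aggregate the results; miners use the same gather() call to pull peer gradients when updating their own models. Uploads whose timestamps fall outside the accepted time window are rejected, and the UIDs of peers whose data could not be used are tracked as skipped.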

flowchart TD
    subgraph "Gather Process"
        GATHER["gather()"]
        FETCH["Fetch peer gradients"]
        NORMALIZE["Normalize gradients"]
        AGGREGATE["Aggregate results"]
        SKIPPED["Track skipped UIDs"]
    end

    GATHER -->|"1. initiates"| FETCH
    FETCH -->|"2. processes"| NORMALIZE
    NORMALIZE -->|"3. combines"| AGGREGATE
    FETCH -->|"4. records failures"| SKIPPED
    AGGREGATE -->|"5. includes"| SKIPPED

    subgraph "Time Management"
        TIME_WINDOW["Time window filtering"]
        TIME_CHECK["Check data timestamp"]
        TOO_EARLY["Reject too early"]
        TOO_LATE["Reject too late"]
    end

    FETCH -->|"applies"| TIME_WINDOW
    TIME_WINDOW -->|"performs"| TIME_CHECK
    TIME_CHECK -->|"may flag"| TOO_EARLY
    TIME_CHECK -->|"may flag"| TOO_LATE

Sources: src/tplr/comms.py:684-1118 . neurons/validator.py:827-846 . neurons/miner.py:489-501 .
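The time-window check in the diagram above can be sketched as follows. The `Upload` record and the function `filter_by_time_window` are hypothetical names; only the `time_min` and `time_max` bounds correspond to parameters that appear in the `gather()` call on this page.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Iterable, List, Tuple


@dataclass(frozen=True)
class Upload:
    """Hypothetical record of one peer's upload."""
    uid: int
    uploaded_at: datetime


def filter_by_time_window(
    uploads: Iterable[Upload],
    time_min: datetime,
    time_max: datetime,
) -> Tuple[List[int], Dict[int, str]]:
    """Split uploads into accepted UIDs and skipped UIDs with a reason."""
    accepted: List[int] = []
    skipped: Dict[int, str] = {}
    for upload in uploads:
        if upload.uploaded_at < time_min:
            skipped[upload.uid] = "too early"
        elif upload.uploaded_at > time_max:
            skipped[upload.uid] = "too late"
        else:
            accepted.append(upload.uid)
    return accepted, skipped
```

Keeping the rejection reason next to each skipped UID makes it possible to report why a peer's gradient was not included in the aggregate.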

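The Communication System tracks which peers are active and refreshes the set of peers eligible for evaluation. Eligibility depends on a peer having a valid bucket, a chain commitment, and recent activity; a performance rating also affects selection.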

flowchart LR
    subgraph "Peer Management"
        TRACK["track_active_peers()"]
        UPDATE["update_peers_with_buckets()"]
        ACTIVE["active_peers set"]
        EVAL["eval_peers"]
        INACTIVE["inactive_peers"]
    end

    TRACK -->|"continuously updates"| ACTIVE
    UPDATE -->|"refreshes"| EVAL
    UPDATE -->|"identifies"| INACTIVE

    subgraph "Peer Selection Criteria"
        BUCKETS["Has valid bucket"]
        COMMITMENT["Has chain commitment"]
        RECENT["Recently active"]
        PERFORMANCE["Performance rating"]
    end

    BUCKETS -->|"contributes to"| EVAL
    COMMITMENT -->|"contributes to"| EVAL
    RECENT -->|"contributes to"| EVAL
    PERFORMANCE -->|"affects"| EVAL

Sources: src/tplr/comms.py:1228-1386 . neurons/validator.py:695-704 . neurons/miner.py:477-485 .
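A minimal sketch of the selection criteria above, assuming each peer can be summarized by three fields. The `PeerInfo` record, the `split_peers` function, and the exact recency rule are assumptions for illustration; the performance rating is omitted because the page does not say how it is applied.

```python
from dataclasses import dataclass
from typing import Iterable, List, Tuple


@dataclass(frozen=True)
class PeerInfo:
    """Hypothetical summary of what is known about one peer."""
    uid: int
    has_bucket: bool
    has_commitment: bool
    last_active_window: int


def split_peers(
    peers: Iterable[PeerInfo],
    current_window: int,
    recent_windows: int,
) -> Tuple[List[int], List[int]]:
    """Return (eval_peers, inactive_peers) as lists of UIDs."""
    eval_peers: List[int] = []
    inactive_peers: List[int] = []
    for peer in peers:
        # Assumed rule: active within the last `recent_windows` windows.
        recently_active = current_window - peer.last_active_window <= recent_windows
        if peer.has_bucket and peer.has_commitment and recently_active:
            eval_peers.append(peer.uid)
        else:
            inactive_peers.append(peer.uid)
    return eval_peers, inactive_peers
```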

The system manages peer lists to coordinate which nodes should communicate with each other during different training windows.

| Method | Purpose |
| --- | --- |
| post_peer_list | Publishes a list of selected peers for a future window |
| get_peer_list | Retrieves the peer list for the current window |
| update_peers_with_buckets | Refreshes peer information with storage access |

Sources: src/tplr/comms.py:1228-1336 . neurons/validator.py:678-686 .

The Communication System provides mechanisms for saving and loading model checkpoints to enable consistent model state across the network.

flowchart TD
    subgraph "Checkpoint Operations"
        SAVE["save_checkpoint()"]
        LOAD["load_checkpoint()"]
        GET_LATEST["get_latest_checkpoint()"]
    end

    SAVE -->|"stores model state"| R2["R2 Storage"]
    LOAD -->|"requests from"| GET_LATEST
    GET_LATEST -->|"retrieves from"| R2

    subgraph "Checkpoint Components"
        MODEL["Model state_dict"]
        OPT["Optimizer state"]
        SCHED["Scheduler state"]
        MOM["Momentum buffer"]
        META["Metadata (window, step)"]
    end

    SAVE -->|"includes"| MODEL
    SAVE -->|"includes"| OPT
    SAVE -->|"includes"| SCHED
    SAVE -->|"includes"| MOM
    SAVE -->|"includes"| META

Sources: src/tplr/comms.py:1489-1566 . src/tplr/comms.py:1567-1677 . neurons/miner.py:727-747 . neurons/validator.py:582-602 .
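The checkpoint components listed above can be pictured as a single dictionary, with the latest checkpoint chosen by its window. This is a sketch under stated assumptions: the field names and both helper functions are hypothetical, and plain Python values stand in for real model, optimizer, and scheduler state.

```python
from typing import Any, Dict, List, Optional


def build_checkpoint(
    model_state: Dict[str, Any],
    optimizer_state: Dict[str, Any],
    scheduler_state: Dict[str, Any],
    momentum: Dict[str, Any],
    window: int,
    global_step: int,
) -> Dict[str, Any]:
    """Bundle the five checkpoint components into one payload."""
    return {
        "model_state_dict": model_state,
        "optimizer_state_dict": optimizer_state,
        "scheduler_state_dict": scheduler_state,
        "momentum": momentum,
        "window": window,            # metadata
        "global_step": global_step,  # metadata
    }


def latest_checkpoint(checkpoints: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Pick the checkpoint with the highest window, or None if there are none."""
    if not checkpoints:
        return None
    return max(checkpoints, key=lambda ckpt: ckpt["window"])
```

Storing the window and step alongside the state lets a neuron that loads the checkpoint work out how far behind the network it is.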

The Communication System incorporates several mechanisms to ensure reliable operation in a distributed environment:

  1. Retry Logic: Automatic retries for network operations with exponential backoff
  2. Client Reconnection: Purging and recreation of S3 clients when connection issues occur
  3. Stale Data Cleanup: Regular removal of outdated data from both local and remote storage
  4. Timeout Handling: Graceful handling of operations that exceed time limits
  5. Concurrency Control: Semaphores to limit the number of concurrent operations

Sources: src/tplr/comms.py:366-371 . src/tplr/comms.py:484-497 . src/tplr/comms.py:237-258 .
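Retry with exponential backoff and a per-attempt timeout, as listed above, can be sketched with `asyncio`. The helper `retry_with_backoff`, its default delays, and the choice of which exceptions count as transient are assumptions for illustration, not the behavior of `comms.py`.

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    op: Callable[[], Awaitable[T]],
    *,
    attempts: int = 3,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
    timeout: Optional[float] = None,
) -> T:
    """Run `op`, retrying transient failures with exponentially growing delays."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")

    last_exc: Optional[BaseException] = None
    for attempt in range(attempts):
        try:
            # timeout=None means wait without a limit.
            return await asyncio.wait_for(op(), timeout=timeout)
        except (ConnectionError, asyncio.TimeoutError) as exc:
            last_exc = exc
            if attempt == attempts - 1:
                break
            # Delay doubles each attempt: base, 2*base, 4*base, ... capped.
            delay = min(max_delay, base_delay * (2 ** attempt))
            await asyncio.sleep(delay)

    assert last_exc is not None
    raise last_exc
```

A caller would wrap a single storage operation in a zero-argument coroutine function and pass it as `op`; a client-reconnection step could be added inside the `except` branch before sleeping.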

Both miners and validators integrate with the Communication System to participate in the distributed training process.

Miners use the Communication System to:

  • Retrieve model checkpoints to align with the network state
  • Upload their own gradient updates
  • Gather peer gradients to update their models
  • Share debug information and metrics

# Miner initialization of Comms
self.comms = tplr.comms.Comms(
    wallet=self.wallet,
    save_location="/tmp",
    key_prefix="model",
    config=self.config,
    netuid=self.config.netuid,
    metagraph=self.metagraph,
    hparams=self.hparams,
    uid=self.uid,
)

Sources: neurons/miner.py:174-188 . neurons/miner.py:417-435 . neurons/miner.py:489-501 .

Validators use the Communication System to:

  • Maintain the latest model state via checkpoints
  • Gather gradients from miners for evaluation
  • Post peer lists to coordinate network communication
  • Commit storage information to the blockchain

# Validator gradient gathering
gather_result = await self.comms.gather(
    my_uid=self.uid,
    uids=self.comms.peers,
    window=self.sync_window,
    key="gradient",
    timeout=35,
    device=self.config.device,
    local=False,
    totalks=self.totalks,
    time_min=time_min,
    time_max=time_max,
)

Sources: neurons/validator.py:208-220 . neurons/validator.py:678-686 . neurons/validator.py:827-846 .

The Communication System is designed for efficiency in a distributed environment:

  1. Chunked Transfers: Large files are processed in manageable chunks
  2. Resource Optimization: Adaptive resource allocation based on available CPU/GPU
  3. Persistent Connections: S3 clients are reused to avoid connection overhead
  4. Concurrent Operations: Parallel processing of multiple data transfers
  5. Time Window Filtering: Implements temporal boundaries for data relevance

Sources: src/tplr/comms.py:476-573 . src/tplr/comms.py:574-683 . src/tplr/comms.py:122-147 .
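Concurrent transfers with an upper bound on parallelism can be sketched with an `asyncio.Semaphore`. The function `run_bounded` and the default limit of 4 are hypothetical; the actual limits used by `Comms` are not stated on this page.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, TypeVar

In = TypeVar("In")
Out = TypeVar("Out")


async def run_bounded(
    items: Iterable[In],
    worker: Callable[[In], Awaitable[Out]],
    max_concurrent: int = 4,
) -> List[Out]:
    """Apply `worker` to every item with at most `max_concurrent` in flight."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def guarded(item: In) -> Out:
        # Only `max_concurrent` coroutines can hold the semaphore at once.
        async with semaphore:
            return await worker(item)

    # gather() returns results in the same order as the inputs.
    return list(await asyncio.gather(*(guarded(item) for item in items)))
```

Bounding concurrency this way keeps many peer downloads in flight without opening an unbounded number of connections at once.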

The Communication System is configured through:

  1. Environment Variables: R2 credentials and bucket information
  2. Hyperparameters: Network settings from the hparams configuration
  3. Runtime Parameters: Passed during neuron initialization

Key configuration parameters include:

| Parameter | Purpose | Source |
| --- | --- | --- |
| active_check_interval | Frequency for checking peer activity | hparams.json |
| recent_windows | Number of windows to check for activity | hparams.json |
| peer_replacement_frequency | How often to update peer lists | hparams.json |
| time_window_delta_seconds | Temporal boundary for data relevance | hparams.json |

Sources: src/tplr/comms.py:64-121 . hparams.json:38-46 .
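Reading the four parameters above out of a JSON document can be sketched as follows. The `CommsSettings` dataclass and `load_settings` function are hypothetical, and the integer types are assumed; the real code receives these values through the `hparams` object passed to `Comms`.

```python
import json
from dataclasses import dataclass

REQUIRED_KEYS = (
    "active_check_interval",
    "recent_windows",
    "peer_replacement_frequency",
    "time_window_delta_seconds",
)


@dataclass(frozen=True)
class CommsSettings:
    """Hypothetical container for the communication-related hyperparameters."""
    active_check_interval: int
    recent_windows: int
    peer_replacement_frequency: int
    time_window_delta_seconds: int


def load_settings(hparams_text: str) -> CommsSettings:
    """Parse JSON text and extract the four parameters, ignoring other keys."""
    raw = json.loads(hparams_text)
    missing = [key for key in REQUIRED_KEYS if key not in raw]
    if missing:
        raise KeyError(f"missing hyperparameters: {', '.join(missing)}")
    return CommsSettings(**{key: raw[key] for key in REQUIRED_KEYS})
```

Failing fast on a missing key surfaces a misconfigured hparams file at startup instead of partway through a training window.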

The Communication System is a fundamental component of Templar that enables decentralized training by facilitating efficient data exchange between miners and validators. Through integration with Cloudflare R2 storage and the Bittensor blockchain, it provides a reliable infrastructure for gradient sharing, checkpoint synchronization, and peer coordination.