Data Management

This page documents the data management system within Templar, which provides efficient loading, processing, and distribution of datasets and model artifacts across the decentralized training framework. The system is designed to support distributed training over the internet with high-performance access to large datasets stored in cloud object storage.

For information about checkpoint management and saving/loading models, see Checkpoint Management.

The Templar data management system enables miners and validators to efficiently access training data and exchange model artifacts during the decentralized training process. It uses Cloudflare R2 storage as its primary storage backend, with specialized components for loading and processing dataset files in parallel.

flowchart TD
    subgraph "R2 Storage System"
        DB["Dataset Bucket"]
        GB["Gradients Bucket"]
        AB["Aggregator Bucket"]
        CB["Checkpoints Bucket"]
    end

    subgraph "Training Components"
        MN["Miners"]
        VL["Validators"]
        AG["Aggregation Server"]
    end

    subgraph "Data Processing"
        R2L["R2DatasetLoader"]
        PQ["Parquet Processing"]
        TK["Tokenization"]
        CR["Caching and Retries"]
    end

    DB --> R2L
    R2L --> PQ --> TK
    CR --> R2L
    
    MN <--"Get training data"--> R2L
    VL <--"Get training data"--> R2L
    
    MN --"Store gradients"--> GB
    VL --"Get gradients"--> GB
    AG --"Store aggregated models"--> AB
    MN --"Get aggregated weights"--> AB
    VL --"Get aggregated weights"--> AB
    MN --"Store checkpoints"--> CB
    VL --"Store checkpoints"--> CB

Sources: src/tplr/r2_dataset.py:33-46, src/tplr/config.py:28-144

The data management system uses Cloudflare R2 as its primary storage backend, with four key buckets for different data types:

  1. Dataset Bucket: Contains training datasets in Parquet format, organized by configuration
  2. Gradients Bucket: Used for exchanging gradients between miners and validators
  3. Aggregator Bucket: Stores aggregated model states
  4. Checkpoints Bucket: Stores model checkpoints

Access to these buckets is configured through environment variables, with separate read and write credentials for each bucket.

flowchart LR
    subgraph "Environment Configuration"
        ENV["Environment Variables"]
    end

    subgraph "BUCKET_SECRETS"
        GRD["gradients: {
            account_id,
            name,
            credentials: {read, write}
        }"]
        
        AGG["aggregator: {
            account_id,
            name,
            credentials: {read, write}
        }"]
        
        DST["dataset: {
            account_id,
            name,
            credentials: {read, write}
            OR 
            multiple: [configs]
        }"]
    end

    subgraph "R2 Storage System"
        GRD_BUCKET["Gradients Bucket"]
        AGG_BUCKET["Aggregator Bucket"]
        DST_BUCKET["Dataset Bucket"]
    end

    ENV --> BUCKET_SECRETS
    GRD --> GRD_BUCKET
    AGG --> AGG_BUCKET
    DST --> DST_BUCKET

Sources: src/tplr/config.py:28-144

The system loads bucket configuration from environment variables using the load_bucket_secrets() function in the config module. This creates a BUCKET_SECRETS dictionary with the following structure for each bucket:

  • account_id: R2 account identifier
  • bucket_name: Name of the R2 bucket
  • credentials: Contains separate read and write credential sets:
    • read: Read-only access credentials
    • write: Write access credentials

The dataset bucket can be configured with multiple endpoints for load balancing, using a JSON array in the R2_DATASET_BUCKET_LIST environment variable.
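
For illustration, the sketch below shows the shape BUCKET_SECRETS takes and how the R2_DATASET_BUCKET_LIST array can be parsed. The key layout mirrors the structure listed above; the placeholder values and the inner credential field names are assumptions, not verified against the source.

```python
import json
import os

# Illustrative only: hypothetical placeholder values. The inner credential
# field names are assumed from the R2_*_ACCESS_KEY_ID / R2_*_SECRET_ACCESS_KEY
# environment variables.
bucket_secrets_example = {
    "gradients": {
        "account_id": "<r2-account-id>",
        "bucket_name": "<gradients-bucket-name>",
        "credentials": {
            "read": {"access_key_id": "<key>", "secret_access_key": "<secret>"},
            "write": {"access_key_id": "<key>", "secret_access_key": "<secret>"},
        },
    },
    # "aggregator" and "dataset" entries follow the same layout; when
    # R2_DATASET_BUCKET_LIST is set, the dataset entry instead carries a
    # "multiple" key holding a list of such configurations.
}

# R2_DATASET_BUCKET_LIST itself is a JSON array of dataset configurations.
dataset_configs = json.loads(os.environ.get("R2_DATASET_BUCKET_LIST", "[]"))
```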

Sources: src/tplr/config.py:28-144

The R2DatasetLoader class is responsible for efficiently loading training data from R2 storage. It handles dataset metadata caching, parallel data loading, tokenization, and batch creation.

classDiagram
    class R2DatasetLoader {
        +DATASET_SUBFOLDER: str
        +CF_REGION_NAME: str
        -_fs_cache: dict
        -_metadata_cache: dict
        -_token_cache: dict
        -_round_robin_index: int
        +PREFETCH_SIZE: int
        +MAX_CONCURRENT_REQUESTS: int
        +__init__(batch_size, sequence_length, tokenizer, pack_samples)
        +static next_pages(offset, n_pages, seed, num_rows_per_page): list
        +static create(batch_size, sequence_length, pages_info, tokenizer, pack_samples): R2DatasetLoader
        -static _load_r2_metadata(): tuple
        -static _get_fs(): S3FileSystem
        -_get_pad_size(input_ids): int
        -_refill_padded_buffer(): void
        -_process_page(page, sem): list
        +__iter__(): R2DatasetLoader
        +__next__(): np.array
    }

Sources: src/tplr/r2_dataset.py:33-595

The dataset loading process involves several key steps:

  1. Metadata Loading: The loader first fetches and caches dataset metadata and shard sizes from R2 storage
  2. Page Selection: Random pages are selected based on a seed for reproducible training
  3. Parallel Loading: Selected pages are loaded in parallel using asyncio
  4. Tokenization: Text data is tokenized using the provided tokenizer
  5. Batch Creation: Tokenized data is organized into batches of the requested size

sequenceDiagram
    participant Client
    participant R2DatasetLoader
    participant R2Storage
    participant Tokenizer

    Client->>R2DatasetLoader: next_pages(offset, n_pages, seed)
    R2DatasetLoader->>R2Storage: load_r2_metadata()
    R2Storage-->>R2DatasetLoader: metadata, shard_sizes
    R2DatasetLoader-->>Client: pages_info

    Client->>R2DatasetLoader: create(batch_size, sequence_length, pages_info, tokenizer)
    loop For each page
        R2DatasetLoader->>R2Storage: Open parquet file
        R2Storage-->>R2DatasetLoader: parquet data
        R2DatasetLoader->>Tokenizer: tokenize(text)
        Tokenizer-->>R2DatasetLoader: tokens
    end
    R2DatasetLoader-->>Client: loader instance

    Client->>R2DatasetLoader: iterate batches
    loop Until exhausted
        R2DatasetLoader->>R2DatasetLoader: _refill_padded_buffer()
        R2DatasetLoader-->>Client: batch of tokens
    end
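
A minimal usage sketch of this flow is shown below. It assumes the class is importable as tplr.r2_dataset.R2DatasetLoader, that next_pages() and create() are awaitable coroutines (the loader relies on asyncio internally), and that a Hugging Face tokenizer is acceptable; treat it as an illustration of the sequence above rather than the exact API.

```python
import asyncio

from transformers import AutoTokenizer
from tplr.r2_dataset import R2DatasetLoader  # assumed import path


async def main() -> None:
    tokenizer = AutoTokenizer.from_pretrained("gpt2")  # any compatible tokenizer

    # Steps 1-2: fetch metadata and deterministically select pages from a seed.
    pages = await R2DatasetLoader.next_pages(offset=0, n_pages=4, seed=42)

    # Steps 3-5: load pages in parallel, tokenize, and build batches.
    loader = await R2DatasetLoader.create(
        batch_size=8,
        sequence_length=1024,
        pages_info=pages,
        tokenizer=tokenizer,
        pack_samples=False,
    )

    # Iterate fixed-size batches of token ids until the pages are exhausted.
    for batch in loader:
        print(batch.shape)  # (batch_size, sequence_length)


asyncio.run(main())
```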

Sources: src/tplr/r2_dataset.py:180-240, src/tplr/r2_dataset.py:241-303, src/tplr/r2_dataset.py:380-518

The system supports distributing dataset access across multiple R2 endpoints using a round-robin approach. This feature is enabled by providing multiple dataset configurations in the R2_DATASET_BUCKET_LIST environment variable.

flowchart TD
    subgraph "Dataset Configuration"
        MULTI["multiple: [
            {account_id: 'accountA', name: 'bucketA', credentials: {...}},
            {account_id: 'accountB', name: 'bucketB', credentials: {...}}
        ]"]
    end

    subgraph "R2DatasetLoader"
        RR["_round_robin_index"]
        FS["_fs_cache"]
        GET["_get_fs()"]
    end

    subgraph "R2 Endpoints"
        EP1["Endpoint A"]
        EP2["Endpoint B"]
    end

    MULTI --> RR
    RR --> GET
    GET --> FS
    FS --"First request"--> EP1
    FS --"Second request"--> EP2
    FS --"Third request"--> EP1

The _get_fs() method implements the round-robin strategy as follows (a simplified sketch appears after this list):

  1. Incrementing the _round_robin_index counter
  2. Selecting a configuration based on the index modulo the number of configurations
  3. Caching S3FileSystem instances to avoid repeated instantiation
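
The snippet below is a simplified, standalone sketch of that strategy rather than the actual _get_fs() implementation: a counter indexes into the configured endpoints modulo their count, and the resulting s3fs.S3FileSystem handles are cached per endpoint. The s3fs constructor arguments and the R2 endpoint URL format are assumptions.

```python
import threading

import s3fs


class RoundRobinFS:
    """Simplified illustration of round-robin endpoint selection with caching."""

    def __init__(self, dataset_configs: list[dict]):
        self._configs = dataset_configs  # e.g. the "multiple" list from the dataset config
        self._fs_cache: dict[str, s3fs.S3FileSystem] = {}
        self._round_robin_index = 0
        self._lock = threading.Lock()  # thread-safe counter and cache access

    def get_fs(self) -> s3fs.S3FileSystem:
        with self._lock:
            # 1. Increment the round-robin counter.
            self._round_robin_index += 1
            # 2. Select a configuration: index modulo the number of configurations.
            cfg = self._configs[self._round_robin_index % len(self._configs)]
            key = cfg["account_id"]
            # 3. Cache S3FileSystem instances to avoid repeated instantiation.
            if key not in self._fs_cache:
                self._fs_cache[key] = s3fs.S3FileSystem(
                    key=cfg["credentials"]["read"]["access_key_id"],
                    secret=cfg["credentials"]["read"]["secret_access_key"],
                    client_kwargs={
                        "endpoint_url": f"https://{cfg['account_id']}.r2.cloudflarestorage.com"
                    },
                )
            return self._fs_cache[key]
```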

Sources: src/tplr/r2_dataset.py:333-378, tests/test_r2_loader.py:543-907

The R2DatasetLoader implements robust error handling and retry mechanisms to deal with transient failures when accessing R2 storage. This is essential for reliable operation in a distributed environment.

flowchart LR
    subgraph "Client Code"
        REQUEST["Data Request"]
        SUCCESS["Success"]
        FAILURE["Failure"]
    end

    subgraph "Error Handling"
        RETRY["retry_call()"]
        ATTEMPT1["Attempt 1"]
        ATTEMPT2["Attempt 2"]
        ATTEMPT3["Attempt 3"]
        BACKOFF["Exponential Backoff"]
    end

    REQUEST --> RETRY
    RETRY --> ATTEMPT1
    ATTEMPT1 --"Success"--> SUCCESS
    ATTEMPT1 --"Failure"--> BACKOFF
    BACKOFF --> ATTEMPT2
    ATTEMPT2 --"Success"--> SUCCESS
    ATTEMPT2 --"Failure"--> BACKOFF
    BACKOFF --> ATTEMPT3
    ATTEMPT3 --"Success"--> SUCCESS
    ATTEMPT3 --"Failure"--> FAILURE

Key error handling features include (a minimal retry sketch follows this list):

  • Exponential backoff between retry attempts
  • Separate retry configurations for different operations
  • Thread-safe filesystem and connection management
  • Caching of successful results to avoid redundant operations
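
A minimal, self-contained sketch of retry with exponential backoff is shown below. It mirrors the retry_call() flow in the diagram but is not the project's implementation; the function name, attempt count, and delays are illustrative.

```python
import asyncio
import random


async def retry_with_backoff(operation, attempts: int = 3, base_delay: float = 1.0):
    """Run an async operation, retrying transient failures with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except Exception:  # in practice, catch only the transient error types
            if attempt == attempts:
                raise  # final failure propagates to the caller
            # Exponential backoff with a little jitter: ~1s, ~2s, ~4s, ...
            delay = base_delay * 2 ** (attempt - 1) + random.uniform(0, 0.1)
            await asyncio.sleep(delay)


# Hypothetical usage: wrap a flaky page download.
# rows = await retry_with_backoff(lambda: load_page(page))
```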

Sources: src/tplr/r2_dataset.py:435-469, tests/test_r2_loader.py:311-498

The Templar framework uses Parquet files for dataset storage. These files are organized into shards, with metadata describing the contents and structure of each shard.

Each dataset configuration contains multiple shards, with each shard containing rows of text data. The system uses metadata files to track:

  • Total rows per configuration
  • Number of rows per shard
  • Shard file paths
  • Dataset splits (train/valid/test)

flowchart TD
    subgraph "R2 Dataset Bucket"
        META["_metadata.yaml"]
        SIZES["_shard_sizes.json"]
        
        subgraph "Config 1"
            C1S1["shard_0001.parquet"]
            C1S2["shard_0002.parquet"]
            C1S3["shard_0003.parquet"]
        end
        
        subgraph "Config 2"
            C2S1["shard_0001.parquet"]
            C2S2["shard_0002.parquet"]
        end
    end
    
    META --> C1S1
    META --> C1S2
    META --> C1S3
    META --> C2S1
    META --> C2S2
    
    SIZES --"Contains row counts"--> C1S1
    SIZES --"Contains row counts"--> C1S2
    SIZES --"Contains row counts"--> C1S3
    SIZES --"Contains row counts"--> C2S1
    SIZES --"Contains row counts"--> C2S2

The dataset structure includes (an illustrative sketch follows this list):

  • _metadata.yaml: Contains configuration information about the dataset
  • _shard_sizes.json: Maps configurations to shard files with row counts
  • Parquet files: Contain actual text data in columnar format
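
As a purely hypothetical illustration of how these pieces relate, the mapping below shows the kind of information the loader needs from _shard_sizes.json; the field names and path layout are assumptions, not the verified on-disk format.

```python
# Hypothetical shape only: each configuration maps its parquet shards to row
# counts and a split, matching the structure shown in the diagram above.
shard_sizes_example = {
    "Config1": {
        "split": "train",
        "total_rows": 3_000_000,
        "shards": [
            {"path": "Config1/shard_0001.parquet", "num_rows": 1_000_000},
            {"path": "Config1/shard_0002.parquet", "num_rows": 1_000_000},
            {"path": "Config1/shard_0003.parquet", "num_rows": 1_000_000},
        ],
    },
    "Config2": {
        "split": "train",
        "total_rows": 2_000_000,
        "shards": [
            {"path": "Config2/shard_0001.parquet", "num_rows": 1_000_000},
            {"path": "Config2/shard_0002.parquet", "num_rows": 1_000_000},
        ],
    },
}
```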

Sources: src/tplr/r2_dataset.py:180-240, src/tplr/r2_dataset.py:270-331

The R2DatasetLoader implements several performance optimizations to load and process large datasets efficiently (a parallel-loading sketch follows this list):

  1. Metadata Caching: Dataset metadata is cached locally to avoid repeated network requests
  2. Parallel Loading: Multiple pages are loaded in parallel using asyncio and semaphores
  3. Connection Pooling: S3FileSystem instances are configured with connection pooling
  4. Result Caching: Tokenized results are cached to avoid redundant processing
  5. Prefetching: Data is prefetched in the background while processing current batches
  6. Thread Safety: Thread locks ensure safe concurrent access to shared resources
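
The parallel-loading pattern (item 2) can be sketched as follows: an asyncio.Semaphore caps in-flight requests at MAX_CONCURRENT_REQUESTS while all pages are fetched concurrently. This is a generic illustration of the technique, not the project's _process_page() code; fetch_page() is a hypothetical stand-in for the real R2 read.

```python
import asyncio

MAX_CONCURRENT_REQUESTS = 8  # matches the default listed in the configuration table below


async def fetch_page(page: dict) -> list[str]:
    """Hypothetical helper: download and decode one parquet page."""
    await asyncio.sleep(0.1)  # stand-in for the real network read
    return [f"row from {page['path']}"]


async def load_pages(pages: list[dict]) -> list[list[str]]:
    sem = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)  # bound concurrent requests

    async def bounded(page: dict) -> list[str]:
        async with sem:
            return await fetch_page(page)

    # Launch all page downloads at once; the semaphore throttles them.
    return await asyncio.gather(*(bounded(p) for p in pages))


pages = [{"path": f"Config1/shard_{i:04d}.parquet"} for i in range(1, 4)]
results = asyncio.run(load_pages(pages))
```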

Sources: src/tplr/r2_dataset.py:56-88, src/tplr/r2_dataset.py:333-378, scripts/benchmarks/benchmark_parquet_loader.py:54-242

The data management system integrates with the Templar training pipeline, providing data for miners and validators during the decentralized training process:

flowchart LR
    subgraph "Miner"
        MT["Model Training"]
        ML["Data Loading"]
        MR2["R2DatasetLoader"]
    end

    subgraph "Validator"
        VT["Evaluation"]
        VL["Data Loading"]
        VR2["R2DatasetLoader"]
    end

    subgraph "R2 Storage"
        DB["Dataset Bucket"]
        GB["Gradients Bucket"]
        AB["Aggregator Bucket"]
    end

    DB --> MR2
    MR2 --> ML
    ML --> MT
    MT --"Gradients"--> GB
    
    DB --> VR2
    VR2 --> VL
    VL --> VT
    GB --"Miner Gradients"--> VT
    VT --"Evaluation Results"--> AB

The integration points include:

  • Miners use the R2DatasetLoader to fetch training data
  • Validators use the R2DatasetLoader to fetch evaluation data
  • Both components access other buckets for gradient exchange and aggregation

Sources: src/tplr/r2_dataset.py:33-46

The system includes benchmark tools to evaluate the performance of the data loading system under different configurations (a minimal timing sketch follows the lists below). Key metrics include:

  • Tokens per second processing rate
  • Memory usage
  • Total processing time
  • Batch processing time

These benchmarks help optimize configuration parameters like:

  • Number of concurrent requests
  • Batch size
  • Sequence length
  • Buffer sizes
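
A minimal tokens-per-second measurement over an existing loader might look like the sketch below; it reuses the assumed R2DatasetLoader interface from the usage example earlier and is not the project's benchmark script.

```python
import time


def measure_tokens_per_second(loader) -> float:
    """Time one full pass over an already-created loader and report tokens/second."""
    total_tokens = 0
    start = time.perf_counter()
    for batch in loader:  # each batch has shape (batch_size, sequence_length)
        total_tokens += batch.shape[0] * batch.shape[1]
    elapsed = time.perf_counter() - start
    return total_tokens / elapsed
```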

Sources: scripts/benchmarks/benchmark_parquet_loader.py:54-242, scripts/benchmarks/benchmark_results/parquet_loader_results.csv:1-23

The R2DatasetLoader can be configured with several parameters to optimize performance (an example of adjusting them follows the table):

| Parameter | Description | Default Value |
| --- | --- | --- |
| MAX_CONCURRENT_REQUESTS | Maximum parallel requests | 8 |
| PREFETCH_SIZE | Number of pages to prefetch | 3 |
| READ_BUFFER_SIZE | Buffer size for reading | 4MB |
| BATCH_SIZE | Default batch size for tokenization | 128 |
| CF_REGION_NAME | Cloudflare region name | "enam" |
| DATASET_SUBFOLDER | Subfolder in bucket containing dataset | "HuggingFaceFW_fineweb-edu-score-2" |
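
Per the class diagram, PREFETCH_SIZE and MAX_CONCURRENT_REQUESTS are class-level attributes, so one plausible (but unverified) way to tune them is to override them before any loader is created:

```python
from tplr.r2_dataset import R2DatasetLoader  # assumed import path

# Unverified tuning sketch: raise parallelism and prefetch depth before
# creating a loader, so subsequent loads pick up the new values.
R2DatasetLoader.MAX_CONCURRENT_REQUESTS = 16
R2DatasetLoader.PREFETCH_SIZE = 5
```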

Sources: src/tplr/r2_dataset.py:75-88

The data management system requires specific environment variables to be set for proper operation (a minimal presence check follows the table):

| Environment Variable | Description |
| --- | --- |
| R2_GRADIENTS_ACCOUNT_ID | Account ID for gradients bucket |
| R2_GRADIENTS_BUCKET_NAME | Bucket name for gradients |
| R2_GRADIENTS_READ_ACCESS_KEY_ID | Read access key for gradients |
| R2_GRADIENTS_READ_SECRET_ACCESS_KEY | Read secret key for gradients |
| R2_GRADIENTS_WRITE_ACCESS_KEY_ID | Write access key for gradients |
| R2_GRADIENTS_WRITE_SECRET_ACCESS_KEY | Write secret key for gradients |
| R2_AGGREGATOR_ACCOUNT_ID | Account ID for aggregator bucket |
| R2_AGGREGATOR_BUCKET_NAME | Bucket name for aggregator |
| (Similar variables for aggregator read/write) | |
| R2_DATASET_ACCOUNT_ID | Account ID for dataset bucket |
| R2_DATASET_BUCKET_NAME | Bucket name for dataset |
| (Similar variables for dataset read/write) | |
| R2_DATASET_BUCKET_LIST | Optional JSON array of multiple dataset configs |
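
As a quick sanity check before starting a miner or validator, the variable names from the table can be verified as present; this is an illustrative helper, not part of the project.

```python
import os

REQUIRED_VARS = [
    "R2_GRADIENTS_ACCOUNT_ID",
    "R2_GRADIENTS_BUCKET_NAME",
    "R2_GRADIENTS_READ_ACCESS_KEY_ID",
    "R2_GRADIENTS_READ_SECRET_ACCESS_KEY",
    "R2_GRADIENTS_WRITE_ACCESS_KEY_ID",
    "R2_GRADIENTS_WRITE_SECRET_ACCESS_KEY",
    "R2_AGGREGATOR_ACCOUNT_ID",
    "R2_AGGREGATOR_BUCKET_NAME",
    "R2_DATASET_ACCOUNT_ID",
    "R2_DATASET_BUCKET_NAME",
]

missing = [name for name in REQUIRED_VARS if not os.environ.get(name)]
if missing:
    raise SystemExit(f"Missing required environment variables: {', '.join(missing)}")
```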

Sources: src/tplr/config.py:28-144, tests/test_r2_loader.py:22-44