Data Management
This page documents the data management system within Templar, which provides efficient loading, processing, and distribution of datasets and model artifacts across the decentralized training framework. The system is designed to support distributed training over the internet with high-performance access to large datasets stored in cloud object storage.
For information about saving, loading, and managing model checkpoints, see Checkpoint Management.
Overview
The Templar data management system enables miners and validators to efficiently access training data and exchange model artifacts during the decentralized training process. It uses Cloudflare R2 storage as its primary storage backend, with specialized components for loading and processing dataset files in parallel.
```mermaid
flowchart TD
    subgraph "R2 Storage System"
        DB["Dataset Bucket"]
        GB["Gradients Bucket"]
        AB["Aggregator Bucket"]
        CB["Checkpoints Bucket"]
    end
    subgraph "Training Components"
        MN["Miners"]
        VL["Validators"]
        AG["Aggregation Server"]
    end
    subgraph "Data Processing"
        R2L["R2DatasetLoader"]
        PQ["Parquet Processing"]
        TK["Tokenization"]
        CR["Caching and Retries"]
    end
    DB --> R2L
    R2L --> PQ --> TK
    CR --> R2L
    MN <--"Get training data"--> R2L
    VL <--"Get training data"--> R2L
    MN --"Store gradients"--> GB
    VL --"Get gradients"--> GB
    AG --"Store aggregated models"--> AB
    MN --"Get aggregated weights"--> AB
    VL --"Get aggregated weights"--> AB
    MN --"Store checkpoints"--> CB
    VL --"Store checkpoints"--> CB
```
Sources: src/tplr/r2_dataset.py:33-46 , src/tplr/config.py:28-144
Storage Architecture
The data management system uses Cloudflare R2 as its primary storage backend, with four key buckets for different data types:
- Dataset Bucket: Contains training datasets in Parquet format, organized by configuration
- Gradients Bucket: Used for exchanging gradients between miners and validators
- Aggregator Bucket: Stores aggregated model states
- Checkpoints Bucket: Stores model checkpoints
Access to these buckets is configured through environment variables, with separate read and write credentials for each bucket.
```mermaid
flowchart LR
    subgraph "Environment Configuration"
        ENV["Environment Variables"]
    end
    subgraph "BUCKET_SECRETS"
        GRD["gradients: { account_id, name, credentials: {read, write} }"]
        AGG["aggregator: { account_id, name, credentials: {read, write} }"]
        DST["dataset: { account_id, name, credentials: {read, write} OR multiple: [configs] }"]
    end
    subgraph "R2 Storage System"
        GRD_BUCKET["Gradients Bucket"]
        AGG_BUCKET["Aggregator Bucket"]
        DST_BUCKET["Dataset Bucket"]
    end
    ENV --> BUCKET_SECRETS
    GRD --> GRD_BUCKET
    AGG --> AGG_BUCKET
    DST --> DST_BUCKET
```
Sources: src/tplr/config.py:28-144
R2 Bucket Configuration
The system loads bucket configuration from environment variables using the `load_bucket_secrets()` function in the config module. This creates a `BUCKET_SECRETS` dictionary with the following structure for each bucket:
- `account_id`: R2 account identifier
- `bucket_name`: Name of the R2 bucket
- `credentials`: Contains separate read and write credential sets:
  - `read`: Read-only access credentials
  - `write`: Write access credentials
The dataset bucket can be configured with multiple endpoints for load balancing, using a JSON array in the `R2_DATASET_BUCKET_LIST` environment variable.
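The snippet below is a minimal, illustrative sketch of how such a `BUCKET_SECRETS` structure could be assembled from these variables. It is not the actual `load_bucket_secrets()` implementation, and the inner credential key names (`access_key_id`, `secret_access_key`) are assumptions.

```python
import json
import os

def _bucket_from_env(prefix: str) -> dict:
    """Assemble one bucket entry from R2_<PREFIX>_* environment variables (illustrative sketch)."""
    return {
        "account_id": os.environ[f"R2_{prefix}_ACCOUNT_ID"],
        "bucket_name": os.environ[f"R2_{prefix}_BUCKET_NAME"],
        "credentials": {
            "read": {
                "access_key_id": os.environ[f"R2_{prefix}_READ_ACCESS_KEY_ID"],
                "secret_access_key": os.environ[f"R2_{prefix}_READ_SECRET_ACCESS_KEY"],
            },
            "write": {
                "access_key_id": os.environ[f"R2_{prefix}_WRITE_ACCESS_KEY_ID"],
                "secret_access_key": os.environ[f"R2_{prefix}_WRITE_SECRET_ACCESS_KEY"],
            },
        },
    }

BUCKET_SECRETS = {
    "gradients": _bucket_from_env("GRADIENTS"),
    "aggregator": _bucket_from_env("AGGREGATOR"),
    "dataset": _bucket_from_env("DATASET"),
}

# Optional: multiple dataset endpoints for round-robin load balancing.
if "R2_DATASET_BUCKET_LIST" in os.environ:
    BUCKET_SECRETS["dataset"]["multiple"] = json.loads(os.environ["R2_DATASET_BUCKET_LIST"])
```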
Sources: src/tplr/config.py:28-144
The R2DatasetLoader
The `R2DatasetLoader` class is responsible for efficiently loading training data from R2 storage. It handles dataset metadata caching, parallel data loading, tokenization, and batch creation.
```mermaid
classDiagram
    class R2DatasetLoader {
        +DATASET_SUBFOLDER: str
        +CF_REGION_NAME: str
        -_fs_cache: dict
        -_metadata_cache: dict
        -_token_cache: dict
        -_round_robin_index: int
        +PREFETCH_SIZE: int
        +MAX_CONCURRENT_REQUESTS: int
        +__init__(batch_size, sequence_length, tokenizer, pack_samples)
        +static next_pages(offset, n_pages, seed, num_rows_per_page): list
        +static create(batch_size, sequence_length, pages_info, tokenizer, pack_samples): R2DatasetLoader
        -static _load_r2_metadata(): tuple
        -static _get_fs(): S3FileSystem
        -_get_pad_size(input_ids): int
        -_refill_padded_buffer(): void
        -_process_page(page, sem): list
        +__iter__(): R2DatasetLoader
        +__next__(): np.array
    }
```
Sources: src/tplr/r2_dataset.py:33-595
Dataset Loading Process
The dataset loading process involves several key steps:
- Metadata Loading: The loader first fetches and caches dataset metadata and shard sizes from R2 storage
- Page Selection: Random pages are selected based on a seed for reproducible training
- Parallel Loading: Selected pages are loaded in parallel using asyncio
- Tokenization: Text data is tokenized using the provided tokenizer
- Batch Creation: Tokenized data is organized into batches of the requested size
```mermaid
sequenceDiagram
    participant Client
    participant R2DatasetLoader
    participant R2Storage
    participant Tokenizer
    Client->>R2DatasetLoader: next_pages(offset, n_pages, seed)
    R2DatasetLoader->>R2Storage: load_r2_metadata()
    R2Storage-->>R2DatasetLoader: metadata, shard_sizes
    R2DatasetLoader-->>Client: pages_info
    Client->>R2DatasetLoader: create(batch_size, sequence_length, pages_info, tokenizer)
    loop For each page
        R2DatasetLoader->>R2Storage: Open parquet file
        R2Storage-->>R2DatasetLoader: parquet data
        R2DatasetLoader->>Tokenizer: tokenize(text)
        Tokenizer-->>R2DatasetLoader: tokens
    end
    R2DatasetLoader-->>Client: loader instance
    Client->>R2DatasetLoader: iterate batches
    loop Until exhausted
        R2DatasetLoader->>R2DatasetLoader: _refill_padded_buffer()
        R2DatasetLoader-->>Client: batch of tokens
    end
```
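Condensed into code, the sequence above looks roughly like the sketch below. Method names and parameters follow the class diagram earlier on this page; whether `next_pages()` and `create()` must be awaited, and the choice of tokenizer, are assumptions here rather than confirmed details of the implementation.

```python
# Hypothetical usage sketch; exact signatures and sync/async behaviour may differ
# from the actual src/tplr/r2_dataset.py implementation.
import asyncio
from transformers import AutoTokenizer

from tplr.r2_dataset import R2DatasetLoader

async def load_some_batches():
    tokenizer = AutoTokenizer.from_pretrained("gpt2")  # tokenizer choice is illustrative

    # Steps 1-2: select pages deterministically from the cached metadata.
    pages = await R2DatasetLoader.next_pages(offset=0, n_pages=2, seed=42)

    # Steps 3-5: load the pages in parallel, tokenize, and build batches.
    loader = await R2DatasetLoader.create(
        batch_size=8,
        sequence_length=2048,
        pages_info=pages,
        tokenizer=tokenizer,
        pack_samples=False,
    )

    for batch in loader:
        # Each batch is an array of token IDs, shaped (batch_size, sequence_length).
        print(batch.shape)

asyncio.run(load_some_batches())
```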
Sources: src/tplr/r2_dataset.py:180-240 , src/tplr/r2_dataset.py:241-303 , src/tplr/r2_dataset.py:380-518
Load Balancing with Round-Robin
The system supports distributing dataset access across multiple R2 endpoints using a round-robin approach. This feature is enabled by configuring multiple dataset configurations in the `R2_DATASET_BUCKET_LIST` environment variable.
```mermaid
flowchart TD
    subgraph "Dataset Configuration"
        MULTI["multiple: [ {account_id: 'accountA', name: 'bucketA', credentials: {...}}, {account_id: 'accountB', name: 'bucketB', credentials: {...}} ]"]
    end
    subgraph "R2DatasetLoader"
        RR["_round_robin_index"]
        FS["_fs_cache"]
        GET["_get_fs()"]
    end
    subgraph "R2 Endpoints"
        EP1["Endpoint A"]
        EP2["Endpoint B"]
    end
    MULTI --> RR
    RR --> GET
    GET --> FS
    FS --"First request"--> EP1
    FS --"Second request"--> EP2
    FS --"Third request"--> EP1
```
The `_get_fs()` method implements the round-robin strategy (sketched after this list) by:
- Incrementing the `_round_robin_index` counter
- Selecting a configuration based on the index modulo the number of configurations
- Caching S3FileSystem instances to avoid repeated instantiation
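A minimal sketch of this pattern is shown below. It is not the actual `_get_fs()` implementation: the `RoundRobinFS` wrapper class, the credential key names, and the endpoint URL format are hypothetical; only the round-robin-plus-cache behaviour mirrors the description above.

```python
import threading

import s3fs

class RoundRobinFS:
    """Illustrative sketch of round-robin endpoint selection; not the actual _get_fs() code."""

    def __init__(self, dataset_configs: list[dict]):
        self._configs = dataset_configs          # e.g. BUCKET_SECRETS["dataset"]["multiple"]
        self._fs_cache: dict[int, s3fs.S3FileSystem] = {}  # one cached filesystem per endpoint
        self._round_robin_index = 0
        self._lock = threading.Lock()

    def get_fs(self) -> s3fs.S3FileSystem:
        # Advance the counter under a lock so concurrent callers rotate endpoints safely.
        with self._lock:
            idx = self._round_robin_index % len(self._configs)
            self._round_robin_index += 1
        if idx not in self._fs_cache:
            cfg = self._configs[idx]
            # Cache the filesystem so repeated requests reuse the same connection pool.
            self._fs_cache[idx] = s3fs.S3FileSystem(
                key=cfg["credentials"]["read"]["access_key_id"],
                secret=cfg["credentials"]["read"]["secret_access_key"],
                client_kwargs={
                    "endpoint_url": f"https://{cfg['account_id']}.r2.cloudflarestorage.com"
                },
            )
        return self._fs_cache[idx]
```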
Sources: src/tplr/r2_dataset.py:333-378 , tests/test_r2_loader.py:543-907
Error Handling and Retry Mechanisms
The `R2DatasetLoader` implements robust error handling and retry mechanisms to deal with transient failures when accessing R2 storage. This is essential for reliable operation in a distributed environment.
```mermaid
flowchart LR
    subgraph "Client Code"
        REQUEST["Data Request"]
        SUCCESS["Success"]
        FAILURE["Failure"]
    end
    subgraph "Error Handling"
        RETRY["retry_call()"]
        ATTEMPT1["Attempt 1"]
        ATTEMPT2["Attempt 2"]
        ATTEMPT3["Attempt 3"]
        BACKOFF["Exponential Backoff"]
    end
    REQUEST --> RETRY
    RETRY --> ATTEMPT1
    ATTEMPT1 --"Success"--> SUCCESS
    ATTEMPT1 --"Failure"--> BACKOFF
    BACKOFF --> ATTEMPT2
    ATTEMPT2 --"Success"--> SUCCESS
    ATTEMPT2 --"Failure"--> BACKOFF
    BACKOFF --> ATTEMPT3
    ATTEMPT3 --"Success"--> SUCCESS
    ATTEMPT3 --"Failure"--> FAILURE
```
Key error handling features include:
- Exponential backoff between retry attempts
- Separate retry configurations for different operations
- Thread-safe filesystem and connection management
- Caching of successful results to avoid redundant operations
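The sketch below shows the retry pattern in its simplest form, assuming three attempts with exponential backoff as in the diagram above; the actual `retry_call()` signature and behaviour in the codebase may differ.

```python
import asyncio
import random

async def retry_call(fn, *args, attempts: int = 3, base_delay: float = 1.0, **kwargs):
    """Generic retry helper with exponential backoff (illustrative; not the real retry_call())."""
    for attempt in range(attempts):
        try:
            return await fn(*args, **kwargs)
        except Exception:
            if attempt == attempts - 1:
                raise  # out of retries: surface the failure to the caller
            # Exponential backoff with a little jitter to avoid synchronized retries.
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.5)
            await asyncio.sleep(delay)
```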
Sources: src/tplr/r2_dataset.py:435-469 , tests/test_r2_loader.py:311-498
Dataset Structure and Format
The Templar framework uses Parquet files for dataset storage. These files are organized into shards, with metadata describing the contents and structure of each shard.
Parquet File Structure
Each dataset configuration is split into multiple shards, each containing rows of text data. The system uses metadata files to track:
- Total rows per configuration
- Number of rows per shard
- Shard file paths
- Dataset splits (train/valid/test)
```mermaid
flowchart TD
    subgraph "R2 Dataset Bucket"
        META["_metadata.yaml"]
        SIZES["_shard_sizes.json"]
        subgraph "Config 1"
            C1S1["shard_0001.parquet"]
            C1S2["shard_0002.parquet"]
            C1S3["shard_0003.parquet"]
        end
        subgraph "Config 2"
            C2S1["shard_0001.parquet"]
            C2S2["shard_0002.parquet"]
        end
    end
    META --> C1S1
    META --> C1S2
    META --> C1S3
    META --> C2S1
    META --> C2S2
    SIZES --"Contains row counts"--> C1S1
    SIZES --"Contains row counts"--> C1S2
    SIZES --"Contains row counts"--> C1S3
    SIZES --"Contains row counts"--> C2S1
    SIZES --"Contains row counts"--> C2S2
```
The dataset structure includes (see the sketch after this list):
- `_metadata.yaml`: Contains configuration information about the dataset
- `_shard_sizes.json`: Maps configurations to shard files with row counts
- Parquet files: Contain actual text data in columnar format
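The exact schema of these files is defined by the dataset preparation pipeline; the snippet below is only an assumed illustration (hypothetical configuration name and field names) of the kind of mapping the loader caches.

```python
# Assumed (illustrative) shape of the cached shard-size metadata; real field names may differ.
shard_sizes = {
    "config_1": {                      # one entry per dataset configuration
        "total_rows": 2_000_000,
        "split": "train",
        "shards": [
            {"path": "config_1/shard_0001.parquet", "num_rows": 1_000_000},
            {"path": "config_1/shard_0002.parquet", "num_rows": 1_000_000},
        ],
    },
}

# A "page" can then be thought of as (configuration, row offset, split),
# which the loader resolves to one or more Parquet shards.
page = ("config_1", 150_000, "train")
```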
Sources: src/tplr/r2_dataset.py:180-240 , src/tplr/r2_dataset.py:270-331
Performance Optimizations
The `R2DatasetLoader` implements several performance optimizations to efficiently load and process large datasets:
- Metadata Caching: Dataset metadata is cached locally to avoid repeated network requests
- Parallel Loading: Multiple pages are loaded in parallel using asyncio and semaphores
- Connection Pooling: S3FileSystem instances are configured with connection pooling
- Result Caching: Tokenized results are cached to avoid redundant processing
- Prefetching: Data is prefetched in the background while processing current batches
- Thread Safety: Thread locks ensure safe concurrent access to shared resources
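As an illustration of the parallel-loading pattern (item 2 above), the sketch below bounds concurrent page fetches with an asyncio semaphore. It is not the actual implementation: the real `_process_page(page, sem)` receives the semaphore explicitly and also handles caching and retries.

```python
import asyncio

MAX_CONCURRENT_REQUESTS = 8  # mirrors the class-level constant listed under Configuration Parameters

async def load_pages(pages, process_page):
    """Illustrative parallel page loading bounded by a semaphore."""
    sem = asyncio.Semaphore(MAX_CONCURRENT_REQUESTS)

    async def bounded(page):
        async with sem:  # cap the number of in-flight R2 requests
            return await process_page(page)

    # Schedule all pages at once; the semaphore throttles actual concurrency.
    return await asyncio.gather(*(bounded(p) for p in pages))
```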
Sources: src/tplr/r2_dataset.py:56-88 , src/tplr/r2_dataset.py:333-378 , scripts/benchmarks/benchmark_parquet_loader.py:54-242
Integration with Training Pipeline
The data management system integrates with the Templar training pipeline, providing data for miners and validators during the decentralized training process:
```mermaid
flowchart LR
    subgraph "Miner"
        MT["Model Training"]
        ML["Data Loading"]
        MR2["R2DatasetLoader"]
    end
    subgraph "Validator"
        VT["Evaluation"]
        VL["Data Loading"]
        VR2["R2DatasetLoader"]
    end
    subgraph "R2 Storage"
        DB["Dataset Bucket"]
        GB["Gradients Bucket"]
        AB["Aggregator Bucket"]
    end
    DB --> MR2
    MR2 --> ML
    ML --> MT
    MT --"Gradients"--> GB
    DB --> VR2
    VR2 --> VL
    VL --> VT
    GB --"Miner Gradients"--> VT
    VT --"Evaluation Results"--> AB
```
The integration points include:
- Miners use the `R2DatasetLoader` to fetch training data
- Validators use the `R2DatasetLoader` to fetch evaluation data
- Both components access other buckets for gradient exchange and aggregation
Sources: src/tplr/r2_dataset.py:33-46
Benchmarking and Performance
The system includes benchmark tools to evaluate data-loading performance under different configurations. Key metrics include:
- Tokens per second processing rate
- Memory usage
- Total processing time
- Batch processing time
These benchmarks help optimize configuration parameters like:
- Number of concurrent requests
- Batch size
- Sequence length
- Buffer sizes
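A rough throughput probe along these lines can be written in a few lines. This sketch assumes each batch is a NumPy array of token IDs and is not the actual benchmark script, which also records memory usage and per-batch timings.

```python
import time

def measure_tokens_per_second(loader) -> float:
    """Rough tokens-per-second measurement over one pass of a loader (illustrative)."""
    start = time.perf_counter()
    total_tokens = 0
    for batch in loader:
        total_tokens += batch.size  # total token count in this (batch_size, sequence_length) array
    elapsed = time.perf_counter() - start
    return total_tokens / elapsed
```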
Sources: scripts/benchmarks/benchmark_parquet_loader.py:54-242 , scripts/benchmarks/benchmark_results/parquet_loader_results.csv:1-23
Configuration Parameters
The `R2DatasetLoader` can be configured with several parameters to optimize performance:
| Parameter | Description | Default Value |
|---|---|---|
| `MAX_CONCURRENT_REQUESTS` | Maximum parallel requests | 8 |
| `PREFETCH_SIZE` | Number of pages to prefetch | 3 |
| `READ_BUFFER_SIZE` | Buffer size for reading | 4 MB |
| `BATCH_SIZE` | Default batch size for tokenization | 128 |
| `CF_REGION_NAME` | Cloudflare region name | "enam" |
| `DATASET_SUBFOLDER` | Subfolder in bucket containing dataset | "HuggingFaceFW_fineweb-edu-score-2" |
Sources: src/tplr/r2_dataset.py:75-88
Environment Configuration
The data management system requires specific environment variables to be set for proper operation:
| Environment Variable | Description |
|---|---|
| `R2_GRADIENTS_ACCOUNT_ID` | Account ID for gradients bucket |
| `R2_GRADIENTS_BUCKET_NAME` | Bucket name for gradients |
| `R2_GRADIENTS_READ_ACCESS_KEY_ID` | Read access key for gradients |
| `R2_GRADIENTS_READ_SECRET_ACCESS_KEY` | Read secret key for gradients |
| `R2_GRADIENTS_WRITE_ACCESS_KEY_ID` | Write access key for gradients |
| `R2_GRADIENTS_WRITE_SECRET_ACCESS_KEY` | Write secret key for gradients |
| `R2_AGGREGATOR_ACCOUNT_ID` | Account ID for aggregator bucket |
| `R2_AGGREGATOR_BUCKET_NAME` | Bucket name for aggregator |
| … | (Similar variables for aggregator read/write) |
| `R2_DATASET_ACCOUNT_ID` | Account ID for dataset bucket |
| `R2_DATASET_BUCKET_NAME` | Bucket name for dataset |
| … | (Similar variables for dataset read/write) |
| `R2_DATASET_BUCKET_LIST` | Optional JSON array of multiple dataset configs |
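A launcher script could verify these variables up front. The check below is a minimal, illustrative sketch rather than part of the Templar codebase; the variable names mirror the table above, and the list is deliberately partial.

```python
import os

# Minimal pre-flight check (illustrative): fail fast if any required variable is missing.
REQUIRED_VARS = [
    "R2_GRADIENTS_ACCOUNT_ID",
    "R2_GRADIENTS_BUCKET_NAME",
    "R2_GRADIENTS_READ_ACCESS_KEY_ID",
    "R2_GRADIENTS_READ_SECRET_ACCESS_KEY",
    "R2_GRADIENTS_WRITE_ACCESS_KEY_ID",
    "R2_GRADIENTS_WRITE_SECRET_ACCESS_KEY",
    "R2_DATASET_ACCOUNT_ID",
    "R2_DATASET_BUCKET_NAME",
]

missing = [name for name in REQUIRED_VARS if not os.environ.get(name)]
if missing:
    raise RuntimeError(f"Missing required R2 environment variables: {', '.join(missing)}")
```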
Sources: src/tplr/config.py:28-144 , tests/test_r2_loader.py:22-44