Distributed Execution
MIND provides first-class support for distributed training and inference, allowing you to scale models across multiple nodes with automatic sharding, gradient synchronization, and fault tolerance.
Production Ready: Distributed execution is complete with TCP transport, NCCL/Gloo backends, RingAllReduce, pipeline parallelism, and fault tolerance. See the Roadmap for full status.
Overview
MIND's distributed execution framework supports three parallelism strategies:
- Data Parallelism: Replicate the model across devices and split data batches
- Model Parallelism: Split large models across devices when they don't fit in memory
- Pipeline Parallelism: Split model layers across devices for improved throughput
Getting Started
Enable distributed execution with the @distributed annotation:
use mind::distributed::{init, world_size, rank};

@distributed(strategy = "data_parallel")
fn train_step(
    model: &mut Model,
    optimizer: &mut Adam,
    batch: Tensor<f32, [B, 784]>,
    labels: Tensor<i64, [B]>,
) -> f32 {
    let output = model.forward(batch);
    let loss = cross_entropy(output, labels);
    // Gradients are automatically synchronized across all nodes
    loss.backward();
    optimizer.step();
    loss.item()
}

fn main() {
    // Initialize distributed runtime
    init();
    println!("Running on node {} of {}", rank(), world_size());

    // (model, optimizer, and data loading are set up as usual)
    for epoch in 0..100 {
        let loss = train_step(&mut model, &mut optimizer, batch, labels);
        if rank() == 0 {
            println!("Epoch {}: loss = {:.4}", epoch, loss);
        }
    }
}

Data Parallelism
Data parallelism replicates the model on each device and splits input batches. Gradients are averaged across all replicas using all-reduce operations.
use mind::distributed::{DataParallel, AllReduce};

// Wrap model for data parallel training
// ('devices' is the list of local devices to replicate across)
let model = DataParallel::new(model, devices);

// Training loop - batches are automatically distributed
for batch in dataloader.iter() {
    let loss = model.forward(batch);
    loss.backward();
    // Gradients synchronized via NCCL/Gloo
    optimizer.step();
}

Gradient Synchronization
MIND supports multiple collective communication backends:
| Backend | Devices | Notes |
|---|---|---|
| NCCL | NVIDIA GPU | Recommended for multi-GPU training |
| Gloo | CPU, GPU | Cross-platform, supports TCP/IP |
| MPI | CPU, GPU | HPC clusters with InfiniBand |
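All three backends expose the same all-reduce primitive. As an illustration of what the RingAllReduce implementation does under the hood, the algorithm can be sketched as follows. This is illustrative pseudocode, not runtime source: the `send`/`recv` helpers and the per-chunk layout are assumptions, and a real implementation overlaps the send and receive rather than blocking on each.

```
// Sketch of ring all-reduce over world_size workers.
// The gradient is split into world_size chunks. After world_size - 1
// reduce-scatter steps each worker holds one fully reduced chunk;
// world_size - 1 all-gather steps then circulate those chunks so
// every worker ends up with the complete reduced gradient.
fn ring_all_reduce(grad: &mut [Tensor], rank: usize, world_size: usize) {
    let right = (rank + 1) % world_size;
    let left = (rank + world_size - 1) % world_size;

    // Reduce-scatter: pass chunks around the ring, accumulating as we go
    for step in 0..world_size - 1 {
        let send_idx = (rank + world_size - step) % world_size;
        let recv_idx = (rank + world_size - step - 1) % world_size;
        send(right, &grad[send_idx]);        // assumed non-blocking send
        grad[recv_idx] += recv(left);
    }

    // All-gather: circulate the fully reduced chunks
    for step in 0..world_size - 1 {
        let send_idx = (rank + 1 + world_size - step) % world_size;
        let recv_idx = (rank + world_size - step) % world_size;
        send(right, &grad[send_idx]);
        grad[recv_idx] = recv(left);
    }
}
```

Each worker sends and receives only 2 * (world_size - 1) chunks regardless of cluster size, which is why the ring topology scales well for large gradients.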
Model Parallelism
For models that don't fit on a single device, use model parallelism to split layers across devices:
use mind::distributed::{ModelParallel, DeviceMap};

// Define how layers map to devices
let device_map = DeviceMap::new()
    .layer("embed", Device::cuda(0))
    .layers("transformer.0..12", Device::cuda(0))
    .layers("transformer.12..24", Device::cuda(1))
    .layer("head", Device::cuda(1));

// Create model parallel wrapper
let model = ModelParallel::new(model, device_map);

// Forward pass automatically moves tensors between devices
let output = model.forward(input);

Pipeline Parallelism
Pipeline parallelism enables higher throughput by overlapping computation across micro-batches:
use mind::distributed::{Pipeline, Schedule};

// Configure pipeline with micro-batching
let pipeline = Pipeline::new(model)
    .stages(4)                  // Split into 4 pipeline stages
    .micro_batches(8)           // 8 micro-batches per batch
    .schedule(Schedule::GPipe); // Or Schedule::PipeDream

// Training with pipeline parallelism
for batch in dataloader.iter() {
    let loss = pipeline.forward_backward(batch);
    optimizer.step();
}

Multi-Node Training
Scale training across multiple machines using the MIND distributed launcher:
# Launch on 4 nodes with 8 GPUs each
mind launch --nodes 4 --gpus-per-node 8 \
    --master-addr 192.168.1.1 \
    --master-port 29500 \
    train.mind

# Or use a hostfile
mind launch --hostfile hosts.txt --gpus-per-node 8 train.mind

Fault Tolerance
MIND supports elastic training with automatic checkpointing and recovery:
use mind::distributed::{Elastic, Checkpoint};

// Enable elastic training with checkpointing
let trainer = Elastic::new(model)
    .min_nodes(2)
    .max_nodes(8)
    .checkpoint_dir("checkpoints/")
    .checkpoint_interval(1000); // steps

// Training automatically resumes on node failure
trainer.fit(dataloader, epochs: 10);

Complete Example: Distributed MNIST Training
Here's a complete example training a simple neural network on MNIST across multiple GPUs:
// distributed_mnist.mind
use mind::nn::{Linear, relu, softmax, cross_entropy};
use mind::optim::{Adam, Optimizer};
use mind::data::{Dataset, DataLoader};
use mind::distributed::{init, rank, world_size, DataParallel, all_reduce};

struct MLP {
    fc1: Linear<784, 256>,
    fc2: Linear<256, 128>,
    fc3: Linear<128, 10>,
}

impl MLP {
    fn forward(&self, x: Tensor<f32, [B, 784]>) -> Tensor<f32, [B, 10]> {
        let h1 = relu(self.fc1.forward(x));
        let h2 = relu(self.fc2.forward(h1));
        softmax(self.fc3.forward(h2), axis: -1)
    }
}

fn main() {
    // Initialize distributed runtime (auto-detects backend)
    init();
    let local_rank = rank();
    let n_workers = world_size();
    if local_rank == 0 {
        println!("Training with {} workers", n_workers);
    }

    // Create model and wrap for data parallelism
    let model = MLP::new();
    let model = DataParallel::new(model);

    // Optimizer with learning rate scaled by sqrt(worker count)
    let lr = 0.001 * (n_workers as f32).sqrt();
    let optimizer = Adam::new(model.parameters(), lr: lr);

    // Each worker gets a shard of the dataset
    let dataset = Dataset::mnist("data/", train: true);
    let loader = DataLoader::new(dataset)
        .batch_size(64)
        .shuffle(true)
        .distributed(rank: local_rank, world_size: n_workers);

    // Training loop
    for epoch in 0..10 {
        let mut total_loss = 0.0;
        let mut batches = 0;
        for (images, labels) in loader.iter() {
            // Forward pass
            let output = model.forward(images);
            let loss = cross_entropy(output, labels);

            // Backward pass (gradients auto-synced across workers)
            loss.backward();
            optimizer.step();
            optimizer.zero_grad();

            total_loss += loss.item();
            batches += 1;
        }

        // Average the per-batch loss locally, then reduce across workers
        let avg_loss = all_reduce(total_loss / (batches as f32), op: "mean");
        if local_rank == 0 {
            println!("Epoch {}: loss = {:.4}", epoch + 1, avg_loss);
        }
    }

    // Save model (only on rank 0)
    if local_rank == 0 {
        model.save("mnist_model.mind");
        println!("Model saved!");
    }
}

Launch with:
# Single node, 4 GPUs
mind launch --gpus 4 distributed_mnist.mind

# Multi-node (2 nodes, 8 GPUs each)
mind launch --nodes 2 --gpus-per-node 8 \
    --master-addr 10.0.0.1 distributed_mnist.mind

Environment Variables
Configure distributed execution with these environment variables:
| Variable | Default | Description |
|---|---|---|
| MIND_BACKEND | nccl | Communication backend (nccl, gloo, mpi) |
| MIND_MASTER_ADDR | localhost | Master node IP address |
| MIND_MASTER_PORT | 29500 | Master node port for coordination |
| MIND_WORLD_SIZE | 1 | Total number of workers |
| MIND_RANK | 0 | This worker's rank |
| MIND_LOCAL_RANK | 0 | Local GPU index on this node |
| MIND_PROFILE | 0 | Enable profiling (1=basic, 2=detailed) |
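For example, assuming the launcher forwards the caller's environment to every worker, a variable from the table can be set inline to override a default for a single run (the launcher flags are the same ones shown earlier):

```
# Force the Gloo backend instead of the default NCCL (e.g. CPU-only cluster)
MIND_BACKEND=gloo mind launch --gpus 4 train.mind

# Enable basic communication profiling for one run
MIND_PROFILE=1 mind launch --gpus 4 train.mind
```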
Best Practices
- Start with data parallelism for most workloads - it's the simplest and most efficient
- Use gradient accumulation to simulate larger batch sizes without more memory
- Profile communication overhead with MIND_PROFILE=1
- Enable mixed precision training to reduce communication bandwidth
- Use gradient compression on slow network connections
- Scale the learning rate with the square root of the worker count for stable convergence
- Use warmup epochs when training with many workers
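The gradient accumulation practice above can be sketched as a small change to the standard training loop. This is an illustrative sketch reusing the names from the MNIST example; the `accum_steps` constant is a hypothetical choice, and it assumes gradients accumulate across backward calls until zero_grad() is invoked:

```
let accum_steps = 4; // effective batch size = accum_steps * per-device batch size

for (i, (images, labels)) in loader.iter().enumerate() {
    let output = model.forward(images);
    // Scale the loss so the accumulated gradient averages over micro-batches
    let loss = cross_entropy(output, labels) / (accum_steps as f32);
    loss.backward(); // gradients accumulate until zero_grad()

    // Step and clear only every accum_steps micro-batches
    if (i + 1) % accum_steps == 0 {
        optimizer.step();
        optimizer.zero_grad();
    }
}
```

Because the optimizer steps less often, communication for gradient synchronization can also be deferred to the stepping iteration, reducing all-reduce traffic by a factor of accum_steps.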
Runtime Implementation
The MIND runtime provides production-grade distributed primitives:
Transport Layer
TCP-based transport with connection pooling, message serialization, and ring topology helpers for efficient collective operations.
use mind::distributed::transport::{TcpTransport, TransportConfig};

let config = TransportConfig {
    bind_addr: "0.0.0.0:29500".parse()?,
    connect_timeout_ms: 5000,
    buffer_size: 1024 * 1024, // 1 MB
};
let transport = TcpTransport::new(config, rank, world_size)?;

// Ring topology helpers
let left = transport.left_neighbor();   // (rank - 1) % world_size
let right = transport.right_neighbor(); // (rank + 1) % world_size

Communication Backends
Abstract backend trait with NCCL (GPU) and Gloo (CPU/cross-platform) implementations:
use mind::distributed::backend::{Backend, NcclBackend, GlooBackend};

// Auto-select based on hardware
let backend = Backend::auto_select()?;

// Or explicitly choose
let nccl = NcclBackend::new(rank, world_size)?; // GPU clusters
let gloo = GlooBackend::new(rank, world_size)?; // CPU or cross-platform

// Collective operations via backend
backend.all_reduce(tensor, AllReduceOp::Sum)?;
backend.broadcast(tensor, root: 0)?;
backend.barrier()?;

Fault Tolerance
Elastic training with automatic failure detection and recovery:
use mind::distributed::coordinator::{
    DistributedCoordinator, FaultToleranceConfig, ClusterHealth
};

let fault_config = FaultToleranceConfig {
    min_workers: 2,
    max_workers: 16,
    elastic: true,             // Allow dynamic scaling
    checkpoint_interval: 1000, // Steps between checkpoints
    auto_recover: true,
    max_heartbeat_misses: 3,
    replacement_delay_ms: 5000,
};

let coordinator = DistributedCoordinator::new_with_fault_tolerance(
    world_size, fault_config
)?;

// Monitor cluster health
let health: ClusterHealth = coordinator.health_status();
println!("Workers: {}/{} alive, can_train: {}",
    health.alive_workers, health.total_workers, health.can_train);

// Automatic checkpoint on failure
if coordinator.should_checkpoint() {
    model.save_checkpoint("checkpoint.mind")?;
}

Learn More
See the Future Extensions page for upcoming distributed features and the Roadmap for development status.