Distributed Execution

MIND provides first-class support for distributed training and inference, allowing you to scale models across multiple nodes with automatic sharding, gradient synchronization, and fault tolerance.

Production Ready: Distributed execution ships complete, with a TCP transport, NCCL/Gloo backends, RingAllReduce, pipeline parallelism, and fault tolerance. See the Roadmap for full status.

Overview

MIND's distributed execution framework supports three parallelism strategies:

  • Data Parallelism: Replicate the model across devices and split data batches
  • Model Parallelism: Split large models across devices when they don't fit in memory
  • Pipeline Parallelism: Split model layers across devices for improved throughput

Getting Started

Enable distributed execution with the @distributed annotation:

use mind::distributed::{init, world_size, rank};

@distributed(strategy = "data_parallel")
fn train_step(
    model: &mut Model,
    optimizer: &mut Optimizer,
    batch: Tensor<f32, [B, 784]>,
    labels: Tensor<i64, [B]>,
) -> f32 {
    let output = model.forward(batch);
    let loss = cross_entropy(output, labels);

    // Gradients are automatically synchronized across all nodes
    loss.backward();
    optimizer.step();
    optimizer.zero_grad();

    loss.item()
}

fn main() {
    // Initialize distributed runtime
    init();

    println!("Running on node {} of {}", rank(), world_size());

    for epoch in 0..100 {
        let loss = train_step(&mut model, &mut optimizer, batch, labels);
        if rank() == 0 {
            println!("Epoch {}: loss = {:.4}", epoch, loss);
        }
    }
}

Data Parallelism

Data parallelism replicates the model on each device and splits input batches. Gradients are averaged across all replicas using all-reduce operations.

use mind::distributed::{DataParallel, AllReduce};

// Wrap model for data parallel training
let model = DataParallel::new(model, devices);

// Training loop - batches are automatically distributed
for (images, labels) in dataloader.iter() {
    let output = model.forward(images);
    let loss = cross_entropy(output, labels);
    loss.backward();

    // Gradients synchronized via NCCL/Gloo before the optimizer update
    optimizer.step();
    optimizer.zero_grad();
}

Gradient Synchronization

MIND supports multiple collective communication backends:

Backend | Devices     | Notes
--------|-------------|-----------------------------------
NCCL    | NVIDIA GPUs | Recommended for multi-GPU training
Gloo    | CPU, GPU    | Cross-platform, supports TCP/IP
MPI     | CPU, GPU    | HPC clusters with InfiniBand
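All of these backends implement all-reduce with ring algorithms (the RingAllReduce noted at the top of this page), which is bandwidth-optimal: each rank transmits only 2(N-1) chunks regardless of cluster size. As a language-agnostic illustration, here is a single-process Python sketch of the algorithm; the `buffers` list stands in for per-rank gradient tensors, and the two phases mirror what a real backend does over network links.

```python
def ring_all_reduce(buffers):
    """Element-wise sum across all ranks, simulated in one process.

    buffers[r] is the vector held by rank r; after the call, every
    rank's buffer holds the element-wise sum of all inputs.
    """
    n = len(buffers)
    size = len(buffers[0])
    assert size % n == 0, "vector length must split into n equal chunks"
    c = size // n  # chunk size

    # Phase 1: reduce-scatter. In step s, rank r sends chunk (r - s) mod n
    # to its right neighbor, which accumulates it in place. After n - 1
    # steps, rank r holds the fully reduced chunk (r + 1) mod n.
    for s in range(n - 1):
        sends = []
        for r in range(n):
            k = (r - s) % n
            sends.append((r, k, buffers[r][k * c:(k + 1) * c]))  # snapshot
        for r, k, data in sends:
            dst = (r + 1) % n
            for i in range(c):
                buffers[dst][k * c + i] += data[i]

    # Phase 2: all-gather. Each rank forwards its reduced chunk around the
    # ring; after n - 1 more steps every rank has the complete result.
    for s in range(n - 1):
        sends = []
        for r in range(n):
            k = (r + 1 - s) % n
            sends.append((r, k, buffers[r][k * c:(k + 1) * c]))
        for r, k, data in sends:
            dst = (r + 1) % n
            buffers[dst][k * c:(k + 1) * c] = data


# Four "ranks", each holding a 4-element gradient vector:
grads = [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]]
ring_all_reduce(grads)
print(grads[0])  # [28, 32, 36, 40] on every rank
```

In a real cluster the inner loops become point-to-point sends to the right neighbor and receives from the left, which is why the transport layer below exposes ring-topology helpers.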

Model Parallelism

For models that don't fit on a single device, use model parallelism to split layers across devices:

use mind::distributed::{ModelParallel, DeviceMap};

// Define how layers map to devices
let device_map = DeviceMap::new()
    .layer("embed", Device::cuda(0))
    .layers("transformer.0..12", Device::cuda(0))
    .layers("transformer.12..24", Device::cuda(1))
    .layer("head", Device::cuda(1));

// Create model parallel wrapper
let model = ModelParallel::new(model, device_map);

// Forward pass automatically moves tensors between devices
let output = model.forward(input);

Pipeline Parallelism

Pipeline parallelism enables higher throughput by overlapping computation across micro-batches:

use mind::distributed::{Pipeline, Schedule};

// Configure pipeline with micro-batching
let pipeline = Pipeline::new(model)
    .stages(4)                    // Split into 4 pipeline stages
    .micro_batches(8)             // 8 micro-batches per batch
    .schedule(Schedule::GPipe);   // Or Schedule::PipeDream

// Training with pipeline parallelism
for batch in dataloader.iter() {
    let loss = pipeline.forward_backward(batch);
    optimizer.step();
}
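A quick way to reason about the `stages`/`micro_batches` trade-off: under a GPipe-style schedule, the fraction of time each device sits idle (the "pipeline bubble") is roughly (S - 1) / (M + S - 1) for S stages and M micro-batches. This is the standard GPipe analysis, not a MIND API; a small Python sketch:

```python
def bubble_fraction(stages: int, micro_batches: int) -> float:
    """Approximate idle fraction per device in a GPipe schedule."""
    return (stages - 1) / (micro_batches + stages - 1)

# The configuration above (4 stages, 8 micro-batches):
print(round(bubble_fraction(4, 8), 3))   # 0.273 -> over a quarter idle
# Raising micro-batches shrinks the bubble:
print(round(bubble_fraction(4, 32), 3))  # 0.086
```

This is why increasing `micro_batches` improves utilization, at the cost of smaller per-micro-batch work and more activation memory in flight.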

Multi-Node Training

Scale training across multiple machines using the MIND distributed launcher:

# Launch on 4 nodes with 8 GPUs each
mind launch --nodes 4 --gpus-per-node 8 \
    --master-addr 192.168.1.1 \
    --master-port 29500 \
    train.mind

# Or use a hostfile
mind launch --hostfile hosts.txt --gpus-per-node 8 train.mind

Fault Tolerance

MIND supports elastic training with automatic checkpointing and recovery:

use mind::distributed::{Elastic, Checkpoint};

// Enable elastic training with checkpointing
let trainer = Elastic::new(model)
    .min_nodes(2)
    .max_nodes(8)
    .checkpoint_dir("checkpoints/")
    .checkpoint_interval(1000);  // steps

// Training automatically resumes on node failure
trainer.fit(dataloader, epochs);

Complete Example: Distributed MNIST Training

Here's a complete example training a simple neural network on MNIST across multiple GPUs:

// distributed_mnist.mind
use mind::nn::{Linear, relu, softmax, cross_entropy};
use mind::optim::{Adam, Optimizer};
use mind::data::{Dataset, DataLoader};
use mind::distributed::{init, rank, world_size, DataParallel, all_reduce};

struct MLP {
    fc1: Linear<784, 256>,
    fc2: Linear<256, 128>,
    fc3: Linear<128, 10>,
}

impl MLP {
    fn forward(&self, x: Tensor<f32, [B, 784]>) -> Tensor<f32, [B, 10]> {
        let h1 = relu(self.fc1.forward(x));
        let h2 = relu(self.fc2.forward(h1));
        softmax(self.fc3.forward(h2), axis: -1)
    }
}

fn main() {
    // Initialize distributed runtime (auto-detects backend)
    init();

    let local_rank = rank();
    let n_workers = world_size();

    if local_rank == 0 {
        println!("Training with {} workers", n_workers);
    }

    // Create model and wrap for data parallelism
    let model = MLP::new();
    let model = DataParallel::new(model);

    // Optimizer with scaled learning rate
    let lr = 0.001 * (n_workers as f32).sqrt();
    let optimizer = Adam::new(model.parameters(), lr: lr);

    // Each worker gets a shard of the dataset
    let dataset = Dataset::mnist("data/", train: true);
    let loader = DataLoader::new(dataset)
        .batch_size(64)
        .shuffle(true)
        .distributed(rank: local_rank, world_size: n_workers);

    // Training loop
    for epoch in 0..10 {
        let mut total_loss = 0.0;
        let mut batches = 0;

        for (images, labels) in loader.iter() {
            // Forward pass
            let output = model.forward(images);
            let loss = cross_entropy(output, labels);

            // Backward pass (gradients auto-synced)
            loss.backward();
            optimizer.step();
            optimizer.zero_grad();

            total_loss += loss.item();
            batches += 1;
        }

        // Reduce loss across all workers for logging. Average locally
        // first so workers with different batch counts are weighted equally.
        let local_avg = total_loss / (batches as f32);
        let avg_loss = all_reduce(local_avg, op: "mean");

        if local_rank == 0 {
            println!("Epoch {}: loss = {:.4}", epoch + 1, avg_loss);
        }
    }

    // Save model (only on rank 0)
    if local_rank == 0 {
        model.save("mnist_model.mind");
        println!("Model saved!");
    }
}

Launch with:

# Single node, 4 GPUs
mind launch --gpus 4 distributed_mnist.mind

# Multi-node (2 nodes, 8 GPUs each)
mind launch --nodes 2 --gpus-per-node 8 \
    --master-addr 10.0.0.1 distributed_mnist.mind

Environment Variables

Configure distributed execution with these environment variables:

Variable         | Default   | Description
-----------------|-----------|---------------------------------------------
MIND_BACKEND     | nccl      | Communication backend (nccl, gloo, mpi)
MIND_MASTER_ADDR | localhost | Master node IP address
MIND_MASTER_PORT | 29500     | Master node port for coordination
MIND_WORLD_SIZE  | 1         | Total number of workers
MIND_RANK        | 0         | This worker's rank
MIND_LOCAL_RANK  | 0         | Local GPU index on this node
MIND_PROFILE     | 0         | Profiling level (0=off, 1=basic, 2=detailed)
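For illustration, here is a hypothetical Python sketch of how a launcher or runtime might resolve these variables, using the defaults from the table above (the function name and dict shape are assumptions, not a MIND API):

```python
import os

def distributed_config():
    """Resolve the MIND_* environment variables with the documented defaults."""
    return {
        "backend": os.environ.get("MIND_BACKEND", "nccl"),
        "master_addr": os.environ.get("MIND_MASTER_ADDR", "localhost"),
        "master_port": int(os.environ.get("MIND_MASTER_PORT", "29500")),
        "world_size": int(os.environ.get("MIND_WORLD_SIZE", "1")),
        "rank": int(os.environ.get("MIND_RANK", "0")),
        "local_rank": int(os.environ.get("MIND_LOCAL_RANK", "0")),
        "profile": int(os.environ.get("MIND_PROFILE", "0")),
    }
```

The launcher typically sets MIND_RANK and MIND_LOCAL_RANK per process, so user code rarely needs to set them by hand.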

Best Practices

  • Start with data parallelism for most workloads; it is the simplest strategy and the most efficient when the model fits on a single device
  • Use gradient accumulation to simulate larger batch sizes without more memory
  • Profile communication overhead with MIND_PROFILE=1
  • Enable mixed precision training to reduce communication bandwidth
  • Use gradient compression for slow network connections
  • Scale learning rate with the square root of worker count for stable convergence
  • Use warmup epochs when training with many workers
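The last two bullets combine naturally: scale the target learning rate by the square root of the worker count, then ramp up to it linearly during warmup. A hypothetical helper (a sketch of the schedule, not a MIND API):

```python
def scaled_lr(base_lr: float, workers: int, step: int, warmup_steps: int) -> float:
    """Square-root LR scaling with linear warmup."""
    target = base_lr * workers ** 0.5  # sqrt scaling with worker count
    if step < warmup_steps:
        # Ramp linearly from target/warmup_steps up to target.
        return target * (step + 1) / warmup_steps
    return target

# 16 workers, base rate 0.001 -> target rate 0.004 after 100 warmup steps
print(scaled_lr(0.001, 16, 500, warmup_steps=100))  # 0.004
```

This matches the `0.001 * (n_workers as f32).sqrt()` line in the MNIST example above, with warmup added on top.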

Runtime Implementation

The MIND runtime provides production-grade distributed primitives:

Transport Layer

TCP-based transport with connection pooling, message serialization, and ring topology helpers for efficient collective operations.

use mind::distributed::transport::{TcpTransport, TransportConfig};

let config = TransportConfig {
    bind_addr: "0.0.0.0:29500".parse()?,
    connect_timeout_ms: 5000,
    buffer_size: 1024 * 1024,  // 1MB
};

let transport = TcpTransport::new(config, rank, world_size)?;

// Ring topology helpers
let left = transport.left_neighbor();   // (rank + world_size - 1) % world_size
let right = transport.right_neighbor(); // (rank + 1) % world_size

Communication Backends

Abstract backend trait with NCCL (GPU) and Gloo (CPU/cross-platform) implementations:

use mind::distributed::backend::{Backend, NcclBackend, GlooBackend};

// Auto-select based on hardware
let backend = Backend::auto_select()?;

// Or explicitly choose
let nccl = NcclBackend::new(rank, world_size)?;  // GPU clusters
let gloo = GlooBackend::new(rank, world_size)?;  // CPU or cross-platform

// Collective operations via backend
backend.all_reduce(tensor, AllReduceOp::Sum)?;
backend.broadcast(tensor, root: 0)?;
backend.barrier()?;

Fault Tolerance

Elastic training with automatic failure detection and recovery:

use mind::distributed::coordinator::{
    DistributedCoordinator, FaultToleranceConfig, ClusterHealth
};

let fault_config = FaultToleranceConfig {
    min_workers: 2,
    max_workers: 16,
    elastic: true,                // Allow dynamic scaling
    checkpoint_interval: 1000,    // Steps between checkpoints
    auto_recover: true,
    max_heartbeat_misses: 3,
    replacement_delay_ms: 5000,
};

let coordinator = DistributedCoordinator::new_with_fault_tolerance(
    world_size, fault_config
)?;

// Monitor cluster health
let health: ClusterHealth = coordinator.health_status();
println!("Workers: {}/{} alive, can_train: {}",
    health.alive_workers, health.total_workers, health.can_train);

// Automatic checkpoint on failure
if coordinator.should_checkpoint() {
    model.save_checkpoint("checkpoint.mind")?;
}

Learn More

See the Future Extensions page for upcoming distributed features and the Roadmap for development status.