
IgnisMQ

A distributed, persistent, high-throughput message queue for Java


IgnisMQ is a Java library that provides reliable asynchronous message processing built on top of Magazine and Aerospike, with first-class Dropwizard integration for microservice environments.

Key Features

  • High Throughput

    Backed by Aerospike, IgnisMQ handles millions of messages per second with sub-millisecond latency. Sharded queues distribute load across the cluster.

  • Reliable Delivery

    Failed messages are automatically sidelined and can be shoveled back for reprocessing. Stuck messages are swept and recovered.

  • Distributed Coordination

    ZooKeeper-based leader election ensures exactly one sweeper runs across the cluster. Load balancing distributes partitions across nodes.

  • Configurable Everything

    Concurrency, TTLs, batch sizes, shovel intervals, and sweep durations: every aspect of queue behavior is tunable at runtime.

How It Works

sequenceDiagram
    participant App as Your Application
    participant MQ as IgnisMQManager
    participant Q as MagazineQueue
    participant M as Magazine (Aerospike)
    participant S as Sideline Magazine

    App->>MQ: publish("order-queue", order)
    MQ->>Q: publish(order)
    Q->>M: magazine.load(json)

    Note over Q: Consumer timer fires every 1s

    Q->>M: magazine.fire()
    M-->>Q: MagazineData
    Q->>App: handler.handle(order)

    alt Handler returns false
        Q->>S: sidelineMagazine.load(json)
        Note over S: Message saved for retry
    end

    Q->>M: magazine.delete(data)

    Note over S: Shovel timer fires periodically
    S-->>M: Move messages back to main queue
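
The consume-and-shovel cycle in the diagram can be sketched with in-memory stand-ins. This is only an illustration of the control flow, not IgnisMQ's implementation: `InMemoryMagazine` and `ConsumeCycleSketch` are hypothetical names, and the real Magazine API backed by Aerospike may differ in signatures and semantics.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

/** In-memory stand-in for a magazine (assumption: NOT the real Magazine API). */
final class InMemoryMagazine {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Deque<String> inFlight = new ArrayDeque<>();

    /** Stores a serialized message for later delivery. */
    void load(String json) { ready.addLast(json); }

    /** Hands out the next message and tracks it as in flight; returns null when empty. */
    String fire() {
        String next = ready.pollFirst();
        if (next != null) {
            inFlight.addLast(next);
        }
        return next;
    }

    /** Acknowledges a fired message so it is no longer tracked as in flight. */
    void delete(String json) { inFlight.remove(json); }

    int readyCount() { return ready.size(); }
    int inFlightCount() { return inFlight.size(); }
}

final class ConsumeCycleSketch {
    /** One consumer tick: fire, handle, sideline on failure, then delete. */
    static boolean consumeOnce(InMemoryMagazine main, InMemoryMagazine sideline,
                               Predicate<String> handler) {
        String json = main.fire();
        if (json == null) {
            return false; // nothing to process on this tick
        }
        boolean handled;
        try {
            handled = handler.test(json);
        } catch (RuntimeException e) {
            handled = false; // an exception is treated like a false return
        }
        if (!handled) {
            sideline.load(json); // keep the message for a later retry
        }
        main.delete(json); // the main queue entry is removed in both cases
        return true;
    }

    /** One shovel run: move every sidelined message back to the main magazine. */
    static int shovel(InMemoryMagazine sideline, InMemoryMagazine main) {
        int moved = 0;
        String json;
        while ((json = sideline.fire()) != null) {
            main.load(json);
            sideline.delete(json);
            moved++;
        }
        return moved;
    }
}
```

In this sketch, `consumeOnce` stands in for the consumer timer and `shovel` for the shovel timer, and the caller decides when each one runs.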

Quick Start

With Dropwizard

// 1. Extend the bundle
public class MyIgnisMQBundle extends IgnisMQBundle<MyConfig> {
    @Override
    protected BaseStorage getStorage(MyConfig config) {
        return new AerospikeStorage(config.getAerospikeConfig(), "my-ns");
    }
    @Override
    protected String getClientId(MyConfig c) { return c.getClientId(); }
    @Override
    protected String getFarmId(MyConfig c) { return c.getFarmId(); }
    @Override
    protected CuratorFramework getCuratorFramework() { return curator; }
}

// 2. Register in your Application
bootstrap.addBundle(ignisMQBundle);

// 3. Initialize handlers and create a queue
manager.initialiseMessageHandlers(Map.of(
    "order-handler", Map.entry(Order.class, new OrderHandler())
));

manager.createQueue(CreateQueueRequest.builder()
    .name("order-queue")
    .concurrency(8)
    .messageHandlerType("order-handler")
    .build());

// 4. Publish messages
manager.getQueue("order-queue").publish(new Order("ORD-123", 99.99));

Standalone

// 1. Create storage and manager
BaseStorage storage = new AerospikeStorage(aerospikeConfig, "my-ns");
IgnisMQManager manager = new IgnisMQManager(
    "my-client", storage, new ObjectMapper(),
    new MetricRegistry(), curatorFramework, "farm-1"
);

// 2. Register handlers
manager.initialiseMessageHandlers(Map.of(
    "order-handler", Map.entry(Order.class, new OrderHandler())
));

// 3. Create a queue and publish
manager.createQueue(CreateQueueRequest.builder()
    .name("order-queue")
    .concurrency(4)
    .messageHandlerType("order-handler")
    .build());

manager.getQueue("order-queue").publish(new Order("ORD-123", 99.99));
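
The snippets above use `Order` and `OrderHandler` without defining them. The following is a minimal sketch of what they might look like, assuming a handler contract with a single boolean-returning `handle` method, as the sequence diagram suggests. `MessageHandlerSketch` is a hypothetical interface written for this example; the actual interface name and signature should be taken from the API Reference.

```java
/** Hypothetical handler contract (assumption: the real IgnisMQ interface may differ). */
interface MessageHandlerSketch<T> {
    /** Returns true when the message was processed, false to have it sidelined. */
    boolean handle(T message);
}

/** Simple message payload matching the constructor call used in the Quick Start. */
final class Order {
    private final String orderId;
    private final double amount;

    Order(String orderId, double amount) {
        this.orderId = orderId;
        this.amount = amount;
    }

    String getOrderId() { return orderId; }
    double getAmount() { return amount; }
}

/** Example handler: rejects malformed orders and accepts everything else. */
final class OrderHandler implements MessageHandlerSketch<Order> {
    @Override
    public boolean handle(Order order) {
        if (order.getOrderId() == null || order.getOrderId().isEmpty()) {
            return false; // missing id: report failure so the message is sidelined
        }
        if (order.getAmount() <= 0) {
            return false; // non-positive amount: report failure
        }
        // Business logic would go here (for example, persisting the order).
        return true;
    }
}
```

Because messages are serialized to JSON, a real payload class would probably also need whatever constructor or annotations the configured `ObjectMapper` expects. That detail is left out of this sketch.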

Message Lifecycle at a Glance

stateDiagram-v2
    [*] --> Published: publish()
    Published --> Fired: consumer fires message
    Fired --> Consumed: handler returns true
    Fired --> Sidelined: handler returns false / exception
    Consumed --> [*]: message deleted
    Sidelined --> Shoveled: shovel task runs
    Shoveled --> Published: message re-queued
    Fired --> Swept: message stuck (no ack)
    Swept --> Sidelined: sweeper recovers
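
The transitions in the state diagram can be written down as a small lookup table, which is handy when reasoning about which moves are legal. This sketch only mirrors the diagram. `MessageState` and `LifecycleSketch` are hypothetical names, and IgnisMQ itself may not model states this way.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** States taken from the lifecycle diagram. */
enum MessageState { PUBLISHED, FIRED, CONSUMED, SIDELINED, SHOVELED, SWEPT }

final class LifecycleSketch {
    private static final Map<MessageState, Set<MessageState>> ALLOWED =
            new EnumMap<>(MessageState.class);

    static {
        // Each entry lists the states reachable in one step, as drawn in the diagram.
        ALLOWED.put(MessageState.PUBLISHED, EnumSet.of(MessageState.FIRED));
        ALLOWED.put(MessageState.FIRED,
                EnumSet.of(MessageState.CONSUMED, MessageState.SIDELINED, MessageState.SWEPT));
        ALLOWED.put(MessageState.CONSUMED, EnumSet.noneOf(MessageState.class)); // terminal
        ALLOWED.put(MessageState.SIDELINED, EnumSet.of(MessageState.SHOVELED));
        ALLOWED.put(MessageState.SHOVELED, EnumSet.of(MessageState.PUBLISHED));
        ALLOWED.put(MessageState.SWEPT, EnumSet.of(MessageState.SIDELINED));
    }

    /** Returns true when the diagram contains an edge from one state to the other. */
    static boolean canTransition(MessageState from, MessageState to) {
        return ALLOWED.get(from).contains(to);
    }

    /** Returns the target state, or throws if the diagram has no such edge. */
    static MessageState transition(MessageState from, MessageState to) {
        if (!canTransition(from, to)) {
            throw new IllegalStateException("Illegal transition: " + from + " -> " + to);
        }
        return to;
    }
}
```

For example, `LifecycleSketch.canTransition(MessageState.FIRED, MessageState.SWEPT)` is true, while a consumed message has no outgoing transitions.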

Architecture Overview

flowchart TD
    subgraph Application
        App["Your App"] --> Bundle["IgnisMQBundle"]
        Bundle --> Manager["IgnisMQManager"]
    end

    subgraph Queue Management
        Manager --> Q1["MagazineQueue #1"]
        Manager --> Q2["MagazineQueue #2"]
        Manager --> QN["MagazineQueue #N"]
    end

    subgraph Per-Queue Components
        Q1 --> C["Consumer Timers<br/>(1s interval)"]
        Q1 --> SH["Shovel Timers<br/>(configurable interval)"]
    end

    subgraph Cluster Coordination
        Manager --> TI["TaskInitializer"]
        TI --> LE["LeaderElector"]
        LE --> ZK["ZooKeeper"]
        TI --> SW["Sweeper<br/>(leader-only)"]
    end

    subgraph Storage
        C --> Mag["Magazine<br/>(Main)"]
        C --> Side["Magazine<br/>(Sideline)"]
        SH --> Mag
        SH --> Side
        SW --> AQS["AerospikeQueueService"]
        Mag --> AS["Aerospike"]
        Side --> AS
        AQS --> AS
    end

    Manager -.->|refreshQueues<br/>every 5 min| AS
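
The leader-only sweeper in the diagram can be illustrated with a short sketch. It assumes that a message counts as stuck once it has been in flight longer than a configurable duration, and it replaces ZooKeeper-based election with a plain `BooleanSupplier`. `SweeperSketch` and all of its members are hypothetical and do not reflect the actual Sweeper or AerospikeQueueService code.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;

/** Leader-gated sweep over in-flight messages (illustrative only). */
final class SweeperSketch {
    private final Map<String, Instant> firedAt = new LinkedHashMap<>();
    private final List<String> sideline = new ArrayList<>();
    private final BooleanSupplier isLeader; // stand-in for the leader election result
    private final Duration stuckAfter;      // assumed "stuck" threshold

    SweeperSketch(BooleanSupplier isLeader, Duration stuckAfter) {
        this.isLeader = isLeader;
        this.stuckAfter = stuckAfter;
    }

    /** Records that a message was handed to a consumer at the given time. */
    void markFired(String messageId, Instant when) { firedAt.put(messageId, when); }

    /** Records that a consumer finished with the message. */
    void ack(String messageId) { firedAt.remove(messageId); }

    /** Moves stuck messages to the sideline; does nothing on non-leader nodes. */
    int sweep(Instant now) {
        if (!isLeader.getAsBoolean()) {
            return 0; // only the elected leader sweeps
        }
        int recovered = 0;
        Iterator<Map.Entry<String, Instant>> it = firedAt.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Instant> entry = it.next();
            if (Duration.between(entry.getValue(), now).compareTo(stuckAfter) > 0) {
                sideline.add(entry.getKey());
                it.remove();
                recovered++;
            }
        }
        return recovered;
    }

    /** Returns a copy of the sidelined message ids in recovery order. */
    List<String> sidelined() { return new ArrayList<>(sideline); }
}
```

Gating the sweep on leadership means that nodes running the same schedule will not recover the same stuck message twice, which matches the "leader-only" label on the Sweeper node.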

| Page | Description |
|------|-------------|
| Getting Started | Prerequisites, installation, and your first queue in 5 minutes |
| Core Concepts | Message lifecycle, sidelining, shoveling, sweeping, leader election |
| Usage Guide | Complete examples: Dropwizard, standalone, batching, shoveling |
| Architecture | Component deep-dive, threading model, data flow |
| API Reference | Every class, method, and field documented |
| Configuration | All configuration options with defaults and constraints |
| Error Codes | Complete error catalog with causes and solutions |
| Aerospike Backend | Data model, indexing, sweep internals |
| References | Academic and industry work that inspired IgnisMQ's design |