IgnisMQ
A distributed, persistent, high-throughput message queue for Java
IgnisMQ is a Java library that provides reliable asynchronous message processing built on top of Magazine and Aerospike, with first-class Dropwizard integration for microservice environments.
Key Features
- High Throughput: Backed by Aerospike, IgnisMQ handles millions of messages per second with sub-millisecond latency. Sharded queues distribute load across the cluster.
- Reliable Delivery: Failed messages are automatically sidelined and can be shoveled back for reprocessing. Stuck messages are swept and recovered.
- Distributed Coordination: ZooKeeper-based leader election ensures exactly one sweeper runs across the cluster. Load balancing distributes partitions across nodes.
- Configurable Everything: Every aspect of queue behavior is tunable at runtime, including concurrency, TTLs, batch sizes, shovel intervals, and sweep durations.
How It Works
```mermaid
sequenceDiagram
    participant App as Your Application
    participant MQ as IgnisMQManager
    participant Q as MagazineQueue
    participant M as Magazine (Aerospike)
    participant S as Sideline Magazine
    App->>MQ: publish("order-queue", order)
    MQ->>Q: publish(order)
    Q->>M: magazine.load(json)
    Note over Q: Consumer timer fires every 1s
    Q->>M: magazine.fire()
    M-->>Q: MagazineData
    Q->>App: handler.handle(order)
    alt Handler returns false
        Q->>S: sidelineMagazine.load(json)
        Note over S: Message saved for retry
    end
    Q->>M: magazine.delete(data)
    Note over S: Shovel timer fires periodically
    S-->>M: Move messages back to main queue
```
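The publish, consume, sideline, and shovel steps in the sequence above can be approximated with a small in-memory model. This is a sketch and not IgnisMQ code: `SketchQueue`, `consumeOnce`, and `shovel` are hypothetical names, two `ArrayDeque`s stand in for the main and sideline Magazines, and the handler is modeled as a `Predicate` that returns `true` on success. As a simplification it removes the message before handling, whereas the diagram deletes it afterwards.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

// Hypothetical in-memory stand-in for the flow in the diagram; not IgnisMQ's real classes.
final class SketchQueue<T> {
    private final Deque<T> main = new ArrayDeque<>();     // stands in for the main Magazine
    private final Deque<T> sideline = new ArrayDeque<>(); // stands in for the sideline Magazine
    private final Predicate<T> handler;                   // returns true when handling succeeded

    SketchQueue(Predicate<T> handler) {
        this.handler = handler;
    }

    // publish(): load the message into the main store.
    void publish(T message) {
        main.addLast(message);
    }

    // One consumer tick: take a message, hand it to the handler, sideline it on failure.
    // Returns false when there was nothing to consume.
    boolean consumeOnce() {
        T message = main.pollFirst();
        if (message == null) {
            return false;
        }
        boolean ok;
        try {
            ok = handler.test(message);
        } catch (RuntimeException e) {
            ok = false; // an exception is treated like a handler returning false
        }
        if (!ok) {
            sideline.addLast(message);
        }
        return true;
    }

    // One shovel tick: move every sidelined message back to the main store.
    int shovel() {
        int moved = 0;
        T message;
        while ((message = sideline.pollFirst()) != null) {
            main.addLast(message);
            moved++;
        }
        return moved;
    }

    int mainSize() { return main.size(); }

    int sidelineSize() { return sideline.size(); }
}

final class SketchQueueDemo {
    public static void main(String[] args) {
        int[] attempts = {0};
        // Handler that fails on the first attempt and succeeds afterwards.
        SketchQueue<String> queue = new SketchQueue<>(msg -> ++attempts[0] > 1);
        queue.publish("ORD-123");
        queue.consumeOnce(); // first attempt fails, message is sidelined
        System.out.println("sidelined=" + queue.sidelineSize());
        queue.shovel();      // message moves back to the main store
        queue.consumeOnce(); // second attempt succeeds
        System.out.println("main=" + queue.mainSize() + " sidelined=" + queue.sidelineSize());
    }
}
```

In the real library the consume and shovel ticks are driven by timers rather than called by hand; here they are plain methods so the ordering is easy to follow.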
Quick Start
Dropwizard:

```java
// 1. Extend the bundle
public class MyIgnisMQBundle extends IgnisMQBundle<MyConfig> {
    @Override
    protected BaseStorage getStorage(MyConfig config) {
        return new AerospikeStorage(config.getAerospikeConfig(), "my-ns");
    }

    @Override
    protected String getClientId(MyConfig c) { return c.getClientId(); }

    @Override
    protected String getFarmId(MyConfig c) { return c.getFarmId(); }

    @Override
    protected CuratorFramework getCuratorFramework() { return curator; }
}

// 2. Register in your Application
bootstrap.addBundle(ignisMQBundle);

// 3. Initialize handlers and create a queue
manager.initialiseMessageHandlers(Map.of(
    "order-handler", Map.entry(Order.class, new OrderHandler())
));
manager.createQueue(CreateQueueRequest.builder()
    .name("order-queue")
    .concurrency(8)
    .messageHandlerType("order-handler")
    .build());

// 4. Publish messages
manager.getQueue("order-queue").publish(new Order("ORD-123", 99.99));
```
Standalone:

```java
// 1. Create storage and manager
BaseStorage storage = new AerospikeStorage(aerospikeConfig, "my-ns");
IgnisMQManager manager = new IgnisMQManager(
    "my-client", storage, new ObjectMapper(),
    new MetricRegistry(), curatorFramework, "farm-1"
);

// 2. Register handlers
manager.initialiseMessageHandlers(Map.of(
    "order-handler", Map.entry(Order.class, new OrderHandler())
));

// 3. Create a queue and publish
manager.createQueue(CreateQueueRequest.builder()
    .name("order-queue")
    .concurrency(4)
    .messageHandlerType("order-handler")
    .build());
manager.getQueue("order-queue").publish(new Order("ORD-123", 99.99));
```
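The Quick Start snippets use `Order` and `OrderHandler` without defining them. A minimal sketch of what they might look like follows. The `SketchMessageHandler` interface is an assumption inferred from the `handler.handle(order)` call and the "Handler returns false" branch on this page; the library's actual handler interface, its name, and its signature may differ. The positive-amount check is purely illustrative.

```java
// Hypothetical handler contract: return true when the message was processed,
// false to have it sidelined. Not the library's real interface.
interface SketchMessageHandler<T> {
    boolean handle(T message);
}

// Simple message payload matching the constructor used in the Quick Start.
final class Order {
    private final String id;
    private final double amount;

    Order(String id, double amount) {
        this.id = id;
        this.amount = amount;
    }

    String getId() { return id; }

    double getAmount() { return amount; }
}

// Illustrative handler: rejects orders with a non-positive amount.
final class OrderHandler implements SketchMessageHandler<Order> {
    @Override
    public boolean handle(Order order) {
        if (order.getAmount() <= 0) {
            return false; // would be sidelined for a later retry
        }
        System.out.println("Processed order " + order.getId());
        return true;      // message is considered consumed
    }
}
```

A payload class used with a real queue would also need to be serializable to JSON by the configured `ObjectMapper`, since the diagrams show messages being stored as JSON.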
Message Lifecycle at a Glance
```mermaid
stateDiagram-v2
    [*] --> Published: publish()
    Published --> Fired: consumer fires message
    Fired --> Consumed: handler returns true
    Fired --> Sidelined: handler returns false / exception
    Consumed --> [*]: message deleted
    Sidelined --> Shoveled: shovel task runs
    Shoveled --> Published: message re-queued
    Fired --> Swept: message stuck (no ack)
    Swept --> Sidelined: sweeper recovers
```
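The lifecycle above can also be written down as a transition table, which makes it easy to check which moves are legal. The enum below is a sketch for illustration only: `MessageState` and `canMoveTo` are hypothetical names, and IgnisMQ is not known to expose message state in this form.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical encoding of the state diagram; names are illustrative only.
enum MessageState {
    PUBLISHED, FIRED, CONSUMED, SIDELINED, SHOVELED, SWEPT;

    // Allowed next states, copied edge by edge from the diagram.
    private static final Map<MessageState, Set<MessageState>> NEXT =
            new EnumMap<>(MessageState.class);

    static {
        NEXT.put(PUBLISHED, EnumSet.of(FIRED));
        NEXT.put(FIRED, EnumSet.of(CONSUMED, SIDELINED, SWEPT));
        NEXT.put(CONSUMED, EnumSet.noneOf(MessageState.class)); // terminal: message deleted
        NEXT.put(SIDELINED, EnumSet.of(SHOVELED));
        NEXT.put(SHOVELED, EnumSet.of(PUBLISHED));
        NEXT.put(SWEPT, EnumSet.of(SIDELINED));
    }

    // True when the diagram has an edge from this state to the target.
    boolean canMoveTo(MessageState target) {
        return NEXT.get(this).contains(target);
    }
}

final class MessageStateDemo {
    public static void main(String[] args) {
        System.out.println(MessageState.FIRED.canMoveTo(MessageState.SWEPT));
        System.out.println(MessageState.CONSUMED.canMoveTo(MessageState.PUBLISHED));
    }
}
```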
Architecture Overview
```mermaid
flowchart TD
    subgraph Application
        App["Your App"] --> Bundle["IgnisMQBundle"]
        Bundle --> Manager["IgnisMQManager"]
    end
    subgraph Queue Management
        Manager --> Q1["MagazineQueue #1"]
        Manager --> Q2["MagazineQueue #2"]
        Manager --> QN["MagazineQueue #N"]
    end
    subgraph Per-Queue Components
        Q1 --> C["Consumer Timers<br/>(1s interval)"]
        Q1 --> SH["Shovel Timers<br/>(configurable interval)"]
    end
    subgraph Cluster Coordination
        Manager --> TI["TaskInitializer"]
        TI --> LE["LeaderElector"]
        LE --> ZK["ZooKeeper"]
        TI --> SW["Sweeper<br/>(leader-only)"]
    end
    subgraph Storage
        C --> Mag["Magazine<br/>(Main)"]
        C --> Side["Magazine<br/>(Sideline)"]
        SH --> Mag
        SH --> Side
        SW --> AQS["AerospikeQueueService"]
        Mag --> AS["Aerospike"]
        Side --> AS
        AQS --> AS
    end
    Manager -.->|refreshQueues<br/>every 5 min| AS
```
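The "leader-only" sweeper in the diagram suggests a periodic task that every node schedules but only the elected leader actually executes. One way to sketch that pattern with the JDK alone is shown below. `LeaderOnlyTask` is a hypothetical name, the leadership check is a plain `BooleanSupplier` standing in for the ZooKeeper-based election, and the 100 ms interval is arbitrary; none of this is taken from IgnisMQ's implementation.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical leader-gated task: the wrapped work runs only while this node is leader.
final class LeaderOnlyTask implements Runnable {
    private final BooleanSupplier isLeader; // stand-in for a real leader election check
    private final Runnable work;            // e.g. a sweep over stuck messages
    private final AtomicInteger runs = new AtomicInteger();

    LeaderOnlyTask(BooleanSupplier isLeader, Runnable work) {
        this.isLeader = isLeader;
        this.work = work;
    }

    @Override
    public void run() {
        if (!isLeader.getAsBoolean()) {
            return; // followers skip the tick entirely
        }
        work.run();
        runs.incrementAndGet();
    }

    // Number of ticks on which the work was actually executed.
    int runCount() {
        return runs.get();
    }
}

final class LeaderOnlyTaskDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        LeaderOnlyTask sweeper =
                new LeaderOnlyTask(() -> true, () -> System.out.println("sweeping"));
        // Arbitrary interval chosen for the demo.
        scheduler.scheduleWithFixedDelay(sweeper, 0, 100, TimeUnit.MILLISECONDS);
        Thread.sleep(350);
        scheduler.shutdownNow();
        System.out.println("executed " + sweeper.runCount() + " time(s)");
    }
}
```

Checking leadership on every tick, rather than once at startup, means a node that gains or loses leadership starts or stops sweeping on the next tick without any rescheduling.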
What to Read Next
| Page | Description |
|---|---|
| Getting Started | Prerequisites, installation, and your first queue in 5 minutes |
| Core Concepts | Message lifecycle, sidelining, shoveling, sweeping, leader election |
| Usage Guide | Complete examples — Dropwizard, standalone, batching, shoveling |
| Architecture | Component deep-dive, threading model, data flow |
| API Reference | Every class, method, and field documented |
| Configuration | All configuration options with defaults and constraints |
| Error Codes | Complete error catalog with causes and solutions |
| Aerospike Backend | Data model, indexing, sweep internals |
| References | Academic and industry work that inspired IgnisMQ's design |