Core Concepts¶
This page covers the internal architecture and key mechanisms of IgnisMQ. Understanding these concepts is essential for operating IgnisMQ effectively in production.
Message Lifecycle¶
Every message in IgnisMQ follows a deterministic path from publication to deletion. The core abstraction is the Magazine: a sharded, persistent message store backed by your chosen datastore (for example, Aerospike).
stateDiagram-v2
[*] --> Published : magazine.load()
Published --> Fired : magazine.fire()
Fired --> Consumed : handler returns true
Fired --> Sidelined : handler returns false / exception
Consumed --> Deleted : magazine.delete()
Sidelined --> Deleted : magazine.delete() from main
Sidelined --> Shoveled : ShovelTask
Shoveled --> Published : reloaded to main magazine
Fired --> Swept : stuck message detected
Swept --> Sidelined : reloaded to sideline
Deleted --> [*]
Step-by-step¶
- Publish: magazine.load(message) serializes the message to JSON via ObjectMapper and persists it to the datastore with a unique message ID and the current timestamp.
- Fire: magazine.fire() retrieves the next available message from the store. The message is now considered "in-flight."
- Consume: The registered MagazineHandler processes the message. The outcome determines what happens next (see Sidelining).
- Delete: magazine.delete(messageId) removes the message from the main magazine. This happens regardless of whether the handler succeeded or failed.
// Publishing a message
Magazine<MyEvent> magazine = ignisClient.getMagazine("orders", MyEvent.class);
magazine.load(new MyEvent("order-123", "PLACED"));
// Firing (typically done by the consumer loop, not user code)
MagazineEntry entry = magazine.fire();
// Deleting after processing
magazine.delete(entry.getMessageId());
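The lifecycle diagram can also be read as a transition table. The sketch below is a minimal model of it, assuming only the states and arrows shown in the diagram; the class `MessageLifecycleSketch` and its `State` enum are hypothetical names for illustration and are not part of the IgnisMQ API.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the lifecycle diagram; not an IgnisMQ class.
class MessageLifecycleSketch {

    enum State { PUBLISHED, FIRED, CONSUMED, SIDELINED, SHOVELED, SWEPT, DELETED }

    // Allowed transitions, copied arrow by arrow from the state diagram.
    static final Map<State, Set<State>> TRANSITIONS = Map.of(
        State.PUBLISHED, EnumSet.of(State.FIRED),
        State.FIRED,     EnumSet.of(State.CONSUMED, State.SIDELINED, State.SWEPT),
        State.CONSUMED,  EnumSet.of(State.DELETED),
        State.SIDELINED, EnumSet.of(State.DELETED, State.SHOVELED),
        State.SHOVELED,  EnumSet.of(State.PUBLISHED),
        State.SWEPT,     EnumSet.of(State.SIDELINED),
        State.DELETED,   EnumSet.noneOf(State.class)
    );

    /** Returns true if the diagram contains an arrow from {@code from} to {@code to}. */
    static boolean canMove(State from, State to) {
        return TRANSITIONS.get(from).contains(to);
    }

    public static void main(String[] args) {
        System.out.println(canMove(State.FIRED, State.SIDELINED));    // true
        System.out.println(canMove(State.CONSUMED, State.PUBLISHED)); // false
    }
}
```

A table like this is handy in tests that assert a message never skips a state, for example going from Published straight to Deleted.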
Note
All messages are serialized to JSON using Jackson's ObjectMapper. Ensure your message classes are serialization-friendly — include a no-arg constructor and avoid circular references.
Sidelining¶
Sidelining is IgnisMQ's mechanism for isolating messages that could not be processed successfully. Rather than retrying in-place (which risks head-of-line blocking), failed messages are moved to a dedicated sideline magazine named {queueName}_SIDELINE.
flowchart TD
Fire["magazine.fire()"] --> Deser["Deserialize JSON → M"]
Deser --> Call["handler.handle(message)"]
Call -->|returns true| Delete["magazine.delete()"]
Call -->|returns false| Sideline["sidelineMagazine.load()"]
Sideline --> Delete
Call -->|throws exception| Check{"Is exception<br/>ignorable?"}
Check -->|Yes| Delete
Check -->|No| Sideline2["sidelineMagazine.load()"]
Sideline2 --> Delete
Decision logic in MagazineConsumerTask¶
| Outcome | Action |
|---|---|
| handler.handle() returns true | Message deleted from main magazine. Done. |
| handler.handle() returns false | Message loaded into sideline magazine, then deleted from main. |
| Exception thrown, not in getIgnorableExceptions() | Message loaded into sideline magazine, then deleted from main. |
| Exception thrown, is in getIgnorableExceptions() | Message deleted from main magazine. Not sidelined, not retried. |
Warning
The message is always deleted from the main magazine after processing, regardless of success or failure. This is by design — it prevents poison messages from blocking the queue. If you need to inspect failures, look in the sideline magazine.
Tip
Use getIgnorableExceptions() to list exception types that represent permanent, non-recoverable failures (e.g., IllegalArgumentException, ValidationException). These messages are silently dropped since sidelining and shoveling them would be pointless.
public class OrderHandler implements MagazineHandler<OrderEvent> {

    @Override
    public boolean handle(OrderEvent message) {
        // return true → message consumed successfully
        // return false → message sidelined for later retry
        return orderService.process(message);
    }

    @Override
    public Set<Class<? extends Exception>> getIgnorableExceptions() {
        // These exceptions cause the message to be deleted, not sidelined
        return Set.of(
            IllegalArgumentException.class,
            MalformedOrderException.class
        );
    }
}
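To make the decision table concrete, here is a minimal, self-contained sketch of how a consumer task could apply it. It is not the actual MagazineConsumerTask: `SidelineDecisionSketch` and `processOne` are hypothetical, the handler is modeled as a plain `Predicate`, the sideline magazine as a `List`, and matching subclasses of an ignorable exception is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical stand-in for the decision logic; not the real MagazineConsumerTask.
class SidelineDecisionSketch {

    /** Processes one message and returns true if it was sidelined. */
    static <M> boolean processOne(M message,
                                  Predicate<M> handler,
                                  Set<Class<? extends Exception>> ignorable,
                                  List<M> sideline) {
        boolean sidelineIt;
        try {
            // handler returned false -> sideline; true -> consumed
            sidelineIt = !handler.test(message);
        } catch (Exception e) {
            // Assumption: subclasses of an ignorable type are ignorable too.
            sidelineIt = ignorable.stream().noneMatch(type -> type.isInstance(e));
        }
        if (sidelineIt) {
            sideline.add(message); // stands in for sidelineMagazine.load()
        }
        // In every branch the caller would now delete the message from the main magazine.
        return sidelineIt;
    }

    public static void main(String[] args) {
        List<String> sideline = new ArrayList<>();
        Set<Class<? extends Exception>> ignorable = Set.of(IllegalArgumentException.class);
        processOne("ok", m -> true, ignorable, sideline);
        processOne("retry-me", m -> false, ignorable, sideline);
        processOne("malformed", m -> { throw new IllegalArgumentException("bad"); }, ignorable, sideline);
        System.out.println(sideline); // [retry-me]
    }
}
```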
Shoveling¶
The ShovelTask moves messages from the sideline magazine back into the main magazine for reprocessing. This is the retry mechanism in IgnisMQ.
sequenceDiagram
participant ST as ShovelTask
participant SM as Sideline Magazine
participant MM as Main Magazine
loop Every shovelInterval
ST->>SM: fire()
SM-->>ST: sidelined message
ST->>ST: Stamp fireTS on message
ST->>MM: load(message)
alt Load succeeds
ST->>SM: delete(messageId)
else Load fails
ST->>SM: reload(message)
Note over ST,SM: Message stays in sideline
end
end
Two modes of operation¶
Scheduled mode (default)
- Runs on a periodic timer controlled by ShovelConfig.timeIntervalInSecs.
- Default interval: 600 seconds (10 minutes).
- Keeps running indefinitely.
One-shot mode (autoDelete=true)
- Executes a single shovel pass, then cancels itself.
- If an error occurs during the pass, it self-reschedules with a 10-second delay before retrying.
- Useful for on-demand sideline draining.
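The one-shot behaviour (run once, retry after a delay on error, then stop) can be sketched as follows. This is an illustration only: it uses a `ScheduledExecutorService` rather than whatever scheduler ShovelTask uses internally, and `OneShotShovelSketch`, `schedulePass`, and `runOneShot` are hypothetical names. The retry delay is a parameter so the demo can use milliseconds instead of the documented 10 seconds.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of one-shot mode; not the real ShovelTask.
class OneShotShovelSketch {

    static void schedulePass(ScheduledExecutorService executor, Runnable pass,
                             long delayMs, long retryDelayMs, CountDownLatch done) {
        executor.schedule(() -> {
            try {
                pass.run();
                done.countDown(); // success: nothing is rescheduled
            } catch (RuntimeException e) {
                // failure: schedule the same pass again after the retry delay
                schedulePass(executor, pass, retryDelayMs, retryDelayMs, done);
            }
        }, delayMs, TimeUnit.MILLISECONDS);
    }

    /** Runs {@code pass} until it succeeds; returns true if that happened within 5 seconds. */
    static boolean runOneShot(Runnable pass, long retryDelayMs) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(1);
        try {
            schedulePass(executor, pass, 0, retryDelayMs, done);
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        boolean ok = runOneShot(() -> {
            if (attempts.incrementAndGet() < 3) throw new IllegalStateException("transient failure");
        }, 10); // 10 ms here; the documented delay is 10 seconds
        System.out.println(ok + " after " + attempts.get() + " attempts"); // true after 3 attempts
    }
}
```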
Configuration¶
ShovelConfig config = ShovelConfig.builder()
.concurrency(4) // default: 4 parallel shovel workers
.timeIntervalInSecs(600) // default: 600 (10 minutes)
.build();
| Parameter | Default | Description |
|---|---|---|
| concurrency | 4 | Number of parallel shovel workers per queue |
| timeIntervalInSecs | 600 | Interval between scheduled shovel passes, in seconds |
Info
Each shoveled message gets a fresh fireTS stamped on it. This timestamp is critical for sweep tracking — it allows the sweeper to distinguish between newly shoveled messages and genuinely stuck ones.
Warning
If loading a message back into the main magazine fails, the shoveler returns it to the sideline via sidelineMagazine.reload(), so a failed load does not drop the message.
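A single shovel pass, as drawn in the sequence diagram, can be sketched with in-memory collections. Everything here is hypothetical scaffolding (`ShovelPassSketch`, `Msg`, `shovelOnce`): the sideline magazine is a `Deque`, loading into the main magazine is a `Predicate` that returns false on failure, and bounding the pass to the messages present at the start is an assumption made to keep the loop finite.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical in-memory model of one shovel pass; not the real ShovelTask.
class ShovelPassSketch {

    static final class Msg {
        final String id;
        long fireTs;
        Msg(String id) { this.id = id; }
    }

    /** Moves sidelined messages to main; returns how many were moved. */
    static int shovelOnce(Deque<Msg> sideline, Predicate<Msg> loadToMain, long now) {
        int moved = 0;
        int pending = sideline.size(); // messages reloaded below are not retried in this pass
        for (int i = 0; i < pending; i++) {
            Msg m = sideline.poll();   // sideline fire()
            m.fireTs = now;            // stamp a fresh fireTS
            if (loadToMain.test(m)) {
                moved++;               // loaded into main; already removed from the sideline
            } else {
                sideline.add(m);       // load failed: reload into the sideline
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        Deque<Msg> sideline = new ArrayDeque<>(List.of(new Msg("a"), new Msg("b")));
        int moved = shovelOnce(sideline, m -> !m.id.equals("b"), System.currentTimeMillis());
        System.out.println(moved + " moved, " + sideline.size() + " left"); // 1 moved, 1 left
    }
}
```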
Sweeping¶
The Sweeper is a safety net that catches messages stuck in the "fired but never consumed" state — typically due to consumer crashes, network partitions, or long GC pauses.
flowchart TD
Start["Sweeper TimerTask fires<br/>(every 15 min)"] --> Leader{"Am I the<br/>leader?"}
Leader -->|No| Skip["Skip sweep"]
Leader -->|Yes| Iterate["For each active queue + shard"]
Iterate --> Scan["Scan for messages where<br/>fireTS < now - sweepDuration"]
Scan --> Found{"Messages<br/>found?"}
Found -->|No| Next["Next shard"]
Found -->|Yes| Batch["Process batch of 1000"]
Batch --> Reload["Reload to sideline magazine"]
Reload --> Update["Update sweep pointer<br/>via MapOperation.increment"]
Update --> More{"More messages?"}
More -->|Yes| Batch
More -->|No| Next
Next --> Done{"More shards?"}
Done -->|Yes| Iterate
Done -->|No| End["Sleep until next tick"]
Key parameters¶
| Parameter | Default | Description |
|---|---|---|
| Sweep interval | 15 minutes | How often the sweeper runs |
| Initial delay | 10 minutes | Delay before first sweep after startup |
| sweepDurationInMins | 20 | Messages fired more than this many minutes ago are considered stuck |
| Batch size | 1000 | Messages processed per batch |
Sideline sweep logic¶
When sweeping the sideline magazine, the sweeper uses a more conservative threshold that applies a 2x multiplier to the sweep window.
This prevents the sweeper from re-sidelining messages that are currently being shoveled back to the main queue. The 2x multiplier provides a safety margin.
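The threshold check can be sketched as a small predicate. This assumes the 2x multiplier is applied to sweepDurationInMins when the sideline magazine is swept, which the text implies but does not spell out; `SweepThresholdSketch` and `isStuck` are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch of the stuck-message check; not the real Sweeper.
class SweepThresholdSketch {

    /** True if fireTs is older than the sweep window (doubled for the sideline, by assumption). */
    static boolean isStuck(Instant fireTs, Instant now, Duration sweepDuration, boolean sidelineMagazine) {
        Duration threshold = sidelineMagazine ? sweepDuration.multipliedBy(2) : sweepDuration;
        return fireTs.isBefore(now.minus(threshold));
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-01-01T12:00:00Z");
        Instant fired = now.minus(Duration.ofMinutes(30));
        Duration sweep = Duration.ofMinutes(20); // documented default
        System.out.println(isStuck(fired, now, sweep, false)); // true: 30 min exceeds 20 min
        System.out.println(isStuck(fired, now, sweep, true));  // false: 30 min is within 40 min
    }
}
```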
Warning
The sweeper only runs on the leader-elected instance. If leader election is misconfigured or ZooKeeper is down, no sweeping occurs. Monitor your sideline queue depths to detect this.
Note
Sweep pointers are tracked per-queue-per-shard in the queue entity using MapOperation.increment. This allows sweeps to resume from where they left off rather than re-scanning the entire shard.
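The resume-from-pointer idea can be illustrated with an in-memory map. This is a simplified model, not the real storage layout: the pointer is treated as an offset into a list of stuck message IDs, `Map.merge` with `Long::sum` stands in for an atomic increment in the datastore, and `SweepPointerSketch` and `sweepShard` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of per-queue, per-shard sweep pointers.
class SweepPointerSketch {

    static final int BATCH_SIZE = 1000; // documented batch size

    // "queue#shard" -> number of messages already swept (assumed pointer semantics)
    final Map<String, Long> pointers = new HashMap<>();

    /** Sweeps one shard in batches, resuming from the stored pointer; returns the batch count. */
    int sweepShard(String queue, int shard, List<String> stuckIds, List<String> sideline) {
        String key = queue + "#" + shard;
        long pointer = pointers.getOrDefault(key, 0L);
        int batches = 0;
        while (pointer < stuckIds.size()) {
            int end = (int) Math.min(pointer + BATCH_SIZE, stuckIds.size());
            List<String> batch = stuckIds.subList((int) pointer, end);
            sideline.addAll(batch);                                        // reload to sideline
            pointer = pointers.merge(key, (long) batch.size(), Long::sum); // advance the pointer
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> stuck = new ArrayList<>();
        for (int i = 0; i < 2500; i++) stuck.add("m" + i);
        SweepPointerSketch sweeper = new SweepPointerSketch();
        List<String> sideline = new ArrayList<>();
        System.out.println(sweeper.sweepShard("orders", 0, stuck, sideline)); // 3
        System.out.println(sweeper.sweepShard("orders", 0, stuck, sideline)); // 0
    }
}
```

The second call does no work because the stored pointer already covers every stuck message, which is the property that lets a sweep resume instead of rescanning.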
Leader Election¶
IgnisMQ uses Apache Curator's LeaderSelector backed by ZooKeeper to ensure that cluster-wide singleton tasks (like sweeping) run on exactly one instance.
sequenceDiagram
participant I1 as Instance 1
participant I2 as Instance 2
participant ZK as ZooKeeper
participant Comm as Communicator Nodes
I1->>ZK: Register ephemeral node<br/>/{clientId}-ignis-workers/{clientId}/members/i1
I2->>ZK: Register ephemeral node<br/>/{clientId}-ignis-workers/{clientId}/members/i2
Note over ZK: LeaderSelector elects I1
I1->>I1: takeLeadership()
I1->>ZK: Read member list
I1->>I1: Round-robin partitions across members
loop Every 30 seconds
I1->>Comm: Write assignments to<br/>communicator ZK nodes
I1->>I1: Refresh partition assignments
end
I1-->>I1: Check communicator node
Note over I1: balancerId matches → activate sweeper
I2-->>I2: Check communicator node
Note over I2: balancerId does NOT match → sweeper inactive
Note over ZK: I1 disconnects
ZK->>ZK: Ephemeral node removed
ZK->>I2: Leadership notification
I2->>I2: takeLeadership()
I2->>Comm: Write new assignments
I2-->>I2: Activate sweeper
ZooKeeper path structure¶
/{clientId}-ignis-workers/
└── {clientId}/
├── loadbalancer-leader ← LeaderSelector election node
├── members/
│ ├── instance-1 ← ephemeral member node
│ └── instance-2 ← ephemeral member node
└── communicator/
├── instance-1 ← partition assignments for instance-1
└── instance-2 ← partition assignments for instance-2
How activation works¶
- The leader reads all registered member nodes.
- It round-robins partitions across members.
- It writes assignments (including a balancerId) to each instance's communicator ZK node.
- Each instance watches its own communicator node. If the balancerId in the assignment matches the instance's own ID, it sets the active AtomicBoolean to true, enabling the sweeper.
- Only one instance has active = true at any time.
Info
Leadership state is refreshed every 30 seconds. On ZK disconnect, leadership is immediately relinquished, triggering a re-election. The new leader reassigns partitions and a different instance may become the active sweeper.
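The round-robin assignment and the balancerId check can be sketched without ZooKeeper. This is a plain in-memory illustration, assuming partitions are numbered from 0 and members are visited in list order; `PartitionAssignmentSketch`, `roundRobin`, and `isActive` are hypothetical names, and no Curator API is used.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the leader's assignment step; no ZooKeeper involved.
class PartitionAssignmentSketch {

    /** Deals partitions 0..partitions-1 to members in turn. */
    static Map<String, List<Integer>> roundRobin(List<String> members, int partitions) {
        Map<String, List<Integer>> assignments = new LinkedHashMap<>();
        for (String member : members) {
            assignments.put(member, new ArrayList<>());
        }
        if (members.isEmpty()) {
            return assignments; // nothing to assign to
        }
        for (int p = 0; p < partitions; p++) {
            assignments.get(members.get(p % members.size())).add(p);
        }
        return assignments;
    }

    /** An instance enables its sweeper only when the balancerId names it. */
    static boolean isActive(String instanceId, String balancerId) {
        return instanceId.equals(balancerId);
    }

    public static void main(String[] args) {
        System.out.println(roundRobin(List.of("i1", "i2"), 5)); // {i1=[0, 2, 4], i2=[1, 3]}
        System.out.println(isActive("i2", "i1"));               // false
    }
}
```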
Queue Refresh Cycle¶
IgnisMQ keeps its in-memory queue state synchronized with the database through a background watcher.
| Parameter | Value |
|---|---|
| Refresh interval | 5 minutes |
| Initial delay | 1 minute |
What refreshQueues() does¶
- Discovery: Queries the datastore for all queue definitions. Any queue found in the DB but not in memory is created (consumers are started, shovel tasks are scheduled).
- Expiration: Queues past their expiry time are deactivated.
- Cleanup: Inactive queues are removed from in-memory data structures and their consumers/shovelers are cancelled.
- Reconciliation: If a queue's concurrency or ShovelConfig has changed in the DB, the in-memory configuration is updated. Consumers are scaled up or down to match.
flowchart LR
Timer["Watcher Timer<br/>(every 5 min)"] --> Fetch["Fetch queues from DB"]
Fetch --> New{"New queues?"}
New -->|Yes| Create["Create consumers<br/>+ shovelers"]
New -->|No| Expired{"Expired queues?"}
Create --> Expired
Expired -->|Yes| Deactivate["Deactivate + remove"]
Expired -->|No| Changed{"Config changed?"}
Deactivate --> Changed
Changed -->|Yes| Reconcile["Scale consumers<br/>Update shovel config"]
Changed -->|No| Done["Done"]
Reconcile --> Done
Tip
You don't need to restart instances to pick up new queues or configuration changes. The refresh cycle handles this automatically within 5 minutes. For immediate effect, you can trigger a manual refresh if the client exposes that API.
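The refresh cycle amounts to reconciling an in-memory map against what the datastore returns. The sketch below models that under simplifying assumptions: a queue definition carries only a concurrency and an expiry timestamp, in-memory state is just a consumer count per queue, and `QueueRefreshSketch`, `QueueDef`, and this `refreshQueues` signature are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the refresh cycle; not the real watcher.
class QueueRefreshSketch {

    static final class QueueDef {
        final int concurrency;
        final long expiresAtMillis;
        QueueDef(int concurrency, long expiresAtMillis) {
            this.concurrency = concurrency;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    // queue name -> number of running consumers (the in-memory state)
    final Map<String, Integer> consumers = new HashMap<>();

    void refreshQueues(Map<String, QueueDef> fromDb, long nowMillis) {
        for (Map.Entry<String, QueueDef> entry : fromDb.entrySet()) {
            String name = entry.getKey();
            QueueDef def = entry.getValue();
            if (def.expiresAtMillis <= nowMillis) {
                consumers.remove(name);               // expiration + cleanup
            } else {
                consumers.put(name, def.concurrency); // discovery or reconciliation
            }
        }
    }

    public static void main(String[] args) {
        QueueRefreshSketch watcher = new QueueRefreshSketch();
        watcher.consumers.put("old", 2);
        Map<String, QueueDef> db = new HashMap<>();
        db.put("orders", new QueueDef(4, 1_000));
        db.put("old", new QueueDef(2, 10));
        watcher.refreshQueues(db, 100);
        System.out.println(watcher.consumers); // {orders=4}
    }
}
```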
Consumer Threading Model¶
Each consumer in IgnisMQ is a java.util.Timer running a MagazineConsumerTask on a fixed-delay schedule.
Scheduling parameters¶
| Parameter | Value |
|---|---|
| Initial delay | 2 minutes |
| Fixed delay between runs | 1 second |
| Max consumers per queue | 100 (MAX_CONSUMERS_ALLOWED) |
How it works¶
┌────────────────────────────────────────────────┐
│ Timer Thread │
│ │
│ ┌──────────────────────────────────────────┐ │
│ │ MagazineConsumerTask.run() │ │
│ │ │ │
│ │ while (message = magazine.fire()) { │ │
│ │ process(message); │ │
│ │ magazine.delete(message); │ │
│ │ } │ │
│ │ // magazine empty → return │ │
│ │ // Timer re-invokes after 1s delay │ │
│ └──────────────────────────────────────────┘ │
└────────────────────────────────────────────────┘
Each Timer is single-threaded. The MagazineConsumerTask polls in a tight loop until the magazine is empty, then returns control. The Timer waits 1 second (fixed delay), then invokes the task again.
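The drain-then-wait loop can be reproduced with the JDK alone. `Timer.schedule(TimerTask, long, long)` is the JDK's fixed-delay scheduling method; whether IgnisMQ calls it in exactly this way is an assumption. `FixedDelayConsumerSketch` and `drainWithTimer` are hypothetical, the magazine is a plain `Queue`, and the delays are in milliseconds so the demo finishes quickly.

```java
import java.util.Queue;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of one consumer; not the real MagazineConsumerTask.
class FixedDelayConsumerSketch {

    /** Runs the drain task {@code runs} times on one Timer and returns the messages processed. */
    static int drainWithTimer(Queue<String> magazine, long initialDelayMs, long delayMs, int runs) {
        AtomicInteger processed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(runs);
        Timer timer = new Timer("consumer-sketch", true); // each Timer owns a single thread
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                // drain until the magazine is empty, then hand control back to the Timer
                while (magazine.poll() != null) {
                    processed.incrementAndGet();
                }
                done.countDown();
            }
        }, initialDelayMs, delayMs);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timer.cancel();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        Queue<String> magazine = new ConcurrentLinkedQueue<>();
        magazine.add("m1");
        magazine.add("m2");
        magazine.add("m3");
        System.out.println(drainWithTimer(magazine, 10, 20, 2)); // 3
    }
}
```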
Scaling consumers at runtime¶
// Scale up — adds more Timer threads for this queue
ignisClient.increaseConsumers("orders", 5);
// Scale down — cancels Timer threads for this queue
ignisClient.decreaseConsumers("orders", 3);
Warning
The maximum number of consumers per queue is 100 (MAX_CONSUMERS_ALLOWED). Requests that would exceed this limit are silently capped at 100. Each consumer is a full Timer thread, so overprovisioning wastes OS threads.
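The capping rule is simple arithmetic. The sketch below assumes the argument to increaseConsumers is a number of consumers to add (the docs do not state this explicitly); `ConsumerCapSketch` and `increase` are hypothetical names.

```java
// Hypothetical sketch of the silent cap; not the real client code.
class ConsumerCapSketch {

    static final int MAX_CONSUMERS_ALLOWED = 100; // documented limit

    /** Returns the consumer count after asking for {@code toAdd} more, capped at the limit. */
    static int increase(int current, int toAdd) {
        return Math.min(MAX_CONSUMERS_ALLOWED, current + toAdd);
    }

    public static void main(String[] args) {
        System.out.println(increase(10, 5)); // 15
        System.out.println(increase(98, 5)); // 100
    }
}
```

Because the cap is silent, a caller that needs to know the effective count should read it back rather than assume the request was honored in full.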
Note
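Fixed-delay scheduling in java.util.Timer (Timer.schedule(task, delay, period)) measures the 1-second period from the start of the previous run, not from its completion, and a Timer never overlaps runs of the same task. If the consumer is scheduled that way and a task takes 30 seconds to drain the magazine, the next invocation is already overdue and starts as soon as the task returns, roughly 30 seconds after the previous one began. A gap measured from completion (31 seconds in this example) would require re-arming the task after each run or using ScheduledExecutorService.scheduleWithFixedDelay.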
Tip
For high-throughput queues, increase the consumer count rather than trying to speed up individual consumers. Each consumer processes sequentially within its thread, so parallelism comes from having multiple consumers on the same queue (each pulling from different shards or competing for the same shard).