Architecture
This page provides a deep dive into the internal architecture of IgnisMQ — its components, threading model, data flows, type hierarchy, and operational mechanics.
1. System Overview
IgnisMQ is organised into five layers (application, queue, consumer, coordination, and storage), each with a single responsibility.
graph TB
subgraph Application Layer
A[IgnisMQBundle<br/><i>Dropwizard Bundle</i>]
B[IgnisMQManager<br/><i>Queue Registry</i>]
end
subgraph Queue Layer
C1[MagazineQueue #1]
C2[MagazineQueue #2]
CN[MagazineQueue #N]
subgraph "Each MagazineQueue"
MQ_MAIN[Main Magazine]
MQ_SIDE[Sideline Magazine]
end
end
subgraph Consumer Layer
D1[MagazineConsumerTask Timers]
D2[ShovelTask Timers]
end
subgraph Coordination Layer
E1[TaskInitializer]
E2[LeaderElector]
E3[Sweeper]
E4[ZooKeeper / Curator]
end
subgraph Storage Layer
F1[AerospikeQueueService]
F2[AerospikeStoreClient]
F3[(Aerospike)]
end
A -->|creates| B
B -->|holds| C1
B -->|holds| C2
B -->|holds| CN
C1 --> D1
C1 --> D2
E1 -->|creates| E2
E1 -->|creates| E3
E2 -->|LeaderSelector| E4
E3 -->|activate/deactivate| D1
C1 --> F1
F1 --> F2
F2 --> F3
Layered Design
Each layer only communicates with its immediate neighbours. The application layer never touches storage directly — it always goes through the queue abstraction.
2. Component Relationships
classDiagram
class IgnisMQBundle {
-IgnisMQManager manager
+run(config, environment)
}
class IgnisMQManager {
-ConcurrentHashMap~String, IQueue~ queues
+registerQueue(config, handler) IQueue
+getQueue(name) IQueue
}
class IQueue~M~ {
<<sealed interface>>
+publish(M message)
+publishWithDelay(M message, long delay)
}
class MagazineQueue~M~ {
-Magazine mainMagazine
-Magazine sidelineMagazine
-List~Timer~ consumerTimers
-List~Timer~ shovelTimers
-ObjectMapper objectMapper
+publish(M message)
+publishWithDelay(M message, long delay)
}
class Magazine {
-QueueService queueService
-String setName
+load(Record record)
+fire() Record
+delete(String id)
}
class MagazineConsumerTask {
-Magazine magazine
-Magazine sidelineMagazine
-QueueHandler handler
+run()
}
class ShovelTask {
-Magazine sidelineMagazine
-Magazine mainMagazine
+run()
}
class TaskInitializer {
-LeaderElector leaderElector
-Sweeper sweeper
+initialize()
}
class LeaderElector {
-LeaderSelector leaderSelector
-CuratorFramework curator
+takeLeadership()
+updateState()
}
class Sweeper {
<<implements LoadBalancer>>
-ExecutorService executorService
+activate()
+deactivate()
+run()
}
class QueueStatGauge {
<<CachedGauge~Map~>>
-timeout: 3 min
+loadValue() Map
}
class QueueService {
<<sealed interface>>
+load(Record record)
+fire(String set) Record
+delete(String set, String id)
+scan(String set, Filter filter) List
}
class AerospikeQueueService {
-AerospikeStoreClient client
}
class AerospikeStoreClient {
-IAerospikeClient aerospikeClient
}
IgnisMQBundle --> IgnisMQManager : creates
IgnisMQManager o-- IQueue : ConcurrentHashMap
IQueue <|.. MagazineQueue : permits
MagazineQueue *-- Magazine : main
MagazineQueue *-- Magazine : sideline
MagazineQueue *-- MagazineConsumerTask : consumerTimers
MagazineQueue *-- ShovelTask : shovelTimers
MagazineConsumerTask --> Magazine : reads from main
MagazineConsumerTask --> Magazine : writes to sideline
ShovelTask --> Magazine : reads from sideline
ShovelTask --> Magazine : writes to main
TaskInitializer --> LeaderElector : creates
TaskInitializer --> Sweeper : creates
LeaderElector ..> Sweeper : activates/deactivates
Magazine --> QueueService
QueueService <|.. AerospikeQueueService : permits
AerospikeQueueService --> AerospikeStoreClient
IgnisMQManager --> QueueStatGauge : registers
ConcurrentHashMap
IgnisMQManager uses a ConcurrentHashMap<String, IQueue> to hold all registered queues. This allows thread-safe registration and lookup at runtime.
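A minimal sketch of such a registry, using only the JDK. The names `QueueRegistrySketch` and `NamedQueue` are hypothetical stand-ins (the real `registerQueue` takes a config and a handler); the point is only to show how `ConcurrentHashMap.putIfAbsent` makes concurrent registration safe without explicit locking.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for the queue registry; not the project's actual code. */
final class QueueRegistrySketch {

    /** Placeholder for IQueue; only the name matters for this illustration. */
    interface NamedQueue {
        String name();
    }

    private final ConcurrentMap<String, NamedQueue> queues = new ConcurrentHashMap<>();

    /** Registers the queue unless the name is taken; returns the instance that is registered. */
    NamedQueue registerQueue(NamedQueue queue) {
        NamedQueue existing = queues.putIfAbsent(queue.name(), queue);
        return existing != null ? existing : queue;
    }

    /** Thread-safe lookup; empty when no queue has that name. */
    Optional<NamedQueue> getQueue(String name) {
        return Optional.ofNullable(queues.get(name));
    }

    public static void main(String[] args) {
        QueueRegistrySketch registry = new QueueRegistrySketch();
        NamedQueue first = () -> "orders";
        NamedQueue duplicate = () -> "orders";
        registry.registerQueue(first);
        // The second registration is ignored and the first instance is returned.
        System.out.println(registry.registerQueue(duplicate) == first);
        System.out.println(registry.getQueue("missing").isPresent());
    }
}
```

Whether a duplicate registration should be ignored, rejected, or replace the existing queue is a design choice; this sketch assumes the first registration wins.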
3. Threading Model
IgnisMQ relies heavily on java.util.Timer threads and a shared ForkJoinPool for background work.
Timer Schedule Summary
| Component | Count | Initial Delay | Period | Notes |
|---|---|---|---|---|
| Consumer Timer | 1 per concurrency slot | 2 min | 1 sec | Timer.schedule(task, 120_000, 1_000) |
| Shovel Timer | 1 per shovel concurrency | 2 min | timeIntervalInSecs × 1000 | Configurable interval |
| Watcher Timer | 1 global | 1 min | 5 min | Timer.schedule(refreshQueues, 60_000, 300_000) |
| Sweeper Timer | 1 global | 10 min | 15 min | Timer.schedule(sweeper, 600_000, 900_000) |
| Leader Election | 1 global | — | 30 sec poll | updateState() polling loop |
| Sweeper Executor | ForkJoinPool | — | — | parallelism = 64, used for parallel queue sweeps |
gantt
title IgnisMQ Timer Startup Timeline
dateFormat ss
axisFormat %S s
section Consumers
Initial delay (2 min) :done, c_delay, 00, 120s
Consumer fires every 1s :active, c_run, after c_delay, 30s
section Shovels
Initial delay (2 min) :done, s_delay, 00, 120s
Shovel fires every N sec :active, s_run, after s_delay, 30s
section Watcher
Initial delay (1 min) :done, w_delay, 00, 60s
Watcher fires every 5 min :active, w_run, after w_delay, 30s
section Sweeper
Initial delay (10 min) :done, sw_delay, 00, 600s
Sweeper fires every 15 min :active, sw_run, after sw_delay, 30s
section Leader Election
Curator LeaderSelector :active, le, 00, 30s
updateState poll (30s loop) :active, le2, after le, 30s
Thread Counts
For a queue with concurrency = 8 and shovelConcurrency = 2, IgnisMQ spawns 10 Timer threads for that single queue alone. Plan your concurrency settings carefully.
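The thread arithmetic can be illustrated with a small sketch. It assumes one `java.util.Timer` (and therefore one thread) per concurrency slot, as the table describes; `TimerFanOutSketch`, `startTimers`, and the 30-second shovel interval are hypothetical names and values chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

final class TimerFanOutSketch {

    static final long INITIAL_DELAY_MS = 120_000; // 2 min, as in the schedule table
    static final long CONSUMER_PERIOD_MS = 1_000; // 1 sec

    /** Creates one daemon Timer (one thread each) per slot and schedules the work on it. */
    static List<Timer> startTimers(String prefix, int slots, long periodMs, Runnable work) {
        List<Timer> timers = new ArrayList<>();
        for (int i = 0; i < slots; i++) {
            Timer timer = new Timer(prefix + "-" + i, true);
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    work.run();
                }
            }, INITIAL_DELAY_MS, periodMs);
            timers.add(timer);
        }
        return timers;
    }

    public static void main(String[] args) {
        int concurrency = 8;
        int shovelConcurrency = 2;
        long shovelIntervalSecs = 30; // hypothetical value for timeIntervalInSecs

        List<Timer> all = new ArrayList<>();
        all.addAll(startTimers("consumer", concurrency, CONSUMER_PERIOD_MS, () -> { }));
        all.addAll(startTimers("shovel", shovelConcurrency, shovelIntervalSecs * 1000, () -> { }));

        System.out.println("Timer threads for this queue: " + all.size()); // 8 + 2 = 10
        all.forEach(Timer::cancel); // stop the threads so the demo exits promptly
    }
}
```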
4. Data Flow
4.1 Publish Flow
sequenceDiagram
participant App as Application
participant IQ as IQueue.publish()
participant OM as ObjectMapper
participant Mag as Main Magazine
participant QS as AerospikeQueueService
participant AS as Aerospike
App->>IQ: publish(message)
IQ->>OM: writeValueAsString(message)
OM-->>IQ: JSON string
IQ->>Mag: load(record)
Mag->>QS: load(record)
QS->>AS: put(key, bins)
AS-->>QS: ok
QS-->>Mag: ok
Mag-->>IQ: ok
IQ-->>App: void
4.2 Consume Flow
sequenceDiagram
participant T as Timer
participant CT as MagazineConsumerTask
participant M as Main Magazine
participant QS as QueueService
participant AS as Aerospike
participant H as QueueHandler
T->>CT: run()
CT->>M: fire()
M->>QS: fire(setName)
QS->>AS: query(set, filter)
AS-->>QS: Record
QS-->>M: Record
M-->>CT: Record (with fireTS added)
CT->>CT: addFireTimestamp(record)
CT->>CT: deserialize(JSON → M)
CT->>H: handle(message)
alt Success
H-->>CT: ok
CT->>M: delete(id)
M->>QS: delete(set, id)
QS->>AS: delete(key)
else Failure
H-->>CT: exception
CT->>CT: sidelineMagazine.load(record)
CT->>M: delete(id)
end
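The consume sequence above can be sketched with an in-memory stand-in for the magazines. `ConsumeFlowSketch`, `InMemoryMagazine`, and `consumeOnce` are hypothetical names; the real `Magazine` is backed by Aerospike and works with record IDs, whereas this sketch uses the message string itself as the identity.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

final class ConsumeFlowSketch {

    /** In-memory stand-in for a Magazine. */
    static final class InMemoryMagazine {
        private final Deque<String> records = new ArrayDeque<>();

        void load(String record) { records.addLast(record); }
        String fire() { return records.peekFirst(); } // null when empty
        void delete(String record) { records.remove(record); }
        int size() { return records.size(); }
    }

    /** One consumer tick: fire, handle, sideline on failure, then delete from main. */
    static void consumeOnce(InMemoryMagazine main, InMemoryMagazine sideline,
                            Consumer<String> handler) {
        String record = main.fire();
        if (record == null) {
            return; // nothing to consume on this tick
        }
        try {
            handler.accept(record);
        } catch (RuntimeException e) {
            sideline.load(record); // park the failed message for the shovel
        }
        main.delete(record); // removed from main in both branches
    }

    public static void main(String[] args) {
        InMemoryMagazine mainMag = new InMemoryMagazine();
        InMemoryMagazine sideline = new InMemoryMagazine();
        mainMag.load("ok");
        mainMag.load("boom");
        Consumer<String> handler = msg -> {
            if (msg.equals("boom")) {
                throw new IllegalStateException("handler failed");
            }
        };
        consumeOnce(mainMag, sideline, handler);
        consumeOnce(mainMag, sideline, handler);
        System.out.println("main=" + mainMag.size() + " sideline=" + sideline.size());
    }
}
```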
4.3 Shovel Flow
sequenceDiagram
participant T as Timer
participant ST as ShovelTask
participant SL as Sideline Magazine
participant M as Main Magazine
participant AS as Aerospike
T->>ST: run()
ST->>SL: fire()
SL->>AS: query(sidelineSet)
AS-->>SL: Record
SL-->>ST: Record
ST->>M: load(record)
M->>AS: put(mainSet, record)
AS-->>M: ok
M-->>ST: ok
ST->>SL: delete(id)
SL->>AS: delete(sidelineSet, id)
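As a rough illustration of the ordering in this diagram, the sketch below moves one record from a sideline queue back to a main queue, writing to main before deleting from the sideline. `ShovelFlowSketch` and `shovelOnce` are hypothetical names, and plain `Deque`s stand in for the Aerospike-backed magazines.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class ShovelFlowSketch {

    /** Moves at most one record from sideline to main; returns true if a record moved. */
    static boolean shovelOnce(Deque<String> sideline, Deque<String> main) {
        String record = sideline.peekFirst();
        if (record == null) {
            return false; // sideline is empty
        }
        main.addLast(record);   // write to main first ...
        sideline.removeFirst(); // ... then delete from the sideline
        return true;
    }

    public static void main(String[] args) {
        Deque<String> sideline = new ArrayDeque<>();
        Deque<String> main = new ArrayDeque<>();
        sideline.add("retry-me");
        System.out.println(shovelOnce(sideline, main)); // a record was moved
        System.out.println(shovelOnce(sideline, main)); // nothing left to move
    }
}
```

With this ordering, a crash between the two steps would leave the record in both sets rather than in neither, so a message may be delivered twice but is not lost. That reading is inferred from the diagram, not stated by the source.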
4.4 Sweep Flow
sequenceDiagram
participant Timer
participant SW as Sweeper
participant FJP as ForkJoinPool (p=64)
participant QS as QueueService
participant AS as Aerospike
participant SL as Sideline Magazine
Timer->>SW: run()
SW->>QS: getQueues(active=true)
QS-->>SW: List<QueueConfig>
loop For each queue + shard (parallel via FJP)
SW->>FJP: submit(sweepTask)
FJP->>QS: scan(set, fireTS < threshold)
QS->>AS: scan with filter
AS-->>QS: stale Records
loop For each stale record
QS->>SL: load(record) [reload to sideline]
QS->>AS: delete(original record)
end
end
Sweep Threshold
The sweeper scans for records whose fireTS is older than a configurable threshold. These are messages that were dequeued (fire timestamp set) but never successfully processed or deleted — indicating a crashed consumer.
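The staleness check can be sketched as a simple filter. This is an in-memory illustration under assumptions: `SweepSketch`, `Entry`, and the millisecond threshold are hypothetical, a `null` fire timestamp means "never fired", and the real sweeper runs its scans against Aerospike in parallel on the ForkJoinPool.

```java
import java.util.ArrayList;
import java.util.List;

final class SweepSketch {

    /** Hypothetical record shape: fireTs is null until the message has been fired. */
    record Entry(String id, Long fireTs) { }

    /** Returns entries that were fired before (now - threshold) and are still present. */
    static List<Entry> findStale(List<Entry> mainSet, long nowMs, long thresholdMs) {
        long cutoff = nowMs - thresholdMs;
        List<Entry> stale = new ArrayList<>();
        for (Entry e : mainSet) {
            if (e.fireTs() != null && e.fireTs() < cutoff) {
                stale.add(e);
            }
        }
        return stale;
    }

    /** Reloads each stale entry into the sideline, then deletes the original. */
    static int sweep(List<Entry> mainSet, List<Entry> sideline, long nowMs, long thresholdMs) {
        List<Entry> stale = findStale(mainSet, nowMs, thresholdMs);
        for (Entry e : stale) {
            sideline.add(e);
            mainSet.remove(e);
        }
        return stale.size();
    }

    public static void main(String[] args) {
        List<Entry> mainSet = new ArrayList<>(List.of(
                new Entry("a", 1_000L),  // fired long ago: stale
                new Entry("b", null),    // never fired: not stale
                new Entry("c", 9_500L))); // fired recently: not stale
        List<Entry> sideline = new ArrayList<>();
        int moved = sweep(mainSet, sideline, 10_000L, 5_000L);
        System.out.println("moved=" + moved + " sideline=" + sideline);
    }
}
```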
5. Sealed Type Hierarchy
IgnisMQ uses Java 17 sealed types to enforce a closed set of implementations.
classDiagram
class IQueue~M~ {
<<sealed interface>>
+publish(M message)
+publishWithDelay(M message, long delay)
}
class MagazineQueue~M~ {
<<final>>
}
IQueue <|.. MagazineQueue : permits
class QueueService {
<<sealed interface>>
+load(Record)
+fire(String set) Record
+delete(String set, String id)
}
class AerospikeQueueService {
<<final>>
}
QueueService <|.. AerospikeQueueService : permits
class BaseStorage {
<<abstract sealed>>
+accept(StorageVisitor~T~ visitor) T
}
class AerospikeStorage {
<<final>>
-String namespace
-IAerospikeClient client
}
BaseStorage <|-- AerospikeStorage : permits
class StorageClient~T~ {
<<interface>>
+get(Key key) T
+put(Key key, Bin... bins)
+delete(Key key)
}
class AerospikeStoreClient {
<<final>>
}
StorageClient <|.. AerospikeStoreClient : implements
Why Sealed Types?
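Sealed types restrict which classes may implement an interface, and the compiler enforces that restriction. When you pattern-match on IQueue, the compiler knows the only possible implementation is MagazineQueue (note that pattern matching for switch is a preview feature in Java 17 and final from Java 21). Extension is therefore explicit: adding a new storage backend requires updating the relevant permits clauses and every consumer of those types.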
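A minimal, Java 17 compatible sketch of a sealed interface with a single permitted implementation. The names `SealedSketch`, `Queue`, and `InMemoryQueue` are hypothetical and only mirror the shape of IQueue and MagazineQueue.

```java
import java.util.ArrayList;
import java.util.List;

final class SealedSketch {

    /** Only the types named in the permits clause may implement this interface. */
    sealed interface Queue<M> permits InMemoryQueue {
        void publish(M message);
    }

    /** The single permitted implementation; it must be final, sealed, or non-sealed. */
    static final class InMemoryQueue<M> implements Queue<M> {
        private final List<M> messages = new ArrayList<>();

        @Override
        public void publish(M message) {
            messages.add(message);
        }

        int size() {
            return messages.size();
        }
    }

    // Uncommenting the class below is rejected by the compiler, because
    // RogueQueue is not listed in the permits clause of Queue:
    // static final class RogueQueue<M> implements Queue<M> {
    //     @Override public void publish(M message) { }
    // }

    public static void main(String[] args) {
        InMemoryQueue<String> queue = new InMemoryQueue<>();
        queue.publish("hello");
        System.out.println("sealed: " + Queue.class.isSealed());
        System.out.println("permitted: " + Queue.class.getPermittedSubclasses().length);
        System.out.println("size: " + queue.size());
    }
}
```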
6. Visitor Pattern
IgnisMQ uses the visitor pattern to decouple storage construction from storage type specifics.
classDiagram
class BaseStorage {
<<abstract sealed>>
+accept(StorageVisitor~T~ visitor) T
}
class AerospikeStorage {
-String namespace
-IAerospikeClient client
+accept(StorageVisitor~T~ visitor) T
}
class StorageVisitor~T~ {
<<interface>>
+visit(AerospikeStorage storage) T
}
class MagazineStorageVisitor {
<<inner class of MagazineQueue>>
+visit(AerospikeStorage storage) Magazine
}
class QueueServiceVisitor {
<<used by IgnisMQManager>>
+visit(AerospikeStorage storage) QueueService
}
class StoreClientVisitor {
<<used by IgnisMQManager>>
+visit(AerospikeStorage storage) StorageClient
}
BaseStorage <|-- AerospikeStorage : permits
StorageVisitor <|.. MagazineStorageVisitor
StorageVisitor <|.. QueueServiceVisitor
StorageVisitor <|.. StoreClientVisitor
BaseStorage --> StorageVisitor : accept()
AerospikeStorage ..|> BaseStorage
How it works:
- IgnisMQManager receives a BaseStorage instance (which is actually AerospikeStorage).
- To build a QueueService, it calls storage.accept(new QueueServiceVisitor()).
- AerospikeStorage.accept() dispatches to visitor.visit(this).
- The visitor has access to the concrete AerospikeStorage fields and constructs AerospikeQueueService.
The same pattern is used inside MagazineQueue — MagazineStorageVisitor builds the Magazine instances from the concrete storage.
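The double dispatch described above can be sketched as follows. The type names mirror the diagram, but the bodies are assumptions: `DescribingVisitor` is a hypothetical visitor that returns a string instead of constructing a real QueueService, and the storage holds only a namespace.

```java
final class StorageVisitorSketch {

    /** One visit method per concrete storage type. */
    interface StorageVisitor<T> {
        T visit(AerospikeStorage storage);
    }

    /** Sealed base: only the permitted subclasses can exist. */
    abstract static sealed class BaseStorage permits AerospikeStorage {
        abstract <T> T accept(StorageVisitor<T> visitor);
    }

    static final class AerospikeStorage extends BaseStorage {
        final String namespace;

        AerospikeStorage(String namespace) {
            this.namespace = namespace;
        }

        @Override
        <T> T accept(StorageVisitor<T> visitor) {
            return visitor.visit(this); // dispatches on the concrete type
        }
    }

    /** Hypothetical visitor: describes what it would build from the concrete fields. */
    static final class DescribingVisitor implements StorageVisitor<String> {
        @Override
        public String visit(AerospikeStorage storage) {
            return "aerospike-queue-service@" + storage.namespace;
        }
    }

    public static void main(String[] args) {
        BaseStorage storage = new AerospikeStorage("ignis");
        // The caller only knows BaseStorage; the visitor sees AerospikeStorage.
        System.out.println(storage.accept(new DescribingVisitor()));
    }
}
```

The caller never casts: the concrete storage type selects the `visit` overload, so the code that builds a service stays inside the visitor.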
Extensibility
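To add a new storage backend (e.g., Redis), you would: (1) create RedisStorage extends BaseStorage and add it to the permits clause of BaseStorage, (2) add visit(RedisStorage) to StorageVisitor, (3) implement the new method in every visitor. Because each StorageVisitor implementation must implement the new method, the compiler flags any visitor that has not been updated.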
7. Metrics
IgnisMQ integrates with Dropwizard Metrics for observability.
Registered Metrics
| Metric Name Pattern | Type | Description |
|---|---|---|
| commands.{farmId}_{clientId}_{queueName}_publisher.all | Timer | Latency and throughput of publish operations |
| commands.{farmId}_{clientId}_{queueName}_consumer.all | Timer | Latency and throughput of consume operations |
| QueueStatGauge per queue | CachedGauge (3 min TTL) | Reports published, consumed, unconsumed, sidelined, shovelled counts |
QueueStatGauge
flowchart LR
A[Metrics Reporter] -->|getValue| B[QueueStatGauge]
B -->|cached?| C{Cache fresh?<br/>TTL = 3 min}
C -->|Yes| D[Return cached Map]
C -->|No| E[loadValue]
E --> F[Query Aerospike<br/>for queue stats]
F --> G[Return Map with<br/>published, consumed,<br/>unconsumed, sidelined,<br/>shovelled]
G --> D
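The caching decision in this flowchart can be sketched without the Dropwizard dependency. `CachedValueSketch` is a hypothetical JDK-only class that imitates the time-to-live behaviour of a cached gauge; the injectable clock is an assumption made so the example can be exercised without waiting three minutes.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical TTL cache that imitates the gauge's "cache fresh?" decision. */
final class CachedValueSketch<T> {

    private final long ttl;
    private final LongSupplier clock; // injectable time source, same unit as ttl
    private final Supplier<T> loader; // plays the role of loadValue()
    private T cached;
    private long loadedAt;
    private boolean loaded;

    CachedValueSketch(long ttl, LongSupplier clock, Supplier<T> loader) {
        this.ttl = ttl;
        this.clock = clock;
        this.loader = loader;
    }

    /** Returns the cached value, reloading it once the TTL has elapsed. */
    synchronized T getValue() {
        long now = clock.getAsLong();
        if (!loaded || now - loadedAt >= ttl) {
            cached = loader.get(); // the expensive query runs only here
            loadedAt = now;
            loaded = true;
        }
        return cached;
    }

    public static void main(String[] args) {
        long[] nowSeconds = {0};
        int[] loads = {0};
        CachedValueSketch<Integer> gauge =
                new CachedValueSketch<Integer>(180, () -> nowSeconds[0], () -> ++loads[0]);

        gauge.getValue();    // first call loads
        gauge.getValue();    // still fresh: served from cache
        nowSeconds[0] = 180; // three minutes later
        gauge.getValue();    // TTL elapsed: reloads
        System.out.println("loads=" + loads[0]);
    }
}
```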
FunctionMetrics (AspectJ)
IgnisMQ uses AspectJ weaving via the @FunctionMetrics annotation to automatically add timing metrics to annotated methods without manual instrumentation code.
8. ZooKeeper Node Structure
IgnisMQ uses ZooKeeper (via Apache Curator) for leader election and work distribution among instances.
/{clientId}-ignis-workers/
└── {clientId}/
├── loadbalancer-leader ← LeaderSelector (persistent node)
│ Only one instance holds leadership.
│ Leader runs the Sweeper.
│
├── loadbalancer/
│ └── {balancerId} ← Ephemeral sequential nodes.
│ Each running IgnisMQ instance
│ registers itself here.
│
└── readers/
└── {partition} ← Communicator nodes.
Each partition stores the
assigned balancerId, mapping
partitions to instances.
graph TD
ROOT["/{clientId}-ignis-workers"] --> CLIENT["/{clientId}"]
CLIENT --> LEADER["loadbalancer-leader<br/><i>LeaderSelector</i>"]
CLIENT --> LB["loadbalancer/"]
CLIENT --> READERS["readers/"]
LB --> LB1["{balancerId-001}<br/><i>ephemeral</i>"]
LB --> LB2["{balancerId-002}<br/><i>ephemeral</i>"]
LB --> LBN["{balancerId-N}<br/><i>ephemeral</i>"]
READERS --> R1["{partition-0}<br/>assigned: balancerId-001"]
READERS --> R2["{partition-1}<br/>assigned: balancerId-002"]
READERS --> RN["{partition-N}<br/>assigned: balancerId-001"]
style LEADER fill:#f9a825,stroke:#f57f17
style LB1 fill:#81d4fa,stroke:#0277bd
style LB2 fill:#81d4fa,stroke:#0277bd
style LBN fill:#81d4fa,stroke:#0277bd
Leader election flow:
- Each instance creates an ephemeral node under loadbalancer/.
- Curator's LeaderSelector competes for loadbalancer-leader.
- The winning instance calls takeLeadership(), which activates the Sweeper.
- The leader polls updateState() every 30 seconds to reassign partitions across live balancer nodes.
- When the leader dies, its ephemeral node disappears and Curator triggers a new election.
ZooKeeper Dependency
ZooKeeper is required for multi-instance deployments. In single-instance mode, the instance always becomes the leader. If ZK is unavailable, sweeping and partition rebalancing will stall — but publish and consume continue to work.