Aerospike Backend¶
Comprehensive documentation of the Aerospike storage backend for IgnisMQ.
Overview¶
IgnisMQ uses Aerospike as its primary storage backend, leveraging the Magazine library for queue primitives (load/fire operations) and AerospikeQueueService for queue metadata management.
The storage layer is split into two concerns:
- AerospikeStorage: a thin configuration holder that carries the AerospikeConfiguration and namespace, implementing the BaseStorage visitor pattern.
- AerospikeQueueService: the actual CRUD and sweep logic for queue metadata, fire timestamps, and sweep operations.
Configuration¶
AerospikeStorage storage = new AerospikeStorage(
    AerospikeConfiguration.builder()
        .hosts(List.of(AerospikeHost.builder()
            .host("localhost")
            .port(3000)
            .build()))
        .retries(3)
        .sleepBetweenRetries(100)
        .socketTimeout(3000)
        .totalTimeout(5000)
        .maxConnectionsPerNode(100)
        .threadPoolSize(4) // default: availableProcessors * 4
        .build(),
    "my-namespace"
);
TLS
If credentials are configured in AerospikeConfiguration, the client will negotiate TLS automatically.
Queue Metadata Set: {farmId}_{clientId}_ignis_queues¶
Each record represents one queue. The Aerospike key is the queue name.
| Bin | Type | Description |
|---|---|---|
| handlerType | String | Message handler type identifier |
| shards | int | Number of shards for parallelism |
| queueExpiry | int | Queue expiry in seconds |
| messageExpiry | int | Message TTL in seconds |
| concurrency | int | Consumer concurrency |
| shovelConcur | int | Shovel concurrency |
| shovelInterval | int | Shovel time interval in seconds |
| createdAt | long | Creation timestamp (epoch ms) |
| active | String | "true" / "false"; used for the secondary index |
| sweepPointers | Map<String, Long> | Per-shard sweep pointer positions |
| sidelineSweep | Map<String, Long> | Per-shard sideline sweep pointer positions |
| sweptCounter | long | Total messages swept from main magazine |
| sidelineSwept | long | Total messages swept from sideline magazine |
| sweepDuration | long | Sweep look-back duration in milliseconds |
| maxBatchSize | int | Batching config: max batch size (0 = disabled) |
| maxWaitTime | int | Batching config: max wait time in seconds (0 = disabled) |
Magazine Data Sets (managed by Magazine library)¶
| Set Name Pattern | Purpose |
|---|---|
| {clientId}_data_set | Message data records (keyed by {queueName}_SHARD_{shard}_{pointer}) |
| {clientId}_meta_set | Magazine pointers: load/fire pointers per shard |
Sideline
Sideline queues reuse the same Magazine sets but with _SIDELINE appended to the queue name (e.g., my-queue_SIDELINE).
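The naming patterns above can be sketched as plain string helpers. This is a minimal illustration that assumes each pattern is a simple concatenation. The class IgnisNames and its method names are hypothetical and are not part of IgnisMQ or Magazine. The sideline key in particular assumes that the sideline queue name is substituted for {queueName} in the data key pattern.

```java
/** Hypothetical helper; the names here are illustrative, not IgnisMQ or Magazine API. */
final class IgnisNames {
    private IgnisNames() {}

    /** Queue metadata set: {farmId}_{clientId}_ignis_queues */
    static String queueMetadataSet(String farmId, String clientId) {
        return farmId + "_" + clientId + "_ignis_queues";
    }

    /** Magazine data set: {clientId}_data_set */
    static String dataSet(String clientId) {
        return clientId + "_data_set";
    }

    /** Magazine meta set: {clientId}_meta_set */
    static String metaSet(String clientId) {
        return clientId + "_meta_set";
    }

    /** Data record key: {queueName}_SHARD_{shard}_{pointer} */
    static String dataKey(String queueName, int shard, long pointer) {
        return queueName + "_SHARD_" + shard + "_" + pointer;
    }

    /** Sideline queue name: the queue name with _SIDELINE appended. */
    static String sidelineQueue(String queueName) {
        return queueName + "_SIDELINE";
    }
}
```

Under these assumptions, pointer 7 of shard 0 in the sideline of my-queue would be keyed as my-queue_SIDELINE_SHARD_0_7 in the same data set as the main queue.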
Secondary Indexes¶
A secondary index is automatically created on the active bin (IndexType.STRING) during AerospikeQueueService construction:
// Index name: {farmId}_{clientId}_ignis_queues_active
createIndex(setName + "_" + ACTIVE_BIN, ACTIVE_BIN, IndexType.STRING);
This enables efficient queries for active queues via Filter.equal(ACTIVE_BIN, "true"), and for inactive queues by filtering on "false" instead.
Warning
If the index already exists (result code 200), creation is silently skipped.
Client Configuration¶
The Aerospike client is configured in AerospikeStoreClient with the following policies:
| Setting | Value |
|---|---|
| Write commit level | CommitLevel.COMMIT_ALL |
| Write/read replica | Replica.MASTER_PROLES |
| Connection pool | maxConnectionsPerNode from config |
| Thread pool | threadPoolSize (default: availableProcessors * 4) |
| Write policy | sendKey = true (stores user key for retrieval) |
| TTL on write | Configurable per operation (-2 = don't update TTL) |
Retry Strategy¶
All AerospikeQueueService operations use a Retryer from the guava-retrying library:
RetryerBuilder.newBuilder()
    .retryIfExceptionOfType(AerospikeException.class)
    .withStopStrategy(StopStrategies.stopAfterAttempt(configuration.getRetries()))
    .withWaitStrategy(WaitStrategies.fixedWait(
        configuration.getSleepBetweenRetries(), TimeUnit.MILLISECONDS))
    .withBlockStrategy(BlockStrategies.threadSleepStrategy())
    .build();
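To make the policy concrete, here is a plain-JDK sketch of the same semantics: retry only on one exception type, stop after a fixed number of attempts, and sleep a fixed interval between attempts. It is not the retry library's API. FixedWaitRetry is a hypothetical class, and it rethrows the last exception directly, whereas the library's own error reporting may differ.

```java
import java.util.function.Supplier;

/** Hypothetical plain-JDK model of a stop-after-N, fixed-wait retry policy. */
final class FixedWaitRetry {
    private FixedWaitRetry() {}

    static <T> T call(Supplier<T> task,
                      Class<? extends RuntimeException> retryOn,
                      int maxAttempts,
                      long sleepMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                // Give up on non-retryable exceptions and after the final attempt.
                if (!retryOn.isInstance(e) || attempt >= maxAttempts) {
                    throw e;
                }
                try {
                    Thread.sleep(sleepMillis); // fixed wait between attempts
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting to retry", ie);
                }
            }
        }
    }
}
```

With retries(3) and sleepBetweenRetries(100) from the configuration example, this model would make at most three attempts, sleeping 100 ms after each of the first two failures.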
Fire Timestamp Mechanism¶
When a consumer fires (dequeues) a message, addFireTimestamp() stamps the fireTS bin on the corresponding data record with the current timestamp in epoch milliseconds.
This timestamp is later used by the sweeper to identify "stuck" messages — those that were fired but never acknowledged within the configured sweepDuration.
Note
fireTS is also stamped during shoveling to reset the sweep clock for re-delivered messages.
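The "stuck" check can be expressed as a single comparison. The sketch below assumes that a message counts as stuck when its fireTS is strictly older than now minus sweepDuration. FireClock and isStuck are hypothetical names used only for illustration.

```java
/** Hypothetical helper modelling the stuck-message check described above. */
final class FireClock {
    private FireClock() {}

    /** True when the message was fired before the sweep boundary (now - sweepDuration). */
    static boolean isStuck(long fireTsMillis, long nowMillis, long sweepDurationMillis) {
        return fireTsMillis < nowMillis - sweepDurationMillis;
    }
}
```

For example, with a 30 second sweepDuration, a message fired 45 seconds ago is stuck. If the shovel re-stamps fireTS with the current time, isStuck returns false again until another 30 seconds have passed.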
Sweep Internals¶
The sweep process detects messages that were fired but not consumed within the allowed time window and reloads them into the sideline magazine.
Execution Flow¶
- Sweeper.run(): triggered as a TimerTask (only when this node is the active leader).
- Fetches all active queues via queueService.getQueues(true).
- Submits each queue to a fixed thread pool (Executors.newFixedThreadPool(64)).
- Per queue: iterates over each shard 0..numShards-1 sequentially.
- Per shard: calls queueService.sweep(queueName, shard, sweepTillFireTS, sidelineMagazine).
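The fan-out in the steps above (queues in parallel, shards of one queue in sequence) can be sketched with the JDK executor API. This is an illustration under assumptions, not the Sweeper implementation. ShardSweeper is a hypothetical stand-in for queueService.sweep with the sideline magazine argument omitted, and the sketch creates and shuts down its own pool per call only to stay self-contained.

```java
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SweepFanOut {
    private SweepFanOut() {}

    /** Hypothetical stand-in for queueService.sweep(...). */
    interface ShardSweeper {
        void sweep(String queueName, int shard, long sweepTillFireTs);
    }

    /** Sweeps queues in parallel; the shards of each queue run one after another. */
    static void sweepAll(Map<String, Integer> shardsByQueue,
                         long sweepTillFireTs,
                         ShardSweeper sweeper,
                         int parallelism) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            for (Map.Entry<String, Integer> entry : shardsByQueue.entrySet()) {
                String queue = entry.getKey();
                int shards = entry.getValue();
                // One task per queue; shards 0..shards-1 are swept sequentially inside it.
                pool.submit(() -> {
                    for (int shard = 0; shard < shards; shard++) {
                        sweeper.sweep(queue, shard, sweepTillFireTs);
                    }
                });
            }
        } finally {
            pool.shutdown(); // stop accepting tasks; submitted ones still run
        }
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException ie) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
```

Because each queue is a single task, a queue with many shards occupies one pool thread for the whole pass, so the pool size bounds the number of queues swept concurrently rather than the number of shards.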
Sweep Algorithm (per shard)¶
flowchart TD
A[Start sweep for shard] --> B[Read Magazine meta record]
B --> C{Meta record exists?}
C -- No --> Z[Skip — no data]
C -- Yes --> D[Get currentFirePointer from meta]
D --> E[Get sweepPointer from queue entity]
E --> F{sweepPointer >= currentFirePointer?}
F -- Yes --> Z2[Skip — already swept]
F -- No --> G[Batch read 1000 records starting at sweepPointer]
G --> H{For each record}
H --> I{fireTS >= sweepTillFireTS?}
I -- Yes --> J[Stop — reached time boundary]
I -- No --> K[Load message into sideline magazine]
K --> L[Delete record from main magazine]
L --> H
H -- All processed --> M[Update sweepPointer and sweptCounter via MapOperation.increment]
M --> N{sweepPointer >= currentFirePointer?}
N -- Yes --> O[Done]
N -- No --> G
J --> O
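The flowchart can be traced with an in-memory model of one shard. The sketch below is a simplified illustration, not the queueService.sweep code: the Shard class, the map standing in for the main magazine, and the list standing in for the sideline magazine are all hypothetical. It also makes two assumptions the flowchart does not state: pointers with no record (for example, already consumed) are skipped, and progress made before hitting the time boundary is recorded in sweepPointer and sweptCounter.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ShardSweepSketch {
    private ShardSweepSketch() {}

    static final int SWEEP_BATCH_SIZE = 1000;

    /** In-memory stand-in for one shard of the main magazine plus its queue metadata. */
    static final class Shard {
        final Map<Long, Long> fireTsByPointer = new TreeMap<>(); // pointer -> fireTS (epoch ms)
        long currentFirePointer; // first pointer that has not been fired yet
        long sweepPointer;       // first pointer that has not been swept yet
        long sweptCounter;       // total messages moved to the sideline
    }

    /** Moves stuck pointers into the sideline list and returns how many were moved. */
    static long sweep(Shard shard, long sweepTillFireTs, List<Long> sideline) {
        long moved = 0;
        boolean reachedBoundary = false;
        while (!reachedBoundary && shard.sweepPointer < shard.currentFirePointer) {
            long batchEnd = Math.min(shard.sweepPointer + SWEEP_BATCH_SIZE,
                                     shard.currentFirePointer);
            long next = shard.sweepPointer;
            long movedInBatch = 0;
            for (; next < batchEnd; next++) {
                Long fireTs = shard.fireTsByPointer.get(next);
                if (fireTs == null) {
                    continue; // assumption: record already consumed and deleted
                }
                if (fireTs >= sweepTillFireTs) {
                    reachedBoundary = true; // fired too recently; stop here
                    break;
                }
                sideline.add(next);                 // load into the sideline
                shard.fireTsByPointer.remove(next); // delete from the main magazine
                movedInBatch++;
            }
            shard.sweepPointer = next;          // advance past everything examined
            shard.sweptCounter += movedInBatch;
            moved += movedInBatch;
        }
        return moved;
    }
}
```

In the real backend the pointer and counter updates are described as a MapOperation.increment on the queue record; the plain field assignments here only model the resulting values.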
Key Details¶
- sweepTillFireTimestamp = System.currentTimeMillis() - sweepDuration
- Batch size: 1000 records per batch (constant SWEEP_BATCH_SIZE)
- Sideline sweep threshold: min(sweepTillFireTS, now - 2 * shovelTimeInterval), which prevents sweeping messages that are about to be shoveled
- Pointer updates: uses MapOperation.increment to atomically advance per-shard sweep pointers
- Queue metadata TTL: queueExpiry * 2 (the TTL_FACTOR_FOR_QUEUE_EXPIRY constant)
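The arithmetic in this list can be written out as follows. This is a sketch under stated assumptions: SweepThresholds is a hypothetical class, and the shovel interval is taken as already converted to milliseconds, since the list does not say which unit shovelTimeInterval uses in the formula (the shovelInterval bin is stored in seconds).

```java
/** Hypothetical helper reproducing the threshold and TTL arithmetic listed above. */
final class SweepThresholds {
    private SweepThresholds() {}

    static final int TTL_FACTOR_FOR_QUEUE_EXPIRY = 2;

    /** Main sweep boundary: now - sweepDuration (both in epoch ms / ms). */
    static long sweepTill(long nowMillis, long sweepDurationMillis) {
        return nowMillis - sweepDurationMillis;
    }

    /**
     * Sideline sweep boundary: min(sweepTill, now - 2 * shovelInterval).
     * Assumption: shovelIntervalMillis has already been converted to milliseconds.
     */
    static long sidelineSweepTill(long nowMillis, long sweepDurationMillis,
                                  long shovelIntervalMillis) {
        return Math.min(sweepTill(nowMillis, sweepDurationMillis),
                        nowMillis - 2 * shovelIntervalMillis);
    }

    /** Queue metadata TTL in seconds: queueExpiry * 2. */
    static int queueMetadataTtlSeconds(int queueExpirySeconds) {
        return queueExpirySeconds * TTL_FACTOR_FOR_QUEUE_EXPIRY;
    }
}
```

With a 30 second sweepDuration and a 20 second shovel interval, the sideline boundary is 40 seconds in the past rather than 30, so a sideline message fired within the last two shovel intervals is left alone.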
Performance Tips¶
| Lever | Guidance |
|---|---|
| Shards | More shards = more parallelism. Range: 1–512. Default: 32. |
| Message TTL | Set messageExpiry to keep data clean and prevent unbounded growth. |
| Queue metadata TTL | Automatically set to queueExpiry * 2 for safety margin. |
| Sweep batch size | Fixed at 1000 — balances memory usage vs. scan efficiency. |
| Namespace sizing | Size the namespace memory for peak in-flight message volume. |
| Thread pool | Sweep parallelism is capped at 64 threads (Constants.PARALLEL_FACTOR). |
| Concurrency | Max 100 consumers per queue (MAX_CONSUMERS_ALLOWED). |
| Connection pool | Tune maxConnectionsPerNode based on cluster size and throughput. |