API Reference¶
Complete reference for all public classes, interfaces, methods, and fields in IgnisMQ.
IQueue¶
The primary interface for interacting with a queue. Generic parameter M is the message type.
Methods¶
| Method | Signature | Description |
|---|---|---|
publish |
boolean publish(M message) throws JsonProcessingException |
Serializes the message to JSON and loads it into the Magazine. Returns true on success. |
getUnconsumedCount |
long getUnconsumedCount() |
Returns publishedCount - consumedCount, floored at 0. |
getMetaData |
QueueMetaData getMetaData() |
Returns a snapshot of published, consumed, sidelined, and shovelled counts. |
shovel |
void shovel(int concurrency) |
Triggers a one-shot shovel operation with auto-delete enabled. |
IgnisMQManager¶
Central manager for all queue lifecycle operations. Implements Dropwizard Managed for start/stop hooks.
Constructors¶
public IgnisMQManager(
String clientId,
BaseStorage storage,
ObjectMapper mapper,
MetricRegistry metricRegistry,
CuratorFramework curatorFramework,
String farmId
) throws Exception
Creates the manager without an externally provided StorageClient. The client is constructed internally from the storage configuration.
public IgnisMQManager(
String clientId,
BaseStorage storage,
ObjectMapper mapper,
MetricRegistry metricRegistry,
StorageClient storageClient,
CuratorFramework curatorFramework,
String farmId
) throws Exception
Creates the manager with an externally managed StorageClient, allowing the caller to share or reuse the underlying database connection.
Methods¶
initialiseMessageHandlers¶
Registers all message handler mappings. Must be called exactly once before any call to createQueue. Subsequent calls will fail. Internally calls refreshQueues() to sync state from the database and registers a QueueStatGauge for metrics.
[!WARNING] Calling this method more than once will throw an exception. Ensure all handlers are collected into a single map before invoking.
getQueue¶
Returns the in-memory IQueue instance for the given queue name.
Throws: IgnisMQException with ErrorCode.QUEUE_NOT_FOUND if no queue with that name exists.
getAllQueues¶
Returns the full in-memory map of queue name to IQueue instance.
getAllQueuesFromDB¶
Queries Aerospike and returns the set of all active queue names persisted in the database.
createQueue¶
Validates the request, creates the underlying Magazine structure, and persists a QueueEntity to the database. Starts consumers and schedules sweeps/shovels as configured.
Throws: IgnisMQException with ErrorCode.QUEUE_ALREADY_EXISTS if a queue with the same name is already active.
deactivateQueue¶
Stops all consumers for the queue, removes it from the in-memory map, and sets active=false in the database.
[!CAUTION] This operation is irreversible. The queue cannot be reactivated after deactivation.
increaseConsumers¶
Adds count new consumer timers for the specified queue and updates the concurrency value in the database.
decreaseConsumers¶
Stops count consumer timers for the specified queue and updates the concurrency value in the database.
scheduleShoveling¶
Schedules periodic shoveling for the queue using the interval and concurrency defined in shovelConfig.
sweepQueue¶
Manually triggers a sweep operation for the specified queue.
refreshQueues¶
Synchronizes the in-memory queue map with the persisted state in the database. Creates or removes queue instances as needed.
getTaskInitializer¶
Returns the internal TaskInitializer instance used for scheduling consumer and sweep tasks.
MessageHandler<M>¶
User-implemented interface for processing messages consumed from a queue.
Methods¶
| Method | Signature | Description |
|---|---|---|
handle (single) |
boolean handle(M message) throws Exception |
Processes a single message. Return true to mark as consumed, false to sideline the message. |
handle (batch) |
boolean handle(List<M> messages) throws Exception |
Processes a batch of messages. Return true to mark all as consumed, false to sideline all messages in the batch. |
getIgnorableExceptions |
Set<Class<?>> getIgnorableExceptions() |
Returns exception types that should not cause sidelining. When a thrown exception matches one of these types, the message is deleted instead of sidelined. |
CreateQueueRequest¶
Request object for creating a new queue via IgnisMQManager.createQueue().
Fields¶
| Field | Type | Default | Constraints | Description |
|---|---|---|---|---|
name |
String |
— | @NotBlank |
Unique queue name. |
shards |
Integer |
32 |
@Min(1) @Max(512) |
Number of shards for the underlying Magazine. |
messageExpiry |
TimeToLive |
2 days | Must be valid | TTL for individual messages. |
queueExpiry |
TimeToLive |
2 days | Must be valid | TTL for the queue itself. |
concurrency |
int |
— | @Min(1) @Max(100) |
Number of consumer threads. |
messageHandlerType |
String |
— | @NotNull |
Key into the handler map registered via initialiseMessageHandlers. |
shovelConfig |
ShovelConfig |
null |
@Valid, optional |
If provided, enables periodic shoveling. |
sweepDurationInMins |
int |
20 |
@Min(5) @Max(300) |
Interval in minutes between sweep runs. |
batchingConfig |
BatchingConfig |
null |
@Valid, optional |
If null, single-message mode is used. If set, enables batch consumption. |
[!IMPORTANT] Validation requires
queueExpiry >= messageExpiry. BothqueueExpiryandmessageExpirymust resolve to valid durations within the allowed maximum.
ShovelConfig¶
Configuration for periodic shoveling of sidelined messages back into the queue.
Fields¶
| Field | Type | Default | Constraints | Description |
|---|---|---|---|---|
timeIntervalInSecs |
int |
600 (10 min) |
@Max(86400) |
Interval in seconds between shovel runs. |
concurrency |
int |
4 |
@Min(1) @Max(50) |
Number of parallel shovel threads. |
BatchingConfig¶
Configuration for batch message consumption.
Fields¶
| Field | Type | Default | Constraints | Description |
|---|---|---|---|---|
maxBatchSize |
int |
2 |
@Min(2) @Max(200000) |
Maximum number of messages per batch. |
maxWaitTimeInSecs |
int |
1 |
@Min(1) @Max(300) |
Maximum time in seconds to wait for a full batch before dispatching. |
TimeToLive¶
Represents a duration with a unit, used for message and queue expiry.
Fields¶
| Field | Type | Constraints | Description |
|---|---|---|---|
timeUnit |
TimeUnit |
@NotNull |
The unit of time. |
duration |
int |
@Min(1) |
The numeric duration value. |
Methods¶
| Method | Signature | Description |
|---|---|---|
toSeconds |
int toSeconds() |
Converts the duration to seconds based on timeUnit. |
[!NOTE] Maximum allowed value after conversion is 31,536,000 seconds (1 year).
TimeUnit¶
| Value | Conversion |
|---|---|
MINUTE |
duration * 60 |
HOUR |
duration * 3600 |
DAY |
duration * 86400 |
QueueMetaData¶
Fields¶
| Field | Type | Description |
|---|---|---|
published |
long |
Total messages published. |
consumed |
long |
Total messages consumed. |
sidelined |
long |
Total messages sidelined. |
shovelled |
long |
Total messages shovelled back into the queue. |
QueueEntity¶
Persistent representation of a queue stored in Aerospike.
Fields¶
| Field | Type | Description |
|---|---|---|
messageHandlerType |
String |
Key for the registered message handler. |
shards |
int |
Number of shards. |
queueExpiry |
TimeToLive |
Queue-level TTL. |
messageExpiry |
TimeToLive |
Message-level TTL. |
concurrency |
int |
Number of consumer threads. |
shovelConcurrency |
int |
Number of shovel threads. |
shovelTimeIntervalInSecs |
int |
Shovel interval in seconds. |
createdAt |
long |
Creation timestamp (epoch millis). |
active |
boolean |
Whether the queue is active. |
sweepPointers |
Map |
Current sweep cursor positions per shard. |
sidelineSweepPointers |
Map |
Current sideline sweep cursor positions per shard. |
sweptCounter |
long |
Total messages swept. |
sidelineSweptCounter |
long |
Total sidelined messages swept. |
sweepDuration |
int |
Sweep interval in minutes. |
batchingConfig |
BatchingConfig |
Batching configuration, or null for single-message mode. |
BaseStorage¶
Abstract base for storage backends.
Fields¶
| Field | Type | Constraints | Description |
|---|---|---|---|
storageType |
StorageType |
@NotNull |
The type of storage backend. |
Methods¶
| Method | Signature | Description |
|---|---|---|
accept |
abstract <T> T accept(StorageVisitor<T> visitor) |
Visitor pattern dispatch for storage-specific operations. |
AerospikeStorage¶
Aerospike-backed storage implementation.
Fields¶
| Field | Type | Description |
|---|---|---|
configuration |
AerospikeConfiguration |
Aerospike cluster configuration. |
namespace |
String |
Aerospike namespace to use. |
Constructor¶
StorageClient\<T>¶
Generic interface for obtaining a database client handle.
Methods¶
| Method | Signature | Description |
|---|---|---|
getClient |
T getClient() |
Returns the underlying client instance. |
AerospikeStoreClient¶
Aerospike client wrapper that auto-starts on construction.
Constructor¶
[!NOTE] The client starts automatically on construction. Write policy is configured with
COMMIT_ALLcommit level andMASTER_PROLESreplica policy.
QueueService¶
Internal service interface for queue persistence operations.
Methods¶
| Method | Description |
|---|---|
exists |
Checks if a queue exists in storage. |
get |
Retrieves a QueueEntity by name. |
store |
Persists a new QueueEntity. |
updateState |
Updates the active state of a queue. |
updateConcurrency |
Updates consumer concurrency in storage. |
updateShovelConfig |
Updates shovel configuration in storage. |
getQueues |
Returns all queue entities from storage. |
addFireTimestamp |
Records a fire timestamp for audit/tracking. |
sweep |
Executes a sweep operation in storage. |
IgnisMQBundle\<T>¶
Dropwizard bundle for integrating IgnisMQ into a Dropwizard application.
Abstract Methods¶
| Method | Signature | Description |
|---|---|---|
getStorage |
BaseStorage getStorage(T config) |
Provide the storage backend from app config. |
getClientId |
String getClientId(T config) |
Provide the client identifier from app config. |
getFarmId |
String getFarmId(T config) |
Provide the farm/cluster identifier from app config. |
getCuratorFramework |
CuratorFramework getCuratorFramework() |
Provide the ZooKeeper CuratorFramework instance. |
Methods¶
| Method | Signature | Description |
|---|---|---|
getIgnisMQManager |
IgnisMQManager getIgnisMQManager() |
Returns the initialized IgnisMQManager instance. |
IgnisMQException¶
Fields¶
| Field | Type | Description |
|---|---|---|
errorCode |
ErrorCode |
The specific error code for this exception. |
Static Methods¶
| Method | Signature | Description |
|---|---|---|
propagate |
static IgnisMQException propagate(Throwable t) |
Wraps a throwable as an IgnisMQException. |
propagate |
static IgnisMQException propagate(String message, Throwable t) |
Wraps with a custom message. |
propagate |
static IgnisMQException propagate(ErrorCode errorCode, Throwable t) |
Wraps with a specific error code. |
ErrorCode¶
| Value | Description |
|---|---|
QUEUE_ALREADY_EXISTS |
A queue with the given name is already active. |
QUEUE_NOT_FOUND |
No queue exists with the given name. |
NOT_IMPLEMENTED |
Operation is not implemented for the current storage backend. |
AEROSPIKE_ERROR |
An error occurred in the Aerospike layer. |
INVALID_REQUEST |
The request failed validation. |
MAX_ALLOWED_CONSUMERS_EXCEEDED |
Consumer count exceeds the maximum of 100. |
INVALID_SHOVEL_TIME_INTERVAL |
Shovel interval exceeds 86,400 seconds. |
INVALID_MESSAGE_HANDLER |
The specified message handler type is not registered. |
INTERNAL_ERROR |
An unexpected internal error occurred. |
QueueStat¶
Fields¶
| Field | Type | Description |
|---|---|---|
name |
String |
Queue name. |
active |
boolean |
Whether the queue is active. |
published |
long |
Total published count. |
consumed |
long |
Total consumed count. |
unConsumed |
long |
Unconsumed count (published - consumed). |
sidelined |
long |
Total sidelined count. |
shovelled |
long |
Total shovelled count. |
Constants¶
All constants defined in Constants.java:
| Constant | Value | Description |
|---|---|---|
DEFAULT_SHARDS |
32 |
Default number of shards per queue. |
INITIAL_DELAY_IN_MS |
120000 |
Initial delay before consumers start (2 minutes). |
WATCHER_INITIAL_DELAY_IN_MS |
60000 |
Initial delay before the watcher starts (1 minute). |
DELAY_PERIOD_IN_MS |
1000 |
Consumer polling interval (1 second). |
SHOVEL_DELAY_IN_MS |
10000 |
Delay before first shovel run (10 seconds). |
DEFAULT_DURATION_DAY |
2 |
Default TTL duration in days. |
MAX_ALLOWED_SHOVEL_TIME_INTERVAL_IN_SECONDS |
86400 |
Maximum shovel interval (24 hours). |
MAX_DURATION_ALLOWED_IN_SECONDS |
31536000 |
Maximum TTL duration (1 year). |
REFRESH_INTERVAL_IN_MS |
300000 |
Queue refresh interval (5 minutes). |
MAX_CONSUMERS_ALLOWED |
100 |
Maximum consumer threads per queue. |
TTL_FACTOR_FOR_QUEUE_EXPIRY |
2 |
Multiplier applied to queue expiry for internal storage TTL. |
PARALLEL_FACTOR |
64 |
Parallelism factor for internal operations. |