Skip to content

Usage Guide

This guide covers every major use case for IgnisMQ with complete, working code examples.


1. Dropwizard Integration

The recommended way to use IgnisMQ in a Dropwizard application is via the IgnisMQBundle.

Configuration Class

/**
 * Dropwizard configuration for an application that embeds IgnisMQ.
 * Lombok's {@code @Data} generates the accessors for the fields below.
 */
@Data
public class MyAppConfiguration extends Configuration {
    // Aerospike connection settings; passed to AerospikeStorage when the storage backend is built.
    @Valid
    @NotNull
    private AerospikeConfiguration aerospikeConfig;

    // Unique identifier for this application/service.
    @NotBlank
    private String clientId;

    // Identifier for the deployment region/datacenter.
    @NotBlank
    private String farmId;

    // ZooKeeper connection string used to create the CuratorFramework client.
    @NotBlank
    private String zookeeperConnectionString;
}

Bundle Implementation

Subclass IgnisMQBundle and implement the four abstract methods:

/**
 * Supplies IgnisMQ with its storage backend, client/farm identifiers and
 * ZooKeeper client, all derived from {@code MyAppConfiguration}.
 */
public class MyIgnisMQBundle extends IgnisMQBundle<MyAppConfiguration> {

    /** Aerospike namespace used by the storage backend. */
    private static final String NAMESPACE = "my-namespace";

    /** ZooKeeper client; {@code null} until {@link #setCuratorFramework} is called. */
    private CuratorFramework curatorFramework;

    /** Creates the Aerospike storage backend from the application configuration. */
    @Override
    protected BaseStorage getStorage(MyAppConfiguration config) {
        return new AerospikeStorage(config.getAerospikeConfig(), NAMESPACE);
    }

    /** Returns the unique identifier for this application/service. */
    @Override
    protected String getClientId(MyAppConfiguration config) {
        return config.getClientId();
    }

    /** Returns the identifier for the deployment region/datacenter. */
    @Override
    protected String getFarmId(MyAppConfiguration config) {
        return config.getFarmId();
    }

    /** Returns the ZooKeeper client previously supplied through the setter. */
    @Override
    protected CuratorFramework getCuratorFramework() {
        return this.curatorFramework;
    }

    /** Stores the ZooKeeper client that {@link #getCuratorFramework()} hands back. */
    public void setCuratorFramework(CuratorFramework curatorFramework) {
        this.curatorFramework = curatorFramework;
    }
}
| Method | Returns | Purpose |
| --- | --- | --- |
| getStorage(T config) | BaseStorage | Creates the Aerospike storage backend |
| getClientId(T config) | String | Unique identifier for this application/service |
| getFarmId(T config) | String | Identifier for the deployment region/datacenter |
| getCuratorFramework() | CuratorFramework | ZooKeeper client for leader election |

Application Class

/**
 * Example Dropwizard application: registers the IgnisMQ bundle, wires a ZooKeeper
 * client into it, registers message handlers and creates one queue.
 */
public class MyApplication extends Application<MyAppConfiguration> {

    private final MyIgnisMQBundle ignisMQBundle = new MyIgnisMQBundle();

    @Override
    public void initialize(Bootstrap<MyAppConfiguration> bootstrap) {
        bootstrap.addBundle(ignisMQBundle);
    }

    @Override
    public void run(MyAppConfiguration config, Environment environment) throws Exception {
        // Build and start the ZooKeeper client, then hand it to the bundle.
        // NOTE(review): Dropwizard invokes the run() of registered bundles before
        // Application.run(), so this assignment happens after the bundle has run —
        // confirm that IgnisMQBundle reads getCuratorFramework() late enough to see it.
        // NOTE(review): the client is never closed here; consider tying it to the
        // Dropwizard lifecycle.
        CuratorFramework curator = CuratorFrameworkFactory.newClient(
                config.getZookeeperConnectionString(),
                new ExponentialBackoffRetry(1000, 3)
        );
        curator.start();
        ignisMQBundle.setCuratorFramework(curator);

        // Get the manager created by the bundle
        IgnisMQManager manager = ignisMQBundle.getIgnisMQManager();

        // Register all message handlers BEFORE creating queues:
        // each key is the handler type name that queues refer to via messageHandlerType
        manager.initialiseMessageHandlers(Map.of(
                "order-handler", Map.entry(OrderEvent.class, new OrderEventHandler()),
                "notification-handler", Map.entry(Notification.class, new NotificationHandler())
        ));

        // Create queues
        manager.createQueue(CreateQueueRequest.builder()
                .name("order-events")
                .shards(32)
                .concurrency(8)
                .messageHandlerType("order-handler")
                .messageExpiry(new TimeToLive(TimeUnit.DAY, 2))
                .queueExpiry(new TimeToLive(TimeUnit.DAY, 7))
                .build());
    }
}

Handler registration order

You must call initialiseMessageHandlers() before createQueue(). The manager validates that the messageHandlerType references a registered handler. This method can only be called once — calling it again throws an exception.


2. Standalone Usage (Without Dropwizard)

If you're not using Dropwizard, create an IgnisMQManager directly.

The manager builds the StorageClient internally from the provided BaseStorage.

// 1. Describe the Aerospike cluster, then wrap it in a storage backend
AerospikeHost seedHost = AerospikeHost.builder()
        .host("localhost")
        .port(3000)
        .build();
AerospikeConfiguration aerospikeConfiguration = AerospikeConfiguration.builder()
        .hosts(List.of(seedHost))
        .retries(3)
        .sleepBetweenRetries(100)
        .socketTimeout(3000)
        .totalTimeout(5000)
        .maxConnectionsPerNode(100)
        .threadPoolSize(4)
        .build();
AerospikeStorage storage = new AerospikeStorage(aerospikeConfiguration, "my-namespace");

// 2. Create and start the ZooKeeper client
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework curator = CuratorFrameworkFactory.newClient("zk-host:2181", retryPolicy);
curator.start();

// 3. Create the manager
// Arguments, in order: clientId, BaseStorage, Jackson ObjectMapper,
// Dropwizard MetricRegistry, CuratorFramework, farmId
IgnisMQManager manager = new IgnisMQManager(
        "my-service",
        storage,
        new ObjectMapper(),
        new MetricRegistry(),
        curator,
        "datacenter-1"
);

// 4. Register handlers first, then create the queue that refers to them
manager.initialiseMessageHandlers(
        Map.of("order-handler", Map.entry(OrderEvent.class, new OrderEventHandler())));

CreateQueueRequest orderEventsRequest = CreateQueueRequest.builder()
        .name("order-events")
        .concurrency(4)
        .messageHandlerType("order-handler")
        .build();
manager.createQueue(orderEventsRequest);

Alternatively, pass a pre-built StorageClient to the manager. Use this constructor when you already have an Aerospike client and want to share it.

// Pre-build the StorageClient
// NOTE(review): aerospikeConfiguration is not declared in this snippet — presumably
// the same AerospikeConfiguration that was used to build `storage`; confirm.
StorageClient storageClient = new AerospikeStoreClient(aerospikeConfiguration);

// Same arguments as the six-argument constructor, with the StorageClient
// inserted just before the CuratorFramework.
IgnisMQManager manager = new IgnisMQManager(
        "my-service",           // clientId
        storage,                // BaseStorage
        new ObjectMapper(),     // Jackson ObjectMapper
        new MetricRegistry(),   // Dropwizard MetricRegistry
        storageClient,          // Pre-built StorageClient
        curator,                // CuratorFramework
        "datacenter-1"          // farmId
);

Constructor side effects

Both constructors immediately start background tasks: the watcher thread (refreshes queues every 5 minutes) and the TaskInitializer (leader election + sweeper). Make sure ZooKeeper and Aerospike are reachable before constructing the manager.


3. Publishing Messages

Once you have a queue, publish messages using the publish() method on IQueue. Messages are serialized to JSON via Jackson's ObjectMapper.

Basic Publishing

// Look up the queue by name
IQueue<OrderEvent> queue = manager.getQueue("order-events");

// Build the POJO, then publish it
OrderEvent createdOrder = OrderEvent.builder()
        .orderId("ORD-12345")
        .amount(249.99)
        .status("CREATED")
        .build();
queue.publish(createdOrder);

Different Message Types

Any Jackson-serializable Java object works as a message:

/**
 * Example message payload. The Lombok annotations provide accessors, a builder,
 * and both the all-args and no-args constructors used by the snippets in this guide.
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class OrderEvent {
    private String orderId;
    // NOTE(review): double is used for brevity; floating point is lossy for monetary
    // amounts — consider BigDecimal in real code.
    private double amount;
    private String status;
}

// POJO message
IQueue<OrderEvent> orderQueue = manager.getQueue("order-events");
orderQueue.publish(new OrderEvent("ORD-001", 99.99, "CREATED"));

// Plain String message (distinct variable names so the three examples can coexist)
IQueue<String> stringQueue = manager.getQueue("simple-queue");
stringQueue.publish("Hello, IgnisMQ!");
stringQueue.publish("{\"key\": \"raw-json-string\"}");

// Map message
IQueue<Map<String, Object>> dynamicQueue = manager.getQueue("dynamic-queue");
dynamicQueue.publish(Map.of(
        "eventType", "USER_SIGNUP",
        "userId", "usr-42",
        "timestamp", System.currentTimeMillis()
));

Publish return value

publish() returns true if the message was successfully written to Magazine (Aerospike). It throws JsonProcessingException if serialization fails.


4. Implementing MessageHandler

The MessageHandler<M> interface has three methods you must implement:

/**
 * Callback interface through which a queue delivers consumed messages.
 *
 * @param <M> message type
 */
public interface MessageHandler<M> {
    /**
     * Called when a single message is consumed.
     *
     * @return {@code true} to acknowledge and delete the message,
     *         {@code false} to sideline it for retry
     */
    boolean handle(M message) throws Exception;

    /**
     * Called instead of the single-message variant when batch mode is enabled.
     *
     * @return {@code true} to acknowledge every message in the batch,
     *         {@code false} to sideline every message in the batch
     */
    boolean handle(List<M> messages) throws Exception;

    /**
     * Exception classes that are swallowed when thrown during handling:
     * the message is skipped rather than sidelined.
     */
    Set<Class<?>> getIgnorableExceptions();
}
| Method | Called When | Return true | Return false |
| --- | --- | --- | --- |
| handle(M) | Single message consumed | Message acknowledged and deleted | Message sidelined for retry |
| handle(List<M>) | Batch mode enabled | All messages in batch acknowledged | All messages in batch sidelined |
| getIgnorableExceptions() | Exception thrown during handling | n/a (returns a set) — exceptions in this set are swallowed (message skipped, not sidelined) | n/a |

Simple Handler

/**
 * Minimal handler: logs each order and acknowledges it.
 */
public class OrderEventHandler implements MessageHandler<OrderEvent> {

    /**
     * Processes one order event.
     *
     * @return {@code true} to acknowledge the message
     */
    @Override
    public boolean handle(OrderEvent message) throws Exception {
        log.info("Processing order: {} amount: {}",
                message.getOrderId(), message.getAmount());
        // Business logic here
        return true; // Acknowledge
    }

    /**
     * Processes a batch by delegating to the single-message handler.
     *
     * @return {@code true} only if every message was acknowledged, so a
     *         {@code false} from the single-message handler is not silently dropped
     */
    @Override
    public boolean handle(List<OrderEvent> messages) throws Exception {
        boolean allAcknowledged = true;
        for (OrderEvent msg : messages) {
            // Non-short-circuit: every message is still attempted after a failure.
            allAcknowledged &= handle(msg);
        }
        return allAcknowledged;
    }

    @Override
    public Set<Class<?>> getIgnorableExceptions() {
        return Set.of(); // No exceptions are ignorable
    }
}

Handler with Retry Logic

Return false to sideline the message. Shoveling will re-queue it for another attempt later.

/**
 * Handler that sidelines a payment when the gateway times out, so the shovel
 * can re-queue it for a later attempt.
 */
public class PaymentHandler implements MessageHandler<PaymentEvent> {

    /**
     * Processes one payment.
     *
     * @return {@code true} when the gateway call completed, {@code false} on a gateway timeout
     */
    @Override
    public boolean handle(PaymentEvent payment) throws Exception {
        boolean consumed;
        try {
            paymentGateway.process(payment);
            consumed = true; // Success — message consumed
        } catch (GatewayTimeoutException e) {
            log.warn("Payment gateway timeout for {}, will retry via shovel",
                    payment.getPaymentId());
            consumed = false; // Sidelined — will be retried when shovel runs
        }
        return consumed;
    }

    /**
     * Processes every payment in the batch.
     *
     * @return {@code true} only if all of them succeeded
     */
    @Override
    public boolean handle(List<PaymentEvent> messages) throws Exception {
        boolean everyPaymentOk = true;
        for (PaymentEvent current : messages) {
            // Each payment is attempted even after an earlier one failed.
            if (!handle(current)) {
                everyPaymentOk = false;
            }
        }
        return everyPaymentOk;
    }

    @Override
    public Set<Class<?>> getIgnorableExceptions() {
        return Set.of(); // All exceptions cause sidelining
    }
}

Handler with Ignorable Exceptions

Exceptions in the ignorable set are caught and swallowed — the message is skipped without being sidelined.

/**
 * Handler that skips duplicate and stale events by throwing exceptions listed
 * in {@link #getIgnorableExceptions()}.
 */
public class EventSyncHandler implements MessageHandler<SyncEvent> {

    /**
     * Saves the event unless it already exists.
     *
     * @return {@code true} once the event is saved
     * @throws DuplicateEventException if the event is already stored (ignorable)
     */
    @Override
    public boolean handle(SyncEvent event) throws Exception {
        if (eventStore.exists(event.getEventId())) {
            throw new DuplicateEventException(event.getEventId());
            // This exception is ignorable — message is simply skipped
        }
        eventStore.save(event);
        return true;
    }

    /**
     * Processes a batch one event at a time. An ignorable exception skips only
     * the event that raised it; without this, the first duplicate would abort
     * the loop and the remaining events in the batch would never be saved.
     *
     * @return {@code true} when the whole batch has been processed
     * @throws Exception the first non-ignorable exception raised by an event
     */
    @Override
    public boolean handle(List<SyncEvent> messages) throws Exception {
        for (SyncEvent msg : messages) {
            try {
                handle(msg);
            } catch (Exception e) {
                if (!isIgnorable(e)) {
                    throw e;
                }
                // Ignorable: skip this event and continue with the rest of the batch.
            }
        }
        return true;
    }

    @Override
    public Set<Class<?>> getIgnorableExceptions() {
        return Set.of(
                DuplicateEventException.class,  // Skip duplicates silently
                StaleEventException.class        // Skip outdated events
        );
    }

    /** Returns whether {@code e} is an instance of any class in the ignorable set. */
    private boolean isIgnorable(Exception e) {
        for (Class<?> ignorable : getIgnorableExceptions()) {
            if (ignorable.isInstance(e)) {
                return true;
            }
        }
        return false;
    }
}

Exception handling flow

When an exception is thrown during handle():

  1. If the exception class is in getIgnorableExceptions() → message is skipped (not sidelined)
  2. Otherwise → message is sidelined to the sideline Magazine for later retry

5. Batch Consumption

Batch mode allows consuming multiple messages in a single handle(List<M>) call, which is useful for bulk database writes, batch API calls, or aggregation.

Configuration

Set batchingConfig in the CreateQueueRequest:

manager.createQueue(CreateQueueRequest.builder()
        .name("analytics-events")
        .concurrency(4)
        .messageHandlerType("analytics-handler")
        .batchingConfig(BatchingConfig.builder()
                .maxBatchSize(100)
                .maxWaitTimeInSecs(5)
                .build())
        .build());

How Batching Works

The consumer accumulates messages until either condition is met:

| Parameter | Range | Default | Trigger |
| --- | --- | --- | --- |
| maxBatchSize | 2 – 200,000 | 2 | Batch is full → deliver immediately |
| maxWaitTimeInSecs | 1 – 300 | 1 | Time elapsed → deliver whatever has accumulated |
Time ──────────────────────────────────────────────►
     │ msg1  msg2  msg3  ...  msg100               │
     │◄─────── batch fills up ──────►│              │
     │                               ▼              │
     │                    handler.handle(List<M>)   │
     │                                              │
     │ msg1  msg2  msg3                             │
     │◄──── only 3 msgs ────►│                      │
     │                        │ maxWaitTimeInSecs   │
     │                        ▼ elapsed             │
     │             handler.handle(List<M>)          │

Batch Handler Example

/**
 * Batch-oriented handler that bulk-inserts analytics events through JDBC.
 */
public class AnalyticsHandler implements MessageHandler<AnalyticsEvent> {

    private final JdbcTemplate jdbc;

    /**
     * @param jdbc template used for the bulk insert; required because the
     *             field is {@code final}
     */
    public AnalyticsHandler(JdbcTemplate jdbc) {
        this.jdbc = jdbc;
    }

    /** Handles a single event by delegating to the batch method. */
    @Override
    public boolean handle(AnalyticsEvent message) throws Exception {
        return handle(List.of(message)); // Delegate to batch method
    }

    /**
     * Inserts all events with one batched statement.
     *
     * @return {@code true} once the batch has been written (or was empty)
     */
    @Override
    public boolean handle(List<AnalyticsEvent> messages) throws Exception {
        if (messages.isEmpty()) {
            return true; // Nothing to insert; avoid an empty database round-trip
        }
        // Bulk insert for efficiency
        jdbc.batchUpdate(
                "INSERT INTO analytics (event_id, event_type, payload, ts) VALUES (?, ?, ?, ?)",
                messages.stream()
                        .map(e -> new Object[]{e.getId(), e.getType(), e.getPayload(), e.getTimestamp()})
                        .collect(Collectors.toList())
        );
        log.info("Inserted {} analytics events in batch", messages.size());
        return true;
    }

    @Override
    public Set<Class<?>> getIgnorableExceptions() {
        return Set.of();
    }
}

When to use batching

Batching is most beneficial when the per-message overhead is high (e.g., database round-trips, HTTP calls). For simple in-memory processing, single-message consumption is typically sufficient.


6. Shoveling Configuration

Shoveling moves messages from the sideline queue back to the main queue for re-consumption. This is the retry mechanism for messages that failed processing.

Configure at Queue Creation

manager.createQueue(CreateQueueRequest.builder()
        .name("payment-events")
        .concurrency(8)
        .messageHandlerType("payment-handler")
        .shovelConfig(ShovelConfig.builder()
                .concurrency(4)         // 4 parallel shovel threads
                .timeIntervalInSecs(600) // Run every 10 minutes
                .build())
        .build());
| Parameter | Range | Default | Description |
| --- | --- | --- | --- |
| concurrency | 1 – 50 | 4 | Number of parallel shovel threads |
| timeIntervalInSecs | 0 – 86,400 (1 day) | 600 (10 min) | Interval between shovel runs |

Schedule Shoveling Later

If you didn't configure shoveling at queue creation, you can schedule it afterwards:

manager.scheduleShoveling("payment-events", ShovelConfig.builder()
        .concurrency(2)
        .timeIntervalInSecs(300) // Every 5 minutes
        .build());

Manual One-Shot Shovel

Trigger an immediate, one-time shovel that stops when the sideline is empty:

// The message type is not needed for shoveling, hence the wildcard
IQueue<?> queue = manager.getQueue("payment-events");
queue.shovel(4); // 4 concurrent threads, runs once

Shovel behavior

  • Scheduled shoveling runs repeatedly at the configured interval
  • Manual shovel (queue.shovel(concurrency)) runs once and stops when sideline is drained
  • Shovel threads count toward the max consumer limit (100 per queue)

7. Runtime Queue Management

IgnisMQManager provides methods to inspect and manage queues at runtime.

Retrieving Queues

// Look up one queue by name (IgnisMQException is thrown if it does not exist)
IQueue<OrderEvent> queue = manager.getQueue("order-events");

// All queues active on this instance, with their backlog
Map<String, IQueue<?>> allQueues = manager.getAllQueues();
allQueues.forEach((queueName, activeQueue) ->
        log.info("Queue: {} unconsumed: {}", queueName, activeQueue.getUnconsumedCount()));

// Names of every queue in persistent storage (across all instances)
Set<String> allQueueNames = manager.getAllQueuesFromDB();
log.info("Total queues in cluster: {}", allQueueNames.size());

Scaling Consumers

Adjust the number of consumer threads for a queue at runtime:

// Scale up: add 4 more consumer threads (the per-queue total is capped at 100)
manager.increaseConsumers("order-events", 4);

// Scale down: stop 2 consumer threads
manager.decreaseConsumers("order-events", 2);

Consumer limits

The maximum total consumers per queue is 100 (MAX_CONSUMERS_ALLOWED). Attempting to exceed this throws IgnisMQException with error code MAX_ALLOWED_CONSUMERS_EXCEEDED.

Deactivating a Queue

// Stop all consumers and shovel tasks, mark queue as inactive.
// This is permanent: a deactivated queue cannot be reactivated.
manager.deactivateQueue("order-events");

Irreversible operation

deactivateQueue() is permanent. Once deactivated, the queue cannot be reactivated. The deactivation propagates to all instances on the next refresh cycle (every 5 minutes). Any unconsumed messages remain in Aerospike until their TTL expires.

Manual Sweep

Trigger the sweep process for a specific queue (normally runs automatically via leader election):

// Run the sweep for this queue now instead of waiting for the automatic sweeper
manager.sweepQueue("order-events");

8. Queue Monitoring

Unconsumed Count

// Report how many messages are still waiting in the queue
IQueue<?> queue = manager.getQueue("order-events");
log.info("Messages waiting to be consumed: {}", queue.getUnconsumedCount());

Full Queue Metadata

// `queue` is an IQueue obtained from manager.getQueue(...)
QueueMetaData meta = queue.getMetaData();
log.info("Published:  {}", meta.getPublished());   // total messages ever published
log.info("Consumed:   {}", meta.getConsumed());    // total messages consumed/acknowledged
log.info("Sidelined:  {}", meta.getSidelined());   // messages currently in the sideline queue
log.info("Shovelled:  {}", meta.getShovelled());   // messages moved back from sideline to main queue
| Field | Description |
| --- | --- |
| published | Total messages ever published (load pointer sum across shards) |
| consumed | Total messages consumed/acknowledged (fire pointer sum across shards) |
| sidelined | Messages currently in the sideline queue (sideline load − sideline fire) |
| shovelled | Messages moved back from sideline to main queue (sideline fire pointer) |

Automatic Metrics

IgnisMQ automatically registers metrics with the Dropwizard MetricRegistry:

| Metric | Type | Description |
| --- | --- | --- |
| commands.{queueName}_publish.all | Timer | Latency of publish operations |
| commands.{queueName}_consume.all | Timer | Latency of consume operations |
| ignis.queue.stats | Gauge | QueueStatGuage — reports queue statistics every 3 minutes |
// Read the publish timer for the "order-events" queue from the registry
Timer publishTimer = metricRegistry.timer("commands.order-events_publish.all");
// The snapshot value is divided by 1_000_000 before being logged as milliseconds
double p99Millis = publishTimer.getSnapshot().get99thPercentile() / 1_000_000;
log.info("Publish p99: {} ms", p99Millis);

9. Queue Lifecycle & TTL

IgnisMQ uses time-to-live (TTL) values to manage the lifecycle of messages and queues.

TTL Configuration

manager.createQueue(CreateQueueRequest.builder()
        .name("transient-events")
        .concurrency(4)
        .messageHandlerType("transient-handler")
        .messageExpiry(new TimeToLive(TimeUnit.HOUR, 6))   // Messages expire after 6 hours
        .queueExpiry(new TimeToLive(TimeUnit.DAY, 7))      // Queue expires after 7 days
        .build());
| Parameter | Default | Max | Description |
| --- | --- | --- | --- |
| messageExpiry | 2 days | 1 year (365 days) | TTL for individual messages in Aerospike |
| queueExpiry | 2 days | 1 year (365 days) | TTL for the queue itself (from creation time) |
| sweepDurationInMins | 20 min | 300 min | How far back the sweeper looks for stuck messages |

Available Time Units

| TimeUnit | Example | Seconds |
| --- | --- | --- |
| MINUTE | new TimeToLive(TimeUnit.MINUTE, 30) | 1,800 |
| HOUR | new TimeToLive(TimeUnit.HOUR, 12) | 43,200 |
| DAY | new TimeToLive(TimeUnit.DAY, 7) | 604,800 |

TTL Rules and Behavior

  • Queue metadata is stored with TTL = queueExpiry × 2. This ensures metadata outlives both the queue and its messages to prevent Magazine identifier clashes.
  • Validation: queueExpiry must be ≥ messageExpiry. This is enforced at creation time.
  • Maximum TTL: 1 year (31,536,000 seconds). Exceeding this fails validation.
  • Expired queues: On each refresh cycle (every 5 minutes), queues whose createdAt + queueExpiry has passed are automatically deactivated.
Timeline:
├── Queue created (createdAt)
├── messageExpiry ──► Individual messages expire in Aerospike
├── queueExpiry ────► Queue deactivated on next refresh cycle
├── queueExpiry × 2 ► Queue metadata removed from Aerospike

Why queueExpiry × 2?

The queue entity is persisted with TTL = queueExpiry × 2 to guarantee that (queueExpiry + messageExpiry) time elapses before the metadata is removed. Since queueExpiry >= messageExpiry, queueExpiry × 2 >= queueExpiry + messageExpiry always holds. This prevents a new queue with the same name from conflicting with lingering messages from the old queue.


10. Complete Production Example

A realistic setup with multiple queues, different configurations, and proper error handling.

/**
 * Example application that registers four message handlers, creates four queues
 * (orders, payments, notifications, analytics) and exposes a resource that publishes.
 *
 * NOTE(review): orderService, paymentGateway, emailClient, smsClient and analyticsDb
 * are not declared in this snippet; they must be created before they are used.
 * NOTE(review): MyIgnisMQBundle is declared for MyAppConfiguration — presumably
 * ECommerceConfig extends it; confirm.
 * NOTE(review): unlike the MyApplication example, no CuratorFramework is set on the
 * bundle here — confirm how the bundle obtains its ZooKeeper client.
 */
public class ECommerceApplication extends Application<ECommerceConfig> {

    private final MyIgnisMQBundle ignisMQBundle = new MyIgnisMQBundle();

    @Override
    public void initialize(Bootstrap<ECommerceConfig> bootstrap) {
        bootstrap.addBundle(ignisMQBundle);
    }

    @Override
    public void run(ECommerceConfig config, Environment environment) throws Exception {
        IgnisMQManager manager = ignisMQBundle.getIgnisMQManager();

        // ──────────────────────────────────────────────
        // Register ALL handlers upfront (must happen before any createQueue call)
        // ──────────────────────────────────────────────
        manager.initialiseMessageHandlers(Map.of(
                "order-handler",
                    Map.entry(OrderEvent.class, new OrderEventHandler(orderService)),
                "payment-handler",
                    Map.entry(PaymentEvent.class, new PaymentEventHandler(paymentGateway)),
                "notification-handler",
                    Map.entry(NotificationEvent.class, new NotificationHandler(emailClient, smsClient)),
                "analytics-handler",
                    Map.entry(AnalyticsEvent.class, new AnalyticsHandler(analyticsDb))
        ));

        // ──────────────────────────────────────────────
        // Queue 1: Order Events — standard concurrency, shoveling enabled
        // ──────────────────────────────────────────────
        manager.createQueue(CreateQueueRequest.builder()
                .name("order-events")
                .shards(64)
                .concurrency(16)
                .messageHandlerType("order-handler")
                .messageExpiry(new TimeToLive(TimeUnit.DAY, 3))
                .queueExpiry(new TimeToLive(TimeUnit.DAY, 14))
                .shovelConfig(ShovelConfig.builder()
                        .concurrency(4)
                        .timeIntervalInSecs(300)  // Retry failed orders every 5 minutes
                        .build())
                .sweepDurationInMins(30)
                .build());

        // ──────────────────────────────────────────────
        // Queue 2: Payment Events — critical, aggressive shoveling
        // ──────────────────────────────────────────────
        manager.createQueue(CreateQueueRequest.builder()
                .name("payment-events")
                .shards(32)
                .concurrency(8)
                .messageHandlerType("payment-handler")
                .messageExpiry(new TimeToLive(TimeUnit.DAY, 7))
                .queueExpiry(new TimeToLive(TimeUnit.DAY, 30))
                .shovelConfig(ShovelConfig.builder()
                        .concurrency(8)
                        .timeIntervalInSecs(60)   // Retry failed payments every minute
                        .build())
                .sweepDurationInMins(15)
                .build());

        // ──────────────────────────────────────────────
        // Queue 3: Notifications — lower priority, no shoveling
        // ──────────────────────────────────────────────
        manager.createQueue(CreateQueueRequest.builder()
                .name("notification-events")
                .shards(16)
                .concurrency(4)
                .messageHandlerType("notification-handler")
                .messageExpiry(new TimeToLive(TimeUnit.HOUR, 12))
                .queueExpiry(new TimeToLive(TimeUnit.DAY, 2))
                .build());

        // ──────────────────────────────────────────────
        // Queue 4: Analytics — high throughput with batching
        // ──────────────────────────────────────────────
        manager.createQueue(CreateQueueRequest.builder()
                .name("analytics-events")
                .shards(128)
                .concurrency(8)
                .messageHandlerType("analytics-handler")
                .messageExpiry(new TimeToLive(TimeUnit.DAY, 1))
                .queueExpiry(new TimeToLive(TimeUnit.DAY, 3))
                .batchingConfig(BatchingConfig.builder()
                        .maxBatchSize(500)
                        .maxWaitTimeInSecs(10)
                        .build())
                .build());

        // ──────────────────────────────────────────────
        // Publish from your business logic
        // ──────────────────────────────────────────────
        environment.jersey().register(new OrderResource(manager));
    }
}

Resource Publishing Messages

@Path("/orders")
public class OrderResource {

    private final IgnisMQManager manager;

    public OrderResource(IgnisMQManager manager) {
        this.manager = manager;
    }

    @POST
    public Response createOrder(CreateOrderRequest request) {
        Order order = orderService.create(request);

        // Publish to order queue
        try {
            manager.getQueue("order-events").publish(OrderEvent.builder()
                    .orderId(order.getId())
                    .amount(order.getTotal())
                    .status("CREATED")
                    .build());
        } catch (JsonProcessingException e) {
            log.error("Failed to serialize order event", e);
            // Order is already created — log the error, don't fail the API
        }

        // Publish to analytics queue
        try {
            manager.getQueue("analytics-events").publish(AnalyticsEvent.builder()
                    .eventType("ORDER_CREATED")
                    .entityId(order.getId())
                    .timestamp(System.currentTimeMillis())
                    .build());
        } catch (JsonProcessingException e) {
            log.warn("Failed to publish analytics event", e);
        }

        return Response.status(201).entity(order).build();
    }
}

Production Handler with Error Handling

/**
 * Payment handler that acknowledges final outcomes (success or decline),
 * sidelines gateway timeouts for retry and rethrows invalid-payment errors.
 */
public class PaymentEventHandler implements MessageHandler<PaymentEvent> {

    private static final Logger log = LoggerFactory.getLogger(PaymentEventHandler.class);

    private final PaymentGateway gateway;

    public PaymentEventHandler(PaymentGateway gateway) {
        this.gateway = gateway;
    }

    /**
     * Processes one payment through the gateway.
     *
     * @return {@code true} for a processed or declined payment,
     *         {@code false} when the gateway timed out
     * @throws InvalidPaymentException when the payment data is invalid
     */
    @Override
    public boolean handle(PaymentEvent event) throws Exception {
        try {
            PaymentResult result = gateway.process(event);
            if (!result.isSuccess()) {
                log.warn("Payment {} declined: {}", event.getPaymentId(), result.getReason());
            } else {
                log.info("Payment {} processed successfully", event.getPaymentId());
            }
            // Both outcomes are acknowledged: declined is a final state — don't retry
            return true;
        } catch (GatewayTimeoutException e) {
            log.warn("Gateway timeout for payment {}, will retry", event.getPaymentId());
            return false; // Sideline for retry via shoveling
        } catch (InvalidPaymentException e) {
            log.error("Invalid payment data for {}: {}", event.getPaymentId(), e.getMessage());
            throw e; // Not in ignorable set → will be sidelined
        }
    }

    /**
     * Processes every payment in the batch.
     *
     * @return {@code true} only if all of them were acknowledged
     */
    @Override
    public boolean handle(List<PaymentEvent> messages) throws Exception {
        boolean everyPaymentOk = true;
        for (PaymentEvent current : messages) {
            // Each payment is attempted even after an earlier one returned false.
            if (!handle(current)) {
                everyPaymentOk = false;
            }
        }
        return everyPaymentOk;
    }

    @Override
    public Set<Class<?>> getIgnorableExceptions() {
        // Already processed — skip silently
        return Set.of(DuplicatePaymentException.class);
    }
}

Runtime Operations

// Monitor queue health
manager.getAllQueues().forEach((name, queue) -> {
    QueueMetaData meta = queue.getMetaData();
    log.info("Queue={} published={} consumed={} sidelined={} pending={}",
            name, meta.getPublished(), meta.getConsumed(),
            meta.getSidelined(), queue.getUnconsumedCount());
});

// Scale up during peak hours
manager.increaseConsumers("order-events", 8);    // 16 → 24 consumers
manager.increaseConsumers("payment-events", 4);  // 8  → 12 consumers

// Scale down during off-peak
manager.decreaseConsumers("order-events", 8);    // 24 → 16 consumers
manager.decreaseConsumers("payment-events", 4);  // 12 → 8  consumers

// Emergency: manually trigger shovel for a queue
manager.getQueue("payment-events").shovel(8);

// Emergency: manually trigger sweep
manager.sweepQueue("payment-events");

// Decommission a queue (IRREVERSIBLE)
manager.deactivateQueue("legacy-events");