
Getting Started

This guide walks you through setting up IgnisMQ and running your first queue in under 5 minutes.

Prerequisites

Requirement     Version       Purpose
Java            17+           Sealed classes and pattern matching are used throughout
Apache Maven    3.8+          Build tool
Aerospike       5.x / 6.x     Queue storage backend
ZooKeeper       3.4+          Leader election and cluster coordination
Docker          Any recent    Running integration tests via Testcontainers

Installation

Maven

Add the dependency for the module you need:

Use ignismq-core if you have your own application framework or want to manage the lifecycle yourself.

<dependency>
    <groupId>com.phonepe</groupId>
    <artifactId>ignismq-core</artifactId>
    <version>${ignismq.version}</version>
</dependency>

Use ignismq-dw-bundle if you're building a Dropwizard application. It includes ignismq-core transitively.

<dependency>
    <groupId>com.phonepe</groupId>
    <artifactId>ignismq-dw-bundle</artifactId>
    <version>${ignismq.version}</version>
</dependency>

Find the latest version

Check Maven Central for the latest release version.

Build from Source

git clone https://github.com/PhonePe/ignisMQ.git
cd ignisMQ
mvn clean install

To run the full test suite (requires Docker):

mvn clean test

Docker required for tests

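Integration tests use Testcontainers to spin up an Aerospike instance (aerospike/aerospike-server:6.1.0.7). Make sure Docker is running before executing tests. By default, Maven's install phase also runs the test phase, so the same applies to mvn clean install unless tests are skipped (for example with -DskipTests).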

Your First Queue in 5 Minutes

This example shows the minimal code to create a queue, publish a message, and consume it.

Step 1: Define Your Message Type

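import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data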
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class OrderEvent {
    private String orderId;
    private double amount;
    private String status;
}

Step 2: Implement a Message Handler

The MessageHandler interface is how you process messages. Return true to acknowledge, false to sideline for retry.

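import java.util.List;
import java.util.Set;

// MessageHandler comes from IgnisMQ; import it from the library's package.
public class OrderEventHandler implements MessageHandler<OrderEvent> {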

    @Override
    public boolean handle(OrderEvent message) throws Exception {
        System.out.println("Processing order: " + message.getOrderId()
                + " amount: " + message.getAmount());

        // Your business logic here
        // Return true  → message consumed successfully
        // Return false → message sidelined for later retry
        return true;
    }

    @Override
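    public boolean handle(List<OrderEvent> messages) throws Exception {
        // Called in batch mode. Process every message and report failure
        // if any single message could not be handled.
        boolean allHandled = true;
        for (OrderEvent msg : messages) {
            allHandled &= handle(msg);
        }
        return allHandled;
    }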

    @Override
    public Set<Class<?>> getIgnorableExceptions() {
        // These exceptions will NOT cause sidelining
        return Set.of();
    }
}
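
The return-value contract above can be pictured with a small, self-contained sketch. It is not IgnisMQ's consumer code: SketchHandler, HandlerOutcome, and dispatch are hypothetical names, and two behaviors are assumptions made for illustration. The first is that an ignorable exception leaves the message un-sidelined (labeled IGNORED here). The second is that any other exception sidelines the message.

```java
import java.util.Set;

// Hypothetical stand-in for IgnisMQ's MessageHandler; only the parts discussed above.
interface SketchHandler<T> {
    boolean handle(T message) throws Exception;
    Set<Class<?>> getIgnorableExceptions();
}

// Hypothetical outcome labels, not part of the IgnisMQ API.
enum HandlerOutcome { ACKNOWLEDGED, SIDELINED, IGNORED }

final class HandlerContractSketch {

    // Assumed interpretation: true -> acknowledged, false -> sidelined,
    // ignorable exception -> not sidelined, any other exception -> sidelined.
    static <T> HandlerOutcome dispatch(SketchHandler<T> handler, T message) {
        try {
            return handler.handle(message) ? HandlerOutcome.ACKNOWLEDGED : HandlerOutcome.SIDELINED;
        } catch (Exception e) {
            boolean ignorable = handler.getIgnorableExceptions().stream()
                    .anyMatch(type -> type.isInstance(e));
            return ignorable ? HandlerOutcome.IGNORED : HandlerOutcome.SIDELINED;
        }
    }

    public static void main(String[] args) {
        SketchHandler<String> handler = new SketchHandler<String>() {
            @Override
            public boolean handle(String message) {
                if (message.isEmpty()) {
                    throw new IllegalArgumentException("empty payload");
                }
                return !message.startsWith("retry");
            }

            @Override
            public Set<Class<?>> getIgnorableExceptions() {
                return Set.of(IllegalArgumentException.class);
            }
        };

        System.out.println(dispatch(handler, "ORD-001"));   // ACKNOWLEDGED
        System.out.println(dispatch(handler, "retry-me"));  // SIDELINED
        System.out.println(dispatch(handler, ""));          // IGNORED
    }
}
```

In a real handler, list only exceptions in getIgnorableExceptions() that you are comfortable not retrying.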

Step 3: Create the Manager and Queue

If you use ignismq-core directly, construct the storage and the manager yourself:

// Configure Aerospike storage
AerospikeStorage storage = new AerospikeStorage(
    AerospikeConfiguration.builder()
        .hosts(List.of(AerospikeHost.builder()
            .host("localhost")
            .port(3000)
            .build()))
        .retries(3)
        .sleepBetweenRetries(100)
        .socketTimeout(3000)
        .totalTimeout(5000)
        .maxConnectionsPerNode(100)
        .threadPoolSize(4)
        .build(),
    "my-namespace"
);

// Create the manager
IgnisMQManager manager = new IgnisMQManager(
    "my-service",               // clientId — identifies this application
    storage,                     // Aerospike storage
    new ObjectMapper(),          // Jackson mapper for serialization
    new MetricRegistry(),        // Dropwizard metrics
    curatorFramework,            // ZooKeeper curator client
    "datacenter-1"              // farmId — identifies this deployment
);

// Register message handlers (MUST be called before createQueue)
manager.initialiseMessageHandlers(Map.of(
    "order-handler", Map.entry(OrderEvent.class, new OrderEventHandler())
));

// Create the queue
manager.createQueue(CreateQueueRequest.builder()
    .name("order-events")
    .shards(32)                      // Number of Magazine shards
    .concurrency(4)                  // Number of consumer threads
    .messageHandlerType("order-handler")  // Must match a registered handler
    .messageExpiry(new TimeToLive(TimeUnit.DAY, 2))
    .queueExpiry(new TimeToLive(TimeUnit.DAY, 7))
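    .build());

If you use ignismq-dw-bundle, register the bundle and obtain the manager from it instead (MyIgnisMQBundle stands for your application's bundle class):

public class MyApplication extends Application<MyConfig> {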

    private final MyIgnisMQBundle ignisMQBundle = new MyIgnisMQBundle();

    @Override
    public void initialize(Bootstrap<MyConfig> bootstrap) {
        bootstrap.addBundle(ignisMQBundle);
    }

    @Override
    public void run(MyConfig config, Environment env) {
        IgnisMQManager manager = ignisMQBundle.getIgnisMQManager();

        manager.initialiseMessageHandlers(Map.of(
            "order-handler",
            Map.entry(OrderEvent.class, new OrderEventHandler())
        ));

        manager.createQueue(CreateQueueRequest.builder()
            .name("order-events")
            .concurrency(4)
            .messageHandlerType("order-handler")
            .build());
    }
}
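
Both variants register handlers under a name and then refer to that name in messageHandlerType, which is why registration has to come first. A minimal sketch of that lookup follows. HandlerRegistrySketch and Registration are hypothetical names. Failing fast with IllegalStateException is also an assumption, since this guide does not say how IgnisMQ reacts to an unknown handler type.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry; IgnisMQ's internal bookkeeping is not shown in this guide.
final class HandlerRegistrySketch {

    // One registration: the message class plus the handler instance.
    static final class Registration {
        final Class<?> messageClass;
        final Object handler;

        Registration(Class<?> messageClass, Object handler) {
            this.messageClass = messageClass;
            this.handler = handler;
        }
    }

    private final Map<String, Registration> byType = new HashMap<>();

    // Mirrors the name -> (message class, handler) shape used in Step 3.
    void register(String handlerType, Class<?> messageClass, Object handler) {
        byType.put(handlerType, new Registration(messageClass, handler));
    }

    // Resolves the handler a queue asks for; fails fast when the name is unknown.
    Registration resolve(String handlerType) {
        Registration registration = byType.get(handlerType);
        if (registration == null) {
            throw new IllegalStateException("No handler registered for type: " + handlerType);
        }
        return registration;
    }

    public static void main(String[] args) {
        HandlerRegistrySketch registry = new HandlerRegistrySketch();
        registry.register("order-handler", String.class, new Object());

        System.out.println(registry.resolve("order-handler").messageClass.getSimpleName());
        try {
            registry.resolve("payment-handler"); // never registered
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```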

Step 4: Publish Messages

IQueue<OrderEvent> queue = manager.getQueue("order-events");

// Publish a single message
queue.publish(OrderEvent.builder()
    .orderId("ORD-001")
    .amount(149.99)
    .status("CREATED")
    .build());

// Publish in a loop
for (int i = 0; i < 1000; i++) {
    queue.publish(OrderEvent.builder()
        .orderId("ORD-" + i)
        .amount(Math.random() * 500)
        .status("CREATED")
        .build());
}

That's it! Messages will be consumed automatically by the consumer threads. You don't need to call any "consume" method — the MagazineConsumerTask timers handle this.

Consumer startup delay

Consumers start after a 2-minute initial delay (INITIAL_DELAY_IN_MS), which gives the cluster time to stabilize. Messages published during this window are safely persisted in Aerospike.
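
The combination of an initial delay and periodic polling maps naturally onto java.util.Timer, which the guide says backs each concurrency slot. The sketch below shows only that scheduling pattern. ConsumerTimerSketch and startConsumers are hypothetical names, the poll action is a placeholder, and the delays are shortened from the documented 2 minutes and 1 second so the demo finishes quickly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of delayed, periodic polling; not IgnisMQ's MagazineConsumerTask.
final class ConsumerTimerSketch {

    // Starts one daemon Timer per concurrency slot; each runs `poll` after
    // initialDelayMs and then again every periodMs.
    static List<Timer> startConsumers(int concurrency, long initialDelayMs,
                                      long periodMs, Runnable poll) {
        List<Timer> timers = new ArrayList<>();
        for (int slot = 0; slot < concurrency; slot++) {
            Timer timer = new Timer("consumer-" + slot, true);
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    poll.run();
                }
            }, initialDelayMs, periodMs);
            timers.add(timer);
        }
        return timers;
    }

    public static void main(String[] args) throws InterruptedException {
        // Shortened stand-ins for the documented 2-minute delay and 1-second period.
        CountDownLatch polls = new CountDownLatch(4);
        List<Timer> timers = startConsumers(2, 200, 100, polls::countDown);

        boolean sawPolls = polls.await(5, TimeUnit.SECONDS);
        timers.forEach(Timer::cancel);
        System.out.println("observed four polls: " + sawPolls);
    }
}
```

Nothing runs before the initial delay elapses, which is why freshly published messages appear to sit idle for the first two minutes after startup.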

What Happens Behind the Scenes

After you call createQueue():

  1. Two Magazine instances are created — a main queue and a sideline queue ({name}_SIDELINE)
  2. Consumer timers are started (one java.util.Timer per concurrency slot), each polling every 1 second
  3. Queue metadata is persisted to Aerospike with TTL = queueExpiry * 2
  4. A background watcher refreshes queue state from the database every 5 minutes
  5. The TaskInitializer starts leader election and schedules the sweeper
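
Two of the values in the list above are derived from the queue definition: the sideline queue name and the metadata TTL. A small sketch of that arithmetic follows. QueueDerivedValuesSketch is a hypothetical helper, and java.time.Duration is used as a stand-in for IgnisMQ's TimeToLive type.

```java
import java.time.Duration;

// Hypothetical helper; it only restates the naming and TTL rules listed above.
final class QueueDerivedValuesSketch {

    // Sideline queue name, following the {name}_SIDELINE pattern from item 1.
    static String sidelineName(String queueName) {
        return queueName + "_SIDELINE";
    }

    // Metadata TTL, following "TTL = queueExpiry * 2" from item 3.
    static Duration metadataTtl(Duration queueExpiry) {
        return queueExpiry.multipliedBy(2);
    }

    public static void main(String[] args) {
        System.out.println(sidelineName("order-events"));              // order-events_SIDELINE
        System.out.println(metadataTtl(Duration.ofDays(7)).toDays());  // 14
    }
}
```

For the order-events queue from Step 3, with a queueExpiry of 7 days, this gives a sideline queue named order-events_SIDELINE and a metadata TTL of 14 days.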

Next Steps

Topic            Description
Core Concepts    Understand sidelining, shoveling, sweeping, and leader election
Usage Guide      Advanced examples: batching, shoveling, runtime scaling
Configuration    All config options with defaults and constraints