Skip to content

Example: Pub/Sub Event Routing

This example models a content-based publish/subscribe system. Subscribers register Boolean filter expressions. When an event is published, Mustang finds all matching subscribers in sub-linear time — regardless of how many subscriptions are registered.


Scenario

A financial data platform routes events to subscribers:

Subscriber Filter
sports-alerts category ∈ {Sports} AND rating ∈ {4,5}
finance-non-tech category ∈ {Finance} AND industry ∉ {Tech}
breaking-any priority ∈
finance-or-sports (category ∈ {Finance}) OR (category ∈ {Sports} AND rating ≥ 4)

Index all subscriptions

import com.phonepe.mustang.criteria.impl.CNFCriteria;
import com.phonepe.mustang.composition.impl.Disjunction;

MustangEngine engine = MustangEngine.builder().mapper(mapper).build();

// sports-alerts: category=Sports AND rating∈{4,5}
Criteria sportsAlerts = DNFCriteria.builder()
    .id("sports-alerts")
    .conjunction(Conjunction.builder()
        .predicate(IncludedPredicate.builder()
            .lhs("$.category")
            .detail(EqualityDetail.builder().values(Sets.newHashSet("Sports")).build())
            .build())
        .predicate(IncludedPredicate.builder()
            .lhs("$.rating")
            .detail(EqualityDetail.builder().values(Sets.newHashSet(4, 5)).build())
            .build())
        .build())
    .build();

// finance-non-tech: category=Finance AND industry∉{Tech}
Criteria financeNonTech = DNFCriteria.builder()
    .id("finance-non-tech")
    .conjunction(Conjunction.builder()
        .predicate(IncludedPredicate.builder()
            .lhs("$.category")
            .detail(EqualityDetail.builder().values(Sets.newHashSet("Finance")).build())
            .build())
        .predicate(ExcludedPredicate.builder()
            .lhs("$.industry")
            .detail(EqualityDetail.builder().values(Sets.newHashSet("Tech")).build())
            .build())
        .build())
    .build();

// breaking-any: priority=breaking (no other constraints)
Criteria breakingAny = DNFCriteria.builder()
    .id("breaking-any")
    .conjunction(Conjunction.builder()
        .predicate(IncludedPredicate.builder()
            .lhs("$.priority")
            .detail(EqualityDetail.builder().values(Sets.newHashSet("breaking")).build())
            .build())
        .build())
    .build();

// finance-or-sports (CNF): (category=Finance) AND (category=Sports OR rating≥4)
// Expressed as CNF: two disjunctions that must both be satisfied
Criteria financeOrSports = CNFCriteria.builder()
    .id("finance-or-sports")
    .disjunction(Disjunction.builder()
        .predicate(IncludedPredicate.builder()
            .lhs("$.category")
            .detail(EqualityDetail.builder().values(Sets.newHashSet("Finance")).build())
            .build())
        .predicate(IncludedPredicate.builder()
            .lhs("$.category")
            .detail(EqualityDetail.builder().values(Sets.newHashSet("Sports")).build())
            .build())
        .build())
    .disjunction(Disjunction.builder()
        .predicate(IncludedPredicate.builder()
            .lhs("$.rating")
            .detail(RangeDetail.builder().lowerBound(4).includeLowerBound(true).build())
            .build())
        .build())
    .build();

engine.add("subscriptions", List.of(sportsAlerts, financeNonTech, breakingAny, financeOrSports));

Route an incoming event

// Event: a sports story with rating 5
JsonNode event = mapper.readTree("""
    {
      "category": "Sports",
      "rating": 5,
      "industry": "Media",
      "priority": "normal"
    }
    """);

RequestContext ctx = RequestContext.builder().node(event).build();
Set<String> subscribers = engine.search("subscriptions", ctx);
// → ["sports-alerts", "finance-or-sports"]
// Explanation:
//   sports-alerts:    category=Sports ✓, rating∈{4,5} ✓ → MATCH
//   finance-non-tech: category∈{Finance} ✗             → NO MATCH
//   breaking-any:     priority=breaking ✗               → NO MATCH
//   finance-or-sports: disjunction1: Sports ✓, disjunction2: rating≥4 ✓ → MATCH

Route a breaking finance event

JsonNode breaking = mapper.readTree("""
    {
      "category": "Finance",
      "industry": "Banking",
      "rating": 3,
      "priority": "breaking"
    }
    """);

RequestContext ctx2 = RequestContext.builder().node(breaking).build();
Set<String> subscribers2 = engine.search("subscriptions", ctx2);
// → ["finance-non-tech", "breaking-any", "finance-or-sports"]
// Explanation:
//   finance-non-tech:  category=Finance ✓, industry∉{Tech} ✓ → MATCH
//   breaking-any:      priority=breaking ✓                   → MATCH
//   finance-or-sports: disjunction1: Finance ✓, disjunction2: rating≥4? (3<4) ✗ → NO MATCH
//                      Wait — disjunction2 is NOT satisfied (rating=3 < 4).
//                      → Actually NO MATCH for finance-or-sports

Dynamically add/remove subscriptions

// New subscriber registers
Criteria newSub = DNFCriteria.builder()
    .id("tech-finance")
    .conjunction(Conjunction.builder()
        .predicate(IncludedPredicate.builder()
            .lhs("$.category")
            .detail(EqualityDetail.builder().values(Sets.newHashSet("Finance")).build())
            .build())
        .predicate(IncludedPredicate.builder()
            .lhs("$.industry")
            .detail(EqualityDetail.builder().values(Sets.newHashSet("Tech")).build())
            .build())
        .build())
    .build();

engine.add("subscriptions", newSub);

// Subscriber cancels
engine.delete("subscriptions", breakingAny);

Reload the full subscription set atomically

When you reload all subscriptions from a database (e.g., on startup or periodically):

List<Criteria> freshSubs = loadFromDatabase();  // your DB loader
String tmpIndex = "subscriptions-" + System.currentTimeMillis();
engine.add(tmpIndex, freshSubs);
engine.replaceIndex("subscriptions", tmpIndex);
// Zero downtime — no events are missed during the reload