Custom Stages
Build any MongoDB aggregation stage not yet natively supported.
Why Custom Stages?
Konduct covers common operations, but MongoDB has 50+ stages. Use custom stages for:
- Stages not yet implemented
- Experimental MongoDB features
- Complex nested pipelines
- Edge cases
Basic Custom Stage
konduct.collection<Product>()
.customStage("\$replaceRoot") {
"newRoot" from "\$embedded.document"
}
.toList()
Building Complex Stages
$redact Stage
konduct.collection<Document>()
.customStage("\$redact") {
"\$cond" from {
"if" from doc {
"\$eq" from list("\$level", 5)
}
"then" from "$$PRUNE"
"else" from "$$DESCEND"
}
}
.toList()
$graphLookup Stage
konduct.collection<Employee>()
.customStage("\$graphLookup") {
"from" from "employees"
"startWith" from "\$reportsTo"
"connectFromField" from "reportsTo"
"connectToField" from "name"
"as" from "reportingHierarchy"
"maxDepth" from 3
}
.toList()
$bucket Stage
konduct.collection<Product>()
.customStage("\$bucket") {
"groupBy" from "\$price"
"boundaries" from list(0, 100, 200, 300, 500, 1000)
"default" from "Other"
"output" from {
"count" from doc { "\$sum" from 1 }
"titles" from doc { "\$push" from "\$title" }
"avgPrice" from doc { "\$avg" from "\$price" }
}
}
.toList()
$setWindowFields Stage
konduct.collection<Sale>()
.customStage("\$setWindowFields") {
"partitionBy" from "\$category"
"sortBy" from { "date" from 1 }
"output" from {
"cumulativeTotal" from {
"\$sum" from "\$amount"
"window" from {
"documents" from list("unbounded", "current")
}
}
"rank" from {
"\$rank" from doc { }
}
}
}
.toList()
Helper Functions
Build Documents
.customStage("\$lookup") {
"from" from "orders"
"let" from {
"userId" from "\$_id"
}
"pipeline" from list(
doc {
"\$match" from {
"\$expr" from {
"\$eq" from list("\$userId", "$$userId")
}
}
},
doc {
"\$sort" from { "orderDate" from -1 }
}
)
"as" from "orders"
}
Build Lists
Real-World Examples
Geospatial $geoNear
konduct.collection<Store>()
.customStage("\$geoNear") {
"near" from {
"type" from "Point"
"coordinates" from list(-73.9667, 40.78)
}
"distanceField" from "distance"
"maxDistance" from 5000
"spherical" from true
}
.limit(10)
.toList()
Text Search $search
konduct.collection<Article>()
.customStage("\$search") {
"index" from "article_index"
"text" from {
"query" from "mongodb aggregation"
"path" from list("title", "content")
}
}
.limit(20)
.toList()
$unionWith
konduct.collection<Product>()
.customStage("\$unionWith") {
"coll" from "archived_products"
"pipeline" from list(
doc {
"\$match" from { "status" from "archived" }
}
)
}
.toList()
$merge (Output to Collection)
konduct.collection<Sale>()
.group {
by(Sale::productId)
accumulate {
"totalSales" sum Sale::amount
}
}
.customStage("\$merge") {
"into" from "product_statistics"
"on" from "_id"
"whenMatched" from "replace"
"whenNotMatched" from "insert"
}
.toList()
Type Safety
Custom stages return Document by default. Convert with into():
data class GeoResult(
val name: String,
val distance: Double
)
konduct.collection<Store>()
.customStage("\$geoNear") {
"near" from coordinates
"distanceField" from "distance"
}
.into<GeoResult>()
.toList()
Debugging Custom Stages
val pipeline = konduct.collection<Product>()
.customStage("\$bucket") {
// ... complex stage
}
println(pipeline.toJson())
// Inspect generated MongoDB JSON
When to Use Custom Stages
✅ Use custom stages when:
- Feature not yet in Konduct
- Complex MongoDB-specific operation
- Experimental/new MongoDB features
❌ Don't use custom stages when:
- Native Konduct method exists (use it instead!)
- Can be built with existing methods
- Just learning MongoDB (use native methods first)
Contributing
Found yourself using the same custom stage repeatedly? Consider contributing it to Konduct! See Contributing.
See Also
- MongoDB Aggregation Stages - Full stage reference
- API Reference - CustomStageBuilder docs