[MongoDB] 03. Aggregation Pipeline (집계)
MongoDB의 Aggregation Pipeline을 이용한 데이터 집계와 변환을 정리합니다.
Aggregation Pipeline이란?
Aggregation Pipeline은 문서를 여러 단계(Stage)를 거쳐 변환하고 집계하는 프레임워크입니다.
RDBMS의 GROUP BY, HAVING, JOIN, 서브쿼리 등을 대체합니다.
컬렉션 → [Stage 1] → [Stage 2] → [Stage 3] → ... → 결과
주요 Stage
| Stage | 설명 | SQL 대응 |
|---|---|---|
$match |
조건 필터링 | WHERE |
$group |
그룹핑 및 집계 | GROUP BY |
$project |
필드 선택/변환 | SELECT |
$sort |
정렬 | ORDER BY |
$limit |
결과 수 제한 | LIMIT |
$skip |
건너뛰기 | OFFSET |
$unwind |
배열 펼치기 | - |
$lookup |
다른 컬렉션 조인 | JOIN |
$addFields |
필드 추가 | SELECT AS |
$count |
문서 수 카운트 | COUNT |
$out |
결과를 컬렉션에 저장 | INSERT INTO … SELECT |
$merge |
결과를 기존 컬렉션에 병합 | MERGE |
$bucket |
범위별 그룹핑 | - |
$facet |
다중 파이프라인 병렬 실행 | - |
$match (필터링)
// WHERE dept = '개발팀' AND salary >= 5000000
db.users.aggregate([
{ $match: { dept: "개발팀", salary: { $gte: 5000000 } } }
])
$group (그룹핑)
// 부서별 인원수, 평균급여
db.users.aggregate([
{ $group: {
_id: "$dept", // 그룹핑 기준
count: { $sum: 1 }, // COUNT
avg_salary: { $avg: "$salary" }, // AVG
max_salary: { $max: "$salary" }, // MAX
min_salary: { $min: "$salary" }, // MIN
total_salary: { $sum: "$salary" } // SUM
}}
])
집계 연산자
| 연산자 | 설명 |
|---|---|
$sum |
합계 (1이면 카운트) |
$avg |
평균 |
$min / $max |
최소/최대 |
$first / $last |
첫 번째/마지막 값 |
$push |
배열로 수집 |
$addToSet |
중복 없이 배열로 수집 |
$count |
문서 수 |
$project (필드 선택/변환)
db.users.aggregate([
{ $project: {
_id: 0,
name: 1,
dept: 1,
annual_salary: { $multiply: ["$salary", 12] },
name_dept: { $concat: ["$name", " (", "$dept", ")"] }
}}
])
$sort / $limit / $skip
// 급여 높은 순 상위 5명
db.users.aggregate([
{ $sort: { salary: -1 } },
{ $limit: 5 }
])
// 페이징 (2페이지, 페이지당 10건)
db.users.aggregate([
{ $sort: { _id: 1 } },
{ $skip: 10 },
{ $limit: 10 }
])
$unwind (배열 펼치기)
배열 필드를 개별 문서로 분리합니다.
// 원본: { name: "홍길동", skills: ["JS", "Python", "MongoDB"] }
// 결과: 3개 문서로 분리
db.users.aggregate([
{ $unwind: "$skills" },
{ $group: {
_id: "$skills",
count: { $sum: 1 }
}},
{ $sort: { count: -1 } }
])
// 스킬별 보유 인원 수
$lookup (JOIN)
다른 컬렉션과 조인합니다.
// users 컬렉션의 dept_id로 departments 컬렉션 조인
db.users.aggregate([
{ $lookup: {
from: "departments", // 조인할 컬렉션
localField: "dept_id", // 현재 컬렉션 필드
foreignField: "_id", // 대상 컬렉션 필드
as: "dept_info" // 결과 배열 필드명
}},
{ $unwind: "$dept_info" }, // 배열을 단일 문서로
{ $project: {
name: 1,
salary: 1,
dept_name: "$dept_info.name"
}}
])
Pipeline $lookup (복잡한 조인, MongoDB 3.6+)
db.orders.aggregate([
{ $lookup: {
from: "products",
let: { product_id: "$product_id" },
pipeline: [
{ $match: { $expr: { $eq: ["$_id", "$$product_id"] } } },
{ $project: { name: 1, price: 1 } }
],
as: "product"
}}
])
$addFields / $set
// 기존 필드 유지하면서 새 필드 추가
db.users.aggregate([
{ $addFields: {
annual_salary: { $multiply: ["$salary", 12] },
is_senior: { $gte: ["$age", 30] }
}}
])
$bucket (범위별 그룹핑)
// 급여 구간별 인원 수
db.users.aggregate([
{ $bucket: {
groupBy: "$salary",
boundaries: [0, 3000000, 5000000, 7000000, 10000000],
default: "기타",
output: {
count: { $sum: 1 },
names: { $push: "$name" }
}
}}
])
$facet (다중 파이프라인)
하나의 입력으로 여러 파이프라인을 동시에 실행합니다.
db.users.aggregate([
{ $facet: {
"부서별통계": [
{ $group: { _id: "$dept", count: { $sum: 1 } } }
],
"급여상위3": [
{ $sort: { salary: -1 } },
{ $limit: 3 },
{ $project: { name: 1, salary: 1 } }
],
"전체통계": [
{ $group: { _id: null, total: { $sum: 1 }, avg_salary: { $avg: "$salary" } } }
]
}}
])
실전 예시: SQL → Aggregation 변환
SQL
SELECT dept, COUNT(*) as count, AVG(salary) as avg_sal
FROM users
WHERE age >= 25
GROUP BY dept
HAVING COUNT(*) >= 2
ORDER BY avg_sal DESC;
MongoDB Aggregation
db.users.aggregate([
{ $match: { age: { $gte: 25 } } },
{ $group: {
_id: "$dept",
count: { $sum: 1 },
avg_sal: { $avg: "$salary" }
}},
{ $match: { count: { $gte: 2 } } },
{ $sort: { avg_sal: -1 } }
])
- [MongoDB] 10. 자주 발생하는 Troubleshooting
- [MongoDB] 09. 성능 튜닝과 모니터링
- [MongoDB] 08. 백업과 복구
- [MongoDB] 07. 보안과 사용자 관리
- [MongoDB] 06. Replica Set과 Sharding (고가용성/확장)
- [MongoDB] 05. 스키마 설계 (Embedding vs Referencing)
- [MongoDB] 04. 인덱스 (Index)
- [MongoDB] 03. Aggregation Pipeline (집계)
- [MongoDB] 02. CRUD 기본 (문서 삽입, 조회, 수정, 삭제)
- [MongoDB] 01. MongoDB 소개 및 설치