본문 바로가기
IT/Python

[FastAPI] DB 문제 해결 및 성능 개션

by 저당단 2026. 6. 3.

 

1. SQLAlchemy의 bindparam을 통한 Bulk Update로 성능 향상

from sqlalchemy import update, bindparam

# 1. 실행할 쿼리 틀을 미리 만듦
stmt = (
    update(User)
    .where(User.id == bindparam("_id"))
    .values(name=bindparam("_name"))
)

# 2. 실제 데이터들을 리스트로 묶어 한 번에 실행 (executemany)
db.session.execute(
    stmt,
    [
        {"_id": 1, "_name": "Alice"},
        {"_id": 2, "_name": "Bob"},
        {"_id": 3, "_name": "Charlie"},
    ]
)

bindparam에 필드값을 매핑해 미리 자리를 만들어두는 방식.

성능이 향상되지만 데이터셋이 적을 때는 그다지 편차가 크지 않다.

 

2. Join을 통한 라운드트립 개선

라운드트립이란 애플리케이션과 DB 사이를 왔다갔다 하는 것을 말한다.

첫번째 조회된 데이터를 기반으로 두번째 조회를 하면 N+1 문제로 라운드트립이 많아지는데,

비용이 많이 드는 작업이라 성능적으로 좋지 않다.

 

데이터를 조인으로 합쳐서 한 번만 조회를 하면 이를 줄여 성능적으로 이점을 얻을 수 있다.

# 수정 전

content = session.query(Content).filter_by(id=1).first()
# 여기서 한 번 더 쿼리가 날아감 (Lazy Loading 등)
version = session.query(ContentVersion).filter_by(content_id=content.id).all()
# 수정 후

results = session.query(Content, ContentVersion)\
    .outerjoin(ContentVersion, Content.id == ContentVersion.content_id)\
    .filter(Content.id == 1)\
    .all()

아우터 조인을 쓴 이유는, 콘텐츠 버전이 없는 데이터는 이너 조인을 할 경우 유실되기 때문이다.

 

2-1. N+1 문제란?

N+1이란 연관된 하위 데이터들을 가지고 오기 위해 쿼리를 한 번씩 더 날리는 문제를 말한다.

예를 들어 쿼리를 한 번 날려서 상품 10개를 가져왔다고 하자.

이 상품들의 리뷰를 가져오려면 10개의 리뷰를 가져오기 위해 쿼리를 한 번씩 더 날려야 한다.

즉 쿼리를 총 1 + 10 = 11번 날리게 된다.

 

데이터 페칭 타입이 Eager든 Lazy든, 리뷰 데이터가 필요하다면 이 문제가 발생한다.

Eager는 상품 조회 쿼리를 하는 순간 무조건 N+1이 발생하고, 

Lazy는 리뷰를 가지고 오는 순간 N+1이 발생하기 때문이다.

 

 

2-2. SQLAlchemy의 joinedload 사용해 N+1 방지

상품을 기준으로,

일대일 관계(하나의 상품에 하나의 상세 정보)

다대일 관계(하나의 패키지에 여러 상품)일 때 주로 사용한다.

Spring의 Fetch Join과 같다.

이 옵션을 사용하면 쿼리를 생성할 때 알아서 LEFT OUTER JOIN을 걸어 준다.

async def get_products_with_packages(db: AsyncSession):
    # products를 조회할 때 packages 테이블을 바로 JOIN해서 함께 가져옴
    stmt = (
        select(Product)
        .options(joinedload(Product.package))
    )
    result = await db.execute(stmt)
    return result.scalars().all()

 

 

2-3. SQLAlchemy의 selectinload 사용해 N+1 방지

데이터의 양이 많거나 일대다 관계(하나의 상품에 여러 리뷰)일 때 주로 사용한다.

아우터 조인을 사용하는 joinedload는 특성상 중복 데이터가 너무 많이 생성되기 때문이다.

product_id name review_id content
1 티셔츠 101 예뻐요
1 티셔츠 102 사이즈가 커요
1 티셔츠 103 배송이 빨라요

(설명: 왼쪽 테이블이 계속 중복됨)

from sqlalchemy.orm import selectinload

async def get_products_with_reviews(db: AsyncSession):
    # 1번 쿼리: products 조회
    # 2번 쿼리: SELECT ... FROM reviews WHERE product_id IN (1, 2, 3...)
    stmt = (
        select(Product)
        .options(selectinload(Product.reviews))
    )
    result = await db.execute(stmt)
    return result.scalars().all()

selectinload는 먼저 상품을 조회하고,

그 결과로 where...in 문을 써서 2번의 쿼리만으로 데이터를 가져온다.

 

 

3. TOCTOU(Time-of-Check to Time-of-Use) 레이스 컨디션 방지를 위한 

TOCTOU는 확인하는 시간(Check)과 실제로 사용하는 시간(Use)이 달라서 발생하는 문제이다.

10개까지 만들 수 있는 데이터가 있다고 하자.

개수를 확인했을 때(Check) 9개로 데이터를 하나 더 만드는게 가능하기에, 하나를 만든다(Use).

하지만 그 Check와 Use 사이에 어떤 다른 스레드에서 10개를 채워버린다면,

결과적으로 11개가 만들어져 제한이 깨져버리게 된다.

 

그래서 이런 경우 비관적 락(Pessimistic Lock) 알고리즘을 통해서

Check와 Use 사이의 시간 동안 로우에 접근하지 못하도록 자물쇠를 건다.

def get_all_for_update_by_region(self, region_id):
    return self.db.query(Banner) \
        .filter(Banner.region_id == region_id) \
        .with_for_update() \  # <--- @Lock(LockModeType.PESSIMISTIC_WRITE) 과 같음
        .all()

이렇게 SQLAlchemy 구문을 입력하면 다음과 같은 쿼리가 날아간다.

그러면 DB에서는 배타적 락(Exclusive Lock, X-Lock) 메커니즘을 통해 물리적으로 행을 잠근다.

SELECT id, region_id, sort_order, ...
FROM banner
WHERE region_id = 'KR'
FOR UPDATE;  -- <--- 배타적 락

FOR UPDATE로 인해 데이터를 조회하는 동안, 다른 트랜잭션은 대기해야 한다.

이 락은 commit()이나 rollback()이 호출되어 트랜잭션이 종료될 때까지 유지된다.

 

 

3-2. 공유 락(Shared Lock)

비관적 락이 조회와 수정을 모두 막는다면 공유 락은 조회는 가능하게 열어둔다.

내가 조회를 할 때 다른 수정을 못하도록 막는 로직이다.

def get_all_for_update_by_region(self, region_id):
    return self.db.query(Banner) \
        .filter(Banner.region_id == region_id) \
        .with_for_update(read=True) \  # <--- @Lock(LockModeType.PESSIMISTIC_READ) 과 같음
        .all()

 

예시는 인터넷 뱅킹 점검 시간이다.

은행은 고객들의 자산을 조회해서 보고서로 뽑으려고 한다.

이 시간동안 고객들은 자산 조회가 가능하지만, 자산이 변동(UPDATE)되는 거래는 못 한다.

 

쇼핑몰에서 상품을 주문하는 고객이 있는 동안에는 그 상품에 대한 수정을 막는 경우도 될 수 있겠다.

 

 

3-3. 비관적 락과 공유 락의 데드락(Deadlock) 문제

비관적 락의 데드락 동시 요청에서 업데이트 순서가 정해져 있지 않기 때문에 발생한다.

회원 A가 B에게 만원을 보내고(트랜잭션1, 이하 1),

B가 A에게 5천원을 보내려고 한다(트랜잭션2, 이하 2).

 

1, 2 요청이 0.001초 차이로 동시에 들어온다면,

A와 B의 잔액 로우에 모두 배타적 락이 걸려 버린다.

 

<과정>

1은 A의 잔액에서 돈을 빼가야 하므로 A의 잔액 로우에 배타적 락을 건다.

2은 B의 잔액에서 돈을 빼가야 하므로 B의 잔액 로우에 배타적 락을 건다.

1이 B에게 돈을 더해줘야 돼서 B의 잔액 로우에 배타적 락을 요청하지만 2가 잡고 있어서 못 건다.

2는 A에게 돈을 더해줘야 돼서 A의 잔액 로우에 배타적 락을 요청하지만 1이 잡고 있어서 못 건다.

결국 둘다 끝나길 기다려서 무한 고착 상태에 빠져 버린다.

DB 엔진은 이를 감지하고 한쪽 트랜잭션을 롤백시켜버린다.

 

이걸 막기 위해선 규칙을 세우면 되는데,

'어떤 상황에서든 ID 숫자가 작은 로우부터 먼저 락을 건다' 처럼 세우면 된다.

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from models import Account

async def transfer_money_safe(db: AsyncSession, from_id: int, to_id: int, amount: int):
    # 🔥 데드락 방지: ID가 작은 계좌부터 정렬하여 순서대로 락을 건다!
    first_lock_id = min(from_id, to_id)
    second_lock_id = max(from_id, to_id)
    
    # 1. ID가 작은 계좌 먼저 조회 (배타적 락)
    stmt1 = select(Account).where(Account.id == first_lock_id).with_for_update()
    account1 = (await db.execute(stmt1)).scalar_one()
    
    # 2. ID가 큰 계좌 그다음 조회 (배타적 락)
    # 순서가 보장되므로, 앞의 트랜잭션이 완전히 끝날 때까지 뒤의 트랜잭션은 1번 줄에서 대기.
    stmt2 = select(Account).where(Account.id == second_lock_id).with_for_update()
    account2 = (await db.execute(stmt2)).scalar_one()
    
    # 3. 파이썬 메모리 상에서 실제 보낸 사람(from)과 받는 사람(to) 매핑 찾기
    from_account = account1 if account1.id == from_id else account2
    to_account = account1 if account1.id == to_id else account2
    
    # 4. 비즈니스 로직 수행 및 저장
    if from_account.balance < amount:
        raise ValueError("잔액이 부족합니다.")
        
    from_account.balance -= amount
    to_account.balance += amount
    
    await db.commit()  # commit이 성공하면 락이 한 번에 모두 풀림

 

 

공유 락의 데드락 배타적 락/낙관적 락을 써야 하는 상황에서 공유 락을 쓸 때 발생한다.

ex) A와 B가 동시에 동일한 상품을 구매해서 재고가 깎일 때

 

수정 시 문제가 생기는 것이므로, 공유 락은 반드시 조회 시에만 써야 한다.

 

 

3-1. 낙관적 락(Optimistic Lock)으로 TOCTOU 방지하기

배타적 락은 안전하지만, 트래픽이 몰리면 대기 시간이 길어진다는 단점이 있다.

낙관적 락은 배타적 락처럼 물리적으로 DB 로우를 잠그지는 않는다.

 

낙관적 락을 걸기 위해서는 버전 관리를 위한 테이블 컬럼이 꼭 필요한데,

내가 데이터를 읽어온 시점의 버전 저장할 시점의 버전을 비교해야 하기 때문이다.

보통은 updated_at 컬럼이나 status 컬럼 (pending 상태일 때만 변경 가능) 을 이용한다.

그 두 시점의 버전이 같으면 저장하는 식으로 데이터 정합성을 유지한다.

만약 다르면 StaleDataError를 발생시키고, 그 때 예외 처리로 재시도하도록 하면 된다.

 

class Base(DeclarativeBase):
    pass

class Notice(Base):
    __tablename__ = "notices"
    
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str]
    content: Mapped[str]
    
    # 1. 흔히 사용하는 수정일시 컬럼
    # onupdate=func.now()를 주어 데이터가 바뀔 때마다 자동으로 시간이 갱신.
    updated_at: Mapped[datetime] = mapped_column(
        default=func.now(), 
        onupdate=func.now(), 
        nullable=False
    )
    
    # 2. ★ 핵심: updated_at을 낙관적 락 컬럼으로 지정
    __mapper_args__ = {
        "version_id_col": updated_at,
        "version_id_generator": False  # 매퍼가 숫자를 +1 하는 게 아니라, DB/파이썬의 타임스탬프를 사용하도록 설정
    }

# --- FastAPI 엔드포인트 내 로직 예시 ---
async def update_notice(db: AsyncSession, notice_id: int, new_title: str):
    # 1. 사용자가 수정 페이지를 열었을 때의 데이터를 조회 (이때 updated_at 시각이 메모리에 기록됨)
    stmt = select(Notice).where(Notice.id == notice_id)
    result = await db.execute(stmt)
    notice = result.scalar_one()
    
    # 2. 값 변경
    notice.title = new_title
    
    # 3. 저장 시도
    try:
        await db.commit()  
        # UPDATE notices SET title = '...', updated_at = NOW() WHERE id = 1 AND updated_at = '내가 처음 읽어왔던 시각';
    except Exception: # SQLAlchemy의 StaleDataError 등을 캐치
        await db.rollback()
        return {"success": False, "message": "다른 사용자가 이미 수정한 글입니다. 다시 시도해주세요."}

 

낙관적 락은 성능적으로 이점이 있지만 충돌이 발생하면 재시도해야 하므로,

동시에 같은 데이터를 수정할 확률이 낮은 특정 상황에서만 제한적으로 사용된다.

  • 위키 글 수정 (비관적 락이라면 다른 사용자가 문서를 편집하고 있으면 편집조차 못함)
  • 사용자 프로필 수정 (사용자와 운영자 둘다 수정 가능하지만 동시 수정 확률은 낮음)
  • 주문 상태 변경 (사용자가 주문을 취소할 수 있지만 재시도해도 그다지 손해는 없음)

티켓팅이나 기차 예매 같은 한정 수량을 다룰 땐 당연히 낙관적 락을 걸면 안된다.

자리를 잡아서 결제하고 있는데 그걸 다른 사용자가 채가버리면 당연히 UX는 개판난다.

 

 

4. load_only로 특정 컬럼만 조회하기

from sqlalchemy import select
from sqlalchemy.orm import load_only
from db import tuple_or_session  # 세션 객체
from models import User

# User 테이블에서 id와 username 컬럼만 조회
stmt = select(User).options(load_only(User.id, User.username))
result = await session.execute(stmt) # [ (User(id=1, name="A"),), (User(id=2, name="B"),) ]
users = result.scalars().all()  # 튜플 껍데기 벗기기. [ User(id=1, name="A"), User(id=2, name="B") ]

DB 레벨에서부터 민감한 정보를 조회하지 않아 보안 측면에서도 도움이 되며,

성능 면에서도 좋다.

 

 

5. 하이브리드 속성(hybrid_property) 표현식(hybrid_method)

파이썬 속성처럼 작동하면서 WHERE문에서도 쓸 수 있는 속성을 정의한다.

from sqlalchemy.ext.hybrid import hybrid_property

class User(Base):
    __tablename__ = "users"
    first_name: Mapped[str]
    last_name: Mapped[str]

    @hybrid_property
    def full_name(self) -> str:
        return f"{self.first_name} {self.last_name}"
    
    # 파이썬 문법 -> SQL 문법 번역(단순한 합산이면 불필요)
    @full_name.expression
    def full_name(cls):
        return cls.first_name + " " + cls.last_name

코드에서도 user.full_name 으로 쓸 수 있고,

쿼리도 select(User).where(User.full_name == "홍길동") 으로 쓸 수 있다.

매우 강력한 기능.

 

 


https://doringri.tistory.com/312

 

[FastAPI] FastAPI 코드 기본

mainfrom fastapi import FastAPIapp = FastAPI()@app.get("/")async def root(): return {"message": "Hello World"} async def: FastAPI의 핵심. 입출력(I/O) 대기 시간 동안 다른 작업을 처리할 수 있게 해주는 비동기 문법.데코레이

doringri.tistory.com

위 게시글에서 분리하고 내용을 추가해서 작성된 글입니다!