SpringBoot 재고 감소 동시성 제어 - Redisson 사용 (+ AOP 적용)

반응형
728x90
반응형

들어가기전

Redis는 docker를 사용하여 설치한다.

https://devfunny.tistory.com/424

 

docker로 redis 설치 (with docker-compose)

Redis 이미지 설치 docker pull redis docker-compose 파일 생성 version: '3.0' services: redis1: image: redis command: redis-server --requirepass root --port 6379 restart: always ports: - 6379:6379 doc..

devfunny.tistory.com

 

 

redis 의존성 추가

implementation 'org.redisson:redisson-spring-boot-starter:3.17.7'

 

 

Service 로직 - 재고 감소

accommodationRoom.stockDecrease() 메서드가 재고를 감소시킨다.

아래 Service의 stockDecrease() 메서드가 호출할 때마다 특정 accommodationRoomId에 해당하는 accommodationRoom 객체의 '재고' 가 -1 이 수행된다.

@RedissonLock(key = "accommodationRoomId")
public void stockDecrease(Long accommodationRoomId) {
    log.info("Thread Name : " + Thread.currentThread().getName());

    ...

    accommodationRoom.stockDecrease();

    log.info("decrease after stock() : " + accommodationRoom.getStock());
}

 

 

커스텀 어노테이션 @RedissonLock

기본 메서드 로직 또는 람다식 등을 활용한 여러 방법이 있지만, 이번 포스팅에서는 커스텀 어노테이션을 사용해서 공통 로직을 적용하는 시간을 갖겠다.

@RedissonLock(key = "accommodationRoomId")

key 를 무엇으로 설정하느냐는 중요한 문제다. 해당 key를 기준으로 lock을 걸기 때문에 동시성 제어에 필요한 key 값을 명확하게 설정해야한다. ACCOMMODATION_ROOM 테이블의 accommodationRoomId가 PK이고, 해당 RoomId 기준으로 동시성 제어가 필요하므로 위 코드의 key는 accommodationRoomId이다.

 

▶ RedissonLock.java

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedissonLock {
    String key();

    TimeUnit timeUnit() default TimeUnit.SECONDS;

    // wait time : wait time 동안 lock 획득을 시도하고, 이 시간이 초과되면 lock 획득에 실패하고 false를 리턴한다.
    long waitTime() default 5L;

    // lease time : lock 획득에 성공한 이후, lease time 이 지나면 자동으로 lock을 해제한다.
    long leaseTime() default 3L;
}

 1) waitTIme()

wait time 동안 lock 획득을 시도하고, 이 시간이 초과되면 lock 획득에 실패하고 false를 리턴한다.

long waitTime() default 5L;

 

2) leaseTime()

lock 획득에 성공한 이후, lease time 이 지나면 자동으로 lock을 해제한다. 따라서 lock을 무제한 보유하거나 lock을 보유한채로 장애가 발생하거나 등의 여러 상황을 방지할 수 있다. 이는 RedissonLock을 사용의 큰 장점 중 하나다.

long leaseTime() default 3L;

 

 

▶ RedissonLockAop.java

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;

@Aspect
@Component
@RequiredArgsConstructor
@Slf4j
public class RedissonLockAop {
    private final RedissonClient redissonClient;
    private final RedissonCallTransaction redissonCallTransaction;

    @Around("@annotation(org.clonecoder.productserver.common.aop.RedissonLock)")
    public Object lock(final ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();
        RedissonLock redissonLock = method.getAnnotation(RedissonLock.class);

        /* create key */
        String key = this.createKey(signature.getParameterNames(), joinPoint.getArgs(), redissonLock.key());

        /* get rLock 객체 */
        RLock rock = redissonClient.getLock(key);

        try {
            /* get lock */
            boolean isPossible = rock.tryLock(redissonLock.waitTime(), redissonLock.leaseTime(), redissonLock.timeUnit());
            if (!isPossible) {
                return false;
            }

            log.info("Redisson Lock Key : {}", key);

            /* service call */
            return redissonCallTransaction.proceed(joinPoint);
        } catch (Exception e) {
            throw new InterruptedException();
        } finally {
            rLock.unlock();
        }
    }

    /**
     * Redisson Key Create
     * @param parameterNames
     * @param args
     * @param key
     * @return
     */
    private String createKey(String[] parameterNames, Object[] args, String key) {
        String resultKey = key;

        /* key = parameterName */
        for (int i = 0; i < parameterNames.length; i++) {
            if (parameterNames[i].equals(key)) {
                resultKey += args[i];
                break;
            }
        }

        return resultKey;
    }
}

1) redis 에 저장할 key 생성

String key = this.createKey(signature.getParameterNames(), joinPoint.getArgs(), redissonLock.key());

파라미터에서 key와 동일한 값을 key로 설정한다.

for (int i = 0; i < parameterNames.length; i++) {
    if (parameterNames[i].equals(key)) {
        resultKey += args[i];
        break;
    }
}
@RedissonLock(key = "accommodationRoomId")

여기서는 accommodationRoomId 값을 key에 추가하였다.

 

2) RLock 객체를 얻는다.

RLock rLock = redissonClient.getLock(key);

 

3) tryLock() 메서드를 통해 락을 건다.

  • 1번째 인자 : Lock 획득을 기다리는 시간
    • 이만큼의 시간이 지나면 false 가 반환되어, 락 획득에 실패했다고 알려준다. 
  • 2번재 인자 : Lock의 타임아웃(만료) 시간
    • 이만큼의 시간이 지나면 락이 만료되어 사라지기 때문에 애플리케이션에서 락을 해제해주지 않더라도 다른 스레드 혹은 어플리케이션에서 락을 획득할 수 있게된다.
  • 3번째 인자 : 인자의 단위
boolean isPossible = rLock.tryLock(distributeLock.waitTime(), distributeLock.leaseTime(), distributeLock.timeUnit());

 

4) 실질적인 service 로직을 호출한다.

return redissonCallTransaction.proceed(joinPoint);

 

5) 모든 요청이 끝난 후 lock을 해제한다.

rLock.unlock();

 

 

 

▶ RedissonCallTransaction.java

import org.aspectj.lang.ProceedingJoinPoint;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

@Component
public class RedissonCallTransaction {
    /**
     * 부모트랜잭션의 유무와 관계없이 동시성에 대한 처리는 별도의 트랜잭션으로 동작하기 위함
     * @param joinPoint
     * @return
     * @throws Throwable
     */
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public Object proceed(final ProceedingJoinPoint joinPoint) throws Throwable {
        return joinPoint.proceed();
    }
}

1) 별도의 트랜잭션으로 동작

@Transactional(propagation = Propagation.REQUIRES_NEW)
try {
    /* get lock */
    boolean isPossible = rLock.tryLock(redissonLock.waitTime(), redissonLock.leaseTime(), redissonLock.timeUnit());
    if (!isPossible) {
        return false;
    }

    log.info("Redisson Lock Key : {}", key);

    /* service call */
    return redissonCallTransaction.proceed(joinPoint);
} catch (Exception e) {
    throw new InterruptedException();
} finally {
    rLock.unlock();
}

try~catch~finally 문에서 finally에 unlock() 메서드를 호출하면서 해당 메서드의 마지막 부분에서 lock을 해제하는데, 부모 트랜잭션의 commit은 메서드 로직 수행이 모두 끝난 후에 실행된다.

 

commit 전의 상황 [Redis가 unlock() 되는 시점 < commit() 시점]
이때 만약 A쓰레드, B 쓰레드가 있을때, A쓰레드가 Redis Rlock을 잡고있다가 unlock()이 호출되어 반환을 했는데 commit 수행 전에 B쓰레드가 Rlock을 얻고 재고 수량을 확인할때 DB에는 -1이 되지 않은 재고값이 조회가 되서 데이터가 맞지 않게 될 수 있다.
재고 로직을 수행하는 실제 Service를 별도의 트랜잭션으로 처리해서 이 트랜잭션을 커밋 후 lock()을 해제하도록 해야한다.

만약 위 트랜잭션 방법이 아닌, JPA를 사용하는 경우 saveAndFlush() 코드를 적용할 수도 있겠다.

 

 

RedissonClient.java

RedissonClient.java
public interface RedissonClient {
    ...
}

 

Redisson.java : RedissonClient.java 구현체
public class Redisson implements RedissonClient {
    ...
    
    @Override
    public RLock getLock(String name) {
        return new RedissonLock(commandExecutor, name);
    }
    
    ...
}

 

RedissonLock 생성자
public class RedissonLock extends RedissonBaseLock {

    protected long internalLockLeaseTime;

    protected final LockPubSub pubSub;

    final CommandAsyncExecutor commandExecutor;

    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        this.commandExecutor = commandExecutor;
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
    }
    
    ...
}

 

unlock() 호출
public interface RLock extends Lock, RLockAsync {
    ...
}

 

Lock.java
public interface Lock {
    ...
    
    void unlock();
}

 

RedissonLock.java
...
 @Override
public void unlock() {
    try {
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        } else {
            throw e;
        }
    }
}
...

 

 

테스트 코드

@Test
void 멀티쓰레드_재고감소() throws InterruptedException {
    // given
    final Long accommodationRoomIdx = 1L;
    AccommodationRoom accommodationRoom = accommodationRoomRepository.findById(accommodationRoomIdx).get();
    int threadCount = accommodationRoom.getStock();

    ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
    CountDownLatch countDownLatch = new CountDownLatch(threadCount);

    // when
    IntStream.range(0, threadCount).forEach(e -> executorService.submit(() -> {
            try {
                accommodationStore.stockDecrease(accommodationRoomIdx);
            } finally {
                countDownLatch.countDown();
            }
        }
    ));

    countDownLatch.await();

    // then
    accommodationRoom = accommodationRoomRepository.findById(accommodationRoomIdx).get();
    int afterStock = accommodationRoom.getStock();

    assertThat(afterStock).isZero();
}

1) 재고 개수만큼 threadCount 변수 설정

int threadCount = accommodationRoom.getStock();

 

2) 재고 개수만큼 ThreadPool 개수 설정

ExecutorService executorService = Executors.newFixedThreadPool(threadCount);

 

3) 별도 쓰레드로 수행

재고 개수만큼 쓰레드가 수행되므로 결과는 0이여야한다.

IntStream.range(0, threadCount).forEach(e -> executorService.submit(() -> {
        try {
            accommodationStore.stockDecrease(accommodationRoomIdx);
        } finally {
            countDownLatch.countDown();
        }
    }
));

 

4) 결과 체크

assertThat(afterStock).isZero();

 

 

Redisson pub/sub 방식

  • Netty를 사용하여 non-Blocking I/O를 사용한다. 

https://jypthemiracle.medium.com/weekly-java-%EA%B0%84%EB%8B%A8%ED%95%9C-%EC%9E%AC%EA%B3%A0-%EC%8B%9C%EC%8A%A4%ED%85%9C%EC%9C%BC%EB%A1%9C-%ED%95%99%EC%8A%B5%ED%95%98%EB%8A%94-%EB%8F%99%EC%8B%9C%EC%84%B1-%EC%9D%B4%EC%8A%88-9daa85155f66

 

  • Redisson 방식은 Pub/Sub 기반으로 분산 락(Distributed Lock)을 구현한다.
Pub/Sub 방식을 사용하므로 계속 요청을 보내지 않는다.
락이 해제될때마다 subscribe 하는 클라이언트들에게 "락 획득이 가능하다." 라는 알림을 준다.
알림을 받은 클라이언트들은 다시 lock 획득을 시도한다.
따라서, 일일이 레디스에 요청을 보내서 락의 획득 가능 여부를 체크하지 않아도 된다.

 

  • 채널을 하나 만들고 락을 점유하고있는 쓰레드가 락을 받으려는 쓰레드에게 점유 해제를 공지한다.

 

 

채널 구독/메시지 발행 예시

Redis 실행
docker exec -it a41f663b72b3 redis-cli

 

1) 채널 ch1 구독
-- ch01 이라는 이름의 채널에 메시지를 수신하겠다.
subscribe ch1

 

2) 채널 ch1에 메시지 발행
-- ch1 채널에 "hello" 메시지를 발행한다.
publish ch1 hello

발행시, 1)번에 메시지가 노출된다.

 

 

더 많은 메시지를 발행해보자.

1) 채널 ch1에 메시지 발행

 

2) 구독한 클라이언트

 

3) 구독자 수 추가

 

4) 결과값이 2 출력

결과값 : publish한 메시지를 받은 subscriber(구독자)의 수 

 

 

반응형

Designed by JB FACTORY