스프링

[SQS]Amazon SQS

hmjhaha 2024. 10. 15. 00:12

AWS SQS는 분산 시스템에서 메시지 전달 및 관리에 적합한 서비스로, 여러 컴포넌트 간의 비동기 통신을 가능하게 해줍니다.

 

application.yml

aws:
  access-key-id: AKIA4...
  secret-access-key: oiLaI2bJC...
  region: ap-northeast-2
  sqs:
    queue-url: https://sqs.ap-northeast-2.amazonaws.com/851725245699/AllClear.fifo
    message-group-id: wishlist
    message-deduplication-id: true  # 중복 제거 ID 사용 여부

 

SqsConfig.java

@Configuration
public class SqsConfig {

    @Value("${aws.access-key-id}")
    private String accessKeyId;

    @Value("${aws.secret-access-key}")
    private String secretAccessKey;

    @Value("${aws.region}")
    private String region;

    @Bean
    public AmazonSQS amazonSQS() {
        BasicAWSCredentials awsCreds = new BasicAWSCredentials(accessKeyId, secretAccessKey);
        return AmazonSQSClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(awsCreds))
                .withRegion(region)
                .build();
    }
}

 

● application.yml에 있는 AWS 자격증명과 리전 정보를 가져와 AmazonSQS 객체를 생성합니다.

 

SQSService.java

@Service
public class SQSService {

    @Autowired
    private AmazonSQS amazonSQS;

    @Value("${aws.sqs.queue-url}")
    private String queueUrl;

    public void sendMessage(String messageBody, String messageDeduplicationId, String messageGroupId) {
        SendMessageRequest sendMessageRequest = new SendMessageRequest()
            .withQueueUrl(queueUrl)
            .withMessageBody(messageBody)
            .withMessageGroupId(messageGroupId)
            .withMessageDeduplicationId(messageDeduplicationId);  // 중복 제거 ID
        amazonSQS.sendMessage(sendMessageRequest);
    }
}

 

 sendMessage 메서드를 통해 SQS에 메시지를 보내는 역할을 합니다.

 

queueUrl : AWS SQS에서 메시지를 보낼 큐의 URL을 지정

 

messageBody : 큐에 전송될 메시지의 내용

 

messageGroupId : FIFO 큐에서 메시지의 순서를 보장하려면, 메시지를 특정 그룹에 묶어야 합니다. 이 값은 동일한 그룹 ID를 가진 메시지들이 순차적으로 처리되게 합니다.

messageGroupId가 "wishlist"인 모든 메시지는 순차적으로 하나씩 처리됩니다.

 

messageDeduplicationId : 동일한 메시지가 중복해서 큐에 전송되지 않도록 하는 고유한 식별자입니다.

 

MessageController.java

@RestController
@RequestMapping("/api/messages")
public class MessageController {

    @Autowired
    private SQSService sqsService;

    @PostMapping("/send")
    public String sendMessage(@RequestParam String messageBody, @RequestParam String messageDeduplicationId) {
        String messageGroupId = "wishlist";  // 메시지 그룹 ID를 설정
        sqsService.sendMessage(messageBody, messageDeduplicationId, messageGroupId);
        return "Message sent successfully!";
    }
}

 

 

  이 메서드는 두개의 요청 파라미터를 받아서 메시지를 SQS에 전송합니다.

 

WishlistService.java

@Transactional
public void updatePriorities(Map<String, String> priorities) {
    for (Map.Entry<String, String> entry : priorities.entrySet()) {
        try {
            String[] keyParts = entry.getKey().split("__");
            if (keyParts.length < 2) {
                throw new IllegalArgumentException("Invalid key format: " + entry.getKey());
            }
            Long wishlistId = Long.parseLong(keyParts[1]);
            int priority = Integer.parseInt(entry.getValue());

            // DB 업데이트
            Wishlist wishlist = wishlistRepository.findById(wishlistId)
                    .orElseThrow(() -> new IllegalArgumentException("Wishlist item not found: " + wishlistId));
            wishlist.setPriority(priority);
            wishlistRepository.save(wishlist);

            // SQS 메시지 생성 및 전송
            String messageBody = String.format("{\"action\": \"updatePriority\", \"id\": %d, \"priority\": %d}", wishlistId, priority);
            SendMessageRequest sendMessageRequest = new SendMessageRequest()
                    .withQueueUrl(queueUrl)
                    .withMessageBody(messageBody)
                    .withMessageGroupId("wishlist")  // FIFO 큐 메시지 그룹
                    .withMessageDeduplicationId(String.valueOf(wishlistId));  // 중복 제거 ID

            amazonSQS.sendMessage(sendMessageRequest);
        } catch (IllegalArgumentException e) {
            // 에러 처리
            System.err.println("Error processing entry: " + entry + ". " + e.getMessage());
        }
    }
}

 

전달된 priorities 맵을 순회하여 각 항목을 처리한다 -> 키를 _ 구분자로 나누어 wishlistId를 추출한다.

 

 updatePriorities 메서드는 위시리스트 항목의 우선순위를 업데이트하고, 이를 SQS로 메시지를 전송하여 비동기적으로 처리할 수 있도록 설계되었습니다. FIFO 큐를 사용하여 중복되지 않도록 메시지를 관리합니다.

 

@Transactional
public void deleteWishlist(Long id) {
    Wishlist wishlist = entityManager.find(Wishlist.class, id);
    if (wishlist != null) {
        entityManager.remove(wishlist);
        entityManager.flush();  // 캐시 강제 플러시
    }

    // SQS 메시지 생성 및 전송
    String messageBody = String.format("{\"action\": \"delete\", \"id\": %d}", id);
    SendMessageRequest sendMessageRequest = new SendMessageRequest()
            .withQueueUrl(queueUrl)
            .withMessageBody(messageBody)
            .withMessageGroupId("wishlist")
            .withMessageDeduplicationId(String.valueOf(id));  // 중복 제거 ID

    amazonSQS.sendMessage(sendMessageRequest);
}

 

id에 해당하는 Wishlist 객체를 조회하고 db에서 삭제

캐시된 데이터를 즉시 DB에 반영하여, 삭제 작업이 실제로 DB에 반영되도록 강제한다.

deleteWishlist 메서드는 데이터베이스에서 위시리스트 항목을 삭제한 후, 해당 삭제 작업을 SQS에 알리기 위해 메시지를 보냅니다. 이렇게 하면 삭제 작업이 발생한 경우 다른 서비스가 이를 추적할 수 있습니다.

@Transactional
public void addLectureToWishlist(String lectureCode, String division, Long studentId) {
    Optional<Lecture> optionalLecture = lectureRepository.findByLectureCodeAndDivision(lectureCode, division);

    if (optionalLecture.isPresent()) {
        Lecture lecture = optionalLecture.get();

        Wishlist wishlist = new Wishlist();
        wishlist.setLecture(lecture);
        wishlist.setStudentId(studentId);
        wishlist.setPriority(0);  // 기본 우선순위 설정
        wishlistRepository.save(wishlist);

        // SQS 메시지 생성 및 전송
        String messageBody = String.format("Added lecture to wishlist: %s (%s)", lectureCode, division);
        String messageGroupId = "wishlist";
        String messageDeduplicationId = UUID.randomUUID().toString();
        amazonSQS.sendMessage(new SendMessageRequest(queueUrl, messageBody)
                .withMessageGroupId(messageGroupId)
                .withMessageDeduplicationId(messageDeduplicationId));
    } else {
        throw new IllegalArgumentException("강의를 찾을 수 없습니다.");
    }
}

 

사용자가 강의를 위시리스트에 추가하면 해당 정보를 SQS에 메시지로 보내어 비동기적으로 다른 서비스와 연동할 수 있습니다.