SQS란? 서버들끼리 사용할 수 있는 완전관리형 Message Queue 서비스
Message Queue란? 메시지를 이용하여 여러 서버를 연결해주는 역할이라 생각 한다.
방식은 sender가 데이터를 Queue에 적재했다가 receiver가 꺼내서 처리 한다.
목적 Message Queue는 왜 사용해야 하는가?
실습 SQS는 표준, FIFO 두 가지를 지원 한다.
표준 express(sender)에서 SQS(Queue)에 메세지를 적재 하고 Lambda(receiver)에서
꺼낸 다음에 DynamoDB에 저장을 실습으로 해보겠다.
생성
표준으로 하면 메세지가 중복으로 올 수 있다는건데 어플리케이션에서 처리를 해야 할 꺼 같다.
sender API 호출 시 SQS에 메세지를 전송하게끔 한다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 const express = require ('express' );const bodyParser = require ('body-parser' );const AWS = require ('aws-sdk' );const app = express();const PORT = process.env.PORT || 3000 ;app.use(bodyParser.json()); app.use(bodyParser.urlencoded({ extended: true })); AWS.config.update({ region: '' , accessKeyId: '' , secretAccessKey: '' }); const sqs = new AWS.SQS({ apiVersion : '2012-11-05' });app.listen(PORT, () => { console .log('server is listening' ); }); app.get('/' , (req, res ) => { res.send('hihi' ); }); function sendMessage (message ) { const params = { QueueUrl: '생성한 SQS URL' , MessageBody: JSON .stringify(message), }; return new Promise ((resolve, reject ) => { sqs.sendMessage(params).promise().then(() => { resolve(); }).catch((e ) => { reject(e); }); }); } app.post('/login' , (req, res ) => { const id = req.body.id; const log = { id, dtm : Date .now(), action : 'login' }; sendMessage(log).then(() => { res.json({ code : 200 , message : 'success' }); }).catch((e ) => { res.json({ code : 500 , message : e }); }); }); app.post('/purchase' , (req, res ) => { const { id, item } = req.body; const log = { id, item, dtm : Date .now(), action : 'purchase' }; sendMessage(log).then(() => { res.json({ code : 200 , message : 'success' }); }).catch((e ) => { return res.json({ code : 500 , message : e }); }); }); app.post('/search' , (req, res ) => { const { id, word } = req.body.id; const log = { id, word, dtm : Date .now(), action : 'search' }; sendMessage(log).then(() => { res.json({ code : 200 , message : 'success' }); }).catch((e ) => { res.json({ code : 500 , message : e }); }); });
receiver Lambda를 receiver로 쓸 껀데 해당 하는 IAM 역할을 생성을 미리 해야 한다.
권한 생성
역핢 만들기를 누른다.
사용 사례는 lambda로
SQS 검색해서 추가
로그를 보기 위해 CloudWatch도 추가
태그는 대충..
마지막으로 확인 후 생성
Lambda 생성
위에서 만든 권한을 연결
코드를 작성 해서 Lambda에 업로드
아래의 코드는 대기열이 만든 메세지 ID를 기반으로 dynamodb에서 체크를 한 다음에 넣는다.
실무에서 안 써 봤기에 AWS 커뮤니티에 질문한 결과 이런 식으로 하는 게 맞다고 한다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 const AWS = require ('aws-sdk' );AWS.config.update({ region: '' , accessKeyId: '' , secretAccessKey: '' }); const sqs = new AWS.SQS({ apiVersion : '2012-11-05' });const dynamo = new AWS.DynamoDB.DocumentClient();exports .handler = async (event) => { const { body, messageId, receiptHandle } = event.Records[0 ]; const message = JSON .parse(body); let haveData; try { const params = { TableName: '테이블명' , Key: { messageId: messageId, dtm: Number (message.dtm), }, }; haveData = await dynamo.get(params).promise(); } catch (e) { console .log('sqs worker err' , e); return { statusCode : 500 , message : 'sqs worker err' }; } try { if (typeof haveData !== 'undefined' ) { const putParams = { TableName: '테이블명' , Item: { messageId: messageId, dtm: Number (message.dtm), action: message.action }, }; if (message.action === 'purchase' ) params.Item.item = message.item; if (message.action === 'search' ) params.Item.search = message.word; const removeParams = { QueueUrl : 'SQS URL' , ReceiptHandle }; await Promise .all([ dynamo.put(putParams).promise(), sqs.deleteMessage(removeParams).promise(), ]); return { statusCode : 200 , message : 'sqs worker success' }; } } catch (e) { console .log('sqs worker err' , e); return { statusCode : 500 , message : 'sqs worker err' }; } };
테스트 코드를 실행 하기 전에 DynamoDB를 생성 한다.
요청 결과는 다음과 같다.
FIFO sqs 생성 fifo 방식은 대기열 이름 뒤에 .fifo를 붙여야 한다.
아래에 대기열 생성 파랑 버튼 클릭
생성 완료 시 아래의 URL은 나중에 필요.
sender 로컬에서 테스트를 해 본다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 const AWS = require ('aws-sdk' );AWS.config.update({ region: '' , accessKeyId: '' , secretAccessKey: '' }); const sqs = new AWS.SQS({ apiVersion : '2012-11-05' });const randomStr = (len ) => { let result = '' ; const characters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789' ; const charactersLength = characters.length; for (var i = 0 ; i < len; i++) { result += characters.charAt(Math .floor(Math .random() * charactersLength)); } return result; } const sendMessage = (type, id, body ) => { const params = { QueueUrl: 'SQS에서 발급한 URL' , MessageBody: JSON .stringify(body), MessageGroupId: type, MessageDeduplicationId: `${id} :${new Date ().getTime()} :${randomStr(5 )} ` , }; return new Promise ((resolve, reject ) => { const result = sqs.sendMessage(params).promise(); result.then(() => { console .log('성공한 메세지' , JSON .stringify((data))); resolve(data); }).catch((e ) => { console .log('실패한 메세지' , JSON .stringify(params)); reject(e); }); }); }; sendMessage('join' , 'a' , {id : 'a' , ts : new Date ().getTime(), message : 'hihi' }).then(() => { console .log('a - send success' ); }).catch((e ) => { console .log('e' , e); }); setTimeout (() => { sendMessage('join' , 'b' , {id : 'b' , ts : new Date ().getTime(), message : 'hello' }).then(() => { console .log('b - send success' ); }).catch((e ) => { console .log('e' , e); }); }, 2000 ); setTimeout (() => { sendMessage('join' , 'c' , {id : 'c' , ts : new Date ().getTime(), message : '나비보벳따우' }).then(() => { console .log('c - send success' ); }).catch((e ) => { console .log('e' , e); }); }, 500 );
전송 순서는 a -> c -> b다.
receiver 권한 생성 위에서 만든 표준 SQS 역할과 같다.
Lambda 생성 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 const AWS = require ('aws-sdk' );AWS.config.update({ region: '' , accessKeyId: '' , secretAccessKey: '' }); const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion : '2012-08-10' });const sqs = new AWS.SQS({ apiVersion : '2012-11-05' });exports .handler = async (event) => { const body = JSON .parse(event.Records[0 ].body); const receiptHandle = event.Records[0 ].receiptHandle; console .log('body' , body); console .log('receiptHandle' , receiptHandle); const putParams = { TableName: 'receive' , Item: { id: body.id, timestamp: body.ts, message: body.message, }, }; const sqsParams = { QueueUrl: 'SQS에서 발급하는 URL' , ReceiptHandle: receiptHandle, } try { await Promise .all([ ddb.put(putParams).promise(), sqs.deleteMessage(sqsParams).promise(), ]); const response = { statusCode: 200 , body: JSON .stringify('receive success' ), }; return response; } catch (e) { console .log('e' , e); const response = { statusCode: 500 , body: JSON .stringify('receive err' ), }; return response; } };
Lambda 트리거 설정
테스트
허용되는 범위의 메세지의 수로 테스트를 해봐야 정확하나 우선은 의도한대로 프로세스는 작동 했다.
참고