AWS SQS 사용해보기
2020-03-04 23:17:51

SQS란?

서버들끼리 사용할 수 있는 완전관리형 Message Queue 서비스

Message Queue란?

메시지를 이용하여 여러 서버를 연결해주는 역할이라 생각 한다.

방식은 sender가 데이터를 Queue에 적재했다가 receiver가 꺼내서 처리 한다.

목적

Message Queue는 왜 사용해야 하는가?

실습

SQS는 표준, FIFO 두 가지를 지원 한다.

표준

express(sender)에서 SQS(Queue)에 메세지를 적재 하고 Lambda(receiver)에서

꺼낸 다음에 DynamoDB에 저장을 실습으로 해보겠다.

생성

생성

표준으로 하면 메세지가 중복으로 올 수 있다는건데 어플리케이션에서 처리를 해야 할 꺼 같다.

생성결과

sender

API 호출 시 SQS에 메세지를 전송하게끔 한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
const express = require('express');

const bodyParser = require('body-parser');

const AWS = require('aws-sdk');

const app = express();

const PORT = process.env.PORT || 3000;

app.use(bodyParser.json());
app.use(bodyParser.urlencoded({
extended: true
}));

AWS.config.update({
region: '', // 리전
accessKeyId: '', // IAM
secretAccessKey: '' // IAM
});

const sqs = new AWS.SQS({ apiVersion: '2012-11-05' });

app.listen(PORT, () => {
console.log('server is listening');
});

app.get('/', (req, res) => {
res.send('hihi');
});

function sendMessage(message) {
const params = {
QueueUrl: '생성한 SQS URL',
MessageBody: JSON.stringify(message),
};
return new Promise((resolve, reject) => {
sqs.sendMessage(params).promise().then(() => {
resolve();
}).catch((e) => {
reject(e);
});
});
}

app.post('/login', (req, res) => {
const id = req.body.id;
const log = { id, dtm: Date.now(), action: 'login' };
sendMessage(log).then(() => {
res.json({ code: 200, message: 'success' });
}).catch((e) => {
res.json({ code: 500, message: e });
});
});

app.post('/purchase', (req, res) => {
const { id, item } = req.body;
const log = { id, item, dtm: Date.now(), action: 'purchase' };
sendMessage(log).then(() => {
res.json({ code: 200, message: 'success' });
}).catch((e) => {
return res.json({ code: 500, message: e });
});
});

app.post('/search', (req, res) => {
const { id, word } = req.body.id;
const log = { id, word, dtm: Date.now(), action: 'search' };
sendMessage(log).then(() => {
res.json({ code: 200, message: 'success' });
}).catch((e) => {
res.json({ code: 500, message: e });
});
});

receiver

Lambda를 receiver로 쓸 껀데 해당 하는 IAM 역할을 생성을 미리 해야 한다.

권한 생성
  1. 역핢 만들기를 누른다.
  2. 사용 사례는 lambda로
  3. SQS 검색해서 추가
  4. 로그를 보기 위해 CloudWatch도 추가
  5. 태그는 대충..
  6. 마지막으로 확인 후 생성

권한 - 1

권한 - 2

권한 - 3

권한 - 4

권한 - 5

권한 - 6

Lambda 생성
  1. 위에서 만든 권한을 연결
  2. 코드를 작성 해서 Lambda에 업로드

권한연결

결과

트리거추가

아래의 코드는 대기열이 만든 메세지 ID를 기반으로 dynamodb에서 체크를 한 다음에 넣는다.

실무에서 안 써 봤기에 AWS 커뮤니티에 질문한 결과 이런 식으로 하는 게 맞다고 한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
const AWS = require('aws-sdk');

AWS.config.update({
region: '', // 리전
accessKeyId: '', // IAM
secretAccessKey: '' // IAM
});

const sqs = new AWS.SQS({ apiVersion: '2012-11-05' });

const dynamo = new AWS.DynamoDB.DocumentClient();

exports.handler = async (event) => {

/**
* body: 내가 보낸 메세지
* messageId: unique인진 모르겠지만 메세지 아디가 존재
* receiptHandle: 메세지를 큐에서 지우기 위한 파라미터
*/
const { body, messageId, receiptHandle } = event.Records[0];

const message = JSON.parse(body);

let haveData;

try {
const params = {
TableName: '테이블명',
Key: {
messageId: messageId,
dtm: Number(message.dtm),
},
};
haveData = await dynamo.get(params).promise();
} catch (e) {
console.log('sqs worker err', e);
return { statusCode: 500, message: 'sqs worker err' };
}

try {
if (typeof haveData !== 'undefined') {
const putParams = {
TableName: '테이블명',
Item: {
messageId: messageId,
dtm: Number(message.dtm),
action: message.action
},
};
if (message.action === 'purchase') params.Item.item = message.item;
if (message.action === 'search') params.Item.search = message.word;
const removeParams = { QueueUrl: 'SQS URL', ReceiptHandle };
await Promise.all([
dynamo.put(putParams).promise(),
sqs.deleteMessage(removeParams).promise(),
]);
return { statusCode: 200, message: 'sqs worker success' };
}
} catch (e) {
console.log('sqs worker err', e);
return { statusCode: 500, message: 'sqs worker err' };
}
};

테스트

코드를 실행 하기 전에 DynamoDB를 생성 한다.

dynamodb

요청 결과는 다음과 같다.

request

response

FIFO

sqs 생성

fifo 방식은 대기열 이름 뒤에 .fifo를 붙여야 한다.

SQS 생성

아래에 대기열 생성 파랑 버튼 클릭

SQS 생성 완료

생성 완료 시 아래의 URL은 나중에 필요.

sender

로컬에서 테스트를 해 본다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
const AWS = require('aws-sdk');

AWS.config.update({
region: '', // 리전
accessKeyId: '', // IAM
secretAccessKey: '' // IAM
});

const sqs = new AWS.SQS({ apiVersion: '2012-11-05' });

const randomStr = (len) => {
let result = '';
const characters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789';
const charactersLength = characters.length;
for (var i = 0; i < len; i++) {
result += characters.charAt(Math.floor(Math.random() * charactersLength));
}
return result;
}

const sendMessage = (type, id, body) => {
const params = {
QueueUrl: 'SQS에서 발급한 URL',
MessageBody: JSON.stringify(body), // NOTE: 보낼 메세지
MessageGroupId: type, // NOTE: 메시지 그룹 ID 특정 메시지 그룹에 속한 메시지를 지정하는 태그. 동일한 메시지 그룹에 속한 메시지는 메시지 그룹에 따라 엄격한 순서로 항상 하나씩 처리됩니다
MessageDeduplicationId: `${id}:${new Date().getTime()}:${randomStr(5)}`, // NOTE: 메시지 중복 제거 ID 전송된 메시지의 중복 제거에 사용되는 토큰. 특정 메시지 중복 제거 ID가 있는 메시지를 성공적으로 전송한 경우, 메시지 중복 제거 ID가 동일한 모든 메시지는 성공적으로 수신되지만 중복 제거 간격인 5분 동안은 전달되지 않습니다.
};
return new Promise((resolve, reject) => {
const result = sqs.sendMessage(params).promise();
result.then(() => {
console.log('성공한 메세지', JSON.stringify((data)));
resolve(data);
}).catch((e) => {
console.log('실패한 메세지', JSON.stringify(params));
reject(e);
});
});
};

sendMessage('join', 'a', {id: 'a', ts: new Date().getTime(), message: 'hihi'}).then(() => {
console.log('a - send success');
}).catch((e) => {
console.log('e', e);
});

setTimeout(() => {
sendMessage('join', 'b', {id: 'b', ts: new Date().getTime(), message: 'hello'}).then(() => {
console.log('b - send success');
}).catch((e) => {
console.log('e', e);
});
}, 2000);

setTimeout(() => {
sendMessage('join', 'c', {id: 'c', ts: new Date().getTime(), message: '나비보벳따우'}).then(() => {
console.log('c - send success');
}).catch((e) => {
console.log('e', e);
});
}, 500);

전송 순서는 a -> c -> b다.

receiver

권한 생성

위에서 만든 표준 SQS 역할과 같다.

IAM 생성

Lambda 생성
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
const AWS = require('aws-sdk');

AWS.config.update({
region: '', // 리전
accessKeyId: '', // IAM
secretAccessKey: '' // IAM
});

const ddb = new AWS.DynamoDB.DocumentClient({ apiVersion: '2012-08-10' });
const sqs = new AWS.SQS({ apiVersion: '2012-11-05' });

exports.handler = async (event) => {
const body = JSON.parse(event.Records[0].body);
const receiptHandle = event.Records[0].receiptHandle;
console.log('body', body);
console.log('receiptHandle', receiptHandle);
const putParams = {
TableName: 'receive',
Item: {
id: body.id,
timestamp: body.ts,
message: body.message,
},
};
const sqsParams = {
QueueUrl: 'SQS에서 발급하는 URL',
ReceiptHandle: receiptHandle, // NOTE: 해당 메세지의 UUID라 생각하면 될듯.
}
try {
await Promise.all([
ddb.put(putParams).promise(), // NOTE: dynamodb에 저장
sqs.deleteMessage(sqsParams).promise(), // NOTE: 수신한 메세지는 삭제
]);
const response = {
statusCode: 200,
body: JSON.stringify('receive success'),
};
return response;
} catch (e) {
console.log('e', e);
const response = {
statusCode: 500,
body: JSON.stringify('receive err'),
};
return response;
}
};
Lambda 트리거 설정

lambda 트리거 설정

테스트

cloudwatch 결과

dynamodb 결과

허용되는 범위의 메세지의 수로 테스트를 해봐야 정확하나 우선은 의도한대로 프로세스는 작동 했다.

참고