Redis를 MessageQueue 서버로 사용해보기
2018-12-23 21:31:04

테스트로 서비스의 api가 오면 redis에 큐 메세지를 등록해두고 job 으로

분마다 redis에 적재된 큐 메세지들을 읽어서 mysql에 저장 한 다음에 redis에서 삭제.

로직

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139

const mysql = require('mysql2');
const RedisSMQ = require('rsmq');

const rsmq = new RedisSMQ({ host: '127.0.0.1', port: 6379, ns: 'rsmq' }); // NOTE: ns는 저장할 큐의 네임스페이스

/**
* TODO: 큐 생성
* @description 중복된 큐 이름은 생성 불가
* @param {String} qname 큐 이름
*/
const createQueue = (qname) => {
return new Promise((resolve, reject) => {
rsmq.createQueue({ qname }, (err, result) => {
if (err) return reject(err);
if (!err && Number(result) === 1) return resolve();
return reject(new Error('큐 생성 실패'));
});
});
};


/**
* TODO: 메세지 전송
* @param {String} qname 큐 이름
* @param {String} message 보낼 메세지
*/
const sendMessage = (qname, message) => {
return new Promise((resolve, reject) => {
rsmq.sendMessage({ qname, message }, (err, result) => {
if (err) return reject(err);
if (!err && Number(result) === 1) return resolve();
return reject(new Error('메세지 전송 실패'));
});
});
};

/**
* TODO: 메세지 큐 보내는 프로세스.
* @param {String} qname 큐 이름
* @param {String} message 보낼 메세지
*/
const sendProcess = (qname, message) => {
return new Promise((resolve, reject) => {
sendMessage(qname, message).then(async () => {
return resolve();
}).catch(async (e) => {
try {
if (e.name === 'queueNotFound') { // NOTE: queue를 생성 안해서 나는 오류.
await createQueue(qname);
await sendMessage(qname, message);
return resolve();
}
return reject(e);
} catch (err) {
return reject(err);
}
});
});
};

/**
* TODO: 메세지 수신하기
* @param {String} qname 큐 이름
*/
const receiveMessage = (qname) => {
return new Promise((resolve, reject) => {
rsmq.receiveMessage({ qname }, (err, result) => {
if (err && err.name === 'queueNotFound') return resolve(); // NOTE: 아직 등록되지 않은 큐를 읽을 시 예외 처리
if (err) return reject(err);
if (!err && result.id) return resolve(result);
return resolve();
});
});
};

/**
* TODO: sql에 메세지 큐 내용 저장
* @param {*} data 메세지 내용
*/
const saveLog = (data) => {
return new Promise((resolve, reject) => {
const connection = mysql.createConnection({
host: 'localhost',
user: 'root',
database: 'test',
waitForConnections: true,
connectionLimit: 20,
});
const sql = 'INSERT INTO `my_log` (`user_id`, `event`, `create_at`) VALUES (?, ?, ?);';
const values = JSON.parse(data.message);
connection.execute(sql, [values.user_id, values.event, values.create_at], (err) => {
connection.end();
if (err) return reject(err);
return resolve(data.id);
});
});
};

/**
* TODO: queue 삭제
* @param {String} qname 큐 이름
* @param {String} id 큐 ID
*/
const deleteMessage = (qname, id) => {
return new Promise((resolve, reject) => {
rsmq.deleteMessage({ qname, id }, (err) => {
if (err) return reject(err);
return resolve();
});
});
};

/**
* TODO: 메세지 읽은 후 mysql에 저장, 읽은 메세지 삭제.
* @param {String} qname 큐 이름
*/
const receiveProcess = (qname) => {
return new Promise((resolve, reject) => {
receiveMessage(qname).then(async (data) => { // NOTE: queue를 꺼내서 읽음.
try {
if (typeof data !== 'undefined') {
const id = await saveLog(data);
await deleteMessage(qname, id);
return resolve(true);
}
return resolve(false);
} catch (err) {
return reject(err);
}
}).catch((e) => {
return reject(e);
});
});
};


exports.sendProcess = sendProcess;
exports.receiveProcess = receiveProcess;

Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

const express = require('express');
const bodyParser = require('body-parser');
const mt = require('moment-timezone');
const queue = require('./queue');

const app = express();
app.use(bodyParser.json());
app.use(bodyParser.urlencoded({
extended: true,
}));

app.get('/', (req, res) => {
res.send('hi');
});

app.post('/send', (req, res) => {
const d = mt.tz('Asia/Seoul'); // 서버 시간은 서울로 맞추기.
let hour = d.hour(); // 0 ~ 23시
let min = d.minute() + 1; // 0 ~ 59분 큐는 1분뒤에 꺼낼꺼라 더해준다
if (min > 59) { // 시간 예외 처리용
min = min === 60 ? 0 : min - 59;
hour = hour === 23 ? 0 : hour + 1;
}
const qname = `${hour}_${min}`;
const data = {
user_id: req.body.user_id,
event: req.body.event,
create_at: d.format('YYYY-MM-DD HH:mm:ss'),
};
queue.sendProcess(qname, JSON.stringify(data)).then(() => {
return res.json({ result: 'success' });
}).catch((e) => {
return res.json(e);
});
});

app.listen(3000, () => {
console.log('server listen');
});

Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

const { CronJob } = require('cron');
const mt = require('moment-timezone');
const queue = require('./queue');

const job = new CronJob('* * * * *', () => { // NOTE: 1분마다 확인.
const d = mt.tz('Asia/Seoul');
const hour = d.hour();
const min = d.minute();
const qname = `${hour}_${min}`;
const loop = async () => { // NOTE: 큐에 등록된 메세지들을 전부 다 읽기 위해 재귀로 처리를 한다.
try {
const result = await queue.receiveProcess(qname);
console.log('qname', qname, 'result', result);
if (result) return loop();
return true;
} catch (err) {
console.log(err);
return false;
}
};
loop();
}, null, true, 'Asia/Seoul');
job.start();

테스트

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
const request = require('request');

const event = ['join', 'login', 'purcharse', 'logout'];

const params = [];

const test = () => {
return new Promise((resolve, reject) => {
request({
method: 'POST',
uri: 'http://localhost:3000/send',
json: {
user_id: Math.floor(Math.random() * 10000) + 1,
event: event[Math.floor(Math.random() * 3)],
},
}, (err, res, body) => {
if (err) return reject(err);
return resolve(body);
});
});
};

for (let i = 0; i < 400; i += 1) {
params.push(test());
}

Promise.all(params).then(() => {
console.log('end');
}).catch((e) => {
console.log(e);
});

참고