1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
| const mysql = require('mysql2'); const RedisSMQ = require('rsmq');
const rsmq = new RedisSMQ({ host: '127.0.0.1', port: 6379, ns: 'rsmq' });
const createQueue = (qname) => { return new Promise((resolve, reject) => { rsmq.createQueue({ qname }, (err, result) => { if (err) return reject(err); if (!err && Number(result) === 1) return resolve(); return reject(new Error('큐 생성 실패')); }); }); };
const sendMessage = (qname, message) => { return new Promise((resolve, reject) => { rsmq.sendMessage({ qname, message }, (err, result) => { if (err) return reject(err); if (!err && Number(result) === 1) return resolve(); return reject(new Error('메세지 전송 실패')); }); }); };
const sendProcess = (qname, message) => { return new Promise((resolve, reject) => { sendMessage(qname, message).then(async () => { return resolve(); }).catch(async (e) => { try { if (e.name === 'queueNotFound') { await createQueue(qname); await sendMessage(qname, message); return resolve(); } return reject(e); } catch (err) { return reject(err); } }); }); };
const receiveMessage = (qname) => { return new Promise((resolve, reject) => { rsmq.receiveMessage({ qname }, (err, result) => { if (err && err.name === 'queueNotFound') return resolve(); if (err) return reject(err); if (!err && result.id) return resolve(result); return resolve(); }); }); };
const saveLog = (data) => { return new Promise((resolve, reject) => { const connection = mysql.createConnection({ host: 'localhost', user: 'root', database: 'test', waitForConnections: true, connectionLimit: 20, }); const sql = 'INSERT INTO `my_log` (`user_id`, `event`, `create_at`) VALUES (?, ?, ?);'; const values = JSON.parse(data.message); connection.execute(sql, [values.user_id, values.event, values.create_at], (err) => { connection.end(); if (err) return reject(err); return resolve(data.id); }); }); };
const deleteMessage = (qname, id) => { return new Promise((resolve, reject) => { rsmq.deleteMessage({ qname, id }, (err) => { if (err) return reject(err); return resolve(); }); }); };
const receiveProcess = (qname) => { return new Promise((resolve, reject) => { receiveMessage(qname).then(async (data) => { try { if (typeof data !== 'undefined') { const id = await saveLog(data); await deleteMessage(qname, id); return resolve(true); } return resolve(false); } catch (err) { return reject(err); } }).catch((e) => { return reject(e); }); }); };
exports.sendProcess = sendProcess; exports.receiveProcess = receiveProcess;
|