Node.js app log를 Fluentd에 저장하기
2020-05-09 14:29:43

설정

예제 가이드대로 td-agent.conf를 바꿔본다.

1
2
3
<match fluentd.test.**>
@type stdout
</match>

설정추가

그리고 나서 td-agent를 종료 후 다시 켜 주면 된다. 근데 match랑 source가 뭔지 보고 가자.

match

“match” 지시문은 일치하는 태그가 있는 이벤트를 찾아서 처리합니다, 일치하는 태그가 있는 이벤트를 찾아서 처리합니다

공홈의 내용을 번역기로 돌렸더니 아 뭔가 카테고리나 네임스페이스인가? 라는 생각이 들었다.

source

입력 소스는 소스 지시문을 사용하여 원하는 입력 플러그인을 선택하고 구성하여 활성화됩니다.

1
2
3
4
5
6
7
8
9
<source>
@type forward
@id input_forward
</source>
<source>
@type http
@id input_http
port 8888
</source>

아 여기로 로그를 보내는건가 보군! 이란 생각이 들었음. 여러 개 추가 된다고 한다.

전송하기

예제 코드를 따라가 보며 해 본다.

패키지 설치

1
npm i fluent-logger -S

코드

1
2
3
4
5
6
7
8
9
10
const logger = require('fluent-logger');

logger.configure('fluentd.test', { // NOTE: tag
host: 'localhost',
port: 24224, // NOTE: td-agent 실행 시 log에 port 24224로 뜸.
timeout: 3.0,
reconnectInterval: 600000, // 10 minutes, NOTE: 연결 실패 시 재연결 쿨 타임
});

logger.emit('join', {id: 'a', name: 'delryn', age: 32});

실행 결과는 아래 명령어를 쳐 보면 나온다.

1
2
tail -n 1 /var/log/td-agent/td-agent.log
2020-05-09 17:45:47.000000000 +0900 fluentd.test.join: {"id":"a","name":"delryn","age":32}

어플리케이션 -> agent 연결된 상태이기 때문에 agent를 종료하면 연결이 끊기는 메세지를 볼수 있다.

1
2
3
4
5
6
7
8
Fluentd error { Error: connect ECONNREFUSED 127.0.0.1:24224
at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1106:14)
errno: 'ECONNREFUSED',
code: 'ECONNREFUSED',
syscall: 'connect',
address: '127.0.0.1',
port: 24224 }
Fluentd will reconnect after 600 seconds

winston, morgan으로 연동

이전에 해본 winston + morgan 로그를 fluentd로 보내서 파일을 생성하게 해 보자.

설정 추가

기존 conf를 참조해서 한번 추가 해보았다.

1
2
3
4
5
6
7
8
9
<match myproject.app.**>
@type file
path /var/log/td-agent/myproject/app
</match>

<match myproject.http.**>
@type file
path /var/log/td-agent/myproject/http
</match>

코드 작성

이전에 작성한 코드에 추가 해 본다.

logger.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
const { createLogger, format, transports } = require('winston');
const fluentTransport = require('fluent-logger').support.winstonTransport();
const mt = require('moment-timezone');

const { combine, label, printf } = format;

const date = mt().tz('Asia/Seoul');
const myFormat = printf(info => `${info.timestamp} [${info.level}]: ${info.label} - ${info.message}`);
const koreaTime = format((info) => {
info.timestamp = date.format();
return info;
});

const fluentConfig = {
host: 'localhost',
port: 24224,
timeout: 3.0,
requireAckResponse: false, // NOTE: true로 설졍하면 fluentd의 응답을 기다린다라고 번역기로 나오는데 난 false로 해봤다.
};

const fluentInit = (tag) => {
const init = new fluentTransport(tag, fluentConfig);
return init;
}

const appLogger = (category) => {
const init = createLogger({
format: combine(
label({ label: category }),
koreaTime(),
myFormat,
),
transports: [fluentInit('myproject.app')],
});
return init;
};

const httpLogger = createLogger({
format: combine(
label({ label: 'http' }),
koreaTime(),
myFormat,
),
transports: [fluentInit('myproject.http')],
});

const httpLogStream = {
write: (message) => {
httpLogger.log({
level: 'info',
message: message,
});
},
};

exports.appLogger = appLogger;
exports.httpLogStream = httpLogStream;

index.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

const express = require('express');
const morgan = require("morgan");
const logger = require('./logger');
const app = express();
const PORT = process.env.PORT || 3000;

const morganFormat = process.env.NODE_ENV !== "production" ? "dev" : "combined";

app.use(morgan(morganFormat, { stream: logger.httpLogStream }));

app.get('/', (req, res) => {
res.send('hihi');
});

app.get('/set-item', (req, res) => {
const { id, name, item_id } = req.query;
if (typeof id === 'undefined' || typeof name === 'undefined' || typeof item_id === 'undefined') {
logger.appLogger('spend_item').log({
level: 'warn',
message: '파라미터 누락',
});
return res.status(200).json({ statusCode: 403, message: '파라미터 누락' });
}
if (typeof item_id !== 'string' || !item_id.includes('item_')) {
logger.appLogger('spend_item').log({
level: 'error',
message: '잘못된 아이템 타입',
});
return res.status(200).json({ statusCode: 500, message: '잘못된 아이템 타입' });
}
logger.appLogger('spend_item').log({
level: 'info',
message: `user_id: ${id}, name: ${name}, 획득 아이템: ${item_id}`,
});
return res.status(200).json({ statusCode: 200, message: '처리 성공' });
});

app.listen(PORT, () => console.log(`app listening on port ${PORT}`));

실행을 해서 테스트를 하고 내가 설정한 경로에서 로그를 찾아보면 기록이 남아 있다.

1
2
3
2020-05-09T19:35:19+09:00       myproject.app   {"level":"info","message":"user_id: abc, name: 123, 획득 아이템: item_123","label":"spend_item","timestamp":"2020-05-09T19:35:17+09:00"}

2020-05-09T19:35:19+09:00 myproject.http {"level":"info","message":"\u001B[0mGET /set-item?id=abc&name=123&item_id=item_123 \u001B[32m200\u001B[0m 8.309 ms - 44\u001B[0m\n","label":"http","timestamp":"2020-05-09T19:35:17+09:00"}

s3 연동

로컬 파일이 아닌 s3에 파일을 업로드하도록 바꿔보자

설정 변경

이전 설정은 주석으로 해두고 새 설정을 추가 하자.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
<match myproject.app.**>
@type s3

aws_key_id AWS 키
aws_sec_key AWS 시크릿 키
s3_bucket s3 버켓명
s3_region 리전
path logs/${tag}/%Y/%m/%d/ # 저장할 s3 디렉토리
<buffer tag,time>
@type file
path /var/log/fluent/s3/app # 버퍼로 갖고 있을 디텍토리 설정
timekey 0 # timekey는 시간
timekey_wait 10s # timekey_wait는 초 또는 분 (s, m)
timekey_use_utc true # use utc
chunk_limit_size 256m # 버퍼 파일 크기 지정
</buffer>
</match>

<match myproject.http.**>
@type s3

aws_key_id AWS 키
aws_sec_key AWS 시크릿 키
s3_bucket s3 버켓명
s3_region 리전
path logs/${tag}/%Y/%m/%d/
<buffer tag,time>
@type file
path /var/log/fluent/s3/http
timekey 0
timekey_wait 10s
timekey_use_utc true
chunk_limit_size 256m
</buffer>
</match>

timekey, timekey_wait에 설정한 시간단위 별로 버퍼에 담아둔 로그를 s3에 업로드한다.

지금은 10초마다 업로드해놓게끔 해놨고 실제 어플리케이션에서 테스트를 해봤을 시

s3업로드

gzip으로 압축 되서 올라 간다.