요 몇일전에 log를 수집하기 위해 회사에서 만든건데 내가 만들어 영 시원찮아 그냥 안쓰기로 했다.
그래도 만약 필요한 사람들이 있다면 고쳐서 쓰면 좋을 것 같다. logback에 flume을 내장해서 kafka에 보내는 라이브러리다.
일단 카프카가 설치 되어 있어야 한다. 카프카 설치는
여기서 보고 따라하며 된다.
zookeeper와 kafka, 그리고 kafka consumer를 실행 시키자.
그 다음에 아래와 같이 메이븐에 추가 하자.
<repositories>
<repository>
<id>spring-boot-gcm-mvn-repo</id>
<url>https://raw.github.com/wonwoo/logback-flume-kafka-appender/mvn-repo/</url>
</repository>
</repositories>
<dependency>
<groupId>me.wonwoo</groupId>
<artifactId>logback-flume-kafka-appender</artifactId>
<version>0.9.0-SNAPSHOT</version>
</dependency>
그런다음 logback.xml에 appender를 등록하자
<appender name="flume" class="me.wonwoo.appender.FlumeKafkaAppender">
<sinkName>kafkaSink</sinkName>
<channelName>kafkaChannel</channelName>
<mode>INFO</mode>
<!--https://flume.apache.org/FlumeUserGuide.html#file-channel -->
<channelAttr>
<capacity>100000</capacity>
<transactionCapacity>1000</transactionCapacity>
<dataDirs>/Users/wonwoo/Desktop/flume</dataDirs>
<checkpointDir>/Users/wonwoo/Desktop/flume/checkout</checkpointDir>
</channelAttr>
<!-- topic ,brokerList, batchSize -->
<KafkaAttr>
<topic>test</topic>
<brokerList>localhost:9092</brokerList>
<!--<brokerList>localhost:9092,localhost:9093,localhost:9094</brokerList>-->
<batchSize>100</batchSize>
</KafkaAttr>
<encoder class="me.wonwoo.encoding.DefaultFlumeMessageEncoder">
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg</pattern>
</layout>
</encoder>
</appender>
속성으로는 sinkName, channelName, mode, channelAttr, KafkaAttr 등이 있다.
sinkName, channelName 은 각각의 이름을 지정해주면 된다. 딱히 의미가 있지는 않다.
mode인 경우에는 logback의 레벨이라고 생각하면 된다. INFO이면 INFO 이상(WARN, ERROR)이 보내지고 그 이하들은 보내지지 않는다.
하지만 이 경우에는 appender의 레벨보다 같거나 높아야 가능하다.
channelAttr과 KafkaAttr의 속성들은 flume 홈페이지에 자세히 나와있으니 그 것을 참고하면 되겠다.
테스트를 해보자.
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Test
public void appenderKafkaTest() {
logger.info("kafka1 sink test");
}
@Test
public void appenderKafkaError() {
try {
URL uri = new URL("urltest");
uri.openConnection();
} catch (MalformedURLException e) {
logger.error("url formed exception ", e);
} catch (IOException e) {
logger.error("error {} : ", e.toString());
}
}
위의 테스트는 info 테스트이고 그 아래 테스트는 에러 로그를 찍는 테스트이다. 실직적으로 테스트를 해보면 아래와 같이 kafka 컨슈머에 로그가 찍힌다.
23:11:34.385 [main] INFO m.w.appender.FlumeKafkaAppenderTest - kafka1 sink test
23:12:15.612 [main] ERROR m.w.appender.FlumeKafkaAppenderTest - url formed exception java.net.MalformedURLException: no protocol: urltest
at java.net.URL.<init>(URL.java:586)
at java.net.URL.<init>(URL.java:483)
at java.net.URL.<init>(URL.java:432)
at me.wonwoo.appender.FlumeKafkaAppenderTest.appenderKafkaError(FlumeKafkaAppenderTest.java:25)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
...
...
//이하 생략
길어서 조금 생략했다.
아주 간단하게 logback을 이용해서 kafka로 전달하는 코드를 만들어 봤다.
또한 일반 로그 말고도 json으로 보내는 로그도 만들었다.
logback.xml에 encoder을 아래와 같이 바꾸자.
<encoder class="me.wonwoo.encoding.DefaultFlumeMessageEncoder">
<layout class="me.wonwoo.layout.JsonLayout">
<jsonFormatter class="me.wonwoo.layout.JacksonJsonFormatter">
<prettyPrint>true</prettyPrint>
</jsonFormatter>
<includeContextName>false</includeContextName>
</layout>
</encoder>
속성은 아래와 같다.
timestamp, level, mdc, logger, message, raw-message, exception, context, lineNumber, hostName 등이 있다.
include 옵션으로 포함할지 하지 않을지 결정할 수 있다.
테스트를 다시 해보면 json으로 kafka에 전달되는 것을 볼 수 있다.
{
"timestamp" : "1466345834333",
"level" : "ERROR",
"thread" : "main",
"logger" : "me.wonwoo.appender.FlumeKafkaAppenderTest",
"message" : "url formed exception ",
"lineNumber" : "28",
"hostName" : "lee-ui-MacBook-Air.local",
"exception" : "java.net.MalformedURLException: no protocol: urltest\
\\tat java.net.URL.<init>(URL.java:586)\
\\tat java.net.URL.<init>(URL.java:483)\
\\tat java.net.URL.<init>(URL.java:432)\
\\tat me.wonwoo.appender.FlumeKafkaAppenderTest.appenderKafkaError(FlumeKafkaAppenderTest.java:25)\
\\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\
\\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
...
...
//생략
}
마찬가지로 길어서 생략 했다.
하다가 버그가 있었는데 web인 경우에 톰캣을 내리면 톰캣이 죽지 않고 계속 떠 있는 걸 확인했다.
원인을 찾다보니 flume의 channel이 web이 내려갈때 닫히지 않는 문제가 발생하여 logback 리스너를 등록 해야 한다.
메이븐에 아래와 같이 디펜더시를 받고 web.xml에는 리스너를 등록하자.
<dependency>
<groupId>org.logback-extensions</groupId>
<artifactId>logback-ext-spring</artifactId>
<version>0.1.4</version>
</dependency>
web.xml
<listener>
<listener-class>ch.qos.logback.ext.spring.web.LogbackConfigListener</listener-class>
</listener>
이렇게 하면 channel이 닫힌다.
근데 flume을 쓰는 이유는 파일로 트랜잭션 처리를 할 수 있어서 만약 kafka가 죽더라고 flume이 로그를 보내지 못하더라도 나중에 kafka에게 다시 보낼 수 있다는 것이다. 한번 테스트를 해보자.
일단 카프카를 죽이고 테스트를 다시 해보자. 테스트를 해보면 어플리케이션에는 딱히 문제가 없다.
그러나 kafka를 죽였으니 kafka에게는 전송이 안되어 있을 것이다. 그럼 다시 kafka를 실행시키고 다시 테스트를 해보자.
그럼 좀전에 kafka가 죽었을때 보냈던 로그까지 함께 출력 된다.
{
"timestamp" : "1466346165047",
"level" : "INFO",
"thread" : "main",
"logger" : "me.wonwoo.appender.FlumeKafkaAppenderTest",
"message" : "kafka1 sink test",
"lineNumber" : "19",
"hostName" : "lee-ui-MacBook-Air.local"
}
{
"timestamp" : "1466346221907",
"level" : "INFO",
"thread" : "main",
"logger" : "me.wonwoo.appender.FlumeKafkaAppenderTest",
"message" : "kafka1 sink test",
"lineNumber" : "19",
"hostName" : "lee-ui-MacBook-Air.local"
}
한번씩 잘 되나 해보자. 일단 현재까지는 버그가 없다고 판단되지만 아직 잘 모른다. 버그가 있다면 고쳐서 쓰길 권장한다.
참고로 kafka의 버전이 0.9.0.1이다. 몇일전에 0.10.0.0이 릴리즈 되었는데 내가 만들 당시에는 베타 버전이라 사용하지 않았다.
나중에 시간이 되면 0.10.0.0 버전도 함께 만들어서 해봐야겠다.
소스는
https://github.com/wonwoo/logback-flume-kafka-appender에 있다.
참고 하길 바란다.