반응형
안녕하세요. 남산돈가스입니다.
지난 포스팅에 이어서 ELK와 MySQL 연동을 진행해보겠습니다.
지난 시간까지 ELK 중 Elastic Search와 Kibana 까지만 설치를 하였는데
이제 Logstash를 설치하고 MySQL에 연동해보겠습니다.
먼저 MySQL 연동 시 조회해야 할 데이터를 임의로 넣기 위해서 지난 시간에 Docker로 올렸던 MySQL 접속정보로 Workbench에 접속합니다.
최초 접속 시 생성 된 Schema가 없기 때문에 새로운 Schema를 생성해야 합니다.
ELK_TEST라는 스키마를 생성하고 ELK_TEST에 person이라는 테이블을 생성합니다.
테이블 컬럼으로는 id, name, gender, birth 이렇게 4가지의 기본 컬럼을 추가합니다.
테스트 데이터를 insert 하고 조회하여 데이터가 정상적으로 입력되었는지 확인합니다.
8개 데이터가 입력 된 것을 확인했습니다. 그럼 이제 logstash 설치로 다시 돌아가봅시다.
터미널에서 docker 명령어를 이용하여 logstash를 설치하겠습니다.
$ docker pull logstash
Using default tag: latest
latest: Pulling from library/logstash
0bd44ff9c2cf: Already exists
047670ddbd2a: Already exists
ea7d5dc89438: Already exists
4a05570971bb: Already exists
66f679cd5859: Already exists
89362eaac850: Already exists
d76c23323cb4: Already exists
f7a113d2d566: Already exists
cb2dece5a7e2: Pull complete
8cf5a699244c: Pull complete
c275eeaebd29: Pull complete
159e9a1395db: Pull complete
01be31fa2906: Pull complete
9b5ca62233e3: Pull complete
39280aea2b61: Pull complete
6bac8e5eed53: Pull complete
Digest: sha256:edd49321633aff49b0f7d65f0f369b5555b467b1d8b7a29c629802beb9f0a68f
Status: Downloaded newer image for logstash:latest
Docker Pull을 이용하여 logstash 이미지를 내려받았습니다.
logstash를 구동하기 전에 mysql 연동을 위하여 필요한 설정들이 있습니다.
먼저 Logstash - MySQL을 연동하기 위해선 jdbc 드라이버가 있어야하는데요.
https://dev.mysql.com/downloads/connector/j/5.1.html 에서 다운받으실 수 있습니다.
로컬환경에 MySQL-Connector 파일을 다운받은 뒤 폴더를 하나 생성하여 해당 파일을 이동시켜 놓습니다. 그 다음 해당 폴더로 이동하여 텍스트 편집기나 터미널환경에선 vim을 이용하여 logstash.conf 라는 파일을 생성합니다.
input {
jdbc {
jdbc_driver_library => "/config-dir/mysql-connector-java-5.1.38.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://DB접속주소:3306/스키마명?useUnicode=true&characterEncoding=utf8"
jdbc_user => "DB사용자명"
jdbc_password => "비밀번호"
statement => "select * from person" # 실행할 쿼리문
jdbc_pool_timeout => 10 #jdbc 접속 TimeOut 설정
jdbc_paging_enabled => true
jdbc_page_size => 10000
schedule => "* * * * *" # crontab 표기법의 스케쥴 설정
}
}
output {
elasticsearch {
hosts => ["192.168.2.153:9200"] # 결과값을 입력받을 elasticsearch 주소
index => "elk_test" # index명
document_type => "person" # document type 명
}
stdout {
codec => rubydebug
}
}
생성한 파일에 위와 같이 입력합니다. 위 형식은 크게 input 과 output으로 나뉘어져 있는데, input은 logstash가 입력받는 내용을 설정하는 것이고, output은 input에서 받은 내용을 처리하는 부분을 설정하는 곳입니다.
우리의 목적은 logstash가 jdbc connection으로 설정한 DB 접속정보로 접속하여 schedule에 맞게 입력한 쿼리문을 실행하여 해당 데이터를 elasticsearch에 저장하는 것입니다.
그러므로 input에서 DB접속정보들을 설정하였고, output에선 elasticsearch의 접속정보, 저장할 index, document_type 등을 설정합니다.
위와 같이 설정한 뒤, 이제 docker 환경에서 logstash를 구동해보겠습니다.
명령어는 다음과 같습니다.
$ docker run -it --rm -v "$PWD":/config-dir logstash logstash -f /config-dir/logstash.conf
간단히 설명하자면, -it를 이용하여 logstash 내의 명령어를 입력할 수 있고, --rm으로 프로세스를 종료하면 docker가 바로 삭제되도록 설정했습니다. 그리고 -v 옵션은 로컬의 경로를 docker 프로세스의 경로로 마운트하는 명령어입니다. 추가적으로 -v 뒤에 "$PWD":/config-dir 라고 입력하면, 로컬의 터미널창에서 현재 디렉토리를 의미하는 PWD의 경로를 logstash docker의 /config-dir 에 마운트한다는 의미입니다.
이 명령을 추가한 이유는 제가 방금 로컬에서 작성한 logstash.conf와 mysql-connector 파일을 실제 logstash환경에서 사용해야하기 때문입니다. 그 다음으로 logstash -f /config-dir/logstash.conf 명령어를 로컬에서 -it명령을 통해서 직접 실행하였습니다.
위 명령대로 실행하면 logstash는 logstash.conf의 설정정보를 가지고 logstash를 구동하게 됩니다.
그 결과를 보시면,
gimseongsin@gimseongsin-ui-MacBook-Pro:~/Desktop/IBK시스템/교육/ElasticSearch/data:> docker run -it --rm -v "$PWD":/config-dir logstash logstash -f /config-dir/logstash.conf
Sending Logstash's logs to /var/log/logstash which is now configured via log4j2.properties
00:44:19.929 [main] INFO logstash.modules.scaffold - Initializing module {:module_name=>"netflow", :directory=>"/usr/share/logstash/modules/netflow/configuration"}
00:44:19.938 [main] INFO logstash.modules.scaffold - Initializing module {:module_name=>"fb_apache", :directory=>"/usr/share/logstash/modules/fb_apache/configuration"}
00:44:19.975 [main] INFO logstash.setting.writabledirectory - Creating directory {:setting=>"path.queue", :path=>"/var/lib/logstash/queue"}
00:44:19.986 [main] INFO logstash.setting.writabledirectory - Creating directory {:setting=>"path.dead_letter_queue", :path=>"/var/lib/logstash/dead_letter_queue"}
00:44:20.101 [LogStash::Runner] INFO logstash.agent - No persistent UUID file found. Generating new UUID {:uuid=>"5919ee36-50e3-4112-ba46-661951d6f4f3", :path=>"/var/lib/logstash/uuid"}
00:44:21.276 [[main]-pipeline-manager] INFO logstash.outputs.elasticsearch - Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://192.168.2.153:9200/]}}
00:44:21.278 [[main]-pipeline-manager] INFO logstash.outputs.elasticsearch - Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://192.168.2.153:9200/, :path=>"/"}
00:44:21.440 [[main]-pipeline-manager] WARN logstash.outputs.elasticsearch - Restored connection to ES instance {:url=>"http://192.168.2.153:9200/"}
00:44:21.898 [[main]-pipeline-manager] INFO logstash.outputs.elasticsearch - Using mapping template from {:path=>nil}
00:44:21.903 [[main]-pipeline-manager] INFO logstash.outputs.elasticsearch - Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>50001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "norms"=>false}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "include_in_all"=>false}, "@version"=>{"type"=>"keyword", "include_in_all"=>false}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
00:44:21.940 [[main]-pipeline-manager] INFO logstash.outputs.elasticsearch - Installing elasticsearch template to _template/logstash
00:44:22.528 [[main]-pipeline-manager] INFO logstash.outputs.elasticsearch - New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//192.168.2.153:9200"]}
00:44:22.534 [[main]-pipeline-manager] INFO logstash.pipeline - Starting pipeline {"id"=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>250}
00:44:22.726 [[main]-pipeline-manager] INFO logstash.pipeline - Pipeline main started
00:44:22.869 [Api Webserver] INFO logstash.agent - Successfully started Logstash API endpoint {:port=>9600}
Thu Jul 12 00:45:00 UTC 2018 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
00:45:01.377 [Ruby-0-Thread-16: /usr/share/logstash/vendor/bundle/jruby/1.9/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:283] INFO logstash.inputs.jdbc - (0.012000s) SELECT version()
00:45:01.384 [Ruby-0-Thread-16: /usr/share/logstash/vendor/bundle/jruby/1.9/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:283] INFO logstash.inputs.jdbc - (0.002000s) SELECT version()
00:45:01.438 [Ruby-0-Thread-16: /usr/share/logstash/vendor/bundle/jruby/1.9/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:283] INFO logstash.inputs.jdbc - (0.051000s) SELECT count(*) AS `count` FROM (select * from person) AS `t1` LIMIT 1
00:45:01.461 [Ruby-0-Thread-16: /usr/share/logstash/vendor/bundle/jruby/1.9/gems/rufus-scheduler-3.0.9/lib/rufus/scheduler/jobs.rb:283] INFO logstash.inputs.jdbc - (0.009000s) SELECT * FROM (select * from person) AS `t1` LIMIT 10000 OFFSET 0
{
"name" => "KIM",
"@version" => "1",
"birth" => "19920323",
"id" => 1,
"@timestamp" => 2018-07-12T00:45:01.532Z,
"gender" => "male"
}
{
"name" => "LEE",
"@version" => "1",
"birth" => "19900225",
"id" => 2,
"@timestamp" => 2018-07-12T00:45:01.535Z,
"gender" => "female"
}
{
"name" => "PARK",
"@version" => "1",
"birth" => "19980313",
"id" => 3,
"@timestamp" => 2018-07-12T00:45:01.539Z,
"gender" => "male"
}
{
"name" => "CHOI",
"@version" => "1",
"birth" => "19780523",
"id" => 4,
"@timestamp" => 2018-07-12T00:45:01.544Z,
"gender" => "female"
}
{
"name" => "KANG",
"@version" => "1",
"birth" => "19821002",
"id" => 5,
"@timestamp" => 2018-07-12T00:45:01.551Z,
"gender" => "female"
}
{
"name" => "AHN",
"@version" => "1",
"birth" => "19850502",
"id" => 6,
"@timestamp" => 2018-07-12T00:45:01.552Z,
"gender" => "male"
}
{
"name" => "YOO",
"@version" => "1",
"birth" => "19920708",
"id" => 7,
"@timestamp" => 2018-07-12T00:45:01.553Z,
"gender" => "male"
}
{
"name" => "SON",
"@version" => "1",
"birth" => "19940706",
"id" => 8,
"@timestamp" => 2018-07-12T00:45:01.555Z,
"gender" => "male"
}
logstash가 실행되고, 1분 뒤에 제가 logstash.conf 파일에서 설정한 쿼리인 select * from person 를 실행하고 그 결과를 출력하게 됩니다. 또 동시에 이 출력 결과는 output으로 설정한 elasticsearch에 입력되게 됩니다.
실제로 elasticsearch에 저 조회 데이터가 입력되었는지 확인하기 위해 kibana에 접속하여 확인해봅니다.
Kibana에 접속해보니 해당 데이터가 입력되어 있습니다.
logstash.conf에 schedule로 * * * * * * 로 입력되어 있는데, 이것은 crontab 표기법으로 1분에 한번 씩 설정한 쿼리를 실행하고 데이터를 저장한다는 것을 의미합니다.
이렇게 mysql과 연동하여 주기적인 데이터를 수집할 수 있게 되었습니다.
감사합니다.