Data

[1028 from 실무로 배우는 빅데이터 기술 By 김강원 15]Storm

STORM http://storm.apache.org/
대규모 메시지성 데이터 발생
-> 카프카(토픽: 버퍼링 _브로커 한계 전에 이동필요)
-> 스톰(카프카 컨슈머 역할) 1. 분산병렬위치, 영구저장소에 빠른 적재 2. 전처리, 집계, 분석
-> HDFS or NoSQL or RDBMS or Radis(캐시)
라이센스 Apache
유사프로젝트 Samza, S4, Akka, Spark Stram (마이크로배치: 배치처리짧게한것 요즘대세,  스톰은 리얼타임)
주요
구성요소
Spout (이번엔. 카프카 spout)
외부로부터 데이터를 유입받아 가공, 처리해서 튜플을 생성, 이휴 해당 튜플을 Bolt에 전송
Bolt 튜플을 받아 실제 분산 작업을 수행하며, 필터링(Filtering), 집계(Aggregation),조인(Join), 등의 연산 병렬로 실행. 
Topology Spout-Bolt의 데이터 처리 흐름을 정의, 하나의 Spout와 다수의Bolt로 구성. 
Nimbus Topology를 Supervisor에 배포하고 작업을 할당, Supervisor를 모니터링하다 필요시 페일오버(Fail-Over)처리. 
Supervisor Topology를 실행할 Worker를 구동시키며 Topology를 Worker에 할당 및관리. 
Worker Supervisor 상에서 실행중인 자바 프로세스로 spout와 Bolt를 실행
Executor Worker 내에서 실행되는 자바 쓰레드
Task Spout및 Bolt 객체가 할당. 
아키텍처

활용방안 운전자 운행로그 400kb/1초
-> 플럼-> 카프카 -> 스톰
스톰: ->spout->bolt-> bolt -> HBase
                          -> bolt (과속여부)-> 레디스 : . 

>> 실시간 운행 로그 데이터 병렬 처리
>> 실시간 운행 로그 데이터 분석 결과 처리. 
설치

1.설치파일 다운 설치
서버 02
/home/pilot-pjt
wget http://archive.apache.org/dist/storm/apache-storm-1.2.3/apache-storm-1.2.3.tar.gz 
tar -xvf apache-storm-1.2.3.tar.gz
ln -s apache-storm1.2.3 storm

2. 설정 수정. 
cd /home/pilot-pjt/storm/conf
vi storm.yaml
# 주키퍼 정보
storm.zookeeper.servers:
 -"server02.hadoop.com"
# 스톰 작동하는데 필요한 데이터 저장소
storm.local.dir: "/home/pilot-pjt/storm/data"
# 스톰의 Nimbus 정보
nimbus.seeds: ["server02.hadoop.com"]
# Worker의 포트 포트 갯수 만큼 worker가 만들어진다. 
supervisor.slots.ports:
 -6700
# 스톰 UI 접속포트 설정. 
ui.port: 8088

3. 스톰 로그 레벨 조정 대규모 트랜잭션 데이터 유입 과도로그 오버해드 발생 때문에 수정. 
info 기본값,  -> ERROR
cd /home/pilot-pjt/storm/log4j2
vi cluster.xml
    #<Logger name="org.apache.storm.logging.filters.AccessLoggingFilter" level="info"
    <Logger name="org.apache.storm.logging.filters.AccessLoggingFilter" level="ERROR" 
    # <Logger name="org.apache.storm.logging.ThriftAccessLogger" level="info" 
    <Logger name="org.apache.storm.logging.ThriftAccessLogger" level="ERROR"
    #<Logger name="org.apache.storm.metric.LoggingClusterMetricsConsumer" level="info"   <Logger name="org.apache.storm.metric.LoggingClusterMetricsConsumer" level="ERROR"      # <root level="info"> <!-- We log everything -->
    <root level="ERROR"> <!-- We log everything -->

vi worker.xml
    #<Logger name="STDERR" level="INFO">
    <Logger name="STDERR" level="ERROR">
    </Logger>
     #<Logger name="STDOUT" level="INFO">
    <Logger name="STDOUT" level="ERROR">

4. 스톰 명령 편리하게 사용하기 위해 root 계정의 프로파일에 스톰패스 설정. 
vi /root/.bash_profile
PATH=$PATH:/home/pilot-pjt/storm/bin


수정한 root 계정의 프로파일 정보를 다시 읽어온다. 
source /root/.bash_profile

* 리눅스 명령어 안먹을떄
PATH=/usr/local/bin:/bin:/usr/bin

5. java -version 1.8 아니면 1.8로 변경
rm /usr/bin/java
rm /usr/bin/javac
ln -s /usr/java/jdk1.8.0.181-cloudera/bin/javac /user/bin/javac
ln -s /usr/java/jdk1.8.0.181-cloudera/bin/java /user/bin/java

6. 리눅스 재시작할때 스톰 자동 실행되도록 설정. 
총 3개 스톰 서비스 : Nimbus, Supervisor, UI
스크립트 출처 : https://gist.github.com/yulrizka

#!/bin/bash
# /etc/init.d/storm-nimbus
# Startup script for storm-nimbus
# chkconfig: 2345 20 80
# description: Starts and stops storm-nimbus
#. /etc/initd/functions

7. 스톰 서비스 스크립트 파일  2번서버 업로드. 
/etc/rc.d/init.d

8. 업로드 세 파일 권한 변경.  755 , log, pid 디렉터리 만들어줌
mkdir /var/log/storm
mkdir /var/run/storm

9. service/chkconfig 등록 명령 각각 실행
service storm-nimbus start
service storm-supervisor start
service storm-ui start

10 정상 구동확인
service storm-nimbus status
service storm-supervisor status
service storm-ui status

11. server02.hadoop.com:8088 모니터링. 

** service 명령어는 sbin 에있음. 
root 계정이 잘 들어가지지 않으면(일반계정접속후 root로그인) service 없다고 나옴. 
su -root 로 들어가서 사용. 

스톰 프로세스 점검 스톰도 주키퍼에 의존도가 높다. 
주키퍼의 Z노드인 /storm의 위치에 스톰의 주요 설정값이 관리되고 있기 때문이다. 
그래서 주키퍼가 작동되지 않은 상태에서 스톰을 실행하면 주키퍼 접속 에러가 발생, 
스톰의 서비스 실패로 이어진다. 실제 환경이라면 주키퍼가 365일 24시간 작동. 
파일럿 환경은 주의 필요. 가상머신 시작, 재시작후 반드시 주키퍼 상태 확인 필요. 
스톰 
= 카프카 스카우트 병렬처리
+ 스플릿 볼트 병렬처리 
+(
카프카스파우트
-> 스플릿볼트
-> HBase볼트
-> HBase
)에러가 나면 카프카까지 롤백기능
TupleTableConfig hTableConfig = new TupleTableConfig("DriverCarInfo", "r_key");
hTableConfig.setZkQuorum("server02.hadoop.com");
hTableConfig.setZkClientPort("2181");
hTableConfig.setBatch(false);
hTableConfig.addColumn("cf1", "date");
hTableConfig.addColumn("cf1", "car_number");
hTableConfig.addColumn("cf1", "speed_pedal");

HBaseBolt hbaseBolt = new HBaseBolt(hTableConfig);
DriverCarTopologyBuilder.setBolt("HBASE", hbaseBolt,1).shuffleGrouping("splitBolt");
# 1 >> HBase 병렬처리.