Data

[1028 from 실무로 배우는 빅데이터 기술 By 김강원 16]EsperTech

esperTech

에스퍼 https://www.espertech.com/
실시간 적재 핵심기술.  룰엔진 라이브러리 
에스퍼만의 쿼리를 사용. 
ex) 과속로직/룰 을 디파인함. 
라이센스 GNU GPLv2 : https://www.jopenbusiness.com/mediawiki/Esper
유사프로젝트 Drools
주요
구성요소
Event 실시간 스트림으로 발생하는 데이터들의 특정 흐름 또는 패턴을 정의
EPL 유사 SQL을 기반으로 하는 이벤트 데이터 처리 스크립트 언어 ex) 과속의 룰 정의 
Input Adapter 소스로부터 전송되는 데이터를 처리하기 위한 어댑터 제공
CSV, Socker, JDBC, Http, 등
Output Adapter 타깃으로 전송하는 데이터를 처리하기 위한 어댑터 제공
HDFS, CSV, Socket, Email, Http, 등
Window 실시간 스트림 데이터로부터 특정 시간 또는 개수를 설정한 이벤트들을 메모리상에 등록한 후 EPL을 통해 결과를 추출. 
아키텍처








에스퍼 EPL
EPL 은 자주 변경됨 >>> 에스퍼 아키텍쳐 확장. 
활용방안 운행자 운행로그 400kb/1초
-> 플럼-> 카프카 -> 스톰
스톰 : spout -> Bolt(에스퍼엔진(Event Processing Language, Core Library)) -> Bolt
                                                                                                 -> Bolt -> 레디스
>> 운전자 운행로그 필터링
>> 과속 운행정보 이벤트 감지
카프카 스파우트
-> 에스퍼 볼트 
-> 레디스 볼트 
-> 레디스 캐시

public class EsperBolt extends BaseBasicBolt{
    private static final long serialVersionUID = 1L;
    private EPServiceProvider espService;      
    #esp 이벤트 스트립 프로세싱 = 에스퍼 서비스 런타임객체 
    private boolean isOverSpeedEvent = false;

    public void prepare(Map stormConf, TopologyContext context) {

        Configuration configuration = new Configuration();
        configuration.addEventType("DriverCarInfoBean", DriverCarInfoBean.class.getName());

        espService = EPServiceProviderManager.getDefaultProvider(configuration);
        espService.initialize();

        int avgOverSpeed = 80;
        int windowTime  = 30;

        String overSpeedEpl =  "SELECT date, carNumber, speedPedal, breakPedal, "
        + "steerAngle, directLight, speed , areaNumber "
        + " FROM DriverCarInfoBean.win:time_batch("+windowTime+" sec) "
        + " GROUP BY carNumber HAVING AVG(speed) > " + avgOverSpeed;

        EPStatement driverCarinfoStmt = espService.getEPAdministrator().createEPL(overSpeedEpl);

        driverCarinfoStmt.addListener((UpdateListener) new OverSpeedEventListener());
    }

    public void execute(Tuple tuple, BasicOutputCollector collector) {

        // TODO Auto-generated method stub
        String tValue = tuple.getString(0); 

        //발생일시(14자리), 차량번호, 가속페달, 브레이크페달, 운전대회적각, 방향지시등, 주행속도, 뮤직번호
        String[] receiveData = tValue.split("\\,");

        DriverCarInfoBean driverCarInfoBean =new DriverCarInfoBean();

        driverCarInfoBean.setDate(receiveData[0]);
        driverCarInfoBean.setCarNumber(receiveData[1]);
        driverCarInfoBean.setSpeedPedal(receiveData[2]);
        driverCarInfoBean.setBreakPedal(receiveData[3]);
        driverCarInfoBean.setSteerAngle(receiveData[4]);
        driverCarInfoBean.setDirectLight(receiveData[5]);
        driverCarInfoBean.setSpeed(Integer.parseInt(receiveData[6]));
        driverCarInfoBean.setAreaNumber(receiveData[7]);

        espService.getEPRuntime().sendEvent(driverCarInfoBean); 

        if(isOverSpeedEvent) {
        //발생일시(14자리), 차량번호
        collector.emit(new Values( driverCarInfoBean.getDate().substring(0,8), 
        driverCarInfoBean.getCarNumber()+"-"+driverCarInfoBean.getDate()));
        isOverSpeedEvent = false;
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
        declarer.declare(new Fields("date", "car_number"));
    }

    private class OverSpeedEventListener implements UpdateListener
    {
        @Override
        public void update(EventBean[] newEvents, EventBean[] oldEvents) {
            if (newEvents != null) {
            try {
                isOverSpeedEvent = true;
            } catch (Exception e) {
                System.out.println("Failed to Listener Update" + e);
            } 
         }
     }
}

}