에스퍼 | https://www.espertech.com/ 실시간 적재 핵심기술. 룰엔진 라이브러리 에스퍼만의 쿼리를 사용. ex) 과속로직/룰 을 디파인함. |
|
라이센스 | GNU GPLv2 : https://www.jopenbusiness.com/mediawiki/Esper | |
유사프로젝트 | Drools | |
주요 구성요소 |
Event | 실시간 스트림으로 발생하는 데이터들의 특정 흐름 또는 패턴을 정의 |
EPL | 유사 SQL을 기반으로 하는 이벤트 데이터 처리 스크립트 언어 ex) 과속의 룰 정의 | |
Input Adapter | 소스로부터 전송되는 데이터를 처리하기 위한 어댑터 제공 CSV, Socker, JDBC, Http, 등 |
|
Output Adapter | 타깃으로 전송하는 데이터를 처리하기 위한 어댑터 제공 HDFS, CSV, Socket, Email, Http, 등 |
|
Window | 실시간 스트림 데이터로부터 특정 시간 또는 개수를 설정한 이벤트들을 메모리상에 등록한 후 EPL을 통해 결과를 추출. | |
아키텍처 | ||
![]() ![]() 에스퍼 EPL EPL 은 자주 변경됨 >>> 에스퍼 아키텍쳐 확장. |
||
활용방안 | 운행자 운행로그 400kb/1초 -> 플럼-> 카프카 -> 스톰 스톰 : spout -> Bolt(에스퍼엔진(Event Processing Language, Core Library)) -> Bolt -> Bolt -> 레디스 >> 운전자 운행로그 필터링 >> 과속 운행정보 이벤트 감지 |
|
카프카 스파우트 -> 에스퍼 볼트 -> 레디스 볼트 -> 레디스 캐시 |
public class EsperBolt extends BaseBasicBolt{ private static final long serialVersionUID = 1L; private EPServiceProvider espService; #esp 이벤트 스트립 프로세싱 = 에스퍼 서비스 런타임객체 private boolean isOverSpeedEvent = false; public void prepare(Map stormConf, TopologyContext context) { Configuration configuration = new Configuration(); configuration.addEventType("DriverCarInfoBean", DriverCarInfoBean.class.getName()); espService = EPServiceProviderManager.getDefaultProvider(configuration); espService.initialize(); int avgOverSpeed = 80; int windowTime = 30; String overSpeedEpl = "SELECT date, carNumber, speedPedal, breakPedal, " + "steerAngle, directLight, speed , areaNumber " + " FROM DriverCarInfoBean.win:time_batch("+windowTime+" sec) " + " GROUP BY carNumber HAVING AVG(speed) > " + avgOverSpeed; EPStatement driverCarinfoStmt = espService.getEPAdministrator().createEPL(overSpeedEpl); driverCarinfoStmt.addListener((UpdateListener) new OverSpeedEventListener()); } public void execute(Tuple tuple, BasicOutputCollector collector) { // TODO Auto-generated method stub String tValue = tuple.getString(0); //발생일시(14자리), 차량번호, 가속페달, 브레이크페달, 운전대회적각, 방향지시등, 주행속도, 뮤직번호 String[] receiveData = tValue.split("\\,"); DriverCarInfoBean driverCarInfoBean =new DriverCarInfoBean(); driverCarInfoBean.setDate(receiveData[0]); driverCarInfoBean.setCarNumber(receiveData[1]); driverCarInfoBean.setSpeedPedal(receiveData[2]); driverCarInfoBean.setBreakPedal(receiveData[3]); driverCarInfoBean.setSteerAngle(receiveData[4]); driverCarInfoBean.setDirectLight(receiveData[5]); driverCarInfoBean.setSpeed(Integer.parseInt(receiveData[6])); driverCarInfoBean.setAreaNumber(receiveData[7]); espService.getEPRuntime().sendEvent(driverCarInfoBean); if(isOverSpeedEvent) { //발생일시(14자리), 차량번호 collector.emit(new Values( driverCarInfoBean.getDate().substring(0,8), driverCarInfoBean.getCarNumber()+"-"+driverCarInfoBean.getDate())); isOverSpeedEvent = false; } } public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("date", "car_number")); } private class OverSpeedEventListener implements UpdateListener { @Override public void update(EventBean[] newEvents, EventBean[] oldEvents) { if (newEvents != null) { try { isOverSpeedEvent = true; } catch (Exception e) { System.out.println("Failed to Listener Update" + e); } } } } } |
|
'Data' 카테고리의 다른 글
[1028 SQL] (0) | 2022.04.01 |
---|---|
[1028 데이터베이스] 데이터베이스 기본 작성중 (0) | 2021.08.16 |
[1028 from 실무로 배우는 빅데이터 기술 By 김강원 15]Storm (0) | 2021.05.27 |
[1028 from 실무로 배우는 빅데이터 기술 By 김강원 14]redis (0) | 2021.05.27 |
[1028 from 실무로 배우는 빅데이터 기술 By 김강원 13]HBase (0) | 2021.05.27 |