Data

[1028 from 실무로 배우는 빅데이터 기술 By 김강원 09]적재-대용량로그파일

강의 개요 1. 빅데이터 적재 개요_ 빅데이터 대용량 파일 적재의 기본 개념 설명
2. 빅데이터 적재에 활용하는 기술 _ 하둡, 주키퍼에 대한 소개 , 기술별 주요 기능, 아키텍처 활용방안
3. 적재 파일럿 실행 1단계 _ 적재 아키텍처  : 로그파일 적재와 관련한 요구사항 구체화, 파일럿 아키 설명
4. 적재 파일럿 실행 2단계 _ 적재 환경구성 : 스마트카 로그 파일 적재 아키텍처를 설치 및 환경구성
5. 적재 파일럿 실행 3단계 _ 적재 기능구현 : 플럼이용해 스마트카 생태정보 르그파일 하둡적제기능구현
6. 적재 파일럿 실행 4단계 _ 적재 기능 테스트 : 로그 시뮬레이터 이용해 스마트카 상태 정보 데이터 발생, 플럼이 해당 데이터를 HDFS정상적재 확인
적재 유형 수집 -> 적재(배치성) -> (처리 /탐색 -> 분석/응용) : 배치성.
      -> 적재(실시간성)-> 

적재- 대용량 : 배치성 처리
적재- 메시지 : 실시간성 처리.  
적재 저장소 유형 1. 내/외부 원천데이터
정형 데이터 : 데이터베이스
                 (관계/계층/객체/네트워크)
반정형 데이터 : HTML, XML, JSON, 서버로그
비정형 데이터 : 소셜미디어, 문서, 이미지, 오디어 , 비디오 , IOT

-------------->           
배치수집
실시간 수집
--------------> 
2, 적재 저장소 유형. 
- 배치처리 : 큰파일
대용량 파일 전체를 영구 저장 - 분산파일 시스템
- 실시간처리 : 작은메시지
대규모 메시지 전체를 영구저장 -No-SQL  :Hbase, 카산드라, 몽고DB
대규모 메시지 전체를 버퍼링처리 - MoM : 카프카 
대규모 데이터 일부만 임시저장 - Cached : 인메모리 캐시시스템 : 레디스.

에코시스템 : 생태계 

 

적재 요구사항 1. 차량의 다양한 장치로부터 발생하는 로그 파일을 수집해서 기능별 상태를 점검한다. 
>> 배치
2. 운전자의 운행정보가 담긴 로그를 실시간으로 수집해서 주행패턴을 분석한다 
>> 운전자 운행로그. 이벤트로그 실시간. 
적재 요구사항 
구체화
1. 100대에 달하는 스카트카들의 상태정보가 일단위로 취합
>> 플럼에서 수집 발생시점의 날짜를 HdfsSink에 전달해서 해당 날짜 단위로 적재
logSink -> HdfsSink
2. 매일 100 대의 스마트카 상태 정보는 약 100MB정도이며 220만건의 상태정보 발생
>> 1년 적재시 8억건 이상의 데이터가 적재되며, 연 단위분석에 하둡의 분산병렬처리사용. 
3. 스마트카의 상태 정보 데이터의 발생일과 수집/적재되는 날짜가 다를수 있다. 
>> 수집/적재되는 모든 데이터마다 데이터 발생일 외에 수집/적재 처리돼야 하는 처리일 추가. 
4. 적재된 스마트카들의 상태 정보를 일/월/년 단위로 분석할수 있어야 한다. 
>> HDFS에 수집 일자별로 디렉터리 경로를 만들어 적재.
5. 적재 및 생성되는 파일은 HDFS의 특성을 잘 고려해야 한다. 
>> 플럼의 HdfsSink의 옵션을 파일럿 프로젝트의 HDFS에 최적화 해서 설정
6. 적재과 완료된 후에는 원천 파일이 삭제되어야 한다. 
>> 플럼Source 컴포넌트 중 SpoolDir DeletePolicy 옵션을 활용.  
적재 아키텍처


차량상태정보 100MB/1일
-> 플럼에이전트(SpoolDir Source -> Memory Cannel-> HDFS Sink)
-> 일단위 적재 -> 하둡

하둡:
-NameNode
-DataNode
- HDFS : 경로   ... 일별분석, 주, 워별분석.. 년별분석
-맵리듀스
 
하둡 설치 확인
CM> HDFS 상세 > DataNode > DataNode웹ui
URL : http://server01.hadoop.com:9870/dfshealth.html
CM> YARN 상세 > 웹ui > 리소스메니저
리소스매니저 :  http://server01.hadoop.com:8088/cluster
CM> YARN 상세 > 웹ui > 히스토리
잡 히스토리 : http://server01.hadoop.com:19888/jobhistory

cf_ http://server01.hadoop.com:7180/  클라우데라 매니저. 
적재기능구현
-스마트카
에이전트수정. 
SmartCar_Agent.sources  = SmartCarInfo_SpoolSource DriverCarInfo_TailSource
SmartCar_Agent.channels = SmartCarInfo_Channel DriverCarInfo_Channel
SmartCar_Agent.sinks    = SmartCarInfo_HdfsSink DriverCarInfo_KafkaSink


SmartCar_Agent.sources.SmartCarInfo_SpoolSource.type = spooldir
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.spoolDir = /home/pilot-pjt/working/car-batch-log
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.deletePolicy = immediate
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.batchSize = 1000


SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors = timeInterceptor typeInterceptor collectDayInterceptor filterInterceptor

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.timeInterceptor.type = timestamp
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.timeInterceptor.preserveExisting = true

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.typeInterceptor.type = static
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.typeInterceptor.key = logType
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.typeInterceptor.value = car-batch-log

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.collectDayInterceptor.type = com.wikibook.bigdata.smartcar.flume.CollectDayInterceptor$Builder

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.type = regex_filter
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.regex = ^\\d{14}
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.excludeEvents = false

SmartCar_Agent.channels.SmartCarInfo_Channel.type = memory
SmartCar_Agent.channels.SmartCarInfo_Channel.capacity  = 100000
SmartCar_Agent.channels.SmartCarInfo_Channel.transactionCapacity  = 10000


SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.type = hdfs
SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.path = /pilot-pjt/collect/%{logType}/wrk_date=%Y%m%d
SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.filePrefix = %{logType}
SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.fileSuffix = .log
SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.fileType = DataStream
SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.writeFormat = Text
SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.batchSize = 10000
SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.rollInterval = 0
SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.rollCount = 0
SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.idleTimeout = 100
SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.callTimeout = 600000
SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.rollSize = 67108864
SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.hdfs.threadsPoolSize = 10


SmartCar_Agent.sources.SmartCarInfo_SpoolSource.channels = SmartCarInfo_Channel
SmartCar_Agent.sinks.SmartCarInfo_HdfsSink.channel = SmartCarInfo_Channel


SmartCar_Agent.sources.DriverCarInfo_TailSource.type = exec
SmartCar_Agent.sources.DriverCarInfo_TailSource.command = tail -F /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log
SmartCar_Agent.sources.DriverCarInfo_TailSource.restart = true
SmartCar_Agent.sources.DriverCarInfo_TailSource.batchSize = 1000

SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors = filterInterceptor2
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.type = regex_filter
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.regex = ^\\d{14}
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.excludeEvents = false

SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.topic = SmartCar-Topic
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.brokerList = server02.hadoop.com:9092
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.requiredAcks = 1
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.batchSize = 1000


SmartCar_Agent.channels.DriverCarInfo_Channel.type = memory
SmartCar_Agent.channels.DriverCarInfo_Channel.capacity= 100000
SmartCar_Agent.channels.DriverCarInfo_Channel.transactionCapacity = 10000


SmartCar_Agent.sources.DriverCarInfo_TailSource.channels = DriverCarInfo_Channel
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.channel = DriverCarInfo_Channel
플럼의 사용자정의 Interceptor 추가 /opt/cludera/parcels/CDH/lib/flume-ng/lib  파일업로드. bigdata.smartcar.flume-1.0.jar
  0. 시뮬레이터 실행. 
cd /home/pilot-pjt/working
$ java -cp bigdata.smartcar.loggen-1.0.jar com.wikibook.bigdata.smartcar.loggen.CarLogMain 20160102 100 &
$ tail -f /home/pilot-pjt/working/SmartCar/SmartCarStatusInfo_20160102.txt

-이번엔통과
$ java -cp bigdata.smartcar.loggen-1.0.jar com.wikibook.bigdata.smartcar.loggen.DriverLogMain 20160102 3 &
$ tail -f /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log

1. 일간 로그파일을  플럼 SmartCarInfo 에이전트의 SpoolDir 경로로 옮긴다. 
 SpoolDir (car-batch-log) -> Channel -> HDFS Sink -> HDFS
$ mv /home/pilot-pjt/working/SmartCar/SmartCarStatusInfo_20160102.txt /home/pilot-pjt/working/car-batch-log/

--안되서.ㅜㅜ cp로... 필요없음 통과. 
cp /home/pilot-pjt/working/SmartCar/SmartCarStatusInfo_20160102.txt /home/pilot-pjt/working/car-batch-log/
$cd /home/pilot-pjt/working/car-batch-log/
$ls -ltr

3. 플럼로그 정상확인
$tail -f /var/log/flume-ng/flume-cmf-flume-AGENT-server02.hadoop.com.log


2021-05-27 13:14:27,620 INFO org.apache.flume.sink.hdfs.BucketWriter: Creating /pilot-pjt/collect/car-batch-log/wrk_date=20210527/car-batch-log.1622088824229.log.tmp
2021-05-27 13:14:55,557 INFO org.apache.flume.client.avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2021-05-27 13:14:55,557 INFO org.apache.flume.client.avro.ReliableSpoolingFileEventReader: Preparing to delete file /home/pilot-pjt/working/car-batch-log/SmartCarStatusInfo_20160102.txt
2021-05-27 13:16:40,491 INFO org.apache.flume.sink.hdfs.BucketWriter: Closing idle bucketWriter /pilot-pjt/collect/car-batch-log/wrk_date=20210527/car-batch-log.1622088824229.log.tmp at 1622089000491
2021-05-27 13:16:40,491 INFO org.apache.flume.sink.hdfs.HDFSEventSink: Writer callback called.
2021-05-27 13:16:40,491 INFO org.apache.flume.sink.hdfs.BucketWriter: Closing /pilot-pjt/collect/car-batch-log/wrk_date=20210527/car-batch-log.1622088824229.log.tmp
2021-05-27 13:16:40,516 INFO org.apache.flume.sink.hdfs.BucketWriter: Renaming /pilot-pjt/collect/car-batch-log/wrk_date=20210527/car-batch-log.1622088824229.log.tmp to /pilot-pjt/collect/car-batch-log/wrk_date=20210527/car-batch-log.1622088824229.log


Creating /pilot-pjt/ ~~  : 적재중
BucketWriter: Closing /pilot-pjt/ ~~ 
BucketWriter: Renaming /pilot-pjt/coll  ~~  :: HDFS 적재 성공적으로 끝남. 

4. hdfs 적재 확인
$hdfs dfs -ls -R /pilot-pjt/collect/car-batch-log

[root@server02 working]# hdfs dfs -ls -R /pilot-pjt/collect/car-batch-log
drwxr-xr-x   - flume supergroup          0 2021-05-27 13:27 /pilot-pjt/collect/car-batch-log/wrk_date=20210527
-rw-r--r--   1 flume supergroup   68303437 2021-05-27 13:14 /pilot-pjt/collect/car-batch-log/wrk_date=20210527/car-batch-log.1622088824228.log
-rw-r--r--   1 flume supergroup   55195294 2021-05-27 13:16 /pilot-pjt/collect/car-batch-log/wrk_date=20210527/car-batch-log.1622088824229.log
$hdfs dfs -cat /pilot-pjt/collect/car-batch-log/wrk_date=20210527/car-batch-log.1622088824228.log
$hdfs dfs -tail /pilot-pjt/collect/car-batch-log/wrk_date=20210527/car-batch-log.1622088824228.log

플럼넣기전 :  /home/pilot-pjt/working/SmartCar/, /home/pilot-pjt/working/car-batch-log/
-rw-r--r-- 1 root root 104070631 May 27 13:11 SmartCarStatusInfo_20160102.txt
20160102000100,R0001,94,88,96,98,1,1,1,1,B,B,92
20160102000104,R0001,82,90,79,82,1,1,1,1,A,A,73
20160102000108,R0001,88,93,89,95,1,1,1,1,A,A,93
hdfs에서 : ",20210527" 끝에 작업일자 추가됨. 플럼의 인터셉터가 붙임.  104070631->64M 1개, +나머지
20160102000100,R0001,94,88,96,98,1,1,1,1,B,B,92,20210527
20160102000104,R0001,82,90,79,82,1,1,1,1,A,A,73,20210527
20160102000108,R0001,88,93,89,95,1,1,1,1,A,A,93,20210527

5. 시뮬레이터 종료. 

마치며.  1. 빅데이터 적재 개요
2. 빅데이터 적재에 활용하는 기술 : 주요기능, 이키텍처, 활용방안
3. 적재 파일럿 실행 1단계 - 적재 아키텍쳐
4. 적재 파일럿 실행 2단계 - 적재 환경구성. 
5. 적재 파일럿 실행 3단계 - 적재 기능 구현
6. 적재 파일럿 실행 4단계 - 적재 기능 테스트

 ps -ef |grep smartcar.log

 

 

https://flume.apache.org/FlumeUserGuide.html

channel  
type The component type name, needs to be hdfs
hdfs.path HDFS directory path (eg hdfs://namenode/flume/webdata/)
hdfs.filePrefix FlumeData Name prefixed to files created by Flume in hdfs directory
hdfs.fileSuffix Suffix to append to file (eg .avro - NOTE: period is not automatically added)
hdfs.inUsePrefix Prefix that is used for temporal files that flume actively writes into
hdfs.inUseSuffix .tmp Suffix that is used for temporal files that flume actively writes into
hdfs.emptyInUseSuffix false If false an hdfs.inUseSuffix is used while writing the output. After closing the output hdfs.inUseSuffix is removed from the output file name. If true the hdfs.inUseSuffix parameter is ignored an empty string is used instead.
hdfs.rollInterval 30 Number of seconds to wait before rolling current file (0 = never roll based on time interval)
hdfs.rollSize 1024 File size to trigger roll, in bytes (0: never roll based on file size)
hdfs.rollCount 10 Number of events written to file before it rolled (0 = never roll based on number of events)
hdfs.idleTimeout 0 Timeout after which inactive files get closed (0 = disable automatic closing of idle files)
hdfs.batchSize 100 number of events written to file before it is flushed to HDFS
hdfs.codeC Compression codec. one of following : gzip, bzip2, lzo, lzop, snappy
hdfs.fileType SequenceFile File format: currently SequenceFile, DataStream or CompressedStream (1)DataStream will not compress output file and please don’t set codeC (2)CompressedStream requires set hdfs.codeC with an available codeC
hdfs.maxOpenFiles 5000 Allow only this number of open files. If this number is exceeded, the oldest file is closed.
hdfs.minBlockReplicas Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath.
hdfs.writeFormat Writable Format for sequence file records. One of Text or Writable. Set to Text before creating data files with Flume, otherwise those files cannot be read by either Apache Impala (incubating) or Apache Hive.
hdfs.threadsPoolSize 10 Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)
hdfs.rollTimerPoolSize 1 Number of threads per HDFS sink for scheduling timed file rolling
hdfs.kerberosPrincipal Kerberos user principal for accessing secure HDFS
hdfs.kerberosKeytab Kerberos keytab for accessing secure HDFS
hdfs.proxyUser    
hdfs.round false Should the timestamp be rounded down (if true, affects all time based escape sequences except %t)
hdfs.roundValue 1 Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time.
hdfs.roundUnit second The unit of the round down value - second, minute or hour.
hdfs.timeZone Local Time Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles.
hdfs.useLocalTimeStamp false Use the local time (instead of the timestamp from the event header) while replacing the escape sequences.
hdfs.closeTries 0 Number of times the sink must try renaming a file, after initiating a close attempt. If set to 1, this sink will not re-try a failed rename (due to, for example, NameNode or DataNode failure), and may leave the file in an open state with a .tmp extension. If set to 0, the sink will try to rename the file until the file is eventually renamed (there is no limit on the number of times it would try). The file may still remain open if the close call fails but the data will be intact and in this case, the file will be closed only after a Flume restart.
hdfs.retryInterval 180 Time in seconds between consecutive attempts to close a file. Each close call costs multiple RPC round-trips to the Namenode, so setting this too low can cause a lot of load on the name node. If set to 0 or less, the sink will not attempt to close the file if the first attempt fails, and may leave the file open or with a ”.tmp” extension.
serializer TEXT Other possible options include avro_event or the fully-qualified class name of an implementation of the EventSerializer.Builder interface.
serializer.*    

크러스터 시작시 : 주키퍼 > 카프카> HDFS > Flume > YARN 순으로 기동.