Data

[1028 from 실무로 배우는 빅데이터 기술 By 김강원 07]수집- 플럼

Error1

현상 2021-05-25 22:16:06,356 ERROR org.apache.flume.lifecycle.LifecycleSupervisor: Unable to start EventDrivenSourceRunner: { source:Spool Directory source SmartCarInfo_SpoolSource: { spoolDir: /home/pilot-pjt/working/car-batch-log } } - Exception follows.
org.apache.flume.FlumeException: Unable to read and modify files in the spooling directory: /home/pilot-pjt/working/car-batch-log
        at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.<init>(ReliableSpoolingFileEventReader.java:195)
        at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.<init>(ReliableSpoolingFileEventReader.java:89)
        at org.apache.flume.client.avro.ReliableSpoolingFileEventReader$Builder.build(ReliableSpoolingFileEventReader.java:882)

해결 폴더 권한 수정

>> SpoolDir 작동할때 : 읽고 쓰는 권한이 필요. root를 사용하지 않음. 
cd /home/pilot-pjt/
chmod 777 -R  working/

Error2

현상 재기동후
환경설정도 문제없는데...  CPU 


2021-05-27 12:42:14,419 WARN org.apache.flume.source.SpoolDirectorySource: The channel is full, and cannot write data now. The source will try again after 250 milliseconds
2021-05-27 12:42:14,675 INFO org.apache.flume.client.avro.ReliableSpoolingFileEventReader: Last read was never committed - resetting mark position.
2021-05-27 12:43:11,456 INFO org.apache.flume.client.avro.ReliableSpoolingFileEventReader: Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2021-05-27 12:43:11,457 ERROR org.apache.flume.source.SpoolDirectorySource: FATAL: Spool Directory source SmartCarInfo_SpoolSource: { spoolDir: /home/pilot-pjt/working/car-batch-log }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.lang.IllegalStateException: File has been modified since being read: /home/pilot-pjt/working/car-batch-log/SmartCarStatusInfo_20160102.txt


해결 파일이.. 다 생성되기 전에. 옮기니까 발생... 

 

 


플럼 flume.apache.org
: 원천데이터 수집(파일, DB, API) -> 플럼-> 하둡 적재
                                        Source    Sink
라이센스 Apache 2.0
유사 프로젝트 Fluented, Scribe, logstash, Chukwa, NiFI, Embulk 
주요
구성요소
****Source 다양한 원천 시스템의 데이터 수집위해  Avro, Thrift, JMS, Spool Dir , Kafka, 등 여러 컴포넌트 제공, 수집한 데이터 Channel로 전달
****Sink 수집한 데이터를 Channel로 부터 전달받아 최종 목적지에 저장하기 위한 기능으로 HDFS, Hive, Logger, Avro, ElasticSearch Thrift등을 제공. 
***Channel Source와 Sink를 연결 데이터를 버퍼링하는 컴포넌트로 메모리,파일, 데이터베이스 채널 의 저장소로 활용. 
Interceptor Source와 Channel사이에 데이터 필터링 및 가공하는 컴포넌트로서 Timestamp, Host, Regex, Filttering 등을 기본 제공하며, 필요 시 사용자 정의 Interceptor 추가. 
Agent Source-> (Interceptor) -> Channel-> Sink 컴포넌트 순으로 구성된 작업 단위, 독립된 인스턴스로 생. 
아키텍처 유형1 Agent (Source -> Channel -> Sink )
아키텍처 유형2 Agent
(Source -> Channel -> Sink : DB
Source -> Interceptor -> Channel -> Sink : 하둡
                                             -> Sink : DB )
아키텍처 유형3
병렬처리
(라우팅 이용)
플럼 에이전트1(Source -> Channel -> Sink) ->플럼에이전트2( Source ->Channel-> Sink):하둡
                                                          ->플럼에이전트3(Source-> Channel-> Sink): DB
아키텍처 유형4
(티어1 대규모데이터
티어 2 비지니스로직)
플럼
활용
장시간 수집 3초 간격 발생 - 대용량파일100MB/1일: 일단위수집-> 플럼에이전트 -> 장기수집 배치분석
발생 시
동시수집
1초 간격밸상 - 신시간 로그 499KB/1초: 실시간수집-> 플럼에이전트-> 실시간 분석
: 실시간 정보로 가치가 높을때만. 

** 실제 스마트카 안에 프럼이 설치..
설치 1. CM의 홈에서 Cluster1 콤보박스 [서비스 추가: add Service] > 추가할 서비스 유형 Flume 선택, 우측하단 계속버튼
> Agent > 설치 서버 호스트 server02.hadoop.com.. 확인> 계속.> 완료. 
2. 힙메모리 올리기
CM홈 > Flume >구성 > java heap  : 50 -> 100
3. Flume 재시작 
에이전트 생성 수정전 # Please paste flume.conf here. Example:

# Sources, channels, and sinks are defined per
# agent name, in this case 'tier1'.
tier1.sources  = source1
tier1.channels = channel1
tier1.sinks    = sink1

# For each source, channel, and sink, set
# standard properties.
tier1.sources.source1.type     = netcat
tier1.sources.source1.bind     = 127.0.0.1
tier1.sources.source1.port     = 9999
tier1.sources.source1.channels = channel1
tier1.channels.channel1.type   = memory
tier1.sinks.sink1.type         = logger
tier1.sinks.sink1.channel      = channel1

# Other properties are specific to each type of
# source, channel, or sink. In this case, we
# specify the capacity of the memory channel.
tier1.channels.channel1.capacity = 100
플럼 수집기능
플럼 > 구성

-에이전트 생성 1
시스템 그룹 Flume(서비스전체 : Service-Wide) : flume
Agent 이름 Agent Default Group : SmartCar_Agent
구성파일  Agent Default Group
:
SmartCar_Agent.sources  = SmartCarInfo_SpoolSource
SmartCar_Agent.channels = SmartCarInfo_Channel
SmartCar_Agent.sinks    = SmartCarInfo_LoggerSink 

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.type = spooldir
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.spoolDir = /home/pilot-pjt/working/car-batch-log
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.deletePolicy = immediate
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.batchSize = 1000


SmartCar_Agent.channels.SmartCarInfo_Channel.type = memory
SmartCar_Agent.channels.SmartCarInfo_Channel.capacity  = 100000
SmartCar_Agent.channels.SmartCarInfo_Channel.transactionCapacity  = 10000


SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.type = logger

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.channels = SmartCarInfo_Channel
SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.channel = SmartCarInfo_Channel

-에이전트 생성 2
SmartCar_Agent.sources  = SmartCarInfo_SpoolSource
SmartCar_Agent.channels = SmartCarInfo_Channel
SmartCar_Agent.sinks    = SmartCarInfo_LoggerSink 

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.type = spooldir
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.spoolDir = /home/pilot-pjt/working/car-batch-log
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.deletePolicy = immediate
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.batchSize = 1000

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors = filterInterceptor

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.type = regex_filter
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.regex = ^\\d{14}
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.excludeEvents = false


SmartCar_Agent.channels.SmartCarInfo_Channel.type = memory
SmartCar_Agent.channels.SmartCarInfo_Channel.capacity  = 100000
SmartCar_Agent.channels.SmartCarInfo_Channel.transactionCapacity  = 10000


SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.type = logger

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.channels = SmartCarInfo_Channel
SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.channel = SmartCarInfo_Channel
-드라이브카. 
구성파일  Agent Default Group
:
SmartCar_Agent.sources  = SmartCarInfo_SpoolSource DriverCarInfo_TailSource
SmartCar_Agent.channels = SmartCarInfo_Channel DriverCarInfo_Channel
SmartCar_Agent.sinks    = SmartCarInfo_LoggerSink DriverCarInfo_KafkaSink

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.type = spooldir
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.spoolDir = /home/pilot-pjt/working /car-batch-log
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.deletePolicy = immediate
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.batchSize = 1000

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors = filterInterceptor

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.type = regex_filter
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.regex = ^\\d{14}
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.excludeEvents = false


SmartCar_Agent.channels.SmartCarInfo_Channel.type = memory
SmartCar_Agent.channels.SmartCarInfo_Channel.capacity  = 100000
SmartCar_Agent.channels.SmartCarInfo_Channel.transactionCapacity  = 10000


SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.type = logger

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.channels = SmartCarInfo_Channel
SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.channel = SmartCarInfo_Channel
    SmartCar_Agent.sources  = SmartCarInfo_SpoolSource DriverCarInfo_TailSource
SmartCar_Agent.channels = SmartCarInfo_Channel DriverCarInfo_Channel
SmartCar_Agent.sinks    = SmartCarInfo_LoggerSink DriverCarInfo_KafkaSink

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.type = spooldir
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.spoolDir = /home/pilot-pjt/working/car-batch-log
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.deletePolicy = immediate
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.batchSize = 1000

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors = filterInterceptor

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.type = regex_filter
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.regex = ^\\d{14}
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.excludeEvents = false


SmartCar_Agent.channels.SmartCarInfo_Channel.type = memory
SmartCar_Agent.channels.SmartCarInfo_Channel.capacity  = 100000
SmartCar_Agent.channels.SmartCarInfo_Channel.transactionCapacity  = 10000


SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.type = logger

SmartCar_Agent.sources.SmartCarInfo_SpoolSource.channels = SmartCarInfo_Channel
SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.channel = SmartCarInfo_Channel



SmartCar_Agent.sources.DriverCarInfo_TailSource.type = exec
SmartCar_Agent.sources.DriverCarInfo_TailSource.command = tail -F /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log
SmartCar_Agent.sources.DriverCarInfo_TailSource.restart = true
SmartCar_Agent.sources.DriverCarInfo_TailSource.batchSize = 1000

SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors = filterInterceptor2

SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.type = regex_filter
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.regex = ^\\d{14}
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.excludeEvents = false

SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.topic = SmartCar-Topic
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.brokerList = server02.hadoop.com:9092
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.requiredAcks = 1
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.batchSize = 1000


SmartCar_Agent.channels.DriverCarInfo_Channel.type = memory
SmartCar_Agent.channels.DriverCarInfo_Channel.capacity= 100000
SmartCar_Agent.channels.DriverCarInfo_Channel.transactionCapacity = 10000


SmartCar_Agent.sources.DriverCarInfo_TailSource.channels = DriverCarInfo_Channel
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.channel = DriverCarInfo_Channel