2018년 2월 9일 금요일

NiFi Clustering과 HA(High Availability)


  NiFi Clustering으로 찾아보면 Zero_master Clustering이라는 문구를 가장 쉽게 마주치게 됩니다. 1.0.0 버전에서부터 적용한 방식으로, 간단히 설명하면 ZooKeeper를 활용해서 내부적으로 Cluster Coordinator를 선정하여 NiFi의 flow를 클러스터된 노드들 모두 동일하게 유지합니다.
  Zero-master라 함은, Cluster Coordinator가 떠있는 노드가 SPOF가 되는 것이 아니라 또다시 다른 Cluster Coordinator를 띄울 리더를 선출하기 때문에, 마스터 역할만을 하는 노드가 별도로 존재하지 않기 때문입니다.

  그래서 처음에 쉽게 생각했습니다.
  'NiFi Clustering을 통해서 작업도 쉽게 분배하고, HA도 이룰 수 있겠구나.' 라고...

  결과는 둘 다 아니었습니다?
  HDFS, Hive, Spark 같이 처음부터 distributed 환경에서 쓰도록 만들어진 Application을 너무도 쉽고 당연하게 접할 수 있어서, 막연히 비슷하게 생각했던 것 같습니다.


  먼저, NiFi Clustering.

  3개의 노드를 하나의 클러스터로 구성을 하고, 그 중 하나의 노드의 UI에 접근을 해서 작업을 정의한다고 생각해 봅시다. 간단하게 아래와 같은 flow로 작성을 했습니다.
  ExecuteSQL -> ConvertAvroToORC -> PutHDFS
  그러면, 3개의 NiFi 노드에 동일하게 해당 flow가 생성이 됩니다. 그리고 작업을 전부 시작시켜 봅시다. 첫 번째 Processor(ExecuteSQL)가 폭주(?!)하지 않도록 적당한 schedule을 넣어 실행시켜 보겠습니다. 3개의 노드에서 모두 같은 SQL을 수행을 할 것이고, 그 결과 파일을 각자의 노드로 가져와서 ConvertAvroToORC 작업을 수행한 후에 해당 flowfile의 이름대로 HDFS에 파일을 put 할 것입니다. 즉 같은 파일이 3개가 HDFS에 쌓이게 되겠죠. (?)
  NiFi를 Single 노드로 쓸 때와 같이 작업을 정의하게 되면, 클러스터링된 NiFi는 각각의 노드에서 모두 같은 작업을 수행하게 됩니다. 사실 문서에도 해당 내용을 나와 있습니다. 클러스터링이 된다고 했지, 작업 분배를 자동으로 해 준다고는 안 했으니까요. (작업 분배가 필요하다고도 나와 있습니다.)
  그래서, NiFi 클러스터링에서는 작업 분배가 필요하고, 보통 아래와 같은 추가적인 Data flow 정의가 필요합니다.

  1) Kafka와 같은 queue로부터 작업을 시작 (ex. ConsumeKafka, ConsumeJMS)
  2) Nifi의 remote group이나 HAProxy처럼 Nifi Clustering 앞 단에서 분배해서 시작
  3) Data source를 바꿀 수 없는 경우 (대부분), 해당 데이터를 가져와야 할 processor를 On Primary Node로 설정한 후, 1,2와 같은 방법을 통해 작업을 재분배

  어쨌든 작업을 분배했으니 클러스터링해서 사용하는 의미를 찾을 수 있습니다. 우리는 처리해야 할 data에 비해서 cpu core도 부족하고, NiFi JVM에게 할당한 메모리도 넉넉치 못한 서버라서, 어쩔 수 없으니까요. 하지만 클러스터링이 곧 HA는 아닙니다.


  그러면 이제 HA (High Availability)

  NiFi를 3개의 노드로 클러스터링을 했으니까, 노드 하나가 죽어도 NiFi는 다음 리더를 선출해서 남은 노드들 중에서 Cluster Coordinator(및 Primary Node)를 시키면서 클러스터링을 유지해 나갑니다. NiFi UI를 통해서 현재 클러스터링 상태가 2/3 으로 바뀌는 것을 볼 수 있습니다. 그러니까 HA라고 할 수 있을까요.

  이번에도 이미 안 될 거라고 말했는데요.. NiFi도 클러스터링이 된다고 했지, HA는 된다고 한 적이 없습니다. 아마도 Failover가 안된다는 것이 조금 더 정확한 표현일 지 모르겠습니다.
  되는 것부터 보면, 하나의 노드가 떨어져나간 클러스터링의 상태에서도 Primary node를 zookeeper를 통해 새로 선정하게 될 것입니다. 그러면 위에서 말한 어느 방법이더라도 작업 분배에는 이상이 없습니다.

   문제는 이미 분배된 작업(및 data)인데요. 죽은 노드에서 돌던 작업들은 그 노드의 NiFi 프로세스를 되살리기 전까지는 그냥 사라져버린다고 볼 수 있습니다. 돌고 있는 노드들이 뭔가 조치를 해줄 수도 없고, 분배된 작업만 수행하는 입장에서는 솔직히 무슨 문제가 있는 지도 모릅니다.
  이 점은 정의해 놓은 flow가 복잡하고 오래 걸리는 작업일 수록, 그리고 데이터가 빠지지 않고 다 들어오는 것이 중요할 수록, 그리고 시간 제약이 있는 데이터일수록 문제가 크다는 생각을 합니다.
  어떤 이유에서 죽었는 지는 모르겠지만 해당 노드의 NiFi를 되살리면 NiFi의 ContentRepository, FlowFileRepository가 힘을 발휘해서 이어서 작업을 할 수는 있습니다.
  죽은 노드에서의 작업을 온전히 재수행이라도 시키기 위해서는 해당 노드의 NiFI를 어떻게든 되살리는 것 말고는 답이 없습니다.



  그래서

  ...
  '그래서'라고 부제목을 달고 보니, 답도 없으면서 무얼 위해 이렇게 장황하게 썼는가라는 생각이 드는 순간입니다. 저는 모르겠습니다. 비슷한 고민을 하는 Nifi 개발자가 Confluence에 던진 제안이 있는데 보면 요약해보면, Repository 를 HDFS와 같은 외부의 distributed 환경에 저장을 하고, failover 시 활용할 수 있도록 하면 HA를 할 수 있을 것이라는 내용입니다.(https://cwiki.apache.org/confluence/display/NIFI/High+Availability+Processing)

  NiFi 적용을 고려 중이시라면, 위 내용을 알고 접근하시면 좋을 것 같다는 생각에 글을 남깁니다. 긴 글 읽어주셔서 감사합니다. (사실은 넋두리..)

2018년 1월 28일 일요일

Hive LLAP (Interactive SQL) 이란?

참고 : https://cwiki.apache.org/confluence/display/Hive/LLAP

LLAP

Live Long And Process라고 Hive 2.0에서 추가된 기능이다.


Overview

Hive는 최근 수년 내에 커뮤니티의 활발한 활동의 결과(Tez와 Cost-based-optimization) 덕에 상당히 빨라질 수 있었다.  Hive가 다음 단계로 나아가기 위해서는 아래와 같은 것들을
필요했다.
 - Asynchronous spindle-aware IO
 - Pre-fetching and caching of column chunks
 - Multi-threaded JIT-friendtly operator pipelines

LLAP는 hybrid 실행 모델을 제공한다. 그 것은 HDFS Datanode와 직접적으로 통신하는 long-lived daemon과 긴말하게 통합된 DAG 기반 framework로 이뤄진다.
캐시, pre-fetching, 쿼리 프로세싱, 접근 관리와 같은 기능들이 daemon에서 수행된다. 또한 작은(짧은) 쿼리들은 대개 daemon에서 직접 수행되며, 무거운 작업은 일반적인 YARN 컨테이너를 통해 수행된다.

Persistent Daemon

캐싱과 JIT 최적화, 그리고 시작 비용을 없애기 위해, daemon이 클러스터의 worker 노드에서 돌고 있다. 해당 daemon은 I/O, 캐싱, query fragment execution을 수행한다.
 - Stateless
 - Recovery/resilency
 - Communication between nodes

Execution Engine (실행 엔진)

LLAP는 기존의 프로세스 기반의 Hive execution 모델에서 수행되므로, 기존 Hive의 확장성과 다재다능함(versatility)을 그대로 유지한다.
 - The ademons are optional. LLAP가 배포되어 동작 중인 상태에서도 Hive는 LLAP 없이 수행될 수 있다.
 - External orchestraction and execution engines. LLAP는 MapReduce나 Tez 같은 실행 엔진이 아니다. LLAP의 모든 작업 수행은 기존의 Hive 실행 엔진(Tez와 같은)에 의해 스케줄되고 관리된다. LLAP 지원 수준은 각자의 실행 엔진에 따라 다르다. 현재는 Tez를 지원하며, MapReduce는 계획에 없다.
 - Partial execution. LLAP daemon에 의해 수행 된 작업 결과는 쿼리에 따라 Hive 쿼리 결과의 일부를 구성하거나 외부 Hive 작업에 전달 될 수 있다.
- Resourece Management. 여전히 YARN이 리소스 할당 및 관리를 책임진다. JVM 메모리 설정의 한계를 피하기 위해 큰 작업(group by, joins)을 위한 버퍼나, 캐시된 데이터들은 off-heap에 저장된다. 이 방법은 작업 부하에 따라 추가적인 리소스 사용을 가능케 한다.

Query Fragment Execution

부분 실행을 위해 LLAP 노드들은 "쿼리 조각(query fragments)"들을 나누어 수행하도록 되어 있다. 여기서 쿼리 조각은 필터, projection, 데이터 변형, 부분 취합, 정렬, bucketing, hash join/semi-join 등의 작업이 될 수 있다.
 - Parallel execution. 하나의 LLAP 노드는 여러 쿼리 조각을 병렬처리 할 수 있다.
 - Interface. 유저는 LLAP 노드들에 client API를 통해 직접 접근이 가능하다.


I/O



Caching

LLAP daemon은 입력 파일의 메타 데이터 뿐 아니라, 데이터 자체도 캐시한다. 메타데이터와 인덱스 정보는 자바 객체로 프로세스에 저장되며, 캐시된 데이터는 off-heap에 저장된다.
 - Eviction policy. 캐치 교체 정책은 테이블 스캔을 통한 작업부하 분석을 통해 조정된다. 기본적으로는 LRFU와 같은 정책이 사용되며, pluggable하다.
 - Caching granularity. Column-chuck의 크기가 캐시의 데이터 단위가 된다. 이것으로 프로세싱 오버헤드와 스토리지 효율성 간의 조정이 가능하다. 파일 포맷과 실행 엔진에 따라 해당 청크의 세분성(granularity)가 정해진다.

Workload Management

YARN을 통해 작업 부하에 따른 자원을 관리한다. 실행엔진이 YARN으로부터 할당 받은 자원을 LLAP나 Hive executor를 새로 띄우기 위해 위임할 수 있다.

ACID Support

LLAP는 transactions을 인지한다. 테이블의 정상적인 상태를 위해 수행되는 델타 파일에 대한 머지가 일어난 후에 데이터 캐싱이 수행된다.
여러 버전이 가능하며 그 중에 사용될 버전을 선택하여 캐시를 요청할 수 있다. 이것으로 얻는 장점은 머지를 비동기로 수행하고 한번에 데이터를 캐시할 수 있다는 점이다.

Security

LLAP 서버는 태생적으로 파일 단위보다도 더 세분화된 레벨의 access control이 가능하다. 데몬이 처리 중인 컬럼과 레코드를 알고 있기 때문에 이런 오브젝트에 대한 정책 적용이 가능하다.

Monitoring

LLAP 모니터링에 대한 설정은 Slider가 사용하는 templates.py에 들어있다. (resources.json, appConfig.json, metainfo.xml)
LLAP Monitor Daemon은 LLAP 데몬과 마찬가지로 YARN 컨테이너에서 수행되며, 같은 port로 리슨하고 있다.
LLAP Metrics Collection Server는 모든 LLAP daemon들로부터 주기적으로 JMX metric 정보를 수집한다.

HDP 기준) Ambari Tez view를 통해 쿼리 수행 현황을 파악할 수 있다.


Web Services

) 기본적인 metrics 정보는 15002(default) 포트를 통해 web UI를 제공한다.

HIVE-9814 introduces the following web services:
JSON JMX data - /jmx
JVM Stack Traces of all threads - /stacks
XML Configuration from llap-daemon-site - /conf 
HIVE-13398 introduces the following web services:
LLAP Status - /status
LLAP Peers - /peers 


SLIDER on YARN Deployment

LLAP can be deployed via Slider, which bypasses node installation and related complexities (HIVE-9883).




참고하면 더 좋은 slideshare

https://www.slideshare.net/Hadoop_Summit/llap-longlived-execution-in-hive


Tunings

https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.2/bk_hive-performance-tuning/content/ch_hive-perf-tuning-intro.html











2018년 1월 27일 토요일

LRFU (Least Recently/Frequently Used) 이란?

LRFU (Least Recently/Frequently Used)

Adaptive Cache Replacement Policy


캐시를 통해 데이터 처리 성능 향상의 꾀하는 경우에 캐시를 어떻게 유지하고 새로 교체해줄 것인 가에 대한 정책 중에 LRFU라는 게 있어서 찾아보는 중이다.
해당 기법을 본 건 LLAP Caching 부분을 읽다가 였다. (https://cwiki.apache.org/confluence/display/Hive/LLAP)

그 전에 알면 좋을 주요 교체 알고리즘으로 FIFO, LRU, LFU가 있다.
간단히 소개하면 아래와 같다.

FIFO (First In, First Out)
 - 가장 오래된 정보를 삭제한 자리에 새로운 정보를 저장하는 식이다.

LRU (Least Recently Used)
 - 가장 오래 사용되지 않은 정보를 교체한다.

LFU (Least Frequently Used)
 - 가장 빈도수가 적게 사용된 정보를 교체한다.

문제는 LRFU인데, 검색해보면 생각보다 나오는 내용이 없다.


먼저 wikipedia에서 Cache replacement policies를 찾아봤는데, 비슷한게 있었다.
그 내용을 살펴보면 아래와 같다.
https://en.wikipedia.org/wiki/Cache_replacement_policies#Least_Frequent_Recently_Used_(LFRU)_[11]

LFRU (Least Frequent Recently Used)

LFU와 LRU의 장점을 합친 교체 정책으로써, 정보 중심 네트워크(Information-Centric Networking, ICN), 컨텐츠 전송 네트워크(CDN), 일반적인 형태의 분산 네트워크와 같은 캐시 어플리케이션에 적합하다. 
LFRU에서는 캐시가 2 부분으로 나눠지는데, privileged와 unprivileged 파티션으로 불린다. privileged 파티션에선 LRU를 적용하고, unprivileged 파티션에는 ALFU(거의 LFU?)를 적용한다는 것으로 예를 들면 다음과 같다. 
먼저, unprivileged에서 적은 횟수로 사용된 정보를 버리고, privileged 파티션에서 가장 오래된 정보가 unprivileged로 이동한다. 그리고 새로운 정보가 privileged의 빈자리로 들어온다.

- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 

위와 같은 내용이었는데, 문제는 순서가 다르다. LRFU가 아니라 LFRU인데.
그래서 조금 더 찾아봤더니 96년도에 서울대에 쓴 논문이 검색된다. 
LRFU (Least Recently/Frequently Used) Replacement Policy: A Spectrum of Block Replacement Policies (1996)

내용을 요약해보면 아래와 같다. 이 논문에서 block을 하나의 저장 및 교체 단위로 본다.

LRFU (Least Recently/Frequently Used)

LFU는 작업량의 추세를 반영할 수 없다. 예를 들면, 예전에는 자주 쓰이다가 현재는 안 쓰이는 block과 최근에 들어서야 점점 더 많이 쓰이기 시작한 block이 있을 때, 전자를 교체 대상으로 삼을 것이다. 구현 관점에서 보면, 보통 Priority queue가 사용되므로 시간 복잡도는 O(log n)을 나타낸다.
반면 LRU는 가장 최근에 남아있는 데이터에 대한 빈도만을 통해 선택을 해야 한다. 결과적으로 제대로된 빈도수 확인이 안될 수 있다. 하지만 다른 정책에 비해 패턴에 의한 변화를 줄 수 있는 형태이며 매우 효율적으로 동작한다. 하나의 연결 리스트로 구현할 수 있으므로, 시간 복잡도는 상수 O(1)이다.

두 가지의 장점을 합쳐 과거의 모든 block의 빈도(frequency)와 최신성(recency)을 둘다 보고 교체 대상을 선정할 수 있도록 하는 것을 Least Recently/Frequently Used(LRFU)라고 부를 것이다. 가중치에 따라 해당 정책은 스펙트럼의 형태로 나타나며 시간 복잡도는 O(1)과 O(log n)의 사이가 될 것이다.
LRFU 정책은 CRF(Combined Recency and Frequency)라는 값을 통해 해당 블럭의 재사용성을 판단하며, 가장 작은 값의 block이 교체 대상이 된다.

자세한 CRF 계산 과정은 논문에서 확인할 수 있다.
그 중 인상적인 부분은 CRF를 계산하기 위한 공식에서 사용되는 F(x)의 parameter인 λ의 범위를 0과 1사이에서 선택하며, 그 값에 따라 LFU와 LRU의 스펙트럼 어딘가에 위치하게 된다는 점이었다.
F(x) = (1/2) ^(λx)

또한 상관관계가 있는 기간(correlated period)을 하나로 묶어 계산할 수 있는 형태에 대한 계산 변형도 가능하다. Database 시스템에서는 data access에 대한 frequency와 recency보다는 상위 작업인 transaction 레벨에서 미래 예측이 더 용이하다고 보고, Gc(x)을 정의해서 사용할 수 있도록 한다.

CRF의 계산을 매번 가지고 있는 전체 block에 대해 수행하는 이슈는, 자료구조 heap을 통해 피해갈 수 있도록 상세한 Implementation 가이드를 포함한다. 결과적으로 Implemetation은 가중치에 따라 연결 리스트와 heap(Priority queue)로 나뉘어 계산된다고 볼 수 있다.

작업량(workload)의 특성에 따라 parameter λ와 c를 변형하여 addaptive 정책을 구성할 수도 있다.