2021.10.20 - [BigData/Spark] - [Spark] 스파크 개념
앞에 게시물에서 Spark Core에 RDD라는 내용이 나왔는데 RDD의 개념과 사용법을 알아보도록 하겠습니다.
1. RDD (Resilient Distributed Dataset) 란?
- 회복력 있는 메모리에 분산된 데이터셋으로 기본적으로 스파크 내부적으로 연산하는 데이터들을 RDD 형태로 사용한다.
2. RDD 특징
- 여러 분산 노드에 나누어진다.
- 다수의 파티션으로 관리된다.
- 변경이 불가능한 데이터 셋이다.
3. RDD 생성
위 그림을 참고해서 보면 RDD는 파일 시스템( HDFS, GlusterFS )을 읽어와서 메모리에 저장할 때 생성이 되고 코드에서 생성되는 데이터를 저장할 때 사용된다.
즉 정리하자면 아래 두가지 경우로 생성할 수 있다. scala 언어로 간단한 예제를 작성해보았다.
1. 파일 시스템을 읽어오기 (ex. csv파일 가지고 오기)
spark.read.option("header","true"). option("inferSchema","true"). format("csv").load("/home/testusr/testFile.csv")
2. 코드에서 생성되는 데이터를 저장하기 (parallelize 이용하기)
spark.sparkContext.parallelize(Seq(("strawberry", 10000), ("banana", 100000)))
4. RDD 연산
RDD를 제어하는 2개의 연산 타입 : Transformation, Action
4.1 Transformation(변환)
Transformation : RDD에서 새로운 RDD를 생성하는 함수
- map(func)
원본 RDD 요소를 함수 func에 적용하여 변환한 새로운 RDD를 만들어 내는 연산자이다.
val arryData = Array(1,2,3,4,5)
// RDD 생성
var arryDataRdd = sc.parallelize(arryData)
// RDD to array
arryDataRdd.collect()
// map 사용 arryDataRdd 요소들에 + 2 값 반환
arryDataRdd = arryDataRdd.map(x => x+2)
// RDD to array
arryDataRdd.collect()
map안에 정의한 func을 적용하여 새로운 RDD 형태를 만들어 주는 것을 확인할 수 있다.
- filter(func)
func이 true인 값을 필터링하여 Dataset을 만들어준다.
// RDD 생성
val arryData = Array(1,2,3,4,5)
var arryDataRdd = sc.parallelize(arryData)
arryDataRdd.collect()
// 4 이상인 요소 반환
arryDataRdd = arryDataRdd.filter(x => x > 3)
arryDataRdd.collect()
- flatMap(func)
map과 비슷하지만 flatMap은 배열을 반환하고, 이 배열들을 하나의 배열로 반환한다.
// RDD 생성
val arryData = Array("create csv","read json","write xml")
var arryDataRdd = sc.parallelize(arryData)
arryDataRdd.collect()
// map 함수 사용
arryDataRdd.map(x => x.split(" ")).collect()
// flatMap 함수 사용
arryDataRdd.flatMap(x => x.split(" ")).collect()
res70 : 초기에 정의한 형태
res71 : map 함수 사용했을 경우 형태
res72 : flatMap 함수 사용 했을 경우 형태
map과 flatMap 은 유사 하지만 flatMap은 배열을 반환하고 이 배열을 하나의 배열로 묶어준다.
- mapPartitions(func)
partition 단위 map 연산을 하며 Iterator => Iterator 형식으로 사용
// RDD 생성
val arryData = Array("create csv","read json","write xml")
var arryDataRdd = sc.parallelize(arryData)
arryDataRdd.collect()
arryDataRdd.map(x => x.split(" ")).collect()
arryDataRdd.mapPartitions(x => {
x.map(a => a.split(" "))
}).collect()
res96 : 초기에 정의한 형태
res97 : map 함수 사용했을 경우 형태
res98 : mapPartitions 함수 사용 했을 경우 형태
- mapPartitionsWithIndex(func)
mapPartitions와 동일하나 index가 존재한다 (int, Iterator)=>Iterator
// RDD 생성
val arryData = Array("create csv","read json","write xml")
var arryDataRdd = sc.parallelize(arryData)
arryDataRdd.collect()
arryDataRdd.mapPartitions(x => {
x.map(a => a.split(" "))
}).collect()
arryDataRdd.mapPartitionsWithIndex((i,x) => {
x.map(a => a.split(" ")(0) + ":" + i)
}).collect()
res128 : 초기에 정의한 형태
res129 : mapPartitions 함수 사용했을 경우 형태
res130 : mapPartitionsWithIndex 함수 사용 했을 경우 형태
- sample(withReplacement, fraction, seed)
RDD에서 랜덤으로 요소를 뽑아 새로운 RDD를 만들어 주는 변환 연산자이다.
withReplacement : 같은 요소가 여러 번 샘플링될 수 있는지 지정, false일 경우 한 번 샘플링된 요소는 메서드 호출이 끝날 때까지 샘플링 대상에서 제외
fraction : withReplacement 값이 true이면 샘플링될 횟수의 기댓값, false이면 샘플링될 기대 확률을 의미한다.
seed ; 난수 생성에 사용, 인수를 사용하지 않으면 해당 인자의 기본 값을 사용
// RDD 생성
val arryData = Array(1,2,3,4,5,8,6,3,5,"a","b","c")
var arryDataRdd = sc.parallelize(arryData)
// 데이터중 30% 가져오기
val sampleData = arryDataRdd.sample(false, 0.3)
sampleData.count
sampleData.collect
- sample 된 데이터를 랜덤으로 가지고 온다.
- union(otherDataset)
다른 RDD와 합쳐진 결괏값을 리턴한다. (= 합집합)
%spark
val rdd1 = sc.parallelize(List("1","2","3"))
val rdd2 = sc.parallelize(List("a","b","c"))
rdd1.union(rdd2).collect()
- intersection(otherDataset)
2개의 RDD에서 서로 교차되어 있는 (intersection) 값 리턴한다. (= 교집합)
val rdd1 = sc.parallelize(List("1","2","3","a"))
val rdd2 = sc.parallelize(List("a","b","c"))
rdd1.intersection(rdd2).collect()
- distinct([numPartitions]))
중복된 값 제거해 준다.
val rdd1 = sc.parallelize(List("1","2","3","a","1","1"))
rdd1.distinct().collect()
- groupByKey([numPartitions])
(K, V) Pair 객체를 이용하여 (K, Iterator) 만들어 준다. 공식 doc에 따르면 groupByKey 보다는 aggregateKey, reduceByKey를 사용하라고 권장한다. groupByKey에 대한 퍼포먼스 관련해서는 다음 글에서 다루도록 하겠다.
val rdd = sc.parallelize(Seq(("C",3),("A",1),("B",4),("A",2),("B",5)))
rdd.collect
val groupfunc = rdd.groupByKey().collect
- reduceByKey(func, [numPartitions])
(K, V) Pair 객체에서 Key 기준으로 reduce 한다
아래는 키값에 따라 더하는 예제이다.
val rdd = sc.parallelize(Seq(("C",3),("A",1),("B",4),("A",2),("B",5)))
rdd.reduceByKey(_ + _).collect
rdd.reduceByKey((x, y) => (x + y)).collect
rdd.reduceByKey((x, y) => (x)).collect
rdd.reduceByKey((x, y) => (y)).collect
- aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])
aggregateByKey는 zeroValue를 받아 RDD 값을 병합한다.
foldByKey나 reduceByKey와 유사하지만, 값 타입을 바꿀 수 있다는 점은 다르다.
seqOp: 임의의 V 타입을 가진 값을 또 다른 U 타입으로 변환[(U, V) => U]하는 변환
combOp: 첫 번째 함수가 변환한 값을 두 개씩 하나로 병합[(U, U) => U]하는 병합
val rdd = sc.parallelize(Seq(("C",3),("A",1),("B",4),("A",2),("B",5)))
rdd.aggregateByKey(0)((k,v) => v.toInt+k, (v,k) => k+v).collect
- sortByKey([ascending], [numPartitions])
Key 값을 기준으로 정렬한다. (desc, asc)
- join(otherDataset, [numPartitions])
(K, V) RDD와 (K, U) RDD을 join (K, (V, U)) 값을 리턴한다.
val members = sc.parallelize(Seq((1,"유진"), (2,"예진"), (3,"도희")))
val departments = sc.parallelize(Seq((1, "VIP"), (2, "GOLD")))
members.join(departments).collect
members.leftOuterJoin(departments).collect
- cogroup(otherDataset, [numPartitions])
(K, V) RDD와 (K,U) RDD를 join (K (Iterator , Iterator))
- cartesian(otherDataset)
partition 개수를 줄인다.
- pipe(command, [envVars])
shell command를 통해 rdd를 생성할 수 있다.
- coalesce(numPartitions)
partition 갯수를 줄인다.
- repartition(numPartitions)
partition을 다시 shuffle 한다. partition의 갯수를 늘리거나 줄일 수 있다.
- repartitionAndSortWithinPartitions(partitioner)]
RDD의 파티션을 다시 진행한다. 각각의 파티션은 sort가 된 값들이 존재한다.
참고
https://spark.apache.org/docs/3.2.0/rdd-programming-guide.html#transformations
스파크를 다루는 기술 Spark in Action (https://www.gilbut.co.kr/book/view?bookcode=BN001997#bookData)
'BigData > Spark' 카테고리의 다른 글
[개발] Spark SQL DataFrame Vector to Array (0) | 2021.10.21 |
---|---|
[Spark] 스파크 개념 (0) | 2021.10.20 |
댓글