본문 바로가기
BigData/Spark

[Spark] RDD(Resilient Distributed Dataset) 개념과 연산 예제

by 제2의지니 2021. 11. 6.

 

2021.10.20 - [BigData/Spark] - [Spark] 스파크 개념

 앞에 게시물에서 Spark Core에  RDD라는 내용이 나왔는데 RDD의 개념과 사용법을 알아보도록 하겠습니다.

 

1. RDD (Resilient Distributed Dataset) 란?

- 회복력 있는 메모리에 분산된 데이터셋으로 기본적으로 스파크 내부적으로 연산하는 데이터들을 RDD 형태로 사용한다.

 

2. RDD 특징

- 여러 분산 노드에 나누어진다.

- 다수의 파티션으로 관리된다.

- 변경이 불가능한 데이터 셋이다.

3. RDD 생성

 위 그림을 참고해서 보면 RDD는 파일 시스템( HDFS, GlusterFS )을 읽어와서 메모리에 저장할 때 생성이 되고 코드에서 생성되는 데이터를 저장할 때 사용된다. 

즉 정리하자면 아래 두가지 경우로 생성할 수 있다. scala 언어로 간단한 예제를 작성해보았다.

1. 파일 시스템을 읽어오기 (ex. csv파일 가지고 오기)

spark.read.option("header","true"). option("inferSchema","true"). format("csv").load("/home/testusr/testFile.csv")

2. 코드에서 생성되는 데이터를 저장하기 (parallelize 이용하기)

spark.sparkContext.parallelize(Seq(("strawberry", 10000), ("banana", 100000)))

4. RDD 연산

RDD를 제어하는 2개의 연산 타입 :  Transformation, Action

4.1 Transformation(변환)

Transformation : RDD에서 새로운 RDD를 생성하는 함수
  • map(func)

 원본 RDD 요소를 함수 func에 적용하여 변환한 새로운 RDD를 만들어 내는 연산자이다.

val arryData = Array(1,2,3,4,5)
// RDD 생성
var arryDataRdd = sc.parallelize(arryData)

// RDD to array
arryDataRdd.collect()

// map 사용 arryDataRdd 요소들에 + 2 값 반환
arryDataRdd = arryDataRdd.map(x => x+2)

// RDD to array
arryDataRdd.collect()

결과

map안에 정의한 func을 적용하여 새로운 RDD 형태를 만들어 주는 것을 확인할 수 있다.

 

  • filter(func)

func이 true인 값을 필터링하여 Dataset을 만들어준다.

// RDD 생성
val arryData = Array(1,2,3,4,5)
var arryDataRdd = sc.parallelize(arryData)

arryDataRdd.collect()

// 4 이상인 요소 반환
arryDataRdd = arryDataRdd.filter(x => x > 3)

arryDataRdd.collect()

결과

 

  • flatMap(func)

map과 비슷하지만 flatMap은 배열을 반환하고, 이 배열들을 하나의 배열로 반환한다.

// RDD 생성
val arryData = Array("create csv","read json","write xml")
var arryDataRdd = sc.parallelize(arryData)

arryDataRdd.collect()

// map 함수 사용
arryDataRdd.map(x => x.split(" ")).collect()

// flatMap 함수 사용
arryDataRdd.flatMap(x => x.split(" ")).collect()

결과

res70  : 초기에 정의한 형태

res71 : map 함수 사용했을 경우 형태

res72 :  flatMap 함수 사용 했을 경우 형태

 

map과 flatMap 은 유사 하지만 flatMap은 배열을 반환하고 이 배열을 하나의 배열로 묶어준다.

 

  • mapPartitions(func)

partition 단위 map 연산을 하며 Iterator => Iterator 형식으로 사용

 

// RDD 생성
val arryData = Array("create csv","read json","write xml")
var arryDataRdd = sc.parallelize(arryData)

arryDataRdd.collect()

arryDataRdd.map(x => x.split(" ")).collect()

arryDataRdd.mapPartitions(x => {
    x.map(a => a.split(" "))
}).collect()

결과

res96  : 초기에 정의한 형태

res97 : map 함수 사용했을 경우 형태

res98 :  mapPartitions 함수 사용 했을 경우 형태

 

  • mapPartitionsWithIndex(func)

mapPartitions와 동일하나 index가 존재한다  (int, Iterator)=>Iterator

// RDD 생성
val arryData = Array("create csv","read json","write xml")
var arryDataRdd = sc.parallelize(arryData)

arryDataRdd.collect()


arryDataRdd.mapPartitions(x => {
    x.map(a => a.split(" "))
}).collect()

arryDataRdd.mapPartitionsWithIndex((i,x) => {
    x.map(a => a.split(" ")(0) + ":" + i)
}).collect()

 결과

res128  : 초기에 정의한 형태

res129 : mapPartitions 함수 사용했을 경우 형태

res130 :  mapPartitionsWithIndex 함수 사용 했을 경우 형태

 

  • sample(withReplacement, fraction, seed)

RDD에서 랜덤으로 요소를 뽑아 새로운 RDD를 만들어 주는 변환 연산자이다.

withReplacement : 같은 요소가 여러 번 샘플링될 수 있는지 지정, false일 경우 한 번 샘플링된 요소는 메서드 호출이 끝날 때까지 샘플링 대상에서 제외

fraction : withReplacement 값이 true이면 샘플링될 횟수의 기댓값, false이면 샘플링될 기대 확률을 의미한다.

seed ;   난수 생성에 사용, 인수를 사용하지 않으면 해당 인자의 기본 값을 사용

// RDD 생성
val arryData = Array(1,2,3,4,5,8,6,3,5,"a","b","c")
var arryDataRdd = sc.parallelize(arryData)

// 데이터중 30% 가져오기
val sampleData = arryDataRdd.sample(false, 0.3)

sampleData.count

sampleData.collect

- sample 된 데이터를 랜덤으로 가지고 온다. 

결과 1
결과2

  • union(otherDataset)

다른 RDD와 합쳐진 결괏값을 리턴한다. (= 합집합)

%spark
val rdd1 = sc.parallelize(List("1","2","3"))

val rdd2 = sc.parallelize(List("a","b","c"))

rdd1.union(rdd2).collect()

  • intersection(otherDataset)

2개의 RDD에서 서로 교차되어 있는 (intersection) 값 리턴한다. (= 교집합)

val rdd1 = sc.parallelize(List("1","2","3","a"))

val rdd2 = sc.parallelize(List("a","b","c"))

rdd1.intersection(rdd2).collect()

 

  • distinct([numPartitions]))

중복된 값 제거해 준다.

val rdd1 = sc.parallelize(List("1","2","3","a","1","1"))

rdd1.distinct().collect()

  • groupByKey([numPartitions])

(K, V) Pair 객체를 이용하여 (K, Iterator) 만들어 준다. 공식 doc에 따르면 groupByKey 보다는 aggregateKey, reduceByKey를 사용하라고 권장한다. groupByKey에 대한 퍼포먼스 관련해서는 다음 글에서 다루도록 하겠다.

val rdd = sc.parallelize(Seq(("C",3),("A",1),("B",4),("A",2),("B",5)))  
rdd.collect  

val groupfunc = rdd.groupByKey().collect

  • reduceByKey(func, [numPartitions])

(K, V) Pair 객체에서 Key 기준으로 reduce 한다

아래는 키값에 따라 더하는 예제이다.

val rdd = sc.parallelize(Seq(("C",3),("A",1),("B",4),("A",2),("B",5)))  

rdd.reduceByKey(_ + _).collect 

rdd.reduceByKey((x, y) => (x + y)).collect 

rdd.reduceByKey((x, y) => (x)).collect 
rdd.reduceByKey((x, y) => (y)).collect

  • aggregateByKey(zeroValue)(seqOpcombOp, [numPartitions])

aggregateByKey zeroValue를 받아 RDD 값을 병합한다.

foldByKey reduceByKey와 유사하지만, 값 타입을 바꿀 수 있다는 점은 다르다.

 

seqOp:  임의의 V 타입을 가진 값을 또 다른 U 타입으로 변환[(U, V) => U]하는 변환

combOp:  첫 번째 함수가 변환한 값을 두 개씩 하나로 병합[(U, U) => U]하는 병합 

val rdd = sc.parallelize(Seq(("C",3),("A",1),("B",4),("A",2),("B",5)))  

rdd.aggregateByKey(0)((k,v) => v.toInt+k, (v,k) => k+v).collect

  • sortByKey([ascending], [numPartitions])

Key 값을 기준으로 정렬한다. (desc, asc)

 

  • join(otherDataset, [numPartitions])

(K, V) RDD와 (K, U) RDD을 join (K, (V, U)) 값을 리턴한다.

 

val members = sc.parallelize(Seq((1,"유진"), (2,"예진"), (3,"도희")))


val departments = sc.parallelize(Seq((1, "VIP"), (2, "GOLD")))


members.join(departments).collect

members.leftOuterJoin(departments).collect

 

  • cogroup(otherDataset, [numPartitions])

(K, V) RDD와 (K,U) RDD를 join (K (Iterator , Iterator))

 

  • cartesian(otherDataset)

partition 개수를 줄인다.

 

  • pipe(command[envVars])

shell command를 통해 rdd를 생성할 수 있다.

 

  • coalesce(numPartitions)

partition 갯수를 줄인다.

 

  • repartition(numPartitions)

partition을 다시 shuffle 한다. partition의 갯수를 늘리거나 줄일 수 있다.

 

  • repartitionAndSortWithinPartitions(partitioner)]

RDD의 파티션을 다시 진행한다.  각각의 파티션은 sort가 된 값들이 존재한다.

참고
https://spark.apache.org/docs/3.2.0/rdd-programming-guide.html#transformations
스파크를 다루는 기술 Spark in Action (https://www.gilbut.co.kr/book/view?bookcode=BN001997#bookData)

 

'BigData > Spark' 카테고리의 다른 글

[개발] Spark SQL DataFrame Vector to Array  (0) 2021.10.21
[Spark] 스파크 개념  (0) 2021.10.20

댓글