본문 바로가기
Data Engineering

AWS Glue JDBC 병렬 처리 성능 테스트

by KaneTheDataEngineer 2023. 7. 10.

0. Prerequisites

테스트 배경

⇒ 고객사에 AWS Glue Job을 사용하여 데이터 서비스를 딜리버리함에 있어 발생한 비용 및 성능 비효율성을 개선하고자 함

발생한 비효율성과 해결 방안

  • On-Premise Oracle Database 테이블에서 AWS Glue JDBC connector를 사용하여 값을 가져와 S3에 적재하는 Glue Job을 수행했으나 테이블 크기가 크지 않음에도 약 2일간 Job이 돌아가다가 Job이 Fail되는 상황이 발생하여 상당한 비용의 손실과 작업 일정에 차질이 발생하였다
  • AWS Glue는 Job 실행 시 할당한 worker 유형 및 수와 Job이 수행된 시간 만큼이 DPU (Data Processing Units)라는 청구 단위로 계산되어 부과된다 (성공 / 실패 상관 없이 청구됨)
  • 이때, Worker 할당 시 worker가 실제 job 수행에 사용되지 않았더라도 비용은 청구된다
  • 따라서 할당된 worker를 최대한 많이, 그리고 병렬로 사용함으로써 효용성을 극대화할 필요성이 있다
  • 또한, In-Memory 데이터 분산 병렬 처리 컴퓨팅에 최적화된 Apache Spark가 가진 퍼포먼스 측면의 장점을 활용하기 위해서도 병렬 처리를 하는 것이 필수적이다

AWS Glue의 Job 처리 방식 및 Executor 활용 방식 조사

  • Glue에 Job이 Submit 되었을때 할당된 Worker를 어떻게 사용하는지 그 원리를 파악하고자 함

테스트 상황 설정

 Oracle 샘플 테이블 생성 후 이를 Glue Connection으로 연결하여 S3에 Parquet Format으로 저장하는 Job을 생성하였다

Oracle Table Specification

  • Table Name : order_line_big
  • Table Size : 2.9GB
  • Row Count : 40,183,307 (약 4천만건 정도)
  • Schema & Distinct Values

⇒ 컬럼 중 OL_D_ID가 Number type (DynamicFrame의 hashexpression 사용 가능)이고 아래와 같이 10개의 UNIQUE한 값이 약 4백만건씩 고르게 가지고 있기 때문에 해당 컬럼을 파티셔닝 기준으로 세웠다.

Glue Job Details

출처 : https://www.youtube.com/watch?v=qLkTe9kT0PU
  • G.1X 사용
  • AWS Glue 4.0 version 사용
  • worker 당 4 vCPU / 16GB Memory
  • driver 및 executor memory는 10GB
  • spark.executor.core 수는 4개 (Glue 3.0 기준, 4.0과 동일한 것으로 추정)

 worker 1개가 동시에 실행할 수 있는 task 수는 spark.executor.cores 수와 동일하다

  • 따라서 G.1x 기준 executor core 수는 4개이며 executor memory는 10GB이므로, task가 개당 2.5GB 이하의 데이터를 처리하도록 하는 것이 가장 효율적인 것으로 추정할 수 있다 (10GB / 4 = 2.5GB)
 

Scenario 1. DynamicFrame without Parallelism

  • worker count : 11 (1 driver / 10 executors)
  • Execution time : 5 minutes 19 seconds
  • used dpu : 0.98
  • output : 20 files (약 95.5 MB / file)
  • code
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

path = 's3://dyc-oracle-glue-test/output/'


df = glueContext.create_dynamic_frame.from_catalog(
    database="oracle-test",
    table_name="test_testusr_order_line_big"
)

glueContext.write_dynamic_frame.from_options(
    frame=df,
    connection_type='s3',
    connection_options={
        'path': path,
    },
    format='parquet'
)

job.commit()

Results

Stage 0

  • 1개의 worker가 데이터를 전부 가져와서 다른 worker에게 분배하는 방식으로 처리되었다
  • shuffle : 약 4천만건 (records) 발생
  • spill (local / remote) : 4.4gb (memory) / 1gb (disk) 발생

Stage 1

  • S3에 write하는 task를 20개로 나누어서 10개의 worker가 각자 write 작업을 수행
  • 처음에 가져갔던 worker가 4개의 task를 수행함 (shuffle 최소화)
  • task가 20개였기 때문에 output file도 20개로 생성되었다 (약 2백만건 / file)

⇒ 즉, 1개의 worker가 데이터 전부 가져와서 나머지 9개의 worker에게 write 작업을 분배함

⇒ 굉장히 느리고 비효율적인 것으로 파악됨 (병렬처리 X)

 

Scenario 2. DataFrame without Parallelism

  • worker count : 11 (1 driver / 10 executors)
  • Execution time : 12 minutes 25 seconds
  • used dpu : 2.28
  • output : 1 file (1.5gb / file)
  • code
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
import boto3

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

CONNECTION_NAME = 'oracle_test_connection'
session = boto3.session.Session()
client = session.client('glue')
response = client.get_connection(Name=CONNECTION_NAME)

DB_URL=response['Connection']['ConnectionProperties']['JDBC_CONNECTION_URL']
DB_USERNAME=str(response['Connection']['ConnectionProperties']['USERNAME'])
DB_PASSWORD=str(response['Connection']['ConnectionProperties']['PASSWORD'])

query = f"""(SELECT * FROM ORDER_LINE_BIG) a"""

df = spark.read.format("jdbc").option("url", DB_URL).option("user", DB_USERNAME).option("password", DB_PASSWORD).option("dbtable", query).load()

df.write.format('parquet').mode("overwrite").save('s3://dyc-oracle-glue-test/output/')

Results

Stage 0

  • 1개의 워커가 4천만건을 전부 read / write
  • 단일 task 1개로 수행됨 (1개가 12분 소요)
  • executor 10개 중 1개만 사용되었으며, 9개는 어떠한 활동도 없었던 것으로 확인됨

⇒ 시나리오 중 가장 낮은 병렬성과 성능을 보임

 

Scenario 3. DynamicFrame with Parallelism

  • worker count : 11 (1 driver / 10 executors)
  • Execution time : 2 minutes 34 seconds
  • used dpu : 0.47
  • output : 10 files (148mb / file)
  • code
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

path = 's3://dyc-oracle-glue-test/output/'


df = glueContext.create_dynamic_frame.from_catalog(
    database="oracle-test",
    table_name="test_testusr_order_line_big",
    additional_options = {
        "hashexpression" : "OL_D_ID",
        "hashpartitions" : "10"
    }
)

glueContext.write_dynamic_frame.from_options(
    frame=df,
    connection_type='s3',
    connection_options={
        'path': path,
    },
    format='parquet'
)

job.commit()

Results

Stage 0

  • stage 1개에서 전체 데이터에 대한 read/write가 전부 수행됨
  • task는 10개로 구성되었으며, 3개의 worker가 이를 나눠가져서 병렬로 처리함
  • shuffle은 발생하지 않았다
  • 10개의 task가 각각 1개의 output file 생성

⇒ 원래의 예측 상으로는 10개의 worker가 각각 task를 1개씩 가져가서 처리하는 방식을 예상했지만, 3개의 worker만 작업을 수행한 것으로 나타났다 (task를 4개/4개/2개 이렇게 나눠가짐)

  • executor당 core 수가 4개이므로 executor 1개가 최대한 많은 Task를 병렬로 처리하는 것이 가장 효율적인 것으로 판단했기 때문이다.
  • Spark Cluster Manager를 통해 할당받은 Executor 리소스에 대해 TaskScheduler가 실행될 Task를 Executor에게 전송하여 Task가 수행됨
  • Job 수행 시 worker마다 Database에 Connection을 생성하기 때문에 되도록 이를 최소화함
  • 주의할 점은 YARN Resource Manager가 이를 할당한 것은 아니다 (Glue 3.0부터 Spark Application은 YARN 위에서 실행되지 않음)
  • 따라서 worker 수를 4개로 해도 동일한 성능이 나올 것으로 예측할 수 있으며, 실제 테스트 시에도 worker 수 11개와 4개에는 큰 성능 차이가 없었다.
 

Scenario 4. DataFrame with Parallelism

  • worker count : 11 (1 driver / 10 executors)
  • Execution time : 4 minutes 13 seconds
  • used dpu : 0.78
  • output : 10 files (148 mb / file)
  • code
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
import boto3

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

CONNECTION_NAME = 'oracle_test_connection'
session = boto3.session.Session()
client = session.client('glue')
response = client.get_connection(Name=CONNECTION_NAME)

DB_URL=response['Connection']['ConnectionProperties']['JDBC_CONNECTION_URL']
DB_USERNAME=str(response['Connection']['ConnectionProperties']['USERNAME'])
DB_PASSWORD=str(response['Connection']['ConnectionProperties']['PASSWORD'])

query = f"""(SELECT * FROM ORDER_LINE_BIG) a"""

df = spark.read.format("jdbc").option("url", DB_URL).option("user", DB_USERNAME).option("password", DB_PASSWORD).option("dbtable", query).option("partitionColumn", "OL_D_ID").option("numPartitions", 10).option("lowerBound", 1).option("upperBound", 11).load()

df.write.format('parquet').mode("overwrite").save('s3://dyc-oracle-glue-test/output/')

Results

Stage 0

  • stage 1개에서 read/write 전부 수행됨
  • 위 3번과 동일하게 task 10개를 3개의 worker가 병렬로 read/write함 (4/4/2)
  • shuffle 발생 안함
  • 3개의 worker만 일하고 나머지는 사용 X

⇒ DynamicFrame (3번)과 유사한 방식으로 작동했지만, DynamicFrame 대비 성능이 떨어지는 것으로 확인되었다 (DataFrame은 4분 13초가 걸린 반면, DynamicFrmae은 2분 34초 걸림)

⇒ 마찬가지로 worker 수 4개 이상 부터는 상관없이 동일한 성능을 보여주는 것으로 확인됨

 

Scenario 5. DynamicFrame with Parallelism (more parititions)

  • worker count : 13 (1 driver / 12 executors)
  • Execution time : 4 minutes 25 seconds
  • used dpu : 0.96
  • output : 46 files (33 MB / file)
  • code
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

path = 's3://dyc-oracle-glue-test/output/'


df = glueContext.create_dynamic_frame.from_catalog(
    database="oracle-test",
    table_name="test_testusr_order_line_big",
    additional_options = {
        "hashexpression" : "OL_W_ID",
        "hashpartitions" : "46"
    }
)

glueContext.write_dynamic_frame.from_options(
    frame=df,
    connection_type='s3',
    connection_options={
        'path': path,
    },
    format='parquet'
)

job.commit(
  • OL_W_ID 컬럼은 46개의 distinct한 값을 갖는 number type 컬럼이며, unique 값마다 약 90만개 정도의 row를 균등하게 나눠가진 상태이다.
SELECT OL_W_ID, count(*) FROM ORDER_LINE_BIG olb   GROUP BY OL_W_ID;
  • 따라서 해당 컬럼을 46개의 파티션으로 나눠서 병렬 처리를 시도해보았다
  • 이유는, worker를 13개로 할당 시 12개의 executor가 존재할 것이고 총 48개의 core가 병렬처리 가능하기 때문 (12 executors * 4 cores)

Results

46개의 task가 병렬 생성
  • stage 1개로 처리됨
  • task는 46개로 나뉘어서 생성됨

Stage 0

  • 예측된 대로 12개의 executor가 46개의 task를 나눠가진 후 병렬처리됨
  • 11개의 executor * 4 tasks + 1개의 executor * 2 tasks = 46 tasks
  • Task는 총 46개이며, 예측과 같이 개당 약 90만개의 record를 read/write한 것을 알 수 있다
executors

⇒ 하지만, 결과적으로 10개의 partition으로 나눈 DynamicFrame에 비해 성능적으로 떨어지는 것을 확인할 수 있었다

 

Scenario 6. DataFrame with Parallelism (more parititions)

  • worker count : 13 (1 driver / 12 executors)
  • Execution time : 2 minutes 44 seconds
  • used dpu : 0.60
  • output : 46 files (약 33MB / file)
  • code
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
import boto3

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

CONNECTION_NAME = 'oracle_test_connection'
session = boto3.session.Session()
client = session.client('glue')
response = client.get_connection(Name=CONNECTION_NAME)

DB_URL=response['Connection']['ConnectionProperties']['JDBC_CONNECTION_URL']
DB_USERNAME=str(response['Connection']['ConnectionProperties']['USERNAME'])
DB_PASSWORD=str(response['Connection']['ConnectionProperties']['PASSWORD'])

query = f"""(SELECT * FROM ORDER_LINE_BIG) a"""

df = spark.read.format("jdbc").option("url", DB_URL).option("user", DB_USERNAME).option("password", DB_PASSWORD).option("dbtable", query).option("partitionColumn", "OL_W_ID").option("numPartitions", 46).option("lowerBound", 1).option("upperBound", 47).load()

df.write.format('parquet').mode("overwrite").save('s3://dyc-oracle-glue-test/output/')

⇒ 시나리오 5번과 동일하게 OL_W_ID 컬럼을 기준으로 파티셔닝 후 DataFrame을 생성하여 값을 가져오도록 구성하였다

Results

  • Stage 1개에서 전부 처리됨

Stage 0

  • 시나리오 5번과 동일하게 46개의 task에 대해 12개의 executor가 균등하게 분배하여 read/write 수행함
  • 11개의 executor * 4 tasks + 1개의 executor * 2 tasks = 46 tasks
  • Shuffle도 발생하지 않았다
  • executor 당 약 360만건씩 Record 처리

⇒ 같은 파티셔닝 기준으로 DynamicFrame에 비해 훨씬 빠른 성능을 보였다

  • DynamicFrame은 4분 25초, DataFrame은 2분 44초 소요

⇒ 파티셔닝 수가 10개였던 시나리오 4번에 비해 46개로 늘린 해당 구성이 월등한 성능 향상을 보였다 (이는 worker 수를 늘려서라기 보다는 Executor의 core 수를 계산하여 최대한 많은 task가 병렬 처리될 수 있도록 파티션 수를 계산했기 때문으로 보임)

결론

  • 단순 시간 비교 (빠른 순서)

Scenario 3. DynamicFrame with Parallelism (2분 34초)

Scenario 6. DataFrame with Parallelism (more parititions) (2분 44초)

Scenario 4. DataFrame with Parallelism (4분 13초)

Scenario 5. DynamicFrame with Parallelism (more parititions) (4분 25초)

Scenario 1. DynamicFrame without Parallelism (5분 19초)

Scenario 2. DataFrame without Parallelism (12분 25초)

  • AWS Glue에서 JDBC Driver를 사용하여 데이터를 가져올 때 파티셔닝을 하여 병렬로 값을 가져왔을 때가 병렬로 가져오지 않았을 때보다 훨씬 더 빠른 성능과 비용 감소를 나타냈다
  • 1개의 Executor Core가 1개의 Task를 수행하여 병렬로 데이터를 처리할 수 있기 때문에 파티션 개수를 전체 Executor Core 수에 맞게 최적화하여 계산할 필요가 있음을 알 수 있었다
  • AWS Glue에서 사용 가능한 DynamicFrame과 Spark Native한 Dataframe의 성능 비교는 추가 테스트가 필요한 것으로 보인다

'Data Engineering' 카테고리의 다른 글

AWS EMR 클러스터 생성 가이드  (0) 2023.07.10
Amazon Linux 2 Airflow 2.4 설치 가이드 (KOR)  (0) 2023.07.10