[GCP] Java Beam SDK & Maven으로 Dataflow 수행하기
Apache Beam SDK는 data processing pipeline을 구성하기 위한 오픈소스 프로그래밍 모델이다. Dataflow는 이 파이프라인을 구성하기 위한 여러 런타임 중 하나다. Beam과 Dataflow의 기본에 대해서는 이전에 [GCP] Apache Beam 알아보기 에서 살펴본 적이 있다.
Dataflow Pipeline을 수행할 수 있는 방법은 여러가지가 있다.
- Template을 활용해 Web Console에서 바로 수행하거나,
- REST API 또는
- CLI를 통한 수행 방법,
- 그리고 프로그래밍적인 방법으로 Python, Java SDK를 사용하는 방법이 있다.
Template이란 구글에서 미리 Template화 해놓은 Beam 데이터 처리 코드라고 보면 된다.
가장 간편한 방법은 Google에서 제공하는 이 Template을 사용하는 것인데, 간편하게 GCP Web Console에서 키워드 몇개+간단한 코드 스니펫만 입력하고 바로 수행할 수도 있다. 따라서 복잡한 Data Manipulation 작업이 없는 단순 ETL, Data Processing 작업이라면 Template활용이 효율적일 수 있다. 다만 운영 작업을 매번 Web Console에서 생성할 수는 없으므로 REST API 또는 CLI를 사용해서 간단히 Pipeline Job을 생성할 수 있도록 제공한다.
자세한 내용은 아래 링크를 참조하자.
아래는 Batch Template의 일부인데, 다양한 데이터 소스로부터 데이터 타겟을 설정하여 프로세싱할 수 있다.
Batch Templates
BigQuery to Cloud Storage TFRecords
BigQuery export to Parquet (via Storage API)
BigQuery to Elasticsearch
Bigtable to Cloud Storage Avro
Bigtable to Cloud Storage Parquet
Bigtable to Cloud Storage SequenceFiles
Cloud Storage Avro to Bigtable
Cloud Storage Parquet to Bigtable
Cloud Storage Text to BigQuery
Cloud Storage Text to Pub/Sub (Batch)
Cloud Storage Text to Cloud Spanner
Cloud Storage to Elasticsearch
Java Database Connectivity (JDBC) to BigQuery
그 외에 Data의 가공이 필요한 경우는 Python 또는 Java SDK를 통해 코딩을 하고 원하는 대로 데이터를 가공하도록 JOB을 만들고, 수행할 수 있다.
웹콘솔 등에서 Template Job을 수행하는 방법은 상당히 간단해서 위 링크에도 상세히 나와있으므로, 이 글에서는 Java Beam SDK를 통해 Pipeline Job을 수행해보려고 한다.
Java Beam SDK를 통해 Pipeline Job 수행하기
기본적으로 Service Account등의 머신계정 및 GCS Bucket 이 필요하다. 이 부분에 대한 설정이 필요한 경우, 아래 공식 문서의 before-you-begin 섹션을 참고하자.
JDK와 Maven 설치가 되어있지 않다면 이를 먼저 설치해야한다.
JDK 설치
JDK 11 다운로드
$ sudo apt-get install openjdk-11-jdk$ java -version
openjdk version "11.0.14" 2022-01-18
OpenJDK Runtime Environment (build 11.0.14+9-post-Debian-1deb10u1)
OpenJDK 64-Bit Server VM (build 11.0.14+9-post-Debian-1deb10u1, mixed mode, sharing)
PATH 환경변수 설정
$ vi .bashrc
...
export JAVA_HOME=$(dirname $(dirname $(readlink -f $(which java))))
export PATH=$PATH:$JAVA_HOME/bin
...$ source .bashrc
$ echo $JAVA_HOME
/usr/lib/jvm/java-11-openjdk-amd64
Install Maven
위 사이트 Files-Link menu에서 install file의 주소를 확인한다.
$ wget https://dlcdn.apache.org/maven/maven-3/3.8.4/binaries/apache-maven-3.8.4-bin.tar.gz$ tar -xvf apache-maven-3.8.4-bin.tar.gz# create a symbolic link
$ ln -s apache-maven-3.8.4 maven# set a PATH
$ vi .basrc
...
export PATH=$PATH:/opt/apache-maven-3.8.4/bin
...$ source .bashrc
Create Maven Project
공식 홈페이지서 제공하는 예제인 wordcount를 수행하기 위한 Maven project는 아래와 같이 Maven Archetype Plugin을 사용해서 생성할 수 있다.
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=2.35.0 \
-DgroupId=org.example \
-DartifactId=word-count-beam \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
위 명령어는 word-count-beam이라는 디렉토리를 생성하며 pom.xml파일과 몇개의 샘플 pipeline을 생성한다.
Run
wordcount Job을 local 환경에서 먼저 수행해보자.
아래와 같은 명령어로 수행해볼 수 있다.
$ mvn compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--output=counts"
이제 Dataflow service 환경에서 수행해보자.
$ mvn -Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=PROJECT_ID \
--gcpTempLocation=gs://BUCKET_NAME/temp/ \
--output=gs://BUCKET_NAME/output \
--runner=DataflowRunner \
--region=REGION"
Dataflow Template
다른 형태의 Pipeline들도 한번 수행해보자.
예를 들어 위에서 언급했던 Dataflow Template들을 가지고 테스트해보자.
Batch, Streaming 처리 타입별로 각각 수십여개의 Template이 있다.
Template의 소스코드는 아래 git에서 확인 가능하다.
해당 소스를 clone한다.
$ git clone https://github.com/GoogleCloudPlatform/DataflowTemplates.git
이 글에서는 위의 Template 중, Running the Cloud Storage Text to BigQuery (Stream) Template으로 테스트해보고자 한다. 이름에서 보다시피 GCS(Google Cloud Storage)의 CSV 파일 등 Text파일을 읽어서 BigQuery 로드를 Streaming 형태로 처리하는 파이프라인이다.
해당 파일의 Schema를 정의한 JSON 및 Javascript처리 로직 등 일부 파일을 미리 GCS에 올려두어야 한다.
Schema Definition JSON File
{
"BigQuery Schema": [
{
"name": "location",
"type": "STRING"
},
{
"name": "name",
"type": "STRING"
},
{
"name": "age",
"type": "STRING"
},
{
"name": "color",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "coffee",
"type": "STRING",
"mode": "REQUIRED"
}
]
}
JavaScript (.js) with UDF function
텍스트를 transform(이 경우 CSV를 split하는)해서 JSON string을 리턴하는 function을 작성한다.
function transform(line) {
var values = line.split(‘,’);var obj = new Object();
obj.location = values[0];
obj.name = values[1];
obj.age = values[2];
obj.color = values[3];
obj.coffee = values[4];
var jsonString = JSON.stringify(obj);return jsonString
각 Template별로 필수 argument가 다르며, argument에 입력로 받을 json, javascript UDF 등이모두 다르므로 Google-provided templates 페이지에서 사용 전에 확인하자.
RUN
이제 Dataflow service 환경에서 Job을 생성해보자.
github의 해당 Template의 소스를 참고하면, 각 Template별 Sample mvn compile script가 있다.
$ mvn compile exec:java \
-Dexec.mainClass=com.google.cloud.teleport.templates.TextToBigQueryStreaming \
-Dexec.args="\
--project=${PROJECT_ID} \
--stagingLocation=gs://${STAGING_BUCKET}/staging \
--tempLocation=gs://${STAGING_BUCKET}/tmp \
--runner=DataflowRunner \
--inputFilePattern=gs://path/to/input* \
--JSONPath=gs://path/to/json/schema.json \
--outputTable={$PROJECT_ID}:${OUTPUT_DATASET}.${OUTPUT_TABLE} \
--javascriptTextTransformGcsPath=gs://path/to/transform/udf.js \
--javascriptTextTransformFunctionName=${TRANSFORM_NAME} \
--bigQueryLoadingTemporaryDirectory=gs://${STAGING_BUCKET}/tmp \
--outputDeadletterTable=${PROJECT_ID}:${ERROR_DATASET}.${ERROR_TABLE}"
위 Example Usage script 에서는 없었는데, region이 없어서 오류가 난다.
` — region=us-central1` 를 argument에 추가해주고, 다시 compile하여 BUILD SUCCESS된 것을 볼 수 있다.
수행 결과 Job Name과 ID는 자동생성되며, GCP Web Console에서도 Job이 생성된 것을 확인 가능하다.
이로써 간편하게 GCS 파일을 BigQuery로 로드할 수 있는 Streaming 파이프라인이 완성되었다.
References
Disclaimer
위 글은 개인적으로 작성한 것입니다. Google의 공식 문서 및 의견과 다를 수 있습니다.