自己投資としてチャレンジしている内容を Blog で公開しています。今回は Apache Spark を利用し、ある文字を含む行をカウントするコードについて紹介します。
————————————
▼1. Apache Spark のメリット
————————————
Spark Job では、イン-メモリで処理を行うことにより、高速に処理を実装することができます。また 複数の worker ノードに分散されたデータセットをローカルのデータのように扱うことが可能です。
————————————
▼2. 事前準備
————————————
2-1. Apache Spark クラスターを作成します。詳細は以前 blog で紹介した下記サイトを参照ください。
Apache Spark インストール – 3 ノード No.29
2-2. Spark で処理するデータを HDFS 上に置きます。
2-2-1. ホームディレクトリの作成後、その配下にディレクトリを作成。
hdfs dfs -mkdir -p /user/hadoop
hdfs dfs -mkdir appbox2-2-2. ファイルをディレクトリにコピーします。
今回は Spark をインストールした際得られた Readme.md を利用します。
Readme.md を 作成したディレクトリ appbox にコピーします。あとで行う wordcount の入力のファイルとして利用します。
hdfs dfs -put README.md appbox2-2–3. コピーしたファイルの確認
hdfs dfs -ls appbox
--- 出力結果の例 ----
Found 1 items
-rw-r--r-- 1 hadoop supergroup xxx 2021-07-04 10:00 appbox/README.md
--------————————————
▼3. WordCount を行う Spark の Java コード
————————————
3-1. Sparktest のフォルダを作成し、以下のディレクトリも作成します。
Mkdir ~/Sparktest
cd ~/Sparktest以下のような階層でフォルダおよびファイルを作成します。
./pom.xml (3-2 を参照) ./src ./src/main ./src/main/java ./src/main/java/SimpleApp.java (3-3 を参照)
3-2. pom.xml を作成し。Sparktest 配下 (./pom.xml) に置きます。
<project>
<groupId>edu.berkeley</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.2</version>
</dependency>
</dependencies>
</project>3-3. SimpleApp.java を作成し。/src/main/java/ 配下に置きます。
以下のコードでは README.md に含まれる、a および b の文字を含む行のカウントしています。コードや pom.xml をダウンロードされたい方は以降の ”5. サンプルコード販売” ご確認ください。
/* SimpleApp.java */
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
public class SimpleApp {
public static void main(String[] args) {
String logFile = "/user/hadoop/appbox/README.md"; // Should be some file on your system
SparkConf conf = new SparkConf().setAppName("Simple Application");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> logData = sc.textFile(logFile).cache();
long numAs = logData.filter(new Function<String, Boolean>() {
public Boolean call(String s) { return s.contains("a"); }
}).count();
long numBs = logData.filter(new Function<String, Boolean>() {
public Boolean call(String s) { return s.contains("b"); }
}).count();
System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
sc.stop();
}
}3-4. maven をインストールします。
wget https://downloads.apache.org/maven/maven-3/3.8.1/binaries/apache-maven-3.8.1-bin.tar.gz -P /tmp/
sudo tar xzvf /tmp/apache-maven-*.tar.gz -C /opt/
/opt/apache-maven-3.8.1/bin/mvn -versionMaven の version の出力結果
Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d)
Maven home: /opt/apache-maven-3.8.1
Java version: 1.8.0_292, vendor: Azul Systems, Inc., runtime: /usr/lib/jvm/zulu-8-azure-amd64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-1113-azure", arch: "amd64", family: "unix"3-5. コンパイルします。
cd Sparktest
/opt/apache-maven-3.8.1/bin/mvn package3-6. 生成された jar を実行します。
spark-submit --class "SimpleApp" --master yarn ./target/simple-project-1.0.jar > simple-project-1.0.jar4.log 2>&1// simple-project-1.0.jar4.log
2021-07-05 00:11:49,527 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 32 ms on wn210622wn1 (executor 1) (2/2)
2021-07-05 00:11:49,529 INFO scheduler.DAGScheduler: ResultStage 1 (count at SimpleApp.java:16) finished in 0.181 s
2021-07-05 00:11:49,529 INFO scheduler.DAGScheduler: Job 1 is finished. Cancelling potential speculative or zombie tasks for this job
2021-07-05 00:11:49,531 INFO cluster.YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool
2021-07-05 00:11:49,531 INFO cluster.YarnScheduler: Killing all running tasks in stage 1: Stage finished
2021-07-05 00:11:49,532 INFO scheduler.DAGScheduler: Job 1 finished: count at SimpleApp.java:16, took 0.196700 s
Lines with a: 64, lines with b: 32
2021-07-05 00:11:49,549 INFO server.AbstractConnector: Stopped Spark@3732977{HTTP/1.1, (http/1.1)}{0.0.0.0:4040}
2021-07-05 00:11:49,553 INFO ui.SparkUI: Stopped Spark web UI at http://masternodet:4040YARN UI は以下のようになります。
![Cluster
About
Nodes
Node Labels
Applications
NEW SAVING
SUBMITTED
ACCEPTED
RUNNING
FINISHED
KILLED
Scheduler
Tools
Cluster Metrics
Apps Submitted
3
Cluster Nodes Metrics
Active Nodes
2
Scheduler Metrics
Scheduler Type
Capacity Scheduler
Show 20 v entries
duplication 1625449985973 0003
duplication 1625449985973 0002
duplication 1625449985973 0001
Showing 1 to 3 of 3 entries
Apps Pending
Apps Running
Apps Completed
1
2
Decommissioning Nodes
Scheduling Resource Type
[memory-mb (unit=Mi), vcores]
User
hadoop
hadoop
hadoop
Name
Simple
Application
Python
spark SQL
basic
example
Python
spark SQL
basic
example
Application
Type
SPARK
SPARK
SPARK
Application
lags
Queue
default
default
default
Applil
pric](https://www.kumoman.net/site/wp-content/uploads/2021/07/image-1024x695.png)
————————————
▼4. 参考情報
————————————
Quick Start – Spark 0.7.3 Documentation (apache.org)
以上です。参考になりましたら幸いです。