Java – Apache Spark 処理の実装 Word Count No.30

自己投資としてチャレンジしている内容を Blog で公開しています。今回は Apache Spark を利用し、ある文字を含む行をカウントするコードについて紹介します。

————————————
▼1. Apache Spark のメリット
————————————
Spark Job では、イン-メモリで処理を行うことにより、高速に処理を実装することができます。また 複数の worker ノードに分散されたデータセットをローカルのデータのように扱うことが可能です。

————————————
▼2. 事前準備
————————————
2-1. Apache Spark クラスターを作成します。詳細は以前 blog で紹介した下記サイトを参照ください。
Apache Spark インストール – 3 ノード No.29

2-2. Spark で処理するデータを HDFS 上に置きます。
2-2-1. ホームディレクトリの作成後、その配下にディレクトリを作成。

hdfs dfs -mkdir -p /user/hadoop
hdfs dfs -mkdir appbox

2-2-2. ファイルをディレクトリにコピーします。
今回は Spark をインストールした際得られた Readme.md を利用します。
Readme.md を 作成したディレクトリ appbox にコピーします。あとで行う wordcount の入力のファイルとして利用します。

hdfs dfs -put README.md appbox

2-2–3. コピーしたファイルの確認 

 hdfs dfs -ls appbox

--- 出力結果の例 ----
Found 1 items
-rw-r--r-- 1 hadoop supergroup xxx 2021-07-04 10:00 appbox/README.md
--------

————————————
▼3. WordCount を行う Spark の Java コード
————————————
3-1. Sparktest のフォルダを作成し、以下のディレクトリも作成します。

Mkdir ~/Sparktest
cd ~/Sparktest

以下のような階層でフォルダおよびファイルを作成します。

./pom.xml (3-2 を参照)
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java (3-3 を参照)

3-2. pom.xml を作成し。Sparktest 配下 (./pom.xml) に置きます。

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.0.2</version>
    </dependency>
  </dependencies>
</project>

3-3. SimpleApp.java を作成し。/src/main/java/ 配下に置きます。
以下のコードでは README.md に含まれる、a および b の文字を含む行のカウントしています。コードや pom.xml をダウンロードされたい方は以降の ”5. サンプルコード販売” ご確認ください。

/* SimpleApp.java */
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "/user/hadoop/appbox/README.md"; // Should be some file on your system
    SparkConf conf = new SparkConf().setAppName("Simple Application");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> logData = sc.textFile(logFile).cache();

long numAs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("a"); }
    }).count();

long numBs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("b"); }
    }).count();

System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
    
    sc.stop();
  }
}

3-4. maven をインストールします。

wget https://downloads.apache.org/maven/maven-3/3.8.1/binaries/apache-maven-3.8.1-bin.tar.gz  -P /tmp/
sudo tar xzvf /tmp/apache-maven-*.tar.gz -C /opt/
/opt/apache-maven-3.8.1/bin/mvn -version

Maven の version の出力結果

Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d)
Maven home: /opt/apache-maven-3.8.1
Java version: 1.8.0_292, vendor: Azul Systems, Inc., runtime: /usr/lib/jvm/zulu-8-azure-amd64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-1113-azure", arch: "amd64", family: "unix"

3-5. コンパイルします。

cd Sparktest
/opt/apache-maven-3.8.1/bin/mvn package

3-6. 生成された jar を実行します。

spark-submit --class "SimpleApp" --master yarn ./target/simple-project-1.0.jar > simple-project-1.0.jar4.log 2>&1
// simple-project-1.0.jar4.log
2021-07-05 00:11:49,527 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 32 ms on wn210622wn1 (executor 1) (2/2)
2021-07-05 00:11:49,529 INFO scheduler.DAGScheduler: ResultStage 1 (count at SimpleApp.java:16) finished in 0.181 s
2021-07-05 00:11:49,529 INFO scheduler.DAGScheduler: Job 1 is finished. Cancelling potential speculative or zombie tasks for this job
2021-07-05 00:11:49,531 INFO cluster.YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool
2021-07-05 00:11:49,531 INFO cluster.YarnScheduler: Killing all running tasks in stage 1: Stage finished
2021-07-05 00:11:49,532 INFO scheduler.DAGScheduler: Job 1 finished: count at SimpleApp.java:16, took 0.196700 s
Lines with a: 64, lines with b: 32
2021-07-05 00:11:49,549 INFO server.AbstractConnector: Stopped Spark@3732977{HTTP/1.1, (http/1.1)}{0.0.0.0:4040}
2021-07-05 00:11:49,553 INFO ui.SparkUI: Stopped Spark web UI at http://masternodet:4040

YARN UI は以下のようになります。

Cluster 
About 
Nodes 
Node Labels 
Applications 
NEW SAVING 
SUBMITTED 
ACCEPTED 
RUNNING 
FINISHED 
KILLED 
Scheduler 
Tools 
Cluster Metrics 
Apps Submitted 
3 
Cluster Nodes Metrics 
Active Nodes 
2 
Scheduler Metrics 
Scheduler Type 
Capacity Scheduler 
Show 20 v entries 
duplication 1625449985973 0003 
duplication 1625449985973 0002 
duplication 1625449985973 0001 
Showing 1 to 3 of 3 entries 
Apps Pending 
Apps Running 
Apps Completed 
1 
2 
Decommissioning Nodes 
Scheduling Resource Type 
[memory-mb (unit=Mi), vcores] 
User 
hadoop 
hadoop 
hadoop 
Name 
Simple 
Application 
Python 
spark SQL 
basic 
example 
Python 
spark SQL 
basic 
example 
Application 
Type 
SPARK 
SPARK 
SPARK 
Application 
lags 
Queue 
default 
default 
default 
Applil 
pric

————————————
▼4. 参考情報
————————————
Quick Start – Spark 0.7.3 Documentation (apache.org)


 以上です。参考になりましたら幸いです。



コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です