Java – Spark 処理の実装 WordCount (Spark SQL の利用) No.31

自己投資としてチャレンジしている内容を Blog で公開しています。 

今回は Apache Spark を利用し、ある文字を含む行をカウントするコードについて紹介します。
前回は Resilient Distributed dataset (RDD) を使った WordCount でしたが、今回は Spark SQL を利用したより省エネで高いパフォーマンスを実現する方法となります。

▼1. RDD と Spark SQL

Spark がリリースされた当初、分散処理に Resilient Distributed dataset (RDD) API として org.apache.spark.api.java.JavaRDD のライブラリが Java で利用されていました。現在では Spark SQL の org.apache.spark.sql.Dataset のライブラリが利用され、より最適化されたリソースの消費量が少ない API に改良されています。今回は Spark SQL を利用し、前回同様ある文字が含まれる行の数をカウントするコードを紹介します。


▼2. 事前準備

2-1. Apache Spark クラスターを作成

詳細は以前 blog で紹介した下記サイトを参照ください。Apache Spark インストール – 3 ノード No.29

2-2. Spark で処理するデータを HDFS 上に配置

前回の blog と同じです。既に実行済みの場合は飛ばします。

2-2-1. ホームディレクトリの作成後、その配下にディレクトリを作成

hdfs dfs -mkdir -p /user/hadoop
hdfs dfs -mkdir appbox

2-2-2.ファイルをディレクトリにコピー

今回は Spark をインストールした際得られた Readme.md を利用します。
Readme.md を 作成したディレクトリ appbox にコピーします。以降で行う wordcount の入力のファイルとして利用します。

hdfs dfs -put README.md appbox

2-2-3. コピーしたファイルの確認

hdfs dfs -ls appbox
--- 出力結果の例 ----
Found 1 items
-rw-r--r-- 1 hadoop supergroup xxx 2021-07-04 10:00 appbox/README.md
--------

▼3. Spark SQL を使って WordCount を行う Java コード

3-1. Sparktest のフォルダ作成および、ディレクトリ作成

mkdir ~/Sparkdstest
cd ~/Sparkdstest

以下のような階層でフォルダおよびファイルを作成していきます。

./pom.xml (3-2)
./src
./src/main
./src/main/java
./src/main/java/SimpleAppds.java (3-3)

3-2. pom.xml を作成し Sparktest 配下 (./pom.xml) を配置

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>3.1.2</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.3</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
       </configuration>
     </plugin>
   </plugins>
  </build>
</project>

3-3. SimpleAppds.java を作成し /src/main/java/ 配下に配置

以下のコードでは README.md に含まれる、a および b の文字を含む行のカウントしています。コードや pom.xml をダウンロードされたい方は以降の ”5. サンプルコード販売” ご確認ください。

/* SimpleAppds.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;

public class SimpleAppds {
  public static void main(String[] args) {
    String logFile = "/user/hadoop/appbox/README.md"; // Should be some file on your system
    SparkSession spark = SparkSession.builder().appName("Simple Application Dataset").getOrCreate();
    Dataset<String> logData = spark.read().textFile(logFile).cache();

    long numAs = logData.filter((org.apache.spark.api.java.function.FilterFunction<String>)s -> s.contains("a")).count();
    long numBs = logData.filter((org.apache.spark.api.java.function.FilterFunction<String>)s -> s.contains("b")).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);

    spark.stop();
  }
}

3-4. maven をインストール

前回の blog で実行している場合は飛ばします。

wget https://downloads.apache.org/maven/maven-3/3.8.1/binaries/apache-maven-3.8.1-bin.tar.gz  -P /tmp/
sudo tar xzvf /tmp/apache-maven-*.tar.gz -C /opt/
/opt/apache-maven-3.8.1/bin/mvn -version
---
Apache Maven 3.8.1 (05c21c65bdfed0f71a2f2ada8b84da59348c4c5d)
Maven home: /opt/apache-maven-3.8.1
Java version: 1.8.0_292, vendor: Azul Systems, Inc., runtime: /usr/lib/jvm/zulu-8-azure-amd64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-1113-azure", arch: "amd64", family: "unix"

3-5. コンパイル

cd Sparktest
/opt/apache-maven-3.8.1/bin/mvn package

3-6. 生成された jar を実行

spark-submit --class "SimpleAppds" --master yarn ./target/simple-project-1.0.jar > simple-project-1.0.jar4.log 2>&1
// simple-project-1.0.jards2.log の一部
2021-07-07 23:24:36,389 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 3.0 (TID 3) in 87 ms on wn210622wn1 (executor 1) (1/1)
2021-07-07 23:24:36,394 INFO scheduler.DAGScheduler: ResultStage 3 (count at SimpleAppds.java:12) finished in 0.110 s
2021-07-07 23:24:36,395 INFO scheduler.DAGScheduler: Job 1 is finished. Cancelling potential speculative or zombie tasks for this job
2021-07-07 23:24:36,398 INFO cluster.YarnScheduler: Removed TaskSet 3.0, whose tasks have all completed, from pool
2021-07-07 23:24:36,398 INFO cluster.YarnScheduler: Killing all running tasks in stage 3: Stage finished
2021-07-07 23:24:36,399 INFO scheduler.DAGScheduler: Job 1 finished: count at SimpleAppds.java:12, took 0.256345 s
Lines with a: 64, lines with b: 32
2021-07-07 23:24:36,422 INFO server.AbstractConnector: Stopped Spark@2c0d73ec{HTTP/1.1, (http/1.1)}{0.0.0.0:4040}
2021-07-07 23:24:36,425 INFO ui.SparkUI: Stopped Spark web UI at http://masternode:4040

YarnUI では以下のように ApplicationId がリストされます。

• Cluster 
About 
Nodes 
Node Labels 
Applications 
NEW SAVING 
SUBMITTED 
ACCEPTED 
RUNNING 
FINISHED 
KILLED 
Scheduler 
Tools 
Cluster Metrics 
Apps Submitted 
3 
Cluster Nodes Metrics 
Active Nodes 
2 
Scheduler Metrics 
Scheduler Type 
Capacity Scheduler 
Show 20 v entries 
duplication 1625665449048 0003 
duplication 1625665449048 0002 
duplication 1625665449048 0001 
Showing 1 to 3 of 3 entries 
Apps Pending 
Apps Running 
Apps Completed 
3 
Decommissioning Nodes 
Scheduling Resource Type 
[memory-mb (unit—Mi), vcores] 
User 
hadoop 
hadoop 
hadoop 
Name 
Simple 
Application 
Dataset 
Simple 
Application 
Simple 
Application 
Application 
Type 
SPARK 
SPARK 
SPARK 
Application 
Tags 
Queue 
default 
default 
default 
Applicat 
Priori'

▼4. 参考情報


以上です。参考になりましたら幸いです。



コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です