normalian blog

Let's talk about Microsoft Azure, ASP.NET and Java!

Create joined query result from Nikkei and DJIA using Spark APIs with HDInsight

In the previous post, How to use Hive tables in HDInsight cluster with Nikkei and DJIA, I introduced how to use Hive tables with HDInsight. In this post, I will introduce how to use Spark APIs with HDInsight.

Requirements

You have to complete the requirements below to follow this post.

Modify csv file headers

You have already downloaded the USDJPY.csv and nikkei_stock_average_daily_jp.csv files, but the header rows of the csv files are written in Japanese. Change the headers into English so the columns are easy to use from the Spark APIs, as shown below (a programmatic alternative using Spark is sketched at the end of this section).

  • USDJPY.csv file

Before (Japanese headers):
日付,始値,高値,安値,終値
2007/04/02,117.84,118.08,117.46,117.84

After (English headers):
DATE,OPEN,HIGH,LOW,CLOSE
2007/04/02,117.84,118.08,117.46,117.84

  • nikkei_stock_average_daily_jp.csv

Before (Japanese headers):
データ日付,終値,始値,高値,安値
"2014/01/06","15908.88","16147.54","16164.01","15864.44"

After (English headers):
DATE,CLOSE,OPEN,HIGH,LOW
"2014/01/06","15908.88","16147.54","16164.01","15864.44"

Save the csv files as "USDJPY_en.csv" and "nikkei_stock_average_daily_en.csv", and upload them into the Azure Storage account associated with your Spark cluster like below.
f:id:waritohutsu:20170903124441p:plain

Refer to the URL and path examples below if you can't figure out where to place the csv files, because people sometimes confuse the paths.
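By the way, if you prefer not to edit the csv headers by hand, you can also rename the columns after loading the original files in Spark. The following is only a minimal sketch of that alternative; it assumes a SparkSession named spark, and the wasb:// path is a placeholder you have to replace with your own storage path.

// Sketch: load the original Japanese-header csv and rename the columns programmatically.
// The path is a placeholder and the column order follows the USDJPY.csv sample above.
val df_usdjpy_raw = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("wasb://<your container>@<your storage account>.blob.core.windows.net/financedata/USDJPY.csv")

// 日付,始値,高値,安値,終値 -> DATE,OPEN,HIGH,LOW,CLOSE
val df_usdjpy = df_usdjpy_raw.toDF("DATE", "OPEN", "HIGH", "LOW", "CLOSE")
df_usdjpy.createOrReplaceTempView("usdjpy_table")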

Create a Spark application with Scala

First, refer to https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-apache-spark-intellij-tool-plugin. You have to follow the article up to the section "Run a Spark Scala application on an HDInsight Spark cluster". Now you have a skeleton of your Spark application; update your Scala file as shown in the code below.
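For reference, the plugin sets up the project build for you, but if you ever create the project by hand the Spark dependencies in build.sbt would look roughly like the sketch below. The Scala and Spark versions here are assumptions, so match them to the versions running on your HDInsight cluster.

// build.sbt sketch (versions are assumptions; align them with your HDInsight Spark cluster)
name := "MyClusterApp"
version := "1.0"
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-sql"  % "2.1.0" % "provided"
)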

import org.apache.spark.sql.{SaveMode, SparkSession}

object MyClusterApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("MyClusterApp").getOrCreate()

    val dataset_djia = "wasb://hellosparkxxxxxxx-2017-08-77777-33-yy-zzz@hellosparkatxxxxxxxtorage.blob.core.windows.net/financedata/DJIA.csv"
    val dataset_nikkei = "wasb://hellosparkxxxxxxx-2017-08-77777-33-yy-zzz@hellosparkatxxxxxxxtorage.blob.core.windows.net/financedata/nikkei_stock_average_daily_en.csv"
    val dataset_usdjpy = "wasb://hellosparkxxxxxxx-2017-08-77777-33-yy-zzz@hellosparkatxxxxxxxtorage.blob.core.windows.net/financedata/USDJPY_en.csv"

    // Load the csv files and register each of them as a temp view. You will have to change this approach when your data becomes massive.
    val df_djia = spark.read.options(Map("header" -> "true", "inferSchema" -> "true", "ignoreLeadingWhiteSpace" -> "true")).csv(dataset_djia)
    df_djia.createOrReplaceTempView("djia_table")
    val df_nikkei = spark.read.options(Map("header" -> "true", "inferSchema" -> "true", "ignoreLeadingWhiteSpace" -> "true")).csv(dataset_nikkei)
    df_nikkei.createOrReplaceTempView("nikkei_table")
    val df_usdjpy = spark.read.options(Map("header" -> "true", "inferSchema" -> "true", "ignoreLeadingWhiteSpace" -> "true")).csv(dataset_usdjpy)
    df_usdjpy.createOrReplaceTempView("usdjpy_table")

    // Spark infers the DJIA date column as a date/timestamp, but it reads the Nikkei and USDJPY dates as strings, so you have to cast the string dates like below.
    val retDf = spark.sql(
      """SELECT djia_table.DATE, djia_table.DJIA,
        |       nikkei_table.CLOSE/usdjpy_table.CLOSE AS Nikkei_Dollar
        |FROM djia_table
        |INNER JOIN nikkei_table
        |  ON djia_table.DATE = from_unixtime(unix_timestamp(nikkei_table.DATE, 'yyyy/MM/dd'))
        |INNER JOIN usdjpy_table
        |  ON djia_table.DATE = from_unixtime(unix_timestamp(usdjpy_table.DATE, 'yyyy/MM/dd'))""".stripMargin)
    //val retDf = spark.sql("SELECT * FROM usdjpy_table")
    retDf.write
      .mode(SaveMode.Overwrite)
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .save("wasb://hellosparkxxxxxxx-2017-08-77777-33-yy-zzz@hellosparkatxxxxxxxtorage.blob.core.windows.net/financedata/sparkresult")
  }
}
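For reference, the same date normalization and join can also be written with the DataFrame API instead of Spark SQL. The snippet below is only a minimal sketch of that alternative and not part of the application above; it reuses df_djia, df_nikkei and df_usdjpy and converts the 'yyyy/MM/dd' strings before joining.

// Sketch: an alternative to the SQL above, written with the DataFrame API.
import org.apache.spark.sql.functions.{col, from_unixtime, unix_timestamp}

// Parse the 'yyyy/MM/dd' strings into timestamps so they match the DJIA DATE column.
val nikkei = df_nikkei.withColumn("DATE",
  from_unixtime(unix_timestamp(col("DATE"), "yyyy/MM/dd")).cast("timestamp"))
val usdjpy = df_usdjpy.withColumn("DATE",
  from_unixtime(unix_timestamp(col("DATE"), "yyyy/MM/dd")).cast("timestamp"))

// Join on the normalized DATE column and convert the Nikkei close into dollars.
val joined = df_djia
  .join(nikkei, Seq("DATE"))
  .join(usdjpy, Seq("DATE"))
  .select(col("DATE"), col("DJIA"), (nikkei("CLOSE") / usdjpy("CLOSE")).as("Nikkei_Dollar"))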

After updating your Scala file, run the application by following https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-apache-spark-intellij-tool-plugin. If your setup is correct, you can find the result files in your Azure Storage like below!
f:id:waritohutsu:20170903124521p:plain

Download the part-xxxxxxxxxxxxxxx-xxx-xxxxx.csv file and check its content; you will find the date, the DJIA in dollars, and the Nikkei converted into dollars, like below.

DATE,DJIA,Nikkei_Dollar
2014-01-06T00:00:00.000Z,16425.10,152.64709268854347
2014-01-07T00:00:00.000Z,16530.94,151.23238022377356
2014-01-08T00:00:00.000Z,16462.74,153.77193819152996
2014-01-09T00:00:00.000Z,16444.76,151.54432674873556
2014-01-10T00:00:00.000Z,16437.05,152.83892037268274
2014-01-14T00:00:00.000Z,16373.86,148.021883098186
2014-01-15T00:00:00.000Z,16481.94,151.22182896498947
2014-01-16T00:00:00.000Z,16417.01,150.96539162112933
2014-01-17T00:00:00.000Z,16458.56,150.79988499137434
2014-01-20T00:00:00.000Z,.,150.18415746519443
2014-01-21T00:00:00.000Z,16414.44,151.47640966628308
2014-01-22T00:00:00.000Z,16373.34,151.39674641148324
2014-01-23T00:00:00.000Z,16197.35,151.97414794732765
2014-01-24T00:00:00.000Z,15879.11,150.54342723004694
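If you would rather check the joined data without downloading the part file, you can also read the result folder back into a DataFrame from your Spark application or from a notebook attached to the cluster. A minimal sketch, assuming the same output path as in the save() call above:

// Sketch: read the result folder back and show the first rows (path as in the save() call above).
val resultDf = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("wasb://hellosparkxxxxxxx-2017-08-77777-33-yy-zzz@hellosparkatxxxxxxxtorage.blob.core.windows.net/financedata/sparkresult")
resultDf.orderBy("DATE").show(10, truncate = false)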