Create a joined query result from Nikkei and DJIA using Spark APIs with HDInsight
In the previous topic, How to use Hive tables in HDInsight cluster with Nikkei and DJIA, I introduced how to use Hive tables with HDInsight. In this topic, I will introduce how to use Spark APIs with HDInsight.
Requirements
You have to complete the requirements below to follow this topic.
- Download the Nikkei, DJIA, and USD/JPY CSV files
  - USD/JPY CSV file: go to http://www.m2j.co.jp/market/historical.php and download it from the "日足" (daily) link
  - Refer to the previous topic to download the Nikkei and DJIA CSV files
- Create an HDInsight Spark cluster in your Azure subscription
- Install IntelliJ and the Azure plugin
  - Install the IntelliJ plugin by referring to https://docs.microsoft.com/en-us/azure/azure-toolkit-for-intellij-installation
Modify the CSV file headers
You have already downloaded the USDJPY.csv and nikkei_stock_average_daily_jp.csv files, but the headers of these CSV files are written in Japanese. Translate the headers into English so that the data is easy to use from the Spark APIs, as below (a programmatic alternative is sketched after the list).
- USDJPY.csv file

  Before:

  ```
  日付,始値,高値,安値,終値
  2007/04/02,117.84,118.08,117.46,117.84
  ```

  After:

  ```
  DATE,OPEN,HIGH,LOW,CLOSE
  2007/04/02,117.84,118.08,117.46,117.84
  ```

- nikkei_stock_average_daily_jp.csv file

  Before:

  ```
  データ日付,終値,始値,高値,安値
  "2014/01/06","15908.88","16147.54","16164.01","15864.44"
  ```

  After:

  ```
  DATE,CLOSE,OPEN,HIGH,LOW
  "2014/01/06","15908.88","16147.54","16164.01","15864.44"
  ```
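By the way, you don't strictly have to edit the headers by hand. Because the column order of each file is known, you can also let the Spark CSV reader skip the Japanese header row and assign English column names at load time. This is only a minimal sketch, assuming a SparkSession named `spark` and placeholder wasb:// paths like the ones used later in this topic; the rest of this topic sticks to the hand-edited `_en` files.

```scala
// Sketch: read the original Japanese-headered CSVs and rename the columns positionally.
// The wasb:// paths are placeholders; substitute your own container and storage account.
val usdjpyRaw = spark.read
  .options(Map("header" -> "true", "inferSchema" -> "true"))
  .csv("wasb://<container>@<account>.blob.core.windows.net/financedata/USDJPY.csv")
  .toDF("DATE", "OPEN", "HIGH", "LOW", "CLOSE") // same order as 日付,始値,高値,安値,終値

val nikkeiRaw = spark.read
  .options(Map("header" -> "true", "inferSchema" -> "true"))
  .csv("wasb://<container>@<account>.blob.core.windows.net/financedata/nikkei_stock_average_daily_jp.csv")
  .toDF("DATE", "CLOSE", "OPEN", "HIGH", "LOW") // the Nikkei file lists the close before the open
```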
Save the modified files as "USDJPY_en.csv" and "nikkei_stock_average_daily_en.csv", and upload them to the Azure Storage account associated with your Spark cluster, as shown below.
If you can't figure out which path the CSV files should go to (this sometimes confuses people), refer to the example URL and path below.
- URL: https://hellosparkatxxxxxxxtorage.blob.core.windows.net/hellosparkxxxxxxx-2017-08-77777-33-yy-zzz/financedata/nikkei_stock_average_daily_en.csv
- Path from Spark API or Hive: "wasb://hellosparkxxxxxxx-2017-08-77777-33-yy-zzz@hellosparkatxxxxxxxtorage.blob.core.windows.net/financedata/nikkei_stock_average_daily_en.csv"
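Before writing the application, you can also verify the path from a Spark shell on the cluster (SSH to the head node and run spark-shell, where `spark` is predefined). A quick sketch, reusing the placeholder path above:

```scala
// If the path is right, the English headers and the first rows will be printed.
val path = "wasb://hellosparkxxxxxxx-2017-08-77777-33-yy-zzz@hellosparkatxxxxxxxtorage.blob.core.windows.net/financedata/nikkei_stock_average_daily_en.csv"
spark.read.option("header", "true").csv(path).show(5)
```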
Create a Spark application with Scala
First, refer to https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-apache-spark-intellij-tool-plugin and follow it up to the section "Run a Spark Scala application on an HDInsight Spark cluster". Now you have a skeleton of your Spark application. Update your Scala file as below.
```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object MyClusterApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("MyClusterApp").getOrCreate()

    val dataset_djia = "wasb://hellosparkxxxxxxx-2017-08-77777-33-yy-zzz@hellosparkatxxxxxxxtorage.blob.core.windows.net/financedata/DJIA.csv"
    val dataset_nikkei = "wasb://hellosparkxxxxxxx-2017-08-77777-33-yy-zzz@hellosparkatxxxxxxxtorage.blob.core.windows.net/financedata/nikkei_stock_average_daily_en.csv"
    val dataset_usdjpy = "wasb://hellosparkxxxxxxx-2017-08-77777-33-yy-zzz@hellosparkatxxxxxxxtorage.blob.core.windows.net/financedata/USDJPY_en.csv"

    // Load the CSV files and register each DataFrame as a temp view.
    // This is fine for data of this size, but reconsider the approach when your data becomes massive.
    val df_djia = spark.read.options(Map("header" -> "true", "inferSchema" -> "true", "ignoreLeadingWhiteSpace" -> "true")).csv(dataset_djia)
    df_djia.createOrReplaceTempView("djia_table")
    val df_nikkei = spark.read.options(Map("header" -> "true", "inferSchema" -> "true", "ignoreLeadingWhiteSpace" -> "true")).csv(dataset_nikkei)
    df_nikkei.createOrReplaceTempView("nikkei_table")
    val df_usdjpy = spark.read.options(Map("header" -> "true", "inferSchema" -> "true", "ignoreLeadingWhiteSpace" -> "true")).csv(dataset_usdjpy)
    df_usdjpy.createOrReplaceTempView("usdjpy_table")

    // Spark infers the DJIA date column as a timestamp but reads the Nikkei and USDJPY
    // dates as strings, so the string dates are converted to match before joining.
    val retDf = spark.sql(
      "SELECT djia_table.DATE, djia_table.DJIA, nikkei_table.CLOSE/usdjpy_table.CLOSE AS Nikkei_Dollar " +
        "FROM djia_table " +
        "INNER JOIN nikkei_table ON djia_table.DATE = from_unixtime(unix_timestamp(nikkei_table.DATE, 'yyyy/MM/dd')) " +
        "INNER JOIN usdjpy_table ON djia_table.DATE = from_unixtime(unix_timestamp(usdjpy_table.DATE, 'yyyy/MM/dd'))")
    // For debugging, you can swap in a simple query instead:
    // val retDf = spark.sql("SELECT * FROM usdjpy_table")

    retDf.write
      .mode(SaveMode.Overwrite)
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .save("wasb://hellosparkxxxxxxx-2017-08-77777-33-yy-zzz@hellosparkatxxxxxxxtorage.blob.core.windows.net/financedata/sparkresult")
  }
}
```
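The comment about date types in the code above is easy to verify: with inferSchema enabled, Spark parses the DJIA dates (such as 2014-01-06) as a timestamp, while the slash-separated dates in the Nikkei and USDJPY files stay plain strings, which is why the join converts them with unix_timestamp and from_unixtime. As a quick check, you can temporarily add the following lines after the views are created (a sketch; nothing in the job depends on it):

```scala
// Inspect the inferred schemas to see the type mismatch the SQL works around.
df_djia.printSchema()   // DATE: timestamp
df_nikkei.printSchema() // DATE: string ("2014/01/06" is not recognized as a date)
df_usdjpy.printSchema() // DATE: string, for the same reason
```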
After updating your Scala file, run the application by following https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-apache-spark-intellij-tool-plugin. If your setup is correct, you will find the result files in your Azure Storage like below!
Spark writes the result as a directory containing one or more part files plus a _SUCCESS marker. Download the part-xxxxxxxxxxxxxxx-xxx-xxxxx.csv file and check its content; you will get the date, the DJIA value, and the Nikkei average converted to dollars, like below. (Note the "." in the DJIA column on 2014-01-20: the source data marks days without a DJIA quote, such as U.S. market holidays, with a ".".)
```
DATE,DJIA,Nikkei_Dollar
2014-01-06T00:00:00.000Z,16425.10,152.64709268854347
2014-01-07T00:00:00.000Z,16530.94,151.23238022377356
2014-01-08T00:00:00.000Z,16462.74,153.77193819152996
2014-01-09T00:00:00.000Z,16444.76,151.54432674873556
2014-01-10T00:00:00.000Z,16437.05,152.83892037268274
2014-01-14T00:00:00.000Z,16373.86,148.021883098186
2014-01-15T00:00:00.000Z,16481.94,151.22182896498947
2014-01-16T00:00:00.000Z,16417.01,150.96539162112933
2014-01-17T00:00:00.000Z,16458.56,150.79988499137434
2014-01-20T00:00:00.000Z,.,150.18415746519443
2014-01-21T00:00:00.000Z,16414.44,151.47640966628308
2014-01-22T00:00:00.000Z,16373.34,151.39674641148324
2014-01-23T00:00:00.000Z,16197.35,151.97414794732765
2014-01-24T00:00:00.000Z,15879.11,150.54342723004694
```
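Instead of downloading the part file, you can also read the result back from a Spark shell on the cluster. A minimal sketch, reusing the placeholder output path from the application above:

```scala
// Spark wrote the result as a directory of part files, so point the reader at the directory.
val result = spark.read
  .option("header", "true")
  .csv("wasb://hellosparkxxxxxxx-2017-08-77777-33-yy-zzz@hellosparkatxxxxxxxtorage.blob.core.windows.net/financedata/sparkresult")
result.orderBy("DATE").show(10)
```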