Analyzing a video dataset in Spark

In this tutorial we will analyze a video dataset by loading it into a Spark DataFrame and running SQL queries against it to perform various operations.

First, we load the data from a CSV file with the following command:

val video_dataDF_csv = spark.read.format("csv").option("header", "true").option("delimiter", ",").load("/user/sasmitsb4081/project/batch/data/data_gen.csv")
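
After loading, it is worth confirming that the header was picked up and checking the schema. A small sketch (with only the "header" option set, Spark reads every column as a string; the optional `inferSchema` option asks Spark to detect numeric types instead):

```scala
// Print the schema; without inferSchema, all columns come back as string.
video_dataDF_csv.printSchema()

// Preview the first few rows to sanity-check the load.
video_dataDF_csv.show(5)

// Optionally re-read with type inference so numeric columns get numeric types.
val typedDF = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/user/sasmitsb4081/project/batch/data/data_gen.csv")
```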

Next, we register a temporary table on the DataFrame so that SQL queries can be run against it.

video_dataDF_csv.registerTempTable("video_user_data")

registerTempTable is deprecated, so we use createOrReplaceTempView instead:

video_dataDF_csv.createOrReplaceTempView("video_user_data")

We can create an SQLContext so that SQL operations can be performed:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

Here 'sc' is the SparkContext. (In Spark 2.x, the spark-shell also exposes a SparkSession as 'spark', so the same queries can be run with spark.sql directly.)

1. Total minutes played per video where the video ended successfully. Here are the top 10:

Here video_end_type = 0 indicates that the video played through to a successful end.

scala> sqlContext.sql("""SELECT video_id,SUM(minutes_played) AS duration FROM video_user_data WHERE video_end_type=0 GROUP BY video_id ORDER BY SUM(minutes_played) DESC LIMIT 10""").show()

+--------+--------+
|video_id|duration|
+--------+--------+
|    9675|   400.0|
|    6084|   400.0|
|    2493|   400.0|
|    2093|   399.0|
|    3689|   399.0|
|    1694|   399.0|
|    5684|   399.0|
|    4088|   399.0|
|    9674|   399.0|
|    4886|   399.0|
+--------+--------+
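
The same aggregation can also be expressed with the DataFrame API instead of SQL. A sketch (assuming the columns are as loaded above; Spark coerces the string-typed minutes_played for the sum):

```scala
import org.apache.spark.sql.functions.{col, sum}

// Equivalent of query 1: total minutes per video for successfully ended plays.
video_dataDF_csv
  .filter(col("video_end_type") === "0")
  .groupBy("video_id")
  .agg(sum("minutes_played").as("duration"))
  .orderBy(col("duration").desc)
  .limit(10)
  .show()
```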

2. Top 10 channels by count of distinct videos played, along with the distinct user count, where the video was liked.

scala> sqlContext.sql("""SELECT channel_id, COUNT(DISTINCT video_id) AS total_distinct_videos_played, COUNT(DISTINCT user_id) AS  distinct_user_count FROM video_user_data WHERE liked='True' GROUP BY channel_id ORDER BY total_distinct_videos_played DESC LIMIT 10""").show()

+----------+----------------------------+-------------------+
|channel_id|total_distinct_videos_played|distinct_user_count|
+----------+----------------------------+-------------------+
|      3057|                           2|                  2|
|      1773|                           2|                  2|
|       307|                           2|                  2|
|      3167|                           2|                  2|
|      3902|                           2|                  2|
|      1445|                           2|                  2|
|       475|                           2|                  2|
|       613|                           2|                  2|
|      3993|                           2|                  2|
|      3491|                           2|                  2|
+----------+----------------------------+-------------------+
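
This query, too, has a DataFrame API counterpart using countDistinct. A sketch under the same assumptions as the SQL version (liked is stored as the string 'True'):

```scala
import org.apache.spark.sql.functions.{col, countDistinct}

// Equivalent of query 2: distinct videos and users per channel for liked plays.
video_dataDF_csv
  .filter(col("liked") === "True")
  .groupBy("channel_id")
  .agg(
    countDistinct("video_id").as("total_distinct_videos_played"),
    countDistinct("user_id").as("distinct_user_count"))
  .orderBy(col("total_distinct_videos_played").desc)
  .limit(10)
  .show()
```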

3. Number of users per geographical area:

scala> sqlContext.sql("select count(distinct user_id) as Number_of_users, geo_cd as Geographical_area from video_user_data where geo_cd is not null group by geo_cd order by Number_of_users desc").show()

+---------------+-----------------+
|Number_of_users|Geographical_area|
+---------------+-----------------+
|           1874|               AP|
|           1852|               JP|
|           1812|               AU|
|           1807|               EU|
|           1756|               AA|
+---------------+-----------------+
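
If the per-region counts are needed downstream, the result can be written back to HDFS as CSV. A sketch (the output path here is an assumption for illustration, not a path from the project):

```scala
val usersPerGeoDF = sqlContext.sql(
  """SELECT COUNT(DISTINCT user_id) AS Number_of_users, geo_cd AS Geographical_area
     FROM video_user_data
     WHERE geo_cd IS NOT NULL
     GROUP BY geo_cd
     ORDER BY Number_of_users DESC""")

// coalesce(1) yields a single part file, which is fine for a five-row result.
usersPerGeoDF
  .coalesce(1)
  .write
  .mode("overwrite")
  .option("header", "true")
  .csv("/user/sasmitsb4081/project/batch/output/users_per_geo")  // hypothetical path
```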

Spark-submit execution:

spark-submit --class com.video.analytics.Video_Analysis \
  --master yarn \
  /home/sasmitsb4081/video_dataset_2.11-0.1.jar

Output:

+--------+--------+
|video_id|duration|
+--------+--------+
|    2493|   400.0|
|    6084|   400.0|
|    9675|   400.0|
|    2093|   399.0|
|    3689|   399.0|
|    1694|   399.0|
|    4088|   399.0|
|    5684|   399.0|
|    9674|   399.0|
|    4886|   399.0|
+--------+--------+
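
The Video_Analysis class itself is not shown in this post. A minimal sketch of what such a driver object might look like, inferred from the spark-submit command and query 1 above (the internal structure is an assumption):

```scala
package com.video.analytics

import org.apache.spark.sql.SparkSession

// Hypothetical sketch of the driver behind the spark-submit command above.
object Video_Analysis {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Video_Analysis")
      .getOrCreate()

    val videoDF = spark.read.format("csv")
      .option("header", "true")
      .load("/user/sasmitsb4081/project/batch/data/data_gen.csv")

    videoDF.createOrReplaceTempView("video_user_data")

    // Query 1: total minutes per video for successfully ended plays.
    spark.sql(
      """SELECT video_id, SUM(minutes_played) AS duration
         FROM video_user_data
         WHERE video_end_type = 0
         GROUP BY video_id
         ORDER BY duration DESC
         LIMIT 10""").show()

    spark.stop()
  }
}
```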
