"""Rank movies by average rating using a Spark key-value (pair) RDD.

The script reads a ratings file, turns every line into a
``(movieId, rating)`` pair, aggregates the ratings per movie, drops movies
with too few ratings, and prints the best-rated movies.
"""

# Ratings file to load; each line is whitespace-separated, with the movie id
# in the second field and the rating in the third field.
DATA_FILE = "file:///D:/dumps/BigData/ml-100k/u.data"

# A movie must have been rated by at least this many people to be ranked.
MIN_RATING_COUNT = 10

# Number of best-rated movies to print.
TOP_N = 20


def processLines(line):
    """Parse one ratings line into a ``(movieId, rating)`` key-value pair.

    The line is split on whitespace. The second field is kept as the movie
    id (a string) and the third field is converted to an integer rating.

    Args:
        line: One raw text line from the ratings file.

    Returns:
        A ``(movieId, rating)`` tuple; returning a 2-tuple is what makes the
        mapped RDD a key-value RDD.

    Raises:
        IndexError: If the line has fewer than three fields.
        ValueError: If the rating field is not an integer.
    """
    lineFields = line.split()
    movieId = lineFields[1]
    rating = int(lineFields[2])
    return (movieId, rating)


def main():
    """Run the Spark job and print the ``TOP_N`` best-rated movies."""
    # Imported here rather than at module level so that processLines can be
    # imported (e.g. by a unit test) on a machine without Spark installed.
    from pyspark import SparkConf, SparkContext

    # Local single-process setup with the application name KeyValueRDDApp.
    sConf = SparkConf().setMaster("local").setAppName("KeyValueRDDApp")
    sContext = SparkContext(conf=sConf)

    try:
        # First RDD: one element per line of the data file.
        rddAllLines = sContext.textFile(DATA_FILE)

        # Map every line to a (movieId, rating) pair -> key-value RDD.
        movieRatingsKeyValueRDD = rddAllLines.map(processLines)

        # Useful operations on a key-value RDD:
        #   reduceByKey - aggregates the values of each key,
        #       e.g. keyValueRDD.reduceByKey(lambda x, y: x + y) sums them.
        #   groupByKey  - groups the values of each key,
        #       e.g. keyValueRDD.groupByKey() (it takes no function).
        #   sortByKey   - sorts the pairs by key,
        #       e.g. keyValueRDD.sortByKey().
        # Important - use mapValues() instead of map(), and flatMapValues()
        # instead of flatMap(), when the key is not going to be modified.

        # (movieId, rating) -> (movieId, (ratingSum, ratingCount))
        movieRatingsbyCountRDD = movieRatingsKeyValueRDD.mapValues(
            lambda x: (x, 1)
        ).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

        # Keep only movies rated by at least MIN_RATING_COUNT people.
        movieRatingsbyCountFiltered = movieRatingsbyCountRDD.filter(
            lambda x: x[1][1] >= MIN_RATING_COUNT
        )

        # Average rating per movie. float() keeps this a true division even
        # on Python 2, where int / int would truncate.
        averageMovieRatingsRDD = movieRatingsbyCountFiltered.mapValues(
            lambda x: x[0] / float(x[1])
        )

        # Sort by average rating; False means descending.
        sortedMovieRatings = averageMovieRatingsRDD.sortBy(lambda x: x[1], False)

        # The RDD is already ordered best-first, so the first TOP_N elements
        # are the answer; a second top-N selection is not needed.
        sortedMovieRatingsTop = sortedMovieRatings.take(TOP_N)

        # Show the data.
        for item in sortedMovieRatingsTop:
            print(item)
    finally:
        # Always release the Spark resources, even if the job fails.
        sContext.stop()


if __name__ == "__main__":
    main()