# How to execute : spark-submit sparkSimilarMovies100k.py 50
"""Find movies similar to a given movie in the MovieLens 100k dataset.

Every pair of movies rated by the same user is scored with cosine similarity
over those users' ratings. The full similarity table is always written to
``OUTPUT_DIR``; when a movie ID is passed on the command line, the top
``TOP_N`` movies most similar to it are also printed.

Usage::

    spark-submit sparkSimilarMovies100k.py [movieID]
"""

# Import sys and sqrt; PySpark itself is imported inside main().
import sys
from math import sqrt

# Input files of the MovieLens 100k dataset.
MOVIE_ITEM_PATH = "/home/user/bigdata/datasets/ml-100k/u.item"
RATINGS_PATH = "/home/user/bigdata/datasets/ml-100k/u.data"
# Output directory for the full similarity table. Spark's saveAsTextFile
# refuses to overwrite an existing directory, so remove it before re-running.
OUTPUT_DIR = "sim-movies"

# Hypothetical condition for a good similar movie: the cosine score must
# exceed SCORE_THRESHOLD and more than CO_OCCURRENCE_THRESHOLD users must
# have rated both movies.
SCORE_THRESHOLD = 0.97
CO_OCCURRENCE_THRESHOLD = 50
TOP_N = 25

USAGE = "Usage: spark-submit sparkSimilarMovies100k.py [movieID]"


def getMovieData(path=MOVIE_ITEM_PATH):
    """Return a dict mapping movie ID (int) to movie title (str).

    Reads the pipe-separated ``u.item`` file, where field 0 is the movie ID
    and field 1 is the title. The file is Latin-1 encoded (it is not valid
    UTF-8), so it is opened as ISO-8859-1; the platform default decoder
    would raise UnicodeDecodeError on titles with accented characters.

    :param path: location of the ``u.item`` file.
    """
    movieData = {}
    with open(path, encoding="ISO-8859-1") as fl:
        for ln in fl:
            if not ln.strip():
                # Tolerate blank/trailing lines instead of crashing on int("").
                continue
            fields = ln.split("|")
            movieData[int(fields[0])] = fields[1]
    return movieData


def getPairs(usersRatings):
    """Re-key a self-joined rating record by its movie pair.

    ``usersRatings`` is ``(userID, ((movie1, rating1), (movie2, rating2)))``;
    the result is ``((movie1, movie2), (rating1, rating2))``.
    """
    (movie1, rating1), (movie2, rating2) = usersRatings[1]
    return ((movie1, movie2), (rating1, rating2))


def stripDuplicates(usersRatings):
    """Return True only for records whose first movie ID is smaller.

    The self-join emits (A, B), its mirror (B, A) and the self pair (A, A);
    keeping ``movie1 < movie2`` retains exactly one ordering of each distinct
    pair and drops self pairs.
    """
    (movie1, _), (movie2, _) = usersRatings[1]
    return movie1 < movie2


def computeCosineMoviesSimilarity(ratingPairs):
    """Return ``(cosine_similarity, number_of_rating_pairs)`` for a movie pair.

    :param ratingPairs: iterable of ``(ratingX, ratingY)`` tuples, one per
        user who rated both movies.
    :returns: the cosine of the two rating vectors, or 0.0 when either vector
        has zero norm (including empty input), plus the number of pairs seen.
    """
    numPairs = 0
    sum_xx = sum_yy = sum_xy = 0.0
    for ratingX, ratingY in ratingPairs:
        sum_xx += ratingX * ratingX
        sum_yy += ratingY * ratingY
        sum_xy += ratingX * ratingY
        numPairs += 1

    denominator = sqrt(sum_xx) * sqrt(sum_yy)
    score = sum_xy / denominator if denominator else 0.0
    return (score, numPairs)


def _printSimilarMovies(moviePairSimilaritiesData, movieID, movieDict):
    """Print the TOP_N movies most similar to movieID that pass both thresholds.

    :param moviePairSimilaritiesData: RDD of
        ``((movie1, movie2), (score, numPairs))``.
    """
    # Keep pairs containing movieID whose score and co-rating count clear the bar.
    filteredResultsData = moviePairSimilaritiesData.filter(
        lambda pairSim: movieID in pairSim[0]
        and pairSim[1][0] > SCORE_THRESHOLD
        and pairSim[1][1] > CO_OCCURRENCE_THRESHOLD)

    # Sort by quality (score, then numPairs), best first.
    resultsData = (filteredResultsData
                   .map(lambda pairSim: (pairSim[1], pairSim[0]))
                   .sortByKey(ascending=False)
                   .take(TOP_N))

    print("Top " + str(TOP_N) + " similar movies for " + movieDict[movieID])
    for (simlr, pair) in resultsData:
        # The pair holds movieID plus the similar movie; display the other one.
        similarMovieID = pair[1] if pair[0] == movieID else pair[0]
        print(movieDict[similarMovieID] + "\tscore: " + str(simlr[0])
              + "\tstrength: " + str(simlr[1]))


def main(argv=None):
    """Run the similarity job and optionally print the best matches.

    :param argv: command-line arguments (defaults to ``sys.argv``); an
        optional ``argv[1]`` is the integer movie ID to query.
    """
    # Imported here instead of at module level so the pure helpers above can
    # be imported (e.g. by unit tests) on machines without PySpark.
    from pyspark import SparkConf, SparkContext

    if argv is None:
        argv = sys.argv

    # Get the movies data first.
    print("\nGetting movies from movied data...")
    movieDict = getMovieData()

    # Validate the query before starting the (expensive) Spark job.
    movieID = None
    if len(argv) > 1:
        try:
            movieID = int(argv[1])
        except ValueError:
            sys.exit(USAGE)
        if movieID not in movieDict:
            sys.exit("Unknown movie ID: " + str(movieID))

    # local[*] uses all available cores.
    sConf = SparkConf().setMaster("local[*]").setAppName("SimilarMoviesApp")
    sContext = SparkContext(conf=sConf)
    try:
        # userID => (movieID, rating)
        ratingsData = (sContext.textFile(RATINGS_PATH)
                       .map(lambda ln: ln.split())
                       .map(lambda f: (int(f[0]), (int(f[1]), float(f[2])))))

        # Self-join on userID (not a cartesian product): every pair of movies
        # rated by the same user, userID => ((movieID, rating), (movieID, rating)).
        joinedRatingsData = ratingsData.join(ratingsData)
        # Drop mirrored and self pairs.
        uniqueJoinedRatingsData = joinedRatingsData.filter(stripDuplicates)
        # (movie1, movie2) => (rating1, rating2)
        moviePairsData = uniqueJoinedRatingsData.map(getPairs)
        # (movie1, movie2) => [(rating1, rating2), (rating1, rating2), ...]
        moviePairRatingsData = moviePairsData.groupByKey()
        # (movie1, movie2) => (score, numPairs). persist() because this RDD is
        # consumed twice: by the save below and by the similarity query.
        moviePairSimilaritiesData = moviePairRatingsData.mapValues(
            computeCosineMoviesSimilarity).persist()

        # sortByKey() returns a new RDD rather than sorting in place; saving
        # that returned RDD is what makes the output ordered by movie pair.
        moviePairSimilaritiesData.sortByKey().saveAsTextFile(OUTPUT_DIR)

        if movieID is not None:
            _printSimilarMovies(moviePairSimilaritiesData, movieID, movieDict)
    finally:
        sContext.stop()


if __name__ == "__main__":
    main()