# Breadth-first search over a hero co-appearance network with Spark, used to
# find the degree of separation between two hero IDs.
#
# Every node is a tuple (heroID, (networks, dist, clr)) where
#   networks - list of directly connected hero IDs
#   dist     - best known distance from the starting hero (9999 = not reached)
#   clr      - 'WHITE' (unvisited), 'GRAY' (to be expanded), 'BLACK' (expanded)

# first import SparkConf, SparkContext
from pyspark import SparkConf, SparkContext

# setup SparkConf, SparkContext
sConf = SparkConf().setMaster("local").setAppName("SparkDegreeofSeparationApp")
sContext = SparkContext(conf=sConf)

# The characters we wish to find the degree of separation between:
startingHeroID = 2399  # "HAWK"
targetHeroID = 12930  # "MARVELS 3"

# setup the accumulator; it counts how many times the target hero is reached
heroHitCtr = sContext.accumulator(0)


def processHeroNetworks(line):
    """Parse one line of the network file into a BFS node.

    The line is split on whitespace; the first field is the hero ID and the
    remaining fields are the IDs of the heroes connected to it.

    Returns:
        (heroID, (networks, dist, clr)). The starting hero is returned as
        'GRAY' with distance 0; every other hero as 'WHITE' with distance 9999.

    Raises:
        IndexError: if the line has no fields (callers must skip blank lines).
        ValueError: if a field is not an integer.
    """
    fields = line.split()
    heroID = int(fields[0])
    networks = [int(hID) for hID in fields[1:]]

    if heroID == startingHeroID:
        # The search starts here, so this node is expanded first.
        clr = 'GRAY'
        dist = 0
    else:
        clr = 'WHITE'
        dist = 9999

    return (heroID, (networks, dist, clr))


def loadData():
    """Load the network file and return an RDD of BFS nodes.

    Blank and whitespace-only lines are dropped before parsing, because
    processHeroNetworks cannot extract a hero ID from them.
    """
    dataFile = sContext.textFile("/home/user/bigdata/datasets/Otherdata/marvel-network.txt")
    return dataFile.filter(lambda line: line.strip()).map(processHeroNetworks)


def execMapper(heroNode):
    """Expand one node for a single BFS step.

    Args:
        heroNode: tuple of the form (heroID, (networks, dist, clr)).

    Returns:
        A list of nodes. It always contains the input node (turned 'BLACK'
        if it was 'GRAY'). For a 'GRAY' input it also contains one 'GRAY'
        node per connection, with an empty network list and distance
        dist + 1. Whenever a connection equals targetHeroID, heroHitCtr is
        incremented.
    """
    # unpack the tuple data: (heroID, (networks, dist, clr))
    heroID = heroNode[0]
    networks, dist, clr = heroNode[1]

    results = []

    # expand heroNode only if it is on the current frontier
    if clr == 'GRAY':
        for connection in networks:
            if targetHeroID == connection:
                heroHitCtr.add(1)
            # The connection's own network list is unknown here; the reducer
            # merges this entry with the node that carries the real list.
            results.append((connection, ([], dist + 1, 'GRAY')))

        # this node has now been fully expanded
        clr = 'BLACK'

    # always re-emit the input node so its network list is preserved
    results.append((heroID, (networks, dist, clr)))
    return results


def execReducer(heroData1, heroData2):
    """Merge two (networks, dist, clr) values that belong to the same hero.

    Returns:
        (edges, dist, clr) where edges is the concatenation of both network
        lists, dist is the smaller of the two distances (capped at 9999) and
        clr is the darker of the two colors (WHITE < GRAY < BLACK); on a tie
        the first color is kept.
    """
    edges1, dist1, clr1 = heroData1[0], heroData1[1], heroData1[2]
    edges2, dist2, clr2 = heroData2[0], heroData2[1], heroData2[2]

    # preserve edges
    edges = []
    edges.extend(edges1)
    edges.extend(edges2)

    # preserve min distance (never above the "not reached" marker 9999)
    dist = min(9999, dist1, dist2)

    # preserve darkest color
    darkness = {'WHITE': 0, 'GRAY': 1, 'BLACK': 2}
    clr = clr2 if darkness[clr2] > darkness[clr1] else clr1

    return (edges, dist, clr)


# here the program starts by loading the marvel-network.txt data file as per
# the tuple structure we want
rddToIterate = loadData()

# assuming that no one would be as far as the 10th degree of separation,
# that is why the search is limited to 10 iterations
maxIterations = 10

for ctr in range(0, maxIterations):
    print("Running ctr iteration# " + str(ctr+1))

    # execute the mapper to create new vertices and update the accumulator
    # as we encounter gray nodes
    mappedRDD = rddToIterate.flatMap(execMapper)

    # count() is an action, so it forces the RDD to be evaluated, which in
    # turn updates the accumulator
    print("Processing " + str(mappedRDD.count()) + " values.")

    # if the target is found then prompt the message
    if (heroHitCtr.value > 0):
        print("What a catch! Got the target " + str(heroHitCtr.value) + " different direction(s).")
        break

    # execute the reducer to combine all data and preserve the darkest color
    # plus the shortest distance
    rddToIterate = mappedRDD.reduceByKey(execReducer)
else:
    # the loop finished without a break: the target was never reached
    print("Target " + str(targetHeroID) + " was not reached within " + str(maxIterations) + " iterations.")