## Rank-based Evaluator for Recommendations with Implicit Preference in PySpark

This is an implementation of the evaluator presented in *Collaborative Filtering for Implicit Feedback Datasets* (Hu, Koren, and Volinsky). Given a set of users $u$ and items $i$, we have a collection of implicit user ratings for items. These implicit ratings could be the number of times a user watched a TV program or visited a specific page. The quality metric presented in the paper is based on the expected percentile ranking, given by the following formula:

$\overline{\text{rank}}_1 = \frac{\sum_{u,i} \text{rating}_{ui} \, \text{rank}_{ui}}{\sum_{u,i} \text{rating}_{ui}}$

Here $\text{rank}_{ui}$ is the percentile rank of item $i$ within user $u$'s list when sorted by predicted score, from $0$ for the top recommendation to $1$ for the bottom one. Lower values of $\overline{\text{rank}}_1$ are therefore better, and a random ordering gives an expected value of $0.5$.
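To make the formula concrete, here is a hand-worked example with made-up numbers for a single user (plain Python, just arithmetic):

```python
# Hand-worked example of the formula with made-up numbers: three
# (rating, percentile_rank) pairs for one user, where rank 0.0 is the
# top of the recommendation list and 1.0 the bottom.
pairs = [(5.0, 0.0), (1.0, 0.5), (2.0, 1.0)]

num = sum(rating * rank for rating, rank in pairs)  # 0.0 + 0.5 + 2.0 = 2.5
den = sum(rating for rating, _ in pairs)            # 8.0

print(num / den)  # 0.3125
```

Heavily consumed items landing near the top of the list pull the metric toward 0; here the user's favorite item sits at rank 0, so the result is well below 0.5.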

In Spark and Python, I calculate this metric with the following class. The constructor accepts three column names: the user column, the (implicit) rating column, and the prediction column.

```python
class RankBasedEvaluator:
    def __init__(self, user_col, rating_col, prediction_col):
        self._user_col = user_col
        self._rating_col = rating_col
        self._prediction_col = prediction_col

    def evaluate(self, spark, predictions):
        # Filter out NaN predictions (ALS can produce them for cold-start users/items)
        predictions = predictions.filter("NOT ISNAN({0})".format(self._prediction_col))
        predictions.createOrReplaceTempView("predictions")

        # Denominator: the total implicit rating mass
        denominator = predictions.groupBy().sum(self._rating_col).collect()[0][0]

        # Percentile rank of each item within its user's list, best prediction first
        spark.sql("""SELECT {0}
                          , {1}
                          , PERCENT_RANK() OVER (PARTITION BY {0} ORDER BY {2} DESC) AS rank
                       FROM predictions""".format(self._user_col, self._rating_col,
                                                  self._prediction_col)) \
             .createOrReplaceTempView("ratings_with_ranks")

        # Numerator: ratings weighted by their percentile ranks
        numerator = spark.sql("""SELECT SUM({0} * rank)
                                   FROM ratings_with_ranks""".format(self._rating_col)) \
                         .collect()[0][0]

        return numerator / denominator

    def transform_evaluate(self, spark, model, df):
        predictions = model.transform(df)
        return self.evaluate(spark, predictions)
```
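To sanity-check the class without spinning up Spark, the same computation can be sketched in plain Python. This is my own Spark-free reference, not part of the original implementation; note that, unlike SQL's `PERCENT_RANK` (which computes `(rank - 1) / (rows - 1)` and gives tied predictions the same rank), this simple sketch breaks ties by sort order:

```python
import math
from collections import defaultdict

def expected_percentile_ranking(rows):
    """rows: iterable of (user, rating, prediction) triples.

    Returns the rating-weighted mean percentile rank, where each item's
    rank is its position / (n - 1) within its user's list, best
    prediction first (0.0 = top of the list, 1.0 = bottom).
    """
    by_user = defaultdict(list)
    for user, rating, pred in rows:
        if not math.isnan(pred):  # mirror the NaN filter in evaluate()
            by_user[user].append((rating, pred))

    num = den = 0.0
    for items in by_user.values():
        items.sort(key=lambda rp: rp[1], reverse=True)
        n = len(items)
        for pos, (rating, _) in enumerate(items):
            rank = pos / (n - 1) if n > 1 else 0.0
            num += rating * rank
            den += rating
    return num / den

# Made-up toy data: (user, rating, prediction) triples
rows = [("u1", 5.0, 0.9), ("u1", 1.0, 0.3), ("u1", 2.0, 0.1),
        ("u2", 3.0, 0.8), ("u2", 4.0, 0.2)]
print(expected_percentile_ranking(rows))  # 6.5 / 15 ≈ 0.433
```

Running the Spark class on a tiny DataFrame built from the same triples should agree with this reference (up to tie handling), which is a cheap way to unit-test the SQL.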


Once I have an instance of this class, I can call evaluate, passing a SparkSession and a DataFrame with predictions, and get back a single number between 0 and 1 (or 0 to 100% if multiplied by 100); lower is better. This DataFrame should contain the three columns specified in the constructor, and it is the output of transforming our test/evaluation dataset with the trained ALS model. Alternatively, I can use transform_evaluate and pass in a trained model together with my test/evaluation dataset containing the user, item, and rating columns.

In the above formula, users with higher activity (higher implicit ratings) dominate, since the average is taken over all ratings across users. A second quality measure can be defined where I calculate the expected percentile ranking per user, and then take the average over all users. This makes recommendations for all users equally important, irrespective of each user's level of activity.

$\overline{\text{rank}}_2 = \frac{1}{n_u} \sum_u \frac{\sum_i \text{rating}_{ui} \, \text{rank}_{ui}}{\sum_i \text{rating}_{ui}}$

where $n_u$ is the number of users.
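The difference between the two measures is easy to see on a toy example (made-up numbers, plain Python rather than Spark): with the per-user average, a heavy user's many ratings no longer swamp a light user's few.

```python
# Per-user expected percentile ranking, then averaged over users.
# rank = position / (n - 1) within each user's list, best prediction first.
by_user = {
    # user: list of (rating, prediction) pairs, made up for illustration
    "u1": [(5.0, 0.9), (1.0, 0.3), (2.0, 0.1)],
    "u2": [(3.0, 0.8), (4.0, 0.2)],
}

per_user = []
for items in by_user.values():
    items.sort(key=lambda rp: rp[1], reverse=True)
    n = len(items)
    num = sum(r * (pos / (n - 1) if n > 1 else 0.0)
              for pos, (r, _) in enumerate(items))
    per_user.append(num / sum(r for r, _ in items))

print(sum(per_user) / len(per_user))
```

On these numbers the per-user measure gives about 0.442, while pooling all ratings as in $\overline{\text{rank}}_1$ gives about 0.433: u1's larger rating mass pulls the pooled average toward u1's (better) ranking, whereas the per-user average weighs both users equally.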

In Spark and Python this can be calculated with the following class. The API is identical to the previous implementation.

```python
class RankBasedEvaluator2:
    def __init__(self, user_col, rating_col, prediction_col):
        self._user_col = user_col
        self._rating_col = rating_col
        self._prediction_col = prediction_col

    def evaluate(self, spark, predictions):
        predictions.createOrReplaceTempView("original_predictions")
        # Filter out NaN predictions
        spark.sql("""SELECT *
                       FROM original_predictions
                      WHERE NOT ISNAN({0})""".format(self._prediction_col)) \
             .createOrReplaceTempView("predictions")
        # Per-user denominator: each user's total implicit rating mass
        spark.sql("""SELECT {0}
                          , SUM({1}) AS rating_sum
                       FROM predictions
                      GROUP BY {0}""".format(self._user_col, self._rating_col)) \
             .createOrReplaceTempView("rating_sums")
        # Percentile rank of each item within its user's list, best prediction first
        spark.sql("""SELECT {0}
                          , {1}
                          , PERCENT_RANK() OVER (PARTITION BY {0} ORDER BY {2} DESC) AS rank
                       FROM predictions""".format(self._user_col, self._rating_col,
                                                  self._prediction_col)) \
             .createOrReplaceTempView("ratings_with_ranks")
        # Per-user numerator: ratings weighted by their percentile ranks
        spark.sql("""SELECT {0}
                          , SUM({1} * rank) AS weighted_sum
                       FROM ratings_with_ranks
                      GROUP BY {0}""".format(self._user_col, self._rating_col)) \
             .createOrReplaceTempView("weighted_rating_sums")
        # Join the per-user numerators and denominators
        spark.sql("""SELECT weighted_rating_sums.*, rating_sums.rating_sum
                       FROM weighted_rating_sums
                       JOIN rating_sums
                         ON weighted_rating_sums.{0} = rating_sums.{0}""".format(self._user_col)) \
             .createOrReplaceTempView("joined_sums")

        # Average the per-user expected percentile rankings
        result = spark.sql("""SELECT AVG(weighted_sum / rating_sum)
                                FROM joined_sums""")

        return result.collect()[0][0]

    def transform_evaluate(self, spark, model, df):
        predictions = model.transform(df)
        return self.evaluate(spark, predictions)
```