Rank-based Evaluator for Recommendations with Implicit Preference in PySpark

This is an implementation of the evaluator presented in the paper Collaborative Filtering for Implicit Feedback Datasets. Given a set of users u and items i, we have a collection of implicit user ratings for items. These implicit ratings could be the number of times a user watched a TV program or visited a specific page. The quality metric presented in the paper is based on the expected percentile ranking: rank_ui denotes the percentile position of item i within user u's list of items ordered by predicted preference (0% means the item is recommended first, 100% means it is recommended last), so lower values are better and a value around 50% corresponds to random recommendations. The metric is given by the following formula:

\[ \overline{\text{rank}}_1 = \frac{\sum_{u, i}{\text{rating}_{ui}} \, \text{rank}_{ui}}{\sum_{u, i} \text{rating}_{ui}} \]

In Spark and Python, I calculate this metric with the following class. The constructor accepts three column names: the user column, the (implicit) rating column, and the prediction column.

class RankBasedEvaluator():
    def __init__(self, user_col, rating_col, prediction_col):
        self._user_col = user_col
        self._rating_col = rating_col
        self._prediction_col = prediction_col

    def evaluate(self, spark, predictions):
        # Filter out NaN predictions (ALS returns NaN for users/items unseen during training)
        predictions = predictions.filter("NOT ISNAN({0})".format(self._prediction_col))
        predictions.createOrReplaceTempView("predictions")

        # Denominator: total sum of implicit ratings
        denominator = predictions.groupBy().sum(self._rating_col).collect()[0][0]

        spark.sql("""SELECT {0}
                          , {1}
                          , PERCENT_RANK() OVER (PARTITION BY {0} ORDER BY {2} DESC) AS rank
                     FROM predictions""".format(self._user_col, self._rating_col,
                                                self._prediction_col)) \
             .createOrReplaceTempView("ratings_with_ranks")
        # Numerator: implicit ratings weighted by their percentile rank
        numerator = spark.sql("""SELECT SUM({0} * rank)
                                 FROM ratings_with_ranks""".format(self._rating_col)) \
                           .collect()[0][0]

        return numerator / denominator

    def transform_evaluate(self, spark, model, df):
        predictions = model.transform(df)
        return self.evaluate(spark, predictions)

Once I have an instance of this class, I can call evaluate, passing a SparkSession and a DataFrame with predictions, and get back a single number between 0 and 1 (or 0 to 100% when multiplied by 100), where lower is better. This DataFrame should contain the three columns specified in the constructor, and it is typically the output of transforming the test/evaluation dataset with the trained ALS model. Alternatively, I can use transform_evaluate and pass in a trained model together with the test/evaluation dataset, which needs the user, item and rating columns so that the model can produce the predictions.
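Putting it together, a minimal usage sketch could look like the following. The data source, column names and ALS hyper-parameters below are made up for illustration; only the evaluator class above is taken as given.

from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS

spark = SparkSession.builder.appName("rank-based-eval-example").getOrCreate()

# Hypothetical implicit-feedback dataset with columns: user, item, rating
views = spark.read.parquet("tv_views.parquet")
train, test = views.randomSplit([0.8, 0.2], seed=42)

als = ALS(userCol="user", itemCol="item", ratingCol="rating",
          implicitPrefs=True, rank=10, regParam=0.1, alpha=40.0)
model = als.fit(train)

evaluator = RankBasedEvaluator(user_col="user", rating_col="rating",
                               prediction_col="prediction")

# Either transform explicitly and evaluate the predictions...
predictions = model.transform(test)
print(evaluator.evaluate(spark, predictions))

# ...or let the evaluator run the transform step itself.
print(evaluator.transform_evaluate(spark, model, test))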

In the above formula, users with higher activity (higher implicit ratings) dominate, since the average is taken over all ratings across all users. A second quality measure can be defined where I first calculate the expected percentile ranking per user and then take the average over all users. This makes recommendations for all users equally important, irrespective of their level of activity.

\[ \overline{\text{rank}}_2 = \frac{1}{n_u} \sum_u {\frac{\sum_{i}{\text{rating}_{ui}} \, \text{rank}_{ui}}{\sum_{i} \text{rating}_{ui}}} \]
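As a made-up numerical illustration of the difference: suppose user A has implicit ratings summing to 100 and a per-user expected percentile ranking of 10%, while user B has ratings summing to 1 and a per-user ranking of 90%. Then

\[ \overline{\text{rank}}_1 = \frac{100 \cdot 0.10 + 1 \cdot 0.90}{100 + 1} \approx 10.8\% \qquad \overline{\text{rank}}_2 = \frac{0.10 + 0.90}{2} = 50\% \]

The heavy user A almost completely determines the first metric, while the second metric gives both users equal weight.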

In Spark and Python this can be calculated with the following class. The API is identical to that of the previous implementation; a short usage example follows the class.

class RankBasedEvaluator2():
    def __init__(self, user_col, rating_col, prediction_col):
        self._user_col = user_col
        self._rating_col = rating_col
        self._prediction_col = prediction_col

    def evaluate(self, spark, predictions):
        predictions.createOrReplaceTempView("original_predictions")
        # Filter out NaN predictions (ALS returns NaN for users/items unseen during training)
        spark.sql("""SELECT *
                     FROM original_predictions
                     WHERE NOT ISNAN({0})""".format(self._prediction_col)) \
             .createOrReplaceTempView("predictions")
        spark.sql("""SELECT {0}
                          , SUM({1}) as rating_sum
                     FROM predictions
                     GROUP BY {0}""".format(self._user_col, self._rating_col)) \
             .createOrReplaceTempView("rating_sums")
        spark.sql("""SELECT {0}
                          , {1}
                          , PERCENT_RANK() OVER (PARTITION BY {0} ORDER BY {2} DESC) AS rank
                     FROM predictions""".format(self._user_col, self._rating_col,
                                                self._prediction_col)) \
             .createOrReplaceTempView("ratings_with_ranks")
        spark.sql("""SELECT {0},
                     SUM({1} * rank) AS weighted_sum
                     FROM ratings_with_ranks
                     GROUP BY {0}""".format(self._user_col, self._rating_col)) \
             .createOrReplaceTempView("weighted_rating_sums")
        spark.sql("""SELECT weighted_rating_sums.*, rating_sums.rating_sum
                     FROM weighted_rating_sums
                     JOIN rating_sums
                       ON weighted_rating_sums.{0} = rating_sums.{0}""".format(self._user_col)) \
             .createOrReplaceTempView("joined_sums")

        # Average the per-user expected percentile rankings over all users
        result = spark.sql("""SELECT AVG(weighted_sum / rating_sum)
                              FROM joined_sums""")

        return result.collect()[0][0]

    def transform_evaluate(self, spark, model, df):
        predictions = model.transform(df)
        return self.evaluate(spark, predictions)
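Usage is the same as before; continuing the made-up example from above:

evaluator2 = RankBasedEvaluator2(user_col="user", rating_col="rating",
                                 prediction_col="prediction")
print(evaluator2.transform_evaluate(spark, model, test))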

 
