Source code for reprowd.operators.crowdjoin

# -*- coding: utf-8 -*-

from reprowd.utils.simjoin import SimJoin, jaccard, editsim
from reprowd.utils.union_find import UnionFind
from reprowd.operators.crowddata import CrowdData
from sets import ImmutableSet
import pbclient
import sqlite3
import time
import itertools




class CrowdJoin:
    """
    Given a list of objects (or two lists of objects), the CrowdJoin operator
    finds matching objects in the list (or between the two lists).
    """

    # Labels attached to an object pair while it is being resolved.
    MATCHING = 1
    NONMATCHING = 0
    UNKNOWN = -1

    def __init__(self, object_list, table_name, crowdcontext):
        """
        Initialize a CrowdJoin object.

        Note: It is not recommended to call the constructor directly.
        Please call it through :func:`reprowd.crowdcontext.CrowdContext.CrowdJoin`.

        >>> object_list = ["iPad 2", "iPad Two", "iPhone 2", "iPad2"]
        >>> cc.CrowdJoin(object_list, table_name = "jointest")  #doctest: +SKIP
        <reprowd.operators.crowdjoin.CrowdJoin instance at 0x...>
        """
        self.cc = crowdcontext
        self.object_list = object_list
        self.table_name = table_name
        self.presenter = None
        self.map_func = lambda op: {'obj1': op[0], 'obj2': op[1]}
        self.matcher_func = None
        self.nonmatcher_func = None
        # Simjoin settings; only consulted once set_simjoin() turns simjoin on.
        # The defaults mirror the defaults of set_simjoin().
        self.simjoin_on = False
        self.joinkey_func = None
        self.threshold = 0.4
        self.weight_on = False
        # Transitivity settings; only consulted once set_transitivity() is called.
        self.transitivity_on = False
        self.score_func = None
        self.n_assignments = 1
        self.priority = 0
        self.crowddata = self.cc.CrowdData([], table_name)

    def set_presenter(self, presenter, map_func = lambda op: {'obj1':op[0], 'obj2':op[1]}):
        """
        Specify a presenter

        :param presenter: A Presenter object (e.g., :class:`reprowd.presenter.test.TextCmp`).
        :param map_func: map_func() maps a pair of objects into the data format
            the presenter requires. If ``map_func`` is not specified, it will use
            the default ``map_func = lambda op: {'obj1':op[0], 'obj2':op[1]}``
        :return: The updated CrowdJoin object

        >>> from reprowd.presenter.text import TextCmp
        >>> object_list = ["iPad 2", "iPad Two", "iPhone 2", "iPad2"]
        >>> def map_func(obj_pair):
        ...     o1, o2 = obj_pair
        ...     return {'obj1':o1, 'obj2':o2}
        >>> cc.CrowdJoin(object_list, table_name = "jointest") \\  #doctest: +SKIP
        ...   .set_presenter(TextCmp(), map_func)  #doctest: +SKIP
        <reprowd.operators.crowdjoin.CrowdJoin instance at 0x...>
        """
        self.presenter = presenter
        self.map_func = map_func
        return self

    def set_simjoin(self, joinkey_func, threshold = 0.4, weight_on = False):
        """
        Set a simjoin operator

        :param joinkey_func: A function that takes an object as input and outputs
            a join key on which simjoin will perform. The join key has to be a
            set of elements (e.g., bag of words, n-grams)
        :param threshold: A float number in [0, 1]. The higher the value, the more
            the number of object pairs that are removed.
        :param weight_on: A boolean value that indicates which similarity function,
            non-weighted Jaccard (``weight_on = False``) or weighted Jaccard
            (``weight_on = True``), will be used to compute similarity.
        :return: The updated CrowdJoin object

        Why do we need this? Consider a list of n objects. A naive implementation
        of CrowdJoin is to ask workers to label all n^2 object pairs. In fact,
        among these pairs, most of them look quite dissimilar and can be easily
        identified as non-matching pairs. Setting a simjoin operator will help us
        to remove these obviously non-matching pairs. Specifically, when it is
        set, all the object pairs whose Jaccard similarity values are below the
        threshold will be removed.

        >>> from reprowd.presenter.text import TextCmp
        >>> from reprowd.utils.simjoin import gramset
        >>> object_list = ["iPad 2", "iPad Two", "iPhone 2", "iPad2"]
        >>> def joinkey_func(obj):
        ...     # Use a 2-gram set as a joinkey
        ...     return gramset(obj, 2)
        >>> crowdjoin = cc.CrowdJoin(object_list, table_name = "jointest") \\  #doctest: +SKIP
        ...   .set_presenter(TextCmp()) \\  #doctest: +SKIP
        ...   .set_simjoin(joinkey_func, 0.2)  #doctest: +SKIP
        """
        self.joinkey_func = joinkey_func
        self.threshold = threshold
        self.weight_on = weight_on
        self.simjoin_on = True
        return self

    def set_matcher(self, matcher_func):
        """
        Specify a function for determining which object pairs are matching

        :param matcher_func: A function that takes a pair of objects as input and
            outputs True for matching pairs
        :return: The updated CrowdJoin object

        >>> from reprowd.presenter.text import TextCmp
        >>> from reprowd.utils.simjoin import gramset, jaccard
        >>> object_list = ["iPad 2", "iPad Two", "iPhone 2", "iPad2"]
        >>> # Identify the pairs whose Jaccard similarity is above 0.9 as matching.
        >>> def matcher_func(obj_pair):
        ...     o1, o2 = obj_pair
        ...     return jaccard(gramset(o1, 2), gramset(o2, 2)) >= 0.9
        >>> crowdjoin = cc.CrowdJoin(object_list, table_name = "jointest") \\  #doctest: +SKIP
        ...   .set_presenter(TextCmp()) \\  #doctest: +SKIP
        ...   .set_matcher(matcher_func)  #doctest: +SKIP
        """
        self.matcher_func = matcher_func
        return self

    def set_nonmatcher(self, nonmatcher_func):
        """
        Specify a function for determining which object pairs are **not** matching

        :param nonmatcher_func: A function that takes a pair of objects as input
            and outputs True for non-matching pairs
        :return: The updated CrowdJoin object

        >>> from reprowd.presenter.text import TextCmp
        >>> from reprowd.utils.simjoin import gramset
        >>> object_list = [("iPad 2", 300), ("iPad Two", 305), ("iPhone 2", 400), ("iPad2", 298)]  # (name, price)
        >>> def map_func(obj_pair):
        ...     o1, o2 = obj_pair
        ...     return {'obj1':o1[0] + " | " + str(o1[1]), 'obj2':o2[0] + " | " + str(o2[1])}
        >>> # If the prices of two product differ by more than 80, they will be identified as a non-matching pair
        >>> def nonmatcher_func(obj_pair):
        ...     o1, o2 = obj_pair
        ...     return abs(o1[1]-o2[1]) > 80
        >>>
        >>> crowdjoin = cc.CrowdJoin(object_list, table_name = "jointest") \\  #doctest: +SKIP
        ...   .set_presenter(TextCmp(), map_func) \\  #doctest: +SKIP
        ...   .set_nonmatcher(nonmatcher_func)  #doctest: +SKIP
        """
        self.nonmatcher_func = nonmatcher_func
        return self

    def set_transitivity(self, score_func = None):
        """
        Use transitivity to reduce the number of the pairs that need to be
        labeled by workers. Two types of transitivity will be considered:

        1. If A and B are matching, B and C are matching, then A and C will be
           deduced as matching
        2. If A and B are matching, B and C are non-matching, then A and C will
           be deduced as non-matching.

        :param score_func: A score function that tends to return a high score for
            matching pairs and a low score for non-matching pairs. Having this
            function will increase the effectiveness of transitivity
            (See [Wang et al. SIGMOD 2013] for more detail).
        :return: The updated CrowdJoin object

        >>> from reprowd.presenter.text import TextCmp
        >>> from reprowd.utils.simjoin import gramset, jaccard
        >>> object_list = ["iPhone 2", "iPad 2", "iPad Two", "iPad2"]
        >>> def score_func(obj_pair):
        ...     o1, o2 = obj_pair
        ...     return jaccard(gramset(o1, 2), gramset(o2, 2))
        >>> crowdjoin = cc.CrowdJoin(object_list, table_name = "jointest") \\  #doctest: +SKIP
        ...   .set_presenter(TextCmp()) \\  #doctest: +SKIP
        ...   .set_transitivity(score_func)  #doctest: +SKIP
        """
        self.transitivity_on = True
        self.score_func = score_func
        return self

    def set_task_parameters(self, n_assignments = 1, priority = 0):
        """
        Set the values of the parameters for published tasks

        :param n_assignments: The number of assignments. For example,
            ``n_assignments`` = 3 means that each task needs to be done by three
            different workers
        :param priority: A float number in [0, 1] that indicates the priority of
            the published tasks. The larger the value, the higher the priority.
        :return: The updated CrowdJoin object
        """
        self.n_assignments = n_assignments
        self.priority = priority
        return self

    def join(self, other_object_list = None):
        """
        If **other_object_list** is not specified, perform a self-join on
        **self.object_list**; otherwise, perform a join between
        *self.object_list* and **other_object_list**

        :param other_object_list: A list of objects that will be joined with
            **self.object_list**
        :return: A dict with the following attributes:

            - **"all"**: All the matching pairs
            - **"human"**: A subset of matching pairs identified by humans
            - **"machine"**: A subset of matching pairs identified by
              ``matcher_func()`` in :func:`set_matcher`
            - **"transitivity"**: A subset of matching pairs deduced based on
              transitivity

        :raises Exception: if no presenter has been set via :func:`set_presenter`.

        Note: No matter in which order set_simjoin(), set_matcher(), and
        set_nonmatcher() are being called, they will be applied in the
        **join()** function in the following order:

        1. set_simjoin()
        2. set_nonmatcher()
        3. set_matcher()

        >>> from reprowd.presenter.text import TextCmp
        >>> object_list = ["iPad 2", "iPad Two", "iPhone 2"]
        >>> matches = cc.CrowdJoin(object_list, table_name = "jointest") \\  #doctest: +SKIP
        ...   .set_presenter(TextCmp()) \\  #doctest: +SKIP
        ...   .join()  # Ask workers to check all pairs  #doctest: +SKIP
        >>> matches['all']  #doctest: +SKIP
        [('iPad 2', 'iPad Two')]
        >>> matches['human']  #doctest: +SKIP
        [('iPad 2', 'iPad Two')]
        >>> matches['machine']  #doctest: +SKIP
        []
        >>> matches['transitivity']  #doctest: +SKIP
        []
        """
        if self.presenter is None:
            raise Exception(
                "Presenter has not been specified.\n"
                "Please use set_presenter() func to specify a presenter.")

        # Step 1: build the candidate pairs, applying simjoin (when enabled) to
        # remove obviously non-matching pairs.
        if other_object_list is None:
            # Self-join
            if self.simjoin_on:
                k_o_list = [(self.joinkey_func(o), o) for o in self.object_list]
                joined = SimJoin(k_o_list).selfjoin(self.threshold, self.weight_on)
                pairs = [(x[0][1], x[1][1]) for x in joined]
            else:
                pairs = list(itertools.combinations(self.object_list, 2))
        else:
            # Join between two lists
            if self.simjoin_on:
                k_o_list1 = [(self.joinkey_func(o), o) for o in self.object_list]
                k_o_list2 = [(self.joinkey_func(o), o) for o in other_object_list]
                joined = SimJoin(k_o_list1).join(k_o_list2, self.threshold, self.weight_on)
                pairs = [(x[0][1], x[1][1]) for x in joined]
            else:
                pairs = list(itertools.product(self.object_list, other_object_list))

        # Step 2: drop the pairs the user-specified non-matcher rejects.
        if self.nonmatcher_func is not None:
            pairs = [p for p in pairs if not self.nonmatcher_func(p)]

        # Step 3: apply the user-specified matcher to identify obviously
        # matching pairs; everything else stays unknown.
        unknown_pairs = pairs
        matching_pairs = []
        if self.matcher_func is not None:
            unknown_pairs = []
            for pair in pairs:
                if self.matcher_func(pair):
                    matching_pairs.append(pair)
                else:
                    unknown_pairs.append(pair)
        matching_pairs = self._unique_pair(matching_pairs)  # remove duplicate matching pairs

        # Step 4: ask the crowd to label the remaining pairs.
        if self.transitivity_on:
            crowdsourced_pairs, transitivity_pairs = self._label_pairs_with_transitivity(
                unknown_pairs, self.score_func)
            return {"all": crowdsourced_pairs + matching_pairs + transitivity_pairs,
                    "human": crowdsourced_pairs,
                    "machine": matching_pairs,
                    "transitivity": transitivity_pairs}

        crowddata = self.crowddata
        unique_unknown_pairs = self._unique_pair(unknown_pairs)
        (crowddata.extend(unique_unknown_pairs)
                  .set_presenter(self.presenter, self.map_func)
                  .publish_task(self.n_assignments, self.priority)
                  .get_result()
                  .quality_control("em"))

        # Return the matching pairs identified by the crowd and the matcher
        crowdsourced_pairs = [
            object_pair
            for object_pair, result in zip(crowddata.data["object"], crowddata.data['em'])
            if result == 'Yes'
        ]
        return {"all": crowdsourced_pairs + matching_pairs,
                "human": crowdsourced_pairs,
                "machine": matching_pairs,
                "transitivity": []}

    def _label_pairs_with_transitivity(self, unknown_pairs, score_func):
        """
        Label ``unknown_pairs`` in rounds, publishing only the pairs whose label
        cannot be deduced from the labels collected so far.

        :param unknown_pairs: The pairs that still need a label
        :param score_func: Optional score function; when given, the pairs are
            processed in descending score order
        :return: A tuple ``(crowdsourced_pairs, transitivity_pairs)``: the pairs
            the crowd labeled 'Yes', and the matching pairs that are not among
            them (i.e., deduced based on transitivity)

        References: Jiannan Wang, Guoliang Li, Tim Kraska, Michael J. Franklin,
        Jianhua Feng. Leveraging Transitive Relations for Crowdsourced Joins.
        SIGMOD 2013
        """
        sorted_pairs = unknown_pairs
        if score_func is not None:
            sorted_pairs = sorted(unknown_pairs, key=score_func, reverse=True)

        crowdsourced_pairs = []
        pair_crowdlabel = []  # a list of published pairs and crowdsourced labels
        crowddata = self.crowddata.set_presenter(self.presenter, self.map_func)
        while True:
            must_crowdsourced_pairs = self._must_crowdsourced_pairs(sorted_pairs, pair_crowdlabel)
            # Termination condition: No pair can be published
            if len(must_crowdsourced_pairs) == 0:
                break

            # Publish tasks and wait for results
            crowddata = (crowddata.clear()
                                  .extend(must_crowdsourced_pairs)
                                  .publish_task(self.n_assignments, self.priority)
                                  .get_result()
                                  .quality_control('em'))

            # Once all the results are collected, add the newly labeled pairs
            # into pair_crowdlabel
            for object_pair, result in zip(crowddata.data["object"], crowddata.data['em']):
                if result == 'Yes':
                    crowdsourced_pairs.append(object_pair)
                    pair_crowdlabel.append((object_pair, CrowdJoin.MATCHING))
                else:
                    pair_crowdlabel.append((object_pair, CrowdJoin.NONMATCHING))

        # Get the matching pairs whose labels are deduced based on transitivity
        pairid_to_label = self._deduce_labels(sorted_pairs, pair_crowdlabel)
        matching_pairs = [pair for pair in unknown_pairs
                          if pairid_to_label[self._id(pair)] == CrowdJoin.MATCHING]
        transitivity_pairs = [p for p in matching_pairs if p not in crowdsourced_pairs]
        return crowdsourced_pairs, transitivity_pairs

    def _id(self, pair):
        """
        Return the key identifying ``pair``: the ``id()`` of each of its two
        objects. Identity-based, so two distinct objects with equal values get
        different keys.
        """
        return (id(pair[0]), id(pair[1]))

    def _unique_pair(self, l):
        """
        Return the pairs of ``l`` in their original order, keeping only the first
        occurrence of each pair key (see :func:`_id`).
        """
        seen = set()
        unique = []
        for pair in l:
            pair_id = self._id(pair)
            if pair_id not in seen:
                seen.add(pair_id)
                unique.append(pair)
        return unique

    def _deduce_labels(self, unlabeled_pairs, pair_crowdlabel):
        """
        Deduce the labels of "unlabeled_pairs" based on labeled pairs

        :param unlabeled_pairs: The pairs whose labels should be deduced
        :param pair_crowdlabel: A list of ``(pair, label)`` tuples collected
            from the crowd
        :return: A dict mapping each pair key (see :func:`_id`) to MATCHING,
            NONMATCHING or UNKNOWN
        """
        # Init cluster graph from the crowdsourced labels
        cluster_graph = self.__ClusterGraph()
        for pair, label in pair_crowdlabel:
            if label == CrowdJoin.MATCHING:
                cluster_graph.add_matching_edge(self._id(pair))
            elif label == CrowdJoin.NONMATCHING:
                cluster_graph.add_nonmatching_edge(self._id(pair))

        # Use the cluster graph to deduce the labels of unlabeled pairs
        pairid_to_label = {}
        for pair in unlabeled_pairs:
            pair_id = self._id(pair)
            pairid_to_label[pair_id] = cluster_graph.deduce_label(pair_id)
        return pairid_to_label

    def _must_crowdsourced_pairs(self, sorted_pairs, pair_crowdlabel):
        """
        Return the pairs that must need to be crowdsourced, i.e. those cannot be
        deduced based on transitivity

        :param sorted_pairs: The candidate pairs, in processing order
        :param pair_crowdlabel: A list of ``(pair, label)`` tuples collected
            from the crowd
        :return: The sublist of ``sorted_pairs`` to publish in the next round
        """
        # Deduce the labels of "sorted_pairs" based on labeled pairs
        pairid_to_label = self._deduce_labels(sorted_pairs, pair_crowdlabel)

        # Identify the pairs in "sorted_pairs" that must require to be crowdsourced
        must_crowdsourced_pairs = []
        cluster_graph = self.__ClusterGraph()
        for pair in sorted_pairs:
            pair_id = self._id(pair)
            deduced_label = pairid_to_label.get(pair_id, CrowdJoin.UNKNOWN)
            if deduced_label == CrowdJoin.MATCHING:
                cluster_graph.add_matching_edge(pair_id)
            elif deduced_label == CrowdJoin.NONMATCHING:
                cluster_graph.add_nonmatching_edge(pair_id)
            else:
                if cluster_graph.deduce_label(pair_id) == CrowdJoin.UNKNOWN:
                    must_crowdsourced_pairs.append(pair)
                # A still-unlabeled pair is added to the graph as a matching edge
                # before later pairs in the list are examined.
                cluster_graph.add_matching_edge(pair_id)
        return must_crowdsourced_pairs

    class __ClusterGraph:
        """
        This class maintains a cluster graph for a labeled graph, where the label
        is either matching or non-matching. The cluster graph merges the matching
        vertices into the same cluster, and then, for each pair of non-matching
        vertices, it adds an edge between their corresponding clusters.
        """

        def __init__(self):
            # Maps a cluster to a dict whose keys are its adjacent
            # (non-matching) clusters.
            self.cluster_graph = {}
            # The Union-Find data structure keeps track of a set of vertices
            # partitioned into a number of disjoint clusters.
            self.uf = UnionFind()

        def add_matching_edge(self, edge):
            """
            Merge the clusters of the two vertices of ``edge`` and rewire the
            non-matching edges of both clusters to the merged cluster.
            """
            v1, v2 = edge
            cluster_v1 = self.uf[v1]
            cluster_v2 = self.uf[v2]
            if cluster_v1 == cluster_v2:
                return

            # Add cluster_v1 and cluster_v2 into the cluster graph if they do not exist
            self.cluster_graph.setdefault(cluster_v1, {})
            self.cluster_graph.setdefault(cluster_v2, {})

            # Merge the adjacent vertices of cluster_v1 and cluster_v2. Built by
            # copy + update so it works whether items() returns a list or a view.
            adjacent_cluster_vset = dict(self.cluster_graph[cluster_v1])
            adjacent_cluster_vset.update(self.cluster_graph[cluster_v2])

            # Delete cluster_v1 and cluster_v2 if they are in "adjacent_cluster_vset"
            adjacent_cluster_vset.pop(cluster_v1, None)
            adjacent_cluster_vset.pop(cluster_v2, None)

            # Remove cluster_v1 and cluster_v2 from the cluster graph
            del self.cluster_graph[cluster_v1]
            del self.cluster_graph[cluster_v2]

            # Remove the edges (cluster_v1, adjacent_cluster_v) and
            # (cluster_v2, adjacent_cluster_v) from the cluster graph
            for adjacent_cluster_v in adjacent_cluster_vset:
                self.cluster_graph[adjacent_cluster_v].pop(cluster_v1, None)
                self.cluster_graph[adjacent_cluster_v].pop(cluster_v2, None)

            # Merge cluster_v1 and cluster_v2 as merged_cluster_v
            merged_cluster_v = self.uf.union(cluster_v1, cluster_v2)
            assert merged_cluster_v not in adjacent_cluster_vset
            assert merged_cluster_v == cluster_v1 or merged_cluster_v == cluster_v2

            # Add edges (merged_cluster_v, adjacent_cluster_v)
            self.cluster_graph[merged_cluster_v] = adjacent_cluster_vset
            for adjacent_cluster_v in adjacent_cluster_vset:
                self.cluster_graph[adjacent_cluster_v][merged_cluster_v] = True

        def add_nonmatching_edge(self, edge):
            """
            Record an edge between the clusters of the two vertices of ``edge``.
            Nothing is recorded when both vertices are already in one cluster.
            """
            v1, v2 = edge
            cluster_v1 = self.uf[v1]
            cluster_v2 = self.uf[v2]
            if cluster_v1 == cluster_v2:
                return
            self.cluster_graph.setdefault(cluster_v1, {})[cluster_v2] = True
            self.cluster_graph.setdefault(cluster_v2, {})[cluster_v1] = True

        def deduce_label(self, edge):
            """
            Decide the label of the input edge based on the cluster graph

            :return: MATCHING when both vertices are in the same cluster,
                NONMATCHING when their clusters are adjacent, UNKNOWN otherwise
            """
            v1, v2 = edge
            cluster_v1 = self.uf[v1]
            cluster_v2 = self.uf[v2]
            if cluster_v1 == cluster_v2:
                return CrowdJoin.MATCHING
            if cluster_v1 in self.cluster_graph.get(cluster_v2, {}):
                return CrowdJoin.NONMATCHING
            return CrowdJoin.UNKNOWN