# -*- coding: utf-8 -*-
from reprowd.operators.crowddata import CrowdData
from reprowd.operators.crowdjoin import CrowdJoin
import sqlite3
import pbclient
import json
import os
[docs]class CrowdContext:
"""
Main entry point for Reprowd functionality. Intuitively, a CrowdContext can be
thought of as a fault-tolerant and reproducible environment for doing crowdsourced
data processing tasks. Once a CrowdContext is created, it will connect to a pybossa
server and a local database, providing APIs for creating crowd operators(e.g., CrowdJoin),
and manipulating cached crowddata.
"""
# Note that __current_cd may not contain all the cached data in the database,
# but only contains those created by the current CrowdContext
# Map local_db to <tablename, CrowdData object>
__current_cd = {}
[docs] def __init__(self, endpoint=None, api_key=None, local_db = "reprowd.db"):
"""
Create a new CrowdContext. The endpoint and api_key should be set,
either through the named parameters here or through environment variables (
REPROWD_ENDPOINT, REPROWD_API_KEY)
:param endpoint: Pybossa server URL (e.g. http://localhost:7000).
:param api_key: An api_key to access the pybossa server. You can get an api_key by
creating an account in the pybossa server, and check the api_key of the account by clicking the
"account name" --> "My Profile" on the top right of the page.
:param local_db: The local database name
:return: A CrowdContext object
>>> from reprowd.crowdcontext import CrowdContext
>>> CrowdContext("http://localhost:7000", api_key = "test", local_db = "reprowd.test.db") #doctest: +SKIP
<reprowd.crowdcontext.CrowdContext instance at 0x...>
"""
if not endpoint:
endpoint = os.environ.get("REPROWD_ENDPOINT", "")
if not api_key:
api_key = os.environ.get("REPROWD_API_KEY", "")
self.endpoint = endpoint
self.api_key = api_key
pbclient.set('endpoint', endpoint)
pbclient.set('api_key', api_key)
self.pbclient = pbclient
self.local_db = local_db
CrowdContext.__current_cd[local_db] = []
self.db = sqlite3.connect(local_db)
self.cursor = self.db.cursor()
[docs] def CrowdData(self, object_list, table_name):
"""
Return :class:`CrowdData` object
:param object_list: A list of objects where an object can be anything (e.g., int, string, dict)
:param table_name: The table used for caching the crowd tasks/results related to the :class:`CrowdData` object
>>> # Create a CrowdData object for image labeling
>>> cc.CrowdData(["image1.jpg", "image2.jpg"], "tmp") #doctest: +SKIP
<reprowd.operators.crowddata.CrowdData instance at 0x...>
"""
# Check if table_name has been used before
for _table_name, _crowddata in CrowdContext.__current_cd[self.local_db]:
if table_name == _table_name:
return _crowddata
else:
_crowddata = CrowdData(object_list, table_name, self)
CrowdContext.__current_cd[self.local_db].append((table_name, _crowddata))
return _crowddata
[docs] def CrowdJoin(self, object_list, table_name):
"""
Return :class:`CrowdJoin` object
:param object_list: A list of objects where an object can be anything (e.g., int, string, dict)
:param table_name: The table used for caching the crowd tasks/results related to the :class:`CrowdJoin` object
>>> # Create a CrowdJoin object for deduplication
>>> cc.CrowdJoin(["iphone 4", "ipad 2", "ipad two"], "tmp") #doctest: +SKIP
<reprowd.operators.crowdjoin.CrowdJoin instance at 0x...>
"""
return CrowdJoin(object_list, table_name, self)
[docs] def show_tables(self):
"""
Return the list of the tables cached in the local database
>>> cc.CrowdData(["image1.jpg", "image2.jpg"], "tmp1") #doctest: +ELLIPSIS
<reprowd.operators.crowddata.CrowdData instance at 0x...>
>>> cc.CrowdJoin(["iphone 4", "ipad 2", "ipad two"], "tmp2") #doctest: +ELLIPSIS
<reprowd.operators.crowdjoin.CrowdJoin instance at 0x...>
>>> tables = cc.show_tables()
>>> print ", ".join(tables)
tmp1, tmp2
>>> cc.delete_tmp_tables()
2
"""
exe_str = "SELECT * FROM sqlite_master WHERE type='table'"
self.cursor.execute(exe_str)
results = self.cursor.fetchall()
tables = []
for result in results:
tables.append(result[1])
return tables
[docs] def print_tables(self):
"""
Print a sorted list of the tables cached in the local database (alphabetical order)
>>> cc.CrowdData(["image1.jpg", "image2.jpg"], "tmp2") #doctest: +ELLIPSIS
<reprowd.operators.crowddata.CrowdData instance at 0x...>
>>> cc.CrowdJoin(["iphone 4", "ipad 2", "ipad two"], "tmp1") #doctest: +ELLIPSIS
<reprowd.operators.crowdjoin.CrowdJoin instance at 0x...>
>>> cc.print_tables()
1 tmp1
2 tmp2
>>> cc.delete_tmp_tables()
2
"""
tables = self.show_tables()
tables.sort()
for i, table in enumerate(tables):
print "%d %s" %(i+1, table)
[docs] def rename_table(self, oldname, newname):
"""
Rename a cached table
>>> cc.CrowdData(["image1.jpg", "image2.jpg"], "tmp1") #doctest: +ELLIPSIS
<reprowd.operators.crowddata.CrowdData instance at 0x...>
>>> cc.rename_table("tmp1", "tmp2")
True
>>> cc.print_tables()
1 tmp2
>>> cc.delete_tmp_tables()
1
"""
tables = self.show_tables()
if oldname not in tables:
print "'%s' does not exist. " %(oldname)
return False
elif newname in tables:
print "'%s' has been used. Please choose another name. " %(newname)
return False
else:
exe_str = "ALTER TABLE '%s' RENAME TO '%s'" %(oldname, newname)
self.cursor.execute(exe_str)
self.db.commit()
# rename the table name in __current_cd
for i, (x, y) in enumerate(CrowdContext.__current_cd[self.local_db]):
if x == oldname:
CrowdContext.__current_cd[self.local_db][i] = (newname, y)
return True
[docs] def delete_table(self, table_name):
"""
Delete a cached table
>>> cc.CrowdData(["image1.jpg", "image2.jpg"], "tmp1") #doctest: +ELLIPSIS
<reprowd.operators.crowddata.CrowdData instance at 0x...>
>>> cc.CrowdJoin(["iphone 4", "ipad 2", "ipad two"], "tmp2") #doctest: +ELLIPSIS
<reprowd.operators.crowdjoin.CrowdJoin instance at 0x...>
>>> cc.print_tables()
1 tmp1
2 tmp2
>>> cc.delete_table("tmp1")
True
>>> cc.print_tables()
1 tmp2
>>> cc.delete_tmp_tables()
1
"""
if table_name not in self.show_tables():
print "'%s' does not exist " %(table_name)
return False
else:
exe_str = "DROP TABLE '%s'" %(table_name)
self.cursor.execute(exe_str)
self.db.commit()
# delete the table from __current_cd if exists
for i, (x, y) in enumerate(CrowdContext.__current_cd[self.local_db]):
if x == table_name:
del CrowdContext.__current_cd[self.local_db][i]
break
return True
[docs] def delete_tmp_tables(self):
"""
The function deletes all the tables whose names start with "tmp",
and returns the number of deleted tables
>>> cc.CrowdData(["image1.jpg", "image2.jpg"], "tmp1") #doctest: +ELLIPSIS
<reprowd.operators.crowddata.CrowdData instance at 0x...>
>>> cc.CrowdJoin(["iphone 4", "ipad 2", "ipad two"], "not_tmp") #doctest: +ELLIPSIS
<reprowd.operators.crowdjoin.CrowdJoin instance at 0x...>
>>> cc.delete_tmp_tables()
1
>>> cc.delete_table("not_tmp")
True
"""
# Remove the tmp tables from the database
tables = self.show_tables()
n = 0
for table in tables:
if table.startswith("tmp"):
self.delete_table(table)
n += 1
# Remove the tmp tables from __current_cd
old_list = CrowdContext.__current_cd[self.local_db]
new_list = [(x, y) for (x, y) in old_list if not x.startswith("tmp")]
CrowdContext.__current_cd[self.local_db] = new_list
return n
@staticmethod
[docs] def remove_db_file(filename):
"""Remove a database file. Note that it is dangerous to remove a database.
Please make sure you understand the meaning of the function and only use it
if you have to.
"""
try:
os.remove(filename)
except OSError:
pass
def __del__(self):
try:
self.cursor.close()
except:
pass
try:
self.db.close()
except:
pass
def _test():
import doctest
from reprowd.crowdcontext import CrowdContext
globs = globals().copy()
test_db = 'reprowd.test.db'
CrowdContext.remove_db_file(test_db)
globs['cc'] = CrowdContext(local_db = test_db)
(failure_count, test_count) = doctest.testmod(globs=globs)
CrowdContext.remove_db_file(test_db)
if failure_count:
exit(-1)
if __name__ == "__main__":
_test()