Commit 49f13f4a by serpucga

Merge branch 'feature/enhance_performance' into develop

parents 9ba7bac1 f05f9d63
......@@ -6,7 +6,7 @@ import time
import datetime
import multiprocessing as mp
from math import ceil
from typing import List
from typing import List, Dict
from tweet_manager.lib import json2csv, format_csv
from config import globals
......@@ -91,7 +91,8 @@ def process_page(
header: str,
output_dir: str,
pagesize: int,
page_number: List[int],
page_number: int,
page_index: Dict[int, int],
queue: mp.Queue)\
-> None:
"""
......@@ -113,9 +114,11 @@ def process_page(
if globals.timing:
time0 = time.time()
real_page_number = page_index[page_number]
client = pymongo.MongoClient(host, port)
database_tweets = client[database]["tweets"]
tweets_page = get_tweets_page(database_tweets, pagesize, page_number)
tweets_page = get_tweets_page_fast(
database_tweets, pagesize, real_page_number)
buffer_tweets = {}
for tweet in tweets_page:
......@@ -383,6 +386,71 @@ def get_tweets_page(
return tweets
def get_page_index_fast(
collection: pymongo.collection.Collection,
page_size: int)\
-> Dict[int, int]:
"""
Build an index of the pages of the collection, keyed by page number.
MongoDB's skip() is very slow for large collections, where millions
of records may have to be skipped. It is much better for performance
to paginate by referencing an identifier field, in this case the
tweet "id". This function finds the first and last ID of a page of
"page_size" tweets, then repeatedly asks for the next page of tweets
after the last ID found. In this way it builds a list with the first
ID of each page. Page i can later be fetched by asking for the
"page_size" tweets with ID less than or equal to pages[i] (IDs are
sorted in descending order). The loop stops adding pages when it
finds one that is not complete.
:param collection: pymongo collection of tweets
:param page_size: number of tweets in each page
:returns: dict mapping each page number to the first tweet ID of
that page
"""
pages = []
first_page = collection.find()\
.sort("id", pymongo.DESCENDING)\
.limit(page_size)
pages.append(first_page[0]["id"])
last_id = first_page[page_size - 1]["id"]
while True:
page = collection.find({"id": {"$lt": last_id}})\
.sort("id", pymongo.DESCENDING)\
.limit(page_size)
# Stop once an empty or partial final page is reached
try:
pages.append(page[0]["id"])
last_id = page[page_size - 1]["id"]
except IndexError:
break
pages_index = {}
for i in range(len(pages)):
pages_index[i] = pages[i]
return pages_index
def get_tweets_page_fast(
collection: pymongo.collection.Collection,
page_size: int,
page_index: int)\
-> pymongo.cursor.Cursor:
"""
Get a cursor pointing to the Mongo entries for the requested page
:param collection: pymongo collection of tweets
:param page_size: number of tweets in each page
:param page_index: ID of the first tweet of the page, as stored in
the index built by get_page_index_fast
:returns: a pymongo cursor pointing to the tweets of that page
"""
tweets = collection\
.find({"id": {"$lte": page_index}})\
.sort("id", pymongo.DESCENDING)\
.limit(page_size)
return tweets
#########################
# METADATA GENERATION #
#########################
......
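The two helpers added above implement keyset pagination: `get_page_index_fast` walks the collection once to record the first tweet ID of every page, and `get_tweets_page_fast` then fetches any page with a single indexed range query on `id` instead of an expensive `skip()`. A minimal usage sketch, assuming the two functions are in scope (imported from the module shown above) and using purely illustrative connection details:

```python
import pymongo

# Connection details, database name and page size are illustrative assumptions.
client = pymongo.MongoClient("localhost", 27017)
tweets = client["tweet_database"]["tweets"]
page_size = 1000

# Build the page index once: {page_number: ID of the first tweet of that page}.
page_index = get_page_index_fast(tweets, page_size)

# Fetch page 3 with an indexed range query on "id" instead of skip(3 * page_size).
for tweet in get_tweets_page_fast(tweets, page_size, page_index[3]):
    print(tweet["id"])

client.close()
```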
......@@ -47,28 +47,31 @@ if args.timing:
time0 = time.time()
# MongoDB connection to get page index
logger.debug("The indexing of the collection may take a while if "
+ "the collection is too big. Please, be patient...")
if args.recovery:
with open(args.recovery) as f:
recovery_data = json.load(f)
client = pymongo.MongoClient(
recovery_data["host"], recovery_data["port"])
database_tweets = client[recovery_data["database"]]["tweets"]
full_page_index = utils.get_page_index(
page_index = utils.get_page_index_fast(
database_tweets, recovery_data["pagesize"])
client.close()
page_index = [page for page in full_page_index
if page not in recovery_data["dumped_pages"]]
full_page_index_len = len(page_index)
for page in recovery_data["dumped_pages"]:
page_index.pop(page, None)
if "error_page" in recovery_data:
logger.debug("Discarding corrupted page")
page_index.remove(recovery_data.pop("error_page"))
page_index.pop(recovery_data.pop("error_page"))
logger.debug(
"Resuming collection conversion. {} of {} pages left."
.format(len(page_index), len(full_page_index)))
.format(len(page_index), full_page_index_len))
else:
client = pymongo.MongoClient(args.host, args.port)
database_tweets = client[args.database]["tweets"]
page_index = utils.get_page_index(database_tweets, args.pagesize)
page_index = utils.get_page_index_fast(database_tweets, args.pagesize)
client.close()
logger.debug(
"Database {} partitioned in {} pages of {} tweets (maximum)"
......@@ -79,10 +82,11 @@ else:
def process_data_page(
page, host=args.host, port=args.port, database=args.database,
pagesize=args.pagesize, header=header, outputdir=output_dir,
queue=task_queue):
queue=task_queue, page_index=page_index):
utils.process_page(
host, port, database, header, output_dir, pagesize, page, queue)
host, port, database, header, output_dir, pagesize, page,
page_index, queue)
# Launch single process to write to the filesystem
......@@ -99,7 +103,7 @@ except Exception:
# Launch pool of workers to perform the format conversion
try:
with mp.Pool() as pool:
pool.map(process_data_page, page_index)
pool.map(process_data_page, page_index.keys())
except utils.ExceptionAtPage as exc:
logger.error("Error detected at page {}".format(exc.error_page))
task_queue.put((exc.error_page, "ERROR"))
......
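The recovery branch above expects a JSON file holding the connection parameters, the page size, the page numbers already dumped, and optionally a page that failed. A minimal sketch of writing such a file, where only the key names are taken from the script and all values are illustrative assumptions:

```python
import json

# Only the key names come from the script above; the values are assumptions.
recovery_data = {
    "host": "localhost",
    "port": 27017,
    "database": "tweet_database",
    "pagesize": 1000,
    "dumped_pages": [0, 1, 2],  # page numbers already written to disk
    "error_page": 3,            # optional: page where a worker failed
}

with open("recovery.json", "w") as f:
    json.dump(recovery_data, f)
```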