Commit f05f9d63 by serpucga

Added dict-like indexing of the pagination system to show "relative" pagination in the logs and the recovery file

Added dict-like indexing of the pagination system to show "relative" pagination in the logs and the recovery file instead of the tweet ids, which would be confusing for the user
parent a0751265
...@@ -6,7 +6,7 @@ import time ...@@ -6,7 +6,7 @@ import time
import datetime import datetime
import multiprocessing as mp import multiprocessing as mp
from math import ceil from math import ceil
from typing import List from typing import List, Dict
from tweet_manager.lib import json2csv, format_csv from tweet_manager.lib import json2csv, format_csv
from config import globals from config import globals
...@@ -91,7 +91,8 @@ def process_page( ...@@ -91,7 +91,8 @@ def process_page(
header: str, header: str,
output_dir: str, output_dir: str,
pagesize: int, pagesize: int,
page_number: List[int], page_number: int,
page_index: Dict[int, int],
queue: mp.Queue)\ queue: mp.Queue)\
-> None: -> None:
""" """
...@@ -113,10 +114,11 @@ def process_page( ...@@ -113,10 +114,11 @@ def process_page(
if globals.timing: if globals.timing:
time0 = time.time() time0 = time.time()
real_page_number = page_index[page_number]
client = pymongo.MongoClient(host, port) client = pymongo.MongoClient(host, port)
database_tweets = client[database]["tweets"] database_tweets = client[database]["tweets"]
tweets_page = get_tweets_page_fast( tweets_page = get_tweets_page_fast(
database_tweets, pagesize, page_number) database_tweets, pagesize, real_page_number)
buffer_tweets = {} buffer_tweets = {}
for tweet in tweets_page: for tweet in tweets_page:
...@@ -387,7 +389,7 @@ def get_tweets_page( ...@@ -387,7 +389,7 @@ def get_tweets_page(
def get_page_index_fast( def get_page_index_fast(
collection: pymongo.collection.Collection, collection: pymongo.collection.Collection,
page_size: int)\ page_size: int)\
-> List[int]: -> Dict[int, int]:
""" """
Get a list of the pages indexed by their tweet ID. Get a list of the pages indexed by their tweet ID.
...@@ -422,7 +424,10 @@ def get_page_index_fast( ...@@ -422,7 +424,10 @@ def get_page_index_fast(
last_id = page[page_size - 1]["id"] last_id = page[page_size - 1]["id"]
except IndexError: except IndexError:
break break
return pages pages_index = {}
for i in range(len(pages)):
pages_index[i] = pages[i]
return pages_index
def get_tweets_page_fast( def get_tweets_page_fast(
......
...@@ -47,23 +47,26 @@ if args.timing: ...@@ -47,23 +47,26 @@ if args.timing:
time0 = time.time() time0 = time.time()
# MongoDB connection to get page index # MongoDB connection to get page index
logger.debug("The indexing of the collection may take a while if "
+ "the collection is too big. Please, be patient...")
if args.recovery: if args.recovery:
with open(args.recovery) as f: with open(args.recovery) as f:
recovery_data = json.load(f) recovery_data = json.load(f)
client = pymongo.MongoClient( client = pymongo.MongoClient(
recovery_data["host"], recovery_data["port"]) recovery_data["host"], recovery_data["port"])
database_tweets = client[recovery_data["database"]]["tweets"] database_tweets = client[recovery_data["database"]]["tweets"]
full_page_index = utils.get_page_index_fast( page_index = utils.get_page_index_fast(
database_tweets, recovery_data["pagesize"]) database_tweets, recovery_data["pagesize"])
client.close() client.close()
page_index = [page for page in full_page_index full_page_index_len = len(page_index)
if page not in recovery_data["dumped_pages"]] for page in recovery_data["dumped_pages"]:
page_index.pop(page, None)
if "error_page" in recovery_data: if "error_page" in recovery_data:
logger.debug("Discarding corrupted page") logger.debug("Discarding corrupted page")
page_index.remove(recovery_data.pop("error_page")) page_index.pop(recovery_data.pop("error_page"))
logger.debug( logger.debug(
"Resuming collection conversion. {} of {} pages left." "Resuming collection conversion. {} of {} pages left."
.format(len(page_index), len(full_page_index))) .format(len(page_index), full_page_index_len))
else: else:
client = pymongo.MongoClient(args.host, args.port) client = pymongo.MongoClient(args.host, args.port)
...@@ -79,10 +82,11 @@ else: ...@@ -79,10 +82,11 @@ else:
def process_data_page( def process_data_page(
page, host=args.host, port=args.port, database=args.database, page, host=args.host, port=args.port, database=args.database,
pagesize=args.pagesize, header=header, outputdir=output_dir, pagesize=args.pagesize, header=header, outputdir=output_dir,
queue=task_queue): queue=task_queue, page_index=page_index):
utils.process_page( utils.process_page(
host, port, database, header, output_dir, pagesize, page, queue) host, port, database, header, output_dir, pagesize, page,
page_index, queue)
# Launch single process to write to the filesystem # Launch single process to write to the filesystem
...@@ -99,7 +103,7 @@ except Exception: ...@@ -99,7 +103,7 @@ except Exception:
# Launch pool of workers to perform the format conversion # Launch pool of workers to perform the format conversion
try: try:
with mp.Pool() as pool: with mp.Pool() as pool:
pool.map(process_data_page, page_index) pool.map(process_data_page, page_index.keys())
except utils.ExceptionAtPage as exc: except utils.ExceptionAtPage as exc:
logger.error("Error detected at page {}".format(exc.error_page)) logger.error("Error detected at page {}".format(exc.error_page))
task_queue.put((exc.error_page, "ERROR")) task_queue.put((exc.error_page, "ERROR"))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment