Commit c05a677c by serpucga

Include corrupt page number in recovery file

Added a way of dumping the number of the page that raised an error when converting to CSV. This way, we can later implement ways of dealing with it (skipping the corrupt page, inspecting it to find out where the problem lies, etc.)
parent f16cbf7e
......@@ -8,6 +8,7 @@ import multiprocessing as mp
from math import ceil
from typing import List
from tweet_manager.lib import json2csv, format_csv
from tweet_manager.lib.json2csv import NotFlattenedException
from config import globals
......@@ -55,6 +56,10 @@ def filesystem_writer(
logger.info("Writing loop finished")
os.remove(recovery_file_path)
break
elif csv_page == "ERROR":
logger.error("Dumping recovery file and exiting")
dump_error_recovery_file(recovery_file_path, page_number)
break
elif csv_page is not None:
if globals.timing:
time0 = time.time()
......@@ -112,16 +117,20 @@ def process_page(
tweets_page = get_tweets_page(database_tweets, pagesize, page_number)
buffer_tweets = {}
for tweet in tweets_page:
csv_tweet_output_path =\
get_tweet_output_path(tweet, output_dir)
csv_tweet_contents =\
"\n" + str(convert_tweet_to_csv(header, tweet))
if csv_tweet_output_path not in buffer_tweets.keys():
buffer_tweets[csv_tweet_output_path] = ""
buffer_tweets[csv_tweet_output_path] += csv_tweet_contents
client.close()
try:
for tweet in tweets_page:
csv_tweet_output_path =\
get_tweet_output_path(tweet, output_dir)
csv_tweet_contents =\
"\n" + str(convert_tweet_to_csv(header, tweet))
if csv_tweet_output_path not in buffer_tweets.keys():
buffer_tweets[csv_tweet_output_path] = ""
buffer_tweets[csv_tweet_output_path] += csv_tweet_contents
client.close()
except NotFlattenedException as exc:
logger.critical(exc.args)
raise ExceptionAtPage("Corrupt tweets found", page_number)
queue.put((page_number, buffer_tweets))
......@@ -339,6 +348,21 @@ def update_recovery_file(
json.dump(recovery_file_contents, f)
def dump_error_recovery_file(
        file_path: str,
        page_number: int)\
        -> None:
    """
    Add information pointing to the page where error was detected

    The JSON document stored at "file_path" is loaded, its "error_page"
    key is set to "page_number" and the result is written back to the
    same path. The new contents are first written to a temporary sibling
    file ("<file_path>.tmp") and then moved over the original, so an
    interruption in the middle of the write cannot leave a truncated
    recovery file behind.

    :param file_path: path to an existing JSON recovery file
    :param page_number: number of the page to store under "error_page"
    :raises OSError: if the recovery file cannot be read or written
    :raises json.JSONDecodeError: if the recovery file is not valid JSON
    """

    with open(file_path, "r") as f:
        recovery_file_contents = json.load(f)

    recovery_file_contents["error_page"] = page_number

    # Opening "file_path" with mode "w" would truncate it before the new
    # contents are written; going through a temporary file and os.replace
    # keeps the previous version intact until the new one is complete
    tmp_path = file_path + ".tmp"
    try:
        with open(tmp_path, "w") as f:
            json.dump(recovery_file_contents, f)
        os.replace(tmp_path, file_path)
    finally:
        # Only present here if the write or the replace failed
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
#########################
# TWEET DB PAGINATION #
#########################
......@@ -418,3 +442,12 @@ def file_length(file_path: str) -> int:
for i, l in enumerate(f):
pass
return i
#######################
# CUSTOM EXCEPTIONS #
#######################
class ExceptionAtPage(Exception):
    """
    Exception that carries the number of the page where it was raised

    :param message: human readable description of the problem
    :param error_page: number of the page being processed when the
        error was detected
    """

    def __init__(self, message, error_page):
        # Hand both values to Exception so that "args" always contains
        # them, even when the caller used keyword arguments. Pickling
        # rebuilds an exception from "args", so without this an instance
        # created with keywords could not be sent between processes
        super().__init__(message, error_page)
        self.message = message
        self.error_page = error_page
......@@ -2,6 +2,7 @@
import pymongo
import os
import sys
import argparse
import logging
import time
......@@ -89,8 +90,13 @@ writer_worker = mp.Process(
writer_worker.start()
# Launch pool of workers to perform the format conversion
with mp.Pool() as pool:
pool.map(process_data_page, page_index)
try:
with mp.Pool() as pool:
pool.map(process_data_page, page_index)
except utils.ExceptionAtPage as exc:
logger.error("Error detected at page {}".format(exc.error_page))
task_queue.put((exc.error_page, "ERROR"))
sys.exit(1)
task_queue.put((-1, "END"))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment