Commit 662b9e66 by serpucga

Error handling added

The system should now be able to recover from a failure during the conversion process, either by ignoring the error or by dumping its state at the moment of failure so that the process can later be resumed from the point where it stopped. The policies followed at this stage to avoid corrupt data or other errors are:

1. If a specific tweet raises an error while being converted to CSV, that tweet is skipped and the execution continues.
2. If any other error occurs while processing a page of tweets, the number of that page is recorded in the recovery file as "error_page", and that page will be skipped when the user resumes the execution from the recovery file.
3. On any other unexpected error or keyboard interruption, a standard recovery file is dumped with the list of already converted pages but without "error_page", so that running the script with the "-r" flag resumes the execution from where it was left off without discarding any information.
parent c05a677c
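For context, the recovery file these policies refer to is a small JSON document. The following is a minimal sketch of the resume step it enables, assuming only the `dumped_pages` and `error_page` keys that appear in this diff; the function name and any other structure are illustrative:

```python
import json

def load_resumable_pages(recovery_file_path: str, full_page_index: list) -> list:
    """Sketch: compute the pages still pending from a recovery file.

    Assumes the file holds at least {"dumped_pages": [...]} and, when a
    page failed (policy 2), an "error_page" entry as well.
    """
    with open(recovery_file_path) as f:
        recovery_data = json.load(f)

    # Skip pages that were already written to the filesystem.
    pending = [page for page in full_page_index
               if page not in recovery_data["dumped_pages"]]

    # Policy 2: a page recorded as corrupt is discarded, not retried.
    if "error_page" in recovery_data:
        pending.remove(recovery_data["error_page"])
    return pending
```

Keeping the list of already dumped pages in the file makes resumption idempotent: pages that are already on disk are never converted or written twice.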
@@ -8,7 +8,6 @@ import multiprocessing as mp
 from math import ceil
 from typing import List
 from tweet_manager.lib import json2csv, format_csv
-from tweet_manager.lib.json2csv import NotFlattenedException
 from config import globals
@@ -58,7 +57,8 @@ def filesystem_writer(
             break
         elif csv_page == "ERROR":
             logger.error("Dumping recovery file and exiting")
-            dump_error_recovery_file(recovery_file_path, page_number)
+            if page_number >= 0:
+                dump_error_recovery_file(recovery_file_path, page_number)
             break
         elif csv_page is not None:
             if globals.timing:
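The `page_number` half of each queue message doubles as a control channel, which the later hunks complete: `-1` accompanies `"END"`, a real page number accompanies a page-specific `"ERROR"` (policy 2), and `-2` marks a generic failure (policy 3). A hedged sketch of the producer side, where the helper name is purely illustrative and only the sentinel values come from this commit:

```python
from typing import Optional

def signal_writer(task_queue, error_page: Optional[int] = None) -> None:
    """Illustrative helper: emit the writer's control messages.

    Sentinels taken from this commit: -1 = clean end,
    -2 = generic failure, >= 0 = the page that failed.
    """
    if error_page is None:
        task_queue.put((-1, "END"))            # normal shutdown
    elif error_page >= 0:
        task_queue.put((error_page, "ERROR"))  # policy 2: record the bad page
    else:
        task_queue.put((-2, "ERROR"))          # policy 3: no single page to blame
```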
@@ -107,39 +107,45 @@ def process_page(
     :param queue: queue where processed data awaits being written to FS
     """
-    logger.debug(
-        "Worker {} launched: process_page executing".format(os.getpid()))
-    if globals.timing:
-        time0 = time.time()
+    try:
+        logger.debug(
+            "Worker {} launched: process_page executing".format(os.getpid()))
+        if globals.timing:
+            time0 = time.time()
 
-    client = pymongo.MongoClient(host, port)
-    database_tweets = client[database]["tweets"]
-    tweets_page = get_tweets_page(database_tweets, pagesize, page_number)
-    buffer_tweets = {}
-    try:
+        client = pymongo.MongoClient(host, port)
+        database_tweets = client[database]["tweets"]
+        tweets_page = get_tweets_page(database_tweets, pagesize, page_number)
+        buffer_tweets = {}
         for tweet in tweets_page:
             csv_tweet_output_path =\
                 get_tweet_output_path(tweet, output_dir)
-            csv_tweet_contents =\
-                "\n" + str(convert_tweet_to_csv(header, tweet))
+            try:
+                csv_tweet_contents =\
+                    "\n" + str(convert_tweet_to_csv(header, tweet))
+            except TweetConversionException as exc:
+                logger.error(exc.message)
+                logger.error("Origin tweet:\n" + str(exc.tweet))
+                logger.error("Discarding tweet and proceeding...")
+                continue
             if csv_tweet_output_path not in buffer_tweets.keys():
                 buffer_tweets[csv_tweet_output_path] = ""
             buffer_tweets[csv_tweet_output_path] += csv_tweet_contents
         client.close()
-    except NotFlattenedException as exc:
-        logger.critical(exc.args)
-        raise ExceptionAtPage("Corrupt tweets found", page_number)
 
-    queue.put((page_number, buffer_tweets))
-    logger.debug("Page {} enqueued".format(page_number))
+        queue.put((page_number, buffer_tweets))
+        logger.debug("Page {} enqueued".format(page_number))
 
-    if globals.timing:
-        time1 = time.time()
-        logger.critical(
-            "Time processing & buffering tweet page: {}s"
-            .format(time1 - time0))
+        if globals.timing:
+            time1 = time.time()
+            logger.critical(
+                "Time processing & buffering tweet page: {}s"
+                .format(time1 - time0))
+    except Exception:
+        raise ExceptionAtPage(
+            "Something failed while processing page", page_number)
 
 
 def get_tweet_output_path(tweet: dict, output_dir: str) -> str:
@@ -237,10 +243,22 @@ def convert_tweet_to_csv(header: str, tweet: dict) -> str:
     fields in CSV form
     """
-    flat_tweet = json2csv.flatten_dictionary(tweet)
-    csv_tweet = json2csv.json2csv(flat_tweet, True, 5, False)
-    csv_appendable_tweet = format_csv.get_csv_line(header, csv_tweet)
+    try:
+        flat_tweet = json2csv.flatten_dictionary(tweet)
+    except Exception:
+        raise TweetConversionException(
+            "Error when flattening tweet", tweet)
+    try:
+        csv_tweet = json2csv.json2csv(flat_tweet, True, 5, False)
+    except Exception:
+        raise TweetConversionException(
+            "Error when trying to convert tweet to CSV", flat_tweet)
+    try:
+        csv_appendable_tweet = format_csv.get_csv_line(
+            header, csv_tweet)
+    except Exception:
+        raise TweetConversionException(
+            "Error when formatting CSV tweet", csv_tweet)
     return csv_appendable_tweet
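Each `try` block above hands the exception the exact value that entered the failing stage (`tweet`, `flat_tweet`, `csv_tweet`), so the log in `process_page` always shows the failing stage's input. If the repetition grew, the same pattern could be factored into a helper; a sketch, where `run_stage` is hypothetical and `TweetConversionException` is the class added by this commit:

```python
from typing import Any, Callable

def run_stage(stage: Callable[[Any], Any], message: str, payload: Any) -> Any:
    """Hypothetical refactor: run one conversion stage, re-raising any
    failure as TweetConversionException with the stage's input attached."""
    try:
        return stage(payload)
    except Exception:
        raise TweetConversionException(message, payload)

# e.g. flat_tweet = run_stage(json2csv.flatten_dictionary,
#                             "Error when flattening tweet", tweet)
# (multi-argument stages would be wrapped in a lambda)
```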
@@ -280,7 +298,7 @@ def dump_recovery_file(
     with open(recovery_file_path, "w") as f:
         json.dump(recovery_file_contents, f)
 
-    logger.error(
+    logger.debug(
         "Generated recovery file at {}".format(recovery_file_path))
@@ -451,3 +469,9 @@ class ExceptionAtPage(Exception):
     def __init__(self, message, error_page):
         self.message = message
         self.error_page = error_page
+
+
+class TweetConversionException(Exception):
+    def __init__(self, message, tweet):
+        self.message = message
+        self.tweet = tweet
@@ -58,6 +58,9 @@ if args.recovery:
     client.close()
     page_index = [page for page in full_page_index
                   if page not in recovery_data["dumped_pages"]]
+    if "error_page" in recovery_data:
+        logger.debug("Discarding corrupted page")
+        page_index.remove(recovery_data.pop("error_page"))
     logger.debug(
         "Resuming collection conversion. {} of {} pages left."
         .format(len(page_index), len(full_page_index)))
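As a concrete illustration of the resume arithmetic in this hunk (the values here are made up): with six pages total, three already dumped and one recorded as corrupt, only the remaining two are retried and the corrupt page is never revisited. A minimal self-contained check:

```python
# Hypothetical values; the filtering logic mirrors the hunk above.
full_page_index = [0, 1, 2, 3, 4, 5]
recovery_data = {"dumped_pages": [0, 1, 3], "error_page": 2}

page_index = [page for page in full_page_index
              if page not in recovery_data["dumped_pages"]]
if "error_page" in recovery_data:
    page_index.remove(recovery_data.pop("error_page"))

assert page_index == [4, 5]
print("{} of {} pages left".format(len(page_index), len(full_page_index)))
```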
@@ -97,6 +100,11 @@ except utils.ExceptionAtPage as exc:
     logger.error("Error detected at page {}".format(exc.error_page))
     task_queue.put((exc.error_page, "ERROR"))
     sys.exit(1)
+except (Exception, KeyboardInterrupt):
+    logger.error("Error detected")
+    task_queue.put((-2, "ERROR"))
+    sys.exit(1)
 
 task_queue.put((-1, "END"))