Commit 053f7b0f by serpucga

Implementing fault tolerance, stage 1

parent 36d7a65a
timing = False
dumped_pages = []
@@ -213,6 +213,43 @@ def convert_tweet_to_csv(header: str, tweet: dict) -> str:
    return csv_appendable_tweet
def dump_recovery_file(
        host: str,
        port: int,
        database: str,
        page_size: int,
        dumped_pages: List[int],
        error_page: int,
        output_dir: str)\
        -> None:
    """
    In case of error, dump information to a file to allow recovery

    :param host: address of the host to which the script connected
    :param port: port of the Mongo database
    :param database: name of the database being queried
    :param page_size: size of the page that was being used
    :param dumped_pages: list of the pages that were written successfully
    :param error_page: number of the page that failed, if any
    :param output_dir: directory where the recovery file is written
    """

    recovery_file_path = os.path.join(
        output_dir, ".recovery_" + database + ".csv")

    recovery_file_contents = {}
    recovery_file_contents["host"] = host
    recovery_file_contents["port"] = port
    recovery_file_contents["database"] = database
    recovery_file_contents["pagesize"] = page_size
    recovery_file_contents["dumped_pages"] = dumped_pages
    recovery_file_contents["error_page"] = error_page

    # Write the recovery metadata as JSON
    with open(recovery_file_path, "w") as f:
        json.dump(recovery_file_contents, f)

    logger.error(
        "Generated recovery file at {}".format(recovery_file_path))
#########################
# TWEET DB PAGINATION #
#########################
...
@@ -63,15 +63,24 @@ def process_data_page(
            host, port, database, header, output_dir, pagesize, page, queue)

    try:
        # Launch single process to write to the filesystem
        writer_worker = mp.Process(
            target=utils.filesystem_writer, args=(task_queue, header, ))
        writer_worker.start()

        # Launch pool of workers to perform the format conversion
        with mp.Pool() as pool:
            pool.map(process_data_page, page_index)
        task_queue.put("END")

    except Exception as exc:
        logger.error("A fatal error occurred. Script will terminate")
        error_page = exc  # Change this
        utils.dump_recovery_file(
            args.host, args.port, args.database, args.pagesize,
            globals.dumped_pages, error_page, output_dir)

    if globals.timing:
        time1 = time.time()
...
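The line error_page = exc  # Change this stores the exception object where a page number is expected. One hypothetical way to resolve that TODO, not part of this commit and with illustrative names, is to have process_data_page wrap failures in an exception that carries the failing page index:

class PageProcessingError(Exception):
    """Illustrative exception that records which page failed (assumption)."""

    def __init__(self, page: int, original: Exception) -> None:
        super().__init__(
            "Failed while processing page {}: {}".format(page, original))
        self.page = page

# In the except block above, the failing page could then be recovered with
# something like:
#     error_page = exc.page if isinstance(exc, PageProcessingError) else -1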