Commit 888acbe2 by serpucga

Merge branch 'feature/fault_tolerance' into develop

parents 36d7a65a ab69fb73
pymongodump
+recovery
tests.py
.mypy_cache
+.recovery*
@@ -3,6 +3,7 @@ import pymongo
import json
import re
import time
+import datetime
import multiprocessing as mp
from math import ceil
from typing import List
@@ -15,7 +16,16 @@ import logging
logger = logging.getLogger(__name__)


-def filesystem_writer(queue: mp.Queue, header: str) -> None:
+def filesystem_writer(
+        queue: mp.Queue,
+        header: str,
+        host: str,
+        port: int,
+        database: str,
+        pagesize: int,
+        output_dir: str,
+        recovery_file: str)\
+        -> None:
    """
    Reads the CSV pages from the queue and writes them to filesystem
@@ -31,13 +41,27 @@ def filesystem_writer(queue: mp.Queue, header: str) -> None:
    logger.debug(
        "Worker {} launched: filesystem_writer executing"
        .format(os.getpid()))

+    if recovery_file:
+        recovery_file_path = recovery_file
+    else:
+        recovery_file_path = build_recovery_filepath(database)
+        create_recovery_file(
+            recovery_file_path, host, port, database, pagesize)

    while True:
-        csv_page = queue.get()
+        page_number, csv_page = queue.get()
        if csv_page == "END":
            logger.info("Writing loop finished")
+            os.remove(recovery_file_path)
            break
+        elif csv_page == "ERROR":
+            logger.error("Dumping recovery file and exiting")
+            if page_number >= 0:
+                dump_error_recovery_file(recovery_file_path, page_number)
+            break
-        if globals.timing and csv_page is not None:
-            time0 = time.time()
+        elif csv_page is not None:
+            if globals.timing:
+                time0 = time.time()
            for output_path in csv_page.keys():
                logger.debug("Dumping tweets for " + output_path)
@@ -50,11 +74,14 @@ def filesystem_writer(queue: mp.Queue, header: str) -> None:
                generate_path(output_path, header)
                with open(output_path, "a") as writer:
                    writer.write(csv_page[output_path])
-        if globals.timing and csv_page is not None:
-            time1 = time.time()
+            if page_number >= 0:
+                update_recovery_file(recovery_file_path, page_number)
+            if globals.timing:
                logger.critical(
                    "Time spent writing tweet page to FS: {}s"
-                    .format(time1 - time0))
+                    .format(time.time() - time0))
+        else:
+            continue
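For reference, the writer loop above now distinguishes three kinds of messages on the queue. A minimal sketch of that protocol (the concrete page number and output path below are invented for illustration; the payload shape follows the buffer built by process_page):

    import multiprocessing as mp


    def enqueue_examples(queue: mp.Queue) -> None:
        # Successfully processed page: (page_number, {output_path: csv_text}).
        queue.put((7, {"./output/some_collection.csv": "\n<csv line>"}))
        # Failure signal: the writer records the page in the recovery file
        # only when page_number >= 0 (a negative number means "page unknown").
        queue.put((7, "ERROR"))
        # Normal termination: the writer removes the recovery file and stops.
        queue.put((-1, "END"))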
def process_page(
@@ -80,6 +107,7 @@ def process_page(
    :param queue: queue where processed data awaits being written to FS
    """
+    try:
        logger.debug(
            "Worker {} launched: process_page executing".format(os.getpid()))
        if globals.timing:
@@ -93,15 +121,20 @@
        for tweet in tweets_page:
            csv_tweet_output_path =\
                get_tweet_output_path(tweet, output_dir)
+            try:
                csv_tweet_contents =\
                    "\n" + str(convert_tweet_to_csv(header, tweet))
+            except TweetConversionException as exc:
+                logger.error(exc.message)
+                logger.error("Origin tweet:\n" + str(exc.tweet))
+                logger.error("Discarding tweet and proceeding...")
+                continue
            if csv_tweet_output_path not in buffer_tweets.keys():
                buffer_tweets[csv_tweet_output_path] = ""
            buffer_tweets[csv_tweet_output_path] += csv_tweet_contents
        client.close()

-        queue.put(buffer_tweets)
+        queue.put((page_number, buffer_tweets))
        logger.debug("Page {} enqueued".format(page_number))

        if globals.timing:
@@ -110,6 +143,10 @@
                "Time processing & buffering tweet page: {}s"
                .format(time1 - time0))

+    except Exception:
+        raise ExceptionAtPage(
+            "Something failed while processing page", page_number)
def get_tweet_output_path(tweet: dict, output_dir: str) -> str:
    """
@@ -206,13 +243,109 @@ def convert_tweet_to_csv(header: str, tweet: dict) -> str:
    fields in CSV form
    """
+    try:
        flat_tweet = json2csv.flatten_dictionary(tweet)
+    except Exception:
+        raise TweetConversionException(
+            "Error when flattening tweet", tweet)
+    try:
        csv_tweet = json2csv.json2csv(flat_tweet, True, 5, False)
-    csv_appendable_tweet = format_csv.get_csv_line(header, csv_tweet)
+    except Exception:
+        raise TweetConversionException(
+            "Error when trying to convert tweet to CSV", flat_tweet)
+    try:
+        csv_appendable_tweet = format_csv.get_csv_line(
+            header, csv_tweet)
+    except Exception:
+        raise TweetConversionException(
+            "Error when formatting CSV tweet", csv_tweet)

    return csv_appendable_tweet

+def build_recovery_filepath(dbname: str) -> str:
+    """
+    Build the path of a recovery file
+
+    :param dbname: name of the database being queried
+    :returns: the path of the recovery file generated in this execution
+    """
+    recovery_dir = "./recovery"
+    if not os.path.isdir(recovery_dir):
+        os.mkdir(recovery_dir)
+    now = datetime.datetime.now()
+    datetime_str = "%04d%02d%02d-%02d%02d%02d" %\
+        (now.year, now.month, now.day, now.hour, now.minute, now.second)
+    recovery_file_path = os.path.join(
+        recovery_dir,
+        "recovery_" + dbname + "_" + datetime_str + ".json")
+    return recovery_file_path

+def create_recovery_file(
+        file_path: str,
+        host: str,
+        port: int,
+        database: str,
+        page_size: int)\
+        -> None:
+    """
+    In case of error, dump information to file to allow recovery
+
+    :param file_path: path of the recovery file to be created
+    :param host: address of the host to which the script connected
+    :param port: port of the Mongo database
+    :param database: name of the database being queried
+    :param page_size: size of the page that was being used
+    """
+    recovery_file_contents = {}
+    recovery_file_contents["host"] = host
+    recovery_file_contents["port"] = port
+    recovery_file_contents["database"] = database
+    recovery_file_contents["pagesize"] = page_size
+    recovery_file_contents["dumped_pages"] = []
+    with open(file_path, "w") as f:
+        json.dump(recovery_file_contents, f)
+    logger.error("Generated recovery file at {}".format(file_path))

+def update_recovery_file(
+        file_path: str,
+        page_number: int)\
+        -> None:
+    """
+    Add a new page to the list of already dumped pages in the recovery
+    file
+
+    :param file_path: path to the recovery file
+    :param page_number: number of the page that was safely written
+    """
+    with open(file_path, "r") as f:
+        recovery_file_contents = json.load(f)
+    recovery_file_contents["dumped_pages"].append(page_number)
+    with open(file_path, "w") as f:
+        json.dump(recovery_file_contents, f)


+def dump_error_recovery_file(
+        file_path: str,
+        page_number: int)\
+        -> None:
+    """
+    Add information pointing to the page where the error was detected
+
+    :param file_path: path to the recovery file
+    :param page_number: number of the page that crashed
+    """
+    with open(file_path, "r") as f:
+        recovery_file_contents = json.load(f)
+    recovery_file_contents["error_page"] = page_number
+    with open(file_path, "w") as f:
+        json.dump(recovery_file_contents, f)
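Taken together, these three helpers maintain a small JSON state file per run. As a rough illustration (all values are invented; the keys are the ones written by create_recovery_file, update_recovery_file and dump_error_recovery_file), a recovery file left behind after a failed run might contain:

    # Illustrative contents of a file such as
    # ./recovery/recovery_<dbname>_<timestamp>.json
    example_recovery_contents = {
        "host": "localhost",           # connection parameters of the failed run
        "port": 27017,
        "database": "some_database",
        "pagesize": 1000,
        "dumped_pages": [0, 1, 2, 4],  # pages already written to the filesystem
        "error_page": 3,               # only present if a page crashed
    }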

#########################
#  TWEET DB PAGINATION  #
#########################

@@ -292,3 +425,38 @@ def file_length(file_path: str) -> int:
    for i, l in enumerate(f):
        pass
    return i

+#######################
+#  CUSTOM EXCEPTIONS  #
+#######################

+class ExceptionAtPage(Exception):
+    """
+    Exception designed to be raised when the conversion of a page of
+    tweets taken from Mongo fails
+    """
+    def __init__(self, message: str, error_page: int):
+        """
+        :param message: str descriptive of the error
+        :param error_page: int indicating the number of the page that failed
+        """
+        self.message = message
+        self.error_page = error_page


+class TweetConversionException(Exception):
+    """
+    Should be raised when a tweet raises an exception in the process of
+    being converted
+    """
+    def __init__(self, message: str, tweet: str):
+        """
+        :param message: str descriptive of the error
+        :param tweet: str with the contents of the tweet that caused the
+            failure
+        """
+        self.message = message
+        self.tweet = tweet
@@ -2,9 +2,11 @@
import pymongo
import os
+import sys
import argparse
import logging
import time
+import json
import multiprocessing as mp
from config import globals
from lib import utils
@@ -17,6 +19,7 @@ parser.add_argument("-p", "--port", type=int, default=27017)
parser.add_argument("-s", "--pagesize", type=int, default=1000)
parser.add_argument("-v", "--verbose", action="store_true")
parser.add_argument("-t", "--timing", action="store_true")
+parser.add_argument("-r", "--recovery", type=str)
parser.add_argument("database", type=str)

args = parser.parse_args()
@@ -44,11 +47,30 @@ if args.timing:
    time0 = time.time()

# MongoDB connection to get page index
-client = pymongo.MongoClient(args.host, args.port)
-database_tweets = client[args.database]["tweets"]
-page_index = utils.get_page_index(database_tweets, args.pagesize)
-client.close()
-logger.debug(
+if args.recovery:
+    with open(args.recovery) as f:
+        recovery_data = json.load(f)
+    client = pymongo.MongoClient(
+        recovery_data["host"], recovery_data["port"])
+    database_tweets = client[recovery_data["database"]]["tweets"]
+    full_page_index = utils.get_page_index(
+        database_tweets, recovery_data["pagesize"])
+    client.close()
+    page_index = [page for page in full_page_index
+                  if page not in recovery_data["dumped_pages"]]
+    if "error_page" in recovery_data:
+        logger.debug("Discarding corrupted page")
+        page_index.remove(recovery_data.pop("error_page"))
+    logger.debug(
+        "Resuming collection conversion. {} of {} pages left."
+        .format(len(page_index), len(full_page_index)))
+else:
+    client = pymongo.MongoClient(args.host, args.port)
+    database_tweets = client[args.database]["tweets"]
+    page_index = utils.get_page_index(database_tweets, args.pagesize)
+    client.close()
+    logger.debug(
        "Database {} partitioned in {} pages of {} tweets (maximum)"
        .format(args.database, len(page_index), args.pagesize))
@@ -65,17 +87,31 @@ def process_data_page(

# Launch single process to write to the filesystem
writer_worker = mp.Process(
-    target=utils.filesystem_writer, args=(task_queue, header, ))
+    target=utils.filesystem_writer,
+    args=(task_queue, header, args.host, args.port,
+          args.database, args.pagesize, output_dir, args.recovery))
writer_worker.start()

# Launch pool of workers to perform the format conversion
-with mp.Pool() as pool:
-    pool.map(process_data_page, page_index)
-task_queue.put("END")
+try:
+    with mp.Pool() as pool:
+        pool.map(process_data_page, page_index)
+except utils.ExceptionAtPage as exc:
+    logger.error("Error detected at page {}".format(exc.error_page))
+    task_queue.put((exc.error_page, "ERROR"))
+    sys.exit(1)
+except (Exception, KeyboardInterrupt):
+    logger.error("Error detected")
+    task_queue.put((-2, "ERROR"))
+    sys.exit(1)
+task_queue.put((-1, "END"))

if globals.timing:
    time1 = time.time()

utils.generate_metadata_file(output_dir)
+logger.info("Metadata file created")

if globals.timing:
    logger.critical(
        "Time spent generating metadata file: {}s"
@@ -83,3 +119,5 @@
    logger.critical(
        "Total execution time: {}s"
        .format(time.time() - time0))
+logger.info("Conversion completed successfully!!")