Commit c9b7fb65 by serpucga

Generation of recovery file achieved

parent 053f7b0f
 timing = False
-dumped_pages = []
@@ -15,7 +15,15 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-def filesystem_writer(queue: mp.Queue, header: str) -> None:
+def filesystem_writer(
+        queue: mp.Queue,
+        header: str,
+        host: str,
+        port: int,
+        database: str,
+        pagesize: int,
+        output_dir: str)\
+        -> None:
     """
     Reads the CSV pages from the queue and writes them to filesystem
@@ -31,30 +39,40 @@ def filesystem_writer(queue: mp.Queue, header: str) -> None:
     logger.debug(
         "Worker {} launched: filesystem_writer executing"
         .format(os.getpid()))
+    recovery_file_path = os.path.join(
+        output_dir, ".recovery_" + database + ".csv")
+    create_recovery_file(
+        recovery_file_path, host, port, database, pagesize)
     while True:
-        csv_page = queue.get()
+        page_number, csv_page = queue.get()
         if csv_page == "END":
             logger.info("Writing loop finished")
+            os.remove(recovery_file_path)
             break
-        if globals.timing and csv_page is not None:
-            time0 = time.time()
-        for output_path in csv_page.keys():
-            logger.debug("Dumping tweets for " + output_path)
-            if os.path.isfile(output_path):
-                with open(output_path, "a") as writer:
-                    writer.write(csv_page[output_path])
-            else:
-                logger.debug("File {} not found, generating new..."
-                             .format(output_path))
-                generate_path(output_path, header)
-                with open(output_path, "a") as writer:
-                    writer.write(csv_page[output_path])
-        if globals.timing and csv_page is not None:
-            time1 = time.time()
-            logger.critical(
-                "Time spent writing tweet page to FS: {}s"
-                .format(time1 - time0))
+        elif csv_page is not None:
+            if globals.timing:
+                time0 = time.time()
+            for output_path in csv_page.keys():
+                logger.debug("Dumping tweets for " + output_path)
+                if os.path.isfile(output_path):
+                    with open(output_path, "a") as writer:
+                        writer.write(csv_page[output_path])
+                else:
+                    logger.debug("File {} not found, generating new..."
+                                 .format(output_path))
+                    generate_path(output_path, header)
+                    with open(output_path, "a") as writer:
+                        writer.write(csv_page[output_path])
+            if page_number >= 0:
+                update_recovery_file(recovery_file_path, page_number)
+            if globals.timing:
+                logger.critical(
+                    "Time spent writing tweet page to FS: {}s"
+                    .format(time.time() - time0))
+        else:
+            continue
@@ -101,7 +119,7 @@ def process_page(
         buffer_tweets[csv_tweet_output_path] += csv_tweet_contents
     client.close()
 
-    queue.put(buffer_tweets)
+    queue.put((page_number, buffer_tweets))
     logger.debug("Page {} enqueued".format(page_number))
 
     if globals.timing:
@@ -218,9 +236,9 @@ def dump_recovery_file(
         port: int,
         database: str,
         page_size: int,
-        dumped_pages: List[int],
-        error_page: int,
-        output_dir: str)\
+        dumped_pages: list,
+        output_dir: str,
+        error_page: int = None)\
         -> None:
     """
     In case of error, dump information to file to allow recovery
@@ -240,16 +258,67 @@
     recovery_file_contents["port"] = port
     recovery_file_contents["database"] = database
     recovery_file_contents["pagesize"] = page_size
-    recovery_file_contents["dumped_pages"] = globals.dumped_pages
-    recovery_file_contents["error_page"] = error_page
+    recovery_file_contents["dumped_pages"] = dumped_pages
+    recovery_file_contents["error_page"] = str(error_page)
+    logger.debug(
+        "HERE DUMPED_PAGES: {}"
+        .format(dumped_pages))
     with open(recovery_file_path, "w") as f:
-        json.dump(f)
+        json.dump(recovery_file_contents, f)
     logger.error(
         "Generated recovery file at {}".format(recovery_file_path))
 
 
+def create_recovery_file(
+        file_path: str,
+        host: str,
+        port: int,
+        database: str,
+        page_size: int)\
+        -> None:
+    """
+    In case of error, dump information to file to allow recovery
+
+    :param host: address of the host to which the script connected
+    :param port: port of the Mongo database
+    :param database: name of the database being queried
+    :param page_size: size of the page that was being used
+    """
+    recovery_file_contents = {}
+    recovery_file_contents["host"] = host
+    recovery_file_contents["port"] = port
+    recovery_file_contents["database"] = database
+    recovery_file_contents["pagesize"] = page_size
+    recovery_file_contents["dumped_pages"] = []
+    parent_dir = os.path.split(file_path)[0]
+    if not os.path.exists(parent_dir):
+        os.makedirs(parent_dir)
+    with open(file_path, "w") as f:
+        json.dump(recovery_file_contents, f)
+    logger.error("Generated recovery file at {}".format(file_path))
+
+
+def update_recovery_file(
+        file_path: str,
+        page_number: int)\
+        -> None:
+    """
+    Add a new page to the list of already dumped pages in the recovery file
+    """
+    with open(file_path, "r") as f:
+        recovery_file_contents = json.load(f)
+    recovery_file_contents["dumped_pages"].append(page_number)
+    with open(file_path, "w") as f:
+        json.dump(recovery_file_contents, f)
+
+
 #########################
 # TWEET DB PAGINATION #
 #########################
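For reference, the recovery file managed by the new helpers is a small JSON document (despite the `.csv` extension of the `output_dir/.recovery_<database>.csv` path built in `filesystem_writer`). Below is a minimal sketch of its lifecycle, assuming the helpers above live in the module imported as `utils` by the main script shown next; the host, port, database and paths are made-up values for illustration only:

    import json
    import utils  # module containing create_recovery_file / update_recovery_file

    # Hypothetical parameters, not taken from the commit.
    recovery_path = "/tmp/collection_out/.recovery_tweets_db.csv"

    # Written once when filesystem_writer starts up.
    utils.create_recovery_file(recovery_path, "localhost", 27017, "tweets_db", 1000)

    # Appended after each page is flushed to disk (page_number >= 0).
    utils.update_recovery_file(recovery_path, 0)
    utils.update_recovery_file(recovery_path, 1)

    with open(recovery_path) as f:
        print(json.load(f))
    # -> {'host': 'localhost', 'port': 27017, 'database': 'tweets_db',
    #     'pagesize': 1000, 'dumped_pages': [0, 1]}

On a clean shutdown (the "END" sentinel) `filesystem_writer` deletes this file, so its presence after a crash signals that a run was interrupted and records which pages had already been written.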
@@ -66,20 +66,18 @@ def process_data_page(
     try:
         # Launch single process to write to the filesystem
         writer_worker = mp.Process(
-            target=utils.filesystem_writer, args=(task_queue, header, ))
+            target=utils.filesystem_writer,
+            args=(task_queue, header, args.host, args.port,
+                  args.database, args.pagesize, output_dir))
         writer_worker.start()
 
         # Launch pool of workers to perform the format conversion
         with mp.Pool() as pool:
             pool.map(process_data_page, page_index)
-        task_queue.put("END")
-    except Exception as exc:
+        task_queue.put((-1, "END"))
+    except (KeyboardInterrupt, Exception):
         logger.error("A fatal error occurred. Script will terminate")
-        error_page = exc # Change this
-        utils.dump_recovery_file(
-            args.host, args.port, args.database, args.pagesize,
-            globals.dumped_pages, error_page, output_dir)
 
     if globals.timing:
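The queue protocol between the conversion workers and the writer also changed: every message is now a `(page_number, payload)` tuple, and the parent process signals shutdown with `(-1, "END")`, so page number `-1` is reserved for the sentinel and is never recorded in the recovery file (`if page_number >= 0`). A stripped-down, self-contained sketch of that protocol follows; names such as `toy_writer` are illustrative and not part of the project:

    import multiprocessing as mp


    def toy_writer(queue: mp.Queue) -> None:
        # Simplified stand-in for filesystem_writer: consume (page_number, payload)
        # tuples until the (-1, "END") sentinel arrives.
        while True:
            page_number, payload = queue.get()
            if payload == "END":
                print("writer: clean shutdown")
                break
            elif payload is not None:
                # The real code writes the CSV buffers here and, for
                # page_number >= 0, appends the page to the recovery file.
                print("writer: page", page_number, "->", list(payload))


    if __name__ == "__main__":
        queue: mp.Queue = mp.Queue()
        writer = mp.Process(target=toy_writer, args=(queue,))
        writer.start()
        queue.put((0, {"out/2020/01/01.csv": "id,text\n1,hello\n"}))
        queue.put((-1, "END"))  # sentinel: tells the writer to exit its loop
        writer.join()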