Commit 174863a2 by serpucga

Added a new mode of execution, 'recovery', which makes it possible to continue execution of a task by loading a recovery file from a previous process
parent c9b7fb65
.gitignore
 pymongodump
 tests.py
 .mypy_cache
+.recovery*
@@ -22,7 +22,8 @@ def filesystem_writer(
         port: int,
         database: str,
         pagesize: int,
-        output_dir: str)\
+        output_dir: str,
+        recovery_file: str)\
         -> None:
     """
     Reads the CSV pages from the queue and writes them to filesystem
@@ -39,8 +40,10 @@
     logger.debug(
         "Worker {} launched: filesystem_writer executing"
         .format(os.getpid()))
-    recovery_file_path = os.path.join(
-        output_dir, ".recovery_" + database + ".csv")
+    if recovery_file:
+        recovery_file_path = recovery_file
+    else:
+        recovery_file_path = ".recovery_" + database + ".csv"
     create_recovery_file(
         recovery_file_path, host, port, database, pagesize)
@@ -294,9 +297,6 @@ def create_recovery_file(
     recovery_file_contents["pagesize"] = page_size
     recovery_file_contents["dumped_pages"] = []
-    parent_dir = os.path.split(file_path)[0]
-    if not os.path.exists(parent_dir):
-        os.makedirs(parent_dir)
     with open(file_path, "w") as f:
         json.dump(recovery_file_contents, f)
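For orientation, the recovery file itself is a small JSON document. Judging from the keys read and written elsewhere in this commit ("host", "port", "database", "pagesize", "dumped_pages"), a freshly created file would look roughly like the sketch below; the concrete values and the database name are illustrative, not taken from the repository.

# Sketch only: approximate shape of the recovery file, inferred from the keys
# used in this commit ("host", "port", "database", "pagesize", "dumped_pages").
# All concrete values below are hypothetical.
import json

recovery_file_contents = {
    "host": "localhost",
    "port": 27017,
    "database": "example_db",
    "pagesize": 1000,
    "dumped_pages": [0, 1, 2],   # pages already written before the interruption
}

# Default name used by filesystem_writer when no -r/--recovery path is given.
with open(".recovery_example_db.csv", "w") as f:
    json.dump(recovery_file_contents, f)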
@@ -5,6 +5,7 @@ import os
 import argparse
 import logging
 import time
+import json
 import multiprocessing as mp
 from config import globals
 from lib import utils
@@ -17,6 +18,7 @@
 parser.add_argument("-p", "--port", type=int, default=27017)
 parser.add_argument("-s", "--pagesize", type=int, default=1000)
 parser.add_argument("-v", "--verbose", action="store_true")
 parser.add_argument("-t", "--timing", action="store_true")
+parser.add_argument("-r", "--recovery", type=str)
 parser.add_argument("database", type=str)
 args = parser.parse_args()
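As a quick check of the new option, parse_args can be fed an explicit argument list; the sketch below declares only the two arguments relevant here and uses hypothetical file and database names. Note that the positional database argument remains mandatory even in recovery mode, although in that mode the connection parameters actually used are the ones stored in the recovery file.

# Minimal sketch of the new CLI surface (only the relevant arguments declared).
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("-r", "--recovery", type=str)  # path to a previous .recovery_* file
parser.add_argument("database", type=str)

# Fresh run: no recovery file given, args.recovery stays None.
args = parser.parse_args(["example_db"])
assert args.recovery is None

# Resumed run: point -r at the recovery file left by the interrupted process.
args = parser.parse_args(["-r", ".recovery_example_db.csv", "example_db"])
assert args.recovery == ".recovery_example_db.csv"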
@@ -44,11 +46,27 @@
 if args.timing:
     time0 = time.time()
 
 # MongoDB connection to get page index
-client = pymongo.MongoClient(args.host, args.port)
-database_tweets = client[args.database]["tweets"]
-page_index = utils.get_page_index(database_tweets, args.pagesize)
-client.close()
-logger.debug(
-    "Database {} partitioned in {} pages of {} tweets (maximum)"
-    .format(args.database, len(page_index), args.pagesize))
+if args.recovery:
+    with open(args.recovery) as f:
+        recovery_data = json.load(f)
+    client = pymongo.MongoClient(
+        recovery_data["host"], recovery_data["port"])
+    database_tweets = client[recovery_data["database"]]["tweets"]
+    full_page_index = utils.get_page_index(
+        database_tweets, recovery_data["pagesize"])
+    client.close()
+    page_index = [page for page in full_page_index
+                  if page not in recovery_data["dumped_pages"]]
+    logger.debug(
+        "Resuming collection conversion. {} of {} pages left."
+        .format(len(page_index), len(full_page_index)))
+else:
+    client = pymongo.MongoClient(args.host, args.port)
+    database_tweets = client[args.database]["tweets"]
+    page_index = utils.get_page_index(database_tweets, args.pagesize)
+    client.close()
+    logger.debug(
+        "Database {} partitioned in {} pages of {} tweets (maximum)"
+        .format(args.database, len(page_index), args.pagesize))
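The resume logic above is simply a filter over the freshly computed page index; with toy values (purely illustrative) it behaves as follows.

# Toy illustration of the page filtering performed in recovery mode.
full_page_index = list(range(10))   # stand-in for utils.get_page_index(...)
dumped_pages = [0, 1, 2, 3]         # stand-in for recovery_data["dumped_pages"]

page_index = [page for page in full_page_index
              if page not in dumped_pages]

print("Resuming collection conversion. {} of {} pages left."
      .format(len(page_index), len(full_page_index)))   # 6 of 10 pages left.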
@@ -63,21 +81,17 @@ def process_data_page(
         host, port, database, header, output_dir, pagesize, page, queue)
 
-try:
-    # Launch single process to write to the filesystem
-    writer_worker = mp.Process(
-        target=utils.filesystem_writer,
-        args=(task_queue, header, args.host, args.port,
-              args.database, args.pagesize, output_dir))
-    writer_worker.start()
-
-    # Launch pool of workers to perform the format conversion
-    with mp.Pool() as pool:
-        pool.map(process_data_page, page_index)
-    task_queue.put((-1, "END"))
-except (KeyboardInterrupt, Exception):
-    logger.error("A fatal error occurred. Script will terminate")
+# Launch single process to write to the filesystem
+writer_worker = mp.Process(
+    target=utils.filesystem_writer,
+    args=(task_queue, header, args.host, args.port,
+          args.database, args.pagesize, output_dir, args.recovery))
+writer_worker.start()
+
+# Launch pool of workers to perform the format conversion
+with mp.Pool() as pool:
+    pool.map(process_data_page, page_index)
+task_queue.put((-1, "END"))
 
 if globals.timing: