Commit 52dbc5e9 by serpucga

Added new mode to measure times and improved logs

Added a new option, "-t", which enables a timing mode that reports the time cost of the most relevant operations (writing a page to the filesystem, converting a page to CSV format, creating the metadata file...). In addition, the verbose mode was improved considerably: the noisiest messages were removed, some useful new ones were introduced, and others were reworded.

parent 662a3566
.gitignore
 pymongodump
 tests.py
+.mypy_cache

config/globals.py (new file)
+timing = False
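The new config/globals.py consists of this single module-level flag: the entry script flips it once at startup and library code reads it at run time. A minimal two-file sketch of the pattern (the demo below is illustrative, not part of the commit):

    # file: config/globals.py -- shared mutable module state
    timing = False

    # file: demo.py (hypothetical)
    from config import globals

    def report() -> None:
        # Read at call time, not import time: "from config import globals"
        # always yields the same cached module object, so updates made by
        # one importer are visible to all the others.
        print("timing enabled:", globals.timing)

    globals.timing = True   # the entry script flips the flag once
    report()                # prints: timing enabled: True

One caveat: the pool workers below inherit the flag only under the "fork" start method (the default on Linux); under "spawn", each child re-imports config.globals and would see timing = False again.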
lib/utils.py
@@ -2,11 +2,14 @@ import os
 import pymongo
 import json
 import re
+import time
 import multiprocessing as mp
 from math import ceil
 from typing import List
 from tweet_manager.lib import json2csv, format_csv
+from config import globals

 # Logger
 import logging
 logger = logging.getLogger(__name__)
@@ -25,13 +28,17 @@ def filesystem_writer(queue: mp.Queue, header: str) -> None:
     :param header: header of the new CSV files
     """
-    logger.info("filesystem_writer active")
+    logger.debug(
+        "Worker {} launched: filesystem_writer executing"
+        .format(os.getpid()))
     while True:
         csv_page = queue.get()
         if csv_page == "END":
-            logger.info("filesystem_writer can rest at last")
+            logger.info("Writing loop finished")
             break
+        if globals.timing and csv_page is not None:
+            time0 = time.time()
         for output_path in csv_page.keys():
             logger.debug("Dumping tweets for " + output_path)
             if os.path.isfile(output_path):
@@ -43,6 +50,11 @@ def filesystem_writer(queue: mp.Queue, header: str) -> None:
                 generate_path(output_path, header)
                 with open(output_path, "a") as writer:
                     writer.write(csv_page[output_path])
+        if globals.timing and csv_page is not None:
+            time1 = time.time()
+            logger.critical(
+                "Time spent writing tweet page to FS: {}s"
+                .format(time1 - time0))


 def process_page(
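The paired time.time() calls above are the pattern this commit applies throughout: record a start time, do the work, log the difference. A reusable alternative would be a small context manager; this is a sketch under that assumption (timed is a hypothetical helper, not part of the commit):

    import logging
    import time
    from contextlib import contextmanager

    logger = logging.getLogger(__name__)

    @contextmanager
    def timed(label: str):
        # Log the wall-clock duration of the enclosed block.
        start = time.time()
        try:
            yield
        finally:
            logger.critical("%s: %.3fs", label, time.time() - start)

    # Usage, mirroring the block above:
    # with timed("Time spent writing tweet page to FS"):
    #     for output_path in csv_page:
    #         ...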
@@ -52,7 +64,7 @@ def process_page(
         header: str,
         output_dir: str,
         pagesize: int,
-        page_index: List[int],
+        page_number: List[int],
         queue: mp.Queue)\
         -> None:
     """
@@ -64,15 +76,18 @@ def process_page(
     :param header: CSV header and all its fields
     :param output_dir: root directory for the new CSV database
     :param pagesize: size of page (number of tweets we want to get)
-    :param page_index: number of the tweet page to retrieve
+    :param page_number: number of the tweet page to retrieve
     :param queue: queue where processed data await to be written to FS
     """
-    logger.debug("process_page executing")
+    logger.debug(
+        "Worker {} launched: process_page executing".format(os.getpid()))
+    if globals.timing:
+        time0 = time.time()
     client = pymongo.MongoClient(host, port)
     database_tweets = client[database]["tweets"]
-    tweets_page = get_tweets_page(database_tweets, pagesize, page_index)
+    tweets_page = get_tweets_page(database_tweets, pagesize, page_number)
     buffer_tweets = {}
     for tweet in tweets_page:
@@ -87,7 +102,13 @@ def process_page(
     client.close()
     queue.put(buffer_tweets)
-    logger.debug("Page {} enqueued".format(page_index))
+    logger.debug("Page {} enqueued".format(page_number))
+    if globals.timing:
+        time1 = time.time()
+        logger.critical(
+            "Time processing & buffering tweet page: {}s"
+            .format(time1 - time0))


 def get_tweet_output_path(tweet: dict, output_dir: str) -> str:
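Taken together, the two functions form a producer/consumer pipeline: many process_page workers convert and enqueue pages, while a single filesystem_writer drains the queue and owns all file writes, terminating on the "END" sentinel. A self-contained sketch of that wiring (paths and payloads are hypothetical; a Manager queue is used because a plain mp.Queue cannot be passed through Pool.map):

    import multiprocessing as mp

    def writer(queue) -> None:
        # Single consumer: owns every file write; stops at the sentinel.
        while True:
            page = queue.get()
            if page == "END":
                break
            for path, csv_chunk in page.items():
                with open(path, "a") as fh:
                    fh.write(csv_chunk)

    def worker(args) -> None:
        n, queue = args
        # Producers never touch the FS; they enqueue finished pages.
        queue.put({"/tmp/page_{}.csv".format(n): "col1,col2\n"})

    if __name__ == "__main__":
        manager = mp.Manager()
        q = manager.Queue()
        w = mp.Process(target=writer, args=(q,))
        w.start()
        with mp.Pool() as pool:
            pool.map(worker, [(i, q) for i in range(4)])
        q.put("END")
        w.join()

Funneling all writes through one process sidesteps races between workers that might target the same output file.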
@@ -158,8 +179,6 @@ def generate_path(path: str, header: str) -> None:
     if os.path.isfile(csv_output_file) is False:
         with open(csv_output_file, "w") as fw:
             logger.info("Adding file for day " + day + "...")
-            logger.debug("Initializing CSV file with header")
-            logger.debug(header)
             fw.write(header.strip())
...
@@ -4,7 +4,9 @@ import pymongo
 import os
 import argparse
 import logging
+import time
 import multiprocessing as mp
+from config import globals
 from lib import utils

 # Command line parsing
@@ -14,6 +16,7 @@ parser.add_argument("-H", "--host", type=str, default="localhost")
 parser.add_argument("-p", "--port", type=int, default=27017)
 parser.add_argument("-s", "--pagesize", type=int, default=1000)
 parser.add_argument("-v", "--verbose", action="store_true")
+parser.add_argument("-t", "--timing", action="store_true")
 parser.add_argument("database", type=str)
 args = parser.parse_args()
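With the flag registered, timing is opt-in per run from the command line. The entry script's filename is not visible in this diff, so as a hypothetical invocation: `python <entry_script> -H localhost -p 27017 -s 1000 -t mydatabase` would dump `mydatabase` with timing reports enabled.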
@@ -31,17 +34,23 @@ logger = logging.getLogger(__name__)
 # Initialize some variables
 script_dir = os.path.dirname(__file__)
 output_dir = os.path.join(script_dir, "pymongodump", args.database)
-header_file = os.path.join(script_dir, "header.txt")
+header_file = os.path.join(script_dir, "config", "header.txt")
 with open(header_file) as f:
     header = f.readline()
 buffer_tweets = {}
 task_queue = mp.Queue()
+if args.timing:
+    globals.timing = True
+    time0 = time.time()

 # MongoDB connection to get page index
 client = pymongo.MongoClient(args.host, args.port)
 database_tweets = client[args.database]["tweets"]
 page_index = utils.get_page_index(database_tweets, args.pagesize)
 client.close()
+logger.debug(
+    "Database {} partitioned in {} pages of {} tweets (maximum)"
+    .format(args.database, len(page_index), args.pagesize))

 # Build a picklable function that we can pass to map
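utils.get_page_index partitions the collection into fixed-size pages; its body is not part of this diff, but the ceil import in the library file above suggests something like the following sketch (count_documents and the exact signature are assumptions):

    from math import ceil
    from typing import List

    def get_page_index(collection, pagesize: int) -> List[int]:
        # One page number per pagesize-sized chunk, e.g. 2500 tweets
        # at pagesize 1000 -> [0, 1, 2].
        n_tweets = collection.count_documents({})
        return list(range(ceil(n_tweets / pagesize)))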
@@ -64,4 +73,13 @@ with mp.Pool() as pool:
     pool.map(process_data_page, page_index)
     task_queue.put("END")

+if globals.timing:
+    time1 = time.time()
 utils.generate_metadata_file(output_dir)
+if globals.timing:
+    logger.critical(
+        "Time spent generating metadata file: {}s"
+        .format(time.time() - time1))
+    logger.critical(
+        "Total execution time: {}s"
+        .format(time.time() - time0))
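One measurement caveat, since all of the new timings are differences of time.time() values: time.time() follows the system clock, which can be adjusted while the program runs. For interval measurement, time.perf_counter() is the standard monotonic choice; a minimal sketch:

    import time

    t0 = time.perf_counter()          # monotonic, unaffected by clock changes
    # ... phase under measurement ...
    elapsed = time.perf_counter() - t0
    print("phase took {:.3f}s".format(elapsed))

Logging the timings at CRITICAL level, as the commit does, guarantees they are emitted regardless of the verbosity setting, at the cost of overloading that level's usual meaning.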