Commit 662a3566 by serpucga

Refactoring and documenting

Variable names improved for clarity, old and unused code removed, log messages adjusted, and many new docstrings added.
parent 34170f35
@@ -4,7 +4,7 @@ import json
 import re
 import multiprocessing as mp
 from math import ceil
-from typing import List, Tuple
+from typing import List
 from tweet_manager.lib import json2csv, format_csv
 # Logger
@@ -16,7 +16,13 @@ def filesystem_writer(queue: mp.Queue, header: str) -> None:
     """
     Reads the CSV pages from the queue and writes them to filesystem
+
+    This function adds tweets to the files by appending them at the
+    end. However, if the file or the dir structure didn't already exist,
+    it will be created anew. The execution loop will exit only when a
+    termination message is received.
+
     :param queue: queue where processed data await to be written to FS
+    :param header: header of the new CSV files
     """
     logger.info("filesystem_writer active")
@@ -30,14 +36,12 @@ def filesystem_writer(queue: mp.Queue, header: str) -> None:
         logger.debug("Dumping tweets for " + output_path)
         if os.path.isfile(output_path):
             with open(output_path, "a") as writer:
-                logger.debug("File exists, dumping...")
-                # logger.debug(csv_page[output_path])
                 writer.write(csv_page[output_path])
         else:
-            logger.debug("File not found, generating new...")
+            logger.debug("File {} not found, generating new..."
+                         .format(output_path))
             generate_path(output_path, header)
             with open(output_path, "a") as writer:
-                # logger.debug(csv_page[output_path])
                 writer.write(csv_page[output_path])
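
For reference, a minimal sketch of the consumer loop that `filesystem_writer` implements: pop per-page dicts (output path → CSV chunk) from the queue, append each chunk to its file, create any missing path first, and stop on the `"END"` sentinel. `writer_loop` and the simplified path creation are hypothetical stand-ins for `filesystem_writer` and `generate_path`, not the module's exact code.

```python
import os
import multiprocessing as mp


def writer_loop(queue: mp.Queue, header: str) -> None:
    """Append queued CSV pages to their files until the "END" sentinel arrives."""
    while True:
        csv_page = queue.get()          # blocks until a page (or the sentinel) is ready
        if csv_page == "END":           # termination message sent by the main process
            break
        for output_path, chunk in csv_page.items():
            if not os.path.isfile(output_path):
                # Simplified stand-in for generate_path(): build dirs + header file
                os.makedirs(os.path.dirname(output_path), exist_ok=True)
                with open(output_path, "w") as fw:
                    fw.write(header.strip())
            with open(output_path, "a") as writer:
                writer.write(chunk)
```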
@@ -64,7 +68,7 @@ def process_page(
     :param queue: queue where processed data await to be written to FS
     """
-    logger.info("process_page executing")
+    logger.debug("process_page executing")
     client = pymongo.MongoClient(host, port)
     database_tweets = client[database]["tweets"]
@@ -83,54 +87,16 @@ def process_page(
     client.close()
     queue.put(buffer_tweets)
-    logger.debug("================")
-    logger.debug("PAGE {} BUFFER".format(page_index))
-    logger.debug("================")
-    # logger.debug(buffer_tweets)
+    logger.debug("Page {} enqueued".format(page_index))
-def write_tweets_to_files(
-        host: str,
-        port: int,
-        database: str,
-        pagesize: int,
-        header: str,
-        output_dir: str,
-        page_index: List[int])\
-        -> None:
-    logger.info("write_tweets_to_files executing")
-    client = pymongo.MongoClient(host, port)
-    database_tweets = client[database]["tweets"]
-    tweets_page = get_tweets_page(database_tweets, pagesize, page_index)
-    buffer_tweets = {}
-    for tweet in tweets_page:
-        # Get output path and contents for the new CSV file
-        csv_tweet_output_path =\
-            create_tweet_output_path(header, tweet, output_dir)
-        csv_tweet_contents =\
-            "\n" + str(convert_tweet_to_csv(header, tweet))
-        # Check if buffer exists for the file. If not, add to dictionary
-        if csv_tweet_output_path not in buffer_tweets.keys():
-            buffer_tweets[csv_tweet_output_path] = [""]
-        # Update the buffer adding the tweet and increasing tweet count
-        buffer_tweets[csv_tweet_output_path] += csv_tweet_contents
-    client.close()
-    # Perform the write operations in each of the files
-    for output_path in buffer_tweets.keys():
-        with open(output_path, "a") as tweet_writer:
-            tweet_writer.write(buffer_tweets[output_path])
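
The removed `write_tweets_to_files` and the new `process_page` rely on the same buffering idea: group the converted tweets of one page by destination file, then hand the whole page over at once. A hedged sketch of that pattern follows; `to_path` and `to_csv` are placeholders for `get_tweet_output_path` and `convert_tweet_to_csv`.

```python
import multiprocessing as mp
from collections import defaultdict
from typing import Callable, Iterable


def build_page_buffer(
        tweets_page: Iterable[dict],
        to_path: Callable[[dict], str],
        to_csv: Callable[[dict], str],
        queue: mp.Queue) -> None:
    """Group converted tweets by destination file and enqueue one dict per page."""
    buffer_tweets = defaultdict(str)      # output path -> CSV lines to append
    for tweet in tweets_page:
        buffer_tweets[to_path(tweet)] += "\n" + to_csv(tweet)
    # One queue message per page keeps all filesystem writes in a single process
    queue.put(dict(buffer_tweets))
```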
 def get_tweet_output_path(tweet: dict, output_dir: str) -> str:
     """
-    Returns same output than create_tweet_output_path, but without
-    touching the filesystem
+    Generate a filesystem path string for the new CSV using tweet date
+
+    :param tweet: tweet object in its Mongo JSON/dict form
+    :param output_dir: root directory for the new CSV database
+    :returns: path of the CSV file where the tweet should be written to
     """
     collection_path = output_dir
@@ -149,34 +115,12 @@ def get_tweet_output_path(tweet: dict, output_dir: str) -> str:
     return tweet_output_file
-def create_tweet_output_path(
-        date: Tuple[int, int, int],
-        header: str,
-        output_dir: str)\
-        -> str:
-    try:
-        collection_path = create_task_database_structure(output_dir)
-    except FileExistsError as e:
-        collection_path = e.filename
-    day = date[2]
-    # Classify the tweet chronologically
-    tweet_output_path = json2csv.mkdir_tweet_date(date, collection_path)
-    tweet_output_file = os.path.join(tweet_output_path, day + ".csv")
-    # If the CSV file didn't already exist, initialize it with a header
-    if os.path.isfile(tweet_output_file) is False:
-        with open(tweet_output_file, "w") as fw:
-            fw.write(header.strip())
-    return tweet_output_file
 def generate_path(path: str, header: str) -> None:
     """
-    Generate dirs and output file (initialized with a header)
-    if they didn't exist yet
+    Generate dir structure and output file if they didn't exist yet
+
+    :param path: path of the output file
+    :param header: header of the new CSV files
     """
     path_regex = r"(\./\w+)/(\w+)/(\d{4})/(\d{2})/(\d{2})\.csv$"
@@ -219,43 +163,35 @@ def generate_path(path: str, header: str) -> None:
             fw.write(header.strip())
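
The `path_regex` above expects output paths shaped like `./<collection>/<database>/YYYY/MM/DD.csv`. Under that assumption, a rough sketch of turning such a path into directories plus a header-initialised CSV (the real `generate_path` may differ in its details):

```python
import os
import re

# Assumed layout: ./<collection>/<database>/<YYYY>/<MM>/<DD>.csv
PATH_REGEX = r"(\./\w+)/(\w+)/(\d{4})/(\d{2})/(\d{2})\.csv$"


def generate_path_sketch(path: str, header: str) -> None:
    """Create the directory tree for `path` and initialise the CSV with a header."""
    if re.match(PATH_REGEX, path) is None:
        raise ValueError("Unexpected output path: " + path)
    os.makedirs(os.path.dirname(path), exist_ok=True)   # e.g. ./collected/mydb/2019/07
    if not os.path.isfile(path):
        with open(path, "w") as fw:
            fw.write(header.strip())
```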
-def create_task_database_structure(
-        output_dir: str)\
-        -> str:
-    """
-    Generate the following directory tree: a top dir that will contain
-    all the tweet collections if it didn't exist yet and within it the top
-    directory for this task
-    """
-    # Create the root directory for the tweet collection
-    (output_dir, db_name) = os.path.split(output_dir)
-    if not os.path.isdir(output_dir):
-        logger.info(
-            "Building directory to contain the collected tweets at: "
-            + os.path.abspath(output_dir)
-        )
-        os.mkdir(output_dir)
-    collection_path = os.path.join(output_dir, db_name)
-    if not os.path.isdir(collection_path):
-        logger.info("Initializing collection " + db_name + "...")
-        os.mkdir(collection_path)
-    return collection_path
-def convert_tweet_to_csv(header: str, tweet: dict) -> str:
-    # Flatten the tweet and store it in status_flat
-    status_flat = json2csv.flatten_dictionary(tweet)
-    # Convert the flat JSON to CSV format
-    # 1st arg: flat tweet, 2nd arg: activate array compression, 3rd arg:
-    # number of array compression levels, 4th arg: remove dollars mode
-    status_csv = json2csv.json2csv(status_flat, True, 5, False)
-    csv_appendable_line = format_csv.get_csv_line(header, status_csv)
-    return csv_appendable_line
+def convert_tweet_to_csv(header: str, tweet: dict) -> str:
+    """
+    Convert a JSON/dict-like tweet to a single string line in CSV form
+
+    Takes the JSON/dict object passed as an argument and performs the
+    following transformations:
+    1. Flattening (removal of nested elements)
+    2. Conversion to CSV (arg1: flat tweet, arg2: array compression,
+       arg3: depth of array compression, arg4: remove dollars mode)
+    3. Format the CSV to make it suitable to append to the file (by
+       keeping just the information indicated in the header amongst
+       other required manipulations)
+
+    This function includes the core of the computations in the whole
+    script, and is surely the most expensive part.
+    For more information about how things get done and to see if
+    performance might be enhanced, take a look at the tweet_manager
+    library, which is probably an example of pretty bad code.
+
+    :param header: header of the new CSV files
+    :param tweet: tweet object in its Mongo JSON/dict form
+    :returns: single line string with the tweet info for the header
+        fields in CSV form
+    """
+    flat_tweet = json2csv.flatten_dictionary(tweet)
+    csv_tweet = json2csv.json2csv(flat_tweet, True, 5, False)
+    csv_appendable_tweet = format_csv.get_csv_line(header, csv_tweet)
+    return csv_appendable_tweet
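
A short usage sketch of the three-step pipeline the docstring describes, using the same `tweet_manager` calls as above. The header string and the tiny tweet dict are made up for illustration, and the snippet assumes the `tweet_manager` library is importable:

```python
from tweet_manager.lib import json2csv, format_csv

# Illustrative header and tweet; real ones come from the CSV template and MongoDB
header = "id_str,created_at,text"
tweet = {"id_str": "1", "created_at": "Mon Jul 01 12:00:00 +0000 2019",
         "text": "hello", "user": {"screen_name": "someone"}}

flat_tweet = json2csv.flatten_dictionary(tweet)            # 1. remove nesting
csv_tweet = json2csv.json2csv(flat_tweet, True, 5, False)  # 2. flat dict -> CSV line
csv_line = format_csv.get_csv_line(header, csv_tweet)      # 3. keep only the header fields
```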
 #########################
@@ -266,8 +202,11 @@ def get_page_index(
         page_size: int)\
         -> List[int]:
     """
-    Get an iterator with ints between 0 and N-1, where N is the number
-    of pages of the collection for the given page size
+    Get a list of the pages in the collection
+
+    :param collection: pymongo collection of tweets
+    :param page_size: number of tweets in each page
+    :returns: list of pages in the collection, starting from 0
     """
     return list(range(0, ceil(collection.count() / page_size)))
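
A worked example of the page-count arithmetic, with a hypothetical collection size:

```python
from math import ceil

collection_count = 1050   # hypothetical collection.count() result
page_size = 100
# ceil(1050 / 100) == 11, so the pages are indexed 0..10
page_index = list(range(0, ceil(collection_count / page_size)))
assert page_index == list(range(11))
```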
@@ -279,11 +218,16 @@ def get_tweets_page(
         num_page: int)\
         -> pymongo.cursor.Cursor:
     """
-    Returns a pymongo cursor pointing to the MongoDB entries comprised
-    in the current page
+    Get a cursor pointing to the Mongo entries for that page
+
+    :param collection: pymongo collection of tweets
+    :param page_size: number of tweets in each page
+    :param num_page: relative index of the page within the collection
+    :returns: a Pymongo cursor pointing to the tweets
     """
-    tweets = collection.find().skip(num_page * page_size).limit(page_size)
+    tweets =\
+        collection.find().skip(num_page * page_size).limit(page_size)
     return tweets
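
A hedged sketch of fetching and consuming one page with `skip`/`limit`; the connection details are placeholders and the snippet assumes a reachable MongoDB with a `tweets` collection:

```python
import pymongo

client = pymongo.MongoClient("localhost", 27017)   # placeholder host/port
collection = client["my_database"]["tweets"]       # placeholder database name

page_size, num_page = 1000, 3
# Page 3 covers documents 3000..3999 in natural order
cursor = collection.find().skip(num_page * page_size).limit(page_size)
for tweet in cursor:
    pass  # each `tweet` is a dict, ready for convert_tweet_to_csv
client.close()
```

Note that `skip()` grows slower as the offset increases; the sketch mirrors the approach taken in the diff rather than proposing an alternative.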
@@ -292,9 +236,15 @@ def get_tweets_page(
 #########################
 def generate_metadata_file(collection_path: str) -> None:
     """
+    Create a metadata file for the current collection
+
     Once all the CSV files have been created, generate a metadata file
     with information about the number of tweets in each of the CSVs by
-    making a simple count of lines
+    making a simple count of lines. Making a call to the system to
+    perform "wc -l" would surely be faster, but more system dependent
+    and a bit "hacky"
+
+    :param collection_path: path to the root dir of a database
     """
     metadata = {}
@@ -314,6 +264,9 @@ def generate_metadata_file(collection_path: str) -> None:
 def file_length(file_path: str) -> int:
     """
     Calculate number of lines of a file
+
+    :param file_path: path of the file
+    :returns: number of lines of the file
     """
     with open(file_path) as f:
...
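
A sketch of how `file_length` and `generate_metadata_file` can fit together: walk the collection directory, count the data lines of each CSV (subtracting the header line), and dump a JSON summary. The metadata filename and JSON layout are assumptions; the diff does not show them:

```python
import json
import os


def file_length(file_path: str) -> int:
    """Count the number of lines of a file."""
    with open(file_path) as f:
        return sum(1 for _ in f)


def generate_metadata_sketch(collection_path: str) -> None:
    """Map every CSV under collection_path to its tweet count and dump it as JSON."""
    metadata = {}
    for root, _dirs, files in os.walk(collection_path):
        for name in files:
            if name.endswith(".csv"):
                path = os.path.join(root, name)
                # Subtract 1 for the header line of each CSV
                metadata[os.path.relpath(path, collection_path)] = file_length(path) - 1
    with open(os.path.join(collection_path, ".metadata.json"), "w") as fw:  # assumed name
        json.dump(metadata, fw, indent=2)
```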
@@ -17,8 +17,8 @@ parser.add_argument("-v", "--verbose", action="store_true")
 parser.add_argument("database", type=str)
 args = parser.parse_args()
-# Activate logging
-logformat = "[%(asctime)s]: %(message)s"
+# Logging config
+logformat = "[%(asctime)s] %(message)s"
 dateformat = "%H:%M:%S"
 if args.verbose:
     logging.basicConfig(
@@ -45,7 +45,7 @@ client.close()
 # Build a picklable function that we can pass to map
-def write_page(
+def process_data_page(
         page, host=args.host, port=args.port, database=args.database,
         pagesize=args.pagesize, header=header, outputdir=output_dir,
         queue=task_queue):
@@ -54,14 +54,14 @@ def write_page(
         host, port, database, header, output_dir, pagesize, page, queue)
-# Single process to write to the filesystem
+# Launch single process to write to the filesystem
 writer_worker = mp.Process(
     target=utils.filesystem_writer, args=(task_queue, header, ))
 writer_worker.start()
-# Make the computation
+# Launch pool of workers to perform the format conversion
 with mp.Pool() as pool:
-    pool.map(write_page, page_index)
+    pool.map(process_data_page, page_index)
 task_queue.put("END")
 utils.generate_metadata_file(output_dir)
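
Putting the pieces together, a condensed, runnable stand-in for the wiring used here: one writer process draining the queue, a pool of workers producing page buffers, and an `"END"` sentinel once the pool has finished. `writer_sketch` and `page_worker` are hypothetical stand-ins for `utils.filesystem_writer` and `process_data_page`; a `Manager` queue is used so it can travel through `Pool.map` arguments, whereas the real script binds `task_queue` as a default argument instead:

```python
import multiprocessing as mp


def writer_sketch(queue) -> None:
    """Single consumer: drain the queue until the "END" sentinel arrives."""
    while True:
        item = queue.get()
        if item == "END":
            break
        print("would write:", item)


def page_worker(args) -> None:
    """Producer: convert one page and enqueue its buffer."""
    page, queue = args
    queue.put({"./collected/mydb/2019/07/01.csv": "\nrow for page {}".format(page)})


if __name__ == "__main__":
    manager = mp.Manager()
    task_queue = manager.Queue()       # proxy queue, safe to pass through Pool.map
    writer_worker = mp.Process(target=writer_sketch, args=(task_queue,))
    writer_worker.start()
    with mp.Pool() as pool:
        pool.map(page_worker, [(page, task_queue) for page in range(4)])
    task_queue.put("END")              # all pages processed: tell the writer to exit
    writer_worker.join()
```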