Commit 36d7a65a by serpucga

Merge branch 'feature/parallelism' into develop

parents e093713a 52dbc5e9
pymongodump
tests.py
.mypy_cache
timing = False
import os
import pymongo
import json
import re
import time
import multiprocessing as mp
from math import ceil
from typing import List
from tweet_manager.lib import json2csv, format_csv
from config import globals

# Logger
import logging
logger = logging.getLogger(__name__)

def filesystem_writer(queue: mp.Queue, header: str) -> None:
    """
    Reads the CSV pages from the queue and writes them to the filesystem

    This function adds tweets to the files by appending them at the
    end. However, if the file or the dir structure didn't already exist,
    it will be created anew. The execution loop will exit only when a
    termination message is received.

    :param queue: queue where processed data await to be written to FS
    :param header: header of the new CSV files
    """

    logger.debug(
        "Worker {} launched: filesystem_writer executing"
        .format(os.getpid()))

    while True:
        csv_page = queue.get()

        if csv_page == "END":
            logger.info("Writing loop finished")
            break

        if globals.timing and csv_page is not None:
            time0 = time.time()

        for output_path in csv_page.keys():
            logger.debug("Dumping tweets for " + output_path)
            if os.path.isfile(output_path):
                with open(output_path, "a") as writer:
                    writer.write(csv_page[output_path])
            else:
                logger.debug("File {} not found, generating new..."
                             .format(output_path))
                generate_path(output_path, header)
                with open(output_path, "a") as writer:
                    writer.write(csv_page[output_path])

        if globals.timing and csv_page is not None:
            time1 = time.time()
            logger.critical(
                "Time spent writing tweet page to FS: {}s"
                .format(time1 - time0))

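# Usage sketch (illustrative only; the path and payload below are made-up
# examples, but this mirrors how the main script drives the worker): the
# writer runs in its own process and consumes page buffers until it receives
# the "END" sentinel.
#
#     queue = mp.Queue()
#     writer = mp.Process(target=filesystem_writer, args=(queue, header))
#     writer.start()
#     queue.put({"./pymongodump/mydb/2019/06/01.csv": "\n<csv line>"})
#     queue.put("END")
#     writer.join()
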
def process_page(
        host: str,
        port: int,
        database: str,
        header: str,
        output_dir: str,
        pagesize: int,
        page_number: int,
        queue: mp.Queue)\
        -> None:
    """
    Retrieve a page of tweets from Mongo, convert to CSV and enqueue

    :param host: IP address of the MongoDB host
    :param port: Mongo port
    :param database: name of the database that we want to query
    :param header: CSV header and all its fields
    :param output_dir: root directory for the new CSV database
    :param pagesize: size of page (number of tweets we want to get)
    :param page_number: number of the tweet page to retrieve
    :param queue: queue where processed data await to be written to FS
    """

    logger.debug(
        "Worker {} launched: process_page executing".format(os.getpid()))

    if globals.timing:
        time0 = time.time()

    client = pymongo.MongoClient(host, port)
    database_tweets = client[database]["tweets"]
    tweets_page = get_tweets_page(database_tweets, pagesize, page_number)
    buffer_tweets = {}

    for tweet in tweets_page:
        csv_tweet_output_path =\
            get_tweet_output_path(tweet, output_dir)
        csv_tweet_contents =\
            "\n" + str(convert_tweet_to_csv(header, tweet))
        if csv_tweet_output_path not in buffer_tweets.keys():
            buffer_tweets[csv_tweet_output_path] = ""
        buffer_tweets[csv_tweet_output_path] += csv_tweet_contents
    client.close()

    queue.put(buffer_tweets)
    logger.debug("Page {} enqueued".format(page_number))

    if globals.timing:
        time1 = time.time()
        logger.critical(
            "Time processing & buffering tweet page: {}s"
            .format(time1 - time0))

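# The enqueued object is a plain dict mapping each output CSV path to the
# concatenated lines destined for it, e.g. (hypothetical values):
#
#     {
#         "./pymongodump/mydb/2019/06/01.csv": "\n<tweet 1>\n<tweet 2>",
#         "./pymongodump/mydb/2019/06/02.csv": "\n<tweet 3>",
#     }
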
def get_tweet_output_path(tweet: dict, output_dir: str) -> str:
    """
    Generate a filesystem path string for the new CSV using tweet date

    :param tweet: tweet object in its Mongo JSON/dict form
    :param output_dir: root directory for the new CSV database
    :returns: path of the CSV file where the tweet should be written to
    """

    collection_path = output_dir

    # Extract year, month and day from the tweet using a regex
    matchObj = re.search(
        r"^(\d{4})-(\d{2})-(\d{2}) \d{2}:\d{2}:\d{2}$",
        str(tweet["created_at"]))
    year = matchObj.group(1)
    month = matchObj.group(2)
    day = matchObj.group(3)

    tweet_output_file = os.path.join(
        collection_path, year, month, day + ".csv")

    return tweet_output_file

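# Example (hypothetical values): a tweet whose "created_at" field reads
# "2019-06-01 12:34:56", dumped with output_dir "./pymongodump/mydb",
# would be routed to "./pymongodump/mydb/2019/06/01.csv".
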
def generate_path(path: str, header: str) -> None:
    """
    Generate dir structure and output file if they didn't exist yet

    :param path: path of the output file
    :param header: header of the new CSV files
    """

    path_regex = r"(\./\w+)/(\w+)/(\d{4})/(\d{2})/(\d{2})\.csv$"
    match = re.search(path_regex, path)
    output_dir = match.group(1)
    collection = match.group(2)
    year = match.group(3)
    month = match.group(4)
    day = match.group(5)

    if not os.path.isdir(output_dir):
        logger.info(
            "Building directory to contain the collected tweets at: "
            + os.path.abspath(output_dir)
        )
        os.mkdir(output_dir)

    collection_path = os.path.join(output_dir, collection)
    if not os.path.isdir(collection_path):
        logger.info("Initializing collection " + collection + "...")
        os.mkdir(collection_path)

    collection_year = os.path.join(collection_path, year)
    if not os.path.isdir(collection_year):
        logger.info("Adding year " + year + "...")
        os.mkdir(collection_year)

    collection_month = os.path.join(collection_year, month)
    if not os.path.isdir(collection_month):
        logger.info("Adding month " + month + "...")
        os.mkdir(collection_month)

    csv_output_file = os.path.join(collection_month, day + ".csv")
    if os.path.isfile(csv_output_file) is False:
        with open(csv_output_file, "w") as fw:
            logger.info("Adding file for day " + day + "...")
            fw.write(header.strip())

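# Note: the regex above expects the path to end in
# "./<output_dir>/<collection>/<year>/<month>/<day>.csv"; anything else makes
# re.search return None and the group() calls fail. For an input such as
# "./pymongodump/mydb/2019/06/01.csv" (hypothetical collection "mydb") the
# function creates, when missing:
#
#     ./pymongodump/
#     ./pymongodump/mydb/
#     ./pymongodump/mydb/2019/
#     ./pymongodump/mydb/2019/06/
#     ./pymongodump/mydb/2019/06/01.csv   (initialized with the CSV header)
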
def convert_tweet_to_csv(header: str, tweet: dict) -> str:
    """
    Convert a JSON/dict-like tweet to a single string line in CSV form

    Takes the JSON/dict object passed as an argument and performs the
    following transformations:
        1. Flattening (removal of nested elements)
        2. Conversion to CSV (arg1: flat tweet, arg2: array compression,
           arg3: depth of array compression, arg4: remove dollars mode)
        3. Format the CSV to make it suitable to append to the file (by
           keeping just the information indicated in the header amongst
           other required manipulations)

    This function includes the core of the computations in the whole
    script, and is surely the most expensive part.
    For more information about how things get done and to see if
    performance might be enhanced, take a look at the tweet_manager
    library, which is probably an example of pretty bad code.

    :param header: header of the new CSV files
    :param tweet: tweet object in its Mongo JSON/dict form
    :returns: single line string with the tweet info for the header
        fields in CSV form
    """

    flat_tweet = json2csv.flatten_dictionary(tweet)
    csv_tweet = json2csv.json2csv(flat_tweet, True, 5, False)
    csv_appendable_tweet = format_csv.get_csv_line(header, csv_tweet)

    return csv_appendable_tweet

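# Usage sketch (hypothetical tweet; the exact column layout depends on the
# tweet_manager library and on the fields listed in the header):
#
#     tweet = {"created_at": "2019-06-01 12:34:56",
#              "text": "hello",
#              "user": {"screen_name": "someone"}}
#     line = convert_tweet_to_csv(header, tweet)
#     # 'line' is a single CSV row whose columns follow 'header', with
#     # nested objects such as "user" flattened into top-level fields.
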
#########################
#  TWEET DB PAGINATION  #
#########################
def get_page_index(
        collection: pymongo.collection.Collection,
        page_size: int)\
        -> List[int]:
    """
    Get a list of the pages in the collection

    :param collection: pymongo collection of tweets
    :param page_size: number of tweets in each page
    :returns: list of pages in the collection, starting from 0
    """

    return list(range(0, ceil(collection.count() / page_size)))

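# Worked example (hypothetical numbers): a collection holding 10,500 tweets
# paged in groups of 1,000 yields ceil(10500 / 1000) = 11 pages, so the
# returned index is [0, 1, ..., 10]; page n is later fetched by skipping
# n * page_size documents, and the last page holds only 500 tweets.
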
def get_tweets_page(
        collection: pymongo.collection.Collection,
        page_size: int,
        num_page: int)\
        -> pymongo.cursor.Cursor:
    """
    Get a cursor pointing to the Mongo entries for that page

    :param collection: pymongo collection of tweets
    :param page_size: number of tweets in each page
    :param num_page: relative index of the page within the collection
    :returns: a Pymongo cursor pointing to the tweets
    """

    tweets =\
        collection.find().skip(num_page * page_size).limit(page_size)
    return tweets


#########################
#  METADATA GENERATION  #
#########################
def generate_metadata_file(collection_path: str) -> None:
    """
    Create a metadata file for the current collection

    Once all the CSV files have been created, generate a metadata file
    with information about the number of tweets in each of the CSVs by
    making a simple count of lines. Making a call to the system to
    perform "wc -l" would surely be faster, but more system dependent
    and a bit "hacky"

    :param collection_path: path to the root dir of a database
    """

    metadata = {}
    metadata["files"] = {}
    for root, dirs, files in os.walk(collection_path):
        for f in files:
            file_path = os.path.join(root, f)
            relative_path = os.path.relpath(file_path, collection_path)
            metadata["files"][relative_path] = {}
            metadata["files"][relative_path]["count"] =\
                file_length(file_path)

    output_path = os.path.join(collection_path, ".metadata.json")
    with open(output_path, 'w') as f:
        json.dump(metadata, f)


def file_length(file_path: str) -> int:
    """
    Calculate the number of lines of a file

    :param file_path: path of the file
    :returns: number of lines of the file
    """

    with open(file_path) as f:
        for i, l in enumerate(f):
            pass
    # enumerate is zero-based, so this is the line count minus one, which
    # matches the number of tweets in a CSV whose first line is the header
    return i
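# The resulting .metadata.json maps each CSV, relative to the collection
# root, to its tweet count, e.g. (hypothetical contents):
#
#     {"files": {"2019/06/01.csv": {"count": 1234},
#                "2019/06/02.csv": {"count": 987}}}
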
@@ -3,69 +3,83 @@
import pymongo
import os
import argparse
import logging
import time
import multiprocessing as mp
from config import globals
from lib import utils

# Command line parsing
parser = argparse.ArgumentParser(
    description="Dump the tweets of a database to a JSON file")
parser.add_argument("-H", "--host", type=str, default="localhost")
parser.add_argument("-p", "--port", type=int, default=27017)
parser.add_argument("-s", "--pagesize", type=int, default=1000)
parser.add_argument("-v", "--verbose", action="store_true")
parser.add_argument("-t", "--timing", action="store_true")
parser.add_argument("database", type=str)
args = parser.parse_args()
# Logging config
logformat = "[%(asctime)s] %(message)s"
dateformat = "%H:%M:%S"
if args.verbose:
    logging.basicConfig(
        level=logging.DEBUG, format=logformat, datefmt=dateformat)
else:
    logging.basicConfig(
        level=logging.ERROR, format=logformat, datefmt=dateformat)
logger = logging.getLogger(__name__)
# Initialize some variables
script_dir = os.path.dirname(__file__)
output_dir = os.path.join(script_dir, "pymongodump", args.database)
header_file = os.path.join(script_dir, "config", "header.txt")
with open(header_file) as f:
    header = f.readline()
buffer_tweets = {}
task_queue = mp.Queue()
if args.timing:
    globals.timing = True
    time0 = time.time()

# MongoDB connection to get page index
client = pymongo.MongoClient(args.host, args.port)
database_tweets = client[args.database]["tweets"]
page_index = utils.get_page_index(database_tweets, args.pagesize)
client.close()
logger.debug(
    "Database {} partitioned in {} pages of {} tweets (maximum)"
    .format(args.database, len(page_index), args.pagesize))

# Build a picklable function that we can pass to map
def process_data_page(
        page, host=args.host, port=args.port, database=args.database,
        pagesize=args.pagesize, header=header, output_dir=output_dir,
        queue=task_queue):
    utils.process_page(
        host, port, database, header, output_dir, pagesize, page, queue)
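# Design note: mp.Pool.map needs a picklable, module-level callable and only
# ships the items of the iterable to the workers, so the configuration
# (host, port, header, output_dir, queue, ...) is bound once as default
# arguments and each call receives just a page number, e.g. (illustrative):
#
#     process_data_page(0)   # processes the first page of tweets
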

# Launch single process to write to the filesystem
writer_worker = mp.Process(
    target=utils.filesystem_writer, args=(task_queue, header, ))
writer_worker.start()

# Launch pool of workers to perform the format conversion
with mp.Pool() as pool:
    pool.map(process_data_page, page_index)
task_queue.put("END")

if globals.timing:
    time1 = time.time()
utils.generate_metadata_file(output_dir)
if globals.timing:
    logger.critical(
        "Time spent generating metadata file: {}s"
        .format(time.time() - time1))
    logger.critical(
        "Total execution time: {}s"
        .format(time.time() - time0))
import os
import json


def create_task_database_structure(
        output_dir: str,
        db_name: str)\
        -> str:
    """
    Generate the following directory tree: a top dir that will contain
    all the tweet collections if it didn't exist yet and within it the top
    directory for this task with a new and empty metadata file
    """

    # Create the root directory for the tweet collection
    if not os.path.isdir(output_dir):
        print(
            "Building directory to contain the collected tweets at: "
            + os.path.abspath(output_dir)
        )
        os.mkdir(output_dir)
    collection_path = os.path.join(output_dir, db_name)
    if not os.path.isdir(collection_path):
        print("Initializing collection " + db_name + "...")
        os.mkdir(collection_path)
        generate_metadata_file(collection_path)

    return collection_path


def generate_metadata_file(collection_path) -> None:
    print("Executing generate_metadata_file")
    metadata_path = os.path.join(collection_path, ".metadata.json")
    file_metadata = {}  # type: Dict
    metadata = {}
    metadata["files"] = file_metadata

    with open(metadata_path, "w") as f:
        json.dump(metadata, f)