Commit ed2c9d74 by serpucga

First parallel version of the code

Parallelized using the multiprocessing library. I'm not entirely sure the code is thread safe. I think we don't care if tweets get appended to the files in a different order, but a corrupted metadata file would be a problem. In the first tests the metadata came out fine, but this line is probably not safe (two processes could load the old value and try to update it at the same time, ending up with an inconsistent count):

"""
metadata_file["files"][file_path]["count"] += increase
"""

Apart from that, the code is much faster than before.
parent 34776b63
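A minimal sketch of how the update quoted in the message above could be serialized, assuming that line lives inside utils.increase_metadata_count (which, as called elsewhere in this commit, takes the metadata path, the CSV file path and an increase keyword). The wrapper and its lock argument are hypothetical, not part of this commit:

import multiprocessing as mp
from lib import utils

def locked_increase_metadata_count(lock, metadata_path, file_path, increase):
    # Hypothetical wrapper: hold a shared multiprocessing.Lock around the
    # whole load -> increment -> dump sequence so two workers cannot
    # interleave their updates to the same .metadata.json file.
    with lock:
        utils.increase_metadata_count(metadata_path, file_path, increase=increase)

The lock would be created once in the parent (lock = mp.Lock()) and passed to every worker, for example as an extra element of the args tuple given to mp.Process.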
@@ -32,13 +32,13 @@ def create_task_database_structure(

 def generate_metadata_file(metadata_path) -> None:
     print("Executing generate_metadata_file")
-    file_metadata = {}  # type: Dict
+    file_metadata = {}
     metadata = {}
     metadata["files"] = file_metadata
     with open(metadata_path, "w") as f:
         json.dump(metadata, f)


 def add_newfile_to_metadata(file_path: str, metadata_path: str) -> None:

@@ -92,7 +92,10 @@ def create_tweet_output_path(
         tweet: dict,
         output_dir: str)\
         -> str:
-    collection_path = create_task_database_structure(output_dir)
+    try:
+        collection_path = create_task_database_structure(output_dir)
+    except FileExistsError as e:
+        collection_path = e.filename
     # Extract year, month and date from the tweet using a regex
     matchObj = re.search(
...
@@ -3,6 +3,7 @@
 import pymongo
 import os
 import argparse
+import multiprocessing as mp
 from math import ceil
 from lib import utils
@@ -10,7 +11,7 @@ from lib import utils

 def get_page_index(collection, page_size: int):
     page_index = []
     for i in range(0, ceil(collection.count() / page_size)):
         page_index.append(get_tweets_page(collection, page_size, i))
     return page_index
@@ -20,6 +21,33 @@ def get_tweets_page(collection, page_size: int, num_page: int):
     return tweets

+def write_tweets_to_files(header: str, output_dir: str, tweets_page):
+    print("Hi there! write_tweets_to_files executing")
+    buffer_tweets = {}
+    for tweet in tweets_page:
+        # Get output path and contents for the new CSV file
+        csv_tweet_output_path =\
+            utils.create_tweet_output_path(header, tweet, output_dir)
+        csv_tweet_contents =\
+            "\n" + str(utils.convert_tweet_to_csv(header, tweet))
+
+        # Check if buffer exists for the file. If not, add to dictionary
+        if csv_tweet_output_path not in buffer_tweets.keys():
+            buffer_tweets[csv_tweet_output_path] = ["", 0]
+
+        # Update the buffer adding the tweet and increasing tweet count
+        buffer_tweets[csv_tweet_output_path][0] += csv_tweet_contents
+        buffer_tweets[csv_tweet_output_path][1] += 1
+
+    # Perform the write operations in each of the files
+    for output_path in buffer_tweets.keys():
+        with open(output_path, "a") as tweet_writer:
+            tweet_writer.write(buffer_tweets[output_path][0])
+        utils.increase_metadata_count(
+            os.path.join(output_dir, ".metadata.json"),
+            output_path, increase=buffer_tweets[output_path][1])
+
 if __name__ == "__main__":

     # Command line parsing
@@ -46,27 +74,13 @@ if __name__ == "__main__":
     num_page = 0
     page_index = get_page_index(database_tweets, args.pagesize)

-    for page in page_index:
-        buffer_tweets = {}
-        for tweet in page:
-            # Get output path and contents for the new CSV file
-            csv_tweet_output_path =\
-                utils.create_tweet_output_path(header, tweet, output_dir)
-            csv_tweet_contents =\
-                "\n" + str(utils.convert_tweet_to_csv(header, tweet))
-
-            # Check if buffer exists for the file. If not, add to dictionary
-            if csv_tweet_output_path not in buffer_tweets.keys():
-                buffer_tweets[csv_tweet_output_path] = ["", 0]
-
-            # Update the buffer adding the tweet and increasing tweet count
-            buffer_tweets[csv_tweet_output_path][0] += csv_tweet_contents
-            buffer_tweets[csv_tweet_output_path][1] += 1
-
-        # Perform the write operations in each of the files
-        for output_path in buffer_tweets.keys():
-            with open(output_path, "a") as tweet_writer:
-                tweet_writer.write(buffer_tweets[output_path][0])
-            utils.increase_metadata_count(
-                os.path.join(output_dir, ".metadata.json"),
-                output_path, increase=buffer_tweets[output_path][1])
+    output = mp.Queue()
+    processes = (mp.Process(
+        target=write_tweets_to_files, args=(
+            header, output_dir, page)) for page in page_index)
+
+    for p in processes:
+        p.start()
+    for p in processes:
+        p.join()
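One detail worth double-checking in the new spawning code: processes is a generator expression, so the first for p in processes loop consumes it and the join loop then iterates over an empty iterator. A minimal alternative sketch (not part of this commit) for the same block, using multiprocessing.Pool so the workers are tracked and joined implicitly; it assumes header, output_dir and page_index as already built in this script:

    # Fan the pages out over a pool of workers; starmap blocks until
    # every page has been written, so no manual start()/join() is needed.
    with mp.Pool() as pool:
        pool.starmap(
            write_tweets_to_files,
            [(header, output_dir, page) for page in page_index])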