Commit 866da8f7 by serpucga

Trying to make it thread safe, step1

parent 844fabe9
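Editorial note on this diff: write_tweets_to_files opens its own MongoClient, works on a single page of tweets, and buffers CSV lines per output file before appending them, so it looks intended to be called once per page by independent workers. The dispatcher that would do that is not part of this hunk. Below is a minimal sketch of how such a driver might look, assuming a multiprocessing pool; the helper name dispatch_pages, the way the page count is derived, and the single-element page_index list are assumptions, not code from this commit.

    # Hypothetical driver (not in this commit): one write_tweets_to_files
    # call per page, fanned out over a process pool.
    from multiprocessing import Pool

    import pymongo

    def dispatch_pages(host, port, database, pagesize, header, output_dir):
        # Count the tweets once to derive the number of pages (assumed scheme)
        client = pymongo.MongoClient(host, port)
        num_tweets = client[database]["tweets"].count_documents({})
        client.close()

        num_pages = (num_tweets + pagesize - 1) // pagesize  # ceiling division
        args = [
            (host, port, database, pagesize, header, output_dir, [page])
            for page in range(num_pages)
        ]
        with Pool() as pool:
            # write_tweets_to_files is the function defined in this diff;
            # each worker opens and closes its own MongoClient inside it.
            pool.starmap(write_tweets_to_files, args)

Note that two pages covering the same day would still append to the same CSV file; buffering per file reduces the number of write calls but does not serialize them, which is presumably why the commit message calls this "step 1".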
@@ -7,29 +7,62 @@ from typing import List
from tweet_manager.lib import json2csv, format_csv
def create_task_database_structure(
        output_dir: str)\
        -> str:


def write_tweets_to_files(
        host: str,
        port: int,
        database: str,
        pagesize: int,
        header: str,
        output_dir: str,
        page_index: List[int])\
        -> None:
    print("Hi there! write_tweets_to_files executing")
    client = pymongo.MongoClient(host, port)
    database_tweets = client[database]["tweets"]
    tweets_page = get_tweets_page(database_tweets, pagesize, page_index)
    buffer_tweets = {}
    for tweet in tweets_page:
        # Get output path and contents for the new CSV file
        csv_tweet_output_path =\
            create_tweet_output_path(header, tweet, output_dir)
        csv_tweet_contents =\
            "\n" + str(convert_tweet_to_csv(header, tweet))

        # Check if buffer exists for the file. If not, add to dictionary
        if csv_tweet_output_path not in buffer_tweets.keys():
            buffer_tweets[csv_tweet_output_path] = ["", 0]

        # Update the buffer adding the tweet and increasing tweet count
        buffer_tweets[csv_tweet_output_path][0] += csv_tweet_contents
        # buffer_tweets[csv_tweet_output_path][1] += 1
    client.close()

    # Perform the write operations in each of the files
    for output_path in buffer_tweets.keys():
        with open(output_path, "a") as tweet_writer:
            tweet_writer.write(buffer_tweets[output_path][0])
def get_tweet_output_path(tweet: dict, output_dir: str) -> str:
    """
    Generate the following directory tree: a top dir that will contain
    all the tweet collections if it didn't exist yet and within it the top
    directory for this task
    Returns the same output as create_tweet_output_path, but without
    touching the filesystem
    """

    # Create the root directory for the tweet collection
    (output_dir, db_name) = os.path.split(output_dir)
    if not os.path.isdir(output_dir):
        print(
            "Building directory to contain the collected tweets at: "
            + os.path.abspath(output_dir)
        )
        os.mkdir(output_dir)
    collection_path = os.path.join(output_dir, db_name)
    if not os.path.isdir(collection_path):
        print("Initializing collection " + db_name + "...")
        os.mkdir(collection_path)
    collection_path = output_dir
    return collection_path

    # Extract year, month and day from the tweet using a regex
    matchObj = re.search(
        r"^(\d{4})-(\d{2})-(\d{2}) \d{2}:\d{2}:\d{2}$",
        str(tweet["created_at"]))
    year = matchObj.group(1)
    month = matchObj.group(2)
    day = matchObj.group(3)
    tweet_output_file = os.path.join(
        collection_path, year, month, day + ".csv")
    return tweet_output_file
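As the docstring above says, get_tweet_output_path only computes which per-day CSV file a tweet belongs to, using the year, month, and day taken from created_at. A small standalone illustration of that mapping follows; the sample tweet and the output/mydb directory are invented values, only the layout comes from the code above.

    # Illustration only: the date-to-path mapping produced by the regex above.
    import os
    import re

    tweet = {"created_at": "2019-06-17 09:30:00"}
    collection_path = "output/mydb"

    match = re.search(
        r"^(\d{4})-(\d{2})-(\d{2}) \d{2}:\d{2}:\d{2}$",
        str(tweet["created_at"]))
    path = os.path.join(
        collection_path, match.group(1), match.group(2),
        match.group(3) + ".csv")
    print(path)  # output/mydb/2019/06/17.csv on POSIX systems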
def create_tweet_output_path(
@@ -63,6 +96,31 @@ def create_tweet_output_path(
    return tweet_output_file


def create_task_database_structure(
        output_dir: str)\
        -> str:
    """
    Generate the following directory tree: a top dir that will contain
    all the tweet collections if it didn't exist yet and within it the top
    directory for this task
    """

    # Create the root directory for the tweet collection
    (output_dir, db_name) = os.path.split(output_dir)
    if not os.path.isdir(output_dir):
        print(
            "Building directory to contain the collected tweets at: "
            + os.path.abspath(output_dir)
        )
        os.mkdir(output_dir)
    collection_path = os.path.join(output_dir, db_name)
    if not os.path.isdir(collection_path):
        print("Initializing collection " + db_name + "...")
        os.mkdir(collection_path)
    return collection_path
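create_task_database_structure splits its argument into a parent directory and a collection name and creates each level only if it is missing. A short usage sketch, with an invented example path:

    # Usage sketch; "./collections/my_database" is an invented path.
    collection_path = create_task_database_structure("./collections/my_database")
    # Creates ./collections/ and then ./collections/my_database/ if absent,
    # and returns "./collections/my_database".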
def convert_tweet_to_csv(header: str, tweet: dict) -> str:
    # Flatten the tweet and store it in status_flat
    status_flat = json2csv.flatten_dictionary(tweet)
@@ -77,42 +135,6 @@ def convert_tweet_to_csv(header: str, tweet: dict) -> str:
    return csv_appendable_line
def write_tweets_to_files(
        host: str,
        port: int,
        database: str,
        pagesize: int,
        header: str,
        output_dir: str,
        page_index: list)\
        -> None:
    print("Hi there! write_tweets_to_files executing")
    client = pymongo.MongoClient(host, port)
    database_tweets = client[database]["tweets"]
    tweets_page = get_tweets_page(database_tweets, pagesize, page_index)
    buffer_tweets = {}
    for tweet in tweets_page:
        # Get output path and contents for the new CSV file
        csv_tweet_output_path =\
            create_tweet_output_path(header, tweet, output_dir)
        csv_tweet_contents =\
            "\n" + str(convert_tweet_to_csv(header, tweet))

        # Check if buffer exists for the file. If not, add to dictionary
        if csv_tweet_output_path not in buffer_tweets.keys():
            buffer_tweets[csv_tweet_output_path] = ["", 0]

        # Update the buffer adding the tweet and increasing tweet count
        buffer_tweets[csv_tweet_output_path][0] += csv_tweet_contents
        # buffer_tweets[csv_tweet_output_path][1] += 1
    client.close()

    # Perform the write operations in each of the files
    for output_path in buffer_tweets.keys():
        with open(output_path, "a") as tweet_writer:
            tweet_writer.write(buffer_tweets[output_path][0])
#########################
# TWEET DB PAGINATION #
#########################