Commit 16f47745 by serpucga

Moved auxiliary functions out of the main script

parent 95eb843f
import os
import json
import re
from typing import Dict

from tweet_manager.lib import json2csv, format_csv


def create_task_database_structure(
        output_dir: str)\
        -> str:
    """
    Generate the following directory tree: a top directory that will
    contain all the tweet collections (created if it does not exist yet)
    and, within it, the top directory for this task with a new, empty
    metadata file
    """

    # Create the root directory for the tweet collection
    (output_dir, db_name) = os.path.split(output_dir)
    if not os.path.isdir(output_dir):
        print(
            "Building directory to contain the collected tweets at: "
            + os.path.abspath(output_dir)
        )
        os.mkdir(output_dir)
    collection_path = os.path.join(output_dir, db_name)
    if not os.path.isdir(collection_path):
        print("Initializing collection " + db_name + "...")
        os.mkdir(collection_path)
        metadata_path = os.path.join(collection_path, ".metadata.json")
        generate_metadata_file(metadata_path)

    return collection_path


def generate_metadata_file(metadata_path) -> None:
    print("Executing generate_metadata_file")
    file_metadata = {}  # type: Dict
    metadata = {}
    metadata["files"] = file_metadata
with open(metadata_path, "w") as f:
json.dump(metadata, f)


def add_newfile_to_metadata(file_path: str, metadata_path: str) -> None:
    """
    Add a new dictionary structure to the metadata file that contains
    information about a newly added CSV. This should only be used for
    files that have just been added to the collection, because it
    initializes the count to 0
    """
print("Executing add_newfile_to_metadata")
try:
with open(metadata_path, "r+") as f:
metadata_file = json.load(f)
metadata_file["files"][file_path] = {}
metadata_file["files"][file_path]["count"] = 0
f.seek(0)
f.truncate()
json.dump(metadata_file, f)
except IOError:
generate_metadata_file(metadata_path)
add_newfile_to_metadata(file_path, metadata_path)


def increase_metadata_count(
        metadata_path: str,
        file_path: str,
        increase: int = 1)\
        -> None:
    """
    Use this when one or more tweets are appended to one of the CSVs in
    the collection. This function updates the metadata file by increasing
    the count of the corresponding entry by the given amount
    """
print("Executing increase_metadata_count")
try:
with open(metadata_path, "r+") as f:
metadata_file = json.load(f)
metadata_file["files"][file_path]["count"] += increase
f.seek(0)
f.truncate()
json.dump(metadata_file, f)
except IOError:
generate_metadata_file(metadata_path)
increase_metadata_count(metadata_path, file_path, increase)


def create_tweet_output_path(
        tweet: dict,
        output_dir: str)\
        -> str:
    collection_path = create_task_database_structure(output_dir)

    # Extract year, month and day from the tweet using a regex
    matchObj = re.search(
        r"^(\d{4})-(\d{2})-(\d{2}) \d{2}:\d{2}:\d{2}$",
        str(tweet["created_at"]))
    year = matchObj.group(1)
    month = matchObj.group(2)
    day = matchObj.group(3)
    date = (year, month, "")

    # Classify the tweet chronologically
    tweet_output_path = json2csv.mkdir_tweet_date(date, collection_path)
    tweet_output_file = os.path.join(tweet_output_path, day + ".csv")
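    # Illustrative example (dates assumed): a tweet with
    # created_at = "2019-02-26 13:45:00" would be appended to a file like
    # <collection_path>/2019/02/26.csv, depending on the layout that
    # json2csv.mkdir_tweet_date creates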

    # If the CSV file didn't already exist, initialize it with a header
    if os.path.isfile(tweet_output_file) is False:
        with open(os.path.join(output_dir, ".metadata.json")) as f:
            header = f.readline().strip()
        with open(tweet_output_file, "w") as fw:
            fw.write(header)
        add_newfile_to_metadata(
            tweet_output_file,
            os.path.join(collection_path, ".metadata.json"))

    return tweet_output_file


def convert_tweet_to_csv(header: str, tweet: dict) -> str:
    # Flatten the tweet and store it in status_flat
    status_flat = json2csv.flatten_dictionary(tweet)

    # Convert the flat JSON to CSV format
    # 1st arg: flat tweet, 2nd arg: activate array compression, 3rd arg:
    # number of array compression levels, 4th arg: remove dollars mode
    status_csv = json2csv.json2csv(status_flat, True, 5, False)

    csv_appendable_line = format_csv.get_csv_line(header, status_csv)
    return csv_appendable_line
@@ -3,189 +3,50 @@
import pymongo
import os
import argparse

from lib import utils

# Command line parsing
parser = argparse.ArgumentParser(
    description="Dump the tweets of a database to a JSON file")
parser.add_argument("-H", "--host", type=str, default="localhost")
parser.add_argument("-p", "--port", type=int, default=27017)
parser.add_argument("database", type=str)
args = parser.parse_args()

# Dirs and files
script_dir = os.path.dirname(__file__)
output_dir = os.path.join(script_dir, "pymongodump", args.database)
header_file = os.path.join(script_dir, "header.txt")

# MongoDB connection
client = pymongo.MongoClient(args.host, args.port)
database_tweets = client[args.database]["tweets"]

with open(header_file) as f:
    header = f.readline()

buffer_tweets = {}
for tweet in database_tweets.find():
    # Get output path and contents for the new CSV file
    csv_tweet_output_path =\
        utils.create_tweet_output_path(tweet, output_dir)
    csv_tweet_contents =\
        "\n" + str(utils.convert_tweet_to_csv(header, tweet))

    # Check if a buffer exists for the file. If not, add it to the dictionary
    if csv_tweet_output_path not in buffer_tweets.keys():
        buffer_tweets[csv_tweet_output_path] = ["", 0]

    # Update the buffer, adding the tweet and increasing the tweet count
    buffer_tweets[csv_tweet_output_path][0] += csv_tweet_contents
    buffer_tweets[csv_tweet_output_path][1] += 1

# Perform the write operations in each of the files
for output_path in buffer_tweets.keys():
    with open(output_path, "a") as tweet_writer:
        tweet_writer.write(buffer_tweets[output_path][0])
    utils.increase_metadata_count(
        os.path.join(output_dir, ".metadata.json"),
        output_path, increase=buffer_tweets[output_path][1])
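For reference, a typical invocation of the refactored script (the script filename here is assumed) would be:

    python3 pymongo_to_csv.py -H localhost -p 27017 my_tweet_db

This dumps every document in the "tweets" collection of my_tweet_db into pymongodump/my_tweet_db/ as per-day CSV files, with the number of tweets written to each file tracked in that directory's .metadata.json.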