Commit a4b15d31 by serpucga

First definitive version. May contain bugs

parents d1923e7e 8d7eb4a2
pymongodump
recovery
tests.py
.mypy_cache
.recovery*
timing = False
id,text,created_at,source,truncated,in_reply_to_status_id,in_reply_to_user_id,in_reply_to_screen_name,quoted_status_id,is_quote_status,retweet_count,favorite_count,user.id,user.name,user.created_at,user.screen_name,user.location,user.profile_image_url,user.verified,user.followers_count,user.friends_count,user.listed_count,user.favourites_count,user.statuses_count,user.geo_enabled,user.lang,entities.hashtags.text,entities.urls.expanded_url,entities.user_mentions.screen_name,entities.media.media_url,place.id,place.name,place.full_name,place.country,place.country_code,place.place_type,place.url,place.bounding_box.type,place.bounding_box.coordinates,coordinates.type,coordinates.coordinates
import os
import pymongo
import json
import re
import time
import datetime
import multiprocessing as mp
from math import ceil
from typing import List, Dict
from tweet_manager.lib import json2csv, format_csv
from config import globals
# Logger
import logging
logger = logging.getLogger(__name__)
def filesystem_writer(
queue: mp.Queue,
header: str,
host: str,
port: int,
database: str,
pagesize: int,
output_dir: str,
recovery_file: str)\
-> None:
"""
Reads the CSV pages from the queue and writes them to filesystem
This function adds tweets to the files by appending them at the
end. However, if the file or the dir structure didn't already exist,
it will be created anew. The execution loop will exit only when a
termination message is received.
:param queue: queue were processed data await to be written to FS
:param header: header of the new CSV files
"""
logger.debug(
"Worker {} launched: filesystem_writer executing"
.format(os.getpid()))
if recovery_file:
recovery_file_path = recovery_file
else:
recovery_file_path = build_recovery_filepath(database)
create_recovery_file(
recovery_file_path, host, port, database, pagesize)
while True:
page_number, csv_page = queue.get()
if csv_page == "END":
logger.info("Writing loop finished")
os.remove(recovery_file_path)
break
elif csv_page == "ERROR":
logger.error("Dumping recovery file and exiting")
            if page_number >= 0:
dump_error_recovery_file(recovery_file_path, page_number)
break
elif csv_page is not None:
if globals.timing:
time0 = time.time()
for output_path in csv_page.keys():
logger.debug("Dumping tweets for " + output_path)
if os.path.isfile(output_path):
with open(output_path, "a") as writer:
writer.write(csv_page[output_path])
else:
logger.debug("File {} not found, generating new..."
.format(output_path))
generate_path(output_path, header)
with open(output_path, "a") as writer:
writer.write(csv_page[output_path])
if page_number >= 0:
update_recovery_file(recovery_file_path, page_number)
if globals.timing:
logger.critical(
"Time spent writing tweet page to FS: {}s"
.format(time.time() - time0))
else:
continue
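
# Illustrative sketch (values are hypothetical) of the messages that
# filesystem_writer consumes from the queue as (page_number, payload) tuples:
#
#   queue.put((3, {"./pymongodump/mydb/2019/06/01.csv": "\n<csv line>"}))
#   queue.put((7, "ERROR"))   # page 7 failed: dump recovery info and exit
#   queue.put((-1, "END"))    # all pages written: delete recovery file, exit
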
def process_page(
host: str,
port: int,
database: str,
header: str,
output_dir: str,
pagesize: int,
page_number: int,
page_index: Dict[int, int],
queue: mp.Queue)\
-> None:
"""
Retrieve a page of tweets from Mongo, convert to CSV and enqueue
:param host: IP address of the MongoDB host
:param port: Mongo port
:param database: name of the database that we want to query to
:param header: CSV header and all its fields
:param output_dir: root directory for the new CSV database
:param pagesize: size of page (number of tweets we want to get)
:param page_number: number of the tweet page to retrieve
:param queue: queue were processed data await to be written to FS
"""
try:
logger.debug(
"Worker {} launched: process_page executing".format(os.getpid()))
if globals.timing:
time0 = time.time()
real_page_number = page_index[page_number]
client = pymongo.MongoClient(host, port)
database_tweets = client[database]["tweets"]
tweets_page = get_tweets_page_fast(
database_tweets, pagesize, real_page_number)
buffer_tweets = {}
for tweet in tweets_page:
csv_tweet_output_path =\
get_tweet_output_path(tweet, output_dir)
try:
csv_tweet_contents =\
"\n" + str(convert_tweet_to_csv(header, tweet))
except TweetConversionException as exc:
logger.error(exc.message)
logger.error("Origin tweet:\n" + str(exc.tweet))
logger.error("Discarding tweet and proceeding...")
continue
if csv_tweet_output_path not in buffer_tweets.keys():
buffer_tweets[csv_tweet_output_path] = ""
buffer_tweets[csv_tweet_output_path] += csv_tweet_contents
client.close()
queue.put((page_number, buffer_tweets))
logger.debug("Page {} enqueued".format(page_number))
if globals.timing:
time1 = time.time()
logger.critical(
"Time processing & buffering tweet page: {}s"
.format(time1 - time0))
    except Exception as exc:
        raise ExceptionAtPage(
            "Something failed while processing page", page_number) from exc
def get_tweet_output_path(tweet: dict, output_dir: str) -> str:
"""
Generate a filesystem path string for the new CSV using tweet date
:param tweet: tweet object in its Mongo JSON/dict form
:param output_dir: root directory for the new CSV database
:returns: path of the CSV file where the tweet should be written to
"""
collection_path = output_dir
    # Extract year, month and day from the tweet creation date using a regex
    match_obj = re.search(
        r"^(\d{4})-(\d{2})-(\d{2}) \d{2}:\d{2}:\d{2}$",
        str(tweet["created_at"]))
    year = match_obj.group(1)
    month = match_obj.group(2)
    day = match_obj.group(3)
tweet_output_file = os.path.join(
collection_path, year, month, day + ".csv")
return tweet_output_file
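
# For example (hypothetical values), a tweet created at "2019-06-01 12:30:00"
# with output_dir "./pymongodump/mydb" is routed to
# "./pymongodump/mydb/2019/06/01.csv".
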
def generate_path(path: str, header: str) -> None:
"""
Generate dir structure and output file if they didn't exist yet
:param path: path of the output file
:param header: header of the new CSV files
"""
path_regex = r"(\./\w+)/(\w+)/(\d{4})/(\d{2})/(\d{2})\.csv$"
match = re.search(path_regex, path)
output_dir = match.group(1)
collection = match.group(2)
year = match.group(3)
month = match.group(4)
day = match.group(5)
if not os.path.isdir(output_dir):
logger.info(
"Building directory to contain the collected tweets at: "
+ os.path.abspath(output_dir)
)
os.mkdir(output_dir)
collection_path = os.path.join(output_dir, collection)
if not os.path.isdir(collection_path):
logger.info("Initializing collection " + collection + "...")
os.mkdir(collection_path)
collection_year = os.path.join(collection_path, year)
if not os.path.isdir(collection_year):
logger.info("Adding year " + year + "...")
os.mkdir(collection_year)
collection_month = os.path.join(collection_year, month)
if not os.path.isdir(collection_month):
logger.info("Adding month " + month + "...")
os.mkdir(collection_month)
csv_output_file = os.path.join(collection_month, day + ".csv")
    if not os.path.isfile(csv_output_file):
with open(csv_output_file, "w") as fw:
logger.info("Adding file for day " + day + "...")
fw.write(header.strip())
def convert_tweet_to_csv(header: str, tweet: dict) -> str:
"""
Convert a JSON/dict-like tweet to a single string line in CSV form
Takes the JSON/dict object passed as an argument and performs the
following transformations:
1. Flattening (removal of nested elements)
2. Conversion to CSV (arg1: flat tweet, arg2: array compression,
arg3: depth of array compression, arg4: remove dollars mode)
3. Format the CSV to make it suitable to append to the file (by
keeping just the information indicated in the header amongst
other required manipulations)
    This function contains the core of the script's computation and is
    by far its most expensive part.
    For more information about how things get done, and to see whether
    performance could be improved, take a look at the tweet_manager
    library, which is admittedly not an example of great code.
    (A rough sketch of the pipeline follows this function.)
:param header: header of the new CSV files
:param tweet: tweet object in its Mongo JSON/dict form
:returns: single line string with the tweet info for the header
fields in CSV form
"""
try:
flat_tweet = json2csv.flatten_dictionary(tweet)
except Exception:
raise TweetConversionException(
"Error when flattening tweet", tweet)
try:
csv_tweet = json2csv.json2csv(flat_tweet, True, 5, False)
except Exception:
raise TweetConversionException(
"Error when trying to convert tweet to CSV", flat_tweet)
try:
csv_appendable_tweet = format_csv.get_csv_line(
header, csv_tweet)
except Exception:
raise TweetConversionException(
"Error when formatting CSV tweet", csv_tweet)
return csv_appendable_tweet
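
# Rough sketch of the conversion pipeline above; field names and values are
# hypothetical and the exact output depends on the tweet_manager library:
#
#   {"id": 1, "user": {"name": "foo"}}    # original nested tweet
#   -> {"id": 1, "user.name": "foo"}      # after flatten_dictionary
#   -> "1,foo,..."                        # after json2csv + get_csv_line,
#                                         # keeping only the header fields
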
def build_recovery_filepath(dbname: str) -> str:
"""
Build the path of a recovery file
:param dbname: name of the database being queried
:returns: the path of the recovery file generated in this execution
"""
recovery_dir = "./recovery"
if not os.path.isdir(recovery_dir):
os.mkdir(recovery_dir)
    now = datetime.datetime.now()
    datetime_str = now.strftime("%Y%m%d-%H%M%S")
recovery_file_path = os.path.join(
recovery_dir,
"recovery_" + dbname + "_" + datetime_str + ".json")
return recovery_file_path
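
# Example of a generated recovery path (database name and timestamp are
# hypothetical): "./recovery/recovery_mydb_20190601-123000.json"
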
def create_recovery_file(
file_path: str,
host: str,
port: int,
database: str,
page_size: int)\
-> None:
"""
In case of error, dump information to file to allow recovery
:param host: address of the host to which the script connected
:param port: port of the Mongo database
:param database: name of the database being queried
:param page_size: size of the page that was being used
"""
    recovery_file_contents = {
        "host": host,
        "port": port,
        "database": database,
        "pagesize": page_size,
        "dumped_pages": [],
    }
with open(file_path, "w") as f:
json.dump(recovery_file_contents, f)
logger.error("Generated recovery file at {}".format(file_path))
def update_recovery_file(
file_path: str,
page_number: int)\
-> None:
"""
Add a new page to the list of already dumped pages in the recovery
file
:param file_path: path to the recovery file
:param page_number: number of the page that was safely written
"""
with open(file_path, "r") as f:
recovery_file_contents = json.load(f)
recovery_file_contents["dumped_pages"].append(page_number)
with open(file_path, "w") as f:
json.dump(recovery_file_contents, f)
def dump_error_recovery_file(
file_path: str,
page_number: int)\
-> None:
"""
Add information pointing to the page where error was detected
:param file_path: path to the recovery file
:param page_number: number of the page that crashed
"""
with open(file_path, "r") as f:
recovery_file_contents = json.load(f)
recovery_file_contents["error_page"] = page_number
with open(file_path, "w") as f:
json.dump(recovery_file_contents, f)
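
# The recovery file written by the helpers above is plain JSON. A hypothetical
# example of its contents after a crash while converting page 7:
#
#   {"host": "localhost", "port": 27017, "database": "mydb",
#    "pagesize": 1000, "dumped_pages": [0, 1, 2, 3], "error_page": 7}
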
#########################
# TWEET DB PAGINATION #
#########################
def get_page_index(
collection: pymongo.collection.Collection,
page_size: int)\
-> List[int]:
"""
Get a list of the pages in the collection
:param collection: pymongo collection of tweets
:param page_size: number of tweets in each page
:returns: list of pages in the collection, starting from 0
"""
return list(range(0, ceil(collection.count() / page_size)))
def get_tweets_page(
collection: pymongo.collection.Collection,
page_size: int,
num_page: int)\
-> pymongo.cursor.Cursor:
"""
Get a cursor pointing to the Mongo entries for that page
:param collection: pymongo collection of tweets
:param page_size: number of tweets in each page
:param num_page: relative index of the page within the collection
:returns: a Pymongo cursor pointing to the tweets
"""
tweets =\
collection.find().skip(num_page * page_size).limit(page_size)
return tweets
def get_page_index_fast(
collection: pymongo.collection.Collection,
page_size: int)\
-> Dict[int, int]:
"""
Get a list of the pages indexed by their tweet ID.
Skip is very slow for large collections where we need to skip
millions of records. Thus, it is much better for performance
to paginate with references to some identifier field, in this
case, "id" of tweets. This function finds the first and last ID
for a page of "page_size" tweets, and then asks in a loop for the
next page of tweets after the last found ID. This way, it builds
a list with the first ID for each page. The user will be able to
get that page i by asking for the "page_size" tweets with ID lesser
than page[i] (IDs are sorted in descending order). The loop stops
adding pages when it finds one that is not complete.
:param collection: pymongo collection of tweets
:param page_size: number of tweets in each page
:returns: list of indexes, using ID
"""
    pages = []
    first_page = collection.find()\
        .sort("id", pymongo.DESCENDING)\
        .limit(page_size)
    pages.append(first_page[0]["id"])
    try:
        last_id = first_page[page_size - 1]["id"]
    except IndexError:
        # The whole collection fits in a single page
        return {0: pages[0]}
    while True:
        page = collection.find({"id": {"$lt": last_id}})\
            .sort("id", pymongo.DESCENDING)\
            .limit(page_size)
        try:
            pages.append(page[0]["id"])
            last_id = page[page_size - 1]["id"]
        except IndexError:
            # An incomplete (or empty) page means there are no more pages
            break
    return dict(enumerate(pages))
def get_tweets_page_fast(
collection: pymongo.collection.Collection,
page_size: int,
page_index: int)\
-> pymongo.cursor.Cursor:
"""
Get a cursor pointing to the Mongo entries for that page
:param collection: pymongo collection of tweets
:param page_size: number of tweets in each page
:param num_page: relative index of the page within the collection
:returns: a Pymongo cursor pointing to the tweets
"""
tweets = collection\
.find({"id": {"$lte": page_index}})\
.sort("id", pymongo.DESCENDING)\
.limit(page_size)
return tweets
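
# Minimal usage sketch of the two "fast" pagination helpers above (collection
# and page size are hypothetical):
#
#   page_index = get_page_index_fast(collection, 1000)  # {0: <id>, 1: <id>, ...}
#   for num in page_index:
#       cursor = get_tweets_page_fast(collection, 1000, page_index[num])
#       # iterate over the (at most) 1000 tweets of that page
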
#########################
# METADATA GENERATION #
#########################
def generate_metadata_file(collection_path: str) -> None:
"""
    Create a metadata file for the current collection

    Once all the CSV files have been created, generate a metadata file
    with the number of tweets in each of the CSVs, obtained by a simple
    count of lines. Calling out to the system to run "wc -l" would
    surely be faster, but also more system dependent and a bit hacky.
    (An example of the resulting file follows this function.)

    :param collection_path: path to the root dir of a database
"""
metadata = {}
metadata["files"] = {}
for root, dirs, files in os.walk(collection_path):
for f in files:
file_path = os.path.join(root, f)
relative_path = os.path.relpath(file_path, collection_path)
metadata["files"][relative_path] = {}
metadata["files"][relative_path]["count"] =\
file_length(file_path)
output_path = os.path.join(collection_path, ".metadata.json")
with open(output_path, 'w') as f:
json.dump(metadata, f)
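
# The resulting .metadata.json looks like the following (paths and counts are
# hypothetical):
#
#   {"files": {"2019/06/01.csv": {"count": 5321},
#              "2019/06/02.csv": {"count": 4874}}}
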
def file_length(file_path: str) -> int:
    """
    Count the data lines of a CSV file (total lines minus the header)

    :param file_path: path of the file
    :returns: number of lines of the file, excluding the header line
    """
    i = 0
    with open(file_path) as f:
        for i, _ in enumerate(f):
            pass
    return i
#######################
# CUSTOM EXCEPTIONS #
#######################
class ExceptionAtPage(Exception):
"""
Exception designed to be raised when the conversion of a page of
tweets taken from Mongo fails
"""
def __init__(self, message: str, error_page: int):
"""
        :param message: message describing the error
        :param error_page: number of the page that failed
"""
self.message = message
self.error_page = error_page
class TweetConversionException(Exception):
"""
    Should be raised when a tweet causes an exception while being
    converted
"""
def __init__(self, message: str, tweet: str):
"""
:param message: str descriptive of the error
:param tweet: str with the contents of the tweet that caused the
failure
"""
self.message = message
self.tweet = tweet
#!/usr/bin/env python
import pymongo
import os
import sys
import argparse
import logging
import time
import json
import multiprocessing as mp
from config import globals
from lib import utils
# Command line parsing
parser = argparse.ArgumentParser(
    description="Dump the tweets of a Mongo database to CSV files")
parser.add_argument("-H", "--host", type=str, default="localhost")
parser.add_argument("-p", "--port", type=int, default=27017)
parser.add_argument("-s", "--pagesize", type=int, default=1000)
parser.add_argument("-v", "--verbose", action="store_true")
parser.add_argument("-t", "--timing", action="store_true")
parser.add_argument("-r", "--recovery", type=str)
parser.add_argument("database", type=str)
args = parser.parse_args()
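
# Hypothetical invocation (the script filename is assumed): convert the "mydb"
# database to CSV in pages of 1000 tweets, with verbose logging:
#   python mongo_to_csv.py -H localhost -p 27017 -s 1000 -v mydb
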
# Logging config
logformat = "[%(asctime)s] %(message)s"
dateformat = "%H:%M:%S"
if args.verbose:
logging.basicConfig(
level=logging.DEBUG, format=logformat, datefmt=dateformat)
else:
logging.basicConfig(
level=logging.ERROR, format=logformat, datefmt=dateformat)
logger = logging.getLogger(__name__)
# Initialize some variables
script_dir = os.path.dirname(__file__)
output_dir = os.path.join(script_dir, "pymongodump", args.database)
header_file = os.path.join(script_dir, "config", "header.txt")
with open(header_file) as f:
header = f.readline()
buffer_tweets = {}
task_queue = mp.Queue()
if args.timing:
globals.timing = True
time0 = time.time()
# MongoDB connection to get page index
logger.debug("The indexing of the collection may take a while if "
+ "the collection is too big. Please, be patient...")
if args.recovery:
with open(args.recovery) as f:
recovery_data = json.load(f)
client = pymongo.MongoClient(
recovery_data["host"], recovery_data["port"])
database_tweets = client[recovery_data["database"]]["tweets"]
page_index = utils.get_page_index_fast(
database_tweets, recovery_data["pagesize"])
client.close()
full_page_index_len = len(page_index)
for page in recovery_data["dumped_pages"]:
page_index.pop(page, None)
if "error_page" in recovery_data:
logger.debug("Discarding corrupted page")
page_index.pop(recovery_data.pop("error_page"))
logger.debug(
"Resuming collection conversion. {} of {} pages left."
.format(len(page_index), full_page_index_len))
else:
client = pymongo.MongoClient(args.host, args.port)
database_tweets = client[args.database]["tweets"]
page_index = utils.get_page_index_fast(database_tweets, args.pagesize)
client.close()
logger.debug(
"Database {} partitioned in {} pages of {} tweets (maximum)"
.format(args.database, len(page_index), args.pagesize))
# Build a picklable function that we can pass to map
def process_data_page(
page, host=args.host, port=args.port, database=args.database,
pagesize=args.pagesize, header=header, outputdir=output_dir,
queue=task_queue, page_index=page_index):
    utils.process_page(
        host, port, database, header, outputdir, pagesize, page,
        page_index, queue)
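
# process_data_page is defined at module level (rather than as a lambda or a
# closure) because mp.Pool.map needs to pickle it, and the shared values are
# bound as default arguments at definition time.
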
# Launch single process to write to the filesystem
try:
writer_worker = mp.Process(
target=utils.filesystem_writer,
args=(task_queue, header, args.host, args.port,
args.database, args.pagesize, output_dir, args.recovery))
writer_worker.start()
except Exception:
logger.error("There was a failure in the filesystem writer", exc_info=True)
os.system("spd-say 'Something really bad happened!'")
sys.exit(1)
# Launch pool of workers to perform the format conversion
try:
with mp.Pool() as pool:
pool.map(process_data_page, page_index.keys())
except utils.ExceptionAtPage as exc:
logger.error("Error detected at page {}".format(exc.error_page))
task_queue.put((exc.error_page, "ERROR"))
os.system("spd-say 'Something really bad happened!'")
sys.exit(1)
except (Exception, KeyboardInterrupt):
logger.error("Error detected", exc_info=True)
task_queue.put((-2, "ERROR"))
os.system("spd-say 'Something really bad happened!'")
sys.exit(1)
task_queue.put((-1, "END"))
# Wait for the writer to flush every page before generating metadata
writer_worker.join()
if globals.timing:
time1 = time.time()
try:
utils.generate_metadata_file(output_dir)
logger.info("Metadata file created")
except (Exception, KeyboardInterrupt):
logger.error("The collection was converted correctly to CSV, but something"
+ " failed when generating the metadata file", exc_info=True)
os.system("spd-say 'Something really bad happened!'")
sys.exit(1)
if globals.timing:
logger.critical(
"Time spent generating metadata file: {}s"
.format(time.time() - time1))
logger.critical(
"Total execution time: {}s"
.format(time.time() - time0))
os.system('spd-say "Conversion completed successfully!"')
logger.info("Conversion completed successfully!!")
@@ -3,6 +3,8 @@
import pymongo
import os
import argparse
import pprint
from tweet_manager.lib.json2csv import flatten
parser = argparse.ArgumentParser(
description="Dump the tweets of a database to a JSON file")