Commit 34170f35 by serpucga

Thread-safe parallel version (dirty code)

Changed the structure of the code to make it thread-safe when dumping data to the filesystem. The previous parallelism affected all the stages, which could lead to corrupt data when two processes tried to write to the same file at the same time. Now only the code that retrieves data from Mongo and converts it to CSV is parallelized and handed to a pool of workers; it is called "process_page" because each worker receives a page of X (default 1000) tweets to convert. Those workers only write to buffers, which they push onto a thread-safe multiprocessing queue. That queue is consumed by a single process, the "filesystem_writer", which is the only one allowed to write to the filesystem (this includes both creating the necessary dirs and appending tweets to the CSV files). This worker runs in an endless loop, waiting for new data on the queue to write out. This is a pretty dirty version that still includes functions and code that are no longer used, plus pretty bad log messages added during development to hunt down bugs. Will refactor soon.
parent 50dda137
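For reference, below is a minimal, self-contained sketch of the queue-based producer/consumer pattern this commit describes, not the project code itself: pool workers build per-file buffers in memory and enqueue them, and a single writer process owns all filesystem access until it receives an "END" sentinel. All names are illustrative, and the sketch uses a Manager queue so it can be passed to pool workers as an argument, which differs from how the commit shares its queue.

# Minimal sketch of the pattern described above; names are illustrative.
import multiprocessing as mp
import os
from functools import partial


def filesystem_writer(queue) -> None:
    # The only process allowed to touch the filesystem.
    while True:
        page = queue.get()
        if page == "END":  # sentinel enqueued once the pool has finished
            break
        for path, contents in page.items():
            os.makedirs(os.path.dirname(path), exist_ok=True)
            with open(path, "a") as fh:
                fh.write(contents)


def process_page(page_number: int, queue) -> None:
    # Workers only build in-memory buffers keyed by output file, then enqueue them.
    buffer = {"./output/example/2019/05/28.csv": f"row from page {page_number}\n"}
    queue.put(buffer)


if __name__ == "__main__":
    manager = mp.Manager()
    queue = manager.Queue()  # a Manager queue can be passed to Pool workers
    writer = mp.Process(target=filesystem_writer, args=(queue,))
    writer.start()
    with mp.Pool() as pool:
        pool.map(partial(process_page, queue=queue), range(4))
    queue.put("END")  # tell the writer there is nothing left to write
    writer.join()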
import os
import pymongo
import json
import re
import multiprocessing as mp
from math import ceil
from typing import List, Tuple
from tweet_manager.lib import json2csv, format_csv

# Logger
@@ -11,6 +12,84 @@ import logging
logger = logging.getLogger(__name__)
def filesystem_writer(queue: mp.Queue, header: str) -> None:
    """
    Reads the CSV pages from the queue and writes them to the filesystem

    :param queue: queue where the processed data waits to be written to the FS
    :param header: CSV header used to initialize newly created files
    """

    logger.info("filesystem_writer active")
    while True:
        csv_page = queue.get()

        # "END" is the sentinel that tells the writer the pool has finished
        if csv_page == "END":
            logger.info("filesystem_writer can rest at last")
            break

        for output_path in csv_page.keys():
            logger.debug("Dumping tweets for " + output_path)
            if os.path.isfile(output_path):
                with open(output_path, "a") as writer:
                    logger.debug("File exists, dumping...")
                    # logger.debug(csv_page[output_path])
                    writer.write(csv_page[output_path])
            else:
                logger.debug("File not found, generating new...")
                generate_path(output_path, header)
                with open(output_path, "a") as writer:
                    # logger.debug(csv_page[output_path])
                    writer.write(csv_page[output_path])
def process_page(
        host: str,
        port: int,
        database: str,
        header: str,
        output_dir: str,
        pagesize: int,
        page_index: List[int],
        queue: mp.Queue)\
        -> None:
    """
    Retrieve a page of tweets from Mongo, convert it to CSV and enqueue it

    :param host: IP address of the MongoDB host
    :param port: Mongo port
    :param database: name of the database that we want to query
    :param header: CSV header and all its fields
    :param output_dir: root directory for the new CSV database
    :param pagesize: size of the page (number of tweets we want to get)
    :param page_index: number of the tweet page to retrieve
    :param queue: queue where the processed data waits to be written to the FS
    """

    logger.info("process_page executing")
    client = pymongo.MongoClient(host, port)
    database_tweets = client[database]["tweets"]
    tweets_page = get_tweets_page(database_tweets, pagesize, page_index)
    buffer_tweets = {}

    for tweet in tweets_page:
        # Get output path and contents for the new CSV row
        csv_tweet_output_path =\
            get_tweet_output_path(tweet, output_dir)
        csv_tweet_contents =\
            "\n" + str(convert_tweet_to_csv(header, tweet))

        # Create a buffer for the output file if there was none yet
        if csv_tweet_output_path not in buffer_tweets.keys():
            buffer_tweets[csv_tweet_output_path] = ""
        buffer_tweets[csv_tweet_output_path] += csv_tweet_contents
    client.close()

    # Hand the per-file buffers over to the filesystem_writer process
    queue.put(buffer_tweets)

    logger.debug("================")
    logger.debug("PAGE {} BUFFER".format(page_index))
    logger.debug("================")
    # logger.debug(buffer_tweets)
def write_tweets_to_files(
        host: str,
        port: int,
@@ -20,11 +99,13 @@ def write_tweets_to_files(
        output_dir: str,
        page_index: List[int])\
        -> None:

    logger.info("write_tweets_to_files executing")
    client = pymongo.MongoClient(host, port)
    database_tweets = client[database]["tweets"]
    tweets_page = get_tweets_page(database_tweets, pagesize, page_index)
    buffer_tweets = {}

    for tweet in tweets_page:
        # Get output path and contents for the new CSV file
        csv_tweet_output_path =\
@@ -34,17 +115,16 @@ def write_tweets_to_files(

        # Check if a buffer exists for the file. If not, add it to the dictionary
        if csv_tweet_output_path not in buffer_tweets.keys():
            buffer_tweets[csv_tweet_output_path] = ""

        # Update the buffer, adding the tweet
        buffer_tweets[csv_tweet_output_path] += csv_tweet_contents
    client.close()

    # Perform the write operations in each of the files
    for output_path in buffer_tweets.keys():
        with open(output_path, "a") as tweet_writer:
            tweet_writer.write(buffer_tweets[output_path])
def get_tweet_output_path(tweet: dict, output_dir: str) -> str:
@@ -70,8 +150,8 @@ def get_tweet_output_path(tweet: dict, output_dir: str) -> str:

def create_tweet_output_path(
        date: Tuple[int, int, int],
        header: str,
        output_dir: str)\
        -> str:
    try:
@@ -79,14 +159,7 @@ def create_tweet_output_path(
    except FileExistsError as e:
        collection_path = e.filename

    day = date[2]

    # Classify the tweet chronologically
    tweet_output_path = json2csv.mkdir_tweet_date(date, collection_path)
@@ -100,6 +173,52 @@ def create_tweet_output_path(
    return tweet_output_file
def generate_path(path: str, header: str) -> None:
    """
    Generate dirs and output file (initialized with a header)
    if they didn't exist yet
    """

    path_regex = r"(\./\w+)/(\w+)/(\d{4})/(\d{2})/(\d{2})\.csv$"
    match = re.search(path_regex, path)
    output_dir = match.group(1)
    collection = match.group(2)
    year = match.group(3)
    month = match.group(4)
    day = match.group(5)

    if not os.path.isdir(output_dir):
        logger.info(
            "Building directory to contain the collected tweets at: "
            + os.path.abspath(output_dir)
        )
        os.mkdir(output_dir)

    collection_path = os.path.join(output_dir, collection)
    if not os.path.isdir(collection_path):
        logger.info("Initializing collection " + collection + "...")
        os.mkdir(collection_path)

    collection_year = os.path.join(collection_path, year)
    if not os.path.isdir(collection_year):
        logger.info("Adding year " + year + "...")
        os.mkdir(collection_year)

    collection_month = os.path.join(collection_year, month)
    if not os.path.isdir(collection_month):
        logger.info("Adding month " + month + "...")
        os.mkdir(collection_month)

    csv_output_file = os.path.join(collection_month, day + ".csv")
    if os.path.isfile(csv_output_file) is False:
        with open(csv_output_file, "w") as fw:
            logger.info("Adding file for day " + day + "...")
            logger.debug("Initializing CSV file with header")
            logger.debug(header)
            fw.write(header.strip())
def create_task_database_structure(
        output_dir: str)\
        -> str:
...
@@ -35,6 +35,7 @@ header_file = os.path.join(script_dir, "header.txt")
with open(header_file) as f:
    header = f.readline()
buffer_tweets = {}
task_queue = mp.Queue()

# MongoDB connection to get page index
client = pymongo.MongoClient(args.host, args.port)
@@ -46,13 +47,21 @@ client.close()

# Build a picklable function that we can pass to map
def write_page(
        page, host=args.host, port=args.port, database=args.database,
        pagesize=args.pagesize, header=header, outputdir=output_dir,
        queue=task_queue):
    utils.process_page(
        host, port, database, header, output_dir, pagesize, page, queue)

# Single process to write to the filesystem
writer_worker = mp.Process(
    target=utils.filesystem_writer, args=(task_queue, header, ))
writer_worker.start()

# Make the computation
with mp.Pool() as pool:
    pool.map(write_page, page_index)
task_queue.put("END")

utils.generate_metadata_file(output_dir)