Commit 1f695bf1 by serpucga

Limited number of processes launched by using Pool

The previous version ran out of memory on big databases because it tried to launch all the processes at once. This version no longer has memory issues. Problems with thread safety and collisions between processes remain.
parent 84849517
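Note: the change amounts to replacing one Process per page with a fixed-size worker pool. A minimal sketch of the difference, using a hypothetical per-page function work() that is not part of this repository:

import multiprocessing as mp

def work(page):
    # Hypothetical stand-in for the real per-page job
    return page * page

if __name__ == "__main__":
    pages = list(range(10000))

    # Old approach: one OS process per page. For a big database this means
    # thousands of simultaneous processes, which is what exhausted memory.
    # processes = [mp.Process(target=work, args=(p,)) for p in pages]

    # New approach: the Pool keeps a bounded number of worker processes
    # alive (by default one per CPU core) and feeds every page to them.
    with mp.Pool(processes=4) as pool:
        results = pool.map(work, pages)

With a Pool, memory use is bounded by the number of workers rather than by the number of pages in the collection.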
 import os
 import json
 import re
+import pymongo
+from math import ceil
 from tweet_manager.lib import json2csv, format_csv
@@ -133,3 +135,44 @@ def convert_tweet_to_csv(header: str, tweet: dict) -> str:
     csv_appendable_line = format_csv.get_csv_line(header, status_csv)
     return csv_appendable_line
+
+
+def get_page_index(collection, page_size: int):
+    return list(range(0, ceil(collection.count() / page_size)))
+
+
+def get_tweets_page(collection, page_size: int, num_page: int):
+    tweets = collection.find().skip(num_page * page_size).limit(page_size)
+    return tweets
+
+
+def write_tweets_to_files(host, port, database, pagesize, header: str,
+                          output_dir: str, page_index):
+    print("Hi there! write_tweets_to_files executing")
+    client = pymongo.MongoClient(host, port)
+    database_tweets = client[database]["tweets"]
+    tweets_page = get_tweets_page(database_tweets, pagesize, page_index)
+    buffer_tweets = {}
+    for tweet in tweets_page:
+        # Get output path and contents for the new CSV file
+        csv_tweet_output_path =\
+            create_tweet_output_path(header, tweet, output_dir)
+        csv_tweet_contents =\
+            "\n" + str(convert_tweet_to_csv(header, tweet))
+
+        # Check if buffer exists for the file. If not, add to dictionary
+        if csv_tweet_output_path not in buffer_tweets.keys():
+            buffer_tweets[csv_tweet_output_path] = ["", 0]
+
+        # Update the buffer adding the tweet and increasing tweet count
+        buffer_tweets[csv_tweet_output_path][0] += csv_tweet_contents
+        buffer_tweets[csv_tweet_output_path][1] += 1
+    client.close()
+
+    # Perform the write operations in each of the files
+    for output_path in buffer_tweets.keys():
+        with open(output_path, "a") as tweet_writer:
+            tweet_writer.write(buffer_tweets[output_path][0])
+        increase_metadata_count(
+            os.path.join(output_dir, ".metadata.json"),
+            output_path, increase=buffer_tweets[output_path][1])
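For reference, the two helpers added above implement plain skip/limit pagination over the collection. A rough usage sketch; the host, port and database name below are placeholders rather than values from this repository:

import pymongo
from math import ceil

client = pymongo.MongoClient("localhost", 27017)  # placeholder connection
collection = client["example_db"]["tweets"]       # hypothetical database name

page_size = 1000
# Same arithmetic as get_page_index: one index entry per page of documents
page_index = list(range(0, ceil(collection.count() / page_size)))

for num_page in page_index:
    # Same query as get_tweets_page: skip the pages already covered, read one page
    for tweet in collection.find().skip(num_page * page_size).limit(page_size):
        pass  # a worker would convert the tweet to CSV and buffer it here

client.close()

Each page can be processed by an independent worker because it is fully described by its page number. Note that collection.count() is deprecated in recent pymongo releases; count_documents({}) is the current equivalent.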
@@ -4,88 +4,43 @@ import pymongo
 import os
 import argparse
 import multiprocessing as mp
-from math import ceil
 from lib import utils
-
-
-def get_page_index(collection, page_size: int):
-    return list(range(0, ceil(collection.count() / page_size)))
-
-
-def get_tweets_page(collection, page_size: int, num_page: int):
-    tweets = collection.find().skip(num_page * page_size).limit(page_size)
-    return tweets
-
-
-def write_tweets_to_files(host, port, database, pagesize, header: str,
-                          output_dir: str, page_index):
-    print("Hi there! write_tweets_to_files executing")
-    client = pymongo.MongoClient(host, port)
-    database_tweets = client[database]["tweets"]
-    tweets_page = get_tweets_page(database_tweets, pagesize, page_index)
-    buffer_tweets = {}
-    for tweet in tweets_page:
-        # Get output path and contents for the new CSV file
-        csv_tweet_output_path =\
-            utils.create_tweet_output_path(header, tweet, output_dir)
-        csv_tweet_contents =\
-            "\n" + str(utils.convert_tweet_to_csv(header, tweet))
-
-        # Check if buffer exists for the file. If not, add to dictionary
-        if csv_tweet_output_path not in buffer_tweets.keys():
-            buffer_tweets[csv_tweet_output_path] = ["", 0]
-
-        # Update the buffer adding the tweet and increasing tweet count
-        buffer_tweets[csv_tweet_output_path][0] += csv_tweet_contents
-        buffer_tweets[csv_tweet_output_path][1] += 1
-    client.close()
-
-    # Perform the write operations in each of the files
-    for output_path in buffer_tweets.keys():
-        with open(output_path, "a") as tweet_writer:
-            tweet_writer.write(buffer_tweets[output_path][0])
-        utils.increase_metadata_count(
-            os.path.join(output_dir, ".metadata.json"),
-            output_path, increase=buffer_tweets[output_path][1])
-
-
-if __name__ == "__main__":
-    # Command line parsing
-    parser = argparse.ArgumentParser(
-        description="Dump the tweets of a database to a JSON file")
-    parser.add_argument("-H", "--host", type=str, default="localhost")
-    parser.add_argument("-p", "--port", type=int, default=27017)
-    parser.add_argument("-s", "--pagesize", type=int, default=1000)
-    parser.add_argument("database", type=str)
-    args = parser.parse_args()
-
-    # Dirs and files
-    script_dir = os.path.dirname(__file__)
-    output_dir = os.path.join(script_dir, "pymongodump", args.database)
-    header_file = os.path.join(script_dir, "header.txt")
-
-    # MongoDB connection
-    client = pymongo.MongoClient(args.host, args.port)
-    database_tweets = client[args.database]["tweets"]
-
-    with open(header_file) as f:
-        header = f.readline()
-    buffer_tweets = {}
-    num_page = 0
-    page_index = get_page_index(database_tweets, args.pagesize)
-    client.close()
-
-    output = mp.Queue()
-    processes = (mp.Process(
-        target=write_tweets_to_files,
-        args=(args.host, args.port, args.database,
-              args.pagesize, header, output_dir, page))
-        for page in page_index)
-
-    for p in processes:
-        p.start()
-    for p in processes:
-        p.join()
+
+# Command line parsing
+parser = argparse.ArgumentParser(
+    description="Dump the tweets of a database to a JSON file")
+parser.add_argument("-H", "--host", type=str, default="localhost")
+parser.add_argument("-p", "--port", type=int, default=27017)
+parser.add_argument("-s", "--pagesize", type=int, default=1000)
+parser.add_argument("database", type=str)
+args = parser.parse_args()
+
+# Dirs and files
+script_dir = os.path.dirname(__file__)
+output_dir = os.path.join(script_dir, "pymongodump", args.database)
+header_file = os.path.join(script_dir, "header.txt")
+
+# MongoDB connection
+client = pymongo.MongoClient(args.host, args.port)
+database_tweets = client[args.database]["tweets"]
+
+with open(header_file) as f:
+    header = f.readline()
+buffer_tweets = {}
+num_page = 0
+page_index = utils.get_page_index(database_tweets, args.pagesize)
+client.close()
+
+
+# Build a picklable function that we can pass to map
+def write_page(
+        page, host=args.host, port=args.port, database=args.database,
+        pagesize=args.pagesize, header=header, outputdir=output_dir):
+    utils.write_tweets_to_files(
+        host, port, database, pagesize, header, output_dir, page)
+
+
+# Make the computation
+with mp.Pool() as pool:
+    pool.map(write_page, page_index)
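Two remarks on the new main script, added here as notes rather than as part of the commit.

First, write_page is defined at module level because Pool.map serializes the callable by reference; nested functions and lambdas cannot be pickled, which is what the "picklable function" comment refers to.

Second, the commit message says that collisions between processes remain: several workers may append to the same CSV file and update .metadata.json concurrently. One possible direction, sketched with a hypothetical shared lock passed through the Pool initializer (not code from this repository):

import multiprocessing as mp
import os
import tempfile

lock = None  # set in every worker by init_worker


def init_worker(shared_lock):
    # Runs once per worker process and keeps the shared lock reachable
    global lock
    lock = shared_lock


def append_line(path, line):
    # Serialize appends so two workers never interleave writes to one file
    with lock:
        with open(path, "a") as f:
            f.write(line + "\n")


def job(task):
    # Hypothetical per-item work: unpack a (path, line) pair and append it
    path, line = task
    append_line(path, line)


if __name__ == "__main__":
    out = os.path.join(tempfile.gettempdir(), "pool_lock_demo.csv")
    shared_lock = mp.Lock()
    with mp.Pool(initializer=init_worker, initargs=(shared_lock,)) as pool:
        pool.map(job, [(out, "row {}".format(i)) for i in range(100)])

A single global lock serializes every write, so per-file locks or a dedicated writer process would preserve more parallelism; either way, the per-output-path buffering already done in write_tweets_to_files keeps the number of contended writes small.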