Commit 33782e37 by serpucga

Create one Mongo connection for each process

parent ed2c9d74
...@@ -10,10 +10,7 @@ from lib import utils ...@@ -10,10 +10,7 @@ from lib import utils
def get_page_index(collection, page_size: int): def get_page_index(collection, page_size: int):
page_index = [] return list(range(0, ceil(collection.count() / page_size)))
for i in range(0, ceil(collection.count() / page_size)):
page_index.append(get_tweets_page(collection, page_size, i))
return page_index
def get_tweets_page(collection, page_size: int, num_page: int): def get_tweets_page(collection, page_size: int, num_page: int):
...@@ -21,8 +18,12 @@ def get_tweets_page(collection, page_size: int, num_page: int): ...@@ -21,8 +18,12 @@ def get_tweets_page(collection, page_size: int, num_page: int):
return tweets return tweets
def write_tweets_to_files(header: str, output_dir: str, tweets_page): def write_tweets_to_files(host, port, database, pagesize, header: str,
output_dir: str, page_index):
print("Hi there! write_tweets_to_files executing") print("Hi there! write_tweets_to_files executing")
client = pymongo.MongoClient(host, port)
database_tweets = client[database]["tweets"]
tweets_page = get_tweets_page(database_tweets, pagesize, page_index)
buffer_tweets = {} buffer_tweets = {}
for tweet in tweets_page: for tweet in tweets_page:
# Get output path and contents for the new CSV file # Get output path and contents for the new CSV file
...@@ -39,6 +40,7 @@ def write_tweets_to_files(header: str, output_dir: str, tweets_page): ...@@ -39,6 +40,7 @@ def write_tweets_to_files(header: str, output_dir: str, tweets_page):
buffer_tweets[csv_tweet_output_path][0] += csv_tweet_contents buffer_tweets[csv_tweet_output_path][0] += csv_tweet_contents
buffer_tweets[csv_tweet_output_path][1] += 1 buffer_tweets[csv_tweet_output_path][1] += 1
client.close()
# Perform the write operations in each of the files # Perform the write operations in each of the files
for output_path in buffer_tweets.keys(): for output_path in buffer_tweets.keys():
with open(output_path, "a") as tweet_writer: with open(output_path, "a") as tweet_writer:
...@@ -74,11 +76,14 @@ if __name__ == "__main__": ...@@ -74,11 +76,14 @@ if __name__ == "__main__":
num_page = 0 num_page = 0
page_index = get_page_index(database_tweets, args.pagesize) page_index = get_page_index(database_tweets, args.pagesize)
client.close()
output = mp.Queue() output = mp.Queue()
processes = (mp.Process( processes = (mp.Process(
target=write_tweets_to_files, args=( target=write_tweets_to_files,
header, output_dir, page)) for page in page_index) args=(args.host, args.port, args.database,
args.pagesize, header, output_dir, page))
for page in page_index)
for p in processes: for p in processes:
p.start() p.start()
......
Markdown is supported
0% Try again or attach a new file
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment