Welcome to part 5 of the chatbot with Python and TensorFlow tutorial series. Leading up to this tutorial, we've been working with our data and preparing the logic for how we want to insert it; now we're ready to start inserting. Code up to now:
import sqlite3
import json
from datetime import datetime

timeframe = '2015-05'
sql_transaction = []

connection = sqlite3.connect('{}.db'.format(timeframe))
c = connection.cursor()

def create_table():
    c.execute("CREATE TABLE IF NOT EXISTS parent_reply(parent_id TEXT PRIMARY KEY, comment_id TEXT UNIQUE, parent TEXT, comment TEXT, subreddit TEXT, unix INT, score INT)")

def format_data(data):
    data = data.replace('\n',' newlinechar ').replace('\r',' newlinechar ').replace('"',"'")
    return data

def acceptable(data):
    if len(data.split(' ')) > 50 or len(data) < 1:
        return False
    elif len(data) > 1000:
        return False
    elif data == '[deleted]':
        return False
    elif data == '[removed]':
        return False
    else:
        return True

def find_parent(pid):
    try:
        sql = "SELECT comment FROM parent_reply WHERE comment_id = '{}' LIMIT 1".format(pid)
        c.execute(sql)
        result = c.fetchone()
        if result != None:
            return result[0]
        else:
            return False
    except Exception as e:
        #print(str(e))
        return False

def find_existing_score(pid):
    try:
        sql = "SELECT score FROM parent_reply WHERE parent_id = '{}' LIMIT 1".format(pid)
        c.execute(sql)
        result = c.fetchone()
        if result != None:
            return result[0]
        else:
            return False
    except Exception as e:
        #print(str(e))
        return False

if __name__ == '__main__':
    create_table()
    row_counter = 0
    paired_rows = 0

    with open('J:/chatdata/reddit_data/{}/RC_{}'.format(timeframe.split('-')[0],timeframe), buffering=1000) as f:
        for row in f:
            row_counter += 1
            row = json.loads(row)
            parent_id = row['parent_id']
            body = format_data(row['body'])
            created_utc = row['created_utc']
            score = row['score']
            comment_id = row['name']
            subreddit = row['subreddit']
            parent_data = find_parent(parent_id)
            if score >= 2:
                existing_comment_score = find_existing_score(parent_id)
Now, if there's an existing comment score, that means a comment already exists for this parent, so this requires an UPDATE statement rather than an INSERT. If you don't already know SQL, you might want to go through the SQLite tutorial. So our initial logic:
if score >= 2:
    existing_comment_score = find_existing_score(parent_id)
    if existing_comment_score:
        if score > existing_comment_score:
            if acceptable(body):
                sql_insert_replace_comment(comment_id,parent_id,parent_data,body,subreddit,created_utc,score)
Now, we need to build the sql_insert_replace_comment function:
def sql_insert_replace_comment(commentid,parentid,parent,comment,subreddit,time,score):
    try:
        sql = """UPDATE parent_reply SET parent_id = "{}", comment_id = "{}", parent = "{}", comment = "{}", subreddit = "{}", unix = {}, score = {} WHERE parent_id = "{}";""".format(parentid, commentid, parent, comment, subreddit, int(time), score, parentid)
        transaction_bldr(sql)
    except Exception as e:
        print('s0 insertion',str(e))
That covers the situation where a comment is already paired with a parent, but we also need to cover comments that don't have parents (but might be a parent to another comment!), and comments that do have parents where those parents don't already have a reply. We can further build out the insertion block:
if score >= 2:
    existing_comment_score = find_existing_score(parent_id)
    if existing_comment_score:
        if score > existing_comment_score:
            if acceptable(body):
                sql_insert_replace_comment(comment_id,parent_id,parent_data,body,subreddit,created_utc,score)
    else:
        if acceptable(body):
            if parent_data:
                sql_insert_has_parent(comment_id,parent_id,parent_data,body,subreddit,created_utc,score)
                paired_rows += 1
            else:
                sql_insert_no_parent(comment_id,parent_id,body,subreddit,created_utc,score)
Now we need to build the sql_insert_has_parent and sql_insert_no_parent functions:
def sql_insert_has_parent(commentid,parentid,parent,comment,subreddit,time,score):
    try:
        sql = """INSERT INTO parent_reply (parent_id, comment_id, parent, comment, subreddit, unix, score) VALUES ("{}","{}","{}","{}","{}",{},{});""".format(parentid, commentid, parent, comment, subreddit, int(time), score)
        transaction_bldr(sql)
    except Exception as e:
        print('s0 insertion',str(e))

def sql_insert_no_parent(commentid,parentid,comment,subreddit,time,score):
    try:
        sql = """INSERT INTO parent_reply (parent_id, comment_id, comment, subreddit, unix, score) VALUES ("{}","{}","{}","{}",{},{});""".format(parentid, commentid, comment, subreddit, int(time), score)
        transaction_bldr(sql)
    except Exception as e:
        print('s0 insertion',str(e))
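A quick caution on building SQL strings with .format() like this: the placeholders need to be "{}" (a bare ? is only substituted when you hand parameters to c.execute() directly), and even with format_data() swapping double quotes for singles, string-built SQL stays fragile. If you're willing to commit each statement immediately instead of batching through transaction_bldr, sqlite3's own parameter substitution sidesteps the quoting problem. A minimal sketch of that alternative, not the approach this series uses (sql_insert_has_parent_safe is my name for it):

# A minimal sketch of a parameterized insert, assuming the same connection
# and cursor as above. It commits immediately rather than batching, so it
# trades the speed of transaction_bldr for safer quoting.
def sql_insert_has_parent_safe(commentid, parentid, parent, comment, subreddit, time, score):
    # the ? placeholders are filled in by sqlite3 itself, so quotes inside
    # the comment text cannot break the statement
    c.execute("""INSERT INTO parent_reply (parent_id, comment_id, parent, comment, subreddit, unix, score)
                 VALUES (?, ?, ?, ?, ?, ?, ?)""",
              (parentid, commentid, parent, comment, subreddit, int(time), score))
    connection.commit()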
So that we can see where we are during iteration, we'll output some information every 100,000 rows:
if row_counter % 100000 == 0:
    print('Total Rows Read: {}, Paired Rows: {}, Time: {}'.format(row_counter, paired_rows, str(datetime.now())))
Finally, the last piece we need is the transaction_bldr function. It builds up insertion statements and commits them in groups, rather than one-by-one, which is much, much quicker:
def transaction_bldr(sql):
    global sql_transaction
    sql_transaction.append(sql)
    if len(sql_transaction) > 1000:
        c.execute('BEGIN TRANSACTION')
        for s in sql_transaction:
            try:
                c.execute(s)
            except:
                pass
        connection.commit()
        sql_transaction = []
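One thing to keep in mind with batching like this: whatever is still sitting in sql_transaction when the input file ends (anything short of the 1,000-statement threshold) never gets committed. A small sketch of a final flush you could call once after the read loop, following the same pattern as transaction_bldr (flush_transaction is my name, not from this series):

def flush_transaction():
    # commit whatever statements are still buffered at end of file
    global sql_transaction
    if sql_transaction:
        c.execute('BEGIN TRANSACTION')
        for s in sql_transaction:
            try:
                c.execute(s)
            except:
                pass
        connection.commit()
        sql_transaction = []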
Yeah, I am using a global.
Full code up to this point:
import sqlite3
import json
from datetime import datetime

timeframe = '2015-05'
sql_transaction = []

connection = sqlite3.connect('{}.db'.format(timeframe))
c = connection.cursor()

def create_table():
    c.execute("CREATE TABLE IF NOT EXISTS parent_reply(parent_id TEXT PRIMARY KEY, comment_id TEXT UNIQUE, parent TEXT, comment TEXT, subreddit TEXT, unix INT, score INT)")

def format_data(data):
    data = data.replace('\n',' newlinechar ').replace('\r',' newlinechar ').replace('"',"'")
    return data

def transaction_bldr(sql):
    global sql_transaction
    sql_transaction.append(sql)
    if len(sql_transaction) > 1000:
        c.execute('BEGIN TRANSACTION')
        for s in sql_transaction:
            try:
                c.execute(s)
            except:
                pass
        connection.commit()
        sql_transaction = []

def sql_insert_replace_comment(commentid,parentid,parent,comment,subreddit,time,score):
    try:
        sql = """UPDATE parent_reply SET parent_id = "{}", comment_id = "{}", parent = "{}", comment = "{}", subreddit = "{}", unix = {}, score = {} WHERE parent_id = "{}";""".format(parentid, commentid, parent, comment, subreddit, int(time), score, parentid)
        transaction_bldr(sql)
    except Exception as e:
        print('s0 insertion',str(e))

def sql_insert_has_parent(commentid,parentid,parent,comment,subreddit,time,score):
    try:
        sql = """INSERT INTO parent_reply (parent_id, comment_id, parent, comment, subreddit, unix, score) VALUES ("{}","{}","{}","{}","{}",{},{});""".format(parentid, commentid, parent, comment, subreddit, int(time), score)
        transaction_bldr(sql)
    except Exception as e:
        print('s0 insertion',str(e))

def sql_insert_no_parent(commentid,parentid,comment,subreddit,time,score):
    try:
        sql = """INSERT INTO parent_reply (parent_id, comment_id, comment, subreddit, unix, score) VALUES ("{}","{}","{}","{}",{},{});""".format(parentid, commentid, comment, subreddit, int(time), score)
        transaction_bldr(sql)
    except Exception as e:
        print('s0 insertion',str(e))

def acceptable(data):
    if len(data.split(' ')) > 50 or len(data) < 1:
        return False
    elif len(data) > 1000:
        return False
    elif data == '[deleted]':
        return False
    elif data == '[removed]':
        return False
    else:
        return True

def find_parent(pid):
    try:
        sql = "SELECT comment FROM parent_reply WHERE comment_id = '{}' LIMIT 1".format(pid)
        c.execute(sql)
        result = c.fetchone()
        if result != None:
            return result[0]
        else:
            return False
    except Exception as e:
        #print(str(e))
        return False

def find_existing_score(pid):
    try:
        sql = "SELECT score FROM parent_reply WHERE parent_id = '{}' LIMIT 1".format(pid)
        c.execute(sql)
        result = c.fetchone()
        if result != None:
            return result[0]
        else:
            return False
    except Exception as e:
        #print(str(e))
        return False

if __name__ == '__main__':
    create_table()
    row_counter = 0
    paired_rows = 0

    with open('J:/chatdata/reddit_data/{}/RC_{}'.format(timeframe.split('-')[0],timeframe), buffering=1000) as f:
        for row in f:
            row_counter += 1
            row = json.loads(row)
            parent_id = row['parent_id']
            body = format_data(row['body'])
            created_utc = row['created_utc']
            score = row['score']
            comment_id = row['name']
            subreddit = row['subreddit']
            parent_data = find_parent(parent_id)

            if score >= 2:
                existing_comment_score = find_existing_score(parent_id)
                if existing_comment_score:
                    if score > existing_comment_score:
                        if acceptable(body):
                            sql_insert_replace_comment(comment_id,parent_id,parent_data,body,subreddit,created_utc,score)
                else:
                    if acceptable(body):
                        if parent_data:
                            sql_insert_has_parent(comment_id,parent_id,parent_data,body,subreddit,created_utc,score)
                            paired_rows += 1
                        else:
                            sql_insert_no_parent(comment_id,parent_id,body,subreddit,created_utc,score)

            if row_counter % 100000 == 0:
                print('Total Rows Read: {}, Paired Rows: {}, Time: {}'.format(row_counter, paired_rows, str(datetime.now())))
Now you can begin running this. Output over time should look like:
Total Rows Read: 100000, Paired Rows: 3221, Time: 2017-11-14 15:14:33.748595
Total Rows Read: 200000, Paired Rows: 8071, Time: 2017-11-14 15:14:55.342929
Total Rows Read: 300000, Paired Rows: 13697, Time: 2017-11-14 15:15:18.035447
Total Rows Read: 400000, Paired Rows: 19723, Time: 2017-11-14 15:15:40.311376
Total Rows Read: 500000, Paired Rows: 25643, Time: 2017-11-14 15:16:02.045075
How long it takes to get through all of the data depends on the size of the starting file, and inserting slows down as the database grows. Working through the entire May 2015 file will probably take 5-10 hours.
Once you've gone through the file(s) you want, you're ready to convert this to training data for our model, which is what we'll be doing in the next tutorial.
If you're training on much larger datasets, you may find there's significant bloat we need to handle. Only about 10% of the comments get paired, so a large percentage of our database isn't actually going to be used. I use the following additional code:
if row_counter % cleanup == 0:
    print("Cleanin up!")
    sql = "DELETE FROM parent_reply WHERE parent IS NULL"
    c.execute(sql)
    connection.commit()
    c.execute("VACUUM")
    connection.commit()
This goes directly below the other counter. It requires a new cleanup variable, which specifies how many rows to process before you "clean up." This removes bloat from our database and keeps insertion speeds fairly high. Each cleanup seems to cost about 2K pairs, pretty much wherever you put it: if it runs every 100K rows, that'll cost you 2K pairs per 100K rows. I went with every 1 million rows. Another option is to clean every 1 million rows, but clean not the last 1 million rows, rather rows -1,100,000 through -100,000, since those 2K lost pairs seem to occur within the last 100K rows. Even with this, you will still lose some pairs; I felt that 2K pairs out of roughly 100K pairs per 1 million rows was negligible. I also added a start_row variable, so I could stop and restart database insertion while trying to improve the speeds a bit. The c.execute("VACUUM") is an SQL command that shrinks the database file down to the size it ought to be. It probably isn't required, and you might want to run it only at the very end; I didn't test how long the operation takes, and mostly used it so I could see the database's size immediately after a delete.
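If you want to see exactly what each VACUUM reclaims, rather than eyeballing the file in a file manager, you could log the database file's size around the call. A quick sketch, assuming the same timeframe and connection as above and that any pending transaction has already been committed:

import os

db_path = '{}.db'.format(timeframe)

size_before = os.path.getsize(db_path)
c.execute("VACUUM")  # VACUUM cannot run inside an open transaction
connection.commit()
size_after = os.path.getsize(db_path)
print('VACUUM reclaimed {:.1f} MB'.format((size_before - size_after) / 1024**2))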
So full code:
import sqlite3
import json
from datetime import datetime
import time

timeframe = '2017-03'
sql_transaction = []
start_row = 0
cleanup = 1000000

connection = sqlite3.connect('{}.db'.format(timeframe))
c = connection.cursor()

def create_table():
    c.execute("CREATE TABLE IF NOT EXISTS parent_reply(parent_id TEXT PRIMARY KEY, comment_id TEXT UNIQUE, parent TEXT, comment TEXT, subreddit TEXT, unix INT, score INT)")

def format_data(data):
    data = data.replace('\n',' newlinechar ').replace('\r',' newlinechar ').replace('"',"'")
    return data

def transaction_bldr(sql):
    global sql_transaction
    sql_transaction.append(sql)
    if len(sql_transaction) > 1000:
        c.execute('BEGIN TRANSACTION')
        for s in sql_transaction:
            try:
                c.execute(s)
            except:
                pass
        connection.commit()
        sql_transaction = []

def sql_insert_replace_comment(commentid,parentid,parent,comment,subreddit,time,score):
    try:
        sql = """UPDATE parent_reply SET parent_id = "{}", comment_id = "{}", parent = "{}", comment = "{}", subreddit = "{}", unix = {}, score = {} WHERE parent_id = "{}";""".format(parentid, commentid, parent, comment, subreddit, int(time), score, parentid)
        transaction_bldr(sql)
    except Exception as e:
        print('s0 insertion',str(e))

def sql_insert_has_parent(commentid,parentid,parent,comment,subreddit,time,score):
    try:
        sql = """INSERT INTO parent_reply (parent_id, comment_id, parent, comment, subreddit, unix, score) VALUES ("{}","{}","{}","{}","{}",{},{});""".format(parentid, commentid, parent, comment, subreddit, int(time), score)
        transaction_bldr(sql)
    except Exception as e:
        print('s0 insertion',str(e))

def sql_insert_no_parent(commentid,parentid,comment,subreddit,time,score):
    try:
        sql = """INSERT INTO parent_reply (parent_id, comment_id, comment, subreddit, unix, score) VALUES ("{}","{}","{}","{}",{},{});""".format(parentid, commentid, comment, subreddit, int(time), score)
        transaction_bldr(sql)
    except Exception as e:
        print('s0 insertion',str(e))

def acceptable(data):
    if len(data.split(' ')) > 1000 or len(data) < 1:
        return False
    elif len(data) > 32000:
        return False
    elif data == '[deleted]':
        return False
    elif data == '[removed]':
        return False
    else:
        return True

def find_parent(pid):
    try:
        sql = "SELECT comment FROM parent_reply WHERE comment_id = '{}' LIMIT 1".format(pid)
        c.execute(sql)
        result = c.fetchone()
        if result != None:
            return result[0]
        else:
            return False
    except Exception as e:
        #print(str(e))
        return False

def find_existing_score(pid):
    try:
        sql = "SELECT score FROM parent_reply WHERE parent_id = '{}' LIMIT 1".format(pid)
        c.execute(sql)
        result = c.fetchone()
        if result != None:
            return result[0]
        else:
            return False
    except Exception as e:
        #print(str(e))
        return False

if __name__ == '__main__':
    create_table()
    row_counter = 0
    paired_rows = 0

    #with open('J:/chatdata/reddit_data/{}/RC_{}'.format(timeframe.split('-')[0],timeframe), buffering=1000) as f:
    with open('/home/paperspace/reddit_comment_dumps/RC_{}'.format(timeframe), buffering=1000) as f:
        for row in f:
            #print(row)
            #time.sleep(555)
            row_counter += 1

            if row_counter > start_row:
                try:
                    row = json.loads(row)
                    parent_id = row['parent_id'].split('_')[1]
                    body = format_data(row['body'])
                    created_utc = row['created_utc']
                    score = row['score']
                    comment_id = row['id']
                    subreddit = row['subreddit']
                    parent_data = find_parent(parent_id)

                    existing_comment_score = find_existing_score(parent_id)
                    if existing_comment_score:
                        if score > existing_comment_score:
                            if acceptable(body):
                                sql_insert_replace_comment(comment_id,parent_id,parent_data,body,subreddit,created_utc,score)
                    else:
                        if acceptable(body):
                            if parent_data:
                                if score >= 2:
                                    sql_insert_has_parent(comment_id,parent_id,parent_data,body,subreddit,created_utc,score)
                                    paired_rows += 1
                            else:
                                sql_insert_no_parent(comment_id,parent_id,body,subreddit,created_utc,score)
                except Exception as e:
                    print(str(e))

            if row_counter % 100000 == 0:
                print('Total Rows Read: {}, Paired Rows: {}, Time: {}'.format(row_counter, paired_rows, str(datetime.now())))

            if row_counter > start_row:
                if row_counter % cleanup == 0:
                    print("Cleanin up!")
                    sql = "DELETE FROM parent_reply WHERE parent IS NULL"
                    c.execute(sql)
                    connection.commit()
                    c.execute("VACUUM")
                    connection.commit()
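Once a run finishes (or whenever you stop one), it's worth sanity-checking the database from a separate script. A quick example that counts the rows with a non-NULL parent, which are the pairs the next tutorial will train on:

import sqlite3

timeframe = '2017-03'  # whichever database you built
connection = sqlite3.connect('{}.db'.format(timeframe))
c = connection.cursor()

# paired rows are the ones where we stored the parent comment's text
c.execute("SELECT COUNT(*) FROM parent_reply WHERE parent IS NOT NULL")
print('Paired rows in database:', c.fetchone()[0])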