-
Notifications
You must be signed in to change notification settings - Fork 3
/
app2.py
139 lines (112 loc) · 4.34 KB
/
app2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
from profiledb.comments import Comment
from profiledb.follower import Follower
from profiledb.engagement import Engagement
from profiledb.post import Post
from profiledb.user import User
from profiledb.profiledb import ProfileDatabase
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import text
import pandas as pd
# Initialize database tables
user_db = User()
post_db = Post()
comment_db = Comment()
engagement_db = Engagement()
follower_db = Follower()
def drop_tables_in_order():
try:
# Get a connection from the engine
with ProfileDatabase.get_engine().connect() as connection:
# Disable foreign key checks to allow dropping tables with dependencies
connection.execute(text("SET FOREIGN_KEY_CHECKS = 0;"))
# Drop tables in reverse order of foreign key dependencies
Engagement(drop=True) # No foreign key dependencies
Comment(drop=True) # Depends on Post and User
Follower(drop=True) # Depends on User
Post(drop=True) # Depends on User
User(drop=True) # Independent
print("All tables dropped successfully.")
# Re-enable foreign key checks after dropping tables
connection.execute(text("SET FOREIGN_KEY_CHECKS = 1;"))
except SQLAlchemyError as e:
print(f"Error dropping tables: {e}")
raise
def load_data_from_csv(file_path, table_class):
"""
General function to load data from CSV into the corresponding table.
Args:
file_path (str): Path to the CSV file.
table_class (BaseProfileTable): The class corresponding to the table (User, Post, Comment, etc.)
"""
# Read the CSV data using pandas
data_df = pd.read_csv(file_path)
# Initialize the table object
table_obj = table_class()
def load_users_from_csv(file_path):
data_df = pd.read_csv(file_path)
user_db = User()
for _, row in data_df.iterrows():
user_db.write(
user_id=row['user_id'],
username=row['username'],
bio=row['bio'],
followers_count=row['followers_count'],
following_count=row['following_count'],
location=row['location'],
is_influential=row['is_influential']
)
print(f"Loaded {len(data_df)} records into the User table.")
def load_posts_from_csv(file_path):
data_df = pd.read_csv(file_path)
post_db = Post()
for _, row in data_df.iterrows():
post_db.write(
user_id=row['user_id'],
post_id=row['image_id']
media_type= "img",
media_url=row['media_url'],
caption=row['caption']
)
print(f"Loaded {len(data_df)} records into the Post table.")
def load_comments_from_csv(file_path):
data_df = pd.read_csv(file_path)
comment_db = Comment()
for _, row in data_df.iterrows():
comment_db.write(
post_id=row['post_id'],
user_id=row['user_id'],
message=row['message'],
like_count=row['like_count']
)
print(f"Loaded {len(data_df)} records into the Comment table.")
def load_engagements_from_csv(file_path):
data_df = pd.read_csv(file_path)
engagement_db = Engagement()
for _, row in data_df.iterrows():
engagement_db.write(
post_id=row['post_id'],
likes_count=row['likes_count'],
comments_count=row['comments_count'],
shares_count=row['shares_count'],
video_completion_rate=row['video_completion_rate']
)
print(f"Loaded {len(data_df)} records into the Engagement table.")
def main():
drop_tables_in_order()
# Load data into each table from the corresponding CSV file
load_users_from_csv('real_data/users.csv')
load_posts_from_csv('real_data/images.csv')
# load_comments_from_csv('data/Dummy_Comments_Data.csv')
# load_engagements_from_csv('data/Dummy_Engagement_Data.csv')
# Optionally, read and print data for validation
user_db = User()
print("Users:", user_db.read())
post_db = Post()
print("Posts:", post_db.read())
comment_db = Comment()
print("Comments:", comment_db.read())
engagement_db = Engagement()
print("Engagements:", engagement_db.read())
ProfileDatabase.close_connection()
if __name__ == "__main__":
main()