import ssl
import time
import smtplib
import psycopg2
from os import environ, listdir
from bcrypt import gensalt, hashpw
from email.message import EmailMessage
def fetch_articles(email):
    """Pick the least-visited article that ``email`` has not been given yet.

    Creates the ``users_articles`` and ``article_counts`` tables when they are
    missing. When ``article_counts`` is created it is seeded with one
    zero-visit row per file name found in ``survey/articles``. The visit
    counter of the chosen article is incremented and committed.

    Args:
        email: Address used to exclude articles already paired with this
            user in ``users_articles``.

    Returns:
        A list of ``(article_file_name, article_text)`` tuples. The query is
        limited to one row, so the list holds at most one entry.
    """
    # Table names are trusted module constants, so they may be formatted into
    # the SQL text; every runtime value goes through bound parameters instead.
    counts_table_name = "article_counts"
    user_articles_table_name = "users_articles"
    table_exists_query = (
        "SELECT EXISTS(SELECT * FROM information_schema.tables WHERE table_name=%s)"
    )
    db_url = environ.get('DATABASE_URL') or "postgres://postgres:postgres@localhost:5432/postgres"
    conn = psycopg2.connect(db_url, sslmode='require')
    cur = conn.cursor()
    try:
        cur.execute(table_exists_query, (user_articles_table_name,))
        if not cur.fetchone()[0]:
            cur.execute(f"CREATE TABLE {user_articles_table_name} (email varchar, article varchar);")

        cur.execute(table_exists_query, (counts_table_name,))
        if not cur.fetchone()[0]:
            cur.execute(f"CREATE TABLE {counts_table_name} (article varchar, visits integer);")
            # Seed the fresh table; binding the file name keeps names that
            # contain quotes from breaking the statement.
            for file_name in listdir("survey/articles"):
                cur.execute(f"INSERT INTO {counts_table_name} VALUES (%s, 0);", (file_name,))

        chosen_articles = []
        # Anti-join: keep only articles with no users_articles row for this
        # email, then take the one with the fewest visits.
        cur.execute(
            f"SELECT {counts_table_name}.article, visits "
            f"FROM {counts_table_name} "
            f"LEFT JOIN {user_articles_table_name} "
            f"ON {user_articles_table_name}.email = %s AND "
            f"{counts_table_name}.article = {user_articles_table_name}.article "
            f"WHERE {user_articles_table_name}.article IS NULL "
            "ORDER BY visits LIMIT 1;",
            (email,),
        )
        for row in cur.fetchall():
            with open(f"survey/articles/{row[0]}", "r") as article_file:
                chosen_articles.append((row[0], article_file.read()))

        for article_name, _ in chosen_articles:
            cur.execute(
                f"UPDATE {counts_table_name} SET visits=visits+1 WHERE article=%s",
                (article_name,),
            )
        conn.commit()
    finally:
        cur.close()
        conn.close()
    return chosen_articles
def fetch_countries():
    """Return the stripped lines of ``survey/countries.txt``.

    Returns:
        One entry per line of the file with surrounding whitespace removed,
        or ``["Default"]`` if reading the file raises any exception.
    """
    try:
        with open("survey/countries.txt") as handle:
            return [line.strip() for line in handle.readlines()]
    except Exception:
        # Best-effort: any failure to read the list falls back to a single
        # placeholder entry.
        return ["Default"]
def send_confirmation_email(receiver_email, code):
    """Email the login code to ``receiver_email`` through smtp.gmail.com.

    The message carries a plain-text part and an HTML alternative with the
    same wording, so line breaks survive in HTML mail clients.

    Args:
        receiver_email: Address placed in the ``To`` header.
        code: Login code embedded in the message body (converted with
            ``str()``).

    Returns:
        True if the message was sent, False if connecting, logging in or
        sending raised an exception (the exception is printed, not raised).
    """
    from html import escape

    sender_email = "stance.annotator@gmail.com"
    # NOTE(security): a credential committed to source control should be
    # rotated and supplied only through the environment. The literal fallback
    # is kept so existing deployments keep working until that happens.
    sender_password = environ.get("STANCE_ANNOTATOR_EMAIL_PASSWORD") or "idpscqicyblagqtu"

    message = EmailMessage()
    message["From"] = sender_email
    message["To"] = receiver_email
    message["Subject"] = "Stance Annotator Code"

    body_lines = [
        "To whom it may concern,",
        "We would like to thank-you for your participation in the Stance Annotator task.",
        "Please use the following code to login:",
        str(code),
        "Kind regards,",
        "Stance Annotator Team",
    ]
    message.set_content("\n".join(body_lines) + "\n")
    # Newlines collapse in HTML, so the HTML part needs explicit <br> tags;
    # escaping keeps an arbitrary code value from being read as markup.
    html_body = "<br>\n".join(escape(line) for line in body_lines)
    message.add_alternative(
        f"<html><body><p>{html_body}</p></body></html>", subtype="html"
    )

    try:
        with smtplib.SMTP_SSL("smtp.gmail.com", 465, context=ssl.create_default_context()) as server:
            server.login(sender_email, sender_password)
            server.send_message(message)
    except Exception as e:
        print(e)
        return False
    return True
def encrypt_code(code):
    """Hash ``code`` with bcrypt and return the hash as text.

    Args:
        code: Value to hash; it is converted with ``str()`` and UTF-8
            encoded before hashing.

    Returns:
        The output of ``hashpw`` decoded as UTF-8. A new salt from
        ``gensalt()`` is used on every call.
    """
    charset = "utf-8"
    raw_code = str(code).encode(charset)
    return hashpw(raw_code, gensalt()).decode(charset)
def insert_code(new_email, new_code):
    """Insert a ``users`` row for ``new_email`` with ``info`` set to false.

    The ``users`` table is created first when it does not exist.

    Args:
        new_email: Value stored in the ``email`` column.
        new_code: Value stored in the ``code`` column.

    Returns:
        True if the row was inserted and committed, False if any statement
        raised (the exception is printed). Errors raised while opening the
        connection or cursor are not caught and propagate to the caller.
    """
    # Trusted constant; only identifiers are formatted into the SQL text.
    table_name = "users"
    db_url = environ.get('DATABASE_URL') or "postgres://postgres:postgres@localhost:5432/postgres"
    conn = psycopg2.connect(db_url, sslmode='require')
    cur = conn.cursor()
    success = True
    try:
        cur.execute(
            "SELECT EXISTS(SELECT * FROM information_schema.tables WHERE table_name=%s)",
            (table_name,),
        )
        if not cur.fetchone()[0]:
            cur.execute(f"CREATE TABLE {table_name} (email varchar, code varchar, info boolean);")
        # Caller-supplied values are bound by the driver rather than quoted
        # by hand, so they cannot alter the statement.
        cur.execute(
            f"INSERT INTO {table_name} (email, code, info) VALUES (%s, %s, false);",
            (new_email, new_code),
        )
        conn.commit()
    except Exception as e:
        print(e)
        success = False
    finally:
        cur.close()
        conn.close()
    return success
def generate_code(base):
    """Derive a numeric code from ``base`` and the current time.

    The code points of ``base`` are written out as decimal digits and
    concatenated. Each digit is multiplied by the digit at the mirrored
    position, the products are concatenated, and the first 13 characters are
    read as an integer. The current Unix time in milliseconds is added.

    Args:
        base: String whose characters feed the digit sequence. An empty
            string contributes an offset of 0.

    Returns:
        An ``int``: the derived offset plus ``round(time.time() * 1000)``.
    """
    # NOTE(review): the result depends only on ``base`` and the clock, so it
    # is predictable; if it is used as a login secret, consider ``secrets``.
    code_size = 13
    digits = "".join(str(ord(letter)) for letter in base)
    multiplied = "".join(
        str(int(left) * int(right)) for left, right in zip(digits, reversed(digits))
    )
    trimmed = multiplied[:code_size]
    # An empty base yields no digits and int("") raises ValueError, so the
    # offset falls back to 0 in that case.
    offset = int(trimmed) if trimmed else 0
    return offset + round(time.time() * 1000)
def fetch_code(user_email):
    """Return the stored ``code`` for ``user_email`` from the ``users`` table.

    Args:
        user_email: Value matched against the ``email`` column.

    Returns:
        The ``code`` column of the first matching row, or None when the
        ``users`` table does not exist or no row matches.
    """
    # Trusted constant; only identifiers are formatted into the SQL text.
    table_name = "users"
    db_url = environ.get('DATABASE_URL') or "postgres://postgres:postgres@localhost:5432/postgres"
    conn = psycopg2.connect(db_url, sslmode='require')
    cur = conn.cursor()
    try:
        cur.execute(
            "SELECT EXISTS(SELECT * FROM information_schema.tables WHERE table_name=%s)",
            (table_name,),
        )
        if not cur.fetchone()[0]:
            result = None
        else:
            # The email is bound by the driver so it cannot alter the query.
            cur.execute(f"SELECT code FROM {table_name} WHERE email=%s;", (user_email,))
            row = cur.fetchone()
            result = row[0] if row else row
    finally:
        cur.close()
        conn.close()
    return result
def mark_info(user_email):
    """Set ``info`` to true on every ``users`` row matching ``user_email``.

    Args:
        user_email: Value matched against the ``email`` column.

    Exceptions raised by the connection or the statement propagate to the
    caller; the cursor and connection are closed either way.
    """
    db_url = environ.get('DATABASE_URL') or "postgres://postgres:postgres@localhost:5432/postgres"
    conn = psycopg2.connect(db_url, sslmode='require')
    cur = conn.cursor()
    try:
        # The email is bound by the driver so it cannot alter the statement.
        cur.execute("UPDATE users SET info=true WHERE email=%s", (user_email,))
        conn.commit()
    finally:
        cur.close()
        conn.close()
def info_provided(user_email):
    """Return the ``info`` flag stored for ``user_email`` in ``users``.

    Args:
        user_email: Value matched against the ``email`` column.

    Returns:
        The ``info`` column of the first matching row.

    Raises:
        TypeError: If no row matches, because the missing row (None) is
            indexed.
    """
    # Trusted constant; only identifiers are formatted into the SQL text.
    table_name = "users"
    db_url = environ.get('DATABASE_URL') or "postgres://postgres:postgres@localhost:5432/postgres"
    conn = psycopg2.connect(db_url, sslmode='require')
    cur = conn.cursor()
    try:
        # The email is bound by the driver so it cannot alter the query.
        cur.execute(f"SELECT info FROM {table_name} WHERE email=%s;", (user_email,))
        result = cur.fetchone()[0]
    finally:
        cur.close()
        conn.close()
    return result
def insert_email_article_pair(user_email, article_id):
    """Record in ``users_articles`` that ``user_email`` received ``article_id``.

    Args:
        user_email: Value stored in the ``email`` column.
        article_id: Value stored in the ``article`` column.

    Returns:
        True if the row was inserted and committed, False if the statement
        or commit raised (the exception is printed). Errors raised while
        opening the connection or cursor are not caught and propagate.
    """
    # Trusted constant; only identifiers are formatted into the SQL text.
    table_name = "users_articles"
    db_url = environ.get('DATABASE_URL') or "postgres://postgres:postgres@localhost:5432/postgres"
    conn = psycopg2.connect(db_url, sslmode='require')
    cur = conn.cursor()
    success = True
    try:
        # Both values are bound by the driver so they cannot alter the
        # statement.
        cur.execute(
            f"INSERT INTO {table_name} (email, article) VALUES (%s, %s);",
            (user_email, article_id),
        )
        conn.commit()
    except Exception as e:
        print(e)
        success = False
    finally:
        cur.close()
        conn.close()
    return success