133 lines
3.8 KiB
Python
133 lines
3.8 KiB
Python
import os
|
|
from dotenv import load_dotenv
|
|
import mysql.connector
|
|
import json
|
|
from settings.logger import log_function_call, logger
|
|
|
|
logger.info('START | 0/4 | Load dotenv')
|
|
|
|
load_dotenv()
|
|
|
|
try:
|
|
logger.info('START | 1/4 | Load DB')
|
|
mydb = mysql.connector.connect(
|
|
host=os.getenv('DB_HOST'),
|
|
user=os.getenv('DB_USER'),
|
|
password=os.getenv('DB_PW'),
|
|
database='vok1no'
|
|
)
|
|
except mysql.connector.Error as e:
|
|
print(f"Ошибка при подключении к БД: {e}")
|
|
mydb = None
|
|
|
|
logger.info('START | 2/4 | Load sql functions')
|
|
@log_function_call
|
|
def update_sql_connect():
|
|
global mydb # Указываем, что используем глобальную переменную
|
|
|
|
if mydb is not None:
|
|
try:
|
|
mydb.close()
|
|
except Exception as e:
|
|
print(f"Ошибка при закрытии соединения: {e}")
|
|
|
|
try:
|
|
mydb = mysql.connector.connect(
|
|
host=os.getenv('DB_HOST'),
|
|
user=os.getenv('DB_USER'),
|
|
password=os.getenv('DB_PW'),
|
|
database='vok1no'
|
|
)
|
|
except mysql.connector.Error as e:
|
|
print(f"Ошибка при переподключении к БД: {e}")
|
|
mydb = None
|
|
|
|
def sql_select(sql, val=None):
|
|
try:
|
|
with mydb.cursor() as new_cursor:
|
|
if val is None:
|
|
new_cursor.execute(sql)
|
|
else:
|
|
new_cursor.execute(sql, val)
|
|
data = new_cursor.fetchall()
|
|
new_cursor.close()
|
|
return data
|
|
except mysql.connector.Error as e:
|
|
raise SystemError(f'Ошибка sql_select: {str(e)}')
|
|
|
|
def sql_insert(sql, val=None):
|
|
try:
|
|
with mydb.cursor() as new_cursor:
|
|
if val is None:
|
|
new_cursor.execute(sql)
|
|
else:
|
|
# Преобразуем все вложенные структуры в JSON
|
|
processed_val = []
|
|
for item in val:
|
|
if isinstance(item, (list, tuple, dict)):
|
|
processed_val.append(json.dumps(item))
|
|
else:
|
|
processed_val.append(item)
|
|
|
|
new_cursor.execute(sql, processed_val)
|
|
|
|
mydb.commit()
|
|
data = new_cursor.lastrowid
|
|
new_cursor.close()
|
|
return data
|
|
except mysql.connector.Error as e:
|
|
raise SystemError(f'Ошибка sql_insert: {str(e)}')
|
|
|
|
def sql_delete(sql, val=None):
|
|
try:
|
|
with mydb.cursor() as new_cursor:
|
|
if val is None:
|
|
new_cursor.execute(sql)
|
|
else:
|
|
new_cursor.execute(sql, val)
|
|
mydb.commit()
|
|
new_cursor.close()
|
|
return True
|
|
except mysql.connector.Error as e:
|
|
raise SystemError(f'Ошибка sql_delete: {str(e)}')
|
|
|
|
def sql_update(sql, val=None):
|
|
try:
|
|
with mydb.cursor() as new_cursor:
|
|
if val is None:
|
|
new_cursor.execute(sql)
|
|
else:
|
|
new_cursor.execute(sql, val)
|
|
mydb.commit()
|
|
|
|
return True
|
|
except mysql.connector.Error as e:
|
|
raise SystemError(f'Ошибка sql_update: {str(e)}')
|
|
|
|
|
|
# Пример использования
|
|
# t = 'INSERT INTO sites_logs (site, page, data) VALUES (%s, %s, %s)'
|
|
# v = ('test', 'vok1no', [1, 'Vok1no', 89913, 0, 1, 'vok'])
|
|
|
|
# try:
|
|
# res = sql_insert(t, v)
|
|
# print(f"ID новой записи: {res}")
|
|
# except SystemError as e:
|
|
# print(e)
|
|
|
|
|
|
# try:
|
|
# res = sql_select('SELECT * FROM sites_logs')
|
|
# print(len(res))
|
|
# except SystemError as e:
|
|
# print(e)
|
|
|
|
|
|
# @log_function_call
|
|
# def test_error():
|
|
# result = 10 / 0
|
|
# return result
|
|
|
|
|
|
# update_sql_connect()
|
|
# test_error() |