# TOML

tomllib — the built‑in TOML parser introduced in Python 3.11.
For Python 3.6.1, use the third‑party toml library instead.

Python 3.13

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import tomllib


class Config:
def __init__(self, path: str):
with open(path, "rb") as f:
self.data = tomllib.load(f)

@property
def db(self):
return self.data["database"]

@property
def mail(self):
return self.data["mail"]

@property
def api(self):
return self.data["api"]

@property
def app(self):
return self.data["app"]

Python 3.61

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import toml


class Config:
def __init__(self, path: str):
with open(path, "r") as f:
self.data = toml.load(f)

@property
def db(self):
return self.data["database"]

@property
def mail(self):
return self.data["mail"]

@property
def api(self):
return self.data["api"]

@property
def app(self):
return self.data["app"]

# Mailer

mimetypes.guess_type(url, strict=True) does not accept a pathlib.Path object in Python 3.6.1. Use the string version of the path instead. Deprecated since version 3.13: Passing a file path instead of URL is soft deprecated. Use guess_file_type() for this.

``list[str] | None = None is not supported in Python 3.6.1. Use Optional[List[str]] = None` instead.

Python 3.13

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
from pathlib import Path
import smtplib
import mimetypes
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText


class Mailer:
def __init__(
self,
host: str,
port: int,
sender: str,
normal_recipient: str,
error_recipient: str,
exception_recipient: str,
subject_template: str,
):
self.host = host
self.port = port
self.sender = sender
self.recipients = {
"NORMAL": normal_recipient,
"ERROR": error_recipient,
"EXCEPTION": exception_recipient,
}
self.subject_template = subject_template

def send_mail(
self,
subject: str,
message: str,
attachments: list[str] | None = None,
level: str = "NORMAL",
):
if level not in self.recipients:
raise ValueError(f"Invalid mail level: {level}")

recipient = self.recipients[level]

msg = MIMEMultipart()
msg["Subject"] = subject
msg["From"] = self.sender
msg["To"] = recipient

msg.attach(MIMEText(message, "plain"))

if attachments:
for file_path in attachments:
self._attach_file(msg, file_path)

with smtplib.SMTP(self.host, self.port) as server:
server.sendmail(self.sender, recipient.split(","), msg.as_string())

def _attach_file(self, msg: MIMEMultipart, file_path: str):
path = Path(file_path)

if not path.is_file():
raise FileNotFoundError(f"Attachment not found: {file_path}")

mime_type, _ = mimetypes.guess_type(path)
main_type, sub_type = (mime_type or "application/octet-stream").split("/", 1)

with open(path, "rb") as f:
part = MIMEBase(main_type, sub_type)
part.set_payload(f.read())
encoders.encode_base64(part)

part.add_header("Content-Disposition", f'attachment; filename="{path.name}"')
msg.attach(part)

Python 3.6.1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import os
import smtplib
import mimetypes
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import List, Optional


class Mailer:
def __init__(
self,
host: str,
port: int,
sender: str,
normal_recipient: str,
error_recipient: str,
exception_recipient: str,
subject_template: str,
):
self.host = host
self.port = port
self.sender = sender
self.recipients = {
"NORMAL": normal_recipient,
"ERROR": error_recipient,
"EXCEPTION": exception_recipient,
}
self.subject_template = subject_template

def send_mail(
self,
subject: str,
message: str,
attachments: Optional[List[str]] = None,
level: str = "NORMAL",
):
if level not in self.recipients:
raise ValueError(f"Invalid mail level: {level}")

recipient = self.recipients[level]

msg = MIMEMultipart()
msg["Subject"] = subject
msg["From"] = self.sender
msg["To"] = recipient

msg.attach(MIMEText(message, "plain"))

if attachments:
for file_path in attachments:
self._attach_file(msg, file_path)

with smtplib.SMTP(self.host, self.port) as server:
server.sendmail(self.sender, recipient.split(","), msg.as_string())

def _attach_file(self, msg: MIMEMultipart, file_path: str):
if not os.path.isfile(file_path):
raise FileNotFoundError(f"Attachment not found: {file_path}")

mime_type, _ = mimetypes.guess_type(file_path)
main_type, sub_type = (mime_type or "application/octet-stream").split("/", 1)

with open(file_path, "rb") as f:
part = MIMEBase(main_type, sub_type)
part.set_payload(f.read())
encoders.encode_base64(part)

part.add_header(
"Content-Disposition",
f'attachment; filename="{os.path.basename(file_path)}"',
)
msg.attach(part)

# MSSQL Database

For Python 3.6.1, running pip install mssql-Python results in the error “Could not find a version that satisfies the requirement mssql-Python”. Use pyodbc instead.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import mssql_Python
from datetime import datetime
from error import CustomError


class DatabaseClient:
def __init__(self, server: str, database: str, uid: str, pwd: str):
self.server = server
self.database = database
self.uid = uid
self.pwd = pwd

def connect(self):
try:
return mssql_Python.connect(
server=self.server,
database=self.database,
uid=self.uid,
pwd=self.pwd,
trust_server_certificate="yes",
)
except Exception as e:
raise CustomError(f"Database connection failed: {e}")

def try_connection(self):
try:
with self.connect() as conn:
conn.cursor().execute("SELECT 1")
except Exception as e:
raise CustomError(f"Database try query failed: {e}")

def get_queue_data(self, json_file_name: str) -> tuple:
query = """
SELECT id,
interface_name,
interface_version
FROM shpe_control_table
WHERE json_file_name = ?
"""

with self.connect() as conn:
cursor = conn.cursor()
cursor.execute(query, (json_file_name,))
rows = cursor.fetchall()

if len(rows) != 1:
raise CustomError(
f"Failed to retrieve queue data for '{json_file_name}'."
f"Expected 1 record but found {len(rows)}."
)

id, interface_name, interface_version = rows[0]

if any(
value in ("", None) for value in (id, interface_name, interface_version)
):
raise CustomError(
f"Queue data contains empty or null values: "
f"id={id}, interface_name={interface_name}, interface_version={interface_version}"
)
return id, interface_name, interface_version

def update_send_status(self, json_file_name: str, error_message: str):
query = """
UPDATE shpe_control_table
SET is_send = ?,
send_date = ?,
error_message = ?
WHERE json_file_name = ?
"""

with self.connect() as conn:
cursor = conn.cursor()
cursor.execute(query, (1, datetime.now(), error_message, json_file_name))

conn.commit()

Python 3.6.1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import pyodbc
from datetime import datetime
from error import CustomError


class DatabaseClient:
def __init__(self, server: str, database: str, uid: str, pwd: str):
self.server = server
self.database = database
self.uid = uid
self.pwd = pwd

def connect(self):
try:
conn_str = (
f"DRIVER={{SQL Server}};"
f"SERVER={self.server};"
f"DATABASE={self.database};"
f"UID={self.uid};"
f"PWD={self.pwd};"
f"TrustServerCertificate=yes;"
)
return pyodbc.connect(conn_str)
except Exception as e:
raise CustomError(f"Database connection failed: {e}")

def try_connection(self):
try:
with self.connect() as conn:
conn.cursor().execute("SELECT 1")
except Exception as e:
raise CustomError(f"Database try query failed: {e}")

def get_queue_data(self, json_file_name: str) -> tuple:
query = """
SELECT id,
interface_name,
interface_version
FROM shpe_control_table
WHERE json_file_name = ?
"""

with self.connect() as conn:
cursor = conn.cursor()
cursor.execute(query, json_file_name)
rows = cursor.fetchall()

if len(rows) != 1:
raise CustomError(
f"Failed to retrieve queue data id, interface_name, interface_version for '{json_file_name}'."
f"Expected 1 record but found {len(rows)}."
)

id, interface_name, interface_version = rows[0]

if any(
value in ("", None) for value in (id, interface_name, interface_version)
):
raise CustomError(
f"Queue data contains empty or null values: "
f"id={id}, interface_name={interface_name}, interface_version={interface_version}"
)
return id, interface_name, interface_version

def update_send_status(self, json_file_name: str, error_message: str):
query = """
UPDATE shpe_control_table
SET is_send = ?,
send_date = ?,
error_message = ?
WHERE json_file_name = ?
"""

with self.connect() as conn:
cursor = conn.cursor()
cursor.execute(query, (1, datetime.now(), error_message, json_file_name))

conn.commit()

# Move Files

for file in json_folder.glob("*.json") json_folder is remote direcorty //172.0.0.1/FineReport/data

On Windows with Python 3.13, the file names preserve their original case.

On Windows Server with Python 3.6.1, the file names are returned in lowercase regardless of the actual casing.

Switch to for name in os.listdir(json_folder) to preserve the original filename case.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
json_folder = Path(self.config.app["json_folder"])
work_folder = Path(self.config.app["work_folder"])

backup_folder = Path(self.config.app["backup_folder"])
error_folder = Path(self.config.app["error_folder"])
pending_folder = Path(self.config.app["pending_folder"])

# Move new files into work folder
# for file in json_folder.glob("*.json"):
# # Window server + Python 3.6.1
# # Whatever the original file name is, the file name will be lowercase.
# print(file.name)
# shutil.move(str(file), work_folder / file.name)
for name in os.listdir(json_folder):
if name.endswith("*.json"):
shutil.move(
os.path.join(json_folder, name), os.path.join(work_folder, name)
)

# Sort by modification time
json_files = sorted(
(f for f in work_folder.iterdir() if f.suffix == ".json"),
key=lambda f: f.stat().st_mtime,
)
Edited on