82 lines
2.5 KiB
Python
82 lines
2.5 KiB
Python
# scheduler.py
|
|
|
|
import datetime
|
|
import pytz
|
|
from disnake.ext import tasks
|
|
|
|
# Timezone used by this module: the scheduler reads the current time in it and
# get_next_datetime() localizes the datetimes it builds to it.
MOSCOW_TZ = pytz.timezone("Europe/Moscow")
|
|
|
|
# Weekly events handled by the scheduler loop. Fields of each entry:
#   name                         - event title passed on to the poll
#   day                          - weekday compared against datetime.weekday()
#                                  (Monday == 0, so 4 is Friday, 5 is Saturday)
#   create_hour / create_minute  - time of day at which the poll is created
#   event_hour / event_minute    - time of day at which the event starts
#   key                          - value passed to create_poll as message_id
SCHEDULED_EVENTS = [
    {
        "name": "ECHO TTvT",
        "day": 4,
        "create_hour": 8,
        "create_minute": 0,
        "event_hour": 19,
        "event_minute": 30,
        "key": "echottvt",
    },
    {
        "name": "AS VDV PSG",
        "day": 4,
        "create_hour": 22,
        "create_minute": 0,
        "event_hour": 16,
        "event_minute": 30,
        "key": "asvdvpsg",
    },
    {
        "name": "ECHO TvT",
        "day": 5,
        "create_hour": 8,
        "create_minute": 0,
        "event_hour": 19,
        "event_minute": 30,
        "key": "echotvt_saturday",
    },
]
|
|
|
|
def get_next_datetime(now, target_day, hour, minute):
    """Return the upcoming ``target_day`` at ``hour:minute`` as a Moscow-time datetime.

    Args:
        now: Reference datetime; only its weekday and calendar date are used.
        target_day: Weekday number in ``datetime.weekday()`` convention
            (Monday == 0).
        hour: Hour of the resulting time of day.
        minute: Minute of the resulting time of day.

    Returns:
        A datetime localized with ``MOSCOW_TZ`` on the first date at or after
        ``now``'s date whose weekday matches ``target_day``.

    Note:
        When ``now`` already falls on ``target_day`` the result is on the same
        date, so it can be earlier than ``now`` if that time of day has
        already passed.
    """
    # 0 when today is the target weekday, otherwise 1..6 days forward.
    offset_days = (target_day - now.weekday()) % 7
    target_date = (now + datetime.timedelta(days=offset_days)).date()
    naive_moment = datetime.datetime.combine(target_date, datetime.time(hour, minute))
    # pytz zones must be attached with localize() to get the correct UTC offset.
    return MOSCOW_TZ.localize(naive_moment)
|
|
|
|
# This mapping is filled in from main.py (through init_scheduler()), which
# stores the "create_poll", "event_tasks" and "generate_event_id" entries
# that scheduled_event_loop() reads on every tick.
event_loop_context: dict = {}

# Switch checked at the top of every scheduler tick; while it is False the
# loop returns without creating anything.
AUTO_EVENTS_ENABLED: bool = False
|
|
|
|
@tasks.loop(minutes=1)
async def scheduled_event_loop():
    """Create polls for the configured weekly events when their creation time arrives.

    Runs once a minute. For every entry in ``SCHEDULED_EVENTS`` whose creation
    time is within 60 seconds of the current Moscow time, the event's start
    time is computed and ``create_poll(info, message_id=event["key"])`` is
    awaited, unless the id returned by ``generate_event_id(name, start_time)``
    is already present in ``event_tasks``.

    The three collaborators are read from ``event_loop_context``; a
    ``KeyError`` is raised if they have not been stored there (see
    ``init_scheduler``) by the time auto-creation is enabled.

    The start time is always at or after the creation time: when an event's
    time of day is earlier than its creation time of day (e.g. created at
    22:00 for a 16:30 event), the start is moved to the same weekday of the
    following week instead of pointing at a moment that has already passed.
    """
    if not AUTO_EVENTS_ENABLED:
        return  # ⛔ auto-creation has been disabled by a command

    now = datetime.datetime.now(MOSCOW_TZ)

    create_poll = event_loop_context["create_poll"]
    event_tasks = event_loop_context["event_tasks"]
    generate_event_id = event_loop_context["generate_event_id"]

    for event in SCHEDULED_EVENTS:
        create_time = get_next_datetime(
            now, event["day"], event["create_hour"], event["create_minute"]
        )
        # The window is deliberately symmetric (about two ticks wide) so a late
        # tick cannot skip an event; repeats are filtered by the id check below.
        if abs((now - create_time).total_seconds()) >= 60:
            continue

        start_time = get_next_datetime(
            now, event["day"], event["event_hour"], event["event_minute"]
        )
        if start_time < create_time:
            # Same weekday but an earlier time of day than the creation time:
            # this week's occurrence is already over, so target next week's.
            # Comparing with create_time (not now) keeps the result identical
            # for both ticks that fall inside the creation window.
            start_time = get_next_datetime(
                start_time + datetime.timedelta(days=7),
                event["day"],
                event["event_hour"],
                event["event_minute"],
            )

        event_id = generate_event_id(event["name"], start_time)
        if event_id in event_tasks:
            print(f"⏩ Уже существует: {event_id}")
            continue

        info = {
            "name": event["name"],
            "start_time": start_time,
        }

        print(f"🆕 Автосоздание: {info['name']} в {start_time}")
        await create_poll(info, message_id=event["key"])
|
|
|
|
|
|
|
|
def init_scheduler(create_poll_func, event_tasks_dict, generate_event_id_func):
    """Store the scheduler's collaborators in ``event_loop_context``.

    Args:
        create_poll_func: Stored under ``"create_poll"``.
        event_tasks_dict: Stored under ``"event_tasks"``.
        generate_event_id_func: Stored under ``"generate_event_id"``.
    """
    event_loop_context.update(
        create_poll=create_poll_func,
        event_tasks=event_tasks_dict,
        generate_event_id=generate_event_id_func,
    )
|