Adding all files
This commit is contained in:
894
.local/lib/python3.14/site-packages/disnake/audit_logs.py
Normal file
894
.local/lib/python3.14/site-packages/disnake/audit_logs.py
Normal file
@@ -0,0 +1,894 @@
|
||||
# SPDX-License-Identifier: MIT
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
Any,
|
||||
Callable,
|
||||
ClassVar,
|
||||
Dict,
|
||||
Generator,
|
||||
List,
|
||||
Mapping,
|
||||
Optional,
|
||||
Tuple,
|
||||
Type,
|
||||
TypeVar,
|
||||
Union,
|
||||
cast,
|
||||
)
|
||||
|
||||
from . import abc, enums, flags, utils
|
||||
from .app_commands import ApplicationCommandPermissions
|
||||
from .asset import Asset
|
||||
from .automod import AutoModAction, AutoModTriggerMetadata, _automod_action_factory
|
||||
from .colour import Colour
|
||||
from .invite import Invite
|
||||
from .mixins import Hashable
|
||||
from .object import Object
|
||||
from .partial_emoji import PartialEmoji
|
||||
from .permissions import PermissionOverwrite, Permissions
|
||||
from .threads import ForumTag, Thread
|
||||
|
||||
__all__ = (
|
||||
"AuditLogDiff",
|
||||
"AuditLogChanges",
|
||||
"AuditLogEntry",
|
||||
)
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import datetime
|
||||
|
||||
from .app_commands import APIApplicationCommand
|
||||
from .automod import AutoModRule
|
||||
from .emoji import Emoji
|
||||
from .guild import Guild
|
||||
from .guild_scheduled_event import GuildScheduledEvent
|
||||
from .integrations import PartialIntegration
|
||||
from .member import Member
|
||||
from .role import Role
|
||||
from .stage_instance import StageInstance
|
||||
from .sticker import GuildSticker
|
||||
from .types.audit_log import (
|
||||
AuditLogChange as AuditLogChangePayload,
|
||||
AuditLogEntry as AuditLogEntryPayload,
|
||||
_AuditLogChange_ApplicationCommandPermissions as AuditLogChangeAppCmdPermsPayload,
|
||||
)
|
||||
from .types.automod import (
|
||||
AutoModAction as AutoModActionPayload,
|
||||
AutoModTriggerMetadata as AutoModTriggerMetadataPayload,
|
||||
)
|
||||
from .types.channel import (
|
||||
DefaultReaction as DefaultReactionPayload,
|
||||
PermissionOverwrite as PermissionOverwritePayload,
|
||||
)
|
||||
from .types.invite import Invite as InvitePayload
|
||||
from .types.role import Role as RolePayload
|
||||
from .types.snowflake import Snowflake
|
||||
from .types.threads import ForumTag as ForumTagPayload
|
||||
from .user import User
|
||||
from .webhook import Webhook
|
||||
|
||||
|
||||
def _transform_permissions(entry: AuditLogEntry, data: str) -> Permissions:
|
||||
return Permissions(int(data))
|
||||
|
||||
|
||||
def _transform_color(entry: AuditLogEntry, data: int) -> Colour:
|
||||
return Colour(data)
|
||||
|
||||
|
||||
def _transform_snowflake(entry: AuditLogEntry, data: Snowflake) -> int:
|
||||
return int(data)
|
||||
|
||||
|
||||
def _transform_channel(
|
||||
entry: AuditLogEntry, data: Optional[Snowflake]
|
||||
) -> Optional[Union[abc.GuildChannel, Object]]:
|
||||
if data is None:
|
||||
return None
|
||||
channel = entry.guild.get_channel(int(data))
|
||||
return channel or Object(id=data)
|
||||
|
||||
|
||||
def _transform_role(
|
||||
entry: AuditLogEntry, data: Optional[Snowflake]
|
||||
) -> Optional[Union[Role, Object]]:
|
||||
if data is None:
|
||||
return None
|
||||
role = entry.guild.get_role(int(data))
|
||||
return role or Object(id=data)
|
||||
|
||||
|
||||
def _transform_member_id(
|
||||
entry: AuditLogEntry, data: Optional[Snowflake]
|
||||
) -> Union[Member, User, Object, None]:
|
||||
if data is None:
|
||||
return None
|
||||
return entry._get_member(int(data))
|
||||
|
||||
|
||||
def _transform_guild_id(entry: AuditLogEntry, data: Optional[Snowflake]) -> Optional[Guild]:
|
||||
if data is None:
|
||||
return None
|
||||
return entry._state._get_guild(int(data))
|
||||
|
||||
|
||||
def _transform_overwrites(
|
||||
entry: AuditLogEntry, data: List[PermissionOverwritePayload]
|
||||
) -> List[Tuple[Union[Object, Member, Role, User], PermissionOverwrite]]:
|
||||
overwrites: List[Tuple[Union[Object, Member, Role, User], PermissionOverwrite]] = []
|
||||
for elem in data:
|
||||
allow = Permissions(int(elem["allow"]))
|
||||
deny = Permissions(int(elem["deny"]))
|
||||
ow = PermissionOverwrite.from_pair(allow, deny)
|
||||
|
||||
ow_type = elem["type"]
|
||||
ow_id = int(elem["id"])
|
||||
target = None
|
||||
if ow_type == 0:
|
||||
target = entry.guild.get_role(ow_id)
|
||||
elif ow_type == 1:
|
||||
target = entry._get_member(ow_id)
|
||||
|
||||
if target is None:
|
||||
target = Object(id=ow_id)
|
||||
|
||||
overwrites.append((target, ow))
|
||||
|
||||
return overwrites
|
||||
|
||||
|
||||
def _transform_icon(entry: AuditLogEntry, data: Optional[str]) -> Optional[Asset]:
|
||||
if data is None:
|
||||
return None
|
||||
if entry.action.name.startswith("role_"):
|
||||
return Asset._from_role_icon(entry._state, entry._target_id, data) # type: ignore
|
||||
return Asset._from_guild_icon(entry._state, entry.guild.id, data)
|
||||
|
||||
|
||||
def _transform_avatar(entry: AuditLogEntry, data: Optional[str]) -> Optional[Asset]:
|
||||
if data is None:
|
||||
return None
|
||||
return Asset._from_avatar(entry._state, entry._target_id, data) # type: ignore
|
||||
|
||||
|
||||
def _guild_hash_transformer(path: str) -> Callable[[AuditLogEntry, Optional[str]], Optional[Asset]]:
|
||||
def _transform(entry: AuditLogEntry, data: Optional[str]) -> Optional[Asset]:
|
||||
if data is None:
|
||||
return None
|
||||
return Asset._from_guild_image(entry._state, entry.guild.id, data, path=path)
|
||||
|
||||
return _transform
|
||||
|
||||
|
||||
def _transform_tag(entry: AuditLogEntry, data: Optional[ForumTagPayload]) -> Optional[ForumTag]:
|
||||
if data is None:
|
||||
return None
|
||||
return ForumTag._from_data(data=data, state=entry._state)
|
||||
|
||||
|
||||
def _transform_tag_id(
|
||||
entry: AuditLogEntry, data: Optional[str]
|
||||
) -> Optional[Union[ForumTag, Object]]:
|
||||
if data is None:
|
||||
return None
|
||||
|
||||
# cyclic imports
|
||||
from .channel import ThreadOnlyGuildChannel
|
||||
|
||||
tag: Optional[ForumTag] = None
|
||||
tag_id = int(data)
|
||||
thread = entry.target
|
||||
# try thread parent first
|
||||
if isinstance(thread, Thread) and isinstance(thread.parent, ThreadOnlyGuildChannel):
|
||||
tag = thread.parent.get_tag(tag_id)
|
||||
else:
|
||||
# if not found (possibly deleted thread), search all forum/media channels
|
||||
for channel in entry.guild._channels.values():
|
||||
if isinstance(channel, ThreadOnlyGuildChannel) and (tag := channel.get_tag(tag_id)):
|
||||
break
|
||||
|
||||
return tag or Object(id=tag_id)
|
||||
|
||||
|
||||
T = TypeVar("T")
|
||||
EnumT = TypeVar("EnumT", bound=enums.Enum)
|
||||
FlagsT = TypeVar("FlagsT", bound=flags.BaseFlags)
|
||||
|
||||
|
||||
def _enum_transformer(enum: Type[EnumT]) -> Callable[[AuditLogEntry, int], EnumT]:
|
||||
def _transform(entry: AuditLogEntry, data: int) -> EnumT:
|
||||
return enums.try_enum(enum, data)
|
||||
|
||||
return _transform
|
||||
|
||||
|
||||
def _flags_transformer(
|
||||
flags_type: Type[FlagsT],
|
||||
) -> Callable[[AuditLogEntry, Optional[int]], Optional[FlagsT]]:
|
||||
def _transform(entry: AuditLogEntry, data: Optional[int]) -> Optional[FlagsT]:
|
||||
return flags_type._from_value(data) if data is not None else None
|
||||
|
||||
return _transform
|
||||
|
||||
|
||||
def _list_transformer(
|
||||
func: Callable[[AuditLogEntry, Any], T],
|
||||
) -> Callable[[AuditLogEntry, Any], List[T]]:
|
||||
def _transform(entry: AuditLogEntry, data: Any) -> List[T]:
|
||||
if not data:
|
||||
return []
|
||||
return [func(entry, value) for value in data if value is not None]
|
||||
|
||||
return _transform
|
||||
|
||||
|
||||
def _transform_type(
|
||||
entry: AuditLogEntry, data: Any
|
||||
) -> Union[enums.ChannelType, enums.StickerType, enums.WebhookType, str, int]:
|
||||
action_name = entry.action.name
|
||||
if action_name.startswith("sticker_"):
|
||||
return enums.try_enum(enums.StickerType, data)
|
||||
elif action_name.startswith("webhook_"):
|
||||
return enums.try_enum(enums.WebhookType, data)
|
||||
elif action_name.startswith("integration_") or action_name.startswith("overwrite_"):
|
||||
# integration: str, overwrite: int
|
||||
return data
|
||||
else:
|
||||
return enums.try_enum(enums.ChannelType, data)
|
||||
|
||||
|
||||
def _transform_datetime(entry: AuditLogEntry, data: Optional[str]) -> Optional[datetime.datetime]:
|
||||
return utils.parse_time(data)
|
||||
|
||||
|
||||
def _transform_privacy_level(
|
||||
entry: AuditLogEntry, data: Optional[int]
|
||||
) -> Optional[Union[enums.StagePrivacyLevel, enums.GuildScheduledEventPrivacyLevel]]:
|
||||
if data is None:
|
||||
return None
|
||||
if entry.action.target_type == "guild_scheduled_event":
|
||||
return enums.try_enum(enums.GuildScheduledEventPrivacyLevel, data)
|
||||
return enums.try_enum(enums.StagePrivacyLevel, data)
|
||||
|
||||
|
||||
def _transform_guild_scheduled_event_image(
|
||||
entry: AuditLogEntry, data: Optional[str]
|
||||
) -> Optional[Asset]:
|
||||
if data is None:
|
||||
return None
|
||||
return Asset._from_guild_scheduled_event_image(entry._state, entry._target_id, data) # type: ignore
|
||||
|
||||
|
||||
def _transform_automod_action(
|
||||
entry: AuditLogEntry, data: Optional[AutoModActionPayload]
|
||||
) -> Optional[AutoModAction]:
|
||||
if data is None:
|
||||
return None
|
||||
return _automod_action_factory(data)
|
||||
|
||||
|
||||
def _transform_automod_trigger_metadata(
|
||||
entry: AuditLogEntry, data: Optional[AutoModTriggerMetadataPayload]
|
||||
) -> Optional[AutoModTriggerMetadata]:
|
||||
if data is None:
|
||||
return None
|
||||
return AutoModTriggerMetadata._from_dict(data)
|
||||
|
||||
|
||||
def _transform_default_reaction(
|
||||
entry: AuditLogEntry, data: Optional[DefaultReactionPayload]
|
||||
) -> Optional[Union[Emoji, PartialEmoji]]:
|
||||
if data is None:
|
||||
return None
|
||||
return entry._state._get_emoji_from_fields(
|
||||
name=data.get("emoji_name"),
|
||||
id=utils._get_as_snowflake(data, "emoji_id"),
|
||||
)
|
||||
|
||||
|
||||
class AuditLogDiff:
|
||||
def __len__(self) -> int:
|
||||
return len(self.__dict__)
|
||||
|
||||
def __iter__(self) -> Generator[Tuple[str, Any], None, None]:
|
||||
yield from self.__dict__.items()
|
||||
|
||||
def __repr__(self) -> str:
|
||||
values = " ".join(f"{k!s}={v!r}" for k, v in self.__dict__.items())
|
||||
return f"<AuditLogDiff {values}>"
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
||||
def __getattr__(self, item: str) -> Any: ...
|
||||
|
||||
def __setattr__(self, key: str, value: Any) -> Any: ...
|
||||
|
||||
|
||||
Transformer = Callable[["AuditLogEntry", Any], Any]
|
||||
|
||||
|
||||
class AuditLogChanges:
|
||||
# fmt: off
|
||||
TRANSFORMERS: ClassVar[Dict[str, Tuple[Optional[str], Optional[Transformer]]]] = {
|
||||
"verification_level": (None, _enum_transformer(enums.VerificationLevel)),
|
||||
"explicit_content_filter": (None, _enum_transformer(enums.ContentFilter)),
|
||||
"allow": (None, _transform_permissions),
|
||||
"deny": (None, _transform_permissions),
|
||||
"permissions": (None, _transform_permissions),
|
||||
"id": (None, _transform_snowflake),
|
||||
"application_id": (None, _transform_snowflake),
|
||||
"color": ("colour", _transform_color),
|
||||
"owner_id": ("owner", _transform_member_id),
|
||||
"inviter_id": ("inviter", _transform_member_id),
|
||||
"channel_id": ("channel", _transform_channel),
|
||||
"afk_channel_id": ("afk_channel", _transform_channel),
|
||||
"system_channel_id": ("system_channel", _transform_channel),
|
||||
"widget_channel_id": ("widget_channel", _transform_channel),
|
||||
"rules_channel_id": ("rules_channel", _transform_channel),
|
||||
"public_updates_channel_id": ("public_updates_channel", _transform_channel),
|
||||
"permission_overwrites": ("overwrites", _transform_overwrites),
|
||||
"splash_hash": ("splash", _guild_hash_transformer("splashes")),
|
||||
"banner_hash": ("banner", _guild_hash_transformer("banners")),
|
||||
"discovery_splash_hash": ("discovery_splash", _guild_hash_transformer("discovery-splashes")),
|
||||
"icon_hash": ("icon", _transform_icon),
|
||||
"avatar_hash": ("avatar", _transform_avatar),
|
||||
"rate_limit_per_user": ("slowmode_delay", None),
|
||||
"default_thread_rate_limit_per_user": ("default_thread_slowmode_delay", None),
|
||||
"guild_id": ("guild", _transform_guild_id),
|
||||
"tags": ("emoji", None),
|
||||
"unicode_emoji": ("emoji", None),
|
||||
"default_message_notifications": ("default_notifications", _enum_transformer(enums.NotificationLevel)),
|
||||
"communication_disabled_until": ("timeout", _transform_datetime),
|
||||
"image_hash": ("image", _transform_guild_scheduled_event_image),
|
||||
"video_quality_mode": (None, _enum_transformer(enums.VideoQualityMode)),
|
||||
"preferred_locale": (None, _enum_transformer(enums.Locale)),
|
||||
"privacy_level": (None, _transform_privacy_level),
|
||||
"format_type": (None, _enum_transformer(enums.StickerFormatType)),
|
||||
"entity_type": (None, _enum_transformer(enums.GuildScheduledEventEntityType)),
|
||||
"status": (None, _enum_transformer(enums.GuildScheduledEventStatus)),
|
||||
"type": (None, _transform_type),
|
||||
"flags": (None, _flags_transformer(flags.ChannelFlags)),
|
||||
"system_channel_flags": (None, _flags_transformer(flags.SystemChannelFlags)),
|
||||
"trigger_type": (None, _enum_transformer(enums.AutoModTriggerType)),
|
||||
"event_type": (None, _enum_transformer(enums.AutoModEventType)),
|
||||
"actions": (None, _list_transformer(_transform_automod_action)),
|
||||
"trigger_metadata": (None, _transform_automod_trigger_metadata),
|
||||
"exempt_roles": (None, _list_transformer(_transform_role)),
|
||||
"exempt_channels": (None, _list_transformer(_transform_channel)),
|
||||
"applied_tags": (None, _list_transformer(_transform_tag_id)),
|
||||
"available_tags": (None, _list_transformer(_transform_tag)),
|
||||
"default_reaction_emoji": ("default_reaction", _transform_default_reaction),
|
||||
"default_sort_order": (None, _enum_transformer(enums.ThreadSortOrder)),
|
||||
"sound_id": ("id", _transform_snowflake),
|
||||
}
|
||||
# fmt: on
|
||||
|
||||
def __init__(self, entry: AuditLogEntry, data: List[AuditLogChangePayload]) -> None:
|
||||
self.before = AuditLogDiff()
|
||||
self.after = AuditLogDiff()
|
||||
|
||||
has_emoji_fields = False
|
||||
|
||||
for elem in data:
|
||||
attr = elem["key"]
|
||||
|
||||
# special cases for role add/remove
|
||||
if attr == "$add":
|
||||
self._handle_role(self.before, self.after, entry, elem["new_value"]) # type: ignore
|
||||
continue
|
||||
elif attr == "$remove":
|
||||
self._handle_role(self.after, self.before, entry, elem["new_value"]) # type: ignore
|
||||
continue
|
||||
|
||||
# special case for application command permissions update
|
||||
if entry.action == enums.AuditLogAction.application_command_permission_update:
|
||||
self._handle_command_permissions(
|
||||
entry, cast("AuditLogChangeAppCmdPermsPayload", elem)
|
||||
)
|
||||
continue
|
||||
|
||||
# special case for flat emoji fields (discord, why), these will be merged later
|
||||
if attr == "emoji_id" or attr == "emoji_name":
|
||||
has_emoji_fields = True
|
||||
|
||||
transformer: Optional[Transformer]
|
||||
|
||||
try:
|
||||
key, transformer = self.TRANSFORMERS[attr]
|
||||
except (ValueError, KeyError):
|
||||
transformer = None
|
||||
else:
|
||||
if key:
|
||||
attr = key
|
||||
|
||||
try:
|
||||
before = elem["old_value"]
|
||||
except KeyError:
|
||||
before = None
|
||||
else:
|
||||
if transformer:
|
||||
before = transformer(entry, before)
|
||||
|
||||
setattr(self.before, attr, before)
|
||||
|
||||
try:
|
||||
after = elem["new_value"]
|
||||
except KeyError:
|
||||
after = None
|
||||
else:
|
||||
if transformer:
|
||||
after = transformer(entry, after)
|
||||
|
||||
setattr(self.after, attr, after)
|
||||
|
||||
if has_emoji_fields:
|
||||
self._merge_emoji(entry)
|
||||
|
||||
# add an alias
|
||||
if hasattr(self.after, "colour"):
|
||||
self.after.color = self.after.colour
|
||||
self.before.color = self.before.colour
|
||||
if hasattr(self.after, "expire_behavior"):
|
||||
self.after.expire_behaviour = self.after.expire_behavior
|
||||
self.before.expire_behaviour = self.before.expire_behavior
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"<AuditLogChanges before={self.before!r} after={self.after!r}>"
|
||||
|
||||
def _handle_role(
|
||||
self,
|
||||
first: AuditLogDiff,
|
||||
second: AuditLogDiff,
|
||||
entry: AuditLogEntry,
|
||||
elem: List[RolePayload],
|
||||
) -> None:
|
||||
if not hasattr(first, "roles"):
|
||||
first.roles = []
|
||||
|
||||
data = []
|
||||
g: Guild = entry.guild
|
||||
|
||||
for e in elem:
|
||||
role_id = int(e["id"])
|
||||
role = g.get_role(role_id)
|
||||
|
||||
if role is None:
|
||||
role = Object(id=role_id)
|
||||
role.name = e["name"] # type: ignore
|
||||
|
||||
data.append(role)
|
||||
|
||||
second.roles = data
|
||||
|
||||
def _handle_command_permissions(
|
||||
self,
|
||||
entry: AuditLogEntry,
|
||||
data: AuditLogChangeAppCmdPermsPayload,
|
||||
) -> None:
|
||||
guild_id = entry.guild.id
|
||||
entity_id = int(data["key"])
|
||||
|
||||
if not hasattr(self.before, "command_permissions"):
|
||||
self.before.command_permissions = {}
|
||||
if (old := data.get("old_value")) is not None:
|
||||
self.before.command_permissions[entity_id] = ApplicationCommandPermissions(
|
||||
data=old, guild_id=guild_id
|
||||
)
|
||||
|
||||
if not hasattr(self.after, "command_permissions"):
|
||||
self.after.command_permissions = {}
|
||||
if (new := data.get("new_value")) is not None:
|
||||
self.after.command_permissions[entity_id] = ApplicationCommandPermissions(
|
||||
data=new, guild_id=guild_id
|
||||
)
|
||||
|
||||
def _merge_emoji(self, entry: AuditLogEntry) -> None:
|
||||
for diff in (self.before, self.after):
|
||||
emoji_id: Optional[str] = diff.__dict__.pop("emoji_id", None)
|
||||
emoji_name: Optional[str] = diff.__dict__.pop("emoji_name", None)
|
||||
|
||||
diff.emoji = entry._state._get_emoji_from_fields(
|
||||
name=emoji_name,
|
||||
id=int(emoji_id) if emoji_id else None,
|
||||
)
|
||||
|
||||
|
||||
class _AuditLogProxyMemberPrune:
|
||||
delete_member_days: int
|
||||
members_removed: int
|
||||
|
||||
|
||||
class _AuditLogProxyMemberMoveOrMessageDelete:
|
||||
channel: Union[abc.GuildChannel, Thread]
|
||||
count: int
|
||||
|
||||
|
||||
class _AuditLogProxyMemberDisconnect:
|
||||
count: int
|
||||
|
||||
|
||||
class _AuditLogProxyPinAction:
|
||||
channel: Union[abc.GuildChannel, Thread]
|
||||
message_id: int
|
||||
|
||||
|
||||
class _AuditLogProxyStageInstanceAction:
|
||||
channel: abc.GuildChannel
|
||||
|
||||
|
||||
class _AuditLogProxyAutoModAction:
|
||||
channel: Optional[Union[abc.GuildChannel, Thread]]
|
||||
rule_name: str
|
||||
rule_trigger_type: enums.AutoModTriggerType
|
||||
|
||||
|
||||
class _AuditLogProxyKickOrMemberRoleAction:
|
||||
integration_type: Optional[str]
|
||||
|
||||
|
||||
class AuditLogEntry(Hashable):
|
||||
"""Represents an Audit Log entry.
|
||||
|
||||
You can retrieve these via :meth:`Guild.audit_logs`,
|
||||
or via the :func:`on_audit_log_entry_create` event.
|
||||
|
||||
.. collapse:: operations
|
||||
|
||||
.. describe:: x == y
|
||||
|
||||
Checks if two entries are equal.
|
||||
|
||||
.. describe:: x != y
|
||||
|
||||
Checks if two entries are not equal.
|
||||
|
||||
.. describe:: hash(x)
|
||||
|
||||
Returns the entry's hash.
|
||||
|
||||
.. versionchanged:: 1.7
|
||||
Audit log entries are now comparable and hashable.
|
||||
|
||||
Attributes
|
||||
----------
|
||||
action: :class:`AuditLogAction`
|
||||
The action that was done.
|
||||
user: Optional[Union[:class:`Member`, :class:`User`, :class:`Object`]]
|
||||
The user who initiated this action. Usually :class:`Member`\\, unless gone
|
||||
then it's a :class:`User`.
|
||||
|
||||
.. versionchanged:: 2.8
|
||||
May now be an :class:`Object` if the user could not be found.
|
||||
id: :class:`int`
|
||||
The entry ID.
|
||||
target: Any
|
||||
The target that got changed. The exact type of this depends on
|
||||
the action being done.
|
||||
extra: Any
|
||||
Extra information that this entry has that might be useful.
|
||||
For most actions, this is ``None``. However in some cases it
|
||||
contains extra information. See :class:`AuditLogAction` for
|
||||
which actions have this field filled out.
|
||||
reason: Optional[:class:`str`]
|
||||
The reason this action was done.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
data: AuditLogEntryPayload,
|
||||
guild: Guild,
|
||||
application_commands: Mapping[int, APIApplicationCommand],
|
||||
automod_rules: Mapping[int, AutoModRule],
|
||||
guild_scheduled_events: Mapping[int, GuildScheduledEvent],
|
||||
integrations: Mapping[int, PartialIntegration],
|
||||
threads: Mapping[int, Thread],
|
||||
users: Mapping[int, User],
|
||||
webhooks: Mapping[int, Webhook],
|
||||
) -> None:
|
||||
self._state = guild._state
|
||||
self.guild = guild
|
||||
|
||||
self._application_commands = application_commands
|
||||
self._automod_rules = automod_rules
|
||||
self._guild_scheduled_events = guild_scheduled_events
|
||||
self._integrations = integrations
|
||||
self._threads = threads
|
||||
self._users = users
|
||||
self._webhooks = webhooks
|
||||
|
||||
self._from_data(data)
|
||||
|
||||
def _from_data(self, data: AuditLogEntryPayload) -> None:
|
||||
self.action = enums.try_enum(enums.AuditLogAction, data["action_type"])
|
||||
self.id = int(data["id"])
|
||||
|
||||
# this key is technically not usually present
|
||||
self.reason = data.get("reason")
|
||||
self.extra = extra = data.get("options")
|
||||
|
||||
if isinstance(self.action, enums.AuditLogAction) and extra:
|
||||
if self.action is enums.AuditLogAction.member_prune:
|
||||
elems = {
|
||||
"delete_member_days": utils._get_as_snowflake(extra, "delete_member_days"),
|
||||
"members_removed": utils._get_as_snowflake(extra, "members_removed"),
|
||||
}
|
||||
self.extra = type("_AuditLogProxy", (), elems)()
|
||||
elif (
|
||||
self.action is enums.AuditLogAction.member_move
|
||||
or self.action is enums.AuditLogAction.message_delete
|
||||
):
|
||||
elems = {
|
||||
"count": int(extra["count"]),
|
||||
"channel": self._get_channel_or_thread(
|
||||
utils._get_as_snowflake(extra, "channel_id")
|
||||
),
|
||||
}
|
||||
self.extra = type("_AuditLogProxy", (), elems)()
|
||||
elif self.action is enums.AuditLogAction.member_disconnect:
|
||||
elems = {
|
||||
"count": int(extra["count"]),
|
||||
}
|
||||
self.extra = type("_AuditLogProxy", (), elems)()
|
||||
elif self.action.name.endswith("pin"):
|
||||
elems = {
|
||||
"channel": self._get_channel_or_thread(
|
||||
utils._get_as_snowflake(extra, "channel_id")
|
||||
),
|
||||
"message_id": int(extra["message_id"]),
|
||||
}
|
||||
self.extra = type("_AuditLogProxy", (), elems)()
|
||||
elif self.action.name.startswith("overwrite_"):
|
||||
instance_id = int(extra["id"])
|
||||
the_type = extra.get("type")
|
||||
if the_type == "1":
|
||||
self.extra = self._get_member(instance_id)
|
||||
elif the_type == "0":
|
||||
role = self.guild.get_role(instance_id)
|
||||
if role is None:
|
||||
role = Object(id=instance_id)
|
||||
role.name = extra.get("role_name") # type: ignore
|
||||
self.extra = role
|
||||
elif self.action.name.startswith("stage_instance"):
|
||||
elems = {
|
||||
"channel": self._get_channel_or_thread(
|
||||
utils._get_as_snowflake(extra, "channel_id")
|
||||
)
|
||||
}
|
||||
self.extra = type("_AuditLogProxy", (), elems)()
|
||||
elif self.action is enums.AuditLogAction.application_command_permission_update:
|
||||
app_id = int(extra["application_id"])
|
||||
elems = {
|
||||
"integration": self._get_integration_by_application_id(app_id) or Object(app_id)
|
||||
}
|
||||
self.extra = type("_AuditLogProxy", (), elems)()
|
||||
elif self.action in (
|
||||
enums.AuditLogAction.automod_block_message,
|
||||
enums.AuditLogAction.automod_send_alert_message,
|
||||
enums.AuditLogAction.automod_timeout,
|
||||
enums.AuditLogAction.automod_quarantine_user,
|
||||
):
|
||||
elems = {
|
||||
"channel": (
|
||||
self._get_channel_or_thread(utils._get_as_snowflake(extra, "channel_id"))
|
||||
),
|
||||
"rule_name": extra["auto_moderation_rule_name"],
|
||||
"rule_trigger_type": enums.try_enum(
|
||||
enums.AutoModTriggerType,
|
||||
int(extra["auto_moderation_rule_trigger_type"]),
|
||||
),
|
||||
}
|
||||
self.extra = type("_AuditLogProxy", (), elems)()
|
||||
elif self.action in (
|
||||
enums.AuditLogAction.kick,
|
||||
enums.AuditLogAction.member_role_update,
|
||||
):
|
||||
elems = {
|
||||
# unlike other extras, this key isn't always provided
|
||||
"integration_type": extra.get("integration_type"),
|
||||
}
|
||||
self.extra = type("_AuditLogProxy", (), elems)()
|
||||
|
||||
self.extra: Any
|
||||
# actually this but there's no reason to annoy users with this:
|
||||
# Union[
|
||||
# _AuditLogProxyMemberPrune,
|
||||
# _AuditLogProxyMemberMoveOrMessageDelete,
|
||||
# _AuditLogProxyMemberDisconnect,
|
||||
# _AuditLogProxyPinAction,
|
||||
# _AuditLogProxyStageInstanceAction,
|
||||
# _AuditLogProxyAutoModAction,
|
||||
# _AuditLogProxyKickOrMemberRoleAction,
|
||||
# Member, User, None,
|
||||
# Role,
|
||||
# ]
|
||||
|
||||
# this key is not present when the above is present, typically.
|
||||
# It's a list of { new_value: a, old_value: b, key: c }
|
||||
# where new_value and old_value are not guaranteed to be there depending
|
||||
# on the action type, so let's just fetch it for now and only turn it
|
||||
# into meaningful data when requested
|
||||
self._changes = data.get("changes", [])
|
||||
|
||||
self.user = self._get_member(utils._get_as_snowflake(data, "user_id"))
|
||||
self._target_id = utils._get_as_snowflake(data, "target_id")
|
||||
|
||||
def _get_member(self, user_id: Optional[int]) -> Union[Member, User, Object, None]:
|
||||
if not user_id:
|
||||
return None
|
||||
return self.guild.get_member(user_id) or self._users.get(user_id) or Object(id=user_id)
|
||||
|
||||
def _get_channel_or_thread(
|
||||
self, channel_id: Optional[int]
|
||||
) -> Union[abc.GuildChannel, Thread, Object, None]:
|
||||
if not channel_id:
|
||||
return None
|
||||
return self.guild.get_channel_or_thread(channel_id) or Object(channel_id)
|
||||
|
||||
def _get_integration_by_application_id(
|
||||
self, application_id: int
|
||||
) -> Optional[PartialIntegration]:
|
||||
if not application_id:
|
||||
return None
|
||||
|
||||
for integration in self._integrations.values():
|
||||
if integration.application_id == application_id:
|
||||
return integration
|
||||
return None
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"<AuditLogEntry id={self.id} action={self.action} user={self.user!r}>"
|
||||
|
||||
@utils.cached_property
|
||||
def created_at(self) -> datetime.datetime:
|
||||
""":class:`datetime.datetime`: Returns the entry's creation time in UTC."""
|
||||
return utils.snowflake_time(self.id)
|
||||
|
||||
@utils.cached_property
|
||||
def target(
|
||||
self,
|
||||
) -> Union[
|
||||
Guild,
|
||||
abc.GuildChannel,
|
||||
Member,
|
||||
User,
|
||||
Role,
|
||||
Invite,
|
||||
Webhook,
|
||||
Emoji,
|
||||
PartialIntegration,
|
||||
StageInstance,
|
||||
GuildSticker,
|
||||
Thread,
|
||||
GuildScheduledEvent,
|
||||
APIApplicationCommand,
|
||||
AutoModRule,
|
||||
Object,
|
||||
None,
|
||||
]:
|
||||
if self.action.target_type is None:
|
||||
return Object(id=self._target_id) if self._target_id else None
|
||||
|
||||
try:
|
||||
converter = getattr(self, f"_convert_target_{self.action.target_type}")
|
||||
except AttributeError:
|
||||
return Object(id=self._target_id) if self._target_id else None
|
||||
else:
|
||||
return converter(self._target_id)
|
||||
|
||||
@utils.cached_property
|
||||
def category(self) -> Optional[enums.AuditLogActionCategory]:
|
||||
"""Optional[:class:`AuditLogActionCategory`]: The category of the action, if applicable."""
|
||||
return self.action.category
|
||||
|
||||
@utils.cached_property
|
||||
def changes(self) -> AuditLogChanges:
|
||||
""":class:`AuditLogChanges`: The list of changes this entry has."""
|
||||
obj = AuditLogChanges(self, self._changes)
|
||||
del self._changes
|
||||
return obj
|
||||
|
||||
@utils.cached_property
|
||||
def before(self) -> AuditLogDiff:
|
||||
""":class:`AuditLogDiff`: The target's prior state."""
|
||||
return self.changes.before
|
||||
|
||||
@utils.cached_property
|
||||
def after(self) -> AuditLogDiff:
|
||||
""":class:`AuditLogDiff`: The target's subsequent state."""
|
||||
return self.changes.after
|
||||
|
||||
def _convert_target_guild(self, target_id: int) -> Guild:
|
||||
return self.guild
|
||||
|
||||
def _convert_target_channel(self, target_id: int) -> Union[abc.GuildChannel, Object]:
|
||||
return self.guild.get_channel(target_id) or Object(id=target_id)
|
||||
|
||||
def _convert_target_user(self, target_id: int) -> Union[Member, User, Object, None]:
|
||||
return self._get_member(target_id)
|
||||
|
||||
def _convert_target_role(self, target_id: int) -> Union[Role, Object]:
|
||||
return self.guild.get_role(target_id) or Object(id=target_id)
|
||||
|
||||
def _convert_target_invite(self, target_id: int) -> Invite:
|
||||
# invites have target_id set to null
|
||||
# so figure out which change has the full invite data
|
||||
changeset = self.before if self.action is enums.AuditLogAction.invite_delete else self.after
|
||||
|
||||
fake_payload: InvitePayload = {
|
||||
"max_age": changeset.max_age,
|
||||
"max_uses": changeset.max_uses,
|
||||
"code": changeset.code,
|
||||
"temporary": changeset.temporary,
|
||||
"uses": changeset.uses,
|
||||
"type": 0,
|
||||
"channel": None,
|
||||
"expires_at": None,
|
||||
}
|
||||
|
||||
obj = Invite(
|
||||
state=self._state, data=fake_payload, guild=self.guild, channel=changeset.channel
|
||||
)
|
||||
try:
|
||||
obj.inviter = changeset.inviter
|
||||
except AttributeError:
|
||||
pass
|
||||
return obj
|
||||
|
||||
def _convert_target_webhook(self, target_id: int) -> Union[Webhook, Object]:
|
||||
return self._webhooks.get(target_id) or Object(id=target_id)
|
||||
|
||||
def _convert_target_emoji(self, target_id: int) -> Union[Emoji, Object]:
|
||||
return self._state.get_emoji(target_id) or Object(id=target_id)
|
||||
|
||||
def _convert_target_message(self, target_id: int) -> Union[Member, User, Object, None]:
|
||||
return self._get_member(target_id)
|
||||
|
||||
def _convert_target_integration(self, target_id: int) -> Union[PartialIntegration, Object]:
|
||||
return self._integrations.get(target_id) or Object(id=target_id)
|
||||
|
||||
def _convert_target_stage_instance(self, target_id: int) -> Union[StageInstance, Object]:
|
||||
return self.guild.get_stage_instance(target_id) or Object(id=target_id)
|
||||
|
||||
def _convert_target_sticker(self, target_id: int) -> Union[GuildSticker, Object]:
|
||||
return self._state.get_sticker(target_id) or Object(id=target_id)
|
||||
|
||||
def _convert_target_thread(self, target_id: int) -> Union[Thread, Object]:
|
||||
return (
|
||||
self.guild.get_thread(target_id) or self._threads.get(target_id) or Object(id=target_id)
|
||||
)
|
||||
|
||||
def _convert_target_guild_scheduled_event(
|
||||
self, target_id: int
|
||||
) -> Union[GuildScheduledEvent, Object]:
|
||||
return (
|
||||
self.guild.get_scheduled_event(target_id)
|
||||
or self._guild_scheduled_events.get(target_id)
|
||||
or Object(id=target_id)
|
||||
)
|
||||
|
||||
def _convert_target_application_command_or_integration(
|
||||
self, target_id: int
|
||||
) -> Union[APIApplicationCommand, PartialIntegration, Object]:
|
||||
# try application command
|
||||
if target := (
|
||||
self._state._get_guild_application_command(self.guild.id, target_id)
|
||||
or self._state._get_global_application_command(target_id)
|
||||
or self._application_commands.get(target_id)
|
||||
):
|
||||
return target
|
||||
|
||||
# permissions may also be changed for the entire application,
|
||||
# however the target ID is the application ID, not the integration ID
|
||||
if target := self._get_integration_by_application_id(target_id):
|
||||
return target
|
||||
|
||||
# fall back to object
|
||||
return Object(id=target_id)
|
||||
|
||||
def _convert_target_automod_rule(self, target_id: int) -> Union[AutoModRule, Object]:
|
||||
return self._automod_rules.get(target_id) or Object(id=target_id)
|
||||
Reference in New Issue
Block a user