Source code for scripts.base.models

"""
PokeGambler - A Pokemon themed gambling bot for Discord.
Copyright (C) 2021 Harshith Thota

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.
----------------------------------------------------------------------------

This module contains a compilation of data models.
"""

# pylint: disable=too-many-instance-attributes, too-many-arguments
# pylint: disable=unused-argument, too-many-lines, no-member

from __future__ import annotations

import os
from dataclasses import dataclass
from datetime import datetime, timedelta
from functools import wraps
from inspect import ismethod
from typing import (
    TYPE_CHECKING, Any, Callable, Dict, Iterable,
    List, Optional, Tuple, Type, Union
)

import discord
from bson import ObjectId
from discord import Guild, Member, Role, TextChannel, User
from pymongo import UpdateOne

if TYPE_CHECKING:
    from bot import PokeGambler

# pylint: disable=cyclic-import, wrong-import-position
from ..base.items import DB_CLIENT, Item


[docs]def expire_cache(func: Callable): """Decorator to reset User cache. :param func: Function which requires cache to be cleared. """ @wraps(func) def wrapper(self, *args, **kwargs): """ Wrapper function. """ # pylint: disable=import-outside-toplevel, cyclic-import from ..commands.basecommand import Commands Commands.expire_cache(self.user.id) return func(self, *args, **kwargs) return wrapper
[docs]def to_dict( dc_obj: Union[ discord.User, discord.Guild, discord.TextChannel, discord.Role ] ) -> Dict[str, Any]: """Convert a Discord object into a Dictionary. .. note:: Currently only supports User, Guild, and TextChannel. :param dc_obj: Discord object to be converted. :type dc_obj: Union[ :class:`discord.User`, :class:`discord.Guild`, :class:`discord.TextChannel`, :class:`discord.Role`] :return: Dictionary of Discord object. :rtype: Dict[str, Any] """ fields = [ 'name', 'id', 'created_at' ] if isinstance(dc_obj, discord.Guild): fields.extend(['owner', 'large']) obj_dict = { attr: getattr(dc_obj, attr, None) for attr in fields } if isinstance(dc_obj, discord.Guild): obj_dict['owner'] = to_dict(obj_dict['owner']) return obj_dict
[docs]class MethodNotAllowed(Exception): """Exception raised when a method is not allowed."""
class NameSetter(type): """ Metaclass to set the mongo collection for the model. Useful for DB actions in Classmethods. Also used for setting default class attributes. :meta private: """ def __new__(cls, name, bases, dct): new_cl = super().__new__( cls, name, bases, dct ) new_cl.model_name = new_cl.__name__.lower() new_cl.mongo = DB_CLIENT[new_cl.model_name] new_cl.no_uinfo = dct.get('no_uinfo', False) new_cl._uid_fields = dct.get('uid_fields', []) new_cl.sort_order = dct.get('sort_order', []) new_cl.read_only = dct.get('read_only', False) return new_cl
[docs]@dataclass class Model(metaclass=NameSetter): """The Base Model Class which has a corresponding Collection in the DB. :param user: The user to map the collection to. :type user: :class:`discord.Member` """ def __init__( self, user: discord.Member, *args, **kwargs ) -> None: self.user = user def __iter__(self): def serialize(obj): if isinstance(obj, list): return [serialize(item) for item in obj] if isinstance(obj, dict): return { key: serialize(value) for key, value in obj.items() } if isinstance(obj, (Model, Item)): return dict(obj) if isinstance(obj, ObjectId): return str(obj) return obj iterable = dir(self) if self.sort_order: iterable = sorted( iterable, key=lambda x: ( self.sort_order.index(x) if x in self.sort_order else len(self.sort_order) ) ) for attr in iterable: # Patch for Full_Info being executed before Profile creation. if attr in getattr(self, "excludes", []): continue if attr == 'user' and ( self.no_uinfo or isinstance(self, TaskModel) ): continue if all([ not attr.startswith("_"), attr not in [ "model_name", "mongo", "excludes", "no_uinfo", "uid_fields", "classes", "count", "pk_field", "sort_order", "read_only" ], not ismethod(getattr(self, attr)), not isinstance( getattr(self.__class__, attr, None), (property, staticmethod, classmethod) ) ]): res = getattr(self, attr) if attr == 'user': attr = 'user_info' res = to_dict(res) res.pop('id') res = serialize(res) yield (attr, res) @classmethod @property def uid_fields(cls) -> List[str]: """ List of fields containing user IDs. :return: List of fields containing user IDs. :rtype: List[str] """ if ('user_id', str) not in cls._uid_fields: cls._uid_fields.append(('user_id', str)) return cls._uid_fields
[docs] def drop(self): """ Deletes all entries in the Collection for the user. """ if self.read_only: raise MethodNotAllowed("This model is read-only.") self.mongo.delete_many({ "$or": [ {"user_id": str(self.user.id)}, {"played_by": str(self.user.id)} ] })
[docs] def get(self, param=None) -> Any: """Returns the Model object as a dictionary. :param param: The attribute to get from the Model. :type param: str :return: The attribute value. :rtype: Any """ return ( dict(self) if not param else dict(self).get(param) )
[docs] def save(self): """ Saves the Model object to the Collection. """ if self.read_only: raise MethodNotAllowed("This model is read-only.") self.mongo.insert_one(dict(self))
[docs] @classmethod def latest( cls: Type[Model], limit: Optional[int] = 5 ) -> List[Dict]: """Returns the latest douments from the DB for a model. :param limit: The number of documents to return., default 5 :type limit: Optional[int] :return: The documents from the DB. :rtype: List[Dict] """ return list( cls.mongo.aggregate([ {"$sort": {"_id": -1}}, {"$limit": limit}, {"$unset": "_id"} ]) )
[docs] @classmethod def purge(cls): """ Deletes all entries in the Collection. """ if cls.read_only: raise MethodNotAllowed("This model is read-only.") cls.mongo.delete_many({})
@classmethod @property def classes(cls) -> List[Type[Model]]: """ List of all subclasses of Model. :return: List of all subclasses of Model. :rtype: List[Type[Model]] """ return [ cls for cls in Model.__subclasses__() if cls not in (UnlockedModel, Minigame) ] + UnlockedModel.__subclasses__() + Minigame.__subclasses__()
[docs] @classmethod def censor_uids(cls, user: discord.User) -> int: """ Censors the user IDs in all the collections. :param user: The user to censor. :type user: :class:`discord.User` :return: The number of documents censored. :rtype: int """ def get_filter(user, elem, type_): if type_ in (list, dict): return {f'{elem}.id': user.id} return {elem: str(user.id)} def replace_id(id_, record): for key, value in record.items(): if str(value) == str(id_): record[key] = 'REDACTED' elif isinstance(value, dict): replace_id(id_, value) elif isinstance(value, list): for idx, value_ in enumerate(value): value[idx] = replace_id(id_, value_) return record num_deleted = 0 # pylint: disable=not-an-iterable for model in cls.classes: filter_ = { "$or": [ get_filter(user, name, type_) for name, type_ in model.uid_fields ] } modified_records = [ UpdateOne( {'_id': record.pop('_id')}, {'$set': replace_id(user.id, record)} ) for record in model.mongo.find(filter_) ] if modified_records: res = model.mongo.bulk_write(modified_records) raw = res.bulk_api_result if raw.get('nModified', 0) > 0: num_deleted += raw['nModified'] return num_deleted
[docs] @classmethod def count(cls) -> int: """ The number of documents in the collection. :return: The number of documents in the collection. :rtype: int """ return cls.mongo.estimated_document_count()
# region Models
[docs]class Blacklist(Model): """Wrapper for blacklisted users based DB actions. :param user: The user to map the collection to. :type user: :class:`discord.Member` :param mod: The Admin who used the command. :type mod: Optional[:class:`discord.Member`] :param reason: The reason for the blacklist. :type reason: Optional[str] """ uid_fields = [('blacklisted_by', dict)] def __init__( self, user: discord.Member, mod: Optional[discord.Member] = None, reason: Optional[str] = "" ): super().__init__(user) self.user_id = str(user.id) self.blacklisted_at = datetime.now() self.blacklisted_by = to_dict(mod) if mod else None self.reason = reason
[docs] @expire_cache def pardon(self): """ Pardons a blacklisted user. """ self.mongo.delete_one({"user_id": self.user_id})
# pylint: disable=invalid-overridden-method
[docs] async def save(self): """ Saves the blacklisted user in the table. Also resets their profile. """ # pylint: disable=import-outside-toplevel, cyclic-import from .shop import PremiumShop, Shop super().save() Profiles(self.user).reset() Inventory(self.user).drop() Boosts(self.user).reset() Loots(self.user).reset() for cls in Minigame.__subclasses__(): cls(self.user).drop() titles = [ title.name for shop in [Shop, PremiumShop] for title in shop.categories["Titles"].items ] if need_to_remove := [ role for role in self.user.roles if role.name.title() in titles ]: await self.user.remove_roles( *need_to_remove, reason="Blacklisted" ) await self.user.edit( nick=None, reason="Blacklisted" )
[docs] @classmethod def is_blacklisted( cls: Type[Blacklist], user_id: str ) -> bool: """Checks if a user is blacklisted. :param user_id: The ID of the user to check. :type user_id: str :return: True if blacklisted, False otherwise. :rtype: bool """ return cls.mongo.find_one({"user_id": user_id})
[docs]class CommandData(Model): """Wrapper for command based DB actions :param user: The user to map the collection to. :type user: :class:`discord.Member` :param message: The message which triggered the command. :type message: discord.Message :param is_interaction: Whether the command is an interaction. :type is_interaction: bool :param command: The command which was triggered. :type command: str :param args: The arguments passed to the command. :type args: List[str] :param kwargs: The keyword arguments passed to the command. :type kwargs: Dict[str, Any] """ def __init__( self, user: discord.Member, message: discord.Message, is_interaction: bool, command: str, admin_cmd: bool, args: List, kwargs: Dict ): super().__init__(user) self.user_id = str(user.id) self.admin_cmd = admin_cmd self.used_at = datetime.now() self.is_interaction = is_interaction self.channel = to_dict(message.channel) self.guild = to_dict(message.guild) self.command = command self.args = args self.kwargs = kwargs
[docs] def save(self): """ Override Save to serialize Discord objects in args or kwargs. """ for idx, arg in enumerate(self.args): if isinstance(arg, (Guild, TextChannel, User, Member, Role)): self.args[idx] = to_dict(arg) for key, value in self.kwargs.items(): if isinstance(value, (Guild, TextChannel, User, Member, Role)): self.kwargs[key] = to_dict(value) super().save()
[docs] @classmethod def history(cls, limit: Optional[int] = 5, **kwargs) -> List[Dict]: """Returns the list of commands used on PG till now. :param limit: The number of documents to return., default 5 :type limit: Optional[int] :return: The recorded commands from the DB. :rtype: List[Dict] """ return list(cls.mongo.find(kwargs).sort("_id", -1).limit(limit))
[docs] @classmethod def most_active_channel(cls) -> Dict: """Returns the most active channel. :return: The most active channel. :rtype: Dict """ return next( cls.mongo.aggregate([ { '$match': { 'used_at': { '$gte': datetime.today() - timedelta( weeks=1 ) }, 'admin_cmd': False } }, { '$group': { '_id': '$channel.id', 'name': {'$last': '$channel.name'}, 'num_cmds': {'$sum': 1}, 'guild': {'$first': '$guild'} } }, { '$sort': {'num_cmds': -1} }, { '$limit': 1 }, { '$match': {'num_cmds': {'$gt': 1}} } ]), None )
[docs] @classmethod def most_used_command(cls) -> Dict: """Returns the most used command. :return: The most used command. :rtype: Dict """ return next( cls.mongo.aggregate([ { '$match': { 'used_at': { '$gte': datetime.today() - timedelta( weeks=1 ) }, 'admin_cmd': False } }, { '$group': { '_id': '$command', 'num_cmds': {'$sum': 1} } }, { '$sort': {'num_cmds': -1} }, { '$limit': 1 }, { '$match': {'num_cmds': {'$gt': 1}} } ]), None )
[docs] @classmethod def most_active_user(cls) -> Dict: """Returns the most active user. :return: The most active user. :rtype: Dict """ return next( cls.mongo.aggregate([ { '$match': { 'used_at': { '$gte': datetime.today() - timedelta( weeks=1 ) }, 'admin_cmd': False } }, { '$group': { '_id': '$user_id', 'num_cmds': {'$sum': 1} } }, { '$sort': {'num_cmds': -1} }, { '$limit': 1 }, { '$match': {'num_cmds': {'$gt': 1}} } ]), None )
[docs] @classmethod def num_user_cmds(cls, user_id: str) -> int: """Returns the number of commands used by a user. :param user_id: The ID of the user. :type user_id: str :return: The number of commands used by the user. :rtype: int """ return cls.mongo.count_documents({ "user_id": user_id })
[docs] @classmethod def clean_guild(cls, guild_id: int) -> int: """Replaces the given guild ID in all commands with "REDACTED". :param guild_id: The ID of the guild. :type guild_id: str :return: The number of removed commands. :rtype: int """ return cls.mongo.update_many( {"guild.id": guild_id}, { "$set": { "guild.id": "REDACTED" } } ).modified_count
[docs] @classmethod def trend( cls, include_os: bool = True, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None ) -> Iterable[Dict[str, Union[int, datetime]]]: """Returns the number of commands used on PG per day. :param include_os: Whether to include commands used on the Official Server. :type include_os: bool :param start_time: The start time of the period. :type start_time: Optional[datetime] :param end_time: The end time of the period. :type end_time: Optional[datetime] :return: The number of commands used on PG. :rtype: Iterable[Dict[str, Union[int, datetime]]] """ if include_os: match = {} else: match = { '$match': { 'guild.id': { '$nin': [ int(os.getenv('OFFICIAL_SERVER')), int(( os.getenv('BLACKLIST_GUILDS') if os.getenv('IS_PROD') == 'True' else os.getenv('WHITELIST_GUILDS') ).split(', ')[0]) ] } } } if start_time is None: start_time = datetime(2021, 1, 1) if end_time is None: end_time = datetime.now() match['$match']['used_at'] = { '$gte': start_time, '$lte': end_time } return cls.mongo.aggregate([ match, { '$group': { '_id': { '$dateToString': { 'format': '%Y-%m-%d', 'date': '$used_at' } }, 'count': { '$sum': 1 } } }, { '$sort': { '_id': 1 } }, { '$project': { "date": { "$dateFromString": { "format": "%Y-%m-%d", "dateString": "$_id" } }, "_id": 0, "count": 1 } } ])
[docs]class DuelActionsModel(Model): """ Wrapper for duel actions based DB actions :param user: The user to map the collection to. :type user: :class:`discord.Member` :param action: An action which can be used in a duel. :type action: Optional[str] :param level: The level of the action. :type level: Optional[str] """ uid_fields = [("created_by", str)] def __init__( self, user: discord.Member, action: Optional[str] = None, level: Optional[str] = None ): super().__init__(user) self.created_at = datetime.now() self.created_by = str(user.id) self.action = action self.level = level # pylint: disable=inconsistent-return-statements
[docs] @classmethod def get_actions( cls: Type[DuelActionsModel], user_id: Optional[str] = None ) -> List[Dict]: """Get Duel Actions from the DB. :param user_id: An optional user_id to get the actions for. :type user_id: Optional[str] :return: The list of duel actions. :rtype: List[Dict] """ filter_ = {} if user_id: filter_["user_id"] = user_id if not cls.mongo.count_documents(filter_): return None yield from cls.mongo.find(filter_)
[docs]class Exchanges(Model): """Wrapper for currency exchanges based DB actions. :param user: The user to map the collection to. :type user: :class:`discord.Member` :param admin: The Admin who performed the exchange. :type admin: Optional[:class:`discord.Member`] :param pokebot: The Pokemon themed bot. :type pokebot: Optional[:class:`discord.Member`] :param chips: The amount of chips exchanged. :type chips: Optional[int] :param mode: The mode of the exchange., default is Deposit. :type mode: Optional[str] """ uid_fields = [("admin", dict)] def __init__( self, user: discord.Member, admin: Optional[discord.Member] = None, pokebot: Optional[discord.Member] = None, chips: Optional[int] = None, mode: Optional[str] = None ): super().__init__(user) self.exchanged_at = datetime.now() self.user_id = str(user.id) self.admin = to_dict(admin) self.pokebot = to_dict(pokebot) self.chips = chips self.mode = mode
[docs] def get_daily_exchanges(self, mode: str) -> int: """Gets the list of exchanges made by the user today. :param mode: The mode of the exchange. :type mode: str :return: The number of chips exchanged. :rtype: int """ pipeline = [ { "$match": { "user_id": str(self.user.id), "exchanged_at": { "$gt": datetime.now().replace( hour=0, minute=0, second=0 ) }, "mode": mode } }, { "$group": { "_id": "$user_id", "total_chips": {"$sum": "$chips"} } } ] if result := next(self.mongo.aggregate(pipeline), None): return result["total_chips"] return 0
[docs] @classmethod def exchanges(cls, **kwargs) -> List[Dict]: """Find all completed exchanges based on the provided filters. :param kwargs: The filters to use in the query. :type kwargs: Dict :return: The list of completed exchanges. :rtype: List[Dict] """ yield from cls.mongo.find(kwargs)
[docs]class Inventory(Model): """Wrapper for Inventory based DB operations. :param user: The user to map the collection to. :type user: :class:`discord.Member` """ # pylint: disable=arguments-differ def __init__( self, user: discord.Member ) -> None: super().__init__(user) self.user_id = str(self.user.id)
[docs] def delete( self, item_inp: Union[str, List[str]], quantity: int = -1, is_name: bool = False ) -> int: """Deletes an Item from user's Inventory. Input can either be a name or List of itemids. If item name is given, a quantity can be provided. If quantity is -1, all items of the name will be removed. :param item_inp: The name or list of item ids to delete. :type item_inp: Union[str, List[str]] :param quantity: The quantity of items to delete., default is -1. :type quantity: int :param is_name: Whether the input is a name or list of item ids. :type is_name: bool :return: The number of items deleted. :rtype: int """ ids = self.from_name(item_inp) if is_name else [item_inp] if is_name: ids = self.from_name(item_inp) elif isinstance(item_inp, list): ids = item_inp else: ids = [item_inp] if quantity > 0: ids = ids[:quantity] res = self.mongo.delete_many({ "user_id": self.user_id, "itemid": { "$in": ids } }) return res.deleted_count
[docs] def from_id(self, itemid: str) -> Item: """Gets an item using ItemID if it exists in user's inventory. :param itemid: The ItemID of the item. :type itemid: str :return: The Item object. :rtype: :class:`~.items.Item` """ if self.mongo.find_one({ "user_id": self.user_id, "itemid": itemid }): return Item.from_id(itemid) return None
[docs] def from_name(self, name: str) -> List[str]: """Returns a list of ItemIDs if they exist in user's Inventory. :param name: The name of the item. :type name: str :return: The list of ItemIDs. :rtype: List[str] """ items = self.mongo.aggregate([ { "$match": {"user_id": self.user_id} }, { "$lookup": { "from": "items", "localField": "itemid", "foreignField": "_id", "as": "items" } }, { "$match": { "items.name": { "$regex": f"^{name}", "$options": "i" } } }, { "$unwind": "$items" }, { "$group": { "_id": "$items.name", "items": { "$push": "$items" } } }, { "$unwind": "$items" }, { "$replaceRoot": {"newRoot": "$items"} }, { "$set": { "itemid": "$_id" } }, {"$unset": "_id"} ]) items = list(items) return [ item["itemid"] for item in items ]
# pylint: disable=arguments-renamed
[docs] def get( self, category: Optional[str] = None ) -> Tuple[Dict[str, List], int]: """Returns a list of items in user's Inventory and the net worth. .. note:: These items are not included for calculating the net worth: * :class:`~.items.Chest` * :class:`~.items.Lootbag` * :class:`~.items.Rewardbox` :param category: The category to filter by. :type category: Optional[str] :return: The list of items and the net worth of the inventory. :rtype: Tuple[Dict[str, List], int] """ pipeline = [ { "$match": {"user_id": self.user_id} }, { "$lookup": { "from": "items", "localField": "itemid", "foreignField": "_id", "as": "items" } }, { "$unwind": "$items" }, { "$group": { "_id": "$items.category", "items": { "$push": "$items" }, "net_worth": {"$sum": "$items.price"} } }, { "$set": { "category": "$_id" } }, {"$unset": "_id"} ] if category: pipeline.insert(2, { "$match": { "items.category": category } }) categories = self.mongo.aggregate(pipeline) net_worth = 0 item_dict = {} for catog in categories: category = catog.pop("category") if category not in item_dict: item_dict[category] = [] item_dict[category].extend(catog["items"]) net_worth += catog.pop('net_worth') return item_dict, net_worth
[docs] def save(self, itemid: str): """Saves an item to a player's inventory. :param itemid: The ItemID of the item. :type itemid: str """ if self.from_id(itemid): new_item = Item.from_id(itemid, force_new=True) new_item.save() itemid = new_item.itemid self.mongo.insert_one({ "user_id": self.user_id, "user": to_dict(self.user), "itemid": itemid, "obtained_on": datetime.now() })
[docs] def bulk_insert(self, items: List[str]): """Inserts a list of items to a player's inventory. :param items: The list of item ids to insert. :type items: List[str] """ new_items = Item.bulk_from_id(items, force_new=True) self.mongo.insert_many([ { "user_id": self.user_id, "user": to_dict(self.user), "itemid": item.itemid, "obtained_on": datetime.now() } for item in new_items ])
[docs]class Matches(Model): """Wrapper for Gamble matches based DB actions :param user: The user to map the collection to. :type user: :class:`discord.Member` :param started_by: The user who started the match. :type started_by: Optional[:class:`discord.Member`] :param participants: The list of participants. :type participants: Optional[List[:class:`discord.Member`]] :param winner: The user who won. :type winner: :class:`discord.Member` :param deal_cost: The fee of the gamble match., deafult is 50. :type deal_cost: int :param lower_wins: Was the lower_wins rule in place? :type lower_wins: bool :param by_joker: Did the match end due to a joker? :type by_joker: bool """ no_uinfo = True uid_fields = [ ('started_by', dict), ('participants', list), ('winner', dict) ] def __init__( self, user: discord.Member, started_by: Optional[discord.Member] = None, participants: Optional[List[discord.Member]] = None, winner: Optional[discord.Member] = None, deal_cost: int = 50, lower_wins: bool = False, by_joker: bool = False ): super().__init__(user) self.played_at = datetime.now() self.started_by = to_dict(started_by) self.participants = [ to_dict(user) for user in participants ] if participants else None self.winner = to_dict(winner) self.deal_cost = deal_cost self.lower_wins = lower_wins self.by_joker = by_joker @property def num_matches(self) -> int: """Returns number of gamble matches played. :return: Number of matches played. :rtype: int """ return self.mongo.count_documents({ "$or": [ { "played_by": str(self.user.id) }, { "participants.id": self.user.id } ] }) @property def num_wins(self) -> int: """Returns number of gamble matches won. :return: Number of matches won. :rtype: int """ return self.mongo.count_documents({ "winner.id": self.user.id })
[docs] def get_stats(self) -> Tuple[int, int]: """Get Match :meth:`num_matches` and :meth:`num_wins` as a Tuple. :return: Tuple of num_matches and num_wins. :rtype: Tuple[int, int] """ return (self.num_matches, self.num_wins)
[docs] @classmethod def get_matches( cls: Type[Matches], limit: Optional[int] = 10 ) -> List[Dict]: """Get the recently played gamble matches. :param limit: The maximum number of matches to return., default 10 :type limit: Optional[int] :return: List of recent matches. :rtype: List[Dict] """ pipeline = [ {"$sort": {"played_at": -1}}, {"$limit": limit} ] return list(cls.mongo.aggregate(pipeline))
[docs]class Trades(Model): """Wrapper for trades based DB actions. :param user: The user to map the collection to. :type user: :class:`discord.Member` :param traded_to: The user with whom the trade happened. :type traded_to: :class:`discord.Member` :param given_chips: The number of pokechips given to user. :type given_chips: int :param taken_chips: The number of pokechips taken from user. :type taken_chips: int :param given_items: The list of items given to user. :type given_items: List[str] :param taken_items: The list of items taken from user. :type taken_items: List[str] """ uid_fields = [ ('traded_by', str), ('traded_to', dict) ] def __init__( self, user: discord.Member, traded_to: Optional[discord.Member] = None, given_chips: int = None, taken_chips: int = None, given_items: List[str] = None, taken_items: List[str] = None ): super().__init__(user) self.traded_at = datetime.now() self.traded_by = str(user.id) self.traded_to = to_dict(traded_to) self.given_chips = given_chips self.taken_chips = taken_chips self.given_items = given_items self.taken_items = taken_items
[docs]class Transactions(Model): """Wrapper for webshop transactions based DB actions. :param user: The user to map the collection to. :type user: :class:`discord.Member` :param created_on: The date and time of the transaction. :type created_on: datetime :param tx_id: The transaction ID. :type tx_id: str :param gateway: The gateway used for the transaction. :type gateway: str :param webitem_id: The ID of the webitem. :type webitem_id: str :param quantity: The quantity of the webitem purchased. :type quantity: int :param total_price: The total price of the transaction. :type total_price: float :param redeemed: Was the webitem redeemed? :type redeemed: bool """ sort_order: Optional[List] = [ 'created_at', "user", "tx_id", "gateway", "webitem", "quantity", "total_price", "redeemed" ] def __init__( self, user: discord.Member, created_at: datetime = None, tx_id: str = None, gateway: str = None, webitem_id: str = None, quantity: int = None, total_price: float = None, redeemed: bool = False ): super().__init__(user) self.created_at = created_at self.tx_id = tx_id self.gateway = gateway self.webitem_id = webitem_id self.quantity = quantity self.total_price = total_price self.redeemed = redeemed # pylint: disable=arguments-differ
[docs] def get(self, **kwargs) -> List[Transactions]: """Get all transactions for the user. :return: List of transactions. :rtype: List[Transactions] """ pipeline = [ { '$match': { 'user_id': str(self.user.id), **kwargs } }, { '$addFields': { 'webitem_obj_id': { '$toObjectId': '$webitem_id' } } }, { '$lookup': { 'from': 'webshop', 'localField': 'webitem_obj_id', 'foreignField': '_id', 'as': 'webitem' } }, { '$unwind': { 'path': '$webitem' } } ] txns = [] for raw_tx in self.mongo.aggregate(pipeline): txn = Transactions( user=self.user, created_at=raw_tx['created_at'], tx_id=raw_tx['tx_id'], gateway=raw_tx['gateway'], webitem_id=raw_tx['webitem_id'], quantity=raw_tx['quantity'], total_price=raw_tx['total_price'], redeemed=raw_tx['redeemed'] ) # pylint: disable=attribute-defined-outside-init txn.webitem = Webshop(name=raw_tx['webitem']['name']) del txn.webitem_id txns.append(txn) return txns
[docs] def from_tx_id(self, tx_id: str) -> Transactions: """Get the transaction with the given ID. :param tx_id: The transaction ID. :type tx_id: str :return: The transaction. :rtype: Transactions """ txs = self.get(tx_id=tx_id) return txs[0] if txs else None
[docs] def redeem(self): """Redeem the transaction. :return: The transaction. :rtype: Transactions """ self.mongo.update_one( {'tx_id': self.tx_id}, {'$set': {'redeemed': True}} )
# region Submodels
[docs]class Minigame(Model): """ Base class for Minigames. """ # pylint: disable=no-self-use @property def num_plays(self) -> int: """Returns number of minigames (of specified type) played. :return: Number of minigames played. :rtype: int """ return len(self.get_plays()) @property def num_wins(self): """Returns number of minigames (of specified type) won. :return: Number of minigames won. :rtype: int """ return len(self.get_plays(wins=True))
[docs] def get_lb(self) -> List[Dict]: """Returns leaderboard for the specified minigame. :return: The leaderboard for the minigame. :rtype: List[Dict] """ return list(self.mongo.aggregate([ { "$group": self._get_lb_group() }, { "$match": { "num_wins": {"$gte": 1} } }, { "$addFields": { "earned": { "$toInt": {"$multiply": [ "$num_wins", { "$divide": [ "$cumm_cost", "$num_matches" ] } ]} } } }, {"$sort": self._get_lb_sort()}, {"$limit": 20} ]))
[docs] def get_plays(self, wins: bool = False) -> List[Dict]: """Returns list of minigames (of specified type) played. :param wins: Whether to include only wins or all plays. :type wins: bool :return: List of minigames played. :rtype: List[Dict] """ filter_ = { "played_by": str(self.user.id) } if wins: filter_["won"] = True return list(self.mongo.aggregate([ { "$match": filter_ } ]))
[docs] @expire_cache def save(self): super().save()
def _get_lb_group(self): return { "_id": "$played_by", "num_wins": { "$sum": { "$toInt": "$won" } }, "num_matches": {"$sum": 1}, "cumm_cost": {"$sum": "$cost"} } def _get_lb_sort(self) -> Dict[str, Any]: """ Override it for each Minigame. """ return { "num_wins": -1, "earned": -1 }
[docs]class UnlockedModel(Model): """The Base Unlocked Model class which can be modified after creation. :param user: The user to map the collection to. :type user: :class:`discord.Member` """ #: The name of the primary key. pk_field: str = "user_id" def __init__(self, user: discord.Member, *args, **kwargs): super().__init__(user, *args, **kwargs) if getattr(self, "_prefetched", False): return if existing := self._query_existing(): for key, val in existing.items(): setattr(self, key, val) self._prefetched = True else: self._default() self.save() def _query_existing(self): """ Send a MongoDB query to prepopulate the Model if a record exists. Can be overridden to implement custom logic. (eg. Lookups) """ return self.mongo.find_one({ self.pk_field: ( str(self.user.id) if self.pk_field == 'user_id' else getattr(self, self.pk_field) ) })
[docs] @expire_cache def reset(self): """ Resets a model for a particular user. """ self._default() kwargs = dict(self) self.update(**kwargs)
[docs] @expire_cache def save(self): super().save()
[docs] @expire_cache def update(self, **kwargs): """ Updates an existing unfrozen model. """ if not kwargs: return self.mongo.update_one( { self.pk_field: ( str(self.user.id) if self.pk_field == 'user_id' else getattr(self, self.pk_field) ) }, { "$set": kwargs } ) for key, val in kwargs.items(): setattr(self, key, val)
def _default(self): """ The default values to be used for init. """ raise NotImplementedError
[docs]class UnboundModel(Model): """ A special subset of Models which don't have a user associated with them. :param pk_value: The value of the primary key. :type pk_value: Optional[Any] """ #: The name of the primary key. pk_field: str = "_id" no_uinfo = True uid_fields = [] def __init__(self, *args, **kwargs): pk_value = kwargs.get(self.pk_field) setattr(self, self.pk_field, pk_value) super().__init__(None, *args, **kwargs) default_flag = False if pk_value is not None and not getattr(self, "_prefetched", False): if existing := self._query_existing(): for key, val in existing.items(): setattr(self, key, val) self._prefetched = True else: default_flag = True elif getattr(self, "_prefetched", False): default_flag = False elif hasattr(self, "_default"): default_flag = True if default_flag: self._default() def _query_existing(self): return self.mongo.find_one({ self.pk_field: getattr(self, self.pk_field) })
[docs] def drop(self): """ Overriden Drop method to disable it. """ raise MethodNotAllowed( "Unbound Models are not bound to a user." )
[docs] def save(self): """ Overriden Save method to pop the user field. """ save_data = dict(self) save_data.pop('user', None) self.mongo.insert_one(save_data)
[docs]class TaskModel(UnboundModel): """ A special subset of UnboundModels representing automated tasks. """ def __init__(self, *args, **kwargs): super().__init__(None, *args, **kwargs)
# endregion # region Unlocked Models
[docs]class Boosts(UnlockedModel): """Wrapper for Permanent Boosts based DB actions. :param user: The user to map the collection to. :type user: :class:`discord.Member` """ def _default(self): self.user_id: str = str(self.user.id) self.lucky_looter: int = 0 self.loot_lust: int = 0 self.fortune_burst: int = 0 self.flipster: int = 0
[docs] @classmethod def reset_all(cls: Type[Boosts]): """ Resets the Boosts collection. """ cls.mongo.update_many( {"user_id": {"$exists": True}}, {"$set": { "lucky_looter": 0, "loot_lust": 0, "fortune_burst": 0, "flipster": 0 }} )
[docs]class Loots(UnlockedModel): """Wrapper for Loots based DB actions. :param user: The user to map the collection to. :type user: :class:`discord.Member` """ def _default(self): self.user_id: str = str(self.user.id) self.tier: int = 1 self.earned: int = 0 self.daily_claimed_on: datetime = ( datetime.now() - timedelta(days=1) ) self.daily_streak: int = 0
[docs] @classmethod def reset_all(cls: Type[Loots]): """ Resets the Loots collection. """ cls.mongo.update_many( {"user_id": {"$exists": True}}, {"$set": { "tier": 1, "earned": 0, "daily_claimed_on": datetime.now() - timedelta( days=1 ), "daily_streak": 0 }} )
[docs]class Profiles(UnlockedModel): """Wrapper for Profiles based DB actions. :param user: The user to map the collection to. :type user: :class:`discord.Member` """ # pylint: disable=access-member-before-definition def __init__(self, user: discord.Member): self.excludes = ['full_info'] super().__init__(user) names = [self.user.name, self.name] if 'nick' in dir(self.user) and self.user.nick: names.append(self.user.nick) self.name = min( names, key=lambda x: ( sum(ord(ch) for ch in x), len(x) ) ) if 'guild' in dir(self.user) and self.user.guild.id == int( os.getenv('OFFICIAL_SERVER') ): if all([ "dealers" in [ role.name.lower() for role in user.roles ], not self.is_dealer ]): self.update(is_dealer=True) elif all([ "dealers" not in [ role.name.lower() for role in user.roles ], self.is_dealer ]): self.update(is_dealer=False) def __eq__(self, other: Profiles) -> bool: return self.user.id == other.user.id @property def full_info(self) -> Dict: """Get the full/consolidated info for the user. :return: The full info for the user. :rtype: Dict """ for collection in [Loots, Boosts]: if not collection(self.user).get(): collection(self.user).save() return next(self.mongo.aggregate([ { "$match": { "user_id": str(self.user.id) } }, { "$lookup": { "from": "loots", "localField": "user_id", "foreignField": "user_id", "as": "loots" } }, { "$lookup": { "from": "boosts", "localField": "user_id", "foreignField": "user_id", "as": "boosts" }, }, { "$unset": [ "_id", "loots._id", "boosts._id", "loots.user_id", "boosts.user_id", "user_info", "loots.user_info", "boosts.user_info" ] } ]))
[docs] @expire_cache def credit(self, amount: int, bonds: bool = False): """Shorthand method to credit user\'s balance and won_chips. :param amount: The amount to credit to the balance. :type amount: int :param bonds: Currency type is Pokebonds? :type bonds: bool """ if bonds: self.update( balance=self.balance + (amount * 10), pokebonds=self.pokebonds + amount ) else: self.update( balance=self.balance + amount, won_chips=self.won_chips + amount )
[docs] @expire_cache def debit(self, amount: int, bonds: bool = False): """Shorthand method to debit user\'s balance and won_chips. :param amount: The amount to debit from the balance. :type amount: int :param bonds: Currency type is Pokebonds? :type bonds: bool """ if bonds: self.update( balance=self.balance - (amount * 10), pokebonds=self.pokebonds - amount ) else: self.update( balance=self.balance - amount, won_chips=self.won_chips - amount )
[docs] def get_badges(self) -> List[str]: """Computes the Badges unlocked by the user. :return: The list of badges unlocked by the user. :rtype: List[str] """ definitions = { "champion": ("num_wins", 1), "emperor": ("balance", 101), "funder": ("pokebonds", 1) } badges = [ key for key, val in definitions.items() if next( self.mongo.aggregate([ {"$sort": {val[0]: -1}}, {"$limit": 1}, {"$match": { "user_id": str(self.user.id), val[0]: { "$gte": val[1] } }} ]), False ) ] if self.is_dealer: badges.append("dealer") return badges
[docs] def get_rank(self) -> int: """Get the user\'s rank in the leaderboard. :return: The user\'s rank in the leaderboard. :rtype: int """ res = self.mongo.aggregate([ { "$match": {"num_wins": {"$gte": 1}} }, { "$sort": { "num_wins": -1, "num_matches": -1, "balance": -1 } }, {"$limit": 20}, { "$group": { "_id": 0, "users": { "$push": { "_id": "$user_id", "num_wins": "$num_wins", "num_matches": "$num_matches", "balance": "$balance" } } } }, { "$unwind": { "path": "$users", "includeArrayIndex": "rank" } }, { "$match": { "users._id": str(self.user.id) } }, {"$project": { "rank": {"$add": ["$rank", 1]} }} ]) return next(res, {"rank": 0})["rank"]
[docs] @classmethod def get_all( cls: Type[Profiles], ids_only: bool = False ) -> List[Dict]: """DB query to get all whitelisted profiles. :param ids_only: Return only the user IDs? :type ids_only: bool :return: The list of whitelisted profiles. :rtype: List[Dict] """ pipeline = [ { "$lookup": { "from": "blacklist", "localField": "user_id", "foreignField": "user_id", "as": "blacklist" } }, { "$match": { "blacklist": {"$eq": []} } } ] if ids_only: pipeline.append({ "$project": { "user_id": "$user_id" } }) for result in cls.mongo.aggregate(pipeline): if ids_only: yield int(result.get("user_id", 0)) else: yield result
[docs] @classmethod def get_leaderboard( cls: Type[Profiles], sort_by: List[str] ) -> List[Dict]: """Get the global leaderboard of PokeGambler. :param sort_by: The fields to sort the leaderboard by. :type sort_by: List[str] :return: The leaderboard. :rtype: List[Dict] """ yield from cls.mongo.aggregate([ { "$match": {"num_wins": {"$gte": 1}} }, { "$sort": { field: -1 for field in sort_by } } ])
[docs] @classmethod def reset_all(cls: Type[Profiles]): """ Resets all the Profiles. """ cls.mongo.update_many( {"user_id": {"$exists": True}}, {"$set": { "balance": 100, "num_matches": 0, "num_wins": 0, "pokebonds": 0, "won_chips": 100, "background": None, "embed_color": None }} )
def _default(self): init_dict = { "user_id": str(self.user.id), "name": self.user.name, "balance": 100, "num_matches": 0, "num_wins": 0, "pokebonds": 0, "won_chips": 100, "is_dealer": "dealers" in [ role.name.lower() for role in self.user.roles ], "background": None, "embed_color": None } for key, val in init_dict.items(): setattr(self, key, val)
[docs]class Votes(UnlockedModel): """ .. _Votes: https://top.gg/bot/873569713005953064/vote Wrapper for `Votes`_ based DB actions. :param user: The user to map the collection to. :type user: :class:`discord.Member` """ def _default(self): self.user_id: str = str(self.user.id) self.last_voted: datetime = ( datetime.now() - timedelta(days=1) ) self.total_votes: int = 0 self.vote_streak: int = 0 self.reward_claimed: bool = False
[docs] @classmethod def most_active_voter(cls: Type[Votes]) -> Dict: """Get the most active voter. :return: The most active voter. :rtype: Dict """ res = cls.mongo.aggregate([ { "$sort": {"total_votes": -1} }, {"$limit": 1}, { "$project": { "_id": "$user_id", "total_votes": "$total_votes" } } ]) return next(res, None)
[docs] @classmethod def reset_all(cls: Type[Votes]): """ Resets the Votes collection. """ cls.mongo.update_many( {"user_id": {"$exists": True}}, {"$set": { "last_voted": datetime.now() - timedelta( days=1 ), "total_votes": 0, "vote_streak": 0, "reward_claimed": False }} )
@classmethod @property def votes_count(cls: Type[Votes]) -> int: """Get the total number of votes. :return: The total number of votes. :rtype: int """ return next( cls.mongo.aggregate([ {"$group": {"_id": None, "count": {"$sum": "$total_votes"}}} ]), {"count": 0} )["count"]
# endregion # region Unbound Models
[docs]class Webshop(UnboundModel, UnlockedModel): """Wrapper for Webshop Model. :param name: The name of the item. :type name: str :param description: The description of the item. :type description: str :param image: The image of the item. :type image: str :param price: The price of the item. :type price: float :param offer_price: Any special offer price for the item. :type offer_price: float :param reward_pokechips: The amount of Pokechips held by the item. :type reward_pokechips: int :param reward_pokebonds: The amount of Pokebonds held by the item. :type reward_pokebonds: int :param reward_items: The ingame Items held by the item. :type reward_items: List[:class:`~.items.Item`] :param meta: Metadata Dictionary :type meta: Dict[str, bool] """ pk_field: str = "name" no_uinfo: bool = True read_only: bool = True uid_fields: List[str] = [] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # pylint: disable=import-outside-toplevel import urllib.parse self.image = urllib.parse.urljoin( 'https://pokegambler.vercel.app/', self.image ) def _query_existing(self): rwd_itm_query = { '$map': { 'input': '$reward_items', 'as': 'first', 'in': { '$mergeObjects': [ '$$first', { '$arrayElemAt': [ { '$filter': { 'input': '$items', 'as': 'second', 'cond': { '$eq': [ '$$second._id', '$$first.itemid' ] } } }, 0 ] } ] } } } def to_item(item): quantity = item.pop('quantity', None) item_item = Item.from_dict(item) return { "item": item_item, "quantity": quantity } existing = next(self.mongo.aggregate([ { "$match": { "name": self.name } }, { '$addFields': { 'discount': { '$round': [ { '$subtract': [ '$price', '$offer_price' ] }, 2 ] }, 'id': '$_id' } }, { '$unset': '_id' }, { '$lookup': { 'from': 'items', 'localField': 'reward_items.itemid', 'foreignField': '_id', 'as': 'items' } }, { '$set': { 'reward_items': rwd_itm_query } }, { '$unset': 'items' } ]), None) if existing: existing["reward_items"] = [ to_item(item) for item in existing["reward_items"] ] return existing def _default(self): self.name: str = "" self.description: str = "" self.image: str = "" self.price: float = 0.0 self.offer_price: float = 0.0 self.reward_pokechips: int = 0 self.reward_pokebonds: int = 0 self.reward_items: List[Item] = [] self.meta: Dict[str, bool] = { "has_currency": False, "is_bundle": False, "ready_for_sale": False }
# endregion # region Minigames
[docs]class Duels(Minigame): """Wrapper for duels based DB actions :param user: The user to map the collection to. :type user: :class:`discord.Member` :param gladiator: The gladiator used by the user. :type gladiator: Optional[str] :param opponent: The opponent for the Duel. :type opponent: Optional[:class:`discord.Member`] :param opponent_gladiator: The gladiator of the opponent. :type opponent_gladiator: Optional[str] :param won: The ID of the winner of the Duel. :type won: Optional[str] :param cost: The cost of the Duel., default is 50. :type cost: Optional[int] """ uid_fields = [ ("played_by", str), ("opponent", dict) ] def __init__( self, user: discord.Member, gladiator: Optional[str] = None, opponent: Optional[discord.Member] = None, opponent_gladiator: Optional[str] = None, won: Optional[str] = None, cost: int = 50 ): super().__init__(user) self.played_at = datetime.now() self.played_by = str(user.id) self.gladiator = gladiator self.opponent = to_dict(opponent) self.opponent_gladiator = opponent_gladiator self.cost = cost self.won = won
[docs] def get_plays(self, wins: bool = False) -> List[Dict]: """Returns list of duels played/won by the user. :param wins: Whether to get the list of wins or plays. :type wins: bool :return: The list of plays. :rtype: List[Dict] """ filter_ = { "played_by": str(self.user.id) } if wins: filter_["won"] = str(self.user.id) return list(self.mongo.aggregate([ { "$match": filter_ } ]))
def _get_lb_group(self): return { "_id": "$played_by", "num_wins": { "$sum": { "$toInt": {"$eq": ["$won", "$played_by"]} } }, "num_matches": {"$sum": 1}, "cumm_cost": {"$sum": "$cost"} }
[docs]class Flips(Minigame): """Wrapper for Quickflips based DB actions. :param user: The user to map the collection to. :type user: :class:`discord.Member` :param cost: The cost of the flip. :type cost: int :param won: Did the user win the flip? :type won: bool """ uid_fields = [("played_by", str)] def __init__( self, user: discord.Member, cost: int = 50, won: bool = False ): super().__init__(user) self.played_at = datetime.now() self.played_by = str(user.id) self.cost = cost self.won = won self.uid_fields = ["played_by"]
[docs]class Moles(Minigame): """Wrapper for Whackamole based DB actions. :param user: The user to map the collection to. :type user: :class:`discord.Member` :param cost: The cost of the mole. :type cost: int :param level: The level of the mole. :type level: int :param won: Did the user win the mole? :type won: bool """ uid_fields = [("played_by", str)] def __init__( self, user: discord.Member, cost: int = 50, level: int = 1, won: bool = False ): super().__init__(user) self.played_at = datetime.now() self.played_by = str(user.id) self.cost = cost self.level = level self.won = won self.uid_fields = ["played_by"] def _get_lb_group(self) -> Dict[str, Any]: return { **super()._get_lb_group(), "avg_lvl": {"$avg": "level"} } def _get_lb_sort(self) -> Dict[str, Any]: return { **super()._get_lb_sort(), "avg_lvl": -1 }
# endregion # region Tasks
[docs]class Checkpoints(TaskModel): """Wrapper for Daily Checkpoints Model. :param ctx: The PokeGambler client. :type ctx: :class:`bot.PokeGambler` """ def __init__(self, ctx: PokeGambler, *args, **kwargs): super().__init__(*args, **kwargs) self.created_on = datetime.now() #: The number of profiles created till this checkpoint. self.num_profiles = Profiles.count() #: The number of guilds the bot is in. self.num_guilds = len(ctx.guilds) #: The number of commands used till this checkpoint. self.num_commands = CommandData.count() #: The number of votes received till this checkpoint. self.num_votes = Votes.votes_count
[docs] @classmethod def get_checkpoints( cls, start_time: datetime = None, end_time: datetime = None ) -> List[Dict]: """Get the checkpoints between the given times. :param start_time: The start time of the checkpoints. :type start_time: datetime :param end_time: The end time of the checkpoints. :type end_time: datetime :return: The checkpoints. :rtype: List[Dict] """ if start_time is None: start_time = datetime(2021, 1, 1) if end_time is None: end_time = datetime.now() return list(cls.mongo.aggregate([ { "$match": { "created_on": { "$gte": start_time, "$lte": end_time } } }, { "$sort": { "created_on": 1 } }, { "$project": { "_id": 0, "created_on": 1, "num_profiles": 1, "num_guilds": 1, "num_commands": 1, "num_votes": 1 } } ]))
[docs]class Nitro(TaskModel): """Wrapper for Nitro Reward records. :param boosters: The list of the nitro boosters. :type boosters: List[:class:`discord.Member`] :param rewardboxes: The list of IDs of nitro reward boxes. :type rewardboxes: List[str] """ uid_fields = [('boosters', list)] def __init__( self, *args, boosters: List[discord.Member] = None, rewardboxes: List[str] = None, **kwargs ): super().__init__(*args, **kwargs) self.last_rewarded = datetime.now() self.boosters = [ to_dict(user) for user in boosters ] if boosters else None self.rewardboxes = rewardboxes
[docs] @classmethod def get_last_rewarded(cls) -> datetime: """Returns the last time the users were rewarded. :return: Last time the users were rewarded. """ pipeline = [ {"$sort": {"last_rewarded": -1}}, {"$limit": 1} ] return next( cls.mongo.aggregate(pipeline), {'last_rewarded': datetime.utcnow() - timedelta(days=31)} )['last_rewarded']
# endregion # endregion