Source code for scripts.base.items

"""
PokeGambler - A Pokemon themed gambling bot for Discord.
Copyright (C) 2021 Harshith Thota

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.
----------------------------------------------------------------------------

This module contains schematics for all the items in the Pokegambler world.
"""

# pylint: disable=no-member, unused-argument
# pylint: disable=too-many-instance-attributes

from __future__ import annotations

import os
import random
import re
from abc import ABC
from dataclasses import dataclass, field
from datetime import datetime
from functools import total_ordering
from hashlib import md5
from io import BytesIO
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Type

import certifi
from dotenv import load_dotenv
from PIL import Image
from pymongo import MongoClient

# pylint: disable=cyclic-import
from ..helpers.utils import dedent, get_embed

if TYPE_CHECKING:
    from aiohttp import ClientSession
    from discord import Embed

load_dotenv()

DB_CLIENT = MongoClient(
    os.getenv("MONGO_CLUSTER_STRING"),
    tlsCAfile=certifi.where()
).pokegambler


# region Base Classes
[docs]@dataclass class Item(ABC): """Any object which exists in the world of PokeGambler. :param description: A description of the item. :type description: str :param category: The category of the item. :type category: str :param asset_url: The URL of the item's asset. :type asset_url: str :param emoji: The emoji of the item. :type emoji: str :param buyable: Whether the item can be bought. :type buyable: bool :param sellable: Whether the item can be sold. :type sellable: bool :param price: The price of the item. :type price: Optional[int] :param premium: Whether the item is premium. :type premium: bool """ description: str category: str asset_url: str emoji: str buyable: bool = True sellable: bool = True price: Optional[int] = None premium: bool = False created_on: field( default_factory=datetime ) = datetime.now() # MongoDB Client mongo = DB_CLIENT.items def __eq__(self, other: Item) -> bool: return self.name == other.name def __hash__(self) -> int: return hash(self.name) def __iter__(self) -> Tuple: for attr in self.attrs: yield (attr, getattr(self, attr)) def __post_init__(self): self.itemid = md5( str(datetime.utcnow()).encode() ).hexdigest()[:8] self.attrs = ( "itemid", "name", "description", "category", "asset_url", "emoji", "buyable", "sellable", "price", "premium" ) def __repr__(self) -> str: attr_str = ',\n '.join( f"{attr}={getattr(self, attr)}" for attr in self.attrs ) return f"{self.__class__.__name__}(\n {attr_str}\n)" def __str__(self) -> str: return ' '.join( re.findall( r'[A-Z](?:[a-z]+|[A-Z]*(?=[A-Z]|$))', self.__class__.__name__ ) )
[docs] async def get_image(self, sess: ClientSession) -> Image.Image: """Downloads and returns the image of the item. :param sess: An aiohttp ClientSession object. :type sess: :class:`aiohttp.ClientSession` :return: The image of the item. :rtype: :class:`PIL.Image.Image` """ byio = BytesIO() async with sess.get(self.asset_url) as resp: data = await resp.read() byio.write(data) byio.seek(0) return Image.open(byio)
[docs] def delete(self): """ Deletes the Item from the Collection. """ self.mongo.delete_one({"_id": self.itemid})
[docs] def save(self): """ Saves the Item to the Collection. Sets the itemid of the Item after saving. """ attrs = dict(self) attrs["_id"] = attrs.pop("itemid") attrs["created_on"] = datetime.now() self.mongo.insert_one(attrs)
[docs] def update( self, modify_all: Optional[bool] = False, **kwargs ): """Updates an existing item. :param modify_all: Modify all copies of the item, defaults to False :type modify_all: Optional[bool] """ if not kwargs: return for key, val in kwargs.items(): setattr(self, key, val) filter_ = {'_id': self.itemid} if modify_all: filter_ = {'name': self.name} self.mongo.update_many( filter_, {'$set': kwargs} )
@property def name(self) -> str: """Returns the name of the Item. :return: The name of the Item. :rtype: str """ return getattr(self, "_name", str(self)) @name.setter def name(self, value: str): """Sets the name of the Item. :param value: The name of the Item. :type value: str """ self._name = value @property def category_class(self) -> Type[Item]: """Returns the class of the Item's category. :return: The class of the Item's category. :rtype: :class:`~scripts.base.items.Item` """ return globals()[self.category.title()] @property def details(self) -> Embed: """Returns a rich embed containing full details of an item. :return: Full details of an item. :rtype: :class:`discord.Embed` """ emb = get_embed( content=f"『{self.emoji}』 **{self.description}**", title=f"Information for 『{self.name}』", image=self.asset_url, footer=f"Item Id: {self.itemid}" ) fields = [ "category", "buyable", "sellable", "premium" ] if self.category == "Tradable": fields.append("price") for field_ in fields: emb.add_field( name=field_.title(), value=f"**`{getattr(self, field_)}`**", inline=True ) return emb
[docs] @classmethod def from_id( cls: Type[Item], itemid: int, force_new: bool = False ) -> Item: """Returns an Item from the Collection base on its itemid. :param itemid: The id of the Item. :type itemid: int :param force_new: Force a new Item to be created, defaults to False. :type force_new: bool :return: The existing/newly created Item. :rtype: :class:`Item` """ item = cls.get(itemid) if not item: return None return cls._new_item(item, force_new=force_new)
[docs] @classmethod def bulk_from_id( cls: Type[Item], itemids: List[str], force_new: bool = False ) -> List[Item]: """Returns a list of Items from the Collection base on their itemids. :param itemids: The ids of the Items. :type itemids: List[str] :param force_new: Force a new Item to be created, defaults to False. :type force_new: bool :return: The existing/newly created Items. :rtype: List[:class:`Item`] """ new_items = [] item_dict_list = [] for item in cls.bulk_get(itemids): # Bulk Get returns a distinct list of items. for _ in range(itemids.count(item['_id'])): new_item = cls._new_item(item, force_new=False) attrs = dict(new_item) attrs.pop("itemid") attrs['_id'] = md5( str(datetime.utcnow()).encode() + random.randbytes(16) ).hexdigest()[:8] attrs["created_on"] = datetime.now() new_item.itemid = attrs['_id'] new_items.append(new_item) item_dict_list.append(attrs) if force_new: cls.insert_many(item_dict_list) return new_items
[docs] @classmethod def from_name( cls: Type[Item], name: str, force_new: bool = False ) -> Item: """Returns an Item from the Collection base on its name. :param name: The name of the Item. :type name: str :param force_new: Force a new Item to be created, defaults to False. :type force_new: bool :return: The existing/newly created Item. :rtype: :class:`Item` """ item = cls.mongo.find_one({ "name": { "$regex": f"^{name}", "$options": "i" } }) if not item: return None return cls._new_item(item, force_new=force_new)
[docs] @classmethod def from_dict( cls: Type[Item], data: Dict ) -> Item: """Returns an Item based on available data. :param data: The data of the Item. :type data: Dict :return: The prepopulated Item. :rtype: :class:`Item` """ return cls._new_item(data, force_new=False)
[docs] @classmethod def get(cls: Type[Item], itemid: str) -> Dict: """Returns an Item from the Collection base on its itemid. :param itemid: The id of the Item. :type itemid: str :return: The dictionary of the Item. :rtype: Dict """ return cls.mongo.find_one({"_id": itemid})
[docs] @classmethod def bulk_get(cls: Type[Item], itemids: List[str]) -> List[Dict]: """Returns a list of Items from the Collection base on their itemids. :param itemids: The ids of the Items. :type itemids: List[str] :return: The list of dictionaries of the Items. :rtype: List[Dict] """ return cls.mongo.find({"_id": {"$in": itemids}})
[docs] @classmethod def get_category(cls: Type[Item], item: Dict) -> Type[Item]: """Resolves category to handle chests differently. Returns the base Category of the Item. :param item: The Item to get the category from. :type item: Dict :return: The base Category of the Item. :rtype: Type[:class:`Item`] """ def catog_crawl(cls, catog_name): result = set() path = [cls] while path: parent = path.pop() for child in parent.__subclasses__(): if '.' not in str(child): # In a multi inheritance scenario, __subclasses__() # also returns interim-classes that don't have all the # methods. With this hack, we skip them. continue if child not in result: if child.__name__ == catog_name: result.add(child) path.append(child) return result category = [ catog for catog in cls.__subclasses__() if catog.__name__ == item["category"] ] if not category: category = catog_crawl(cls, item["category"].title()) category = next(iter(category)) return category
[docs] @classmethod def get_unique_items(cls) -> List[Dict]: """Gets all items with a unique name. :return: A list of all items with a unique name. :rtype: List[Dict] """ return list(cls.mongo.aggregate([ { "$match": { "category": {"$ne": "Chest"} } }, { "$group": { "_id": "$name", "items": {"$first": "$$ROOT"} } }, {"$replaceRoot": {"newRoot": "$items"}} ]))
[docs] @classmethod def latest( cls: Type[Item], limit: Optional[int] = 5 ) -> List[Dict]: """Returns the latest items added to the Collection. :param limit: The maximum number of items to return, defaults to 5. :type limit: Optional[int] :return: A list of the latest items added to the Collection. :rtype: List[Dict] """ return list( cls.mongo.aggregate([ { "$match": { "category": {"$ne": "Chest"} } }, { "$group": { "_id": "$name", "items": {"$first": "$$ROOT"} } }, {"$replaceRoot": {"newRoot": "$items"}}, {"$set": {"itemid": "$_id"}}, {"$unset": "_id"}, {"$sort": {"created_on": -1}}, {"$limit": limit} ]) )
[docs] @classmethod def insert_many(cls, items: List[Dict]): """Inserts many items into the Collection. :param items: A list of items to insert. :type items: List[Dict] """ for item in items: if "created_on" not in item: item["created_on"] = datetime.now() cls.mongo.insert_many(items)
[docs] @classmethod def list_items( cls: Type[Item], category: str = "Tradable", limit: Optional[int] = 5, premium: Optional[bool] = None ) -> List: """List items of a certain category. :param category: The category of the items to list, defaults to "Tradable". :type category: str :param limit: The maximum number of items to return, defaults to 5. :type limit: Optional[int] :param premium: If True, only return premium items, defaults to None. :type premium: Optional[bool] :return: A list of the items of the given category. :rtype: List """ filter_ = { 'category': category.title(), 'inventory': {'$eq': []} } if premium is not None: filter_['premium'] = premium pipeline = [ { "$lookup": { "from": "inventory", "localField": "_id", "foreignField": "itemid", "as": "inventory" } }, {"$match": filter_}, { "$group": { "_id": "$name", "items": {"$first": "$$ROOT"} } }, {"$replaceRoot": {"newRoot": "$items"}}, {"$limit": limit} ] items = list( cls.mongo.aggregate(pipeline) ) modded_items = [] for item in items: item["itemid"] = item.pop("_id") modded_items.append(item) return modded_items
[docs] @classmethod def purge(cls): """ Purges the Items collection. """ cls.mongo.delete_many({})
@classmethod def _new_item( cls: Type[Item], existing_item: Dict, force_new: bool = False ) -> Item: old_item = {**existing_item} category = cls.get_category(old_item) old_item.pop('category', None) item_id = old_item.pop('_id', None) itemid = old_item.pop('itemid', None) itemid = itemid or item_id new_item = type( "".join( word.title() for word in old_item.pop("name").split(" ") ), (category, ), old_item )(**old_item) if force_new: new_item.save() elif itemid: new_item.itemid = itemid else: new_item.__post_init__() return new_item
[docs]@dataclass(eq=False) class Treasure(Item): """ Any non-buyable :class:`Item` is considered a Treasure. It is unique to the user and cannot be sold either. """ def __init__(self, **kwargs): super().__init__( category=kwargs.pop( "category", "Treasure" ), **kwargs ) self.buyable: bool = False self.sellable: bool = False
[docs]@dataclass(eq=False) class Tradable(Item): """ Any buyable and sellable :class:`Item` is a Tradeable. It should have a fixed base price. """ def __init__(self, **kwargs): super().__init__( category=kwargs.pop( "category", "Tradable" ), **kwargs ) self.price: int = kwargs['price']
[docs]@dataclass(eq=False) class Collectible(Item): """ Collectibles are sellable variants of :class:`Treasure`. They cannot be bought off the market but can be traded among users. """ def __init__(self, **kwargs): super().__init__( category=kwargs.pop( "category", "Collectible" ), **kwargs ) self.buyable: bool = False
[docs]@dataclass(eq=False) class Consumable(Tradable): """ :class:`Item` buyable from Shop but can't be sold back. """ def __init__(self, **kwargs): super().__init__( category=kwargs.pop( "category", "Consumable" ), **kwargs ) self.sellable: bool = False
# endregion # region Chests
[docs]@total_ordering class Chest(Treasure): """ Chests are spawnable :class:`Treasure` which contain \ Pokechips based on tiers. :param description: A description of the chest. :type description: str :param asset_url: The URL of the chest's asset. :type asset_url: str :param emoji: The emoji of the item. :type emoji: str :param tier: The tier of the chest. :type tier: int """ def __init__( self, description: str, asset_url: str, emoji: str, tier: int = 1, **kwargs ): super().__init__( description=description, asset_url=asset_url, emoji=emoji ) self.category = "Chest" self.tier: int = tier self.attrs += ('tier', ) def __eq__(self, other: Chest): return self.chips == other.chips def __ge__(self, other: Chest): return self.chips >= other.chips @property def chips(self) -> int: """Get a random amount of tier-scaled pokechips. :return: Number of Pokechips. :rtype: int """ scale = int(5.7735 ** (self.tier + 1)) rand_val = random.randint(scale, scale * 9) # To make sure lower tier chests are always less worthy rand_val = max(rand_val, int(5.7735 ** self.tier) * 9) return min(rand_val, int(5.7735 ** (self.tier + 2)))
[docs] @classmethod def get_chest(cls: Type[Chest], tier: int) -> Chest: """Get a specified tier Chest. :param tier: The tier of the Chest. :type tier: int :return: A Chest of the specified tier. :rtype: :class:`Chest` """ chests = [CommonChest, GoldChest, LegendaryChest] return chests[tier - 1]()
[docs] @classmethod def get_random_chest(cls: Type[Chest]) -> Chest: """Get a random tier Chest with weight of (90, 35, 12) for common, gold and legendary resp. :return: A Chest of a random tier. :rtype: :class:`Chest` """ chest_class = random.choices( cls.__subclasses__(), k=1, weights=(90, 35, 12) )[0] return chest_class()
[docs] def get_random_collectible(self) -> Collectible: """Get a random :class:`Collectible` with chance based on chest tier. .. note:: * Common Chest - 0% * Gold Chest - 25% * Legendary Chest - 50% :return: A random collectible. :rtype: :class:`Collectible` """ chance = (self.tier - 1) * 0.25 proc = random.uniform(0.1, 0.99) if proc >= chance: return None collectibles = Item.list_items("Collectible", limit=20) if not collectibles: return None col_dict = random.choice(collectibles) return Item.from_id(col_dict["itemid"])
[docs] @classmethod def get_items(cls: Type[Chest], chest_id: str) -> List[Item]: """Get a list of all :class:`Chest` items. :return: A list of all :class:`Chest` items. :rtype: List[:class:`Item`] """ chest: Chest = Item.from_id(chest_id) return [chest.get_random_collectible()]
[docs]class CommonChest(Chest): """ Lowest Tier :class:`Chest`. Chips scale in 100s. """ def __init__(self, **kwargs): description: str = dedent( """A Tier 1 chest which hs chips in the range of a few hundreds. Does not contain any other items.""" ) asset_url: str = "https://cdn.discordapp.com/attachments/" + \ "874623706339618827/874628500437467196/common.png" emoji: str = "<:common:874626457438158868>" tier: int = 1 super().__init__( description=description, asset_url=asset_url, emoji=emoji, tier=tier )
[docs]class GoldChest(Chest): """ Mid tier :class:`Chest`. Chips scale in high-hundreds to low-thousands. """ def __init__(self, **kwargs): description: str = dedent( """A Tier 2 chest which has chips in the range of high-hundreds to low-thousands. Does not contain any other items.""" ) asset_url: str = "https://cdn.discordapp.com/attachments/" + \ "874623706339618827/874628501876137984/gold.png" emoji: str = "<:gold:874626456993534042>" tier: int = 2 super().__init__( description=description, asset_url=asset_url, emoji=emoji, tier=tier )
[docs]class LegendaryChest(Chest): """ Highest Tier :class:`Chest`. Chips scale in the thousands. Legendary Chests have a small chance of containing :class:`Collectible`. """ def __init__(self, **kwargs): description: str = dedent( """A Tier 3 chest which has chips in the range of thousands. Has a small chance of containing [Collectible]s.""" ) asset_url: str = "https://cdn.discordapp.com/attachments/" + \ "874623706339618827/874628502924693584/legendary.png" emoji: str = "<:legendary:874626456918061096>" tier: int = 3 super().__init__( description=description, asset_url=asset_url, emoji=emoji, tier=tier )
# endregion # region Inherited Classes
[docs]@dataclass(eq=False) class Gladiator(Consumable): """ Minions that can fight in Gladiator matches. """ def __init__(self, **kwargs): kwargs.pop('category', None) super().__init__(category="Gladiator", **kwargs)
[docs] def rename(self, name: str): """Rename a :class:`Gladiator`. :param name: The new name of the :class:`Gladiator`. :type name: str """ self.name = name self.update(name=name)
[docs]@dataclass(eq=False) class Lootbag(Treasure): """ Lootbags contain other items inside them. Premium Lootbags can also contain Premium Items. """ def __init__( self, chips: Optional[int] = None, items: Optional[List[int]] = None, **kwargs ): super().__init__( category=kwargs.pop( "category", "Lootbag" ), **kwargs ) self._chips = chips self.items = items or [] self.attrs += ("chips", "items") @property def chips(self) -> int: """Return random amount of Pokechips in following ranges: .. note:: * Normal => [100, 499] * Premium => [500, 1000] :return: An amount of Pokechips. :rtype: int """ if self._chips: return self._chips limits = [500, 1000] if self.premium else [100, 499] return random.randint(*limits) @chips.setter def chips(self, value: int): """Set the amount of Pokechips. :param value: The amount of Pokechips. :type value: int """ self._chips = value
[docs] def get_random_items( self, categories: Optional[List[str]] = None, count: Optional[int] = 3 ) -> List[Item]: """Get a random :class:`Item` from a list of categories. :param categories: A list of categories to choose from. :type categories: Optional[List[str]] :param count: The amount of items to choose., default is 3. :type count: Optional[int] :return: A list of items. :rtype: List[:class:`Item`] """ pipeline = [ { "$group": { "_id": "$name", "items": { "$last": "$$ROOT" } } }, { "$group": { "_id": "$items.category", "items": { "$push": "$items" } } } ] matches = { "category": { "$nin": ["Chest", "Lootbag", "Rewardbox"] } } if categories: matches["category"] = { "$in": categories } if not self.premium: matches["premium"] = False if matches: pipeline.insert(0, {"$match": matches}) results = list(self.mongo.aggregate(pipeline)) random.shuffle(results) rand_items = [] premium_added = False num_misses = 0 while len(rand_items) <= count: for catog in results: try: if self.premium and not premium_added: itm_dict = random.choices( catog["items"], k=1, weights=[ int(item["premium"]) for item in catog["items"] ] )[0] premium_added = True else: itm_dict = random.choices( catog["items"], k=1, weights=[ -int(item["premium"]) for item in catog["items"] ] )[0] rand_items.append(itm_dict) catog["items"].remove(itm_dict) except IndexError: if num_misses < len(results): num_misses += 1 continue break return [ Item.from_id(itm_dict["_id"]) for itm_dict in rand_items ]
[docs] @classmethod def get_items(cls, bagid: int) -> List[Item]: """Get items stored in a :class:`Lootbag`. :param boxid: The itemid of the :class:`Lootbag`. :type boxid: int :return: A list of stored items. :rtype: List[:class:`Item`] """ bag: Lootbag = Item.from_id(bagid) if not bag.items: return bag.get_random_items() return [ Item.from_id(item, force_new=True) for item in bag.items ]
[docs]@dataclass(eq=False) class Rewardbox(Treasure): """ :class:`Lootbag` with fixed items and pokechips. """ def __init__( self, chips: Optional[int] = None, items: Optional[List[int]] = None, **kwargs ): super().__init__( category=kwargs.pop( "category", "Rewardbox" ), **kwargs ) self.items = items self.chips = chips self.attrs += ("chips", "items")
[docs] @classmethod def get_items(cls, boxid: int) -> List[Item]: """Get items stored in a :class:`Rewardbox`. :param boxid: The itemid of the :class:`Rewardbox`. :type boxid: int :return: A list of stored items. :rtype: List[:class:`Item`] """ return [ Item.from_id(item, force_new=True) for item in Item.from_id(boxid).items ]
# endregion