"""
PokeGambler - A Pokemon themed gambling bot for Discord.
Copyright (C) 2021 Harshith Thota
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
----------------------------------------------------------------------------
This module is a compilation of all in-game Shop related classes.
"""
# pylint: disable=too-few-public-methods, unused-argument
# pylint: disable=invalid-overridden-method, arguments-differ
# pylint: disable=too-many-instance-attributes
from __future__ import annotations
from abc import abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from queue import Queue
from typing import TYPE_CHECKING, Dict, List, Optional, Type, Union
from discord.errors import Forbidden, HTTPException
from ..base.items import DB_CLIENT, Item
from ..base.models import Boosts, Inventory, Loots, Profiles
from ..helpers.utils import get_embed
if TYPE_CHECKING:
from discord import Member, Message
[docs]class Listing(Queue):
"""A dynamically flowing Queue with the provision
for pinning items from removal.
:param items: An optional list of items to be added to the queue.
:type items: List[ShopItem]
:param maxsize: The maximum size of the queue., default is 5.
:type maxsize: Optional[int]
"""
def __init__(
self, items: Optional[List[ShopItem]] = None,
maxsize: Optional[int] = 5
):
if not items:
items = []
super().__init__(
maxsize=max(maxsize, len(items))
)
self.register(items)
def __iter__(self) -> ShopItem:
"""
Iterator with a LIFO order and lower precedence
for pinned items.
"""
def get_rank(item):
price = item.price
return (
getattr(
item, "created_on",
self.queue.index(item)
),
price
)
yield from sorted(
self.queue,
key=lambda item: (
[True, False].index(
getattr(item, "pinned", False)
),
get_rank(item)
),
reverse=True
)
def __len__(self) -> int:
return len(self.queue)
def __repr__(self) -> str:
return f"Listing({list(self)})"
@property
def name_id_map(self) -> Dict[str, str]:
"""Returns a mapping between item names and their ids.
:return: A dictionary of item names and their ids.
:rtype: Dict[str, str]
"""
return {
item.name: item.itemid
for item in self
}
[docs] def fetch(self) -> ShopItem:
"""Pops out an item in FIFO order.
:return: The popped item.
:rtype: :class:`ShopItem`
"""
return super().get_nowait()
[docs] def register(
self, items: Union[ShopItem, List[ShopItem]]
):
"""Adds an item/list of items to the queue.
:param items: An item or list of items to be added to the queue.
:type items: Union[:class:`ShopItem`, List[:class:`ShopItem`]]
"""
if not items:
return
if not isinstance(items, list):
items = [items]
for item in items:
if not self.full():
super().put_nowait(item)
continue
removable = next(
(
existing_item
for existing_item in self.queue
if not getattr(existing_item, "pinned", False)
),
None,
)
if not removable:
return
self.queue.remove(removable)
super().put_nowait(item)
[docs]@dataclass
class ShopCategory:
"""The different categories of PokeGambler Shop.
:param name: The name of the category.
:type name: str
:param description: The description for the category.
:type description: str
:param emoji: The emoji for the category.
:type emoji: str
:param items: A list of items in the category.
:type items: :class:`Listing`
"""
name: str
description: str
emoji: str
items: Listing
def __str__(self) -> str:
return f"『{self.emoji}』 {self.name}"
[docs] def copy(self) -> ShopCategory:
"""Returns a copy of itself (to prevent mutation).
:return: A copy of itself.
:rtype: :class:`ShopCategory`
"""
return self.__class__(
name=self.name,
description=self.description,
emoji=self.emoji,
items=Listing(list(self.items))
)
[docs]@dataclass
class ShopItem:
"""Base class for any item visible in the PokeGambler Shop.
:param itemid: The id of the item.
:type itemid: str
:param name: The name of the item.
:type name: str
:param description: The description for the item.
:type description: str
:param price: The price of the item.
:type price: int
:param emoji: The emoji for the item.
:type emoji: str
:param pinned: Whether the item is pinned in the Shop.
:type pinned: bool
"""
itemid: str
name: str
description: str
price: int
emoji: str = ""
pinned: bool = False
# Safely ignore these kwargs while casting from Item
category: field(default_factory=str) = "Tradable"
asset_url: field(default_factory=str) = ""
buyable: field(default_factory=bool) = True
sellable: field(default_factory=bool) = True
premium: field(default_factory=bool) = False
max_stack: field(default_factory=int) = 0
color: field(default_factory=int) = 0
def __str__(self) -> str:
return (
f"{self.name} " if not self.emoji
else f"{self.name} 『{self.emoji}』"
)
[docs] @abstractmethod
def buy(self, **kwargs) -> str:
"""
Every ShopItem should have a buy action.
"""
[docs] def debit_player(
self, user: Member,
quantity: Optional[int] = 1,
premium: bool = False
):
"""Debits from the player, the price of the item.
:param user: The user to debit from.
:type user: :class:`discord.Member`
:param quantity: The quantity of items to debit., default is 1.
:type quantity: Optional[int]
:param premium: Whether the item is premium.
:type premium: bool
"""
amount = self.price * quantity
bonds = False
if premium:
amount //= 10
bonds = True
Profiles(user).debit(
amount=amount, bonds=bonds
)
[docs]class BoostItem(ShopItem):
"""
This class represents a purchasable temporary boost.
"""
[docs] async def buy(
self, message: Message,
quantity: int = 1, **kwargs
) -> str:
"""Applies the relevant temporary boost to the user.
:param message: The message which triggered the boost purchase.
:type message: :class:`discord.Message`
:param quantity: The number of stacks of boost to buy.
:type quantity: int
:return: A success/error message.
:rtype: str
"""
user = message.author
tier = Loots(user).tier
boost_dict = self._get_tempboosts(user)
if any([
boost_dict["stack"] == 5,
self._check_lootlust(boost_dict, user, quantity)
]):
return (
"You've maxed out to 5 stacks for this boost."
if quantity == 1
else "You can't puchase that many "
"as it exceeds max stack of 5."
)
self.__boost_handler(boost_dict, user, quantity)
Profiles(user).debit(
amount=((self.price * (10 ** (tier - 1))) * quantity)
)
return "success"
[docs] @classmethod
def get_boosts(
cls: Type[BoostItem], user_id: str
) -> Dict:
"""Returns a list of all the temporary boosts for the user.
:param user_id: The id of the user.
:type user_id: str
:return: A list of all the temporary boosts for the user.
:rtype: Dict
"""
return {
boost.itemid: DB_CLIENT["tempboosts"].find_one({
"user_id": user_id,
"boost_id": boost.itemid
}) or {
"stack": 0,
"name": boost.name,
"description": boost.description,
"added_on": datetime.now()
}
for boost in Shop.categories["Boosts"].items
}
[docs] @classmethod
def default_boosts(cls: Type[BoostItem]) -> Dict:
"""Returns the default temporary boosts dictionary.
:return: A dictionary of default temporary boosts.
:rtype: Dict
"""
return {
item.itemid: {
"stack": 0,
"name": item.name,
"description": item.description,
"added_on": datetime.now()
}
for item in Shop.categories["Boosts"].items
}
def __boost_handler(
self, boost_dict: Dict,
user: Member, quantity: int
):
boost_dict["stack"] += quantity
DB_CLIENT["tempboosts"].create_index(
"added_on",
expireAfterSeconds=30 * 60
)
DB_CLIENT["tempboosts"].update_one(
{"user_id": str(user.id), "boost_id": self.itemid},
{"$set": boost_dict},
upsert=True
)
@staticmethod
def _check_lootlust(
boost_dict: Dict,
user: Member, quantity: int
) -> bool:
if boost_dict["boost_id"] not in (
"boost_lt_cd", "boost_pr_lt_cd"
):
return False
return sum([
Boosts(user).get("loot_lust"),
quantity,
boost_dict["stack"]
]) > 5
def _get_tempboosts(self, user: Member) -> Dict:
return DB_CLIENT["tempboosts"].find_one({
"user_id": str(user.id),
"boost_id": self.itemid
}) or {
"user_id": str(user.id),
"boost_id": self.itemid,
"added_on": datetime.utcnow(),
"name": self.name,
"description": self.description,
"stack": 0,
}
[docs]class PremiumBoostItem(BoostItem):
"""
Permanent Boosts purchasable from Premium Shop
"""
[docs] def buy(
self, message: Message,
quantity: Optional[int] = 1,
**kwargs
) -> str:
"""Applies the relevant permanent boost to the user.
:param message: The message which triggered the boost purchase.
:type message: :class:`discord.Message`
:param quantity: The number of stacks of boost to buy.
:type quantity: int
:return: A success/error message.
:rtype: str
"""
user = message.author
if self._check_lootlust(
self._get_tempboosts(user),
user, quantity
):
return (
"You've maxed out to 5 stacks for this boost."
if quantity == 1
else "You can't puchase that many "
"as it exceeds max stack of 5."
)
tier = Loots(user).tier
boost = Boosts(user)
boost_name = self.name.lower().replace(' ', '_')
curr = boost.get(boost_name)
boost.update(**{boost_name: curr + quantity})
Profiles(user).debit(
amount=((self.price * (10 ** (tier - 2))) * quantity),
bonds=True
)
return "success"
[docs]class TradebleItem(ShopItem):
"""
This class represents a shop version of \
:class:`~.items.Tradable`.
"""
[docs] def buy(
self, message: Message,
quantity: int, **kwargs
) -> str:
"""Buys the Item and places in user's inventory.
:param message: The message that triggered this action.
:type message: :class:`discord.Message`
:param quantity: The number of items to buy.
:type quantity: int
:return: A success/error message.
:rtype: str
"""
inventory = Inventory(message.author)
for _ in range(quantity):
inventory.save(self.itemid)
self.debit_player(
user=message.author,
quantity=quantity,
premium=self.premium
)
return "success"
[docs]class Title(ShopItem):
"""
This class represents a purchasable Title.
"""
[docs] async def buy(
self, message: Message,
**kwargs
) -> str:
"""Automatically adds the titled role to the user.
Also edits their nickname if possible.
:param message: The message that triggered this action.
:type message: :class:`discord.Message`
:return: A success/error message.
:rtype: str
"""
if self.name in (
role.name.title()
for role in message.author.roles
):
return "**You already have this title " + \
"bestowed upon you.**"
roles = [
role
for role in message.guild.roles
if role.name.lower() == self.name.lower()
]
if not roles:
try:
roles = [
await message.guild.create_role(
name=self.name.title(),
color=self.color,
hoist=True
)
]
except Forbidden:
return "**Need [Manage Server] permission to create title roles.**"
role = roles[0]
change_nick = True
try:
await message.author.add_roles(role)
new_nick = message.author.nick or message.author.name
for title in Shop.categories["Titles"].items:
if title.name in new_nick:
if title.price < self.price:
new_nick = new_nick.replace(f"『{title.name}』", '')
else:
change_nick = False
try:
if change_nick:
await message.author.edit(
nick=f"『{role.name}』{new_nick}"
)
except HTTPException:
await message.channel.send(
embed=get_embed(
f"Unable to edit {message.author.name}'s nickname.",
embed_type="warning",
title="Unexpected Error"
)
)
self.debit_player(message.author)
return "success"
except Forbidden:
return "**You're too OP for me to give you a role.**\n" + \
"**Please ask an admin to give you the role.**\n"
[docs]class Shop:
"""
The main class containing all Shop related data and functionality.
"""
categories: Dict[str, ShopCategory] = {
"Titles": ShopCategory(
"Titles",
"""
Flex your financial status using a special Title.
Titles will automatically give you a role named as the title.
""",
"📜",
Listing([
Title(
"title_dlr",
"Dealers",
"Get access to the gamble command and other perks.",
20_000,
color=16765440
),
Title(
"title_cnm",
"Commoner No More",
"You're wealthier than the casuals.",
20_000,
color=16757504
),
Title(
"title_wealthy",
"The Wealthy",
"You're climbing the ladder towards richness.",
50_000,
color=13729044
),
Title(
"title_duke",
"Duke",
"You literally own a kingdom at this point.",
150_000,
color=16722176
),
Title(
"title_insane",
"Insane",
"Dude what the hell?! That's way more than enough chips"
" for a lifetime.",
1_000_000,
color=13504512
)
])
),
"Boosts": ShopCategory(
"Boosts",
"""
Give yourself a competitive edge by improving certain stats.
Boosts purchased through ingame shop are temporary.
Buying new ones increases the effect. They can stack up to 5 times.
Entire stack expires after the time period (30 minutes) ends.
For permenant boosts, contact an admin to purchase PokeBonds.
""",
"🧬",
Listing([
BoostItem(
"boost_lt",
"Lucky Looter",
"Increases your loot by 5%.",
100, "💰"
),
BoostItem(
"boost_lt_cd",
"Loot Lust",
"Decreases Loot Cooldown by 1 minute.",
100, "⌛"
),
BoostItem(
"boost_tr",
"Fortune Burst",
"Increase Treasure Chance while looting by 10%.",
500, "💎"
),
BoostItem(
"boost_flip",
"Flipster",
"Increase reward for QuickFlip minigame by 10%.",
200, "🎲"
)
])
),
"Tradables": ShopCategory(
"Tradables",
"""
These are the items in the PokeGambler world which can be
bought, sold and traded with players.
Might even contain player created Items.
""",
"📦",
Listing()
),
"Consumables": ShopCategory(
"Consumables",
"""
These items exists solely for your consumption.
They cannot be sold back to the shop.
""",
"🛒",
Listing()
),
"Gladiators": ShopCategory(
"Gladiators",
"""
These champions of the old have been cloned for you.
You can buy them to make them fight in brutal gladiator fights.
""",
"💀",
Listing()
)
}
alias_map: Dict[str, str] = {
"Title": "Titles",
"Titles": "Titles",
"Boost": "Boosts",
"Boosts": "Boosts",
"Trade": "Tradables",
"Trades": "Tradables",
"Tradable": "Tradables",
"Tradables": "Tradables",
"Consumable": "Consumables",
"Consumables": "Consumables",
"Consume": "Consumables",
"Consumes": "Consumables",
"Gladiator": "Gladiators",
"Gladiators": "Gladiators",
"Glad": "Gladiators",
"Glads": "Gladiators",
"Minion": "Gladiators",
"Minions": "Gladiators",
"Pet": "Gladiators",
"Pets": "Gladiators",
}
premium = False
ids_dict: Dict[str, ShopItem] = {}
for catog in categories.values():
for item in catog.items:
ids_dict[item.itemid] = item
[docs] @classmethod
def add_category(cls: Type[Shop], category: ShopCategory):
"""Adds a new category to the Shop.
:param category: The new ShopCategory to add.
:type category: :class:`ShopCategory`
"""
cls.categories[category.name] = category
[docs] @classmethod
def from_name(cls: Type[Shop], name: str) -> str:
"""Returns the itemid of the item with given name.
:param name: The name of the item.
:type name: str
:return: The itemid of the item.
:rtype: str
"""
return next(
(
catog.items.name_id_map[name]
for catog in cls.categories.values()
if catog.items.name_id_map.get(name) is not None
),
None,
)
[docs] @classmethod
def get_item(
cls: Type[Shop], itemid: str,
force_new: bool = False
) -> ShopItem:
"""Returns the item registered in Shop based on itemID.
:param itemid: The itemid of the item.
:type itemid: str
:param force_new: If True, a new Item is created.
:type force_new: bool
:return: The item registered in Shop.
:rtype: :class:`ShopItem`
"""
if itemid in cls.ids_dict:
return cls.ids_dict[itemid]
if any(
ch.lower() not in 'abcdef1234567890'
for ch in itemid
):
return None
item = Item.from_id(itemid, force_new=force_new)
if not item:
return None
# pylint: disable=no-member
if cls._premium_cond(item.premium):
return None
return TradebleItem(**dict(item))
[docs] @classmethod
def refresh_tradables(cls: Type[Shop]):
"""
Similar to :func:`update_category`, \
but exclusive for :class:`~.items.Tradable`.
"""
item_types = ["Tradables", "Consumables", "Gladiators"]
for item_type in item_types:
# Check availability of existing items
for item in cls.categories[item_type].items:
db_item = Item.from_id(item.itemid)
# pylint: disable=no-member
if not db_item or db_item.premium is not cls.premium:
cls.categories[item_type].items.queue.remove(
item
)
cls.ids_dict.pop(item.itemid, None)
items = [
TradebleItem(
item["itemid"], item["name"],
item["description"], item["price"],
item["emoji"],
pinned="permanent" in item["description"].lower(),
premium=item["premium"]
)
for item in Item.list_items(
category=item_type[::-1].replace('s', '', 1)[::-1],
limit=5,
premium=cls.premium
)
]
cls.update_category(item_type, items)
[docs] @classmethod
def update_category(
cls: Type[Shop], category: str,
items: List[ShopItem]
):
"""Updates an existing category in the Shop.
:param category: The name of the category.
:type category: str
:param items: The items to add to the category.
:type items: list[:class:`ShopItem`]
"""
new_items = [
item
for item in items
if item.name not in (
itm.name
for itm in cls.categories[category].items
)
]
cls.categories[category].items.register(new_items)
[docs] @classmethod
def validate(
cls: Type[Shop], user: Member,
item: ShopItem, quantity: int = 1
) -> str:
"""Validates if an item is purchasable and affordable by the user.
:param user: The user to check the item for.
:type user: :class:`discord.Member`
:param item: The item to check.
:type item: :class:`ShopItem`
:param quantity: The quantity of the item.
:type quantity: int
:return: The error message if the item is not purchasable.
:rtype: str
"""
if (
isinstance(item, TradebleItem)
and not Item.from_id(item.itemid)
):
return "Item does not exist anymore."
price = item.price
if item.__class__ in [BoostItem, PremiumBoostItem]:
price *= 10 ** (Loots(user).tier - 1)
curr_attr = "balance"
if item.premium:
curr_attr = "pokebonds"
price //= 10
if getattr(
Profiles(user),
curr_attr
) < price * quantity:
return "You have Insufficient Balance."
return "proceed"
@staticmethod
def _premium_cond(premium: bool):
return premium
class PremiumShop(Shop):
"""
The subclass of Shop for premium-only items.
"""
categories: Dict[str, ShopCategory] = {
**{
key: catog.copy()
for key, catog in Shop.categories.items()
},
"Titles": ShopCategory(
"Titles",
"""
Flex your financial status using a special Title.
Titles will automatically give you a role named as the title.
""",
"📜",
Listing([
Title(
"title_pr",
"The Patron",
"Dedicated patron of PokeGambler.",
2000,
color=14103594
)
])
),
"Boosts": ShopCategory(
"Boosts",
"""
Give yourself a competitive edge by improving certain stats.
Boosts purchased through premium shop are permanent.
Buying new ones increases the effect.
""",
"🧬",
Listing([
PremiumBoostItem(
"boost_pr_lt",
"Lucky Looter",
"Permanently increases your loot by 5%.",
100, "💰", premium=True
),
PremiumBoostItem(
"boost_pr_lt_cd",
"Loot Lust",
"Permanently decreases Loot Cooldown by 1 minute."
"\n(Max Stack of 5)",
100, "⌛", premium=True
),
PremiumBoostItem(
"boost_pr_tr",
"Fortune Burst",
"Permanently increases Treasure Chance "
"while looting by 10%.",
500, "💎", premium=True
),
PremiumBoostItem(
"boost_pr_flip",
"Flipster",
"Permanently increases reward for QuickFlip "
"minigame by 10%.",
200, "🎲", premium=True
)
])
)
}
premium = True
ids_dict: Dict[str, ShopItem] = {}
for catog in categories.values():
for item in catog.items:
ids_dict[item.itemid] = item
@staticmethod
def _premium_cond(premium: bool):
return not premium