Source code for glados.datastore

import logging
from datetime import datetime
from typing import List, NoReturn, Optional
from uuid import uuid4

from sqlalchemy import (
    Column,
    DateTime,
    Integer,
    MetaData,
    String,
    Table,
    create_engine,
    inspect,
)
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Query, Session, sessionmaker
from sqlalchemy.orm.exc import MultipleResultsFound

# Shared schema registry used by the declarative base and by the table helpers below.
Metadata = MetaData()
# Declarative base bound to ``Metadata`` so mapped classes register their tables on it.
Base = declarative_base(metadata=Metadata)

# Name of the table that stores interactions (also the default table for the helpers).
TABLE_INTERACTIONS = "interactions"


class DataStoreInteraction(Base):
    """DataStoreInteraction represents a row in the datastore.

    This is used to update data in the datastore.

    Attributes
    ----------
    interaction_id: :obj:`str`
        This is the primary key of the datastore. This is the ID of the entry
        in the datastore. Defaults to a freshly generated UUID4 string per row.
    ts: :obj:`datetime`
        This is the time the row was put into the database. Defaults to
        ``datetime.now()`` evaluated at insert time (naive, local time).
    bot: :obj:`str`
        This is the name of the bot it should use when completing followup
        actions. Required (``nullable=False``).
    data: :obj:`dict`
        Any extra data stored with the interaction. This is a JSON blob.
        Defaults to a new empty dict per row.
    message_channel: :obj:`str`
        The channel that this interaction was sent to.
    message_ts: :obj:`datetime`
        The message timestamp when this interaction was sent.
    ttl: :obj:`int`
        How long this interaction should live for.
    followup_ts: :obj:`datetime`
        When should the follow up action happen.
    followup_action: :obj:`str`
        The action name to execute when following up. If None then no action
        will happen.
    cron_followup_action: :obj:`str`
        The action name to execute on a normal cron schedule like every 5 min.
        If None then no action will happen.
    followed_up: :obj:`datetime`
        This is the time when the action was followed up at. If it has not
        happened yet this value will be None.
    """

    __tablename__ = TABLE_INTERACTIONS

    # The defaults below are *callables* on purpose: SQLAlchemy invokes them for
    # each INSERT. Passing an already-evaluated value (``str(uuid4())``,
    # ``datetime.now()``, ``dict()``) would freeze a single value at import time,
    # giving every row the same primary key, timestamp and dict object.
    interaction_id = Column(UUID, primary_key=True, default=lambda: str(uuid4()))
    ts = Column(DateTime, default=datetime.now)
    bot = Column(String, nullable=False)
    data = Column(JSONB, default=dict)
    message_channel = Column(String, default=None)
    message_ts = Column(DateTime, default=None)
    ttl = Column(Integer, default=None)
    followup_ts = Column(DateTime, default=None)
    followup_action = Column(String, default=None)
    cron_followup_action = Column(String, default=None)
    followed_up = Column(DateTime, default=None)

    def update(self, **kwargs):
        """Update the object dropping any arguments that are not valid.

        Parameters
        ----------
        kwargs
            attribute names and the new values to assign. Names that are not
            existing attributes of this object are silently ignored.
        """
        # ``.items()`` is required: iterating the dict itself yields only keys.
        for name, value in kwargs.items():
            if hasattr(self, name):
                setattr(self, name, value)
class DataStore:
    """DataStore is how GLaDOS stores async data.

    Parameters
    ----------
    host
        postgres host.
    username
        postgres username.
    password
        postgres password.
    port
        postgres port.
    database
        postgres database to use.
    """

    def __init__(
        self,
        host: str,
        username: str,
        password: str,
        port: int = 5432,
        database: str = "glados",
    ):
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.database = database

        # Use the canonical "postgresql" dialect name; the "postgres" alias is
        # rejected by SQLAlchemy 1.4+.
        # NOTE(review): credentials are interpolated verbatim, so a password
        # containing URL-reserved characters must be escaped by the caller.
        self.db = create_engine(
            f"postgresql://{self.username}:{self.password}@{self.host}:{self.port}/"
            f"{self.database}"
        )
        self.session_maker = sessionmaker(self.db)

    def create_session(self) -> Session:
        """Generate a new session with the existing connection."""
        return self.session_maker()

    def table_exists(self, table: str = TABLE_INTERACTIONS) -> bool:
        """Check to see if the GLaDOS table is found in postgres.

        Parameters
        ----------
        table
            table name to use.
        """
        # Inspector.get_table_names() replaces the deprecated Engine.table_names().
        return table in inspect(self.db).get_table_names()

    def drop_table(self, table: str = TABLE_INTERACTIONS, force: bool = False) -> None:
        """Drop the GLaDOS table so that it can be re-created.

        Parameters
        ----------
        table
            table name to use.
        force
            if True will force drop the table without first checking that it
            exists.
        """
        Table(table, Metadata).drop(self.db, checkfirst=not force)

    def create_table(
        self, tables: Optional[List[str]] = None, force: bool = False
    ) -> None:
        """Create the table.

        If you set force to True then it will drop the existing tables and
        then recreate them. ALL DATA WILL BE LOST

        Parameters
        ----------
        tables
            only take action on these tables. If None, then take action on all
            tables
        force
            drop existing tables and rebuild. (default: False)
        """
        logging.info("creating datastore table")

        if force:
            logging.warning(
                "DROPPING CURRENT TABLES! the 'force' flag was passed when calling create table."
            )
            if tables:
                for table in tables:
                    self.drop_table(table)
            else:
                Base.metadata.drop_all(self.db, checkfirst=False)

        if tables:
            for table in tables:
                Table(table, Metadata).create(self.db)
        else:
            Metadata.create_all(self.db)

    def find_by_id(
        self, interaction_id: str, session: Session
    ) -> Optional[DataStoreInteraction]:
        """Find an interaction by interaction_id.

        Parameters
        ----------
        interaction_id
            interaction ID to find
        session
            session to be used

        Returns
        -------
        The matching interaction, or None if no row has that ID.
        """
        return session.query(DataStoreInteraction).get(interaction_id)

    def update_interaction(
        self, interaction_id: str, session: Session, **kwargs
    ) -> DataStoreInteraction:
        """Find and update an interaction with the provided values.

        The session is not committed here; that is left to the caller.

        Parameters
        ----------
        interaction_id
            interaction ID to update
        session
            session to be used
        kwargs
            fields and new values to update. Names that are not attributes of
            the interaction are ignored.

        Raises
        ------
        ReferenceError
            No interaction exists with the given interaction_id.
        """
        interaction = session.query(DataStoreInteraction).get(interaction_id)
        if interaction is None:
            raise ReferenceError(f"no interaction found with id {interaction_id!r}")

        # Apply the fields directly, skipping unknown names. Removing entries
        # from ``kwargs`` while iterating over it raises RuntimeError.
        for field, value in kwargs.items():
            if hasattr(interaction, field):
                setattr(interaction, field, value)
        return interaction

    def insert_interaction(
        self, interaction: DataStoreInteraction, session: Session
    ) -> DataStoreInteraction:
        """Insert an interaction object into the database.

        Parameters
        ----------
        interaction
            The row to be inserted
        session
            session to be used

        Returns
        -------
        The same interaction object, after the session has been committed.
        """
        session.add(interaction)
        session.commit()
        return interaction

    def find_interaction_by_channel_ts(
        self, channel: str, ts: datetime, session: Session
    ) -> Optional[DataStoreInteraction]:
        """Find the interaction in the datastore by channel and message ts.

        Parameters
        ----------
        channel
            channel of the interaction you're looking for
        ts
            ts of the interaction you are looking for
        session
            session to be used

        Returns
        -------
        The single matching interaction, or None if nothing matched.

        Raises
        ------
        ReferenceError
            There were more than one interaction that matched the channel and
            message_ts
        """
        query: Query = (
            session.query(DataStoreInteraction)
            .filter(DataStoreInteraction.message_ts == ts)
            .filter(DataStoreInteraction.message_channel == channel)
        )
        try:
            return query.one_or_none()
        except MultipleResultsFound as e:
            # Chain explicitly so the original SQLAlchemy error stays attached.
            raise ReferenceError(e) from e