random-sets/random_sets/datasources.py
2024-02-17 16:58:33 -08:00

171 lines
5.7 KiB
Python

import random
import yaml
from typing import IO
class UnknownFrequencyError(Exception):
"""
Thrown when attempting to set a datasource's fequency without a frequency table in the metadat.
"""
class DataSource:
"""
Represents a yaml data source used to generate roll tables.
Attributes:
source - the IO source to parse
frequency - the frequency distribution to apply
headers - an array of header strings
data - The parsed YAML data
Methods:
load_source - Read and parse the source, populating the attributes
"""
def __init__(self, source: IO, frequency: str = 'default') -> None:
"""
Initialize a DataSource instance.
Args:
source - an IO object to read source from
frequency - the name of the frequency distribution to use; must
be defined in the source file's metadata.
"""
self.source = source
self.frequency = frequency
self.headers = []
self.frequencies = None
self.data = None
self.metadata = None
self.load_source()
def load_source(self) -> None:
"""
Cache the yaml source and the parsed or generated metadata.
"""
if self.data:
return
self.read_source()
self.init_headers()
self.init_frequencies()
def read_source(self) -> None:
self.data = yaml.safe_load(self.source)
self.metadata = self.data.pop('metadata', {})
def init_headers(self) -> None:
if 'headers' in self.metadata:
self.headers = self.metadata['headers']
def init_frequencies(self) -> None:
num_keys = len(self.data.keys())
default_freq = num_keys / 100
frequencies = {
'default': dict([(k, default_freq) for k in self.data.keys()])
}
if 'frequencies' in self.metadata:
frequencies.update(**self.metadata['frequencies'])
self.frequencies = frequencies[self.frequency]
def set_frequency(self, frequency: str) -> None:
"""
Select a new frequency distribution from the data source metadata.
"""
if 'frequencies' not in self.metadata:
raise UnknownFrequencyError(
"Cannnot set a new frequency because there is no frequency table in the metadata."
)
if frequency not in self.metadata['frequencies']:
raise UnknownFrequencyError(f"{frequency} is not present in the frequency table.")
self.frequency = frequency
self.init_frequencies()
def random_frequencies(self, count: int = 1) -> list:
"""
Choose random option names from the frequency table.
"""
weights = []
options = []
for (option, weight) in self.frequencies.items():
weights.append(weight)
options.append(option)
return random.choices(options, weights=weights, k=count)
def random_values(self, count: int = 1) -> list:
"""
Return a list of random values from the data set, as a list of lists.
"""
return [
self.get_entries(option, rand=True) for option in self.random_frequencies(count)
]
def as_dict(self) -> dict:
"""
Return the contents of the data source as a dict.
"""
data = dict()
for name in self.data.keys():
entries = self.get_entries(name, rand=False)
items = {(k, v) for k, v in zip(self.headers, entries)}
data[name] = dict(items)
return data
def get_entries(self, option, rand: bool = False) -> list:
"""
For a random item or each item in the specified option in the data source,
return a flattened list of the option, the select item, and the item's value (if any).
"""
# If there is no data for the specified option, stop now.
flattened = []
if not self.data[option]:
return [option]
if hasattr(self.data[option], 'keys'):
# if the option is a dict, we assume the values are lists; we select a random item
# and prepend the key to the value list as our random selection. For example, given:
#
# >>> self.data[option] == {'One': ['bar', 'baz'], 'Two': ['qaz', 'qux']}
#
# choices might then be: ['One', 'bar', 'baz']
#
if rand:
k, v = random.choice(list(self.data[option].items()))
choices = [[k] + v]
else:
choices = [
[k] + v for k, v in list(self.data[option].items())
]
else:
# If the option is either a list or a string, just select it.
choices = self.data[option]
for choice in choices:
# If the randomly-selected choice is a dict, choose a random item and return a list consisting
# of the option name, the key, and the value, flattening the # value if it is also a list.
if hasattr(choice, 'keys'):
for (k, v) in choice.items():
if type(v) is list:
flattened.append([option, k, *v])
else:
flattened.append([option, k, v])
continue
# if the member is a list, return the flattened list
elif type(choice) is list:
flattened.extend(choice)
continue
# otherwise, return a list consisting of option and choice
flattened.append(choice)
# Return all randomized values or just 1.
if rand:
return random.choice(flattened)
return flattened