# Source code for penman.model

# -*- coding: utf-8 -*-

"""
Semantic models for interpreting graphs.
"""

import random
import re
from collections import defaultdict
from typing import (
    Any,
    Dict,
    Iterable,
    List,
    Mapping,
    Optional,
    Set,
    Tuple,
    cast,
)

from penman.exceptions import ModelError
from penman.graph import CONCEPT_ROLE, Graph
from penman.types import BasicTriple, Constant, Role, Target, Variable

# One reification definition as accepted by ``Model(reifications=...)``:
# (role, concept, source role, target role)
_ReificationSpec = Tuple[Role, Constant, Role, Role]
# Entry stored per role in ``Model.reifications``:
# (concept, source role, target role)
_Reified = Tuple[Constant, Role, Role]
# Entry stored per concept in ``Model.dereifications``:
# (role, source role, target role)
_Dereified = Tuple[Role, Role, Role]
# Result of ``Model.reify``: the (source, concept, target) triples of the
# reified node, in that order
_Reification = Tuple[BasicTriple, BasicTriple, BasicTriple]


class Model(object):
    """
    A semantic model for Penman graphs.

    The model defines things like valid roles and transformations.

    Args:
        top_variable: the variable of the graph's top
        top_role: the role linking the graph's top to the top node
        concept_role: the role associated with node concepts
        roles: a mapping of roles to associated data
        normalizations: a mapping of roles to normalized roles
        reifications: a list of 4-tuples used to define reifications
    """

    def __init__(
        self,
        top_variable: Variable = 'top',
        top_role: Role = ':TOP',
        concept_role: Role = CONCEPT_ROLE,
        roles: Optional[Mapping[Role, Any]] = None,
        normalizations: Optional[Mapping[Role, Role]] = None,
        reifications: Optional[Iterable[_ReificationSpec]] = None,
    ):
        self.top_variable = top_variable
        self.top_role = top_role
        self.concept_role = concept_role

        # copy the mapping so later changes by the caller do not leak in
        if roles:
            roles = dict(roles)
        self.roles = roles or {}
        # The role keys are joined unescaped, so each key acts as a
        # regular-expression pattern; the top and concept roles are
        # always accepted in addition to the configured roles.
        self._role_re = re.compile(
            '^({})$'.format(
                '|'.join(list(self.roles) + [top_role, concept_role])
            )
        )

        if normalizations:
            normalizations = dict(normalizations)
        self.normalizations = normalizations or {}

        # index the reification specs in both directions:
        #   role    -> [(concept, source role, target role), ...]
        #   concept -> [(role, source role, target role), ...]
        reifs: Dict[Role, List[_Reified]] = defaultdict(list)
        deifs: Dict[Constant, List[_Dereified]] = defaultdict(list)
        if reifications:
            for role, concept, source, target in reifications:
                reifs[role].append((concept, source, target))
                deifs[concept].append((role, source, target))
        # store plain dicts so that lookups of unknown keys do not
        # silently create entries
        self.reifications = dict(reifs)
        self.dereifications = dict(deifs)

    def __eq__(self, other):
        """
        Compare two models by their configuration.

        Returns ``NotImplemented`` when *other* is not a :class:`Model`.
        Because ``__eq__`` is defined without ``__hash__``, instances
        are not hashable.
        """
        if not isinstance(other, Model):
            return NotImplemented
        return (
            self.top_variable == other.top_variable
            and self.top_role == other.top_role
            and self.concept_role == other.concept_role
            and self.roles == other.roles
            and self.normalizations == other.normalizations
            and self.reifications == other.reifications
        )

    @classmethod
    def from_dict(cls, d: Mapping[str, Any]) -> 'Model':
        """
        Instantiate a model from a dictionary.

        The items of *d* are passed as keyword arguments to the
        constructor.
        """
        return cls(**d)

    def has_role(self, role: Role) -> bool:
        """
        Return ``True`` if *role* is defined by the model.

        If *role* is not in the model but a single deinversion of
        *role* is in the model, then ``True`` is returned. Otherwise
        ``False`` is returned, even if something like
        :meth:`canonicalize_role` could return a valid role.
        """
        return self._has_role(role) or (
            role.endswith('-of') and self._has_role(role[:-3])
        )

    def _has_role(self, role: Role) -> bool:
        """Return ``True`` if *role* matches a role pattern as given."""
        return self._role_re.match(role) is not None

    def is_role_inverted(self, role: Role) -> bool:
        """
        Return ``True`` if *role* is inverted.

        A role that the model defines as-is is never considered
        inverted, even when it ends with ``-of``.
        """
        return not self._has_role(role) and role.endswith('-of')

    def invert_role(self, role: Role) -> Role:
        """
        Invert *role*.

        An inverted role loses its ``-of`` suffix; any other role
        gains one.
        """
        if self.is_role_inverted(role):
            return role[:-3]
        return role + '-of'

    def invert(self, triple: BasicTriple) -> BasicTriple:
        """
        Invert *triple*.

        This will invert or deinvert a triple regardless of its
        current state. :meth:`deinvert` will deinvert a triple only if
        it is already inverted. Unlike :meth:`canonicalize`, this will
        not perform multiple inversions or replace the role with a
        normalized form.
        """
        source, role, target = triple
        inverse = self.invert_role(role)
        # casting is just for the benefit of the type checker; it does
        # not actually check that target is a valid variable type
        target = cast(Variable, target)
        return (target, inverse, source)

    def deinvert(self, triple: BasicTriple) -> BasicTriple:
        """
        De-invert *triple* if it is inverted.

        Unlike :meth:`invert`, this only inverts a triple if the model
        considers it to be already inverted, otherwise it is left
        alone. Unlike :meth:`canonicalize`, this will not normalize
        multiple inversions or replace the role with a normalized
        form.
        """
        if self.is_role_inverted(triple[1]):
            triple = self.invert(triple)
        return triple

    def canonicalize_role(self, role: Role) -> Role:
        """
        Canonicalize *role*.

        Role canonicalization will do the following:

        * Ensure the role starts with `':'`

        * Normalize multiple inversions (e.g., ``ARG0-of-of`` becomes
          ``ARG0``), but it does *not* change the direction of the role

        * Replace the resulting role with a normalized form if one is
          defined in the model
        """
        # '/' is left without a ':' prefix
        if role != '/' and not role.startswith(':'):
            role = ':' + role
        role = self._canonicalize_inversion(role)
        role = self.normalizations.get(role, role)
        return role

    def _canonicalize_inversion(self, role: Role) -> Role:
        """Collapse repeated ``-of`` suffixes, keeping the direction."""
        invert = self.invert_role
        if not self._has_role(role):
            # Inverting twice strips one redundant pair of inversions
            # or returns the same role; repeat until nothing changes.
            while True:
                prev = role
                inverse = invert(role)
                role = invert(inverse)
                if prev == role:
                    break
        return role

    def canonicalize(self, triple: BasicTriple) -> BasicTriple:
        """
        Canonicalize *triple*.

        See :meth:`canonicalize_role` for a description of how the
        role is canonicalized. Unlike :meth:`invert`, this does not
        swap the source and target of *triple*.
        """
        source, role, target = triple
        canonical = self.canonicalize_role(role)
        return (source, canonical, target)

    def is_role_reifiable(self, role: Role) -> bool:
        """Return ``True`` if *role* can be reified."""
        return role in self.reifications

    def reify(
        self,
        triple: BasicTriple,
        variables: Optional[Set[Variable]] = None,
    ) -> _Reification:
        """
        Return the three triples that reify *triple*.

        Note that, unless *variables* is given, the node variable for
        the reified node is not necessarily valid for the target
        graph. When incorporating the reified triples, this variable
        should then be replaced.

        If the role of *triple* does not have a defined reification, a
        :exc:`~penman.exceptions.ModelError` is raised.

        Args:
            triple: the triple to reify
            variables: a set of variables that should not be used for
                the reified node's variable
        Returns:
            The 3-tuple of triples that reify *triple*.
        """
        source, role, target = triple
        if role not in self.reifications:
            raise ModelError(f"'{role}' cannot be reified")
        # when several reifications are defined for the role, the
        # first one is used
        concept, source_role, target_role = self.reifications[role][0]

        # pick '_', or the first of '_2', '_3', ... not in *variables*
        var = '_'
        if variables:
            i = 2
            while var in variables:
                var = f'_{i}'
                i += 1

        # NOTE(review): the concept triple uses the module-level
        # CONCEPT_ROLE rather than self.concept_role -- confirm this is
        # intended for models with a custom concept role.
        return (
            (var, source_role, source),
            (var, CONCEPT_ROLE, concept),
            (var, target_role, target),
        )

    def is_concept_dereifiable(self, concept: Target) -> bool:
        """Return ``True`` if *concept* can be dereified."""
        return concept in self.dereifications

    def dereify(
        self,
        instance_triple: BasicTriple,
        source_triple: BasicTriple,
        target_triple: BasicTriple,
    ) -> BasicTriple:
        """
        Return the triple that dereifies the three argument triples.

        If the target of *instance_triple* does not have a defined
        dereification, or if the roles of *source_triple* and
        *target_triple* do not match those for the dereification of
        the concept, a :exc:`~penman.exceptions.ModelError` is
        raised. A :exc:`ValueError` is raised if *instance_triple* is
        not an instance triple or any triple does not have the same
        source variable as the others.

        Args:
            instance_triple: the triple containing the node's concept
            source_triple: the source triple from the node
            target_triple: the target triple from the node
        Returns:
            The triple that dereifies the three argument triples.
        """
        # NOTE(review): compared against the module-level CONCEPT_ROLE
        # rather than self.concept_role -- confirm this is intended.
        if instance_triple[1] != CONCEPT_ROLE:
            # instance_triple is the first parameter of this method
            raise ValueError('first argument is not an instance triple')
        if not (instance_triple[0] == source_triple[0] == target_triple[0]):
            raise ValueError('triples do not share the same source')

        concept = instance_triple[2]
        source_role = source_triple[1]
        target_role = target_triple[1]

        if concept not in self.dereifications:
            raise ModelError(f'{concept!r} cannot be dereified')
        for role, source, target in self.dereifications[concept]:
            if source == source_role and target == target_role:
                return (
                    cast(Variable, source_triple[2]),
                    role,
                    target_triple[2],
                )
            elif target == source_role and source == target_role:
                # the two triples were given in the opposite order
                return (
                    cast(Variable, target_triple[2]),
                    role,
                    source_triple[2],
                )

        raise ModelError(
            f'{source_role!r} and {target_role!r} '
            f'are not valid roles to dereify {concept!r}'
        )

    def original_order(self, role: Role) -> bool:
        """Role sorting key that does not change the order."""
        # a constant key leaves a stable sort's input order untouched
        return True

    def alphanumeric_order(self, role: Role) -> Tuple[str, int]:
        """
        Role sorting key for alphanumeric order.

        A trailing run of digits is split off and compared as an
        integer; roles without one get ``0`` as their number.
        """
        m = re.match(r'(.*\D)(\d+)$', role)
        if m:
            rolename = m.group(1)
            roleno = int(m.group(2))
        else:
            rolename, roleno = role, 0
        return rolename, roleno

    def canonical_order(self, role: Role) -> Tuple[bool, Tuple[str, int]]:
        """
        Role sorting key that finds a canonical order.

        Non-inverted roles sort before inverted ones; within each
        group the order is that of :meth:`alphanumeric_order`.
        """
        return (self.is_role_inverted(role), self.alphanumeric_order(role))

    def random_order(self, role: Role) -> float:
        """Role sorting key that randomizes the order."""
        return random.random()

    def errors(self, graph: Graph) -> Dict[Optional[BasicTriple], List[str]]:
        """
        Return a description of model errors detected in *graph*.

        The description is a dictionary mapping a context to a list of
        errors. A context is a triple if the error is relevant for the
        triple, or ``None`` for general graph errors.

        Example:

            >>> from penman.models.amr import model
            >>> from penman.graph import Graph
            >>> g = Graph([('a', ':instance', 'alpha'),
            ...            ('a', ':foo', 'bar'),
            ...            ('b', ':instance', 'beta')])
            >>> for context, errors in model.errors(g).items():
            ...     print(context, errors)
            ...
            ('a', ':foo', 'bar') ['invalid role']
            ('b', ':instance', 'beta') ['unreachable']
        """
        err: Dict[Optional[BasicTriple], List[str]] = defaultdict(list)
        if len(graph.triples) == 0:
            err[None].append('graph is empty')
        else:
            # group the triples by their source variable
            g: Dict[Variable, List[BasicTriple]] = {}
            for triple in graph.triples:
                var, role, _ = triple
                if not self.has_role(role):
                    err[triple].append('invalid role')
                g.setdefault(var, []).append(triple)
            if not graph.top:
                err[None].append('top is not set')
            elif graph.top not in g:
                err[None].append('top is not a variable in the graph')
            else:
                reachable = _dfs(g, graph.top)
                unreachable = set(g).difference(reachable)
                for uvar in sorted(unreachable):
                    for triple in g[uvar]:
                        err[triple].append('unreachable')
        # return a plain dict so lookups do not create empty entries
        return dict(err)
def _dfs(g, top):
    """
    Return the set of variables connected to *top* in *g*.

    Edges are followed in both directions, so a variable counts as
    connected if it can reach *top* or be reached from it.

    Args:
        g: a mapping of each variable to the list of triples that
            have the variable as their source
        top: the variable to start the traversal from
    Returns:
        The set of visited variables, which always includes *top*.
    """
    # just keep source and target of edge relations; targets that are
    # not variables of the graph (e.g., concepts, constants) are dropped
    neighbors = {
        var: {target for _, _, target in triples if target in g}
        for var, triples in g.items()
    }
    # make edges bidirectional; every target kept above is a key of
    # *g*, and therefore of *neighbors*, so no missing-key check is
    # needed here
    for var, targets in neighbors.items():
        for target in targets:
            neighbors[target].add(var)

    visited = set()
    agenda = [top]
    while agenda:
        cur = agenda.pop()
        if cur not in visited:
            visited.add(cur)
            # *top* itself may be absent from the mapping
            agenda.extend(
                t for t in neighbors.get(cur, ()) if t not in visited
            )
    return visited