# -*- coding: utf-8 -*-
"""
Semantic models for interpreting graphs.
"""
import random
import re
from collections import defaultdict
from typing import (
Any,
Dict,
Iterable,
List,
Mapping,
Optional,
Set,
Tuple,
cast,
)
from penman.exceptions import ModelError
from penman.graph import CONCEPT_ROLE, Graph
from penman.types import BasicTriple, Constant, Role, Target, Variable
_ReificationSpec = Tuple[Role, Constant, Role, Role]
_Reified = Tuple[Constant, Role, Role]
_Dereified = Tuple[Role, Role, Role]
_Reification = Tuple[BasicTriple, BasicTriple, BasicTriple]
class Model(object):
    """
    A semantic model for Penman graphs.

    The model defines things like valid roles and transformations.

    Args:
        top_variable: the variable of the graph's top
        top_role: the role linking the graph's top to the top node
        concept_role: the role associated with node concepts
        roles: a mapping of roles to associated data
        normalizations: a mapping of roles to normalized roles
        reifications: a list of 4-tuples used to define reifications
    """

    def __init__(
        self,
        top_variable: Variable = 'top',
        top_role: Role = ':TOP',
        concept_role: Role = CONCEPT_ROLE,
        roles: Optional[Mapping[Role, Any]] = None,
        normalizations: Optional[Mapping[Role, Role]] = None,
        reifications: Optional[Iterable[_ReificationSpec]] = None,
    ):
        self.top_variable = top_variable
        self.top_role = top_role
        self.concept_role = concept_role

        # Keep a private copy so the model does not alias the caller's
        # mapping.
        self.roles = dict(roles) if roles else {}

        # The role keys (plus the top and concept roles) are joined
        # unescaped, so each one acts as a regular-expression
        # alternative that must match the whole role.
        self._role_re = re.compile(
            '^({})$'.format(
                '|'.join(list(self.roles) + [top_role, concept_role])
            )
        )

        self.normalizations = dict(normalizations) if normalizations else {}

        # Index every (role, concept, source, target) spec in both
        # directions: role -> reified forms, concept -> dereified forms.
        reifs: Dict[Role, List[_Reified]] = defaultdict(list)
        deifs: Dict[Constant, List[_Dereified]] = defaultdict(list)
        if reifications:
            for role, concept, source, target in reifications:
                reifs[role].append((concept, source, target))
                deifs[concept].append((role, source, target))
        # Store plain dicts so that lookups of unknown keys do not
        # silently create empty entries.
        self.reifications = dict(reifs)
        self.dereifications = dict(deifs)

    def __eq__(self, other):
        """
        Compare two models by their configuration.

        ``dereifications`` is not compared separately; it is built from
        the same specifications as ``reifications``.
        """
        if not isinstance(other, Model):
            return NotImplemented
        return (
            self.top_variable == other.top_variable
            and self.top_role == other.top_role
            and self.concept_role == other.concept_role
            and self.roles == other.roles
            and self.normalizations == other.normalizations
            and self.reifications == other.reifications
        )

    @classmethod
    def from_dict(cls, d):
        """
        Instantiate a model from a dictionary.

        The items of *d* are passed as keyword arguments to the
        constructor.
        """
        return cls(**d)

    def has_role(self, role: Role) -> bool:
        """
        Return ``True`` if *role* is defined by the model.

        If *role* is not in the model but a single deinversion of
        *role* is in the model, then ``True`` is returned. Otherwise
        ``False`` is returned, even if something like
        :meth:`canonicalize_role` could return a valid role.
        """
        return self._has_role(role) or (
            role.endswith('-of') and self._has_role(role[:-3])
        )

    def _has_role(self, role: Role) -> bool:
        """Return ``True`` if *role* itself matches a model role."""
        return self._role_re.match(role) is not None

    def is_role_inverted(self, role: Role) -> bool:
        """
        Return ``True`` if *role* is inverted.

        A role that ends in ``-of`` but is itself defined by the model
        is not considered inverted.
        """
        return not self._has_role(role) and role.endswith('-of')

    def invert_role(self, role: Role) -> Role:
        """
        Invert *role*.

        An inverted role (see :meth:`is_role_inverted`) loses its
        ``-of`` suffix; any other role gains one.
        """
        if not self._has_role(role) and role.endswith('-of'):
            inverse = role[:-3]
        else:
            inverse = role + '-of'
        return inverse

    def invert(self, triple: BasicTriple) -> BasicTriple:
        """
        Invert *triple*.

        This will invert or deinvert a triple regardless of its
        current state. :meth:`deinvert` will deinvert a triple only if
        it is already inverted. Unlike :meth:`canonicalize`, this will
        not perform multiple inversions or replace the role with a
        normalized form.
        """
        source, role, target = triple
        inverse = self.invert_role(role)
        # casting is just for the benefit of the type checker; it does
        # not actually check that target is a valid variable type
        target = cast(Variable, target)
        return (target, inverse, source)

    def deinvert(self, triple: BasicTriple) -> BasicTriple:
        """
        De-invert *triple* if it is inverted.

        Unlike :meth:`invert`, this only inverts a triple if the model
        considers it to be already inverted, otherwise it is left
        alone. Unlike :meth:`canonicalize`, this will not normalize
        multiple inversions or replace the role with a normalized
        form.
        """
        if self.is_role_inverted(triple[1]):
            triple = self.invert(triple)
        return triple

    def canonicalize_role(self, role: Role) -> Role:
        """
        Canonicalize *role*.

        Role canonicalization will do the following:

        * Ensure the role starts with `':'`
        * Normalize multiple inversions (e.g., ``ARG0-of-of`` becomes
          ``ARG0``), but it does *not* change the direction of the role
        * Replace the resulting role with a normalized form if one is
          defined in the model
        """
        # '/' is left without a ':' prefix
        if role != '/' and not role.startswith(':'):
            role = ':' + role
        role = self._canonicalize_inversion(role)
        role = self.normalizations.get(role, role)
        return role

    def _canonicalize_inversion(self, role: Role) -> Role:
        """Collapse repeated ``-of`` suffixes without flipping *role*."""
        invert = self.invert_role
        if not self._has_role(role):
            # Inverting twice strips a redundant '-of-of' pair when one
            # is present; repeat until the role stops changing.
            while True:
                prev = role
                inverse = invert(role)
                role = invert(inverse)
                if prev == role:
                    break
        return role

    def canonicalize(self, triple: BasicTriple) -> BasicTriple:
        """
        Canonicalize *triple*.

        See :meth:`canonicalize_role` for a description of how the
        role is canonicalized. Unlike :meth:`invert`, this does not
        swap the source and target of *triple*.
        """
        source, role, target = triple
        canonical = self.canonicalize_role(role)
        return (source, canonical, target)

    def is_role_reifiable(self, role: Role) -> bool:
        """Return ``True`` if *role* can be reified."""
        return role in self.reifications

    def reify(
        self,
        triple: BasicTriple,
        variables: Optional[Set[Variable]] = None,
    ) -> _Reification:
        """
        Return the three triples that reify *triple*.

        Note that, unless *variables* is given, the node variable
        for the reified node is not necessarily valid for the target
        graph. When incorporating the reified triples, this variable
        should then be replaced.

        If the role of *triple* does not have a defined reification, a
        :exc:`~penman.exceptions.ModelError` is raised.

        Args:
            triple: the triple to reify
            variables: a set of variables that should not be used for
                the reified node's variable
        Returns:
            The 3-tuple of triples that reify *triple*.
        """
        source, role, target = triple
        if role not in self.reifications:
            raise ModelError(f"'{role}' cannot be reified")
        # When a role has several reifications, the first one defined
        # is used.
        concept, source_role, target_role = next(iter(self.reifications[role]))

        # Pick '_' or, if that is taken, the first free '_2', '_3', ...
        var = '_'
        if variables:
            i = 2
            while var in variables:
                var = f'_{i}'
                i += 1

        # NOTE: the concept triple uses the module-level CONCEPT_ROLE,
        # not self.concept_role.
        return (
            (var, source_role, source),
            (var, CONCEPT_ROLE, concept),
            (var, target_role, target),
        )

    def is_concept_dereifiable(self, concept: Target) -> bool:
        """Return ``True`` if *concept* can be dereified."""
        return concept in self.dereifications

    def dereify(
        self,
        instance_triple: BasicTriple,
        source_triple: BasicTriple,
        target_triple: BasicTriple,
    ) -> BasicTriple:
        """
        Return the triple that dereifies the three argument triples.

        If the target of *instance_triple* does not have a defined
        dereification, or if the roles of *source_triple* and
        *target_triple* do not match those for the dereification of
        the concept, a :exc:`~penman.exceptions.ModelError` is
        raised. A :exc:`ValueError` is raised if *instance_triple* is
        not an instance triple or any triple does not have the same
        source variable as the others.

        Args:
            instance_triple: the triple containing the node's concept
            source_triple: the source triple from the node
            target_triple: the target triple from the node
        Returns:
            The triple that dereifies the three argument triples.
        """
        if instance_triple[1] != CONCEPT_ROLE:
            # instance_triple is the first positional argument
            raise ValueError('first argument is not an instance triple')
        if not (instance_triple[0] == source_triple[0] == target_triple[0]):
            raise ValueError('triples do not share the same source')

        concept = instance_triple[2]
        source_role = source_triple[1]
        target_role = target_triple[1]

        if concept not in self.dereifications:
            raise ModelError(f'{concept!r} cannot be dereified')

        for role, source, target in self.dereifications[concept]:
            if source == source_role and target == target_role:
                return (
                    cast(Variable, source_triple[2]),
                    role,
                    target_triple[2],
                )
            elif target == source_role and source == target_role:
                # the caller passed the two edge triples in swapped
                # order, so swap them back
                return (
                    cast(Variable, target_triple[2]),
                    role,
                    source_triple[2],
                )

        raise ModelError(
            f'{source_role!r} and {target_role!r} '
            f'are not valid roles to dereify {concept!r}'
        )

    def original_order(self, role: Role) -> bool:
        """
        Role sorting key that does not change the order.

        Every role gets the same key, so a stable sort leaves the
        input order untouched.
        """
        return True

    def alphanumeric_order(self, role: Role) -> Tuple[str, int]:
        """
        Role sorting key for alphanumeric order.

        A trailing run of digits is split off and compared as an
        integer; roles without one get the number ``0``.
        """
        m = re.match(r'(.*\D)(\d+)$', role)
        if m:
            rolename = m.group(1)
            roleno = int(m.group(2))
        else:
            rolename, roleno = role, 0
        return rolename, roleno

    def canonical_order(self, role: Role) -> Tuple[bool, Tuple[str, int]]:
        """
        Role sorting key that finds a canonical order.

        Non-inverted roles sort before inverted ones, and roles within
        each group follow :meth:`alphanumeric_order`.
        """
        return (self.is_role_inverted(role), self.alphanumeric_order(role))

    def random_order(self, role: Role) -> float:
        """Role sorting key that randomizes the order."""
        return random.random()

    def errors(self, graph: Graph) -> Dict[Optional[BasicTriple], List[str]]:
        """
        Return a description of model errors detected in *graph*.

        The description is a dictionary mapping a context to a list of
        errors. A context is a triple if the error is relevant for the
        triple, or ``None`` for general graph errors.

        Example:

            >>> from penman.models.amr import model
            >>> from penman.graph import Graph
            >>> g = Graph([('a', ':instance', 'alpha'),
            ...            ('a', ':foo', 'bar'),
            ...            ('b', ':instance', 'beta')])
            >>> for context, errors in model.errors(g).items():
            ...     print(context, errors)
            ...
            ('a', ':foo', 'bar') ['invalid role']
            ('b', ':instance', 'beta') ['unreachable']
        """
        err: Dict[Optional[BasicTriple], List[str]] = defaultdict(list)
        if not graph.triples:
            err[None].append('graph is empty')
        else:
            # Group triples by source variable, checking roles as we go.
            g: Dict[Variable, List[BasicTriple]] = {}
            for triple in graph.triples:
                var, role, tgt = triple
                if not self.has_role(role):
                    err[triple].append('invalid role')
                g.setdefault(var, []).append(triple)

            if not graph.top:
                err[None].append('top is not set')
            elif graph.top not in g:
                err[None].append('top is not a variable in the graph')
            else:
                # Every triple of a variable not connected to the top
                # is reported as unreachable.
                reachable = _dfs(g, graph.top)
                unreachable = set(g).difference(reachable)
                for uvar in sorted(unreachable):
                    for triple in g[uvar]:
                        err[triple].append('unreachable')

        return dict(err)
def _dfs(g, top):
    """
    Return the set of variables connected to *top*.

    *g* maps each variable to the triples it is the source of. Edges
    are followed in both directions, and only targets that are
    themselves keys of *g* count as edges. *top* is always part of
    the result, even when it is not a key of *g*.
    """
    # Build an undirected adjacency map over the variables of g in a
    # single pass: each edge is recorded at both of its endpoints.
    neighbors = {var: set() for var in g}
    for var, triples in g.items():
        for _, _, target in triples:
            if target in g:
                neighbors[var].add(target)
                neighbors[target].add(var)

    # Iterative depth-first traversal starting at top.
    seen = set()
    stack = [top]
    while stack:
        node = stack.pop()
        if node in seen:
            continue
        seen.add(node)
        stack.extend(
            other for other in neighbors.get(node, ()) if other not in seen
        )
    return seen