# -*- coding: utf-8 -*-
"""Bio2BEL HGNC Manager."""
import logging
from collections import Counter
from typing import Iterable, List, Mapping, Optional, Set, Tuple, TypeVar
import click
from networkx import relabel_nodes
from tqdm import tqdm
import pybel.dsl
from bio2bel import AbstractManager
from bio2bel.manager.bel_manager import BELManagerMixin
from bio2bel.manager.flask_manager import FlaskMixin
from bio2bel.manager.namespace_manager import BELNamespaceManagerMixin
from pybel import BELGraph
from pybel.constants import FUNCTION, MIRNA, NAME, NAMESPACE, PROTEIN, RNA, VARIANTS
from pybel.dsl import BaseEntity, CentralDogma, FUNC_TO_DSL
import pybel.dsl
from pybel.manager.models import Namespace, NamespaceEntry
from .constants import ENCODINGS, ENTREZ, MODULE_NAME
from .gfam_manager import Manager as GfamManager
from .model_utils import add_central_dogma, family_to_bel, gene_to_bel, uniprot_to_bel
from .models import (
AliasName, AliasSymbol, Base, Enzyme, GeneFamily, HumanGene, MouseGene, RatGene, UniProt, gene_enzyme,
gene_mouse_gene, gene_rat_gene,
)
from .wrapper import BaseManager
__all__ = [
'Manager',
]
logger = logging.getLogger(__name__)
UNIPROT_RE = r'^([A-N,R-Z][0-9]([A-Z][A-Z, 0-9][A-Z, 0-9][0-9]){1,2})|([O,P,Q][0-9][A-Z, 0-9][A-Z, 0-9][A-Z, 0-9][0-9])(\.\d+)?$'
GENE_FAMILY_NAMESPACES = {'gfam', 'hgnc.family', 'hgnc.genefamily'}
X = TypeVar('X')
def _deal_with_nonsense(results: Optional[List[X]]) -> Optional[X]:
"""Fix lists that only have one thing inside them."""
if not results:
return
if 1 < len(results):
raise ValueError('{}'.format(results))
return results[0]
[docs]class Manager(AbstractManager, FlaskMixin, BELManagerMixin, BELNamespaceManagerMixin, BaseManager):
"""Human gene nomenclature and orthologies to mouse and rat."""
module_name = MODULE_NAME
_base = Base
namespace_model = HumanGene
edge_model = [gene_rat_gene, gene_enzyme, gene_mouse_gene]
identifiers_recommended = 'HGNC'
identifiers_pattern = r'^((HGNC|hgnc):)?\d{1,5}$'
identifiers_miriam = 'MIR:00000080'
identifiers_namespace = 'hgnc'
identifiers_url = 'http://identifiers.org/hgnc/'
flask_admin_models = [HumanGene, GeneFamily, UniProt, MouseGene, RatGene, AliasName, AliasSymbol, Enzyme]
def __init__(self, *args, **kwargs): # noqa: D107
super().__init__(*args, **kwargs)
self._hgnc_symbol_entrez_id_mapping = {}
[docs] def is_populated(self) -> bool:
"""Check if the database is already populated."""
return 0 < self.count_human_genes()
[docs] def populate(self, silent=False, hgnc_file_path=None, use_hcop=False, hcop_file_path=None, low_memory=False):
"""Populate the database."""
json_data = self.load_hgnc_json(hgnc_file_path=hgnc_file_path)
self.insert_hgnc(hgnc_dict=json_data, silent=silent, low_memory=low_memory)
if use_hcop:
self.insert_hcop(silent=silent, hcop_file_path=hcop_file_path)
#: Clobber this PyHGNC function so it doesn't accidentally get called
def db_import(self, silent=False, hgnc_file_path=None, hcop_file_path=None, low_memory=False): # noqa: D102
raise NotImplementedError('call manager.populate instead')
#: Clobber this PyHGNC function so it doesn't accidentally get called
def _drop_tables(self): # noqa: D102
raise NotImplementedError('call manager.drop_all instead')
#####################
# Summary Functions #
#####################
[docs] def count_human_genes(self) -> int:
"""Count the number of human genes in the database."""
return self._count_model(HumanGene)
[docs] def count_families(self) -> int:
"""Count the number of human gene families in the database."""
return self._count_model(GeneFamily)
[docs] def count_mouse_genes(self) -> int:
"""Count the number of mouse genes in the database."""
return self._count_model(MouseGene)
[docs] def count_rat_genes(self) -> int:
"""Count the number of rat genes in the database."""
return self._count_model(RatGene)
[docs] def count_proteins(self) -> int:
"""Count the number of UniProt proteins in the database."""
return self._count_model(UniProt)
[docs] def summarize(self) -> Mapping[str, int]:
"""Summarize the database."""
return dict(
human_genes=self.count_human_genes(),
rat_genes=self.count_rat_genes(),
mouse_genes=self.count_mouse_genes(),
families=self.count_families(),
proteins=self.count_proteins(),
)
[docs] def get_gene_by_hgnc_symbol(self, hgnc_symbol: str) -> Optional[HumanGene]:
"""Get a human gene by HGNC symbol.
:param hgnc_symbol: The HGNC gene symbol
"""
results = self.hgnc(symbol=hgnc_symbol)
return _deal_with_nonsense(results)
[docs] def get_gene_by_hgnc_id(self, hgnc_id: str) -> Optional[HumanGene]:
"""Get a human gene by HGNC identifier.
:param hgnc_id: The HGNC gene identifier
"""
results = self.hgnc(identifier=int(hgnc_id)) # it's actually an integer in the database
return _deal_with_nonsense(results)
[docs] def get_gene_by_entrez_id(self, entrez_id: str) -> Optional[HumanGene]:
"""Get a human gene by its Entrez gene identifier.
:param entrez_id: The Entrez gene identifier
"""
results = self.hgnc(entrez=entrez_id)
return _deal_with_nonsense(results)
[docs] def get_gene_by_ensembl_id(self, ensembl_id: str) -> Optional[HumanGene]:
"""Get a human gene by its ENSEMBL gene identifier.
:param ensembl_id: The ENSEMBL gene identifier
"""
results = self.hgnc(ensembl_gene=ensembl_id)
return _deal_with_nonsense(results)
[docs] def get_gene_by_uniprot_id(self, uniprot_id: str) -> Optional[HumanGene]:
"""Get a human gene by its UniProt gene identifier.
:param uniprot_id: The UniProt gene identifier
"""
return _deal_with_nonsense(self.hgnc(uniprotid=uniprot_id))
[docs] def get_gene_by_mgi_id(self, mgi_id: str) -> Optional[HumanGene]:
"""Get a human gene by an orthologous MGI identifier.
:param mgi_id: MGI identifier
"""
results = self.mgd(mgdid=mgi_id)
mouse_gene = _deal_with_nonsense(results)
if mouse_gene is None:
return
human_genes = mouse_gene.hgncs
if len(human_genes) > 1:
logger.warning('multiple human genes mapped to mgi_id:%s: %s', mgi_id, human_genes)
return
return human_genes[0]
[docs] def get_gene_by_rgd_id(self, rgd_id: str) -> Optional[HumanGene]:
"""Get a human gene by an orthologous RGD identifier.
:param rgd_id: RGD identifier
"""
results = self.rgd(rgdid=rgd_id)
rat_gene = _deal_with_nonsense(results)
if rat_gene is None:
return
human_genes = rat_gene.hgncs
if len(human_genes) > 1:
logger.warning('multiple human genes mapped to rgd_id:%s: %s', rgd_id, human_genes)
return
return human_genes[0]
[docs] def get_enzyme_by_ec_number(self, ec_number: str) -> Optional[HumanGene]:
"""Get a enzyme by its associated EC number.
:param ec_number: EC number
"""
return self.session.query(Enzyme).filter(Enzyme.ec_number == ec_number).one_or_none()
[docs] def get_hgnc_from_alias_symbol(self, alias_symbol: str) -> Optional[HumanGene]:
"""Get HGNC from alias symbol.
:param alias_symbol: alias symbol
"""
query_result = self.session.query(AliasSymbol).filter(AliasSymbol.alias_symbol == alias_symbol).all()
if not query_result:
return None
return query_result[0].hgnc
[docs] def get_node(self, node: BaseEntity) -> Optional[HumanGene]:
"""Get a node from the database, whether it has a HGNC, RGD, MGI, or EG identifier.
:param node: The node to look for
:raises: KeyError
"""
if not isinstance(node, CentralDogma):
return
namespace = node.namespace
if namespace is None:
return
identifier = node.identifier
name = node.name
if namespace.lower() in {'hgnc'}:
return self._get_node_handle_hgnc(identifier, name)
if namespace.lower() in {'entrez', 'egid', 'eg', 'ncbigene'}:
return self._get_node_handle_entrez(identifier, name)
# if namespace.lower() in {'mgi'}:
# return self._get_node_handle_mgi(identifier, name)
#
# if namespace.lower() == 'mgiid':
# return self._get_node_handle_mgiid(identifier, name)
#
# if namespace.lower() in {'rgd'}:
# return self._get_node_handle_rgd(identifier, name)
#
# if namespace.lower() in {'rgdid'}:
# return self._get_node_handle_rgdid(identifier, name)
def _get_node_handle_hgnc(self, identifier, name) -> Optional[HumanGene]:
if identifier is not None:
return self.get_gene_by_hgnc_id(identifier)
elif name is not None:
return self.get_gene_by_hgnc_symbol(name)
raise KeyError
def _get_node_handle_entrez(self, identifier, name) -> Optional[HumanGene]:
if identifier is not None:
return self.get_gene_by_entrez_id(identifier)
elif name is not None:
return self.get_gene_by_entrez_id(name)
raise KeyError
def _get_node_handle_mgi(self, identifier, name) -> Optional[HumanGene]:
if identifier is not None:
return self.get_gene_by_mgi_id(identifier)
elif name is not None:
return
raise KeyError
def _get_node_handle_rgd(self, identifier, name) -> Optional[HumanGene]:
if identifier is not None:
return self.get_gene_by_rgd_id(identifier)
elif name is not None:
return
raise KeyError
def _get_node_handle_mgiid(self, _, name):
if name is None:
raise KeyError
return self.get_gene_by_mgi_id(name)
def _get_node_handle_rgdid(self, _, name):
if name is None:
raise KeyError
return self.get_gene_by_rgd_id(name)
[docs] def add_namespace_to_graph(self, graph: BELGraph):
"""Add this manager's namespace to the graph."""
namespace = self.upload_bel_namespace()
graph.namespace_url[namespace.keyword] = namespace.url
gfam_manager = GfamManager(engine=self.engine, session=self.session)
gfam_manager.add_namespace_to_graph(graph)
[docs] def iter_genes(self, graph: BELGraph, use_tqdm: bool = False) -> Iterable[Tuple[BaseEntity, HumanGene]]:
"""Iterate over pairs of BEL nodes and HGNC genes."""
if use_tqdm:
it = tqdm(graph, desc='HGNC Genes')
else:
it = graph
for node in it:
human_gene = self.get_node(node)
if human_gene is not None:
yield node, human_gene
[docs] def normalize_genes(self, graph: BELGraph, use_tqdm: bool = False) -> None:
"""Add identifiers to all HGNC genes."""
mapping = {}
for node_data, human_gene in list(self.iter_genes(graph, use_tqdm=use_tqdm)):
mapping[node_data] = gene_to_bel(human_gene, func=node_data.function, variants=node_data.get(VARIANTS))
# FIXME what about when an HGNC node appears in a fusion, complex, or composite?
relabel_nodes(graph, mapping, copy=False)
[docs] def enrich_genes_with_equivalences(self, graph: BELGraph) -> None:
"""Enrich genes with their corresponding UniProt."""
for node, human_gene in list(self.iter_genes(graph)):
func = node.function
if human_gene.entrez:
entrez_node = FUNC_TO_DSL[func](
namespace=ENTREZ,
name=human_gene.symbol,
identifier=str(human_gene.entrez)
)
graph.add_equivalence(node, entrez_node)
if func == PROTEIN:
for uniprot in human_gene.uniprots:
graph.add_equivalence(node, uniprot_to_bel(uniprot))
if func == RNA:
if human_gene.mirbase:
mirbase_rna_node = pybel.dsl.Rna(
namespace='mirbase',
identifier=str(human_gene.mirbase),
)
graph.add_equivalence(node, mirbase_rna_node)
[docs] def enrich_genes_with_families(self, graph: BELGraph) -> None:
"""Enrich genes in the BEL graph with their families."""
self.add_namespace_to_graph(graph)
for node_data, human_gene in list(self.iter_genes(graph)):
for family in human_gene.gene_families:
graph.add_is_a(node_data, family_to_bel(family, node_data[FUNCTION]))
[docs] def get_family_by_id(self, family_identifier: str) -> Optional[GeneFamily]:
"""Get a gene family by its hgnc.genefamily identifier, if it exists."""
results = self.gene_family(family_identifier=family_identifier)
return _deal_with_nonsense(results)
[docs] def get_family_by_name(self, family_name: str) -> Optional[GeneFamily]:
"""Get a gene family by its name, if it exists."""
results = self.gene_family(family_name=family_name)
return _deal_with_nonsense(results)
def _enrich_hgnc_with_entrez_equivalences(self, graph: BELGraph, data: BaseEntity) -> Optional[str]:
namespace = data.get(NAMESPACE)
if namespace.lower() not in {'hgnc'}:
return
func = data[FUNCTION]
name = data[NAME]
entrez = self.hgnc_symbol_entrez_id_mapping[name]
return graph.add_equivalence(data, FUNC_TO_DSL[func](
namespace=ENTREZ,
name=name,
identifier=str(entrez),
))
[docs] def enrich_hgnc_with_entrez_equivalences(self, graph: BELGraph):
"""Add equivalent Entrez nodes for all HGNC genes."""
self.add_namespace_to_graph(graph)
for _, data in graph.nodes(data=True):
self._enrich_hgnc_with_entrez_equivalences(graph, data)
[docs] def enrich_families_with_genes(self, graph: BELGraph):
"""Enrich gene families in the BEL graph with their member genes."""
self.add_namespace_to_graph(graph)
for gene_family_node in list(graph):
if not isinstance(gene_family_node, pybel.dsl.Gene):
continue
namespace = gene_family_node.namespace
if namespace is None or namespace.lower() not in GENE_FAMILY_NAMESPACES:
continue
identifier = gene_family_node.identifier
name = gene_family_node.name
if identifier:
gene_family_model = self.get_family_by_id(identifier)
elif name:
gene_family_model = self.get_family_by_name(name)
else:
raise ValueError
if gene_family_model is None:
logger.info('family not found: %s', gene_family_node)
continue
for human_gene in gene_family_model.hgncs:
graph.add_is_a(gene_to_bel(human_gene), gene_family_node)
"""Mapping dictionaries"""
@staticmethod
def _get_name(human_gene: HumanGene) -> str:
return str(human_gene.symbol)
@staticmethod
def _get_identifier(human_gene: HumanGene) -> str:
"""Get the identifier from a human gene SQLAlchemy model."""
return str(human_gene.identifier)
@staticmethod
def _get_encoding(human_gene: HumanGene) -> str:
"""Get the BEL encoding for a Human gene."""
return ENCODINGS.get(human_gene.locus_type, 'GRP')
[docs] def build_entrez_id_to_hgnc_symbol_mapping(self) -> Mapping[str, str]:
"""Build a mapping from Entrez gene identifier to HGNC gene symbols."""
return {
str(entrez_id): hgnc_symbol
for entrez_id, hgnc_symbol in self.session.query(HumanGene.entrez, HumanGene.symbol).all()
}
[docs] def build_entrez_id_to_hgnc_id_mapping(self) -> Mapping[str, str]:
"""Build a mapping from Entrez gene identifier to HGNC identifier."""
return {
str(entrez_id): str(hgnc_id)
for entrez_id, hgnc_id in self.session.query(HumanGene.entrez, HumanGene.identifier).all()
}
@property
def hgnc_symbol_entrez_id_mapping(self) -> Mapping[str, str]:
"""Get a mapping from Entrez gene identifiers to HGNC gene symbols."""
if not self._hgnc_symbol_entrez_id_mapping:
self._hgnc_symbol_entrez_id_mapping = self.build_hgnc_symbol_entrez_id_mapping()
return self._hgnc_symbol_entrez_id_mapping
[docs] def build_hgnc_symbol_entrez_id_mapping(self) -> Mapping[str, str]:
"""Build a mapping from HGNC symbol to ENTREZ identifier."""
return {
hgnc_symbol: str(entrez_id)
for hgnc_symbol, entrez_id in self.session.query(HumanGene.symbol, HumanGene.entrez).all()
}
[docs] def build_hgnc_id_symbol_mapping(self) -> Mapping[str, str]:
"""Build a mapping from HGNC identifiers to HGNC gene symbols."""
return {
str(hgnc_id): hgnc_symbol
for hgnc_id, hgnc_symbol in self.session.query(HumanGene.identifier, HumanGene.symbol).all()
}
[docs] def build_hgnc_symbol_id_mapping(self) -> Mapping[str, str]:
"""Build a mapping from HGNC gene symbols to HGNC identifiers."""
return {
hgnc_symbol: str(hgnc_id)
for hgnc_symbol, hgnc_id in self.session.query(HumanGene.symbol, HumanGene.identifier).all()
}
[docs] def build_uniprot_id_hgnc_id_mapping(self) -> Mapping[str, str]:
"""Build a mapping from UniProt identifiers to HGNC identifiers."""
return {
uniprot_id: str(hgnc_id)
for hgnc_id, uniprot_id in self.session.query(HumanGene.identifier, UniProt.uniprotid).all()
}
[docs] def build_uniprot_id_hgnc_symbol_mapping(self) -> Mapping[str, str]:
"""Build a mapping from UniProt identifiers to HGNC gene symbols."""
return {
uniprot_id: symbol
for symbol, uniprot_id in self.session.query(HumanGene.symbol, UniProt.uniprotid).all()
}
[docs] def get_all_hgnc_symbols(self) -> Set[str]:
"""Return the set of HGNC gene symbols in the database."""
return {
gene.symbol
for gene in self.session.query(HumanGene.symbol).all()
}
def _get_gene_encodings(self) -> Mapping[str, str]:
"""Get the name to encoding dictionary for HGNC gene names."""
return {
symbol: ENCODINGS.get(locus_type, 'GRP')
for symbol, locus_type in self.session.query(HumanGene.symbol, HumanGene.locus_type).all()
}
[docs] def list_families(self) -> List[GeneFamily]:
"""List families in the database."""
return self._list_model(GeneFamily)
[docs] def list_human_genes(self) -> List[HumanGene]:
"""List human genes in the database."""
return self._list_model(HumanGene)
[docs] def to_bel(self) -> BELGraph:
"""Export gene family definitions as a BEL graph."""
graph = BELGraph(
name='HGNC Central Dogma and Gene Family Definitions',
version='1.0.0', # FIXME use database version
authors='HUGO Gene Nomenclature Consortium',
description='Gene transcription, translation, and memberships to Gene Families.',
contact='charles.hoyt@scai.fraunhofer.de',
)
hgnc_namespace = self.upload_bel_namespace()
logger.info('using default namespace: %s at %s', hgnc_namespace, hgnc_namespace.url)
graph.namespace_url[hgnc_namespace.keyword] = hgnc_namespace.url
gfam_manager = GfamManager(connection=self.connection)
gfam_namespace = gfam_manager.upload_bel_namespace()
logger.info('using default namespace: %s at %s', gfam_namespace, gfam_namespace.url)
graph.namespace_url[gfam_namespace.keyword] = gfam_namespace.url
for human_gene in tqdm(self.list_human_genes(), total=self.count_human_genes(),
desc='Mapping central dogma to BEL'):
gene_dsl = gene_to_bel(human_gene)
graph.add_node_from_data(gene_dsl)
add_central_dogma(graph, human_gene)
for mouse_gene in human_gene.mgds:
graph.add_orthology(gene_dsl, pybel.dsl.Gene('mgi', identifier=str(mouse_gene.mgdid)))
for rat_gene in human_gene.rgds:
graph.add_orthology(gene_dsl, pybel.dsl.Gene('rgd', identifier=str(rat_gene.rgdid)))
protein_dsl = gene_to_bel(human_gene, PROTEIN)
for enzyme in human_gene.enzymes:
graph.add_is_a(protein_dsl, pybel.dsl.Protein('ec-code', enzyme.ec_number))
for family in tqdm(self.list_families(), total=self.count_families(),
desc='Mapping gene family definitions to BEL'):
for human_gene in family.hgncs:
graph.add_is_a(gene_to_bel(human_gene), family_to_bel(family))
return graph
[docs] def get_pathway_size_distribution(self) -> Counter:
"""Get the pathway size distribution."""
return Counter({
family.family_name: len(family.hgncs)
for family in self.session.query(GeneFamily)
})
[docs] def get_all_hgnc_symbols_family(self) -> Set[str]:
"""Get all Gene symbols that appear in gene families."""
return {
human_gene.symbol
for family in self.session.query(GeneFamily)
for human_gene in family.hgncs
}
[docs] def add_central_dogma(self, graph: BELGraph, node: BaseEntity) -> Optional[CentralDogma]:
"""Add the central dogma of biology."""
if VARIANTS in node:
return
human_gene = self.get_node(node)
encoding = ENCODINGS.get(human_gene.locus_type, 'GRP')
if 'M' in encoding:
mirna = gene_to_bel(human_gene, func=MIRNA)
graph.add_transcription(gene_to_bel(human_gene), mirna)
return mirna
if 'R' not in encoding:
return
rna = gene_to_bel(human_gene, func=RNA)
graph.add_transcription(gene_to_bel(human_gene), rna)
if 'P' not in encoding:
return rna
else:
protein = gene_to_bel(human_gene, func=PROTEIN)
graph.add_translation(rna, protein)
return protein
def _create_namespace_entry_from_model(self, human_gene: HumanGene, namespace: Namespace) -> NamespaceEntry:
return NamespaceEntry(
encoding=self._get_encoding(human_gene),
identifier=human_gene.identifier,
name=human_gene.symbol,
namespace=namespace
)
@staticmethod
def _cli_add_populate(main: click.Group) -> click.Group: # noqa: D202
"""Override default method to make it possible to add more flags."""
@main.command()
@click.option('--reset', is_flag=True)
@click.option('--skip-hcop', is_flag=True)
@click.pass_obj
def populate(manager, reset, skip_hcop):
"""Populate the database."""
if reset:
logger.info('Deleting the previous instance of the database')
manager.drop_all()
logger.info('Creating new models')
manager.create_all()
manager.populate(use_hcop=(not skip_hcop))
return main