Source code for EduNLP.Pretrain.gensim_vec

# coding: utf-8
# 2021/5/29 @ tongshiwei
from EduNLP import logger
import multiprocessing
import gensim
from gensim.models import word2vec
from gensim.models.doc2vec import TaggedDocument
from gensim.models.callbacks import CallbackAny2Vec
from EduNLP.SIF.sif import sif4sci
from EduNLP.Vector import D2V, BowLoader
from copy import deepcopy
import itertools as it

__all__ = ["GensimWordTokenizer", "train_vector", "GensimSegTokenizer"]


[docs]class GensimWordTokenizer(object): """ Parameters ---------- symbol: gm fgm gmas fgmas general: True when item isn't in standard format, and want to tokenize formulas(except formulas in figure) linearly. False when use 'ast' mothed to tokenize formulas instead of 'linear'. Returns ---------- Examples ---------- >>> tokenizer = GensimWordTokenizer(symbol="gmas", general=True) >>> token_item = tokenizer("有公式$\\FormFigureID{wrong1?}$,如图$\\FigureID{088f15ea-xxx}$,\ ... 若$x,y$满足约束条件公式$\\FormFigureBase64{wrong2?}$,$\\SIFSep$,则$z=x+7 y$的最大值为$\\SIFBlank$") >>> print(token_item.tokens[:10]) ['公式', '[FORMULA]', '如图', '[FIGURE]', 'x', ',', 'y', '约束条件', '公式', '[FORMULA]'] >>> tokenizer = GensimWordTokenizer(symbol="fgmas", general=False) >>> token_item = tokenizer("有公式$\\FormFigureID{wrong1?}$,如图$\\FigureID{088f15ea-xxx}$,\ ... 若$x,y$满足约束条件公式$\\FormFigureBase64{wrong2?}$,$\\SIFSep$,则$z=x+7 y$的最大值为$\\SIFBlank$") >>> print(token_item.tokens[:10]) ['公式', '[FORMULA]', '如图', '[FIGURE]', '[FORMULA]', '约束条件', '公式', '[FORMULA]', '[SEP]', '[FORMULA]'] """ def __init__(self, symbol="gm", general=False): self.symbol = symbol if general is True: self.tokenization_params = { "formula_params": { "method": "linear", "symbolize_figure_formula": True } } else: self.tokenization_params = { "formula_params": { "method": "ast", "return_type": "list", "ord2token": True } } def batch_process(self, *items): pass def __call__(self, item): return sif4sci( item, symbol=self.symbol, tokenization_params=self.tokenization_params, errors="ignore" )
[docs]class GensimSegTokenizer(object): # pragma: no cover """ Parameters ---------- symbol: gms fgm """ def __init__(self, symbol="gms", depth=None, flatten=False, **kwargs): self.symbol = symbol self.tokenization_params = { "formula_params": { "method": "ast", "return_type": "list", "ord2token": True } } self.kwargs = dict( add_seg_type=True if depth in {0, 1, 2} else False, add_seg_mode="head", depth=depth, drop="s" if depth not in {0, 1, 2} else "" ) self.kwargs.update(kwargs) self.flatten = flatten def __call__(self, item, flatten=None, **kwargs): flatten = self.flatten if flatten is None else flatten tl = sif4sci( item, symbol=self.symbol, tokenization_params=self.tokenization_params, errors="ignore" ) if kwargs: _kwargs = deepcopy(self.kwargs) _kwargs.update(kwargs) else: _kwargs = self.kwargs if tl: ret = tl.get_segments(**_kwargs) if flatten is True: return it.chain(*ret) return ret return tl
class MonitorCallback(CallbackAny2Vec): def __init__(self, test_words): self.epoch = 0 self._test_words = test_words def on_epoch_end(self, model): logger.info("Epoch #{}: loss-{:.4f} ".format(self.epoch, model.get_latest_training_loss())) self.epoch += 1 def train_vector(items, w2v_prefix, embedding_dim=None, method="sg", binary=None, train_params=None): monitor = MonitorCallback(["word", "I", "less"]) _train_params = dict( min_count=0, vector_size=embedding_dim, workers=multiprocessing.cpu_count(), callbacks=[monitor] ) if method in {"sg", "cbow"}: sg = 1 if method == "sg" else 0 _train_params["sg"] = sg if train_params is not None: _train_params.update(train_params) model = gensim.models.Word2Vec( items, **_train_params ) binary = binary if binary is not None else False elif method == "fasttext": if train_params is not None: _train_params.update(train_params) model = gensim.models.FastText( sentences=items, **_train_params ) binary = binary if binary is not None else True elif method == "d2v": if train_params is not None: _train_params.update(train_params) docs = [TaggedDocument(doc, [i]) for i, doc in enumerate(items)] model = gensim.models.Doc2Vec( docs, **_train_params ) binary = binary if binary is not None else True elif method == "bow": model = gensim.corpora.Dictionary(items) binary = binary if binary is not None else True elif method == "tfidf": dictionary_path = train_vector(items, w2v_prefix, method="bow") dictionary = BowLoader(dictionary_path) corpus = [dictionary.infer_vector(item) for item in items] model = gensim.models.TfidfModel(corpus) binary = binary if binary is not None else True else: raise ValueError("Unknown method: %s" % method) filepath = w2v_prefix + method if embedding_dim is not None: filepath = filepath + "_" + str(embedding_dim) if binary is True: filepath += ".bin" logger.info("model is saved to %s" % filepath) model.save(filepath) else: if method in {"fasttext", "d2v"}: # pragma: no cover logger.warning("binary should be True for %s, otherwise all vectors for ngrams will be lost." % method) filepath += ".kv" logger.info("model is saved to %s" % filepath) model.wv.save(filepath) return filepath