"""Module for de-duplicating arrays of strings."""importrefromtypingimportList,Optional,Unionimportnumpyasnpimportpandasaspdfromsklearn.baseimportBaseEstimator,TransformerMixinfromsklearn.feature_extraction.textimportTfidfVectorizerfromsklearn.metrics.pairwiseimportlinear_kernelSTOP_TOKENS=r'[\W_]+|(corporation$)|(corp.$)|(corp$)|(incorporated$)|(inc.$)|(inc$)|(company$)|(common$)|(com$)'Data=Union[List,pd.Series,np.ndarray]
class StringCluster(BaseEstimator, TransformerMixin):
    """Transformer for de-duplicating an array-like container of strings.

    Attributes
    ----------
    ngram_size : int
        Size of ngrams to use in TfidfVectorizer.
    threshold : float
        Threshold to determine similarities; only samples above this number
        are flagged as similar.
    stop_tokens : re.Pattern
        Compiled RegEx pattern of stop tokens for use in TfidfVectorizer.
    vec : TfidfVectorizer
        Scikit-Learn TfidfVectorizer.
    similarity_ : np.ndarray
        Cosine-similarity matrix computed by ``fit``; shape (n_samples,
        len(y)) when ``y`` is given, else (n_samples, n_samples).
    labels_ : np.ndarray
        Per-sample index of the canonical (most similar) entry computed by
        ``fit``; used by ``transform`` to map duplicates to one value.

    Methods
    -------
    fit(X: Data, y: Optional[Data] = None)
        Fit the transformer to data.
    transform(X: Data, y: Optional[Data] = None)
        Transform the data.
    fit_transform(X: Data, y: Optional[Data] = None, **fit_params)
        Fit and transform the data.
    """

    def __init__(self, ngram_size: int = 2, threshold: float = 0.8,
                 stop_tokens: str = r'[\W_]+'):
        """
        Instantiate a StringCluster object.

        Parameters
        ----------
        ngram_size : int
            Size of ngrams to use in TfidfVectorizer; default 2.
        threshold : float
            Threshold to determine similarities; default 0.8; must be
            between [0, 1].
        stop_tokens : str
            RegEx pattern of stop tokens for use in TfidfVectorizer;
            default r'[\\W_]+'.
        """
        self.ngram_size = ngram_size
        self.threshold = threshold
        # NOTE(review): storing the compiled pattern (not the raw string)
        # means sklearn's get_params/clone round-trip returns a re.Pattern,
        # not the original str — acceptable here, but worth confirming if
        # grid-search/clone support is ever needed.
        self.stop_tokens = re.compile(stop_tokens)
        # char_wb analyzer builds character ngrams only inside word
        # boundaries, which is robust to small spelling variations.
        self.vec = TfidfVectorizer(analyzer='char_wb',
                                   ngram_range=(ngram_size, ngram_size))

    def fit(self, X: Data, y: Optional[Data] = None) -> "StringCluster":
        """
        Fit the transformer to data.

        Parameters
        ----------
        X : Data
            Array like object containing duplicated strings.
        y : Optional[Data]
            Optional array like object containing 'master list' of values
            to map similar samples to.

        Returns
        -------
        StringCluster
            Self.
        """
        self.similarity_ = self._get_cosine_similarity(X, y)
        self.labels_ = self._get_labels()
        return self

    def transform(self, X: Data, y: Optional[Data] = None) -> pd.Series:
        """
        Transform data.

        Parameters
        ----------
        X : Data
            Array like object containing duplicated strings.
        y : Optional[Data]
            Optional array like object containing 'master list' of values
            to map similar samples to.

        Returns
        -------
        pd.Series
            Pandas Series of de-duplicated values.

        Raises
        ------
        AttributeError
            If called before ``fit``.
        """
        if not hasattr(self, 'labels_'):
            raise AttributeError(".fit() method must be called before .transform() method.")
        # Explicit None check: truthiness of a pd.Series/np.ndarray raises
        # "The truth value of an array ... is ambiguous", so `if y:` would
        # crash for exactly the types Data advertises.
        if y is not None:
            return pd.Series(y)[self.labels_].reset_index(drop=True)
        return pd.Series(X)[self.labels_].reset_index(drop=True)

    def fit_transform(self, X: Data, y: Optional[Data] = None,
                      **fit_params) -> pd.Series:
        """
        Fit and transform the data.

        Parameters
        ----------
        X : Data
            Array like object containing duplicated strings.
        y : Optional[Data]
            Optional array like object containing 'master list' of values
            to map similar samples to.
        fit_params :
            Optional kwargs; for compatibility, only.

        Returns
        -------
        pd.Series
            Pandas Series of de-duplicated values.
        """
        return self.fit(X, y).transform(X, y)

    def _get_labels(self) -> np.ndarray:
        """
        Get labels based on similarity scores and given threshold.

        Notes
        -----
        Similarity scores greater than the given threshold are replaced
        with 1 to setup argmax method for identifying and grouping similar
        samples. This causes duplicates to be renamed to the first version
        within the series. For example, given a series of ['Intel Corp',
        'Intel', 'Intel Incorporated'], all three will be renamed to first
        sample -- i.e. 'Intel Corp'. This also helps reduce the number of
        inter-group versions which should be replaced with a single version.

        Returns
        -------
        np.ndarray
            For each sample, the column index of its best match in the
            similarity matrix (i.e. the canonical entry to map it to).
        """
        return np.where(
            self.similarity_ > self.threshold, 1., self.similarity_
        ).argmax(1)  # type: ignore

    def _get_cosine_similarity(self, X: Data, y: Optional[Data] = None) -> np.ndarray:
        """Get cosine similarity using fitted TfidfVectorizer and Linear Kernel."""
        # `y is not None` rather than `if y:` — see transform() for why
        # truthiness is unsafe on Series/ndarray inputs.
        if y is not None:
            a, b = self._clean_series(X), self._clean_series(y)
        else:
            a, b = self._clean_series(X), self._clean_series(X)
        # Fit the vocabulary on the reference side (b) so both projections
        # share one feature space; linear kernel on L2-normalized tf-idf
        # vectors equals cosine similarity.
        self.vec.fit(b)
        return linear_kernel(self.vec.transform(a), self.vec.transform(b))  # type: ignore

    def _clean_series(self, X: Data) -> pd.Series:
        """Clean series of string values."""
        return pd.Series(X).apply(self._clean_string)  # type: ignore

    def _clean_string(self, string: str) -> str:
        """Remove stop tokens and strip whitespace."""
        return self.stop_tokens.sub(' ', string.lower()).strip()
def dedupe_companies(path: str = '../data/companies.csv') -> pd.DataFrame:
    """Deduplicate a list of publicly traded companies.

    Parameters
    ----------
    path : str
        Path to a CSV file containing a 'company' column; default
        '../data/companies.csv' (preserves the previous hard-coded path).

    Returns
    -------
    pd.DataFrame
        Two columns: 'actual' (the original names, reindexed from 0) and
        'label' (the de-duplicated canonical names).
    """
    series = pd.read_csv(path)['company']
    clusterer = StringCluster(ngram_size=2, stop_tokens=STOP_TOKENS)
    labels = clusterer.fit_transform(series)
    return pd.DataFrame({'actual': series.reset_index(drop=True),
                         'label': labels})
if __name__ == '__main__':
    import time

    # perf_counter() is monotonic and high-resolution; time() is wall-clock
    # and can jump (e.g. NTP sync), so it is unsuitable for elapsed timing.
    start = time.perf_counter()
    res = dedupe_companies()
    elapsed = time.perf_counter() - start
    print(res)
    print(f'Process took {elapsed} seconds.')