Source code for featuretools.computational_backends.calculate_feature_matrix
import logging
import math
import os
import shutil
import time
import warnings
from datetime import datetime

import cloudpickle
import numpy as np
import pandas as pd
from woodwork.logical_types import (
    Age,
    AgeNullable,
    Boolean,
    BooleanNullable,
    Integer,
    IntegerNullable,
)

from featuretools.computational_backends.feature_set import FeatureSet
from featuretools.computational_backends.feature_set_calculator import (
    FeatureSetCalculator,
)
from featuretools.computational_backends.utils import (
    _check_cutoff_time_type,
    _validate_cutoff_time,
    bin_cutoff_times,
    create_client_and_cluster,
    gather_approximate_features,
    gen_empty_approx_features_df,
    get_ww_types_from_features,
    save_csv_decorator,
)
from featuretools.entityset.relationship import RelationshipPath
from featuretools.feature_base import AggregationFeature, FeatureBase
from featuretools.utils import Trie
from featuretools.utils.gen_utils import (
    import_or_raise,
    make_tqdm_iterator,
)

logger = logging.getLogger("featuretools.computational_backend")

PBAR_FORMAT = "Elapsed: {elapsed} | Progress: {l_bar}{bar}"
FEATURE_CALCULATION_PERCENTAGE = (
    0.95  # make total 5% higher to allot time for wrapping up at end
)


def calculate_feature_matrix(
    features,
    entityset=None,
    cutoff_time=None,
    instance_ids=None,
    dataframes=None,
    relationships=None,
    cutoff_time_in_index=False,
    training_window=None,
    approximate=None,
    save_progress=None,
    verbose=False,
    chunk_size=None,
    n_jobs=1,
    dask_kwargs=None,
    progress_callback=None,
    include_cutoff_time=True,
):
    """Calculates a matrix for a given set of instance ids and calculation times.

    Args:
        features (list[:class:`.FeatureBase`]): Feature definitions to be calculated.

        entityset (EntitySet): An already initialized entityset. Required if
            `dataframes` and `relationships` are not provided.

        cutoff_time (pd.DataFrame or Datetime): Specifies times at which to calculate
            the features for each instance. The resulting feature matrix will use data
            up to and including the cutoff time. Can either be a DataFrame or a single
            value. If a DataFrame is passed, the instance ids for which to calculate
            features must be in a column with the same name as the target dataframe
            index or in a column named `instance_id`. The cutoff time values in the
            DataFrame must be in a column with the same name as the target dataframe
            time index or in a column named `time`. If the DataFrame has more than two
            columns, any additional columns will be added to the resulting feature
            matrix. If a single value is passed, that value will be used for all
            instances.

        instance_ids (list): List of instances to calculate features on. Only
            used if `cutoff_time` is a single datetime.

        dataframes (dict[str -> tuple(DataFrame, str, str, dict[str -> str/Woodwork.LogicalType], dict[str->str/set], boolean)]):
            Dictionary of DataFrames. Entries take the format
            {dataframe name -> (dataframe, index column, time_index, logical_types, semantic_tags, make_index)}.
            Note that only the dataframe is required. If a Woodwork DataFrame is
            supplied, any other parameters will be ignored.

        relationships (list[(str, str, str, str)]): List of relationships between
            dataframes. List items are a tuple with the format
            (parent dataframe name, parent column, child dataframe name, child column).

        cutoff_time_in_index (bool): If True, return a DataFrame with a MultiIndex
            where the second index is the cutoff time (the first is the instance id).
            The DataFrame will be sorted by (time, instance_id).

        training_window (Timedelta or str, optional):
            Window defining how much time before the cutoff time data
            can be used when calculating features. If ``None``, all data before the
            cutoff time is used. Defaults to ``None``.

        approximate (Timedelta or str): Frequency to group instances with similar
            cutoff times by for features with costly calculations. For example,
            if the bucket is 24 hours, all instances with cutoff times on the same
            day will use the same calculation for expensive features.

        verbose (bool, optional): Print progress info. The time granularity is
            per chunk.

        chunk_size (int or float or None): Maximum number of rows of the output
            feature matrix to calculate at a time. If passed an integer greater
            than 0, will try to use that many rows per chunk. If passed a float
            value between 0 and 1, sets the chunk size to that percentage of all
            rows. If None and n_jobs > 1, it will be set to 1/n_jobs.

        n_jobs (int, optional): Number of parallel processes to use when
            calculating the feature matrix. Dask is required if not equal to 1.

        dask_kwargs (dict, optional): Dictionary of keyword arguments to be
            passed when creating the dask client and scheduler. Even if n_jobs
            is not set, using `dask_kwargs` will enable multiprocessing.
            Main parameters:

            cluster (str or dask.distributed.LocalCluster):
                cluster or address of cluster to send tasks to. If unspecified,
                a cluster will be created.
            diagnostics port (int):
                port number to use for the web dashboard. If left unspecified,
                the web interface will not be enabled.

            Valid keyword arguments for LocalCluster will also be accepted.

        save_progress (str, optional): Path to save intermediate computational results.

        progress_callback (callable): Function to be called with incremental progress
            updates. Has the following parameters:

                update: percentage change (float between 0 and 100) in progress since last call
                progress_percent: percentage (float between 0 and 100) of total computation completed
                time_elapsed: total time in seconds that has elapsed since the start of the call

        include_cutoff_time (bool): Include data at cutoff times in feature calculations.
            Defaults to ``True``.

    Returns:
        pd.DataFrame: The feature matrix.
    """
    assert (
        isinstance(features, list)
        and features != []
        and all([isinstance(feature, FeatureBase) for feature in features])
    ), "features must be a non-empty list of features"

    # handle loading entityset
    from featuretools.entityset.entityset import EntitySet

    if not isinstance(entityset, EntitySet):
        if dataframes is not None:
            entityset = EntitySet("entityset", dataframes, relationships)
        else:
            raise TypeError("No dataframes or valid EntitySet provided")

    target_dataframe = entityset[features[0].dataframe_name]

    cutoff_time = _validate_cutoff_time(cutoff_time, target_dataframe)
    entityset._check_time_indexes()

    if isinstance(cutoff_time, pd.DataFrame):
        if instance_ids:
            msg = "Passing 'instance_ids' is valid only if 'cutoff_time' is a single value or None - ignoring"
            warnings.warn(msg)
        pass_columns = [
            col for col in cutoff_time.columns if col not in ["instance_id", "time"]
        ]
        # make sure dtype of instance_id in cutoff time
        # is same as column it references
        target_dataframe = features[0].dataframe
        ltype = target_dataframe.ww.logical_types[target_dataframe.ww.index]
        cutoff_time.ww.init(logical_types={"instance_id": ltype})
    else:
        pass_columns = []
        if cutoff_time is None:
            if entityset.time_type == "numeric":
                cutoff_time = np.inf
            else:
                cutoff_time = datetime.now()

        if instance_ids is None:
            index_col = target_dataframe.ww.index
            df = entityset._handle_time(
                dataframe_name=target_dataframe.ww.name,
                df=target_dataframe,
                time_last=cutoff_time,
                training_window=training_window,
                include_cutoff_time=include_cutoff_time,
            )
            instance_ids = df[index_col]

        # convert list or range object into series
        if not isinstance(instance_ids, pd.Series):
            instance_ids = pd.Series(instance_ids)

        cutoff_time = (cutoff_time, instance_ids)

    _check_cutoff_time_type(cutoff_time, entityset.time_type)

    # Approximate provides no benefit with a single cutoff time, so ignore it
    if isinstance(cutoff_time, tuple) and approximate is not None:
        msg = (
            "Using approximate with a single cutoff_time value or no cutoff_time "
            "provides no computational efficiency benefit"
        )
        warnings.warn(msg)
        cutoff_time = pd.DataFrame(
            {
                "instance_id": cutoff_time[1],
                "time": [cutoff_time[0]] * len(cutoff_time[1]),
            },
        )
        target_dataframe = features[0].dataframe
        ltype = target_dataframe.ww.logical_types[target_dataframe.ww.index]
        cutoff_time.ww.init(logical_types={"instance_id": ltype})

    feature_set = FeatureSet(features)

    # Get features to approximate
    if approximate is not None:
        approximate_feature_trie = gather_approximate_features(feature_set)
        # Make a new FeatureSet that ignores approximated features
        feature_set = FeatureSet(
            features,
            approximate_feature_trie=approximate_feature_trie,
        )

    # Check if there are any non-approximated aggregation features
    no_unapproximated_aggs = True
    for feature in features:
        if isinstance(feature, AggregationFeature):
            # do not need to check if feature is in to_approximate since
            # only base features of direct features can be in to_approximate
            no_unapproximated_aggs = False
            break

        if approximate is not None:
            all_approx_features = {
                f for _, feats in feature_set.approximate_feature_trie for f in feats
            }
        else:
            all_approx_features = set()
        deps = feature.get_dependencies(deep=True, ignored=all_approx_features)
        for dependency in deps:
            if isinstance(dependency, AggregationFeature):
                no_unapproximated_aggs = False
                break

    cutoff_df_time_col = "time"
    target_time = "_original_time"

    if approximate is not None:
        # If there are approximated aggs, bin times
        binned_cutoff_time = bin_cutoff_times(cutoff_time, approximate)

        # Think about collisions: what if original time is a feature
        binned_cutoff_time.ww[target_time] = cutoff_time[cutoff_df_time_col]

        cutoff_time_to_pass = binned_cutoff_time
    else:
        cutoff_time_to_pass = cutoff_time

    if isinstance(cutoff_time, pd.DataFrame):
        cutoff_time_len = cutoff_time.shape[0]
    else:
        cutoff_time_len = len(cutoff_time[1])

    chunk_size = _handle_chunk_size(chunk_size, cutoff_time_len)
    tqdm_options = {
        "total": (cutoff_time_len / FEATURE_CALCULATION_PERCENTAGE),
        "bar_format": PBAR_FORMAT,
        "disable": True,
    }

    if verbose:
        tqdm_options.update({"disable": False})
    elif progress_callback is not None:
        # allows us to utilize progress_bar updates without printing to anywhere
        tqdm_options.update({"file": open(os.devnull, "w"), "disable": False})

    with make_tqdm_iterator(**tqdm_options) as progress_bar:
        if n_jobs != 1 or dask_kwargs is not None:
            feature_matrix = parallel_calculate_chunks(
                cutoff_time=cutoff_time_to_pass,
                chunk_size=chunk_size,
                feature_set=feature_set,
                approximate=approximate,
                training_window=training_window,
                save_progress=save_progress,
                entityset=entityset,
                n_jobs=n_jobs,
                no_unapproximated_aggs=no_unapproximated_aggs,
                cutoff_df_time_col=cutoff_df_time_col,
                target_time=target_time,
                pass_columns=pass_columns,
                progress_bar=progress_bar,
                dask_kwargs=dask_kwargs or {},
                progress_callback=progress_callback,
                include_cutoff_time=include_cutoff_time,
            )
        else:
            feature_matrix = calculate_chunk(
                cutoff_time=cutoff_time_to_pass,
                chunk_size=chunk_size,
                feature_set=feature_set,
                approximate=approximate,
                training_window=training_window,
                save_progress=save_progress,
                entityset=entityset,
                no_unapproximated_aggs=no_unapproximated_aggs,
                cutoff_df_time_col=cutoff_df_time_col,
                target_time=target_time,
                pass_columns=pass_columns,
                progress_bar=progress_bar,
                progress_callback=progress_callback,
                include_cutoff_time=include_cutoff_time,
            )

        # ensure rows are sorted by input order
        if isinstance(cutoff_time, pd.DataFrame):
            feature_matrix = feature_matrix.ww.reindex(
                pd.MultiIndex.from_frame(
                    cutoff_time[["instance_id", "time"]],
                    names=feature_matrix.index.names,
                ),
            )
        else:
            # Maintain index dtype
            index_dtype = feature_matrix.index.get_level_values(0).dtype
            feature_matrix = feature_matrix.ww.reindex(
                cutoff_time[1].astype(index_dtype),
                level=0,
            )
        if not cutoff_time_in_index:
            feature_matrix.ww.reset_index(level="time", drop=True, inplace=True)

        if save_progress and os.path.exists(os.path.join(save_progress, "temp")):
            shutil.rmtree(os.path.join(save_progress, "temp"))

        # force to 100% since we saved last 5 percent
        previous_progress = progress_bar.n
        progress_bar.update(progress_bar.total - progress_bar.n)

        if progress_callback is not None:
            (
                update,
                progress_percent,
                time_elapsed,
            ) = update_progress_callback_parameters(progress_bar, previous_progress)
            progress_callback(update, progress_percent, time_elapsed)

        progress_bar.refresh()

    return feature_matrix
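

# --- Illustrative usage (not part of the library) ----------------------------
# A minimal sketch of calling calculate_feature_matrix with a cutoff_time
# DataFrame. The demo entityset and the cutoff values below are hypothetical
# and only meant to show the expected argument shapes.
#
#     import pandas as pd
#     import featuretools as ft
#
#     es = ft.demo.load_mock_customer(return_entityset=True)
#     features = ft.dfs(
#         entityset=es,
#         target_dataframe_name="customers",
#         features_only=True,
#     )
#     cutoff_time = pd.DataFrame({
#         "instance_id": [1, 2, 3],
#         "time": pd.to_datetime(["2014-01-01", "2014-01-01", "2014-01-02"]),
#     })
#     fm = ft.calculate_feature_matrix(
#         features,
#         entityset=es,
#         cutoff_time=cutoff_time,
#         cutoff_time_in_index=True,
#     )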


def calculate_chunk(
    cutoff_time,
    chunk_size,
    feature_set,
    entityset,
    approximate,
    training_window,
    save_progress,
    no_unapproximated_aggs,
    cutoff_df_time_col,
    target_time,
    pass_columns,
    progress_bar=None,
    progress_callback=None,
    include_cutoff_time=True,
    schema=None,
):
    if not isinstance(feature_set, FeatureSet):
        feature_set = cloudpickle.loads(feature_set)  # pragma: no cover

    feature_matrix = []

    if no_unapproximated_aggs and approximate is not None:
        if entityset.time_type == "numeric":
            group_time = np.inf
        else:
            group_time = datetime.now()

    if isinstance(cutoff_time, tuple):
        update_progress_callback = None
        if progress_bar is not None:

            def update_progress_callback(done):
                previous_progress = progress_bar.n
                progress_bar.update(done * len(cutoff_time[1]))
                if progress_callback is not None:
                    (
                        update,
                        progress_percent,
                        time_elapsed,
                    ) = update_progress_callback_parameters(
                        progress_bar,
                        previous_progress,
                    )
                    progress_callback(update, progress_percent, time_elapsed)

        time_last = cutoff_time[0]
        ids = cutoff_time[1]
        calculator = FeatureSetCalculator(
            entityset,
            feature_set,
            time_last,
            training_window=training_window,
        )
        _feature_matrix = calculator.run(
            ids,
            progress_callback=update_progress_callback,
            include_cutoff_time=include_cutoff_time,
        )
        time_index = pd.Index([time_last] * len(ids), name="time")
        _feature_matrix = _feature_matrix.set_index(time_index, append=True)
        feature_matrix.append(_feature_matrix)

    else:
        if schema:
            cutoff_time.ww.init_with_full_schema(schema=schema)  # pragma: no cover
        for _, group in cutoff_time.groupby(cutoff_df_time_col):
            # if approximating, calculate the approximate features
            if approximate is not None:
                group.ww.init(schema=cutoff_time.ww.schema, validate=False)
                precalculated_features_trie = approximate_features(
                    feature_set,
                    group,
                    window=approximate,
                    entityset=entityset,
                    training_window=training_window,
                    include_cutoff_time=include_cutoff_time,
                )
            else:
                precalculated_features_trie = None

            @save_csv_decorator(save_progress)
            def calc_results(
                time_last,
                ids,
                precalculated_features=None,
                training_window=None,
                include_cutoff_time=True,
            ):
                update_progress_callback = None

                if progress_bar is not None:

                    def update_progress_callback(done):
                        previous_progress = progress_bar.n
                        progress_bar.update(done * group.shape[0])
                        if progress_callback is not None:
                            (
                                update,
                                progress_percent,
                                time_elapsed,
                            ) = update_progress_callback_parameters(
                                progress_bar,
                                previous_progress,
                            )
                            progress_callback(update, progress_percent, time_elapsed)

                calculator = FeatureSetCalculator(
                    entityset,
                    feature_set,
                    time_last,
                    training_window=training_window,
                    precalculated_features=precalculated_features,
                )
                matrix = calculator.run(
                    ids,
                    progress_callback=update_progress_callback,
                    include_cutoff_time=include_cutoff_time,
                )
                return matrix

            # if all aggregations have been approximated, can calculate all together
            if no_unapproximated_aggs and approximate is not None:
                inner_grouped = [[group_time, group]]
            else:
                # if approximated features, set cutoff_time to unbinned time
                if precalculated_features_trie is not None:
                    group[cutoff_df_time_col] = group[target_time]

                inner_grouped = group.groupby(cutoff_df_time_col, sort=True)

            if chunk_size is not None:
                inner_grouped = _chunk_dataframe_groups(inner_grouped, chunk_size)

            for time_last, group in inner_grouped:
                # sort group by instance id
                ids = group["instance_id"].sort_values().values
                if no_unapproximated_aggs and approximate is not None:
                    window = None
                else:
                    window = training_window

                # calculate values for those instances at time time_last
                _feature_matrix = calc_results(
                    time_last,
                    ids,
                    precalculated_features=precalculated_features_trie,
                    training_window=window,
                    include_cutoff_time=include_cutoff_time,
                )

                id_name = _feature_matrix.index.name

                # if approximate, merge feature matrix with group frame to get original
                # cutoff times and passed columns
                if approximate:
                    cols = [
                        c for c in _feature_matrix.columns if c not in pass_columns
                    ]
                    indexer = group[["instance_id", target_time] + pass_columns]
                    _feature_matrix = _feature_matrix[cols].merge(
                        indexer,
                        right_on=["instance_id"],
                        left_index=True,
                        how="right",
                    )
                    _feature_matrix.set_index(
                        ["instance_id", target_time],
                        inplace=True,
                    )
                    _feature_matrix.index.set_names([id_name, "time"], inplace=True)
                    _feature_matrix.sort_index(level=1, kind="mergesort", inplace=True)
                else:
                    # all rows have same cutoff time. set time and add passed columns
                    num_rows = len(ids)
                    if len(pass_columns) > 0:
                        pass_through = group[
                            ["instance_id", cutoff_df_time_col] + pass_columns
                        ]
                        pass_through.rename(
                            columns={
                                "instance_id": id_name,
                                cutoff_df_time_col: "time",
                            },
                            inplace=True,
                        )
                    time_index = pd.Index([time_last] * num_rows, name="time")
                    _feature_matrix = _feature_matrix.set_index(
                        time_index,
                        append=True,
                    )
                    if len(pass_columns) > 0:
                        pass_through.set_index([id_name, "time"], inplace=True)
                        for col in pass_columns:
                            _feature_matrix[col] = pass_through[col]

                feature_matrix.append(_feature_matrix)

    ww_init_kwargs = get_ww_types_from_features(
        feature_set.target_features,
        entityset,
        pass_columns,
        cutoff_time,
    )
    feature_matrix = init_ww_and_concat_fm(feature_matrix, ww_init_kwargs)
    return feature_matrix
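

# --- Illustrative note (not part of the library) ------------------------------
# calculate_chunk accepts cutoff_time in the two shapes built by
# calculate_feature_matrix above; the values below are hypothetical:
#
#     # a single cutoff time: a (time, instance_ids) tuple
#     cutoff_time = (pd.Timestamp("2014-01-01"), pd.Series([1, 2, 3]))
#
#     # per-instance cutoff times: a DataFrame with one row per instance
#     cutoff_time = pd.DataFrame({
#         "instance_id": [1, 2, 3],
#         "time": pd.to_datetime(["2014-01-01", "2014-01-01", "2014-01-02"]),
#     })
#
# In both cases each chunk's result is indexed by (instance id, time) before
# the pieces are concatenated by init_ww_and_concat_fm.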


def approximate_features(
    feature_set,
    cutoff_time,
    window,
    entityset,
    training_window=None,
    include_cutoff_time=True,
):
    """Given a set of features and cutoff_times to be passed to
    calculate_feature_matrix, calculates approximate values of some features
    to speed up calculations. Cutoff times are sorted into window-sized buckets
    and the approximate feature values are only calculated at one cutoff time
    for each bucket.

    ..note:: this only approximates DirectFeatures of AggregationFeatures, on
        the target dataframe. In future versions, it may also be possible to
        approximate these features on other top-level dataframes.

    Args:
        cutoff_time (pd.DataFrame): specifies what time to calculate
            the features for each instance at. The resulting feature matrix will
            use data up to and including the cutoff_time. A DataFrame with
            'instance_id' and 'time' columns.

        window (Timedelta or str): frequency to group instances with similar
            cutoff times by for features with costly calculations. For example,
            if the bucket is 24 hours, all instances with cutoff times on the
            same day will use the same calculation for expensive features.

        entityset (:class:`.EntitySet`): An already initialized entityset.

        feature_set (:class:`.FeatureSet`): The features to be calculated.

        training_window (`Timedelta`, optional):
            Window defining how much older than the cutoff time data
            can be to be included when calculating the feature. If None, all
            older data is used.

        include_cutoff_time (bool):
            If True, data at cutoff time is included in calculating features.
    """
    approx_fms_trie = Trie(path_constructor=RelationshipPath)

    target_time_colname = "target_time"
    cutoff_time.ww[target_time_colname] = cutoff_time["time"]
    approx_cutoffs = bin_cutoff_times(cutoff_time, window)
    cutoff_df_time_col = "time"
    cutoff_df_instance_col = "instance_id"

    # should this order be by dependencies so that calculate_feature_matrix
    # doesn't skip approximating something?
    for relationship_path, approx_feature_names in feature_set.approximate_feature_trie:
        if not approx_feature_names:
            continue

        (
            cutoffs_with_approx_e_ids,
            new_approx_dataframe_index_col,
        ) = _add_approx_dataframe_index_col(
            entityset,
            feature_set.target_df_name,
            approx_cutoffs.copy(),
            relationship_path,
        )

        # Select only columns we care about
        columns_we_want = [
            new_approx_dataframe_index_col,
            cutoff_df_time_col,
            target_time_colname,
        ]

        cutoffs_with_approx_e_ids = cutoffs_with_approx_e_ids[columns_we_want]
        cutoffs_with_approx_e_ids = cutoffs_with_approx_e_ids.drop_duplicates()
        cutoffs_with_approx_e_ids.dropna(
            subset=[new_approx_dataframe_index_col],
            inplace=True,
        )

        approx_features = [
            feature_set.features_by_name[name] for name in approx_feature_names
        ]
        if cutoffs_with_approx_e_ids.empty:
            approx_fm = gen_empty_approx_features_df(approx_features)
        else:
            cutoffs_with_approx_e_ids.sort_values(
                [cutoff_df_time_col, new_approx_dataframe_index_col],
                inplace=True,
            )
            # CFM assumes specific column names for cutoff_time argument
            rename = {new_approx_dataframe_index_col: cutoff_df_instance_col}
            cutoff_time_to_pass = cutoffs_with_approx_e_ids.rename(columns=rename)
            cutoff_time_to_pass = cutoff_time_to_pass[
                [cutoff_df_instance_col, cutoff_df_time_col]
            ]

            cutoff_time_to_pass.drop_duplicates(inplace=True)
            approx_fm = calculate_feature_matrix(
                approx_features,
                entityset,
                cutoff_time=cutoff_time_to_pass,
                training_window=training_window,
                approximate=None,
                cutoff_time_in_index=False,
                chunk_size=cutoff_time_to_pass.shape[0],
                include_cutoff_time=include_cutoff_time,
            )

        approx_fms_trie.get_node(relationship_path).value = approx_fm

    return approx_fms_trie
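

# --- Illustrative note (not part of the library) ------------------------------
# A rough sketch of how approximation buckets cutoff times; the timestamps are
# hypothetical. With window="1 day", rows whose cutoffs fall in the same daily
# bucket (e.g. 2014-01-01 09:00 and 2014-01-01 15:00) are binned together by
# bin_cutoff_times, so the expensive aggregation features are calculated once
# per bucket and reused, while the unbinned cutoff times are preserved in a
# separate column so the final feature matrix still reports the original times.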


def scatter_warning(num_scattered_workers, num_workers):
    if num_scattered_workers != num_workers:
        scatter_warning = "EntitySet was only scattered to {} out of {} workers"
        logger.warning(scatter_warning.format(num_scattered_workers, num_workers))


def parallel_calculate_chunks(
    cutoff_time,
    chunk_size,
    feature_set,
    approximate,
    training_window,
    save_progress,
    entityset,
    n_jobs,
    no_unapproximated_aggs,
    cutoff_df_time_col,
    target_time,
    pass_columns,
    progress_bar,
    dask_kwargs=None,
    progress_callback=None,
    include_cutoff_time=True,
):
    import_or_raise(
        "distributed",
        "Dask must be installed to calculate feature matrix with n_jobs set to anything but 1",
    )
    from dask.base import tokenize
    from distributed import Future, as_completed

    client = None
    cluster = None
    try:
        client, cluster = create_client_and_cluster(
            n_jobs=n_jobs,
            dask_kwargs=dask_kwargs,
            entityset_size=entityset.__sizeof__(),
        )
        # scatter the entityset
        # denote future with leading underscore
        start = time.time()
        es_token = "EntitySet-{}".format(tokenize(entityset))
        if es_token in client.list_datasets():
            msg = "Using EntitySet persisted on the cluster as dataset {}"
            progress_bar.write(msg.format(es_token))
            _es = client.get_dataset(es_token)
        else:
            _es = client.scatter([entityset])[0]
            client.publish_dataset(**{_es.key: _es})

        # save features to a tempfile and scatter it
        pickled_feats = cloudpickle.dumps(feature_set)
        _saved_features = client.scatter(pickled_feats)
        client.replicate([_es, _saved_features])
        num_scattered_workers = len(
            client.who_has([Future(es_token)]).get(es_token, []),
        )
        num_workers = len(client.scheduler_info()["workers"].values())

        schema = None
        if isinstance(cutoff_time, pd.DataFrame):
            schema = cutoff_time.ww.schema
            chunks = cutoff_time.groupby(cutoff_df_time_col)
            cutoff_time_len = cutoff_time.shape[0]
        else:
            chunks = cutoff_time
            cutoff_time_len = len(cutoff_time[1])

        if not chunk_size:
            chunk_size = _handle_chunk_size(1.0 / num_workers, cutoff_time_len)

        chunks = _chunk_dataframe_groups(chunks, chunk_size)

        chunks = [df for _, df in chunks]

        if len(chunks) < num_workers:  # pragma: no cover
            chunk_warning = (
                "Fewer chunks ({}), than workers ({}) consider reducing the chunk size"
            )
            warning_string = chunk_warning.format(len(chunks), num_workers)
            progress_bar.write(warning_string)

        scatter_warning(num_scattered_workers, num_workers)
        end = time.time()
        scatter_time = round(end - start)

        # if enabled, reset timer after scatter for better time remaining estimates
        if not progress_bar.disable:
            progress_bar.reset()

        scatter_string = "EntitySet scattered to {} workers in {} seconds"
        progress_bar.write(scatter_string.format(num_scattered_workers, scatter_time))
        # map chunks
        # TODO: consider handling task submission dask kwargs
        _chunks = client.map(
            calculate_chunk,
            chunks,
            feature_set=_saved_features,
            chunk_size=None,
            entityset=_es,
            approximate=approximate,
            training_window=training_window,
            save_progress=save_progress,
            no_unapproximated_aggs=no_unapproximated_aggs,
            cutoff_df_time_col=cutoff_df_time_col,
            target_time=target_time,
            pass_columns=pass_columns,
            progress_bar=None,
            progress_callback=progress_callback,
            include_cutoff_time=include_cutoff_time,
            schema=schema,
        )

        feature_matrix = []
        iterator = as_completed(_chunks).batches()
        for batch in iterator:
            results = client.gather(batch)
            for result in results:
                feature_matrix.append(result)
                previous_progress = progress_bar.n
                progress_bar.update(result.shape[0])
                if progress_callback is not None:
                    (
                        update,
                        progress_percent,
                        time_elapsed,
                    ) = update_progress_callback_parameters(
                        progress_bar,
                        previous_progress,
                    )
                    progress_callback(update, progress_percent, time_elapsed)

    except Exception:
        raise
    finally:
        if client is not None:
            client.close()
        if "cluster" not in dask_kwargs and cluster is not None:
            cluster.close()  # pragma: no cover

    ww_init_kwargs = get_ww_types_from_features(
        feature_set.target_features,
        entityset,
        pass_columns,
        cutoff_time,
    )
    feature_matrix = init_ww_and_concat_fm(feature_matrix, ww_init_kwargs)
    return feature_matrix


def _add_approx_dataframe_index_col(es, target_dataframe_name, cutoffs, path):
    """Add a column to the cutoff df linking it to the dataframe at the end of the path.

    Returns the updated cutoff df and the name of this column. The name will
    consist of the columns which were joined through.
    """
    last_child_col = "instance_id"
    last_parent_col = es[target_dataframe_name].ww.index

    for _, relationship in path:
        child_cols = [last_parent_col, relationship._child_column_name]
        child_df = es[relationship.child_name][child_cols]

        # Rename relationship.child_column to include the columns we have
        # joined through.
        new_col_name = "%s.%s" % (last_child_col, relationship._child_column_name)
        to_rename = {relationship._child_column_name: new_col_name}
        child_df = child_df.rename(columns=to_rename)
        cutoffs = cutoffs.merge(
            child_df,
            left_on=last_child_col,
            right_on=last_parent_col,
        )

        # These will be used in the next iteration.
        last_child_col = new_col_name
        last_parent_col = relationship._parent_column_name

    return cutoffs, new_col_name
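

# --- Illustrative note (not part of the library) ------------------------------
# Hypothetical example of the column name built by _add_approx_dataframe_index_col:
# walking a relationship path from a "transactions" target through "sessions"
# to "customers" renames the join key at each step,
#   "instance_id" -> "instance_id.session_id" -> "instance_id.session_id.customer_id",
# and the final name is returned along with the augmented cutoff dataframe.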


def _chunk_dataframe_groups(grouped, chunk_size):
    """Chunk a grouped dataframe into groups no larger than chunk_size."""
    if isinstance(grouped, tuple):
        for i in range(0, len(grouped[1]), chunk_size):
            yield None, (grouped[0], grouped[1].iloc[i : i + chunk_size])
    else:
        for group_key, group_df in grouped:
            for i in range(0, len(group_df), chunk_size):
                yield group_key, group_df.iloc[i : i + chunk_size]


def _handle_chunk_size(chunk_size, total_size):
    if chunk_size is not None:
        assert chunk_size > 0, "Chunk size must be greater than 0"

        if chunk_size < 1:
            chunk_size = math.ceil(chunk_size * total_size)

        chunk_size = int(chunk_size)

    return chunk_size


def update_progress_callback_parameters(progress_bar, previous_progress):
    update = (progress_bar.n - previous_progress) / progress_bar.total * 100
    progress_percent = (progress_bar.n / progress_bar.total) * 100
    time_elapsed = progress_bar.format_dict["elapsed"]
    return (update, progress_percent, time_elapsed)


def init_ww_and_concat_fm(feature_matrix, ww_init_kwargs):
    cols_to_check = {
        col
        for col, ltype in ww_init_kwargs["logical_types"].items()
        if isinstance(ltype, (Age, Boolean, Integer))
    }
    replacement_type = {
        "age": AgeNullable(),
        "boolean": BooleanNullable(),
        "integer": IntegerNullable(),
    }
    for fm in feature_matrix:
        updated_cols = set()
        for col in cols_to_check:
            # Only convert types if null values are present
            if fm[col].isnull().any():
                current_type = ww_init_kwargs["logical_types"][col].type_string
                ww_init_kwargs["logical_types"][col] = replacement_type[current_type]
                updated_cols.add(col)
        cols_to_check = cols_to_check - updated_cols
        fm.ww.init(**ww_init_kwargs)

    feature_matrix = pd.concat(feature_matrix)
    feature_matrix.ww.init(**ww_init_kwargs)
    return feature_matrix
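

# --- Illustrative note (not part of the library) ------------------------------
# Worked examples of _handle_chunk_size, with hypothetical row counts:
#
#     _handle_chunk_size(None, 1000)  # -> None (no chunking)
#     _handle_chunk_size(100, 1000)   # -> 100 rows per chunk
#     _handle_chunk_size(0.25, 1000)  # -> math.ceil(0.25 * 1000) = 250 rows per chunk
#
# parallel_calculate_chunks relies on the fractional form, passing
# 1.0 / num_workers when no chunk_size is supplied so that each worker receives
# roughly one chunk.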