import functools
import operator

from dask_expr._expr import Projection
from dask_expr._util import _convert_to_list
from dask_expr.io.io import BlockwiseIO, PartitionsFiltered


class ReadCSV(PartitionsFiltered, BlockwiseIO):
    _parameters = [
        "filename",
        "columns",
        "header",
        "dtype_backend",
        "_partitions",
        "storage_options",
        "kwargs",
        "_series",
        "dataframe_backend",
    ]
    _defaults = {
        "columns": None,
        "header": "infer",
        "dtype_backend": None,
        "kwargs": None,
        "_partitions": None,
        "storage_options": None,
        "_series": False,
        "dataframe_backend": "pandas",
    }
    _absorb_projections = True

    @functools.cached_property
    def operation(self):
        from dask.dataframe.io import read_csv

        return read_csv

    @functools.cached_property
    def _ddf(self):
        from dask import config

        # Temporary hack to simplify logic
        with config.set({"dataframe.backend": self.dataframe_backend}):
            kwargs = (
                {"dtype_backend": self.dtype_backend}
                if self.dtype_backend is not None
                else {}
            )
            if self.kwargs is not None:
                kwargs.update(self.kwargs)

            columns = _convert_to_list(self.operand("columns"))
            if columns is None:
                pass
            elif "include_path_column" in self.kwargs:
                # ``include_path_column`` appends a synthetic column that does
                # not exist in the file, so it must not be forwarded to
                # ``usecols``.
                flag = self.kwargs["include_path_column"]
                if flag is True:
                    column_to_remove = "path"
                elif isinstance(flag, str):
                    column_to_remove = flag
                else:
                    column_to_remove = None
                columns = [c for c in columns if c != column_to_remove]

            if not columns:
                # Reading zero columns is not possible; fall back to the
                # first column of the meta.
                meta = self.operation(
                    self.filename,
                    header=self.header,
                    storage_options=self.storage_options,
                    **kwargs,
                )._meta
                columns = [list(meta.columns)[0]]

            # Respect user-supplied ``usecols`` by projecting onto it.
            usecols = kwargs.pop("usecols", None)
            if usecols is not None and columns is not None:
                columns = [col for col in columns if col in usecols]
            elif usecols:
                columns = usecols

            return self.operation(
                self.filename,
                usecols=columns,
                header=self.header,
                storage_options=self.storage_options,
                **kwargs,
            )

    @functools.cached_property
    def _meta(self):
        return self._ddf._meta

    def _simplify_up(self, parent, dependents):
        if isinstance(parent, Projection):
            kwargs = self.kwargs
            # int usecols are positional, so block projections
            if kwargs.get("usecols", None) is not None and isinstance(
                kwargs.get("usecols")[0], int
            ):
                return
        return super()._simplify_up(parent, dependents)

    @functools.cached_property
    def columns(self):
        columns_operand = self.operand("columns")
        if columns_operand is None:
            try:
                return list(self._ddf._meta.columns)
            except AttributeError:
                return []
        else:
            return _convert_to_list(columns_operand)

    def _divisions(self):
        return self._ddf.divisions

    @functools.cached_property
    def _tasks(self):
        # Materialize the legacy graph so individual partition tasks can be
        # indexed by ``_filtered_task``.
        return list(self._ddf.dask.to_dict().values())

    def _filtered_task(self, index: int):
        if self._series:
            # Extract the single projected column when a Series was requested.
            return (operator.getitem, self._tasks[index], self.columns[0])
        return self._tasks[index]


class ReadTable(ReadCSV):
    @functools.cached_property
    def operation(self):
        from dask.dataframe.io import read_table

        return read_table


class ReadFwf(ReadCSV):
    @functools.cached_property
    def operation(self):
        from dask.dataframe.io import read_fwf

        return read_fwf
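# A minimal usage sketch, not part of this module (the file path and column
# names are hypothetical). It illustrates what ``_absorb_projections = True``
# buys: a column projection sitting on top of a ReadCSV expression is
# absorbed into its ``columns`` operand during optimization, and ``_ddf``
# then forwards those columns to ``read_csv`` as ``usecols`` so only the
# projected columns are parsed:
#
#     import dask_expr as dx
#
#     df = dx.read_csv("data/events-*.csv")
#     out = df[["user_id", "amount"]]
#     optimized = out.optimize()
#     # The optimized plan now carries ["user_id", "amount"] down into the
#     # read, so each partition task parses just those two columns.
#
# The guard in ``_simplify_up`` is the exception: integer ``usecols`` select
# columns by position, so pushing a label-based projection into them would
# change semantics, and the projection is left in place.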
def to_csv(
    df,
    filename,
    single_file=False,
    encoding="utf-8",
    mode="wt",
    name_function=None,
    compression=None,
    compute=True,
    scheduler=None,
    storage_options=None,
    header_first_partition_only=None,
    compute_kwargs=None,
    **kwargs,
):
    """
    Store Dask DataFrame to CSV files

    One filename per partition will be created. You can specify the
    filenames in a variety of ways.

    Use a globstring::

    >>> df.to_csv('/path/to/data/export-*.csv')  # doctest: +SKIP

    The * will be replaced by the increasing sequence 0, 1, 2, ...

    ::

        /path/to/data/export-0.csv
        /path/to/data/export-1.csv

    Use a globstring and a ``name_function=`` keyword argument. The
    name_function function should expect an integer and produce a string.
    Strings produced by name_function must preserve the order of their
    respective partition indices.

    >>> from datetime import date, timedelta
    >>> def name(i):
    ...     return str(date(2015, 1, 1) + i * timedelta(days=1))

    >>> name(0)
    '2015-01-01'

    >>> name(15)
    '2015-01-16'

    >>> df.to_csv('/path/to/data/export-*.csv', name_function=name)  # doctest: +SKIP

    ::

        /path/to/data/export-2015-01-01.csv
        /path/to/data/export-2015-01-02.csv
        ...

    You can also provide an explicit list of paths::

    >>> paths = ['/path/to/data/alice.csv', '/path/to/data/bob.csv', ...]  # doctest: +SKIP
    >>> df.to_csv(paths)  # doctest: +SKIP

    You can also provide a directory name:

    >>> df.to_csv('/path/to/data')  # doctest: +SKIP

    The files will be numbered 0, 1, 2, (and so on) suffixed with '.part':

    ::

        /path/to/data/0.part
        /path/to/data/1.part

    Parameters
    ----------
    df : dask.DataFrame
        Data to save
    filename : string or list
        Absolute or relative filepath(s). Prefix with a protocol like
        ``s3://`` to save to remote filesystems.
    single_file : bool, default False
        Whether to save everything into a single CSV file. Under the
        single file mode, each partition is appended at the end of the
        specified CSV file.
    encoding : string, default 'utf-8'
        A string representing the encoding to use in the output file.
    mode : str, default 'wt'
        Python file mode. The default, 'wt', writes a new file or
        overwrites an existing file in text mode. 'a' (or 'at') will
        append to an existing file in text mode or create a new file if
        it does not already exist. See :py:func:`open`.
    name_function : callable, default None
        Function accepting an integer (partition index) and producing a
        string to replace the asterisk in the given filename globstring.
        Should preserve the lexicographic order of partitions. Not
        supported when ``single_file`` is True.
    compression : string, optional
        A string representing the compression to use in the output file,
        allowed values are 'gzip', 'bz2', 'xz',
        only used when the first argument is a filename.
    compute : bool, default True
        If True, immediately executes. If False, returns a set of delayed
        objects, which can be computed at a later time.
    scheduler : string, optional
        The scheduler to use, like "threads" or "processes".
    storage_options : dict
        Parameters passed on to the backend filesystem class.
    header_first_partition_only : bool, default None
        If set to True, only write the header row in the first output
        file. By default, headers are written to all partitions under
        the multiple file mode (``single_file`` is False) and written
        only once under the single file mode (``single_file`` is True).
        It must be True under the single file mode.
    compute_kwargs : dict, optional
        Options to be passed in to the compute method
    kwargs : dict, optional
        Additional parameters to pass to :meth:`pandas.DataFrame.to_csv`.

    Returns
    -------
    The names of the files written if they were computed right away.
    If not, the delayed tasks associated with writing the files.
    Raises
    ------
    ValueError
        If ``header_first_partition_only`` is set to False or
        ``name_function`` is specified when ``single_file`` is True.

    See Also
    --------
    fsspec.open_files
    """
    from dask.dataframe.io.csv import to_csv as _to_csv

    return _to_csv(
        df.to_legacy_dataframe(),
        filename,
        single_file=single_file,
        encoding=encoding,
        mode=mode,
        name_function=name_function,
        compression=compression,
        compute=compute,
        scheduler=scheduler,
        storage_options=storage_options,
        header_first_partition_only=header_first_partition_only,
        compute_kwargs=compute_kwargs,
        **kwargs,
    )
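# A usage sketch, not part of this module (paths are hypothetical; ``df`` is
# assumed to be a Dask DataFrame). With ``compute=False`` the write is
# deferred: the call returns delayed objects, one per output file, which can
# be executed later alongside other work:
#
#     import dask
#
#     writes = df.to_csv("out/export-*.csv", compute=False)
#     dask.compute(*writes, scheduler="threads")
#
# With ``compute=True`` (the default), the call blocks and returns the names
# of the files that were written.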