Source code for dask_expr.io.csv

import functools
import operator

from dask_expr._expr import Projection
from dask_expr._util import _convert_to_list
from dask_expr.io.io import BlockwiseIO, PartitionsFiltered


class ReadCSV(PartitionsFiltered, BlockwiseIO):
    _parameters = [
        "filename",
        "columns",
        "header",
        "dtype_backend",
        "_partitions",
        "storage_options",
        "kwargs",
        "_series",
        "dataframe_backend",
    ]
    _defaults = {
        "columns": None,
        "header": "infer",
        "dtype_backend": None,
        "kwargs": None,
        "_partitions": None,
        "storage_options": None,
        "_series": False,
        "dataframe_backend": "pandas",
    }
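    # Column Projections from parent expressions are absorbed into this
    # expression's ``columns`` operand rather than kept as separate layers.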
    _absorb_projections = True

    @functools.cached_property
    def operation(self):
        from dask.dataframe.io import read_csv

        return read_csv

    @functools.cached_property
    def _ddf(self):
        from dask import config

        # Temporary hack to simplify logic
        with config.set({"dataframe.backend": self.dataframe_backend}):
            kwargs = (
                {"dtype_backend": self.dtype_backend}
                if self.dtype_backend is not None
                else {}
            )
            if self.kwargs is not None:
                kwargs.update(self.kwargs)

            columns = _convert_to_list(self.operand("columns"))
            if columns is None:
                pass
            elif "include_path_column" in self.kwargs:
                flag = self.kwargs["include_path_column"]
                if flag is True:
                    column_to_remove = "path"
                elif isinstance(flag, str):
                    column_to_remove = flag
                else:
                    column_to_remove = None

                columns = [c for c in columns if c != column_to_remove]

                if not columns:
                    # Only the synthesized path column was requested; fall back
                    # to reading the first real column so the reader still
                    # produces a frame to attach the path column to.
                    meta = self.operation(
                        self.filename,
                        header=self.header,
                        storage_options=self.storage_options,
                        **kwargs,
                    )._meta
                    columns = [list(meta.columns)[0]]

            # Reconcile a user-supplied ``usecols`` with the absorbed column
            # projection: keep their intersection, or fall back to ``usecols``
            # when no projection was applied.
            usecols = kwargs.pop("usecols", None)
            if usecols is not None and columns is not None:
                columns = [col for col in columns if col in usecols]
            elif usecols:
                columns = usecols

            return self.operation(
                self.filename,
                usecols=columns,
                header=self.header,
                storage_options=self.storage_options,
                **kwargs,
            )

    @functools.cached_property
    def _meta(self):
        return self._ddf._meta

    def _simplify_up(self, parent, dependents):
        if isinstance(parent, Projection):
            kwargs = self.kwargs or {}
            usecols = kwargs.get("usecols")
            # int usecols are positional, so block column projections from
            # being pushed into the reader
            if usecols is not None and isinstance(next(iter(usecols), None), int):
                return
        return super()._simplify_up(parent, dependents)

    @functools.cached_property
    def columns(self):
        columns_operand = self.operand("columns")
        if columns_operand is None:
            try:
                return list(self._ddf._meta.columns)
            except AttributeError:
                return []
        else:
            return _convert_to_list(columns_operand)

    def _divisions(self):
        return self._ddf.divisions
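
    # The legacy ``read_csv`` graph is materialized once; each of its values
    # becomes one blockwise task, emitted per (filtered) partition below.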

    @functools.cached_property
    def _tasks(self):
        return list(self._ddf.dask.to_dict().values())

    def _filtered_task(self, index: int):
        if self._series:
            # A single-column projection was absorbed and the caller expects a
            # Series, so slice that column out of the DataFrame block.
            return (operator.getitem, self._tasks[index], self.columns[0])
        return self._tasks[index]
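
# ``ReadTable`` and ``ReadFwf`` reuse all of ``ReadCSV``'s projection and
# partition handling; they only swap the underlying reader function.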


class ReadTable(ReadCSV):
    @functools.cached_property
    def operation(self):
        from dask.dataframe.io import read_table

        return read_table


class ReadFwf(ReadCSV):
    @functools.cached_property
    def operation(self):
        from dask.dataframe.io import read_fwf

        return read_fwf


def to_csv(
    df,
    filename,
    single_file=False,
    encoding="utf-8",
    mode="wt",
    name_function=None,
    compression=None,
    compute=True,
    scheduler=None,
    storage_options=None,
    header_first_partition_only=None,
    compute_kwargs=None,
    **kwargs,
):
    """
    Store Dask DataFrame to CSV files

    One filename per partition will be created. You can specify the
    filenames in a variety of ways.

    Use a globstring::

    >>> df.to_csv('/path/to/data/export-*.csv')  # doctest: +SKIP

    The * will be replaced by the increasing sequence 0, 1, 2, ...

    ::

        /path/to/data/export-0.csv
        /path/to/data/export-1.csv

    Use a globstring and a ``name_function=`` keyword argument.  The
    name_function function should expect an integer and produce a string.
    Strings produced by name_function must preserve the order of their
    respective partition indices.

    >>> from datetime import date, timedelta
    >>> def name(i):
    ...     return str(date(2015, 1, 1) + i * timedelta(days=1))

    >>> name(0)
    '2015-01-01'
    >>> name(15)
    '2015-01-16'

    >>> df.to_csv('/path/to/data/export-*.csv', name_function=name)  # doctest: +SKIP

    ::

        /path/to/data/export-2015-01-01.csv
        /path/to/data/export-2015-01-02.csv
        ...

    You can also provide an explicit list of paths::

    >>> paths = ['/path/to/data/alice.csv', '/path/to/data/bob.csv', ...]  # doctest: +SKIP
    >>> df.to_csv(paths)  # doctest: +SKIP

    You can also provide a directory name:

    >>> df.to_csv('/path/to/data')  # doctest: +SKIP

    The files will be numbered 0, 1, 2, (and so on) suffixed with '.part':

    ::

        /path/to/data/0.part
        /path/to/data/1.part

    Parameters
    ----------
    df : dask.DataFrame
        Data to save
    filename : string or list
        Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
        to save to remote filesystems.
    single_file : bool, default False
        Whether to save everything into a single CSV file. Under the
        single file mode, each partition is appended at the end of the
        specified CSV file.
    encoding : string, default 'utf-8'
        A string representing the encoding to use in the output file.
    mode : str, default 'w'
        Python file mode. The default is 'w' (or 'wt'), for writing a new file
        or overwriting an existing file in text mode. 'a' (or 'at') will append
        to an existing file in text mode or create a new file if it does not
        already exist. See :py:func:`open`.
    name_function : callable, default None
        Function accepting an integer (partition index) and producing a
        string to replace the asterisk in the given filename globstring.
        Should preserve the lexicographic order of partitions. Not
        supported when ``single_file`` is True.
    compression : string, optional
        A string representing the compression to use in the output file,
        allowed values are 'gzip', 'bz2', 'xz',
        only used when the first argument is a filename.
    compute : bool, default True
        If True, immediately executes. If False, returns a set of delayed
        objects, which can be computed at a later time.
    storage_options : dict
        Parameters passed on to the backend filesystem class.
    header_first_partition_only : bool, default None
        If set to True, only write the header row in the first output
        file. By default, headers are written to all partitions under
        the multiple file mode (``single_file`` is False) and written
        only once under the single file mode (``single_file`` is True).
        It must be True under the single file mode.
    compute_kwargs : dict, optional
        Options to be passed in to the compute method
    kwargs : dict, optional
        Additional parameters to pass to :meth:`pandas.DataFrame.to_csv`.

    Returns
    -------
    The names of the files written if they were computed right away.
    If not, the delayed tasks associated with writing the files.

    Raises
    ------
    ValueError
        If ``header_first_partition_only`` is set to False or
        ``name_function`` is specified when ``single_file`` is True.

    See Also
    --------
    fsspec.open_files
    """
    from dask.dataframe.io.csv import to_csv as _to_csv

    return _to_csv(
        df.to_legacy_dataframe(),
        filename,
        single_file=single_file,
        encoding=encoding,
        mode=mode,
        name_function=name_function,
        compression=compression,
        compute=compute,
        scheduler=scheduler,
        storage_options=storage_options,
        header_first_partition_only=header_first_partition_only,
        compute_kwargs=compute_kwargs,
        **kwargs,
    )
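

# Usage sketch, assuming the top-level ``dask_expr.from_pandas`` entry point
# and a collection ``to_csv`` method that delegates to the function above
# (names here are illustrative, not a definitive API reference):
#
#     import pandas as pd
#     import dask_expr as dx
#
#     ddf = dx.from_pandas(pd.DataFrame({"a": [1, 2, 3]}), npartitions=2)
#     ddf.to_csv("export-*.csv", index=False)  # one file per partition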