Skip to content

执行器

Executor dataclass

Executor(
    desc: str = "Evaluating",
    show_progress: bool = True,
    keep_progress_bar: bool = True,
    jobs: List[Any] = list(),
    raise_exceptions: bool = False,
    run_config: Optional[RunConfig] = None,
    _nest_asyncio_applied: bool = False,
)

Executor class for running asynchronous jobs with progress tracking and error handling.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `desc` | `str` | Description for the progress bar |
| `show_progress` | `bool` | Whether to show the progress bar |
| `keep_progress_bar` | `bool` | Whether to keep the progress bar after completion |
| `jobs` | `List[Any]` | List of jobs to execute |
| `raise_exceptions` | `bool` | Whether to raise exceptions or log them |
| `run_config` | `Optional[RunConfig]` | Configuration for the run |
| `_nest_asyncio_applied` | `bool` | Whether nest_asyncio has been applied |

submit

submit(
    callable: Callable,
    *args,
    name: Optional[str] = None,
    **kwargs
)

Submit a job to be executed. This will wrap the callable with error handling and indexing to keep track of the job index.

Source code in src/ragas/executor.py
def submit(
    self, callable: t.Callable, *args, name: t.Optional[str] = None, **kwargs
):
    """
    Queue a job for later execution.

    The callable is passed through `wrap_callable_with_index` together with
    its position in the queue, and the wrapped callable is stored alongside
    the positional arguments, keyword arguments and optional name.
    """
    # the job's index is the queue length at the moment of submission
    job_index = len(self.jobs)
    indexed_callable = self.wrap_callable_with_index(callable, job_index)
    job = (indexed_callable, args, kwargs, name)
    self.jobs.append(job)

results

results() -> List[Any]

Execute all submitted jobs and return their results. The results are returned in the order of job submission.

Source code in src/ragas/executor.py
def results(self) -> t.List[t.Any]:
    """
    Execute all submitted jobs and return their results. The results are returned in
    the order of job submission.

    Each completed job is expected to yield an ``(index, value)`` pair; the
    pairs are sorted by their first element and only the second element of
    each pair is returned.

    Raises:
        ImportError: If an event loop is already running and ``nest_asyncio``
            cannot be imported.
    """
    if is_event_loop_running():
        # an event loop is already running (e.g. a notebook), so the
        # asyncio.run() call below needs nest_asyncio to allow re-entrancy
        try:
            import nest_asyncio
        except ImportError as err:
            # chain explicitly so the traceback reports the failed import as
            # the direct cause of this error
            raise ImportError(
                "It seems like your running this in a jupyter-like environment. Please install nest_asyncio with `pip install nest_asyncio` to make it work."
            ) from err

        # patch only once per executor instance
        if not self._nest_asyncio_applied:
            nest_asyncio.apply()
            self._nest_asyncio_applied = True

    # build one coroutine per submitted job; as_completed hands them back as
    # they finish, limited by the configured number of workers
    futures_as_they_finish = as_completed(
        coros=[afunc(*args, **kwargs) for afunc, args, kwargs, _ in self.jobs],
        max_workers=(self.run_config or RunConfig()).max_workers,
    )

    async def _aresults() -> t.List[t.Any]:
        # collect results in completion order while driving the progress bar
        results = []
        for future in tqdm(
            await futures_as_they_finish,
            desc=self.desc,
            total=len(self.jobs),
            # whether you want to keep the progress bar after completion
            leave=self.keep_progress_bar,
            disable=not self.show_progress,
        ):
            r = await future
            results.append(r)

        return results

    results = asyncio.run(_aresults())
    # completion order is arbitrary, so restore submission order via the index
    sorted_results = sorted(results, key=lambda x: x[0])
    return [r[1] for r in sorted_results]

is_event_loop_running

is_event_loop_running() -> bool

Check if an event loop is currently running.

Source code in src/ragas/executor.py
def is_event_loop_running() -> bool:
    """
    Report whether an asyncio event loop is currently running in this thread.
    """
    try:
        current_loop = asyncio.get_running_loop()
    except RuntimeError:
        # get_running_loop() raises RuntimeError when there is no running loop
        return False
    return current_loop.is_running()

run_async_batch

run_async_batch(
    desc: str, func: Callable, kwargs_list: List[Dict]
)

A utility function to run the same async function with different arguments in parallel.

Source code in src/ragas/executor.py
def run_async_batch(desc: str, func: t.Callable, kwargs_list: t.List[t.Dict]):
    """
    Run the same async function once per entry in `kwargs_list` and return
    the results collected by the executor.

    A fresh Executor is created with a default RunConfig, a progress bar
    labelled `desc` that is not kept after completion, and exception raising
    switched on.
    """
    batch_executor = Executor(
        desc=desc,
        keep_progress_bar=False,
        raise_exceptions=True,
        run_config=RunConfig(),
    )

    # each dict supplies the keyword arguments for one submitted job
    for call_kwargs in kwargs_list:
        batch_executor.submit(func, **call_kwargs)

    return batch_executor.results()