class DiffPrivateSimpleDatasetPack(BaseLlamaPack):
    """A pack for creating differentially private simple llama-dataset.

    Synthetic examples are generated one token at a time. For every token the
    pack asks the LLM for a "public" next-token distribution (no private
    examples in the prompt), asks it again once per disjoint split of the
    private dataset, averages the per-split distributions, adds noise, and
    keeps the most likely token.
    """

    def __init__(
        self,
        llm: LLM,  # currently only supports OpenAI completion LLMs
        tokenizer: Any,
        prompt_bundle: PromptBundle,
        simple_dataset: LabelledSimpleDataset,
        batch_size: int = 5,
        sleep_time_in_seconds: float = 0,
        sephamore_counter_size: int = 1,
        cache_checkpoints: bool = True,
        show_progress: bool = True,
    ):
        """Store the collaborators and derive the label set from the dataset.

        Args:
            llm (LLM): Completion LLM used for every prompt.
            tokenizer (Any): Stored on the instance; not used by this class.
            prompt_bundle (PromptBundle): Supplies the instruction and the
                label/text headings for the prompt templates.
            simple_dataset (LabelledSimpleDataset): The private dataset.
            batch_size (int): Number of example tasks awaited together in
                `arun`.
            sleep_time_in_seconds (float): Delay applied before each LLM call
                made through `_async_worker`.
            sephamore_counter_size (int): Initial value of the semaphore that
                bounds concurrent LLM calls.
            cache_checkpoints (bool): Whether `arun` writes `checkpoint.json`
                after every batch.
            show_progress (bool): Whether to show progress bars.
        """
        self.llm = llm
        self.tokenizer = tokenizer
        self.prompt_bundle = prompt_bundle
        self.simple_dataset = simple_dataset
        self._num_examples = len(self.simple_dataset.examples)
        # De-duplicate labels through a set; resulting order is unspecified.
        self.labels = list({el.reference_label for el in self.simple_dataset[:]})
        self.sleep_time_in_seconds = sleep_time_in_seconds
        self._semaphore = asyncio.Semaphore(sephamore_counter_size)
        self.show_progress = show_progress
        self.batch_size = batch_size
        self.cache_checkpoints = cache_checkpoints

    def sigma_to_eps(
        self,
        sigma: float,
        mechanism: PrivacyMechanism,
        size: int,
        max_token_cnt: int,
        max_self_compositions: int = 1000,
        eps_error: float = 0.01,
        delta_error: float = 1e-10,
    ) -> float:
        """Return the epsilon value given a sigma.

        Args:
            sigma (float): The parameter associated with noise mechanism.
            mechanism (PrivacyMechanism): Noise mechanism.
            size (int): Number of samples to be generated.
            max_token_cnt (int): Number of tokens generated per sample.
            max_self_compositions (int, optional): PRV algorithm parameter.
                Defaults to 1000.
            eps_error (float, optional): PRV algorithm parameter.
                Defaults to 0.01.
            delta_error (float, optional): PRV algorithm parameter.
                Defaults to 1e-10.

        Returns:
            float: The epsilon value (the middle estimate returned by the
                accountant).

        Raises:
            ValueError: If `max_token_cnt` exceeds `max_self_compositions`, or
                if `mechanism` is neither Gaussian nor exponential.
        """
        if max_token_cnt > max_self_compositions:
            raise ValueError(
                "`max_token_cnt` cannot be greater than `max_self_compositions`."
            )

        sample_rate = size / self._num_examples
        if mechanism == PrivacyMechanism.GAUSSIAN:
            prv_0 = PoissonSubsampledGaussianMechanism(
                noise_multiplier=sigma, sampling_probability=sample_rate
            )
        elif mechanism == PrivacyMechanism.EXPONENTIAL:
            sigma_bar = math.log(1 + sample_rate * (math.exp(sigma) - 1))
            prv_0 = PureDPMechanism(eps=sigma_bar)
        else:
            raise ValueError(
                "Invalid value for mechanism entered."
                " Please use either 'gaussian' or 'exponential'."
            )

        accountant = PRVAccountant(
            prvs=[
                prv_0,
            ],
            max_self_compositions=[max_self_compositions],
            eps_error=eps_error,
            delta_error=delta_error,
        )
        # delta is fixed to the reciprocal of the private dataset size.
        _eps_low, eps_est, _eps_up = accountant.compute_epsilon(
            delta=1 / self._num_examples, num_self_compositions=[max_token_cnt]
        )
        return eps_est

    async def _async_worker(self, job: Coroutine) -> Any:
        """Await `job` under the shared semaphore, after the configured sleep."""
        async with self._semaphore:
            await asyncio.sleep(self.sleep_time_in_seconds)
            return await job

    @dispatcher.span
    def _filter_dataset_by_label(self, label: str) -> LabelledSimpleDataset:
        """Filter simple_dataset by label.

        Raises:
            ValueError: If `label` is not one of `self.labels`.
        """
        if label not in self.labels:
            raise ValueError(
                "There are no examples with `label` in the associated `simple_dataset`."
            )
        examples = [el for el in self.simple_dataset[:] if el.reference_label == label]
        return LabelledSimpleDataset(examples=examples)

    @dispatcher.span
    def _split_dataset(
        self,
        dataset: LabelledSimpleDataset,
        num_splits: int,
        num_samples_per_split: int,
    ) -> List[LabelledSimpleDataset]:
        """Splits a dataset into a set of disjoint datasets with equal number of examples.

        The example indexes are shuffled, dealt round-robin into `num_splits`
        partitions, and `num_samples_per_split` examples are then sampled
        without replacement from each partition.

        Raises:
            ValueError: If any partition holds fewer than
                `num_samples_per_split` examples.
        """
        indexes = list(range(len(dataset.examples)))
        random.shuffle(indexes)
        partitions = [indexes[i::num_splits] for i in range(num_splits)]

        splits = []
        for partition in partitions:
            # Check up front: `random.sample` raises its own, less helpful
            # ValueError when the partition is too small, so a post-hoc length
            # check on the sample could never be reached.
            if len(partition) < num_samples_per_split:
                raise ValueError(
                    "Not able to create disjoint sets with current values of `num_splits` and `num_samples_per_split`."
                )
            sample = random.sample(partition, num_samples_per_split)
            examples = [dataset.examples[ix] for ix in sample]
            splits.append(LabelledSimpleDataset(examples=examples))
        return splits

    def _get_public_prompt(
        self,
        synthetic_example: str,
        label: str,
    ) -> str:
        """Get completion prompt for token universe."""
        return zero_shot_completion_template.format(
            synthetic_text=synthetic_example,
            label=label,
            instruction=self.prompt_bundle.instruction,
            label_heading=self.prompt_bundle.label_heading,
            text_heading=self.prompt_bundle.text_heading,
        )

    def _get_private_prompt(
        self,
        split: LabelledSimpleDataset,
        synthetic_example: str,
        label: str,
    ) -> str:
        """Get prompt for completion endpoint.

        Every example in `split` is rendered with the single-example template
        and the renderings are concatenated as the few-shot section.
        """
        single_templates = [
            single_example_template.format(
                label_heading=self.prompt_bundle.label_heading,
                text_heading=self.prompt_bundle.text_heading,
                example_label=x.reference_label,
                example_text=x.text,
            )
            for x in split.examples
        ]
        # `str.join` also copes with an empty split, unlike `reduce` with `+`.
        few_shot_examples = "".join(single_templates)
        return few_shot_completion_template.format(
            instruction=self.prompt_bundle.instruction,
            label_heading=self.prompt_bundle.label_heading,
            text_heading=self.prompt_bundle.text_heading,
            few_shot_examples=few_shot_examples,
            label=label,
            synthetic_text=synthetic_example,
        )

    def _normalize(
        self, split_probs: Dict[str, float], token_universe_proba: Dict[str, float]
    ) -> Dict[str, float]:
        """Normalize a probability distribution over tokens to become a valid probability distribution.

        When `split_probs` sums to zero, an `EmptyIntersectionEvent` is
        dispatched and the public distribution is normalized instead.
        """
        scale = sum(proba for proba in split_probs.values())
        if scale == 0:
            # universe
            dispatcher.event(
                EmptyIntersectionEvent(
                    public_tokens=list(token_universe_proba),
                    private_tokens=list(split_probs),
                )
            )
            split_probs = token_universe_proba  # use public probas instead
            scale = sum(proba for proba in split_probs.values())

        return {token: proba / scale for token, proba in split_probs.items()}

    def _extract_and_normalize_next_token_probas(
        self, response: CompletionResponse, token_universe_probas: Dict[str, float]
    ) -> Dict[str, float]:
        """Extract and normalize LogProba from a CompletionResponse.

        Only tokens present in `token_universe_probas` are kept; all others
        contribute zero. If the response carries no logprobs entry, an
        `LLMEmptyResponseEvent` is dispatched and `token_universe_probas` is
        returned unchanged.

        Raises:
            ValueError: If reading the logprobs fails for any reason other
                than an `IndexError`.
        """
        try:
            next_token_proba_distn = response.logprobs[0]
        except IndexError:
            dispatcher.event(LLMEmptyResponseEvent())
            return token_universe_probas
        except Exception as e:
            # Chain the cause so the underlying failure stays visible.
            raise ValueError(
                "Something went wrong when trying to get LogProb from CompletionResponse."
            ) from e

        split_probs = dict.fromkeys(token_universe_probas, 0)
        for el in next_token_proba_distn:  # for immediate next token only
            if el.token in split_probs:
                split_probs[el.token] = np.exp(el.logprob)

        return self._normalize(
            split_probs, token_universe_probas
        )  # to make into a valid prob distribution

    def _generate_noise(
        self, sigma: float, size: int, mechanism: PrivacyMechanism
    ) -> np.ndarray:
        """Generates noise that satisfies eps-delta differential privacy.

        Returns:
            np.ndarray: `size` noise samples.

        Raises:
            ValueError: If `mechanism` matches neither supported branch.
        """
        # NOTE(review): the non-Gaussian branch is keyed on
        # `PrivacyMechanism.LAPLACE` yet draws from an exponential
        # distribution, while `sigma_to_eps` refers to
        # `PrivacyMechanism.EXPONENTIAL` -- confirm which is intended.
        noise_rng = np.random.RandomState()
        if mechanism == PrivacyMechanism.GAUSSIAN:
            return noise_rng.normal(0, sigma, size=size)
        elif mechanism == PrivacyMechanism.LAPLACE:
            return noise_rng.exponential(scale=sigma, size=size)
        else:
            raise ValueError("Value entered for `mechanism` is not supported.")

    def _merge_probas(self, list_of_probas: List[Dict[str, float]]) -> Dict[str, float]:
        """Merges a set of probability distributions over a common token universe.

        The result is the per-token arithmetic mean; the token set is taken
        from the first distribution.
        """
        scale = len(list_of_probas)
        tokens = list_of_probas[0].keys()
        merged_distribution = {}
        for token in tokens:
            merged_distribution[token] = sum(pr[token] / scale for pr in list_of_probas)
        return merged_distribution

    def _add_noise(
        self, proba: Dict[str, float], noise_array: Sequence[float]
    ) -> Dict[str, float]:
        """Add noise to proba distribution.

        Noise values are paired with tokens in the dict's iteration order.
        """
        return {
            token: value + noise
            for (token, value), noise in zip(proba.items(), noise_array)
        }

    def _mode_of_distribution(self, proba: Dict[str, float]) -> str:
        """Returns the mode of a given probability distribution."""
        return max(proba, key=proba.get)

    def _resolve_sizes(
        self, sizes: Union[int, Dict[str, int]], num_samples_per_split: int
    ) -> Dict[str, int]:
        """Validate the run arguments and return a label -> size mapping.

        Raises:
            ValueError: If `num_samples_per_split` is below 1 or a label has
                no size.
            TypeError: If `sizes` is neither an `int` nor a `dict`.
        """
        if num_samples_per_split < 1:
            raise ValueError(
                "`num_samples_per_split` must be an integer greater than or equal to 1."
            )

        if isinstance(sizes, int):
            sizes_dict = dict.fromkeys(self.labels, sizes)
        elif isinstance(sizes, dict):
            sizes_dict = sizes
        else:
            raise TypeError(
                "Invalid type of `sizes`. Must be either an `int` or `dict`."
            )

        if not all(c in sizes_dict for c in self.labels):
            raise ValueError("Not all labels have sizes.")
        return sizes_dict

    @dispatcher.span
    def generate_dp_synthetic_example(
        self,
        label: str,
        t_max: int = 1,
        sigma: float = 0.5,
        num_splits: int = 5,
        num_samples_per_split: int = 1,
    ) -> LabelledSimpleDataExample:
        """Generates a differentially private synthetic example.

        Synchronous wrapper that drives `agenerate_dp_synthetic_example` with
        `asyncio.run`.
        """
        return asyncio.run(
            self.agenerate_dp_synthetic_example(
                label=label,
                t_max=t_max,
                sigma=sigma,
                num_splits=num_splits,
                num_samples_per_split=num_samples_per_split,
            )
        )

    @dispatcher.span
    async def agenerate_dp_synthetic_example(
        self,
        label: str,
        t_max: int = 1,
        sigma: float = 0.5,
        num_splits: int = 5,
        num_samples_per_split: int = 1,
    ) -> LabelledSimpleDataExample:
        """Generates a differentially private synthetic example.

        Args:
            label (str): Label the synthetic example is generated for.
            t_max (int): Maximum number of token-generation iterations.
            sigma (float): Noise parameter, rescaled by
                `sqrt(2) / num_splits` before sampling.
            num_splits (int): Number of disjoint private splits per token.
            num_samples_per_split (int): Examples drawn into each split.

        Returns:
            LabelledSimpleDataExample: The generated text with `label`.

        Raises:
            ValueError: If `label` has no examples in the dataset, or the
                dataset cannot be split as requested.
        """
        dispatcher.event(SyntheticExampleStartEvent())
        synthetic_example = ""

        # The label filter does not depend on the generated text, so do it
        # once instead of once per token.
        filtered_simple_dataset = self._filter_dataset_by_label(label=label)

        iterator = range(1, t_max + 1)
        if self.show_progress:
            iterator = tqdm.tqdm(iterator, position=0, leave=True)

        for _ in iterator:
            token_universe_prompt = self._get_public_prompt(
                synthetic_example=synthetic_example, label=label
            )
            try:
                response = await self._async_worker(
                    self.llm.acomplete(token_universe_prompt)
                )
                token_universe_probas = {
                    el.token: np.exp(el.logprob)
                    for el in response.logprobs[0]  # only for next immediate token
                }
            except IndexError:
                continue  # try again in next iteration

            # split the private dataset (re-drawn for every token)
            disjoint_splits = self._split_dataset(
                dataset=filtered_simple_dataset,
                num_splits=num_splits,
                num_samples_per_split=num_samples_per_split,
            )

            # generate next token probability distributions per split
            split_tasks = []
            for split in disjoint_splits:
                prompt = self._get_private_prompt(split, synthetic_example, label)
                split_tasks.append(self._async_worker(self.llm.acomplete(prompt)))

            split_responses: List[CompletionResponse] = await asyncio.gather(
                *split_tasks
            )

            # get and normalize next-token probas per split
            splits = [
                self._extract_and_normalize_next_token_probas(
                    response, token_universe_probas
                )
                for response in split_responses
            ]

            # noisy aggregation
            sigma_calib = np.sqrt(2) / num_splits * sigma
            noise_array = self._generate_noise(
                sigma=sigma_calib,
                size=len(token_universe_probas),
                mechanism=PrivacyMechanism.GAUSSIAN,
            )
            merged_probas = self._merge_probas(splits)
            noisy_probs = self._add_noise(merged_probas, noise_array)

            # next token
            next_token = self._mode_of_distribution(noisy_probs)
            if next_token in STOP_TOKENS:
                break
            synthetic_example += next_token

        # Keep only the text after the last "[RESULT]" marker; without a
        # marker `split` returns the whole string, so no guard is needed.
        synthetic_example = synthetic_example.split("[RESULT]")[-1].strip()

        simple_example = LabelledSimpleDataExample(
            reference_label=label,
            text=synthetic_example,
            text_by=CreatedBy(type=CreatedByType.AI, model_name=self.llm.model),
        )

        dispatcher.event(SyntheticExampleEndEvent())
        return simple_example

    @dispatcher.span
    def run(
        self,
        sizes: Union[int, Dict[str, int]],
        t_max: int = 1,
        sigma: float = 0.5,
        num_splits: int = 5,
        num_samples_per_split: int = 1,
    ) -> LabelledSimpleDataset:
        """Main run method.

        Generates `sizes[label]` synthetic examples for every label, one after
        the other. An `int` for `sizes` applies the same count to every label.

        Raises:
            ValueError: If `num_samples_per_split` is below 1 or a label has
                no size.
            TypeError: If `sizes` is neither an `int` nor a `dict`.
        """
        sizes_dict = self._resolve_sizes(sizes, num_samples_per_split)

        examples = []
        for label in self.labels:
            size = sizes_dict[label]
            for _ in range(size):
                example = self.generate_dp_synthetic_example(
                    label=label,
                    t_max=t_max,
                    sigma=sigma,
                    num_splits=num_splits,
                    num_samples_per_split=num_samples_per_split,
                )
                examples.append(example)

        return LabelledSimpleDataset(examples=examples)

    @dispatcher.span
    async def arun(
        self,
        sizes: Union[int, Dict[str, int]],
        t_max: int = 1,
        sigma: float = 0.5,
        num_splits: int = 5,
        num_samples_per_split: int = 1,
    ) -> LabelledSimpleDataset:
        """Main async run method.

        Builds one generation task per requested example and awaits them in
        batches of `self.batch_size`. When `self.cache_checkpoints` is set,
        the examples gathered so far are written to `checkpoint.json` after
        every batch.

        Raises:
            ValueError: If `num_samples_per_split` is below 1 or a label has
                no size.
            TypeError: If `sizes` is neither an `int` nor a `dict`.
        """
        sizes_dict = self._resolve_sizes(sizes, num_samples_per_split)

        tasks = []
        for label in self.labels:
            size = sizes_dict[label]
            for _ in range(size):
                example_task = self.agenerate_dp_synthetic_example(
                    label=label,
                    t_max=t_max,
                    sigma=sigma,
                    num_splits=num_splits,
                    num_samples_per_split=num_samples_per_split,
                )
                tasks.append(example_task)

        asyncio_runner = asyncio_module(self.show_progress)

        # run in batch
        examples = []
        for batch in _batch(tasks, self.batch_size):
            batch_examples = await asyncio_runner.gather(*batch)
            examples += batch_examples
            if self.cache_checkpoints:
                checkpoint = LabelledSimpleDataset(examples=examples)
                checkpoint.save_json("checkpoint.json")

        return LabelledSimpleDataset(examples=examples)
def sigma_to_eps(
    self,
    sigma: float,
    mechanism: PrivacyMechanism,
    size: int,
    max_token_cnt: int,
    max_self_compositions: int = 1000,
    eps_error: float = 0.01,
    delta_error: float = 1e-10,
) -> float:
    """Return the epsilon value given a sigma.

    Args:
        sigma (float): The parameter associated with noise mechanism.
        mechanism (PrivacyMechanism): Noise mechanism.
        size (int): Number of samples to be generated.
        max_token_cnt (int): Number of tokens generated per sample.
        max_self_compositions (int, optional): PRV algorithm parameter.
            Defaults to 1000.
        eps_error (float, optional): PRV algorithm parameter.
            Defaults to 0.01.
        delta_error (float, optional): PRV algorithm parameter.
            Defaults to 1e-10.

    Returns:
        float: The epsilon value (the middle estimate returned by the
            accountant).

    Raises:
        ValueError: If `max_token_cnt` exceeds `max_self_compositions`, or if
            `mechanism` is neither Gaussian nor exponential.
    """
    if max_token_cnt > max_self_compositions:
        # The message must name the real parameter so callers know what to raise.
        raise ValueError(
            "`max_token_cnt` cannot be greater than `max_self_compositions`."
        )

    sample_rate = size / self._num_examples
    if mechanism == PrivacyMechanism.GAUSSIAN:
        prv_0 = PoissonSubsampledGaussianMechanism(
            noise_multiplier=sigma, sampling_probability=sample_rate
        )
    elif mechanism == PrivacyMechanism.EXPONENTIAL:
        sigma_bar = math.log(1 + sample_rate * (math.exp(sigma) - 1))
        prv_0 = PureDPMechanism(eps=sigma_bar)
    else:
        raise ValueError(
            "Invalid value for mechanism entered."
            " Please use either 'gaussian' or 'exponential'."
        )

    accountant = PRVAccountant(
        prvs=[
            prv_0,
        ],
        max_self_compositions=[max_self_compositions],
        eps_error=eps_error,
        delta_error=delta_error,
    )
    # delta is fixed to the reciprocal of the private dataset size.
    _eps_low, eps_est, _eps_up = accountant.compute_epsilon(
        delta=1 / self._num_examples, num_self_compositions=[max_token_cnt]
    )
    return eps_est
@dispatcher.span
async def agenerate_dp_synthetic_example(
    self,
    label: str,
    t_max: int = 1,
    sigma: float = 0.5,
    num_splits: int = 5,
    num_samples_per_split: int = 1,
) -> LabelledSimpleDataExample:
    """Generates a differentially private synthetic example.

    The text is built one token per iteration: a "public" next-token
    distribution is requested from the LLM, then one distribution per disjoint
    split of the private examples for `label`. The per-split distributions are
    averaged, Gaussian noise is added, and the most likely token is appended.
    Generation stops after `t_max` iterations or at the first stop token.

    Args:
        label (str): Label the synthetic example is generated for.
        t_max (int): Maximum number of token-generation iterations.
        sigma (float): Noise parameter, rescaled by `sqrt(2) / num_splits`
            before sampling.
        num_splits (int): Number of disjoint private splits per token.
        num_samples_per_split (int): Examples drawn into each split.

    Returns:
        LabelledSimpleDataExample: The generated text with `label`.

    Raises:
        ValueError: If `label` has no examples in the dataset, or the dataset
            cannot be split as requested.
    """
    dispatcher.event(SyntheticExampleStartEvent())
    synthetic_example = ""

    # The label filter does not depend on the generated text, so do it once
    # instead of once per token.
    filtered_simple_dataset = self._filter_dataset_by_label(label=label)

    iterator = range(1, t_max + 1)
    if self.show_progress:
        iterator = tqdm.tqdm(iterator, position=0, leave=True)

    for _ in iterator:
        token_universe_prompt = self._get_public_prompt(
            synthetic_example=synthetic_example, label=label
        )
        try:
            response = await self._async_worker(
                self.llm.acomplete(token_universe_prompt)
            )
            token_universe_probas = {
                el.token: np.exp(el.logprob)
                for el in response.logprobs[0]  # only for next immediate token
            }
        except IndexError:
            continue  # try again in next iteration

        # split the private dataset (re-drawn for every token)
        disjoint_splits = self._split_dataset(
            dataset=filtered_simple_dataset,
            num_splits=num_splits,
            num_samples_per_split=num_samples_per_split,
        )

        # generate next token probability distributions per split
        split_tasks = []
        for split in disjoint_splits:
            prompt = self._get_private_prompt(split, synthetic_example, label)
            split_tasks.append(self._async_worker(self.llm.acomplete(prompt)))

        split_responses: List[CompletionResponse] = await asyncio.gather(*split_tasks)

        # get and normalize next-token probas per split
        splits = [
            self._extract_and_normalize_next_token_probas(
                response, token_universe_probas
            )
            for response in split_responses
        ]

        # noisy aggregation
        sigma_calib = np.sqrt(2) / num_splits * sigma
        noise_array = self._generate_noise(
            sigma=sigma_calib,
            size=len(token_universe_probas),
            mechanism=PrivacyMechanism.GAUSSIAN,
        )
        merged_probas = self._merge_probas(splits)
        noisy_probs = self._add_noise(merged_probas, noise_array)

        # next token
        next_token = self._mode_of_distribution(noisy_probs)
        if next_token in STOP_TOKENS:
            break
        synthetic_example += next_token

    # Keep only the text after the last "[RESULT]" marker; without a marker
    # `split` returns the whole string, so no guard is needed.
    synthetic_example = synthetic_example.split("[RESULT]")[-1].strip()

    simple_example = LabelledSimpleDataExample(
        reference_label=label,
        text=synthetic_example,
        text_by=CreatedBy(type=CreatedByType.AI, model_name=self.llm.model),
    )

    dispatcher.event(SyntheticExampleEndEvent())
    return simple_example
@dispatcher.span
def run(
    self,
    sizes: Union[int, Dict[str, int]],
    t_max: int = 1,
    sigma: float = 0.5,
    num_splits: int = 5,
    num_samples_per_split: int = 1,
) -> LabelledSimpleDataset:
    """Main run method.

    Generates `sizes[label]` synthetic examples for every label in
    `self.labels`, one after the other.

    Args:
        sizes (Union[int, Dict[str, int]]): Number of examples per label. An
            `int` applies the same count to every label.
        t_max (int): Forwarded to `generate_dp_synthetic_example`.
        sigma (float): Forwarded to `generate_dp_synthetic_example`.
        num_splits (int): Forwarded to `generate_dp_synthetic_example`.
        num_samples_per_split (int): Forwarded to
            `generate_dp_synthetic_example`; must be at least 1.

    Returns:
        LabelledSimpleDataset: All generated examples.

    Raises:
        ValueError: If `num_samples_per_split` is below 1 or a label has no
            size.
        TypeError: If `sizes` is neither an `int` nor a `dict`.
    """
    if num_samples_per_split < 1:
        # 1 itself is valid, so the message must not say "greater than 1".
        raise ValueError(
            "`num_samples_per_split` must be an integer greater than or equal to 1."
        )

    if isinstance(sizes, int):
        sizes_dict = dict.fromkeys(self.labels, sizes)
    elif isinstance(sizes, dict):
        sizes_dict = sizes
    else:
        raise TypeError("Invalid type of `sizes`. Must be either an `int` or `dict`.")

    if not all(c in sizes_dict for c in self.labels):
        raise ValueError("Not all labels have sizes.")

    examples = []
    for label in self.labels:
        size = sizes_dict[label]
        for _ in range(size):
            example = self.generate_dp_synthetic_example(
                label=label,
                t_max=t_max,
                sigma=sigma,
                num_splits=num_splits,
                num_samples_per_split=num_samples_per_split,
            )
            examples.append(example)

    return LabelledSimpleDataset(examples=examples)
@dispatcher.span
async def arun(
    self,
    sizes: Union[int, Dict[str, int]],
    t_max: int = 1,
    sigma: float = 0.5,
    num_splits: int = 5,
    num_samples_per_split: int = 1,
) -> LabelledSimpleDataset:
    """Main async run method.

    Builds one generation task per requested example and awaits them in
    batches of `self.batch_size`. When `self.cache_checkpoints` is set, the
    examples gathered so far are written to `checkpoint.json` after every
    batch.

    Args:
        sizes (Union[int, Dict[str, int]]): Number of examples per label. An
            `int` applies the same count to every label.
        t_max (int): Forwarded to `agenerate_dp_synthetic_example`.
        sigma (float): Forwarded to `agenerate_dp_synthetic_example`.
        num_splits (int): Forwarded to `agenerate_dp_synthetic_example`.
        num_samples_per_split (int): Forwarded to
            `agenerate_dp_synthetic_example`; must be at least 1.

    Returns:
        LabelledSimpleDataset: All generated examples.

    Raises:
        ValueError: If `num_samples_per_split` is below 1 or a label has no
            size.
        TypeError: If `sizes` is neither an `int` nor a `dict`.
    """
    if num_samples_per_split < 1:
        # 1 itself is valid, so the message must not say "greater than 1".
        raise ValueError(
            "`num_samples_per_split` must be an integer greater than or equal to 1."
        )

    if isinstance(sizes, int):
        sizes_dict = dict.fromkeys(self.labels, sizes)
    elif isinstance(sizes, dict):
        sizes_dict = sizes
    else:
        raise TypeError("Invalid type of `sizes`. Must be either an `int` or `dict`.")

    if not all(c in sizes_dict for c in self.labels):
        raise ValueError("Not all labels have sizes.")

    tasks = []
    for label in self.labels:
        size = sizes_dict[label]
        for _ in range(size):
            example_task = self.agenerate_dp_synthetic_example(
                label=label,
                t_max=t_max,
                sigma=sigma,
                num_splits=num_splits,
                num_samples_per_split=num_samples_per_split,
            )
            tasks.append(example_task)

    asyncio_runner = asyncio_module(self.show_progress)

    # run in batch
    examples = []
    for batch in _batch(tasks, self.batch_size):
        batch_examples = await asyncio_runner.gather(*batch)
        examples += batch_examples
        if self.cache_checkpoints:
            checkpoint = LabelledSimpleDataset(examples=examples)
            checkpoint.save_json("checkpoint.json")

    return LabelledSimpleDataset(examples=examples)