⚝
One Hat Cyber Team
⚝
Your IP:
216.73.216.83
Server IP:
13.127.59.50
Server:
Linux ip-172-31-46-210 5.15.0-1033-aws #37~20.04.1-Ubuntu SMP Fri Mar 17 11:39:30 UTC 2023 x86_64
Server Software:
Apache/2.4.41 (Ubuntu)
PHP Version:
7.4.3-4ubuntu2.29
Buat File
|
Buat Folder
Eksekusi
Dir :
~
/
lib
/
python3
/
dist-packages
/
uaclient
/
View File Name :
data_types.py
import json
from typing import Any, List, Optional, Type, TypeVar, Union

from uaclient import exceptions

INCORRECT_TYPE_ERROR_MESSAGE = (
    "Expected value with type {expected_type} but got type: {got_type}"
)
INCORRECT_LIST_ELEMENT_TYPE_ERROR_MESSAGE = (
    "Got value with incorrect type at index {index}: {nested_msg}"
)
INCORRECT_FIELD_TYPE_ERROR_MESSAGE = (
    'Got value with incorrect type for field "{key}": {nested_msg}'
)


class IncorrectTypeError(exceptions.UserFacingError):
    """
    Raised when a value does not have the type a DataValue expects.

    The message is built from INCORRECT_TYPE_ERROR_MESSAGE and handed to the
    parent initializer.
    """

    def __init__(self, expected_type: str, got_type: str):
        super().__init__(
            INCORRECT_TYPE_ERROR_MESSAGE.format(
                expected_type=expected_type, got_type=got_type
            )
        )


class IncorrectListElementTypeError(IncorrectTypeError):
    """
    Wraps an IncorrectTypeError raised for one element of a list, adding the
    index of the offending element to the message.

    NOTE(review): this initializer does not call the parent initializer; it
    only sets `msg`. It also relies on `err.msg` having been set by the
    wrapped error - confirm against exceptions.UserFacingError.
    """

    def __init__(self, err: IncorrectTypeError, at_index: int):
        self.msg = INCORRECT_LIST_ELEMENT_TYPE_ERROR_MESSAGE.format(
            index=at_index, nested_msg=err.msg
        )


class IncorrectFieldTypeError(IncorrectTypeError):
    """
    Wraps an IncorrectTypeError raised for one field of a DataObject, adding
    the key of the offending field to the message.

    NOTE(review): this initializer does not call the parent initializer; it
    only sets `msg`. It also relies on `err.msg` having been set by the
    wrapped error - confirm against exceptions.UserFacingError.
    """

    def __init__(self, err: IncorrectTypeError, key: str):
        self.msg = INCORRECT_FIELD_TYPE_ERROR_MESSAGE.format(
            key=key, nested_msg=err.msg
        )


class DataValue:
    """
    Generic data value to be extended by more specific typed data values.
    This establishes the interface of a static/class method called
    `from_value` that returns the parsed value if appropriate.
    """

    @staticmethod
    def from_value(val: Any) -> Any:
        """Return val unchanged; the base class performs no validation."""
        return val


class StringDataValue(DataValue):
    """
    To be used for parsing string values.
    from_value raises an IncorrectTypeError if the value is not a string and
    returns the string itself if it is a string.
    """

    @staticmethod
    def from_value(val: Any) -> str:
        if not isinstance(val, str):
            raise IncorrectTypeError("str", type(val).__name__)
        return val


class IntDataValue(DataValue):
    """
    To be used for parsing int values.
    from_value raises an IncorrectTypeError if the value is not an int and
    returns the int itself if it is an int.
    """

    @staticmethod
    def from_value(val: Any) -> int:
        # bool is a subclass of int, so it has to be rejected explicitly
        if not isinstance(val, int) or isinstance(val, bool):
            raise IncorrectTypeError("int", type(val).__name__)
        return val


class BoolDataValue(DataValue):
    """
    To be used for parsing bool values.
    from_value raises an IncorrectTypeError if the value is not a bool and
    returns the bool itself if it is a bool.
    """

    @staticmethod
    def from_value(val: Any) -> bool:
        if not isinstance(val, bool):
            raise IncorrectTypeError("bool", type(val).__name__)
        return val


def data_list(data_cls: Type[DataValue]) -> Type[DataValue]:
    """
    To be used for parsing lists of a certain DataValue type.
    Returns a class that extends DataValue and validates that each item in a
    list is the correct type in its from_value.

    The returned class raises IncorrectTypeError when the value is not a
    list, and IncorrectListElementTypeError (carrying the element index) when
    data_cls.from_value rejects an element.
    """

    class _DataList(DataValue):
        @staticmethod
        def from_value(val: Any) -> List:
            if not isinstance(val, list):
                raise IncorrectTypeError("list", type(val).__name__)
            new_val = []
            for i, item in enumerate(val):
                try:
                    new_val.append(data_cls.from_value(item))
                except IncorrectTypeError as e:
                    # chain explicitly so the element-level error stays
                    # attached as the cause
                    raise IncorrectListElementTypeError(e, i) from e
            return new_val

    return _DataList


def data_list_to_list(
    val: List[Union["DataObject", list, str, int, bool]],
    keep_none: bool = True,
) -> list:
    """
    Convert a list of parsed values into a list of plain python values.

    DataObject items are converted with to_dict, nested lists are converted
    recursively and every other item is copied as is.

    :param val: the list to convert
    :param keep_none: forwarded to to_dict of every DataObject found, at any
        nesting depth, so that None-valued fields are kept or dropped
        consistently with the caller's choice. Defaults to True.
    :return: a new list; val itself is not modified
    """
    new_val = []  # type: list
    for item in val:
        if isinstance(item, DataObject):
            new_val.append(item.to_dict(keep_none=keep_none))
        elif isinstance(item, list):
            new_val.append(data_list_to_list(item, keep_none=keep_none))
        else:
            new_val.append(item)
    return new_val


class Field:
    """
    For defining the fields static property of a DataObject.

    :param key: name of the key in the parsed dict; DataObject.from_dict
        also uses it as the keyword argument name for the constructor
    :param data_cls: DataValue class whose from_value validates the value
    :param required: whether from_dict raises when the key is missing
    """

    def __init__(
        self, key: str, data_cls: Type[DataValue], required: bool = True
    ):
        self.key = key
        self.data_cls = data_cls
        self.required = required


T = TypeVar("T", bound="DataObject")


class DataObject(DataValue):
    """
    For defining a python object that can be parsed from a dict.
    Validates that a set of expected fields are present in the dict
    that is parsed and that the values of those fields are the correct
    DataValue by calling from_value on each.
    The fields are defined using the `fields` static property.
    DataObjects can be used in Fields of other DataObjects.
    To define a new DataObject:
    1. Create a new class that extends DataObject.
    2. Define the `fields` static property to be a list of Field objects
    3. Define the constructor to take kwargs that match the list of Field
       objects.
        a. Example 1:
           Field("keyname", StringDataValue) -> keyname: str
        b. Example 2:
           Field("keyname", data_list(IntDataValue), required=False)
           -> keyname: Optional[List[int]]
    4. Use from_value or from_dict to parse a dict into the python object.
    """

    fields = []  # type: List[Field]

    def __init__(self, **_kwargs):
        pass

    def to_dict(self, keep_none: bool = True) -> dict:
        """
        Serialize the declared fields into a plain dict.

        Attributes that are missing on the instance are treated as None.

        :param keep_none: when False, fields whose value is None are left
            out. The flag is propagated to nested DataObjects and to
            DataObjects inside lists, so None values are dropped at every
            level and not only at the top one.
        :return: dict keyed by field key
        """
        d = {}
        for field in self.fields:
            val = getattr(self, field.key, None)
            new_val = None  # type: Any
            if isinstance(val, DataObject):
                new_val = val.to_dict(keep_none=keep_none)
            elif isinstance(val, list):
                new_val = data_list_to_list(val, keep_none=keep_none)
            else:
                # simple type, just copy
                new_val = val

            if new_val is not None or keep_none:
                d[field.key] = new_val
        return d

    def to_json(self, keep_null: bool = True) -> str:
        """
        Serialize the object to a JSON string with sorted keys.

        :param keep_null: passed to to_dict as keep_none
        """
        return json.dumps(self.to_dict(keep_none=keep_null), sort_keys=True)

    @classmethod
    def from_dict(cls: Type[T], d: dict) -> T:
        """
        Build an instance of cls from a dict, validating every declared field.

        A missing optional field is passed to the constructor as None. A
        value of None is passed through without calling from_value.

        :raises IncorrectFieldTypeError: if a required key is missing or a
            value is rejected by the field's data_cls
        """
        kwargs = {}
        for field in cls.fields:
            try:
                val = d[field.key]
            except KeyError:
                if field.required:
                    raise IncorrectFieldTypeError(
                        IncorrectTypeError(field.data_cls.__name__, "null"),
                        field.key,
                    )
                else:
                    val = None
            if val is not None:
                try:
                    val = field.data_cls.from_value(val)
                except IncorrectTypeError as e:
                    # chain explicitly so the value-level error stays
                    # attached as the cause
                    raise IncorrectFieldTypeError(e, field.key) from e
            kwargs[field.key] = val
        return cls(**kwargs)

    @classmethod
    def from_value(cls, val: Any):
        """
        Parse val into an instance of cls.

        :raises IncorrectTypeError: if val is not a dict
        """
        if not isinstance(val, dict):
            raise IncorrectTypeError("dict", type(val).__name__)
        return cls.from_dict(val)


class AttachActionsConfigFile(DataObject):
    """
    The format of the yaml file that can be passed with
    ua attach --attach-config /path/to/file
    """

    fields = [
        Field("token", StringDataValue),
        Field("enable_services", data_list(StringDataValue), required=False),
    ]

    def __init__(self, *, token: str, enable_services: Optional[List[str]]):
        self.token = token
        self.enable_services = enable_services