pyscalpel.http.body

Pentesters often have to manipulate form data in precise and extensive ways

This module contains implementations for the most common forms (multipart,urlencoded, JSON)

Users may implement their own form by creating a Serializer, assigning it to the .serializer attribute of a Request, and using the "form" property

Forms are designed to be convertible from one to another.

For example, JSON forms may be converted to URL encoded forms by using the php query string syntax:

{"key1": {"key2" : {"key3" : "nested_value"}}} -> key1[key2][key3]=nested_value

And vice-versa.

 1"""
 2    Pentesters often have to manipulate form data in precise and extensive ways
 3
 4    This module contains implementations for the most common forms (multipart,urlencoded, JSON)
 5    
 6    Users may implement their own form by creating a Serializer,
 7    assigning the .serializer attribute in `Request` and using the "form" property
 8    
 9    Forms are designed to be convertible from one to another.
10    
11    For example, JSON forms may be converted to URL encoded forms
12    by using the php query string syntax:
13    
14    ```{"key1": {"key2" : {"key3" : "nested_value"}}} -> key1[key2][key3]=nested_value```
15    
16    And vice-versa.
17"""
18
19from .form import *
20
21
22__all__ = [
23    "Form",
24    "JSON_VALUE_TYPES",
25    "JSONForm",
26    "MultiPartForm",
27    "MultiPartFormField",
28    "URLEncodedForm",
29    "FormSerializer",
30    "json_unescape",
31    "json_unescape_bytes",
32    "json_escape_bytes",
33]
class Form(collections.abc.MutableMapping[~KT, ~VT]):
class Form(MutableMapping[KT, VT], metaclass=ABCMeta):
    """Abstract generic mutable mapping used as the common base type for forms.

    The class declares no members of its own; everything comes from
    MutableMapping.
    """

A MutableMapping is a generic container for associating key/value pairs.

This class provides concrete generic implementations of all methods except for __getitem__, __setitem__, __delitem__, __iter__, and __len__.

Inherited Members
collections.abc.MutableMapping
pop
popitem
clear
update
setdefault
collections.abc.Mapping
get
keys
items
values
JSON_VALUE_TYPES = str | int | float | bool | None | list['JSON_VALUE_TYPES'] | dict[str | int | float, 'JSON_VALUE_TYPES']
class JSONForm(dict[str | int | float, str | int | float | bool | None | list['JSON_VALUE_TYPES'] | dict[str | int | float, 'JSON_VALUE_TYPES']]):
class JSONForm(dict[JSON_KEY_TYPES, JSON_VALUE_TYPES]):
    """Form representing a JSON object ``{}``.

    This is a plain ``dict`` subclass that adds no behavior of its own: keys
    are annotated as JSON_KEY_TYPES and values as JSON_VALUE_TYPES.
    """

Form representing a JSON object {}

Implemented by a plain dict

Args: dict (dict): A dict containing JSON-compatible types.

Inherited Members
builtins.dict
get
setdefault
pop
popitem
keys
items
values
update
fromkeys
clear
copy
class MultiPartForm(collections.abc.Mapping[str, pyscalpel.http.body.multipart.MultiPartFormField]):
341class MultiPartForm(Mapping[str, MultiPartFormField]):
342    """
343    This class represents a multipart/form-data request.
344
345    It contains a collection of MultiPartFormField objects, providing methods
346    to add, get, and delete form fields.
347
348    The class also enables the conversion of the entire form
349    into bytes for transmission.
350
351    - Args:
352        - fields (Sequence[MultiPartFormField]): A sequence of MultiPartFormField objects that make up the form.
353        - content_type (str): The content type of the form.
354        - encoding (str): The encoding of the form.
355
356    - Raises:
357        - TypeError: Raised when an incorrect type is passed to MultiPartForm.set.
358        - KeyError: Raised when trying to access a field that does not exist in the form.
359
360    - Returns:
361        - MultiPartForm: An instance of the class representing a multipart/form-data request.
362
363    - Yields:
364        - Iterator[MultiPartFormField]: Yields each field in the form.
365    """
366
367    fields: list[MultiPartFormField]
368    content_type: str
369    encoding: str
370
371    def __init__(
372        self,
373        fields: Sequence[MultiPartFormField],
374        content_type: str,
375        encoding: str = "utf-8",
376    ):
377        self.content_type = content_type
378        self.encoding = encoding
379        super().__init__()
380        self.fields = list(fields)
381
382    @classmethod
383    def from_bytes(
384        cls, content: bytes, content_type: str, encoding: str = "utf-8"
385    ) -> MultiPartForm:
386        """Create a MultiPartForm by parsing a raw multipart form
387
388        - Args:
389            - content (bytes): The multipart form as raw bytes
390            - content_type (str): The Content-Type header with the corresponding boundary param (required).
391            - encoding (str, optional): The encoding to use (not required). Defaults to "utf-8".
392
393        - Returns:
394           - MultiPartForm: The parsed multipart form
395        """
396        decoder = MultipartDecoder(content, content_type, encoding=encoding)
397        parts: tuple[BodyPart] = decoder.parts
398        fields: tuple[MultiPartFormField, ...] = tuple(
399            MultiPartFormField.from_body_part(body_part) for body_part in parts
400        )
401        return cls(fields, content_type, encoding)
402
403    @property
404    def boundary(self) -> bytes:
405        """Get the form multipart boundary
406
407        Returns:
408            bytes: The multipart boundary
409        """
410        return extract_boundary(self.content_type, self.encoding)
411
412    def __bytes__(self) -> bytes:
413        boundary = self.boundary
414        serialized = b""
415        encoding = self.encoding
416        for field in self.fields:
417            serialized += b"--" + boundary + b"\r\n"
418
419            # Format the headers
420            for key, val in field.headers.items():
421                serialized += (
422                    key.encode(encoding) + b": " + val.encode(encoding) + b"\r\n"
423                )
424            serialized += b"\r\n" + field.content + b"\r\n"
425
426        # Format the final boundary
427        serialized += b"--" + boundary + b"--\r\n\r\n"
428        return serialized
429
430    # Override
431    def get_all(self, key: str) -> list[MultiPartFormField]:
432        """
433        Return the list of all values for a given key.
434        If that key is not in the MultiDict, the return value will be an empty list.
435        """
436        return [field for field in self.fields if key == field.name]
437
438    def get(
439        self, key: str, default: MultiPartFormField | None = None
440    ) -> MultiPartFormField | None:
441        values = self.get_all(key)
442        if not values:
443            return default
444
445        return values[0]
446
447    def del_all(self, key: str):
448        # Mutate object to avoid invalidating user references to fields
449        for field in self.fields:
450            if key == field.name:
451                self.fields.remove(field)
452
453    def __delitem__(self, key: str):
454        self.del_all(key)
455
456    def set(
457        self,
458        key: str,
459        value: (
460            TextIOWrapper
461            | BufferedReader
462            | IOBase
463            | MultiPartFormField
464            | bytes
465            | str
466            | int
467            | float
468            | None
469        ),
470    ) -> None:
471        new_field: MultiPartFormField
472        match value:
473            case MultiPartFormField():
474                new_field = value
475            case int() | float():
476                return self.set(key, str(value))
477            case bytes() | str():
478                new_field = MultiPartFormField.make(key)
479                new_field.content = always_bytes(value)
480            case IOBase():
481                new_field = MultiPartFormField.from_file(key, value)
482            case None:
483                self.del_all(key)
484                return
485            case _:
486                raise TypeError("Wrong type was passed to MultiPartForm.set")
487
488        for i, field in enumerate(self.fields):
489            if field.name == key:
490                self.fields[i] = new_field
491                return
492
493        self.append(new_field)
494
495    def setdefault(
496        self, key: str, default: MultiPartFormField | None = None
497    ) -> MultiPartFormField:
498        found = self.get(key)
499        if found is None:
500            default = default or MultiPartFormField.make(key)
501            self[key] = default
502            return default
503
504        return found
505
506    def __setitem__(
507        self,
508        key: str,
509        value: (
510            TextIOWrapper
511            | BufferedReader
512            | MultiPartFormField
513            | IOBase
514            | bytes
515            | str
516            | int
517            | float
518            | None
519        ),
520    ) -> None:
521        self.set(key, value)
522
523    def __getitem__(self, key: str) -> MultiPartFormField:
524        values = self.get_all(key)
525        if not values:
526            raise KeyError(key)
527        return values[0]
528
529    def __len__(self) -> int:
530        return len(self.fields)
531
532    def __eq__(self, other) -> bool:
533        if isinstance(other, MultiPartForm):
534            return self.fields == other.fields
535        return False
536
537    def __iter__(self) -> Iterator[MultiPartFormField]:
538        seen = set()
539        for field in self.fields:
540            if field not in seen:
541                seen.add(field)
542                yield field
543
544    def insert(self, index: int, value: MultiPartFormField) -> None:
545        """
546        Insert an additional value for the given key at the specified position.
547        """
548        self.fields.insert(index, value)
549
550    def append(self, value: MultiPartFormField) -> None:
551        self.fields.append(value)
552
553    def __repr__(self):  # pragma: no cover
554        fields = (repr(field) for field in self.fields)
555        return f"{type(self).__name__}[{', '.join(fields)}]"
556
557    def items(self) -> tuple[tuple[str, MultiPartFormField], ...]:
558        fields = self.fields
559        items = ((i.name, i) for i in fields)
560        return tuple(items)
561
562    def keys(self) -> tuple[str, ...]:
563        return tuple(field.name for field in self.fields)
564
565    def values(self) -> tuple[MultiPartFormField, ...]:
566        return tuple(self.fields)

This class represents a multipart/form-data request.

It contains a collection of MultiPartFormField objects, providing methods to add, get, and delete form fields.

The class also enables the conversion of the entire form into bytes for transmission.

  • Args:

    • fields (Sequence[MultiPartFormField]): A sequence of MultiPartFormField objects that make up the form.
    • content_type (str): The content type of the form.
    • encoding (str): The encoding of the form.
  • Raises:

    • TypeError: Raised when an incorrect type is passed to MultiPartForm.set.
    • KeyError: Raised when trying to access a field that does not exist in the form.
  • Returns:

    • MultiPartForm: An instance of the class representing a multipart/form-data request.
  • Yields:

    • Iterator[MultiPartFormField]: Yields each field in the form.
MultiPartForm( fields: Sequence[MultiPartFormField], content_type: str, encoding: str = 'utf-8')
371    def __init__(
372        self,
373        fields: Sequence[MultiPartFormField],
374        content_type: str,
375        encoding: str = "utf-8",
376    ):
377        self.content_type = content_type
378        self.encoding = encoding
379        super().__init__()
380        self.fields = list(fields)
fields: list[MultiPartFormField]
content_type: str
encoding: str
@classmethod
def from_bytes( cls, content: bytes, content_type: str, encoding: str = 'utf-8') -> MultiPartForm:
382    @classmethod
383    def from_bytes(
384        cls, content: bytes, content_type: str, encoding: str = "utf-8"
385    ) -> MultiPartForm:
386        """Create a MultiPartForm by parsing a raw multipart form
387
388        - Args:
389            - content (bytes): The multipart form as raw bytes
390            - content_type (str): The Content-Type header with the corresponding boundary param (required).
391            - encoding (str, optional): The encoding to use (not required). Defaults to "utf-8".
392
393        - Returns:
394           - MultiPartForm: The parsed multipart form
395        """
396        decoder = MultipartDecoder(content, content_type, encoding=encoding)
397        parts: tuple[BodyPart] = decoder.parts
398        fields: tuple[MultiPartFormField, ...] = tuple(
399            MultiPartFormField.from_body_part(body_part) for body_part in parts
400        )
401        return cls(fields, content_type, encoding)

Create a MultiPartForm by parsing a raw multipart form

  • Args:

    • content (bytes): The multipart form as raw bytes
    • content_type (str): The Content-Type header with the corresponding boundary param (required).
    • encoding (str, optional): The encoding to use (not required). Defaults to "utf-8".
  • Returns:

    • MultiPartForm: The parsed multipart form
boundary: bytes
403    @property
404    def boundary(self) -> bytes:
405        """Get the form multipart boundary
406
407        Returns:
408            bytes: The multipart boundary
409        """
410        return extract_boundary(self.content_type, self.encoding)

Get the form multipart boundary

Returns: bytes: The multipart boundary

def get_all(self, key: str) -> list[MultiPartFormField]:
431    def get_all(self, key: str) -> list[MultiPartFormField]:
432        """
433        Return the list of all values for a given key.
434        If that key is not in the MultiDict, the return value will be an empty list.
435        """
436        return [field for field in self.fields if key == field.name]

Return the list of all fields whose name equals the given key. If no field in the form has that name, the return value will be an empty list.

def del_all(self, key: str):
447    def del_all(self, key: str):
448        # Mutate object to avoid invalidating user references to fields
449        for field in self.fields:
450            if key == field.name:
451                self.fields.remove(field)
def set( self, key: str, value: _io.TextIOWrapper | _io.BufferedReader | io.IOBase | MultiPartFormField | bytes | str | int | float | None) -> None:
456    def set(
457        self,
458        key: str,
459        value: (
460            TextIOWrapper
461            | BufferedReader
462            | IOBase
463            | MultiPartFormField
464            | bytes
465            | str
466            | int
467            | float
468            | None
469        ),
470    ) -> None:
471        new_field: MultiPartFormField
472        match value:
473            case MultiPartFormField():
474                new_field = value
475            case int() | float():
476                return self.set(key, str(value))
477            case bytes() | str():
478                new_field = MultiPartFormField.make(key)
479                new_field.content = always_bytes(value)
480            case IOBase():
481                new_field = MultiPartFormField.from_file(key, value)
482            case None:
483                self.del_all(key)
484                return
485            case _:
486                raise TypeError("Wrong type was passed to MultiPartForm.set")
487
488        for i, field in enumerate(self.fields):
489            if field.name == key:
490                self.fields[i] = new_field
491                return
492
493        self.append(new_field)
def setdefault( self, key: str, default: MultiPartFormField | None = None) -> MultiPartFormField:
495    def setdefault(
496        self, key: str, default: MultiPartFormField | None = None
497    ) -> MultiPartFormField:
498        found = self.get(key)
499        if found is None:
500            default = default or MultiPartFormField.make(key)
501            self[key] = default
502            return default
503
504        return found
def insert( self, index: int, value: MultiPartFormField) -> None:
544    def insert(self, index: int, value: MultiPartFormField) -> None:
545        """
546        Insert an additional value for the given key at the specified position.
547        """
548        self.fields.insert(index, value)

Insert an additional field at the specified position in the form's field list.

def append(self, value: MultiPartFormField) -> None:
550    def append(self, value: MultiPartFormField) -> None:
551        self.fields.append(value)
Inherited Members
collections.abc.Mapping
get
items
keys
values
class MultiPartFormField:
 86class MultiPartFormField:
 87    """
 88    This class represents a field in a multipart/form-data request.
 89
 90    It provides functionalities to create form fields from various inputs like raw body parts,
 91    files and manual construction with name, filename, body, and content type.
 92
 93    It also offers properties and methods to interact with the form field's headers and content.
 94
 95    Raises:
 96        StopIteration: Raised when the specified Content-Disposition header is not found or could not be parsed.
 97
 98    Returns:
 99        MultiPartFormField: An instance of the class representing a form field in a multipart/form-data request.
100    """
101
102    headers: CaseInsensitiveDict[str]
103    content: bytes
104    encoding: str
105
106    def __init__(
107        self,
108        headers: CaseInsensitiveDict[str],
109        content: bytes = b"",
110        encoding: str = "utf-8",
111    ):
112        self.headers = headers
113        self.content = content
114        self.encoding = encoding
115
116    @classmethod
117    def from_body_part(cls, body_part: BodyPart):
118        headers = cls._fix_headers(cast(Mapping[bytes, bytes], body_part.headers))
119        return cls(headers, body_part.content, body_part.encoding)
120
121    @classmethod
122    def make(
123        cls,
124        name: str,
125        filename: str | None = None,
126        body: bytes = b"",
127        content_type: str | None = None,
128        encoding: str = "utf-8",
129    ) -> MultiPartFormField:
130        # Ensure the form won't break if someone includes quotes
131        escaped_name: str = escape_parameter(name)
132
133        # rfc7578  4.2. specifies that urlencoding shouldn't be applied to filename
134        #   But most browsers still replaces the " character by %22 to avoid breaking the parameters quotation.
135        escaped_filename: str | None = filename and escape_parameter(filename)
136
137        if content_type is None:
138            content_type = get_mime(filename)
139
140        urlencoded_content_type = urllibquote(content_type)
141
142        disposition = f'form-data; name="{escaped_name}"'
143        if filename is not None:
144            # When the param is a file, add a filename MIME param and a content-type header
145            disposition += f'; filename="{escaped_filename}"'
146            headers = CaseInsensitiveDict(
147                {
148                    CONTENT_DISPOSITION_KEY: disposition,
149                    CONTENT_TYPE_KEY: urlencoded_content_type,
150                }
151            )
152        else:
153            headers = CaseInsensitiveDict(
154                {
155                    CONTENT_DISPOSITION_KEY: disposition,
156                }
157            )
158
159        return cls(headers, body, encoding)
160
161    # TODO: Rewrite request_toolbelt multipart parser to get rid of encoding.
162    @staticmethod
163    def from_file(
164        name: str,
165        file: TextIOWrapper | BufferedReader | str | IOBase,
166        filename: str | None = None,
167        content_type: str | None = None,
168        encoding: str | None = None,
169    ):
170        if isinstance(file, str):
171            file = open(file, mode="rb")
172
173        if filename is None:
174            match file:
175                case TextIOWrapper() | BufferedReader():
176                    filename = os.path.basename(file.name)
177                case _:
178                    filename = name
179
180        # Guess the MIME content-type from the file extension
181        if content_type is None:
182            content_type = (
183                mimetypes.guess_type(filename)[0] or "application/octet-stream"
184            )
185
186        # Read the whole file into memory
187        content: bytes
188        match file:
189            case TextIOWrapper():
190                content = file.read().encode(file.encoding)
191                # Override file.encoding if provided.
192                encoding = encoding or file.encoding
193            case BufferedReader() | IOBase():
194                content = file.read()
195
196        instance = MultiPartFormField.make(
197            name,
198            filename=filename,
199            body=content,
200            content_type=content_type,
201            encoding=encoding or "utf-8",
202        )
203
204        file.close()
205
206        return instance
207
208    @staticmethod
209    def __serialize_content(
210        content: bytes, headers: Mapping[str | bytes, str | bytes]
211    ) -> bytes:
212        # Prepend content with headers
213        merged_content: bytes = b""
214        header_lines = (
215            always_bytes(key) + b": " + always_bytes(value)
216            for key, value in headers.items()
217        )
218        merged_content += b"\r\n".join(header_lines)
219        merged_content += b"\r\n\r\n"
220        merged_content += content
221        return merged_content
222
223    def __bytes__(self) -> bytes:
224        return self.__serialize_content(
225            self.content,
226            cast(Mapping[bytes | str, bytes | str], self.headers),
227        )
228    
229    def __eq__(self, other) -> bool:
230        match other:
231            case MultiPartFormField() | bytes():
232                return bytes(other) == bytes(self)
233            case str():
234                return other.encode("latin-1") == bytes(self)
235        return False
236
237    def __hash__(self) -> int:
238        return hash(bytes(self))
239
240    @staticmethod
241    def _fix_headers(headers: Mapping[bytes, bytes]) -> CaseInsensitiveDict[str]:
242        # Fix the headers key by converting them to strings
243        # https://github.com/requests/toolbelt/pull/353
244
245        fixed_headers: CaseInsensitiveDict[str] = CaseInsensitiveDict()
246        for key, value in headers.items():
247            fixed_headers[always_str(key)] = always_str(value.decode())
248        return fixed_headers
249
250    # Unused for now
251    # @staticmethod
252    # def _unfix_headers(headers: Mapping[str, str]) -> CaseInsensitiveDict[bytes]:
253    #     # Unfix the headers key by converting them to bytes
254
255    #     unfixed_headers: CaseInsensitiveDict[bytes] = CaseInsensitiveDict()
256    #     for key, value in headers.items():
257    #         unfixed_headers[always_bytes(key)] = always_bytes(value)  # type: ignore requests_toolbelt uses wrong types but it still works fine.
258    #     return unfixed_headers
259
260    @property
261    def text(self) -> str:
262        return self.content.decode(self.encoding)
263
264    @property
265    def content_type(self) -> str | None:
266        return self.headers.get(CONTENT_TYPE_KEY)
267
268    @content_type.setter
269    def content_type(self, content_type: str | None) -> None:
270        headers = self.headers
271        if content_type is None:
272            del headers[CONTENT_TYPE_KEY]
273        else:
274            headers[CONTENT_TYPE_KEY] = content_type
275
276    def _parse_disposition(self) -> list[tuple[str, str]]:
277        header_key = CONTENT_DISPOSITION_KEY
278        header_value = self.headers[header_key]
279        return parse_header(header_key, header_value)
280
281    def _unparse_disposition(self, parsed_header: list[tuple[str, str]]):
282        unparsed = unparse_header_value(parsed_header)
283        self.headers[CONTENT_DISPOSITION_KEY] = unparsed
284
285    def get_disposition_param(self, key: str) -> tuple[str, str | None] | None:
286        """Get a param from the Content-Disposition header
287
288        Args:
289            key (str): the param name
290
291        Raises:
292            StopIteration: Raised when the param was not found.
293
294        Returns:
295            tuple[str, str | None] | None: Returns the param as (key, value)
296        """
297        # Parse the Content-Disposition header
298        parsed_disposition = self._parse_disposition()
299        return find_header_param(parsed_disposition, key)
300
301    def set_disposition_param(self, key: str, value: str):
302        """Set a Content-Type header parameter
303
304        Args:
305            key (str): The parameter name
306            value (str): The parameter value
307        """
308        parsed = self._parse_disposition()
309        updated = update_header_param(parsed, key, value)
310        self._unparse_disposition(cast(list[tuple[str, str]], updated))
311
312    @property
313    def name(self) -> str:
314        """Get the Content-Disposition header name parameter
315
316        Returns:
317            str: The Content-Disposition header name parameter value
318        """
319        # Assume name is always present
320        return cast(tuple[str, str], self.get_disposition_param("name"))[1]
321
322    @name.setter
323    def name(self, value: str):
324        self.set_disposition_param("name", value)
325
326    @property
327    def filename(self) -> str | None:
328        """Get the Content-Disposition header filename parameter
329
330        Returns:
331            str | None: The Content-Disposition header filename parameter value
332        """
333        param = self.get_disposition_param("filename")
334        return param and param[1]
335
336    @filename.setter
337    def filename(self, value: str):
338        self.set_disposition_param("filename", value)

This class represents a field in a multipart/form-data request.

It provides functionalities to create form fields from various inputs like raw body parts, files and manual construction with name, filename, body, and content type.

It also offers properties and methods to interact with the form field's headers and content.

Raises: StopIteration: Raised when the specified Content-Disposition header is not found or could not be parsed.

Returns: MultiPartFormField: An instance of the class representing a form field in a multipart/form-data request.

MultiPartFormField( headers: requests.structures.CaseInsensitiveDict[str], content: bytes = b'', encoding: str = 'utf-8')
106    def __init__(
107        self,
108        headers: CaseInsensitiveDict[str],
109        content: bytes = b"",
110        encoding: str = "utf-8",
111    ):
112        self.headers = headers
113        self.content = content
114        self.encoding = encoding
headers: requests.structures.CaseInsensitiveDict[str]
content: bytes
encoding: str
@classmethod
def from_body_part(cls, body_part: requests_toolbelt.multipart.decoder.BodyPart):
116    @classmethod
117    def from_body_part(cls, body_part: BodyPart):
118        headers = cls._fix_headers(cast(Mapping[bytes, bytes], body_part.headers))
119        return cls(headers, body_part.content, body_part.encoding)
@classmethod
def make( cls, name: str, filename: str | None = None, body: bytes = b'', content_type: str | None = None, encoding: str = 'utf-8') -> MultiPartFormField:
121    @classmethod
122    def make(
123        cls,
124        name: str,
125        filename: str | None = None,
126        body: bytes = b"",
127        content_type: str | None = None,
128        encoding: str = "utf-8",
129    ) -> MultiPartFormField:
130        # Ensure the form won't break if someone includes quotes
131        escaped_name: str = escape_parameter(name)
132
133        # rfc7578  4.2. specifies that urlencoding shouldn't be applied to filename
134        #   But most browsers still replaces the " character by %22 to avoid breaking the parameters quotation.
135        escaped_filename: str | None = filename and escape_parameter(filename)
136
137        if content_type is None:
138            content_type = get_mime(filename)
139
140        urlencoded_content_type = urllibquote(content_type)
141
142        disposition = f'form-data; name="{escaped_name}"'
143        if filename is not None:
144            # When the param is a file, add a filename MIME param and a content-type header
145            disposition += f'; filename="{escaped_filename}"'
146            headers = CaseInsensitiveDict(
147                {
148                    CONTENT_DISPOSITION_KEY: disposition,
149                    CONTENT_TYPE_KEY: urlencoded_content_type,
150                }
151            )
152        else:
153            headers = CaseInsensitiveDict(
154                {
155                    CONTENT_DISPOSITION_KEY: disposition,
156                }
157            )
158
159        return cls(headers, body, encoding)
@staticmethod
def from_file( name: str, file: _io.TextIOWrapper | _io.BufferedReader | str | io.IOBase, filename: str | None = None, content_type: str | None = None, encoding: str | None = None):
162    @staticmethod
163    def from_file(
164        name: str,
165        file: TextIOWrapper | BufferedReader | str | IOBase,
166        filename: str | None = None,
167        content_type: str | None = None,
168        encoding: str | None = None,
169    ):
170        if isinstance(file, str):
171            file = open(file, mode="rb")
172
173        if filename is None:
174            match file:
175                case TextIOWrapper() | BufferedReader():
176                    filename = os.path.basename(file.name)
177                case _:
178                    filename = name
179
180        # Guess the MIME content-type from the file extension
181        if content_type is None:
182            content_type = (
183                mimetypes.guess_type(filename)[0] or "application/octet-stream"
184            )
185
186        # Read the whole file into memory
187        content: bytes
188        match file:
189            case TextIOWrapper():
190                content = file.read().encode(file.encoding)
191                # Override file.encoding if provided.
192                encoding = encoding or file.encoding
193            case BufferedReader() | IOBase():
194                content = file.read()
195
196        instance = MultiPartFormField.make(
197            name,
198            filename=filename,
199            body=content,
200            content_type=content_type,
201            encoding=encoding or "utf-8",
202        )
203
204        file.close()
205
206        return instance
text: str
260    @property
261    def text(self) -> str:
262        return self.content.decode(self.encoding)
content_type: str | None
264    @property
265    def content_type(self) -> str | None:
266        return self.headers.get(CONTENT_TYPE_KEY)
def get_disposition_param(self, key: str) -> tuple[str, str | None] | None:
285    def get_disposition_param(self, key: str) -> tuple[str, str | None] | None:
286        """Get a param from the Content-Disposition header
287
288        Args:
289            key (str): the param name
290
291        Raises:
292            StopIteration: Raised when the param was not found.
293
294        Returns:
295            tuple[str, str | None] | None: Returns the param as (key, value)
296        """
297        # Parse the Content-Disposition header
298        parsed_disposition = self._parse_disposition()
299        return find_header_param(parsed_disposition, key)

Get a param from the Content-Disposition header

Args: key (str): the param name

Raises: StopIteration: Raised when the param was not found.

Returns: tuple[str, str | None] | None: Returns the param as (key, value)

def set_disposition_param(self, key: str, value: str):
301    def set_disposition_param(self, key: str, value: str):
302        """Set a Content-Disposition header parameter
303
304        Args:
305            key (str): The parameter name
306            value (str): The parameter value
307        """
308        parsed = self._parse_disposition()
309        updated = update_header_param(parsed, key, value)
310        self._unparse_disposition(cast(list[tuple[str, str]], updated))

Set a Content-Disposition header parameter

Args: key (str): The parameter name value (str): The parameter value

name: str
312    @property
313    def name(self) -> str:
314        """Get the Content-Disposition header name parameter
315
316        Returns:
317            str: The Content-Disposition header name parameter value
318        """
319        # Assume name is always present
320        return cast(tuple[str, str], self.get_disposition_param("name"))[1]

Get the Content-Disposition header name parameter

Returns: str: The Content-Disposition header name parameter value

filename: str | None
326    @property
327    def filename(self) -> str | None:
328        """Get the Content-Disposition header filename parameter
329
330        Returns:
331            str | None: The Content-Disposition header filename parameter value
332        """
333        param = self.get_disposition_param("filename")
334        return param and param[1]

Get the Content-Disposition header filename parameter

Returns: str | None: The Content-Disposition header filename parameter value

class URLEncodedForm(_internal_mitmproxy.coretypes.multidict.MultiDict[bytes, bytes]):
27class URLEncodedForm(multidict.MultiDict[bytes, bytes]):
28    def __init__(self, fields: Iterable[tuple[str | bytes, str | bytes]]) -> None:
29        fields_converted_to_bytes: Iterable[tuple[bytes, bytes]] = (
30            (
31                always_bytes(key),
32                always_bytes(val),
33            )
34            for (key, val) in fields
35        )
36        super().__init__(fields_converted_to_bytes)
37
38    def __setitem__(self, key: int | str | bytes, value: int | str | bytes) -> None:
39        super().__setitem__(always_bytes(key), always_bytes(value))
40
41    def __getitem__(self, key: int | bytes | str) -> bytes:
42        return super().__getitem__(always_bytes(key))

A concrete MultiDict, storing its own data.

Inherited Members
_internal_mitmproxy.coretypes.multidict.MultiDict
MultiDict
fields
get_state
set_state
from_state
_internal_mitmproxy.coretypes.multidict._MultiDict
get_all
set_all
add
insert
keys
values
items
collections.abc.MutableMapping
pop
popitem
clear
update
setdefault
_internal_mitmproxy.coretypes.serializable.Serializable
copy
collections.abc.Mapping
get
class FormSerializer(abc.ABC):
 49class FormSerializer(ABC):
 50    @abstractmethod
 51    def serialize(self, deserialized_body: Form, req: ObjectWithHeaders) -> bytes:
 52        """Serialize a parsed form to raw bytes
 53
 54        Args:
 55            deserialized_body (Form): The parsed form
 56            req (ObjectWithHeaders): The originating request (used for multipart to get an up to date boundary from content-type)
 57
 58        Returns:
 59            bytes: Form's raw bytes representation
 60        """
 61
 62    @abstractmethod
 63    def deserialize(self, body: bytes, req: ObjectWithHeaders) -> Form | None:
 64        """Parses the form from its raw bytes representation
 65
 66        Args:
 67            body (bytes): The form as bytes
 68            req (ObjectWithHeaders): The originating request  (used for multipart to get an up to date boundary from content-type)
 69
 70        Returns:
 71            Form | None: The parsed form
 72        """
 73
 74    @abstractmethod
 75    def get_empty_form(self, req: ObjectWithHeaders) -> Form:
 76        """Get an empty parsed form object
 77
 78        Args:
 79            req (ObjectWithHeaders): The originating request (used to get a boundary for multipart forms)
 80
 81        Returns:
 82            Form: The empty form
 83        """
 84
 85    @abstractmethod
 86    def deserialized_type(self) -> type[Form]:
 87        """Gets the form concrete type
 88
 89        Returns:
 90            type[Form]: The form concrete type
 91        """
 92
 93    @abstractmethod
 94    def import_form(self, exported: ExportedForm, req: ObjectWithHeaders) -> Form:
 95        """Imports a form exported by a serializer
 96            Used to convert a form from a Content-Type to another
 97            Information may be lost in the process
 98
 99        Args:
100            exported (ExportedForm): The exported form
101            req (ObjectWithHeaders): Used to get multipart boundary
102
103        Returns:
104            Form: The form converted to this serializer's format
105        """
106
107    @abstractmethod
108    def export_form(self, source: Form) -> TupleExportedForm:
109        """Formats a form so it can be imported by another serializer
110            Information may be lost in the process
111
112        Args:
113            source (Form): The form to export
114
115        Returns:
116            ExportedForm: The exported form
117        """

Helper class that provides a standard way to create an ABC using inheritance.

@abstractmethod
def serialize( self, deserialized_body: Form, req: pyscalpel.http.body.abstract.ObjectWithHeadersField | pyscalpel.http.body.abstract.ObjectWithHeadersProperty) -> bytes:
50    @abstractmethod
51    def serialize(self, deserialized_body: Form, req: ObjectWithHeaders) -> bytes:
52        """Serialize a parsed form to raw bytes
53
54        Args:
55            deserialized_body (Form): The parsed form
56            req (ObjectWithHeaders): The originating request (used for multipart to get an up to date boundary from content-type)
57
58        Returns:
59            bytes: Form's raw bytes representation
60        """

Serialize a parsed form to raw bytes

Args: deserialized_body (Form): The parsed form req (ObjectWithHeaders): The originating request (used for multipart to get an up to date boundary from content-type)

Returns: bytes: Form's raw bytes representation

@abstractmethod
def deserialize( self, body: bytes, req: pyscalpel.http.body.abstract.ObjectWithHeadersField | pyscalpel.http.body.abstract.ObjectWithHeadersProperty) -> Form | None:
62    @abstractmethod
63    def deserialize(self, body: bytes, req: ObjectWithHeaders) -> Form | None:
64        """Parses the form from its raw bytes representation
65
66        Args:
67            body (bytes): The form as bytes
68            req (ObjectWithHeaders): The originating request  (used for multipart to get an up to date boundary from content-type)
69
70        Returns:
71            Form | None: The parsed form
72        """

Parses the form from its raw bytes representation

Args: body (bytes): The form as bytes req (ObjectWithHeaders): The originating request (used for multipart to get an up to date boundary from content-type)

Returns: Form | None: The parsed form

@abstractmethod
def get_empty_form( self, req: pyscalpel.http.body.abstract.ObjectWithHeadersField | pyscalpel.http.body.abstract.ObjectWithHeadersProperty) -> Form:
74    @abstractmethod
75    def get_empty_form(self, req: ObjectWithHeaders) -> Form:
76        """Get an empty parsed form object
77
78        Args:
79            req (ObjectWithHeaders): The originating request (used to get a boundary for multipart forms)
80
81        Returns:
82            Form: The empty form
83        """

Get an empty parsed form object

Args: req (ObjectWithHeaders): The originating request (used to get a boundary for multipart forms)

Returns: Form: The empty form

@abstractmethod
def deserialized_type(self) -> type[Form]:
85    @abstractmethod
86    def deserialized_type(self) -> type[Form]:
87        """Gets the form concrete type
88
89        Returns:
90            type[Form]: The form concrete type
91        """

Gets the form concrete type

Returns: type[Form]: The form concrete type

@abstractmethod
def import_form( self, exported: tuple[tuple[bytes, bytes | None], ...], req: pyscalpel.http.body.abstract.ObjectWithHeadersField | pyscalpel.http.body.abstract.ObjectWithHeadersProperty) -> Form:
 93    @abstractmethod
 94    def import_form(self, exported: ExportedForm, req: ObjectWithHeaders) -> Form:
 95        """Imports a form exported by a serializer
 96            Used to convert a form from a Content-Type to another
 97            Information may be lost in the process
 98
 99        Args:
100            exported (ExportedForm): The exported form
101            req (ObjectWithHeaders): Used to get multipart boundary
102
103        Returns:
104            Form: The form converted to this serializer's format
105        """

Imports a form exported by a serializer Used to convert a form from a Content-Type to another Information may be lost in the process

Args: exported (ExportedForm): The exported form req (ObjectWithHeaders): Used to get multipart boundary

Returns: Form: The form converted to this serializer's format

@abstractmethod
def export_form( self, source: Form) -> tuple[tuple[bytes, bytes | None], ...]:
107    @abstractmethod
108    def export_form(self, source: Form) -> TupleExportedForm:
109        """Formats a form so it can be imported by another serializer
110            Information may be lost in the process
111
112        Args:
113            source (Form): The form to export
114
115        Returns:
116            ExportedForm: The exported form
117        """

Formats a form so it can be imported by another serializer Information may be lost in the process

Args: source (Form): The form to export

Returns: ExportedForm: The exported form

def json_unescape(escaped: str) -> str:
55def json_unescape(escaped: str) -> str:
56    def decode_match(match):
57        return chr(int(match.group(1), 16))
58
59    return re.sub(r"\\u([0-9a-fA-F]{4})", decode_match, escaped)
def json_unescape_bytes(escaped: str) -> bytes:
62def json_unescape_bytes(escaped: str) -> bytes:
63    return json_unescape(escaped).encode("latin-1")
def json_escape_bytes(data: bytes) -> str:
49def json_escape_bytes(data: bytes) -> str:
50    printable = string.printable.encode("utf-8")
51
52    return "".join(chr(ch) if ch in printable else f"\\u{ch:04x}" for ch in data)