python3-10.pyscalpel
This is a module providing tools to handle Burp HTTP traffic through the use of the Scalpel extension.
It provides many utilities to manipulate HTTP requests, responses and converting data.
1""" 2This is a module providing tools to handle Burp HTTP traffic through the use of the Scalpel extension. 3 4It provides many utilities to manipulate HTTP requests, responses and converting data. 5""" 6 7from pyscalpel.http import Request, Response, Flow 8from pyscalpel.edit import editor 9from pyscalpel.burp_utils import ctx as _context 10from pyscalpel.java.scalpel_types import Context 11from pyscalpel.logger import Logger, logger 12from pyscalpel.events import MatchEvent 13from . import http 14from . import java 15from . import encoding 16from . import utils 17from . import burp_utils 18from . import venv 19from . import edit 20 21ctx: Context = _context 22"""The Scalpel Python execution context 23 24Contains the Burp Java API object, the venv directory, the user script path, 25the path to the file loading the user script and a logging object 26""" 27 28 29__all__ = [ 30 "http", 31 "java", 32 "encoding", 33 "utils", 34 "burp_utils", 35 "venv", 36 "edit", 37 "Request", 38 "Response", 39 "Flow", 40 "ctx", 41 "Context", 42 "MatchEvent", 43 "editor", 44 "logger", 45 "Logger", 46]
70class Request: 71 """A "Burp oriented" HTTP request class 72 73 74 This class allows to manipulate Burp requests in a Pythonic way. 75 """ 76 77 _Port = int 78 _QueryParam = tuple[str, str] 79 _ParsedQuery = tuple[_QueryParam, ...] 80 _HttpVersion = str 81 _HeaderKey = str 82 _HeaderValue = str 83 _Header = tuple[_HeaderKey, _HeaderValue] 84 _Host = str 85 _Method = str 86 _Scheme = Literal["http", "https"] 87 _Authority = str 88 _Content = bytes 89 _Path = str 90 91 host: _Host 92 port: _Port 93 method: _Method 94 scheme: _Scheme 95 authority: _Authority 96 97 # Path also includes URI parameters (;), query (?) and fragment (#) 98 # Simply because it is more conveninent to manipulate that way in a pentensting context 99 # It also mimics the way mitmproxy works. 100 path: _Path 101 102 http_version: _HttpVersion 103 _headers: Headers 104 _serializer: FormSerializer | None = None 105 _deserialized_content: Any = None 106 _content: _Content | None = None 107 _old_deserialized_content: Any = None 108 _is_form_initialized: bool = False 109 update_content_length: bool = True 110 111 def __init__( 112 self, 113 method: str, 114 scheme: Literal["http", "https"], 115 host: str, 116 port: int, 117 path: str, 118 http_version: str, 119 headers: ( 120 Headers | tuple[tuple[bytes, bytes], ...] | Iterable[tuple[bytes, bytes]] 121 ), 122 authority: str, 123 content: bytes | None, 124 ): 125 self.scheme = scheme 126 self.host = host 127 self.port = port 128 self.path = path 129 self.method = method 130 self.authority = authority 131 self.http_version = http_version 132 self.headers = headers if isinstance(headers, Headers) else Headers(headers) 133 self._content = content 134 135 # Initialize the serializer (json,urlencoded,multipart) 136 self.update_serializer_from_content_type( 137 self.headers.get("Content-Type"), fail_silently=True 138 ) 139 140 # Initialize old deserialized content to avoid modifying content if it has not been modified 141 # (see test_content_do_not_modify_json() in scalpel/src/main/resources/python/pyscalpel/tests/test_request.py) 142 self._old_deserialized_content = deepcopy(self._deserialized_content) 143 144 def _del_header(self, header: str) -> bool: 145 if header in self._headers.keys(): 146 del self._headers[header] 147 return True 148 149 return False 150 151 def _update_content_length(self) -> None: 152 if self.update_content_length: 153 if self._content is None: 154 self._del_header("Content-Length") 155 else: 156 length = len(cast(bytes, self._content)) 157 self._headers["Content-Length"] = str(length) 158 159 @staticmethod 160 def _parse_qs(query_string: str) -> _ParsedQuery: 161 return tuple(urllib.parse.parse_qsl(query_string)) 162 163 @staticmethod 164 def _parse_url( 165 url: str, 166 ) -> tuple[_Scheme, _Host, _Port, _Path]: 167 scheme, host, port, path = url_parse(url) 168 169 # This method is only used to create HTTP requests from URLs 170 # so we can ensure the scheme is valid for this usage 171 if scheme not in (b"http", b"https"): 172 scheme = b"http" 173 174 return cast( 175 tuple[Literal["http", "https"], str, int, str], 176 (scheme.decode("ascii"), host.decode("idna"), port, path.decode("ascii")), 177 ) 178 179 @staticmethod 180 def _unparse_url(scheme: _Scheme, host: _Host, port: _Port, path: _Path) -> str: 181 return url_unparse(scheme, host, port, path) 182 183 @classmethod 184 def make( 185 cls, 186 method: str, 187 url: str, 188 content: bytes | str = "", 189 headers: ( 190 Headers 191 | dict[str | bytes, str | bytes] 192 | dict[str, str] 193 | dict[bytes, bytes] 194 | Iterable[tuple[bytes, bytes]] 195 ) = (), 196 ) -> Request: 197 """Create a request from an URL 198 199 Args: 200 method (str): The request method (GET,POST,PUT,PATCH, DELETE,TRACE,...) 201 url (str): The request URL 202 content (bytes | str, optional): The request content. Defaults to "". 203 headers (Headers, optional): The request headers. Defaults to (). 204 205 Returns: 206 Request: The HTTP request 207 """ 208 scalpel_headers: Headers 209 match headers: 210 case Headers(): 211 scalpel_headers = headers 212 case dict(): 213 casted_headers = cast(dict[str | bytes, str | bytes], headers) 214 scalpel_headers = Headers( 215 ( 216 (always_bytes(key), always_bytes(val)) 217 for key, val in casted_headers.items() 218 ) 219 ) 220 case _: 221 scalpel_headers = Headers(headers) 222 223 scheme, host, port, path = Request._parse_url(url) 224 http_version = "HTTP/1.1" 225 226 # Inferr missing Host header from URL 227 host_header = scalpel_headers.get("Host") 228 if host_header is None: 229 match (scheme, port): 230 case ("http", 80) | ("https", 443): 231 host_header = host 232 case _: 233 host_header = f"{host}:{port}" 234 235 scalpel_headers["Host"] = host_header 236 237 authority: str = host_header 238 encoded_content = always_bytes(content) 239 240 assert isinstance(host, str) 241 242 return cls( 243 method=method, 244 scheme=scheme, 245 host=host, 246 port=port, 247 path=path, 248 http_version=http_version, 249 headers=scalpel_headers, 250 authority=authority, 251 content=encoded_content, 252 ) 253 254 @classmethod 255 def from_burp( 256 cls, request: IHttpRequest, service: IHttpService | None = None 257 ) -> Request: # pragma: no cover (uses Java API) 258 """Construct an instance of the Request class from a Burp suite HttpRequest. 259 :param request: The Burp suite HttpRequest to convert. 260 :return: A Request with the same data as the Burp suite HttpRequest. 261 """ 262 service = service or request.httpService() 263 body = get_bytes(request.body()) 264 265 # Burp will give you lowercased and pseudo headers when using HTTP/2. 266 # https://portswigger.net/burp/documentation/desktop/http2/http2-normalization-in-the-message-editor#sending-requests-without-any-normalization:~:text=are%20converted%20to-,lowercase,-. 267 # https://blog.yaakov.online/http-2-header-casing/ 268 headers: Headers = Headers.from_burp(request.headers()) 269 270 # Burp gives a 0 length byte array body even when it doesn't exist, instead of null. 271 # Empty but existing bodies without a Content-Length header are lost in the process. 272 if not body and not headers.get("Content-Length"): 273 body = None 274 275 # request.url() gives a relative url for some reason 276 # So we have to parse and unparse to get the full path 277 # (path + parameters + query + fragment) 278 _, _, path, parameters, query, fragment = urllib.parse.urlparse(request.url()) 279 280 # Concatenate the path components 281 # Empty parameters,query and fragment are lost in the process 282 # e.g.: http://example.com;?# becomes http://example.com 283 # To use such an URL, the user must set the path directly 284 # To fix this we would need to write our own URL parser, which is a bit overkill for now. 285 path = urllib.parse.urlunparse(("", "", path, parameters, query, fragment)) 286 287 host = "" 288 port = 0 289 scheme = "http" 290 if service: 291 host = service.host() 292 port = service.port() 293 scheme = "https" if service.secure() else "http" 294 295 return cls( 296 method=request.method(), 297 scheme=scheme, 298 host=host, 299 port=port, 300 path=path, 301 http_version=request.httpVersion() or "HTTP/1.1", 302 headers=headers, 303 authority=headers.get(":authority") or headers.get("Host") or "", 304 content=body, 305 ) 306 307 def __bytes__(self) -> bytes: 308 """Convert the request to bytes 309 :return: The request as bytes. 310 """ 311 # Reserialize the request to bytes. 312 first_line = ( 313 b" ".join( 314 always_bytes(s) for s in (self.method, self.path, self.http_version) 315 ) 316 + b"\r\n" 317 ) 318 319 # Strip HTTP/2 pseudo headers. 320 # https://portswigger.net/burp/documentation/desktop/http2/http2-basics-for-burp-users#:~:text=HTTP/2%20specification.-,Pseudo%2Dheaders,-In%20HTTP/2 321 mapped_headers = tuple( 322 field for field in self.headers.fields if not field[0].startswith(b":") 323 ) 324 325 if self.headers.get(b"Host") is None and self.http_version == "HTTP/2": 326 # Host header is not present in HTTP/2, but is required by Burp message editor. 327 # So we have to add it back from the :authority pseudo-header. 328 # https://portswigger.net/burp/documentation/desktop/http2/http2-normalization-in-the-message-editor#sending-requests-without-any-normalization:~:text=pseudo%2Dheaders%20and-,derives,-the%20%3Aauthority%20from 329 mapped_headers = ( 330 (b"Host", always_bytes(self.headers[":authority"])), 331 ) + tuple(mapped_headers) 332 333 # Construct the request's headers part. 334 headers_lines = b"".join( 335 b"%s: %s\r\n" % (key, val) for key, val in mapped_headers 336 ) 337 338 # Set a default value for the request's body. (None -> b"") 339 body = self.content or b"" 340 341 # Construct the whole request and return it. 342 return first_line + headers_lines + b"\r\n" + body 343 344 def to_burp(self) -> IHttpRequest: # pragma: no cover 345 """Convert the request to a Burp suite :class:`IHttpRequest`. 346 :return: The request as a Burp suite :class:`IHttpRequest`. 347 """ 348 # Convert the request to a Burp ByteArray. 349 request_byte_array: IByteArray = PythonUtils.toByteArray(bytes(self)) 350 351 if self.port == 0: 352 # No networking information is available, so we build a plain network-less request. 353 return HttpRequest.httpRequest(request_byte_array) 354 355 # Build the Burp HTTP networking service. 356 service: IHttpService = HttpService.httpService( 357 self.host, self.port, self.scheme == "https" 358 ) 359 360 # Instantiate and return a new Burp HTTP request. 361 return HttpRequest.httpRequest(service, request_byte_array) 362 363 @classmethod 364 def from_raw( 365 cls, 366 data: bytes | str, 367 real_host: str = "", 368 port: int = 0, 369 scheme: Literal["http"] | Literal["https"] | str = "http", 370 ) -> Request: # pragma: no cover 371 """Construct an instance of the Request class from raw bytes. 372 :param data: The raw bytes to convert. 373 :param real_host: The real host to connect to. 374 :param port: The port of the request. 375 :param scheme: The scheme of the request. 376 :return: A :class:`Request` with the same data as the raw bytes. 377 """ 378 # Convert the raw bytes to a Burp ByteArray. 379 # We use the Burp API to trivialize the parsing of the request from raw bytes. 380 str_or_byte_array: IByteArray | str = ( 381 data if isinstance(data, str) else PythonUtils.toByteArray(data) 382 ) 383 384 # Handle the case where the networking informations are not provided. 385 if port == 0: 386 # Instantiate and return a new Burp HTTP request without networking informations. 387 burp_request: IHttpRequest = HttpRequest.httpRequest(str_or_byte_array) 388 else: 389 # Build the Burp HTTP networking service. 390 service: IHttpService = HttpService.httpService( 391 real_host, port, scheme == "https" 392 ) 393 394 # Instantiate a new Burp HTTP request with networking informations. 395 burp_request: IHttpRequest = HttpRequest.httpRequest( 396 service, str_or_byte_array 397 ) 398 399 # Construct the request from the Burp. 400 return cls.from_burp(burp_request) 401 402 @property 403 def url(self) -> str: 404 """ 405 The full URL string, constructed from `Request.scheme`, 406 `Request.host`, `Request.port` and `Request.path`. 407 408 Setting this property updates these attributes as well. 409 """ 410 return Request._unparse_url(self.scheme, self.host, self.port, self.path) 411 412 @url.setter 413 def url(self, val: str | bytes) -> None: 414 (self.scheme, self.host, self.port, self.path) = Request._parse_url( 415 always_str(val) 416 ) 417 418 def _get_query(self) -> _ParsedQuery: 419 query = urllib.parse.urlparse(self.url).query 420 return tuple(url_decode(query)) 421 422 def _set_query(self, query_data: Sequence[_QueryParam]): 423 query = url_encode(query_data) 424 _, _, path, params, _, fragment = urllib.parse.urlparse(self.url) 425 self.path = urllib.parse.urlunparse(["", "", path, params, query, fragment]) 426 427 @property 428 def query(self) -> URLEncodedFormView: 429 """The query string parameters as a dict-like object 430 431 Returns: 432 QueryParamsView: The query string parameters 433 """ 434 return URLEncodedFormView( 435 multidict.MultiDictView(self._get_query, self._set_query) 436 ) 437 438 @query.setter 439 def query(self, value: Sequence[tuple[str, str]]): 440 self._set_query(value) 441 442 def _has_deserialized_content_changed(self) -> bool: 443 return self._deserialized_content != self._old_deserialized_content 444 445 def _serialize_content(self): 446 if self._serializer is None: 447 return 448 449 if self._deserialized_content is None: 450 self._content = None 451 return 452 453 self._update_serialized_content( 454 self._serializer.serialize(self._deserialized_content, req=self) 455 ) 456 457 def _update_serialized_content(self, serialized: bytes): 458 if self._serializer is None: 459 self._content = serialized 460 return 461 462 # Update the parsed form 463 self._deserialized_content = self._serializer.deserialize(serialized, self) 464 self._old_deserialized_content = deepcopy(self._deserialized_content) 465 466 # Set the raw content directly 467 self._content = serialized 468 469 def _deserialize_content(self): 470 if self._serializer is None: 471 return 472 473 if self._content: 474 self._deserialized_content = self._serializer.deserialize( 475 self._content, req=self 476 ) 477 478 def _update_deserialized_content(self, deserialized: Any): 479 if self._serializer is None: 480 return 481 482 if deserialized is None: 483 self._deserialized_content = None 484 self._old_deserialized_content = None 485 return 486 487 self._deserialized_content = deserialized 488 self._content = self._serializer.serialize(deserialized, self) 489 self._update_content_length() 490 491 @property 492 def content(self) -> bytes | None: 493 """The request content / body as raw bytes 494 495 Returns: 496 bytes | None: The content if it exists 497 """ 498 if self._serializer and self._has_deserialized_content_changed(): 499 self._update_deserialized_content(self._deserialized_content) 500 self._old_deserialized_content = deepcopy(self._deserialized_content) 501 502 self._update_content_length() 503 504 return self._content 505 506 @content.setter 507 def content(self, value: bytes | str | None): 508 match value: 509 case None: 510 self._content = None 511 self._deserialized_content = None 512 return 513 case str(): 514 value = value.encode("latin-1") 515 516 self._update_content_length() 517 518 self._update_serialized_content(value) 519 520 @property 521 def body(self) -> bytes | None: 522 """Alias for content() 523 524 Returns: 525 bytes | None: The request body / content 526 """ 527 return self.content 528 529 @body.setter 530 def body(self, value: bytes | str | None): 531 self.content = value 532 533 def update_serializer_from_content_type( 534 self, 535 content_type: ImplementedContentType | str | None = None, 536 fail_silently: bool = False, 537 ): 538 """Update the form parsing based on the given Content-Type 539 540 Args: 541 content_type (ImplementedContentTypesTp | str | None, optional): The form content-type. Defaults to None. 542 fail_silently (bool, optional): Determine if an excpetion is raised when the content-type is unknown. Defaults to False. 543 544 Raises: 545 FormNotParsedException: Raised when the content-type is unknown. 546 """ 547 # Strip the boundary param so we can use our content-type to serializer map 548 _content_type: str = get_header_value_without_params( 549 content_type or self.headers.get("Content-Type") or "" 550 ) 551 552 serializer = None 553 if _content_type in IMPLEMENTED_CONTENT_TYPES: 554 serializer = CONTENT_TYPE_TO_SERIALIZER.get(_content_type) 555 556 if serializer is None: 557 if fail_silently: 558 serializer = self._serializer 559 else: 560 raise FormNotParsedException( 561 f"Unimplemented form content-type: {_content_type}" 562 ) 563 self._set_serializer(serializer) 564 565 @property 566 def content_type(self) -> str | None: 567 """The Content-Type header value. 568 569 Returns: 570 str | None: <=> self.headers.get("Content-Type") 571 """ 572 return self.headers.get("Content-Type") 573 574 @content_type.setter 575 def content_type(self, value: str) -> str | None: 576 self.headers["Content-Type"] = value 577 578 def create_defaultform( 579 self, 580 content_type: ImplementedContentType | str | None = None, 581 update_header: bool = True, 582 ) -> MutableMapping[Any, Any]: 583 """Creates the form if it doesn't exist, else returns the existing one 584 585 Args: 586 content_type (IMPLEMENTED_CONTENT_TYPES_TP | None, optional): The form content-type. Defaults to None. 587 update_header (bool, optional): Whether to update the header. Defaults to True. 588 589 Raises: 590 FormNotParsedException: Thrown when provided content-type has no implemented form-serializer 591 FormNotParsedException: Thrown when the raw content could not be parsed. 592 593 Returns: 594 MutableMapping[Any, Any]: The mapped form. 595 """ 596 if not self._is_form_initialized or content_type: 597 self.update_serializer_from_content_type(content_type) 598 599 # Set content-type if it does not exist 600 if (content_type and update_header) or not self.headers.get_all( 601 "Content-Type" 602 ): 603 self.headers["Content-Type"] = content_type 604 605 serializer = self._serializer 606 if serializer is None: 607 # This should probably never trigger here as it should already be raised by update_serializer_from_content_type 608 raise FormNotParsedException( 609 f"Form of content-type {self.content_type} not implemented." 610 ) 611 612 # Create default form. 613 if not self.content: 614 self._deserialized_content = serializer.get_empty_form(self) 615 elif self._deserialized_content is None: 616 self._deserialize_content() 617 618 if self._deserialized_content is None: 619 raise FormNotParsedException( 620 f"Could not parse content to {serializer.deserialized_type()}\nContent:{self._content}" 621 ) 622 623 if not isinstance(self._deserialized_content, serializer.deserialized_type()): 624 self._deserialized_content = serializer.get_empty_form(self) 625 626 self._is_form_initialized = True 627 return self._deserialized_content 628 629 @property 630 def form(self) -> MutableMapping[Any, Any]: 631 """Mapping from content parsed accordingly to Content-Type 632 633 Raises: 634 FormNotParsedException: The content could not be parsed accordingly to Content-Type 635 636 Returns: 637 MutableMapping[Any, Any]: The mapped request form 638 """ 639 if not self._is_form_initialized: 640 self.update_serializer_from_content_type() 641 642 self.create_defaultform() 643 if self._deserialized_content is None: 644 raise FormNotParsedException() 645 646 self._is_form_initialized = True 647 return self._deserialized_content 648 649 @form.setter 650 def form(self, form: MutableMapping[Any, Any]): 651 if not self._is_form_initialized: 652 self.update_serializer_from_content_type() 653 self._is_form_initialized = True 654 655 self._deserialized_content = form 656 657 # Update raw _content 658 self._serialize_content() 659 660 def _set_serializer(self, serializer: FormSerializer | None): 661 # Update the serializer 662 old_serializer = self._serializer 663 self._serializer = serializer 664 665 if serializer is None: 666 self._deserialized_content = None 667 return 668 669 if type(serializer) == type(old_serializer): 670 return 671 672 if old_serializer is None: 673 self._deserialize_content() 674 return 675 676 old_form = self._deserialized_content 677 678 if old_form is None: 679 self._deserialize_content() 680 return 681 682 # Convert the form to an intermediate format for easier conversion 683 exported_form = old_serializer.export_form(old_form) 684 685 # Parse the intermediate data to the new serializer format 686 imported_form = serializer.import_form(exported_form, self) 687 self._deserialized_content = imported_form 688 689 def _update_serializer_and_get_form( 690 self, serializer: FormSerializer 691 ) -> MutableMapping[Any, Any] | None: 692 # Set the serializer and update the content 693 self._set_serializer(serializer) 694 695 # Return the new form 696 return self._deserialized_content 697 698 def _update_serializer_and_set_form( 699 self, serializer: FormSerializer, form: MutableMapping[Any, Any] 700 ) -> None: 701 # NOOP when the serializer is the same 702 self._set_serializer(serializer) 703 704 self._update_deserialized_content(form) 705 706 @property 707 def urlencoded_form(self) -> URLEncodedForm: 708 """The urlencoded form data 709 710 Converts the content to the urlencoded form format if needed. 711 Modification to this object will update Request.content and vice versa 712 713 Returns: 714 QueryParams: The urlencoded form data 715 """ 716 self._is_form_initialized = True 717 return cast( 718 URLEncodedForm, 719 self._update_serializer_and_get_form(URLEncodedFormSerializer()), 720 ) 721 722 @urlencoded_form.setter 723 def urlencoded_form(self, form: URLEncodedForm): 724 self._is_form_initialized = True 725 self._update_serializer_and_set_form(URLEncodedFormSerializer(), form) 726 727 @property 728 def json_form(self) -> dict[JSON_KEY_TYPES, JSON_VALUE_TYPES]: 729 """The JSON form data 730 731 Converts the content to the JSON form format if needed. 732 Modification to this object will update Request.content and vice versa 733 734 Returns: 735 dict[JSON_KEY_TYPES, JSON_VALUE_TYPES]: The JSON form data 736 """ 737 self._is_form_initialized = True 738 if self._update_serializer_and_get_form(JSONFormSerializer()) is None: 739 serializer = cast(JSONFormSerializer, self._serializer) 740 self._deserialized_content = serializer.get_empty_form(self) 741 742 return self._deserialized_content 743 744 @json_form.setter 745 def json_form(self, form: dict[JSON_KEY_TYPES, JSON_VALUE_TYPES]): 746 self._is_form_initialized = True 747 self._update_serializer_and_set_form(JSONFormSerializer(), JSONForm(form)) 748 749 def _ensure_multipart_content_type(self) -> str: 750 content_types_headers = self.headers.get_all("Content-Type") 751 pattern = re.compile( 752 r"^multipart/form-data;\s*boundary=([^;\s]+)", re.IGNORECASE 753 ) 754 755 # Find a valid multipart content-type header with a valid boundary 756 matched_content_type: str | None = None 757 for content_type in content_types_headers: 758 if pattern.match(content_type): 759 matched_content_type = content_type 760 break 761 762 # If no boundary was found, overwrite the Content-Type header 763 # If an user wants to avoid this behaviour,they should manually create a MultiPartForm(), convert it to bytes 764 # and pass it as raw_form() 765 if matched_content_type is None: 766 # TODO: Randomly generate this? The boundary could be used to fingerprint Scalpel 767 new_content_type = ( 768 "multipart/form-data; boundary=----WebKitFormBoundaryy6klzjxzTk68s1dI" 769 ) 770 self.headers["Content-Type"] = new_content_type 771 return new_content_type 772 773 return matched_content_type 774 775 @property 776 def multipart_form(self) -> MultiPartForm: 777 """The multipart form data 778 779 Converts the content to the multipart form format if needed. 780 Modification to this object will update Request.content and vice versa 781 782 Returns: 783 MultiPartForm 784 """ 785 self._is_form_initialized = True 786 787 # Keep boundary even if content-type has changed 788 if isinstance(self._deserialized_content, MultiPartForm): 789 return self._deserialized_content 790 791 # We do not have an existing form, so we have to ensure we have a content-type header with a boundary 792 self._ensure_multipart_content_type() 793 794 # Serialize the current form and try to parse it with the new serializer 795 form = self._update_serializer_and_get_form(MultiPartFormSerializer()) 796 serializer = cast(MultiPartFormSerializer, self._serializer) 797 798 # Set a default value 799 if not form: 800 self._deserialized_content = serializer.get_empty_form(self) 801 802 # get_empty_form() fails when the request doesn't have a valid Content-Type multipart/form-data with a boundary 803 if self._deserialized_content is None: 804 raise FormNotParsedException( 805 f"Could not parse content to {serializer.deserialized_type()}" 806 ) 807 808 return self._deserialized_content 809 810 @multipart_form.setter 811 def multipart_form(self, form: MultiPartForm): 812 self._is_form_initialized = True 813 if not isinstance(self._deserialized_content, MultiPartForm): 814 # Generate a multipart header because we don't have any boundary to format the multipart. 815 self._ensure_multipart_content_type() 816 817 return self._update_serializer_and_set_form( 818 MultiPartFormSerializer(), cast(MutableMapping, form) 819 ) 820 821 @property 822 def cookies(self) -> multidict.MultiDictView[str, str]: 823 """ 824 The request cookies. 825 For the most part, this behaves like a dictionary. 826 Modifications to the MultiDictView update `Request.headers`, and vice versa. 827 """ 828 return multidict.MultiDictView(self._get_cookies, self._set_cookies) 829 830 def _get_cookies(self) -> tuple[tuple[str, str], ...]: 831 header = self.headers.get_all("Cookie") 832 return tuple(cookies.parse_cookie_headers(header)) 833 834 def _set_cookies(self, value: tuple[tuple[str, str], ...]): 835 self.headers["cookie"] = cookies.format_cookie_header(value) 836 837 @cookies.setter 838 def cookies(self, value: tuple[tuple[str, str], ...] | Mapping[str, str]): 839 if hasattr(value, "items") and callable(getattr(value, "items")): 840 value = tuple(cast(Mapping[str, str], value).items()) 841 self._set_cookies(cast(tuple[tuple[str, str], ...], value)) 842 843 @property 844 def host_header(self) -> str | None: 845 """Host header value 846 847 Returns: 848 str | None: The host header value 849 """ 850 return self.headers.get("Host") 851 852 @host_header.setter 853 def host_header(self, value: str | None): 854 self.headers["Host"] = value 855 856 def text(self, encoding="utf-8") -> str: 857 """The decoded content 858 859 Args: 860 encoding (str, optional): encoding to use. Defaults to "utf-8". 861 862 Returns: 863 str: The decoded content 864 """ 865 if self.content is None: 866 return "" 867 868 return self.content.decode(encoding) 869 870 @property 871 def headers(self) -> Headers: 872 """The request HTTP headers 873 874 Returns: 875 Headers: a case insensitive dict containing the HTTP headers 876 """ 877 self._update_content_length() 878 return self._headers 879 880 @headers.setter 881 def headers(self, value: Headers): 882 self._headers = value 883 self._update_content_length() 884 885 @property 886 def content_length(self) -> int: 887 """Returns the Content-Length header value 888 Returns 0 if the header is absent 889 890 Args: 891 value (int | str): The Content-Length value 892 893 Raises: 894 RuntimeError: Throws RuntimeError when the value is invalid 895 """ 896 content_length: str | None = self.headers.get("Content-Length") 897 if content_length is None: 898 return 0 899 900 trimmed = content_length.strip() 901 if not trimmed.isdigit(): 902 raise ValueError("Content-Length does not contain only digits") 903 904 return int(trimmed) 905 906 @content_length.setter 907 def content_length(self, value: int | str): 908 if self.update_content_length: 909 # It is useless to manually set content-length because the value will be erased. 910 raise RuntimeError( 911 "Cannot set content_length when self.update_content_length is True" 912 ) 913 914 if isinstance(value, int): 915 value = str(value) 916 917 self._headers["Content-Length"] = value 918 919 @property 920 def pretty_host(self) -> str: 921 """Returns the most approriate host 922 Returns self.host when it exists, else it returns self.host_header 923 924 Returns: 925 str: The request target host 926 """ 927 return self.host or self.headers.get("Host") or "" 928 929 def host_is(self, *patterns: str) -> bool: 930 """Perform wildcard matching (fnmatch) on the target host. 931 932 Args: 933 pattern (str): The pattern to use 934 935 Returns: 936 bool: Whether the pattern matches 937 """ 938 return host_is(self.pretty_host, *patterns) 939 940 def path_is(self, *patterns: str) -> bool: 941 return match_patterns(self.path, *patterns)
A "Burp oriented" HTTP request class
This class allows to manipulate Burp requests in a Pythonic way.
111 def __init__( 112 self, 113 method: str, 114 scheme: Literal["http", "https"], 115 host: str, 116 port: int, 117 path: str, 118 http_version: str, 119 headers: ( 120 Headers | tuple[tuple[bytes, bytes], ...] | Iterable[tuple[bytes, bytes]] 121 ), 122 authority: str, 123 content: bytes | None, 124 ): 125 self.scheme = scheme 126 self.host = host 127 self.port = port 128 self.path = path 129 self.method = method 130 self.authority = authority 131 self.http_version = http_version 132 self.headers = headers if isinstance(headers, Headers) else Headers(headers) 133 self._content = content 134 135 # Initialize the serializer (json,urlencoded,multipart) 136 self.update_serializer_from_content_type( 137 self.headers.get("Content-Type"), fail_silently=True 138 ) 139 140 # Initialize old deserialized content to avoid modifying content if it has not been modified 141 # (see test_content_do_not_modify_json() in scalpel/src/main/resources/python/pyscalpel/tests/test_request.py) 142 self._old_deserialized_content = deepcopy(self._deserialized_content)
870 @property 871 def headers(self) -> Headers: 872 """The request HTTP headers 873 874 Returns: 875 Headers: a case insensitive dict containing the HTTP headers 876 """ 877 self._update_content_length() 878 return self._headers
The request HTTP headers
Returns: Headers: a case insensitive dict containing the HTTP headers
183 @classmethod 184 def make( 185 cls, 186 method: str, 187 url: str, 188 content: bytes | str = "", 189 headers: ( 190 Headers 191 | dict[str | bytes, str | bytes] 192 | dict[str, str] 193 | dict[bytes, bytes] 194 | Iterable[tuple[bytes, bytes]] 195 ) = (), 196 ) -> Request: 197 """Create a request from an URL 198 199 Args: 200 method (str): The request method (GET,POST,PUT,PATCH, DELETE,TRACE,...) 201 url (str): The request URL 202 content (bytes | str, optional): The request content. Defaults to "". 203 headers (Headers, optional): The request headers. Defaults to (). 204 205 Returns: 206 Request: The HTTP request 207 """ 208 scalpel_headers: Headers 209 match headers: 210 case Headers(): 211 scalpel_headers = headers 212 case dict(): 213 casted_headers = cast(dict[str | bytes, str | bytes], headers) 214 scalpel_headers = Headers( 215 ( 216 (always_bytes(key), always_bytes(val)) 217 for key, val in casted_headers.items() 218 ) 219 ) 220 case _: 221 scalpel_headers = Headers(headers) 222 223 scheme, host, port, path = Request._parse_url(url) 224 http_version = "HTTP/1.1" 225 226 # Inferr missing Host header from URL 227 host_header = scalpel_headers.get("Host") 228 if host_header is None: 229 match (scheme, port): 230 case ("http", 80) | ("https", 443): 231 host_header = host 232 case _: 233 host_header = f"{host}:{port}" 234 235 scalpel_headers["Host"] = host_header 236 237 authority: str = host_header 238 encoded_content = always_bytes(content) 239 240 assert isinstance(host, str) 241 242 return cls( 243 method=method, 244 scheme=scheme, 245 host=host, 246 port=port, 247 path=path, 248 http_version=http_version, 249 headers=scalpel_headers, 250 authority=authority, 251 content=encoded_content, 252 )
Create a request from an URL
Args: method (str): The request method (GET,POST,PUT,PATCH, DELETE,TRACE,...) url (str): The request URL content (bytes | str, optional): The request content. Defaults to "". headers (Headers, optional): The request headers. Defaults to ().
Returns: Request: The HTTP request
254 @classmethod 255 def from_burp( 256 cls, request: IHttpRequest, service: IHttpService | None = None 257 ) -> Request: # pragma: no cover (uses Java API) 258 """Construct an instance of the Request class from a Burp suite HttpRequest. 259 :param request: The Burp suite HttpRequest to convert. 260 :return: A Request with the same data as the Burp suite HttpRequest. 261 """ 262 service = service or request.httpService() 263 body = get_bytes(request.body()) 264 265 # Burp will give you lowercased and pseudo headers when using HTTP/2. 266 # https://portswigger.net/burp/documentation/desktop/http2/http2-normalization-in-the-message-editor#sending-requests-without-any-normalization:~:text=are%20converted%20to-,lowercase,-. 267 # https://blog.yaakov.online/http-2-header-casing/ 268 headers: Headers = Headers.from_burp(request.headers()) 269 270 # Burp gives a 0 length byte array body even when it doesn't exist, instead of null. 271 # Empty but existing bodies without a Content-Length header are lost in the process. 272 if not body and not headers.get("Content-Length"): 273 body = None 274 275 # request.url() gives a relative url for some reason 276 # So we have to parse and unparse to get the full path 277 # (path + parameters + query + fragment) 278 _, _, path, parameters, query, fragment = urllib.parse.urlparse(request.url()) 279 280 # Concatenate the path components 281 # Empty parameters,query and fragment are lost in the process 282 # e.g.: http://example.com;?# becomes http://example.com 283 # To use such an URL, the user must set the path directly 284 # To fix this we would need to write our own URL parser, which is a bit overkill for now. 285 path = urllib.parse.urlunparse(("", "", path, parameters, query, fragment)) 286 287 host = "" 288 port = 0 289 scheme = "http" 290 if service: 291 host = service.host() 292 port = service.port() 293 scheme = "https" if service.secure() else "http" 294 295 return cls( 296 method=request.method(), 297 scheme=scheme, 298 host=host, 299 port=port, 300 path=path, 301 http_version=request.httpVersion() or "HTTP/1.1", 302 headers=headers, 303 authority=headers.get(":authority") or headers.get("Host") or "", 304 content=body, 305 )
Construct an instance of the Request class from a Burp suite HttpRequest.
Parameters
- request: The Burp suite HttpRequest to convert.
Returns
A Request with the same data as the Burp suite HttpRequest.
344 def to_burp(self) -> IHttpRequest: # pragma: no cover 345 """Convert the request to a Burp suite :class:`IHttpRequest`. 346 :return: The request as a Burp suite :class:`IHttpRequest`. 347 """ 348 # Convert the request to a Burp ByteArray. 349 request_byte_array: IByteArray = PythonUtils.toByteArray(bytes(self)) 350 351 if self.port == 0: 352 # No networking information is available, so we build a plain network-less request. 353 return HttpRequest.httpRequest(request_byte_array) 354 355 # Build the Burp HTTP networking service. 356 service: IHttpService = HttpService.httpService( 357 self.host, self.port, self.scheme == "https" 358 ) 359 360 # Instantiate and return a new Burp HTTP request. 361 return HttpRequest.httpRequest(service, request_byte_array)
Convert the request to a Burp suite IHttpRequest
.
Returns
The request as a Burp suite
IHttpRequest
.
363 @classmethod 364 def from_raw( 365 cls, 366 data: bytes | str, 367 real_host: str = "", 368 port: int = 0, 369 scheme: Literal["http"] | Literal["https"] | str = "http", 370 ) -> Request: # pragma: no cover 371 """Construct an instance of the Request class from raw bytes. 372 :param data: The raw bytes to convert. 373 :param real_host: The real host to connect to. 374 :param port: The port of the request. 375 :param scheme: The scheme of the request. 376 :return: A :class:`Request` with the same data as the raw bytes. 377 """ 378 # Convert the raw bytes to a Burp ByteArray. 379 # We use the Burp API to trivialize the parsing of the request from raw bytes. 380 str_or_byte_array: IByteArray | str = ( 381 data if isinstance(data, str) else PythonUtils.toByteArray(data) 382 ) 383 384 # Handle the case where the networking informations are not provided. 385 if port == 0: 386 # Instantiate and return a new Burp HTTP request without networking informations. 387 burp_request: IHttpRequest = HttpRequest.httpRequest(str_or_byte_array) 388 else: 389 # Build the Burp HTTP networking service. 390 service: IHttpService = HttpService.httpService( 391 real_host, port, scheme == "https" 392 ) 393 394 # Instantiate a new Burp HTTP request with networking informations. 395 burp_request: IHttpRequest = HttpRequest.httpRequest( 396 service, str_or_byte_array 397 ) 398 399 # Construct the request from the Burp. 400 return cls.from_burp(burp_request)
Construct an instance of the Request class from raw bytes.
Parameters
- data: The raw bytes to convert.
- real_host: The real host to connect to.
- port: The port of the request.
- scheme: The scheme of the request.
Returns
A
Request
with the same data as the raw bytes.
402 @property 403 def url(self) -> str: 404 """ 405 The full URL string, constructed from `Request.scheme`, 406 `Request.host`, `Request.port` and `Request.path`. 407 408 Setting this property updates these attributes as well. 409 """ 410 return Request._unparse_url(self.scheme, self.host, self.port, self.path)
The full URL string, constructed from Request.scheme
,
Request.host
, Request.port
and Request.path
.
Setting this property updates these attributes as well.
427 @property 428 def query(self) -> URLEncodedFormView: 429 """The query string parameters as a dict-like object 430 431 Returns: 432 QueryParamsView: The query string parameters 433 """ 434 return URLEncodedFormView( 435 multidict.MultiDictView(self._get_query, self._set_query) 436 )
The query string parameters as a dict-like object
Returns: QueryParamsView: The query string parameters
491 @property 492 def content(self) -> bytes | None: 493 """The request content / body as raw bytes 494 495 Returns: 496 bytes | None: The content if it exists 497 """ 498 if self._serializer and self._has_deserialized_content_changed(): 499 self._update_deserialized_content(self._deserialized_content) 500 self._old_deserialized_content = deepcopy(self._deserialized_content) 501 502 self._update_content_length() 503 504 return self._content
The request content / body as raw bytes
Returns: bytes | None: The content if it exists
520 @property 521 def body(self) -> bytes | None: 522 """Alias for content() 523 524 Returns: 525 bytes | None: The request body / content 526 """ 527 return self.content
Alias for content()
Returns: bytes | None: The request body / content
533 def update_serializer_from_content_type( 534 self, 535 content_type: ImplementedContentType | str | None = None, 536 fail_silently: bool = False, 537 ): 538 """Update the form parsing based on the given Content-Type 539 540 Args: 541 content_type (ImplementedContentTypesTp | str | None, optional): The form content-type. Defaults to None. 542 fail_silently (bool, optional): Determine if an excpetion is raised when the content-type is unknown. Defaults to False. 543 544 Raises: 545 FormNotParsedException: Raised when the content-type is unknown. 546 """ 547 # Strip the boundary param so we can use our content-type to serializer map 548 _content_type: str = get_header_value_without_params( 549 content_type or self.headers.get("Content-Type") or "" 550 ) 551 552 serializer = None 553 if _content_type in IMPLEMENTED_CONTENT_TYPES: 554 serializer = CONTENT_TYPE_TO_SERIALIZER.get(_content_type) 555 556 if serializer is None: 557 if fail_silently: 558 serializer = self._serializer 559 else: 560 raise FormNotParsedException( 561 f"Unimplemented form content-type: {_content_type}" 562 ) 563 self._set_serializer(serializer)
Update the form parsing based on the given Content-Type
Args: content_type (ImplementedContentTypesTp | str | None, optional): The form content-type. Defaults to None. fail_silently (bool, optional): Determine if an excpetion is raised when the content-type is unknown. Defaults to False.
Raises: FormNotParsedException: Raised when the content-type is unknown.
565 @property 566 def content_type(self) -> str | None: 567 """The Content-Type header value. 568 569 Returns: 570 str | None: <=> self.headers.get("Content-Type") 571 """ 572 return self.headers.get("Content-Type")
The Content-Type header value.
Returns: str | None: <=> self.headers.get("Content-Type")
578 def create_defaultform( 579 self, 580 content_type: ImplementedContentType | str | None = None, 581 update_header: bool = True, 582 ) -> MutableMapping[Any, Any]: 583 """Creates the form if it doesn't exist, else returns the existing one 584 585 Args: 586 content_type (IMPLEMENTED_CONTENT_TYPES_TP | None, optional): The form content-type. Defaults to None. 587 update_header (bool, optional): Whether to update the header. Defaults to True. 588 589 Raises: 590 FormNotParsedException: Thrown when provided content-type has no implemented form-serializer 591 FormNotParsedException: Thrown when the raw content could not be parsed. 592 593 Returns: 594 MutableMapping[Any, Any]: The mapped form. 595 """ 596 if not self._is_form_initialized or content_type: 597 self.update_serializer_from_content_type(content_type) 598 599 # Set content-type if it does not exist 600 if (content_type and update_header) or not self.headers.get_all( 601 "Content-Type" 602 ): 603 self.headers["Content-Type"] = content_type 604 605 serializer = self._serializer 606 if serializer is None: 607 # This should probably never trigger here as it should already be raised by update_serializer_from_content_type 608 raise FormNotParsedException( 609 f"Form of content-type {self.content_type} not implemented." 610 ) 611 612 # Create default form. 613 if not self.content: 614 self._deserialized_content = serializer.get_empty_form(self) 615 elif self._deserialized_content is None: 616 self._deserialize_content() 617 618 if self._deserialized_content is None: 619 raise FormNotParsedException( 620 f"Could not parse content to {serializer.deserialized_type()}\nContent:{self._content}" 621 ) 622 623 if not isinstance(self._deserialized_content, serializer.deserialized_type()): 624 self._deserialized_content = serializer.get_empty_form(self) 625 626 self._is_form_initialized = True 627 return self._deserialized_content
Creates the form if it doesn't exist, else returns the existing one
Args: content_type (IMPLEMENTED_CONTENT_TYPES_TP | None, optional): The form content-type. Defaults to None. update_header (bool, optional): Whether to update the header. Defaults to True.
Raises: FormNotParsedException: Thrown when provided content-type has no implemented form-serializer FormNotParsedException: Thrown when the raw content could not be parsed.
Returns: MutableMapping[Any, Any]: The mapped form.
629 @property 630 def form(self) -> MutableMapping[Any, Any]: 631 """Mapping from content parsed accordingly to Content-Type 632 633 Raises: 634 FormNotParsedException: The content could not be parsed accordingly to Content-Type 635 636 Returns: 637 MutableMapping[Any, Any]: The mapped request form 638 """ 639 if not self._is_form_initialized: 640 self.update_serializer_from_content_type() 641 642 self.create_defaultform() 643 if self._deserialized_content is None: 644 raise FormNotParsedException() 645 646 self._is_form_initialized = True 647 return self._deserialized_content
Mapping from content parsed accordingly to Content-Type
Raises: FormNotParsedException: The content could not be parsed accordingly to Content-Type
Returns: MutableMapping[Any, Any]: The mapped request form
706 @property 707 def urlencoded_form(self) -> URLEncodedForm: 708 """The urlencoded form data 709 710 Converts the content to the urlencoded form format if needed. 711 Modification to this object will update Request.content and vice versa 712 713 Returns: 714 QueryParams: The urlencoded form data 715 """ 716 self._is_form_initialized = True 717 return cast( 718 URLEncodedForm, 719 self._update_serializer_and_get_form(URLEncodedFormSerializer()), 720 )
The urlencoded form data
Converts the content to the urlencoded form format if needed. Modification to this object will update Request.content and vice versa
Returns: QueryParams: The urlencoded form data
727 @property 728 def json_form(self) -> dict[JSON_KEY_TYPES, JSON_VALUE_TYPES]: 729 """The JSON form data 730 731 Converts the content to the JSON form format if needed. 732 Modification to this object will update Request.content and vice versa 733 734 Returns: 735 dict[JSON_KEY_TYPES, JSON_VALUE_TYPES]: The JSON form data 736 """ 737 self._is_form_initialized = True 738 if self._update_serializer_and_get_form(JSONFormSerializer()) is None: 739 serializer = cast(JSONFormSerializer, self._serializer) 740 self._deserialized_content = serializer.get_empty_form(self) 741 742 return self._deserialized_content
The JSON form data
Converts the content to the JSON form format if needed. Modification to this object will update Request.content and vice versa
Returns: dict[JSON_KEY_TYPES, JSON_VALUE_TYPES]: The JSON form data
775 @property 776 def multipart_form(self) -> MultiPartForm: 777 """The multipart form data 778 779 Converts the content to the multipart form format if needed. 780 Modification to this object will update Request.content and vice versa 781 782 Returns: 783 MultiPartForm 784 """ 785 self._is_form_initialized = True 786 787 # Keep boundary even if content-type has changed 788 if isinstance(self._deserialized_content, MultiPartForm): 789 return self._deserialized_content 790 791 # We do not have an existing form, so we have to ensure we have a content-type header with a boundary 792 self._ensure_multipart_content_type() 793 794 # Serialize the current form and try to parse it with the new serializer 795 form = self._update_serializer_and_get_form(MultiPartFormSerializer()) 796 serializer = cast(MultiPartFormSerializer, self._serializer) 797 798 # Set a default value 799 if not form: 800 self._deserialized_content = serializer.get_empty_form(self) 801 802 # get_empty_form() fails when the request doesn't have a valid Content-Type multipart/form-data with a boundary 803 if self._deserialized_content is None: 804 raise FormNotParsedException( 805 f"Could not parse content to {serializer.deserialized_type()}" 806 ) 807 808 return self._deserialized_content
The multipart form data
Converts the content to the multipart form format if needed. Modification to this object will update Request.content and vice versa
Returns: MultiPartForm
843 @property 844 def host_header(self) -> str | None: 845 """Host header value 846 847 Returns: 848 str | None: The host header value 849 """ 850 return self.headers.get("Host")
Host header value
Returns: str | None: The host header value
856 def text(self, encoding="utf-8") -> str: 857 """The decoded content 858 859 Args: 860 encoding (str, optional): encoding to use. Defaults to "utf-8". 861 862 Returns: 863 str: The decoded content 864 """ 865 if self.content is None: 866 return "" 867 868 return self.content.decode(encoding)
The decoded content
Args: encoding (str, optional): encoding to use. Defaults to "utf-8".
Returns: str: The decoded content
885 @property 886 def content_length(self) -> int: 887 """Returns the Content-Length header value 888 Returns 0 if the header is absent 889 890 Args: 891 value (int | str): The Content-Length value 892 893 Raises: 894 RuntimeError: Throws RuntimeError when the value is invalid 895 """ 896 content_length: str | None = self.headers.get("Content-Length") 897 if content_length is None: 898 return 0 899 900 trimmed = content_length.strip() 901 if not trimmed.isdigit(): 902 raise ValueError("Content-Length does not contain only digits") 903 904 return int(trimmed)
Returns the Content-Length header value Returns 0 if the header is absent
Args: value (int | str): The Content-Length value
Raises: RuntimeError: Throws RuntimeError when the value is invalid
919 @property 920 def pretty_host(self) -> str: 921 """Returns the most approriate host 922 Returns self.host when it exists, else it returns self.host_header 923 924 Returns: 925 str: The request target host 926 """ 927 return self.host or self.headers.get("Host") or ""
Returns the most approriate host Returns self.host when it exists, else it returns self.host_header
Returns: str: The request target host
929 def host_is(self, *patterns: str) -> bool: 930 """Perform wildcard matching (fnmatch) on the target host. 931 932 Args: 933 pattern (str): The pattern to use 934 935 Returns: 936 bool: Whether the pattern matches 937 """ 938 return host_is(self.pretty_host, *patterns)
Perform wildcard matching (fnmatch) on the target host.
Args: pattern (str): The pattern to use
Returns: bool: Whether the pattern matches
22class Response(MITMProxyResponse): 23 """A "Burp oriented" HTTP response class 24 25 26 This class allows to manipulate Burp responses in a Pythonic way. 27 28 Fields: 29 scheme: http or https 30 host: The initiating request target host 31 port: The initiating request target port 32 request: The initiating request. 33 """ 34 35 scheme: Literal["http", "https"] = "http" 36 host: str = "" 37 port: int = 0 38 request: Request | None = None 39 40 def __init__( 41 self, 42 http_version: bytes, 43 status_code: int, 44 reason: bytes, 45 headers: Headers | tuple[tuple[bytes, bytes], ...], 46 content: bytes | None, 47 trailers: Headers | tuple[tuple[bytes, bytes], ...] | None, 48 scheme: Literal["http", "https"] = "http", 49 host: str = "", 50 port: int = 0, 51 ): 52 # Construct the base/inherited MITMProxy response. 53 super().__init__( 54 http_version, 55 status_code, 56 reason, 57 headers, 58 content, 59 trailers, 60 timestamp_start=time.time(), 61 timestamp_end=time.time(), 62 ) 63 self.scheme = scheme 64 self.host = host 65 self.port = port 66 67 @classmethod 68 # https://docs.mitmproxy.org/stable/api/mitmproxy/http.html#Response 69 # link to mitmproxy documentation 70 def from_mitmproxy(cls, response: MITMProxyResponse) -> Response: 71 """Construct an instance of the Response class from a [mitmproxy.http.HTTPResponse](https://docs.mitmproxy.org/stable/api/mitmproxy/http.html#Response). 72 :param response: The [mitmproxy.http.HTTPResponse](https://docs.mitmproxy.org/stable/api/mitmproxy/http.html#Response) to convert. 73 :return: A :class:`Response` with the same data as the [mitmproxy.http.HTTPResponse](https://docs.mitmproxy.org/stable/api/mitmproxy/http.html#Response). 74 """ 75 return cls( 76 always_bytes(response.http_version), 77 response.status_code, 78 always_bytes(response.reason), 79 Headers.from_mitmproxy(response.headers), 80 response.content, 81 Headers.from_mitmproxy(response.trailers) if response.trailers else None, 82 ) 83 84 @classmethod 85 def from_burp( 86 cls, 87 response: IHttpResponse, 88 service: IHttpService | None = None, 89 request: IHttpRequest | None = None, 90 ) -> Response: 91 """Construct an instance of the Response class from a Burp suite :class:`IHttpResponse`.""" 92 body = get_bytes(cast(IByteArray, response.body())) if response.body() else b"" 93 scalpel_response = cls( 94 always_bytes(response.httpVersion() or "HTTP/1.1"), 95 response.statusCode(), 96 always_bytes(response.reasonPhrase() or b""), 97 Headers.from_burp(response.headers()), 98 body, 99 None, 100 ) 101 102 burp_request: IHttpRequest | None = request 103 if burp_request is None: 104 try: 105 # Some responses can have a "initiatingRequest" field. 106 # https://portswigger.github.io/burp-extensions-montoya-api/javadoc/burp/api/montoya/http/handler/HttpResponseReceived.html#initiatingRequest():~:text=HttpRequest-,initiatingRequest(),-Returns%3A 107 burp_request = response.initiatingRequest() # type: ignore 108 except AttributeError: 109 pass 110 111 if burp_request: 112 scalpel_response.request = Request.from_burp(burp_request, service) 113 114 if not service and burp_request: 115 # The only way to check if the Java method exist without writing Java is catching the error. 116 service = burp_request.httpService() 117 118 if service: 119 scalpel_response.scheme = "https" if service.secure() else "http" 120 scalpel_response.host = service.host() 121 scalpel_response.port = service.port() 122 123 return scalpel_response 124 125 def __bytes__(self) -> bytes: 126 """Convert the response to raw bytes.""" 127 # Reserialize the response to bytes. 128 129 # Format the first line of the response. (e.g. "HTTP/1.1 200 OK\r\n") 130 first_line = ( 131 b" ".join( 132 always_bytes(s) 133 for s in (self.http_version, str(self.status_code), self.reason) 134 ) 135 + b"\r\n" 136 ) 137 138 # Format the response's headers part. 139 headers_lines = b"".join( 140 b"%s: %s\r\n" % (key, val) for key, val in self.headers.fields 141 ) 142 143 # Set a default value for the response's body. (None -> b"") 144 body = self.content or b"" 145 146 # Build the whole response and return it. 147 return first_line + headers_lines + b"\r\n" + body 148 149 def to_burp(self) -> IHttpResponse: # pragma: no cover (uses Java API) 150 """Convert the response to a Burp suite :class:`IHttpResponse`.""" 151 response_byte_array: IByteArray = PythonUtils.toByteArray(bytes(self)) 152 153 return HttpResponse.httpResponse(response_byte_array) 154 155 @classmethod 156 def from_raw( 157 cls, data: bytes | str 158 ) -> Response: # pragma: no cover (uses Java API) 159 """Construct an instance of the Response class from raw bytes. 160 :param data: The raw bytes to convert. 161 :return: A :class:`Response` parsed from the raw bytes. 162 """ 163 # Use the Burp API to trivialize the parsing of the response from raw bytes. 164 # Convert the raw bytes to a Burp ByteArray. 165 # Plain strings are OK too. 166 str_or_byte_array: IByteArray | str = ( 167 data if isinstance(data, str) else PythonUtils.toByteArray(data) 168 ) 169 170 # Instantiate a new Burp HTTP response. 171 burp_response: IHttpResponse = HttpResponse.httpResponse(str_or_byte_array) 172 173 return cls.from_burp(burp_response) 174 175 @classmethod 176 def make( 177 cls, 178 status_code: int = 200, 179 content: bytes | str = b"", 180 headers: Headers | tuple[tuple[bytes, bytes], ...] = (), 181 host: str = "", 182 port: int = 0, 183 scheme: Literal["http", "https"] = "http", 184 ) -> "Response": 185 # Use the base/inherited make method to construct a MITMProxy response. 186 mitmproxy_res = MITMProxyResponse.make(status_code, content, headers) 187 188 res = cls.from_mitmproxy(mitmproxy_res) 189 res.host = host 190 res.scheme = scheme 191 res.port = port 192 193 return res 194 195 def host_is(self, *patterns: str) -> bool: 196 """Matches the host against the provided patterns 197 198 Returns: 199 bool: Whether at least one pattern matched 200 """ 201 return host_is(self.host, *patterns) 202 203 @property 204 def body(self) -> bytes | None: 205 """Alias for content() 206 207 Returns: 208 bytes | None: The request body / content 209 """ 210 return self.content 211 212 @body.setter 213 def body(self, val: bytes | None): 214 self.content = val
A "Burp oriented" HTTP response class
This class allows to manipulate Burp responses in a Pythonic way.
Fields: scheme: http or https host: The initiating request target host port: The initiating request target port request: The initiating request.
40 def __init__( 41 self, 42 http_version: bytes, 43 status_code: int, 44 reason: bytes, 45 headers: Headers | tuple[tuple[bytes, bytes], ...], 46 content: bytes | None, 47 trailers: Headers | tuple[tuple[bytes, bytes], ...] | None, 48 scheme: Literal["http", "https"] = "http", 49 host: str = "", 50 port: int = 0, 51 ): 52 # Construct the base/inherited MITMProxy response. 53 super().__init__( 54 http_version, 55 status_code, 56 reason, 57 headers, 58 content, 59 trailers, 60 timestamp_start=time.time(), 61 timestamp_end=time.time(), 62 ) 63 self.scheme = scheme 64 self.host = host 65 self.port = port
67 @classmethod 68 # https://docs.mitmproxy.org/stable/api/mitmproxy/http.html#Response 69 # link to mitmproxy documentation 70 def from_mitmproxy(cls, response: MITMProxyResponse) -> Response: 71 """Construct an instance of the Response class from a [mitmproxy.http.HTTPResponse](https://docs.mitmproxy.org/stable/api/mitmproxy/http.html#Response). 72 :param response: The [mitmproxy.http.HTTPResponse](https://docs.mitmproxy.org/stable/api/mitmproxy/http.html#Response) to convert. 73 :return: A :class:`Response` with the same data as the [mitmproxy.http.HTTPResponse](https://docs.mitmproxy.org/stable/api/mitmproxy/http.html#Response). 74 """ 75 return cls( 76 always_bytes(response.http_version), 77 response.status_code, 78 always_bytes(response.reason), 79 Headers.from_mitmproxy(response.headers), 80 response.content, 81 Headers.from_mitmproxy(response.trailers) if response.trailers else None, 82 )
Construct an instance of the Response class from a mitmproxy.http.HTTPResponse.
Parameters
- **response: The mitmproxy.http.HTTPResponse to convert.
Returns
A
Response
with the same data as the mitmproxy.http.HTTPResponse.
84 @classmethod 85 def from_burp( 86 cls, 87 response: IHttpResponse, 88 service: IHttpService | None = None, 89 request: IHttpRequest | None = None, 90 ) -> Response: 91 """Construct an instance of the Response class from a Burp suite :class:`IHttpResponse`.""" 92 body = get_bytes(cast(IByteArray, response.body())) if response.body() else b"" 93 scalpel_response = cls( 94 always_bytes(response.httpVersion() or "HTTP/1.1"), 95 response.statusCode(), 96 always_bytes(response.reasonPhrase() or b""), 97 Headers.from_burp(response.headers()), 98 body, 99 None, 100 ) 101 102 burp_request: IHttpRequest | None = request 103 if burp_request is None: 104 try: 105 # Some responses can have a "initiatingRequest" field. 106 # https://portswigger.github.io/burp-extensions-montoya-api/javadoc/burp/api/montoya/http/handler/HttpResponseReceived.html#initiatingRequest():~:text=HttpRequest-,initiatingRequest(),-Returns%3A 107 burp_request = response.initiatingRequest() # type: ignore 108 except AttributeError: 109 pass 110 111 if burp_request: 112 scalpel_response.request = Request.from_burp(burp_request, service) 113 114 if not service and burp_request: 115 # The only way to check if the Java method exist without writing Java is catching the error. 116 service = burp_request.httpService() 117 118 if service: 119 scalpel_response.scheme = "https" if service.secure() else "http" 120 scalpel_response.host = service.host() 121 scalpel_response.port = service.port() 122 123 return scalpel_response
Construct an instance of the Response class from a Burp suite IHttpResponse
.
149 def to_burp(self) -> IHttpResponse: # pragma: no cover (uses Java API) 150 """Convert the response to a Burp suite :class:`IHttpResponse`.""" 151 response_byte_array: IByteArray = PythonUtils.toByteArray(bytes(self)) 152 153 return HttpResponse.httpResponse(response_byte_array)
Convert the response to a Burp suite IHttpResponse
.
155 @classmethod 156 def from_raw( 157 cls, data: bytes | str 158 ) -> Response: # pragma: no cover (uses Java API) 159 """Construct an instance of the Response class from raw bytes. 160 :param data: The raw bytes to convert. 161 :return: A :class:`Response` parsed from the raw bytes. 162 """ 163 # Use the Burp API to trivialize the parsing of the response from raw bytes. 164 # Convert the raw bytes to a Burp ByteArray. 165 # Plain strings are OK too. 166 str_or_byte_array: IByteArray | str = ( 167 data if isinstance(data, str) else PythonUtils.toByteArray(data) 168 ) 169 170 # Instantiate a new Burp HTTP response. 171 burp_response: IHttpResponse = HttpResponse.httpResponse(str_or_byte_array) 172 173 return cls.from_burp(burp_response)
Construct an instance of the Response class from raw bytes.
Parameters
- data: The raw bytes to convert.
Returns
A
Response
parsed from the raw bytes.
175 @classmethod 176 def make( 177 cls, 178 status_code: int = 200, 179 content: bytes | str = b"", 180 headers: Headers | tuple[tuple[bytes, bytes], ...] = (), 181 host: str = "", 182 port: int = 0, 183 scheme: Literal["http", "https"] = "http", 184 ) -> "Response": 185 # Use the base/inherited make method to construct a MITMProxy response. 186 mitmproxy_res = MITMProxyResponse.make(status_code, content, headers) 187 188 res = cls.from_mitmproxy(mitmproxy_res) 189 res.host = host 190 res.scheme = scheme 191 res.port = port 192 193 return res
Simplified API for creating response objects.
195 def host_is(self, *patterns: str) -> bool: 196 """Matches the host against the provided patterns 197 198 Returns: 199 bool: Whether at least one pattern matched 200 """ 201 return host_is(self.host, *patterns)
Matches the host against the provided patterns
Returns: bool: Whether at least one pattern matched
203 @property 204 def body(self) -> bytes | None: 205 """Alias for content() 206 207 Returns: 208 bytes | None: The request body / content 209 """ 210 return self.content
Alias for content()
Returns: bytes | None: The request body / content
Inherited Members
- _internal_mitmproxy.http.Response
- data
- status_code
- reason
- refresh
- _internal_mitmproxy.http.Message
- from_state
- get_state
- set_state
- stream
- http_version
- is_http10
- is_http11
- is_http2
- headers
- trailers
- raw_content
- content
- text
- set_content
- get_content
- set_text
- get_text
- timestamp_start
- timestamp_end
- decode
- encode
- json
- _internal_mitmproxy.coretypes.serializable.Serializable
- copy
10class Flow: 11 """Contains request and response and some utilities for match()""" 12 13 def __init__( 14 self, 15 scheme: Literal["http", "https"] = "http", 16 host: str = "", 17 port: int = 0, 18 request: Request | None = None, 19 response: Response | None = None, 20 text: bytes | None = None, 21 ): 22 self.scheme = scheme 23 self.host = host 24 self.port = port 25 self.request = request 26 self.response = response 27 self.text = text 28 29 def host_is(self, *patterns: str) -> bool: 30 """Matches a wildcard pattern against the target host 31 32 Returns: 33 bool: True if at least one pattern matched 34 """ 35 return host_is(self.host, *patterns) 36 37 def path_is(self, *patterns: str) -> bool: 38 """Matches a wildcard pattern against the request path 39 40 Includes query string `?` and fragment `#` 41 42 Returns: 43 bool: True if at least one pattern matched 44 """ 45 req = self.request 46 if req is None: 47 return False 48 49 return req.path_is(*patterns)
Contains request and response and some utilities for match()
13 def __init__( 14 self, 15 scheme: Literal["http", "https"] = "http", 16 host: str = "", 17 port: int = 0, 18 request: Request | None = None, 19 response: Response | None = None, 20 text: bytes | None = None, 21 ): 22 self.scheme = scheme 23 self.host = host 24 self.port = port 25 self.request = request 26 self.response = response 27 self.text = text
29 def host_is(self, *patterns: str) -> bool: 30 """Matches a wildcard pattern against the target host 31 32 Returns: 33 bool: True if at least one pattern matched 34 """ 35 return host_is(self.host, *patterns)
Matches a wildcard pattern against the target host
Returns: bool: True if at least one pattern matched
37 def path_is(self, *patterns: str) -> bool: 38 """Matches a wildcard pattern against the request path 39 40 Includes query string `?` and fragment `#` 41 42 Returns: 43 bool: True if at least one pattern matched 44 """ 45 req = self.request 46 if req is None: 47 return False 48 49 return req.path_is(*patterns)
Matches a wildcard pattern against the request path
Includes query string ?
and fragment #
Returns: bool: True if at least one pattern matched
The Scalpel Python execution context
Contains the Burp Java API object, the venv directory, the user script path, the path to the file loading the user script and a logging object
6class Context(TypedDict): 7 """Scalpel Python execution context""" 8 9 API: Any 10 """ 11 The Burp [Montoya API] 12 (https://portswigger.github.io/burp-extensions-montoya-api/javadoc/burp/api/montoya/MontoyaApi.html) 13 root object. 14 15 Allows you to interact with Burp by directly manipulating the Java object. 16 17 """ 18 19 directory: str 20 """The framework directory""" 21 22 user_script: str 23 """The loaded script path""" 24 25 framework: str 26 """The framework (loader script) path""" 27 28 venv: str 29 """The venv the script was loaded in"""
Scalpel Python execution context
The Burp [Montoya API] (https://portswigger.github.io/burp-extensions-montoya-api/javadoc/burp/api/montoya/MontoyaApi.html) root object.
Allows you to interact with Burp by directly manipulating the Java object.
12def editor(mode: EditorMode): 13 """Decorator to specify the editor type for a given hook 14 15 This can be applied to a req_edit_in / res_edit_in hook declaration to specify the editor that should be displayed in Burp 16 17 Example: 18 ```py 19 @editor("hex") 20 def req_edit_in(req: Request) -> bytes | None: 21 return bytes(req) 22 ``` 23 This displays the request in an hex editor. 24 25 Currently, the only modes supported are `"raw"`, `"hex"`, `"octal"`, `"binary"` and `"decimal"`. 26 27 28 Args: 29 mode (EDITOR_MODE): The editor mode (raw, hex,...) 30 """ 31 32 if mode not in EDITOR_MODES: 33 raise ValueError(f"Argument must be one of {EDITOR_MODES}") 34 35 def decorator(hook: Callable): 36 hook.__annotations__["scalpel_editor_mode"] = mode 37 return hook 38 39 return decorator
Decorator to specify the editor type for a given hook
This can be applied to a req_edit_in / res_edit_in hook declaration to specify the editor that should be displayed in Burp
Example:
@editor("hex")
def req_edit_in(req: Request) -> bytes | None:
return bytes(req)
This displays the request in an hex editor.
Currently, the only modes supported are "raw"
, "hex"
, "octal"
, "binary"
and "decimal"
.
Args: mode (EDITOR_MODE): The editor mode (raw, hex,...)
8class Logger: # pragma: no cover 9 """Provides methods for logging messages to the Burp Suite output and standard streams.""" 10 11 def all(self, msg: str): 12 """Prints the message to the standard output 13 14 Args: 15 msg (str): The message to print 16 """ 17 print(f"(default): {msg}") 18 19 def trace(self, msg: str): 20 """Prints the message to the standard output 21 22 Args: 23 msg (str): The message to print 24 """ 25 print(f"(default): {msg}") 26 27 def debug(self, msg: str): 28 """Prints the message to the standard output 29 30 Args: 31 msg (str): The message to print 32 """ 33 print(f"(default): {msg}") 34 35 def info(self, msg: str): 36 """Prints the message to the standard output 37 38 Args: 39 msg (str): The message to print 40 """ 41 print(f"(default): {msg}") 42 43 def warn(self, msg: str): 44 """Prints the message to the standard output 45 46 Args: 47 msg (str): The message to print 48 """ 49 print(f"(default): {msg}") 50 51 def fatal(self, msg: str): 52 """Prints the message to the standard output 53 54 Args: 55 msg (str): The message to print 56 """ 57 print(f"(default): {msg}") 58 59 def error(self, msg: str): 60 """Prints the message to the standard error 61 62 Args: 63 msg (str): The message to print 64 """ 65 print(f"(default): {msg}", file=sys.stderr)
Provides methods for logging messages to the Burp Suite output and standard streams.
11 def all(self, msg: str): 12 """Prints the message to the standard output 13 14 Args: 15 msg (str): The message to print 16 """ 17 print(f"(default): {msg}")
Prints the message to the standard output
Args: msg (str): The message to print
19 def trace(self, msg: str): 20 """Prints the message to the standard output 21 22 Args: 23 msg (str): The message to print 24 """ 25 print(f"(default): {msg}")
Prints the message to the standard output
Args: msg (str): The message to print
27 def debug(self, msg: str): 28 """Prints the message to the standard output 29 30 Args: 31 msg (str): The message to print 32 """ 33 print(f"(default): {msg}")
Prints the message to the standard output
Args: msg (str): The message to print
35 def info(self, msg: str): 36 """Prints the message to the standard output 37 38 Args: 39 msg (str): The message to print 40 """ 41 print(f"(default): {msg}")
Prints the message to the standard output
Args: msg (str): The message to print
43 def warn(self, msg: str): 44 """Prints the message to the standard output 45 46 Args: 47 msg (str): The message to print 48 """ 49 print(f"(default): {msg}")
Prints the message to the standard output
Args: msg (str): The message to print