|
|
@@ -0,0 +1,150 @@ |
|
|
|
#!/usr/bin/env python3 |
|
|
|
import enum |
|
|
|
import hashlib |
|
|
|
import sys |
|
|
|
|
|
|
|
|
|
|
|
class ParserState(enum.Enum): |
|
|
|
NONE = 0 |
|
|
|
DICTIONARY = 1 |
|
|
|
LIST = 2 |
|
|
|
|
|
|
|
|
|
|
|
class Placeholder: |
|
|
|
def __init__(self, s): |
|
|
|
self._s = s |
|
|
|
|
|
|
|
def __repr__(self): |
|
|
|
return self._s |
|
|
|
|
|
|
|
|
|
|
|
dictEntry = Placeholder('dict') |
|
|
|
listEntry = Placeholder('list') |
|
|
|
|
|
|
|
|
|
|
|
class CopyingFileReader: |
|
|
|
'''All reads to the underlying file-like object are copied to write, which must be a callable accepting the data.''' |
|
|
|
|
|
|
|
def __init__(self, fp, write): |
|
|
|
self.fp = fp |
|
|
|
self.write = write |
|
|
|
|
|
|
|
def read(self, *args, **kwargs): |
|
|
|
data = self.fp.read(*args, **kwargs) |
|
|
|
self.write(data) |
|
|
|
return data |
|
|
|
|
|
|
|
|
|
|
|
def read_str(fp, c): |
|
|
|
'''Reads a string from the current position with c being the first character of the length (having been read already)''' |
|
|
|
length = read_int(fp, c, end = b':') |
|
|
|
s = fp.read(length) |
|
|
|
if len(s) != length: |
|
|
|
raise ValueError |
|
|
|
try: |
|
|
|
s = s.decode('utf-8') |
|
|
|
except UnicodeDecodeError: |
|
|
|
pass |
|
|
|
return s |
|
|
|
|
|
|
|
|
|
|
|
def read_int(fp, c = b'', end = b'e'): |
|
|
|
'''Reads an int from the current position with c optionally being the first digit''' |
|
|
|
i = c |
|
|
|
while True: |
|
|
|
c = fp.read(1) |
|
|
|
if c == end: |
|
|
|
break |
|
|
|
elif c in b'0123456789': |
|
|
|
i += c |
|
|
|
else: |
|
|
|
raise ValueError |
|
|
|
i = int(i.decode('ascii')) |
|
|
|
return i |
|
|
|
|
|
|
|
|
|
|
|
def read_or_stack_value(fp, stateStack, c = b''): |
|
|
|
if not c: |
|
|
|
c = fp.read(1) |
|
|
|
if c == b'l': |
|
|
|
stateStack.append(ParserState.LIST) |
|
|
|
return listEntry |
|
|
|
elif c == b'd': |
|
|
|
stateStack.append(ParserState.DICTIONARY) |
|
|
|
return dictEntry |
|
|
|
elif c == b'i': |
|
|
|
return read_int(fp) |
|
|
|
elif c in b'0123456789': # String value |
|
|
|
return read_str(fp, c) |
|
|
|
else: |
|
|
|
raise ValueError |
|
|
|
|
|
|
|
|
|
|
|
def bdecode(fp, display = False, infoWrite = None): |
|
|
|
c = fp.read(1) |
|
|
|
if c != b'd': |
|
|
|
raise ValueError |
|
|
|
stateStack = [ParserState.NONE, ParserState.DICTIONARY] |
|
|
|
inInfo = False |
|
|
|
print_ = print if display else (lambda *args, **kwargs: None) |
|
|
|
print_(f'(global): {dictEntry}') |
|
|
|
while stateStack: |
|
|
|
state = stateStack[-1] |
|
|
|
indent = ' ' * (len(stateStack) - 1) |
|
|
|
if state == ParserState.DICTIONARY: |
|
|
|
c = fp.read(1) |
|
|
|
if c == b'e': # End of dict |
|
|
|
stateStack.pop(-1) |
|
|
|
if len(stateStack) == 2 and inInfo and infoWrite: |
|
|
|
inInfo = False |
|
|
|
fp = fp.fp |
|
|
|
continue |
|
|
|
elif c in b'0123456789': # Key |
|
|
|
key = read_str(fp, c) |
|
|
|
if len(stateStack) == 2 and key == b'info' and infoWrite: # If in global dict and this is the 'info' value and a copy is desired... |
|
|
|
inInfo = True |
|
|
|
fp = CopyingFileReader(fp, infoWrite) |
|
|
|
v = read_or_stack_value(fp, stateStack) |
|
|
|
print_(f'{indent}{key!r}: {v!r}') |
|
|
|
else: |
|
|
|
raise ValueError |
|
|
|
elif state == ParserState.LIST: |
|
|
|
c = fp.read(1) |
|
|
|
if c == b'e': |
|
|
|
stateStack.pop(-1) |
|
|
|
continue |
|
|
|
else: |
|
|
|
v = read_or_stack_value(fp, stateStack, c) |
|
|
|
print_(f'{indent}- {v!r}') |
|
|
|
elif state == ParserState.NONE: |
|
|
|
assert len(stateStack) == 1 |
|
|
|
return |
|
|
|
|
|
|
|
|
|
|
|
def print_torrent(fp): |
|
|
|
bdecode(fp, display = True) |
|
|
|
|
|
|
|
|
|
|
|
def get_info_hash(fp): |
|
|
|
hasher = hashlib.sha1() |
|
|
|
bdecode(fp, infoWrite = hasher.update) |
|
|
|
return hasher.hexdigest() |
|
|
|
|
|
|
|
|
|
|
|
def main(): |
|
|
|
if len(sys.argv) < 3 or sys.argv[1] not in ('print', 'infohash'): |
|
|
|
print('Usage: torrent-tiny MODE FILE [FILE...]', file = sys.stderr) |
|
|
|
print('MODEs: print, infohash', file = sys.stderr) |
|
|
|
sys.exit(1) |
|
|
|
|
|
|
|
mode = sys.argv[1] |
|
|
|
for fn in sys.argv[2:]: |
|
|
|
with open(fn, 'rb') as fp: |
|
|
|
if mode == 'print': |
|
|
|
print_torrent(fp) |
|
|
|
elif mode == 'infohash': |
|
|
|
print(get_info_hash(fp)) |
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__': |
|
|
|
main() |