happybot/sed.py

580 lines
15 KiB
Python

#!/usr/bin/env python3
# How to read a line.
def deirc(nick, line):
action = False
if len(line) <= 3:
return action, nick, line
if line[0] == '\u200b':
if line[1] == '<':
try:
close = line.index('>')
assert(line[close+1] == ' ')
assert(not any(map(lambda x: x.isspace(), line[2:close])))
nick = line[2:close]
line = line[close+2:]
except:
pass
elif line[1:3] == '* ':
try:
close = line[3:].index(' ') + 3
assert(not any(map(lambda x: x.isspace(), line[3:close])))
nick = line[3:close]
line = line[close+1:]
action = True
except:
pass
elif line[:8] == '\x01ACTION ' and line[-1] == '\x01':
action = True
line = line[8:-1]
# Redact the nologs.
if line.startswith('[nolog]') or line.startswith('nolog:'):
line = '[REDACTED]'
return action, nick, line
# Set flags and funcs.
from sys import argv
if len(argv) != 4:
print('Usage: [Sspy] in out')
exit(1)
_, flags, fin, fout = argv
methods = []
# Read the outfile.
class ReadOut:
def __init__(self, fout=fout):
self.file = open(fout, 'rb')
self.first = True
def cat(self):
self.first = False
for line in self.file:
yield str(line[:-1], 'utf-8', 'ignore')
def tac(self):
if self.first:
self.first = False
self.file.seek(0, 2)
buffer = b''
if self.file.tell():
self.file.seek(self.file.tell() - 1)
if self.file.read(1) == b'\n':
self.file.seek(self.file.tell() - 1)
while self.file.tell():
self.file.seek(self.file.tell() - 1)
char = self.file.read(1)
if char == b'\n':
yield str(buffer[::-1], 'utf-8', 'ignore')
buffer = b''
else:
buffer += char
self.file.seek(self.file.tell() - 1)
if buffer:
yield str(buffer[::-1], 'utf-8', 'ignore')
def close(self):
self.file.close()
# Define the functions!
def build_replacer(replace):
print('| Building replacement for:', replace)
rep = [] # strings
ref = [] # references
maximum = 0
i = iter(replace)
buf = ''
for c in i:
if c == '\\':
try:
c = next(i)
except:
return None
if c == 'x':
try:
c = next(i) + next(i)
except:
print('| | Illegal "\\xNN", not enough characters?')
return None
try:
buf += chr(int(c, 16))
except:
buf += '\\x' + c
elif c == 'u':
try:
c = next(i) + next(i) + next(i) + next(i)
except:
print('| | Illegal "\\uNNNN", not enough characters?')
return None
try:
buf += chr(int(c, 16))
except:
buf += '\\u' + c
elif c.isdigit():
rep.append(buf)
buf = ''
c = int(c)
ref.append((c,False))
if c > maximum:
maximum = c
elif c == '(':
rep.append(buf)
buf = ''
for c in i:
if c == ')':
try:
buf = int(buf)
if buf < 0:
print('| | Illegal group, less than zero:', buf)
return None
ref.append((buf, False))
if buf > maximum:
maximum = buf
except:
ref.append((buf, False))
buf = ''
else:
buf += c
if len(rep) != len(ref):
print('| | Error, unbalanced parentheses to backreference.')
# Unbalanced parens.
return None
elif c == ')':
rep.append(buf)
buf = ''
for c in i:
if c == '(':
try:
buf = int(buf)
if buf < 0:
print('| | Illegal group, less than zero:', buf)
return None
ref.append((buf, True))
if buf > maximum:
maximum = buf
except:
ref.append((buf, True))
buf = ''
else:
buf += c
if len(rep) != len(ref):
print('| | Error, unbalanced parentheses to reverse of backreference.')
# Unbalanced parens.
return None
elif c in '&\\/':
buf += c
else:
buf += '\\' + c
elif c == '&':
rep.append(buf)
buf = ''
ref.append((0, False))
else:
buf += c
rep.append(buf)
def replacer(zero, numbered, named):
print('| Replacing...')
if maximum > len(numbered):
return None
numbered = [zero] + list(numbered)
result = ''
for i, (code, rev) in enumerate(ref):
result += rep[i] # add non-reference
if isinstance(code, int):
try:
addition = numbered[code]
except:
print('| | Numbered group failed:', code)
return None
else:
try:
addition = named[bytes(code, 'utf-8')]
except:
print('| | Named group failed:', code)
return None
if addition is None:
addition = ''
elif rev:
addition = addition[::-1]
result += addition
result += rep[-1] # add final non-reference
print('| | ...success!')
return result
return replacer
try:
from re2 import compile as regex
except ImportError:
print('Warning: Using re and not re2.')
from re import compile as regex
sed_match = regex(r's/((?:\\.|[^/])*)/((?:\\.|[^/])*)/([^\s~]*)(?:~(\d+))?')
def sed_test(nick, line):
# Is it a command?
match = sed_match.match(line)
if not match:
return None
search, replace, who, back = match.groups()
replace = build_replacer(replace)
if not replace:
return None
return search, replace, nick, who, back
def sed_method(search, replace, nick, who, back):
# Some things to fix.
if not back:
back = 0
else:
back = int(back)
fuzzy = True
if not who:
fuzzy = False
who = nick
elif who == 'g':
who = ''
who = who.lower()
# Turn it into a regex.
try:
search = regex(search)
except:
return None
# Now it is time to try to sed.
log = ReadOut()
for line in log.tac():
_, _, nick, line = line.split(' ', 3)
nick = nick[1:-1]
skip = False
for test, _ in methods:
if test(nick, line):
skip = True
break
if skip:
continue
action, nick, line = deirc(nick, line)
match_nick = nick.lower().replace('*', '')
if fuzzy and not match_nick.startswith(who):
continue
elif not fuzzy and match_nick != who:
continue
if action:
action = False
line = '\x01ACTION ' + line + '\x01'
if not search.search(line):
continue
if back != 0:
back -= 1
continue
log.close()
break
else:
log.close()
return None
result = ''
prev = 0
for match in search.finditer(line):
start, end = match.span()
result += line[prev:start]
prev = end
replacement = replace(line[start:end], match.groups(), match.groupdict())
if replacement is None:
return None
result += replacement
result += line[prev:]
if result[:8] == '\x01ACTION ' and result[-1] == '\x01':
action = True
result = result[8:-1]
log.close()
return action, nick + '*', result
find_match = regex(r'p([\+-]\d+)?/((?:\\.|[^/])*)/([^\s~]*)(?:~(\d+))?')
def find_test(nick, line):
# Is it a command?
match = find_match.match(line)
if not match:
return None
local, search, who, back = match.groups()
return local, search, nick, who, back
def find_method(local, search, nick, who, back):
# Some things to fix.
if not back:
back = 0
else:
back = int(back)
fuzzy = True
if not who:
fuzzy = False
who = nick
elif who == 'g':
who = ''
who = who.lower()
if not local:
local = 0
else:
local = int(local)
# Turn it into a regex.
search = regex(search)
# Now it is time to try to sed.
log = ReadOut()
for line in log.tac():
_, _, nick, line = line.split(' ', 3)
nick = nick[1:-1]
skip = False
for test, _ in methods:
if test(nick, line):
skip = True
break
if skip:
continue
action, nick, line = deirc(nick, line)
match_nick = nick.lower().replace('*', '')
if fuzzy and not match_nick.startswith(who):
continue
elif not fuzzy and match_nick != who:
continue
if action:
match_line = '\x01ACTION ' + line + '\x01'
else:
match_line = line
if not search.search(match_line):
continue
if back != 0:
back -= 1
continue
break
else:
log.close()
return None
if local == 0:
log.close()
return action, nick, line
elif local > 0:
for line in log.cat():
if local == 0:
_, _, nick, line = line.split(' ', 3)
return deirc(nick[1:-1], line)
local -= 1
elif local < 0:
for line in log.tac():
local += 1
if local == 0:
_, _, nick, line = line.split(' ', 3)
return deirc(nick[1:-1], line)
# Should never happen.
return None
def tr_expand(string):
i = iter(string)
result = ''
prev = ''
for char in i:
if prev == '\\' and char in '-\\/':
prev = ''
elif char == '-' and prev:
a = ord(prev)
b = None
try:
b = ord(next(i))
if a < b:
prev = ''.join(map(chr, range(a, b + 1)))
elif a > b:
prev = ''.join(map(chr, reversed(range(b, a + 1))))
char = ''
except:
result += prev
prev = char
if b:
char = chr(b)
else:
char = ''
continue
result += prev
prev = char
result += char
return result
tr_match = regex(r'y/((?:\\.|[^/])*)/((?:\\.|[^/])*)/([^\s~]*)(?:~(\d+))?')
def tr_test(nick, line):
# Is it a command?
match = tr_match.match(line)
if not match:
return None
start, finish, who, back = match.groups()
start = tr_expand(start)
if not start:
return None
finish = tr_expand(finish)
if not finish:
return None
if len(start) != len(finish):
return None
return start, finish, nick, who, back
def tr_method(start, finish, nick, who, back):
# Some things to fix.
translate = {a: b for (a, b) in zip(start, finish)}
if not back:
back = 0
else:
back = int(back)
fuzzy = True
if not who:
fuzzy = False
who = nick
elif who == 'g':
who = ''
who = who.lower()
# Now it is time to try to tr.
log = ReadOut()
for line in log.tac():
_, _, nick, line = line.split(' ', 3)
nick = nick[1:-1]
skip = False
for test, _ in methods:
if test(nick, line):
skip = True
break
if skip:
continue
action, nick, line = deirc(nick, line)
match_nick = nick.lower().replace('*', '')
if fuzzy and not match_nick.startswith(who):
continue
elif not fuzzy and match_nick != who:
continue
result = ''
for c in line:
result += translate.get(c, c)
if result == line:
continue
if back != 0:
back -= 1
continue
log.close()
break
else:
log.close()
return None
return action, nick + '*', result
# Flags! Which do we enable? What do we append?
for flag, test, method in (
('s', sed_test, sed_method),
('p', find_test, find_method),
('y', tr_test, tr_method)
):
if flag.lower() in flags:
methods.append((test, method))
elif flag.upper() in flags:
methods.append((test, lambda *a: None))
# Execute a command.
from subprocess import Popen, PIPE
def cmd(args):
proc = Popen(args, stdout=PIPE)
while True:
line = proc.stdout.readline()
if line:
try:
yield str(line[:-1], 'utf-8', 'ignore')
except:
pass
else:
break
# Do the thing!
from time import sleep
def begin():
for line in cmd(['tail', '-n', '0', '-f', fout]):
_, _, nick, line = line.split(' ', 3)
nick = nick[1:-1]
# Ignore actions.
if line[:8] == '\x01ACTION ' and line[-1] == '\x01':
continue
# Ignore bots and nologs.
if line[0] == '\u200b':
continue
if line.startswith('[nolog]') or line.startswith('nolog:'):
continue
# Try it out.
for test, method in methods:
print('Testing', method.__name__, 'with:', line)
result = test(nick, line)
if not result:
print('| Test complete, yet invalid.')
continue
print('| Test complete and valid.\nProceeding with method', method.__name__ + '.')
result = method(*result)
print('| Method complete.')
if not result:
continue
action, nick, line = result
if action:
reply = '* ' + nick + ' ' + line
else:
reply = '<' + nick + '> ' + line
print('| It is valid! Sending:', reply)
with open(fin, 'w') as fh:
fh.write('\u200b' + reply + '\n')
sleep(0.5)
begin()