forked from zgrep/happybot
262 lines
7.3 KiB
Python
Executable file
262 lines
7.3 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
# How to read a line.
|
|
|
|
def deirc(nick, line):
|
|
action = False
|
|
if len(line) <= 3:
|
|
return action, nick, line
|
|
if line[0] == '\u200b':
|
|
if line[1] == '<':
|
|
try:
|
|
close = line.index('>')
|
|
assert(line[close+1] == ' ')
|
|
assert(not any(map(lambda x: x.isspace(), line[2:close])))
|
|
nick = line[2:close]
|
|
line = line[close+2:]
|
|
except:
|
|
pass
|
|
elif line[1:3] == '* ':
|
|
try:
|
|
close = line[3:].index(' ') + 3
|
|
assert(not any(map(lambda x: x.isspace(), line[3:close])))
|
|
nick = line[3:close]
|
|
line = line[close+1:]
|
|
action = True
|
|
except:
|
|
pass
|
|
elif line[:8] == '\x01ACTION ' and line[-1] == '\x01':
|
|
action = True
|
|
line = line[8:-1]
|
|
|
|
# Redact the nologs.
|
|
if line.startswith('[nolog]') or line.startswith('nolog:'):
|
|
line = '[REDACTED]'
|
|
|
|
return action, nick, line
|
|
|
|
# Set flags and funcs.
|
|
|
|
from sys import argv
|
|
if len(argv) != 3:
|
|
print('Usage: in out')
|
|
exit(1)
|
|
_, fin, fout = argv
|
|
|
|
# Read the outfile.
|
|
|
|
class ReadOut:
|
|
def __init__(self, fout=fout):
|
|
self.file = open(fout, 'rb')
|
|
self.first = True
|
|
def cat(self):
|
|
self.first = False
|
|
for line in self.file:
|
|
yield str(line[:-1], 'utf-8', 'ignore')
|
|
def tac(self):
|
|
if self.first:
|
|
self.first = False
|
|
self.file.seek(0, 2)
|
|
buffer = b''
|
|
if self.file.tell():
|
|
self.file.seek(self.file.tell() - 1)
|
|
if self.file.read(1) == b'\n':
|
|
self.file.seek(self.file.tell() - 1)
|
|
while self.file.tell():
|
|
self.file.seek(self.file.tell() - 1)
|
|
char = self.file.read(1)
|
|
if char == b'\n':
|
|
yield str(buffer[::-1], 'utf-8', 'ignore')
|
|
buffer = b''
|
|
else:
|
|
buffer += char
|
|
self.file.seek(self.file.tell() - 1)
|
|
if buffer:
|
|
yield str(buffer[::-1], 'utf-8', 'ignore')
|
|
def close(self):
|
|
self.file.close()
|
|
|
|
from re import compile as regex
|
|
|
|
xed_match = regex(r'x/((?:\\.|[^/])*)/((?:\\.|[^/])*)/([^\s~]*)(?:~(\d+))?')
|
|
xed_verbose_match = regex(r'xv/((?:\\.|[^/])*)/(?:((?:\\.|[^/])*)/)?')
|
|
sed_match = regex(r's/((?:\\.|[^/])*)/((?:\\.|[^/])*)/([^\s~]*)(?:~(\d+))?')
|
|
find_match = regex(r'p([\+-]\d+)?/((?:\\.|[^/])*)/([^\s~]*)(?:~(\d+))?')
|
|
tr_match = regex(r'y/((?:\\.|[^/])*)/((?:\\.|[^/])*)/([^\s~]*)(?:~(\d+))?')
|
|
|
|
matchers = [xed_match, sed_match, find_match, tr_match, xed_verbose_match]
|
|
|
|
def xed_test(nick, line):
|
|
# Is it a command?
|
|
match = xed_match.match(line)
|
|
if not match:
|
|
return None
|
|
search, replace, who, back = match.groups()
|
|
return search, replace, nick, who, back
|
|
|
|
import x as xed
|
|
|
|
def xed_method(search, replace, nick, who, back):
|
|
# Some things to fix.
|
|
if not back:
|
|
back = 0
|
|
else:
|
|
back = int(back)
|
|
|
|
fuzzy = True
|
|
if not who:
|
|
fuzzy = False
|
|
who = nick
|
|
elif who == 'g':
|
|
who = ''
|
|
who = who.lower()
|
|
|
|
# Turn it into a possibility space.
|
|
try:
|
|
search = xed.parser.parse(search)
|
|
replace = xed.parser.parse(replace)
|
|
except e:
|
|
print('| Parsing error:', e)
|
|
return None
|
|
search = xed.Expand().transform(search)
|
|
lookup = xed.lookup(search)
|
|
|
|
output = None
|
|
|
|
# Now it is time to try to xed.
|
|
log = ReadOut()
|
|
for nline, line in enumerate(log.tac()):
|
|
if nline > 500:
|
|
log.close()
|
|
break
|
|
|
|
_, _, nick, line = line.split(' ', 3)
|
|
nick = nick[1:-1]
|
|
|
|
skip = False
|
|
for test in matchers:
|
|
if test.match(line):
|
|
skip = True
|
|
break
|
|
if skip:
|
|
continue
|
|
|
|
action, nick, line = deirc(nick, line)
|
|
|
|
match_nick = nick.lower().replace('*', '')
|
|
if fuzzy and not match_nick.startswith(who):
|
|
continue
|
|
elif not fuzzy and match_nick != who:
|
|
continue
|
|
|
|
if action:
|
|
action = False
|
|
line = '\x01ACTION ' + line + '\x01'
|
|
|
|
output = xed.findall(lookup, line)
|
|
if not output:
|
|
continue
|
|
|
|
if back != 0:
|
|
back -= 1
|
|
continue
|
|
|
|
log.close()
|
|
break
|
|
else:
|
|
log.close()
|
|
return None
|
|
|
|
if not output:
|
|
return None
|
|
|
|
result = line
|
|
replace = xed.Expand(amp=search).transform(replace)
|
|
for n, i, j in reversed(output):
|
|
rep = replace[n % len(replace)]
|
|
result = result[:i] + rep + result[j:]
|
|
|
|
if result[:8] == '\x01ACTION ' and result[-1] == '\x01':
|
|
action = True
|
|
result = result[8:-1]
|
|
|
|
log.close()
|
|
return action, nick + '*', result
|
|
|
|
# Execute a command.
|
|
|
|
from subprocess import Popen, PIPE
|
|
|
|
def cmd(args):
|
|
proc = Popen(args, stdout=PIPE)
|
|
while True:
|
|
line = proc.stdout.readline()
|
|
if line:
|
|
try:
|
|
yield str(line[:-1], 'utf-8', 'ignore')
|
|
except:
|
|
pass
|
|
else:
|
|
break
|
|
|
|
# Do the thing!
|
|
from time import sleep
|
|
|
|
def begin():
|
|
for line in cmd(['tail', '-n', '0', '-f', fout]):
|
|
_, _, nick, line = line.split(' ', 3)
|
|
nick = nick[1:-1]
|
|
|
|
# Ignore actions.
|
|
if line[:8] == '\x01ACTION ' and line[-1] == '\x01':
|
|
continue
|
|
|
|
# Ignore bots and nologs.
|
|
if line[0] == '\u200b':
|
|
continue
|
|
if line.startswith('[nolog]') or line.startswith('nolog:'):
|
|
continue
|
|
|
|
try:
|
|
# Try it out.
|
|
print('Testing xed.')
|
|
result = xed_test(nick, line)
|
|
if not result:
|
|
print('| Test complete, yet invalid.')
|
|
m = xed_verbose_match.match(line)
|
|
if m:
|
|
n = m.group(2)
|
|
try:
|
|
m = xed.parser.parse(m.group(1))
|
|
if n:
|
|
n = xed.parser.parse(n)
|
|
except:
|
|
with open(fin, 'w') as fh:
|
|
fh.write('\u200b' + nick + ': Parsing error.\n')
|
|
m = xed.Expand().transform(m)
|
|
if n:
|
|
n = xed.Expand(amp=m).transform(n)
|
|
with open(fin, 'w') as fh:
|
|
if n:
|
|
fh.write('\u200b' + nick + ': ' + repr(m) + ' -> ' + repr(n) + '\n')
|
|
else:
|
|
fh.write('\u200b' + nick + ': ' + repr(m) + '\n')
|
|
continue
|
|
print('| Test complete and valid.\nProceeding with xed.')
|
|
result = xed_method(*result)
|
|
print('| Method complete.')
|
|
if not result:
|
|
continue
|
|
action, nick, line = result
|
|
if action:
|
|
reply = '* ' + nick + ' ' + line
|
|
else:
|
|
reply = '<' + nick + '> ' + line
|
|
print('| It is valid! Sending:', reply)
|
|
with open(fin, 'w') as fh:
|
|
fh.write('\u200b' + reply + '\n')
|
|
except:
|
|
with open(fin, 'w') as fh:
|
|
fh.write('\u200b' + nick + ': This would have crashed me.\n')
|
|
|
|
begin()
|