webhook-matrix-notifier/wmn/notify.py
Ben 5364860191
Add pyproject.toml, refactor notify.py
Add instructions how to run the matrix-notify command-line program.
2023-10-27 14:09:13 +02:00

126 lines
3.8 KiB
Python
Executable file

#!/usr/bin/env python3
# Copyright 2019-2021 Benedikt Ziemons
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of
# this software and associated documentation files (the "Software"), to deal in
# the Software without restriction, including without limitation the rights to
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
# the Software, and to permit persons to whom the Software is furnished to do so,
# subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import argparse
import asyncio
import logging
import re
import sys
import nio
from .common import (
client_login,
send_message,
resolve_room,
MatrixException,
load_configuration,
)
# Not going to care for specifics like the underscore.
# Generally match !anything:example.com with unicode support.
room_pattern = re.compile(r"^!\w+:[\w\-.]+$")
def get_a_room(args):
if not args.channel and not args.room:
logging.error("Specify the Matrix room to send the message to")
sys.exit(1)
if args.room:
room = args.room
else:
room = args.channel
if room_pattern.fullmatch(room) is None:
logging.error("Could not parse Matrix room '%s'", room)
sys.exit(1)
return room
async def login_and_send(args):
cfg = load_configuration()
client = await client_login(cfg)
room = get_a_room(args)
try:
room_id = await resolve_room(client=client, room=room)
response = await client.join(room_id=room_id)
if isinstance(response, nio.ErrorResponse):
raise MatrixException(response)
if "html" in args:
response = await send_message(
client=client,
room_id=room_id,
text=(args.text or ""),
msgtype=args.type,
html=args.html,
)
else:
response = await send_message(
client=client, room_id=room_id, text=args.text, msgtype=args.type
)
finally:
await client.close()
logging.info("Message sent. %s", response.event_id)
def parse_arguments():
parser = argparse.ArgumentParser(description="Notify a Matrix room.")
parser.add_argument(
"-c", "--channel", required=False, help="the channel to send the message to (deprecated, use --room)"
)
parser.add_argument(
"-r", "--room", required=False, help="the Matrix room to send the message to"
)
parser.add_argument(
"-t",
"--type",
required=False,
help="the msgtype",
choices=("m.text", "m.notice"),
default="m.text",
)
parser.add_argument("text", help="the text message to send to the room")
parser.add_argument(
"html", nargs="?", help="the html message to send to the room"
)
return parser.parse_args()
def main():
"""
Config path is local config.yml by default. Specify with environment variable WMN_CONFIG_PATH.
config.yml Example:
matrix:
server: https://matrix.org
username: ...
password: "..."
"""
logging.basicConfig()
args = parse_arguments()
loop = asyncio.get_event_loop()
loop.run_until_complete(login_and_send(args))
if __name__ == "__main__":
main()