- Latest Versions
| Name | Version | Description |
|---|---|---|
| Ming Ke Ming (名可名) | | Decentralized User Identity Authentication |
| Dao Ke Dao (道可道) | | Universal Message Module |
| DIMP (去中心化通讯协议) | | Decentralized Instant Messaging Protocol |
extends CustomizedContent
class CustomizedContentProcessor(BaseContentProcessor):
    """
        Customized Content Processing Unit
        ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

        Passes an application-customized content to the handler
        chosen by the application filter.
    """

    # No constructor is defined here, so the inherited one is used:
    # def __init__(self, facebook: Facebook, messenger: Messenger):
    #     super().__init__(facebook=facebook, messenger=messenger)

    # Override
    async def process_content(self, content: Content, r_msg: ReliableMessage) -> List[Content]:
        """
        Hand the content over to its handler

        @param content: customized content to process
        @param r_msg:   network message that carried the content
        @return contents returned by the handler
        """
        assert isinstance(content, CustomizedContent), 'customized content error: %s' % content
        # the filter selects a handler for 'app' & 'mod'
        app_filter = get_app_filter()
        handler = app_filter.filter_content(content=content, msg=r_msg)
        responses = await handler.handle_action(content=content, msg=r_msg, messenger=self.messenger)
        return responses


# - CustomizedContentHandler
class CustomizedContentHandler(ABC):
    """
        Handler for Customized Content
        ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

        Abstract interface; subclasses provide handle_action().
    """

    @abstractmethod
    async def handle_action(self, content: CustomizedContent, msg: ReliableMessage,
                            messenger: Messenger) -> List[Content]:
        """
        Perform the action requested by a customized content

        @param content:   customized content
        @param msg:       network message
        @param messenger: message transceiver
        @return contents
        """
        message = f'Not implemented: {type(self).__module__}.{type(self).__name__}.handle_action()'
        raise NotImplementedError(message)
class BaseCustomizedContentHandler(CustomizedContentHandler):
    """
        Default Handler
        ~~~~~~~~~~~~~~~

        Replies to a customized content with a 'not support' receipt.
    """

    # Override
    async def handle_action(self, content: CustomizedContent, msg: ReliableMessage,
                            messenger: Messenger) -> List[Content]:
        """
        Respond that the content is not supported

        @param content:   customized content
        @param msg:       network message
        @param messenger: message transceiver (not used here)
        @return a list with one receipt command
        """
        # app = content.application
        app_name = content.get_str(key='app')
        mod_name = content.module
        act_name = content.action
        # template & replacements are attached to the receipt as extra info
        details = {
            'template': 'Customized content (app: ${app}, mod: ${mod}, act: ${act}) not support yet!',
            'replacements': {
                'app': app_name,
                'mod': mod_name,
                'act': act_name,
            }
        }
        return self._respond_receipt(text='Content not support.', content=content,
                                     envelope=msg.envelope, extra=details)

    #
    #   Convenient responding
    #

    # noinspection PyMethodMayBeStatic
    def _respond_receipt(self, text: str, envelope: Envelope, content: Optional[Content],
                         extra: Optional[Dict] = None) -> List[ReceiptCommand]:
        """
        Wrap a single receipt command in a list

        @param text:     text for the receipt
        @param envelope: envelope of the original message
        @param content:  content being responded to
        @param extra:    optional extra info passed on to create_receipt()
        @return a one-element list holding the receipt
        """
        # create base receipt command with text & original envelope
        receipt = BaseContentProcessor.create_receipt(text=text, envelope=envelope, content=content, extra=extra)
        return [receipt]


# - CustomizedContentFilter
class CustomizedContentFilter(ABC):
    """ Abstract filter that selects a handler for a customized content """

    @abstractmethod
    def filter_content(self, content: CustomizedContent, msg: ReliableMessage) -> CustomizedContentHandler:
        """
        Select the handler for a customized content

        @param content: customized content
        @param msg:     network message
        @return handler for the content
        """
        message = f'Not implemented: {type(self).__module__}.{type(self).__name__}.filter_content()'
        raise NotImplementedError(message)
class AppCustomizedFilter(CustomizedContentFilter):
    """ Filter holding handlers keyed by 'app:mod', with a default fallback """

    def __init__(self):
        super().__init__()
        # handlers registered via set_content_handler(), keyed by 'app:mod'
        self.__handlers: Dict[str, CustomizedContentHandler] = {}
        # returned by filter_content() when no handler is registered
        self.__default_handler = BaseCustomizedContentHandler()

    def set_content_handler(self, app: str, mod: str, handler: CustomizedContentHandler):
        """ Register the handler for an (app, mod) pair """
        self.__handlers['%s:%s' % (app, mod)] = handler

    # protected
    def get_content_handler(self, app: str, mod: str) -> Optional[CustomizedContentHandler]:
        """ Get the handler registered for an (app, mod) pair, or None """
        return self.__handlers.get('%s:%s' % (app, mod))

    # Override
    def filter_content(self, content: CustomizedContent, msg: ReliableMessage) -> CustomizedContentHandler:
        """
        Select the registered handler for the content's 'app' & 'mod'

        @param content: customized content
        @param msg:     network message (not used here)
        @return the registered handler, or the default handler if none
        """
        # app = content.application
        app_name = content.get_str(key='app', default='')
        registered = self.get_content_handler(app=app_name, mod=content.module)
        if registered is None:
            # nothing registered for this 'app:mod'
            return self.__default_handler
        # an application with many modules is better served by
        # a separate handler registered for each module.
        return registered
class CustomizedFilterExtension:
    """ Interface declaring a 'customized_filter' property; both accessors raise here """

    @property
    def customized_filter(self) -> CustomizedContentFilter:
        """ Filter for customized contents (getter raises NotImplementedError in this class) """
        message = f'Not implemented: {type(self).__module__}.{type(self).__name__}.customized_filter getter'
        raise NotImplementedError(message)

    @customized_filter.setter
    def customized_filter(self, delegate: CustomizedContentFilter):
        # setter raises NotImplementedError in this class, like the getter
        message = f'Not implemented: {type(self).__module__}.{type(self).__name__}.customized_filter setter'
        raise NotImplementedError(message)
# Set a new AppCustomizedFilter as the 'customized_filter' of the shared message extensions.
shared_message_extensions.customized_filter = AppCustomizedFilter()
def customized_extensions() -> CustomizedFilterExtension:
    """ Return the shared message extensions, annotated as CustomizedFilterExtension """
    extensions = shared_message_extensions
    return extensions
def get_app_filter() -> AppCustomizedFilter:
    """
    Get the AppCustomizedFilter from the customized extensions

    If the current 'customized_filter' is not an AppCustomizedFilter,
    a new one is created, stored back on the extensions and returned.

    @return the application filter
    """
    ext = customized_extensions()
    current = ext.customized_filter
    if isinstance(current, AppCustomizedFilter):
        return current
    # replace whatever was there with a fresh application filter
    replacement = AppCustomizedFilter()
    ext.customized_filter = replacement
    return replacement


# - Example for group querying
class GroupHistoryHandler(BaseCustomizedContentHandler):
    """ Command Transform:

        +===============================+===============================+
        |      Customized Content       |      Group Query Command      |
        +-------------------------------+-------------------------------+
        |   "type" : i2s(0xCC)          |   "type" : i2s(0x88)          |
        |   "sn"   : 123                |   "sn"   : 123                |
        |   "time" : 123.456            |   "time" : 123.456            |
        |   "app"  : "chat.dim.group"   |                               |
        |   "mod"  : "history"          |                               |
        |   "act"  : "query"            |                               |
        |                               |   "command"   : "query"       |
        |   "group"     : "{GROUP_ID}"  |   "group"     : "{GROUP_ID}"  |
        |   "last_time" : 0             |   "last_time" : 0             |
        +===============================+===============================+
    """

    # Override
    async def handle_action(self, content: CustomizedContent, msg: ReliableMessage,
                            messenger: Messenger) -> List[Content]:
        """
        Handle a group history content

        @param content:   customized content
        @param msg:       network message
        @param messenger: message transceiver
        @return responding contents
        """
        if content.group is None:
            # a group history content without group cannot be handled
            return self._respond_receipt(text='Group command error.', envelope=msg.envelope, content=content)
        if content.action != GroupHistory.ACT_QUERY:
            # any other action goes to the default handling of the super class
            return await super().handle_action(content=content, msg=msg, messenger=messenger)
        # assert GroupHistory.APP == content.application
        assert GroupHistory.MOD == content.module
        return await self.__transform_query_command(content=content, msg=msg, messenger=messenger)

    async def __transform_query_command(self, content: CustomizedContent, msg: ReliableMessage,
                                        messenger: Messenger) -> List[Content]:
        """
        Rebuild the content as a query command and let the messenger process it

        @param content:   customized content
        @param msg:       network message
        @param messenger: message transceiver
        @return contents from the messenger, or an error receipt
        """
        # copy the content info, then overwrite 'type' & 'command'
        fields = content.copy_dict()
        fields['type'] = ContentType.COMMAND
        fields['command'] = QueryCommand.QUERY
        command = Content.parse(content=fields)
        if not isinstance(command, QueryCommand):
            # parsing did not produce a query command
            return self._respond_receipt(text='Query command error.', envelope=msg.envelope, content=content)
        return await messenger.process_content(content=command, r_msg=msg)
# def register_customized_handlers():
# app_filter = get_app_filter()
# # 'chat.dim.group:history'
# app_filter.set_content_handler(app=GroupHistory.APP,
# mod=GroupHistory.MOD,
# handler=GroupHistoryHandler()
# )

from typing import Optional
from dimsdk import *
from .handshake import *
from .customized import *
class ClientContentProcessorCreator(BaseContentProcessorCreator):
    """ Creator adding processors for customized contents and the handshake command """

    # Override
    def create_content_processor(self, msg_type: str) -> Optional[ContentProcessor]:
        """
        Create a content processor for the message type

        @param msg_type: content type
        @return a CustomizedContentProcessor for APPLICATION / CUSTOMIZED types,
                otherwise whatever the super class creates
        """
        # application customized
        if msg_type in (ContentType.APPLICATION, ContentType.CUSTOMIZED):
            return CustomizedContentProcessor(facebook=self.facebook, messenger=self.messenger)
        # ...

        # others
        return super().create_content_processor(msg_type=msg_type)

    # Override
    def create_command_processor(self, msg_type: str, cmd: str) -> Optional[ContentProcessor]:
        """
        Create a command processor for the command name

        @param msg_type: content type
        @param cmd:      command name
        @return a HandshakeCommandProcessor for the handshake command,
                otherwise whatever the super class creates
        """
        # handshake
        if cmd == HandshakeCommand.HANDSHAKE:
            return HandshakeCommandProcessor(facebook=self.facebook, messenger=self.messenger)
        # ...

        # others
        return super().create_command_processor(msg_type=msg_type, cmd=cmd)


# To let your CustomizedContentProcessor start to work,
# you must override BaseContentProcessorCreator for message types:
#   - ContentType.APPLICATION
#   - ContentType.CUSTOMIZED
# and then set your creator for GeneralContentProcessorFactory in the MessageProcessor.