Run this notebook

Use Livebook to open this notebook and explore new ideas.

It is easy to get started, on your machine or the cloud.

Click below to open and run it in your Livebook at .

(or change your Livebook location)

# PubSub ## Request-Response场景 <!-- Learn more at https://mermaid-js.github.io/mermaid --> 事件发起者明确需要知道请求的结果 ```mermaid sequenceDiagram Caller->>+Server: send Request Body? Server->>-Caller: reply Response ``` 采用PubSub的方式来实现这种场景 <!-- livebook:{"break_markdown":true} --> ### 方案1: 规则topic的方式 请求者发送请求到test主题,服务提供者消费到对应的请求后,处理相应的逻辑,并且将处理结果发送到test-response中 ```mermaid sequenceDiagram Caller->>+MQ: send Request Body to test topic MQ->>+Server: consume Request Body Server->>Server: process the request and generate response Server->>-MQ: send response to test-response MQ->>-Caller: consume the response ``` <!-- livebook:{"break_markdown":true} --> ### 方案2: 请求中带有response-topic的设定 请求者发送请求到test主题,并且带上response-topic=test2,服务提供者消费到对应的test的主题后,处理相应的逻辑,并且将结果写入到test2中。 ```mermaid sequenceDiagram Caller->>+MQ: send Request Body(response-topic=test2) to test topic MQ->>+Server: consume Request Body Server->>Server: process the request and generate response Server->>-MQ: send response to test2 MQ->>-Caller: consume the response ``` <!-- livebook:{"break_markdown":true} --> ### 以下的示例采用JSON RPC的格式 #### 登录Auth 1. 鉴权请求 ``` { "jsonrpc": "2.0", "method": "auth", "params": { "uid": "zhangchao", "token": "123456" }, "id": 1 } ``` 1. 鉴权返回 ``` { "jsonrpc": "2.0", "result": { "status": "true", "token": "huanxin123456" }, "id": 1 } ``` #### 验证细节 * broadway 流式处理数据 * pubsub 返回数据给发起方 #### 鉴权请求的时序图 <!-- livebook:{"break_markdown":true} --> <!-- Learn more at https://mermaid-js.github.io/mermaid --> ```mermaid sequenceDiagram Caller->>+CallerService: request auth request CallerService->>+MQ: send Auth(uid,pwd) to exim-auth MQ->>+AuthService: consume Auth(uid,pwd) from exim-auth AuthService->>AuthService: valid the Auth(uid,pwd) AuthService->>-MQ: send AuthResult(success/fail) to exim-auth-response MQ->>-CallerService: consume AuthResult(success/fail) from exim-auth-response CallerService->>-Caller: consume the response ``` ```elixir # 测试用例 result = Exim.PubSub.Request.auth("zhangchao", "123456") ``` ```elixir # 由于演示的原因,所以没有进行分拆,CallerService和AuthService # 1. 启动请求的kafka client # 2. 启动response的消费请求的逻辑 # 3. 启动AuthService写入回应的kafka client # 4. 启动CallerService消费请求结果的逻辑 # start request kafka client def start_request() do Enum.each(Application.get_env(:exim, :kafka_topics, []), fn topic -> # start callerService kafka client Exim.PubSub.Request.start_client(topic) end) end # start kafka consume and response kafka client def start_broadway() do Enum.each(Application.get_env(:exim, :kafka_topics, []), fn topic -> # add consumer for request Exim.PubSub.PipelineManager.add_queue(topic) # start authService kafka client Exim.PubSub.Response.start_client(topic) # add consumer for response Exim.PubSub.PipelineManager.add_queue(Exim.PubSub.Response.response_topic(topic)) end) end ``` ```elixir # 请求的逻辑 # 1. 订阅请求ID # 2. 发送请求到MQ # 3. AuthService消费到请求 # 4. AuthService进行鉴权逻辑 # 5. AuthService写入鉴权结果 # 6. CallerService消费到鉴权结果 # 7. CallerService将结果通知给Caller def auth(uid, token) do auth_request = %{ method: "auth", params: %{ uid: uid, token: token }, key: uid, id: UUID.generate() } # sub the request id PubSub.subscribe(Exim.PubSub, auth_request.id) request(auth_request) # wait for response receive do response -> Logger.info("Received response: #{inspect(response)}") response end end # auth valid the auth request and give auth result to response topic # 1. valid the token # 2. send result to response topic defp handle_message_internal(%{"method" => "auth"} = message) do Logger.info("handle auth request, #{inspect(message)}") response = message |> Map.put("topic", "exim-auth") |> Map.put("method", "result") Exim.PubSub.Response.response(response) end # handle auth response # send result to request process defp handle_message_internal(%{"method" => "result", "id" => id} = message) do Logger.info("handle auth response, #{inspect(message)}") PubSub.broadcast(Exim.PubSub, id, message) end ``` #### 加入聊天室 TODO <!-- livebook:{"break_markdown":true} --> #### 发送消息(长连接) 1. 基础参数的检测 比如消息是否在同一个appkey内,消息是否有from,有to字段等 2. 业务合法性检查 * 客户业务逻辑判定 * 敏感词的检测 * 反垃圾的检测 * 群组聊天室等业务场景的检测 * 好友黑名单的检测 3. 落盘消息 4. 给消息发送者返回发送成功 5. 路由消息的接收者,投递给消息的接收者 6. 客户端确认消息接收成功 <!-- livebook:{"break_markdown":true} --> <!-- Learn more at https://mermaid-js.github.io/mermaid --> ```mermaid flowchart LR Sender-->|基础检查|BasicCheck BasicCheck-->|持久化数据|Persistent Persistent-->|路由消息|RouterService RouterService-->|推送消息|Receiver ``` <!-- livebook:{"break_markdown":true} --> ### 异步MQ的topic的设计-服务级topic版本 该方案引入了AgentService,ValidService,ReplyService进行,服务中间采用的固定的topic队列进行消费 1. 用户sender登录到AgentService 2. 用户订阅topic用户级别,比如subscribe /sender/android 3. sender发送消息到AgentService 4. AgentService发送message到valid-msg-topic 5. ValidService消费到valid-msg-topic的message数据 6. ValidService完成消息合法性的检查 7. ValidService将合法性检查的结果写入到valided-msg-topic 8. ReplyService消费到valided-msg-topic的检查结果,发送并且publish /sender/android 9. AgentService收到发送消息的检查结果,并把数据发送给sender客户端 <!-- livebook:{"break_markdown":true} --> <!-- Learn more at https://mermaid-js.github.io/mermaid --> ```mermaid flowchart LR Sender-->|1 参数合法性检测|SenderAgentService SenderAgentService-->|2 业务合法性检测(服务级队列,用户级分区)|ValidService ValidService-->|3-1 发送结果(服务级对立,用户级分区)|ReplyService ReplyService-->|4 返回发送者发送结果到Sender所在的节点|SenderAgentService SenderAgentService-->|5 返回发送结果给客户端|Sender ValidService-->|3-2 路由消息(服务级队列,用户级分区)|Persistent ValidService-->|3-3 路由消息(服务级队列,用户级分区)|RouterService RouterService-->|4 推送消息|AgentService AgentService-->|5 发送消息到接收方|Receiver ``` <!-- livebook:{"break_markdown":true} --> <!-- Learn more at https://mermaid-js.github.io/mermaid --> ```mermaid sequenceDiagram Sender->>+SenderAgentService: receive client message SenderAgentService->>+MQ: pub message to (valid-msg-topic) MQ->>+ValidService: consume message from valid-msg-topic ValidService->>ValidService: valid message ValidService->>+MQ: send valided msg to (valided-msg-topic) MQ->>ReplyService: send reply to Sender ReplyService->>SenderAgentService: route send_ack to ReceiveService SenderAgentService->>-Sender: reply send_ack to Sender MQ->>RouterService: route message to Receiver RouterService->>AgentService: router msg to Receiver's AgentService AgentService->>Receiver: send message to receiver Receiver->>AgentService: ack message ``` <!-- livebook:{"break_markdown":true} --> ### 异步MQ的topic的设计-用户级topic版本 该方案引入了AgentService,ValidService进行,主要的区别Agent能够直接消费到对应的数据 1. 用户sender登录到AgentService 2. 用户订阅topic用户级别,比如subscribe /sender/android 3. sender发送消息到AgentService 4. AgentService发送message到valid-msg-topic 5. ValidService消费到valid-msg-topic的message数据 6. ValidService完成消息合法性的检查 7. ValidService将合法性检查的结果写入到valided-msg-topic 8. ReplyService消费到valided-msg-topic的检查结果,发送并且publish /sender/android 9. AgentService收到发送消息的检查结果,并把数据发送给sender客户端 <!-- livebook:{"break_markdown":true} --> ```mermaid flowchart LR Sender-->|0 登录订阅用户级队列 /user/mobile |SenderAgentService Sender-->|1 参数合法性检测|SenderAgentService SenderAgentService-->|2 业务合法性检测(服务级队列,用户级分区)|ValidService ValidService-->|3-1 发送结果(用户级队列 /user/mobile)|SenderAgentService SenderAgentService-->|4 返回发送结果给客户端|Sender ValidService-->|3-2 路由消息(服务级队列,用户级分区)|Persistent ValidService-->|3-3 路由消息(用户级队列)|AgentService AgentService-->|4 发送消息到接收方|Receiver ``` <!-- livebook:{"break_markdown":true} --> <!-- Learn more at https://mermaid-js.github.io/mermaid --> ```mermaid sequenceDiagram Sender->>+SenderAgentService: login && subscribe /sender/mobile && /sender SenderAgentService->>+MQ: pub message to (valid-msg-topic) MQ->>+ValidService: consume message from valid-msg-topic ValidService->>ValidService: valid message ValidService->>+MQ: send valided msg to (/sender/mobile) ValidService->>+MQ: route message to dest (/receiver || /receiver/mobile) MQ->>SenderAgentService: send reply to Sender SenderAgentService->>-Sender: reply send_ack to Sender MQ->>AgentService: get msg to Receiver's AgentService AgentService->>Receiver: send message to receiver Receiver->>AgentService: ack message ``` ## REST发送消息 ### 背景 当前发送消息,包括聊天室消息,群组消息,单人消息。 ### 现状 <!-- livebook:{"break_markdown":true} --> <!-- Learn more at https://mermaid-js.github.io/mermaid --> ```mermaid flowchart LR AppServer-->|0 发送系统消息 |Gateway Gateway-->|1 验证token合法性| MessageService MessageService-->|2 检查参数合法性 | MessageService MessageService-->|3-1 返回消息发送成功|Gateway Gateway-->|4-1 返回发送结果 | AppServer MessageService -->|3-2 路由消息(此处过程同上面长连接过程) | RouterService RouterService -->|4 发送消息到接收方|Receiver ``` <!-- livebook:{"break_markdown":true} --> <!-- Learn more at https://mermaid-js.github.io/mermaid --> ```mermaid sequenceDiagram AppServer->>+Gateway: 0 发送系统消息 Gateway->>+MessageService: 1 验证token合法性 MessageService-->>MessageService: 2 检查参数合法性 MessageService-->-Gateway: 3-1 返回消息发送成功 Gateway-->-AppServer: 4-1 返回发送结果 MessageService ->>RouterService: 3-2 路由消息(此处过程同上面长连接过程) RouterService ->>Receiver: 4 发送消息到接收方 ``` <!-- livebook:{"break_markdown":true} --> IM服务接收到客户的请求后,进行接下来的操作 1. 验证token的合法性,当前主要在gateway进行 2. 参数合法性检查,确认必要的参数是否都有 3. 返回发送结果给调用者,路径中包括gateway和AppServer 4. 消息正常的后续投递过程 5. 消息发送给消息的接收者 <!-- livebook:{"break_markdown":true} --> ### 队列方案 IM服务接收到客户的请求后,进行接下来的操作 1. 验证token的合法性,当前主要在gateway进行 2. 参数合法性检查,确认必要的参数是否都有 3. 消息写入到队列MQ(消息持久化) 4. 返回发送结果给调用者,路径中包括gateway和AppServer 5. 消息服消费到MQ中消息,进行后续的投递过程 6. 消息服务发送给消息的接收者 <!-- livebook:{"break_markdown":true} --> <!-- Learn more at https://mermaid-js.github.io/mermaid --> ```mermaid flowchart LR AppServer-->|0 发送系统消息 |Gateway Gateway-->|1 验证token合法性| MessageService MessageService-->|2 检查参数合法性 | MessageService MessageService-->|3-1 消息写入队列|MQ MessageService-->|3-2 返回消息发送成功|Gateway Gateway-->|4-1 返回发送结果 | AppServer MQ-->|4-2 消息读取队列|RouterService RouterService-->|5 路由消息(此处过程同上面长连接过程) | AgentService AgentService -->|6 发送消息到接收方|Receiver ``` <!-- livebook:{"break_markdown":true} --> <!-- Learn more at https://mermaid-js.github.io/mermaid --> ```mermaid sequenceDiagram AppServer->>+Gateway: 0 发送系统消息 Gateway->>+MessageService: 1 验证token合法性 MessageService-->>MessageService: 2 检查参数合法性 MessageService->>MQ: 3-1 消息写入队列 MessageService-->>-Gateway: 3-2 返回消息发送成功 Gateway-->>-AppServer: 4-1 返回发送结果 MQ->>RouterService: 4-2 消息读取队列 RouterService->>AgentService: 5 路由消息(此处过程同上面长连接过程) AgentService ->>Receiver: 6 发送消息到接收方 ``` <!-- livebook:{"break_markdown":true} --> 添加队列方案当前遗留问题 1. 如何进行租户级别的隔离 2. 队列的可靠性如何进行监控 3. 延时会有所增加,是否在可接受范围内
See source

Have you already installed Livebook?

If you already installed Livebook, you can configure the default Livebook location where you want to open notebooks.
Livebook up Checking status We can't reach this Livebook (but we saved your preference anyway)
Run notebook

Not yet? Install Livebook in just a minute

Livebook is open source, free, and ready to run anywhere.

Run on your machine

with Livebook Desktop

Run in the cloud

on select platforms

To run on Linux, Docker, embedded devices, or Elixir’s Mix, check our README.

PLATINUM SPONSORS
SPONSORS
Code navigation with go to definition of modules and functions Read More ×