WebSocketService.php 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427
  1. <?php
  2. namespace App\Services;
  3. use App\Module\Base;
  4. use App\Module\Chat;
  5. use App\Module\Users;
  6. use App\Tasks\ChromeExtendTask;
  7. use App\Tasks\PushTask;
  8. use Cache;
  9. use DB;
  10. use Hhxsv5\LaravelS\Swoole\Task\Task;
  11. use Hhxsv5\LaravelS\Swoole\WebSocketHandlerInterface;
  12. use Swoole\Http\Request;
  13. use Swoole\WebSocket\Frame;
  14. use Swoole\WebSocket\Server;
  15. /**
  16. * @see https://wiki.swoole.com/#/start/start_ws_server
  17. */
  18. class WebSocketService implements WebSocketHandlerInterface
  19. {
  /**
   * WebSocketService constructor.
   *
   * Declared explicitly with no parameters; there is nothing to initialise.
   */
  public function __construct()
  {
      // Intentionally empty.
  }
  /**
   * Triggered when a WebSocket connection is established.
   *
   * Steps:
   *  1. Require a string `token` query parameter; otherwise push an error and close.
   *  2. Resolve the token to a username (result cached for one second per token);
   *     on failure push an error and close.
   *  3. Register the connection in the `ws` table and push an `open` event carrying the fd.
   *  4. If the newest message addressed to the user is still unacknowledged, push it again.
   *
   * @param Server $server
   * @param Request $request
   */
  public function onOpen(Server $server, Request $request)
  {
      global $_A;
      $_A = [
          '__static_langdata' => [],
      ];
      $fd = $request->fd;
      // NOTE(review): $request->get is presumably not an array when the handshake URL
      // has no query string -- normalise so the lookups below never hit a non-array.
      $query = is_array($request->get) ? $request->get : [];
      // Validate parameters. The token must be a string: `token[]=x` would yield an
      // array, which md5()/base64_decode() below cannot handle.
      $token = $query['token'] ?? null;
      if (!is_string($token)) {
          $this->rejectConnection($server, $fd, [
              'messageType' => 'error',
              'body' => [
                  'error' => '参数错误'
              ],
          ]);
          return;
      }
      // `channel` is optional: a missing, empty or non-string value falls back to ''.
      $channel = $query['channel'] ?? '';
      if (!is_string($channel) || !$channel) {
          $channel = '';
      }
      // Validate the token. It is base64("id@username@encrypt@timestamp") and is accepted
      // for 2592000 seconds after its timestamp, provided a matching `users` row exists.
      $cacheKey = "ws::token:" . md5($token);
      $username = Cache::remember($cacheKey, now()->addSeconds(1), function () use ($token) {
          // The "@@@@" padding guarantees four parts even for a malformed token.
          list($id, $username, $encrypt, $timestamp) = explode("@", base64_decode($token) . "@@@@");
          if (intval($id) > 0 && intval($timestamp) + 2592000 > time()) {
              if (DB::table('users')->where(['id' => $id, 'username' => $username, 'encrypt' => $encrypt])->exists()) {
                  return $username;
              }
          }
          return null;
      });
      if (empty($username)) {
          // Do not keep a negative result cached.
          Cache::forget($cacheKey);
          $this->rejectConnection($server, $fd, [
              'messageType' => 'error',
              'channel' => $channel,
              'body' => [
                  'error' => '会员不存在',
              ],
          ]);
          return;
      }
      // Kick earlier sessions of the same user offline (currently disabled).
      /*if ($channel != 'chromeExtend') {
          $userLists = $this->getUser('', $channel, $username);
          foreach ($userLists AS $user) {
              $server->push($user['fd'], Chat::formatMsgSend([
                  'messageType' => 'kick',
                  'channel' => $channel,
                  'body' => [
                      'ip' => Base::getIp(),
                      'time' => time(),
                      'newfd' => $fd,
                  ],
              ]));
              $this->deleteUser($user['fd']);
          }
      }*/
      // Save the user and send the open event.
      $this->saveUser($fd, $channel, $username);
      $server->push($fd, Chat::formatMsgSend([
          'messageType' => 'open',
          'channel' => $channel,
          'body' => [
              'fd' => $fd,
          ],
      ]));
      // Re-send the last message that has not been acknowledged yet.
      $this->resendLastMessage($server, $fd, $channel, $username);
  }

  /**
   * Push an error message to a connection, close it and drop its `ws` row.
   *
   * @param Server $server
   * @param $fd
   * @param array $message payload handed to Chat::formatMsgSend()
   */
  private function rejectConnection(Server $server, $fd, array $message)
  {
      $server->push($fd, Chat::formatMsgSend($message));
      $server->close($fd);
      $this->deleteUser($fd);
  }

  /**
   * Push the newest message addressed to $username again when its `roger` flag is 0.
   *
   * @param Server $server
   * @param $fd
   * @param $channel
   * @param $username
   */
  private function resendLastMessage(Server $server, $fd, $channel, $username)
  {
      $lastMsg = Base::DBC2A(DB::table('chat_msg')->where('receive', $username)->orderByDesc('indate')->first());
      // NOTE(review): the strict comparison assumes the driver returns `roger` as an int -- confirm.
      if (!$lastMsg || $lastMsg['roger'] !== 0) {
          return;
      }
      $dialog = Chat::openDialog($lastMsg['username'], $lastMsg['receive']);
      if (Base::isError($dialog)) {
          return;
      }
      $dialog = $dialog['data'];
      $unread = intval(DB::table('chat_dialog')->where('id', $dialog['id'])->value(($dialog['recField'] == 1 ? 'unread1' : 'unread2')));
      $body = Base::string2array($lastMsg['message']);
      $body['id'] = $lastMsg['id'];
      // Marks the payload as a re-delivery.
      $body['resend'] = 1;
      $body['unread'] = $unread;
      $body['username'] = $lastMsg['username'];
      $body['userimg'] = Users::userimg($lastMsg['username']);
      $body['indate'] = $lastMsg['indate'];
      $server->push($fd, Chat::formatMsgSend([
          'messageType' => 'user',
          'contentId' => $lastMsg['id'],
          'channel' => $channel,
          'username' => $lastMsg['username'],
          'target' => $lastMsg['receive'],
          'body' => $body,
          'time' => $lastMsg['indate'],
      ]));
  }
  /**
   * Triggered when a message frame is received.
   *
   * The payload is decoded with Chat::formatMsgReceive() and dispatched on `messageType`:
   *  - refresh: update the `update` timestamp of the sender's `ws` row
   *  - unread:  answer with the sender's total unread count over all dialogs
   *  - read:    reset the unread counter of the dialog with `target`, then deliver a ChromeExtendTask
   *  - roger:   flag chat message `contentId` as acknowledged by the sender
   *  - user:    store the message and push it to every connection of `target`
   *  - info:    push to every connection of `target` without storing anything
   *  - team:    for body type `taskA` only, push to the users related to the task
   *             (or to all recently active connections when no task id is given)
   * When the payload carries a `messageId`, a `back` message holding the result is
   * pushed to the sender afterwards.
   *
   * @param Server $server
   * @param Frame $frame
   */
  public function onMessage(Server $server, Frame $frame)
  {
      global $_A;
      $_A = [
          '__static_langdata' => [],
      ];
      // NOTE(review): assumes formatMsgReceive() always yields the keys read below
      // (messageType, channel, target, contentId, body, messageId) -- confirm.
      $data = Chat::formatMsgReceive($frame->data);
      $back = [
          'status' => 1,
          'message' => '',
      ];
      //
      switch ($data['messageType']) {
          /**
           * Refresh
           */
          case 'refresh':
              DB::table('ws')->where([
                  'fd' => $frame->fd,
                  'channel' => $data['channel'],
              ])->update(['update' => time()]);
              break;

          /**
           * Total number of unread messages
           */
          case 'unread':
              $username = $this->getUsernameOfConnection($frame->fd, $data['channel']);
              if ($username) {
                  $num = intval(DB::table('chat_dialog')->where('user1', $username)->sum('unread1'));
                  $num+= intval(DB::table('chat_dialog')->where('user2', $username)->sum('unread2'));
                  $back['message'] = $num;
              } else {
                  $back['message'] = 0;
              }
              break;

          /**
           * Mark a user's messages as read
           */
          case 'read':
              $username = $this->getUsernameOfConnection($frame->fd, $data['channel']);
              $dialog = Chat::openDialog($username, $data['target']);
              if (!Base::isError($dialog)) {
                  $dialog = $dialog['data'];
                  $upArray = [];
                  if ($dialog['user1'] == $dialog['user2']) {
                      // Dialog with oneself: clear both counters.
                      $upArray['unread1'] = 0;
                      $upArray['unread2'] = 0;
                  } else {
                      $upArray[($dialog['recField'] == 1 ? 'unread2' : 'unread1')] = 0;
                  }
                  DB::table('chat_dialog')->where('id', $dialog['id'])->update($upArray);
              }
              $chromeExtendTask = new ChromeExtendTask($username);
              Task::deliver($chromeExtendTask);
              break;

          /**
           * Delivery receipt
           */
          case 'roger':
              if ($data['contentId'] > 0) {
                  $username = $this->getUsernameOfConnection($frame->fd, $data['channel']);
                  // Without a registered user the query builder would turn the null
                  // `receive` condition into IS NULL and could flag unrelated rows.
                  if ($username) {
                      DB::table('chat_msg')->where([
                          'id' => $data['contentId'],
                          'receive' => $username,
                      ])->update([
                          'roger' => 1,
                      ]);
                  }
              }
              break;

          /**
           * Send to a user
           */
          case 'user':
              $username = $this->getUsernameOfConnection($frame->fd, $data['channel']);
              $res = Chat::saveMessage($username, $data['target'], $data['body']);
              if (Base::isError($res)) {
                  $back = [
                      'status' => 0,
                      'message' => $res['msg'],
                  ];
              } else {
                  $resData = $res['data'];
                  $back['message'] = $resData['id'];
                  $data['contentId'] = $resData['id'];
                  $data['body']['id'] = $resData['id'];
                  $data['body']['unread'] = $resData['unread'];
                  //
                  $this->deliverToUsername($data['target'], $data);
              }
              break;

          /**
           * Send to a user (no record is saved)
           */
          case 'info':
              $this->deliverToUsername($data['target'], $data);
              break;

          /**
           * Send to the whole team
           */
          case 'team':
              if (Base::val($data['body'], 'type') === 'taskA') {
                  $taskId = intval(Base::val($data['body'], 'taskDetail.id'));
                  if ($taskId > 0) {
                      $userLists = $this->getTaskUsers($taskId);
                  } else {
                      $userLists = $this->getTeamUsers();
                  }
                  // Recipients receive it as an ordinary `user` message addressed to them.
                  $data['messageType'] = 'user';
                  $pushLists = [];
                  foreach ($userLists as $user) {
                      $data['target'] = $user['username'];
                      $pushLists[] = [
                          'fd' => $user['fd'],
                          'msg' => $data
                      ];
                  }
                  $pushTask = new PushTask($pushLists);
                  Task::deliver($pushTask);
              }
              break;
      }
      // Report the outcome back to the sender when it asked for one.
      if ($data['messageId']) {
          $pushLists = [];
          $pushLists[] = [
              'fd' => $frame->fd,
              'msg' => [
                  'messageType' => 'back',
                  'messageId' => $data['messageId'],
                  'body' => $back,
              ]
          ];
          $pushTask = new PushTask($pushLists);
          Task::deliver($pushTask);
      }
  }

  /**
   * Read the username stored in `ws` for a connection/channel pair.
   *
   * @param $fd
   * @param $channel
   * @return mixed the stored username; presumably null when no row matches
   */
  private function getUsernameOfConnection($fd, $channel)
  {
      return DB::table('ws')->where([
          'fd' => $fd,
          'channel' => $channel,
      ])->value('username');
  }

  /**
   * Deliver a PushTask that sends $data to every connection registered for $username.
   * The task is delivered even when the user has no connection (empty push list).
   *
   * @param $username
   * @param $data
   */
  private function deliverToUsername($username, $data)
  {
      $pushLists = [];
      foreach ($this->getUserOfName($username) AS $item) {
          $pushLists[] = [
              'fd' => $item['fd'],
              'msg' => $data
          ];
      }
      $pushTask = new PushTask($pushLists);
      Task::deliver($pushTask);
  }
  /**
   * Triggered when a connection is closed.
   *
   * Removes the `ws` row(s) registered for the closed connection.
   *
   * @param Server $server
   * @param $fd
   * @param $reactorId
   */
  public function onClose(Server $server, $fd, $reactorId)
  {
      // Only the fd is needed; $server and $reactorId are unused here.
      $this->deleteUser($fd);
  }
  308. /** ****************************************************************************** */
  309. /** ****************************************************************************** */
  310. /** ****************************************************************************** */
  /**
   * Save (register) a user connection in the `ws` table.
   *
   * Inside one transaction any existing row for the fd is deleted, then a row keyed by
   * md5("fd@channel@username") is inserted or updated with the current timestamp.
   * Registration is best-effort: a failure never propagates to the caller, but it is
   * handed to the application's exception handler instead of being dropped silently.
   *
   * @param $fd
   * @param $channel
   * @param $username
   */
  private function saveUser($fd, $channel, $username)
  {
      try {
          DB::transaction(function () use ($username, $channel, $fd) {
              $this->deleteUser($fd);
              DB::table('ws')->updateOrInsert([
                  'key' => md5($fd . '@' . $channel . '@' . $username)
              ], [
                  'fd' => $fd,
                  'username' => $username,
                  'channel' => $channel,
                  'update' => time()
              ]);
          });
      } catch (\Throwable $e) {
          // Keep the connection alive, but make the failure visible to the exception handler.
          report($e);
      }
  }
  /**
   * Remove a user connection: deletes every `ws` row stored for the given fd.
   *
   * @param $fd
   */
  private function deleteUser($fd)
  {
      $connections = DB::table('ws')->where('fd', '=', $fd);
      $connections->delete();
  }
  /**
   * Fetch connections from `ws` matching whichever of the filters are non-empty.
   *
   * Returns an empty array without querying when every filter is empty.
   *
   * @param string $fd
   * @param string $channel
   * @param string $username
   * @return array
   */
  private function getUser($fd = '', $channel = '', $username = '')
  {
      // array_filter() without a callback drops falsy entries, leaving only the filters in use.
      $conditions = array_filter([
          'fd' => $fd,
          'channel' => $channel,
          'username' => $username,
      ]);
      if (!$conditions) {
          return [];
      }
      $rows = DB::table('ws')
          ->select(['fd', 'username', 'channel'])
          ->where($conditions)
          ->get();
      return Base::DBC2A($rows);
  }
  /**
   * Fetch the connections stored for an fd, optionally restricted to a channel.
   *
   * @param $fd
   * @param string $channel
   * @return array
   */
  private function getUserOfFd($fd, $channel = '')
  {
      $connections = $this->getUser($fd, $channel);
      return $connections;
  }
  /**
   * Fetch the connections stored for a username, optionally restricted to a channel.
   *
   * @param $username
   * @param string $channel
   * @return array
   */
  private function getUserOfName($username, $channel = '')
  {
      // No fd filter: pass the empty default explicitly.
      $connections = $this->getUser('', $channel, $username);
      return $connections;
  }
  /**
   * Get the team users: every `ws` connection whose `update` timestamp lies
   * within the last 600 seconds.
   *
   * @return array|string
   */
  private function getTeamUsers()
  {
      $activeSince = time() - 600;
      $rows = DB::table('ws')
          ->select(['fd', 'username', 'channel'])
          ->where('update', '>', $activeSince)
          ->get();
      return Base::DBC2A($rows);
  }
  /**
   * Get the connections of users related to a task: its followers, its creator,
   * its owner and the members of its project. Only connections refreshed within
   * the last 600 seconds are returned.
   *
   * @param $taskId
   * @return array empty when the task does not exist
   */
  private function getTaskUsers($taskId)
  {
      $taskRow = DB::table('project_task')
          ->select(['follower', 'createuser', 'username', 'projectid'])
          ->where('id', $taskId)
          ->first();
      $task = Base::DBC2A($taskRow);
      if (empty($task)) {
          return [];
      }
      // Followers
      $usernames = Base::string2array($task['follower']);
      // Creator
      $usernames[] = $task['createuser'];
      // Owner
      $usernames[] = $task['username'];
      // Users who belong to the project
      if ($task['projectid'] > 0) {
          $memberRows = DB::table('project_users')
              ->select(['username'])
              ->where(['projectid' => $task['projectid'], 'type' => '成员' ])
              ->get();
          foreach (Base::DBC2A($memberRows) as $member) {
              $usernames[] = $member['username'];
          }
      }
      // De-duplicate before querying the live connections.
      $uniqueUsernames = array_values(array_unique($usernames));
      $connections = DB::table('ws')
          ->select(['fd', 'username', 'channel'])
          ->where([
              ['update', '>', time() - 600],
          ])
          ->whereIn('username', $uniqueUsernames)
          ->get();
      return Base::DBC2A($connections);
  }
  405. }