WebSocketService.php 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460
  1. <?php
  2. namespace App\Services;
  3. @error_reporting(E_ALL & ~E_NOTICE);
  4. use App\Module\Base;
  5. use App\Module\Chat;
  6. use App\Module\Project;
  7. use App\Module\Users;
  8. use App\Tasks\ChromeExtendTask;
  9. use App\Tasks\NotificationTask;
  10. use App\Tasks\PushTask;
  11. use Cache;
  12. use DB;
  13. use Hhxsv5\LaravelS\Swoole\Task\Task;
  14. use Hhxsv5\LaravelS\Swoole\WebSocketHandlerInterface;
  15. use Swoole\Http\Request;
  16. use Swoole\WebSocket\Frame;
  17. use Swoole\WebSocket\Server;
  18. /**
  19. * @see https://wiki.swoole.com/#/start/start_ws_server
  20. */
  21. class WebSocketService implements WebSocketHandlerInterface
  22. {
  23. /**
  24. * 声明没有参数的构造函数
  25. * WebSocketService constructor.
  26. */
  27. public function __construct()
  28. {
  29. }
  30. /**
  31. * 连接建立时触发
  32. * @param Server $server
  33. * @param Request $request
  34. */
  35. public function onOpen(Server $server, Request $request)
  36. {
  37. global $_A;
  38. $_A = [
  39. '__static_langdata' => [],
  40. ];
  41. //判断参数
  42. $fd = $request->fd;
  43. if (!isset($request->get['token'])) {
  44. $server->push($fd, Chat::formatMsgSend([
  45. 'messageType' => 'error',
  46. 'body' => [
  47. 'error' => '参数错误'
  48. ],
  49. ]));
  50. $server->close($fd);
  51. $this->deleteUser($fd);
  52. return;
  53. }
  54. //判断token
  55. $token = $request->get['token'];
  56. $channel = $request->get['channel'] ?: '';
  57. $cacheKey = "ws::token:" . md5($token);
  58. $username = Cache::remember($cacheKey, now()->addSeconds(1), function () use ($token) {
  59. list($id, $username, $encrypt, $timestamp) = explode("@", base64_decode($token) . "@@@@");
  60. if (intval($id) > 0 && intval($timestamp) + 2592000 > time()) {
  61. if (DB::table('users')->where(['id' => $id, 'username' => $username, 'encrypt' => $encrypt])->exists()) {
  62. return $username;
  63. }
  64. }
  65. return null;
  66. });
  67. if (empty($username)) {
  68. Cache::forget($cacheKey);
  69. $server->push($fd, Chat::formatMsgSend([
  70. 'messageType' => 'error',
  71. 'channel' => $channel,
  72. 'body' => [
  73. 'error' => '会员不存在',
  74. ],
  75. ]));
  76. $server->close($fd);
  77. $this->deleteUser($fd);
  78. return;
  79. }
  80. //踢下线
  81. /*if ($channel != 'chromeExtend') {
  82. $userLists = $this->getUser('', $channel, $username);
  83. foreach ($userLists AS $user) {
  84. $server->push($user['fd'], Chat::formatMsgSend([
  85. 'messageType' => 'kick',
  86. 'channel' => $channel,
  87. 'body' => [
  88. 'ip' => Base::getIp(),
  89. 'time' => time(),
  90. 'newfd' => $fd,
  91. ],
  92. ]));
  93. $this->deleteUser($user['fd']);
  94. }
  95. }*/
  96. //保存用户、发送open事件
  97. $this->saveUser($fd, $channel, $username);
  98. $server->push($fd, Chat::formatMsgSend([
  99. 'messageType' => 'open',
  100. 'channel' => $channel,
  101. 'body' => [
  102. 'fd' => $fd,
  103. ],
  104. ]));
  105. //发送最后一条未发送的信息
  106. $lastMsg = Base::DBC2A(DB::table('chat_msg')->where('receive', $username)->orderByDesc('indate')->first());
  107. if ($lastMsg && $lastMsg['roger'] === 0) {
  108. $dialog = Chat::openDialog($lastMsg['username'], $lastMsg['receive']);
  109. if (!Base::isError($dialog)) {
  110. $dialog = $dialog['data'];
  111. $unread = intval(DB::table('chat_dialog')->where('id', $dialog['id'])->value(($dialog['recField'] == 1 ? 'unread1' : 'unread2')));
  112. $body = Base::string2array($lastMsg['message']);
  113. $body['id'] = $lastMsg['id'];
  114. $body['resend'] = 1;
  115. $body['unread'] = $unread;
  116. $body['username'] = $lastMsg['username'];
  117. $body['userimg'] = Users::userimg($lastMsg['username']);
  118. $body['indate'] = $lastMsg['indate'];
  119. $server->push($fd, Chat::formatMsgSend([
  120. 'messageType' => 'user',
  121. 'contentId' => $lastMsg['id'],
  122. 'channel' => $channel,
  123. 'username' => $lastMsg['username'],
  124. 'target' => $lastMsg['receive'],
  125. 'body' => $body,
  126. 'time' => $lastMsg['indate'],
  127. ]));
  128. }
  129. }
  130. }
  131. /**
  132. * 收到消息时触发
  133. * @param Server $server
  134. * @param Frame $frame
  135. */
  136. public function onMessage(Server $server, Frame $frame)
  137. {
  138. global $_A;
  139. $_A = [
  140. '__static_langdata' => [],
  141. ];
  142. //
  143. $data = Chat::formatMsgReceive($frame->data);
  144. $back = [
  145. 'status' => 1,
  146. 'message' => '',
  147. ];
  148. //
  149. switch ($data['messageType']) {
  150. /**
  151. * 刷新
  152. */
  153. case 'refresh':
  154. DB::table('ws')->where([
  155. 'fd' => $frame->fd,
  156. 'channel' => $data['channel'],
  157. ])->update(['update' => time()]);
  158. break;
  159. /**
  160. * 总未读消息数
  161. */
  162. case 'unread':
  163. $username = DB::table('ws')->where([
  164. 'fd' => $frame->fd,
  165. 'channel' => $data['channel'],
  166. ])->value('username');
  167. if ($username) {
  168. $num = intval(DB::table('chat_dialog')->where('user1', $username)->sum('unread1'));
  169. $num+= intval(DB::table('chat_dialog')->where('user2', $username)->sum('unread2'));
  170. $back['message'] = $num;
  171. } else {
  172. $back['message'] = 0;
  173. }
  174. break;
  175. /**
  176. * 已读会员消息
  177. */
  178. case 'read':
  179. $username = DB::table('ws')->where([
  180. 'fd' => $frame->fd,
  181. 'channel' => $data['channel'],
  182. ])->value('username');
  183. $dialog = Chat::openDialog($username, $data['target']);
  184. if (!Base::isError($dialog)) {
  185. $dialog = $dialog['data'];
  186. $upArray = [];
  187. if ($dialog['user1'] == $dialog['user2']) {
  188. $upArray['unread1'] = 0;
  189. $upArray['unread2'] = 0;
  190. } else {
  191. $upArray[($dialog['recField'] == 1 ? 'unread2' : 'unread1')] = 0;
  192. }
  193. DB::table('chat_dialog')->where('id', $dialog['id'])->update($upArray);
  194. }
  195. $chromeExtendTask = new ChromeExtendTask($username);
  196. Task::deliver($chromeExtendTask);
  197. break;
  198. /**
  199. * 收到信息回执
  200. */
  201. case 'roger':
  202. if ($data['contentId'] > 0) {
  203. $username = DB::table('ws')->where([
  204. 'fd' => $frame->fd,
  205. 'channel' => $data['channel'],
  206. ])->value('username');
  207. DB::table('chat_msg')->where([
  208. 'id' => $data['contentId'],
  209. 'receive' => $username,
  210. ])->update([
  211. 'roger' => 1,
  212. ]);
  213. }
  214. break;
  215. /**
  216. * 发给用户
  217. */
  218. case 'user':
  219. $username = DB::table('ws')->where([
  220. 'fd' => $frame->fd,
  221. 'channel' => $data['channel'],
  222. ])->value('username');
  223. $res = Chat::saveMessage($username, $data['target'], $data['body']);
  224. if (Base::isError($res)) {
  225. $back = [
  226. 'status' => 0,
  227. 'message' => $res['msg'],
  228. ];
  229. } else {
  230. $resData = $res['data'];
  231. $back['message'] = $resData['id'];
  232. $data['contentId'] = $resData['id'];
  233. $data['body']['id'] = $resData['id'];
  234. $data['body']['unread'] = $resData['unread'];
  235. //
  236. $basic = Users::username2basic($username);
  237. $data['body']['nickname'] = $basic ? $basic['nickname'] : ($data['body']['nickname'] || '');
  238. $data['body']['userimg'] = $basic ? $basic['userimg'] : ($data['body']['userimg'] || '');
  239. //
  240. $pushLists = [];
  241. foreach ($this->getUserOfName($data['target']) AS $item) {
  242. $pushLists[] = [
  243. 'fd' => $item['fd'],
  244. 'msg' => $data
  245. ];
  246. }
  247. $pushTask = new PushTask($pushLists);
  248. Task::deliver($pushTask);
  249. //
  250. $notificationTask = new NotificationTask($resData['id']);
  251. $notificationTask->delay(8);
  252. Task::deliver($notificationTask);
  253. }
  254. break;
  255. /**
  256. * 发给用户(不保存记录)
  257. */
  258. case 'info':
  259. $pushLists = [];
  260. foreach ($this->getUserOfName($data['target']) AS $item) {
  261. $pushLists[] = [
  262. 'fd' => $item['fd'],
  263. 'msg' => $data
  264. ];
  265. }
  266. $pushTask = new PushTask($pushLists);
  267. Task::deliver($pushTask);
  268. break;
  269. /**
  270. * 发给整个团队
  271. */
  272. case 'team':
  273. if ($data['body']['type'] === 'taskA') {
  274. $taskId = intval(Base::val($data['body'], 'taskDetail.id'));
  275. if ($taskId > 0) {
  276. $userLists = Chat::getTaskUsers($taskId);
  277. } else {
  278. $userLists = $this->getTeamUsers();
  279. }
  280. //
  281. $pushLists = [];
  282. foreach ($userLists as $user) {
  283. $data['messageType'] = 'user';
  284. $data['target'] = $user['username'];
  285. $pushLists[] = [
  286. 'fd' => $user['fd'],
  287. 'msg' => $data
  288. ];
  289. }
  290. $pushTask = new PushTask($pushLists);
  291. Task::deliver($pushTask);
  292. }
  293. break;
  294. /**
  295. * 知识库协作
  296. */
  297. case 'docs':
  298. $back['message'] = [];
  299. $body = $data['body'];
  300. $type = $body['type'];
  301. $sid = intval($body['sid']);
  302. if ($sid <= 0) {
  303. return;
  304. }
  305. $array = Base::json2array(Cache::get("docs::" . $sid));
  306. if ($array) {
  307. foreach ($array as $uname => $vbody) {
  308. if (intval($vbody['indate']) + 20 < time()) {
  309. unset($array[$uname]);
  310. }
  311. }
  312. }
  313. if ($type == 'enter' || $type == 'refresh') {
  314. $array[$body['username']] = $body;
  315. } elseif ($type == 'quit') {
  316. unset($array[$body['username']]);
  317. }
  318. //
  319. Cache::put("docs::" . $sid, Base::array2json($array), 30);
  320. ksort($array);
  321. $back['message'] = array_values($array);
  322. //
  323. if ($type == 'enter' || $type == 'quit') {
  324. $pushLists = [];
  325. foreach ($back['message'] AS $tuser) {
  326. foreach ($this->getUserOfName($tuser['username']) AS $item) {
  327. $pushLists[] = [
  328. 'fd' => $item['fd'],
  329. 'msg' => [
  330. 'messageType' => 'docs',
  331. 'body' => [
  332. 'type' => 'users',
  333. 'sid' => $sid,
  334. 'lists' => $back['message']
  335. ]
  336. ]
  337. ];
  338. }
  339. }
  340. $pushTask = new PushTask($pushLists);
  341. Task::deliver($pushTask);
  342. }
  343. break;
  344. }
  345. if ($data['messageId']) {
  346. $pushLists = [];
  347. $pushLists[] = [
  348. 'fd' => $frame->fd,
  349. 'msg' => [
  350. 'messageType' => 'back',
  351. 'messageId' => $data['messageId'],
  352. 'body' => $back,
  353. ]
  354. ];
  355. $pushTask = new PushTask($pushLists);
  356. Task::deliver($pushTask);
  357. }
  358. }
  359. /**
  360. * 关闭连接时触发
  361. * @param Server $server
  362. * @param $fd
  363. * @param $reactorId
  364. */
  365. public function onClose(Server $server, $fd, $reactorId)
  366. {
  367. $this->deleteUser($fd);
  368. }
  369. /** ****************************************************************************** */
  370. /** ****************************************************************************** */
  371. /** ****************************************************************************** */
  372. /**
  373. * 保存用户
  374. * @param $fd
  375. * @param $channel
  376. * @param $username
  377. */
  378. private function saveUser($fd, $channel, $username)
  379. {
  380. try {
  381. DB::transaction(function () use ($username, $channel, $fd) {
  382. $this->deleteUser($fd);
  383. DB::table('ws')->updateOrInsert([
  384. 'key' => md5($fd . '@' . $channel . '@' . $username)
  385. ], [
  386. 'fd' => $fd,
  387. 'username' => $username,
  388. 'channel' => $channel,
  389. 'update' => time()
  390. ]);
  391. });
  392. } catch (\Throwable $e) {
  393. }
  394. }
  395. /**
  396. * 清除用户
  397. * @param $fd
  398. */
  399. private function deleteUser($fd)
  400. {
  401. DB::table('ws')->where('fd', $fd)->delete();
  402. }
  403. /**
  404. * 获取用户
  405. * @param string $fd
  406. * @param string $channel
  407. * @param string $username
  408. * @return array
  409. */
  410. private function getUser($fd = '', $channel = '', $username = '')
  411. {
  412. $array = [];
  413. if ($fd) $array['fd'] = $fd;
  414. if ($channel) $array['channel'] = $channel;
  415. if ($username) $array['username'] = $username;
  416. if (empty($array)) {
  417. return [];
  418. }
  419. return Base::DBC2A(DB::table('ws')->select(['fd', 'username', 'channel'])->where($array)->get());
  420. }
  421. private function getUserOfFd($fd, $channel = '') {
  422. return $this->getUser($fd, $channel);
  423. }
  424. private function getUserOfName($username, $channel = '') {
  425. return $this->getUser('', $channel, $username);
  426. }
  427. /**
  428. * 获取团队所有在线用户
  429. * @return array|string
  430. */
  431. private function getTeamUsers()
  432. {
  433. return Base::DBC2A(DB::table('ws')->select(['fd', 'username', 'channel'])->where([
  434. ['update', '>', time() - 600],
  435. ])->get());
  436. }
  437. }