WebSocketService.php 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462
  1. <?php
  2. namespace App\Services;
  3. @error_reporting(E_ALL & ~E_NOTICE);
  4. use App\Module\Base;
  5. use App\Module\Chat;
  6. use App\Module\Project;
  7. use App\Module\Users;
  8. use App\Tasks\ChromeExtendTask;
  9. use App\Tasks\NotificationTask;
  10. use App\Tasks\PushTask;
  11. use Cache;
  12. use DB;
  13. use Hhxsv5\LaravelS\Swoole\Task\Task;
  14. use Hhxsv5\LaravelS\Swoole\WebSocketHandlerInterface;
  15. use Swoole\Http\Request;
  16. use Swoole\WebSocket\Frame;
  17. use Swoole\WebSocket\Server;
  18. /**
  19. * @see https://wiki.swoole.com/#/start/start_ws_server
  20. */
  21. class WebSocketService implements WebSocketHandlerInterface
  22. {
  /**
   * WebSocketService constructor.
   *
   * Explicitly declared without parameters; it performs no initialisation.
   */
  public function __construct()
  {
      // Intentionally empty.
  }
  /**
   * Triggered when a WebSocket connection is established.
   *
   * Steps, in order:
   *  1. Reset the per-request global `$_A`.
   *  2. Require a `token` query parameter; otherwise push an error and close.
   *  3. Validate the token against the `users` table (result cached for one
   *     second); on failure push an error and close.
   *  4. Register the connection in the `ws` table and push an `open` event
   *     carrying the fd.
   *  5. Re-send the receiver's most recent chat message when it has not been
   *     acknowledged ("roger") yet.
   *
   * @param Server $server
   * @param Request $request
   */
  public function onOpen(Server $server, Request $request)
  {
      global $_A;
      $_A = [
          '__static_langdata' => [],
      ];

      $fd = $request->fd;

      // Validate the parameters. A non-string token (presumably possible via
      // `token[]=x` style query strings - TODO confirm) is rejected as well,
      // because md5()/base64_decode() below require a string.
      $token = $request->get['token'] ?? null;
      if (!is_string($token)) {
          $this->refuseConnection($server, $fd, [
              'messageType' => 'error',
              'body' => [
                  'error' => '参数错误'
              ],
          ]);
          return;
      }

      // A missing or falsy channel is treated as the empty channel, without
      // reading an undefined array index.
      $channel = $request->get['channel'] ?? '';
      if (!is_string($channel) || !$channel) {
          $channel = '';
      }

      // Validate the token.
      $cacheKey = "ws::token:" . md5($token);
      $username = $this->usernameFromToken($token, $cacheKey);
      if (empty($username)) {
          Cache::forget($cacheKey);
          $this->refuseConnection($server, $fd, [
              'messageType' => 'error',
              'channel' => $channel,
              'body' => [
                  'error' => '会员不存在',
              ],
          ]);
          return;
      }

      // Save the user and send the open event.
      $this->saveUser($fd, $channel, $username);
      $server->push($fd, Chat::formatMsgSend([
          'messageType' => 'open',
          'channel' => $channel,
          'body' => [
              'fd' => $fd,
          ],
      ]));

      // Send the last message that has not been confirmed yet.
      $this->resendLastUnconfirmedMessage($server, $fd, $channel, $username);
  }

  /**
   * Pushes an error payload to the client, closes the connection and removes
   * any `ws` row registered for the fd.
   *
   * @param Server $server
   * @param int $fd
   * @param array $payload message passed to Chat::formatMsgSend()
   */
  private function refuseConnection(Server $server, $fd, array $payload)
  {
      $server->push($fd, Chat::formatMsgSend($payload));
      $server->close($fd);
      $this->deleteUser($fd);
  }

  /**
   * Resolves the username encoded in a connection token.
   *
   * The base64-decoded token is split on "@" into id, username, encrypt and
   * timestamp. It is accepted when the id is positive, the timestamp is less
   * than 2592000 seconds (30 days) old and a matching `users` row exists.
   * The lookup is wrapped in Cache::remember() with a one second lifetime.
   *
   * @param string $token
   * @param string $cacheKey
   * @return string|null the username, or null when the token is not accepted
   */
  private function usernameFromToken($token, $cacheKey)
  {
      return Cache::remember($cacheKey, now()->addSeconds(1), function () use ($token) {
          // The "@@@@" padding guarantees that list() always receives four parts.
          list($id, $username, $encrypt, $timestamp) = explode("@", base64_decode($token) . "@@@@");
          if (intval($id) > 0 && intval($timestamp) + 2592000 > time()) {
              if (DB::table('users')->where(['id' => $id, 'username' => $username, 'encrypt' => $encrypt])->exists()) {
                  return $username;
              }
          }
          return null;
      });
  }

  /**
   * Re-sends the newest `chat_msg` row addressed to $username when its
   * `roger` flag is still 0.
   *
   * @param Server $server
   * @param int $fd
   * @param string $channel
   * @param string $username
   */
  private function resendLastUnconfirmedMessage(Server $server, $fd, $channel, $username)
  {
      $lastMsg = Base::DBC2A(DB::table('chat_msg')->where('receive', $username)->orderByDesc('indate')->first());
      // The flag is compared numerically: depending on the PDO configuration
      // an integer column may presumably come back as the string "0", which a
      // strict `=== 0` on the raw value would never match.
      if (!$lastMsg || !isset($lastMsg['roger']) || intval($lastMsg['roger']) !== 0) {
          return;
      }

      $dialog = Chat::openDialog($lastMsg['username'], $lastMsg['receive']);
      if (Base::isError($dialog)) {
          return;
      }
      $dialog = $dialog['data'];

      $unreadField = $dialog['recField'] == 1 ? 'unread1' : 'unread2';
      $unread = intval(DB::table('chat_dialog')->where('id', $dialog['id'])->value($unreadField));

      $body = Base::string2array($lastMsg['message']);
      $body['id'] = $lastMsg['id'];
      $body['resend'] = 1;
      $body['unread'] = $unread;
      $body['username'] = $lastMsg['username'];
      $body['userimg'] = Users::userimg($lastMsg['username']);
      $body['indate'] = $lastMsg['indate'];

      $server->push($fd, Chat::formatMsgSend([
          'messageType' => 'user',
          'contentId' => $lastMsg['id'],
          'channel' => $channel,
          'username' => $lastMsg['username'],
          'target' => $lastMsg['receive'],
          'body' => $body,
          'time' => $lastMsg['indate'],
      ]));
  }
  /**
   * Triggered when a message frame is received.
   *
   * The frame is decoded with Chat::formatMsgReceive() and dispatched on its
   * `messageType`:
   *  - refresh: update the connection's `update` timestamp.
   *  - unread:  report the sender's total unread count.
   *  - read:    reset the unread counter of a dialog.
   *  - roger:   mark received messages as acknowledged.
   *  - user:    store a message and push it to the target user.
   *  - info:    push a message to the target user without storing it.
   *  - team:    push a `taskA` message to task members or all recent users.
   *  - docs:    maintain the list of users collaborating on a document.
   *
   * When the frame carries a `messageId`, a `back` message containing the
   * `$back` result is pushed to the sender afterwards.
   *
   * @param Server $server
   * @param Frame $frame
   */
  public function onMessage(Server $server, Frame $frame)
  {
      global $_A;
      $_A = [
          '__static_langdata' => [],
      ];

      $data = Chat::formatMsgReceive($frame->data);
      $back = [
          'status' => 1,
          'message' => '',
      ];

      switch ($data['messageType']) {
          // Refresh: keep the connection's `update` timestamp current.
          case 'refresh':
              DB::table('ws')->where([
                  'fd' => $frame->fd,
                  'channel' => $data['channel'],
              ])->update(['update' => time()]);
              break;

          // Total number of unread messages.
          case 'unread':
              $username = $this->usernameOfConnection($frame->fd, $data['channel']);
              if ($username) {
                  $num = intval(DB::table('chat_dialog')->where('user1', $username)->sum('unread1'));
                  $num += intval(DB::table('chat_dialog')->where('user2', $username)->sum('unread2'));
                  $back['message'] = $num;
              } else {
                  $back['message'] = 0;
              }
              break;

          // Mark a member's messages as read.
          case 'read':
              $username = $this->usernameOfConnection($frame->fd, $data['channel']);
              $dialog = Chat::openDialog($username, $data['target']);
              if (!Base::isError($dialog)) {
                  $dialog = $dialog['data'];
                  $upArray = [];
                  if ($dialog['user1'] == $dialog['user2']) {
                      // Dialog with oneself: both counters belong to the same user.
                      $upArray['unread1'] = 0;
                      $upArray['unread2'] = 0;
                  } else {
                      $upArray[($dialog['recField'] == 1 ? 'unread2' : 'unread1')] = 0;
                  }
                  DB::table('chat_dialog')->where('id', $dialog['id'])->update($upArray);
              }
              Task::deliver(new ChromeExtendTask($username));
              break;

          // Receipt: the client confirms it received the listed messages.
          case 'roger':
              $contentIds = Base::explodeInt(',', $data['contentId']);
              if ($contentIds) {
                  $username = $this->usernameOfConnection($frame->fd, $data['channel']);
                  if ($username) {
                      // Restricting to `receive` keeps a client from acknowledging
                      // messages addressed to somebody else.
                      DB::table('chat_msg')->where('receive', $username)->whereIn('id', $contentIds)->update([
                          'roger' => 1,
                      ]);
                  }
              }
              break;

          // Send to a user (the message is stored).
          case 'user':
              $username = $this->usernameOfConnection($frame->fd, $data['channel']);
              $res = Chat::saveMessage($username, $data['target'], $data['body']);
              if (Base::isError($res)) {
                  $back = [
                      'status' => 0,
                      'message' => $res['msg'],
                  ];
              } else {
                  $resData = $res['data'];
                  $back['message'] = $resData['id'];
                  $data['contentId'] = $resData['id'];
                  $data['body']['id'] = $resData['id'];
                  $data['body']['unread'] = $resData['unread'];
                  // Prefer the stored profile; otherwise keep what the client sent.
                  // `??` is used because PHP's `||` evaluates to a boolean, which
                  // would replace the nickname/avatar with true/false.
                  $basic = Users::username2basic($username);
                  $data['body']['nickname'] = $basic ? $basic['nickname'] : ($data['body']['nickname'] ?? '');
                  $data['body']['userimg'] = $basic ? $basic['userimg'] : ($data['body']['userimg'] ?? '');
                  //
                  $this->deliverToUser($data['target'], $data);
                  // Follow-up notification task, delivered with a delay of 8.
                  $notificationTask = new NotificationTask($resData['id']);
                  $notificationTask->delay(8);
                  Task::deliver($notificationTask);
              }
              break;

          // Send to a user (no record is stored).
          case 'info':
              $this->deliverToUser($data['target'], $data);
              break;

          // Send to the whole team.
          case 'team':
              if ($data['body']['type'] === 'taskA') {
                  $taskId = intval(Base::val($data['body'], 'taskDetail.id'));
                  if ($taskId > 0) {
                      $userLists = Chat::getTaskUsers($taskId);
                  } else {
                      $userLists = $this->getTeamUsers();
                  }
                  // Recipients get the payload as a `user` message addressed to them.
                  $msg = $data;
                  $msg['messageType'] = 'user';
                  $pushLists = [];
                  foreach ($userLists as $user) {
                      $msg['target'] = $user['username'];
                      $pushLists[] = [
                          'fd' => $user['fd'],
                          'msg' => $msg
                      ];
                  }
                  Task::deliver(new PushTask($pushLists));
              }
              break;

          // Knowledge-base (docs) collaboration.
          case 'docs':
              $back['message'] = [];
              $body = $data['body'];
              $type = $body['type'];
              $sid = intval($body['sid']);
              if ($sid <= 0) {
                  // NOTE: returns from the method, so no `back` message is sent.
                  return;
              }
              $array = Base::json2array(Cache::get("docs::" . $sid));
              if (!is_array($array)) {
                  // Nothing usable cached yet.
                  $array = [];
              }
              // Drop participants whose last entry is older than 20 seconds.
              foreach ($array as $uname => $vbody) {
                  if (intval($vbody['indate']) + 20 < time()) {
                      unset($array[$uname]);
                  }
              }
              // NOTE(review): the participant key is the client-supplied
              // body.username, not the username bound to this connection.
              if ($type == 'enter' || $type == 'refresh') {
                  $array[$body['username']] = $body;
              } elseif ($type == 'quit') {
                  unset($array[$body['username']]);
              }
              //
              Cache::put("docs::" . $sid, Base::array2json($array), 30);
              ksort($array);
              $back['message'] = array_values($array);
              // Tell every remaining participant about the changed user list.
              if ($type == 'enter' || $type == 'quit') {
                  $pushLists = [];
                  foreach ($back['message'] as $tuser) {
                      foreach ($this->getUserOfName($tuser['username']) as $item) {
                          $pushLists[] = [
                              'fd' => $item['fd'],
                              'msg' => [
                                  'messageType' => 'docs',
                                  'body' => [
                                      'type' => 'users',
                                      'sid' => $sid,
                                      'lists' => $back['message']
                                  ]
                              ]
                          ];
                      }
                  }
                  Task::deliver(new PushTask($pushLists));
              }
              break;
      }

      // Acknowledge the frame when the client asked for it.
      if (!empty($data['messageId'])) {
          $pushLists = [];
          $pushLists[] = [
              'fd' => $frame->fd,
              'msg' => [
                  'messageType' => 'back',
                  'messageId' => $data['messageId'],
                  'body' => $back,
              ]
          ];
          Task::deliver(new PushTask($pushLists));
      }
  }

  /**
   * Returns the username stored in the `ws` table for a connection.
   *
   * @param int $fd
   * @param string $channel
   * @return mixed value of the `username` column; falsy when no row matches
   */
  private function usernameOfConnection($fd, $channel)
  {
      return DB::table('ws')->where([
          'fd' => $fd,
          'channel' => $channel,
      ])->value('username');
  }

  /**
   * Delivers a PushTask that sends $msg to every connection registered for
   * $username. The task is delivered even when no connection is found.
   *
   * @param string $username
   * @param mixed $msg
   */
  private function deliverToUser($username, $msg)
  {
      $pushLists = [];
      foreach ($this->getUserOfName($username) as $item) {
          $pushLists[] = [
              'fd' => $item['fd'],
              'msg' => $msg
          ];
      }
      Task::deliver(new PushTask($pushLists));
  }
  /**
   * Triggered when a connection is closed.
   *
   * Removes the `ws` rows registered for the closed fd. The server and the
   * reactor id are not used.
   *
   * @param Server $server
   * @param $fd
   * @param $reactorId
   */
  public function onClose(Server $server, $fd, $reactorId)
  {
      $this->deleteUser($fd);
  }
  371. /** ****************************************************************************** */
  372. /** ****************************************************************************** */
  373. /** ****************************************************************************** */
  /**
   * Saves (registers) a connected user.
   *
   * Inside one transaction, any existing `ws` rows for the fd are deleted and
   * a row keyed by md5("fd@channel@username") is inserted or updated with the
   * fd, username, channel and the current timestamp.
   *
   * Registration stays best-effort: a failure is not propagated to the
   * caller, but it is written to the error log instead of being silently
   * discarded.
   *
   * @param $fd
   * @param $channel
   * @param $username
   */
  private function saveUser($fd, $channel, $username)
  {
      try {
          DB::transaction(function () use ($username, $channel, $fd) {
              $this->deleteUser($fd);
              DB::table('ws')->updateOrInsert([
                  'key' => md5($fd . '@' . $channel . '@' . $username)
              ], [
                  'fd' => $fd,
                  'username' => $username,
                  'channel' => $channel,
                  'update' => time()
              ]);
          });
      } catch (\Throwable $e) {
          // Do not interrupt the connection, but leave a trace for diagnosis.
          error_log(sprintf(
              'WebSocketService::saveUser failed for fd %s: %s',
              $fd,
              $e->getMessage()
          ));
      }
  }
  /**
   * Removes a user's connection record.
   *
   * Deletes every `ws` row whose `fd` column equals the given fd.
   *
   * @param $fd
   */
  private function deleteUser($fd)
  {
      $rowsOfConnection = DB::table('ws')->where('fd', $fd);
      $rowsOfConnection->delete();
  }
  /**
   * Looks up registered connections in the `ws` table.
   *
   * Every non-empty argument becomes an equality condition; empty arguments
   * are ignored. When all arguments are empty, no query is run and an empty
   * array is returned.
   *
   * @param string $fd
   * @param string $channel
   * @param string $username
   * @return array the `fd`, `username` and `channel` columns of the matching
   *               rows, passed through Base::DBC2A()
   */
  private function getUser($fd = '', $channel = '', $username = '')
  {
      // array_filter() without a callback drops falsy entries and keeps keys.
      $conditions = array_filter([
          'fd' => $fd,
          'channel' => $channel,
          'username' => $username,
      ]);
      if (!$conditions) {
          return [];
      }

      $rows = DB::table('ws')
          ->select(['fd', 'username', 'channel'])
          ->where($conditions)
          ->get();

      return Base::DBC2A($rows);
  }
  /**
   * Looks up registered connections by fd, optionally restricted to a channel.
   *
   * @param $fd
   * @param string $channel
   * @return array result of getUser()
   */
  private function getUserOfFd($fd, $channel = '')
  {
      $matches = $this->getUser($fd, $channel);

      return $matches;
  }
  /**
   * Looks up registered connections by username, optionally restricted to a
   * channel.
   *
   * @param $username
   * @param string $channel
   * @return array result of getUser()
   */
  private function getUserOfName($username, $channel = '')
  {
      $matches = $this->getUser('', $channel, $username);

      return $matches;
  }
  /**
   * Gets all online users of the team.
   *
   * A connection is included when its `update` timestamp is newer than 600
   * seconds ago.
   *
   * @return array|string the `fd`, `username` and `channel` columns of the
   *                      matching `ws` rows, passed through Base::DBC2A()
   */
  private function getTeamUsers()
  {
      $activeSince = time() - 600;

      $rows = DB::table('ws')
          ->select(['fd', 'username', 'channel'])
          ->where('update', '>', $activeSince)
          ->get();

      return Base::DBC2A($rows);
  }
  439. }