WebSocketService.php 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452
  1. <?php
  2. namespace App\Services;
  3. use App\Module\Base;
  4. use App\Module\Chat;
  5. use App\Module\Users;
  6. use App\Tasks\ChromeExtendTask;
  7. use App\Tasks\PushTask;
  8. use Cache;
  9. use DB;
  10. use Hhxsv5\LaravelS\Swoole\Task\Task;
  11. use Hhxsv5\LaravelS\Swoole\WebSocketHandlerInterface;
  12. use Swoole\Http\Request;
  13. use Swoole\WebSocket\Frame;
  14. use Swoole\WebSocket\Server;
  15. /**
  16. * @see https://wiki.swoole.com/#/start/start_ws_server
  17. */
  18. class WebSocketService implements WebSocketHandlerInterface
  19. {
  20. /**
  21. * 声明没有参数的构造函数
  22. * WebSocketService constructor.
  23. */
  24. public function __construct()
  25. {
  26. }
  27. /**
  28. * 连接建立时触发
  29. * @param Server $server
  30. * @param Request $request
  31. */
  32. public function onOpen(Server $server, Request $request)
  33. {
  34. global $_A;
  35. $_A = [
  36. '__static_langdata' => [],
  37. ];
  38. //判断参数
  39. $fd = $request->fd;
  40. if (!isset($request->get['token'])) {
  41. $server->push($fd, Chat::formatMsgSend([
  42. 'messageType' => 'error',
  43. 'body' => [
  44. 'error' => '参数错误'
  45. ],
  46. ]));
  47. $server->close($fd);
  48. $this->deleteUser($fd);
  49. return;
  50. }
  51. //判断token
  52. $token = $request->get['token'];
  53. $channel = $request->get['channel'] ?: '';
  54. $cacheKey = "ws::token:" . md5($token);
  55. $username = Cache::remember($cacheKey, now()->addSeconds(1), function () use ($token) {
  56. list($id, $username, $encrypt, $timestamp) = explode("@", base64_decode($token) . "@@@@");
  57. if (intval($id) > 0 && intval($timestamp) + 2592000 > time()) {
  58. if (DB::table('users')->where(['id' => $id, 'username' => $username, 'encrypt' => $encrypt])->exists()) {
  59. return $username;
  60. }
  61. }
  62. return null;
  63. });
  64. if (empty($username)) {
  65. Cache::forget($cacheKey);
  66. $server->push($fd, Chat::formatMsgSend([
  67. 'messageType' => 'error',
  68. 'channel' => $channel,
  69. 'body' => [
  70. 'error' => '会员不存在',
  71. ],
  72. ]));
  73. $server->close($fd);
  74. $this->deleteUser($fd);
  75. return;
  76. }
  77. //踢下线
  78. /*if ($channel != 'chromeExtend') {
  79. $userLists = $this->getUser('', $channel, $username);
  80. foreach ($userLists AS $user) {
  81. $server->push($user['fd'], Chat::formatMsgSend([
  82. 'messageType' => 'kick',
  83. 'channel' => $channel,
  84. 'body' => [
  85. 'ip' => Base::getIp(),
  86. 'time' => time(),
  87. 'newfd' => $fd,
  88. ],
  89. ]));
  90. $this->deleteUser($user['fd']);
  91. }
  92. }*/
  93. //保存用户、发送open事件
  94. $this->saveUser($fd, $channel, $username);
  95. $server->push($fd, Chat::formatMsgSend([
  96. 'messageType' => 'open',
  97. 'channel' => $channel,
  98. 'body' => [
  99. 'fd' => $fd,
  100. ],
  101. ]));
  102. //发送最后一条未发送的信息
  103. $lastMsg = Base::DBC2A(DB::table('chat_msg')->where('receive', $username)->orderByDesc('indate')->first());
  104. if ($lastMsg && $lastMsg['roger'] === 0) {
  105. $dialog = Chat::openDialog($lastMsg['username'], $lastMsg['receive']);
  106. if (!Base::isError($dialog)) {
  107. $dialog = $dialog['data'];
  108. $unread = intval(DB::table('chat_dialog')->where('id', $dialog['id'])->value(($dialog['recField'] == 1 ? 'unread1' : 'unread2')));
  109. $body = Base::string2array($lastMsg['message']);
  110. $body['id'] = $lastMsg['id'];
  111. $body['resend'] = 1;
  112. $body['unread'] = $unread;
  113. $body['username'] = $lastMsg['username'];
  114. $body['userimg'] = Users::userimg($lastMsg['username']);
  115. $body['indate'] = $lastMsg['indate'];
  116. $server->push($fd, Chat::formatMsgSend([
  117. 'messageType' => 'user',
  118. 'contentId' => $lastMsg['id'],
  119. 'channel' => $channel,
  120. 'username' => $lastMsg['username'],
  121. 'target' => $lastMsg['receive'],
  122. 'body' => $body,
  123. 'time' => $lastMsg['indate'],
  124. ]));
  125. }
  126. }
  127. }
  128. /**
  129. * 收到消息时触发
  130. * @param Server $server
  131. * @param Frame $frame
  132. */
  133. public function onMessage(Server $server, Frame $frame)
  134. {
  135. global $_A;
  136. $_A = [
  137. '__static_langdata' => [],
  138. ];
  139. //
  140. $data = Chat::formatMsgReceive($frame->data);
  141. $back = [
  142. 'status' => 1,
  143. 'message' => '',
  144. ];
  145. //
  146. switch ($data['messageType']) {
  147. /**
  148. * 刷新
  149. */
  150. case 'refresh':
  151. DB::table('ws')->where([
  152. 'fd' => $frame->fd,
  153. 'channel' => $data['channel'],
  154. ])->update(['update' => time()]);
  155. break;
  156. /**
  157. * 总未读消息数
  158. */
  159. case 'unread':
  160. $username = DB::table('ws')->where([
  161. 'fd' => $frame->fd,
  162. 'channel' => $data['channel'],
  163. ])->value('username');
  164. if ($username) {
  165. $num = intval(DB::table('chat_dialog')->where('user1', $username)->sum('unread1'));
  166. $num+= intval(DB::table('chat_dialog')->where('user2', $username)->sum('unread2'));
  167. $back['message'] = $num;
  168. } else {
  169. $back['message'] = 0;
  170. }
  171. break;
  172. /**
  173. * 已读会员消息
  174. */
  175. case 'read':
  176. $username = DB::table('ws')->where([
  177. 'fd' => $frame->fd,
  178. 'channel' => $data['channel'],
  179. ])->value('username');
  180. $dialog = Chat::openDialog($username, $data['target']);
  181. if (!Base::isError($dialog)) {
  182. $dialog = $dialog['data'];
  183. $upArray = [];
  184. if ($dialog['user1'] == $dialog['user2']) {
  185. $upArray['unread1'] = 0;
  186. $upArray['unread2'] = 0;
  187. } else {
  188. $upArray[($dialog['recField'] == 1 ? 'unread2' : 'unread1')] = 0;
  189. }
  190. DB::table('chat_dialog')->where('id', $dialog['id'])->update($upArray);
  191. }
  192. $chromeExtendTask = new ChromeExtendTask($username);
  193. Task::deliver($chromeExtendTask);
  194. break;
  195. /**
  196. * 收到信息回执
  197. */
  198. case 'roger':
  199. if ($data['contentId'] > 0) {
  200. $username = DB::table('ws')->where([
  201. 'fd' => $frame->fd,
  202. 'channel' => $data['channel'],
  203. ])->value('username');
  204. DB::table('chat_msg')->where([
  205. 'id' => $data['contentId'],
  206. 'receive' => $username,
  207. ])->update([
  208. 'roger' => 1,
  209. ]);
  210. }
  211. break;
  212. /**
  213. * 发给用户
  214. */
  215. case 'user':
  216. $username = DB::table('ws')->where([
  217. 'fd' => $frame->fd,
  218. 'channel' => $data['channel'],
  219. ])->value('username');
  220. $res = Chat::saveMessage($username, $data['target'], $data['body']);
  221. if (Base::isError($res)) {
  222. $back = [
  223. 'status' => 0,
  224. 'message' => $res['msg'],
  225. ];
  226. } else {
  227. $resData = $res['data'];
  228. $back['message'] = $resData['id'];
  229. $data['contentId'] = $resData['id'];
  230. $data['body']['id'] = $resData['id'];
  231. $data['body']['unread'] = $resData['unread'];
  232. //
  233. $pushLists = [];
  234. foreach ($this->getUserOfName($data['target']) AS $item) {
  235. $pushLists[] = [
  236. 'fd' => $item['fd'],
  237. 'msg' => $data
  238. ];
  239. }
  240. $pushTask = new PushTask($pushLists);
  241. Task::deliver($pushTask);
  242. }
  243. break;
  244. /**
  245. * 发给用户(不保存记录)
  246. */
  247. case 'info':
  248. $pushLists = [];
  249. foreach ($this->getUserOfName($data['target']) AS $item) {
  250. $pushLists[] = [
  251. 'fd' => $item['fd'],
  252. 'msg' => $data
  253. ];
  254. }
  255. $pushTask = new PushTask($pushLists);
  256. Task::deliver($pushTask);
  257. break;
  258. /**
  259. * 发给整个团队
  260. */
  261. case 'team':
  262. if ($data['body']['type'] === 'taskA') {
  263. $taskId = intval(Base::val($data['body'], 'taskDetail.id'));
  264. if ($taskId > 0) {
  265. $userLists = $this->getTaskUsers($taskId);
  266. } else {
  267. $userLists = $this->getTeamUsers();
  268. }
  269. //
  270. $pushLists = [];
  271. foreach ($userLists as $user) {
  272. $data['messageType'] = 'user';
  273. $data['target'] = $user['username'];
  274. $pushLists[] = [
  275. 'fd' => $user['fd'],
  276. 'msg' => $data
  277. ];
  278. }
  279. $pushTask = new PushTask($pushLists);
  280. Task::deliver($pushTask);
  281. }
  282. break;
  283. /**
  284. * 知识库协作
  285. */
  286. case 'docs':
  287. $back['message'] = [];
  288. if ($data['body']['type'] === 'enter') {
  289. $sid = intval($data['body']['sid']);
  290. if ($sid > 0) {
  291. $array = Base::json2array(Cache::get("docs::" . $sid));
  292. if ($array) {
  293. foreach ($array AS $uname => $vbody) {
  294. if (intval($vbody['indate']) + 10 < time()) {
  295. unset($array[$uname]);
  296. }
  297. }
  298. }
  299. $array[$data['body']['username']] = $data['body'];
  300. Cache::put("docs::" . $sid, Base::array2json($array), 30);
  301. //
  302. ksort($array);
  303. $back['message'] = array_values($array);
  304. }
  305. }
  306. break;
  307. }
  308. if ($data['messageId']) {
  309. $pushLists = [];
  310. $pushLists[] = [
  311. 'fd' => $frame->fd,
  312. 'msg' => [
  313. 'messageType' => 'back',
  314. 'messageId' => $data['messageId'],
  315. 'body' => $back,
  316. ]
  317. ];
  318. $pushTask = new PushTask($pushLists);
  319. Task::deliver($pushTask);
  320. }
  321. }
  322. /**
  323. * 关闭连接时触发
  324. * @param Server $server
  325. * @param $fd
  326. * @param $reactorId
  327. */
  328. public function onClose(Server $server, $fd, $reactorId)
  329. {
  330. $this->deleteUser($fd);
  331. }
  332. /** ****************************************************************************** */
  333. /** ****************************************************************************** */
  334. /** ****************************************************************************** */
  335. /**
  336. * 保存用户
  337. * @param $fd
  338. * @param $channel
  339. * @param $username
  340. */
  341. private function saveUser($fd, $channel, $username)
  342. {
  343. try {
  344. DB::transaction(function () use ($username, $channel, $fd) {
  345. $this->deleteUser($fd);
  346. DB::table('ws')->updateOrInsert([
  347. 'key' => md5($fd . '@' . $channel . '@' . $username)
  348. ], [
  349. 'fd' => $fd,
  350. 'username' => $username,
  351. 'channel' => $channel,
  352. 'update' => time()
  353. ]);
  354. });
  355. } catch (\Throwable $e) {
  356. }
  357. }
  358. /**
  359. * 清除用户
  360. * @param $fd
  361. */
  362. private function deleteUser($fd)
  363. {
  364. DB::table('ws')->where('fd', $fd)->delete();
  365. }
  366. /**
  367. * 获取用户
  368. * @param string $fd
  369. * @param string $channel
  370. * @param string $username
  371. * @return array
  372. */
  373. private function getUser($fd = '', $channel = '', $username = '')
  374. {
  375. $array = [];
  376. if ($fd) $array['fd'] = $fd;
  377. if ($channel) $array['channel'] = $channel;
  378. if ($username) $array['username'] = $username;
  379. if (empty($array)) {
  380. return [];
  381. }
  382. return Base::DBC2A(DB::table('ws')->select(['fd', 'username', 'channel'])->where($array)->get());
  383. }
  384. private function getUserOfFd($fd, $channel = '') {
  385. return $this->getUser($fd, $channel);
  386. }
  387. private function getUserOfName($username, $channel = '') {
  388. return $this->getUser('', $channel, $username);
  389. }
  390. /**
  391. * 获取团队用户
  392. * @return array|string
  393. */
  394. private function getTeamUsers()
  395. {
  396. return Base::DBC2A(DB::table('ws')->select(['fd', 'username', 'channel'])->where([
  397. ['update', '>', time() - 600],
  398. ])->get());
  399. }
  400. /**
  401. * 获取跟任务有关系的用户(关注的、在项目里的、负责人、创建者)
  402. * @param $taskId
  403. * @return array
  404. */
  405. private function getTaskUsers($taskId)
  406. {
  407. $taskDeatil = Base::DBC2A(DB::table('project_task')->select(['follower', 'createuser', 'username', 'projectid'])->where('id', $taskId)->first());
  408. if (empty($taskDeatil)) {
  409. return [];
  410. }
  411. //关注的用户
  412. $userArray = Base::string2array($taskDeatil['follower']);
  413. //创建者
  414. $userArray[] = $taskDeatil['createuser'];
  415. //负责人
  416. $userArray[] = $taskDeatil['username'];
  417. //在项目里的用户
  418. if ($taskDeatil['projectid'] > 0) {
  419. $tempLists = Base::DBC2A(DB::table('project_users')->select(['username'])->where(['projectid' => $taskDeatil['projectid'], 'type' => '成员' ])->get());
  420. foreach ($tempLists AS $item) {
  421. $userArray[] = $item['username'];
  422. }
  423. }
  424. //
  425. return Base::DBC2A(DB::table('ws')->select(['fd', 'username', 'channel'])->where([
  426. ['update', '>', time() - 600],
  427. ])->whereIn('username', array_values(array_unique($userArray)))->get());
  428. }
  429. }