WebSocketService.php 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454
  1. <?php
  2. namespace App\Services;
  3. @error_reporting(E_ALL & ~E_NOTICE);
  4. use App\Module\Base;
  5. use App\Module\Chat;
  6. use App\Module\Users;
  7. use App\Tasks\ChromeExtendTask;
  8. use App\Tasks\PushTask;
  9. use Cache;
  10. use DB;
  11. use Hhxsv5\LaravelS\Swoole\Task\Task;
  12. use Hhxsv5\LaravelS\Swoole\WebSocketHandlerInterface;
  13. use Swoole\Http\Request;
  14. use Swoole\WebSocket\Frame;
  15. use Swoole\WebSocket\Server;
  16. /**
  17. * @see https://wiki.swoole.com/#/start/start_ws_server
  18. */
  19. class WebSocketService implements WebSocketHandlerInterface
  20. {
  /**
   * WebSocketService constructor.
   *
   * Declared explicitly without parameters; it performs no initialisation.
   */
  public function __construct() {}
  /**
   * Triggered when a WebSocket connection has been established.
   *
   * Validates the `token` query parameter, registers the connection in the
   * `ws` table, pushes an `open` event to the client and finally re-sends the
   * newest message addressed to the user when it has not been acknowledged.
   * Connections with a missing or invalid token receive an `error` message
   * and are closed.
   *
   * @param Server $server
   * @param Request $request
   */
  public function onOpen(Server $server, Request $request)
  {
      // Reset the global $_A container for this event (its consumers are not
      // visible in this file).
      global $_A;
      $_A = [
          '__static_langdata' => [],
      ];

      $fd = $request->fd;
      // Guard against requests without a query string, where `get` is not an array.
      $query = is_array($request->get) ? $request->get : [];

      // Validate parameters: the token must be present and must be a string,
      // otherwise md5()/base64_decode() below would be fed an array.
      if (!isset($query['token']) || !is_string($query['token'])) {
          $this->rejectConnection($server, $fd, [
              'messageType' => 'error',
              'body' => [
                  'error' => '参数错误'
              ],
          ]);
          return;
      }

      // Validate the token.
      $token = $query['token'];
      // `??` avoids the "undefined array key" diagnostic when no channel was
      // sent; falsy or non-string values are normalised to '' as before.
      $channel = $query['channel'] ?? '';
      if (!is_string($channel) || !$channel) {
          $channel = '';
      }
      $cacheKey = "ws::token:" . md5($token);
      $username = Cache::remember($cacheKey, now()->addSeconds(1), function () use ($token) {
          // Decoded token layout: id@username@encrypt@timestamp. The appended
          // "@@@@" guarantees four elements even for malformed tokens.
          list($id, $username, $encrypt, $timestamp) = explode("@", base64_decode($token) . "@@@@");
          // A token is accepted for 2592000 seconds (30 days) after its timestamp.
          if (intval($id) > 0 && intval($timestamp) + 2592000 > time()) {
              if (DB::table('users')->where(['id' => $id, 'username' => $username, 'encrypt' => $encrypt])->exists()) {
                  return $username;
              }
          }
          return null;
      });
      if (empty($username)) {
          Cache::forget($cacheKey);
          $this->rejectConnection($server, $fd, [
              'messageType' => 'error',
              'channel' => $channel,
              'body' => [
                  'error' => '会员不存在',
              ],
          ]);
          return;
      }

      // Kick previous sessions offline (currently disabled).
      /*if ($channel != 'chromeExtend') {
          $userLists = $this->getUser('', $channel, $username);
          foreach ($userLists AS $user) {
              $server->push($user['fd'], Chat::formatMsgSend([
                  'messageType' => 'kick',
                  'channel' => $channel,
                  'body' => [
                      'ip' => Base::getIp(),
                      'time' => time(),
                      'newfd' => $fd,
                  ],
              ]));
              $this->deleteUser($user['fd']);
          }
      }*/

      // Register the connection and announce the `open` event.
      $this->saveUser($fd, $channel, $username);
      $server->push($fd, Chat::formatMsgSend([
          'messageType' => 'open',
          'channel' => $channel,
          'body' => [
              'fd' => $fd,
          ],
      ]));

      // Re-send the newest message if it has not been acknowledged yet.
      $this->resendLastMessage($server, $fd, $channel, $username);
  }

  /**
   * Pushes an error payload to a connection, closes it and removes its `ws` row.
   *
   * @param Server $server
   * @param int $fd
   * @param array $message payload handed to Chat::formatMsgSend()
   */
  private function rejectConnection(Server $server, $fd, array $message)
  {
      $server->push($fd, Chat::formatMsgSend($message));
      $server->close($fd);
      $this->deleteUser($fd);
  }

  /**
   * Re-sends the newest `chat_msg` row addressed to $username when its
   * `roger` flag is still 0.
   *
   * @param Server $server
   * @param int $fd
   * @param string $channel
   * @param string $username
   */
  private function resendLastMessage(Server $server, $fd, $channel, $username)
  {
      $lastMsg = Base::DBC2A(DB::table('chat_msg')->where('receive', $username)->orderByDesc('indate')->first());
      // NOTE(review): the strict comparison assumes the database driver
      // returns `roger` as an integer — confirm for the deployed driver.
      if (!$lastMsg || $lastMsg['roger'] !== 0) {
          return;
      }
      $dialog = Chat::openDialog($lastMsg['username'], $lastMsg['receive']);
      if (Base::isError($dialog)) {
          return;
      }
      $dialog = $dialog['data'];
      $unread = intval(DB::table('chat_dialog')->where('id', $dialog['id'])->value(($dialog['recField'] == 1 ? 'unread1' : 'unread2')));
      $body = Base::string2array($lastMsg['message']);
      $body['id'] = $lastMsg['id'];
      $body['resend'] = 1;
      $body['unread'] = $unread;
      $body['username'] = $lastMsg['username'];
      $body['userimg'] = Users::userimg($lastMsg['username']);
      $body['indate'] = $lastMsg['indate'];
      $server->push($fd, Chat::formatMsgSend([
          'messageType' => 'user',
          'contentId' => $lastMsg['id'],
          'channel' => $channel,
          'username' => $lastMsg['username'],
          'target' => $lastMsg['receive'],
          'body' => $body,
          'time' => $lastMsg['indate'],
      ]));
  }
  /**
   * Triggered when a message frame is received from a client.
   *
   * Dispatches on the decoded `messageType`:
   *  - refresh: updates the `update` timestamp of the sender's `ws` row;
   *  - unread:  returns the sender's total unread count;
   *  - read:    clears the sender's unread counter for a dialog;
   *  - roger:   marks a message as acknowledged;
   *  - user:    stores a message and pushes it to the target user;
   *  - info:    pushes a message to the target user without storing it;
   *  - team:    pushes a `taskA` message to task-related or recently active users;
   *  - docs:    tracks who is currently inside a document.
   * When the frame carries a `messageId`, a `back` reply with the result is
   * pushed to the sender.
   *
   * @param Server $server
   * @param Frame $frame
   */
  public function onMessage(Server $server, Frame $frame)
  {
      // Reset the global $_A container for this event (its consumers are not
      // visible in this file).
      global $_A;
      $_A = [
          '__static_langdata' => [],
      ];

      $data = Chat::formatMsgReceive($frame->data);
      $back = [
          'status' => 1,
          'message' => '',
      ];
      // Optional fields are read with `??` so that a frame omitting them
      // yields null instead of an "undefined array key" diagnostic.
      $channel = $data['channel'] ?? null;
      $target = $data['target'] ?? null;
      $bodyType = $data['body']['type'] ?? null;

      switch ($data['messageType'] ?? null) {
          // Refresh: keep the connection's `update` timestamp current.
          case 'refresh':
              DB::table('ws')->where([
                  'fd' => $frame->fd,
                  'channel' => $channel,
              ])->update(['update' => time()]);
              break;

          // Total number of unread messages.
          case 'unread':
              $username = $this->connectionUsername($frame->fd, $channel);
              if ($username) {
                  $num = intval(DB::table('chat_dialog')->where('user1', $username)->sum('unread1'));
                  $num += intval(DB::table('chat_dialog')->where('user2', $username)->sum('unread2'));
                  $back['message'] = $num;
              } else {
                  $back['message'] = 0;
              }
              break;

          // Mark a user's messages as read.
          case 'read':
              $username = $this->connectionUsername($frame->fd, $channel);
              $dialog = Chat::openDialog($username, $target);
              if (!Base::isError($dialog)) {
                  $dialog = $dialog['data'];
                  $upArray = [];
                  if ($dialog['user1'] == $dialog['user2']) {
                      // Dialog with oneself: both counters belong to the reader.
                      $upArray['unread1'] = 0;
                      $upArray['unread2'] = 0;
                  } else {
                      $upArray[($dialog['recField'] == 1 ? 'unread2' : 'unread1')] = 0;
                  }
                  DB::table('chat_dialog')->where('id', $dialog['id'])->update($upArray);
              }
              Task::deliver(new ChromeExtendTask($username));
              break;

          // Receipt: the client acknowledges a received message.
          case 'roger':
              if (($data['contentId'] ?? 0) > 0) {
                  $username = $this->connectionUsername($frame->fd, $channel);
                  DB::table('chat_msg')->where([
                      'id' => $data['contentId'],
                      'receive' => $username,
                  ])->update([
                      'roger' => 1,
                  ]);
              }
              break;

          // Send to a user (message is stored).
          case 'user':
              $username = $this->connectionUsername($frame->fd, $channel);
              $res = Chat::saveMessage($username, $target, $data['body'] ?? null);
              if (Base::isError($res)) {
                  $back = [
                      'status' => 0,
                      'message' => $res['msg'],
                  ];
              } else {
                  $resData = $res['data'];
                  $back['message'] = $resData['id'];
                  $data['contentId'] = $resData['id'];
                  $data['body']['id'] = $resData['id'];
                  $data['body']['unread'] = $resData['unread'];
                  $this->pushToUsername($target, $data);
              }
              break;

          // Send to a user (message is not stored).
          case 'info':
              $this->pushToUsername($target, $data);
              break;

          // Send to the whole team.
          case 'team':
              if ($bodyType === 'taskA') {
                  $taskId = intval(Base::val($data['body'], 'taskDetail.id'));
                  $userLists = $taskId > 0 ? $this->getTaskUsers($taskId) : $this->getTeamUsers();
                  // Recipients receive the payload as a `user` message
                  // addressed to themselves.
                  $message = $data;
                  $message['messageType'] = 'user';
                  $pushLists = [];
                  foreach ($userLists as $user) {
                      $message['target'] = $user['username'];
                      $pushLists[] = [
                          'fd' => $user['fd'],
                          'msg' => $message
                      ];
                  }
                  Task::deliver(new PushTask($pushLists));
              }
              break;

          // Knowledge-base (docs) collaboration.
          case 'docs':
              $back['message'] = [];
              if ($bodyType === 'enter') {
                  $sid = intval($data['body']['sid'] ?? 0);
                  if ($sid > 0) {
                      $array = Base::json2array(Cache::get("docs::" . $sid));
                      if (!is_array($array)) {
                          // Nothing usable cached yet: start with an empty list.
                          $array = [];
                      }
                      // Drop participants whose last `enter` is older than 10 seconds.
                      foreach ($array as $uname => $vbody) {
                          if (intval($vbody['indate'] ?? 0) + 10 < time()) {
                              unset($array[$uname]);
                          }
                      }
                      $array[$data['body']['username'] ?? ''] = $data['body'];
                      // NOTE(review): the unit of the TTL `30` depends on the
                      // installed cache implementation — confirm.
                      Cache::put("docs::" . $sid, Base::array2json($array), 30);
                      //
                      ksort($array);
                      $back['message'] = array_values($array);
                  }
              }
              break;
      }

      // Reply to the sender when the frame asked for a result.
      if ($data['messageId'] ?? null) {
          $pushLists = [];
          $pushLists[] = [
              'fd' => $frame->fd,
              'msg' => [
                  'messageType' => 'back',
                  'messageId' => $data['messageId'],
                  'body' => $back,
              ]
          ];
          Task::deliver(new PushTask($pushLists));
      }
  }

  /**
   * Looks up the username stored in the `ws` table for a connection.
   *
   * @param int $fd
   * @param string|null $channel
   * @return mixed the `username` column value, or null when no row matches
   */
  private function connectionUsername($fd, $channel)
  {
      return DB::table('ws')->where([
          'fd' => $fd,
          'channel' => $channel,
      ])->value('username');
  }

  /**
   * Delivers a PushTask carrying $msg to every connection of $username.
   * The task is delivered even when the user has no connection, as before.
   *
   * @param string|null $username
   * @param mixed $msg
   */
  private function pushToUsername($username, $msg)
  {
      $pushLists = [];
      foreach ($this->getUserOfName($username) as $item) {
          $pushLists[] = [
              'fd' => $item['fd'],
              'msg' => $msg
          ];
      }
      Task::deliver(new PushTask($pushLists));
  }
  /**
   * Triggered when a connection is closed; removes the connection's
   * rows from the `ws` table.
   *
   * @param Server $server
   * @param $fd
   * @param $reactorId
   */
  public function onClose(Server $server, $fd, $reactorId)
  {
      $this->deleteUser($fd);
  }
  333. /** ****************************************************************************** */
  334. /** ****************************************************************************** */
  335. /** ****************************************************************************** */
  /**
   * Saves a connection in the `ws` table.
   *
   * Inside one transaction, any existing rows for the fd are removed and a
   * row keyed by md5("fd@channel@username") is written. Saving is
   * best-effort: a failure is logged and not propagated to the caller.
   *
   * @param $fd
   * @param $channel
   * @param $username
   */
  private function saveUser($fd, $channel, $username)
  {
      try {
          DB::transaction(function () use ($username, $channel, $fd) {
              $this->deleteUser($fd);
              DB::table('ws')->updateOrInsert([
                  'key' => md5($fd . '@' . $channel . '@' . $username)
              ], [
                  'fd' => $fd,
                  'username' => $username,
                  'channel' => $channel,
                  'update' => time()
              ]);
          });
      } catch (\Throwable $e) {
          // Keep the original non-throwing contract, but leave a trace so a
          // failed registration can be diagnosed.
          error_log(sprintf(
              'WebSocketService::saveUser failed (fd=%s, channel=%s, username=%s): %s',
              $fd,
              $channel,
              $username,
              $e->getMessage()
          ));
      }
  }
  /**
   * Removes every `ws` row that belongs to the given connection.
   *
   * @param $fd
   */
  private function deleteUser($fd)
  {
      $rowsOfConnection = DB::table('ws')->where('fd', $fd);
      $rowsOfConnection->delete();
  }
  /**
   * Fetches `ws` rows matching the given criteria.
   *
   * Falsy arguments are ignored; when all three are falsy no query is run
   * and an empty array is returned.
   *
   * @param string $fd
   * @param string $channel
   * @param string $username
   * @return array
   */
  private function getUser($fd = '', $channel = '', $username = '')
  {
      // array_filter() without a callback drops the falsy entries.
      $conditions = array_filter([
          'fd' => $fd,
          'channel' => $channel,
          'username' => $username,
      ]);
      if (empty($conditions)) {
          return [];
      }
      $rows = DB::table('ws')
          ->select(['fd', 'username', 'channel'])
          ->where($conditions)
          ->get();
      return Base::DBC2A($rows);
  }
  /**
   * Fetches `ws` rows by connection fd, optionally restricted to a channel.
   *
   * @param $fd
   * @param string $channel
   * @return array
   */
  private function getUserOfFd($fd, $channel = '')
  {
      $anyUsername = '';
      return $this->getUser($fd, $channel, $anyUsername);
  }
  /**
   * Fetches `ws` rows by username, optionally restricted to a channel.
   *
   * @param $username
   * @param string $channel
   * @return array
   */
  private function getUserOfName($username, $channel = '')
  {
      $anyFd = '';
      return $this->getUser($anyFd, $channel, $username);
  }
  /**
   * Fetches the team's connections: every `ws` row whose `update`
   * timestamp lies within the last 600 seconds.
   *
   * @return array|string
   */
  private function getTeamUsers()
  {
      $activeSince = time() - 600;
      $rows = DB::table('ws')
          ->select(['fd', 'username', 'channel'])
          ->where('update', '>', $activeSince)
          ->get();
      return Base::DBC2A($rows);
  }
  /**
   * Fetches the connections of users related to a task: its followers, its
   * creator, its owner and the members of the task's project. Only `ws` rows
   * updated within the last 600 seconds are returned.
   *
   * @param $taskId
   * @return array
   */
  private function getTaskUsers($taskId)
  {
      $task = Base::DBC2A(
          DB::table('project_task')
              ->select(['follower', 'createuser', 'username', 'projectid'])
              ->where('id', $taskId)
              ->first()
      );
      if (empty($task)) {
          return [];
      }

      // Followers first, then the creator and the owner.
      $usernames = Base::string2array($task['follower']);
      $usernames[] = $task['createuser'];
      $usernames[] = $task['username'];

      // Users of the task's project whose type is '成员'.
      if ($task['projectid'] > 0) {
          $members = Base::DBC2A(
              DB::table('project_users')
                  ->select(['username'])
                  ->where(['projectid' => $task['projectid'], 'type' => '成员'])
                  ->get()
          );
          foreach ($members as $member) {
              $usernames[] = $member['username'];
          }
      }

      $activeSince = time() - 600;
      $distinctUsernames = array_values(array_unique($usernames));
      $connections = DB::table('ws')
          ->select(['fd', 'username', 'channel'])
          ->where([
              ['update', '>', $activeSince],
          ])
          ->whereIn('username', $distinctUsernames)
          ->get();
      return Base::DBC2A($connections);
  }
  430. }