"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.


This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU General Public License for more details.


You should have received a copy of the GNU General Public License
 along with this program.  If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import json
import logging
import os
import pwd
import shlex
import subprocess

from collections import defaultdict
from datetime import datetime, timedelta
from functools import lru_cache
from pathlib import Path
from typing import Optional

from defence360agent.contracts.config import (
    choose_value_from_config,
    MalwareScanScheduleInterval as Interval,
)
from defence360agent.contracts.license import LicenseCLN
from defence360agent.subsys.panels.hosting_panel import HostingPanel
from defence360agent.subsys.panels.plesk import Plesk
from defence360agent.utils import async_lru_cache, atomic_rewrite, check_run

from imav.malwarelib.model import MalwareHit
from imav.malwarelib.scan.queue_supervisor_sync import (
    QueueSupervisorSync as ScanQueue,
)
from imav.malwarelib.utils import user_list
from imav.model.wordpress import WPSite
from imav.wordpress import (
    cli,
    WP_CLI_WRAPPER_PATH,
)
from imav.wordpress.exception import PHPError

CAGEFS_ENTER_PATH = "/usr/sbin/cagefs_enter_user"
CAGEFS_CTL_PATH = "/usr/sbin/cagefsctl"

logger = logging.getLogger(__name__)


@async_lru_cache(ttl=60)
async def get_domain_paths() -> dict[str, list[str]]:
    """
    Get a mapping of docroots to their associated domains, with caching.
    """
    hosting_panel = HostingPanel()
    panel_paths = await hosting_panel.get_domain_paths()
    docroot_map = defaultdict(list)
    for domain, docroots in panel_paths.items():
        for docroot in docroots:
            docroot_map[docroot].append(domain)
    return docroot_map


def wp_wrapper(php_path: str, docroot: str) -> list:
    """Get wp cli common command list"""
    return [str(WP_CLI_WRAPPER_PATH), php_path, docroot]


@lru_cache(maxsize=1)
def get_cagefs_enabled_users() -> set:
    """Return the set of usernames that have CageFS enabled."""
    if not os.path.isfile(CAGEFS_CTL_PATH) or not os.access(
        CAGEFS_CTL_PATH, os.X_OK
    ):
        return set()

    result = subprocess.run(
        [CAGEFS_CTL_PATH, "--list-enabled"], capture_output=True, text=True
    )
    if result.returncode != 0:
        return set()

    lines = result.stdout.strip().split("\n")
    return set(lines[1:])  # Skip the first line which is a summary


def clear_get_cagefs_enabled_users_cache():
    """Clear the cache for get_cagefs_enabled_users."""
    get_cagefs_enabled_users.cache_clear()


def build_command_for_user(username: str, args: list) -> list:
    """Build the necessary command to run the given cmdline args with specified user."""
    if username in get_cagefs_enabled_users():
        if os.path.isfile(CAGEFS_ENTER_PATH) and os.access(
            CAGEFS_ENTER_PATH, os.X_OK
        ):
            return [
                CAGEFS_ENTER_PATH,
                "--no-io-and-memory-limit",
                username,
                *args,
            ]

    return [
        "su",
        "-s",
        "/bin/bash",
        username,
        "-c",
        shlex.join(args),
    ]


async def get_domains_for_docroot(
    docroot: str, domain_to_exclude: str
) -> list[str]:
    """
    Get all domains associated with a given document root, excluding one domain.
    It's panel-agnostic and uses a cached mapping.
    """
    docroot_map = await get_domain_paths()
    all_domains = docroot_map.get(docroot, [])
    return [domain for domain in all_domains if domain != domain_to_exclude]


async def get_php_binary_path(site: WPSite, username: str) -> Optional[str]:
    """Determine PHP binary path for the given WPSite."""
    from clcommon.cpapi import (
        get_domains_php_info,
        get_installed_php_versions,
    )

    domains_php_info = get_domains_php_info()
    installed_php_versions = get_installed_php_versions()

    def find_php_binary_for_domain(domain: str) -> Optional[str]:
        domain_info = domains_php_info.get(domain)
        if not domain_info or domain_info.get("username") != username:
            return None

        php_display_version = domain_info.get("display_version")
        if not php_display_version:
            return None

        for php_version in installed_php_versions:
            if php_version.get("identifier") == php_display_version:
                return php_version.get("bin")
        return None

    # First, try with the main domain of the site.
    php_binary_path = find_php_binary_for_domain(site.domain)
    if php_binary_path:
        return php_binary_path

    # If not found, try with other domains for the site's docroot.
    domains = await get_domains_for_docroot(
        site.docroot, domain_to_exclude=site.domain
    )
    for domain in domains:
        php_binary_path = find_php_binary_for_domain(domain)
        if php_binary_path:
            return php_binary_path

    raise PHPError(
        f"PHP binary was not identified for docroot: {site.docroot}, username:"
        f" {username}"
    )


def get_malware_history(username: str) -> list:
    """
    Get malware history for the specified user.

    This is equivalent to calling
    `imunify360-agent malware history list --user {username}`.
    """
    # Only the hits are needed; the reported maximum count is discarded.
    _, hits = MalwareHit.malicious_list(user=username)
    return hits


async def get_last_scan(sink, username: str) -> dict:
    """
    Get the last scan for the specified user.

    This is equivalent to calling
    `imunify360-agent malware user list --user {username}`.
    """
    queue = ScanQueue(sink)
    _, users = await user_list.fetch_user_list(
        queue.get_scans_from_paths, match={username}
    )

    if not users:
        return {}

    users = user_list.sort(users, "scan_date", desc=True)
    return users[0]


def calculate_next_scan_timestamp(interval, hour, day_of_month, day_of_week):
    """
    Calculate the next scan timestamp based on schedule configuration.

    Args:
        interval: Scan interval (DAY, WEEK, MONTH, or NONE)
        hour: Hour of day to run scan (0-23)
        day_of_month: Day of month to run scan (1-31)
        day_of_week: Day of week to run scan (0-6, where 0=Sunday)

    Returns:
        Timestamp of next scan, or None if interval is NONE
    """

    today = datetime.utcnow()

    if interval == Interval.DAY:
        next_scan = today.replace(
            hour=hour,
            minute=0,
            second=0,
            microsecond=0,
        )
        if today >= next_scan:
            next_scan += timedelta(days=1)
        return next_scan.timestamp()

    if interval == Interval.WEEK:
        # today.weekday() returns 0 for Monday, 6 for Sunday, but day_of_week uses 0 for Sunday,
        # 1 for Monday, ..., 6 for Saturday. So we need to adjust the calculation.
        days_ahead = (day_of_week - (today.weekday() + 1) % 7 + 7) % 7
        if days_ahead == 0 and today.hour >= hour:
            days_ahead = 7
        next_scan_date = today + timedelta(days=days_ahead)
        return next_scan_date.replace(
            hour=hour, minute=0, second=0, microsecond=0
        ).timestamp()

    if interval == Interval.MONTH:
        from calendar import monthrange

        def find_next_suitable_month(year, month, days):
            """Find the next month that has at least given number of days."""
            current_year, current_month = year, month

            # Always start with the next month when advancing
            current_month += 1
            if current_month > 12:
                current_month = 1
                current_year += 1

            # Keep advancing months until we find one with enough days
            while True:
                days_in_month = monthrange(current_year, current_month)[1]
                if days <= days_in_month:
                    return current_year, current_month

                current_month += 1
                if current_month > 12:
                    current_month = 1
                    current_year += 1

        # Check if we need to advance to next month
        should_advance_month = (
            # Today is after the scheduled day, scan already ran this month
            today.day > day_of_month
            # Today is the scheduled day and the hour is after the scheduled hour, scan already ran earlier today
            or (today.day == day_of_month and today.hour >= hour)
            # Current month doesn't have enough days, scan should run next suitable month
            or day_of_month > monthrange(today.year, today.month)[1]
        )

        if should_advance_month:
            # Find the next month that can accommodate the configured day
            next_year, next_month = find_next_suitable_month(
                today.year, today.month, day_of_month
            )

            next_scan_date = today.replace(
                day=day_of_month,  # Use the actual configured day
                month=next_month,
                year=next_year,
                hour=hour,
                minute=0,
                second=0,
                microsecond=0,
            )
        else:
            # Current month can accommodate the configured day
            next_scan_date = today.replace(
                day=day_of_month,
                hour=hour,
                minute=0,
                second=0,
                microsecond=0,
            )

        return next_scan_date.timestamp()


def prepare_scan_data(
    last_scan_time: float,
    next_scan_time: float,
    username: str,
    site: WPSite,
    malware_by_site: dict,
) -> dict:
    """
    Prepare scan data JSON for a WordPress site.

    Args:
        last_scan_time: Timestamp of the last scan
        next_scan_time: Timestamp of the next scheduled scan
        username: Username of the site owner
        site: WordPress site object
        malware_by_site: Dictionary mapping site docroots to their malware hits

    Returns:
        dict: JSON data ready to be written to scan_data.php. The response includes:
            - lastScanTimestamp: Timestamp of the last scan
            - nextScanTimestamp: Timestamp of the next scheduled scan
            - username: Username of the site owner
            - malware: List of malware hits for the site
            - config: Configuration items for the site
            - license: License information including status and eligibility for Imunify patch
    """
    # Define the config sections and options needed
    config_sections = [
        ("MALWARE_SCANNING", "enable_scan_cpanel"),
        ("MALWARE_SCANNING", "default_action"),
        ("PROACTIVE_DEFENCE", "blamer"),
    ]

    # Build the config items
    config_items = {}
    for section, option in config_sections:
        if section not in config_items:
            config_items[section] = {}

        try:
            value, _ = choose_value_from_config(
                section,
                option,
                username=username,
            )
        except KeyError:
            value = None
        config_items[section][option] = value

    return {
        "lastScanTimestamp": last_scan_time,
        "nextScanTimestamp": next_scan_time,
        "username": username,
        "malware": malware_by_site.get(site.docroot, []),
        "config": config_items,
        "license": LicenseCLN.license_info(),
    }


def write_plugin_data_file_atomically(
    file_path, content: str, uid: int, gid: int
) -> None:
    """
    Write a plugin data file atomically, creating it first if it does not exist.

    Args:
        file_path: Path to the file to write
        content: Content to write to the file
        uid: User ID for file ownership
        gid: Group ID for file ownership
    """
    if not file_path.exists():
        file_path.touch()

    # Set permissions based on hosting panel
    permissions = 0o440 if HostingPanel().NAME == Plesk.NAME else 0o400

    atomic_rewrite(
        file_path,
        content,
        backup=False,
        uid=uid,
        gid=gid,
        permissions=permissions,
    )


def format_php_with_embedded_json(data: dict) -> str:
    """
    Format a dictionary as a PHP file that returns JSON-decoded data.

    This creates a WordPress-safe PHP file that:
    1. Checks if it's being included from WordPress (WPINC defined)
    2. Returns the data decoded from the embedded JSON string

    Args:
        data: Dictionary to embed in the PHP file

    Returns:
        Formatted PHP file content as a string
    """
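    # PHP single-quoted strings treat both \\ and \' as escape sequences,
    # so escape backslashes first and single quotes second. Otherwise JSON
    # escapes such as \\ would be collapsed by PHP before json_decode runs.
    encoded = json.dumps(data).replace("\\", "\\\\").replace("'", "\\'")
    return (
        "<?php\n"
        "if ( ! defined( 'WPINC' ) ) {\n"
        "\texit;\n"
        "}\n"
        "return json_decode( '" + encoded + "', true );"
    )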


def ensure_directory_listing_protection(
    data_dir: Path, uid: int, gid: int
) -> None:
    """
    Ensure directory listing protection files exist in the data directory.

    Creates .htaccess, index.php, and index.html files to prevent directory listing.
    Only creates files if they don't exist (idempotent).

    Args:
        data_dir: Path to the data directory
        uid: User ID for file ownership
        gid: Group ID for file ownership
    """
    protection_files = {
        ".htaccess": "DirectoryIndex index.php index.html\ndeny from all\n",
        "index.php": "<?php\n// This file is intentionally blank.\n",
        "index.html": "<!-- This file is intentionally blank. -->\n",
    }

    for filename, content in protection_files.items():
        file_path = data_dir / filename
        if not file_path.exists():
            write_plugin_data_file_atomically(
                file_path, content, uid=uid, gid=gid
            )


async def ensure_site_data_directory(
    site: WPSite, user_info: pwd.struct_passwd
) -> Path:
    """
    Ensure the site's data directory exists with correct permissions.

    Args:
        site: WordPress site
        user_info: User information from pwd

    Returns:
        Path to data directory

    Raises:
        Exception: If the data directory is a symlink or cannot be created
    """
    data_dir = await cli.get_data_dir(site)

    if os.path.islink(data_dir):
        # Exception() does not apply %-style formatting, so build the message
        # explicitly.
        raise Exception(f"Data directory {data_dir} is a symlink, skipping.")

    if not data_dir.exists():
        command = build_command_for_user(
            user_info.pw_name,
            ["mkdir", "-p", str(data_dir)],
        )
        await check_run(command)

        if not data_dir.exists():
            raise Exception(
                f"Failed to create directory {data_dir} for user"
                f" {user_info.pw_name}"
            )

        # we can safely change the permissions of the directory because we just created it
        data_dir.chmod(0o750)

    # Ensure directory listing protection files exist
    ensure_directory_listing_protection(
        data_dir, uid=site.uid, gid=user_info.pw_gid
    )

    return data_dir
