....................................../////.===Shadow-Here===./////................................................ > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < > < ------------------------------------------------------------------------------------------------------------------- /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// RIFF¤ WEBPVP8 ˜ ðÑ *ôô>‘HŸK¥¤"§£±¨àð enü¹%½_F‘åè¿2ºQú³íªú`N¿­3ÿƒügµJžaÿ¯ÿ°~¼ÎùnúîÞÖô•òíôÁÉß®Sm¥Ü/ ‡ó˜f£Ùà<˜„xëJ¢Ù€SO3x<ªÔ©4¿+ç¶A`q@Ì“Úñè™ÍÿJÌ´ª-˜ÆtÊÛL]Ïq*‘Ý”ì#ŸÌÏãY]@ê`¿ /ªfkØB4·®£ó z—Üw¥Pxù–ÞLШKÇN¾AkÙTf½è'‰g gÆv›Øuh~ a˜Z— ïj*á¥t d£“uÒ ¨`K˜¹ßþ]b>˜]_ÏÔ6W—è2r4x•íÖ…"ƒÖNîä!¦å Ú}ýxGøÌ —@ ;ÆÚŠ=ɾ1ý8lªË¥ô ^yf®Œ¢u&2©nÙÇ›ñÂñŒ³ aPo['½»øFùà­+4ê“$!lövlüÞ=;N®3ð‚õ›DÉKòÞ>ÄÍ ¥ˆuߤ#ˆ$6ù™¥îЇy’ÍB¼ çxÛ;X"WL£R÷͝*ó-¶Zu}º.s¸sšXqù–DþÿvªhüïwyŸ ¯é³lÀ:KCûÄ£Ëá\…­ ~—ýóî ¼ûûÜTÓüÇy…ŽÆvc»¾×U ñ¸žþоP÷¦ó:Ò¨¨5;Ð#&#ÖúñläÿÁœ GxÉ­/ñ‡áQðìYÉtÒw޼GÔ´zàÒò ð*ëzƒ•4~H]Ø‹f ñÓÈñ`NåWçs'ÆÏW^ø¹!XžµmQ5ÃËoLœÎ: ÞËÍ¥J ù…î èo£ßPÎñ¶ž8.Œ]ʵ~5›ÙË-ù*8ÙÖß±~ ©¹rÓê‚j¶d¸{^Q'˜±Crß ÚH—#¥¥QlÀ×ëã‡DÜ«èî þ&Çæžî;ŽÏºò6ÒLÃXy&ZŒ'j‚¢Ù€IßÚù+–MGi‰*jE€‘JcÜ ÓÌ EÏÚj]o˜ Þr <¾U ûŪæÍ/šÝH¥˜b”¼ ÁñßX GP›ï2›4WŠÏà×£…íÓk†¦H·ÅíMh–*nó÷à]ÁjCº€b7<ب‹¨5車bp2:Á[UªM„QŒçiNMa#<5›áËó¸HýÊ"…×Éw¹¦ì2º–x<›»a±¸3Weü®FÝ⑱ö–î–³|LPÈ~çð~Çå‡|º kD¢µÏàÆAI %1À% ¹Ò – ”ϝS¦‰4&¶£°à Öý”û_Ò Áw°A«Å€?mÇÛgHÉ/8)á¾ÛìáöŽP í¨PŸNÙµº¦‡§Ùš"ÿ«>+ªÕ`Ê÷‡‚ß Õû˜þãÇ-PÍ.¾XV‘€ dÜ"þ4¹ ±Oú‘©t¥¦FªÄÃÄ•b‚znýu½—#cDs˜ÃiÑOˆñ×QO=*IAÊ,¶ŽZƒ;‡wøXè%EÐk:F±Ú” .Ѽ+Áu&Ç`."pÈÉw o&¿dE6‘’EqTuK@Ì¥ã™À(Êk(h‰,H}RÀIXÛš3µ1©_OqÚÒJAñ$ÊÙÜ;D3çŒ[þùœh¬Ã³™ö6ç†NY".Ú‰ï[ªŸŒ '²Ð öø_¨ÂÉ9ué¶³ÒŠõTàîMØ#û¯gN‡bÙ놚X„ö …ÉeüÌ^J ‹€.œ$Æ)βÄeæW#óüßĺŸ€ ÀzwV 9oä»f4V*uB «Ë†¹ì¯žR霓æHXa=&“I4K;¯ç‹h×·"UŠ~<•╪Vêª&ÍSÃÆÅ?ÔqÎ*mTM ˜›µwêd#[C¡©§‘D<©àb†–ÁœøvH/,í:¯( ²£|4-„Æövv„Yͼ™^Á$ˆ„¢Û[6yB.åH*V¨æ?$=˜Ñ€•ñ·­(VlŸ‘ nÀt8W÷´Bûba?q9ú¶Xƒl«ÿ\ù¶’þòUÐj/õ¢Ìµ³g$ƒÎR!¸»|Oߍë’BhîÚÑ¢ñåŒJ„®„£2Ð3•ô02Nt…!£Í]Ïc½Qÿ?ˆ<&ÃA¾Ú,JˆijÌ#5yz„‰Î|ÊŽ5QÏ:‹ÐaóVÔxW—CpeÏzÐïíçôÿÅ_[hãsÐ_/ŽTÝ?BîˆííV$<¿i>²F¬_Eß¿ †bÊŒº­ÿ®Z H“C}”¬,Mp 
ý/Bá£w>˜YV°aƒúh+cŠ- r/[%|üUMHäQ°X»|û/@|°¥Ð !BÔ Ç¢Ä©š+Õì D«7ìN¶ŽðÔ " ƶ’ÖçtA‰Û×}{tþz­¾GÍ›k¹OEJR$ Â׃ «ëÁ"oÉôž$oUK(Ä)Ãz³Ê-‹êN[Ò3Œñbï8P 4ƒ×q¢bo|?<ÛX¬òÄͰL–±›(™ûG?ýË©ÚÄ–ÂDØÐ_Ç¡ô ¾–ÄÏø ×e8Ë©$ÄF¹Å‹ì[©óìl:F¾f´‹‹Xì²ï®\¬ôùƒ ÿat¥óèÒùHß0äe‚;ü×h:ÆWðHž=Ã8骣"kœ'Y?³}Tûè€>?0l›e1Lòñ„aæKÆw…hÖŠùW…ÈÆÄ0ši·›[pcwËþñiêíY/~-Á5˜!¿†A›™Mÿþ(±“t@â“ö2­´TG5yé]çå僳 .·ÍïçÝ7UÚ±Ð/Nè»,_Ï ùdj7\ï Wì4›„»c¸àešg#ÒÊ⥭áØo5‘?ÌdÝô¯ ¹kzsƒ=´#ëÉK›Ø´±-¥eW?‡çßtòTã…$Ý+qÿ±ƒ÷_3Ô¥í÷:æ–ž<·Ö‡‰Å¢ š‡%Ô—utÌÈìðžgÖÀz²À—ï÷Óîäõ{K'´È÷³yaÏÁjƒô}ž§®æÊydÕÈë5¯èˆõvÕ©ã*çD„ “z„Ó‡^^xÂ3M§A´JG‚öï 3W'ˆ.OvXè¡ÊÕª?5º7†˜(˜Ç¶#çê’¶!ÌdZK§æ 0fãaN]òY³RV ™î$®K2R¨`W!1Ôó\;Ý ýB%qæK•&ÓÈe9È0êI±žeŸß -ú@žQr¦ ö4»M¼Áè¹µmw 9 EÆE_°2ó„ŸXKWÁ×Hóì^´²GѝF©óäR†¦‰ç"V»eØ<3ùd3ÿÚ¤Žú“Gi" —‘_ÙËÎ~Üö¯¥½Î»üŸEÚŽåmÞþí ;ÞólËΦMzA"Âf(´òá;Éï(/7½ûñÌ­cïÕçлþÝz¾-ÍvÑ“pH­–ðÓj$¸Äû¤‚‘ãUBË-n“2åPkS5&‹Â|+g^œ®Ì͆d!OïäîU«c;{Û!ÅŽ«ëZ9Ókóˆ]¯ƒ›né `ÇÒ+tÆš (ØKá¾—=3œ®•vuMñg²\ï Ec€ 05±d™‡×iÇ×›UúvÌ¢£Èþ¡ÕØô¶ßÎA"ß±#Ö²ˆÊŸ¦*Ä~ij|àø.-¼'»Ú¥£h ofº¦‡VsR=N½„Î v˜Z*SÌ{=jÑB‹tê…;’HžH¯8–îDù8ñ¢|Q•bÛçš–‹m³“ê¨ åÏ^m¬Žãþ©ïêO‡½6] µÆ„Ooòü ²x}N¦Ë3ïé¿»€›HA˜m%çÞ/¿í7Fø“‹léUk)É°Œµ8Q8›:ÀŠeT*šõ~ôڝG6 ¢}`ùH­–”¡k ‰P1>š†®9z11!X wKfmÁ¦xÑ,N1Q”–æB¶M…ÒÃv6SMˆhU¬ÊPŽï‘öj=·CŒ¯u¹ƒVIЃsx4’ömÛýcå¡¶7ßŠß 57^\wÒÐÆ k§h,Œý î«q^R½3]J¸ÇðN ‚çU¬ôº^Áì} ³f©Õœ§ˆã:FÄÈ‚é(€™?àýÓüè1Gô£¼éj‚OÅñ  #>×—ßtà 0G¥Åa뀐kßhc™À_ÉñÞ#±)GD" YîäË-ÿÙ̪ ¹™a¯´¢E\ÝÒö‚;™„ë]_ p8‰o¡ñ+^÷ 3‘'dT4œŽ ðVë½° :¬víÑ«£tßÚS-3¶“þ2 †üüʨòrš¹M{É_¤`Û¨0ìjœøJ‡:÷ÃáZ˜†@GP&œÑDGÏs¡þ¦þDGú‘1Yá9Ôþ¼ ûø…§÷8&–ÜÑnÄ_m®^üÆ`;ÉVÁJ£?â€-ßê}suÍ2sõA NÌúA磸‘îÿÚ»ƒìö·á¿±tÑÐ"Tÿü˜[@/äj¬€uüªìù¥Ý˜á8Ý´sõj 8@rˆð äþZÇD®ÿUÏ2ùôõrBzÆÏÞž>Ì™xœ“ wiÎ×7_… ¸ \#€MɁV¶¥üÕÿPÔ9Z‡ø§É8#H:ƒ5ÀÝå9ÍIŒ5åKÙŠ÷qÄ>1AÈøžj"µÂд/ªnÀ qªã}"iŸBå˜ÓÛŽ¦…&ݧ;G@—³b¯“•"´4í¨ôM¨åñC‹ïùÉó¯ÓsSH2Ý@ßáM‡ˆKÀªÛUeø/4\gnm¥‹ŸŒ qÄ b9ÞwÒNÏ_4Ég³ú=܆‚´ •â¥õeíþkjz>éÚyU«Íӝ݃6"8/ø{=Ô¢»G¥ äUw°W«,ô—¿ãㆅү¢³xŠUû™yŒ (øSópÐ 9\åTâ»—*oG$/×ÍT†Y¿1¤Þ¢_‡ ¼ „±ÍçèSaÓ 3ÛMÁBkxs‰’R/¡¤ˆÙçª(*õ„üXÌ´ƒ E§´¬EF"Ù”R/ÐNyÆÂ^°?™6¡œïJ·±$§?º>ÖüœcNÌù¯G ‹ñ2ЁBB„^·úìaz¨k:#¨Æ¨8LÎõލ£^§S&cŒÐU€ü(‡F±Š¼&P>8ÙÁ ‰ p5?0ÊÆƒZl¸aô š¼¡}gÿ¶zÆC²¹¬ÎÖG*HB¡O<º2#ñŒAƒ–¡B˜´É$¥›É:FÀÔx¾u?XÜÏÓvN©RS{2ʈãk9rmP¼Qq̳ è¼ÐFׄ^¡Öì 
fE“F4A…!ì/…¦Lƒ… … $%´¾yã@CI¬ á—3PþBÏNÿ<ý°4Ü ËÃ#ØÍ~âW«rEñw‹eùMMHß²`¬Öó½íf³:‹k˜¯÷}Z!ã¿<¥,\#öµÀ¯aÒNÆIé,Ћ–lŽ#Àæ9ÀÒS·I’½-Ïp Äz¤Š Â* ­íÄ9­< h>׍3ZkËU¹§˜ŒŠ±f­’¤º³Q ÏB?‹#µíÃ¥®@(Gs«†vI¥Mµ‹Á©e~2ú³ÁP4ìÕi‚²Ê^ö@-DþÓàlÜOÍ]n"µã:žpsŽ¢:! Aõ.ç~ÓBûH÷JCÌ]õVƒd «ú´QÙEA–¯¯Œ!.ˆˆëQ±ù œ·Ì!Õâ )ùL„ÅÀlÚè5@B…o´Æ¸XÓ&Û…O«˜”_#‡ƒ„ûÈt!¤ÁÏ›ÎÝŠ?c9 â\>lÓÁVÄÑ™£eØY]:fÝ–—ù+p{™ðè û³”g±OƒÚSù£áÁÊ„ä,ï7š²G ÕÌBk)~ÑiCµ|h#u¤¶îK¨² #²vݯGãeÖ϶ú…¾múÀ¶þÔñ‚Š9'^($¤§ò “š½{éúp÷J›ušS¹áªCÂubÃH9™D™/ZöØÁ‡¦ÝÙŸ·kð*_”.C‹{áXó€‡c¡c€§/šò/&éš÷,àéJþ‰X›fµ“C¨œ®r¬"kL‰Â_q…Z–.ÉL~O µ›zn‚¹À¦Öª7\àHµšÖ %»ÇníV[¥*Õ;ƒ#½¾HK-ÖIÊdÏEÚ#=o÷Óò³´Š: Ç?{¾+9›–‘OEáU·S€˜j"ÄaÜ ŒÛWt› á–c#a»pÔZÞdŽtWê=9éöÊ¢µ~ ë ;Öe‡Œ®:bî3±ýê¢wà¼îpêñ¹¾4 zc¾ðÖÿzdêŒÑÒŝÀ‰s6¤í³ÎÙB¿OZ”+F¤á‡3@Ñëäg©·Ž ˆèª<ù@É{&S„œÕúÀA)‰h:YÀ5^ÂÓŒ°õäU\ ùËÍû#²?Xe¬tu‰^zÒÔãë¼ÛWtEtû …‚g¶Úüâî*moGè¨7%u!]PhÏd™Ý%Îx: VÒ¦ôÊD3ÀŽKÛËãvÆî…N¯ä>Eró–ð`5 Œ%u5XkñÌ*NU%¶áœÊ:Qÿú»“úzyÏ6å-၇¾ ´ ÒÊ]y žO‘w2Äøæ…H’²f±ÎÇ.ª|¥'gîV•Ü .̘¯€šòü¤U~Ù†*¢!?ò wý,}´°ÔÞnïoKq5µb!áÓ3"vAßH¡³¡·G(ÐÎ0Îò¼MG!/ài®@—¬04*`…«é8ªøøló“ˆÊ”èù¤…ßÊoÿé'ËuÌÖ5×È¡§ˆˆfŽë9}hìâ_!!¯  B&Ëö¶‰ÀAÙNVŸ Wh›¸®XÑJì¨ú“¿÷3uj²˜¨ÍÎìë±aúŠÝå¯ð*Ó¨ôJ“yºØ)m°WýOè68†ŸÏ2—‰Ïüꪫٚ¥‹l1 ø ÏÄFjêµvÌbü¦èÝx:X±¢H=MÐß—,ˆÉÇ´(9ú¾^ÅÚ4¿m‡$âX‘å%(AlZo@½¨UOÌÕ”1ø¸jÎÀÃÃ_ µ‘Ü.œº¦Ut: Æï’!=¯uwû#,“pþÇúŒø(é@?³ü¥‘Mo §—s@Œ#)§ŒùkL}NOÆêA›¸~r½¼ÙA—HJ«eˆÖ´*¡ÓpÌŸö.m<-"³ûÈ$¬_6­åf£ïÚâj1y§ÕJ½@dÞÁr&Í\Z%D£Íñ·AZ Û³øüd/ªAi†/Й~  ‡âĮҮÏh§°b—›Û«mJžòG'[ÈYýŒ¦9psl ýÁ ®±f¦x,‰½tN ‚Xª9 ÙÖH.«Lo0×?͹m¡å†Ѽ+›2ƒF ±Ê8 7Hցϓ²Æ–m9…òŸï]Â1äN†VLâCˆU .ÿ‰Ts +ÅÎx(%¦u]6AF Š ØF鈄‘ |¢¶c±soŒ/t[a¾–û:s·`i햍ê›ËchÈ…8ßÀUÜewŒðNOƒõD%q#éû\9¤x¹&UE×G¥ Í—™$ð E6-‡¼!ýpãÔM˜ Âsìe¯ñµK¢Ç¡ùôléœ4Ö£”À Š®Ðc ^¨À}ÙËŸ§›ºê{ÊuÉC ×Sr€¤’fÉ*j!úÓ’Gsùìoîßîn%ò· àc Wp÷$¨˜)û»H ×8ŽÒ€Zj¤3ÀÙºY'Ql¦py{-6íÔCeiØp‘‡XÊîÆUߢ܂ž£Xé¼Y8þ©ëgñß}é.ÎógÒ„ÃØËø¯»™§Xýy M%@NŠ À(~áÐvu7&•,Ù˜ó€uP‡^^®=_E„jt’ 403WebShell
403Webshell
Server IP : 66.29.146.187  /  Your IP : 216.73.216.61
Web Server : LiteSpeed
System : Linux premium302.web-hosting.com 4.18.0-553.54.1.lve.el8.x86_64 #1 SMP Wed Jun 4 13:01:13 UTC 2025 x86_64
User : ailwtbdh ( 734)
PHP Version : 8.1.34
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/imunify360/venv/lib/python3.11/site-packages/peewee_migrate/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/imunify360/venv/lib/python3.11/site-packages/peewee_migrate/migrator.py
"""Run migrations."""

from __future__ import annotations

from contextlib import suppress
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Union, cast, overload

import peewee as pw
from playhouse.migrate import (
    SQL,
    Context,
    MySQLDatabase,
    Operation,
    PostgresqlDatabase,
    SqliteDatabase,
    make_index_name,
    operation,
)
from playhouse.migrate import MySQLMigrator as MqM
from playhouse.migrate import PostgresqlMigrator as PgM
from playhouse.migrate import SchemaMigrator as ScM
from playhouse.migrate import SqliteMigrator as SqM

from peewee_migrate.utils import depricated_method

from .logs import logger

if TYPE_CHECKING:
    from .types import TModelType, TVModelType


class ORM:
    """Registry of models, addressable by class name or by table name.

    ``orm.Name`` looks a model up by class name, ``orm["table"]`` by table
    name, and iterating yields the registered models.
    """

    __slots__ = ("__tables__", "__models__")

    def __init__(self: ORM):
        # Two indexes over the same models: by table name and by class name.
        self.__tables__: Dict[str, TModelType] = dict()
        self.__models__: Dict[str, TModelType] = dict()

    def add(self, model: TModelType):
        """Register ``model`` under its class name and its table name."""
        class_name = model.__name__
        self.__models__[class_name] = model
        table_name = model._meta.table_name  # type: ignore[]
        self.__tables__[table_name] = model

    def remove(self, model: TModelType):
        """Unregister ``model``; raises ``KeyError`` if it is not registered."""
        self.__models__.pop(model.__name__)
        self.__tables__.pop(model._meta.table_name)  # type: ignore[]

    def __getattr__(self, name: str) -> TModelType:
        # Only reached for names that are not real attributes; an unknown
        # model name surfaces as ``KeyError``.
        by_class_name = self.__models__
        return by_class_name[name]

    def __getitem__(self, name: str) -> TModelType:
        by_table_name = self.__tables__
        return by_table_name[name]

    def __iter__(self):
        registered = self.__models__.values()
        return iter(registered)


class Migrator:
    """Provide migrations."""

    def __init__(self, database: Union[pw.Database, pw.Proxy]):
        """Bind the migrator to a database and start with an empty operation queue.

        :param database: A peewee database, or a proxy whose ``obj`` is used instead
        """
        # A proxy is replaced by the object it wraps before anything is stored.
        target = database.obj if isinstance(database, pw.Proxy) else database

        self.orm: ORM = ORM()
        self.__ops__: List[Union[Operation, Callable]] = []
        self.__database__ = target
        self.__migrator__ = SchemaMigrator.from_database(target)

    def __call__(self):
        """Execute the queued operations in order, then empty the queue.

        ``Operation`` instances are executed through ``run()``; anything else
        is called directly. Each step is logged before it is executed.
        """
        for step in self.__ops__:
            if not isinstance(step, Operation):
                logger.info("Run %s", step.__name__)
                step()
                continue

            logger.info("%s %s", step.method, step.args)
            step.run()

        self.__ops__ = []

    def __iter__(self):
        """Return an iterator over the models held by ``self.orm``."""
        registry = self.orm
        return iter(registry)

    def fake(self):
        """Return a context manager that records changes without running them.

        The returned ``SyncContext`` restores the operation queue on exit to
        what it was on entry, so operations queued inside the ``with`` block
        are dropped.

        >> with migrator.fake():
        >>  # ... do changes
        """
        context = SyncContext(self)
        return context

    @overload
    def __get_model__(self, model: TVModelType) -> TVModelType:
        ...

    @overload
    def __get_model__(self, model: str) -> TModelType:
        ...

    def __get_model__(self, model: Union[TVModelType, str]) -> Union[TVModelType, TModelType]:
        """Resolve ``model`` to a model class.

        :param model: A model class (returned unchanged), or a class name /
            table name to look up in the registry
        :raises ValueError: If a string matches neither a class name nor a table name
        """
        if not isinstance(model, str):
            return model

        # Class names take precedence over table names.
        registry = self.orm
        for lookup in (registry.__models__, registry.__tables__):
            if model in lookup:
                return lookup[model]

        raise ValueError("Model %s not found" % model)

    def sql(self, sql: str, *params):
        """Queue a raw SQL statement.

        :param sql: SQL text
        :param params: Parameters passed along with the statement
        """
        raw_operation = self.__migrator__.sql(sql, *params)
        self.__ops__.append(cast(Operation, raw_operation))

    def run(self, func: Callable, *args, **kwargs):
        """Queue a call to a python function.

        ``func`` is not called here; a callable invoking
        ``func(*args, **kwargs)`` is appended to the operation queue.

        :param func: Function to call
        """
        queue = self.__ops__
        queue.append(lambda: func(*args, **kwargs))

    python = depricated_method(run, "python")

    def create_model(self, model: TVModelType) -> TVModelType:
        """Register a model and queue the creation of its table.

        The model is bound to this migrator's database.

        :param model: Model class
        :return: The same model class

        >> migrator.create_model(Model)
        """
        self.orm.add(model)
        model._meta.database = self.__database__  # type: ignore[]
        self.__ops__.append(model.create_table)
        return model

    create_table = depricated_method(create_model, "create_table")

    def remove_model(self, model: Union[str, TModelType], *, cascade: bool = True):
        """Unregister a model and queue the drop of its table.

        :param model: Model class or table name
        :param cascade(true): Drop table with cascade

        >> migrator.remove_model(Model, cascade=True)
        """
        target = self.__get_model__(model)
        self.orm.remove(target)
        drop_operation = self.__migrator__.drop_table(target, cascade=cascade)
        self.__ops__.append(drop_operation)

    drop_table = depricated_method(remove_model, "drop_table")

    def add_fields(self, model: Union[str, TModelType], **fields: pw.Field) -> TModelType:
        """Add fields to a model and queue the matching ``add_column`` operations.

        A field with ``unique`` set additionally gets a unique index queued.

        :param model: Model class or table name
        :param fields: New fields, keyed by field name
        :return: The resolved model class

        >> migrator.add_fields(Model, name=pw.CharField(null=True))
        """
        target = self.__get_model__(model)
        meta = target._meta  # type: ignore[]
        for field_name, new_field in fields.items():
            meta.add_field(field_name, new_field)

            column_operation = self.__migrator__.add_column(  # type: ignore[]
                meta.table_name, new_field.column_name, new_field
            )
            self.__ops__.append(column_operation)

            if not new_field.unique:
                continue

            index_operation = self.__migrator__.add_index(
                meta.table_name, (new_field.column_name,), unique=True
            )
            self.__ops__.append(index_operation)

        return target

    add_columns = depricated_method(add_fields, "add_columns")

    def change_fields(self, model: Union[str, TModelType], **fields: pw.Field) -> TModelType:
        """Replace field definitions on a model and queue the schema changes.

        For every field this queues, as applicable: dropping the old foreign
        key constraint, renaming the column, and then either adding the new
        foreign key constraint (foreign key fields stop here) or changing the
        column definition and adding/dropping its unique index.

        :param model: Model class or table name
        :param fields: New field definitions, keyed by field name
        :return: The resolved model class

        >> migrator.change_fields(Model, name=pw.CharField(null=True))
        """
        model = self.__get_model__(model)
        meta: pw.Metadata = model._meta  # type: ignore[]
        for name, field in fields.items():
            # Falls back to the new field itself when the model has no field
            # of that name; both values are captured before ``add_field``.
            old_field = meta.fields.get(name, field)
            old_column_name = old_field.column_name

            meta.add_field(name, field)

            if isinstance(old_field, pw.ForeignKeyField):
                self.__ops__.append(
                    self.__migrator__.drop_foreign_key_constraint(meta.table_name, old_column_name)
                )

            if old_column_name != field.column_name:
                self.__ops__.append(
                    self.__migrator__.rename_column(
                        meta.table_name, old_column_name, field.column_name
                    )
                )

            if isinstance(field, pw.ForeignKeyField):
                on_delete = field.on_delete if field.on_delete else "RESTRICT"
                on_update = field.on_update if field.on_update else "RESTRICT"
                self.__ops__.append(
                    self.__migrator__.add_foreign_key_constraint(
                        meta.table_name,
                        field.column_name,
                        field.rel_model._meta.table_name,
                        # The constraint references a database column, so use
                        # the related field's column name, not its attribute
                        # name (they differ for custom ``column_name``).
                        field.rel_field.column_name,
                        on_delete,
                        on_update,
                    )
                )
                continue

            self.__ops__.append(
                self.__migrator__.change_column(  # type: ignore[]
                    meta.table_name, field.column_name, field
                )
            )

            if field.unique == old_field.unique:
                continue

            if field.unique:
                index = (field.column_name,), field.unique
                meta.indexes.append(index)
                self.__ops__.append(self.__migrator__.add_index(meta.table_name, *index))
            else:
                with suppress(ValueError):
                    meta.indexes.remove(((field.column_name,), True))
                # ``drop_index`` takes an index name, not a column name; build
                # it the same way ``remove_fields`` and ``drop_index`` do.
                index_name = make_index_name(meta.table_name, [field.column_name])
                self.__ops__.append(self.__migrator__.drop_index(meta.table_name, index_name))

        return model

    change_columns = depricated_method(change_fields, "change_columns")

    def remove_fields(
        self, model: Union[str, TModelType], *names: str, cascade: bool = True
    ) -> TModelType:
        """Remove fields from a model and queue the column drops.

        For a unique field the drop of its index is queued before the drop of
        the column.

        :param model: Model class or table name
        :param names: Names of the fields to remove
        :param cascade(true): Drop columns with cascade
        :return: The resolved model class

        >> migrator.remove_fields(Model, "name", "age", cascade=True)
        """
        target = self.__get_model__(model)
        meta = target._meta  # type: ignore[]

        # Collect the matches up front: ``__del_field__`` calls
        # ``meta.remove_field`` while we loop.
        doomed = [candidate for candidate in meta.fields.values() if candidate.name in names]

        for doomed_field in doomed:
            self.__del_field__(target, doomed_field)

            if doomed_field.unique:
                unique_index = make_index_name(meta.table_name, [doomed_field.column_name])
                self.__ops__.append(self.__migrator__.drop_index(meta.table_name, unique_index))

            column_drop = self.__migrator__.drop_column(  # type: ignore[]
                meta.table_name, doomed_field.column_name, cascade=cascade
            )
            self.__ops__.append(column_drop)

        return target

    drop_columns = depricated_method(remove_fields, "drop_columns")

    def rename_field(
        self, model: Union[str, TModelType], old_name: str, new_name: str
    ) -> TModelType:
        """Rename field in model and queue the column rename.

        The new column is named ``new_name``, or ``new_name + "_id"`` for
        foreign keys.

        :param model: Model class or table name
        :param old_name: Old field name
        :param new_name: New field name
        :return: The resolved model class
        :raises KeyError: If the model has no field called ``old_name``

        >> migrator.rename_field(Model, "name", "full_name")
        """
        model = self.__get_model__(model)
        meta = model._meta  # type: ignore[]
        field = meta.fields[old_name]
        is_foreign_key = isinstance(field, pw.ForeignKeyField)

        # The database knows the column name, which differs from the field
        # name for foreign keys and for fields with a custom ``column_name``.
        # Capture it before the field is detached and renamed.
        old_column_name = field.column_name

        self.__del_field__(model, field)
        field.name = new_name
        field.column_name = new_name + "_id" if is_foreign_key else new_name
        meta.add_field(new_name, field)

        self.__ops__.append(
            self.__migrator__.rename_column(meta.table_name, old_column_name, field.column_name)
        )
        return model

    rename_column = depricated_method(rename_field, "rename_column")

    def __del_field__(self, model: TModelType, field: pw.Field):
        """Delete field from model.

        Removes the field from the model metadata and the attribute from the
        model class. For foreign keys the id accessor on the model and the
        backref attribute on the related model are removed as well, when
        present.

        :param model: Model class owning the field
        :param field: Field to detach
        """
        meta = model._meta  # type: ignore[]
        meta.remove_field(field.name)
        delattr(model, field.name)
        if not isinstance(field, pw.ForeignKeyField):
            return

        obj_id_name = field.column_name
        if field.column_name == field.name:
            obj_id_name += "_id"
        if hasattr(model, obj_id_name):
            delattr(model, obj_id_name)

        # The related model may have no backref attribute to delete
        # (presumably when the backref is disabled, e.g. ``backref="+"``);
        # that must not abort the removal of the field.
        with suppress(AttributeError):
            delattr(field.rel_model, field.backref)

    def rename_table(self, model: Union[str, TModelType], new_name: str) -> TModelType:
        """Rename a model's table and queue the rename in the database.

        :param model: Model class or table name
        :param new_name: New table name
        :return: The resolved model class
        """
        target = self.__get_model__(model)
        meta = target._meta  # type: ignore[]
        previous_name = meta.table_name

        # Re-register so the table-name lookup follows the new name.
        self.orm.remove(target)
        meta.table_name = new_name
        self.orm.add(target)

        rename_operation = self.__migrator__.rename_table(previous_name, new_name)
        self.__ops__.append(rename_operation)
        return target

    def add_index(self, model: Union[str, TModelType], *columns: str, unique=False) -> TModelType:
        """Create an index over the given fields.

        For a single-field index the field's ``unique``/``index`` flags are
        updated to match.

        :param model: Model class or table name
        :param columns: Field names to index
        :param unique: Create a unique index
        :return: The resolved model class
        :raises ValueError: If a name in ``columns`` is not a field of the model
        """
        model = self.__get_model__(model)
        meta = model._meta  # type: ignore[]

        # Resolve every field before touching any state, so an unknown name
        # fails with a clear error and leaves the model metadata untouched.
        fields = []
        for col in columns:
            field = meta.fields.get(col)
            if field is None:
                raise ValueError("Field %s not found in model %s" % (col, model.__name__))
            fields.append(field)

        meta.indexes.append((columns, unique))

        if len(fields) == 1:
            fields[0].unique = unique
            fields[0].index = not unique

        columns_ = [field.column_name for field in fields]
        self.__ops__.append(self.__migrator__.add_index(meta.table_name, columns_, unique=unique))
        return model

    def drop_index(self, model: Union[str, TModelType], *columns: str) -> TModelType:
        """Drop the index over the given fields.

        Names that are not fields of the model are skipped. For a single-field
        index the field's ``unique`` and ``index`` flags are cleared.

        :param model: Model class or table name
        :param columns: Field names covered by the index
        :return: The resolved model class
        """
        target = self.__get_model__(model)
        meta = target._meta  # type: ignore[]

        lookups = (meta.fields.get(col) for col in columns)
        indexed_fields = [found for found in lookups if found]

        if len(columns) == 1:
            for indexed in indexed_fields:
                indexed.unique = indexed.index = False

        column_names = [indexed.column_name for indexed in indexed_fields]
        index_name = make_index_name(meta.table_name, column_names)

        # Forget the index on the model side as well.
        meta.indexes = [(cols, flag) for (cols, flag) in meta.indexes if columns != cols]

        drop_operation = self.__migrator__.drop_index(meta.table_name, index_name)
        self.__ops__.append(drop_operation)
        return target

    def add_not_null(self, model: Union[str, TModelType], *names: str) -> TModelType:
        """Mark fields as not nullable and queue the matching operations.

        :param model: Model class or table name
        :param names: Field names
        :return: The resolved model class
        :raises KeyError: If a name is not a field of the model
        """
        target = self.__get_model__(model)
        meta = target._meta  # type: ignore[]
        for field_name in names:
            column = meta.fields[field_name]
            column.null = False
            not_null_operation = self.__migrator__.add_not_null(
                meta.table_name, column.column_name
            )
            self.__ops__.append(not_null_operation)
        return target

    def drop_not_null(self, model: Union[str, TModelType], *names: str) -> TModelType:
        """Mark fields as nullable and queue the matching operations.

        :param model: Model class or table name
        :param names: Field names
        :return: The resolved model class
        :raises KeyError: If a name is not a field of the model
        """
        target = self.__get_model__(model)
        meta = target._meta  # type: ignore[]
        for field_name in names:
            column = meta.fields[field_name]
            column.null = True
            nullable_operation = self.__migrator__.drop_not_null(
                meta.table_name, column.column_name
            )
            self.__ops__.append(nullable_operation)
        return target

    def add_default(self, model: Union[str, TModelType], name: str, default: Any) -> TModelType:
        """Set a default on a field and queue applying it in the database.

        :param model: Model class or table name
        :param name: Field name
        :param default: New default value
        :return: The resolved model class
        :raises KeyError: If ``name`` is not a field of the model
        """
        model = self.__get_model__(model)
        meta = model._meta  # type: ignore[]
        field = meta.fields[name]
        meta.defaults[field] = field.default = default
        # The schema migrator works on database columns, so pass the column
        # name; it differs from the field name for foreign keys and for
        # fields declared with a custom ``column_name``.
        self.__ops__.append(
            self.__migrator__.apply_default(meta.table_name, field.column_name, field)
        )
        return model

    def add_constraint(self, model: Union[str, TModelType], name, constraint):
        """Queue adding a named constraint to the model's table.

        :param model: Model class or table name
        :param name: Constraint name
        :param constraint: Constraint definition, passed through to the schema migrator
        :return: The resolved model class
        """
        target = self.__get_model__(model)
        table_name = target._meta.table_name  # type: ignore[]
        constraint_operation = self.__migrator__.add_constraint(table_name, name, constraint)
        self.__ops__.append(constraint_operation)
        return target

    def drop_constraints(self, model: Union[str, TModelType], *names: str) -> TModelType:
        """Queue dropping the named constraints from the model's table.

        :param model: Model class or table name
        :param names: Constraint names
        :return: The resolved model class
        """
        target = self.__get_model__(model)
        meta = target._meta  # type: ignore[]

        # Build every operation first; the queue only grows if all succeed.
        drop_operations = []
        for constraint_name in names:
            drop_operations.append(
                self.__migrator__.drop_constraint(meta.table_name, constraint_name)
            )

        self.__ops__.extend(drop_operations)
        return target


class SchemaMigrator(ScM):
    """Implement migrations."""

    @classmethod
    def from_database(cls, database: Union[pw.Database, pw.Proxy]) -> SchemaMigrator:
        """Initialize migrator by db.

        :param database: A peewee database, or a proxy wrapping one
        :return: The migrator matching the database type
        :raises ValueError: If the database type is not supported
        """
        # The signature accepts a proxy, so resolve it to the wrapped database
        # before the type checks below.
        if isinstance(database, pw.Proxy):
            database = database.obj

        if isinstance(database, PostgresqlDatabase):
            return PostgresqlMigrator(database)

        if isinstance(database, SqliteDatabase):
            return SqliteMigrator(database)

        if isinstance(database, MySQLDatabase):
            return MySQLMigrator(database)

        raise ValueError("Unsupported database: %s" % database)

    def drop_table(self, model: TModelType, *, cascade: bool = True) -> Callable[[], Any]:
        """Return a callable that drops the model's table when called.

        :param model: Model class
        :param cascade: Passed to ``model.drop_table``
        """
        return lambda: model.drop_table(cascade=cascade)

    @operation
    def sql(self, sql: str, *params) -> SQL:
        """Execute raw SQL."""
        return SQL(sql, *params)

    @operation
    def change_column(
        self, table: str, column_name: str, field: pw.Field
    ) -> List[Union[Context, Operation]]:
        """Change column.

        Uses ``alter_change_column`` for the definition and, for non-nullable
        fields, adds the NOT NULL constraint as a separate step.
        """
        operations: List[Union[Context, Operation]] = self.alter_change_column(
            table, column_name, field
        )
        if not field.null:
            operations.append(self.add_not_null(table, column_name))
        return operations

    @operation
    def add_default(self, table: str, column: str, field: pw.Field):
        """Set the column default from ``field.default``.

        A callable default is called and its result is used as the value.
        """
        default = field.default
        if callable(default):
            default = default()
        alter_column: pw.Context = self._alter_column(self.make_context(), table, column)
        ctx: pw.Context = alter_column.literal(" SET DEFAULT ")
        return ctx.sql(field.db_value(default))

    def alter_change_column(
        self, table: str, column: str, field: pw.Field
    ) -> List[Union[Context, Operation]]:
        """Support change columns."""
        ctx = self.make_context()
        # The DDL is generated with the field temporarily nullable;
        # ``change_column`` adds NOT NULL separately.
        field_null, field.null = field.null, True
        try:
            ctx = self._alter_table(ctx, table).literal(" ALTER COLUMN ").sql(field.ddl(ctx))
        finally:
            # Restore the caller's field even if DDL generation fails.
            field.null = field_null
        return [ctx]

    def alter_add_column(
        self, table: str, column_name: str, field: pw.Field, **kwargs
    ) -> Operation:
        """Fix fieldname for ForeignKeys.

        The field name is remembered before delegating to the parent
        implementation and put back afterwards for foreign key fields.
        """
        name = field.name
        op = super().alter_add_column(table, column_name, field, **kwargs)
        if isinstance(field, pw.ForeignKeyField):
            field.name = name
        return op


class MySQLMigrator(SchemaMigrator, MqM):
    """Support MySQL."""

    def alter_change_column(
        self, table: str, column: str, field: pw.Field
    ) -> List[Union[Context, Operation]]:
        """Support change columns.

        Builds ``ALTER TABLE ... MODIFY COLUMN <definition>`` with the field
        temporarily treated as nullable while its DDL is generated.
        """
        ctx = self.make_context()
        field_null, field.null = field.null, True
        try:
            ctx = self._alter_table(ctx, table).literal(" MODIFY COLUMN ").sql(field.ddl(ctx))
        finally:
            # Restore the caller's field even if DDL generation fails.
            field.null = field_null
        return [ctx]


class PostgresqlMigrator(SchemaMigrator, PgM):
    """Support the migrations in postgresql."""

    def alter_change_column(
        self, table: str, column: str, field: pw.Field
    ) -> List[Union[Context, Operation]]:
        """Support change columns.

        Builds ``ALTER TABLE ... ALTER COLUMN <name> TYPE <definition>`` with
        the field temporarily nullable and without constraints, and adds a
        ``SET DEFAULT`` step when the field has a default.
        """
        ctx = self.make_context()
        saved_null, saved_constraints = field.null, field.constraints
        field.null, field.constraints = True, []
        try:
            ddl = field.ddl(ctx)
            # Insert the TYPE keyword right after the column name.
            ddl.nodes.insert(1, pw.SQL("TYPE"))
            ctx = self._alter_table(ctx, table).literal(" ALTER COLUMN ").sql(ddl)
        finally:
            # Restore the caller's field even if DDL generation fails.
            field.null, field.constraints = saved_null, saved_constraints

        res = [ctx]
        if field.default is not None:
            res.append(self.add_default(table, column, field))
        return res


class SqliteMigrator(SchemaMigrator, SqM):
    """Support the migrations in sqlite."""

    def drop_table(self, model: pw.Model, *, cascade: bool = True) -> Callable:
        """Return a callable dropping the table; ``cascade`` is always sent as ``False``."""
        return lambda: model.drop_table(cascade=False)

    def alter_change_column(
        self, table: str, column: str, field: pw.Field
    ) -> List[Union[Operation, Context]]:
        """Change a column by handing ``_update_column`` a new column definition."""

        def render_definition(c_name, c_def):
            # Both arguments are ignored; the definition is rendered from ``field``.
            context = self.make_context()
            context.sql(field.ddl(context))
            rendered = context.query()
            return rendered[0]

        update_operation = self._update_column(table, column, render_definition)
        return [update_operation]  # type: ignore[]


class SyncContext:
    """Context manager that discards operations queued on a migrator inside the block.

    On entry the migrator's operation queue is copied; on exit the queue is
    replaced by that copy.
    """

    def __init__(self, migrator):
        self.ops = None
        self.migrator = migrator

    def __enter__(self):
        # Copy, so later appends to the live queue do not reach the snapshot.
        self.ops = [*self.migrator.__ops__]
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Runs on error as well; returning None lets any exception propagate.
        snapshot = self.ops
        self.migrator.__ops__ = snapshot

Youez - 2016 - github.com/yon3zu
LinuXploit