Skip to content

route_rules

Modules:

Classes:

Functions:

Attributes:

PRESETS module-attribute

PRESETS: list[PresetConfig] = [
    PresetConfig(
        "ads",
        "🛑 ADs",
        [
            "blackmatrix7:Advertising",
            "DustinWin/geosite-all:ads",
            "MetaCubeX/geosite:*-ads,*-ads-all,*@ads",
        ],
        [],
    ),
    PresetConfig(
        "private",
        "🔒 Private",
        [
            "blackmatrix7:Lan,NTPService",
            "DustinWin/geoip-all:private",
            "DustinWin/geosite-all:private",
            "MetaCubeX/geoip:private",
            "MetaCubeX/geosite:category-ntp*,private",
        ],
        ["preset:ads"],
    ),
    PresetConfig(
        "cn",
        "🇨🇳 CN",
        [
            "blackmatrix7:ChinaMax,Direct",
            "DustinWin/geoip-all:cn",
            "DustinWin/geosite-all:cn",
            "liblaf:cn",
            "MetaCubeX/geoip:cn",
            "MetaCubeX/geosite:cn,*-cn,*@cn",
        ],
        ["liblaf:!cn", "preset:ads", "preset:private"],
    ),
    PresetConfig(
        "proxy",
        "✈️ Proxy",
        [
            "blackmatrix7:Global",
            "DustinWin/geosite-all:proxy",
            "MetaCubeX/geosite:*!cn*",
        ],
        ["preset:ads", "preset:cn", "preset:private"],
    ),
    PresetConfig(
        "ai",
        "🤖 AI",
        [
            "blackmatrix7:Claude,Copilot,Gemini,OpenAI",
            "DustinWin/geosite-all:ai",
            "MetaCubeX/geosite:openai",
        ],
        ["preset:ads", "preset:cn", "preset:private"],
    ),
    PresetConfig(
        "download",
        "☁️ Download",
        [
            "blackmatrix7:Download,OneDrive",
            "MetaCubeX/geosite:onedrive",
        ],
        ["preset:ads", "preset:cn", "preset:private"],
    ),
    PresetConfig(
        "emby",
        "🍟 Emby",
        ["liblaf:emby", "NotSFC:Emby"],
        ["preset:ads", "preset:cn", "preset:private"],
    ),
    PresetConfig(
        "media",
        "📺 Media",
        [
            "blackmatrix7:GlobalMedia",
            "DustinWin/geosite-all:youtube",
            "MetaCubeX/geosite-lite:proxymedia,youtube",
            "MetaCubeX/geosite:youtube",
        ],
        ["preset:ads", "preset:cn", "preset:private"],
    ),
]

PresetConfig

Bases: NamedTuple

Attributes:

Source code in src/route_rules/source/preset/_const.py
 7
 8
 9
10
11
class PresetConfig(NamedTuple):
    id: str
    name: str
    include: list[str]
    exclude: list[str]

exclude instance-attribute

exclude: list[str]

id instance-attribute

id: str

include instance-attribute

include: list[str]

name instance-attribute

name: str

Rule

Bases: BaseModel

Methods:

Attributes:

Source code in src/route_rules/container/_rule.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class Rule(BaseModel):
    model_config = ConfigDict(extra="forbid")
    domain: Set = set()
    domain_suffix: Set = set()
    domain_keyword: Set = set()
    domain_regex: Set = set()
    ip_cidr: Set = set()

    @classmethod
    def from_file(cls, path: StrPath) -> "Rule":
        rule_set: rr.RuleSet = rr.RuleSet.from_file(path)
        return Rule().union(*rule_set.rules)

    def __getitem__(self, key: str) -> set[str]:
        return getattr(self, key)

    def __len__(self) -> int:
        return sum(len(v) for k, v in self)

    def __or__(self, other: "Rule") -> "Rule":
        return self.op(operator.or_, other)

    def __sub__(self, other: "Rule") -> "Rule":
        return self.op(operator.sub, other)

    def union(self, *others: "Rule") -> "Rule":
        return Rule(**{k: v.union(*(r[k] for r in others)) for k, v in self})

    def difference(self, *others: "Rule") -> "Rule":
        return Rule(**{k: v.difference(*(r[k] for r in others)) for k, v in self})

    def geoip(self) -> "Rule":
        return Rule(ip_cidr=self.ip_cidr)

    def geosite(self) -> "Rule":
        return Rule(
            domain=self.domain,
            domain_suffix=self.domain_suffix,
            domain_keyword=self.domain_keyword,
            domain_regex=self.domain_regex,
        )

    def op(self, op: Callable[[Any, Any], Any], other: "Rule") -> "Rule":
        return Rule(**{k: op(self[k], other[k]) for k, v in self})

    def optimize(self) -> None:
        self.domain = optim.remove_unresolvable(self.domain)
        self.domain, self.domain_suffix = optim.merge_domain_with_suffix(
            self.domain, self.domain_suffix
        )
        self.domain_suffix = optim.merge_between_suffix(self.domain_suffix)
        self.domain, self.domain_keyword = optim.merge_domain_with_keyword(
            self.domain, self.domain_keyword
        )
        self.domain_suffix, self.domain_keyword = optim.merge_suffix_with_keyword(
            self.domain_suffix, self.domain_keyword
        )
        self.ip_cidr = optim.merge_ip_cidr(self.ip_cidr)

    def save(self, path: StrPath) -> None:
        rr.RuleSet(version=1, rules=[self]).save(path)

    def summary(self) -> str:
        res: str = ""
        for k, v in self:
            if v:
                name: str = k.upper().replace("_", "-")
                res += f"{name}: {len(v)}\n"
        res += f"TOTAL: {len(self)}"
        return res

domain class-attribute instance-attribute

domain: Set = set()

domain_keyword class-attribute instance-attribute

domain_keyword: Set = set()

domain_regex class-attribute instance-attribute

domain_regex: Set = set()

domain_suffix class-attribute instance-attribute

domain_suffix: Set = set()

ip_cidr class-attribute instance-attribute

ip_cidr: Set = set()

model_config class-attribute instance-attribute

model_config = ConfigDict(extra='forbid')

__getitem__

__getitem__(key: str) -> set[str]
Source code in src/route_rules/container/_rule.py
28
29
def __getitem__(self, key: str) -> set[str]:
    return getattr(self, key)

__len__

__len__() -> int
Source code in src/route_rules/container/_rule.py
31
32
def __len__(self) -> int:
    return sum(len(v) for k, v in self)

__or__

__or__(other: Rule) -> Rule
Source code in src/route_rules/container/_rule.py
34
35
def __or__(self, other: "Rule") -> "Rule":
    return self.op(operator.or_, other)

__sub__

__sub__(other: Rule) -> Rule
Source code in src/route_rules/container/_rule.py
37
38
def __sub__(self, other: "Rule") -> "Rule":
    return self.op(operator.sub, other)

difference

difference(*others: Rule) -> Rule
Source code in src/route_rules/container/_rule.py
43
44
def difference(self, *others: "Rule") -> "Rule":
    return Rule(**{k: v.difference(*(r[k] for r in others)) for k, v in self})

from_file classmethod

from_file(path: StrPath) -> Rule
Source code in src/route_rules/container/_rule.py
23
24
25
26
@classmethod
def from_file(cls, path: StrPath) -> "Rule":
    rule_set: rr.RuleSet = rr.RuleSet.from_file(path)
    return Rule().union(*rule_set.rules)

geoip

geoip() -> Rule
Source code in src/route_rules/container/_rule.py
46
47
def geoip(self) -> "Rule":
    return Rule(ip_cidr=self.ip_cidr)

geosite

geosite() -> Rule
Source code in src/route_rules/container/_rule.py
49
50
51
52
53
54
55
def geosite(self) -> "Rule":
    return Rule(
        domain=self.domain,
        domain_suffix=self.domain_suffix,
        domain_keyword=self.domain_keyword,
        domain_regex=self.domain_regex,
    )

op

op(op: Callable[[Any, Any], Any], other: Rule) -> Rule
Source code in src/route_rules/container/_rule.py
57
58
def op(self, op: Callable[[Any, Any], Any], other: "Rule") -> "Rule":
    return Rule(**{k: op(self[k], other[k]) for k, v in self})

optimize

optimize() -> None
Source code in src/route_rules/container/_rule.py
60
61
62
63
64
65
66
67
68
69
70
71
72
def optimize(self) -> None:
    self.domain = optim.remove_unresolvable(self.domain)
    self.domain, self.domain_suffix = optim.merge_domain_with_suffix(
        self.domain, self.domain_suffix
    )
    self.domain_suffix = optim.merge_between_suffix(self.domain_suffix)
    self.domain, self.domain_keyword = optim.merge_domain_with_keyword(
        self.domain, self.domain_keyword
    )
    self.domain_suffix, self.domain_keyword = optim.merge_suffix_with_keyword(
        self.domain_suffix, self.domain_keyword
    )
    self.ip_cidr = optim.merge_ip_cidr(self.ip_cidr)

save

save(path: StrPath) -> None
Source code in src/route_rules/container/_rule.py
74
75
def save(self, path: StrPath) -> None:
    rr.RuleSet(version=1, rules=[self]).save(path)

summary

summary() -> str
Source code in src/route_rules/container/_rule.py
77
78
79
80
81
82
83
84
def summary(self) -> str:
    res: str = ""
    for k, v in self:
        if v:
            name: str = k.upper().replace("_", "-")
            res += f"{name}: {len(v)}\n"
    res += f"TOTAL: {len(self)}"
    return res

union

union(*others: Rule) -> Rule
Source code in src/route_rules/container/_rule.py
40
41
def union(self, *others: "Rule") -> "Rule":
    return Rule(**{k: v.union(*(r[k] for r in others)) for k, v in self})

RuleSet

Bases: BaseModel

Methods:

Attributes:

Source code in src/route_rules/container/_rule_set.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class RuleSet(BaseModel):
    version: Literal[1, 2]
    rules: list[rr.Rule]

    @classmethod
    def from_file(cls, path: StrPath) -> "RuleSet":
        fpath: Path = Path(path)
        return RuleSet.model_validate_json(fpath.read_text())

    def save(self, path: StrPath) -> None:
        json: str = self.model_dump_json(exclude_defaults=True)
        fpath: Path = Path(path)
        fpath.parent.mkdir(parents=True, exist_ok=True)
        fpath.write_text(json)

rules instance-attribute

rules: list[Rule]

version instance-attribute

version: Literal[1, 2]

from_file classmethod

from_file(path: StrPath) -> RuleSet
Source code in src/route_rules/container/_rule_set.py
14
15
16
17
@classmethod
def from_file(cls, path: StrPath) -> "RuleSet":
    fpath: Path = Path(path)
    return RuleSet.model_validate_json(fpath.read_text())

save

save(path: StrPath) -> None
Source code in src/route_rules/container/_rule_set.py
19
20
21
22
23
def save(self, path: StrPath) -> None:
    json: str = self.model_dump_json(exclude_defaults=True)
    fpath: Path = Path(path)
    fpath.parent.mkdir(parents=True, exist_ok=True)
    fpath.write_text(json)

Source

Bases: ABC

Methods:

Attributes:

Source code in src/route_rules/source/_abc.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class Source(abc.ABC):
    name: str
    _key_cache: list[str] | None = None
    _rule_cache: LRU[str, Rule]

    def __init__(self) -> None:
        self._rule_cache = LRU()

    async def get(self, *key: str) -> Rule:
        return Rule().union(*(await asyncio.gather(*(self._get(k) for k in key))))

    async def keys(self) -> list[str]:
        if self._key_cache is not None:
            return self._key_cache
        self._key_cache = await self._keys_nocache()
        return self._key_cache

    @abc.abstractmethod
    async def _get_nocache(self, key: str) -> Rule: ...

    async def _get(self, key: str) -> Rule:
        if (r := self._rule_cache.get(key)) is not None:
            return r
        rule: Rule = await self._get_nocache(key)
        self._rule_cache[key] = rule
        return rule

    @abc.abstractmethod
    async def _keys_nocache(self) -> list[str]: ...

name instance-attribute

name: str

__init__

__init__() -> None
Source code in src/route_rules/source/_abc.py
14
15
def __init__(self) -> None:
    self._rule_cache = LRU()

get async

get(*key: str) -> Rule
Source code in src/route_rules/source/_abc.py
17
18
async def get(self, *key: str) -> Rule:
    return Rule().union(*(await asyncio.gather(*(self._get(k) for k in key))))

keys async

keys() -> list[str]
Source code in src/route_rules/source/_abc.py
20
21
22
23
24
async def keys(self) -> list[str]:
    if self._key_cache is not None:
        return self._key_cache
    self._key_cache = await self._keys_nocache()
    return self._key_cache

get_rule async

get_rule(*spec: str) -> Rule
Source code in src/route_rules/source/preset/_rule.py
10
11
async def get_rule(*spec: str) -> Rule:
    return Rule().union(*(await asyncio.gather(*(_get_rule(s) for s in spec))))

get_source

get_source(name: str) -> Source
Source code in src/route_rules/source/_const.py
49
50
51
52
53
def get_source(name: str) -> Source:
    for source in SOURCES:
        if source.name == name:
            return source
    raise KeyError(name)