# Copyright (c) 2020 Mobvoi Inc. (authors: Binbin Zhang, Di Wu) # 2024 Alibaba Inc (Xiang Lyu) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # Modified from ESPnet(https://github.com/espnet/espnet) """ConvolutionModule definition.""" from typing import Tuple import torch from torch import nn import torch.nn.functional as F class ConvolutionModule(nn.Module): """ConvolutionModule in Conformer model.""" def __init__(self, channels: int, kernel_size: int = 15, activation: nn.Module = nn.ReLU(), norm: str = "batch_norm", causal: bool = False, bias: bool = True): """Construct an ConvolutionModule object. Args: channels (int): The number of channels of conv layers. kernel_size (int): Kernel size of conv layers. causal (int): Whether use causal convolution or not """ super().__init__() self.pointwise_conv1 = nn.Conv1d( channels, 2 * channels, kernel_size=1, stride=1, padding=0, bias=bias, ) # self.lorder is used to distinguish if it's a causal convolution, # if self.lorder > 0: it's a causal convolution, the input will be # padded with self.lorder frames on the left in forward. # else: it's a symmetrical convolution if causal: padding = 0 self.lorder = kernel_size - 1 else: # kernel_size should be an odd number for none causal convolution assert (kernel_size - 1) % 2 == 0 padding = (kernel_size - 1) // 2 self.lorder = 0 self.depthwise_conv = nn.Conv1d( channels, channels, kernel_size, stride=1, padding=padding, groups=channels, bias=bias, ) assert norm in ['batch_norm', 'layer_norm'] if norm == "batch_norm": self.use_layer_norm = False self.norm = nn.BatchNorm1d(channels) else: self.use_layer_norm = True self.norm = nn.LayerNorm(channels) self.pointwise_conv2 = nn.Conv1d( channels, channels, kernel_size=1, stride=1, padding=0, bias=bias, ) self.activation = activation def forward( self, x: torch.Tensor, mask_pad: torch.Tensor = torch.ones((0, 0, 0), dtype=torch.bool), cache: torch.Tensor = torch.zeros((0, 0, 0)), ) -> Tuple[torch.Tensor, torch.Tensor]: """Compute convolution module. Args: x (torch.Tensor): Input tensor (#batch, time, channels). mask_pad (torch.Tensor): used for batch padding (#batch, 1, time), (0, 0, 0) means fake mask. cache (torch.Tensor): left context cache, it is only used in causal convolution (#batch, channels, cache_t), (0, 0, 0) meas fake cache. Returns: torch.Tensor: Output tensor (#batch, time, channels). """ # exchange the temporal dimension and the feature dimension x = x.transpose(1, 2) # (#batch, channels, time) # mask batch padding if mask_pad.size(2) > 0: # time > 0 x.masked_fill_(~mask_pad, 0.0) if self.lorder > 0: if cache.size(2) == 0: # cache_t == 0 x = nn.functional.pad(x, (self.lorder, 0), 'constant', 0.0) else: assert cache.size(0) == x.size(0) # equal batch assert cache.size(1) == x.size(1) # equal channel x = torch.cat((cache, x), dim=2) assert (x.size(2) > self.lorder) new_cache = x[:, :, -self.lorder:] else: # It's better we just return None if no cache is required, # However, for JIT export, here we just fake one tensor instead of # None. new_cache = torch.zeros((0, 0, 0), dtype=x.dtype, device=x.device) # GLU mechanism x = self.pointwise_conv1(x) # (batch, 2*channel, dim) x = nn.functional.glu(x, dim=1) # (batch, channel, dim) # 1D Depthwise Conv x = self.depthwise_conv(x) if self.use_layer_norm: x = x.transpose(1, 2) x = self.activation(self.norm(x)) if self.use_layer_norm: x = x.transpose(1, 2) x = self.pointwise_conv2(x) # mask batch padding if mask_pad.size(2) > 0: # time > 0 x.masked_fill_(~mask_pad, 0.0) return x.transpose(1, 2), new_cache # NOTE(Xiang Lyu) causal conv module used in convolution-based vocoder class CausalConv1d(torch.nn.Conv1d): def __init__( self, in_channels: int, out_channels: int, kernel_size: int, stride: int = 1, dilation: int = 1, groups: int = 1, bias: bool = True, padding_mode: str = 'zeros', causal_type: str = 'left', device=None, dtype=None ) -> None: super(CausalConv1d, self).__init__(in_channels, out_channels, kernel_size, stride=1, padding=0, dilation=dilation, groups=groups, bias=bias, padding_mode=padding_mode, device=device, dtype=dtype) assert stride == 1 self.causal_padding = int((kernel_size * dilation - dilation) / 2) * 2 + (kernel_size + 1) % 2 assert causal_type in ['left', 'right'] self.causal_type = causal_type def forward(self, x: torch.Tensor, cache: torch.Tensor = torch.zeros(0, 0, 0)) -> Tuple[torch.Tensor]: input_timestep = x.shape[2] if cache.size(2) == 0: cache = torch.zeros(x.shape[0], x.shape[1], self.causal_padding).to(x) cache = cache.to(x.device) assert cache.size(2) == self.causal_padding if self.causal_type == 'left': x = torch.concat([cache, x], dim=2) else: print('x.device {} cache.device {}'.format(x.device, cache.device)) x = torch.concat([x, cache], dim=2) x = super(CausalConv1d, self).forward(x) assert x.shape[2] == input_timestep return x class CausalConv1dDownSample(torch.nn.Conv1d): def __init__( self, in_channels: int, out_channels: int, kernel_size: int, stride: int = 1, dilation: int = 1, groups: int = 1, bias: bool = True, padding_mode: str = 'zeros', device=None, dtype=None ) -> None: super(CausalConv1dDownSample, self).__init__(in_channels, out_channels, kernel_size, stride, padding=0, dilation=dilation, groups=groups, bias=bias, padding_mode=padding_mode, device=device, dtype=dtype) assert stride != 1 and dilation == 1 assert kernel_size % stride == 0 self.causal_padding = stride - 1 def forward(self, x: torch.Tensor, cache: torch.Tensor = torch.zeros(0, 0, 0)) -> Tuple[torch.Tensor, torch.Tensor]: if cache.size(2) == 0: x = F.pad(x, (self.causal_padding, 0), value=0.0) else: assert cache.size(2) == self.causal_padding x = torch.concat([cache, x], dim=2) x = super(CausalConv1dDownSample, self).forward(x) return x class CausalConv1dUpsample(torch.nn.Conv1d): def __init__( self, in_channels: int, out_channels: int, kernel_size: int, stride: int = 1, dilation: int = 1, groups: int = 1, bias: bool = True, padding_mode: str = 'zeros', device=None, dtype=None ) -> None: super(CausalConv1dUpsample, self).__init__(in_channels, out_channels, kernel_size, 1, padding=0, dilation=dilation, groups=groups, bias=bias, padding_mode=padding_mode, device=device, dtype=dtype) assert dilation == 1 self.causal_padding = kernel_size - 1 self.upsample = torch.nn.Upsample(scale_factor=stride, mode='nearest') def forward(self, x: torch.Tensor, cache: torch.Tensor = torch.zeros(0, 0, 0)) -> Tuple[torch.Tensor, torch.Tensor]: x = self.upsample(x) input_timestep = x.shape[2] if cache.size(2) == 0: x = F.pad(x, (self.causal_padding, 0), value=0.0) else: assert cache.size(2) == self.causal_padding x = torch.concat([cache, x], dim=2) x = super(CausalConv1dUpsample, self).forward(x) assert input_timestep == x.shape[2] return x